Crash recovery mechanism (#2045)

* Crash recovery mechanism

* Propogate exception

* Added docs and crash receovery during block write

* Fix flake8 issue

* Remove approach 1 for crash recovery, recover db on 'bigchiandb start'

* Fix CI build issues

* Remove documentation
This commit is contained in:
Vanshdeep Singh 2018-02-21 15:20:12 +05:30 committed by vrde
parent 58c5498d35
commit 5bfa8e29d8
11 changed files with 240 additions and 23 deletions

View File

@ -24,11 +24,8 @@ def store_transaction(conn, signed_transaction):
@register_query(LocalMongoDBConnection) @register_query(LocalMongoDBConnection)
def store_transactions(conn, signed_transactions): def store_transactions(conn, signed_transactions):
try: return conn.run(conn.collection('transactions')
return conn.run(conn.collection('transactions') .insert_many(signed_transactions))
.insert_many(signed_transactions))
except DuplicateKeyError:
pass
@register_query(LocalMongoDBConnection) @register_query(LocalMongoDBConnection)
@ -54,12 +51,9 @@ def get_transactions(conn, transaction_ids):
@register_query(LocalMongoDBConnection) @register_query(LocalMongoDBConnection)
def store_metadatas(conn, metadata): def store_metadatas(conn, metadata):
try: return conn.run(
return conn.run( conn.collection('metadata')
conn.collection('metadata') .insert_many(metadata, ordered=False))
.insert_many(metadata, ordered=False))
except DuplicateKeyError:
pass
@register_query(LocalMongoDBConnection) @register_query(LocalMongoDBConnection)
@ -82,12 +76,9 @@ def store_asset(conn, asset):
@register_query(LocalMongoDBConnection) @register_query(LocalMongoDBConnection)
def store_assets(conn, assets): def store_assets(conn, assets):
try: return conn.run(
return conn.run( conn.collection('assets')
conn.collection('assets') .insert_many(assets, ordered=False))
.insert_many(assets, ordered=False))
except DuplicateKeyError:
pass
@register_query(LocalMongoDBConnection) @register_query(LocalMongoDBConnection)
@ -201,6 +192,40 @@ def get_block_with_transaction(conn, txid):
projection={'_id': False, 'height': True})) projection={'_id': False, 'height': True}))
@register_query(LocalMongoDBConnection)
def delete_zombie_transactions(conn):
txns = conn.run(conn.collection('transactions').find({}))
for txn in txns:
txn_id = txn['id']
block = list(get_block_with_transaction(conn, txn_id))
if len(block) == 0:
delete_transaction(conn, txn_id)
def delete_transaction(conn, txn_id):
conn.run(
conn.collection('transactions').delete_one({'id': txn_id}))
conn.run(
conn.collection('assets').delete_one({'id': txn_id}))
conn.run(
conn.collection('metadata').delete_one({'id': txn_id}))
@register_query(LocalMongoDBConnection)
def delete_latest_block(conn):
block = get_latest_block(conn)
txn_ids = block['transactions']
delete_transactions(conn, txn_ids)
conn.run(conn.collection('blocks').delete_one({'height': block['height']}))
@register_query(LocalMongoDBConnection)
def delete_transactions(conn, txn_ids):
conn.run(conn.collection('assets').delete_many({'id': {'$in': txn_ids}}))
conn.run(conn.collection('metadata').delete_many({'id': {'$in': txn_ids}}))
conn.run(conn.collection('transactions').delete_many({'id': {'$in': txn_ids}}))
@register_query(LocalMongoDBConnection) @register_query(LocalMongoDBConnection)
def store_unspent_outputs(conn, *unspent_outputs): def store_unspent_outputs(conn, *unspent_outputs):
try: try:

View File

@ -549,6 +549,13 @@ def store_block(conn, block):
raise NotImplementedError raise NotImplementedError
@singledispatch
def delete_zombie_transactions(conn):
"""Delete transactions not included in any block"""
raise NotImplementedError
@singledispatch @singledispatch
def store_unspent_outputs(connection, unspent_outputs): def store_unspent_outputs(connection, unspent_outputs):
"""Store unspent outputs in ``utxo_set`` table.""" """Store unspent outputs in ``utxo_set`` table."""
@ -556,6 +563,13 @@ def store_unspent_outputs(connection, unspent_outputs):
raise NotImplementedError raise NotImplementedError
@singledispatch
def delete_latest_block(conn):
"""Delete the latest block along with its transactions"""
raise NotImplementedError
@singledispatch @singledispatch
def delete_unspent_outputs(connection, unspent_outputs): def delete_unspent_outputs(connection, unspent_outputs):
"""Delete unspent outputs in ``utxo_set`` table.""" """Delete unspent outputs in ``utxo_set`` table."""
@ -563,6 +577,20 @@ def delete_unspent_outputs(connection, unspent_outputs):
raise NotImplementedError raise NotImplementedError
@singledispatch
def delete_transactions(conn, txn_ids):
"""Delete transactions from database
Args:
txn_ids (list): list of transaction ids
Returns:
The result of the operation.
"""
raise NotImplementedError
@singledispatch @singledispatch
def get_unspent_outputs(connection, *, query=None): def get_unspent_outputs(connection, *, query=None):
"""Retrieves unspent outputs. """Retrieves unspent outputs.

View File

@ -15,8 +15,10 @@ from bigchaindb.common.exceptions import (StartupError,
KeypairNotFoundException, KeypairNotFoundException,
DatabaseDoesNotExist) DatabaseDoesNotExist)
import bigchaindb import bigchaindb
from bigchaindb.tendermint.core import BigchainDB
from bigchaindb import backend from bigchaindb import backend
from bigchaindb.backend import schema from bigchaindb.backend import schema
from bigchaindb.backend import query
from bigchaindb.backend.admin import (set_replicas, set_shards, add_replicas, from bigchaindb.backend.admin import (set_replicas, set_shards, add_replicas,
remove_replicas) remove_replicas)
from bigchaindb.backend.exceptions import OperationError from bigchaindb.backend.exceptions import OperationError
@ -154,6 +156,19 @@ def run_init(args):
print('If you wish to re-initialize it, first drop it.', file=sys.stderr) print('If you wish to re-initialize it, first drop it.', file=sys.stderr)
def run_recover(b):
query.delete_zombie_transactions(b.connection)
tendermint_height = b.get_latest_block_height_from_tendermint()
block = b.get_latest_block()
if block:
while block['height'] > tendermint_height:
logger.info('BigchainDB is ahead of tendermint, removing block %s', block['height'])
query.delete_latest_block(b.connection)
block = b.get_latest_block()
@configure_bigchaindb @configure_bigchaindb
def run_drop(args): def run_drop(args):
"""Drop the database""" """Drop the database"""
@ -178,6 +193,8 @@ def run_start(args):
"""Start the processes to run the node""" """Start the processes to run the node"""
logger.info('BigchainDB Version %s', bigchaindb.__version__) logger.info('BigchainDB Version %s', bigchaindb.__version__)
run_recover(BigchainDB())
if args.allow_temp_keypair: if args.allow_temp_keypair:
if not (bigchaindb.config['keypair']['private'] or if not (bigchaindb.config['keypair']['private'] or
bigchaindb.config['keypair']['public']): bigchaindb.config['keypair']['public']):

View File

@ -50,6 +50,10 @@ class BigchainDB(Bigchain):
"""Submit a valid transaction to the mempool.""" """Submit a valid transaction to the mempool."""
self.post_transaction(transaction, mode) self.post_transaction(transaction, mode)
def get_latest_block_height_from_tendermint(self):
r = requests.get(ENDPOINT + 'status')
return r.json()['result']['latest_block_height']
def store_transaction(self, transaction): def store_transaction(self, transaction):
"""Store a valid transaction to the transactions collection.""" """Store a valid transaction to the transactions collection."""

View File

@ -25,5 +25,6 @@ ENV BIGCHAINDB_TENDERMINT_PORT 46657
RUN mkdir -p /usr/src/app RUN mkdir -p /usr/src/app
COPY . /usr/src/app/ COPY . /usr/src/app/
WORKDIR /usr/src/app WORKDIR /usr/src/app
RUN find . | grep -E "(__pycache__|\.pyc|\.pyo$)" | xargs rm -rf
RUN pip install --no-cache-dir .[test] RUN pip install --no-cache-dir .[test]
RUN bigchaindb -y configure "$backend" RUN bigchaindb -y configure "$backend"

View File

@ -29,5 +29,6 @@ services:
image: tendermint/tendermint:0.13 image: tendermint/tendermint:0.13
volumes: volumes:
- ./tmdata/config.toml:/tendermint/config.toml - ./tmdata/config.toml:/tendermint/config.toml
entrypoint: '' - ./tmdata/genesis.json:/tendermint/genesis.json
command: bash -c "tendermint init && tendermint node" - ./tmdata/priv_validator.json:/tendermint/priv_validator.json
entrypoint: ["/bin/tendermint", "node", "--proxy_app=dummy"]

View File

@ -199,6 +199,48 @@ def test_get_block():
assert block['height'] == 3 assert block['height'] == 3
def test_delete_zombie_transactions(signed_create_tx, signed_transfer_tx):
from bigchaindb.backend import connect, query
from bigchaindb.tendermint.lib import Block
conn = connect()
conn.db.transactions.insert_one(signed_create_tx.to_dict())
query.store_asset(conn, {'id': signed_create_tx.id})
block = Block(app_hash='random_utxo',
height=3,
transactions=[signed_create_tx.id])
query.store_block(conn, block._asdict())
conn.db.transactions.insert_one(signed_transfer_tx.to_dict())
query.store_metadatas(conn, [{'id': signed_transfer_tx.id}])
query.delete_zombie_transactions(conn)
assert query.get_transaction(conn, signed_transfer_tx.id) is None
assert query.get_asset(conn, signed_transfer_tx.id) is None
assert list(query.get_metadata(conn, [signed_transfer_tx.id])) == []
assert query.get_transaction(conn, signed_create_tx.id) is not None
assert query.get_asset(conn, signed_create_tx.id) is not None
def test_delete_latest_block(signed_create_tx, signed_transfer_tx):
from bigchaindb.backend import connect, query
from bigchaindb.tendermint.lib import Block
conn = connect()
conn.db.transactions.insert_one(signed_create_tx.to_dict())
query.store_asset(conn, {'id': signed_create_tx.id})
block = Block(app_hash='random_utxo',
height=51,
transactions=[signed_create_tx.id])
query.store_block(conn, block._asdict())
query.delete_latest_block(conn)
assert query.get_transaction(conn, signed_create_tx.id) is None
assert query.get_asset(conn, signed_create_tx.id) is None
assert query.get_block(conn, 51) is None
def test_delete_unspent_outputs(db_context, utxoset): def test_delete_unspent_outputs(db_context, utxoset):
from bigchaindb.backend import query from bigchaindb.backend import query
unspent_outputs, utxo_collection = utxoset unspent_outputs, utxo_collection = utxoset

View File

@ -6,14 +6,15 @@ import copy
import pytest import pytest
@pytest.mark.tendermint
def test_make_sure_we_dont_remove_any_command(): def test_make_sure_we_dont_remove_any_command():
# thanks to: http://stackoverflow.com/a/18161115/597097 # thanks to: http://stackoverflow.com/a/18161115/597097
from bigchaindb.commands.bigchaindb import create_parser from bigchaindb.commands.bigchaindb import create_parser
parser = create_parser() parser = create_parser()
assert parser.parse_args(['configure', 'rethinkdb']).command assert parser.parse_args(['configure', 'localmongodb']).command
assert parser.parse_args(['configure', 'mongodb']).command assert parser.parse_args(['configure', 'localmongodb']).command
assert parser.parse_args(['show-config']).command assert parser.parse_args(['show-config']).command
assert parser.parse_args(['export-my-pubkey']).command assert parser.parse_args(['export-my-pubkey']).command
assert parser.parse_args(['init']).command assert parser.parse_args(['init']).command
@ -517,3 +518,99 @@ def test_run_remove_replicas(mock_remove_replicas):
assert exc.value.args == ('err',) assert exc.value.args == ('err',)
assert mock_remove_replicas.call_count == 1 assert mock_remove_replicas.call_count == 1
mock_remove_replicas.reset_mock() mock_remove_replicas.reset_mock()
@pytest.mark.tendermint
@pytest.mark.bdb
def test_recover_db_from_zombie_txn(b, monkeypatch):
from bigchaindb.commands.bigchaindb import run_recover
from bigchaindb.models import Transaction
from bigchaindb.common.crypto import generate_key_pair
from bigchaindb.tendermint.lib import Block
from bigchaindb import backend
alice = generate_key_pair()
tx = Transaction.create([alice.public_key],
[([alice.public_key], 1)],
asset={'cycle': 'hero'},
metadata={'name': 'hohenheim'}) \
.sign([alice.private_key])
b.store_bulk_transactions([tx])
block = Block(app_hash='random_app_hash', height=10,
transactions=[])._asdict()
b.store_block(block)
def mock_get(uri):
return MockResponse(10)
monkeypatch.setattr('requests.get', mock_get)
run_recover(b)
assert list(backend.query.get_metadata(b.connection, [tx.id])) == []
assert not backend.query.get_asset(b.connection, tx.id)
assert not b.get_transaction(tx.id)
@pytest.mark.tendermint
@pytest.mark.bdb
def test_recover_db_from_zombie_block(b, monkeypatch):
from bigchaindb.commands.bigchaindb import run_recover
from bigchaindb.models import Transaction
from bigchaindb.common.crypto import generate_key_pair
from bigchaindb.tendermint.lib import Block
from bigchaindb import backend
alice = generate_key_pair()
tx = Transaction.create([alice.public_key],
[([alice.public_key], 1)],
asset={'cycle': 'hero'},
metadata={'name': 'hohenheim'}) \
.sign([alice.private_key])
b.store_bulk_transactions([tx])
block9 = Block(app_hash='random_app_hash', height=9,
transactions=[])._asdict()
b.store_block(block9)
block10 = Block(app_hash='random_app_hash', height=10,
transactions=[tx.id])._asdict()
b.store_block(block10)
def mock_get(uri):
return MockResponse(9)
monkeypatch.setattr('requests.get', mock_get)
run_recover(b)
assert list(backend.query.get_metadata(b.connection, [tx.id])) == []
assert not backend.query.get_asset(b.connection, tx.id)
assert not b.get_transaction(tx.id)
block = b.get_latest_block()
assert block['height'] == 9
@pytest.mark.tendermint
@patch('bigchaindb.config_utils.autoconfigure')
@patch('bigchaindb.commands.bigchaindb.run_recover')
@patch('bigchaindb.tendermint.commands.start')
def test_recover_db_on_start(mock_autoconfigure,
mock_run_recover,
mock_start,
mocked_setup_logging):
from bigchaindb.commands.bigchaindb import run_start
args = Namespace(start_rethinkdb=False, allow_temp_keypair=False, config=None, yes=True,
skip_initialize_database=False)
run_start(args)
assert mock_run_recover.called
assert mock_start.called
# Helper
class MockResponse():
def __init__(self, height):
self.height = height
def json(self):
return {'result': {'latest_block_height': self.height}}

View File

@ -328,8 +328,8 @@ def merlin_pubkey(merlin):
@pytest.fixture @pytest.fixture
def b(): def b():
from bigchaindb import Bigchain from bigchaindb.tendermint import BigchainDB
return Bigchain() return BigchainDB()
@pytest.fixture @pytest.fixture

1
tmdata/genesis.json Normal file
View File

@ -0,0 +1 @@
{"genesis_time":"0001-01-01T00:00:00Z","chain_id":"test-chain-JCYeEN","validators":[{"pub_key":{"type":"ed25519","data":"0C988282C02CFF72E5E296DB78CE26D922178549B327C375D992548C9AFCCE6D"},"power":10,"name":""}],"app_hash":""}

View File

@ -0,0 +1 @@
{"address":"E6CB05DA326F70BB4CC0A4AF83FC3BBF70B9A4D5","pub_key":{"type":"ed25519","data":"0C988282C02CFF72E5E296DB78CE26D922178549B327C375D992548C9AFCCE6D"},"last_height":0,"last_round":0,"last_step":0,"last_signature":null,"priv_key":{"type":"ed25519","data":"D4488996BDF92CE1D80670C66923D4996AE1B772FE0F76DAE33EDC410DC1D58F0C988282C02CFF72E5E296DB78CE26D922178549B327C375D992548C9AFCCE6D"}}