From 127ee0805383cc46f8aaca47356fa73c9819f106 Mon Sep 17 00:00:00 2001 From: Vanshdeep Singh Date: Wed, 18 Apr 2018 10:46:16 +0200 Subject: [PATCH] Problem: No crash recovery mechanism (#2207) Solution: Add crash recovery logic based on BEP#8 --- bigchaindb/backend/localmongodb/query.py | 29 +----- bigchaindb/backend/query.py | 15 +-- bigchaindb/commands/bigchaindb.py | 31 +++--- bigchaindb/tendermint/core.py | 15 ++- bigchaindb/tendermint/lib.py | 3 + tests/backend/localmongodb/test_queries.py | 42 -------- tests/commands/test_commands.py | 112 ++++++++------------- tests/tendermint/test_core.py | 32 ++++++ 8 files changed, 109 insertions(+), 170 deletions(-) diff --git a/bigchaindb/backend/localmongodb/query.py b/bigchaindb/backend/localmongodb/query.py index add329d7..46f6ee37 100644 --- a/bigchaindb/backend/localmongodb/query.py +++ b/bigchaindb/backend/localmongodb/query.py @@ -194,33 +194,6 @@ def get_block_with_transaction(conn, txid): projection={'_id': False, 'height': True})) -@register_query(LocalMongoDBConnection) -def delete_zombie_transactions(conn): - txns = conn.run(conn.collection('transactions').find({})) - for txn in txns: - txn_id = txn['id'] - block = list(get_block_with_transaction(conn, txn_id)) - if len(block) == 0: - delete_transaction(conn, txn_id) - - -def delete_transaction(conn, txn_id): - conn.run( - conn.collection('transactions').delete_one({'id': txn_id})) - conn.run( - conn.collection('assets').delete_one({'id': txn_id})) - conn.run( - conn.collection('metadata').delete_one({'id': txn_id})) - - -@register_query(LocalMongoDBConnection) -def delete_latest_block(conn): - block = get_latest_block(conn) - txn_ids = block['transactions'] - delete_transactions(conn, txn_ids) - conn.run(conn.collection('blocks').delete_one({'height': block['height']})) - - @register_query(LocalMongoDBConnection) def delete_transactions(conn, txn_ids): conn.run(conn.collection('assets').delete_many({'id': {'$in': txn_ids}})) @@ -271,7 +244,7 @@ def 
store_pre_commit_state(conn, state): commit_id = state['commit_id'] return conn.run( conn.collection('pre_commit') - .update({'id': commit_id}, state, upsert=True) + .update({'commit_id': commit_id}, state, upsert=True) ) diff --git a/bigchaindb/backend/query.py b/bigchaindb/backend/query.py index 9181cb34..19c2b975 100644 --- a/bigchaindb/backend/query.py +++ b/bigchaindb/backend/query.py @@ -5,6 +5,7 @@ from functools import singledispatch from bigchaindb.backend.exceptions import OperationError VALIDATOR_UPDATE_ID = 'a_unique_id_string' +PRE_COMMIT_ID = 'a_unique_id_string' @singledispatch @@ -551,13 +552,6 @@ def store_block(conn, block): raise NotImplementedError -@singledispatch -def delete_zombie_transactions(conn): - """Delete transactions not included in any block""" - - raise NotImplementedError - - @singledispatch def store_unspent_outputs(connection, unspent_outputs): """Store unspent outputs in ``utxo_set`` table.""" @@ -565,13 +559,6 @@ def store_unspent_outputs(connection, unspent_outputs): raise NotImplementedError -@singledispatch -def delete_latest_block(conn): - """Delete the latest block along with its transactions""" - - raise NotImplementedError - - @singledispatch def delete_unspent_outputs(connection, unspent_outputs): """Delete unspent outputs in ``utxo_set`` table.""" diff --git a/bigchaindb/commands/bigchaindb.py b/bigchaindb/commands/bigchaindb.py index acb7b64c..76a28689 100644 --- a/bigchaindb/commands/bigchaindb.py +++ b/bigchaindb/commands/bigchaindb.py @@ -19,7 +19,8 @@ from bigchaindb.backend import query from bigchaindb.commands import utils from bigchaindb.commands.utils import ( configure_bigchaindb, start_logging_process, input_on_stderr) -from bigchaindb.backend.query import VALIDATOR_UPDATE_ID +from bigchaindb.backend.query import VALIDATOR_UPDATE_ID, PRE_COMMIT_ID +from bigchaindb.tendermint.lib import BigchainDB logging.basicConfig(level=logging.INFO) @@ -131,19 +132,6 @@ def run_init(args): print('If you wish to 
re-initialize it, first drop it.', file=sys.stderr) -def run_recover(b): - query.delete_zombie_transactions(b.connection) - - tendermint_height = b.get_latest_block_height_from_tendermint() - block = b.get_latest_block() - - if block: - while block['height'] > tendermint_height: - logger.info('BigchainDB is ahead of tendermint, removing block %s', block['height']) - query.delete_latest_block(b.connection) - block = b.get_latest_block() - - @configure_bigchaindb def run_drop(args): """Drop the database""" @@ -162,13 +150,26 @@ def run_drop(args): print("Cannot drop '{name}'. The database does not exist.".format(name=dbname), file=sys.stderr) +def run_recover(b): + pre_commit = query.get_pre_commit_state(b.connection, PRE_COMMIT_ID) + + # Initially the pre-commit collection would be empty + if pre_commit: + latest_block = query.get_latest_block(b.connection) + + # NOTE: the pre-commit state can only be ahead of the committed state + # by 1 block + if latest_block and (latest_block['height'] < pre_commit['height']): + query.delete_transactions(b.connection, pre_commit['transactions']) + + @configure_bigchaindb @start_logging_process def run_start(args): """Start the processes to run the node""" logger.info('BigchainDB Version %s', bigchaindb.__version__) - # run_recover(BigchainDB()) + run_recover(BigchainDB()) try: if not args.skip_initialize_database: diff --git a/bigchaindb/tendermint/core.py b/bigchaindb/tendermint/core.py index f9c6ddfa..369d5daa 100644 --- a/bigchaindb/tendermint/core.py +++ b/bigchaindb/tendermint/core.py @@ -7,7 +7,8 @@ from abci.types_pb2 import ResponseEndBlock, ResponseInfo, Validator from bigchaindb.tendermint import BigchainDB from bigchaindb.tendermint.utils import decode_transaction, calculate_hash -from bigchaindb.tendermint.lib import Block +from bigchaindb.tendermint.lib import Block, PreCommitState +from bigchaindb.backend.query import PRE_COMMIT_ID logger = logging.getLogger(__name__) @@ -112,6 +113,13 @@ class App(BaseApplication): 
# set sync status to true self.bigchaindb.delete_validator_update() + # Store pre-commit state to recover in case there is a crash + # during `commit` + pre_commit_state = PreCommitState(commit_id=PRE_COMMIT_ID, + height=self.new_height, + transactions=self.block_txn_ids) + self.bigchaindb.store_pre_commit_state(pre_commit_state._asdict()) + # NOTE: interface for `ResponseEndBlock` has be changed in the latest # version of py-abci i.e. the validator updates should be return # as follows: @@ -121,15 +129,18 @@ class App(BaseApplication): def commit(self): """Store the new height and along with block hash.""" + data = self.block_txn_hash.encode('utf-8') + # register a new block only when new transactions are received if self.block_txn_ids: self.bigchaindb.store_bulk_transactions(self.block_transactions) block = Block(app_hash=self.block_txn_hash, height=self.new_height, transactions=self.block_txn_ids) + # NOTE: storing the block should be the last operation during commit + # this affects crash recovery. 
Refer BEP#8 for details self.bigchaindb.store_block(block._asdict()) - data = self.block_txn_hash.encode('utf-8') return Result.ok(data=data) diff --git a/bigchaindb/tendermint/lib.py b/bigchaindb/tendermint/lib.py index 26d3a9e8..8dc576c3 100644 --- a/bigchaindb/tendermint/lib.py +++ b/bigchaindb/tendermint/lib.py @@ -378,6 +378,9 @@ class BigchainDB(Bigchain): def delete_validator_update(self): return backend.query.delete_validator_update(self.connection) + def store_pre_commit_state(self, state): + return backend.query.store_pre_commit_state(self.connection, state) + Block = namedtuple('Block', ('app_hash', 'height', 'transactions')) diff --git a/tests/backend/localmongodb/test_queries.py b/tests/backend/localmongodb/test_queries.py index 4eff1bec..c6a83e4c 100644 --- a/tests/backend/localmongodb/test_queries.py +++ b/tests/backend/localmongodb/test_queries.py @@ -182,48 +182,6 @@ def test_get_block(): assert block['height'] == 3 -def test_delete_zombie_transactions(signed_create_tx, signed_transfer_tx): - from bigchaindb.backend import connect, query - from bigchaindb.tendermint.lib import Block - conn = connect() - - conn.db.transactions.insert_one(signed_create_tx.to_dict()) - query.store_asset(conn, {'id': signed_create_tx.id}) - block = Block(app_hash='random_utxo', - height=3, - transactions=[signed_create_tx.id]) - query.store_block(conn, block._asdict()) - - conn.db.transactions.insert_one(signed_transfer_tx.to_dict()) - query.store_metadatas(conn, [{'id': signed_transfer_tx.id}]) - - query.delete_zombie_transactions(conn) - assert query.get_transaction(conn, signed_transfer_tx.id) is None - assert query.get_asset(conn, signed_transfer_tx.id) is None - assert list(query.get_metadata(conn, [signed_transfer_tx.id])) == [] - - assert query.get_transaction(conn, signed_create_tx.id) is not None - assert query.get_asset(conn, signed_create_tx.id) is not None - - -def test_delete_latest_block(signed_create_tx, signed_transfer_tx): - from bigchaindb.backend 
import connect, query - from bigchaindb.tendermint.lib import Block - conn = connect() - - conn.db.transactions.insert_one(signed_create_tx.to_dict()) - query.store_asset(conn, {'id': signed_create_tx.id}) - block = Block(app_hash='random_utxo', - height=51, - transactions=[signed_create_tx.id]) - query.store_block(conn, block._asdict()) - query.delete_latest_block(conn) - - assert query.get_transaction(conn, signed_create_tx.id) is None - assert query.get_asset(conn, signed_create_tx.id) is None - assert query.get_block(conn, 51) is None - - def test_delete_zero_unspent_outputs(db_context, utxoset): from bigchaindb.backend import query unspent_outputs, utxo_collection = utxoset diff --git a/tests/commands/test_commands.py b/tests/commands/test_commands.py index ef385fb8..f67400ae 100644 --- a/tests/commands/test_commands.py +++ b/tests/commands/test_commands.py @@ -271,75 +271,6 @@ def test_calling_main(start_mock, base_parser_mock, parse_args_mock, assert start_mock.called is True -@pytest.mark.tendermint -@pytest.mark.bdb -def test_recover_db_from_zombie_txn(b, monkeypatch): - from bigchaindb.commands.bigchaindb import run_recover - from bigchaindb.models import Transaction - from bigchaindb.common.crypto import generate_key_pair - from bigchaindb.tendermint.lib import Block - from bigchaindb import backend - - alice = generate_key_pair() - tx = Transaction.create([alice.public_key], - [([alice.public_key], 1)], - asset={'cycle': 'hero'}, - metadata={'name': 'hohenheim'}) \ - .sign([alice.private_key]) - b.store_bulk_transactions([tx]) - block = Block(app_hash='random_app_hash', height=10, - transactions=[])._asdict() - b.store_block(block) - - def mock_get(uri): - return MockResponse(10) - monkeypatch.setattr('requests.get', mock_get) - - run_recover(b) - - assert list(backend.query.get_metadata(b.connection, [tx.id])) == [] - assert not backend.query.get_asset(b.connection, tx.id) - assert not b.get_transaction(tx.id) - - -@pytest.mark.tendermint 
-@pytest.mark.bdb -def test_recover_db_from_zombie_block(b, monkeypatch): - from bigchaindb.commands.bigchaindb import run_recover - from bigchaindb.models import Transaction - from bigchaindb.common.crypto import generate_key_pair - from bigchaindb.tendermint.lib import Block - from bigchaindb import backend - - alice = generate_key_pair() - tx = Transaction.create([alice.public_key], - [([alice.public_key], 1)], - asset={'cycle': 'hero'}, - metadata={'name': 'hohenheim'}) \ - .sign([alice.private_key]) - b.store_bulk_transactions([tx]) - - block9 = Block(app_hash='random_app_hash', height=9, - transactions=[])._asdict() - b.store_block(block9) - block10 = Block(app_hash='random_app_hash', height=10, - transactions=[tx.id])._asdict() - b.store_block(block10) - - def mock_get(uri): - return MockResponse(9) - monkeypatch.setattr('requests.get', mock_get) - - run_recover(b) - - assert list(backend.query.get_metadata(b.connection, [tx.id])) == [] - assert not backend.query.get_asset(b.connection, tx.id) - assert not b.get_transaction(tx.id) - - block = b.get_latest_block() - assert block['height'] == 9 - - @patch('bigchaindb.config_utils.autoconfigure') @patch('bigchaindb.commands.bigchaindb.run_recover') @patch('bigchaindb.tendermint.commands.start') @@ -356,6 +287,49 @@ def test_recover_db_on_start(mock_autoconfigure, assert mock_start.called +@pytest.mark.tendermint +@pytest.mark.bdb +def test_run_recover(b, alice, bob): + from bigchaindb.commands.bigchaindb import run_recover + from bigchaindb.models import Transaction + from bigchaindb.tendermint.lib import Block, PreCommitState + from bigchaindb.backend.query import PRE_COMMIT_ID + from bigchaindb.backend import query + + tx1 = Transaction.create([alice.public_key], + [([alice.public_key], 1)], + asset={'cycle': 'hero'}, + metadata={'name': 'hohenheim'}) \ + .sign([alice.private_key]) + tx2 = Transaction.create([bob.public_key], + [([bob.public_key], 1)], + asset={'cycle': 'hero'}, + metadata={'name': 
'hohenheim'}) \ + .sign([bob.private_key]) + + # store the transactions + b.store_bulk_transactions([tx1, tx2]) + + # create a random block + block8 = Block(app_hash='random_app_hash1', height=8, + transactions=['txid_doesnt_matter'])._asdict() + b.store_block(block8) + + # create the next block + block9 = Block(app_hash='random_app_hash1', height=9, + transactions=[tx1.id])._asdict() + b.store_block(block9) + + # create a pre_commit state which is ahead of the commit state + pre_commit_state = PreCommitState(commit_id=PRE_COMMIT_ID, height=10, + transactions=[tx2.id])._asdict() + b.store_pre_commit_state(pre_commit_state) + + run_recover(b) + + assert not query.get_transaction(b.connection, tx2.id) + + # Helper class MockResponse(): diff --git a/tests/tendermint/test_core.py b/tests/tendermint/test_core.py index cbdbd73e..73e8bf1d 100644 --- a/tests/tendermint/test_core.py +++ b/tests/tendermint/test_core.py @@ -166,3 +166,35 @@ def test_end_block_return_validator_updates(b): updates = b.get_validator_update() assert updates == [] + + +def test_store_pre_commit_state_in_end_block(b, alice): + from bigchaindb.tendermint import App + from bigchaindb.backend import query + from bigchaindb.models import Transaction + from bigchaindb.backend.query import PRE_COMMIT_ID + + tx = Transaction.create([alice.public_key], + [([alice.public_key], 1)], + asset={'msg': 'live long and prosper'})\ + .sign([alice.private_key]) + + app = App(b) + app.init_chain(['ignore']) + + app.begin_block('ignore') + app.deliver_tx(encode_tx_to_bytes(tx)) + app.end_block(99) + + resp = query.get_pre_commit_state(b.connection, PRE_COMMIT_ID) + assert resp['commit_id'] == PRE_COMMIT_ID + assert resp['height'] == 99 + assert resp['transactions'] == [tx.id] + + app.begin_block('ignore') + app.deliver_tx(encode_tx_to_bytes(tx)) + app.end_block(100) + resp = query.get_pre_commit_state(b.connection, PRE_COMMIT_ID) + assert resp['commit_id'] == PRE_COMMIT_ID + assert resp['height'] == 100 + assert 
resp['transactions'] == [tx.id]