Problem: No crash recovery mechanism (#2207)

Solution: Add crash recovery logic based on BEP#8
This commit is contained in:
Vanshdeep Singh 2018-04-18 10:46:16 +02:00 committed by vrde
parent 1823818315
commit 127ee08053
8 changed files with 109 additions and 170 deletions

View File

@ -194,33 +194,6 @@ def get_block_with_transaction(conn, txid):
projection={'_id': False, 'height': True}))
@register_query(LocalMongoDBConnection)
def delete_zombie_transactions(conn):
txns = conn.run(conn.collection('transactions').find({}))
for txn in txns:
txn_id = txn['id']
block = list(get_block_with_transaction(conn, txn_id))
if len(block) == 0:
delete_transaction(conn, txn_id)
def delete_transaction(conn, txn_id):
conn.run(
conn.collection('transactions').delete_one({'id': txn_id}))
conn.run(
conn.collection('assets').delete_one({'id': txn_id}))
conn.run(
conn.collection('metadata').delete_one({'id': txn_id}))
@register_query(LocalMongoDBConnection)
def delete_latest_block(conn):
block = get_latest_block(conn)
txn_ids = block['transactions']
delete_transactions(conn, txn_ids)
conn.run(conn.collection('blocks').delete_one({'height': block['height']}))
@register_query(LocalMongoDBConnection)
def delete_transactions(conn, txn_ids):
conn.run(conn.collection('assets').delete_many({'id': {'$in': txn_ids}}))
@ -271,7 +244,7 @@ def store_pre_commit_state(conn, state):
commit_id = state['commit_id']
return conn.run(
conn.collection('pre_commit')
.update({'id': commit_id}, state, upsert=True)
.update({'commit_id': commit_id}, state, upsert=True)
)

View File

@ -5,6 +5,7 @@ from functools import singledispatch
from bigchaindb.backend.exceptions import OperationError
VALIDATOR_UPDATE_ID = 'a_unique_id_string'
PRE_COMMIT_ID = 'a_unique_id_string'
@singledispatch
@ -551,13 +552,6 @@ def store_block(conn, block):
raise NotImplementedError
@singledispatch
def delete_zombie_transactions(conn):
"""Delete transactions not included in any block"""
raise NotImplementedError
@singledispatch
def store_unspent_outputs(connection, unspent_outputs):
"""Store unspent outputs in ``utxo_set`` table."""
@ -565,13 +559,6 @@ def store_unspent_outputs(connection, unspent_outputs):
raise NotImplementedError
@singledispatch
def delete_latest_block(conn):
"""Delete the latest block along with its transactions"""
raise NotImplementedError
@singledispatch
def delete_unspent_outputs(connection, unspent_outputs):
"""Delete unspent outputs in ``utxo_set`` table."""

View File

@ -19,7 +19,8 @@ from bigchaindb.backend import query
from bigchaindb.commands import utils
from bigchaindb.commands.utils import (
configure_bigchaindb, start_logging_process, input_on_stderr)
from bigchaindb.backend.query import VALIDATOR_UPDATE_ID
from bigchaindb.backend.query import VALIDATOR_UPDATE_ID, PRE_COMMIT_ID
from bigchaindb.tendermint.lib import BigchainDB
logging.basicConfig(level=logging.INFO)
@ -131,19 +132,6 @@ def run_init(args):
print('If you wish to re-initialize it, first drop it.', file=sys.stderr)
def run_recover(b):
query.delete_zombie_transactions(b.connection)
tendermint_height = b.get_latest_block_height_from_tendermint()
block = b.get_latest_block()
if block:
while block['height'] > tendermint_height:
logger.info('BigchainDB is ahead of tendermint, removing block %s', block['height'])
query.delete_latest_block(b.connection)
block = b.get_latest_block()
@configure_bigchaindb
def run_drop(args):
"""Drop the database"""
@ -162,13 +150,26 @@ def run_drop(args):
print("Cannot drop '{name}'. The database does not exist.".format(name=dbname), file=sys.stderr)
def run_recover(b):
pre_commit = query.get_pre_commit_state(b.connection, PRE_COMMIT_ID)
# Initially the pre-commit collection would be empty
if pre_commit:
latest_block = query.get_latest_block(b.connection)
# NOTE: the pre-commit state can only be ahead of the commited state
# by 1 block
if latest_block and (latest_block['height'] < pre_commit['height']):
query.delete_transactions(b.connection, pre_commit['transactions'])
@configure_bigchaindb
@start_logging_process
def run_start(args):
"""Start the processes to run the node"""
logger.info('BigchainDB Version %s', bigchaindb.__version__)
# run_recover(BigchainDB())
run_recover(BigchainDB())
try:
if not args.skip_initialize_database:

View File

@ -7,7 +7,8 @@ from abci.types_pb2 import ResponseEndBlock, ResponseInfo, Validator
from bigchaindb.tendermint import BigchainDB
from bigchaindb.tendermint.utils import decode_transaction, calculate_hash
from bigchaindb.tendermint.lib import Block
from bigchaindb.tendermint.lib import Block, PreCommitState
from bigchaindb.backend.query import PRE_COMMIT_ID
logger = logging.getLogger(__name__)
@ -112,6 +113,13 @@ class App(BaseApplication):
# set sync status to true
self.bigchaindb.delete_validator_update()
# Store pre-commit state to recover in case there is a crash
# during `commit`
pre_commit_state = PreCommitState(commit_id=PRE_COMMIT_ID,
height=self.new_height,
transactions=self.block_txn_ids)
self.bigchaindb.store_pre_commit_state(pre_commit_state._asdict())
# NOTE: interface for `ResponseEndBlock` has be changed in the latest
# version of py-abci i.e. the validator updates should be return
# as follows:
@ -121,15 +129,18 @@ class App(BaseApplication):
def commit(self):
"""Store the new height and along with block hash."""
data = self.block_txn_hash.encode('utf-8')
# register a new block only when new transactions are received
if self.block_txn_ids:
self.bigchaindb.store_bulk_transactions(self.block_transactions)
block = Block(app_hash=self.block_txn_hash,
height=self.new_height,
transactions=self.block_txn_ids)
# NOTE: storing the block should be the last operation during commit
# this effects crash recovery. Refer BEP#8 for details
self.bigchaindb.store_block(block._asdict())
data = self.block_txn_hash.encode('utf-8')
return Result.ok(data=data)

View File

@ -378,6 +378,9 @@ class BigchainDB(Bigchain):
def delete_validator_update(self):
return backend.query.delete_validator_update(self.connection)
def store_pre_commit_state(self, state):
return backend.query.store_pre_commit_state(self.connection, state)
Block = namedtuple('Block', ('app_hash', 'height', 'transactions'))

View File

@ -182,48 +182,6 @@ def test_get_block():
assert block['height'] == 3
def test_delete_zombie_transactions(signed_create_tx, signed_transfer_tx):
from bigchaindb.backend import connect, query
from bigchaindb.tendermint.lib import Block
conn = connect()
conn.db.transactions.insert_one(signed_create_tx.to_dict())
query.store_asset(conn, {'id': signed_create_tx.id})
block = Block(app_hash='random_utxo',
height=3,
transactions=[signed_create_tx.id])
query.store_block(conn, block._asdict())
conn.db.transactions.insert_one(signed_transfer_tx.to_dict())
query.store_metadatas(conn, [{'id': signed_transfer_tx.id}])
query.delete_zombie_transactions(conn)
assert query.get_transaction(conn, signed_transfer_tx.id) is None
assert query.get_asset(conn, signed_transfer_tx.id) is None
assert list(query.get_metadata(conn, [signed_transfer_tx.id])) == []
assert query.get_transaction(conn, signed_create_tx.id) is not None
assert query.get_asset(conn, signed_create_tx.id) is not None
def test_delete_latest_block(signed_create_tx, signed_transfer_tx):
from bigchaindb.backend import connect, query
from bigchaindb.tendermint.lib import Block
conn = connect()
conn.db.transactions.insert_one(signed_create_tx.to_dict())
query.store_asset(conn, {'id': signed_create_tx.id})
block = Block(app_hash='random_utxo',
height=51,
transactions=[signed_create_tx.id])
query.store_block(conn, block._asdict())
query.delete_latest_block(conn)
assert query.get_transaction(conn, signed_create_tx.id) is None
assert query.get_asset(conn, signed_create_tx.id) is None
assert query.get_block(conn, 51) is None
def test_delete_zero_unspent_outputs(db_context, utxoset):
from bigchaindb.backend import query
unspent_outputs, utxo_collection = utxoset

View File

@ -271,75 +271,6 @@ def test_calling_main(start_mock, base_parser_mock, parse_args_mock,
assert start_mock.called is True
@pytest.mark.tendermint
@pytest.mark.bdb
def test_recover_db_from_zombie_txn(b, monkeypatch):
from bigchaindb.commands.bigchaindb import run_recover
from bigchaindb.models import Transaction
from bigchaindb.common.crypto import generate_key_pair
from bigchaindb.tendermint.lib import Block
from bigchaindb import backend
alice = generate_key_pair()
tx = Transaction.create([alice.public_key],
[([alice.public_key], 1)],
asset={'cycle': 'hero'},
metadata={'name': 'hohenheim'}) \
.sign([alice.private_key])
b.store_bulk_transactions([tx])
block = Block(app_hash='random_app_hash', height=10,
transactions=[])._asdict()
b.store_block(block)
def mock_get(uri):
return MockResponse(10)
monkeypatch.setattr('requests.get', mock_get)
run_recover(b)
assert list(backend.query.get_metadata(b.connection, [tx.id])) == []
assert not backend.query.get_asset(b.connection, tx.id)
assert not b.get_transaction(tx.id)
@pytest.mark.tendermint
@pytest.mark.bdb
def test_recover_db_from_zombie_block(b, monkeypatch):
from bigchaindb.commands.bigchaindb import run_recover
from bigchaindb.models import Transaction
from bigchaindb.common.crypto import generate_key_pair
from bigchaindb.tendermint.lib import Block
from bigchaindb import backend
alice = generate_key_pair()
tx = Transaction.create([alice.public_key],
[([alice.public_key], 1)],
asset={'cycle': 'hero'},
metadata={'name': 'hohenheim'}) \
.sign([alice.private_key])
b.store_bulk_transactions([tx])
block9 = Block(app_hash='random_app_hash', height=9,
transactions=[])._asdict()
b.store_block(block9)
block10 = Block(app_hash='random_app_hash', height=10,
transactions=[tx.id])._asdict()
b.store_block(block10)
def mock_get(uri):
return MockResponse(9)
monkeypatch.setattr('requests.get', mock_get)
run_recover(b)
assert list(backend.query.get_metadata(b.connection, [tx.id])) == []
assert not backend.query.get_asset(b.connection, tx.id)
assert not b.get_transaction(tx.id)
block = b.get_latest_block()
assert block['height'] == 9
@patch('bigchaindb.config_utils.autoconfigure')
@patch('bigchaindb.commands.bigchaindb.run_recover')
@patch('bigchaindb.tendermint.commands.start')
@ -356,6 +287,49 @@ def test_recover_db_on_start(mock_autoconfigure,
assert mock_start.called
@pytest.mark.tendermint
@pytest.mark.bdb
def test_run_recover(b, alice, bob):
from bigchaindb.commands.bigchaindb import run_recover
from bigchaindb.models import Transaction
from bigchaindb.tendermint.lib import Block, PreCommitState
from bigchaindb.backend.query import PRE_COMMIT_ID
from bigchaindb.backend import query
tx1 = Transaction.create([alice.public_key],
[([alice.public_key], 1)],
asset={'cycle': 'hero'},
metadata={'name': 'hohenheim'}) \
.sign([alice.private_key])
tx2 = Transaction.create([bob.public_key],
[([bob.public_key], 1)],
asset={'cycle': 'hero'},
metadata={'name': 'hohenheim'}) \
.sign([bob.private_key])
# store the transactions
b.store_bulk_transactions([tx1, tx2])
# create a random block
block8 = Block(app_hash='random_app_hash1', height=8,
transactions=['txid_doesnt_matter'])._asdict()
b.store_block(block8)
# create the next block
block9 = Block(app_hash='random_app_hash1', height=9,
transactions=[tx1.id])._asdict()
b.store_block(block9)
# create a pre_commit state which is ahead of the commit state
pre_commit_state = PreCommitState(commit_id=PRE_COMMIT_ID, height=10,
transactions=[tx2.id])._asdict()
b.store_pre_commit_state(pre_commit_state)
run_recover(b)
assert not query.get_transaction(b.connection, tx2.id)
# Helper
class MockResponse():

View File

@ -166,3 +166,35 @@ def test_end_block_return_validator_updates(b):
updates = b.get_validator_update()
assert updates == []
def test_store_pre_commit_state_in_end_block(b, alice):
from bigchaindb.tendermint import App
from bigchaindb.backend import query
from bigchaindb.models import Transaction
from bigchaindb.backend.query import PRE_COMMIT_ID
tx = Transaction.create([alice.public_key],
[([alice.public_key], 1)],
asset={'msg': 'live long and prosper'})\
.sign([alice.private_key])
app = App(b)
app.init_chain(['ignore'])
app.begin_block('ignore')
app.deliver_tx(encode_tx_to_bytes(tx))
app.end_block(99)
resp = query.get_pre_commit_state(b.connection, PRE_COMMIT_ID)
assert resp['commit_id'] == PRE_COMMIT_ID
assert resp['height'] == 99
assert resp['transactions'] == [tx.id]
app.begin_block('ignore')
app.deliver_tx(encode_tx_to_bytes(tx))
app.end_block(100)
resp = query.get_pre_commit_state(b.connection, PRE_COMMIT_ID)
assert resp['commit_id'] == PRE_COMMIT_ID
assert resp['height'] == 100
assert resp['transactions'] == [tx.id]