From 3bb5973f60d232714c99fbc92ab2988f23113f7f Mon Sep 17 00:00:00 2001 From: Scott Sadler Date: Wed, 12 Apr 2017 17:29:17 +0200 Subject: [PATCH] fix block order bug --- bigchaindb/backend/mongodb/changefeed.py | 74 +++++++++++++----------- bigchaindb/backend/mongodb/query.py | 26 +++------ bigchaindb/backend/query.py | 19 ++---- bigchaindb/backend/rethinkdb/query.py | 15 ----- bigchaindb/core.py | 10 ---- bigchaindb/pipelines/vote.py | 25 +++----- tests/backend/mongodb/test_changefeed.py | 70 +++++++++------------- tests/backend/mongodb/test_queries.py | 41 ++++++++----- tests/backend/test_generics.py | 2 +- tests/pipelines/test_vote.py | 54 +---------------- 10 files changed, 116 insertions(+), 220 deletions(-) diff --git a/bigchaindb/backend/mongodb/changefeed.py b/bigchaindb/backend/mongodb/changefeed.py index 3abcbeda..59d4e721 100644 --- a/bigchaindb/backend/mongodb/changefeed.py +++ b/bigchaindb/backend/mongodb/changefeed.py @@ -20,50 +20,21 @@ class MongoDBChangeFeed(ChangeFeed): We emulate the behaviour of the RethinkDB changefeed by using a tailable cursor that listens for events on the oplog. """ - def run_forever(self): for element in self.prefeed: self.outqueue.put(element) - while True: - try: - # XXX: hack to force reconnection. Why? Because the cursor - # in `run_changefeed` does not run in the context of a - # Connection object, so if the connection is lost we need - # to manually reset the connection to None. - # See #1154 - self.connection.connection = None - self.run_changefeed() - break - except (BackendError, pymongo.errors.ConnectionFailure): - logger.exception('Error connecting to the database, retrying') - time.sleep(1) - - def run_changefeed(self): - dbname = self.connection.dbname table = self.table - namespace = '{}.{}'.format(dbname, table) + dbname = self.connection.dbname + # last timestamp in the oplog. We only care for operations happening # in the future. last_ts = self.connection.run( self.connection.query().local.oplog.rs.find() .sort('$natural', pymongo.DESCENDING).limit(1) .next()['ts']) - # tailable cursor. A tailable cursor will remain open even after the - # last result was returned. ``TAILABLE_AWAIT`` will block for some - # timeout after the last result was returned. If no result is received - # in the meantime it will raise a StopIteration excetiption. - cursor = self.connection.run( - self.connection.query().local.oplog.rs.find( - {'ns': namespace, 'ts': {'$gt': last_ts}}, - cursor_type=pymongo.CursorType.TAILABLE_AWAIT - )) - while cursor.alive: - try: - record = cursor.next() - except StopIteration: - continue + for record in run_changefeed(self.connection, table, last_ts): is_insert = record['op'] == 'i' is_delete = record['op'] == 'd' @@ -94,7 +65,8 @@ class MongoDBChangeFeed(ChangeFeed): @register_changefeed(MongoDBConnection) -def get_changefeed(connection, table, operation, *, prefeed=None): +def get_changefeed(connection, table, operation, *, prefeed=None, + get_resumption_pointer=None): """Return a MongoDB changefeed. Returns: @@ -103,4 +75,38 @@ def get_changefeed(connection, table, operation, *, prefeed=None): """ return MongoDBChangeFeed(table, operation, prefeed=prefeed, - connection=connection) + connection=connection, + get_resumption_pointer=get_resumption_pointer) + + +_FEED_STOP = False +"""If it's True then the changefeed will return when there are no more items. 
+"""
+
+
+def run_changefeed(conn, table, last_ts):
+    """Encapsulate the operational logic of tailing the MongoDB changefeed.
+    """
+    while True:
+        try:
+            # XXX: hack to force reconnection, in case the connection
+            # is lost while waiting on the cursor. See #1154.
+            conn.connection = None
+            namespace = conn.dbname + '.' + table
+            cursor = conn.conn.local.oplog.rs.find(
+                {'ns': namespace, 'ts': {'$gt': last_ts}},
+                {'o._id': False},
+                cursor_type=pymongo.CursorType.TAILABLE_AWAIT
+            )
+            logger.debug('Tailing oplog at %s/%s', namespace, last_ts)
+            while cursor.alive:
+                try:
+                    record = cursor.next()
+                    yield record
+                    last_ts = record['ts']
+                except StopIteration:
+                    if _FEED_STOP:
+                        return
+        except (BackendError, pymongo.errors.ConnectionFailure):
+            logger.exception('Lost connection while tailing oplog, retrying')
+            time.sleep(1)
diff --git a/bigchaindb/backend/mongodb/query.py b/bigchaindb/backend/mongodb/query.py
index 74b9c35a..a0d8ed12 100644
--- a/bigchaindb/backend/mongodb/query.py
+++ b/bigchaindb/backend/mongodb/query.py
@@ -5,6 +5,7 @@ from time import time
 from pymongo import ReturnDocument
 
 from bigchaindb import backend
+from bigchaindb.backend.mongodb.changefeed import run_changefeed
 from bigchaindb.common.exceptions import CyclicBlockchainError
 from bigchaindb.common.transaction import Transaction
 from bigchaindb.backend.exceptions import DuplicateKeyError
@@ -283,21 +284,10 @@ def get_last_voted_block(conn, node_pubkey):
 
 
 @register_query(MongoDBConnection)
-def get_unvoted_blocks(conn, node_pubkey):
-    return conn.run(
-        conn.collection('bigchain')
-        .aggregate([
-            {'$lookup': {
-                'from': 'votes',
-                'localField': 'id',
-                'foreignField': 'vote.voting_for_block',
-                'as': 'votes'
-            }},
-            {'$match': {
-                'votes.node_pubkey': {'$ne': node_pubkey},
-                'block.transactions.operation': {'$ne': 'GENESIS'}
-            }},
-            {'$project': {
-                'votes': False, '_id': False
-            }}
-        ]))
+def get_new_blocks_feed(conn, start_block_id):
+    namespace = conn.dbname + '.bigchain'
+    query = {'o.id': start_block_id, 'op': 'i', 'ns': namespace}
+    # Necessary to find in descending order since tests may write the same block id several times
+    last_ts = conn.conn.local.oplog.rs.find(query).sort('$natural', -1).next()['ts']
+    feed = run_changefeed(conn, 'bigchain', last_ts)
+    return (evt['o'] for evt in feed if evt['op'] == 'i')
diff --git a/bigchaindb/backend/query.py b/bigchaindb/backend/query.py
index 9aa653d7..8050c415 100644
--- a/bigchaindb/backend/query.py
+++ b/bigchaindb/backend/query.py
@@ -273,20 +273,6 @@ def get_last_voted_block(connection, node_pubkey):
     raise NotImplementedError
 
 
-@singledispatch
-def get_unvoted_blocks(connection, node_pubkey):
-    """Return all the blocks that have not been voted by the specified node. 
- - Args: - node_pubkey (str): base58 encoded public key - - Returns: - :obj:`list` of :obj:`dict`: a list of unvoted blocks - """ - - raise NotImplementedError - - @singledispatch def get_txids_filtered(connection, asset_id, operation=None): """ @@ -298,3 +284,8 @@ def get_txids_filtered(connection, asset_id, operation=None): """ raise NotImplementedError + + +@singledispatch +def get_new_blocks_feed(connection, start_block_id): + raise NotImplementedError diff --git a/bigchaindb/backend/rethinkdb/query.py b/bigchaindb/backend/rethinkdb/query.py index 6011cc8c..deb02244 100644 --- a/bigchaindb/backend/rethinkdb/query.py +++ b/bigchaindb/backend/rethinkdb/query.py @@ -238,18 +238,3 @@ def get_last_voted_block(connection, node_pubkey): return connection.run( r.table('bigchain', read_mode=READ_MODE) .get(last_block_id)) - - -@register_query(RethinkDBConnection) -def get_unvoted_blocks(connection, node_pubkey): - unvoted = connection.run( - r.table('bigchain', read_mode=READ_MODE) - .filter(lambda block: r.table('votes', read_mode=READ_MODE) - .get_all([block['id'], node_pubkey], index='block_and_voter') - .is_empty()) - .order_by(r.asc(r.row['block']['timestamp']))) - - # FIXME: I (@vrde) don't like this solution. Filtering should be done at a - # database level. Solving issue #444 can help untangling the situation - unvoted_blocks = filter(lambda block: not utils.is_genesis_block(block), unvoted) - return unvoted_blocks diff --git a/bigchaindb/core.py b/bigchaindb/core.py index 5d2e9c03..379f9c6f 100644 --- a/bigchaindb/core.py +++ b/bigchaindb/core.py @@ -594,16 +594,6 @@ class Bigchain(object): return Block.from_dict(backend.query.get_last_voted_block(self.connection, self.me)) - def get_unvoted_blocks(self): - """Return all the blocks that have not been voted on by this node. - - Returns: - :obj:`list` of :obj:`dict`: a list of unvoted blocks - """ - - # XXX: should this return instaces of Block? 
-        return backend.query.get_unvoted_blocks(self.connection, self.me)
-
     def block_election(self, block):
         if type(block) != dict:
             block = block.to_dict()
diff --git a/bigchaindb/pipelines/vote.py b/bigchaindb/pipelines/vote.py
index 9664c520..ee5c3ada 100644
--- a/bigchaindb/pipelines/vote.py
+++ b/bigchaindb/pipelines/vote.py
@@ -10,10 +10,7 @@
 from collections import Counter
 
 from multipipes import Pipeline, Node
 
-import bigchaindb
-from bigchaindb import Bigchain
-from bigchaindb import backend
-from bigchaindb.backend.changefeed import ChangeFeed
+from bigchaindb import Bigchain, backend
 from bigchaindb.models import Transaction, Block
 from bigchaindb.common import exceptions
@@ -151,20 +148,13 @@ class Vote:
 
         return vote
 
 
-def initial():
-    """Return unvoted blocks."""
-    b = Bigchain()
-    rs = b.get_unvoted_blocks()
-    return rs
-
-
 def create_pipeline():
     """Create and return the pipeline of operations to be distributed
     on different processes."""
 
     voter = Vote()
 
-    vote_pipeline = Pipeline([
+    return Pipeline([
         Node(voter.validate_block),
         Node(voter.ungroup),
         Node(voter.validate_tx, fraction_of_cores=1),
@@ -172,13 +162,14 @@
         Node(voter.vote),
         Node(voter.write_vote)
     ])
 
-    return vote_pipeline
-
 
 def get_changefeed():
-    connection = backend.connect(**bigchaindb.config['database'])
-    return backend.get_changefeed(connection, 'bigchain', ChangeFeed.INSERT,
-                                  prefeed=initial())
+    """Create and return an ordered changefeed of blocks, starting
+    from the last voted block."""
+    b = Bigchain()
+    last_voted = b.get_last_voted_block().id
+    feed = backend.query.get_new_blocks_feed(b.connection, last_voted)
+    return Node(feed.__next__)
 
 
 def start():
diff --git a/tests/backend/mongodb/test_changefeed.py b/tests/backend/mongodb/test_changefeed.py
index 00dcaca5..cfe50e57 100644
--- a/tests/backend/mongodb/test_changefeed.py
+++ b/tests/backend/mongodb/test_changefeed.py
@@ -10,34 +10,32 @@ def mock_changefeed_data():
     return [
         {
             'op': 'i',
-            'o': {'_id': '', 'msg': 'seems like we have an insert here'}
+            'o': {'_id': '', 'msg': 'seems like we have an insert here'},
+            'ts': 1,
         },
         {
             'op': 'd',
-            'o': {'msg': 'seems like we have a delete here'}
+            'o': {'msg': 'seems like we have a delete here'},
+            'ts': 2,
         },
         {
             'op': 'u',
             'o': {'msg': 'seems like we have an update here'},
-            'o2': {'_id': 'some-id'}
+            'o2': {'_id': 'some-id'},
+            'ts': 3,
         },
     ]
 
 
 @pytest.mark.bdb
-@mock.patch('pymongo.cursor.Cursor.alive', new_callable=mock.PropertyMock)
+@mock.patch('bigchaindb.backend.mongodb.changefeed._FEED_STOP', True)
 @mock.patch('pymongo.cursor.Cursor.next')
-def test_changefeed_insert(mock_cursor_next, mock_cursor_alive,
-                           mock_changefeed_data):
+def test_changefeed_insert(mock_cursor_next, mock_changefeed_data):
     from bigchaindb.backend import get_changefeed, connect
     from bigchaindb.backend.changefeed import ChangeFeed
 
     # setup connection and mocks
     conn = connect()
-    # changefeed.run_forever only returns when the cursor is closed
-    # so we mock `alive` to be False it finishes reading the mocked data
-    mock_cursor_alive.side_effect = [mock.DEFAULT, mock.DEFAULT,
-                                     mock.DEFAULT, mock.DEFAULT, False]
     # mock the `next` method of the cursor to return the mocked data
     mock_cursor_next.side_effect = [mock.DEFAULT] + mock_changefeed_data
 
@@ -51,16 +49,13 @@
 
 
 @pytest.mark.bdb
-@mock.patch('pymongo.cursor.Cursor.alive', new_callable=mock.PropertyMock)
+@mock.patch('bigchaindb.backend.mongodb.changefeed._FEED_STOP', True)
 @mock.patch('pymongo.cursor.Cursor.next')
-def 
test_changefeed_delete(mock_cursor_next, mock_cursor_alive, - mock_changefeed_data): +def test_changefeed_delete(mock_cursor_next, mock_changefeed_data): from bigchaindb.backend import get_changefeed, connect from bigchaindb.backend.changefeed import ChangeFeed conn = connect() - mock_cursor_alive.side_effect = [mock.DEFAULT, mock.DEFAULT, - mock.DEFAULT, mock.DEFAULT, False] mock_cursor_next.side_effect = [mock.DEFAULT] + mock_changefeed_data outpipe = Pipe() @@ -73,17 +68,15 @@ def test_changefeed_delete(mock_cursor_next, mock_cursor_alive, @pytest.mark.bdb +@mock.patch('bigchaindb.backend.mongodb.changefeed._FEED_STOP', True) @mock.patch('pymongo.collection.Collection.find_one') -@mock.patch('pymongo.cursor.Cursor.alive', new_callable=mock.PropertyMock) @mock.patch('pymongo.cursor.Cursor.next') -def test_changefeed_update(mock_cursor_next, mock_cursor_alive, - mock_cursor_find_one, mock_changefeed_data): +def test_changefeed_update(mock_cursor_next, mock_cursor_find_one, + mock_changefeed_data): from bigchaindb.backend import get_changefeed, connect from bigchaindb.backend.changefeed import ChangeFeed conn = connect() - mock_cursor_alive.side_effect = [mock.DEFAULT, mock.DEFAULT, - mock.DEFAULT, mock.DEFAULT, False] mock_cursor_next.side_effect = [mock.DEFAULT] + mock_changefeed_data mock_cursor_find_one.return_value = mock_changefeed_data[2]['o'] @@ -101,18 +94,15 @@ def test_changefeed_update(mock_cursor_next, mock_cursor_alive, @pytest.mark.bdb +@mock.patch('bigchaindb.backend.mongodb.changefeed._FEED_STOP', True) @mock.patch('pymongo.collection.Collection.find_one') -@mock.patch('pymongo.cursor.Cursor.alive', new_callable=mock.PropertyMock) @mock.patch('pymongo.cursor.Cursor.next') -def test_changefeed_multiple_operations(mock_cursor_next, mock_cursor_alive, - mock_cursor_find_one, +def test_changefeed_multiple_operations(mock_cursor_next, mock_cursor_find_one, mock_changefeed_data): from bigchaindb.backend import get_changefeed, connect from bigchaindb.backend.changefeed import ChangeFeed conn = connect() - mock_cursor_alive.side_effect = [mock.DEFAULT, mock.DEFAULT, - mock.DEFAULT, mock.DEFAULT, False] mock_cursor_next.side_effect = [mock.DEFAULT] + mock_changefeed_data mock_cursor_find_one.return_value = mock_changefeed_data[2]['o'] @@ -128,16 +118,13 @@ def test_changefeed_multiple_operations(mock_cursor_next, mock_cursor_alive, @pytest.mark.bdb -@mock.patch('pymongo.cursor.Cursor.alive', new_callable=mock.PropertyMock) +@mock.patch('bigchaindb.backend.mongodb.changefeed._FEED_STOP', True) @mock.patch('pymongo.cursor.Cursor.next') -def test_changefeed_prefeed(mock_cursor_next, mock_cursor_alive, - mock_changefeed_data): +def test_changefeed_prefeed(mock_cursor_next, mock_changefeed_data): from bigchaindb.backend import get_changefeed, connect from bigchaindb.backend.changefeed import ChangeFeed conn = connect() - mock_cursor_alive.side_effect = [mock.DEFAULT, mock.DEFAULT, - mock.DEFAULT, mock.DEFAULT, False] mock_cursor_next.side_effect = [mock.DEFAULT] + mock_changefeed_data outpipe = Pipe() @@ -150,19 +137,14 @@ def test_changefeed_prefeed(mock_cursor_next, mock_cursor_alive, @pytest.mark.bdb -@mock.patch('bigchaindb.backend.mongodb.changefeed.MongoDBChangeFeed.run_changefeed') # noqa -def test_connection_failure(mock_run_changefeed): - from bigchaindb.backend import get_changefeed, connect +def test_connection_failure(): from bigchaindb.backend.exceptions import ConnectionError - from bigchaindb.backend.changefeed import ChangeFeed + from 
bigchaindb.backend.mongodb.changefeed import run_changefeed - conn = connect() - mock_run_changefeed.side_effect = [ConnectionError(), - mock.DEFAULT] - - changefeed = get_changefeed(conn, 'backlog', ChangeFeed.INSERT) - changefeed.run_forever() - - # run_changefeed raises an exception the first time its called and then - # it's called again - assert mock_run_changefeed.call_count == 2 + conn = mock.MagicMock() + find = conn.conn.local.oplog.rs.find + find.side_effect = [ConnectionError(), RuntimeError()] + changefeed = run_changefeed(conn, 'backlog', -1) + with pytest.raises(RuntimeError): + for record in changefeed: + assert False, 'Shouldn\'t get here' diff --git a/tests/backend/mongodb/test_queries.py b/tests/backend/mongodb/test_queries.py index bd7e75f1..f58ba464 100644 --- a/tests/backend/mongodb/test_queries.py +++ b/tests/backend/mongodb/test_queries.py @@ -1,4 +1,5 @@ import pytest +from unittest import mock pytestmark = pytest.mark.bdb @@ -377,21 +378,6 @@ def test_get_last_voted_block(genesis_block, signed_create_tx, b): query.get_last_voted_block(conn, b.me) -def test_get_unvoted_blocks(signed_create_tx): - from bigchaindb.backend import connect, query - from bigchaindb.models import Block - conn = connect() - - # create and insert a block - block = Block(transactions=[signed_create_tx], node_pubkey='aaa') - conn.db.bigchain.insert_one(block.to_dict()) - - unvoted_blocks = list(query.get_unvoted_blocks(conn, 'aaa')) - - assert len(unvoted_blocks) == 1 - assert unvoted_blocks[0] == block.to_dict() - - def test_get_txids_filtered(signed_create_tx, signed_transfer_tx): from bigchaindb.backend import connect, query from bigchaindb.models import Block, Transaction @@ -417,3 +403,28 @@ def test_get_txids_filtered(signed_create_tx, signed_transfer_tx): # Test get by asset and TRANSFER txids = set(query.get_txids_filtered(conn, asset_id, Transaction.TRANSFER)) assert txids == {signed_transfer_tx.id} + + +@mock.patch('bigchaindb.backend.mongodb.changefeed._FEED_STOP', True) +def test_get_new_blocks_feed(b, create_tx): + from bigchaindb.backend import query + from bigchaindb.models import Block + import random + + def create_block(): + ts = str(random.random()) + block = Block(transactions=[create_tx], timestamp=ts) + b.write_block(block) + return block.to_dict() + + create_block() + b1 = create_block() + b2 = create_block() + + feed = query.get_new_blocks_feed(b.connection, b1['id']) + + assert feed.__next__() == b2 + + b3 = create_block() + + assert list(feed) == [b3] diff --git a/tests/backend/test_generics.py b/tests/backend/test_generics.py index 57a644ee..45400c0e 100644 --- a/tests/backend/test_generics.py +++ b/tests/backend/test_generics.py @@ -31,11 +31,11 @@ def test_schema(schema_func_name, args_qty): ('get_block', 1), ('write_vote', 1), ('get_last_voted_block', 1), - ('get_unvoted_blocks', 1), ('get_spent', 2), ('get_votes_by_block_id_and_voter', 2), ('update_transaction', 2), ('get_transaction_from_block', 2), + ('get_new_blocks_feed', 1), )) def test_query(query_func_name, args_qty): from bigchaindb.backend import query diff --git a/tests/pipelines/test_vote.py b/tests/pipelines/test_vote.py index 7df7ca11..12f84592 100644 --- a/tests/pipelines/test_vote.py +++ b/tests/pipelines/test_vote.py @@ -509,54 +509,6 @@ def test_invalid_block_voting(monkeypatch, b, user_pk, genesis_block): vote_doc['signature']) is True -@pytest.mark.genesis -def test_voter_considers_unvoted_blocks_when_single_node(monkeypatch, b): - from bigchaindb.backend import query - from 
bigchaindb.pipelines import vote - - outpipe = Pipe() - - monkeypatch.setattr('time.time', lambda: 1000000000) - - block_ids = [] - # insert blocks in the database while the voter process is not listening - # (these blocks won't appear in the changefeed) - monkeypatch.setattr('time.time', lambda: 1000000020) - block_1 = dummy_block(b) - block_ids.append(block_1.id) - monkeypatch.setattr('time.time', lambda: 1000000030) - b.write_block(block_1) - block_2 = dummy_block(b) - block_ids.append(block_2.id) - b.write_block(block_2) - - vote_pipeline = vote.create_pipeline() - vote_pipeline.setup(indata=vote.get_changefeed(), outdata=outpipe) - vote_pipeline.start() - - # We expects two votes, so instead of waiting an arbitrary amount - # of time, we can do two blocking calls to `get` - outpipe.get() - outpipe.get() - - # create a new block that will appear in the changefeed - monkeypatch.setattr('time.time', lambda: 1000000040) - block_3 = dummy_block(b) - block_ids.append(block_3.id) - b.write_block(block_3) - - # Same as before with the two `get`s - outpipe.get() - - vote_pipeline.terminate() - - # retrieve vote - votes = [list(query.get_votes_by_block_id(b.connection, _id))[0] - for _id in block_ids] - - assert all(vote['node_pubkey'] == b.me for vote in votes) - - @pytest.mark.genesis def test_voter_chains_blocks_with_the_previous_ones(monkeypatch, b): from bigchaindb.backend import query @@ -564,16 +516,14 @@ def test_voter_chains_blocks_with_the_previous_ones(monkeypatch, b): outpipe = Pipe() - monkeypatch.setattr('time.time', lambda: 1000000000) - block_ids = [] - monkeypatch.setattr('time.time', lambda: 1000000020) block_1 = dummy_block(b) + block_1.timestamp = str(random.random()) block_ids.append(block_1.id) b.write_block(block_1) - monkeypatch.setattr('time.time', lambda: 1000000030) block_2 = dummy_block(b) + block_2.timestamp = str(random.random()) block_ids.append(block_2.id) b.write_block(block_2)
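
The heart of the fix is that run_changefeed yields oplog records in $natural
(insertion) order and advances last_ts after every record it hands out, so a
reconnect resumes the tail exactly where it left off instead of jumping to the
newest oplog entry. A minimal consumer sketch of the patched module, assuming
a MongoDB replica set with its oplog enabled; handle_block is a hypothetical
placeholder, not a function in the codebase:

    from bigchaindb.backend import connect
    from bigchaindb.backend.mongodb.changefeed import run_changefeed

    conn = connect()
    # Start from the newest oplog entry so only future operations arrive.
    last_ts = (conn.conn.local.oplog.rs.find()
               .sort('$natural', -1).limit(1).next()['ts'])

    # run_changefeed reconnects internally on BackendError/ConnectionFailure,
    # resuming from the last timestamp it already delivered.
    for record in run_changefeed(conn, 'bigchain', last_ts):
        if record['op'] == 'i':        # block inserts only
            handle_block(record['o'])  # hypothetical handler

Because last_ts is updated inside the generator, the {'ts': {'$gt': last_ts}}
filter applied on reconnect skips everything already delivered and replays
everything that is not, which is what restores the ordering guarantee.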
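
On the voting side, get_new_blocks_feed anchors the tail at the oplog
timestamp where start_block_id was inserted, so it yields every block written
after that point in strict write order, including blocks that arrived while
the node was down; this is what replaces the old initial()/get_unvoted_blocks()
prefeed. A short usage sketch mirroring the new get_changefeed() in
bigchaindb/pipelines/vote.py, assuming a node that has already voted on at
least one block:

    from bigchaindb import Bigchain, backend

    b = Bigchain()
    start = b.get_last_voted_block().id

    # Blocks come out in insertion order, so each vote can chain
    # correctly from the previous block.
    for block in backend.query.get_new_blocks_feed(b.connection, start):
        print(block['id'])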