
fix block order bug

Scott Sadler 2017-04-12 17:29:17 +02:00
parent c64a35c362
commit 3bb5973f60
10 changed files with 116 additions and 220 deletions

View File

@@ -20,50 +20,21 @@ class MongoDBChangeFeed(ChangeFeed):
We emulate the behaviour of the RethinkDB changefeed by using a tailable
cursor that listens for events on the oplog.
"""
def run_forever(self):
for element in self.prefeed:
self.outqueue.put(element)
while True:
try:
# XXX: hack to force reconnection. Why? Because the cursor
# in `run_changefeed` does not run in the context of a
# Connection object, so if the connection is lost we need
# to manually reset the connection to None.
# See #1154
self.connection.connection = None
self.run_changefeed()
break
except (BackendError, pymongo.errors.ConnectionFailure):
logger.exception('Error connecting to the database, retrying')
time.sleep(1)
def run_changefeed(self):
dbname = self.connection.dbname
table = self.table
namespace = '{}.{}'.format(dbname, table)
dbname = self.connection.dbname
# last timestamp in the oplog. We only care for operations happening
# in the future.
last_ts = self.connection.run(
self.connection.query().local.oplog.rs.find()
.sort('$natural', pymongo.DESCENDING).limit(1)
.next()['ts'])
# tailable cursor. A tailable cursor will remain open even after the
# last result was returned. ``TAILABLE_AWAIT`` will block for some
# timeout after the last result was returned. If no result is received
# in the meantime it will raise a StopIteration exception.
cursor = self.connection.run(
self.connection.query().local.oplog.rs.find(
{'ns': namespace, 'ts': {'$gt': last_ts}},
cursor_type=pymongo.CursorType.TAILABLE_AWAIT
))
while cursor.alive:
try:
record = cursor.next()
except StopIteration:
continue
for record in run_changefeed(self.connection, table, last_ts):
is_insert = record['op'] == 'i'
is_delete = record['op'] == 'd'
@@ -94,7 +65,8 @@ class MongoDBChangeFeed(ChangeFeed):
@register_changefeed(MongoDBConnection)
def get_changefeed(connection, table, operation, *, prefeed=None):
def get_changefeed(connection, table, operation, *, prefeed=None,
get_resumption_pointer=None):
"""Return a MongoDB changefeed.
Returns:
@@ -103,4 +75,38 @@ def get_changefeed(connection, table, operation, *, prefeed=None):
"""
return MongoDBChangeFeed(table, operation, prefeed=prefeed,
connection=connection)
connection=connection,
get_resumption_pointer=get_resumption_pointer)
_FEED_STOP = False
"""If it's True then the changefeed will return when there are no more items.
"""
def run_changefeed(conn, table, last_ts):
"""Encapsulate operational logic of tailing changefeed from MongoDB
"""
while True:
try:
# XXX: hack to force reconnection, in case the connection
# is lost while waiting on the cursor. See #1154.
conn.connection = None
namespace = conn.dbname + '.' + table
cursor = conn.conn.local.oplog.rs.find(
{'ns': namespace, 'ts': {'$gt': last_ts}},
{'o._id': False},
cursor_type=pymongo.CursorType.TAILABLE_AWAIT
)
logger.debug('Tailing oplog at %s/%s', namespace, last_ts)
while cursor.alive:
try:
record = cursor.next()
yield record
last_ts = record['ts']
except StopIteration:
if _FEED_STOP:
return
except (BackendError, pymongo.errors.ConnectionFailure):
logger.exception('Lost connection while tailing oplog, retrying')
time.sleep(1)
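
For reference, the technique used above in standalone form: tailing the replica-set oplog with a TAILABLE_AWAIT cursor. A minimal sketch, assuming a MongoDB replica set reachable on localhost (the client setup and the print call are illustrative, not part of this patch):

import time

import pymongo

client = pymongo.MongoClient('localhost', 27017)
oplog = client.local.oplog.rs

# Start from the newest entry, like `last_ts` above: we only care
# about operations that happen after we begin tailing.
last_ts = (oplog.find().sort('$natural', pymongo.DESCENDING)
           .limit(1).next()['ts'])

while True:
    # A tailable cursor stays open after the last result is returned;
    # TAILABLE_AWAIT additionally blocks for a while awaiting new data.
    cursor = oplog.find({'ts': {'$gt': last_ts}},
                        cursor_type=pymongo.CursorType.TAILABLE_AWAIT)
    while cursor.alive:
        try:
            record = cursor.next()
            last_ts = record['ts']
            print(record['op'], record.get('ns'))
        except StopIteration:
            # the await timed out with no new entries; keep tailing
            time.sleep(0.1)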

View File

@@ -5,6 +5,7 @@ from time import time
from pymongo import ReturnDocument
from bigchaindb import backend
from bigchaindb.backend.mongodb.changefeed import run_changefeed
from bigchaindb.common.exceptions import CyclicBlockchainError
from bigchaindb.common.transaction import Transaction
from bigchaindb.backend.exceptions import DuplicateKeyError
@@ -283,21 +284,10 @@ def get_last_voted_block(conn, node_pubkey):
@register_query(MongoDBConnection)
def get_unvoted_blocks(conn, node_pubkey):
return conn.run(
conn.collection('bigchain')
.aggregate([
{'$lookup': {
'from': 'votes',
'localField': 'id',
'foreignField': 'vote.voting_for_block',
'as': 'votes'
}},
{'$match': {
'votes.node_pubkey': {'$ne': node_pubkey},
'block.transactions.operation': {'$ne': 'GENESIS'}
}},
{'$project': {
'votes': False, '_id': False
}}
]))
def get_new_blocks_feed(conn, start_block_id):
namespace = conn.dbname + '.bigchain'
query = {'o.id': start_block_id, 'op': 'i', 'ns': namespace}
# Necessary to search in descending order since tests may write the same block id several times
last_ts = conn.conn.local.oplog.rs.find(query).sort('$natural', -1).next()['ts']
feed = run_changefeed(conn, 'bigchain', last_ts)
return (evt['o'] for evt in feed if evt['op'] == 'i')
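
A sketch of how a consumer might use the new feed, assuming a connected MongoDB backend (`known_block_id` and `handle_block` are placeholders, not names from this patch):

from bigchaindb.backend import connect, query

conn = connect()
known_block_id = '...'  # placeholder: id of the last block we processed

# Yields every block inserted into `bigchain` after the given block,
# in oplog order, blocking while it waits for new inserts.
feed = query.get_new_blocks_feed(conn, known_block_id)
for block in feed:
    handle_block(block)  # placeholder for the caller's processing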

View File

@@ -273,20 +273,6 @@ def get_last_voted_block(connection, node_pubkey):
raise NotImplementedError
@singledispatch
def get_unvoted_blocks(connection, node_pubkey):
"""Return all the blocks that have not been voted by the specified node.
Args:
node_pubkey (str): base58 encoded public key
Returns:
:obj:`list` of :obj:`dict`: a list of unvoted blocks
"""
raise NotImplementedError
@singledispatch
def get_txids_filtered(connection, asset_id, operation=None):
"""
@@ -298,3 +284,8 @@ def get_txids_filtered(connection, asset_id, operation=None):
"""
raise NotImplementedError
@singledispatch
def get_new_blocks_feed(connection, start_block_id):
raise NotImplementedError
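
The stub above follows the dispatch pattern used throughout bigchaindb.backend.query: a @singledispatch function raises NotImplementedError until a backend registers an implementation for its connection type. A generic sketch of the mechanism (the class and the yielded value are illustrative):

from functools import singledispatch

@singledispatch
def get_new_blocks_feed(connection, start_block_id):
    raise NotImplementedError

class FakeConnection:  # stand-in for e.g. MongoDBConnection
    pass

@get_new_blocks_feed.register(FakeConnection)
def _get_new_blocks_feed(connection, start_block_id):
    # a real backend would tail its changefeed here
    yield {'id': start_block_id}

# Dispatch is driven by the type of the first argument:
feed = get_new_blocks_feed(FakeConnection(), 'some-block-id')
assert next(feed) == {'id': 'some-block-id'}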

View File

@@ -238,18 +238,3 @@ def get_last_voted_block(connection, node_pubkey):
return connection.run(
r.table('bigchain', read_mode=READ_MODE)
.get(last_block_id))
@register_query(RethinkDBConnection)
def get_unvoted_blocks(connection, node_pubkey):
unvoted = connection.run(
r.table('bigchain', read_mode=READ_MODE)
.filter(lambda block: r.table('votes', read_mode=READ_MODE)
.get_all([block['id'], node_pubkey], index='block_and_voter')
.is_empty())
.order_by(r.asc(r.row['block']['timestamp'])))
# FIXME: I (@vrde) don't like this solution. Filtering should be done at a
# database level. Solving issue #444 can help untangle the situation
unvoted_blocks = filter(lambda block: not utils.is_genesis_block(block), unvoted)
return unvoted_blocks

View File

@@ -594,16 +594,6 @@ class Bigchain(object):
return Block.from_dict(backend.query.get_last_voted_block(self.connection, self.me))
def get_unvoted_blocks(self):
"""Return all the blocks that have not been voted on by this node.
Returns:
:obj:`list` of :obj:`dict`: a list of unvoted blocks
"""
# XXX: should this return instances of Block?
return backend.query.get_unvoted_blocks(self.connection, self.me)
def block_election(self, block):
if type(block) != dict:
block = block.to_dict()

View File

@@ -10,10 +10,7 @@ from collections import Counter
from multipipes import Pipeline, Node
import bigchaindb
from bigchaindb import Bigchain
from bigchaindb import backend
from bigchaindb.backend.changefeed import ChangeFeed
from bigchaindb import Bigchain, backend
from bigchaindb.models import Transaction, Block
from bigchaindb.common import exceptions
@@ -151,20 +148,13 @@ class Vote:
return vote
def initial():
"""Return unvoted blocks."""
b = Bigchain()
rs = b.get_unvoted_blocks()
return rs
def create_pipeline():
"""Create and return the pipeline of operations to be distributed
on different processes."""
voter = Vote()
vote_pipeline = Pipeline([
return Pipeline([
Node(voter.validate_block),
Node(voter.ungroup),
Node(voter.validate_tx, fraction_of_cores=1),
@@ -172,13 +162,14 @@ def create_pipeline():
Node(voter.write_vote)
])
return vote_pipeline
def get_changefeed():
connection = backend.connect(**bigchaindb.config['database'])
return backend.get_changefeed(connection, 'bigchain', ChangeFeed.INSERT,
prefeed=initial())
"""Create and return ordered changefeed of blocks starting from
last voted block"""
b = Bigchain()
last_voted = b.get_last_voted_block().id
feed = backend.query.get_new_blocks_feed(b.connection, last_voted)
return Node(feed.__next__)
def start():
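
The changefeed is now a plain generator, turned into a pipeline source by wrapping its __next__ method in a Node: each call to the node pulls exactly one block from the feed. A toy illustration of the wrapping, without multipipes (block contents are made up):

def block_feed():
    # stand-in for get_new_blocks_feed: yields blocks in commit order
    for block_id in ('block-1', 'block-2'):
        yield {'id': block_id}

feed = block_feed()
source = feed.__next__  # a zero-argument callable, one block per call

assert source() == {'id': 'block-1'}
assert source() == {'id': 'block-2'}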

View File

@@ -10,34 +10,32 @@ def mock_changefeed_data():
return [
{
'op': 'i',
'o': {'_id': '', 'msg': 'seems like we have an insert here'}
'o': {'_id': '', 'msg': 'seems like we have an insert here'},
'ts': 1,
},
{
'op': 'd',
'o': {'msg': 'seems like we have a delete here'}
'o': {'msg': 'seems like we have a delete here'},
'ts': 2,
},
{
'op': 'u',
'o': {'msg': 'seems like we have an update here'},
'o2': {'_id': 'some-id'}
'o2': {'_id': 'some-id'},
'ts': 3,
},
]
@pytest.mark.bdb
@mock.patch('pymongo.cursor.Cursor.alive', new_callable=mock.PropertyMock)
@mock.patch('bigchaindb.backend.mongodb.changefeed._FEED_STOP', True)
@mock.patch('pymongo.cursor.Cursor.next')
def test_changefeed_insert(mock_cursor_next, mock_cursor_alive,
mock_changefeed_data):
def test_changefeed_insert(mock_cursor_next, mock_changefeed_data):
from bigchaindb.backend import get_changefeed, connect
from bigchaindb.backend.changefeed import ChangeFeed
# setup connection and mocks
conn = connect()
# changefeed.run_forever only returns when the cursor is closed
# so we mock `alive` to be False once it finishes reading the mocked data
mock_cursor_alive.side_effect = [mock.DEFAULT, mock.DEFAULT,
mock.DEFAULT, mock.DEFAULT, False]
# mock the `next` method of the cursor to return the mocked data
mock_cursor_next.side_effect = [mock.DEFAULT] + mock_changefeed_data
@@ -51,16 +49,13 @@ def test_changefeed_insert(mock_cursor_next, mock_cursor_alive,
@pytest.mark.bdb
@mock.patch('pymongo.cursor.Cursor.alive', new_callable=mock.PropertyMock)
@mock.patch('bigchaindb.backend.mongodb.changefeed._FEED_STOP', True)
@mock.patch('pymongo.cursor.Cursor.next')
def test_changefeed_delete(mock_cursor_next, mock_cursor_alive,
mock_changefeed_data):
def test_changefeed_delete(mock_cursor_next, mock_changefeed_data):
from bigchaindb.backend import get_changefeed, connect
from bigchaindb.backend.changefeed import ChangeFeed
conn = connect()
mock_cursor_alive.side_effect = [mock.DEFAULT, mock.DEFAULT,
mock.DEFAULT, mock.DEFAULT, False]
mock_cursor_next.side_effect = [mock.DEFAULT] + mock_changefeed_data
outpipe = Pipe()
@@ -73,17 +68,15 @@ def test_changefeed_delete(mock_cursor_next, mock_cursor_alive,
@pytest.mark.bdb
@mock.patch('bigchaindb.backend.mongodb.changefeed._FEED_STOP', True)
@mock.patch('pymongo.collection.Collection.find_one')
@mock.patch('pymongo.cursor.Cursor.alive', new_callable=mock.PropertyMock)
@mock.patch('pymongo.cursor.Cursor.next')
def test_changefeed_update(mock_cursor_next, mock_cursor_alive,
mock_cursor_find_one, mock_changefeed_data):
def test_changefeed_update(mock_cursor_next, mock_cursor_find_one,
mock_changefeed_data):
from bigchaindb.backend import get_changefeed, connect
from bigchaindb.backend.changefeed import ChangeFeed
conn = connect()
mock_cursor_alive.side_effect = [mock.DEFAULT, mock.DEFAULT,
mock.DEFAULT, mock.DEFAULT, False]
mock_cursor_next.side_effect = [mock.DEFAULT] + mock_changefeed_data
mock_cursor_find_one.return_value = mock_changefeed_data[2]['o']
@@ -101,18 +94,15 @@ def test_changefeed_update(mock_cursor_next, mock_cursor_alive,
@pytest.mark.bdb
@mock.patch('bigchaindb.backend.mongodb.changefeed._FEED_STOP', True)
@mock.patch('pymongo.collection.Collection.find_one')
@mock.patch('pymongo.cursor.Cursor.alive', new_callable=mock.PropertyMock)
@mock.patch('pymongo.cursor.Cursor.next')
def test_changefeed_multiple_operations(mock_cursor_next, mock_cursor_alive,
mock_cursor_find_one,
def test_changefeed_multiple_operations(mock_cursor_next, mock_cursor_find_one,
mock_changefeed_data):
from bigchaindb.backend import get_changefeed, connect
from bigchaindb.backend.changefeed import ChangeFeed
conn = connect()
mock_cursor_alive.side_effect = [mock.DEFAULT, mock.DEFAULT,
mock.DEFAULT, mock.DEFAULT, False]
mock_cursor_next.side_effect = [mock.DEFAULT] + mock_changefeed_data
mock_cursor_find_one.return_value = mock_changefeed_data[2]['o']
@@ -128,16 +118,13 @@ def test_changefeed_multiple_operations(mock_cursor_next, mock_cursor_alive,
@pytest.mark.bdb
@mock.patch('pymongo.cursor.Cursor.alive', new_callable=mock.PropertyMock)
@mock.patch('bigchaindb.backend.mongodb.changefeed._FEED_STOP', True)
@mock.patch('pymongo.cursor.Cursor.next')
def test_changefeed_prefeed(mock_cursor_next, mock_cursor_alive,
mock_changefeed_data):
def test_changefeed_prefeed(mock_cursor_next, mock_changefeed_data):
from bigchaindb.backend import get_changefeed, connect
from bigchaindb.backend.changefeed import ChangeFeed
conn = connect()
mock_cursor_alive.side_effect = [mock.DEFAULT, mock.DEFAULT,
mock.DEFAULT, mock.DEFAULT, False]
mock_cursor_next.side_effect = [mock.DEFAULT] + mock_changefeed_data
outpipe = Pipe()
@@ -150,19 +137,14 @@ def test_changefeed_prefeed(mock_cursor_next, mock_cursor_alive,
@pytest.mark.bdb
@mock.patch('bigchaindb.backend.mongodb.changefeed.MongoDBChangeFeed.run_changefeed') # noqa
def test_connection_failure(mock_run_changefeed):
from bigchaindb.backend import get_changefeed, connect
def test_connection_failure():
from bigchaindb.backend.exceptions import ConnectionError
from bigchaindb.backend.changefeed import ChangeFeed
from bigchaindb.backend.mongodb.changefeed import run_changefeed
conn = connect()
mock_run_changefeed.side_effect = [ConnectionError(),
mock.DEFAULT]
changefeed = get_changefeed(conn, 'backlog', ChangeFeed.INSERT)
changefeed.run_forever()
# run_changefeed raises an exception the first time it's called and then
# it's called again
assert mock_run_changefeed.call_count == 2
conn = mock.MagicMock()
find = conn.conn.local.oplog.rs.find
find.side_effect = [ConnectionError(), RuntimeError()]
changefeed = run_changefeed(conn, 'backlog', -1)
with pytest.raises(RuntimeError):
for record in changefeed:
assert False, 'Shouldn\'t get here'

View File

@@ -1,4 +1,5 @@
import pytest
from unittest import mock
pytestmark = pytest.mark.bdb
@@ -377,21 +378,6 @@ def test_get_last_voted_block(genesis_block, signed_create_tx, b):
query.get_last_voted_block(conn, b.me)
def test_get_unvoted_blocks(signed_create_tx):
from bigchaindb.backend import connect, query
from bigchaindb.models import Block
conn = connect()
# create and insert a block
block = Block(transactions=[signed_create_tx], node_pubkey='aaa')
conn.db.bigchain.insert_one(block.to_dict())
unvoted_blocks = list(query.get_unvoted_blocks(conn, 'aaa'))
assert len(unvoted_blocks) == 1
assert unvoted_blocks[0] == block.to_dict()
def test_get_txids_filtered(signed_create_tx, signed_transfer_tx):
from bigchaindb.backend import connect, query
from bigchaindb.models import Block, Transaction
@@ -417,3 +403,28 @@ def test_get_txids_filtered(signed_create_tx, signed_transfer_tx):
# Test get by asset and TRANSFER
txids = set(query.get_txids_filtered(conn, asset_id, Transaction.TRANSFER))
assert txids == {signed_transfer_tx.id}
@mock.patch('bigchaindb.backend.mongodb.changefeed._FEED_STOP', True)
def test_get_new_blocks_feed(b, create_tx):
from bigchaindb.backend import query
from bigchaindb.models import Block
import random
def create_block():
ts = str(random.random())
block = Block(transactions=[create_tx], timestamp=ts)
b.write_block(block)
return block.to_dict()
create_block()
b1 = create_block()
b2 = create_block()
feed = query.get_new_blocks_feed(b.connection, b1['id'])
assert feed.__next__() == b2
b3 = create_block()
assert list(feed) == [b3]

View File

@@ -31,11 +31,11 @@ def test_schema(schema_func_name, args_qty):
('get_block', 1),
('write_vote', 1),
('get_last_voted_block', 1),
('get_unvoted_blocks', 1),
('get_spent', 2),
('get_votes_by_block_id_and_voter', 2),
('update_transaction', 2),
('get_transaction_from_block', 2),
('get_new_blocks_feed', 1),
))
def test_query(query_func_name, args_qty):
from bigchaindb.backend import query

View File

@@ -509,54 +509,6 @@ def test_invalid_block_voting(monkeypatch, b, user_pk, genesis_block):
vote_doc['signature']) is True
@pytest.mark.genesis
def test_voter_considers_unvoted_blocks_when_single_node(monkeypatch, b):
from bigchaindb.backend import query
from bigchaindb.pipelines import vote
outpipe = Pipe()
monkeypatch.setattr('time.time', lambda: 1000000000)
block_ids = []
# insert blocks in the database while the voter process is not listening
# (these blocks won't appear in the changefeed)
monkeypatch.setattr('time.time', lambda: 1000000020)
block_1 = dummy_block(b)
block_ids.append(block_1.id)
monkeypatch.setattr('time.time', lambda: 1000000030)
b.write_block(block_1)
block_2 = dummy_block(b)
block_ids.append(block_2.id)
b.write_block(block_2)
vote_pipeline = vote.create_pipeline()
vote_pipeline.setup(indata=vote.get_changefeed(), outdata=outpipe)
vote_pipeline.start()
# We expect two votes, so instead of waiting an arbitrary amount
# of time, we can do two blocking calls to `get`
outpipe.get()
outpipe.get()
# create a new block that will appear in the changefeed
monkeypatch.setattr('time.time', lambda: 1000000040)
block_3 = dummy_block(b)
block_ids.append(block_3.id)
b.write_block(block_3)
# Same as before with the two `get`s
outpipe.get()
vote_pipeline.terminate()
# retrieve vote
votes = [list(query.get_votes_by_block_id(b.connection, _id))[0]
for _id in block_ids]
assert all(vote['node_pubkey'] == b.me for vote in votes)
@pytest.mark.genesis
def test_voter_chains_blocks_with_the_previous_ones(monkeypatch, b):
from bigchaindb.backend import query
@@ -564,16 +516,14 @@ def test_voter_chains_blocks_with_the_previous_ones(monkeypatch, b):
outpipe = Pipe()
monkeypatch.setattr('time.time', lambda: 1000000000)
block_ids = []
monkeypatch.setattr('time.time', lambda: 1000000020)
block_1 = dummy_block(b)
block_1.timestamp = str(random.random())
block_ids.append(block_1.id)
b.write_block(block_1)
monkeypatch.setattr('time.time', lambda: 1000000030)
block_2 = dummy_block(b)
block_2.timestamp = str(random.random())
block_ids.append(block_2.id)
b.write_block(block_2)