mirror of https://github.com/bigchaindb/bigchaindb.git

add stepping pipeline and fix issue #1191

Scott Sadler 2017-02-15 14:58:16 +01:00
parent e71ee0dbc1
commit 955b018be8
5 changed files with 178 additions and 2 deletions

View File

@@ -85,11 +85,13 @@ class MongoDBChangeFeed(ChangeFeed):
                 # document itself. So here we first read the document
                 # and then return it.
                 doc = self.connection.conn[dbname][table].find_one(
-                    {'_id': record['o2']},
+                    {'_id': record['o2']['_id']},
                     {'_id': False}
                 )
                 self.outqueue.put(doc)
+
+            logger.debug('Record in changefeed: %s:%s', table, record['op'])


 @register_changefeed(MongoDBConnection)
 def get_changefeed(connection, table, operation, *, prefeed=None):
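For context on the fix above: in the MongoDB oplog, the 'o2' field of an update entry is itself a small document holding the target '_id', so the old filter compared every document's '_id' against that whole sub-document and never matched. A minimal sketch with invented values:

    # Simplified oplog entry for an update (values are made up for illustration)
    record = {
        'op': 'u',                               # update operation
        'o2': {'_id': 'abc123'},                 # identifies the updated document
        'o': {'$set': {'assignee': 'node2'}},    # the modification itself
    }

    broken_filter = {'_id': record['o2']}            # {'_id': {'_id': 'abc123'}}: matches nothing
    working_filter = {'_id': record['o2']['_id']}    # {'_id': 'abc123'}: matches the document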

View File

@@ -55,7 +55,9 @@ class Bigchain(object):
         self.me = public_key or bigchaindb.config['keypair']['public']
         self.me_private = private_key or bigchaindb.config['keypair']['private']
         self.nodes_except_me = keyring or bigchaindb.config['keyring']
-        self.backlog_reassign_delay = backlog_reassign_delay or bigchaindb.config['backlog_reassign_delay']
+        if backlog_reassign_delay is None:
+            backlog_reassign_delay = bigchaindb.config['backlog_reassign_delay']
+        self.backlog_reassign_delay = backlog_reassign_delay
         self.consensus = BaseConsensusRules
         self.connection = connection if connection else backend.connect(**bigchaindb.config['database'])
         if not self.me or not self.me_private:
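The __init__ change above swaps a truthiness check for an explicit "is None" test: with "backlog_reassign_delay or config[...]", a caller passing 0 (as the stepping tests later in this commit do for the stale-monitor pipeline) would silently fall back to the configured delay. A small illustration, using an invented config value:

    config_delay = 30                # illustrative value only

    delay = 0 or config_delay        # -> 30: the caller's explicit 0 is discarded

    delay = 0
    if delay is None:
        delay = config_delay         # -> delay stays 0, as the caller intended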

View File

@@ -0,0 +1,10 @@
import pytest

from .stepping import create_stepper


@pytest.fixture
def steps():
    stepper = create_stepper()
    with stepper.start():
        yield stepper

138  tests/pipelines/stepping.py  Normal file
View File

@@ -0,0 +1,138 @@
"""
Pipeline stepping is a way to advance the asynchronous data pipeline
deterministically by exposing each step separately and advancing the states
manually.
"""
import functools
import time
import types
import logging
from contextlib import contextmanager
from unittest.mock import patch

import bigchaindb.core
from bigchaindb.backend.changefeed import ChangeFeed
import bigchaindb.pipelines.block
import bigchaindb.pipelines.stale


class MultipipesStepper:
    def __init__(self):
        self.queues = {}          # pending items per task, keyed by '<prefix>_<node name>'
        self.tasks = {}           # callables that each advance a single pipeline stage
        self.input_tasks = set()  # names of tasks that pull from an input, not a queue
        self.processes = []       # changefeed processes to start/terminate around tests

    def add_input(self, prefix, node, next):
        # The first node of a pipeline: it pulls from the changefeed's output
        # queue (or calls the node's own target) instead of reading a queue.
        name = '%s_%s' % (prefix, node.name)
        next_name = '%s_%s' % (prefix, next.name)

        if isinstance(node, ChangeFeed):
            self.processes.append(node)

            def f(*args, **kwargs):
                # Read from the changefeed queue with a short default timeout;
                # callers may override it via kwargs.
                _kwargs = {'timeout': 0.1}
                _kwargs.update(kwargs)
                return node.outqueue.get(*args, **_kwargs)
        else:
            f = node.target

        def inner(**kwargs):
            r = f(**kwargs)
            if r is not None:
                self.enqueue(next_name, r)

        self.tasks[name] = functools.wraps(f)(inner)
        self.input_tasks.add(name)

    def add_stage(self, prefix, node, next):
        """
        Convert pipeline stage into task.
        """
        f = node.target
        name = '%s_%s' % (prefix, node.name)
        if next:
            next_name = '%s_%s' % (prefix, next.name)

        def inner(*args):
            out = f(*args)
            if out is not None and next:
                self.enqueue(next_name, out)

        task = functools.wraps(f)(inner)
        self.tasks[name] = task

    def enqueue(self, name, item):
        queue = self.queues.setdefault(name, [])
        if isinstance(item, types.GeneratorType):
            queue.extend(list(item))
        else:
            queue.append(item)

    def step(self, name, **kwargs):
        # Run the named task once; non-input tasks consume the next queued
        # item or raise Empty if nothing is waiting for them.
        logging.debug('Stepping %s', name)
        task = self.tasks[name]
        if name in self.input_tasks:
            task(**kwargs)
        else:
            queue = self.queues.get(name, [])
            if not queue:
                raise Empty(name)
            task(queue.pop(0), **kwargs)
        logging.debug('Stepped %s', name)

    def get_counts(self):
        # Number of pending items per non-empty queue.
        counts = {}
        for name in self.queues:
            n = len(self.queues[name])
            if n:
                counts[name] = n
        return counts

    def __getattr__(self, name):
        # stepper.block_filter_tx() is shorthand for stepper.step('block_filter_tx')
        return lambda **kwargs: self.step(name, **kwargs)

    @contextmanager
    def start(self):
        for p in self.processes:
            p.start()
        # It would be nice to have a better way to wait for changefeeds here.
        # We have to wait some amount of time because the feed setup is
        # happening in a different process and won't include any writes we
        # perform before it is ready.
        time.sleep(0.2)
        try:
            yield
        finally:
            for p in self.processes:
                p.terminate()


class Empty(Exception):
    pass


def update_stepper(stepper, prefix, pipeline):
    # The first node becomes an input task; every other node becomes a stage
    # that feeds the queue of the node after it.
    nodes = pipeline.nodes
    for i in range(len(nodes)):
        n0 = nodes[i]
        n1 = (nodes + [None])[i + 1]
        f = stepper.add_input if i == 0 else stepper.add_stage
        f(prefix, n0, n1)


def create_stepper():
    stepper = MultipipesStepper()

    with patch('bigchaindb.pipelines.block.Pipeline.start'):
        pipeline = bigchaindb.pipelines.block.start()
        update_stepper(stepper, 'block', pipeline)

    with patch('bigchaindb.pipelines.stale.Pipeline.start'):
        pipeline = bigchaindb.pipelines.stale.start(
            timeout=0, backlog_reassign_delay=0)
        update_stepper(stepper, 'stale', pipeline)

    return stepper
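Task names follow the '<prefix>_<node name>' pattern, and __getattr__ turns attribute access into step() calls, so a test can drive the pipelines one stage at a time. A minimal usage sketch (the node names are the ones exercised by the test below; anything beyond that is illustrative):

    stepper = create_stepper()
    with stepper.start():                    # start the changefeed processes
        stepper.step('block_changefeed')     # pull one event from the block changefeed
        stepper.block_filter_tx()            # shorthand for stepper.step('block_filter_tx')
        print(stepper.get_counts())          # pending items per stage queue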

View File

@@ -0,0 +1,24 @@
import pytest
import random


@pytest.mark.bdb
def test_stepping_changefeed_produces_update(b, steps):
    input_single_create(b)
    steps.block_changefeed()
    steps.block_filter_tx()

    # timeouts are 0 so will reassign immediately
    steps.stale_check_transactions()
    steps.stale_reassign_transactions()

    # We expect 2 changefeed events
    steps.block_changefeed()
    steps.block_filter_tx()


def input_single_create(b):
    from bigchaindb.common.transaction import Transaction
    metadata = {'r': random.random()}
    tx = Transaction.create([b.me], [([b.me], 1)], metadata)
    b.write_transaction(tx)