From 955b018be8995743015f3efdfa78b55332c3fd26 Mon Sep 17 00:00:00 2001
From: Scott Sadler
Date: Wed, 15 Feb 2017 14:58:16 +0100
Subject: [PATCH] add stepping pipeline and fix issue #1191

---
 bigchaindb/backend/mongodb/changefeed.py |   4 +-
 bigchaindb/core.py                       |   4 +-
 tests/pipelines/conftest.py              |  10 ++
 tests/pipelines/stepping.py              | 138 +++++++++++++++++++++++
 tests/pipelines/test_steps.py            |  24 ++++
 5 files changed, 178 insertions(+), 2 deletions(-)
 create mode 100644 tests/pipelines/conftest.py
 create mode 100644 tests/pipelines/stepping.py
 create mode 100644 tests/pipelines/test_steps.py

diff --git a/bigchaindb/backend/mongodb/changefeed.py b/bigchaindb/backend/mongodb/changefeed.py
index 4a5a5b7e..3abcbeda 100644
--- a/bigchaindb/backend/mongodb/changefeed.py
+++ b/bigchaindb/backend/mongodb/changefeed.py
@@ -85,11 +85,13 @@ class MongoDBChangeFeed(ChangeFeed):
                 # document itself. So here we first read the document
                 # and then return it.
                 doc = self.connection.conn[dbname][table].find_one(
-                    {'_id': record['o2']},
+                    {'_id': record['o2']['_id']},
                     {'_id': False}
                 )
                 self.outqueue.put(doc)
 
+            logger.debug('Record in changefeed: %s:%s', table, record['op'])
+
 
 @register_changefeed(MongoDBConnection)
 def get_changefeed(connection, table, operation, *, prefeed=None):
diff --git a/bigchaindb/core.py b/bigchaindb/core.py
index e4dddb6f..a7ed93f0 100644
--- a/bigchaindb/core.py
+++ b/bigchaindb/core.py
@@ -55,7 +55,9 @@ class Bigchain(object):
         self.me = public_key or bigchaindb.config['keypair']['public']
         self.me_private = private_key or bigchaindb.config['keypair']['private']
         self.nodes_except_me = keyring or bigchaindb.config['keyring']
-        self.backlog_reassign_delay = backlog_reassign_delay or bigchaindb.config['backlog_reassign_delay']
+        if backlog_reassign_delay is None:
+            backlog_reassign_delay = bigchaindb.config['backlog_reassign_delay']
+        self.backlog_reassign_delay = backlog_reassign_delay
         self.consensus = BaseConsensusRules
         self.connection = connection if connection else backend.connect(**bigchaindb.config['database'])
         if not self.me or not self.me_private:
diff --git a/tests/pipelines/conftest.py b/tests/pipelines/conftest.py
new file mode 100644
index 00000000..5b66f048
--- /dev/null
+++ b/tests/pipelines/conftest.py
@@ -0,0 +1,10 @@
+import pytest
+
+from .stepping import create_stepper
+
+
+@pytest.fixture
+def steps():
+    stepper = create_stepper()
+    with stepper.start():
+        yield stepper
diff --git a/tests/pipelines/stepping.py b/tests/pipelines/stepping.py
new file mode 100644
index 00000000..567dc846
--- /dev/null
+++ b/tests/pipelines/stepping.py
@@ -0,0 +1,138 @@
+"""
+Pipeline stepping is a way to advance the asynchronous data pipeline
+deterministically by exposing each step separately and advancing the states
+manually.
+"""
+
+
+import functools
+import time
+import types
+import logging
+from contextlib import contextmanager
+from unittest.mock import patch
+
+import bigchaindb.core
+from bigchaindb.backend.changefeed import ChangeFeed
+import bigchaindb.pipelines.block
+import bigchaindb.pipelines.stale
+
+
+class MultipipesStepper:
+    def __init__(self):
+        self.queues = {}
+        self.tasks = {}
+        self.input_tasks = set()
+        self.processes = []
+
+    def add_input(self, prefix, node, next):
+        name = '%s_%s' % (prefix, node.name)
+        next_name = '%s_%s' % (prefix, next.name)
+
+        if isinstance(node, ChangeFeed):
+            self.processes.append(node)
+
+            def f(*args, **kwargs):
+                _kwargs = {'timeout': 0.1}
+                _kwargs.update(kwargs)
+                return node.outqueue.get(*args, **_kwargs)
+        else:
+            f = node.target
+
+        def inner(**kwargs):
+            r = f(**kwargs)
+            if r is not None:
+                self.enqueue(next_name, r)
+
+        self.tasks[name] = functools.wraps(f)(inner)
+        self.input_tasks.add(name)
+
+    def add_stage(self, prefix, node, next):
+        """
+        Convert pipeline stage into task.
+        """
+        f = node.target
+        name = '%s_%s' % (prefix, node.name)
+        if next:
+            next_name = '%s_%s' % (prefix, next.name)
+
+        def inner(*args):
+            out = f(*args)
+            if out is not None and next:
+                self.enqueue(next_name, out)
+
+        task = functools.wraps(f)(inner)
+        self.tasks[name] = task
+
+    def enqueue(self, name, item):
+        queue = self.queues.setdefault(name, [])
+        if isinstance(item, types.GeneratorType):
+            queue.extend(list(item))
+        else:
+            queue.append(item)
+
+    def step(self, name, **kwargs):
+        logging.debug('Stepping %s', name)
+        task = self.tasks[name]
+        if name in self.input_tasks:
+            task(**kwargs)
+        else:
+            queue = self.queues.get(name, [])
+            if not queue:
+                raise Empty(name)
+            task(queue.pop(0), **kwargs)
+        logging.debug('Stepped %s', name)
+
+    def get_counts(self):
+        counts = {}
+        for name in self.queues:
+            n = len(self.queues[name])
+            if n:
+                counts[name] = n
+        return counts
+
+    def __getattr__(self, name):
+        return lambda **kwargs: self.step(name, **kwargs)
+
+    @contextmanager
+    def start(self):
+        for p in self.processes:
+            p.start()
+        # It would be nice to have a better way to wait for changefeeds here.
+        # We have to wait some amount of time because the feed setup is
+        # happening in a different process and won't include any writes we
+        # perform before it is ready.
+        time.sleep(0.2)
+        try:
+            yield
+        finally:
+            for p in self.processes:
+                p.terminate()
+
+
+class Empty(Exception):
+    pass
+
+
+def update_stepper(stepper, prefix, pipeline):
+    nodes = pipeline.nodes
+    for i in range(len(nodes)):
+        n0 = nodes[i]
+        n1 = (nodes + [None])[i+1]
+        f = stepper.add_input if i == 0 else stepper.add_stage
+        f(prefix, n0, n1)
+
+
+def create_stepper():
+    stepper = MultipipesStepper()
+
+    with patch('bigchaindb.pipelines.block.Pipeline.start'):
+        pipeline = bigchaindb.pipelines.block.start()
+        update_stepper(stepper, 'block', pipeline)
+
+    with patch('bigchaindb.pipelines.stale.Pipeline.start'):
+        pipeline = bigchaindb.pipelines.stale.start(
+            timeout=0, backlog_reassign_delay=0)
+        update_stepper(stepper, 'stale', pipeline)
+
+    return stepper
diff --git a/tests/pipelines/test_steps.py b/tests/pipelines/test_steps.py
new file mode 100644
index 00000000..76862bd9
--- /dev/null
+++ b/tests/pipelines/test_steps.py
@@ -0,0 +1,24 @@
+import pytest
+import random
+
+
+@pytest.mark.bdb
+def test_stepping_changefeed_produces_update(b, steps):
+    input_single_create(b)
+    steps.block_changefeed()
+    steps.block_filter_tx()
+
+    # timeouts are 0, so the transaction is reassigned immediately
+    steps.stale_check_transactions()
+    steps.stale_reassign_transactions()
+
+    # the reassignment updates the backlog, producing a second changefeed event
+    steps.block_changefeed()
+    steps.block_filter_tx()
+
+
+def input_single_create(b):
+    from bigchaindb.common.transaction import Transaction
+    metadata = {'r': random.random()}
+    tx = Transaction.create([b.me], [([b.me], 1)], metadata)
+    b.write_transaction(tx)
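
Note on the bigchaindb/core.py hunk: replacing `or` with an explicit `is None`
check lets callers pass `backlog_reassign_delay=0` (as the stepping helper does
via `stale.start(timeout=0, backlog_reassign_delay=0)`) without the falsy zero
silently falling back to the configured default. A minimal standalone
illustration of the difference (the names below are illustrative, not from the
codebase):

    config_default = 30

    def with_or(delay):
        # old behaviour: 0 is falsy, so it collapses to the default
        return delay or config_default

    def with_none_check(delay):
        # new behaviour: only None falls back to the default
        if delay is None:
            delay = config_default
        return delay

    assert with_or(0) == 30          # 0 is lost
    assert with_none_check(0) == 0   # 0 is preserved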
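Note on driving the stepper: the `steps` fixture in tests/pipelines/conftest.py
yields a started MultipipesStepper, and MultipipesStepper.__getattr__ turns any
attribute access into a call to step(), so tests advance the pipelines one
named stage at a time ('<prefix>_<node name>', e.g. 'block_filter_tx'). Because
the changefeed runs in a separate process, start() sleeps briefly before
yielding, so writes made inside the test body are visible to the feed. A
minimal sketch of another test built on the same fixtures (the test name is
illustrative, and the `b` and `steps` fixtures are assumed to come from the
test suite as above):

    import random

    import pytest

    from bigchaindb.common.transaction import Transaction


    @pytest.mark.bdb
    def test_single_create_reaches_filter_tx(b, steps):
        # Put one CREATE transaction into the backlog so the block pipeline's
        # changefeed has something to emit (mirrors input_single_create above).
        tx = Transaction.create([b.me], [([b.me], 1)], {'r': random.random()})
        b.write_transaction(tx)
        # Input task: polls the changefeed outqueue (timeout=0.1) and enqueues
        # the result for the next stage, 'block_filter_tx'.
        steps.block_changefeed()
        # Non-input tasks pop one queued item and pass it to the wrapped
        # pipeline node; an empty queue raises stepping.Empty.
        steps.block_filter_tx()
        # get_counts() reports how many items are waiting in each stage queue.
        assert isinstance(steps.get_counts(), dict)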