Core/198/handle stale transactions (#359)

* add timestamp to transaction assignment

* add reassignment delay to configuration

* refactor to multipipes

* # This is a combination of 7 commits.
# The first commit's message is:
stale transaction monitor and tests

# The 2nd commit message will be skipped:

#	simplify logic

# The 3rd commit message will be skipped:

#	node will assign to self

# The 4th commit message will be skipped:

#	block listens for insert and update

# The 5th commit message will be skipped:

#	more test coverage

# The 6th commit message will be skipped:

#	test coverage

# The 7th commit message will be skipped:

#	test coverage

* stale transaction monitor and tests

* update operation only returns new value
This commit is contained in:
Ryan Henderson 2016-09-07 16:26:41 +02:00 committed by GitHub
parent 90cc84ad3d
commit 92981e003d
16 changed files with 285 additions and 10 deletions

View File

@ -31,6 +31,7 @@ config = {
},
'api_endpoint': 'http://localhost:9984/api/v1',
'consensus_plugin': 'default',
'backlog_reassign_delay': 30
}
# We need to maintain a backup copy of the original config dict in case

View File

@ -106,6 +106,11 @@ def run_configure(args, skip_if_exists=False):
input('Statsd {}? (default `{}`): '.format(key, val)) \
or val
val = conf['backlog_reassign_delay']
conf['backlog_reassign_delay'] = \
input('Stale transaction reassignment delay (in seconds)? (default `{}`): '.format(val)) \
or val
if config_path != '-':
bigchaindb.config_utils.write_config(conf, config_path)
else:

View File

@ -2,6 +2,7 @@ import random
import math
import collections
from copy import deepcopy
from time import time
from itertools import compress
import rethinkdb as r
@ -28,7 +29,7 @@ class Bigchain(object):
def __init__(self, host=None, port=None, dbname=None,
public_key=None, private_key=None, keyring=[],
consensus_plugin=None):
consensus_plugin=None, backlog_reassign_delay=None):
"""Initialize the Bigchain instance
A Bigchain instance has several configuration parameters (e.g. host).
@ -56,6 +57,7 @@ class Bigchain(object):
self.me = public_key or bigchaindb.config['keypair']['public']
self.me_private = private_key or bigchaindb.config['keypair']['private']
self.nodes_except_me = keyring or bigchaindb.config['keyring']
self.backlog_reassign_delay = backlog_reassign_delay or bigchaindb.config['backlog_reassign_delay']
self.consensus = config_utils.load_consensus_plugin(consensus_plugin)
# change RethinkDB read mode to majority. This ensures consistency in query results
self.read_mode = 'majority'
@ -136,11 +138,54 @@ class Bigchain(object):
signed_transaction = deepcopy(signed_transaction)
# update the transaction
signed_transaction.update({'assignee': assignee})
signed_transaction.update({'assignment_timestamp': time()})
# write to the backlog
response = r.table('backlog').insert(signed_transaction, durability=durability).run(self.conn)
return response
def reassign_transaction(self, transaction, durability='hard'):
"""Assign a transaction to a new node
Args:
transaction (dict): assigned transaction
Returns:
dict: database response or None if no reassignment is possible
"""
if self.nodes_except_me:
try:
federation_nodes = self.nodes_except_me + [self.me]
index_current_assignee = federation_nodes.index(transaction['assignee'])
new_assignee = random.choice(federation_nodes[:index_current_assignee] +
federation_nodes[index_current_assignee + 1:])
except ValueError:
# current assignee not in federation
new_assignee = random.choice(self.nodes_except_me)
else:
# There is no other node to assign to
new_assignee = self.me
response = r.table('backlog')\
.get(transaction['id'])\
.update({'assignee': new_assignee,
'assignment_timestamp': time()},
durability=durability).run(self.conn)
return response
def get_stale_transactions(self):
"""Get a RethinkDB cursor of stale transactions
Transactions are considered stale if they have been assigned a node, but are still in the
backlog after some amount of time specified in the configuration
"""
return r.table('backlog')\
.filter(lambda tx: time() - tx['assignment_timestamp'] >
self.backlog_reassign_delay).run(self.conn)
def get_transaction(self, txid, include_status=False):
"""Retrieve a transaction with `txid` from bigchain.

View File

@ -130,7 +130,8 @@ def initial():
def get_changefeed():
"""Create and return the changefeed for the backlog."""
return ChangeFeed('backlog', ChangeFeed.INSERT, prefeed=initial())
return ChangeFeed('backlog', ChangeFeed.INSERT | ChangeFeed.UPDATE,
prefeed=initial())
def create_pipeline():

View File

@ -0,0 +1,76 @@
"""This module monitors for stale transactions.
It reassigns transactions which have been assigned a node but
remain in the backlog past a certain amount of time.
"""
import logging
from multipipes import Pipeline, Node
from bigchaindb import Bigchain
from time import sleep
logger = logging.getLogger(__name__)
class StaleTransactionMonitor:
"""This class encapsulates the logic for re-assigning stale transactions.
Note:
Methods of this class will be executed in different processes.
"""
def __init__(self, timeout=5, backlog_reassign_delay=None):
"""Initialize StaleTransaction monitor
Args:
timeout: how often to check for stale tx (in sec)
backlog_reassign_delay: How stale a transaction should
be before reassignment (in sec). If supplied, overrides the
Bigchain default value.
"""
self.bigchain = Bigchain(backlog_reassign_delay=backlog_reassign_delay)
self.timeout = timeout
def check_transactions(self):
"""Poll backlog for stale transactions
Returns:
txs (list): txs to be re assigned
"""
sleep(self.timeout)
for tx in self.bigchain.get_stale_transactions():
yield tx
def reassign_transactions(self, tx):
"""Put tx back in backlog with new assignee
Returns:
transaction
"""
self.bigchain.reassign_transaction(tx)
return tx
def create_pipeline(timeout=5, backlog_reassign_delay=5):
"""Create and return the pipeline of operations to be distributed
on different processes."""
stm = StaleTransactionMonitor(timeout=timeout,
backlog_reassign_delay=backlog_reassign_delay)
monitor_pipeline = Pipeline([
Node(stm.check_transactions),
Node(stm.reassign_transactions)
])
return monitor_pipeline
def start(timeout=5, backlog_reassign_delay=5):
"""Create, start, and return the block pipeline."""
pipeline = create_pipeline(timeout=timeout,
backlog_reassign_delay=backlog_reassign_delay)
pipeline.setup()
pipeline.start()
return pipeline

View File

@ -58,5 +58,5 @@ class ChangeFeed(Node):
elif is_delete and (self.operation & ChangeFeed.DELETE):
self.outqueue.put(change['old_val'])
elif is_update and (self.operation & ChangeFeed.UPDATE):
self.outqueue.put(change)
self.outqueue.put(change['new_val'])

View File

@ -2,7 +2,7 @@ import logging
import multiprocessing as mp
import bigchaindb
from bigchaindb.pipelines import vote, block, election
from bigchaindb.pipelines import vote, block, election, stale
from bigchaindb.web import server
@ -31,6 +31,9 @@ def start():
logger.info('Starting voter')
vote.start()
logger.info('Starting stale transaction monitor')
stale.start()
logger.info('Starting election')
election.start()

View File

@ -132,6 +132,7 @@ class TestBigchainApi(object):
response, status = b.get_transaction(tx_signed["id"], include_status=True)
response.pop('assignee')
response.pop('assignment_timestamp')
# add validity information, which will be returned
assert util.serialize(tx_signed) == util.serialize(response)
assert status == b.TX_IN_BACKLOG

View File

@ -78,6 +78,7 @@ def test_delete_tx(b, user_vk):
tx_backlog = r.table('backlog').get(tx['id']).run(b.conn)
tx_backlog.pop('assignee')
tx_backlog.pop('assignment_timestamp')
assert tx_backlog == tx
returned_tx = block_maker.delete_tx(tx)

View File

@ -99,6 +99,7 @@ def test_check_requeue_transaction(b, user_vk):
e.requeue_transactions(test_block)
tx_backlog = r.table('backlog').get(tx1['id']).run(b.conn)
tx_backlog.pop('assignee')
tx_backlog.pop('assignment_timestamp')
assert tx_backlog == tx1

View File

@ -0,0 +1,116 @@
import rethinkdb as r
from bigchaindb import Bigchain
from bigchaindb.pipelines import stale
from multipipes import Pipe, Pipeline
from unittest.mock import patch
from bigchaindb import config_utils
import time
import os
def test_get_stale(b, user_vk):
tx = b.create_transaction(b.me, user_vk, None, 'CREATE')
tx = b.sign_transaction(tx, b.me_private)
b.write_transaction(tx, durability='hard')
stm = stale.StaleTransactionMonitor(timeout=0.001,
backlog_reassign_delay=0.001)
tx_stale = stm.check_transactions()
for _tx in tx_stale:
_tx.pop('assignee')
_tx.pop('assignment_timestamp')
assert tx == _tx
def test_reassign_transactions(b, user_vk):
# test with single node
tx = b.create_transaction(b.me, user_vk, None, 'CREATE')
tx = b.sign_transaction(tx, b.me_private)
b.write_transaction(tx, durability='hard')
stm = stale.StaleTransactionMonitor(timeout=0.001,
backlog_reassign_delay=0.001)
stm.reassign_transactions(tx)
# test with federation
tx = b.create_transaction(b.me, user_vk, None, 'CREATE')
tx = b.sign_transaction(tx, b.me_private)
b.write_transaction(tx, durability='hard')
stm = stale.StaleTransactionMonitor(timeout=0.001,
backlog_reassign_delay=0.001)
stm.bigchain.nodes_except_me = ['aaa', 'bbb', 'ccc']
tx = list(r.table('backlog').run(b.conn))[0]
stm.reassign_transactions(tx)
reassigned_tx = r.table('backlog').get(tx['id']).run(b.conn)
assert reassigned_tx['assignment_timestamp'] > tx['assignment_timestamp']
assert reassigned_tx['assignee'] != tx['assignee']
# test with node not in federation
tx = b.create_transaction(b.me, user_vk, None, 'CREATE')
tx = b.sign_transaction(tx, b.me_private)
tx.update({'assignee': 'lol'})
tx.update({'assignment_timestamp': time.time()})
r.table('backlog').insert(tx, durability='hard').run(b.conn)
tx = list(r.table('backlog').run(b.conn))[0]
stm.reassign_transactions(tx)
assert r.table('backlog').get(tx['id']).run(b.conn)['assignee'] != 'lol'
def test_full_pipeline(user_vk):
CONFIG = {
'database': {
'name': 'bigchain_test_{}'.format(os.getpid())
},
'keypair': {
'private': '31Lb1ZGKTyHnmVK3LUMrAUrPNfd4sE2YyBt3UA4A25aA',
'public': '4XYfCbabAWVUCbjTmRTFEu2sc3dFEdkse4r6X498B1s8'
},
'keyring': ['aaa', 'bbb'],
'backlog_reassign_delay': 0.01
}
config_utils.set_config(CONFIG)
b = Bigchain()
outpipe = Pipe()
original_txs = {}
for i in range(100):
tx = b.create_transaction(b.me, user_vk, None, 'CREATE')
tx = b.sign_transaction(tx, b.me_private)
b.write_transaction(tx)
original_txs[tx['id']] = r.table('backlog').get(tx['id']).run(b.conn)
assert r.table('backlog').count().run(b.conn) == 100
pipeline = stale.create_pipeline(backlog_reassign_delay=1,
timeout=1)
pipeline.setup(outdata=outpipe)
pipeline.start()
# timing should be careful -- test will fail if reassignment happens multiple times
time.sleep(2)
pipeline.terminate()
# to terminate
outpipe.get()
assert r.table('backlog').count().run(b.conn) == 100
reassigned_txs = list(r.table('backlog').run(b.conn))
# check that every assignment timestamp has increased, and every tx has a new assignee
for reassigned_tx in reassigned_txs:
assert reassigned_tx['assignment_timestamp'] > original_txs[reassigned_tx['id']]['assignment_timestamp']
assert reassigned_tx['assignee'] != original_txs[reassigned_tx['id']]['assignee']
@patch.object(Pipeline, 'start')
def test_start(mock_start):
# TODO: `sta,e.start` is just a wrapper around `block.create_pipeline`,
# that is tested by `test_full_pipeline`.
# If anyone has better ideas on how to test this, please do a PR :)
stale.start()
mock_start.assert_called_with()

View File

@ -44,8 +44,7 @@ def test_changefeed_update(mock_run):
changefeed = ChangeFeed('backlog', ChangeFeed.UPDATE)
changefeed.outqueue = outpipe
changefeed.run_forever()
assert outpipe.get() == {'new_val': 'seems like we have an update here',
'old_val': 'seems like we have an update here'}
assert outpipe.get() == 'seems like we have an update here'
assert outpipe.qsize() == 0
@ -56,8 +55,7 @@ def test_changefeed_multiple_operations(mock_run):
changefeed.outqueue = outpipe
changefeed.run_forever()
assert outpipe.get() == 'seems like we have an insert here'
assert outpipe.get() == {'new_val': 'seems like we have an update here',
'old_val': 'seems like we have an update here'}
assert outpipe.get() == 'seems like we have an update here'
assert outpipe.qsize() == 0

View File

@ -57,6 +57,7 @@ def mock_bigchaindb_backup_config(monkeypatch):
'keypair': {},
'database': {'host': 'host', 'port': 12345, 'name': 'adbname'},
'statsd': {'host': 'host', 'port': 12345, 'rate': 0.1},
'backlog_reassign_delay': 5
}
monkeypatch.setattr('bigchaindb._config', config)

View File

@ -145,7 +145,8 @@ def test_env_config(monkeypatch):
def test_autoconfigure_read_both_from_file_and_env(monkeypatch):
file_config = {
'database': {'host': 'test-host'}
'database': {'host': 'test-host'},
'backlog_reassign_delay': 5
}
monkeypatch.setattr('bigchaindb.config_utils.file_config', lambda *args, **kwargs: file_config)
monkeypatch.setattr('os.environ', {'BIGCHAINDB_DATABASE_NAME': 'test-dbname',
@ -180,6 +181,7 @@ def test_autoconfigure_read_both_from_file_and_env(monkeypatch):
},
'api_endpoint': 'http://localhost:9984/api/v1',
'consensus_plugin': 'default',
'backlog_reassign_delay': 5
}

View File

@ -19,7 +19,8 @@ def config(request, monkeypatch):
},
'keyring': [],
'CONFIGURED': True,
'consensus_plugin': 'default'
'consensus_plugin': 'default',
'backlog_reassign_delay': 30
}
monkeypatch.setattr('bigchaindb.config', config)

23
tests/test_processes.py Normal file
View File

@ -0,0 +1,23 @@
from unittest.mock import patch
from multiprocessing import Process
from bigchaindb.pipelines import vote, block, election, stale
@patch.object(stale, 'start')
@patch.object(election, 'start')
@patch.object(block, 'start')
@patch.object(vote, 'start')
@patch.object(Process, 'start')
def test_processes_start(mock_vote, mock_block, mock_election, mock_stale,
mock_process):
from bigchaindb import processes
processes.start()
mock_vote.assert_called_with()
mock_block.assert_called_with()
mock_election.assert_called_with()
mock_stale.assert_called_with()
mock_process.assert_called_with()