mirror of
https://github.com/bigchaindb/bigchaindb.git
synced 2024-06-28 00:27:45 +02:00
Merge branch 'remove-conn-from-bigchaindb-core'
This commit is contained in:
commit
832c79d58e
|
@ -225,28 +225,27 @@ def run_load(args):
|
|||
|
||||
|
||||
def run_set_shards(args):
|
||||
b = bigchaindb.Bigchain()
|
||||
for table in ['bigchain', 'backlog', 'votes']:
|
||||
# See https://www.rethinkdb.com/api/python/config/
|
||||
table_config = r.table(table).config().run(b.conn)
|
||||
table_config = r.table(table).config().run(db.get_conn())
|
||||
num_replicas = len(table_config['shards'][0]['replicas'])
|
||||
try:
|
||||
r.table(table).reconfigure(shards=args.num_shards, replicas=num_replicas).run(b.conn)
|
||||
r.table(table).reconfigure(shards=args.num_shards, replicas=num_replicas).run(db.get_conn())
|
||||
except r.ReqlOpFailedError as e:
|
||||
logger.warn(e)
|
||||
|
||||
|
||||
def run_set_replicas(args):
|
||||
b = bigchaindb.Bigchain()
|
||||
for table in ['bigchain', 'backlog', 'votes']:
|
||||
# See https://www.rethinkdb.com/api/python/config/
|
||||
table_config = r.table(table).config().run(b.conn)
|
||||
table_config = r.table(table).config().run(db.get_conn())
|
||||
num_shards = len(table_config['shards'])
|
||||
try:
|
||||
r.table(table).reconfigure(shards=num_shards, replicas=args.num_replicas).run(b.conn)
|
||||
r.table(table).reconfigure(shards=num_shards, replicas=args.num_replicas).run(db.get_conn())
|
||||
except r.ReqlOpFailedError as e:
|
||||
logger.warn(e)
|
||||
|
||||
|
||||
def create_parser():
|
||||
parser = argparse.ArgumentParser(
|
||||
description='Control your BigchainDB node.',
|
||||
|
|
|
@ -71,15 +71,8 @@ class Bigchain(object):
|
|||
if not self.me or not self.me_private:
|
||||
raise exceptions.KeypairNotFoundException()
|
||||
|
||||
self._conn = None
|
||||
self.connection = Connection(host=self.host, port=self.port, db=self.dbname)
|
||||
|
||||
@property
|
||||
def conn(self):
|
||||
if not self._conn:
|
||||
self._conn = self.reconnect()
|
||||
return self._conn
|
||||
|
||||
def reconnect(self):
|
||||
return r.connect(host=self.host, port=self.port, db=self.dbname)
|
||||
|
||||
|
@ -343,11 +336,11 @@ class Bigchain(object):
|
|||
A list of transactions containing that metadata. If no transaction exists with that metadata it
|
||||
returns an empty list `[]`
|
||||
"""
|
||||
cursor = r.table('bigchain', read_mode=self.read_mode) \
|
||||
.get_all(metadata_id, index='metadata_id') \
|
||||
.concat_map(lambda block: block['block']['transactions']) \
|
||||
.filter(lambda transaction: transaction['transaction']['metadata']['id'] == metadata_id) \
|
||||
.run(self.conn)
|
||||
cursor = self.connection.run(
|
||||
r.table('bigchain', read_mode=self.read_mode)
|
||||
.get_all(metadata_id, index='metadata_id')
|
||||
.concat_map(lambda block: block['block']['transactions'])
|
||||
.filter(lambda transaction: transaction['transaction']['metadata']['id'] == metadata_id))
|
||||
|
||||
transactions = list(cursor)
|
||||
return [Transaction.from_dict(tx) for tx in transactions]
|
||||
|
|
|
@ -70,8 +70,9 @@ class Connection:
|
|||
def get_conn():
|
||||
'''Get the connection to the database.'''
|
||||
|
||||
return r.connect(bigchaindb.config['database']['host'],
|
||||
bigchaindb.config['database']['port'])
|
||||
return r.connect(host=bigchaindb.config['database']['host'],
|
||||
port=bigchaindb.config['database']['port'],
|
||||
db=bigchaindb.config['database']['name'])
|
||||
|
||||
|
||||
def get_database_name():
|
||||
|
|
|
@ -69,9 +69,10 @@ class BlockPipeline:
|
|||
# if the tx is already in a valid or undecided block,
|
||||
# then it no longer should be in the backlog, or added
|
||||
# to a new block. We can delete and drop it.
|
||||
r.table('backlog').get(tx.id) \
|
||||
.delete(durability='hard') \
|
||||
.run(self.bigchain.conn)
|
||||
self.bigchain.connection.run(
|
||||
r.table('backlog')
|
||||
.get(tx.id)
|
||||
.delete(durability='hard'))
|
||||
return None
|
||||
|
||||
tx_validated = self.bigchain.is_valid_transaction(tx)
|
||||
|
@ -80,9 +81,10 @@ class BlockPipeline:
|
|||
else:
|
||||
# if the transaction is not valid, remove it from the
|
||||
# backlog
|
||||
r.table('backlog').get(tx.id) \
|
||||
.delete(durability='hard') \
|
||||
.run(self.bigchain.conn)
|
||||
self.bigchain.connection.run(
|
||||
r.table('backlog')
|
||||
.get(tx.id)
|
||||
.delete(durability='hard'))
|
||||
return None
|
||||
|
||||
def create(self, tx, timeout=False):
|
||||
|
@ -120,8 +122,7 @@ class BlockPipeline:
|
|||
Returns:
|
||||
:class:`~bigchaindb.models.Block`: The Block.
|
||||
"""
|
||||
logger.info('Write new block {} with {} transactions'.format(block.id,
|
||||
len(block.transactions)))
|
||||
logger.info('Write new block %s with %s transactions', block.id, block.transactions)
|
||||
self.bigchain.write_block(block)
|
||||
return block
|
||||
|
||||
|
@ -135,28 +136,24 @@ class BlockPipeline:
|
|||
Returns:
|
||||
:class:`~bigchaindb.models.Block`: The block.
|
||||
"""
|
||||
r.table('backlog')\
|
||||
.get_all(*[tx.id for tx in block.transactions])\
|
||||
.delete(durability='hard')\
|
||||
.run(self.bigchain.conn)
|
||||
|
||||
self.bigchain.connection.run(
|
||||
r.table('backlog')
|
||||
.get_all(*[tx.id for tx in block.transactions])
|
||||
.delete(durability='hard'))
|
||||
return block
|
||||
|
||||
|
||||
def initial():
|
||||
"""Return old transactions from the backlog."""
|
||||
|
||||
b = Bigchain()
|
||||
bigchain = Bigchain()
|
||||
|
||||
rs = b.connection.run(
|
||||
r.table('backlog')
|
||||
.between(
|
||||
[b.me, r.minval],
|
||||
[b.me, r.maxval],
|
||||
index='assignee__transaction_timestamp')
|
||||
.order_by(index=r.asc('assignee__transaction_timestamp')))
|
||||
|
||||
return rs
|
||||
return bigchain.connection.run(
|
||||
r.table('backlog')
|
||||
.between([bigchain.me, r.minval],
|
||||
[bigchain.me, r.maxval],
|
||||
index='assignee__transaction_timestamp')
|
||||
.order_by(index=r.asc('assignee__transaction_timestamp')))
|
||||
|
||||
|
||||
def get_changefeed():
|
||||
|
|
|
@ -71,6 +71,5 @@ def start(timeout=5, backlog_reassign_delay=5):
|
|||
"""Create, start, and return the block pipeline."""
|
||||
pipeline = create_pipeline(timeout=timeout,
|
||||
backlog_reassign_delay=backlog_reassign_delay)
|
||||
pipeline.setup()
|
||||
pipeline.start()
|
||||
return pipeline
|
||||
|
|
|
@ -263,9 +263,11 @@ class TestBigchainApi(object):
|
|||
def test_genesis_block(self, b):
|
||||
import rethinkdb as r
|
||||
from bigchaindb.util import is_genesis_block
|
||||
from bigchaindb.db.utils import get_conn
|
||||
|
||||
response = list(r.table('bigchain')
|
||||
.filter(is_genesis_block)
|
||||
.run(b.conn))
|
||||
.run(get_conn()))
|
||||
|
||||
assert len(response) == 1
|
||||
block = response[0]
|
||||
|
@ -277,6 +279,8 @@ class TestBigchainApi(object):
|
|||
import rethinkdb as r
|
||||
from bigchaindb_common.exceptions import GenesisBlockAlreadyExistsError
|
||||
from bigchaindb.util import is_genesis_block
|
||||
from bigchaindb.db.utils import get_conn
|
||||
|
||||
b.create_genesis_block()
|
||||
|
||||
with pytest.raises(GenesisBlockAlreadyExistsError):
|
||||
|
@ -284,15 +288,17 @@ class TestBigchainApi(object):
|
|||
|
||||
genesis_blocks = list(r.table('bigchain')
|
||||
.filter(is_genesis_block)
|
||||
.run(b.conn))
|
||||
.run(get_conn()))
|
||||
|
||||
assert len(genesis_blocks) == 1
|
||||
|
||||
@pytest.mark.skipif(reason='This test may not make sense after changing the chainification mode')
|
||||
def test_get_last_block(self, b):
|
||||
import rethinkdb as r
|
||||
from bigchaindb.db.utils import get_conn
|
||||
|
||||
# get the number of blocks
|
||||
num_blocks = r.table('bigchain').count().run(b.conn)
|
||||
num_blocks = r.table('bigchain').count().run(get_conn())
|
||||
|
||||
# get the last block
|
||||
last_block = b.get_last_block()
|
||||
|
@ -338,11 +344,12 @@ class TestBigchainApi(object):
|
|||
import rethinkdb as r
|
||||
from bigchaindb import util
|
||||
from bigchaindb.models import Block
|
||||
from bigchaindb.db.utils import get_conn
|
||||
|
||||
b.create_genesis_block()
|
||||
genesis = list(r.table('bigchain')
|
||||
.filter(util.is_genesis_block)
|
||||
.run(b.conn))[0]
|
||||
.run(get_conn()))[0]
|
||||
genesis = Block.from_dict(genesis)
|
||||
gb = b.get_last_voted_block()
|
||||
assert gb == genesis
|
||||
|
@ -407,18 +414,19 @@ class TestBigchainApi(object):
|
|||
def test_no_vote_written_if_block_already_has_vote(self, b):
|
||||
import rethinkdb as r
|
||||
from bigchaindb.models import Block
|
||||
from bigchaindb.db.utils import get_conn
|
||||
|
||||
genesis = b.create_genesis_block()
|
||||
block_1 = dummy_block()
|
||||
b.write_block(block_1, durability='hard')
|
||||
|
||||
b.write_vote(b.vote(block_1.id, genesis.id, True))
|
||||
retrieved_block_1 = r.table('bigchain').get(block_1.id).run(b.conn)
|
||||
retrieved_block_1 = r.table('bigchain').get(block_1.id).run(get_conn())
|
||||
retrieved_block_1 = Block.from_dict(retrieved_block_1)
|
||||
|
||||
# try to vote again on the retrieved block, should do nothing
|
||||
b.write_vote(b.vote(retrieved_block_1.id, genesis.id, True))
|
||||
retrieved_block_2 = r.table('bigchain').get(block_1.id).run(b.conn)
|
||||
retrieved_block_2 = r.table('bigchain').get(block_1.id).run(get_conn())
|
||||
retrieved_block_2 = Block.from_dict(retrieved_block_2)
|
||||
|
||||
assert retrieved_block_1 == retrieved_block_2
|
||||
|
@ -426,6 +434,7 @@ class TestBigchainApi(object):
|
|||
def test_more_votes_than_voters(self, b):
|
||||
import rethinkdb as r
|
||||
from bigchaindb_common.exceptions import MultipleVotesError
|
||||
from bigchaindb.db.utils import get_conn
|
||||
|
||||
b.create_genesis_block()
|
||||
block_1 = dummy_block()
|
||||
|
@ -434,8 +443,8 @@ class TestBigchainApi(object):
|
|||
vote_1 = b.vote(block_1.id, b.get_last_voted_block().id, True)
|
||||
vote_2 = b.vote(block_1.id, b.get_last_voted_block().id, True)
|
||||
vote_2['node_pubkey'] = 'aaaaaaa'
|
||||
r.table('votes').insert(vote_1).run(b.conn)
|
||||
r.table('votes').insert(vote_2).run(b.conn)
|
||||
r.table('votes').insert(vote_1).run(get_conn())
|
||||
r.table('votes').insert(vote_2).run(get_conn())
|
||||
|
||||
with pytest.raises(MultipleVotesError) as excinfo:
|
||||
b.block_election_status(block_1.id, block_1.voters)
|
||||
|
@ -445,12 +454,14 @@ class TestBigchainApi(object):
|
|||
def test_multiple_votes_single_node(self, b):
|
||||
import rethinkdb as r
|
||||
from bigchaindb_common.exceptions import MultipleVotesError
|
||||
from bigchaindb.db.utils import get_conn
|
||||
|
||||
genesis = b.create_genesis_block()
|
||||
block_1 = dummy_block()
|
||||
b.write_block(block_1, durability='hard')
|
||||
# insert duplicate votes
|
||||
for i in range(2):
|
||||
r.table('votes').insert(b.vote(block_1.id, genesis.id, True)).run(b.conn)
|
||||
r.table('votes').insert(b.vote(block_1.id, genesis.id, True)).run(get_conn())
|
||||
|
||||
with pytest.raises(MultipleVotesError) as excinfo:
|
||||
b.block_election_status(block_1.id, block_1.voters)
|
||||
|
@ -465,13 +476,15 @@ class TestBigchainApi(object):
|
|||
def test_improper_vote_error(selfs, b):
|
||||
import rethinkdb as r
|
||||
from bigchaindb_common.exceptions import ImproperVoteError
|
||||
from bigchaindb.db.utils import get_conn
|
||||
|
||||
b.create_genesis_block()
|
||||
block_1 = dummy_block()
|
||||
b.write_block(block_1, durability='hard')
|
||||
vote_1 = b.vote(block_1.id, b.get_last_voted_block().id, True)
|
||||
# mangle the signature
|
||||
vote_1['signature'] = 'a' * 87
|
||||
r.table('votes').insert(vote_1).run(b.conn)
|
||||
r.table('votes').insert(vote_1).run(get_conn())
|
||||
with pytest.raises(ImproperVoteError) as excinfo:
|
||||
b.has_previous_vote(block_1.id, block_1.id)
|
||||
assert excinfo.value.args[0] == 'Block {block_id} already has an incorrectly signed ' \
|
||||
|
@ -481,6 +494,7 @@ class TestBigchainApi(object):
|
|||
def test_assign_transaction_one_node(self, b, user_vk, user_sk):
|
||||
import rethinkdb as r
|
||||
from bigchaindb.models import Transaction
|
||||
from bigchaindb.db.utils import get_conn
|
||||
|
||||
input_tx = b.get_owned_ids(user_vk).pop()
|
||||
input_tx = b.get_transaction(input_tx.txid)
|
||||
|
@ -490,7 +504,7 @@ class TestBigchainApi(object):
|
|||
b.write_transaction(tx)
|
||||
|
||||
# retrieve the transaction
|
||||
response = r.table('backlog').get(tx.id).run(b.conn)
|
||||
response = r.table('backlog').get(tx.id).run(get_conn())
|
||||
|
||||
# check if the assignee is the current node
|
||||
assert response['assignee'] == b.me
|
||||
|
@ -500,6 +514,7 @@ class TestBigchainApi(object):
|
|||
import rethinkdb as r
|
||||
from bigchaindb_common.crypto import generate_key_pair
|
||||
from bigchaindb.models import Transaction
|
||||
from bigchaindb.db.utils import get_conn
|
||||
|
||||
# create 5 federation nodes
|
||||
for _ in range(5):
|
||||
|
@ -515,7 +530,7 @@ class TestBigchainApi(object):
|
|||
b.write_transaction(tx)
|
||||
|
||||
# retrieve the transaction
|
||||
response = r.table('backlog').get(tx.id).run(b.conn)
|
||||
response = r.table('backlog').get(tx.id).run(get_conn())
|
||||
|
||||
# check if the assignee is one of the _other_ federation nodes
|
||||
assert response['assignee'] in b.nodes_except_me
|
||||
|
|
|
@ -69,7 +69,7 @@ def test_write_block(b, user_vk):
|
|||
|
||||
block_doc = b.create_block(txs)
|
||||
block_maker.write(block_doc)
|
||||
expected = r.table('bigchain').get(block_doc.id).run(b.conn)
|
||||
expected = b.connection.run(r.table('bigchain').get(block_doc.id))
|
||||
expected = Block.from_dict(expected)
|
||||
|
||||
assert expected == block_doc
|
||||
|
@ -90,19 +90,19 @@ def test_duplicate_transaction(b, user_vk):
|
|||
block_maker.write(block_doc)
|
||||
|
||||
# block is in bigchain
|
||||
assert r.table('bigchain').get(block_doc.id).run(b.conn) == block_doc.to_dict()
|
||||
assert b.connection.run(r.table('bigchain').get(block_doc.id)) == block_doc.to_dict()
|
||||
|
||||
b.write_transaction(txs[0])
|
||||
|
||||
# verify tx is in the backlog
|
||||
assert r.table('backlog').get(txs[0].id).run(b.conn) is not None
|
||||
assert b.connection.run(r.table('backlog').get(txs[0].id)) is not None
|
||||
|
||||
# try to validate a transaction that's already in the chain; should not
|
||||
# work
|
||||
assert block_maker.validate_tx(txs[0].to_dict()) is None
|
||||
|
||||
# duplicate tx should be removed from backlog
|
||||
assert r.table('backlog').get(txs[0].id).run(b.conn) is None
|
||||
assert b.connection.run(r.table('backlog').get(txs[0].id)) is None
|
||||
|
||||
|
||||
def test_delete_tx(b, user_vk):
|
||||
|
@ -120,7 +120,7 @@ def test_delete_tx(b, user_vk):
|
|||
block_doc = block_maker.create(None, timeout=True)
|
||||
|
||||
for tx in block_doc.to_dict()['block']['transactions']:
|
||||
returned_tx = r.table('backlog').get(tx['id']).run(b.conn)
|
||||
returned_tx = b.connection.run(r.table('backlog').get(tx['id']))
|
||||
returned_tx.pop('assignee')
|
||||
returned_tx.pop('assignment_timestamp')
|
||||
assert returned_tx == tx
|
||||
|
@ -130,7 +130,7 @@ def test_delete_tx(b, user_vk):
|
|||
assert returned_block == block_doc
|
||||
|
||||
for tx in block_doc.to_dict()['block']['transactions']:
|
||||
assert r.table('backlog').get(tx['id']).run(b.conn) is None
|
||||
assert b.connection.run(r.table('backlog').get(tx['id'])) is None
|
||||
|
||||
|
||||
def test_prefeed(b, user_vk):
|
||||
|
@ -175,9 +175,9 @@ def test_full_pipeline(b, user_vk):
|
|||
tx['assignment_timestamp'] = time.time()
|
||||
if assignee == b.me:
|
||||
count_assigned_to_me += 1
|
||||
r.table('backlog').insert(tx, durability='hard').run(b.conn)
|
||||
b.connection.run(r.table('backlog').insert(tx, durability='hard'))
|
||||
|
||||
assert r.table('backlog').count().run(b.conn) == 100
|
||||
assert b.connection.run(r.table('backlog').count()) == 100
|
||||
|
||||
pipeline = create_pipeline()
|
||||
pipeline.setup(indata=get_changefeed(), outdata=outpipe)
|
||||
|
@ -187,9 +187,9 @@ def test_full_pipeline(b, user_vk):
|
|||
pipeline.terminate()
|
||||
|
||||
block_doc = outpipe.get()
|
||||
chained_block = r.table('bigchain').get(block_doc.id).run(b.conn)
|
||||
chained_block = b.connection.run(r.table('bigchain').get(block_doc.id))
|
||||
chained_block = Block.from_dict(chained_block)
|
||||
|
||||
assert len(block_doc.transactions) == count_assigned_to_me
|
||||
assert chained_block == block_doc
|
||||
assert r.table('backlog').count().run(b.conn) == 100 - count_assigned_to_me
|
||||
assert b.connection.run(r.table('backlog').count()) == 100 - count_assigned_to_me
|
||||
|
|
|
@ -33,7 +33,7 @@ def test_check_for_quorum_invalid(b, user_vk):
|
|||
[member.vote(test_block.id, 'abc', False) for member in test_federation[2:]]
|
||||
|
||||
# cast votes
|
||||
r.table('votes').insert(votes, durability='hard').run(b.conn)
|
||||
b.connection.run(r.table('votes').insert(votes, durability='hard'))
|
||||
|
||||
# since this block is now invalid, should pass to the next process
|
||||
assert e.check_for_quorum(votes[-1]) == test_block
|
||||
|
@ -62,7 +62,7 @@ def test_check_for_quorum_invalid_prev_node(b, user_vk):
|
|||
[member.vote(test_block.id, 'def', True) for member in test_federation[2:]]
|
||||
|
||||
# cast votes
|
||||
r.table('votes').insert(votes, durability='hard').run(b.conn)
|
||||
b.connection.run(r.table('votes').insert(votes, durability='hard'))
|
||||
|
||||
# since nodes cannot agree on prev block, the block is invalid
|
||||
assert e.check_for_quorum(votes[-1]) == test_block
|
||||
|
@ -91,7 +91,7 @@ def test_check_for_quorum_valid(b, user_vk):
|
|||
votes = [member.vote(test_block.id, 'abc', True)
|
||||
for member in test_federation]
|
||||
# cast votes
|
||||
r.table('votes').insert(votes, durability='hard').run(b.conn)
|
||||
b.connection.run(r.table('votes').insert(votes, durability='hard'))
|
||||
|
||||
# since this block is valid, should go nowhere
|
||||
assert e.check_for_quorum(votes[-1]) is None
|
||||
|
@ -107,7 +107,7 @@ def test_check_requeue_transaction(b, user_vk):
|
|||
test_block = b.create_block([tx1])
|
||||
|
||||
e.requeue_transactions(test_block)
|
||||
backlog_tx = r.table('backlog').get(tx1.id).run(b.conn)
|
||||
backlog_tx = b.connection.run(r.table('backlog').get(tx1.id))
|
||||
backlog_tx.pop('assignee')
|
||||
backlog_tx.pop('assignment_timestamp')
|
||||
assert backlog_tx == tx1.to_dict()
|
||||
|
@ -155,16 +155,16 @@ def test_full_pipeline(b, user_vk):
|
|||
vote_valid = b.vote(valid_block.id, 'abc', True)
|
||||
vote_invalid = b.vote(invalid_block.id, 'abc', False)
|
||||
|
||||
r.table('votes').insert(vote_valid, durability='hard').run(b.conn)
|
||||
r.table('votes').insert(vote_invalid, durability='hard').run(b.conn)
|
||||
b.connection.run(r.table('votes').insert(vote_valid, durability='hard'))
|
||||
b.connection.run(r.table('votes').insert(vote_invalid, durability='hard'))
|
||||
|
||||
outpipe.get()
|
||||
pipeline.terminate()
|
||||
|
||||
# only transactions from the invalid block should be returned to
|
||||
# the backlog
|
||||
assert r.table('backlog').count().run(b.conn) == 100
|
||||
assert b.connection.run(r.table('backlog').count()) == 100
|
||||
# NOTE: I'm still, I'm still tx from the block.
|
||||
tx_from_block = set([tx.id for tx in invalid_block.transactions])
|
||||
tx_from_backlog = set([tx['id'] for tx in list(r.table('backlog').run(b.conn))])
|
||||
tx_from_backlog = set([tx['id'] for tx in list(b.connection.run(r.table('backlog')))])
|
||||
assert tx_from_block == tx_from_backlog
|
||||
|
|
|
@ -43,10 +43,10 @@ def test_reassign_transactions(b, user_vk):
|
|||
stm = stale.StaleTransactionMonitor(timeout=0.001,
|
||||
backlog_reassign_delay=0.001)
|
||||
stm.bigchain.nodes_except_me = ['aaa', 'bbb', 'ccc']
|
||||
tx = list(r.table('backlog').run(b.conn))[0]
|
||||
tx = list(b.connection.run(r.table('backlog')))[0]
|
||||
stm.reassign_transactions(tx)
|
||||
|
||||
reassigned_tx = r.table('backlog').get(tx['id']).run(b.conn)
|
||||
reassigned_tx = b.connection.run(r.table('backlog').get(tx['id']))
|
||||
assert reassigned_tx['assignment_timestamp'] > tx['assignment_timestamp']
|
||||
assert reassigned_tx['assignee'] != tx['assignee']
|
||||
|
||||
|
@ -55,14 +55,14 @@ def test_reassign_transactions(b, user_vk):
|
|||
tx = tx.sign([b.me_private]).to_dict()
|
||||
tx.update({'assignee': 'lol'})
|
||||
tx.update({'assignment_timestamp': time.time()})
|
||||
r.table('backlog').insert(tx, durability='hard').run(b.conn)
|
||||
b.connection.run(r.table('backlog').insert(tx, durability='hard'))
|
||||
|
||||
tx = list(r.table('backlog').run(b.conn))[0]
|
||||
tx = list(b.connection.run(r.table('backlog')))[0]
|
||||
stm.reassign_transactions(tx)
|
||||
assert r.table('backlog').get(tx['id']).run(b.conn)['assignee'] != 'lol'
|
||||
assert b.connection.run(r.table('backlog').get(tx['id']))['assignee'] != 'lol'
|
||||
|
||||
|
||||
def test_full_pipeline(user_vk):
|
||||
def test_full_pipeline(monkeypatch, user_vk):
|
||||
from bigchaindb.models import Transaction
|
||||
CONFIG = {
|
||||
'database': {
|
||||
|
@ -82,30 +82,33 @@ def test_full_pipeline(user_vk):
|
|||
original_txs = {}
|
||||
original_txc = []
|
||||
|
||||
monkeypatch.setattr('time.time', lambda: 1)
|
||||
|
||||
for i in range(100):
|
||||
tx = Transaction.create([b.me], [user_vk])
|
||||
tx = tx.sign([b.me_private])
|
||||
original_txc.append(tx.to_dict())
|
||||
|
||||
b.write_transaction(tx)
|
||||
original_txs[tx.id] = r.table('backlog').get(tx.id).run(b.conn)
|
||||
original_txs[tx.id] = b.connection.run(r.table('backlog').get(tx.id))
|
||||
|
||||
assert r.table('backlog').count().run(b.conn) == 100
|
||||
assert b.connection.run(r.table('backlog').count()) == 100
|
||||
|
||||
monkeypatch.undo()
|
||||
|
||||
pipeline = stale.create_pipeline(backlog_reassign_delay=1,
|
||||
timeout=1)
|
||||
pipeline.setup(outdata=outpipe)
|
||||
pipeline.start()
|
||||
|
||||
# timing should be careful -- test will fail if reassignment happens multiple times
|
||||
time.sleep(2)
|
||||
# to terminate
|
||||
for _ in range(100):
|
||||
outpipe.get()
|
||||
|
||||
pipeline.terminate()
|
||||
|
||||
# to terminate
|
||||
outpipe.get()
|
||||
|
||||
assert r.table('backlog').count().run(b.conn) == 100
|
||||
reassigned_txs = list(r.table('backlog').run(b.conn))
|
||||
assert b.connection.run(r.table('backlog').count()) == 100
|
||||
reassigned_txs = list(b.connection.run(r.table('backlog')))
|
||||
|
||||
# check that every assignment timestamp has increased, and every tx has a new assignee
|
||||
for reassigned_tx in reassigned_txs:
|
||||
|
|
|
@ -166,8 +166,7 @@ def test_valid_block_voting_sequential(b, monkeypatch):
|
|||
last_vote = vote_obj.vote(*vote_obj.validate_tx(tx, block_id, num_tx))
|
||||
|
||||
vote_obj.write_vote(last_vote)
|
||||
vote_rs = r.table('votes').get_all([block.id, b.me],
|
||||
index='block_and_voter').run(b.conn)
|
||||
vote_rs = b.connection.run(r.table('votes').get_all([block.id, b.me], index='block_and_voter'))
|
||||
vote_doc = vote_rs.next()
|
||||
|
||||
assert vote_doc['vote'] == {'voting_for_block': block.id,
|
||||
|
@ -201,8 +200,7 @@ def test_valid_block_voting_multiprocessing(b, monkeypatch):
|
|||
vote_out = outpipe.get()
|
||||
vote_pipeline.terminate()
|
||||
|
||||
vote_rs = r.table('votes').get_all([block.id, b.me],
|
||||
index='block_and_voter').run(b.conn)
|
||||
vote_rs = b.connection.run(r.table('votes').get_all([block.id, b.me], index='block_and_voter'))
|
||||
vote_doc = vote_rs.next()
|
||||
assert vote_out['vote'] == vote_doc['vote']
|
||||
assert vote_doc['vote'] == {'voting_for_block': block.id,
|
||||
|
@ -243,8 +241,7 @@ def test_valid_block_voting_with_create_transaction(b, monkeypatch):
|
|||
vote_out = outpipe.get()
|
||||
vote_pipeline.terminate()
|
||||
|
||||
vote_rs = r.table('votes').get_all([block.id, b.me],
|
||||
index='block_and_voter').run(b.conn)
|
||||
vote_rs = b.connection.run(r.table('votes').get_all([block.id, b.me], index='block_and_voter'))
|
||||
vote_doc = vote_rs.next()
|
||||
assert vote_out['vote'] == vote_doc['vote']
|
||||
assert vote_doc['vote'] == {'voting_for_block': block.id,
|
||||
|
@ -298,8 +295,7 @@ def test_valid_block_voting_with_transfer_transactions(monkeypatch, b):
|
|||
vote2_out = outpipe.get()
|
||||
vote_pipeline.terminate()
|
||||
|
||||
vote_rs = r.table('votes').get_all([block.id, b.me],
|
||||
index='block_and_voter').run(b.conn)
|
||||
vote_rs = b.connection.run(r.table('votes').get_all([block.id, b.me], index='block_and_voter'))
|
||||
vote_doc = vote_rs.next()
|
||||
assert vote_out['vote'] == vote_doc['vote']
|
||||
assert vote_doc['vote'] == {'voting_for_block': block.id,
|
||||
|
@ -313,8 +309,7 @@ def test_valid_block_voting_with_transfer_transactions(monkeypatch, b):
|
|||
assert crypto.VerifyingKey(b.me).verify(serialized_vote,
|
||||
vote_doc['signature']) is True
|
||||
|
||||
vote2_rs = r.table('votes').get_all([block2.id, b.me],
|
||||
index='block_and_voter').run(b.conn)
|
||||
vote2_rs = b.connection.run(r.table('votes').get_all([block2.id, b.me], index='block_and_voter'))
|
||||
vote2_doc = vote2_rs.next()
|
||||
assert vote2_out['vote'] == vote2_doc['vote']
|
||||
assert vote2_doc['vote'] == {'voting_for_block': block2.id,
|
||||
|
@ -351,8 +346,7 @@ def test_unsigned_tx_in_block_voting(monkeypatch, b, user_vk):
|
|||
vote_out = outpipe.get()
|
||||
vote_pipeline.terminate()
|
||||
|
||||
vote_rs = r.table('votes').get_all([block.id, b.me],
|
||||
index='block_and_voter').run(b.conn)
|
||||
vote_rs = b.connection.run(r.table('votes').get_all([block.id, b.me], index='block_and_voter'))
|
||||
vote_doc = vote_rs.next()
|
||||
assert vote_out['vote'] == vote_doc['vote']
|
||||
assert vote_doc['vote'] == {'voting_for_block': block.id,
|
||||
|
@ -391,8 +385,7 @@ def test_invalid_id_tx_in_block_voting(monkeypatch, b, user_vk):
|
|||
vote_out = outpipe.get()
|
||||
vote_pipeline.terminate()
|
||||
|
||||
vote_rs = r.table('votes').get_all([block['id'], b.me],
|
||||
index='block_and_voter').run(b.conn)
|
||||
vote_rs = b.connection.run(r.table('votes').get_all([block['id'], b.me], index='block_and_voter'))
|
||||
vote_doc = vote_rs.next()
|
||||
assert vote_out['vote'] == vote_doc['vote']
|
||||
assert vote_doc['vote'] == {'voting_for_block': block['id'],
|
||||
|
@ -431,8 +424,7 @@ def test_invalid_content_in_tx_in_block_voting(monkeypatch, b, user_vk):
|
|||
vote_out = outpipe.get()
|
||||
vote_pipeline.terminate()
|
||||
|
||||
vote_rs = r.table('votes').get_all([block['id'], b.me],
|
||||
index='block_and_voter').run(b.conn)
|
||||
vote_rs = b.connection.run(r.table('votes').get_all([block['id'], b.me], index='block_and_voter'))
|
||||
vote_doc = vote_rs.next()
|
||||
assert vote_out['vote'] == vote_doc['vote']
|
||||
assert vote_doc['vote'] == {'voting_for_block': block['id'],
|
||||
|
@ -467,8 +459,7 @@ def test_invalid_block_voting(monkeypatch, b, user_vk):
|
|||
vote_out = outpipe.get()
|
||||
vote_pipeline.terminate()
|
||||
|
||||
vote_rs = r.table('votes').get_all([block['id'], b.me],
|
||||
index='block_and_voter').run(b.conn)
|
||||
vote_rs = b.connection.run(r.table('votes').get_all([block['id'], b.me], index='block_and_voter'))
|
||||
vote_doc = vote_rs.next()
|
||||
assert vote_out['vote'] == vote_doc['vote']
|
||||
assert vote_doc['vote'] == {'voting_for_block': block['id'],
|
||||
|
@ -520,9 +511,9 @@ def test_voter_considers_unvoted_blocks_when_single_node(monkeypatch, b):
|
|||
vote_pipeline.terminate()
|
||||
|
||||
# retrieve blocks from bigchain
|
||||
blocks = list(r.table('bigchain')
|
||||
.order_by(r.asc((r.row['block']['timestamp'])))
|
||||
.run(b.conn))
|
||||
blocks = list(b.connection.run(
|
||||
r.table('bigchain')
|
||||
.order_by(r.asc((r.row['block']['timestamp'])))))
|
||||
|
||||
# FIXME: remove genesis block, we don't vote on it
|
||||
# (might change in the future)
|
||||
|
@ -530,7 +521,7 @@ def test_voter_considers_unvoted_blocks_when_single_node(monkeypatch, b):
|
|||
vote_pipeline.terminate()
|
||||
|
||||
# retrieve vote
|
||||
votes = r.table('votes').run(b.conn)
|
||||
votes = b.connection.run(r.table('votes'))
|
||||
votes = list(votes)
|
||||
|
||||
assert all(vote['node_pubkey'] == b.me for vote in votes)
|
||||
|
@ -563,12 +554,12 @@ def test_voter_chains_blocks_with_the_previous_ones(monkeypatch, b):
|
|||
vote_pipeline.terminate()
|
||||
|
||||
# retrive blocks from bigchain
|
||||
blocks = list(r.table('bigchain')
|
||||
.order_by(r.asc((r.row['block']['timestamp'])))
|
||||
.run(b.conn))
|
||||
blocks = list(b.connection.run(
|
||||
r.table('bigchain')
|
||||
.order_by(r.asc((r.row['block']['timestamp'])))))
|
||||
|
||||
# retrieve votes
|
||||
votes = list(r.table('votes').run(b.conn))
|
||||
votes = list(b.connection.run(r.table('votes')))
|
||||
|
||||
assert votes[0]['vote']['voting_for_block'] in (blocks[1]['id'], blocks[2]['id'])
|
||||
assert votes[1]['vote']['voting_for_block'] in (blocks[1]['id'], blocks[2]['id'])
|
||||
|
@ -587,7 +578,7 @@ def test_voter_checks_for_previous_vote(monkeypatch, b):
|
|||
block_1 = dummy_block(b)
|
||||
inpipe.put(block_1.to_dict())
|
||||
|
||||
assert r.table('votes').count().run(b.conn) == 0
|
||||
assert b.connection.run(r.table('votes').count()) == 0
|
||||
|
||||
vote_pipeline = vote.create_pipeline()
|
||||
vote_pipeline.setup(indata=inpipe, outdata=outpipe)
|
||||
|
@ -609,7 +600,7 @@ def test_voter_checks_for_previous_vote(monkeypatch, b):
|
|||
|
||||
vote_pipeline.terminate()
|
||||
|
||||
assert r.table('votes').count().run(b.conn) == 2
|
||||
assert b.connection.run(r.table('votes').count()) == 2
|
||||
|
||||
|
||||
@patch.object(Pipeline, 'start')
|
||||
|
|
|
@ -38,7 +38,6 @@ def test_bigchain_class_default_initialization(config):
|
|||
assert bigchain.me_private == config['keypair']['private']
|
||||
assert bigchain.nodes_except_me == config['keyring']
|
||||
assert bigchain.consensus == BaseConsensusRules
|
||||
assert bigchain._conn is None
|
||||
|
||||
|
||||
def test_bigchain_class_initialization_with_parameters(config):
|
||||
|
@ -60,7 +59,6 @@ def test_bigchain_class_initialization_with_parameters(config):
|
|||
assert bigchain.me_private == init_kwargs['private_key']
|
||||
assert bigchain.nodes_except_me == init_kwargs['keyring']
|
||||
assert bigchain.consensus == BaseConsensusRules
|
||||
assert bigchain._conn is None
|
||||
|
||||
|
||||
def test_get_blocks_status_containing_tx(monkeypatch):
|
||||
|
|
Loading…
Reference in New Issue
Block a user