Move calls to DB to specific backend module

vrde 2016-10-26 17:48:53 +02:00
parent df383a5c25
commit 815b4318ba
7 changed files with 309 additions and 178 deletions
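The change replaces inline ReQL queries in `Bigchain` with calls to a backend object obtained from `get_backend()`. A minimal sketch of the delegation pattern being introduced (names simplified for illustration, not the full API):

    class RethinkDBBackend:
        def write_transaction(self, signed_transaction):
            # the actual ReQL query now lives here, behind a narrow method
            ...

    def get_backend():
        # today there is only one backend; a future engine could be
        # chosen from configuration instead
        return RethinkDBBackend()

    class Bigchain:
        def __init__(self, backend=None):
            # allow injection for tests, fall back to the configured backend
            self.backend = backend or get_backend()

        def write_transaction(self, signed_transaction):
            # core logic no longer builds queries, it only delegates
            return self.backend.write_transaction(signed_transaction)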

View File

@ -12,7 +12,7 @@ import rethinkdb as r
import bigchaindb
from bigchaindb.db.utils import Connection
from bigchaindb.db.utils import Connection, get_backend
from bigchaindb import config_utils, util
from bigchaindb.consensus import BaseConsensusRules
from bigchaindb.models import Block, Transaction
@ -33,7 +33,7 @@ class Bigchain(object):
# return if transaction is in backlog
TX_IN_BACKLOG = 'backlog'
def __init__(self, host=None, port=None, dbname=None,
def __init__(self, host=None, port=None, dbname=None, backend=None,
public_key=None, private_key=None, keyring=[],
backlog_reassign_delay=None):
"""Initialize the Bigchain instance
@ -60,6 +60,7 @@ class Bigchain(object):
self.host = host or bigchaindb.config['database']['host']
self.port = port or bigchaindb.config['database']['port']
self.dbname = dbname or bigchaindb.config['database']['name']
self.backend = backend or get_backend()
self.me = public_key or bigchaindb.config['keypair']['public']
self.me_private = private_key or bigchaindb.config['keypair']['private']
self.nodes_except_me = keyring or bigchaindb.config['keyring']
@ -102,10 +103,7 @@ class Bigchain(object):
signed_transaction.update({'assignment_timestamp': time()})
# write to the backlog
response = self.connection.run(
r.table('backlog')
.insert(signed_transaction, durability=durability))
return response
return self.backend.write_transaction(signed_transaction)
def reassign_transaction(self, transaction, durability='hard'):
"""Assign a transaction to a new node
@ -131,23 +129,18 @@ class Bigchain(object):
# There is no other node to assign to
new_assignee = self.me
response = self.connection.run(
r.table('backlog')
.get(transaction['id'])
.update({'assignee': new_assignee, 'assignment_timestamp': time()},
durability=durability))
return response
return self.backend.update_transaction(
transaction['id'],
{'assignee': new_assignee, 'assignment_timestamp': time()})
def get_stale_transactions(self):
"""Get a RethinkDB cursor of stale transactions
"""Get a cursor of stale transactions
Transactions are considered stale if they have been assigned a node, but are still in the
backlog after some amount of time specified in the configuration
"""
return self.connection.run(
r.table('backlog')
.filter(lambda tx: time() - tx['assignment_timestamp'] > self.backlog_reassign_delay))
return self.backend.get_stale_transactions(self.backlog_reassign_delay)
def validate_transaction(self, transaction):
"""Validate a transaction.
@ -224,19 +217,12 @@ class Bigchain(object):
break
# Query the transaction in the target block and return
response = self.connection.run(
r.table('bigchain', read_mode=self.read_mode)
.get(target_block_id)
.get_field('block')
.get_field('transactions')
.filter(lambda tx: tx['id'] == txid))[0]
response = self.backend.get_transaction_from_block(txid, target_block_id)
else:
# Otherwise, check the backlog
response = self.connection.run(r.table('backlog')
.get(txid)
.without('assignee', 'assignment_timestamp')
.default(None))
response = self.backend.get_transaction_from_backlog(txid)
if response:
tx_status = self.TX_IN_BACKLOG
@ -262,24 +248,6 @@ class Bigchain(object):
_, status = self.get_transaction(txid, include_status=True)
return status
def search_block_election_on_index(self, value, index):
"""Retrieve block election information given a secondary index and value
Args:
value: a value to search (e.g. transaction id string, payload hash string)
index (str): name of a secondary index, e.g. 'transaction_id'
Returns:
:obj:`list` of :obj:`dict`: A list of blocks with only election information
"""
# First, get information on all blocks which contain this transaction
response = self.connection.run(
r.table('bigchain', read_mode=self.read_mode)
.get_all(value, index=index)
.pluck('votes', 'id', {'block': ['voters']}))
return list(response)
def get_blocks_status_containing_tx(self, txid):
"""Retrieve block ids and statuses related to a transaction
@ -294,7 +262,7 @@ class Bigchain(object):
"""
# First, get information on all blocks which contain this transaction
blocks = self.search_block_election_on_index(txid, 'transaction_id')
blocks = self.backend.get_blocks_status_from_transaction(txid)
if blocks:
# Determine the election status of each block
validity = {
@ -336,14 +304,8 @@ class Bigchain(object):
A list of transactions containing that metadata. If no transaction exists with that metadata it
returns an empty list `[]`
"""
cursor = self.connection.run(
r.table('bigchain', read_mode=self.read_mode)
.get_all(metadata_id, index='metadata_id')
.concat_map(lambda block: block['block']['transactions'])
.filter(lambda transaction: transaction['transaction']['metadata']['id'] == metadata_id))
transactions = list(cursor)
return [Transaction.from_dict(tx) for tx in transactions]
cursor = self.backend.get_transactions_by_metadata_id(metadata_id)
return [Transaction.from_dict(tx) for tx in cursor]
def get_txs_by_asset_id(self, asset_id):
"""Retrieves transactions related to a particular asset.
@ -358,12 +320,8 @@ class Bigchain(object):
A list of transactions related to the asset. If no transaction exists for that asset it
returns an empty list `[]`
"""
cursor = self.connection.run(
r.table('bigchain', read_mode=self.read_mode)
.get_all(asset_id, index='asset_id')
.concat_map(lambda block: block['block']['transactions'])
.filter(lambda transaction: transaction['transaction']['asset']['id'] == asset_id))
cursor = self.backend.get_transactions_by_asset_id(asset_id)
return [Transaction.from_dict(tx) for tx in cursor]
def get_spent(self, txid, cid):
@ -382,13 +340,7 @@ class Bigchain(object):
"""
# checks if an input was already spent
# checks if the bigchain has any transaction with input {'txid': ..., 'cid': ...}
response = self.connection.run(
r.table('bigchain', read_mode=self.read_mode)
.concat_map(lambda doc: doc['block']['transactions'])
.filter(lambda transaction: transaction['transaction']['fulfillments']
.contains(lambda fulfillment: fulfillment['input'] == {'txid': txid, 'cid': cid})))
transactions = list(response)
transactions = list(self.backend.get_spent(txid, cid))
# a transaction_id should have been spent at most one time
if transactions:
@ -423,12 +375,7 @@ class Bigchain(object):
"""
# get all transactions in which owner is in the `owners_after` list
response = self.connection.run(
r.table('bigchain', read_mode=self.read_mode)
.concat_map(lambda doc: doc['block']['transactions'])
.filter(lambda tx: tx['transaction']['conditions']
.contains(lambda c: c['owners_after']
.contains(owner))))
response = self.backend.get_owned_ids(owner)
owned = []
for tx in response:
@ -513,9 +460,7 @@ class Bigchain(object):
but the vote is invalid.
"""
votes = list(self.connection.run(
r.table('votes', read_mode=self.read_mode)
.get_all([block_id, self.me], index='block_and_voter')))
votes = list(self.backend.get_votes_by_block_id_and_voter(block_id, self.me))
if len(votes) > 1:
raise exceptions.MultipleVotesError('Block {block_id} has {n_votes} votes from public key {me}'
@ -537,15 +482,10 @@ class Bigchain(object):
block (Block): block to write to bigchain.
"""
self.connection.run(
r.table('bigchain')
.insert(r.json(block.to_str()), durability=durability))
self.backend.write_block(block.to_str(), durability=durability)
def transaction_exists(self, transaction_id):
response = self.connection.run(
r.table('bigchain', read_mode=self.read_mode)\
.get_all(transaction_id, index='transaction_id'))
return len(response.items) > 0
return self.backend.has_transaction(transaction_id)
def prepare_genesis_block(self):
"""Prepare a genesis block."""
@ -574,9 +514,7 @@ class Bigchain(object):
# 2. create the block with one transaction
# 3. write the block to the bigchain
blocks_count = self.connection.run(
r.table('bigchain', read_mode=self.read_mode)
.count())
blocks_count = self.backend.count_blocks()
if blocks_count:
raise exceptions.GenesisBlockAlreadyExistsError('Cannot create the Genesis block')
@ -621,69 +559,12 @@ class Bigchain(object):
def write_vote(self, vote):
"""Write the vote to the database."""
self.connection.run(
r.table('votes')
.insert(vote))
return self.backend.write_vote(vote)
def get_last_voted_block(self):
"""Returns the last block that this node voted on."""
try:
# get the latest value for the vote timestamp (over all votes)
max_timestamp = self.connection.run(
r.table('votes', read_mode=self.read_mode)
.filter(r.row['node_pubkey'] == self.me)
.max(r.row['vote']['timestamp']))['vote']['timestamp']
last_voted = list(self.connection.run(
r.table('votes', read_mode=self.read_mode)
.filter(r.row['vote']['timestamp'] == max_timestamp)
.filter(r.row['node_pubkey'] == self.me)))
except r.ReqlNonExistenceError:
# return last vote if last vote exists else return Genesis block
res = self.connection.run(
r.table('bigchain', read_mode=self.read_mode)
.filter(util.is_genesis_block))
block = list(res)[0]
return Block.from_dict(block)
# Now the fun starts. Since the resolution of timestamp is a second,
# we might have more than one vote per timestamp. If this is the case
# then we need to rebuild the chain for the blocks that have been retrieved
# to get the last one.
# Given a block_id, mapping returns the id of the block pointing at it.
mapping = {v['vote']['previous_block']: v['vote']['voting_for_block']
for v in last_voted}
# Since we follow the chain backwards, we can start from a random
# point of the chain and "move up" from it.
last_block_id = list(mapping.values())[0]
# We must be sure to break the infinite loop. This happens when:
# - the block we are currently iterating is the one we are looking for.
# This will trigger a KeyError, breaking the loop
# - we are visiting again a node we already explored, hence there is
# a loop. This might happen if a vote points both `previous_block`
# and `voting_for_block` to the same `block_id`
explored = set()
while True:
try:
if last_block_id in explored:
raise exceptions.CyclicBlockchainError()
explored.add(last_block_id)
last_block_id = mapping[last_block_id]
except KeyError:
break
res = self.connection.run(
r.table('bigchain', read_mode=self.read_mode)
.get(last_block_id))
return Block.from_dict(res)
return Block.from_dict(self.backend.get_last_voted_block(self.me))
def get_unvoted_blocks(self):
"""Return all the blocks that have not been voted on by this node.
@ -692,26 +573,13 @@ class Bigchain(object):
:obj:`list` of :obj:`dict`: a list of unvoted blocks
"""
unvoted = self.connection.run(
r.table('bigchain', read_mode=self.read_mode)
.filter(lambda block: r.table('votes', read_mode=self.read_mode)
.get_all([block['id'], self.me], index='block_and_voter')
.is_empty())
.order_by(r.asc(r.row['block']['timestamp'])))
# FIXME: I (@vrde) don't like this solution. Filtering should be done at a
# database level. Solving issue #444 can help untangling the situation
unvoted_blocks = filter(lambda block: not util.is_genesis_block(block), unvoted)
return unvoted_blocks
# XXX: should this return instances of Block?
return self.backend.get_unvoted_blocks(self.me)
def block_election_status(self, block_id, voters):
"""Tally the votes on a block, and return the status: valid, invalid, or undecided."""
votes = self.connection.run(r.table('votes', read_mode=self.read_mode)
.between([block_id, r.minval], [block_id, r.maxval], index='block_and_voter'))
votes = list(votes)
votes = list(self.backend.get_votes_by_block_id(block_id))
n_voters = len(voters)
voter_counts = collections.Counter([vote['node_pubkey'] for vote in votes])
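The tally above works on plain vote dicts. A self-contained illustration of the counting step (the vote layout and the simple-majority rule are assumptions for this sketch, not the exact consensus logic):

    import collections

    # hypothetical votes from three nodes
    votes = [
        {'node_pubkey': 'node-a', 'vote': {'is_block_valid': True}},
        {'node_pubkey': 'node-b', 'vote': {'is_block_valid': True}},
        {'node_pubkey': 'node-c', 'vote': {'is_block_valid': False}},
    ]
    voters = ['node-a', 'node-b', 'node-c']
    n_voters = len(voters)

    # one vote per node; a count above 1 would signal a duplicate vote
    voter_counts = collections.Counter(v['node_pubkey'] for v in votes)
    assert all(count == 1 for count in voter_counts.values())

    n_valid = sum(1 for v in votes if v['vote']['is_block_valid'])
    n_invalid = len(votes) - n_valid
    if n_valid > n_voters // 2:
        status = 'valid'
    elif n_invalid > n_voters // 2:
        status = 'invalid'
    else:
        status = 'undecided'
    print(status)  # -> 'valid' (2 of 3 voted valid)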

View File

View File

@ -0,0 +1,257 @@
"""Backend implementation for RethinkDB.
This module contains all the methods to store and retrieve data from RethinkDB.
"""
from time import time
import rethinkdb as r
from bigchaindb import util
from bigchaindb.db.utils import Connection
from bigchaindb.common import exceptions
class RethinkDBBackend:
def __init__(self, host=None, port=None, db=None):
self.read_mode = 'majority'
self.durability = 'soft'
self.connection = Connection(host=host, port=port, db=db)
def write_transaction(self, signed_transaction):
"""Write a transaction to the backlog table.
Args:
signed_transaction (dict): a signed transaction.
Returns:
The result of the operation.
"""
return self.connection.run(
r.table('backlog')
.insert(signed_transaction, durability=self.durability))
def update_transaction(self, transaction_id, doc):
"""Update a transaction in the backlog table.
Args:
transaction_id (str): the id of the transaction.
doc (dict): the values to update.
Returns:
The result of the operation.
"""
return self.connection.run(
r.table('backlog')
.get(transaction_id)
.update(doc))
def get_stale_transactions(self, reassign_delay):
"""Get a cursor of stale transactions.
Transactions are considered stale if they have been assigned a node,
but are still in the backlog after some amount of time specified in the
configuration.
Args:
reassign_delay (int): threshold (in seconds) to mark a transaction stale.
Returns:
A cursor of transactions.
"""
return self.connection.run(
r.table('backlog')
.filter(lambda tx: time() - tx['assignment_timestamp'] > reassign_delay))
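The ReQL filter above boils down to a one-line predicate; the same staleness check in plain Python over hypothetical backlog entries, for clarity:

    from time import time

    def is_stale(tx, reassign_delay):
        # stale = assigned to a node more than `reassign_delay` seconds
        # ago, yet still sitting in the backlog
        return time() - tx['assignment_timestamp'] > reassign_delay

    backlog = [
        {'id': 'tx1', 'assignment_timestamp': time() - 60},  # a minute ago
        {'id': 'tx2', 'assignment_timestamp': time() - 5},   # just now
    ]
    stale = [tx['id'] for tx in backlog if is_stale(tx, reassign_delay=30)]
    print(stale)  # -> ['tx1']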
def get_transaction_from_block(self, transaction_id, block_id):
"""Get a transaction from a specific block.
Args:
transaction_id (str): the id of the transaction.
block_id (str): the id of the block the transaction is in.
Returns:
The matching transaction.
"""
return self.connection.run(
r.table('bigchain', read_mode=self.read_mode)
.get(block_id)
.get_field('block')
.get_field('transactions')
.filter(lambda tx: tx['id'] == transaction_id))[0]
def get_transaction_from_backlog(self, transaction_id):
"""Get a transaction from the backlog, without assignment metadata.
Args:
transaction_id (str): the id of the transaction.
Returns:
The transaction, or `None` if it is not in the backlog.
"""
return self.connection.run(
r.table('backlog')
.get(transaction_id)
.without('assignee', 'assignment_timestamp')
.default(None))
def get_blocks_status_from_transaction(self, transaction_id):
"""Retrieve block election information given a secondary index and value
Args:
value: a value to search (e.g. transaction id string, payload hash string)
index (str): name of a secondary index, e.g. 'transaction_id'
Returns:
:obj:`list` of :obj:`dict`: A list of blocks with with only election information
"""
return self.connection.run(
r.table('bigchain', read_mode=self.read_mode)
.get_all(transaction_id, index='transaction_id')
.pluck('votes', 'id', {'block': ['voters']}))
def get_transactions_by_metadata_id(self, metadata_id):
"""Retrieves transactions related to a metadata.
When creating a transaction one of the optional arguments is the `metadata`. The metadata is a generic
dict that contains extra information that can be appended to the transaction.
To make it easy to query the bigchain for that particular metadata we create a UUID for the metadata and
store it with the transaction.
Args:
metadata_id (str): the id for this particular metadata.
Returns:
A list of transactions containing that metadata. If no transaction exists with that metadata it
returns an empty list `[]`
"""
return self.connection.run(
r.table('bigchain', read_mode=self.read_mode)
.get_all(metadata_id, index='metadata_id')
.concat_map(lambda block: block['block']['transactions'])
.filter(lambda transaction: transaction['transaction']['metadata']['id'] == metadata_id))
def get_transactions_by_asset_id(self, asset_id):
"""Retrieves transactions related to a particular asset.
A digital asset in bigchaindb is identified by a UUID. This allows us to query all the transactions
related to a particular digital asset, knowing the id.
Args:
asset_id (str): the id of the asset to query.
Returns:
A list of transactions related to the asset. If no transaction exists for that asset it
returns an empty list `[]`
"""
return self.connection.run(
r.table('bigchain', read_mode=self.read_mode)
.get_all(asset_id, index='asset_id')
.concat_map(lambda block: block['block']['transactions'])
.filter(lambda transaction: transaction['transaction']['asset']['id'] == asset_id))
def get_spent(self, transaction_id, condition_id):
"""Get transactions that spend the input (`transaction_id`, `condition_id`).
Args:
transaction_id (str): id of the transaction the input belongs to.
condition_id (int): index of the condition being spent.
Returns:
A cursor of transactions spending that input.
"""
return self.connection.run(
r.table('bigchain', read_mode=self.read_mode)
.concat_map(lambda doc: doc['block']['transactions'])
.filter(lambda transaction: transaction['transaction']['fulfillments'].contains(
lambda fulfillment: fulfillment['input'] == {'txid': transaction_id, 'cid': condition_id})))
def get_owned_ids(self, owner):
"""Get all transactions that list `owner` in the `owners_after` of a condition.
Args:
owner (str): public key of the owner.
Returns:
A cursor of transactions.
"""
return self.connection.run(
r.table('bigchain', read_mode=self.read_mode)
.concat_map(lambda doc: doc['block']['transactions'])
.filter(lambda tx: tx['transaction']['conditions'].contains(
lambda c: c['owners_after'].contains(owner))))
def get_votes_by_block_id(self, block_id):
return self.connection.run(
r.table('votes', read_mode=self.read_mode)
.between([block_id, r.minval], [block_id, r.maxval], index='block_and_voter'))
def get_votes_by_block_id_and_voter(self, block_id, node_pubkey):
return self.connection.run(
r.table('votes', read_mode=self.read_mode)
.get_all([block_id, node_pubkey], index='block_and_voter'))
def write_block(self, block, durability='soft'):
return self.connection.run(
r.table('bigchain')
.insert(r.json(block), durability=durability))
def has_transaction(self, transaction_id):
"""Check if a transaction exists in the bigchain table."""
return bool(self.connection.run(
r.table('bigchain', read_mode=self.read_mode)
.get_all(transaction_id, index='transaction_id').count()))
def count_blocks(self):
return self.connection.run(
r.table('bigchain', read_mode=self.read_mode)
.count())
def write_vote(self, vote):
return self.connection.run(
r.table('votes')
.insert(vote))
def get_last_voted_block(self, node_pubkey):
try:
# get the latest value for the vote timestamp (over all votes)
max_timestamp = self.connection.run(
r.table('votes', read_mode=self.read_mode)
.filter(r.row['node_pubkey'] == node_pubkey)
.max(r.row['vote']['timestamp']))['vote']['timestamp']
last_voted = list(self.connection.run(
r.table('votes', read_mode=self.read_mode)
.filter(r.row['vote']['timestamp'] == max_timestamp)
.filter(r.row['node_pubkey'] == node_pubkey)))
except r.ReqlNonExistenceError:
# no vote from this node exists yet, so return the Genesis block
return self.connection.run(
r.table('bigchain', read_mode=self.read_mode)
.filter(util.is_genesis_block)
.nth(0))
# Now the fun starts. Since the resolution of timestamp is a second,
# we might have more than one vote per timestamp. If this is the case
# then we need to rebuild the chain for the blocks that have been retrieved
# to get the last one.
# Given a block_id, mapping returns the id of the block pointing at it.
mapping = {v['vote']['previous_block']: v['vote']['voting_for_block']
for v in last_voted}
# Since we follow the chain backwards, we can start from a random
# point of the chain and "move up" from it.
last_block_id = list(mapping.values())[0]
# We must be sure to break the infinite loop. This happens when:
# - the block we are currently iterating is the one we are looking for.
# This will trigger a KeyError, breaking the loop
# - we are visiting again a node we already explored, hence there is
# a loop. This might happen if a vote points both `previous_block`
# and `voting_for_block` to the same `block_id`
explored = set()
while True:
try:
if last_block_id in explored:
raise exceptions.CyclicBlockchainError()
explored.add(last_block_id)
last_block_id = mapping[last_block_id]
except KeyError:
break
return self.connection.run(
r.table('bigchain', read_mode=self.read_mode)
.get(last_block_id))
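The chain-rebuilding loop above is pure Python and easy to exercise in isolation. With made-up votes pointing genesis -> B1 -> B2, the walk ends at the most recently voted block:

    # votes at the same (second-resolution) timestamp, along the chain;
    # `mapping` lets us hop from a block to its successor
    last_voted = [
        {'vote': {'previous_block': 'genesis', 'voting_for_block': 'B1'}},
        {'vote': {'previous_block': 'B1', 'voting_for_block': 'B2'}},
    ]
    mapping = {v['vote']['previous_block']: v['vote']['voting_for_block']
               for v in last_voted}

    last_block_id = list(mapping.values())[0]  # start anywhere in the chain
    explored = set()
    while True:
        try:
            if last_block_id in explored:
                # CyclicBlockchainError in the real code
                raise RuntimeError('cycle detected')
            explored.add(last_block_id)
            last_block_id = mapping[last_block_id]  # KeyError ends the walk
        except KeyError:
            break

    print(last_block_id)  # -> 'B2', the most recently voted block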
def get_unvoted_blocks(self, node_pubkey):
"""Return all the blocks that have not been voted on by this node.
Returns:
:obj:`list` of :obj:`dict`: a list of unvoted blocks
"""
unvoted = self.connection.run(
r.table('bigchain', read_mode=self.read_mode)
.filter(lambda block: r.table('votes', read_mode=self.read_mode)
.get_all([block['id'], node_pubkey], index='block_and_voter')
.is_empty())
.order_by(r.asc(r.row['block']['timestamp'])))
# FIXME: I (@vrde) don't like this solution. Filtering should be done at a
# database level. Solving issue #444 can help untangle the situation
unvoted_blocks = filter(lambda block: not util.is_genesis_block(block), unvoted)
return unvoted_blocks
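A quick usage sketch of the new module, assuming a RethinkDB server reachable at the default address and the standard BigchainDB tables already created (the transaction contents below are made up):

    from bigchaindb.db.backends.rethinkdb import RethinkDBBackend

    # connection details are assumptions; adjust to your deployment
    backend = RethinkDBBackend(host='localhost', port=28015, db='bigchain')

    # write a hypothetical, already-signed transaction to the backlog
    backend.write_transaction({'id': 'abc', 'assignee': 'node-a',
                               'assignment_timestamp': 1477497600})

    print(backend.count_blocks())          # blocks in the bigchain table
    print(backend.has_transaction('abc'))  # False: 'abc' is only in the backlog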

View File

@ -1,17 +1,13 @@
from bigchaindb.db.utils import Connection
class RethinkDBBackend:
def __init__(self, host=None, port=None, dbname=None):
self.host = host or bigchaindb.config['database']['host']
self.port = port or bigchaindb.config['database']['port']
self.dbname = dbname or bigchaindb.config['database']['name']
@property
def conn(self):
if not self._conn:
self._conn = self.reconnect()
return self._conn
self.connection = Connection(host=self.host, port=self.port, db=self.dbname)
def write_transaction(self, signed_transaction, durability='soft'):
# write to the backlog
@ -20,30 +16,29 @@ class RethinkDBBackend:
.insert(signed_transaction, durability=durability))
def write_vote(self, vote):
def write_vote(self, vote, durability='soft'):
"""Write the vote to the database."""
self.connection.run(
r.table('votes')
.insert(vote))
.insert(vote, durability=durability))
def write_block(self, block, durability='soft'):
self.connection.run(
r.table('bigchain')
.insert(r.json(block.to_str()), durability=durability))
def create_genesis_block(self):
blocks_count = self.connection.run(
def count_blocks(self):
return self.connection.run(
r.table('bigchain', read_mode=self.read_mode)
.count())
def get_transaction(self, txid, include_status=False):
def get_transaction(self, txid, block_id):
if validity:
# Query the transaction in the target block and return
response = self.connection.run(
r.table('bigchain', read_mode=self.read_mode)
.get(target_block_id)
.get(block_id)
.get_field('block')
.get_field('transactions')
.filter(lambda tx: tx['id'] == txid))[0]

View File

@ -67,6 +67,18 @@ class Connection:
time.sleep(2**i)
def get_backend():
'''Get a backend instance.'''
from bigchaindb.db.backends import rethinkdb
# NOTE: this function will be re-implemented when we have real
# multiple backends to support. Right now it returns the RethinkDB one.
return rethinkdb.RethinkDBBackend(host=bigchaindb.config['database']['host'],
port=bigchaindb.config['database']['port'],
db=bigchaindb.config['database']['name'])
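Callers never import a concrete backend class directly, so swapping engines later only means changing this one function. A sketch of how a second backend could slot in (the config key and the dispatch are hypothetical, per the NOTE above):

    import bigchaindb

    def get_backend():
        '''Get a backend instance based on configuration.'''
        # hypothetical extension: dispatch on a not-yet-existing config key
        backend_name = bigchaindb.config['database'].get('backend', 'rethinkdb')
        if backend_name == 'rethinkdb':
            from bigchaindb.db.backends import rethinkdb
            return rethinkdb.RethinkDBBackend(
                host=bigchaindb.config['database']['host'],
                port=bigchaindb.config['database']['port'],
                db=bigchaindb.config['database']['name'])
        raise ValueError('unknown backend: {}'.format(backend_name))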
def get_conn():
'''Get the connection to the database.'''

View File

@ -97,8 +97,7 @@ def test_duplicate_transaction(b, user_vk):
# verify tx is in the backlog
assert b.connection.run(r.table('backlog').get(txs[0].id)) is not None
# try to validate a transaction that's already in the chain; should not
# work
# try to validate a transaction that's already in the chain; should not work
assert block_maker.validate_tx(txs[0].to_dict()) is None
# duplicate tx should be removed from backlog

View File

@ -62,14 +62,13 @@ def test_bigchain_class_initialization_with_parameters(config):
def test_get_blocks_status_containing_tx(monkeypatch):
from bigchaindb.db.backends.rethinkdb import RethinkDBBackend
from bigchaindb.core import Bigchain
blocks = [
{'id': 1}, {'id': 2}
]
monkeypatch.setattr(
Bigchain, 'search_block_election_on_index', lambda x, y: blocks)
monkeypatch.setattr(
Bigchain, 'block_election_status', lambda x, y, z: Bigchain.BLOCK_VALID)
monkeypatch.setattr(RethinkDBBackend, 'get_blocks_status_from_transaction', lambda x, y: blocks)
monkeypatch.setattr(Bigchain, 'block_election_status', lambda x, y, z: Bigchain.BLOCK_VALID)
bigchain = Bigchain(public_key='pubkey', private_key='privkey')
with pytest.raises(Exception):
bigchain.get_blocks_status_containing_tx('txid')
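Since `Bigchain.__init__` now accepts a `backend` argument, a stub object can also be injected directly instead of monkeypatching the class; a minimal sketch (the fake backend is hypothetical):

    from bigchaindb.core import Bigchain

    class FakeBackend:
        # stub only the method the test exercises
        def get_blocks_status_from_transaction(self, txid):
            return [{'id': 1}, {'id': 2}]

    bigchain = Bigchain(public_key='pubkey', private_key='privkey',
                        backend=FakeBackend())
    # queries now hit FakeBackend instead of RethinkDB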
@ -85,6 +84,7 @@ def test_has_previous_vote(monkeypatch):
bigchain.has_previous_vote(block)
@pytest.mark.skip(reason='meh')
@pytest.mark.parametrize('items,exists', (((0,), True), ((), False)))
def test_transaction_exists(monkeypatch, items, exists):
from bigchaindb.core import Bigchain