1
0
mirror of https://github.com/bigchaindb/bigchaindb.git synced 2024-06-23 17:56:41 +02:00

Initial implementation to decouple assets from transactions.

Most changes done to how we write and read blocks to the database.
Created schema, indexes and queries for mongodb.
Fixed tests.
This commit is contained in:
Rodolphe Marques 2017-05-10 16:43:52 +02:00
parent b96cfdecc1
commit 92392b51a7
12 changed files with 187 additions and 32 deletions

View File

@ -7,7 +7,7 @@ from pymongo import ReturnDocument
from bigchaindb import backend
from bigchaindb.common.exceptions import CyclicBlockchainError
from bigchaindb.common.transaction import Transaction
from bigchaindb.backend.exceptions import DuplicateKeyError
from bigchaindb.backend.exceptions import DuplicateKeyError, OperationError
from bigchaindb.backend.utils import module_dispatch_registrar
from bigchaindb.backend.mongodb.connection import MongoDBConnection
@ -127,6 +127,7 @@ def get_txids_filtered(conn, asset_id, operation=None):
return (elem['block']['transactions']['id'] for elem in cursor)
# TODO: This doesn't seem to be used anywhere
@register_query(MongoDBConnection)
def get_asset_by_id(conn, asset_id):
cursor = conn.run(
@ -206,10 +207,10 @@ def get_votes_by_block_id_and_voter(conn, block_id, node_pubkey):
@register_query(MongoDBConnection)
def write_block(conn, block):
def write_block(conn, block_dict):
return conn.run(
conn.collection('bigchain')
.insert_one(block.to_dict()))
.insert_one(block_dict))
@register_query(MongoDBConnection)
@ -220,6 +221,31 @@ def get_block(conn, block_id):
projection={'_id': False}))
@register_query(MongoDBConnection)
def write_assets(conn, assets):
    """Insert a list of asset dicts into the ``assets`` collection.

    Duplicate-key failures are deliberately swallowed: the same asset may
    legitimately be written more than once, e.g. when the same transaction
    ends up in multiple blocks because of invalid blocks. MongoDB reports
    that as a BulkWriteError on one of the inserts, which the backend
    surfaces as ``OperationError``.
    """
    try:
        # `ordered=False` means every insert is attempted instead of
        # aborting after the first failing document.
        return conn.run(
            conn.collection('assets')
            .insert_many(assets, ordered=False))
    except OperationError:
        return
@register_query(MongoDBConnection)
def get_assets(conn, asset_ids):
    """Return a cursor over the assets whose ``id`` is in ``asset_ids``.

    The MongoDB-internal ``_id`` field is excluded from the results.
    """
    asset_filter = {'id': {'$in': asset_ids}}
    return conn.run(
        conn.collection('assets')
        .find(asset_filter, projection={'_id': False}))
@register_query(MongoDBConnection)
def count_blocks(conn):
return conn.run(
@ -252,7 +278,7 @@ def get_genesis_block(conn):
@register_query(MongoDBConnection)
def get_last_voted_block(conn, node_pubkey):
def get_last_voted_block_id(conn, node_pubkey):
last_voted = conn.run(
conn.collection('votes')
.find({'node_pubkey': node_pubkey},
@ -261,7 +287,7 @@ def get_last_voted_block(conn, node_pubkey):
# pymongo seems to return a cursor even if there are no results
# so we actually need to check the count
if last_voted.count() == 0:
return get_genesis_block(conn)
return get_genesis_block(conn)['id']
mapping = {v['vote']['previous_block']: v['vote']['voting_for_block']
for v in last_voted}
@ -279,7 +305,7 @@ def get_last_voted_block(conn, node_pubkey):
except KeyError:
break
return get_block(conn, last_block_id)
return last_block_id
@register_query(MongoDBConnection)

View File

@ -27,7 +27,7 @@ def create_database(conn, dbname):
@register_schema(MongoDBConnection)
def create_tables(conn, dbname):
for table_name in ['bigchain', 'backlog', 'votes']:
for table_name in ['bigchain', 'backlog', 'votes', 'assets']:
logger.info('Create `%s` table.', table_name)
# create the table
# TODO: read and write concerns can be declared here
@ -39,6 +39,7 @@ def create_indexes(conn, dbname):
create_bigchain_secondary_index(conn, dbname)
create_backlog_secondary_index(conn, dbname)
create_votes_secondary_index(conn, dbname)
create_assets_secondary_index(conn, dbname)
@register_schema(MongoDBConnection)
@ -102,3 +103,13 @@ def create_votes_secondary_index(conn, dbname):
ASCENDING)],
name='block_and_voter',
unique=True)
def create_assets_secondary_index(conn, dbname):
    """Create a unique secondary index on the asset id.

    Each asset is keyed by the id of the transaction that created it;
    the unique index rejects duplicates at the database level and makes
    lookups by asset id fast.
    """
    logger.info('Create `assets` secondary index.')
    # NOTE: the previous comment here ("compound index to order votes by
    # block id and node") was copy-pasted from the votes index — this is a
    # plain single-field unique index on `id`.
    conn.conn[dbname]['assets'].create_index('id',
                                             name='asset_id',
                                             unique=True)

View File

@ -211,6 +211,18 @@ def get_block(connection, block_id):
raise NotImplementedError
@singledispatch
def write_assets(connection, assets):
# TODO: write docstring
raise NotImplementedError
@singledispatch
def get_assets(connection, assets):
# TODO: write docstring
raise NotImplementedError
@singledispatch
def count_blocks(connection):
"""Count the number of blocks in the bigchain table.
@ -259,7 +271,7 @@ def get_genesis_block(connection):
@singledispatch
def get_last_voted_block(connection, node_pubkey):
def get_last_voted_block_id(connection, node_pubkey):
"""Get the last voted block for a specific node.
Args:

View File

@ -188,7 +188,7 @@ def get_genesis_block(connection):
@register_query(RethinkDBConnection)
def get_last_voted_block(connection, node_pubkey):
def get_last_voted_block_id(connection, node_pubkey):
try:
# get the latest value for the vote timestamp (over all votes)
max_timestamp = connection.run(

View File

@ -183,15 +183,23 @@ class Bigchain(object):
include_status (bool): also return the status of the block
the return value is then a tuple: (block, status)
"""
block = backend.query.get_block(self.connection, block_id)
status = None
# get block from database
block_dict = backend.query.get_block(self.connection, block_id)
# get the asset ids from the block
if block_dict:
asset_ids = Block.get_asset_ids(block_dict)
# get the assets from the database
assets = self.get_assets(asset_ids)
# add the assets to the block transactions
block_dict = Block.couple_assets(block_dict, assets)
status = None
if include_status:
if block:
status = self.block_election_status(block)
return block, status
if block_dict:
status = self.block_election_status(block_dict)
return block_dict, status
else:
return block
return block_dict
def get_transaction(self, txid, include_status=False):
"""Get the transaction with the specified `txid` (and optionally its status)
@ -251,7 +259,13 @@ class Bigchain(object):
tx_status = self.TX_IN_BACKLOG
if response:
response = Transaction.from_dict(response)
if tx_status == self.TX_IN_BACKLOG:
response = Transaction.from_dict(response)
else:
# If we are reading from the bigchain collection the asset is
# not in the transaction so we need to fetch the asset and
# reconstruct the transaction.
response = Transaction.from_db(self, response)
if include_status:
return response, tx_status
@ -513,7 +527,14 @@ class Bigchain(object):
block (Block): block to write to bigchain.
"""
return backend.query.write_block(self.connection, block)
# Decouple assets from block
assets, block_dict = block.decouple_assets()
# write the assets
if assets:
self.write_assets(assets)
# write the block
return backend.query.write_block(self.connection, block_dict)
def prepare_genesis_block(self):
"""Prepare a genesis block."""
@ -592,7 +613,9 @@ class Bigchain(object):
def get_last_voted_block(self):
"""Returns the last block that this node voted on."""
return Block.from_dict(backend.query.get_last_voted_block(self.connection, self.me))
last_block_id = backend.query.get_last_voted_block_id(self.connection,
self.me)
return Block.from_dict(self.get_block(last_block_id))
def get_unvoted_blocks(self):
"""Return all the blocks that have not been voted on by this node.
@ -616,3 +639,10 @@ class Bigchain(object):
"""Tally the votes on a block, and return the status:
valid, invalid, or undecided."""
return self.block_election(block)['status']
def get_assets(self, asset_ids):
# TODO: write docstrings
return backend.query.get_assets(self.connection, asset_ids)
def write_assets(self, assets):
return backend.query.write_assets(self.connection, assets)

View File

@ -1,3 +1,5 @@
from copy import deepcopy
from bigchaindb.common.crypto import hash_data, PublicKey, PrivateKey
from bigchaindb.common.exceptions import (InvalidHash, InvalidSignature,
DoubleSpend, InputDoesNotExist,
@ -84,6 +86,16 @@ class Transaction(Transaction):
validate_transaction_schema(tx_body)
return super().from_dict(tx_body)
@classmethod
def from_db(cls, bigchain, tx_dict):
    """Reconstruct a Transaction from its database representation.

    Transactions stored in the `bigchain` collection have their asset
    decoupled, so for asset-creating operations the asset is fetched from
    the assets table and merged back in before the usual ``from_dict``
    validation.

    Args:
        bigchain (Bigchain): instance used to query the assets table.
        tx_dict (dict): the transaction dict as read from the database.

    Returns:
        The reconstructed transaction.
    """
    # Bug fix: the original condition listed Transaction.CREATE twice;
    # GENESIS transactions also carry a decoupled asset (see
    # Block.decouple_assets), so they must be reconstructed as well.
    if tx_dict['operation'] in [Transaction.CREATE, Transaction.GENESIS]:
        asset = bigchain.get_assets([tx_dict['id']])[0]
        # The asset's `id` equals the creating transaction's id and is
        # not part of the transaction body itself.
        asset.pop('id')
        tx_dict.update({'asset': asset})
    return cls.from_dict(tx_dict)
class Block(object):
"""Bundle a list of Transactions in a Block. Nodes vote on its validity.
@ -300,5 +312,49 @@ class Block(object):
'signature': self.signature,
}
@classmethod
def from_db(cls, bigchain, block_dict):
    """Build a Block from its database form, re-attaching its assets.

    Assets live in their own collection, so they are fetched by id and
    stitched back into the block's transactions before validation.
    """
    ids = cls.get_asset_ids(block_dict)
    fetched = bigchain.get_assets(ids)
    return cls.from_dict(cls.couple_assets(block_dict, fetched))
def decouple_assets(self):
    """Strip the assets out of this block's dict form.

    For every CREATE/GENESIS transaction the asset is removed from the
    transaction and tagged with the transaction's id, so the assets can
    be stored in their own collection.

    Returns:
        tuple: ``(assets, block_dict)`` where ``assets`` is the list of
        extracted asset dicts and ``block_dict`` is the block without
        them.
    """
    block_dict = deepcopy(self.to_dict())
    extracted = []
    creating_ops = (Transaction.CREATE, Transaction.GENESIS)
    for tx in block_dict['block']['transactions']:
        if tx['operation'] in creating_ops:
            asset = tx.pop('asset')
            asset['id'] = tx['id']
            extracted.append(asset)
    return (extracted, block_dict)
@staticmethod
def couple_assets(block_dict, assets):
    """Merge ``assets`` back into the transactions of ``block_dict``.

    Args:
        block_dict (dict): a block in dict form whose CREATE/GENESIS
            transactions are missing their ``asset`` field.
        assets (list): asset dicts, each tagged with the id of the
            transaction that created it.

    Returns:
        dict: ``block_dict`` with the assets attached (modified in
        place).
    """
    # Index assets by the id of their creating transaction. The `pop`
    # also removes the helper `id` key from each asset dict.
    by_txid = {}
    for asset in assets:
        by_txid[asset.pop('id')] = asset
    for tx in block_dict['block']['transactions']:
        if tx['operation'] in (Transaction.CREATE, Transaction.GENESIS):
            # A missing asset maps to None, same as the original lookup.
            tx.update({'asset': by_txid.get(tx['id'])})
    return block_dict
@staticmethod
def get_asset_ids(block_dict):
    """Return the asset ids of the block's asset-creating transactions.

    The asset id of a CREATE/GENESIS transaction equals the transaction
    id, so those transaction ids are collected.
    """
    creating_ops = (Transaction.CREATE, Transaction.GENESIS)
    return [tx['id']
            for tx in block_dict['block']['transactions']
            if tx['operation'] in creating_ops]
def to_str(self):
return serialize(self.to_dict())

View File

@ -50,7 +50,7 @@ class Vote:
def validate_block(self, block):
if not self.bigchain.has_previous_vote(block['id']):
try:
block = Block.from_dict(block)
block = Block.from_db(self.bigchain, block)
except (exceptions.InvalidHash):
# XXX: if a block is invalid we should skip the `validate_tx`
# step, but since we are in a pipeline we cannot just jump to

View File

@ -269,7 +269,7 @@ def test_write_block(signed_create_tx):
# create and write block
block = Block(transactions=[signed_create_tx])
query.write_block(conn, block)
query.write_block(conn, block.to_dict())
block_db = conn.db.bigchain.find_one({'id': block.id}, {'_id': False})
@ -347,17 +347,18 @@ def test_get_genesis_block(genesis_block):
from bigchaindb.backend import connect, query
conn = connect()
assert query.get_genesis_block(conn) == genesis_block.to_dict()
assets, genesis_block_dict = genesis_block.decouple_assets()
assert query.get_genesis_block(conn) == genesis_block_dict
def test_get_last_voted_block(genesis_block, signed_create_tx, b):
def test_get_last_voted_block_id(genesis_block, signed_create_tx, b):
from bigchaindb.backend import connect, query
from bigchaindb.models import Block
from bigchaindb.common.exceptions import CyclicBlockchainError
conn = connect()
# check that the last voted block is the genesis block
assert query.get_last_voted_block(conn, b.me) == genesis_block.to_dict()
assert query.get_last_voted_block_id(conn, b.me) == genesis_block.id
# create and insert a new vote and block
block = Block(transactions=[signed_create_tx])
@ -365,7 +366,7 @@ def test_get_last_voted_block(genesis_block, signed_create_tx, b):
vote = b.vote(block.id, genesis_block.id, True)
conn.db.votes.insert_one(vote)
assert query.get_last_voted_block(conn, b.me) == block.to_dict()
assert query.get_last_voted_block_id(conn, b.me) == block.id
# force a bad chain
vote.pop('_id')
@ -374,7 +375,7 @@ def test_get_last_voted_block(genesis_block, signed_create_tx, b):
conn.db.votes.insert_one(vote)
with pytest.raises(CyclicBlockchainError):
query.get_last_voted_block(conn, b.me)
query.get_last_voted_block_id(conn, b.me)
def test_get_unvoted_blocks(signed_create_tx):

View File

@ -18,7 +18,8 @@ def test_init_creates_db_tables_and_indexes():
init_database()
collection_names = conn.conn[dbname].collection_names()
assert sorted(collection_names) == ['backlog', 'bigchain', 'votes']
assert sorted(collection_names) == ['assets', 'backlog', 'bigchain',
'votes']
indexes = conn.conn[dbname]['bigchain'].index_information().keys()
assert sorted(indexes) == ['_id_', 'asset_id', 'block_timestamp', 'inputs',
@ -31,6 +32,9 @@ def test_init_creates_db_tables_and_indexes():
indexes = conn.conn[dbname]['votes'].index_information().keys()
assert sorted(indexes) == ['_id_', 'block_and_voter']
indexes = conn.conn[dbname]['assets'].index_information().keys()
assert sorted(indexes) == ['_id_', 'asset_id']
def test_init_database_fails_if_db_exists():
import bigchaindb
@ -62,7 +66,8 @@ def test_create_tables():
schema.create_tables(conn, dbname)
collection_names = conn.conn[dbname].collection_names()
assert sorted(collection_names) == ['backlog', 'bigchain', 'votes']
assert sorted(collection_names) == ['assets', 'backlog', 'bigchain',
'votes']
def test_create_secondary_indexes():

View File

@ -30,12 +30,14 @@ def test_schema(schema_func_name, args_qty):
('write_block', 1),
('get_block', 1),
('write_vote', 1),
('get_last_voted_block', 1),
('get_last_voted_block_id', 1),
('get_unvoted_blocks', 1),
('get_spent', 2),
('get_votes_by_block_id_and_voter', 2),
('update_transaction', 2),
('get_transaction_from_block', 2),
('write_assets', 1),
('get_assets', 1),
))
def test_query(query_func_name, args_qty):
from bigchaindb.backend import query

View File

@ -383,7 +383,7 @@ class TestBigchainApi(object):
from bigchaindb.backend import query
genesis = query.get_genesis_block(b.connection)
genesis = Block.from_dict(genesis)
genesis = Block.from_db(b, genesis)
gb = b.get_last_voted_block()
assert gb == genesis
assert b.validate_block(gb) == gb

View File

@ -20,6 +20,15 @@ def dummy_block(b):
return block
def decouple_assets(b, block):
    """Strip the assets from ``block`` and persist them via ``b``.

    A block coming from the database does not contain its assets, so the
    voting pipeline must be fed the asset-less block dict while the
    assets themselves are written to the assets table so the pipeline
    can reconstruct the block.
    """
    stripped_assets, block_dict = block.decouple_assets()
    b.write_assets(stripped_assets)
    return block_dict
DUMMY_SHA3 = '0123456789abcdef' * 4
@ -79,9 +88,10 @@ def test_vote_validate_block(b):
tx = dummy_tx(b)
block = b.create_block([tx])
block_dict = decouple_assets(b, block)
vote_obj = vote.Vote()
validation = vote_obj.validate_block(block.to_dict())
validation = vote_obj.validate_block(block_dict)
assert validation[0] == block.id
for tx1, tx2 in zip(validation[1], block.transactions):
assert tx1 == tx2
@ -220,8 +230,9 @@ def test_valid_block_voting_multiprocessing(b, genesis_block, monkeypatch):
vote_pipeline.setup(indata=inpipe, outdata=outpipe)
block = dummy_block(b)
block_dict = decouple_assets(b, block)
inpipe.put(block.to_dict())
inpipe.put(block_dict)
vote_pipeline.start()
vote_out = outpipe.get()
vote_pipeline.terminate()
@ -257,6 +268,7 @@ def test_valid_block_voting_with_create_transaction(b,
monkeypatch.setattr('time.time', lambda: 1111111111)
block = b.create_block([tx])
block_dict = decouple_assets(b, block)
inpipe = Pipe()
outpipe = Pipe()
@ -264,7 +276,7 @@ def test_valid_block_voting_with_create_transaction(b,
vote_pipeline = vote.create_pipeline()
vote_pipeline.setup(indata=inpipe, outdata=outpipe)
inpipe.put(block.to_dict())
inpipe.put(block_dict)
vote_pipeline.start()
vote_out = outpipe.get()
vote_pipeline.terminate()