From 92392b51a7a6e1eda3263d17d5ca3ffc2bf46f2c Mon Sep 17 00:00:00 2001 From: Rodolphe Marques Date: Wed, 10 May 2017 16:43:52 +0200 Subject: [PATCH] Initial implementation to decouple assets from transactions. Most changes done to how we write and read blocks to the database. Created schema, indexes and queries for mongodb. Fixed tests. --- bigchaindb/backend/mongodb/query.py | 38 +++++++++++++++--- bigchaindb/backend/mongodb/schema.py | 13 ++++++- bigchaindb/backend/query.py | 14 ++++++- bigchaindb/backend/rethinkdb/query.py | 2 +- bigchaindb/core.py | 48 ++++++++++++++++++----- bigchaindb/models.py | 56 +++++++++++++++++++++++++++ bigchaindb/pipelines/vote.py | 2 +- tests/backend/mongodb/test_queries.py | 13 ++++--- tests/backend/mongodb/test_schema.py | 9 ++++- tests/backend/test_generics.py | 4 +- tests/db/test_bigchain_api.py | 2 +- tests/pipelines/test_vote.py | 18 +++++++-- 12 files changed, 187 insertions(+), 32 deletions(-) diff --git a/bigchaindb/backend/mongodb/query.py b/bigchaindb/backend/mongodb/query.py index 74b9c35a..39d99d4a 100644 --- a/bigchaindb/backend/mongodb/query.py +++ b/bigchaindb/backend/mongodb/query.py @@ -7,7 +7,7 @@ from pymongo import ReturnDocument from bigchaindb import backend from bigchaindb.common.exceptions import CyclicBlockchainError from bigchaindb.common.transaction import Transaction -from bigchaindb.backend.exceptions import DuplicateKeyError +from bigchaindb.backend.exceptions import DuplicateKeyError, OperationError from bigchaindb.backend.utils import module_dispatch_registrar from bigchaindb.backend.mongodb.connection import MongoDBConnection @@ -127,6 +127,7 @@ def get_txids_filtered(conn, asset_id, operation=None): return (elem['block']['transactions']['id'] for elem in cursor) +# TODO: This doesn't seem to be used anywhere @register_query(MongoDBConnection) def get_asset_by_id(conn, asset_id): cursor = conn.run( @@ -206,10 +207,10 @@ def get_votes_by_block_id_and_voter(conn, block_id, node_pubkey): @register_query(MongoDBConnection) -def write_block(conn, block): +def write_block(conn, block_dict): return conn.run( conn.collection('bigchain') - .insert_one(block.to_dict())) + .insert_one(block_dict)) @register_query(MongoDBConnection) @@ -220,6 +221,31 @@ def get_block(conn, block_id): projection={'_id': False})) +@register_query(MongoDBConnection) +def write_assets(conn, assets): + try: + # unordered means that all the inserts will be attempted instead of + # stopping after the first error. + return conn.run( + conn.collection('assets') + .insert_many(assets, ordered=False)) + # This can happen if we try to write the same asset multiple times. + # One case is when we write the same transaction into multiple blocks due + # to invalid blocks. + # The actual mongodb exception is a BulkWriteError due to a duplicated key + # in one of the inserts. + except OperationError: + return + + +@register_query(MongoDBConnection) +def get_assets(conn, asset_ids): + return conn.run( + conn.collection('assets') + .find({'id': {'$in': asset_ids}}, + projection={'_id': False})) + + @register_query(MongoDBConnection) def count_blocks(conn): return conn.run( @@ -252,7 +278,7 @@ def get_genesis_block(conn): @register_query(MongoDBConnection) -def get_last_voted_block(conn, node_pubkey): +def get_last_voted_block_id(conn, node_pubkey): last_voted = conn.run( conn.collection('votes') .find({'node_pubkey': node_pubkey}, @@ -261,7 +287,7 @@ def get_last_voted_block(conn, node_pubkey): # pymongo seems to return a cursor even if there are no results # so we actually need to check the count if last_voted.count() == 0: - return get_genesis_block(conn) + return get_genesis_block(conn)['id'] mapping = {v['vote']['previous_block']: v['vote']['voting_for_block'] for v in last_voted} @@ -279,7 +305,7 @@ def get_last_voted_block(conn, node_pubkey): except KeyError: break - return get_block(conn, last_block_id) + return last_block_id @register_query(MongoDBConnection) diff --git a/bigchaindb/backend/mongodb/schema.py b/bigchaindb/backend/mongodb/schema.py index ad89f9bc..527476f0 100644 --- a/bigchaindb/backend/mongodb/schema.py +++ b/bigchaindb/backend/mongodb/schema.py @@ -27,7 +27,7 @@ def create_database(conn, dbname): @register_schema(MongoDBConnection) def create_tables(conn, dbname): - for table_name in ['bigchain', 'backlog', 'votes']: + for table_name in ['bigchain', 'backlog', 'votes', 'assets']: logger.info('Create `%s` table.', table_name) # create the table # TODO: read and write concerns can be declared here @@ -39,6 +39,7 @@ def create_indexes(conn, dbname): create_bigchain_secondary_index(conn, dbname) create_backlog_secondary_index(conn, dbname) create_votes_secondary_index(conn, dbname) + create_assets_secondary_index(conn, dbname) @register_schema(MongoDBConnection) @@ -102,3 +103,13 @@ def create_votes_secondary_index(conn, dbname): ASCENDING)], name='block_and_voter', unique=True) + + +def create_assets_secondary_index(conn, dbname): + logger.info('Create `assets` secondary index.') + + # is the first index redundant then? + # compound index to order votes by block id and node + conn.conn[dbname]['assets'].create_index('id', + name='asset_id', + unique=True) diff --git a/bigchaindb/backend/query.py b/bigchaindb/backend/query.py index 9aa653d7..8f1325ef 100644 --- a/bigchaindb/backend/query.py +++ b/bigchaindb/backend/query.py @@ -211,6 +211,18 @@ def get_block(connection, block_id): raise NotImplementedError +@singledispatch +def write_assets(connection, assets): + # TODO: write docstring + raise NotImplementedError + + +@singledispatch +def get_assets(connection, assets): + # TODO: write docstring + raise NotImplementedError + + @singledispatch def count_blocks(connection): """Count the number of blocks in the bigchain table. @@ -259,7 +271,7 @@ def get_genesis_block(connection): @singledispatch -def get_last_voted_block(connection, node_pubkey): +def get_last_voted_block_id(connection, node_pubkey): """Get the last voted block for a specific node. Args: diff --git a/bigchaindb/backend/rethinkdb/query.py b/bigchaindb/backend/rethinkdb/query.py index 6011cc8c..be20442a 100644 --- a/bigchaindb/backend/rethinkdb/query.py +++ b/bigchaindb/backend/rethinkdb/query.py @@ -188,7 +188,7 @@ def get_genesis_block(connection): @register_query(RethinkDBConnection) -def get_last_voted_block(connection, node_pubkey): +def get_last_voted_block_id(connection, node_pubkey): try: # get the latest value for the vote timestamp (over all votes) max_timestamp = connection.run( diff --git a/bigchaindb/core.py b/bigchaindb/core.py index 5d2e9c03..c6ff5608 100644 --- a/bigchaindb/core.py +++ b/bigchaindb/core.py @@ -183,15 +183,23 @@ class Bigchain(object): include_status (bool): also return the status of the block the return value is then a tuple: (block, status) """ - block = backend.query.get_block(self.connection, block_id) - status = None + # get block from database + block_dict = backend.query.get_block(self.connection, block_id) + # get the asset ids from the block + if block_dict: + asset_ids = Block.get_asset_ids(block_dict) + # get the assets from the database + assets = self.get_assets(asset_ids) + # add the assets to the block transactions + block_dict = Block.couple_assets(block_dict, assets) + status = None if include_status: - if block: - status = self.block_election_status(block) - return block, status + if block_dict: + status = self.block_election_status(block_dict) + return block_dict, status else: - return block + return block_dict def get_transaction(self, txid, include_status=False): """Get the transaction with the specified `txid` (and optionally its status) @@ -251,7 +259,13 @@ class Bigchain(object): tx_status = self.TX_IN_BACKLOG if response: - response = Transaction.from_dict(response) + if tx_status == self.TX_IN_BACKLOG: + response = Transaction.from_dict(response) + else: + # If we are reading from the bigchain collection the asset is + # not in the transaction so we need to fetch the asset and + # reconstruct the transaction. + response = Transaction.from_db(self, response) if include_status: return response, tx_status @@ -513,7 +527,14 @@ class Bigchain(object): block (Block): block to write to bigchain. """ - return backend.query.write_block(self.connection, block) + # Decouple assets from block + assets, block_dict = block.decouple_assets() + # write the assets + if assets: + self.write_assets(assets) + + # write the block + return backend.query.write_block(self.connection, block_dict) def prepare_genesis_block(self): """Prepare a genesis block.""" @@ -592,7 +613,9 @@ class Bigchain(object): def get_last_voted_block(self): """Returns the last block that this node voted on.""" - return Block.from_dict(backend.query.get_last_voted_block(self.connection, self.me)) + last_block_id = backend.query.get_last_voted_block_id(self.connection, + self.me) + return Block.from_dict(self.get_block(last_block_id)) def get_unvoted_blocks(self): """Return all the blocks that have not been voted on by this node. @@ -616,3 +639,10 @@ class Bigchain(object): """Tally the votes on a block, and return the status: valid, invalid, or undecided.""" return self.block_election(block)['status'] + + def get_assets(self, asset_ids): + # TODO: write docstrings + return backend.query.get_assets(self.connection, asset_ids) + + def write_assets(self, assets): + return backend.query.write_assets(self.connection, assets) diff --git a/bigchaindb/models.py b/bigchaindb/models.py index 2f46ba20..5de56aaf 100644 --- a/bigchaindb/models.py +++ b/bigchaindb/models.py @@ -1,3 +1,5 @@ +from copy import deepcopy + from bigchaindb.common.crypto import hash_data, PublicKey, PrivateKey from bigchaindb.common.exceptions import (InvalidHash, InvalidSignature, DoubleSpend, InputDoesNotExist, @@ -84,6 +86,16 @@ class Transaction(Transaction): validate_transaction_schema(tx_body) return super().from_dict(tx_body) + @classmethod + def from_db(cls, bigchain, tx_dict): + # TODO: write docstring + if tx_dict['operation'] in [Transaction.CREATE, Transaction.CREATE]: + asset = bigchain.get_assets([tx_dict['id']])[0] + asset.pop('id') + tx_dict.update({'asset': asset}) + + return cls.from_dict(tx_dict) + class Block(object): """Bundle a list of Transactions in a Block. Nodes vote on its validity. @@ -300,5 +312,49 @@ class Block(object): 'signature': self.signature, } + @classmethod + def from_db(cls, bigchain, block_dict): + asset_ids = cls.get_asset_ids(block_dict) + assets = bigchain.get_assets(asset_ids) + block_dict = cls.couple_assets(block_dict, assets) + return cls.from_dict(block_dict) + + def decouple_assets(self): + # TODO: Write documentation + block_dict = deepcopy(self.to_dict()) + assets = [] + for transaction in block_dict['block']['transactions']: + if transaction['operation'] in [Transaction.CREATE, + Transaction.GENESIS]: + asset = transaction.pop('asset') + asset.update({'id': transaction['id']}) + assets.append(asset) + + return (assets, block_dict) + + @staticmethod + def couple_assets(block_dict, assets): + # TODO: Write docstring + # create a dict with {'': asset} + assets = {asset.pop('id'): asset for asset in assets} + # add the assets to the block transactions + for transaction in block_dict['block']['transactions']: + if transaction['operation'] in [Transaction.CREATE, + Transaction.GENESIS]: + transaction.update({'asset': assets.get(transaction['id'], + None)}) + return block_dict + + @staticmethod + def get_asset_ids(block_dict): + # TODO: Write docstring + asset_ids = [] + for transaction in block_dict['block']['transactions']: + if transaction['operation'] in [Transaction.CREATE, + Transaction.GENESIS]: + asset_ids.append(transaction['id']) + + return asset_ids + def to_str(self): return serialize(self.to_dict()) diff --git a/bigchaindb/pipelines/vote.py b/bigchaindb/pipelines/vote.py index 9664c520..e90ce6c4 100644 --- a/bigchaindb/pipelines/vote.py +++ b/bigchaindb/pipelines/vote.py @@ -50,7 +50,7 @@ class Vote: def validate_block(self, block): if not self.bigchain.has_previous_vote(block['id']): try: - block = Block.from_dict(block) + block = Block.from_db(self.bigchain, block) except (exceptions.InvalidHash): # XXX: if a block is invalid we should skip the `validate_tx` # step, but since we are in a pipeline we cannot just jump to diff --git a/tests/backend/mongodb/test_queries.py b/tests/backend/mongodb/test_queries.py index bd7e75f1..1363d9d7 100644 --- a/tests/backend/mongodb/test_queries.py +++ b/tests/backend/mongodb/test_queries.py @@ -269,7 +269,7 @@ def test_write_block(signed_create_tx): # create and write block block = Block(transactions=[signed_create_tx]) - query.write_block(conn, block) + query.write_block(conn, block.to_dict()) block_db = conn.db.bigchain.find_one({'id': block.id}, {'_id': False}) @@ -347,17 +347,18 @@ def test_get_genesis_block(genesis_block): from bigchaindb.backend import connect, query conn = connect() - assert query.get_genesis_block(conn) == genesis_block.to_dict() + assets, genesis_block_dict = genesis_block.decouple_assets() + assert query.get_genesis_block(conn) == genesis_block_dict -def test_get_last_voted_block(genesis_block, signed_create_tx, b): +def test_get_last_voted_block_id(genesis_block, signed_create_tx, b): from bigchaindb.backend import connect, query from bigchaindb.models import Block from bigchaindb.common.exceptions import CyclicBlockchainError conn = connect() # check that the last voted block is the genesis block - assert query.get_last_voted_block(conn, b.me) == genesis_block.to_dict() + assert query.get_last_voted_block_id(conn, b.me) == genesis_block.id # create and insert a new vote and block block = Block(transactions=[signed_create_tx]) @@ -365,7 +366,7 @@ def test_get_last_voted_block(genesis_block, signed_create_tx, b): vote = b.vote(block.id, genesis_block.id, True) conn.db.votes.insert_one(vote) - assert query.get_last_voted_block(conn, b.me) == block.to_dict() + assert query.get_last_voted_block_id(conn, b.me) == block.id # force a bad chain vote.pop('_id') @@ -374,7 +375,7 @@ def test_get_last_voted_block(genesis_block, signed_create_tx, b): conn.db.votes.insert_one(vote) with pytest.raises(CyclicBlockchainError): - query.get_last_voted_block(conn, b.me) + query.get_last_voted_block_id(conn, b.me) def test_get_unvoted_blocks(signed_create_tx): diff --git a/tests/backend/mongodb/test_schema.py b/tests/backend/mongodb/test_schema.py index 71eac7ff..e3b320bd 100644 --- a/tests/backend/mongodb/test_schema.py +++ b/tests/backend/mongodb/test_schema.py @@ -18,7 +18,8 @@ def test_init_creates_db_tables_and_indexes(): init_database() collection_names = conn.conn[dbname].collection_names() - assert sorted(collection_names) == ['backlog', 'bigchain', 'votes'] + assert sorted(collection_names) == ['assets', 'backlog', 'bigchain', + 'votes'] indexes = conn.conn[dbname]['bigchain'].index_information().keys() assert sorted(indexes) == ['_id_', 'asset_id', 'block_timestamp', 'inputs', @@ -31,6 +32,9 @@ def test_init_creates_db_tables_and_indexes(): indexes = conn.conn[dbname]['votes'].index_information().keys() assert sorted(indexes) == ['_id_', 'block_and_voter'] + indexes = conn.conn[dbname]['assets'].index_information().keys() + assert sorted(indexes) == ['_id_', 'asset_id'] + def test_init_database_fails_if_db_exists(): import bigchaindb @@ -62,7 +66,8 @@ def test_create_tables(): schema.create_tables(conn, dbname) collection_names = conn.conn[dbname].collection_names() - assert sorted(collection_names) == ['backlog', 'bigchain', 'votes'] + assert sorted(collection_names) == ['assets', 'backlog', 'bigchain', + 'votes'] def test_create_secondary_indexes(): diff --git a/tests/backend/test_generics.py b/tests/backend/test_generics.py index 57a644ee..6a1e9447 100644 --- a/tests/backend/test_generics.py +++ b/tests/backend/test_generics.py @@ -30,12 +30,14 @@ def test_schema(schema_func_name, args_qty): ('write_block', 1), ('get_block', 1), ('write_vote', 1), - ('get_last_voted_block', 1), + ('get_last_voted_block_id', 1), ('get_unvoted_blocks', 1), ('get_spent', 2), ('get_votes_by_block_id_and_voter', 2), ('update_transaction', 2), ('get_transaction_from_block', 2), + ('write_assets', 1), + ('get_assets', 1), )) def test_query(query_func_name, args_qty): from bigchaindb.backend import query diff --git a/tests/db/test_bigchain_api.py b/tests/db/test_bigchain_api.py index 3f05385c..5960f171 100644 --- a/tests/db/test_bigchain_api.py +++ b/tests/db/test_bigchain_api.py @@ -383,7 +383,7 @@ class TestBigchainApi(object): from bigchaindb.backend import query genesis = query.get_genesis_block(b.connection) - genesis = Block.from_dict(genesis) + genesis = Block.from_db(b, genesis) gb = b.get_last_voted_block() assert gb == genesis assert b.validate_block(gb) == gb diff --git a/tests/pipelines/test_vote.py b/tests/pipelines/test_vote.py index 7df7ca11..29523035 100644 --- a/tests/pipelines/test_vote.py +++ b/tests/pipelines/test_vote.py @@ -20,6 +20,15 @@ def dummy_block(b): return block +def decouple_assets(b, block): + # the block comming from the database does not contain the assets + # so we need to pass the block without the assets and store the assets + # so that the voting pipeline can reconstruct it + assets, block_dict = block.decouple_assets() + b.write_assets(assets) + return block_dict + + DUMMY_SHA3 = '0123456789abcdef' * 4 @@ -79,9 +88,10 @@ def test_vote_validate_block(b): tx = dummy_tx(b) block = b.create_block([tx]) + block_dict = decouple_assets(b, block) vote_obj = vote.Vote() - validation = vote_obj.validate_block(block.to_dict()) + validation = vote_obj.validate_block(block_dict) assert validation[0] == block.id for tx1, tx2 in zip(validation[1], block.transactions): assert tx1 == tx2 @@ -220,8 +230,9 @@ def test_valid_block_voting_multiprocessing(b, genesis_block, monkeypatch): vote_pipeline.setup(indata=inpipe, outdata=outpipe) block = dummy_block(b) + block_dict = decouple_assets(b, block) - inpipe.put(block.to_dict()) + inpipe.put(block_dict) vote_pipeline.start() vote_out = outpipe.get() vote_pipeline.terminate() @@ -257,6 +268,7 @@ def test_valid_block_voting_with_create_transaction(b, monkeypatch.setattr('time.time', lambda: 1111111111) block = b.create_block([tx]) + block_dict = decouple_assets(b, block) inpipe = Pipe() outpipe = Pipe() @@ -264,7 +276,7 @@ def test_valid_block_voting_with_create_transaction(b, vote_pipeline = vote.create_pipeline() vote_pipeline.setup(indata=inpipe, outdata=outpipe) - inpipe.put(block.to_dict()) + inpipe.put(block_dict) vote_pipeline.start() vote_out = outpipe.get() vote_pipeline.terminate()