fast unspents

This commit is contained in:
Scott Sadler 2017-04-19 15:47:58 +02:00
parent 28441a9edd
commit 4f99122758
8 changed files with 261 additions and 11 deletions

View File

@ -176,13 +176,33 @@ def get_spent(conn, transaction_id, output):
@register_query(MongoDBConnection)
def get_owned_ids(conn, owner):
def get_spending_transactions(conn, inputs):
    """Find the transactions that spend any of the given outputs.

    Args:
        conn: connection object exposing ``collection`` and ``run``.
        inputs (list): output references to look for in
            ``inputs.fulfills`` of stored transactions.

    Returns:
        Whatever ``conn.run`` yields for the aggregation: one document per
        matching transaction, with ``block.transactions`` unwound to that
        single transaction.
    """
    # The same filter is applied twice: first to select the blocks that
    # contain a spend, then again after the unwind to keep only the
    # spending transactions inside those blocks.
    spends_requested = {'$match': {
        'block.transactions.inputs.fulfills': {
            '$in': inputs,
        },
    }}
    pipeline = [
        spends_requested,
        {'$unwind': '$block.transactions'},
        spends_requested,
    ]
    bigchain = conn.collection('bigchain')
    return conn.run(bigchain.aggregate(pipeline))
@register_query(MongoDBConnection)
def get_owned_ids(conn, owner, unwrap=True):
    """Query the transactions that have an output owned by ``owner``.

    Args:
        conn: connection object exposing ``collection`` and ``run``.
        owner: public key matched against ``outputs.public_keys``.
        unwrap (bool): when true (the default) yield only the transaction
            of each result; when false return the raw cursor, whose
            documents still carry the surrounding block fields.

    Returns:
        A generator of transactions, or the raw cursor if ``unwrap`` is
        false.
    """
    owned_by = {'$match': {'block.transactions.outputs.public_keys': owner}}
    pipeline = [
        owned_by,
        {'$unwind': '$block.transactions'},
        owned_by,
    ]
    cursor = conn.run(conn.collection('bigchain').aggregate(pipeline))
    if unwrap:
        # Stay lazy: the nested field is pulled out record by record so the
        # cursor does not have to be read in full at this point.
        return (doc['block']['transactions'] for doc in cursor)
    return cursor
@ -196,6 +216,15 @@ def get_votes_by_block_id(conn, block_id):
projection={'_id': False}))
@register_query(MongoDBConnection)
def get_votes_for_blocks_by_voter(conn, block_ids, node_pubkey):
    """Fetch the votes one node cast for any of the given blocks.

    Args:
        conn: connection object exposing ``collection`` and ``run``.
        block_ids (iterable): ids of the blocks to look up votes for.
            Any iterable is accepted, including a ``set``.
        node_pubkey (str): public key of the voting node.

    Returns:
        Whatever ``conn.run`` yields for the ``find`` query: the matching
        vote documents, with ``_id`` projected away.
    """
    # The generic query interface documents block_ids as a set, but BSON
    # has no encoding for Python sets; hand the driver a list instead.
    vote_filter = {
        'vote.voting_for_block': {'$in': list(block_ids)},
        'node_pubkey': node_pubkey,
    }
    votes = conn.collection('votes')
    return conn.run(votes.find(vote_filter, projection={'_id': False}))
@register_query(MongoDBConnection)
def get_votes_by_block_id_and_voter(conn, block_id, node_pubkey):
return conn.run(

View File

@ -140,6 +140,19 @@ def get_spent(connection, transaction_id, condition_id):
raise NotImplementedError
@singledispatch
def get_spending_transactions(connection, inputs):
    """Look up the transactions that spend any of the given inputs.

    This is the generic dispatch target; it only raises until a backend
    registers an implementation for its connection type.

    Args:
        connection: backend connection the call is dispatched on.
        inputs (list): list of {txid, output}

    Returns:
        List of transactions that spend given inputs

    Raises:
        NotImplementedError: always, in this generic fallback.
    """
    raise NotImplementedError
@singledispatch
def get_owned_ids(connection, owner):
"""Retrieve a list of `txids` that can we used has inputs.
@ -183,6 +196,21 @@ def get_votes_by_block_id_and_voter(connection, block_id, node_pubkey):
raise NotImplementedError
@singledispatch
def get_votes_for_blocks_by_voter(connection, block_ids, pubkey):
    """Return the votes a single node cast for many blocks.

    This is the generic dispatch target; it only raises until a backend
    registers an implementation for its connection type.

    Args:
        connection: backend connection the call is dispatched on.
        block_ids: ids of the blocks whose votes are wanted.
        pubkey (str): public key of voting node

    Returns:
        A cursor of the votes cast by ``pubkey`` for the given block ids.

    Raises:
        NotImplementedError: always, in this generic fallback.
    """
    raise NotImplementedError
@singledispatch
def write_block(connection, block):
"""Write a block to the bigchain table.

View File

@ -161,6 +161,9 @@ class TransactionLink(object):
# TODO: If `other !== TransactionLink` return `False`
return self.to_dict() == other.to_dict()
def __hash__(self):
    """Hash the link by its ``(txid, output)`` pair.

    Being hashable lets links be collected in sets and used as dict keys.
    """
    identity = (self.txid, self.output)
    return hash(identity)
@classmethod
def from_dict(cls, link):
"""Transforms a Python dictionary to a TransactionLink object.

View File

@ -8,7 +8,7 @@ from bigchaindb.common.transaction import TransactionLink
import bigchaindb
from bigchaindb import backend, config_utils, utils
from bigchaindb import backend, config_utils, utils, fastquery
from bigchaindb.consensus import BaseConsensusRules
from bigchaindb.models import Block, Transaction
@ -433,14 +433,17 @@ class Bigchain(object):
"""
return self.get_outputs_filtered(owner, include_spent=False)
@property
def fastquery(self):
    """Build a ``FastQuery`` for this node's connection and public key.

    A fresh instance is created on every access; nothing is cached here.
    """
    connection, node_key = self.connection, self.me
    return fastquery.FastQuery(connection, node_key)
def get_outputs_filtered(self, owner, include_spent=True):
"""
Get a list of output links filtered on some criteria
"""
outputs = self.get_outputs(owner)
outputs = self.fastquery.get_outputs_by_pubkey(owner)
if not include_spent:
outputs = [o for o in outputs
if not self.get_spent(o.txid, o.output)]
outputs = self.fastquery.filter_spent_outputs(outputs)
return outputs
def get_transactions_filtered(self, asset_id, operation=None):

56
bigchaindb/fastquery.py Normal file
View File

@ -0,0 +1,56 @@
from bigchaindb.utils import output_has_owner
from bigchaindb.backend import query
from bigchaindb.common.transaction import TransactionLink
class FastQuery:
    """
    Database queries that join on block results from a single node.

    * Votes are not validated for security (security is replication concern)
    * Votes come from only one node, and as such, fault tolerance is not provided
      (elected Blockchain table not yet available)

    In return, these queries offer good performance, as it is not necessary to validate
    each result separately.

    Args:
        connection: backend connection passed to every query.
        me: public key of the node whose votes decide block validity.
    """

    def __init__(self, connection, me):
        self.connection = connection
        self.me = me

    def filter_block_ids(self, block_ids, include_undecided=True):
        """Keep the block ids this node has not voted invalid.

        Args:
            block_ids: ids of the blocks to check.
            include_undecided (bool): verdict used for blocks this node has
                no vote for. Defaults to keeping them.

        Returns:
            list: the accepted ids, in the order they were given.
        """
        votes = query.get_votes_for_blocks_by_voter(
            self.connection, block_ids, self.me)
        verdicts = {v['vote']['voting_for_block']: v['vote']['is_block_valid']
                    for v in votes}
        return [b for b in block_ids if verdicts.get(b, include_undecided)]

    def filter_valid_blocks(self, blocks):
        """Drop the blocks this node voted invalid; undecided blocks stay.

        Args:
            blocks (list): block documents, each carrying an ``id`` key.

        Returns:
            list: the surviving blocks, in their original order.
        """
        if not blocks:
            # Nothing to look up; skip the database round-trip.
            return []
        block_ids = list({b['id'] for b in blocks})
        # A set keeps the membership test below constant-time per block.
        valid_block_ids = set(self.filter_block_ids(block_ids))
        return [b for b in blocks if b['id'] in valid_block_ids]

    def get_outputs_by_pubkey(self, pubkey):
        """List the outputs owned by ``pubkey`` in non-invalid blocks.

        Args:
            pubkey: public key of the owner.

        Returns:
            list: one ``TransactionLink`` per owned output.
        """
        cursor = query.get_owned_ids(self.connection, pubkey, unwrap=False)
        # The cursor is materialised because the block filter needs to go
        # over the results more than once.
        wrapped_txs = self.filter_valid_blocks(list(cursor))
        txs = [wrapped['block']['transactions'] for wrapped in wrapped_txs]
        return [TransactionLink(tx['id'], i)
                for tx in txs
                for i, o in enumerate(tx['outputs'])
                if output_has_owner(o, pubkey)]

    def filter_spent_outputs(self, outputs):
        """
        Remove outputs that have been spent

        Spends found in blocks this node voted invalid are ignored; spends
        in undecided blocks count.

        Args:
            outputs: list of TransactionLink

        Returns:
            list: the outputs no accepted transaction spends, in their
            original order.
        """
        if not outputs:
            # Nothing to check; skip the database round-trip.
            return []
        links = [o.to_dict() for o in outputs]
        wrapped = self.filter_valid_blocks(
            list(query.get_spending_transactions(self.connection, links)))
        spends = {TransactionLink.from_dict(input_['fulfills'])
                  for block in wrapped
                  for input_ in block['block']['transactions']['inputs']}
        return [ff for ff in outputs if ff not in spends]

View File

@ -417,3 +417,54 @@ def test_get_txids_filtered(signed_create_tx, signed_transfer_tx):
# Test get by asset and TRANSFER
txids = set(query.get_txids_filtered(conn, asset_id, Transaction.TRANSFER))
assert txids == {signed_transfer_tx.id}
def test_get_spending_transactions(user_pk):
    """Only transfers spending one of the requested outputs are returned."""
    from bigchaindb.backend import connect, query
    from bigchaindb.models import Block, Transaction
    conn = connect()

    single_output = [([user_pk], 1)]
    create_tx = Transaction.create([user_pk], single_output * 3)
    spendable = create_tx.to_inputs()
    # One transfer per output of the create transaction.
    first, second, third = (
        Transaction.transfer([spendable[idx]], single_output, create_tx.id)
        for idx in range(3)
    )
    block = Block([create_tx, first, second, third])
    conn.db.bigchain.insert_one(block.to_dict())

    # Ask for the spends of outputs 0 and 2 only.
    links = [spendable[idx].fulfills.to_dict() for idx in (0, 2)]
    found = list(query.get_spending_transactions(conn, links))

    # discard block noise
    res = [(doc['id'], doc['block']['transactions']) for doc in found]

    # the second transfer is not a member because output 1 was not asked for
    assert res == [(block.id, first.to_dict()), (block.id, third.to_dict())]
def test_get_votes_for_blocks_by_voter():
    """Only votes by the given voter for the given blocks come back."""
    from bigchaindb.backend import connect, query

    conn = connect()
    cast = [('a', 'block1'), ('b', 'block1'), ('a', 'block2'), ('a', 'block3')]
    votes = [
        {
            'node_pubkey': voter,
            'vote': {'voting_for_block': block_id},
        }
        for voter, block_id in cast
    ]
    for vote in votes:
        # Insert a copy so the expected documents above stay as written.
        conn.db.votes.insert_one(vote.copy())

    res = query.get_votes_for_blocks_by_voter(conn, ['block1', 'block2'], 'a')
    expected = [votes[0], votes[2]]
    assert list(res) == expected

View File

@ -1119,11 +1119,11 @@ def test_get_owned_ids_calls_get_outputs_filtered():
def test_get_outputs_filtered_only_unspent():
from bigchaindb.common.transaction import TransactionLink
from bigchaindb.core import Bigchain
with patch('bigchaindb.core.Bigchain.get_outputs') as get_outputs:
with patch('bigchaindb.fastquery.FastQuery.get_outputs_by_pubkey') as get_outputs:
get_outputs.return_value = [TransactionLink('a', 1),
TransactionLink('b', 2)]
with patch('bigchaindb.core.Bigchain.get_spent') as get_spent:
get_spent.side_effect = [True, False]
with patch('bigchaindb.fastquery.FastQuery.filter_spent_outputs') as filter_spent:
filter_spent.return_value = [TransactionLink('b', 2)]
out = Bigchain().get_outputs_filtered('abc', include_spent=False)
get_outputs.assert_called_once_with('abc')
assert out == [TransactionLink('b', 2)]
@ -1132,13 +1132,13 @@ def test_get_outputs_filtered_only_unspent():
def test_get_outputs_filtered():
from bigchaindb.common.transaction import TransactionLink
from bigchaindb.core import Bigchain
with patch('bigchaindb.core.Bigchain.get_outputs') as get_outputs:
with patch('bigchaindb.fastquery.FastQuery.get_outputs_by_pubkey') as get_outputs:
get_outputs.return_value = [TransactionLink('a', 1),
TransactionLink('b', 2)]
with patch('bigchaindb.core.Bigchain.get_spent') as get_spent:
with patch('bigchaindb.fastquery.FastQuery.filter_spent_outputs') as filter_spent:
out = Bigchain().get_outputs_filtered('abc')
get_outputs.assert_called_once_with('abc')
get_spent.assert_not_called()
filter_spent.assert_not_called()
assert out == get_outputs.return_value

80
tests/test_fastquery.py Normal file
View File

@ -0,0 +1,80 @@
import pytest
from bigchaindb.common.transaction import TransactionLink
from bigchaindb.models import Block, Transaction
pytestmark = pytest.mark.bdb
@pytest.fixture
def blockdata(b, user_pk, user2_pk):
    """Write three single-transaction blocks and vote on the last two.

    The first block gets no vote, the second is voted valid and the third
    invalid.

    Returns:
        tuple: the block dicts and the list of their ids.
    """
    txs = [Transaction.create([user_pk], [([user2_pk], 1)]),
           Transaction.create([user2_pk], [([user_pk], 1)]),
           Transaction.create([user_pk], [([user_pk], 1), ([user2_pk], 1)])]
    blocks = []
    for position, tx in enumerate(txs):
        block = Block([tx])
        b.write_block(block)
        blocks.append(block.to_dict())
        if position > 0:
            is_valid = position % 2 == 1
            b.write_vote(b.vote(block.id, '', is_valid))
    block_ids = [written['id'] for written in blocks]
    return blocks, block_ids
def test_filter_block_ids_with_undecided(b, blockdata):
    """By default the undecided and the valid block both pass the filter."""
    blocks, block_ids = blockdata
    expected = {blocks[0]['id'], blocks[1]['id']}
    assert set(b.fastquery.filter_block_ids(block_ids)) == expected
def test_filter_block_ids_only_valid(b, blockdata):
    """With undecided blocks excluded only the valid block remains."""
    blocks, block_ids = blockdata
    kept = b.fastquery.filter_block_ids(block_ids, include_undecided=False)
    expected = {blocks[1]['id']}
    assert set(kept) == expected
def test_filter_valid_blocks(b, blockdata):
    """The block voted invalid is dropped; order is preserved."""
    blocks, _ = blockdata
    kept = b.fastquery.filter_valid_blocks(blocks)
    assert kept == [blocks[0], blocks[1]]
def test_get_outputs_by_pubkey(b, user_pk, user2_pk, blockdata):
    """Each user only sees outputs from blocks not voted invalid."""
    blocks, _ = blockdata

    def first_output_of(block):
        # Link to output 0 of the block's only transaction.
        return TransactionLink(block['block']['transactions'][0]['id'], 0)

    assert b.fastquery.get_outputs_by_pubkey(user_pk) == [
        first_output_of(blocks[1])
    ]
    assert b.fastquery.get_outputs_by_pubkey(user2_pk) == [
        first_output_of(blocks[0])
    ]
def test_filter_spent_outputs(b, user_pk):
    """Spends in valid or undecided blocks count; invalid ones do not."""
    out = [([user_pk], 1)]
    tx1 = Transaction.create([user_pk], out * 3)

    inputs = tx1.to_inputs()
    tx2, tx3, tx4 = (Transaction.transfer([spent], out, tx1.id)
                     for spent in inputs)

    def write_single_tx_block(tx, verdict=None):
        # Write a one-transaction block; vote on it only if a verdict is given.
        block = Block([tx])
        b.write_block(block)
        if verdict is not None:
            b.write_vote(b.vote(block.id, '', verdict))

    write_single_tx_block(tx1, True)
    write_single_tx_block(tx2, True)
    # mark invalid
    write_single_tx_block(tx3, False)
    # undecided
    write_single_tx_block(tx4)

    owned = b.fastquery.get_outputs_by_pubkey(user_pk)
    unspents = b.fastquery.filter_spent_outputs(owned)

    expected = {
        inputs[1].fulfills,
        tx2.to_inputs()[0].fulfills,
        tx4.to_inputs()[0].fulfills
    }
    assert set(unspents) == expected