mirror of https://github.com/bigchaindb/bigchaindb.git synced 2024-06-17 18:13:22 +02:00

Merge remote-tracking branch 'origin/master' into docker-mongo

Rodolphe Marques 2016-12-16 12:54:42 +01:00
commit c1a5d5e86f
32 changed files with 495 additions and 312 deletions

View File

@@ -10,3 +10,4 @@ configuration or the ``BIGCHAINDB_DATABASE_BACKEND`` environment variable.
from bigchaindb.backend import changefeed, schema, query # noqa
from bigchaindb.backend.connection import connect # noqa
from bigchaindb.backend.changefeed import get_changefeed # noqa

View File

@@ -1 +1,90 @@
"""Changefeed interfaces for backends."""
from functools import singledispatch
from multipipes import Node
import bigchaindb
class ChangeFeed(Node):
"""Create a new changefeed.
It extends :class:`multipipes.Node` to make it pluggable into other
Pipeline instances, and makes use of ``self.outqueue`` to output
the data.
A changefeed is a real-time feed on inserts, updates, and deletes, and
is volatile. This class is a helper to create changefeeds. Moreover,
it provides a way to specify a ``prefeed`` of iterable data to output
before the actual changefeed.
"""
INSERT = 1
DELETE = 2
UPDATE = 4
def __init__(self, table, operation, *, prefeed=None, connection=None):
"""Create a new ChangeFeed.
Args:
table (str): name of the table to listen to for changes.
operation (int): can be ChangeFeed.INSERT, ChangeFeed.DELETE, or
ChangeFeed.UPDATE. Combining multiple operations is possible
with the bitwise ``|`` operator
(e.g. ``ChangeFeed.INSERT | ChangeFeed.UPDATE``)
prefeed (:class:`~collections.abc.Iterable`, optional): whatever
set of data you want to be published first.
connection (:class:`~bigchaindb.backend.connection.Connection`, optional): # noqa
A connection to the database. If no connection is provided a
default connection will be created.
"""
super().__init__(name='changefeed')
self.prefeed = prefeed if prefeed else []
self.table = table
self.operation = operation
if connection:
self.connection = connection
else:
self.connection = bigchaindb.backend.connect(
**bigchaindb.config['database'])
def run_forever(self):
"""Main loop of the ``multipipes.Node``
This method is responsible for first feeding the prefeed to the
outqueue and after that starting the changefeed and recovering from any
errors that may occur in the backend.
"""
raise NotImplementedError
def run_changefeed(self):
"""Backend specific method to run the changefeed.
The changefeed is usually a backend cursor that is not closed when all
the results are exhausted. Instead, it remains open, waiting for new
results.
This method should also filter each result based on the ``operation``
and put all matching results on the outqueue of ``multipipes.Node``.
"""
raise NotImplementedError
@singledispatch
def get_changefeed(connection, table, operation, *, prefeed=None):
"""Return a ChangeFeed.
Args:
connection (:class:`~bigchaindb.backend.connection.Connection`):
A connection to the database.
table (str): name of the table to listen to for changes.
operation (int): can be ChangeFeed.INSERT, ChangeFeed.DELETE, or
ChangeFeed.UPDATE. Combining multiple operations is possible
with the bitwise ``|`` operator
(e.g. ``ChangeFeed.INSERT | ChangeFeed.UPDATE``)
prefeed (iterable): whatever set of data you want to be published
first.
"""
raise NotImplementedError
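
For context, here is a minimal sketch of how this dispatch interface is consumed elsewhere in this commit (the `prefeed` value is purely illustrative):

```python
import bigchaindb
from bigchaindb import backend
from bigchaindb.backend.changefeed import ChangeFeed

# Connect using the configured backend; the class of the returned
# connection selects the backend-specific get_changefeed() via
# single dispatch.
connection = backend.connect(**bigchaindb.config['database'])

# Operations combine with bitwise OR, e.g. INSERT | UPDATE.
changefeed = backend.get_changefeed(
    connection, 'backlog',
    ChangeFeed.INSERT | ChangeFeed.UPDATE,
    prefeed=[{'note': 'published before the live feed'}])
```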

View File

@@ -0,0 +1,54 @@
import time
import logging
import rethinkdb as r
from bigchaindb import backend
from bigchaindb.backend.changefeed import ChangeFeed
from bigchaindb.backend.utils import module_dispatch_registrar
from bigchaindb.backend.rethinkdb.connection import RethinkDBConnection
logger = logging.getLogger(__name__)
register_changefeed = module_dispatch_registrar(backend.changefeed)
class RethinkDBChangeFeed(ChangeFeed):
"""This class wraps a RethinkDB changefeed."""
def run_forever(self):
for element in self.prefeed:
self.outqueue.put(element)
while True:
try:
self.run_changefeed()
break
except (r.ReqlDriverError, r.ReqlOpFailedError) as exc:
logger.exception(exc)
time.sleep(1)
def run_changefeed(self):
for change in self.connection.run(r.table(self.table).changes()):
is_insert = change['old_val'] is None
is_delete = change['new_val'] is None
is_update = not is_insert and not is_delete
if is_insert and (self.operation & ChangeFeed.INSERT):
self.outqueue.put(change['new_val'])
elif is_delete and (self.operation & ChangeFeed.DELETE):
self.outqueue.put(change['old_val'])
elif is_update and (self.operation & ChangeFeed.UPDATE):
self.outqueue.put(change['new_val'])
@register_changefeed(RethinkDBConnection)
def get_changefeed(connection, table, operation, *, prefeed=None):
"""Return a RethinkDB changefeed.
Returns:
An instance of
:class:`~bigchaindb.backend.rethinkdb.RethinkDBChangeFeed`.
"""
return RethinkDBChangeFeed(table, operation, prefeed=prefeed,
connection=connection)
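
The filtering in `run_changefeed()` keys off the shape of RethinkDB's change documents. A small sketch (placeholder payloads, mirroring the test fixtures later in this commit) of the classification rule:

```python
# Shapes of the change documents run_changefeed() consumes
# (placeholder payloads; real ones are full table rows).
insert = {'old_val': None, 'new_val': {'id': 'tx1'}}
delete = {'old_val': {'id': 'tx1'}, 'new_val': None}
update = {'old_val': {'id': 'tx1'}, 'new_val': {'id': 'tx1', 'n': 2}}

def classify(change):
    """Mirror the branching in run_changefeed() above."""
    if change['old_val'] is None:
        return 'insert'
    if change['new_val'] is None:
        return 'delete'
    return 'update'

assert [classify(c) for c in (insert, delete, update)] == [
    'insert', 'delete', 'update']
```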

View File

@@ -189,6 +189,12 @@ class TransactionLink(object):
'cid': self.cid,
}
def to_uri(self, path=''):
if self.txid is None and self.cid is None:
return None
return '{}/transactions/{}/conditions/{}'.format(path, self.txid,
self.cid)
class Condition(object):
"""A Condition is used to lock an asset.

View File

@@ -7,11 +7,12 @@ function.
import logging
import rethinkdb as r
from multipipes import Pipeline, Node, Pipe
import bigchaindb
from bigchaindb import backend
from bigchaindb.backend.changefeed import ChangeFeed
from bigchaindb.models import Transaction
from bigchaindb.pipelines.utils import ChangeFeed
from bigchaindb import Bigchain
@@ -116,7 +117,8 @@ class BlockPipeline:
Returns:
:class:`~bigchaindb.models.Block`: The Block.
"""
logger.info('Write new block %s with %s transactions', block.id, len(block.transactions))
logger.info('Write new block %s with %s transactions',
block.id, len(block.transactions))
self.bigchain.write_block(block)
return block
@@ -134,26 +136,6 @@ class BlockPipeline:
return block
def initial():
"""Return old transactions from the backlog."""
bigchain = Bigchain()
return bigchain.connection.run(
r.table('backlog')
.between([bigchain.me, r.minval],
[bigchain.me, r.maxval],
index='assignee__transaction_timestamp')
.order_by(index=r.asc('assignee__transaction_timestamp')))
def get_changefeed():
"""Create and return the changefeed for the backlog."""
return ChangeFeed('backlog', ChangeFeed.INSERT | ChangeFeed.UPDATE,
prefeed=initial())
def create_pipeline():
"""Create and return the pipeline of operations to be distributed
on different processes."""
@@ -172,6 +154,12 @@ def create_pipeline():
return pipeline
def get_changefeed():
connection = backend.connect(**bigchaindb.config['database'])
return backend.get_changefeed(connection, 'backlog',
ChangeFeed.INSERT | ChangeFeed.UPDATE)
def start():
"""Create, start, and return the block pipeline."""
pipeline = create_pipeline()

View File

@@ -8,7 +8,9 @@ import logging
from multipipes import Pipeline, Node
from bigchaindb.pipelines.utils import ChangeFeed
import bigchaindb
from bigchaindb import backend
from bigchaindb.backend.changefeed import ChangeFeed
from bigchaindb.models import Block
from bigchaindb import Bigchain
@@ -50,10 +52,6 @@ class Election:
return invalid_block
def get_changefeed():
return ChangeFeed(table='votes', operation=ChangeFeed.INSERT)
def create_pipeline():
election = Election()
@@ -65,6 +63,11 @@ def create_pipeline():
return election_pipeline
def get_changefeed():
connection = backend.connect(**bigchaindb.config['database'])
return backend.get_changefeed(connection, 'votes', ChangeFeed.INSERT)
def start():
pipeline = create_pipeline()
pipeline.setup(indata=get_changefeed())

View File

@@ -1,75 +0,0 @@
"""Utility classes and functions to work with the pipelines."""
import time
import rethinkdb as r
import logging
from multipipes import Node
from bigchaindb import Bigchain
logger = logging.getLogger(__name__)
class ChangeFeed(Node):
"""This class wraps a RethinkDB changefeed adding a ``prefeed``.
It extends :class:`multipipes.Node` to make it pluggable in other
Pipelines instances, and makes usage of ``self.outqueue`` to output
the data.
A changefeed is a real time feed on inserts, updates, and deletes, and
is volatile. This class is a helper to create changefeeds. Moreover,
it provides a way to specify a ``prefeed`` of iterable data to output
before the actual changefeed.
"""
INSERT = 1
DELETE = 2
UPDATE = 4
def __init__(self, table, operation, prefeed=None, bigchain=None):
"""Create a new RethinkDB ChangeFeed.
Args:
table (str): name of the table to listen to for changes.
operation (int): can be ChangeFeed.INSERT, ChangeFeed.DELETE, or
ChangeFeed.UPDATE. Combining multiple operations is possible
with the bitwise ``|`` operator
(e.g. ``ChangeFeed.INSERT | ChangeFeed.UPDATE``)
prefeed (iterable): whatever set of data you want to be published
first.
bigchain (``Bigchain``): the bigchain instance to use (can be None).
"""
super().__init__(name='changefeed')
self.prefeed = prefeed if prefeed else []
self.table = table
self.operation = operation
self.bigchain = bigchain or Bigchain()
def run_forever(self):
for element in self.prefeed:
self.outqueue.put(element)
while True:
try:
self.run_changefeed()
break
except (r.ReqlDriverError, r.ReqlOpFailedError) as exc:
logger.exception(exc)
time.sleep(1)
def run_changefeed(self):
for change in self.bigchain.connection.run(r.table(self.table).changes()):
is_insert = change['old_val'] is None
is_delete = change['new_val'] is None
is_update = not is_insert and not is_delete
if is_insert and (self.operation & ChangeFeed.INSERT):
self.outqueue.put(change['new_val'])
elif is_delete and (self.operation & ChangeFeed.DELETE):
self.outqueue.put(change['old_val'])
elif is_update and (self.operation & ChangeFeed.UPDATE):
self.outqueue.put(change['new_val'])

View File

@@ -10,10 +10,12 @@ from collections import Counter
from multipipes import Pipeline, Node
from bigchaindb.common import exceptions
import bigchaindb
from bigchaindb import Bigchain
from bigchaindb import backend
from bigchaindb.backend.changefeed import ChangeFeed
from bigchaindb.consensus import BaseConsensusRules
from bigchaindb.models import Transaction, Block
from bigchaindb.pipelines.utils import ChangeFeed
from bigchaindb import Bigchain
class Vote:
@@ -142,12 +144,6 @@ def initial():
return rs
def get_changefeed():
"""Create and return the changefeed for the bigchain table."""
return ChangeFeed('bigchain', operation=ChangeFeed.INSERT, prefeed=initial())
def create_pipeline():
"""Create and return the pipeline of operations to be distributed
on different processes."""
@@ -165,6 +161,12 @@ def create_pipeline():
return vote_pipeline
def get_changefeed():
connection = backend.connect(**bigchaindb.config['database'])
return backend.get_changefeed(connection, 'bigchain', ChangeFeed.INSERT,
prefeed=initial())
def start():
"""Create, start, and return the block pipeline."""

View File

@@ -13,10 +13,12 @@ from bigchaindb import util
from bigchaindb import Bigchain
from bigchaindb.web.views.info import info_views
from bigchaindb.web.views.transactions import transaction_views
from bigchaindb.web.views.unspents import unspent_views
from bigchaindb.monitor import Monitor
# TODO: Figure out if we really need all this boilerplate.
class StandaloneApplication(gunicorn.app.base.BaseApplication):
"""Run a **wsgi** app wrapping it in a Gunicorn Base Application.
@@ -69,6 +71,7 @@ def create_app(*, debug=False, threads=4):
app.register_blueprint(info_views, url_prefix='/')
app.register_blueprint(transaction_views, url_prefix='/api/v1')
app.register_blueprint(unspent_views, url_prefix='/api/v1')
return app

View File

@@ -1,7 +1,8 @@
"""This module provides the blueprint for some basic API endpoints.
For more information please refer to the documentation on ReadTheDocs:
- https://docs.bigchaindb.com/projects/server/en/latest/drivers-clients/http-client-server-api.html
- https://docs.bigchaindb.com/projects/server/en/latest/drivers-clients/
http-client-server-api.html
"""
from flask import current_app, request, Blueprint
from flask_restful import Resource, Api
@@ -28,6 +29,7 @@ transaction_views = Blueprint('transaction_views', __name__)
transaction_api = Api(transaction_views)
# TODO: Do we really need this?
# Unfortunately I cannot find a reference to this decorator.
# This answer on SO is quite useful though:
# - http://stackoverflow.com/a/13432373/597097
@@ -78,8 +80,8 @@ class TransactionStatusApi(Resource):
tx_id (str): the id of the transaction.
Return:
A ``dict`` in the format ``{'status': <status>}``, where ``<status>``
is one of "valid", "invalid", "undecided", "backlog".
A ``dict`` in the format ``{'status': <status>}``, where
``<status>`` is one of "valid", "invalid", "undecided", "backlog".
"""
pool = current_app.config['bigchain_pool']
@@ -103,8 +105,8 @@ class TransactionListApi(Resource):
pool = current_app.config['bigchain_pool']
monitor = current_app.config['monitor']
# `force` will try to format the body of the POST request even if the `content-type` header is not
# set to `application/json`
# `force` will try to format the body of the POST request even if the
# `content-type` header is not set to `application/json`
tx = request.get_json(force=True)
try:

View File

@@ -0,0 +1,32 @@
from flask import current_app, Blueprint
from flask_restful import reqparse, Resource, Api
unspent_views = Blueprint('unspent_views', __name__)
unspent_api = Api(unspent_views)
class UnspentListApi(Resource):
def get(self):
"""API endpoint to retrieve a list of links to transactions's
conditions that have not been used in any previous transaction.
Returns:
A :obj:`list` of :class:`str` of links to unfulfilled conditions.
"""
parser = reqparse.RequestParser()
parser.add_argument('owner_after', type=str, location='args',
required=True)
args = parser.parse_args()
pool = current_app.config['bigchain_pool']
with pool() as bigchain:
unspents = bigchain.get_owned_ids(args['owner_after'])
# NOTE: We pass '..' as a path to create a valid relative URI
return [u.to_uri('..') for u in unspents]
unspent_api.add_resource(UnspentListApi,
'/unspents/',
strict_slashes=False)
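
A hedged usage sketch for the new endpoint (assuming a node serving the HTTP API on `localhost:9984`; the `requests` package is not part of this diff, and the key is the example public key from the docs):

```python
import requests

# owner_after is required; omitting it yields a 400 (see the tests below).
resp = requests.get(
    'http://localhost:9984/api/v1/unspents/',
    params={'owner_after': '9WYFf8T65bv4S8jKU8wongKPD4AmMZAwvk1absFDbYLM'})
resp.raise_for_status()
# A list of relative links, e.g. ['../transactions/<txid>/conditions/0']
print(resp.json())
```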

View File

@@ -43,8 +43,9 @@ USE_KEYPAIRS_FILE=False
# Canonical (the company behind Ubuntu) generates many AMIs
# and you can search for one that meets your needs at:
# https://cloud-images.ubuntu.com/locator/ec2/
# Example: ami-8504fdea is what you get if you search for:
# Example: At one point, if you searched for
# eu-central-1 16.04 LTS amd64 hvm:ebs-ssd
# you would get this AMI ID:
IMAGE_ID="ami-8504fdea"
# INSTANCE_TYPE is the type of AWS instance to launch

View File

@@ -2,7 +2,7 @@
#
# You can set these variables from the command line.
SPHINXOPTS =
SPHINXOPTS = -W
SPHINXBUILD = sphinx-build
PAPER =
BUILDDIR = build

View File

@@ -2,7 +2,7 @@
#
# You can set these variables from the command line.
SPHINXOPTS =
SPHINXOPTS = -W
SPHINXBUILD = sphinx-build
PAPER =
BUILDDIR = build

View File

@@ -8,31 +8,69 @@ as described in [the section on JSON serialization](json-serialization.html).
## Hashes
We compute hashes using the SHA3-256 algorithm and
[pysha3](https://bitbucket.org/tiran/pykeccak) as the Python implementation. We
store the hex-encoded hash in the database. For example:
BigchainDB computes transaction and block hashes using an implementation of the
[SHA3-256](https://en.wikipedia.org/wiki/SHA-3)
algorithm provided by the
[**pysha3** package](https://bitbucket.org/tiran/pykeccak),
which is a wrapper around the optimized reference implementation
from [http://keccak.noekeon.org](http://keccak.noekeon.org).
Here's the relevant code from `bigchaindb/bigchaindb/common/crypto.py`
(as of 11 December 2016):
```python
import hashlib
# monkey patch hashlib with sha3 functions
import sha3
data = "message"
tx_hash = hashlib.sha3_256(data).hexdigest()
def hash_data(data):
"""Hash the provided data using SHA3-256"""
return sha3.sha3_256(data.encode()).hexdigest()
```
The incoming `data` is understood to be a Python 3 string,
which may contain Unicode characters such as `'ü'` or `'字'`.
The Python 3 `encode()` method converts `data` to a bytes object.
`sha3.sha3_256(data.encode())` is a _sha3.SHA3 object;
the `hexdigest()` method converts it to a hexadecimal string.
For example:
```python
>>> import sha3
>>> data = '字'
>>> sha3.sha3_256(data.encode()).hexdigest()
'c67820de36d949a35ca24492e15767e2972b22f77213f6704ac0adec123c5690'
```
Note: Hashlocks (which are one kind of crypto-condition)
may use a different hash function.
## Signature Algorithm and Keys
BigchainDB uses the [Ed25519](https://ed25519.cr.yp.to/) public-key signature
system for generating its public/private key pairs. Ed25519 is an instance of
the [Edwards-curve Digital Signature Algorithm
(EdDSA)](https://en.wikipedia.org/wiki/EdDSA). As of April 2016, EdDSA was in
["Internet-Draft" status with the
IETF](https://tools.ietf.org/html/draft-irtf-cfrg-eddsa-05) but was [already
(EdDSA)](https://en.wikipedia.org/wiki/EdDSA). As of December 2016, EdDSA was an
["Internet-Draft" with the
IETF](https://tools.ietf.org/html/draft-irtf-cfrg-eddsa-08) but was [already
widely used](https://ianix.com/pub/ed25519-deployment.html).
BigchainDB uses the [ed25519](https://github.com/warner/python-ed25519)
Python package, overloaded by the [cryptoconditions
library](https://github.com/bigchaindb/cryptoconditions).
BigchainDB uses the
[**cryptoconditions** package](https://github.com/bigchaindb/cryptoconditions)
to do signature and keypair-related calculations.
That package, in turn, uses the [**PyNaCl** package](https://pypi.python.org/pypi/PyNaCl),
a Python binding to the Networking and Cryptography (NaCl) library.
All keys are represented with the base58 encoding by default.
All keys are represented with
[a Base58 encoding](https://en.wikipedia.org/wiki/Base58).
The cryptoconditions package uses the
[**base58** package](https://pypi.python.org/pypi/base58)
to calculate a Base58 encoding.
(There's no standard for Base58 encoding.)
Here's an example public/private key pair:
```js
"keypair": {
"public": "9WYFf8T65bv4S8jKU8wongKPD4AmMZAwvk1absFDbYLM",
"private": "3x7MQpPq8AEUGEuzAxSVHjU1FhLWVQJKFNNkvHhJPGCX"
}
```
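
A minimal sketch of generating such a pair with this codebase (`generate_key_pair` is used the same way in the test files touched by this commit):

```python
from bigchaindb.common.crypto import generate_key_pair

# Returns (private_key, public_key) as Base58-encoded strings,
# like the example pair above.
private_key, public_key = generate_key_pair()
print('public: ', public_key)
print('private:', private_key)
```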

View File

@@ -24,9 +24,3 @@ Stale Transaction Monitoring
============================
.. automodule:: bigchaindb.pipelines.stale
Utilities
=========
.. automodule:: bigchaindb.pipelines.utils

View File

@@ -14,7 +14,7 @@ One can deploy a BigchainDB node on Azure using the template in the `bigchaindb-
1. Once you are logged in to the Microsoft Azure Portal, you should be taken to a form titled **BigchainDB**. Some notes to help with filling in that form are available [below](azure-quickstart-template.html#notes-on-the-blockchain-template-form-fields).
1. Deployment takes a few minutes. You can follow the notifications by clicking the bell icon at the top of the screen. At the time of writing, the final deployment operation (running the `init.sh` script) was failing. We will fix that, but for now, just follow these instructions and we'll update them once that's fixed.
1. Deployment takes a few minutes. You can follow the notifications by clicking the bell icon at the top of the screen. At the time of writing, the final deployment operation (running the `init.sh` script) was failing, but a pull request ([#2884](https://github.com/Azure/azure-quickstart-templates/pull/2884)) has been made to fix that and these instructions say what you can do before that pull request gets merged...
1. Find out the public IP address of the virtual machine in the Azure Portal. Example: `40.69.87.250`
@@ -22,13 +22,21 @@ One can deploy a BigchainDB node on Azure using the template in the `bigchaindb-
1. You should be prompted for a password. Give the `<Admin_password>` you entered into the form.
1. Go to the `init.sh` script at [https://github.com/Azure/azure-quickstart-templates/blob/master/bigchaindb-on-ubuntu/scripts/init.sh](https://github.com/Azure/azure-quickstart-templates/blob/master/bigchaindb-on-ubuntu/scripts/init.sh)
1. If pull request [#2884](https://github.com/Azure/azure-quickstart-templates/pull/2884) has been merged, then skip this step. Otherwise, go to the `init.sh` script at [https://github.com/Azure/azure-quickstart-templates/blob/master/bigchaindb-on-ubuntu/scripts/init.sh](https://github.com/Azure/azure-quickstart-templates/blob/master/bigchaindb-on-ubuntu/scripts/init.sh). Your goal is to read through that script and check if each line should be run manually in the terminal. For example, is RethinkDB already installed and running? You can figure that out using `pgrep rethinkdb`. And so on. If in doubt, just try running each command in `init.sh`.
1. Manually run all the commands in that `init.sh` script *except for* **sudo easy_install3 pip**. This will install RethinkDB, run RethinkDB, and install BigchainDB Server.
1. Configure BigchainDB Server by doing:
```text
bigchaindb configure
```
It will ask you several questions. You can press `Enter` (or `Return`) to accept the default for all of them *except for one*. When it asks **API Server bind? (default \`localhost:9984\`):**, you should answer:
```text
API Server bind? (default `localhost:9984`): 0.0.0.0:9984
```
1. Configure BigchainDB Server using the default BigchainDB settings: `bigchaindb -y configure`
1. Run BigchainDB Server: `bigchaindb start`
Finally, run BigchainDB Server by doing:
```text
bigchaindb start
```
BigchainDB Server should now be running on the Azure virtual machine.

View File

@@ -30,6 +30,7 @@ check_setuptools_features()
dev_require = [
'ipdb',
'ipython',
'watchdog',
]
docs_require = [
@@ -63,7 +64,7 @@ install_requires = [
'pysha3>=0.3',
'cryptoconditions>=0.5.0',
'statsd>=3.2.1',
'python-rapidjson>=0.0.6',
'python-rapidjson>=0.0.8',
'logstats>=0.2.1',
'flask>=0.10.1',
'flask-restful~=0.3.0',
@@ -115,5 +116,5 @@ setup(
'dev': dev_require + tests_require + docs_require + benchmarks_require,
'docs': docs_require,
},
package_data={'bigchaindb.common.schema': ['transaction.yaml']},
package_data={'bigchaindb.common.schema': ['*.yaml']},
)

View File

@@ -8,12 +8,6 @@ def restore_config(request, node_config):
config_utils.set_config(node_config)
@pytest.fixture(scope='module', autouse=True)
@pytest.fixture(scope='function', autouse=True)
def setup_database(request, node_config):
conftest.setup_database(request, node_config)
@pytest.fixture(scope='function', autouse=True)
def cleanup_tables(request, node_config):
conftest.cleanup_tables(request, node_config)

View File

@@ -0,0 +1,96 @@
import pytest
from unittest.mock import Mock
from multipipes import Pipe
import bigchaindb
from bigchaindb import Bigchain
from bigchaindb.backend import connect
@pytest.fixture
def mock_changefeed_data():
return [
{
'new_val': 'seems like we have an insert here',
'old_val': None,
}, {
'new_val': None,
'old_val': 'seems like we have a delete here',
}, {
'new_val': 'seems like we have an update here',
'old_val': 'seems like we have an update here',
}
]
@pytest.fixture
def mock_changefeed_connection(mock_changefeed_data):
connection = connect(**bigchaindb.config['database'])
connection.run = Mock(return_value=mock_changefeed_data)
return connection
def test_changefeed_insert(mock_changefeed_connection):
from bigchaindb.backend import get_changefeed
from bigchaindb.backend.changefeed import ChangeFeed
outpipe = Pipe()
changefeed = get_changefeed(mock_changefeed_connection,
'backlog', ChangeFeed.INSERT)
changefeed.outqueue = outpipe
changefeed.run_forever()
assert outpipe.get() == 'seems like we have an insert here'
assert outpipe.qsize() == 0
def test_changefeed_delete(mock_changefeed_connection):
from bigchaindb.backend import get_changefeed
from bigchaindb.backend.changefeed import ChangeFeed
outpipe = Pipe()
changefeed = get_changefeed(mock_changefeed_connection,
'backlog', ChangeFeed.DELETE)
changefeed.outqueue = outpipe
changefeed.run_forever()
assert outpipe.get() == 'seems like we have a delete here'
assert outpipe.qsize() == 0
def test_changefeed_update(mock_changefeed_connection):
from bigchaindb.backend import get_changefeed
from bigchaindb.backend.changefeed import ChangeFeed
outpipe = Pipe()
changefeed = get_changefeed(mock_changefeed_connection,
'backlog', ChangeFeed.UPDATE)
changefeed.outqueue = outpipe
changefeed.run_forever()
assert outpipe.get() == 'seems like we have an update here'
assert outpipe.qsize() == 0
def test_changefeed_multiple_operations(mock_changefeed_connection):
from bigchaindb.backend import get_changefeed
from bigchaindb.backend.changefeed import ChangeFeed
outpipe = Pipe()
changefeed = get_changefeed(mock_changefeed_connection, 'backlog',
ChangeFeed.INSERT | ChangeFeed.UPDATE)
changefeed.outqueue = outpipe
changefeed.run_forever()
assert outpipe.get() == 'seems like we have an insert here'
assert outpipe.get() == 'seems like we have an update here'
assert outpipe.qsize() == 0
def test_changefeed_prefeed(mock_changefeed_connection):
from bigchaindb.backend import get_changefeed
from bigchaindb.backend.changefeed import ChangeFeed
outpipe = Pipe()
changefeed = get_changefeed(mock_changefeed_connection, 'backlog',
ChangeFeed.INSERT, prefeed=[1, 2, 3])
changefeed.outqueue = outpipe
changefeed.run_forever()
assert outpipe.qsize() == 4

View File

@@ -48,6 +48,27 @@ def test_query(query_func_name, args_qty):
query_func(None, *range(args_qty))
@mark.parametrize('changefeed_func_name,args_qty', (
('get_changefeed', 2),
))
def test_changefeed(changefeed_func_name, args_qty):
from bigchaindb.backend import changefeed
changefeed_func = getattr(changefeed, changefeed_func_name)
with raises(NotImplementedError):
changefeed_func(None, *range(args_qty))
@mark.parametrize('changefeed_class_func_name,args_qty', (
('run_forever', 0),
('run_changefeed', 0),
))
def test_changefeed_class(changefeed_class_func_name, args_qty):
from bigchaindb.backend.changefeed import ChangeFeed
changefeed_class_func = getattr(ChangeFeed, changefeed_class_func_name)
with raises(NotImplementedError):
changefeed_class_func(None, *range(args_qty))
@mark.parametrize('db,conn_cls', (
('mongodb', 'MongoDBConnection'),
('rethinkdb', 'RethinkDBConnection'),
@@ -68,4 +89,4 @@ def test_init_database(mock_create_database, mock_create_tables,
init_database(connection=conn, dbname='mickeymouse')
mock_create_database.assert_called_once_with(conn, 'mickeymouse')
mock_create_tables.assert_called_once_with(conn, 'mickeymouse')
mock_create_indexes.assert_called_once_with(conn, 'mickeymouse')
mock_create_indexes.assert_called_once_with(conn, 'mickeymouse')

View File

@@ -428,6 +428,24 @@ def test_transaction_link_deserialization_with_empty_payload():
assert tx_link == expected
def test_transaction_link_empty_to_uri():
from bigchaindb.common.transaction import TransactionLink
expected = None
tx_link = TransactionLink().to_uri()
assert expected == tx_link
def test_transaction_link_to_uri():
from bigchaindb.common.transaction import TransactionLink
expected = 'path/transactions/abc/conditions/0'
tx_link = TransactionLink('abc', 0).to_uri('path')
assert expected == tx_link
def test_cast_transaction_link_to_boolean():
from bigchaindb.common.transaction import TransactionLink

View File

@@ -11,8 +11,7 @@ import pytest
from bigchaindb import Bigchain
from bigchaindb.backend import connect, schema
from bigchaindb.common import crypto
from bigchaindb.common.exceptions import (DatabaseAlreadyExists,
DatabaseDoesNotExist)
from bigchaindb.common.exceptions import DatabaseDoesNotExist
USER2_SK, USER2_PK = crypto.generate_key_pair()
@@ -24,18 +23,18 @@ def restore_config(request, node_config):
config_utils.set_config(node_config)
@pytest.fixture(scope='module', autouse=True)
@pytest.fixture(scope='function', autouse=True)
def setup_database(request, node_config):
print('Initializing test db')
db_name = node_config['database']['name']
conn = connect()
try:
schema.init_database(conn)
except DatabaseAlreadyExists:
print('Database already exists.')
schema.drop_database(conn, db_name)
schema.init_database(conn)
except DatabaseDoesNotExist:
pass
schema.init_database(conn)
print('Finishing init database')
@@ -44,32 +43,14 @@ def setup_database(request, node_config):
print('Deleting `{}` database'.format(db_name))
try:
schema.drop_database(conn, db_name)
except DatabaseDoesNotExist as e:
if str(e) != 'Database `{}` does not exist'.format(db_name):
raise
except DatabaseDoesNotExist:
pass
print('Finished deleting `{}`'.format(db_name))
request.addfinalizer(fin)
@pytest.fixture(scope='function', autouse=True)
def cleanup_tables(request, node_config):
db_name = node_config['database']['name']
def fin():
conn = connect()
try:
schema.drop_database(conn, db_name)
schema.create_database(conn, db_name)
schema.create_tables(conn, db_name)
schema.create_indexes(conn, db_name)
except DatabaseDoesNotExist as e:
if str(e) != 'Database `{}` does not exist'.format(db_name):
raise
request.addfinalizer(fin)
@pytest.fixture
def inputs(user_pk):
from bigchaindb.models import Transaction

View File

@@ -11,16 +11,11 @@ def restore_config(request, node_config):
config_utils.set_config(node_config)
@pytest.fixture(scope='module', autouse=True)
@pytest.fixture(scope='function', autouse=True)
def setup_database(request, node_config):
conftest.setup_database(request, node_config)
@pytest.fixture(scope='function', autouse=True)
def cleanup_tables(request, node_config):
conftest.cleanup_tables(request, node_config)
@pytest.fixture
def processes(b):
b.create_genesis_block()

View File

@@ -8,11 +8,6 @@ def restore_config(request, node_config):
config_utils.set_config(node_config)
@pytest.fixture(scope='module', autouse=True)
@pytest.fixture(scope='function', autouse=True)
def setup_database(request, node_config):
conftest.setup_database(request, node_config)
@pytest.fixture(scope='function', autouse=True)
def cleanup_tables(request, node_config):
conftest.cleanup_tables(request, node_config)

View File

@@ -95,7 +95,8 @@ def test_duplicate_transaction(b, user_pk):
# verify tx is in the backlog
assert b.get_transaction(txs[0].id) is not None
# try to validate a transaction that's already in the chain; should not work
# try to validate a transaction that's already in the chain; should not
# work
assert block_maker.validate_tx(txs[0].to_dict()) is None
# duplicate tx should be removed from backlog
@@ -130,22 +131,6 @@ def test_delete_tx(b, user_pk):
assert status != b.TX_IN_BACKLOG
def test_prefeed(b, user_pk):
import random
from bigchaindb.models import Transaction
from bigchaindb.pipelines.block import initial
for i in range(100):
tx = Transaction.create([b.me], [([user_pk], 1)],
{'msg': random.random()})
tx = tx.sign([b.me_private])
b.write_transaction(tx)
backlog = initial()
assert len(list(backlog)) == 100
@patch('bigchaindb.pipelines.block.create_pipeline')
def test_start(create_pipeline):
from bigchaindb.pipelines import block
@@ -164,28 +149,39 @@ def test_full_pipeline(b, user_pk):
from bigchaindb.pipelines.block import create_pipeline, get_changefeed
outpipe = Pipe()
pipeline = create_pipeline()
pipeline.setup(outdata=outpipe)
inpipe = pipeline.items[0]
# include myself here, so that some tx are actually assigned to me
b.nodes_except_me = [b.me, 'aaa', 'bbb', 'ccc']
number_assigned_to_others = 0
for i in range(100):
tx = Transaction.create([b.me], [([user_pk], 1)],
{'msg': random.random()})
tx = tx.sign([b.me_private])
b.write_transaction(tx)
tx = tx.to_dict()
assert query.count_backlog(b.connection) == 100
# simulate write_transaction
tx['assignee'] = random.choice(b.nodes_except_me)
if tx['assignee'] != b.me:
number_assigned_to_others += 1
tx['assignment_timestamp'] = time.time()
inpipe.put(tx)
assert inpipe.qsize() == 100
pipeline = create_pipeline()
pipeline.setup(indata=get_changefeed(), outdata=outpipe)
pipeline.start()
time.sleep(2)
pipeline.terminate()
pipeline.terminate()
block_doc = outpipe.get()
chained_block = b.get_block(block_doc.id)
chained_block = Block.from_dict(chained_block)
block_len = len(block_doc.transactions)
assert chained_block == block_doc
assert query.count_backlog(b.connection) == 100 - block_len
assert number_assigned_to_others == 100 - block_len

View File

@@ -118,8 +118,8 @@ def test_check_requeue_transaction(b, user_pk):
e.requeue_transactions(test_block)
time.sleep(1)
backlog_tx, status = b.get_transaction(tx1.id, include_status=True)
#backlog_tx = b.connection.run(r.table('backlog').get(tx1.id))
assert status == b.TX_IN_BACKLOG
assert backlog_tx == tx1
@@ -166,6 +166,7 @@ def test_full_pipeline(b, user_pk):
pipeline.setup(indata=election.get_changefeed(), outdata=outpipe)
pipeline.start()
time.sleep(1)
# vote one block valid, one invalid
vote_valid = b.vote(valid_block.id, 'b' * 64, True)
vote_invalid = b.vote(invalid_block.id, 'c' * 64, False)

View File

@@ -1,80 +0,0 @@
import pytest
from unittest.mock import Mock
from multipipes import Pipe
from bigchaindb import Bigchain
from bigchaindb.backend.connection import Connection
from bigchaindb.pipelines.utils import ChangeFeed
@pytest.fixture
def mock_changefeed_data():
return [
{
'new_val': 'seems like we have an insert here',
'old_val': None,
}, {
'new_val': None,
'old_val': 'seems like we have a delete here',
}, {
'new_val': 'seems like we have an update here',
'old_val': 'seems like we have an update here',
}
]
@pytest.fixture
def mock_changefeed_bigchain(mock_changefeed_data):
connection = Connection(host=None, port=None, dbname=None)
connection.run = Mock(return_value=mock_changefeed_data)
return Bigchain(connection=connection)
def test_changefeed_insert(mock_changefeed_bigchain):
outpipe = Pipe()
changefeed = ChangeFeed('backlog', ChangeFeed.INSERT, bigchain=mock_changefeed_bigchain)
changefeed.outqueue = outpipe
changefeed.run_forever()
assert outpipe.get() == 'seems like we have an insert here'
assert outpipe.qsize() == 0
def test_changefeed_delete(mock_changefeed_bigchain):
outpipe = Pipe()
changefeed = ChangeFeed('backlog', ChangeFeed.DELETE, bigchain=mock_changefeed_bigchain)
changefeed.outqueue = outpipe
changefeed.run_forever()
assert outpipe.get() == 'seems like we have a delete here'
assert outpipe.qsize() == 0
def test_changefeed_update(mock_changefeed_bigchain):
outpipe = Pipe()
changefeed = ChangeFeed('backlog', ChangeFeed.UPDATE, bigchain=mock_changefeed_bigchain)
changefeed.outqueue = outpipe
changefeed.run_forever()
assert outpipe.get() == 'seems like we have an update here'
assert outpipe.qsize() == 0
def test_changefeed_multiple_operations(mock_changefeed_bigchain):
outpipe = Pipe()
changefeed = ChangeFeed('backlog',
ChangeFeed.INSERT | ChangeFeed.UPDATE,
bigchain=mock_changefeed_bigchain)
changefeed.outqueue = outpipe
changefeed.run_forever()
assert outpipe.get() == 'seems like we have an insert here'
assert outpipe.get() == 'seems like we have an update here'
assert outpipe.qsize() == 0
def test_changefeed_prefeed(mock_changefeed_bigchain):
outpipe = Pipe()
changefeed = ChangeFeed('backlog',
ChangeFeed.INSERT,
prefeed=[1, 2, 3],
bigchain=mock_changefeed_bigchain)
changefeed.outqueue = outpipe
changefeed.run_forever()
assert outpipe.qsize() == 4

View File

@@ -48,8 +48,8 @@ def test_changefeed_reconnects_when_connection_lost(monkeypatch):
import time
import multiprocessing as mp
from bigchaindb import Bigchain
from bigchaindb.pipelines.utils import ChangeFeed
from bigchaindb.backend.changefeed import ChangeFeed
from bigchaindb.backend.rethinkdb.changefeed import RethinkDBChangeFeed
class MockConnection:
tries = 0
@@ -75,10 +75,8 @@ def test_changefeed_reconnects_when_connection_lost(monkeypatch):
else:
time.sleep(10)
bigchain = Bigchain()
bigchain.connection = MockConnection()
changefeed = ChangeFeed('cat_facts', ChangeFeed.INSERT,
bigchain=bigchain)
changefeed = RethinkDBChangeFeed('cat_facts', ChangeFeed.INSERT,
connection=MockConnection())
changefeed.outqueue = mp.Queue()
t_changefeed = Thread(target=changefeed.run_forever, daemon=True)

View File

@@ -8,16 +8,11 @@ def restore_config(request, node_config):
config_utils.set_config(node_config)
@pytest.fixture(scope='module', autouse=True)
@pytest.fixture(scope='function', autouse=True)
def setup_database(request, node_config):
conftest.setup_database(request, node_config)
@pytest.fixture(scope='function', autouse=True)
def cleanup_tables(request, node_config):
conftest.cleanup_tables(request, node_config)
@pytest.fixture
def app(request, node_config):
# XXX: For whatever reason this fixture runs before `restore_config`,

View File

@@ -1,4 +1,4 @@
def test_settings(monkeypatch):
def test_settings():
import bigchaindb
from bigchaindb.web import server

View File

@@ -0,0 +1,26 @@
import pytest
UNSPENTS_ENDPOINT = '/api/v1/unspents/'
@pytest.mark.usefixtures('inputs')
def test_get_unspents_endpoint(b, client, user_pk):
expected = [u.to_uri('..') for u in b.get_owned_ids(user_pk)]
res = client.get(UNSPENTS_ENDPOINT + '?owner_after={}'.format(user_pk))
assert expected == res.json
assert res.status_code == 200
@pytest.mark.usefixtures('inputs')
def test_get_unspents_endpoint_without_owner_after(client):
res = client.get(UNSPENTS_ENDPOINT)
assert res.status_code == 400
@pytest.mark.usefixtures('inputs')
def test_get_unspents_endpoint_with_unused_owner_after(client):
expected = []
res = client.get(UNSPENTS_ENDPOINT + '?owner_after=abc')
assert expected == res.json
assert res.status_code == 200