diff --git a/bigchaindb/__init__.py b/bigchaindb/__init__.py
index 07f35320..9504b68c 100644
--- a/bigchaindb/__init__.py
+++ b/bigchaindb/__init__.py
@@ -89,6 +89,9 @@ config = {
         'fmt_logfile': log_config['formatters']['file']['format'],
         'granular_levels': {},
     },
+    'graphite': {
+        'host': os.environ.get('BIGCHAINDB_GRAPHITE_HOST', 'localhost'),
+    },
 }
 
 # We need to maintain a backup copy of the original config dict in case
diff --git a/bigchaindb/core.py b/bigchaindb/core.py
index 2f4d24da..862c4b78 100644
--- a/bigchaindb/core.py
+++ b/bigchaindb/core.py
@@ -1,4 +1,5 @@
 import random
+import statsd
 from time import time
 
 from bigchaindb import exceptions as core_exceptions
@@ -71,6 +72,8 @@ class Bigchain(object):
         if not self.me or not self.me_private:
             raise exceptions.KeypairNotFoundException()
 
+        self.statsd = statsd.StatsClient(bigchaindb.config['graphite']['host'])
+
     federation = property(lambda self: set(self.nodes_except_me + [self.me]))
     """ Set of federation member public keys """
diff --git a/bigchaindb/pipelines/block.py b/bigchaindb/pipelines/block.py
index 0fe327bb..945d369c 100644
--- a/bigchaindb/pipelines/block.py
+++ b/bigchaindb/pipelines/block.py
@@ -117,6 +117,8 @@ class BlockPipeline:
         logger.info('Write new block %s with %s transactions',
                     block.id, len(block.transactions))
         self.bigchain.write_block(block)
+        self.bigchain.statsd.incr('pipelines.block.throughput',
+                                  len(block.transactions))
         return block
 
     def delete_tx(self, block):
diff --git a/bigchaindb/pipelines/vote.py b/bigchaindb/pipelines/vote.py
index 10b33fd1..3eb6dafa 100644
--- a/bigchaindb/pipelines/vote.py
+++ b/bigchaindb/pipelines/vote.py
@@ -137,9 +137,9 @@ class Vote:
             self.last_voted_id = block_id
             del self.counters[block_id]
             del self.validity[block_id]
-            return vote
+            return vote, num_tx
 
-    def write_vote(self, vote):
+    def write_vote(self, vote, num_tx):
         """Write vote to the database.
 
         Args:
@@ -149,6 +149,7 @@ class Vote:
         logger.info("Voting '%s' for block %s", validity,
                     vote['vote']['voting_for_block'])
         self.bigchain.write_vote(vote)
+        self.bigchain.statsd.incr('pipelines.vote.throughput', num_tx)
 
         return vote
diff --git a/bigchaindb/web/views/transactions.py b/bigchaindb/web/views/transactions.py
index 9f024f54..6a52dac4 100644
--- a/bigchaindb/web/views/transactions.py
+++ b/bigchaindb/web/views/transactions.py
@@ -76,6 +76,7 @@ class TransactionListApi(Resource):
         )
 
         with pool() as bigchain:
+            bigchain.statsd.incr('web.tx.post')
             try:
                 bigchain.validate_transaction(tx_obj)
             except ValidationError as e:
diff --git a/docker-compose.yml b/docker-compose.yml
index c7f3c584..a970e32b 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -64,11 +64,22 @@ services:
       - ./setup.cfg:/usr/src/app/setup.cfg
       - ./pytest.ini:/usr/src/app/pytest.ini
       - ./tox.ini:/usr/src/app/tox.ini
+      - ./scripts:/usr/src/app/scripts
     environment:
       BIGCHAINDB_DATABASE_BACKEND: mongodb
       BIGCHAINDB_DATABASE_HOST: mdb
       BIGCHAINDB_DATABASE_PORT: 27017
       BIGCHAINDB_SERVER_BIND: 0.0.0.0:9984
+      BIGCHAINDB_GRAPHITE_HOST: graphite
     ports:
       - "9984"
-    command: bigchaindb start
+    command: bash -c 'bigchaindb -y configure mongodb && bigchaindb start'
+
+  graphite:
+    image: hopsoft/graphite-statsd
+    ports:
+      - "2003-2004"
+      - "2023-2024"
+      - "8125/udp"
+      - "8126"
+      - "80"
diff --git a/scripts/benchmarks/README.md b/scripts/benchmarks/README.md
new file mode 100644
index 00000000..befe2400
--- /dev/null
+++ b/scripts/benchmarks/README.md
@@ -0,0 +1,40 @@
+# Benchmarks
+
+## CREATE transaction throughput
+
+This benchmark measures the throughput of CREATE transactions through the
+entire pipeline, i.e. the web frontend, block creation, and block validation.
+The result is reported in transactions per second.
+
+The benchmark runs for a fixed period of time and makes metrics available via
+a Graphite interface.
+
+### Running the benchmark
+
+Dependencies:
+
+* Python 3.5+
+* docker-compose 1.8.0+
+* Docker 1.12+
+
+To start:
+
+    $ python3 scripts/benchmarks/create_thoughtput.py
+
+To start using a separate namespace for docker-compose:
+
+    $ COMPOSE_PROJECT_NAME=somename python3 scripts/benchmarks/create_thoughtput.py
+
+### Results
+
+A test was run on AWS with the following instance configuration:
+
+* Ubuntu Server 16.04 (ami-060cde69)
+* 32-core compute-optimized instance (c3.8xlarge)
+* 100 GB root volume (300/3000 IOPS)
+
+The server received and validated over 800 transactions per second:
+
+![BigchainDB transaction throughput](https://cloud.githubusercontent.com/assets/125019/26688641/85d56d1e-46f3-11e7-8148-bf3bc8c54c33.png)
+
+For more information on how the benchmark was run, an abridged session buffer
+[is available](https://gist.github.com/libscott/8a37c5e134b2d55cfb55082b1cd85a02).
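A note on how the counters above reach Graphite: the `statsd` client writes fire-and-forget UDP packets, and the `hopsoft/graphite-statsd` container aggregates them into the `stats.*` namespace that the benchmark script renders. A minimal sketch of the same pattern, assuming only statsd's default UDP port 8125 and the bucket name this patch uses in the block pipeline:

    import statsd

    # Assumes a statsd daemon on localhost:8125 (the library default), e.g.
    # the graphite service from docker-compose.yml with 8125/udp mapped.
    client = statsd.StatsClient('localhost')

    # Count ten transactions against the bucket the block pipeline uses;
    # Graphite exposes the aggregate as stats.pipelines.block.throughput.
    client.incr('pipelines.block.throughput', 10)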
diff --git a/scripts/benchmarks/create_thoughtput.py b/scripts/benchmarks/create_thoughtput.py
new file mode 100644
index 00000000..5f4b5fdd
--- /dev/null
+++ b/scripts/benchmarks/create_thoughtput.py
@@ -0,0 +1,133 @@
+import sys
+import math
+import time
+import requests
+import subprocess
+import multiprocessing
+
+
+def main():
+    cmd('docker-compose up -d mdb')
+    cmd('docker-compose up -d bdb')
+    cmd('docker-compose up -d graphite')
+
+    out = cmd('docker-compose port graphite 80', capture=True)
+    graphite_web = 'http://localhost:%s/' % out.strip().split(':')[1]
+    print('Graphite web interface at: ' + graphite_web)
+
+    start = time.time()
+
+    cmd('docker-compose exec bdb python %s load' % sys.argv[0])
+
+    mins = math.ceil((time.time() - start) / 60) + 1
+
+    graph_url = graphite_web + 'render/?width=900&height=600&_salt=1495462891.335&target=stats.pipelines.block.throughput&target=stats.pipelines.vote.throughput&target=stats.web.tx.post&from=-%sminutes' % mins  # noqa
+
+    print(graph_url)
+
+
+def load():
+    from bigchaindb.core import Bigchain
+    from bigchaindb.common.crypto import generate_key_pair
+    from bigchaindb.common.transaction import Transaction
+
+    def transactions():
+        priv, pub = generate_key_pair()
+        tx = Transaction.create([pub], [([pub], 1)])
+        while True:
+            i = yield tx.to_dict()
+            tx.asset = {'data': {'n': i}}
+            tx.sign([priv])
+
+    def wait_for_up():
+        print('Waiting for server to start... ', end='')
+        while True:
+            try:
+                requests.get('http://localhost:9984/')
+                break
+            except requests.ConnectionError:
+                time.sleep(0.1)
+        print('Ok')
+
+    def post_txs():
+        txs = transactions()
+        txs.send(None)
+        try:
+            with requests.Session() as session:
+                while True:
+                    i = tx_queue.get()
+                    if i is None:
+                        break
+                    tx = txs.send(i)
+                    res = session.post('http://localhost:9984/api/v1/transactions/', json=tx)
+                    assert res.status_code == 202
+        except KeyboardInterrupt:
+            pass
+
+    wait_for_up()
+    num_clients = 30
+    test_time = 60
+    tx_queue = multiprocessing.Queue(maxsize=num_clients)
+    txn = 0
+    b = Bigchain()
+
+    start_time = time.time()
+
+    for i in range(num_clients):
+        multiprocessing.Process(target=post_txs).start()
+
+    print('Sending transactions')
+    while time.time() - start_time < test_time:
+        # Post 500 transactions to the server
+        for i in range(500):
+            tx_queue.put(txn)
+            txn += 1
+        print(txn)
+        while True:
+            # Wait for the server to reduce the backlog to below
+            # 10000 transactions. The expectation is that 10000 transactions
+            # will not be processed faster than a further 500 transactions can
+            # be posted, but nonetheless will be processed within a few seconds.
+            # This keeps the test from running on and keeps the transactions from
+            # being considered stale.
+            count = b.connection.db.backlog.count()
+            if count > 10000:
+                time.sleep(0.2)
+            else:
+                break
+
+    for i in range(num_clients):
+        tx_queue.put(None)
+
+    print('Waiting to clear backlog')
+    while True:
+        bl = b.connection.db.backlog.count()
+        if bl == 0:
+            break
+        print(bl)
+        time.sleep(1)
+
+    print('Waiting for all votes to come in')
+    while True:
+        blocks = b.connection.db.bigchain.count()
+        votes = b.connection.db.votes.count()
+        if blocks == votes + 1:
+            break
+        print('%s blocks, %s votes' % (blocks, votes))
+        time.sleep(3)
+
+    print('Finished')
+
+
+def cmd(command, capture=False):
+    stdout = subprocess.PIPE if capture else None
+    args = ['bash', '-c', command]
+    proc = subprocess.Popen(args, stdout=stdout)
+    assert not proc.wait()
+    return capture and proc.stdout.read().decode()
+
+
+if sys.argv[1:] == ['load']:
+    load()
+else:
+    main()
diff --git a/setup.py b/setup.py
index 4fd485c0..5e05b6e4 100644
--- a/setup.py
+++ b/setup.py
@@ -80,6 +80,7 @@ install_requires = [
    'pyyaml~=3.12',
    'aiohttp~=2.0',
    'python-rapidjson-schema==0.1.1',
+    'statsd==3.2.1',
 ]
 
 setup(
diff --git a/tests/pipelines/test_vote.py b/tests/pipelines/test_vote.py
index f68c6f6e..cb9a131a 100644
--- a/tests/pipelines/test_vote.py
+++ b/tests/pipelines/test_vote.py
@@ -199,7 +199,7 @@ def test_valid_block_voting_sequential(b, genesis_block, monkeypatch):
     for tx, block_id, num_tx in vote_obj.ungroup(block['id'], txs):
         last_vote = vote_obj.vote(*vote_obj.validate_tx(tx, block_id,
                                                         num_tx))
-    vote_obj.write_vote(last_vote)
+    vote_obj.write_vote(*last_vote)
 
     vote_rs = query.get_votes_by_block_id_and_voter(b.connection, block_id,
                                                     b.me)
     vote_doc = vote_rs.next()
diff --git a/tests/test_config_utils.py b/tests/test_config_utils.py
index a2d1f13e..d17ce7d7 100644
--- a/tests/test_config_utils.py
+++ b/tests/test_config_utils.py
@@ -233,6 +233,7 @@ def test_autoconfigure_read_both_from_file_and_env(monkeypatch, request):
             'fmt_logfile': log_config['formatters']['file']['format'],
             'granular_levels': {},
         },
+        'graphite': {'host': 'localhost'},
     }
diff --git a/tests/test_core.py b/tests/test_core.py
index b8803e9b..b626b2d1 100644
--- a/tests/test_core.py
+++ b/tests/test_core.py
@@ -19,7 +19,8 @@ def config(request, monkeypatch):
         },
         'keyring': [],
         'CONFIGURED': True,
-        'backlog_reassign_delay': 30
+        'backlog_reassign_delay': 30,
+        'graphite': {'host': 'localhost'},
     }
 
     monkeypatch.setattr('bigchaindb.config', config)
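A note on the `transactions()` helper in the load script: it is a generator used as a coroutine, which is why `post_txs()` calls `txs.send(None)` once to prime it before sending real indices. A standalone sketch of the same pattern with an illustrative payload (not BigchainDB's actual transaction format):

    def tx_factory():
        # Build the object once, then mutate it on every send() instead of
        # reconstructing it from scratch for each transaction.
        tx = {'data': None}
        while True:
            # Yield the current payload and wait for the next index.
            i = yield dict(tx)
            tx['data'] = {'n': i}

    txs = tx_factory()
    txs.send(None)      # prime the coroutine: run up to the first yield
    print(txs.send(0))  # {'data': {'n': 0}}
    print(txs.send(1))  # {'data': {'n': 1}}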
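The `graph_url` printed by `main()` renders a PNG, but the same Graphite render endpoint can return raw series as JSON, which is handy for turning a run into a single throughput number. A sketch under the assumption that `graphite_url` is the host:port mapping reported by `docker-compose port graphite 80` (with statsd defaults, a counter's `stats.*` series is already a per-second rate):

    import requests

    # Hypothetical mapping; substitute the output of
    # `docker-compose port graphite 80`.
    graphite_url = 'http://localhost:32768/'

    resp = requests.get(graphite_url + 'render/', params={
        'target': 'stats.pipelines.vote.throughput',
        'from': '-10minutes',
        'format': 'json',  # raw datapoints instead of a PNG
    })
    resp.raise_for_status()

    for series in resp.json():
        # Each datapoint is a [value, timestamp] pair; value is None for
        # intervals with no data.
        values = [v for v, _ in series['datapoints'] if v is not None]
        if values:
            print('%s: %.1f tx/s on average'
                  % (series['target'], sum(values) / len(values)))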