diff --git a/bigchaindb/backend/__init__.py b/bigchaindb/backend/__init__.py
index c1deaa92..e2190a06 100644
--- a/bigchaindb/backend/__init__.py
+++ b/bigchaindb/backend/__init__.py
@@ -7,7 +7,7 @@ configuration or the ``BIGCHAINDB_DATABASE_BACKEND`` environment variable.
 """
 
 # Include the backend interfaces
-from bigchaindb.backend import changefeed, schema, query  # noqa
+from bigchaindb.backend import admin, changefeed, schema, query  # noqa
 
 from bigchaindb.backend.connection import connect  # noqa
 from bigchaindb.backend.changefeed import get_changefeed  # noqa
diff --git a/bigchaindb/backend/admin.py b/bigchaindb/backend/admin.py
new file mode 100644
index 00000000..057b5995
--- /dev/null
+++ b/bigchaindb/backend/admin.py
@@ -0,0 +1,59 @@
+"""Database configuration functions.
+
+The functions in this module dispatch on the type of ``connection``.
+Backends register their own implementations; the generic versions
+below only raise :exc:`NotImplementedError`.
+"""
+from functools import singledispatch
+
+
+@singledispatch
+def get_config(connection, *, table):
+    """Get the configuration of the given table.
+
+    Raises:
+        NotImplementedError: If no implementation is registered for
+            the type of ``connection``.
+    """
+    raise NotImplementedError
+
+
+@singledispatch
+def reconfigure(connection, *, table, shards, replicas, **kwargs):
+    """Reconfigure the given table.
+
+    Raises:
+        NotImplementedError: If no implementation is registered for
+            the type of ``connection``.
+    """
+    raise NotImplementedError
+
+
+@singledispatch
+def set_shards(connection, *, shards, dry_run=False):
+    """Set the number of shards.
+
+    ``dry_run`` is part of the generic signature because the RethinkDB
+    implementation accepts it; without it an unsupported backend would
+    fail with :exc:`TypeError` instead of the error below.
+
+    Raises:
+        NotImplementedError: If no implementation is registered for
+            the type of ``connection``.
+    """
+    raise NotImplementedError
+
+
+@singledispatch
+def set_replicas(connection, *, replicas, dry_run=False):
+    """Set the number of replicas.
+
+    ``dry_run`` is part of the generic signature because the RethinkDB
+    implementation accepts it; without it an unsupported backend would
+    fail with :exc:`TypeError` instead of the error below.
+
+    Raises:
+        NotImplementedError: If no implementation is registered for
+            the type of ``connection``.
+    """
+    raise NotImplementedError
diff --git a/bigchaindb/backend/exceptions.py b/bigchaindb/backend/exceptions.py
new file mode 100644
index 00000000..1d55bc52
--- /dev/null
+++ b/bigchaindb/backend/exceptions.py
@@ -0,0 +1,5 @@
+from bigchaindb.exceptions import BigchainDBError
+
+
+class DatabaseOpFailedError(BigchainDBError):
+    """Exception for database operation errors."""
diff --git a/bigchaindb/backend/rethinkdb/__init__.py b/bigchaindb/backend/rethinkdb/__init__.py
index 22eaae5f..4ad53aa0 100644
--- a/bigchaindb/backend/rethinkdb/__init__.py
+++ b/bigchaindb/backend/rethinkdb/__init__.py
@@ -16,7 +16,7 @@ generic backend interfaces to the implementations in this module.
 """
 
 # Register the single dispatched modules on import.
-from bigchaindb.backend.rethinkdb import changefeed, schema, query  # noqa
+from bigchaindb.backend.rethinkdb import admin, changefeed, schema, query  # noqa
 
 # RethinkDBConnection should always be accessed via
 # ``bigchaindb.backend.connect()``.
diff --git a/bigchaindb/backend/rethinkdb/admin.py b/bigchaindb/backend/rethinkdb/admin.py
new file mode 100644
index 00000000..63548f87
--- /dev/null
+++ b/bigchaindb/backend/rethinkdb/admin.py
@@ -0,0 +1,167 @@
+"""Database configuration functions."""
+import logging
+
+import rethinkdb as r
+
+from bigchaindb.backend import admin
+from bigchaindb.backend.schema import TABLES
+from bigchaindb.backend.exceptions import DatabaseOpFailedError
+from bigchaindb.backend.utils import module_dispatch_registrar
+from bigchaindb.backend.rethinkdb.connection import RethinkDBConnection
+
+logger = logging.getLogger(__name__)
+
+register_admin = module_dispatch_registrar(admin)
+
+
+@register_admin(RethinkDBConnection)
+def get_config(connection, *, table):
+    """Get the configuration of the given table.
+
+    Args:
+        connection (:class:`~bigchaindb.backend.connection.Connection`):
+            A connection to the database.
+        table (str): The name of the table to get the configuration for.
+
+    Returns:
+        dict: The configuration of the given table
+
+    """
+    return connection.run(r.table(table).config())
+
+
+@register_admin(RethinkDBConnection)
+def reconfigure(connection, *, table, shards, replicas,
+                primary_replica_tag=None, dry_run=False,
+                nonvoting_replica_tags=None):
+    """Reconfigures the given table.
+
+    Args:
+        connection (:class:`~bigchaindb.backend.connection.Connection`):
+            A connection to the database.
+        table (str): The name of the table to reconfigure.
+        shards (int): The number of shards, an integer from 1-64.
+        replicas (:obj:`int` | :obj:`dict`):
+            * If replicas is an integer, it specifies the number of
+              replicas per shard. Specifying more replicas than there
+              are servers will return an error.
+            * If replicas is a dictionary, it specifies key-value pairs
+              of server tags and the number of replicas to assign to
+              those servers::
+
+                  {'africa': 2, 'asia': 4, 'europe': 2, ...}
+        primary_replica_tag (str): The primary server specified by its
+            server tag. Required if ``replicas`` is a dictionary. The
+            tag must be in the ``replicas`` dictionary. This must not be
+            specified if ``replicas`` is an integer. Defaults to
+            ``None``.
+        dry_run (bool): If ``True`` the generated configuration will not
+            be applied to the table, only returned. Defaults to
+            ``False``.
+        nonvoting_replica_tags (:obj:`list` of :obj:`str`): Replicas
+            with these server tags will be added to the
+            ``nonvoting_replicas`` list of the resulting configuration.
+            Defaults to ``None``.
+
+    Returns:
+        dict: A dictionary with possibly three keys:
+
+            * ``reconfigured``: the number of tables reconfigured. This
+              will be ``0`` if ``dry_run`` is ``True``.
+            * ``config_changes``: a list of new and old table
+              configuration values.
+            * ``status_changes``: a list of new and old table status
+              values.
+
+            For more information please consult RethinkDB's
+            documentation `ReQL command: reconfigure
+            <https://rethinkdb.com/api/python/reconfigure/>`_.
+
+    Raises:
+        DatabaseOpFailedError: If the reconfiguration fails due to a
+            RethinkDB :exc:`ReqlOpFailedError` or
+            :exc:`ReqlQueryLogicError`.
+
+    """
+    params = {
+        'shards': shards,
+        'replicas': replicas,
+        'dry_run': dry_run,
+    }
+    if primary_replica_tag:
+        params.update(
+            primary_replica_tag=primary_replica_tag,
+            nonvoting_replica_tags=nonvoting_replica_tags,
+        )
+    try:
+        return connection.run(r.table(table).reconfigure(**params))
+    except (r.ReqlOpFailedError, r.ReqlQueryLogicError) as e:
+        # Carry the driver's message along; callers log this exception
+        # and would otherwise log an empty string.
+        raise DatabaseOpFailedError(str(e)) from e
+
+
+@register_admin(RethinkDBConnection)
+def set_shards(connection, *, shards, dry_run=False):
+    """Sets the shards for the tables
+    :const:`~bigchaindb.backend.schema.TABLES`.
+
+    Args:
+        connection (:class:`~bigchaindb.backend.connection.Connection`):
+            A connection to the database.
+        shards (int): The number of shards, an integer from 1-64.
+        dry_run (bool): If ``True`` the generated configuration will not
+            be applied to the table, only returned. Defaults to
+            ``False``.
+
+    Returns:
+        dict: A dictionary with the configuration and status changes.
+            For more details please see :func:`.reconfigure`.
+
+    """
+    changes = {}
+    for table in TABLES:
+        replicas = len(
+            get_config(connection, table=table)['shards'][0]['replicas'])
+        change = reconfigure(
+            connection,
+            table=table,
+            shards=shards,
+            replicas=replicas,
+            dry_run=dry_run,
+        )
+        changes[table] = change
+    return changes
+
+
+@register_admin(RethinkDBConnection)
+def set_replicas(connection, *, replicas, dry_run=False):
+    """Sets the replicas for the tables
+    :const:`~bigchaindb.backend.schema.TABLES`.
+
+    Args:
+        connection (:class:`~bigchaindb.backend.connection.Connection`):
+            A connection to the database.
+        replicas (int): The number of replicas per shard. Specifying
+            more replicas than there are servers will return an error.
+        dry_run (bool): If ``True`` the generated configuration will not
+            be applied to the table, only returned. Defaults to
+            ``False``.
+
+    Returns:
+        dict: A dictionary with the configuration and status changes.
+            For more details please see :func:`.reconfigure`.
+
+    """
+    changes = {}
+    for table in TABLES:
+        shards = len(get_config(connection, table=table)['shards'])
+        change = reconfigure(
+            connection,
+            table=table,
+            shards=shards,
+            replicas=replicas,
+            dry_run=dry_run,
+        )
+        changes[table] = change
+    return changes
diff --git a/bigchaindb/commands/bigchain.py b/bigchaindb/commands/bigchain.py
index 6dbf04d3..6661e902 100644
--- a/bigchaindb/commands/bigchain.py
+++ b/bigchaindb/commands/bigchain.py
@@ -16,14 +16,14 @@ from bigchaindb.common import crypto
 from bigchaindb.common.exceptions import (StartupError,
                                           DatabaseAlreadyExists,
                                           KeypairNotFoundException)
-import rethinkdb as r
-
 import bigchaindb
 import bigchaindb.config_utils
 from bigchaindb.models import Transaction
 from bigchaindb.utils import ProcessGroup
 from bigchaindb import backend
 from bigchaindb.backend import schema
+from bigchaindb.backend.admin import set_replicas, set_shards
+from bigchaindb.backend.exceptions import DatabaseOpFailedError
 from bigchaindb.commands import utils
 from bigchaindb import processes
 
@@ -245,26 +245,18 @@ def run_load(args):
 
 def run_set_shards(args):
     conn = backend.connect()
-    for table in ['bigchain', 'backlog', 'votes']:
-        # See https://www.rethinkdb.com/api/python/config/
-        table_config = conn.run(r.table(table).config())
-        num_replicas = len(table_config['shards'][0]['replicas'])
-        try:
-            conn.run(r.table(table).reconfigure(shards=args.num_shards, replicas=num_replicas))
-        except r.ReqlOpFailedError as e:
-            logger.warn(e)
+    try:
+        set_shards(conn, shards=args.num_shards)
+    except DatabaseOpFailedError as e:
+        logger.warn(e)
 
 
 def run_set_replicas(args):
     conn = backend.connect()
-    for table in ['bigchain', 'backlog', 'votes']:
-        # See https://www.rethinkdb.com/api/python/config/
-        table_config = conn.run(r.table(table).config())
-        num_shards = len(table_config['shards'])
-        try:
-            conn.run(r.table(table).reconfigure(shards=num_shards, replicas=args.num_replicas))
-        except r.ReqlOpFailedError as e:
-            logger.warn(e)
+    try:
+        set_replicas(conn, replicas=args.num_replicas)
+    except DatabaseOpFailedError as e:
+        logger.warn(e)
 
 
 def create_parser():
diff --git a/docker-compose.yml b/docker-compose.yml
index 072b8725..db8abd4f 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -15,6 +15,13 @@ services:
     volumes_from:
       - rdb-data
 
+  rdb-2:
+    image: rethinkdb
+    ports:
+      - "8080"
+      - "29015"
+    command: rethinkdb --join rdb:29015 --bind all
+
   rdb-data:
     image: rethinkdb:2.3.5
     volumes:
diff --git a/tests/backend/rethinkdb/test_admin.py b/tests/backend/rethinkdb/test_admin.py
new file mode 100644
index 00000000..f489f5f5
--- /dev/null
+++ b/tests/backend/rethinkdb/test_admin.py
@@ -0,0 +1,205 @@
+"""Tests for the :mod:`bigchaindb.backend.rethinkdb.admin` module."""
+import pytest
+
+import rethinkdb as r
+
+
+def _count_rethinkdb_servers():
+    """Count the rows of RethinkDB's ``server_status`` system table.
+
+    Also evaluated at import time by the ``skipif`` markers below, so it
+    opens its own connection and closes it again before returning.
+    """
+    from bigchaindb import config
+    conn = r.connect(host=config['database']['host'],
+                     port=config['database']['port'])
+    try:
+        return len(list(r.db('rethinkdb').table('server_status').run(conn)))
+    finally:
+        conn.close()
+
+
+@pytest.fixture
+def rdb_conn(db_host, db_port, db_name):
+    """Yield a raw RethinkDB driver connection; close it on teardown."""
+    conn = r.connect(host=db_host, port=db_port, db=db_name)
+    yield conn
+    conn.close()
+
+
+@pytest.mark.bdb
+def test_set_shards(rdb_conn, db_name, db_conn):
+    from bigchaindb.backend.schema import TABLES
+    from bigchaindb.backend.rethinkdb.admin import set_shards
+    for table in TABLES:
+        table_config = r.db(db_name).table(table).config().run(rdb_conn)
+        assert len(table_config['shards']) == 1
+    what_happened = set_shards(db_conn, shards=2)
+    for table in TABLES:
+        assert what_happened[table]['reconfigured'] == 1
+        config_changes = what_happened[table]['config_changes']
+        assert len(config_changes) == 1
+        assert len(config_changes[0]['new_val']['shards']) == 2
+        assert len(what_happened[table]['status_changes']) == 1
+        status_change = what_happened[table]['status_changes'][0]
+        assert not status_change['new_val']['status']['all_replicas_ready']
+        table_config = r.db(db_name).table(table).config().run(rdb_conn)
+        assert len(table_config['shards']) == 2
+
+
+@pytest.mark.bdb
+def test_set_shards_dry_run(rdb_conn, db_name, db_conn):
+    from bigchaindb.backend.schema import TABLES
+    from bigchaindb.backend.rethinkdb.admin import set_shards
+    for table in TABLES:
+        table_config = r.db(db_name).table(table).config().run(rdb_conn)
+        assert len(table_config['shards']) == 1
+    what_happened = set_shards(db_conn, shards=2, dry_run=True)
+    for table in TABLES:
+        assert what_happened[table]['reconfigured'] == 0
+        config_changes = what_happened[table]['config_changes']
+        assert len(config_changes) == 1
+        assert len(config_changes[0]['new_val']['shards']) == 2
+        assert 'status_changes' not in what_happened[table]
+        table_config = r.db(db_name).table(table).config().run(rdb_conn)
+        assert len(table_config['shards']) == 1
+
+
+@pytest.mark.bdb
+@pytest.mark.skipif(
+    _count_rethinkdb_servers() < 2,
+    reason=("Requires at least two servers. It's impossible to have "
+            "more replicas of the data than there are servers.")
+)
+def test_set_replicas(rdb_conn, db_name, db_conn):
+    from bigchaindb.backend.schema import TABLES
+    from bigchaindb.backend.rethinkdb.admin import set_replicas
+    for table in TABLES:
+        table_config = r.db(db_name).table(table).config().run(rdb_conn)
+        replicas_before = table_config['shards'][0]['replicas']
+        assert len(replicas_before) == 1
+    what_happened = set_replicas(db_conn, replicas=2)
+    for table in TABLES:
+        assert what_happened[table]['reconfigured'] == 1
+        config_changes = what_happened[table]['config_changes']
+        assert len(config_changes) == 1
+        assert len(config_changes[0]['new_val']['shards'][0]['replicas']) == 2
+        assert len(what_happened[table]['status_changes']) == 1
+        status_change = what_happened[table]['status_changes'][0]
+        assert not status_change['new_val']['status']['all_replicas_ready']
+        table_config = r.db(db_name).table(table).config().run(rdb_conn)
+        assert len(table_config['shards'][0]['replicas']) == 2
+        assert (table_config['shards'][0]['replicas'][0] !=
+                table_config['shards'][0]['replicas'][1])
+
+
+@pytest.mark.bdb
+@pytest.mark.skipif(
+    _count_rethinkdb_servers() < 2,
+    reason=("Requires at least two servers. It's impossible to have "
+            "more replicas of the data than there are servers.")
+)
+def test_set_replicas_dry_run(rdb_conn, db_name, db_conn):
+    from bigchaindb.backend.schema import TABLES
+    from bigchaindb.backend.rethinkdb.admin import set_replicas
+    for table in TABLES:
+        table_config = r.db(db_name).table(table).config().run(rdb_conn)
+        replicas_before = table_config['shards'][0]['replicas']
+        assert len(replicas_before) == 1
+    what_happened = set_replicas(db_conn, replicas=2, dry_run=True)
+    for table in TABLES:
+        assert what_happened[table]['reconfigured'] == 0
+        config_changes = what_happened[table]['config_changes']
+        assert len(config_changes) == 1
+        assert len(config_changes[0]['new_val']['shards'][0]['replicas']) == 2
+        assert 'status_changes' not in what_happened[table]
+        table_config = r.db(db_name).table(table).config().run(rdb_conn)
+        assert len(table_config['shards'][0]['replicas']) == 1
+
+
+@pytest.mark.bdb
+@pytest.mark.skipif(
+    _count_rethinkdb_servers() < 2,
+    reason=("Requires at least two servers. It's impossible to have "
+            "more replicas of the data than there are servers.")
+)
+def test_reconfigure(rdb_conn, db_name, db_conn):
+    from bigchaindb.backend.rethinkdb.admin import reconfigure
+    table_config = r.db(db_name).table('backlog').config().run(rdb_conn)
+    replicas_before = table_config['shards'][0]['replicas']
+    assert len(replicas_before) == 1
+    reconfigure(db_conn, table='backlog', shards=2, replicas=2)
+    table_config = r.db(db_name).table('backlog').config().run(rdb_conn)
+    assert len(table_config['shards'][0]['replicas']) == 2
+    assert (table_config['shards'][0]['replicas'][0] !=
+            table_config['shards'][0]['replicas'][1])
+
+
+@pytest.mark.bdb
+def test_reconfigure_shards_for_real(rdb_conn, db_name, db_conn):
+    from bigchaindb.backend.rethinkdb.admin import reconfigure
+    table_config = r.db(db_name).table('backlog').config().run(rdb_conn)
+    replicas_before = table_config['shards'][0]['replicas']
+    assert len(replicas_before) == 1
+    assert len(table_config['shards']) == 1
+    what_happened = reconfigure(
+        db_conn,
+        table='backlog',
+        shards=2,
+        replicas={'default': 1},
+        primary_replica_tag='default',
+        nonvoting_replica_tags=('default',),
+    )
+    assert what_happened['reconfigured'] == 1
+    assert len(what_happened['config_changes']) == 1
+    assert len(what_happened['config_changes'][0]['new_val']['shards']) == 2
+    assert len(what_happened['status_changes']) == 1
+    status_change = what_happened['status_changes'][0]
+    assert not status_change['new_val']['status']['all_replicas_ready']
+    table_config = r.db(db_name).table('backlog').config().run(rdb_conn)
+    assert len(table_config['shards']) == 2
+
+
+@pytest.mark.bdb
+def test_reconfigure_shards_dry_run(rdb_conn, db_name, db_conn):
+    from bigchaindb.backend.rethinkdb.admin import reconfigure
+    table_config = r.db(db_name).table('backlog').config().run(rdb_conn)
+    replicas_before = table_config['shards'][0]['replicas']
+    assert len(replicas_before) == 1
+    assert len(table_config['shards']) == 1
+    what_happened = reconfigure(
+        db_conn,
+        table='backlog',
+        shards=2,
+        replicas={'default': 1},
+        primary_replica_tag='default',
+        nonvoting_replica_tags=('default',),
+        dry_run=True,
+    )
+    assert what_happened['reconfigured'] == 0
+    assert len(what_happened['config_changes']) == 1
+    assert len(what_happened['config_changes'][0]['new_val']['shards']) == 2
+    table_config = r.db(db_name).table('backlog').config().run(rdb_conn)
+    assert len(table_config['shards']) == 1
+
+
+@pytest.mark.bdb
+def test_reconfigure_replicas_without_nonvoting_replica_tags(rdb_conn,
+                                                             db_name,
+                                                             db_conn):
+    from bigchaindb.backend.rethinkdb.admin import reconfigure
+    from bigchaindb.backend.exceptions import DatabaseOpFailedError
+    with pytest.raises(DatabaseOpFailedError) as exc:
+        reconfigure(db_conn, table='backlog', shards=1,
+                    replicas={'default': 1}, primary_replica_tag='default')
+    assert isinstance(exc.value.__cause__, r.ReqlQueryLogicError)
+
+
+@pytest.mark.bdb
+def test_reconfigure_too_many_replicas(rdb_conn, db_name, db_conn):
+    from bigchaindb.backend.rethinkdb.admin import reconfigure
+    from bigchaindb.backend.exceptions import DatabaseOpFailedError
+    replicas = _count_rethinkdb_servers() + 1
+    with pytest.raises(DatabaseOpFailedError) as exc:
+        reconfigure(db_conn, table='backlog', shards=1, replicas=replicas)
+    assert isinstance(exc.value.__cause__, r.ReqlOpFailedError)
diff --git a/tests/backend/test_generics.py b/tests/backend/test_generics.py
index 4ec192d8..2ab33a7c 100644
--- a/tests/backend/test_generics.py
+++ b/tests/backend/test_generics.py
@@ -90,3 +90,16 @@ def test_init_database(mock_create_database, mock_create_tables,
     mock_create_database.assert_called_once_with(conn, 'mickeymouse')
     mock_create_tables.assert_called_once_with(conn, 'mickeymouse')
     mock_create_indexes.assert_called_once_with(conn, 'mickeymouse')
+
+
+@mark.parametrize('admin_func_name,kwargs', (
+    ('get_config', {'table': None}),
+    ('reconfigure', {'table': None, 'shards': None, 'replicas': None}),
+    ('set_shards', {'shards': None}),
+    ('set_replicas', {'replicas': None}),
+))
+def test_admin(admin_func_name, kwargs):
+    from bigchaindb.backend import admin
+    admin_func = getattr(admin, admin_func_name)
+    with raises(NotImplementedError):
+        admin_func(None, **kwargs)
diff --git a/tests/commands/rethinkdb/test_commands.py b/tests/commands/rethinkdb/test_commands.py
index 9effbdc8..5fb75f4d 100644
--- a/tests/commands/rethinkdb/test_commands.py
+++ b/tests/commands/rethinkdb/test_commands.py
@@ -47,7 +47,7 @@ def test_set_shards(mock_reconfigure, monkeypatch, b):
     monkeypatch.setattr(rethinkdb.RqlQuery, 'run', mockreturn_one_replica)
     args = Namespace(num_shards=3)
     run_set_shards(args)
-    mock_reconfigure.assert_called_with(replicas=1, shards=3)
+    mock_reconfigure.assert_called_with(replicas=1, shards=3, dry_run=False)
 
     # this will mock the call to retrieve the database config
     # we will set it to return three replica
@@ -56,7 +56,7 @@ def test_set_shards(mock_reconfigure, monkeypatch, b):
 
     monkeypatch.setattr(rethinkdb.RqlQuery, 'run', mockreturn_three_replicas)
     run_set_shards(args)
-    mock_reconfigure.assert_called_with(replicas=3, shards=3)
+    mock_reconfigure.assert_called_with(replicas=3, shards=3, dry_run=False)
 
 
 @patch('logging.Logger.warn')
@@ -91,7 +91,7 @@ def test_set_replicas(mock_reconfigure, monkeypatch, b):
     monkeypatch.setattr(rethinkdb.RqlQuery, 'run', mockreturn_two_shards)
     args = Namespace(num_replicas=2)
     run_set_replicas(args)
-    mock_reconfigure.assert_called_with(replicas=2, shards=2)
+    mock_reconfigure.assert_called_with(replicas=2, shards=2, dry_run=False)
 
     # this will mock the call to retrieve the database config
     # we will set it to return three shards
@@ -100,7 +100,7 @@ def test_set_replicas(mock_reconfigure, monkeypatch, b):
 
     monkeypatch.setattr(rethinkdb.RqlQuery, 'run', mockreturn_three_shards)
     run_set_replicas(args)
-    mock_reconfigure.assert_called_with(replicas=2, shards=3)
+    mock_reconfigure.assert_called_with(replicas=2, shards=3, dry_run=False)
 
 
 @patch('logging.Logger.warn')
diff --git a/tests/conftest.py b/tests/conftest.py
index ea28521d..953cf6c7 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -170,13 +170,24 @@ def _setup_database(_configure_bigchaindb):
 
 @pytest.fixture
 def _bdb(_setup_database, _configure_bigchaindb):
-    yield
     from bigchaindb import config
     from bigchaindb.backend import connect
-    from .utils import flush_db
-    dbname = config['database']['name']
+    from bigchaindb.backend.admin import get_config
+    from bigchaindb.backend.schema import TABLES
+    from .utils import flush_db, update_table_config
     conn = connect()
+    # TODO remove condition once the mongodb implementation is done
+    if config['database']['backend'] == 'rethinkdb':
+        table_configs_before = {
+            t: get_config(conn, table=t) for t in TABLES
+        }
+    yield
+    dbname = config['database']['name']
     flush_db(conn, dbname)
+    # TODO remove condition once the mongodb implementation is done
+    if config['database']['backend'] == 'rethinkdb':
+        for t, c in table_configs_before.items():
+            update_table_config(conn, t, **c)
 
 
 @pytest.fixture
@@ -355,3 +366,30 @@ def not_yet_created_db(request):
         schema.drop_database(conn, dbname)
     except DatabaseDoesNotExist:
         pass
+
+
+@pytest.fixture
+def db_config():
+    from bigchaindb import config
+    return config['database']
+
+
+@pytest.fixture
+def db_host(db_config):
+    return db_config['host']
+
+
+@pytest.fixture
+def db_port(db_config):
+    return db_config['port']
+
+
+@pytest.fixture
+def db_name(db_config):
+    return db_config['name']
+
+
+@pytest.fixture
+def db_conn():
+    from bigchaindb.backend import connect
+    return connect()
diff --git a/tests/utils.py b/tests/utils.py
index ca51fb6e..53ffd86b 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -41,3 +41,22 @@ def flush_mongo_db(connection, dbname):
     connection.conn[dbname].bigchain.delete_many({})
     connection.conn[dbname].backlog.delete_many({})
     connection.conn[dbname].votes.delete_many({})
+
+
+@singledispatch
+def update_table_config(connection, table, **kwargs):
+    """Update the configuration of ``table`` with ``kwargs``.
+
+    Raises:
+        NotImplementedError: If no implementation is registered for
+            the type of ``connection``.
+    """
+    raise NotImplementedError
+
+
+# Registered under its own name: reusing ``update_table_config`` would
+# rebind that name to the RethinkDB function and drop the dispatcher.
+@update_table_config.register(RethinkDBConnection)
+def update_rethink_table_config(connection, table, **kwargs):
+    """Write ``kwargs`` into the RethinkDB configuration of ``table``."""
+    return connection.run(r.table(table).config().update(kwargs))