This commit is contained in:
Sylvain Bellemare 2016-12-21 17:47:31 +01:00 committed by Sylvain Bellemare
parent b12ea854f7
commit 4d6df06236
12 changed files with 473 additions and 27 deletions

View File

@ -7,7 +7,7 @@ configuration or the ``BIGCHAINDB_DATABASE_BACKEND`` environment variable.
"""
# Include the backend interfaces
from bigchaindb.backend import changefeed, schema, query # noqa
from bigchaindb.backend import admin, changefeed, schema, query # noqa
from bigchaindb.backend.connection import connect # noqa
from bigchaindb.backend.changefeed import get_changefeed # noqa

View File

@ -0,0 +1,22 @@
"""Database configuration functions."""
from functools import singledispatch
@singledispatch
def get_config(connection, *, table):
    """Get the configuration of the given table.

    Generic fallback: it raises for any connection type that has no
    registered implementation.

    Args:
        connection: A connection to the database.
        table (str): The name of the table to get the configuration for.

    Raises:
        NotImplementedError: If no implementation is registered for the
            type of ``connection``.
    """
    raise NotImplementedError()
@singledispatch
def reconfigure(connection, *, table, shards, replicas, **kwargs):
    """Reconfigure the sharding and replication of the given table.

    Generic fallback: it raises for any connection type that has no
    registered implementation.

    Args:
        connection: A connection to the database.
        table (str): The name of the table to reconfigure.
        shards: The sharding setting to apply.
        replicas: The replication setting to apply.
        **kwargs: Additional, backend-specific options.

    Raises:
        NotImplementedError: If no implementation is registered for the
            type of ``connection``.
    """
    raise NotImplementedError()
@singledispatch
def set_shards(connection, *, shards):
    """Set the number of shards.

    Generic fallback: it raises for any connection type that has no
    registered implementation.

    Args:
        connection: A connection to the database.
        shards: The sharding setting to apply.

    Raises:
        NotImplementedError: If no implementation is registered for the
            type of ``connection``.
    """
    raise NotImplementedError()
@singledispatch
def set_replicas(connection, *, replicas):
    """Set the number of replicas.

    Generic fallback: it raises for any connection type that has no
    registered implementation.

    Args:
        connection: A connection to the database.
        replicas: The replication setting to apply.

    Raises:
        NotImplementedError: If no implementation is registered for the
            type of ``connection``.
    """
    raise NotImplementedError()

View File

@ -0,0 +1,5 @@
from bigchaindb.exceptions import BigchainDBError
class DatabaseOpFailedError(BigchainDBError):
    """Raised when an operation against the database backend fails."""

View File

@ -16,7 +16,7 @@ generic backend interfaces to the implementations in this module.
"""
# Register the single dispatched modules on import.
from bigchaindb.backend.rethinkdb import changefeed, schema, query # noqa
from bigchaindb.backend.rethinkdb import admin, changefeed, schema, query # noqa
# RethinkDBConnection should always be accessed via
# ``bigchaindb.backend.connect()``.

View File

@ -0,0 +1,165 @@
"""Database configuration functions."""
import logging
import rethinkdb as r
from bigchaindb.backend import admin
from bigchaindb.backend.schema import TABLES
from bigchaindb.backend.exceptions import DatabaseOpFailedError
from bigchaindb.backend.utils import module_dispatch_registrar
from bigchaindb.backend.rethinkdb.connection import RethinkDBConnection
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Decorator factory bound to the generic ``bigchaindb.backend.admin``
# module; used below as ``@register_admin(RethinkDBConnection)``.
# NOTE(review): presumably registers each decorated function as the
# RethinkDB implementation of the same-named generic function -- confirm
# in ``bigchaindb.backend.utils.module_dispatch_registrar``.
register_admin = module_dispatch_registrar(admin)
@register_admin(RethinkDBConnection)
def get_config(connection, *, table):
    """Fetch the configuration of a table.

    Args:
        connection (:class:`~bigchaindb.backend.connection.Connection`):
            A connection to the database.
        table (str): Name of the table whose configuration is wanted.

    Returns:
        dict: The configuration of the given table.
    """
    config_query = r.table(table).config()
    return connection.run(config_query)
@register_admin(RethinkDBConnection)
def reconfigure(connection, *, table, shards, replicas,
                primary_replica_tag=None, dry_run=False,
                nonvoting_replica_tags=None):
    """Reconfigures the given table.

    Args:
        connection (:class:`~bigchaindb.backend.connection.Connection`):
            A connection to the database.
        table (str): The name of the table to reconfigure.
        shards (int): The number of shards, an integer from 1-64.
        replicas (:obj:`int` | :obj:`dict`):
            * If replicas is an integer, it specifies the number of
              replicas per shard. Specifying more replicas than there
              are servers will return an error.
            * If replicas is a dictionary, it specifies key-value pairs
              of server tags and the number of replicas to assign to
              those servers::

                  {'africa': 2, 'asia': 4, 'europe': 2, ...}
        primary_replica_tag (str): The primary server specified by its
            server tag. Required if ``replicas`` is a dictionary. The
            tag must be in the ``replicas`` dictionary. This must not be
            specified if ``replicas`` is an integer. Defaults to
            ``None``.
        dry_run (bool): If ``True`` the generated configuration will not
            be applied to the table, only returned. Defaults to
            ``False``.
        nonvoting_replica_tags (:obj:`list` of :obj:`str`): Replicas
            with these server tags will be added to the
            ``nonvoting_replicas`` list of the resulting configuration.
            Only forwarded when ``primary_replica_tag`` is given.
            Defaults to ``None``.

    Returns:
        dict: A dictionary with possibly three keys:

            * ``reconfigured``: the number of tables reconfigured. This
              will be ``0`` if ``dry_run`` is ``True``.
            * ``config_changes``: a list of new and old table
              configuration values.
            * ``status_changes``: a list of new and old table status
              values.

            For more information please consult RethinkDB's
            documentation `ReQL command: reconfigure
            <https://rethinkdb.com/api/python/reconfigure/>`_.

    Raises:
        DatabaseOpFailedError: If the reconfiguration fails due to a
            RethinkDB :exc:`ReqlOpFailedError` or
            :exc:`ReqlQueryLogicError`. The driver's error text is used
            as the message and the driver error is set as ``__cause__``.
    """
    params = {
        'shards': shards,
        'replicas': replicas,
        'dry_run': dry_run,
    }
    # The tag options are only sent together, and only when a primary
    # replica tag was supplied.
    if primary_replica_tag:
        params.update(
            primary_replica_tag=primary_replica_tag,
            nonvoting_replica_tags=nonvoting_replica_tags,
        )
    try:
        return connection.run(r.table(table).reconfigure(**params))
    except (r.ReqlOpFailedError, r.ReqlQueryLogicError) as e:
        # Carry the driver's message along: callers log this exception
        # directly, and a message-less exception would log as ''.
        raise DatabaseOpFailedError(str(e)) from e
@register_admin(RethinkDBConnection)
def set_shards(connection, *, shards, dry_run=False):
    """Apply a shard count to every table in
    :const:`~bigchaindb.backend.schema.TABLES`.

    Each table keeps the number of replicas its first shard currently
    has.

    Args:
        connection (:class:`~bigchaindb.backend.connection.Connection`):
            A connection to the database.
        shards (int): The number of shards, an integer from 1-64.
        dry_run (bool): If ``True`` the generated configuration will not
            be applied to the table, only returned. Defaults to
            ``False``.

    Returns:
        dict: Maps each table name to the configuration and status
        changes returned for it. For more details please see
        :func:`.reconfigure`.
    """
    results = {}
    for name in TABLES:
        current = get_config(connection, table=name)
        replica_count = len(current['shards'][0]['replicas'])
        results[name] = reconfigure(
            connection,
            table=name,
            shards=shards,
            replicas=replica_count,
            dry_run=dry_run,
        )
    return results
@register_admin(RethinkDBConnection)
def set_replicas(connection, *, replicas, dry_run=False):
    """Apply a replica count to every table in
    :const:`~bigchaindb.backend.schema.TABLES`.

    Each table keeps the number of shards it currently has.

    Args:
        connection (:class:`~bigchaindb.backend.connection.Connection`):
            A connection to the database.
        replicas (int): The number of replicas per shard. Specifying
            more replicas than there are servers will return an error.
        dry_run (bool): If ``True`` the generated configuration will not
            be applied to the table, only returned. Defaults to
            ``False``.

    Returns:
        dict: Maps each table name to the configuration and status
        changes returned for it. For more details please see
        :func:`.reconfigure`.
    """
    results = {}
    for name in TABLES:
        current = get_config(connection, table=name)
        shard_count = len(current['shards'])
        results[name] = reconfigure(
            connection,
            table=name,
            shards=shard_count,
            replicas=replicas,
            dry_run=dry_run,
        )
    return results

View File

@ -16,14 +16,14 @@ from bigchaindb.common import crypto
from bigchaindb.common.exceptions import (StartupError,
DatabaseAlreadyExists,
KeypairNotFoundException)
import rethinkdb as r
import bigchaindb
import bigchaindb.config_utils
from bigchaindb.models import Transaction
from bigchaindb.utils import ProcessGroup
from bigchaindb import backend
from bigchaindb.backend import schema
from bigchaindb.backend.admin import set_replicas, set_shards
from bigchaindb.backend.exceptions import DatabaseOpFailedError
from bigchaindb.commands import utils
from bigchaindb import processes
@ -245,26 +245,18 @@ def run_load(args):
def run_set_shards(args):
    """Set the number of shards on the database tables.

    Args:
        args: Parsed command-line arguments; ``args.num_shards`` is the
            shard count to apply.

    A :exc:`DatabaseOpFailedError` from the backend is logged as a
    warning and not re-raised.
    """
    conn = backend.connect()
    try:
        set_shards(conn, shards=args.num_shards)
    except DatabaseOpFailedError as e:
        logger.warn(e)
def run_set_replicas(args):
    """Set the number of replicas on the database tables.

    Args:
        args: Parsed command-line arguments; ``args.num_replicas`` is
            the replica count to apply.

    A :exc:`DatabaseOpFailedError` from the backend is logged as a
    warning and not re-raised.
    """
    conn = backend.connect()
    try:
        set_replicas(conn, replicas=args.num_replicas)
    except DatabaseOpFailedError as e:
        logger.warn(e)
def create_parser():

View File

@ -15,6 +15,13 @@ services:
volumes_from:
- rdb-data
# Additional RethinkDB server: its command joins the server reachable at
# rdb:29015 and binds to all interfaces. Ports 8080 and 29015 are published.
rdb-2:
image: rethinkdb
ports:
- "8080"
- "29015"
command: rethinkdb --join rdb:29015 --bind all
rdb-data:
image: rethinkdb:2.3.5
volumes:

View File

@ -0,0 +1,194 @@
"""Tests for the :mod:`bigchaindb.backend.rethinkdb.admin` module."""
import pytest
import rethinkdb as r
def _count_rethinkdb_servers():
    """Return how many rows RethinkDB's ``server_status`` table holds.

    The host and port are read from the ``database`` section of the
    BigchainDB configuration.

    Returns:
        int: The number of entries in ``rethinkdb.server_status``.
    """
    from bigchaindb import config
    conn = r.connect(host=config['database']['host'],
                     port=config['database']['port'])
    try:
        return len(list(r.db('rethinkdb').table('server_status').run(conn)))
    finally:
        # This helper is evaluated inside ``skipif`` markers at import
        # time, so release the connection instead of leaking one per call.
        conn.close()
@pytest.fixture
def rdb_conn(db_host, db_port, db_name):
    """Yield a raw RethinkDB driver connection to the test database.

    The connection is closed once the test that requested it finishes.
    """
    conn = r.connect(host=db_host, port=db_port, db=db_name)
    yield conn
    conn.close()
@pytest.mark.bdb
def test_set_shards(rdb_conn, db_name, db_conn):
    """``set_shards`` reshards every table in ``TABLES`` from 1 to 2."""
    from bigchaindb.backend.schema import TABLES
    from bigchaindb.backend.rethinkdb.admin import set_shards
    # Precondition: every table (not just 'backlog') starts with one shard.
    for table in TABLES:
        table_config = r.db(db_name).table(table).config().run(rdb_conn)
        assert len(table_config['shards']) == 1
    what_happened = set_shards(db_conn, shards=2)
    for table in TABLES:
        assert what_happened[table]['reconfigured'] == 1
        config_changes = what_happened[table]['config_changes']
        assert len(config_changes) == 1
        assert len(config_changes[0]['new_val']['shards']) == 2
        assert len(what_happened[table]['status_changes']) == 1
        status_change = what_happened[table]['status_changes'][0]
        assert not status_change['new_val']['status']['all_replicas_ready']
        # The change must also be visible in the stored table config.
        table_config = r.db(db_name).table(table).config().run(rdb_conn)
        assert len(table_config['shards']) == 2
@pytest.mark.bdb
def test_set_shards_dry_run(rdb_conn, db_name, db_conn):
    """A dry run reports the new sharding but leaves tables untouched."""
    from bigchaindb.backend.schema import TABLES
    from bigchaindb.backend.rethinkdb.admin import set_shards
    # Precondition: every table (not just 'backlog') starts with one shard.
    for table in TABLES:
        table_config = r.db(db_name).table(table).config().run(rdb_conn)
        assert len(table_config['shards']) == 1
    what_happened = set_shards(db_conn, shards=2, dry_run=True)
    for table in TABLES:
        assert what_happened[table]['reconfigured'] == 0
        config_changes = what_happened[table]['config_changes']
        assert len(config_changes) == 1
        assert len(config_changes[0]['new_val']['shards']) == 2
        # NOTE(review): the key checked elsewhere in this module is
        # 'status_changes'; confirm whether 'status_change' is a typo
        # that makes this assertion vacuous.
        assert 'status_change' not in what_happened[table]
        # The stored configuration must not have changed.
        table_config = r.db(db_name).table(table).config().run(rdb_conn)
        assert len(table_config['shards']) == 1
@pytest.mark.bdb
@pytest.mark.skipif(
    _count_rethinkdb_servers() < 2,
    reason=("Requires at least two servers. It's impossible to have "
            "more replicas of the data than there are servers.")
)
def test_set_replicas(rdb_conn, db_name, db_conn):
    """``set_replicas`` raises every table in ``TABLES`` to 2 replicas."""
    from bigchaindb.backend.schema import TABLES
    from bigchaindb.backend.rethinkdb.admin import set_replicas
    # Precondition: the first shard of every table has a single replica.
    for table in TABLES:
        table_config = r.db(db_name).table(table).config().run(rdb_conn)
        replicas_before = table_config['shards'][0]['replicas']
        assert len(replicas_before) == 1
    what_happened = set_replicas(db_conn, replicas=2)
    for table in TABLES:
        assert what_happened[table]['reconfigured'] == 1
        config_changes = what_happened[table]['config_changes']
        assert len(config_changes) == 1
        assert len(config_changes[0]['new_val']['shards'][0]['replicas']) == 2
        assert len(what_happened[table]['status_changes']) == 1
        status_change = what_happened[table]['status_changes'][0]
        assert not status_change['new_val']['status']['all_replicas_ready']
        # The stored config must list two distinct replicas.
        table_config = r.db(db_name).table(table).config().run(rdb_conn)
        assert len(table_config['shards'][0]['replicas']) == 2
        assert (table_config['shards'][0]['replicas'][0] !=
                table_config['shards'][0]['replicas'][1])
@pytest.mark.bdb
@pytest.mark.skipif(
    _count_rethinkdb_servers() < 2,
    reason=("Requires at least two servers. It's impossible to have "
            "more replicas of the data than there are servers.")
)
def test_set_replicas_dry_run(rdb_conn, db_name, db_conn):
    """A dry run reports the new replication but leaves tables untouched."""
    from bigchaindb.backend.schema import TABLES
    from bigchaindb.backend.rethinkdb.admin import set_replicas
    # Precondition: the first shard of every table has a single replica.
    for table in TABLES:
        table_config = r.db(db_name).table(table).config().run(rdb_conn)
        replicas_before = table_config['shards'][0]['replicas']
        assert len(replicas_before) == 1
    what_happened = set_replicas(db_conn, replicas=2, dry_run=True)
    for table in TABLES:
        assert what_happened[table]['reconfigured'] == 0
        config_changes = what_happened[table]['config_changes']
        assert len(config_changes) == 1
        assert len(config_changes[0]['new_val']['shards'][0]['replicas']) == 2
        # NOTE(review): the key checked elsewhere in this module is
        # 'status_changes'; confirm whether 'status_change' is a typo
        # that makes this assertion vacuous.
        assert 'status_change' not in what_happened[table]
        # The stored configuration must not have changed.
        table_config = r.db(db_name).table(table).config().run(rdb_conn)
        assert len(table_config['shards'][0]['replicas']) == 1
@pytest.mark.bdb
@pytest.mark.skipif(
    _count_rethinkdb_servers() < 2,
    reason=("Requires at least two servers. It's impossible to have "
            "more replicas of the data than there are servers.")
)
def test_reconfigure(rdb_conn, db_name, db_conn):
    """``reconfigure`` gives ``backlog`` two distinct replicas."""
    from bigchaindb.backend.rethinkdb.admin import reconfigure
    table_config = r.db(db_name).table('backlog').config().run(rdb_conn)
    replicas_before = table_config['shards'][0]['replicas']
    assert len(replicas_before) == 1
    reconfigure(db_conn, table='backlog', shards=2, replicas=2)
    table_config = r.db(db_name).table('backlog').config().run(rdb_conn)
    assert len(table_config['shards'][0]['replicas']) == 2
    assert (table_config['shards'][0]['replicas'][0] !=
            table_config['shards'][0]['replicas'][1])
@pytest.mark.bdb
def test_reconfigure_shards_for_real(rdb_conn, db_name, db_conn):
    """Tag-based ``reconfigure`` really reshards ``backlog`` to 2 shards."""
    from bigchaindb.backend.rethinkdb.admin import reconfigure

    def backlog_shards():
        # Read the shard list of ``backlog`` straight from RethinkDB.
        stored = r.db(db_name).table('backlog').config().run(rdb_conn)
        return stored['shards']

    shards_before = backlog_shards()
    assert len(shards_before[0]['replicas']) == 1
    assert len(shards_before) == 1

    options = {
        'table': 'backlog',
        'shards': 2,
        'replicas': {'default': 1},
        'primary_replica_tag': 'default',
        'nonvoting_replica_tags': ('default',),
    }
    outcome = reconfigure(db_conn, **options)

    assert outcome['reconfigured'] == 1
    config_changes = outcome['config_changes']
    assert len(config_changes) == 1
    assert len(config_changes[0]['new_val']['shards']) == 2
    status_changes = outcome['status_changes']
    assert len(status_changes) == 1
    new_status = status_changes[0]['new_val']['status']
    assert not new_status['all_replicas_ready']

    assert len(backlog_shards()) == 2
@pytest.mark.bdb
def test_reconfigure_shards_dry_run(rdb_conn, db_name, db_conn):
    """A dry-run ``reconfigure`` reports 2 shards but applies nothing."""
    from bigchaindb.backend.rethinkdb.admin import reconfigure

    def backlog_shards():
        # Read the shard list of ``backlog`` straight from RethinkDB.
        stored = r.db(db_name).table('backlog').config().run(rdb_conn)
        return stored['shards']

    shards_before = backlog_shards()
    assert len(shards_before[0]['replicas']) == 1
    assert len(shards_before) == 1

    options = {
        'table': 'backlog',
        'shards': 2,
        'replicas': {'default': 1},
        'primary_replica_tag': 'default',
        'nonvoting_replica_tags': ('default',),
        'dry_run': True,
    }
    outcome = reconfigure(db_conn, **options)

    assert outcome['reconfigured'] == 0
    config_changes = outcome['config_changes']
    assert len(config_changes) == 1
    assert len(config_changes[0]['new_val']['shards']) == 2

    assert len(backlog_shards()) == 1
@pytest.mark.bdb
def test_reconfigure_replicas_without_nonvoting_replica_tags(rdb_conn,
                                                             db_name,
                                                             db_conn):
    """Omitting ``nonvoting_replica_tags`` with a primary tag must fail.

    The failure surfaces as ``DatabaseOpFailedError`` caused by a
    ``ReqlQueryLogicError``.
    """
    from bigchaindb.backend.rethinkdb.admin import reconfigure
    from bigchaindb.backend.exceptions import DatabaseOpFailedError

    with pytest.raises(DatabaseOpFailedError) as exc:
        reconfigure(
            db_conn,
            table='backlog',
            shards=1,
            replicas={'default': 1},
            primary_replica_tag='default',
        )
    underlying = exc.value.__cause__
    assert isinstance(underlying, r.ReqlQueryLogicError)
@pytest.mark.bdb
def test_reconfigure_too_many_replicas(rdb_conn, db_name, db_conn):
    """Asking for one more replica than there are servers must fail.

    The failure surfaces as ``DatabaseOpFailedError`` caused by a
    ``ReqlOpFailedError``.
    """
    from bigchaindb.backend.rethinkdb.admin import reconfigure
    from bigchaindb.backend.exceptions import DatabaseOpFailedError

    one_too_many = _count_rethinkdb_servers() + 1
    with pytest.raises(DatabaseOpFailedError) as exc:
        reconfigure(db_conn, table='backlog', shards=1,
                    replicas=one_too_many)
    underlying = exc.value.__cause__
    assert isinstance(underlying, r.ReqlOpFailedError)

View File

@ -90,3 +90,16 @@ def test_init_database(mock_create_database, mock_create_tables,
mock_create_database.assert_called_once_with(conn, 'mickeymouse')
mock_create_tables.assert_called_once_with(conn, 'mickeymouse')
mock_create_indexes.assert_called_once_with(conn, 'mickeymouse')
@mark.parametrize('admin_func_name,kwargs', (
    ('get_config', {'table': None}),
    ('reconfigure', {'table': None, 'shards': None, 'replicas': None}),
    ('set_shards', {'shards': None}),
    ('set_replicas', {'replicas': None}),
))
def test_admin(admin_func_name, kwargs):
    """Each generic admin function raises for an unregistered type."""
    from bigchaindb.backend import admin

    generic_func = getattr(admin, admin_func_name)
    with raises(NotImplementedError):
        # ``None`` has no registered implementation.
        generic_func(None, **kwargs)

View File

@ -47,7 +47,7 @@ def test_set_shards(mock_reconfigure, monkeypatch, b):
monkeypatch.setattr(rethinkdb.RqlQuery, 'run', mockreturn_one_replica)
args = Namespace(num_shards=3)
run_set_shards(args)
mock_reconfigure.assert_called_with(replicas=1, shards=3)
mock_reconfigure.assert_called_with(replicas=1, shards=3, dry_run=False)
# this will mock the call to retrieve the database config
# we will set it to return three replica
@ -56,7 +56,7 @@ def test_set_shards(mock_reconfigure, monkeypatch, b):
monkeypatch.setattr(rethinkdb.RqlQuery, 'run', mockreturn_three_replicas)
run_set_shards(args)
mock_reconfigure.assert_called_with(replicas=3, shards=3)
mock_reconfigure.assert_called_with(replicas=3, shards=3, dry_run=False)
@patch('logging.Logger.warn')
@ -91,7 +91,7 @@ def test_set_replicas(mock_reconfigure, monkeypatch, b):
monkeypatch.setattr(rethinkdb.RqlQuery, 'run', mockreturn_two_shards)
args = Namespace(num_replicas=2)
run_set_replicas(args)
mock_reconfigure.assert_called_with(replicas=2, shards=2)
mock_reconfigure.assert_called_with(replicas=2, shards=2, dry_run=False)
# this will mock the call to retrieve the database config
# we will set it to return three shards
@ -100,7 +100,7 @@ def test_set_replicas(mock_reconfigure, monkeypatch, b):
monkeypatch.setattr(rethinkdb.RqlQuery, 'run', mockreturn_three_shards)
run_set_replicas(args)
mock_reconfigure.assert_called_with(replicas=2, shards=3)
mock_reconfigure.assert_called_with(replicas=2, shards=3, dry_run=False)
@patch('logging.Logger.warn')

View File

@ -170,13 +170,24 @@ def _setup_database(_configure_bigchaindb):
@pytest.fixture
def _bdb(_setup_database, _configure_bigchaindb):
    """Flush the database and restore table configs after each test.

    Before the test runs, the configuration of every table in ``TABLES``
    is recorded (RethinkDB backend only). After the test, the database
    is flushed and the recorded configurations are written back, so
    tests that reshard or re-replicate tables do not affect later tests.
    """
    from bigchaindb import config
    from bigchaindb.backend import connect
    from bigchaindb.backend.admin import get_config
    from bigchaindb.backend.schema import TABLES
    from .utils import flush_db, update_table_config
    conn = connect()
    # TODO remove condition once the mongodb implementation is done
    if config['database']['backend'] == 'rethinkdb':
        table_configs_before = {
            t: get_config(conn, table=t) for t in TABLES
        }
    yield
    dbname = config['database']['name']
    flush_db(conn, dbname)
    # TODO remove condition once the mongodb implementation is done
    if config['database']['backend'] == 'rethinkdb':
        for t, c in table_configs_before.items():
            update_table_config(conn, t, **c)
@pytest.fixture
@ -355,3 +366,30 @@ def not_yet_created_db(request):
schema.drop_database(conn, dbname)
except DatabaseDoesNotExist:
pass
@pytest.fixture
def db_config():
    """Return the ``database`` section of the BigchainDB configuration."""
    from bigchaindb import config
    database_section = config['database']
    return database_section
@pytest.fixture
def db_host(db_config):
    """Return the configured database host."""
    host = db_config['host']
    return host
@pytest.fixture
def db_port(db_config):
    """Return the configured database port."""
    port = db_config['port']
    return port
@pytest.fixture
def db_name(db_config):
    """Return the configured database name."""
    name = db_config['name']
    return name
@pytest.fixture
def db_conn():
    """Return a connection obtained from ``bigchaindb.backend.connect``."""
    from bigchaindb.backend import connect
    connection = connect()
    return connection

View File

@ -41,3 +41,13 @@ def flush_mongo_db(connection, dbname):
connection.conn[dbname].bigchain.delete_many({})
connection.conn[dbname].backlog.delete_many({})
connection.conn[dbname].votes.delete_many({})
@singledispatch
def update_table_config(connection, table, **kwargs):
    """Update the configuration of a table.

    Generic fallback: it raises for any connection type that has no
    registered implementation.

    Args:
        connection: A connection to the database.
        table (str): The name of the table to update.
        **kwargs: The configuration fields to set.

    Raises:
        NotImplementedError: If no implementation is registered for the
            type of ``connection``.
    """
    raise NotImplementedError
@update_table_config.register(RethinkDBConnection)
def update_rethinkdb_table_config(connection, table, **kwargs):
    """Update the RethinkDB configuration document of a table.

    The function has its own name so that the module-level
    ``update_table_config`` keeps referring to the single-dispatch
    generic function instead of being rebound to this implementation.

    Args:
        connection: A RethinkDB connection.
        table (str): The name of the table whose config is updated.
        **kwargs: The configuration fields to write.

    Returns:
        The result of running the ``config().update(...)`` query.
    """
    return connection.run(r.table(table).config().update(kwargs))