Added replicaset name to bigchaindb config (#1063)

* Added replicaset name to bigchaindb config

* changed travis replSet to match bigchaindb default

* Updated initialize_replica_set

It now initializes the replica set with the name provided by the
bigchaindb config file.


* initialize_replica_set is now called when creating a mongodb connection.

This way we are sure that a replica set exists when we return a
connection.

* Moved the initialize replica set logic to the connection.

* update the config documentation
This commit is contained in:
Rodolphe Marques 2017-01-20 14:05:28 +01:00 committed by GitHub
parent 7606bdb59f
commit e7ffcf5705
11 changed files with 248 additions and 183 deletions

View File

@ -8,5 +8,5 @@ elif [[ "${BIGCHAINDB_DATABASE_BACKEND}" == mongodb ]]; then
wget http://downloads.mongodb.org/linux/mongodb-linux-x86_64-3.4.1.tgz -O /tmp/mongodb.tgz
tar -xvf /tmp/mongodb.tgz
mkdir /tmp/mongodb-data
${PWD}/mongodb-linux-x86_64-3.4.1/bin/mongod --dbpath=/tmp/mongodb-data --replSet=rs0 &> /dev/null &
${PWD}/mongodb-linux-x86_64-3.4.1/bin/mongod --dbpath=/tmp/mongodb-data --replSet=bigchain-rs &> /dev/null &
fi

View File

@ -18,7 +18,8 @@ config = {
'backend': os.environ.get('BIGCHAINDB_DATABASE_BACKEND', 'rethinkdb'),
'host': os.environ.get('BIGCHAINDB_DATABASE_HOST', 'localhost'),
'port': int(os.environ.get('BIGCHAINDB_DATABASE_PORT', 28015)),
'name': 'bigchain',
'name': os.environ.get('BIGCHAINDB_DATABASE_NAME', 'bigchain'),
'replicaset': os.environ.get('BIGCHAINDB_DATABASE_REPLICASET', 'bigchain-rs'),
},
'keypair': {
'public': None,

View File

@ -13,7 +13,7 @@ BACKENDS = {
logger = logging.getLogger(__name__)
def connect(backend=None, host=None, port=None, name=None):
def connect(backend=None, host=None, port=None, name=None, replicaset=None):
"""Create a new connection to the database backend.
All arguments default to the current configuration's values if not
@ -24,6 +24,8 @@ def connect(backend=None, host=None, port=None, name=None):
host (str): the host to connect to.
port (int): the port to connect to.
name (str): the name of the database to use.
replicaset (str): the name of the replica set (only relevant for
MongoDB connections).
Returns:
An instance of :class:`~bigchaindb.backend.connection.Connection`

View File

@ -2,9 +2,10 @@ import time
import logging
from pymongo import MongoClient
from pymongo.errors import ConnectionFailure
from pymongo import errors
import bigchaindb
from bigchaindb.common import exceptions
from bigchaindb.backend.connection import Connection
logger = logging.getLogger(__name__)
@ -12,7 +13,8 @@ logger = logging.getLogger(__name__)
class MongoDBConnection(Connection):
def __init__(self, host=None, port=None, dbname=None, max_tries=3):
def __init__(self, host=None, port=None, dbname=None, max_tries=3,
replicaset=None):
"""Create a new Connection instance.
Args:
@ -20,10 +22,13 @@ class MongoDBConnection(Connection):
port (int, optional): the port to connect to.
dbname (str, optional): the database to use.
max_tries (int, optional): how many tries before giving up.
replicaset (str, optional): the name of the replica set to
connect to.
"""
self.host = host or bigchaindb.config['database']['host']
self.port = port or bigchaindb.config['database']['port']
self.replicaset = replicaset or bigchaindb.config['database']['replicaset']
self.dbname = dbname or bigchaindb.config['database']['name']
self.max_tries = max_tries
self.connection = None
@ -39,11 +44,95 @@ class MongoDBConnection(Connection):
return self.conn[self.dbname]
def _connect(self):
# we should only return a connection if the replica set is
# initialized. initialize_replica_set will check if the
# replica set is initialized else it will initialize it.
initialize_replica_set()
for i in range(self.max_tries):
try:
self.connection = MongoClient(self.host, self.port)
except ConnectionFailure as exc:
self.connection = MongoClient(self.host, self.port,
replicaset=self.replicaset)
except errors.ConnectionFailure:
if i + 1 == self.max_tries:
raise
else:
time.sleep(2**i)
def initialize_replica_set():
    """Initialize the MongoDB replica set, skipping if already initialized.

    The host, port and replica set name are read from
    ``bigchaindb.config['database']``. A single-member replica set is
    initiated whose only member is ``<host>:<port>``.

    Raises:
        :exc:`~ConfigurationError`: If mongod was not started with the
            replSet option, or if its replica set name does not match the
            bigchaindb configuration (raised by ``_check_replica_set``).
        :exc:`~pymongo.errors.OperationFailure`: If ``replSetInitiate``
            fails for any reason other than the replica set being
            already initialized.
    """
    db_config = bigchaindb.config['database']

    # Setup a MongoDB connection
    # The reason we do this instead of `backend.connect` is that
    # `backend.connect` will connect you to a replica set but this fails if
    # you try to connect to a replica set that is not yet initialized
    conn = MongoClient(host=db_config['host'], port=db_config['port'])
    try:
        _check_replica_set(conn)
        host = '{}:{}'.format(db_config['host'], db_config['port'])
        config = {'_id': db_config['replicaset'],
                  'members': [{'_id': 0, 'host': host}]}

        try:
            conn.admin.command('replSetInitiate', config)
        except errors.OperationFailure as exc_info:
            # `details` may be None and older servers do not send
            # `codeName`; look it up defensively so that the original
            # OperationFailure is never masked by a KeyError/TypeError.
            details = exc_info.details or {}
            # 23 is the MongoDB error code for AlreadyInitialized.
            if (details.get('codeName') == 'AlreadyInitialized' or
                    exc_info.code == 23):
                return
            raise
        else:
            _wait_for_replica_set_initialization(conn)
            logger.info('Initialized replica set')
    finally:
        # this client is only used for the initialization; release its
        # sockets and monitor threads instead of leaking them on every call
        conn.close()
def _check_replica_set(conn):
    """Verify that mongod runs with the replica set bigchaindb expects.

    The replica set name is read from the `getCmdLineOpts` admin command
    and compared with the `replicaset` value of the bigchaindb database
    configuration.

    Note:
        The option is reported under a different key depending on how it
        was set: `replSetName` when it comes from the config file and
        `replSet` when it comes from the command line arguments.

    Args:
        conn: a client exposing ``admin.command`` (a ``MongoClient``).

    Raises:
        :exc:`~ConfigurationError`: If mongod was not started with the
            replSet option, or if its replica set name differs from the
            one configured in bigchaindb.
    """
    cmd_line_opts = conn.admin.command('getCmdLineOpts')

    try:
        replication = cmd_line_opts['parsed']['replication']
        actual_name = replication.get('replSetName', None)
        if not actual_name:
            # fall back to the command line spelling of the option
            actual_name = replication['replSet']
    except KeyError:
        raise exceptions.ConfigurationError('mongod was not started with'
                                            ' the replSet option.')

    expected_name = bigchaindb.config['database']['replicaset']
    if actual_name == expected_name:
        return

    message = ('The replicaset configuration of '
               'bigchaindb (`{}`) needs to match '
               'the replica set name from MongoDB'
               ' (`{}`)').format(expected_name, actual_name)
    raise exceptions.ConfigurationError(message)
def _wait_for_replica_set_initialization(conn):
    """Block until a freshly initiated replica set accepts writes.

    A replica set that is initialized for the first time needs some time:
    nodes have to discover each other and an election has to take place.
    The database is not writable during that period, so the rest of the
    initialization has to wait.

    Args:
        conn: a client exposing ``admin.command`` (a ``MongoClient``).
    """
    # No better readiness check was found so far: poll the `rs` log of
    # mongod every 100ms until the line announcing that writes are
    # permitted shows up. Note that there is no upper bound on the wait.
    logger.info('Waiting for mongodb replica set initialization')
    ready_marker = 'database writes are now permitted'

    while True:
        rs_log = conn.admin.command('getLog', 'rs')['log']
        for entry in rs_log:
            if ready_marker in entry:
                return
        time.sleep(0.1)

View File

@ -1,10 +1,8 @@
"""Utils to initialize and drop the database."""
import time
import logging
from pymongo import ASCENDING, DESCENDING
from pymongo import errors
from bigchaindb import backend
from bigchaindb.common import exceptions
@ -26,9 +24,6 @@ def create_database(conn, dbname):
# TODO: read and write concerns can be declared here
conn.conn.get_database(dbname)
# initialize the replica set
initialize_replica_set(conn)
@register_schema(MongoDBConnection)
def create_tables(conn, dbname):
@ -95,66 +90,3 @@ def create_votes_secondary_index(conn, dbname):
('node_pubkey',
ASCENDING)],
name='block_and_voter')
def initialize_replica_set(conn):
"""Initialize a replica set. If already initialized skip."""
replica_set_name = _get_replica_set_name(conn)
config = {'_id': replica_set_name,
'members': [{'_id': 0, 'host': 'localhost:27017'}]}
try:
conn.conn.admin.command('replSetInitiate', config)
except errors.OperationFailure as exc_info:
if exc_info.details['codeName'] == 'AlreadyInitialized':
logger.info('Replica set already initialized')
return
raise
else:
_wait_for_replica_set_initialization(conn)
logger.info('Initialized replica set')
def _get_replica_set_name(conn):
"""Checks if the replSet option was enabled either through the command
line option or config file.
Note:
The setting we are looking for will have a different name depending
if it was set by the config file (`replSetName`) or by command
line arguments (`replSet`).
Returns:
The replica set name if enabled.
Raise:
:exc:`~ConfigurationError`: If mongod was not started with the
replSet option.
"""
options = conn.conn.admin.command('getCmdLineOpts')
try:
repl_opts = options['parsed']['replication']
return repl_opts.get('replSetName', None) or repl_opts['replSet']
except KeyError:
raise exceptions.ConfigurationError('mongod was not started with'
' the replSet option.')
def _wait_for_replica_set_initialization(conn):
"""Wait for a replica set to finish initialization.
If a replica set is being initialized for the first time it takes some
time. Nodes need to discover each other and an election needs to take
place. During this time the database is not writable so we need to wait
before continuing with the rest of the initialization
"""
# I did not find a better way to do this for now.
# To check if the database is ready we will poll the mongodb logs until
# we find the line that says the database is ready
logger.info('Waiting for mongodb replica set initialization')
while True:
logs = conn.conn.admin.command('getLog', 'rs')['log']
if any('database writes are now permitted' in line for line in logs):
return
time.sleep(0.1)

View File

@ -15,6 +15,7 @@ For convenience, here's a list of all the relevant environment variables (docume
`BIGCHAINDB_DATABASE_HOST`<br>
`BIGCHAINDB_DATABASE_PORT`<br>
`BIGCHAINDB_DATABASE_NAME`<br>
`BIGCHAINDB_DATABASE_REPLICASET`<br>
`BIGCHAINDB_SERVER_BIND`<br>
`BIGCHAINDB_SERVER_WORKERS`<br>
`BIGCHAINDB_SERVER_THREADS`<br>
@ -77,7 +78,7 @@ Note how the keys in the list are separated by colons.
```
## database.backend, database.host, database.port & database.name
## database.backend, database.host, database.port, database.name & database.replicaset
The database backend to use (e.g. RethinkDB) and its hostname, port, name and (MongoDB only) replica set name.
@ -87,6 +88,7 @@ export BIGCHAINDB_DATABASE_BACKEND=rethinkdb
export BIGCHAINDB_DATABASE_HOST=localhost
export BIGCHAINDB_DATABASE_PORT=28015
export BIGCHAINDB_DATABASE_NAME=bigchain
export BIGCHAINDB_DATABASE_REPLICASET=bigchain-rs
```
**Example config file snippet**
@ -95,22 +97,25 @@ export BIGCHAINDB_DATABASE_NAME=bigchain
"backend": "rethinkdb",
"host": "localhost",
"port": 28015,
"name": "bigchain"
"name": "bigchain",
"replicaset": "bigchain-rs"
}
```
**Default values (a snippet from `bigchaindb/__init__.py`)**
```python
'database': {
"backend": "rethinkdb",
'backend': os.environ.get('BIGCHAINDB_DATABASE_BACKEND', 'rethinkdb'),
'host': os.environ.get('BIGCHAINDB_DATABASE_HOST', 'localhost'),
'port': 28015,
'name': 'bigchain',
'port': int(os.environ.get('BIGCHAINDB_DATABASE_PORT', 28015)),
'name': os.environ.get('BIGCHAINDB_DATABASE_NAME', 'bigchain'),
'replicaset': os.environ.get('BIGCHAINDB_DATABASE_REPLICASET', 'bigchain-rs')
}
```
**Note**: As of now, only RethinkDB ("rethinkdb") is supported as a value for `database.backend`. In
the future, other options (e.g. MongoDB) will be available.
**Note**: We are currently adding support for MongoDB. The `replicaset` and
`BIGCHAINDB_DATABASE_REPLICASET` options are only used if `backend` or
`BIGCHAINDB_DATABASE_BACKEND` is set to `"mongodb"`.
## server.bind, server.workers & server.threads

View File

@ -1,7 +1,35 @@
from unittest import mock
import pytest
from pymongo.errors import ConnectionFailure
from pymongo import MongoClient
from pymongo.database import Database
from pymongo.errors import ConnectionFailure, OperationFailure
pytestmark = pytest.mark.bdb
@pytest.fixture
def mock_cmd_line_opts():
    """`getCmdLineOpts` reply with the replica set under the `replSet` key.

    This is the key used when the option is passed on the command line.
    """
    argv = ['mongod', '--dbpath=/data', '--replSet=bigchain-rs']
    parsed = {'replication': {'replSet': 'bigchain-rs'},
              'storage': {'dbPath': '/data'}}
    return {'argv': argv, 'ok': 1.0, 'parsed': parsed}
@pytest.fixture
def mock_config_opts():
    """`getCmdLineOpts` reply with the replica set under `replSetName`.

    This is the key used when the option is set through the config file.
    """
    argv = ['mongod', '--dbpath=/data', '--replSet=bigchain-rs']
    parsed = {'replication': {'replSetName': 'bigchain-rs'},
              'storage': {'dbPath': '/data'}}
    return {'argv': argv, 'ok': 1.0, 'parsed': parsed}
@pytest.fixture
def mongodb_connection():
    """Plain `MongoClient` for the host/port of the bigchaindb config.

    No replica set name is passed to the client.
    """
    import bigchaindb

    db_settings = bigchaindb.config['database']
    return MongoClient(host=db_settings['host'], port=db_settings['port'])
def test_get_connection_returns_the_correct_instance():
@ -13,17 +41,20 @@ def test_get_connection_returns_the_correct_instance():
'backend': 'mongodb',
'host': 'localhost',
'port': 27017,
'name': 'test'
'name': 'test',
'replicaset': 'bigchain-rs'
}
conn = connect(**config)
assert isinstance(conn, Connection)
assert isinstance(conn, MongoDBConnection)
assert conn.conn._topology_settings.replica_set_name == config['replicaset']
@mock.patch('bigchaindb.backend.mongodb.connection.initialize_replica_set')
@mock.patch('pymongo.MongoClient.__init__')
@mock.patch('time.sleep')
def test_connection_error(mock_sleep, mock_client):
def test_connection_error(mock_sleep, mock_client, mock_init_repl_set):
from bigchaindb.backend import connect
# force the driver to throw ConnectionFailure
@ -36,3 +67,85 @@ def test_connection_error(mock_sleep, mock_client):
conn.db
assert mock_client.call_count == 3
def test_check_replica_set_not_enabled(mongodb_connection):
    """A mongod reply without replication options is a ConfigurationError."""
    from bigchaindb.common.exceptions import ConfigurationError
    from bigchaindb.backend.mongodb.connection import _check_replica_set

    # reply of a mongod that was started without the replSet option
    opts_without_replication = {
        'argv': ['mongod', '--dbpath=/data'],
        'ok': 1.0,
        'parsed': {'storage': {'dbPath': '/data'}},
    }
    patched_command = mock.patch.object(
        Database, 'command', return_value=opts_without_replication)

    with patched_command, pytest.raises(ConfigurationError):
        _check_replica_set(mongodb_connection)
def test_check_replica_set_command_line(mongodb_connection,
                                        mock_cmd_line_opts):
    """The check passes when `replSet` comes from the command line."""
    from bigchaindb.backend.mongodb.connection import _check_replica_set

    patched_command = mock.patch.object(
        Database, 'command', return_value=mock_cmd_line_opts)

    with patched_command:
        outcome = _check_replica_set(mongodb_connection)

    assert outcome is None
def test_check_replica_set_config_file(mongodb_connection, mock_config_opts):
    """The check passes when `replSetName` comes from the config file."""
    from bigchaindb.backend.mongodb.connection import _check_replica_set

    patched_command = mock.patch.object(
        Database, 'command', return_value=mock_config_opts)

    with patched_command:
        outcome = _check_replica_set(mongodb_connection)

    assert outcome is None
def test_check_replica_set_name_mismatch(mongodb_connection,
                                         mock_cmd_line_opts):
    """A replica set name other than the configured one is rejected."""
    from bigchaindb.common.exceptions import ConfigurationError
    from bigchaindb.backend.mongodb.connection import _check_replica_set

    # make mongod report a name that differs from the bigchaindb config
    mock_cmd_line_opts['parsed']['replication'].update(replSet='rs0')
    patched_command = mock.patch.object(
        Database, 'command', return_value=mock_cmd_line_opts)

    with patched_command, pytest.raises(ConfigurationError):
        _check_replica_set(mongodb_connection)
def test_wait_for_replica_set_initialization(mongodb_connection):
    """The wait returns once the log announces that writes are permitted."""
    from bigchaindb.backend.mongodb.connection import _wait_for_replica_set_initialization  # noqa

    # first poll: not ready yet; second poll: ready
    log_replies = [
        {'log': ['a line']},
        {'log': ['database writes are now permitted']},
    ]

    with mock.patch.object(Database, 'command', side_effect=log_replies):
        # check that it returns
        outcome = _wait_for_replica_set_initialization(mongodb_connection)
        assert outcome is None
def test_initialize_replica_set(mock_cmd_line_opts):
    """Cover the success path and the unexpected-failure path."""
    from bigchaindb.backend.mongodb.connection import initialize_replica_set

    # replies, in call order: getCmdLineOpts, replSetInitiate, getLog
    success_replies = [
        mock_cmd_line_opts,
        None,
        {'log': ['database writes are now permitted']},
    ]
    with mock.patch.object(Database, 'command',
                           side_effect=success_replies):
        # check that it returns
        assert initialize_replica_set() is None

    # test it re-raises OperationFailure when the failure is not
    # `AlreadyInitialized`
    failure_replies = [
        mock_cmd_line_opts,
        OperationFailure(None, details={'codeName': ''}),
    ]
    with mock.patch.object(Database, 'command',
                           side_effect=failure_replies):
        with pytest.raises(OperationFailure):
            initialize_replica_set()

View File

@ -1,8 +1,4 @@
from unittest import mock
import pytest
from pymongo.database import Database
from pymongo.errors import OperationFailure
pytestmark = pytest.mark.bdb
@ -106,90 +102,3 @@ def test_drop(dummy_db):
assert dummy_db in conn.conn.database_names()
schema.drop_database(conn, dummy_db)
assert dummy_db not in conn.conn.database_names()
def test_get_replica_set_name_not_enabled():
from bigchaindb import backend
from bigchaindb.backend.mongodb.schema import _get_replica_set_name
from bigchaindb.common.exceptions import ConfigurationError
conn = backend.connect()
# no replSet option set
cmd_line_opts = {'argv': ['mongod', '--dbpath=/data'],
'ok': 1.0,
'parsed': {'storage': {'dbPath': '/data'}}}
with mock.patch.object(Database, 'command', return_value=cmd_line_opts):
with pytest.raises(ConfigurationError):
_get_replica_set_name(conn)
def test_get_replica_set_name_command_line():
from bigchaindb import backend
from bigchaindb.backend.mongodb.schema import _get_replica_set_name
conn = backend.connect()
# replSet option set through the command line
cmd_line_opts = {'argv': ['mongod', '--dbpath=/data', '--replSet=rs0'],
'ok': 1.0,
'parsed': {'replication': {'replSet': 'rs0'},
'storage': {'dbPath': '/data'}}}
with mock.patch.object(Database, 'command', return_value=cmd_line_opts):
assert _get_replica_set_name(conn) == 'rs0'
def test_get_replica_set_name_config_file():
from bigchaindb import backend
from bigchaindb.backend.mongodb.schema import _get_replica_set_name
conn = backend.connect()
# replSet option set through the config file
cmd_line_opts = {'argv': ['mongod', '--dbpath=/data', '--replSet=rs0'],
'ok': 1.0,
'parsed': {'replication': {'replSetName': 'rs0'},
'storage': {'dbPath': '/data'}}}
with mock.patch.object(Database, 'command', return_value=cmd_line_opts):
assert _get_replica_set_name(conn) == 'rs0'
def test_wait_for_replica_set_initialization():
from bigchaindb.backend.mongodb.schema import _wait_for_replica_set_initialization # noqa
from bigchaindb.backend import connect
conn = connect()
with mock.patch.object(Database, 'command') as mock_command:
mock_command.side_effect = [
{'log': ['a line']},
{'log': ['database writes are now permitted']},
]
# check that it returns
assert _wait_for_replica_set_initialization(conn) is None
def test_initialize_replica_set():
from bigchaindb.backend.mongodb.schema import initialize_replica_set
from bigchaindb.backend import connect
conn = connect()
with mock.patch.object(Database, 'command') as mock_command:
mock_command.side_effect = [
mock.DEFAULT,
None,
{'log': ['database writes are now permitted']},
]
# check that it returns
assert initialize_replica_set(conn) is None
# test it raises OperationError if anything wrong
with mock.patch.object(Database, 'command') as mock_command:
mock_command.side_effect = [
mock.DEFAULT,
OperationFailure(None, details={'codeName': ''})
]
with pytest.raises(OperationFailure):
initialize_replica_set(conn)

View File

@ -136,18 +136,11 @@ def _configure_bigchaindb(request):
def _setup_database(_configure_bigchaindb):
from bigchaindb import config
from bigchaindb.backend import connect, schema
from bigchaindb.backend.mongodb.schema import initialize_replica_set
from bigchaindb.common.exceptions import DatabaseDoesNotExist
print('Initializing test db')
dbname = config['database']['name']
conn = connect()
# if we are setting up mongodb for the first time we need to make sure
# that the replica set is initialized before doing any operation in the
# database
if config['database']['backend'] == 'mongodb':
initialize_replica_set(conn)
try:
schema.drop_database(conn, dbname)
except DatabaseDoesNotExist:

View File

@ -130,6 +130,7 @@ def test_autoconfigure_read_both_from_file_and_env(monkeypatch, request):
'host': 'test-host',
'port': 4242,
'name': 'test-dbname',
'replicaset': 'bigchain-rs'
},
'keypair': {
'public': None,
@ -207,3 +208,22 @@ def test_write_config():
m.assert_called_once_with(CONFIG_DEFAULT_PATH, 'w')
handle = m()
handle.write.assert_called_once_with('{}')
@pytest.mark.parametrize('env_name,env_value,config_key', [
    ('BIGCHAINDB_DATABASE_BACKEND', 'test-backend', 'backend'),
    ('BIGCHAINDB_DATABASE_HOST', 'test-host', 'host'),
    ('BIGCHAINDB_DATABASE_PORT', 4242, 'port'),
    ('BIGCHAINDB_DATABASE_NAME', 'test-db', 'name'),
    ('BIGCHAINDB_DATABASE_REPLICASET', 'test-replicaset', 'replicaset'),
])
def test_database_envs(env_name, env_value, config_key, monkeypatch):
    """Each database environment variable ends up in its config key."""
    import bigchaindb

    # expose exactly one environment variable to autoconfigure
    monkeypatch.setattr('os.environ', {env_name: env_value})
    bigchaindb.config_utils.autoconfigure()

    # the loaded config must already carry the value from the environment:
    # overriding the key in a copy must not change anything
    expected = copy.deepcopy(bigchaindb.config)
    expected['database'].update({config_key: env_value})

    assert bigchaindb.config == expected

View File

@ -9,6 +9,7 @@ def config(request, monkeypatch):
'host': 'host',
'port': 28015,
'name': 'bigchain',
'replicaset': 'bigchain-rs',
},
'keypair': {
'public': 'pubkey',