MongoDB socket timeout to handle master re-election (#1638)
* add a socket timeout so that changefeeds can reconnect if theres a re-election * fix threshold max depth at 100
This commit is contained in:
parent
c14b53b90f
commit
599dd21410
|
@ -111,7 +111,8 @@ class MongoDBConnection(Connection):
|
|||
self.port,
|
||||
replicaset=self.replicaset,
|
||||
serverselectiontimeoutms=self.connection_timeout,
|
||||
ssl=self.ssl)
|
||||
ssl=self.ssl,
|
||||
**MONGO_OPTS)
|
||||
if self.login is not None and self.password is not None:
|
||||
client[self.dbname].authenticate(self.login, self.password)
|
||||
else:
|
||||
|
@ -126,7 +127,8 @@ class MongoDBConnection(Connection):
|
|||
ssl_keyfile=self.keyfile,
|
||||
ssl_pem_passphrase=self.keyfile_passphrase,
|
||||
ssl_crlfile=self.crlfile,
|
||||
ssl_cert_reqs=CERT_REQUIRED)
|
||||
ssl_cert_reqs=CERT_REQUIRED,
|
||||
**MONGO_OPTS)
|
||||
if self.login is not None:
|
||||
client[self.dbname].authenticate(self.login,
|
||||
mechanism='MONGODB-X509')
|
||||
|
@ -143,6 +145,11 @@ class MongoDBConnection(Connection):
|
|||
raise ConfigurationError from exc
|
||||
|
||||
|
||||
MONGO_OPTS = {
|
||||
'socketTimeoutMS': 20000,
|
||||
}
|
||||
|
||||
|
||||
def initialize_replica_set(host, port, connection_timeout, dbname, ssl, login,
|
||||
password, ca_cert, certfile, keyfile,
|
||||
keyfile_passphrase, crlfile):
|
||||
|
@ -160,7 +167,8 @@ def initialize_replica_set(host, port, connection_timeout, dbname, ssl, login,
|
|||
conn = pymongo.MongoClient(host,
|
||||
port,
|
||||
serverselectiontimeoutms=connection_timeout,
|
||||
ssl=ssl)
|
||||
ssl=ssl,
|
||||
**MONGO_OPTS)
|
||||
if login is not None and password is not None:
|
||||
conn[dbname].authenticate(login, password)
|
||||
else:
|
||||
|
@ -174,7 +182,8 @@ def initialize_replica_set(host, port, connection_timeout, dbname, ssl, login,
|
|||
ssl_keyfile=keyfile,
|
||||
ssl_pem_passphrase=keyfile_passphrase,
|
||||
ssl_crlfile=crlfile,
|
||||
ssl_cert_reqs=CERT_REQUIRED)
|
||||
ssl_cert_reqs=CERT_REQUIRED,
|
||||
**MONGO_OPTS)
|
||||
if login is not None:
|
||||
logger.info('Authenticating to the database...')
|
||||
conn[dbname].authenticate(login, mechanism='MONGODB-X509')
|
||||
|
|
|
@ -155,26 +155,26 @@ def _fulfillment_to_details(fulfillment):
|
|||
raise UnsupportedTypeError(fulfillment.type_name)
|
||||
|
||||
|
||||
def _fulfillment_from_details(data):
|
||||
def _fulfillment_from_details(data, _depth=0):
|
||||
"""
|
||||
Load a fulfillment for a signing spec dictionary
|
||||
|
||||
Args:
|
||||
data: tx.output[].condition.details dictionary
|
||||
"""
|
||||
if _depth == 100:
|
||||
raise ThresholdTooDeep()
|
||||
|
||||
if data['type'] == 'ed25519-sha-256':
|
||||
public_key = base58.b58decode(data['public_key'])
|
||||
return Ed25519Sha256(public_key=public_key)
|
||||
|
||||
if data['type'] == 'threshold-sha-256':
|
||||
try:
|
||||
threshold = ThresholdSha256(data['threshold'])
|
||||
for cond in data['subconditions']:
|
||||
cond = _fulfillment_from_details(cond)
|
||||
threshold.add_subfulfillment(cond)
|
||||
return threshold
|
||||
except RecursionError:
|
||||
raise ThresholdTooDeep()
|
||||
threshold = ThresholdSha256(data['threshold'])
|
||||
for cond in data['subconditions']:
|
||||
cond = _fulfillment_from_details(cond, _depth+1)
|
||||
threshold.add_subfulfillment(cond)
|
||||
return threshold
|
||||
|
||||
raise UnsupportedTypeError(data.get('type'))
|
||||
|
||||
|
|
Loading…
Reference in New Issue