mirror of
https://github.com/bigchaindb/js-bigchaindb-driver.git
synced 2024-11-22 09:46:58 +01:00
cap backoff time
This commit is contained in:
parent
e0cde66749
commit
bd8db702c4
@ -5,22 +5,21 @@
|
|||||||
import Transport from './transport'
|
import Transport from './transport'
|
||||||
|
|
||||||
const HEADER_BLACKLIST = ['content-type']
|
const HEADER_BLACKLIST = ['content-type']
|
||||||
const DEFAULT_NODE = 'http://localhost:9984'
|
const DEFAULT_NODE = 'http://localhost:9984/api/v1/'
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* If initialized with ``>1`` nodes, the driver will send successive
|
|
||||||
requests to different nodes in a round-robin fashion (this will be
|
|
||||||
customizable in the future)
|
|
||||||
*
|
*
|
||||||
* @nodes
|
* @param {String, Array} nodes Nodes for the connection. String possible to be backwards compatible
|
||||||
* list of
|
* with version before 4.1.0 version
|
||||||
*
|
* @param {Object} headers Common headers for every request
|
||||||
* @headers
|
* @param {float} timeout Optional timeout in secs
|
||||||
*
|
*
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
|
|
||||||
export default class Connection {
|
export default class Connection {
|
||||||
constructor(nodes, headers = {}, timeout = null) {
|
// 20 seconds is the default value for a timeout if not specified
|
||||||
const nodesArray = Array.isArray(nodes) ? nodes : [nodes]
|
constructor(nodes, headers = {}, timeout = 20000) {
|
||||||
// Copy object
|
// Copy object
|
||||||
this.headers = Object.assign({}, headers)
|
this.headers = Object.assign({}, headers)
|
||||||
|
|
||||||
@ -32,13 +31,16 @@ export default class Connection {
|
|||||||
})
|
})
|
||||||
|
|
||||||
this.normalizedNodes = []
|
this.normalizedNodes = []
|
||||||
if (!nodesArray) {
|
if (!nodes) {
|
||||||
this.normalizedNodes.push(Connection.normalizeNode(DEFAULT_NODE, this.headers))
|
this.normalizedNodes.push(Connection.normalizeNode(DEFAULT_NODE, this.headers))
|
||||||
} else {
|
} else if (Array.isArray(nodes)) {
|
||||||
nodesArray.forEach(node => {
|
nodes.forEach(node => {
|
||||||
this.normalizedNodes.push(Connection.normalizeNode(node, this.headers))
|
this.normalizedNodes.push(Connection.normalizeNode(node, this.headers))
|
||||||
})
|
})
|
||||||
|
} else {
|
||||||
|
this.normalizedNodes.push(Connection.normalizeNode(nodes, this.headers))
|
||||||
}
|
}
|
||||||
|
|
||||||
this.transport = new Transport(this.normalizedNodes, this.headers, timeout)
|
this.transport = new Transport(this.normalizedNodes, this.headers, timeout)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -29,7 +29,7 @@ export default class Request {
|
|||||||
this.connectionError = null
|
this.connectionError = null
|
||||||
}
|
}
|
||||||
|
|
||||||
async request(endpoint, config, setTimeout) {
|
async request(endpoint, config, timeout, maxBackoffTime) {
|
||||||
// Load default fetch configuration and remove any falsy query parameters
|
// Load default fetch configuration and remove any falsy query parameters
|
||||||
const requestConfig = Object.assign({}, this.node.headers, DEFAULT_REQUEST_CONFIG, config, {
|
const requestConfig = Object.assign({}, this.node.headers, DEFAULT_REQUEST_CONFIG, config, {
|
||||||
query: config.query && sanitize(config.query)
|
query: config.query && sanitize(config.query)
|
||||||
@ -58,7 +58,7 @@ export default class Request {
|
|||||||
|
|
||||||
const backoffTimedelta = this.getBackoffTimedelta()
|
const backoffTimedelta = this.getBackoffTimedelta()
|
||||||
|
|
||||||
if (setTimeout != null && setTimeout < this.backoffTimedelta) {
|
if (timeout != null && timeout < backoffTimedelta) {
|
||||||
const errorObject = {
|
const errorObject = {
|
||||||
message: 'TimeoutError'
|
message: 'TimeoutError'
|
||||||
}
|
}
|
||||||
@ -67,16 +67,15 @@ export default class Request {
|
|||||||
if (backoffTimedelta > 0) {
|
if (backoffTimedelta > 0) {
|
||||||
await Request.sleep(backoffTimedelta)
|
await Request.sleep(backoffTimedelta)
|
||||||
}
|
}
|
||||||
this.timeout = setTimeout ? setTimeout - backoffTimedelta : setTimeout
|
// this.timeout = setTimeout ? setTimeout - backoffTimedelta : setTimeout
|
||||||
return baseRequest(apiUrl, requestConfig)
|
return baseRequest(apiUrl, requestConfig)
|
||||||
.then(res => res.json())
|
.then(res => res.json())
|
||||||
.catch(err => {
|
.catch(err => {
|
||||||
// ConnectionError
|
// ConnectionError
|
||||||
this.connectionError = err
|
this.connectionError = err
|
||||||
// throw err
|
|
||||||
})
|
})
|
||||||
.finally(() => {
|
.finally(() => {
|
||||||
this.updateBackoffTime()
|
this.updateBackoffTime(maxBackoffTime)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -87,12 +86,12 @@ export default class Request {
|
|||||||
return (this.backoffTime - Date.now())
|
return (this.backoffTime - Date.now())
|
||||||
}
|
}
|
||||||
|
|
||||||
updateBackoffTime() {
|
updateBackoffTime(maxBackoffTime) {
|
||||||
if (!this.connectionError) {
|
if (!this.connectionError) {
|
||||||
this.retries = 0
|
this.retries = 0
|
||||||
this.backoffTime = null
|
this.backoffTime = null
|
||||||
} else {
|
} else {
|
||||||
const backoffTimedelta = BACKOFF_DELAY * (2 ** this.retries)
|
const backoffTimedelta = Math.min(BACKOFF_DELAY * (2 ** this.retries), maxBackoffTime)
|
||||||
this.backoffTime = Date.now() + backoffTimedelta
|
this.backoffTime = Date.now() + backoffTimedelta
|
||||||
this.retries += 1
|
this.retries += 1
|
||||||
}
|
}
|
||||||
|
@ -5,16 +5,28 @@
|
|||||||
import Request from './request'
|
import Request from './request'
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* @private
|
||||||
|
* If initialized with ``>1`` nodes, the driver will send successive
|
||||||
|
* requests to different nodes in a round-robin fashion (this will be
|
||||||
|
* customizable in the future).
|
||||||
|
*/
|
||||||
|
|
||||||
|
|
||||||
export default class Transport {
|
export default class Transport {
|
||||||
constructor(nodes, headers, timeout) {
|
constructor(nodes, headers, timeout) {
|
||||||
this.connectionPool = []
|
this.connectionPool = []
|
||||||
this.timeout = timeout
|
this.timeout = timeout
|
||||||
|
// the maximum backoff time is 10 seconds
|
||||||
|
this.maxBackoffTime = timeout ? timeout / 10 : 10000
|
||||||
nodes.forEach(node => {
|
nodes.forEach(node => {
|
||||||
this.connectionPool.push(new Request(node, headers))
|
this.connectionPool.push(new Request(node, headers))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// Select the connection with the earliest backoff time
|
// Select the connection with the earliest backoff time, in case of a tie,
|
||||||
|
// prefer the one with the smaller list index
|
||||||
pickConnection() {
|
pickConnection() {
|
||||||
if (this.connectionPool.length === 1) {
|
if (this.connectionPool.length === 1) {
|
||||||
return this.connectionPool[0]
|
return this.connectionPool[0]
|
||||||
@ -32,6 +44,7 @@ export default class Transport {
|
|||||||
async forwardRequest(path, headers) {
|
async forwardRequest(path, headers) {
|
||||||
let response
|
let response
|
||||||
let connection
|
let connection
|
||||||
|
// A new request will be executed until there is a valid response or timeout < 0
|
||||||
while (!this.timeout || this.timeout > 0) {
|
while (!this.timeout || this.timeout > 0) {
|
||||||
connection = this.pickConnection()
|
connection = this.pickConnection()
|
||||||
// Date in milliseconds
|
// Date in milliseconds
|
||||||
@ -41,25 +54,23 @@ export default class Transport {
|
|||||||
response = await connection.request(
|
response = await connection.request(
|
||||||
path,
|
path,
|
||||||
headers,
|
headers,
|
||||||
this.timeout
|
this.timeout,
|
||||||
|
this.maxBackoffTime
|
||||||
)
|
)
|
||||||
const elapsed = Date.now() - startTime
|
const elapsed = Date.now() - startTime
|
||||||
if (connection.backoffTime) {
|
if (connection.backoffTime) {
|
||||||
this.timeout += elapsed
|
this.timeout -= elapsed
|
||||||
} else {
|
} else {
|
||||||
|
// No connection error, the response is valid
|
||||||
return response
|
return response
|
||||||
}
|
}
|
||||||
|
|
||||||
if (connection.retries > 3) {
|
|
||||||
throw connection.connectionError
|
|
||||||
}
|
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
throw err
|
throw err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
const errorObject = {
|
const errorObject = {
|
||||||
message: 'Timeout error',
|
message: 'TimeoutError',
|
||||||
}
|
}
|
||||||
throw errorObject
|
throw connection.connectionError ? connection.connectionError : errorObject
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -16,7 +16,7 @@ const conn = new Connection(API_PATH)
|
|||||||
|
|
||||||
test('Payload thrown at incorrect API_PATH', async t => {
|
test('Payload thrown at incorrect API_PATH', async t => {
|
||||||
const path = 'http://localhost:9984/api/wrong/'
|
const path = 'http://localhost:9984/api/wrong/'
|
||||||
const connection = new Connection(path)
|
const connection = new Connection(path, {}, 0)
|
||||||
const target = {
|
const target = {
|
||||||
message: 'HTTP Error: Requested page not reachable',
|
message: 'HTTP Error: Requested page not reachable',
|
||||||
status: '404 NOT FOUND',
|
status: '404 NOT FOUND',
|
||||||
|
@ -28,8 +28,8 @@ test('Keypair is created', t => {
|
|||||||
// TODO: The following tests are a bit messy currently, please do:
|
// TODO: The following tests are a bit messy currently, please do:
|
||||||
//
|
//
|
||||||
// - tidy up dependency on `pollStatusAndFetchTransaction`
|
// - tidy up dependency on `pollStatusAndFetchTransaction`
|
||||||
test('Valid CREATE transaction', t => {
|
test('Valid CREATE transaction with default node', t => {
|
||||||
const conn = new Connection(API_PATH)
|
const conn = new Connection()
|
||||||
|
|
||||||
const tx = Transaction.makeCreateTransaction(
|
const tx = Transaction.makeCreateTransaction(
|
||||||
asset(),
|
asset(),
|
||||||
|
@ -8,25 +8,13 @@ import {
|
|||||||
Connection
|
Connection
|
||||||
} from '../../src'
|
} from '../../src'
|
||||||
|
|
||||||
|
|
||||||
test('Pick connection with earliest backoff time', t => {
|
|
||||||
const path1 = 'http://localhost:9984/api/v1/'
|
|
||||||
const path2 = 'http://localhost:9984/api/wrong/'
|
|
||||||
|
|
||||||
const conn = new Connection([path1, path2])
|
|
||||||
|
|
||||||
conn.searchAssets('example')
|
|
||||||
const connection1 = conn.transport.connectionPool[0]
|
|
||||||
|
|
||||||
t.deepEqual(conn.transport.pickConnection(), connection1)
|
|
||||||
})
|
|
||||||
|
|
||||||
test('Pick connection with earliest backoff time', async t => {
|
test('Pick connection with earliest backoff time', async t => {
|
||||||
const path1 = 'http://localhost:9984/api/v1/'
|
const path1 = 'http://localhost:9984/api/v1/'
|
||||||
const path2 = 'http://localhost:9984/api/wrong/'
|
const path2 = 'http://localhost:9984/api/wrong/'
|
||||||
|
|
||||||
// Reverse order
|
// Reverse order
|
||||||
const conn = new Connection([path2, path1])
|
const conn = new Connection([path2, path1])
|
||||||
|
// This will trigger the 'forwardRequest' so the correct connection will be taken
|
||||||
await conn.searchAssets('example')
|
await conn.searchAssets('example')
|
||||||
|
|
||||||
const connection1 = conn.transport.connectionPool[1]
|
const connection1 = conn.transport.connectionPool[1]
|
||||||
|
Loading…
Reference in New Issue
Block a user