diff --git a/package.json b/package.json index fbac4b0..193c3e2 100644 --- a/package.json +++ b/package.json @@ -22,6 +22,7 @@ "dev": "webpack -w", "clean": "rimraf dist/bundle dist/node", "test": "npm run lint && nyc ava test/ && npm run thanks && npm run report-coverage", + "mine-test": "nyc ava test/connection/test_mine*", "thanks": "cowsay Hi, thanks for your interest in BigchainDB. We appreciate your contribution!", "release": "./node_modules/release-it/bin/release-it.js --src.tagName='v%s' --github.release --npm.publish --non-interactive", "release-minor": "./node_modules/release-it/bin/release-it.js minor --src.tagName='v%s' --github.release --npm.publish --non-interactive", diff --git a/src/Transport.js b/src/Transport.js new file mode 100644 index 0000000..375f328 --- /dev/null +++ b/src/Transport.js @@ -0,0 +1,56 @@ +import Request from './request' + + +export default class Transport { + constructor(nodes, headers, timeout) { + this.connectionPool = [] + this.timeout = timeout + nodes.forEach(node => { + this.connectionPool.push(new Request(node, headers)) + }) + } + + // Select the connection with the earliest backoff time + pickConnection() { + if (this.connectionPool.length === 1) { + return this.connectionPool[0] + } + return this.minBackoff() + } + + minBackoff() { + let connection = this.connectionPool[0] + this.connectionPool.forEach(conn => { + // 0 the lowest value is the time for Thu Jan 01 1970 01:00:00 GMT+0100 (CET) + conn.backoffTime = conn.backoffTime ? conn.backoffTime : 0 + connection = (conn.backoffTime < connection.backoffTime) ? conn : connection + }) + return connection + } + + async forwardRequest(path, headers) { + while (!this.timeout || this.timeout > 0) { + const connection = this.pickConnection() + + // Date in milliseconds + const startTime = Date.now() + try { + // TODO wait until request is done + const response = connection.request( + path, + headers, + this.timeout + ) + return response + } catch (err) { + throw err + } finally { + const elapsed = Date.now() - startTime + if (this.timeout) { + this.timeout -= elapsed + } + } + } + throw new Error() + } +} diff --git a/src/connection.js b/src/connection.js index 0f9a2ef..41223f4 100644 --- a/src/connection.js +++ b/src/connection.js @@ -1,24 +1,55 @@ -import request from './request' +import Transport from './Transport' const HEADER_BLACKLIST = ['content-type'] - +const DEFAULT_NODE = 'http://localhost:9984' /** - * Base connection + * If initialized with ``>1`` nodes, the driver will send successive + requests to different nodes in a round-robin fashion (this will be + customizable in the future) + * + * @nodes + * list of + * + * @headers + * + * */ export default class Connection { - constructor(path, headers = {}) { - this.path = path + constructor(nodes, headers = {}, timeout = null) { + const nodesArray = Array.isArray(nodes) ? nodes : [nodes] + // Copy object this.headers = Object.assign({}, headers) + // Validate headers Object.keys(headers).forEach(header => { if (HEADER_BLACKLIST.includes(header.toLowerCase())) { throw new Error(`Header ${header} is reserved and cannot be set.`) } }) + + this.normalizedNodes = [] + if (!nodesArray) { + this.normalizedNodes.push(Connection.normalizeNode(DEFAULT_NODE, this.headers)) + } else { + nodesArray.forEach(node => { + this.normalizedNodes.push(Connection.normalizeNode(node, this.headers)) + }) + } + this.transport = new Transport(this.normalizedNodes, this.headers, timeout) } - getApiUrls(endpoint) { - return this.path + { + static normalizeNode(node, headers) { + if (typeof node === 'string') { + return { 'endpoint': node, 'headers': headers } + } else { + // TODO normalize URL if needed + const allHeaders = Object.assign({}, headers, node.headers) + return { 'endpoint': node, 'headers': allHeaders } + } + } + + static getApiUrls(endpoint) { + return { 'blocks': 'blocks', 'blocksDetail': 'blocks/%(blockHeight)s', 'outputs': 'outputs', @@ -33,17 +64,15 @@ export default class Connection { }[endpoint] } - _req(path, options = {}) { - // NOTE: `options.headers` could be undefined, but that's OK. - options.headers = Object.assign({}, options.headers, this.headers) - return request(path, options) + _req(pathEndpoint, options = {}) { + return this.transport.forwardRequest(pathEndpoint, options) } /** * @param blockHeight */ getBlock(blockHeight) { - return this._req(this.getApiUrls('blocksDetail'), { + return this._req(Connection.getApiUrls('blocksDetail'), { urlTemplateSpec: { blockHeight } @@ -54,7 +83,7 @@ export default class Connection { * @param transactionId */ getTransaction(transactionId) { - return this._req(this.getApiUrls('transactionsDetail'), { + return this._req(Connection.getApiUrls('transactionsDetail'), { urlTemplateSpec: { transactionId } @@ -66,7 +95,7 @@ export default class Connection { * @param status */ listBlocks(transactionId) { - return this._req(this.getApiUrls('blocks'), { + return this._req(Connection.getApiUrls('blocks'), { query: { transaction_id: transactionId, } @@ -86,7 +115,7 @@ export default class Connection { if (spent !== undefined) { query.spent = spent.toString() } - return this._req(this.getApiUrls('outputs'), { + return this._req(Connection.getApiUrls('outputs'), { query }) } @@ -96,7 +125,8 @@ export default class Connection { * @param operation */ listTransactions(assetId, operation) { - return this._req(this.getApiUrls('transactions'), { + console.log('listtransaction', assetId) + return this._req(Connection.getApiUrls('transactions'), { query: { asset_id: assetId, operation @@ -108,7 +138,7 @@ export default class Connection { * @param blockId */ listVotes(blockId) { - return this._req(this.getApiUrls('votes'), { + return this._req(Connection.getApiUrls('votes'), { query: { block_id: blockId } @@ -126,7 +156,7 @@ export default class Connection { * @param transaction */ postTransactionSync(transaction) { - return this._req(this.getApiUrls('transactionsSync'), { + return this._req(Connection.getApiUrls('transactionsSync'), { method: 'POST', jsonBody: transaction }) @@ -137,7 +167,7 @@ export default class Connection { * @param transaction */ postTransactionAsync(transaction) { - return this._req(this.getApiUrls('transactionsAsync'), { + return this._req(Connection.getApiUrls('transactionsAsync'), { method: 'POST', jsonBody: transaction }) @@ -148,7 +178,7 @@ export default class Connection { * @param transaction */ postTransactionCommit(transaction) { - return this._req(this.getApiUrls('transactionsCommit'), { + return this._req(Connection.getApiUrls('transactionsCommit'), { method: 'POST', jsonBody: transaction }) @@ -158,7 +188,7 @@ export default class Connection { * @param search */ searchAssets(search) { - return this._req(this.getApiUrls('assets'), { + return this._req(Connection.getApiUrls('assets'), { query: { search } @@ -169,7 +199,7 @@ export default class Connection { * @param search */ searchMetadata(search) { - return this._req(this.getApiUrls('metadata'), { + return this._req(Connection.getApiUrls('metadata'), { query: { search } diff --git a/src/index.js b/src/index.js index bbc98ee..24953b7 100644 --- a/src/index.js +++ b/src/index.js @@ -1,6 +1,7 @@ export Ed25519Keypair from './Ed25519Keypair' export Connection from './connection' +export Request from './request' export Transaction from './transaction' export ccJsonLoad from './utils/ccJsonLoad' export ccJsonify from './utils/ccJsonify' diff --git a/src/request.js b/src/request.js index 1e45574..258579f 100644 --- a/src/request.js +++ b/src/request.js @@ -7,32 +7,95 @@ const DEFAULT_REQUEST_CONFIG = { } } +const BACKOFF_DELAY = 0.5 // seconds + /** * @private * Small wrapper around js-utility-belt's request that provides url resolving, * default settings, and response handling. */ -export default function request(url, config = {}) { - // Load default fetch configuration and remove any falsy query parameters - const requestConfig = Object.assign({}, DEFAULT_REQUEST_CONFIG, config, { - query: config.query && sanitize(config.query) - }) - const apiUrl = url - if (requestConfig.jsonBody) { - requestConfig.headers = Object.assign({}, requestConfig.headers, { - 'Content-Type': 'application/json' - }) + +export default class Request { + constructor(node, requestConfig) { + this.node = node + this.requestConfig = requestConfig + this.backoffTime = null } - if (!url) { - return Promise.reject(new Error('Request was not given a url.')) + async request(endpoint, config, timeout) { + // Num or retries to the same node + this.retries = 0 + // Load default fetch configuration and remove any falsy query parameters + const requestConfig = Object.assign({}, this.node.headers, DEFAULT_REQUEST_CONFIG, config, { + query: config.query && sanitize(config.query) + }) + const apiUrl = this.node.endpoint + endpoint + if (requestConfig.jsonBody) { + requestConfig.headers = Object.assign({}, requestConfig.headers, { + 'Content-Type': 'application/json' + }) + } + + if (!endpoint) { + return Promise.reject(new Error('Request was not given a url.')) + } + + // If `ConnectionError` occurs, a timestamp equal to now + + // the default delay (`BACKOFF_DELAY`) is assigned to the object. + // The timestamp is in UTC. Next time the function is called, it either + // waits till the timestamp is passed or raises `TimeoutError`. + // If `ConnectionError` occurs two or more times in a row, + // the retry count is incremented and the new timestamp is calculated + // as now + the default delay multiplied by two to the power of the + // number of retries. + // If a request is successful, the backoff timestamp is removed, + // the retry count is back to zero. + + this.backoffTimedelta = this.getBackoffTimedelta() + + if (timeout != null && timeout < this.backoffTimedelta) { + throw new Error() + } + if (this.backoffTimedelta > 0) { + await Request.sleep(this.backoffTimedelta) + } + this.timeout = this.timeout ? this.timeout - this.backoffTimedelta : timeout + + return baseRequest(apiUrl, requestConfig) + .then(res => async function handleResponse() { + res.json() + if (!(res.status >= 200 && res.status < 300)) { + console.log('Valid response') + } + }) + .catch(err => { + throw err + }) + .finally((res) => { + this.updateBackoffTime(res) + }) } - return baseRequest(apiUrl, requestConfig) - .then(res => res.json()) - .catch(err => { - console.error(err) - throw err - }) + getBackoffTimedelta() { + if (!this.backoffTime) { + return 0 + } + return (this.backoffTime - Date.now()) + } + + updateBackoffTime(success) { + if (success) { + this.retries = 0 + this.backoffTime = null + } else { + this.backoffTimedelta = BACKOFF_DELAY * (2 ** this.retries) + this.backoffTime = Date.now() + this.backoffTimedelta + this.retries += 1 + } + } + + static sleep(ms) { + return new Promise(resolve => setTimeout(resolve, ms)) + } } diff --git a/test/connection/test_connection.js b/test/connection/test_connection.js index 23b866c..573cc2b 100644 --- a/test/connection/test_connection.js +++ b/test/connection/test_connection.js @@ -1,8 +1,7 @@ import test from 'ava' import sinon from 'sinon' -import * as request from '../../src/request' // eslint-disable-line -import { Connection } from '../../src' +import { Connection, Request } from '../../src' import { API_PATH } from '../constants' const conn = new Connection(API_PATH) @@ -32,13 +31,13 @@ test('Generate API URLS', t => { 'assets': 'assets', } Object.keys(endpoints).forEach(endpointName => { - const url = conn.getApiUrls(endpointName) - const expected = API_PATH + endpoints[endpointName] + const url = Connection.getApiUrls(endpointName) + const expected = endpoints[endpointName] t.is(url, expected) }) }) - +// TODO Redefine test test('Request with custom headers', t => { const testConn = new Connection(API_PATH, { hello: 'world' }) const expectedOptions = { @@ -49,11 +48,11 @@ test('Request with custom headers', t => { } // request is read only, cannot be mocked? - sinon.spy(request, 'default') + sinon.spy(Request, 'default') testConn._req(API_PATH, { headers: { custom: 'headers' } }) - t.truthy(request.default.calledWith(API_PATH, expectedOptions)) - request.default.restore() + t.truthy(Request.default.calledWith(API_PATH, expectedOptions)) + Request.default.restore() }) @@ -62,7 +61,7 @@ test('Get block for a block id', t => { const blockHeight = 'abc' conn._req = sinon.spy() - conn.getApiUrls = sinon.stub().returns(expectedPath) + Connection.getApiUrls = sinon.stub().returns(expectedPath) conn.getBlock(blockHeight) t.truthy(conn._req.calledWith( @@ -77,7 +76,7 @@ test('Get transaction for a transaction id', t => { const transactionId = 'abc' conn._req = sinon.spy() - conn.getApiUrls = sinon.stub().returns(expectedPath) + Connection.getApiUrls = sinon.stub().returns(expectedPath) conn.getTransaction(transactionId) t.truthy(conn._req.calledWith( @@ -92,7 +91,7 @@ test('Get list of blocks for a transaction id', t => { const transactionId = 'abc' conn._req = sinon.spy() - conn.getApiUrls = sinon.stub().returns(expectedPath) + Connection.getApiUrls = sinon.stub().returns(expectedPath) conn.listBlocks(transactionId) t.truthy(conn._req.calledWith( @@ -112,7 +111,7 @@ test('Get list of transactions for an asset id', t => { const operation = 'operation' conn._req = sinon.spy() - conn.getApiUrls = sinon.stub().returns(expectedPath) + Connection.getApiUrls = sinon.stub().returns(expectedPath) conn.listTransactions(assetId, operation) t.truthy(conn._req.calledWith( @@ -132,7 +131,7 @@ test('Get outputs for a public key and no spent flag', t => { const publicKey = 'publicKey' conn._req = sinon.spy() - conn.getApiUrls = sinon.stub().returns(expectedPath) + Connection.getApiUrls = sinon.stub().returns(expectedPath) conn.listOutputs(publicKey) t.truthy(conn._req.calledWith( @@ -148,7 +147,7 @@ test('Get outputs for a public key and spent=false', t => { const spent = false conn._req = sinon.spy() - conn.getApiUrls = sinon.stub().returns(expectedPath) + Connection.getApiUrls = sinon.stub().returns(expectedPath) conn.listOutputs(publicKey, spent) t.truthy(conn._req.calledWith( @@ -164,7 +163,7 @@ test('Get outputs for a public key and spent=true', t => { const spent = true conn._req = sinon.spy() - conn.getApiUrls = sinon.stub().returns(expectedPath) + Connection.getApiUrls = sinon.stub().returns(expectedPath) conn.listOutputs(publicKey, spent) t.truthy(conn._req.calledWith( @@ -179,7 +178,7 @@ test('Get votes for a block id', t => { const blockId = 'abc' conn._req = sinon.spy() - conn.getApiUrls = sinon.stub().returns(expectedPath) + Connection.getApiUrls = sinon.stub().returns(expectedPath) conn.listVotes(blockId) t.truthy(conn._req.calledWith( @@ -194,7 +193,7 @@ test('Get asset for text', t => { const search = 'abc' conn._req = sinon.spy() - conn.getApiUrls = sinon.stub().returns(expectedPath) + Connection.getApiUrls = sinon.stub().returns(expectedPath) conn.searchAssets(search) t.truthy(conn._req.calledWith( @@ -209,7 +208,7 @@ test('Get metadata for text', t => { const search = 'abc' conn._req = sinon.spy() - conn.getApiUrls = sinon.stub().returns(expectedPath) + Connection.getApiUrls = sinon.stub().returns(expectedPath) conn.searchMetadata(search) t.truthy(conn._req.calledWith(