created model for new connections

This commit is contained in:
manolodewiner 2018-08-22 10:10:09 +02:00
parent 124586dcbc
commit 7d978286f5
6 changed files with 208 additions and 58 deletions

View File

@ -22,6 +22,7 @@
"dev": "webpack -w",
"clean": "rimraf dist/bundle dist/node",
"test": "npm run lint && nyc ava test/ && npm run thanks && npm run report-coverage",
"mine-test": "nyc ava test/connection/test_mine*",
"thanks": "cowsay Hi, thanks for your interest in BigchainDB. We appreciate your contribution!",
"release": "./node_modules/release-it/bin/release-it.js --src.tagName='v%s' --github.release --npm.publish --non-interactive",
"release-minor": "./node_modules/release-it/bin/release-it.js minor --src.tagName='v%s' --github.release --npm.publish --non-interactive",

56
src/Transport.js Normal file
View File

@ -0,0 +1,56 @@
import Request from './request'
export default class Transport {
constructor(nodes, headers, timeout) {
this.connectionPool = []
this.timeout = timeout
nodes.forEach(node => {
this.connectionPool.push(new Request(node, headers))
})
}
// Select the connection with the earliest backoff time
pickConnection() {
if (this.connectionPool.length === 1) {
return this.connectionPool[0]
}
return this.minBackoff()
}
minBackoff() {
let connection = this.connectionPool[0]
this.connectionPool.forEach(conn => {
// 0 the lowest value is the time for Thu Jan 01 1970 01:00:00 GMT+0100 (CET)
conn.backoffTime = conn.backoffTime ? conn.backoffTime : 0
connection = (conn.backoffTime < connection.backoffTime) ? conn : connection
})
return connection
}
async forwardRequest(path, headers) {
while (!this.timeout || this.timeout > 0) {
const connection = this.pickConnection()
// Date in milliseconds
const startTime = Date.now()
try {
// TODO wait until request is done
const response = connection.request(
path,
headers,
this.timeout
)
return response
} catch (err) {
throw err
} finally {
const elapsed = Date.now() - startTime
if (this.timeout) {
this.timeout -= elapsed
}
}
}
throw new Error()
}
}

View File

@ -1,24 +1,55 @@
import request from './request'
import Transport from './Transport'
const HEADER_BLACKLIST = ['content-type']
const DEFAULT_NODE = 'http://localhost:9984'
/**
* Base connection
* If initialized with ``>1`` nodes, the driver will send successive
requests to different nodes in a round-robin fashion (this will be
customizable in the future)
*
* @nodes
* list of
*
* @headers
*
*
*/
export default class Connection {
constructor(path, headers = {}) {
this.path = path
constructor(nodes, headers = {}, timeout = null) {
const nodesArray = Array.isArray(nodes) ? nodes : [nodes]
// Copy object
this.headers = Object.assign({}, headers)
// Validate headers
Object.keys(headers).forEach(header => {
if (HEADER_BLACKLIST.includes(header.toLowerCase())) {
throw new Error(`Header ${header} is reserved and cannot be set.`)
}
})
this.normalizedNodes = []
if (!nodesArray) {
this.normalizedNodes.push(Connection.normalizeNode(DEFAULT_NODE, this.headers))
} else {
nodesArray.forEach(node => {
this.normalizedNodes.push(Connection.normalizeNode(node, this.headers))
})
}
this.transport = new Transport(this.normalizedNodes, this.headers, timeout)
}
getApiUrls(endpoint) {
return this.path + {
static normalizeNode(node, headers) {
if (typeof node === 'string') {
return { 'endpoint': node, 'headers': headers }
} else {
// TODO normalize URL if needed
const allHeaders = Object.assign({}, headers, node.headers)
return { 'endpoint': node, 'headers': allHeaders }
}
}
static getApiUrls(endpoint) {
return {
'blocks': 'blocks',
'blocksDetail': 'blocks/%(blockHeight)s',
'outputs': 'outputs',
@ -33,17 +64,15 @@ export default class Connection {
}[endpoint]
}
_req(path, options = {}) {
// NOTE: `options.headers` could be undefined, but that's OK.
options.headers = Object.assign({}, options.headers, this.headers)
return request(path, options)
_req(pathEndpoint, options = {}) {
return this.transport.forwardRequest(pathEndpoint, options)
}
/**
* @param blockHeight
*/
getBlock(blockHeight) {
return this._req(this.getApiUrls('blocksDetail'), {
return this._req(Connection.getApiUrls('blocksDetail'), {
urlTemplateSpec: {
blockHeight
}
@ -54,7 +83,7 @@ export default class Connection {
* @param transactionId
*/
getTransaction(transactionId) {
return this._req(this.getApiUrls('transactionsDetail'), {
return this._req(Connection.getApiUrls('transactionsDetail'), {
urlTemplateSpec: {
transactionId
}
@ -66,7 +95,7 @@ export default class Connection {
* @param status
*/
listBlocks(transactionId) {
return this._req(this.getApiUrls('blocks'), {
return this._req(Connection.getApiUrls('blocks'), {
query: {
transaction_id: transactionId,
}
@ -86,7 +115,7 @@ export default class Connection {
if (spent !== undefined) {
query.spent = spent.toString()
}
return this._req(this.getApiUrls('outputs'), {
return this._req(Connection.getApiUrls('outputs'), {
query
})
}
@ -96,7 +125,8 @@ export default class Connection {
* @param operation
*/
listTransactions(assetId, operation) {
return this._req(this.getApiUrls('transactions'), {
console.log('listtransaction', assetId)
return this._req(Connection.getApiUrls('transactions'), {
query: {
asset_id: assetId,
operation
@ -108,7 +138,7 @@ export default class Connection {
* @param blockId
*/
listVotes(blockId) {
return this._req(this.getApiUrls('votes'), {
return this._req(Connection.getApiUrls('votes'), {
query: {
block_id: blockId
}
@ -126,7 +156,7 @@ export default class Connection {
* @param transaction
*/
postTransactionSync(transaction) {
return this._req(this.getApiUrls('transactionsSync'), {
return this._req(Connection.getApiUrls('transactionsSync'), {
method: 'POST',
jsonBody: transaction
})
@ -137,7 +167,7 @@ export default class Connection {
* @param transaction
*/
postTransactionAsync(transaction) {
return this._req(this.getApiUrls('transactionsAsync'), {
return this._req(Connection.getApiUrls('transactionsAsync'), {
method: 'POST',
jsonBody: transaction
})
@ -148,7 +178,7 @@ export default class Connection {
* @param transaction
*/
postTransactionCommit(transaction) {
return this._req(this.getApiUrls('transactionsCommit'), {
return this._req(Connection.getApiUrls('transactionsCommit'), {
method: 'POST',
jsonBody: transaction
})
@ -158,7 +188,7 @@ export default class Connection {
* @param search
*/
searchAssets(search) {
return this._req(this.getApiUrls('assets'), {
return this._req(Connection.getApiUrls('assets'), {
query: {
search
}
@ -169,7 +199,7 @@ export default class Connection {
* @param search
*/
searchMetadata(search) {
return this._req(this.getApiUrls('metadata'), {
return this._req(Connection.getApiUrls('metadata'), {
query: {
search
}

View File

@ -1,6 +1,7 @@
export Ed25519Keypair from './Ed25519Keypair'
export Connection from './connection'
export Request from './request'
export Transaction from './transaction'
export ccJsonLoad from './utils/ccJsonLoad'
export ccJsonify from './utils/ccJsonify'

View File

@ -7,32 +7,95 @@ const DEFAULT_REQUEST_CONFIG = {
}
}
const BACKOFF_DELAY = 0.5 // seconds
/**
* @private
* Small wrapper around js-utility-belt's request that provides url resolving,
* default settings, and response handling.
*/
export default function request(url, config = {}) {
// Load default fetch configuration and remove any falsy query parameters
const requestConfig = Object.assign({}, DEFAULT_REQUEST_CONFIG, config, {
query: config.query && sanitize(config.query)
})
const apiUrl = url
if (requestConfig.jsonBody) {
requestConfig.headers = Object.assign({}, requestConfig.headers, {
'Content-Type': 'application/json'
})
export default class Request {
constructor(node, requestConfig) {
this.node = node
this.requestConfig = requestConfig
this.backoffTime = null
}
if (!url) {
return Promise.reject(new Error('Request was not given a url.'))
async request(endpoint, config, timeout) {
// Num or retries to the same node
this.retries = 0
// Load default fetch configuration and remove any falsy query parameters
const requestConfig = Object.assign({}, this.node.headers, DEFAULT_REQUEST_CONFIG, config, {
query: config.query && sanitize(config.query)
})
const apiUrl = this.node.endpoint + endpoint
if (requestConfig.jsonBody) {
requestConfig.headers = Object.assign({}, requestConfig.headers, {
'Content-Type': 'application/json'
})
}
if (!endpoint) {
return Promise.reject(new Error('Request was not given a url.'))
}
// If `ConnectionError` occurs, a timestamp equal to now +
// the default delay (`BACKOFF_DELAY`) is assigned to the object.
// The timestamp is in UTC. Next time the function is called, it either
// waits till the timestamp is passed or raises `TimeoutError`.
// If `ConnectionError` occurs two or more times in a row,
// the retry count is incremented and the new timestamp is calculated
// as now + the default delay multiplied by two to the power of the
// number of retries.
// If a request is successful, the backoff timestamp is removed,
// the retry count is back to zero.
this.backoffTimedelta = this.getBackoffTimedelta()
if (timeout != null && timeout < this.backoffTimedelta) {
throw new Error()
}
if (this.backoffTimedelta > 0) {
await Request.sleep(this.backoffTimedelta)
}
this.timeout = this.timeout ? this.timeout - this.backoffTimedelta : timeout
return baseRequest(apiUrl, requestConfig)
.then(res => async function handleResponse() {
res.json()
if (!(res.status >= 200 && res.status < 300)) {
console.log('Valid response')
}
})
.catch(err => {
throw err
})
.finally((res) => {
this.updateBackoffTime(res)
})
}
return baseRequest(apiUrl, requestConfig)
.then(res => res.json())
.catch(err => {
console.error(err)
throw err
})
getBackoffTimedelta() {
if (!this.backoffTime) {
return 0
}
return (this.backoffTime - Date.now())
}
updateBackoffTime(success) {
if (success) {
this.retries = 0
this.backoffTime = null
} else {
this.backoffTimedelta = BACKOFF_DELAY * (2 ** this.retries)
this.backoffTime = Date.now() + this.backoffTimedelta
this.retries += 1
}
}
static sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms))
}
}

View File

@ -1,8 +1,7 @@
import test from 'ava'
import sinon from 'sinon'
import * as request from '../../src/request' // eslint-disable-line
import { Connection } from '../../src'
import { Connection, Request } from '../../src'
import { API_PATH } from '../constants'
const conn = new Connection(API_PATH)
@ -32,13 +31,13 @@ test('Generate API URLS', t => {
'assets': 'assets',
}
Object.keys(endpoints).forEach(endpointName => {
const url = conn.getApiUrls(endpointName)
const expected = API_PATH + endpoints[endpointName]
const url = Connection.getApiUrls(endpointName)
const expected = endpoints[endpointName]
t.is(url, expected)
})
})
// TODO Redefine test
test('Request with custom headers', t => {
const testConn = new Connection(API_PATH, { hello: 'world' })
const expectedOptions = {
@ -49,11 +48,11 @@ test('Request with custom headers', t => {
}
// request is read only, cannot be mocked?
sinon.spy(request, 'default')
sinon.spy(Request, 'default')
testConn._req(API_PATH, { headers: { custom: 'headers' } })
t.truthy(request.default.calledWith(API_PATH, expectedOptions))
request.default.restore()
t.truthy(Request.default.calledWith(API_PATH, expectedOptions))
Request.default.restore()
})
@ -62,7 +61,7 @@ test('Get block for a block id', t => {
const blockHeight = 'abc'
conn._req = sinon.spy()
conn.getApiUrls = sinon.stub().returns(expectedPath)
Connection.getApiUrls = sinon.stub().returns(expectedPath)
conn.getBlock(blockHeight)
t.truthy(conn._req.calledWith(
@ -77,7 +76,7 @@ test('Get transaction for a transaction id', t => {
const transactionId = 'abc'
conn._req = sinon.spy()
conn.getApiUrls = sinon.stub().returns(expectedPath)
Connection.getApiUrls = sinon.stub().returns(expectedPath)
conn.getTransaction(transactionId)
t.truthy(conn._req.calledWith(
@ -92,7 +91,7 @@ test('Get list of blocks for a transaction id', t => {
const transactionId = 'abc'
conn._req = sinon.spy()
conn.getApiUrls = sinon.stub().returns(expectedPath)
Connection.getApiUrls = sinon.stub().returns(expectedPath)
conn.listBlocks(transactionId)
t.truthy(conn._req.calledWith(
@ -112,7 +111,7 @@ test('Get list of transactions for an asset id', t => {
const operation = 'operation'
conn._req = sinon.spy()
conn.getApiUrls = sinon.stub().returns(expectedPath)
Connection.getApiUrls = sinon.stub().returns(expectedPath)
conn.listTransactions(assetId, operation)
t.truthy(conn._req.calledWith(
@ -132,7 +131,7 @@ test('Get outputs for a public key and no spent flag', t => {
const publicKey = 'publicKey'
conn._req = sinon.spy()
conn.getApiUrls = sinon.stub().returns(expectedPath)
Connection.getApiUrls = sinon.stub().returns(expectedPath)
conn.listOutputs(publicKey)
t.truthy(conn._req.calledWith(
@ -148,7 +147,7 @@ test('Get outputs for a public key and spent=false', t => {
const spent = false
conn._req = sinon.spy()
conn.getApiUrls = sinon.stub().returns(expectedPath)
Connection.getApiUrls = sinon.stub().returns(expectedPath)
conn.listOutputs(publicKey, spent)
t.truthy(conn._req.calledWith(
@ -164,7 +163,7 @@ test('Get outputs for a public key and spent=true', t => {
const spent = true
conn._req = sinon.spy()
conn.getApiUrls = sinon.stub().returns(expectedPath)
Connection.getApiUrls = sinon.stub().returns(expectedPath)
conn.listOutputs(publicKey, spent)
t.truthy(conn._req.calledWith(
@ -179,7 +178,7 @@ test('Get votes for a block id', t => {
const blockId = 'abc'
conn._req = sinon.spy()
conn.getApiUrls = sinon.stub().returns(expectedPath)
Connection.getApiUrls = sinon.stub().returns(expectedPath)
conn.listVotes(blockId)
t.truthy(conn._req.calledWith(
@ -194,7 +193,7 @@ test('Get asset for text', t => {
const search = 'abc'
conn._req = sinon.spy()
conn.getApiUrls = sinon.stub().returns(expectedPath)
Connection.getApiUrls = sinon.stub().returns(expectedPath)
conn.searchAssets(search)
t.truthy(conn._req.calledWith(
@ -209,7 +208,7 @@ test('Get metadata for text', t => {
const search = 'abc'
conn._req = sinon.spy()
conn.getApiUrls = sinon.stub().returns(expectedPath)
Connection.getApiUrls = sinon.stub().returns(expectedPath)
conn.searchMetadata(search)
t.truthy(conn._req.calledWith(