1
0
mirror of https://github.com/kremalicious/metamask-extension.git synced 2024-12-23 09:52:26 +01:00

streams - use pump and published obj-multiplex

This commit is contained in:
kumavis 2017-09-07 21:17:49 -07:00
parent 440a42bbc3
commit 57e4805c62
5 changed files with 44 additions and 81 deletions

View File

@ -1,48 +0,0 @@
const through = require('through2')
module.exports = ObjectMultiplex
function ObjectMultiplex (opts) {
opts = opts || {}
// create multiplexer
const mx = through.obj(function (chunk, enc, cb) {
const name = chunk.name
const data = chunk.data
if (!name) {
console.warn(`ObjectMultiplex - Malformed chunk without name "${chunk}"`)
return cb()
}
const substream = mx.streams[name]
if (!substream) {
console.warn(`ObjectMultiplex - orphaned data for stream "${name}"`)
} else {
if (substream.push) substream.push(data)
}
return cb()
})
mx.streams = {}
// create substreams
mx.createStream = function (name) {
const substream = mx.streams[name] = through.obj(function (chunk, enc, cb) {
mx.push({
name: name,
data: chunk,
})
return cb()
})
mx.on('end', function () {
return substream.emit('end')
})
if (opts.error) {
mx.on('error', function () {
return substream.emit('error')
})
}
return substream
}
// ignore streams (dont display orphaned data warning)
mx.ignoreStream = function (name) {
mx.streams[name] = true
}
return mx
}

View File

@ -1,5 +1,6 @@
const Duplex = require('readable-stream').Duplex
const inherits = require('util').inherits
const noop = function(){}
module.exports = PortDuplexStream
@ -20,20 +21,14 @@ PortDuplexStream.prototype._onMessage = function (msg) {
if (Buffer.isBuffer(msg)) {
delete msg._isBuffer
var data = new Buffer(msg)
// console.log('PortDuplexStream - saw message as buffer', data)
this.push(data)
} else {
// console.log('PortDuplexStream - saw message', msg)
this.push(msg)
}
}
PortDuplexStream.prototype._onDisconnect = function () {
try {
this.push(null)
} catch (err) {
this.emit('error', err)
}
this.destroy()
}
// stream plumbing
@ -45,19 +40,12 @@ PortDuplexStream.prototype._write = function (msg, encoding, cb) {
if (Buffer.isBuffer(msg)) {
var data = msg.toJSON()
data._isBuffer = true
// console.log('PortDuplexStream - sent message as buffer', data)
this._port.postMessage(data)
} else {
// console.log('PortDuplexStream - sent message', msg)
this._port.postMessage(msg)
}
} catch (err) {
// console.error(err)
return cb(new Error('PortDuplexStream - disconnected'))
}
cb()
}
// util
function noop () {}

View File

@ -1,6 +1,7 @@
const Through = require('through2')
const endOfStream = require('end-of-stream')
const ObjectMultiplex = require('./obj-multiplex')
const ObjectMultiplex = require('obj-multiplex')
const pump = require('pump')
module.exports = {
jsonParseStream: jsonParseStream,
@ -23,14 +24,14 @@ function jsonStringifyStream () {
}
function setupMultiplex (connectionStream) {
var mx = ObjectMultiplex()
connectionStream.pipe(mx).pipe(connectionStream)
endOfStream(mx, function (err) {
if (err) console.error(err)
})
endOfStream(connectionStream, function (err) {
if (err) console.error(err)
mx.destroy()
})
return mx
const mux = new ObjectMultiplex()
pump(
connectionStream,
mux,
connectionStream,
(err) => {
if (err) console.error(err)
}
)
return mux
}

View File

@ -1,7 +1,7 @@
const EventEmitter = require('events')
const extend = require('xtend')
const promiseToCallback = require('promise-to-callback')
const pipe = require('pump')
const pump = require('pump')
const Dnode = require('dnode')
const ObservableStore = require('obs-store')
const EthStore = require('./lib/eth-store')
@ -367,7 +367,14 @@ module.exports = class MetamaskController extends EventEmitter {
setupControllerConnection (outStream) {
const api = this.getApi()
const dnode = Dnode(api)
outStream.pipe(dnode).pipe(outStream)
pump(
outStream,
dnode,
outStream,
(err) => {
if (err) console.error(err)
}
)
dnode.on('remote', (remote) => {
// push updates to popup
const sendUpdate = remote.sendUpdate.bind(remote)
@ -376,6 +383,7 @@ module.exports = class MetamaskController extends EventEmitter {
}
setupProviderConnection (outStream, originDomain) {
// setup json rpc engine stack
const engine = new RpcEngine()
engine.push(originMiddleware)
engine.push(loggerMiddleware)
@ -383,13 +391,21 @@ module.exports = class MetamaskController extends EventEmitter {
// setup connection
const providerStream = createEngineStream({ engine })
outStream.pipe(providerStream).pipe(outStream)
pump(
outStream,
providerStream,
outStream,
(err) => {
if (err) console.error(err)
}
)
// append dapp origin domain to request
function originMiddleware (req, res, next, end) {
req.origin = originDomain
next()
}
// log rpc activity
function loggerMiddleware (req, res, next, end) {
next((cb) => {
@ -401,6 +417,7 @@ module.exports = class MetamaskController extends EventEmitter {
cb()
})
}
// forward requests to provider
function createProviderMiddleware({ provider }) {
return (req, res, next, end) => {
@ -414,9 +431,12 @@ module.exports = class MetamaskController extends EventEmitter {
}
setupPublicConfig (outStream) {
pipe(
pump(
this.publicConfigStore,
outStream
outStream,
(err) => {
if (err) console.error(err)
}
)
}

View File

@ -69,6 +69,7 @@
"eth-bin-to-ops": "^1.0.1",
"eth-contract-metadata": "^1.1.4",
"eth-hd-keyring": "^1.1.1",
"eth-json-rpc-filters": "^1.0.1",
"eth-phishing-detect": "^1.1.4",
"eth-query": "^2.1.2",
"eth-sig-util": "^1.2.2",
@ -101,6 +102,7 @@
"mkdirp": "^0.5.1",
"multiplex": "^6.7.0",
"number-to-bn": "^1.7.0",
"obj-multiplex": "^1.0.0",
"obs-store": "^2.3.1",
"once": "^1.3.3",
"ping-pong-stream": "^1.0.0",
@ -121,7 +123,7 @@
"react-select": "^1.0.0-rc.2",
"react-simple-file-input": "^1.0.0",
"react-tooltip-component": "^0.3.0",
"readable-stream": "^2.1.2",
"readable-stream": "^2.3.3",
"redux": "^3.0.5",
"redux-logger": "^3.0.6",
"redux-thunk": "^2.2.0",