From 978c70de1e5b783c18143e0ec1e2a07341427be0 Mon Sep 17 00:00:00 2001 From: smart_ex Date: Fri, 10 Jun 2022 14:08:47 +1000 Subject: [PATCH] WIP health service, error handling --- src/app/routes.ts | 9 +--- src/modules/utils.ts | 10 ++++ src/queue/health.processor.ts | 11 +++- src/queue/relayer.processor.ts | 3 +- src/queue/worker.ts | 11 ++-- src/services/config.service.ts | 21 +++++--- src/services/health.service.ts | 97 ++++++++++++++++++++++++++-------- src/services/price.service.ts | 1 + src/services/tx.service.ts | 18 ++++++- 9 files changed, 134 insertions(+), 47 deletions(-) create mode 100644 src/modules/utils.ts diff --git a/src/app/routes.ts b/src/app/routes.ts index 1d5f07a..ca8d9b7 100644 --- a/src/app/routes.ts +++ b/src/app/routes.ts @@ -23,8 +23,7 @@ export function mainHandler(server: FastifyInstance, options, next) { async (req, res) => { const ethPrices = await priceService.getPrices(); const currentQueue = await jobService.getQueueCount(); - const errorsLog = await healthService.getErrors(); - console.log(currentQueue); + const health = await healthService.getStatus(); res.send({ rewardAccount, instances: configService.instances, @@ -33,11 +32,7 @@ export function mainHandler(server: FastifyInstance, options, next) { tornadoServiceFee, miningServiceFee: 0, version: '4.5.0', - health: { - status: 'true', - error: '', - errorsLog - }, + health, currentQueue, }); }); diff --git a/src/modules/utils.ts b/src/modules/utils.ts new file mode 100644 index 0000000..32d17d7 --- /dev/null +++ b/src/modules/utils.ts @@ -0,0 +1,10 @@ +export const parseJSON = (str: string) => { + let parsed = null; + try { + parsed = JSON.parse(str); + if (typeof parsed === 'string') parsed = parseJSON(parsed); + return parsed; + } catch (e) { + return parsed; + } +}; diff --git a/src/queue/health.processor.ts b/src/queue/health.processor.ts index 49d5864..5a9a7ea 100644 --- a/src/queue/health.processor.ts +++ b/src/queue/health.processor.ts @@ -2,7 +2,14 @@ import { getHealthService } from '../services'; import { Processor } from 'bullmq'; export const healthProcessor: Processor = async () => { - const healthService = getHealthService(); + const healthService = getHealthService(); + + try { await healthService.check(); + await healthService.setStatus({ status: true, error: '' }); + } catch (e) { + await healthService.saveError(e); + await healthService.setStatus({ status: false, error: e.message }); } -; +}; + diff --git a/src/queue/relayer.processor.ts b/src/queue/relayer.processor.ts index 3f53b33..631d95d 100644 --- a/src/queue/relayer.processor.ts +++ b/src/queue/relayer.processor.ts @@ -14,8 +14,7 @@ export const relayerProcessor: RelayerProcessor = async (job) => { const txData = await txService.prepareTxData(withdrawalData); return await txService.sendTx(txData); } catch (e) { - console.log(e); await job.update({ ...job.data, status: JobStatus.FAILED }); - throw new Error(e.message); + throw e; } }; diff --git a/src/queue/worker.ts b/src/queue/worker.ts index 8e91202..be28688 100644 --- a/src/queue/worker.ts +++ b/src/queue/worker.ts @@ -5,6 +5,8 @@ import { configService, getHealthService } from '../services'; export const priceWorker = async () => { await configService.init(); + const healthService = getHealthService(); + const price = new PriceQueueHelper(); price.scheduler.on('stalled', (jobId, prev) => console.log({ jobId, prev })); console.log('price worker', price.queue.name); @@ -12,7 +14,7 @@ export const priceWorker = async () => { price.worker.on('completed', async (job, result) => { console.log(`Job ${job.id} completed with result: ${result}`); }); - price.worker.on('failed', (job, error) => console.log(error)); + price.worker.on('failed', (job, error) => healthService.saveError(error)); }; export const relayerWorker = async () => { @@ -25,7 +27,7 @@ export const relayerWorker = async () => { }); relayer.worker.on('failed', (job, error) => { healthService.saveError(error); - console.log(error); + // console.log(error); }); }; @@ -37,5 +39,8 @@ export const healthWorker = async () => { health.worker.on('completed', (job, result) => { console.log(`Job ${job.id} completed with result: `, result); }); - health.worker.on('failed', (job, error) => console.log(error)); + health.worker.on('failed', (job, error) => { + + // console.log(error); + }); }; diff --git a/src/services/config.service.ts b/src/services/config.service.ts index 2e0aceb..b86775a 100644 --- a/src/services/config.service.ts +++ b/src/services/config.service.ts @@ -1,5 +1,6 @@ import { instances, + mainnetRpcUrl, minimumBalance, minimumTornBalance, netId, @@ -39,6 +40,7 @@ export class ConfigService { isLightMode: boolean; instances: NetInstances; provider: providers.JsonRpcProvider; + mainnentProvider: providers.JsonRpcProvider; wallet: Wallet; public readonly netId: availableIds = netId; public readonly privateKey = privateKey; @@ -50,13 +52,13 @@ export class ConfigService { private _tokenContract: ERC20Abi; balances: { MAIN: { warn: string; critical: string; }; TORN: { warn: string; critical: string; }; }; - constructor(private store: RedisStore) { this.netIdKey = `netId${this.netId}`; this.queueName = `relayer_${this.netId}`; this.isLightMode = ![1, 5].includes(netId); this.instances = instances[this.netIdKey]; this.provider = getProvider(false); + this.mainnentProvider = getProvider(false, mainnetRpcUrl, 1); this.wallet = new Wallet(this.privateKey, this.provider); this.balances = { MAIN: { warn: BigNumber.from(minimumBalance).mul(150).div(100).toString(), critical: minimumBalance }, @@ -85,18 +87,18 @@ export class ConfigService { } } - private async _checkNetwork() { - try { - await this.provider.getNetwork(); - } catch (e) { - throw new Error(`Could not detect network, check your rpc url: ${this.rpcUrl}. ` + e.message); + async checkNetwork() { + await this.provider.getNetwork(); + if (this.isLightMode) { + await this.mainnentProvider.getNetwork(); } } + async init() { try { if (this.isInit) return; - await this._checkNetwork(); + await this.checkNetwork(); this._tokenAddress = await resolve(torn.torn.address); this._tokenContract = await getTornTokenContract(this._tokenAddress); if (this.isLightMode) { @@ -130,12 +132,15 @@ export class ConfigService { this.isInit = true; } catch (e) { console.error(`${this.constructor.name} Error:`, e.message); + process.exit(1); } } async clearRedisState() { const queueKeys = (await this.store.client.keys('bull:*')).filter(s => s.indexOf('relayer') === -1); - await this.store.client.del(queueKeys); + const errorKeys = await this.store.client.keys('errors:*'); + // const alertKeys = await this.store.client.keys('alerts:*'); + await this.store.client.del([...queueKeys, ...errorKeys]); } getInstance(address: string) { diff --git a/src/services/health.service.ts b/src/services/health.service.ts index 065e189..b2d6d5b 100644 --- a/src/services/health.service.ts +++ b/src/services/health.service.ts @@ -3,27 +3,73 @@ import { ConfigService } from './config.service'; import { RedisStore } from '../modules/redis'; import { formatEther } from 'ethers/lib/utils'; +class RelayerError extends Error { + constructor(message: string, code: string) { + super(message); + this.code = code; + } + + code: string; +} + @autoInjectable() export class HealthService { + constructor(private config: ConfigService, private store: RedisStore) { } async clearErrors() { - await this.store.client.del('errors'); + await this.store.client.del('errors:log', 'errors:code'); } - async getErrors(): Promise<{ message: string, score: number }[]> { - const set = await this.store.client.zrevrange('errors', 0, -1, 'WITHSCORES'); - const errors = []; - while (set.length) { - const [message, score] = set.splice(0, 2); - errors.push({ message, score }); + private async _getErrors(): Promise<{ errorsLog: { message: string, score: number }[], errorsCode: Record }> { + const logSet = await this.store.client.zrevrange('errors:log', 0, -1, 'WITHSCORES'); + const codeSet = await this.store.client.zrevrange('errors:code', 0, -1, 'WITHSCORES'); + + return { errorsLog: HealthService._parseSet(logSet), errorsCode: HealthService._parseSet(codeSet, 'object') }; + } + + private async _getStatus() { + return this.store.client.hgetall('health:status'); + } + + private static _parseSet(log, to = 'array', keys = ['message', 'score']) { + let out; + if (to === 'array') { + out = []; + while (log.length) { + const [a, b] = log.splice(0, 2); + out.push({ [keys[0]]: a, [keys[1]]: b }); + } + } else { + out = {}; + while (log.length) { + const [a, b] = log.splice(0, 2); + out[a] = Number(b); + } } - return errors; + + return out; + } + + async setStatus(status: { status: boolean; error: string; }) { + await this.store.client.hset('health:status', status); + } + + async getStatus() { + const heathStatus = await this._getStatus(); + const { errorsLog, errorsCode } = await this._getErrors(); + + return { + ...heathStatus, + errorsLog, + errorsCode, + }; } async saveError(e) { - await this.store.client.zadd('errors', 'INCR', 1, e.message); + await this.store.client.zadd('errors:code', 'INCR', 1, e?.code || 'RUNTIME_ERROR'); + await this.store.client.zadd('errors:log', 'INCR', 1, e.message); } private async _checkBalance(value, currency: 'MAIN' | 'TORN') { @@ -37,26 +83,31 @@ export class HealthService { level = 'WARN'; } - const isSent = await this.store.client.sismember(`${key}:sent`, `${type}_${currency}_${level}`); - if (!isSent) { - const alert = { - type: `${type}_${currency}_${level}`, - message: `Insufficient balance ${formatEther(value)} ${currency === 'MAIN' ? this.config.nativeCurrency : 'torn'}`, - level, - time, - }; - await this.store.client.rpush(key, JSON.stringify(alert)); - } + const alert = { + type: `${type}_${currency}_${level}`, + message: `Insufficient balance ${formatEther(value)} ${currency === 'MAIN' ? this.config.nativeCurrency : 'torn'}`, + level, + time, + }; + await this.store.client.rpush(key, JSON.stringify(alert)); + return alert; } async check() { + await this.config.checkNetwork(); const mainBalance = await this.config.wallet.getBalance(); const tornBalance = await this.config.tokenContract.balanceOf(this.config.wallet.address); - // const mainBalance = BigNumber.from(`${2e18}`).add(1); - // const tornBalance = BigNumber.from(`${50e18}`); - await this._checkBalance(mainBalance, 'MAIN'); - await this._checkBalance(tornBalance, 'TORN'); + // const mainBalance = BigNumber.from(`${1e18}`).add(1); + // const tornBalance = BigNumber.from(`${45e18}`); + const mainStatus = await this._checkBalance(mainBalance, 'MAIN'); + const tornStatus = await this._checkBalance(tornBalance, 'TORN'); + if (mainStatus.level === 'CRITICAL') { + throw new RelayerError(mainStatus.message, 'INSUFFICIENT_MAIN_BALANCE'); + } + if (tornStatus.level === 'CRITICAL') { + throw new RelayerError(tornStatus.message, 'INSUFFICIENT_TORN_BALANCE'); + } } } diff --git a/src/services/price.service.ts b/src/services/price.service.ts index fb7738b..7853bf5 100644 --- a/src/services/price.service.ts +++ b/src/services/price.service.ts @@ -28,6 +28,7 @@ export class PriceService { async fetchPrices(tokens: Token[]) { try { + if (!tokens?.length) return; const names = tokens.reduce((p, c) => { p[c.address] = c.symbol.toLowerCase(); return p; diff --git a/src/services/tx.service.ts b/src/services/tx.service.ts index c01dc9b..0142b13 100644 --- a/src/services/tx.service.ts +++ b/src/services/tx.service.ts @@ -11,6 +11,7 @@ import { Job } from 'bullmq'; import { RelayerJobData } from '../queue'; import { ConfigService } from './config.service'; import { container, injectable } from 'tsyringe'; +import { parseJSON } from '../modules/utils'; export type WithdrawalData = { contract: string, @@ -25,6 +26,15 @@ export type WithdrawalData = { ] } +class ExecutionError extends Error { + constructor(message: string, code?: string) { + super(message); + this.code = code; + } + + code: string; +} + @injectable() export class TxService { set currentJob(value: Job) { @@ -39,9 +49,9 @@ export class TxService { constructor(private config: ConfigService, private priceService: PriceService) { const { privateKey, rpcUrl, netId } = this.config; - this.txManager = new TxManager({ privateKey, rpcUrl, config: { THROW_ON_REVERT: true } }); this.tornadoProxy = this.config.proxyContract; this.provider = this.tornadoProxy.provider; + this.txManager = new TxManager({ privateKey, rpcUrl, config: { THROW_ON_REVERT: true }, provider: this.provider }); this.oracle = new GasPriceOracle({ defaultRpc: rpcUrl, chainId: netId, @@ -77,7 +87,11 @@ export class TxService { } else throw new Error('Submitted transaction failed'); return receipt; } catch (e) { - throw new Error(e.message); + const regex = /body=("\{.*}}")/; + if (regex.test(e.message)) { + const { error } = parseJSON(regex.exec(e.message)[1]); + throw new ExecutionError(error.message, 'REVERTED'); + } else throw e.message; } }