WIP health service, error handling

This commit is contained in:
smart_ex 2022-06-10 14:08:47 +10:00
parent ec2f20bfaf
commit 978c70de1e
9 changed files with 134 additions and 47 deletions

View File

@ -23,8 +23,7 @@ export function mainHandler(server: FastifyInstance, options, next) {
async (req, res) => {
const ethPrices = await priceService.getPrices();
const currentQueue = await jobService.getQueueCount();
const errorsLog = await healthService.getErrors();
console.log(currentQueue);
const health = await healthService.getStatus();
res.send({
rewardAccount,
instances: configService.instances,
@ -33,11 +32,7 @@ export function mainHandler(server: FastifyInstance, options, next) {
tornadoServiceFee,
miningServiceFee: 0,
version: '4.5.0',
health: {
status: 'true',
error: '',
errorsLog
},
health,
currentQueue,
});
});

10
src/modules/utils.ts Normal file
View File

@ -0,0 +1,10 @@
export const parseJSON = (str: string) => {
let parsed = null;
try {
parsed = JSON.parse(str);
if (typeof parsed === 'string') parsed = parseJSON(parsed);
return parsed;
} catch (e) {
return parsed;
}
};

View File

@ -2,7 +2,14 @@ import { getHealthService } from '../services';
import { Processor } from 'bullmq';
export const healthProcessor: Processor = async () => {
const healthService = getHealthService();
const healthService = getHealthService();
try {
await healthService.check();
await healthService.setStatus({ status: true, error: '' });
} catch (e) {
await healthService.saveError(e);
await healthService.setStatus({ status: false, error: e.message });
}
;
};

View File

@ -14,8 +14,7 @@ export const relayerProcessor: RelayerProcessor = async (job) => {
const txData = await txService.prepareTxData(withdrawalData);
return await txService.sendTx(txData);
} catch (e) {
console.log(e);
await job.update({ ...job.data, status: JobStatus.FAILED });
throw new Error(e.message);
throw e;
}
};

View File

@ -5,6 +5,8 @@ import { configService, getHealthService } from '../services';
export const priceWorker = async () => {
await configService.init();
const healthService = getHealthService();
const price = new PriceQueueHelper();
price.scheduler.on('stalled', (jobId, prev) => console.log({ jobId, prev }));
console.log('price worker', price.queue.name);
@ -12,7 +14,7 @@ export const priceWorker = async () => {
price.worker.on('completed', async (job, result) => {
console.log(`Job ${job.id} completed with result: ${result}`);
});
price.worker.on('failed', (job, error) => console.log(error));
price.worker.on('failed', (job, error) => healthService.saveError(error));
};
export const relayerWorker = async () => {
@ -25,7 +27,7 @@ export const relayerWorker = async () => {
});
relayer.worker.on('failed', (job, error) => {
healthService.saveError(error);
console.log(error);
// console.log(error);
});
};
@ -37,5 +39,8 @@ export const healthWorker = async () => {
health.worker.on('completed', (job, result) => {
console.log(`Job ${job.id} completed with result: `, result);
});
health.worker.on('failed', (job, error) => console.log(error));
health.worker.on('failed', (job, error) => {
// console.log(error);
});
};

View File

@ -1,5 +1,6 @@
import {
instances,
mainnetRpcUrl,
minimumBalance,
minimumTornBalance,
netId,
@ -39,6 +40,7 @@ export class ConfigService {
isLightMode: boolean;
instances: NetInstances;
provider: providers.JsonRpcProvider;
mainnentProvider: providers.JsonRpcProvider;
wallet: Wallet;
public readonly netId: availableIds = netId;
public readonly privateKey = privateKey;
@ -50,13 +52,13 @@ export class ConfigService {
private _tokenContract: ERC20Abi;
balances: { MAIN: { warn: string; critical: string; }; TORN: { warn: string; critical: string; }; };
constructor(private store: RedisStore) {
this.netIdKey = `netId${this.netId}`;
this.queueName = `relayer_${this.netId}`;
this.isLightMode = ![1, 5].includes(netId);
this.instances = instances[this.netIdKey];
this.provider = getProvider(false);
this.mainnentProvider = getProvider(false, mainnetRpcUrl, 1);
this.wallet = new Wallet(this.privateKey, this.provider);
this.balances = {
MAIN: { warn: BigNumber.from(minimumBalance).mul(150).div(100).toString(), critical: minimumBalance },
@ -85,18 +87,18 @@ export class ConfigService {
}
}
private async _checkNetwork() {
try {
await this.provider.getNetwork();
} catch (e) {
throw new Error(`Could not detect network, check your rpc url: ${this.rpcUrl}. ` + e.message);
async checkNetwork() {
await this.provider.getNetwork();
if (this.isLightMode) {
await this.mainnentProvider.getNetwork();
}
}
async init() {
try {
if (this.isInit) return;
await this._checkNetwork();
await this.checkNetwork();
this._tokenAddress = await resolve(torn.torn.address);
this._tokenContract = await getTornTokenContract(this._tokenAddress);
if (this.isLightMode) {
@ -130,12 +132,15 @@ export class ConfigService {
this.isInit = true;
} catch (e) {
console.error(`${this.constructor.name} Error:`, e.message);
process.exit(1);
}
}
async clearRedisState() {
const queueKeys = (await this.store.client.keys('bull:*')).filter(s => s.indexOf('relayer') === -1);
await this.store.client.del(queueKeys);
const errorKeys = await this.store.client.keys('errors:*');
// const alertKeys = await this.store.client.keys('alerts:*');
await this.store.client.del([...queueKeys, ...errorKeys]);
}
getInstance(address: string) {

View File

@ -3,27 +3,73 @@ import { ConfigService } from './config.service';
import { RedisStore } from '../modules/redis';
import { formatEther } from 'ethers/lib/utils';
class RelayerError extends Error {
constructor(message: string, code: string) {
super(message);
this.code = code;
}
code: string;
}
@autoInjectable()
export class HealthService {
constructor(private config: ConfigService, private store: RedisStore) {
}
async clearErrors() {
await this.store.client.del('errors');
await this.store.client.del('errors:log', 'errors:code');
}
async getErrors(): Promise<{ message: string, score: number }[]> {
const set = await this.store.client.zrevrange('errors', 0, -1, 'WITHSCORES');
const errors = [];
while (set.length) {
const [message, score] = set.splice(0, 2);
errors.push({ message, score });
private async _getErrors(): Promise<{ errorsLog: { message: string, score: number }[], errorsCode: Record<string, number> }> {
const logSet = await this.store.client.zrevrange('errors:log', 0, -1, 'WITHSCORES');
const codeSet = await this.store.client.zrevrange('errors:code', 0, -1, 'WITHSCORES');
return { errorsLog: HealthService._parseSet(logSet), errorsCode: HealthService._parseSet(codeSet, 'object') };
}
private async _getStatus() {
return this.store.client.hgetall('health:status');
}
private static _parseSet(log, to = 'array', keys = ['message', 'score']) {
let out;
if (to === 'array') {
out = [];
while (log.length) {
const [a, b] = log.splice(0, 2);
out.push({ [keys[0]]: a, [keys[1]]: b });
}
} else {
out = {};
while (log.length) {
const [a, b] = log.splice(0, 2);
out[a] = Number(b);
}
}
return errors;
return out;
}
async setStatus(status: { status: boolean; error: string; }) {
await this.store.client.hset('health:status', status);
}
async getStatus() {
const heathStatus = await this._getStatus();
const { errorsLog, errorsCode } = await this._getErrors();
return {
...heathStatus,
errorsLog,
errorsCode,
};
}
async saveError(e) {
await this.store.client.zadd('errors', 'INCR', 1, e.message);
await this.store.client.zadd('errors:code', 'INCR', 1, e?.code || 'RUNTIME_ERROR');
await this.store.client.zadd('errors:log', 'INCR', 1, e.message);
}
private async _checkBalance(value, currency: 'MAIN' | 'TORN') {
@ -37,26 +83,31 @@ export class HealthService {
level = 'WARN';
}
const isSent = await this.store.client.sismember(`${key}:sent`, `${type}_${currency}_${level}`);
if (!isSent) {
const alert = {
type: `${type}_${currency}_${level}`,
message: `Insufficient balance ${formatEther(value)} ${currency === 'MAIN' ? this.config.nativeCurrency : 'torn'}`,
level,
time,
};
await this.store.client.rpush(key, JSON.stringify(alert));
}
const alert = {
type: `${type}_${currency}_${level}`,
message: `Insufficient balance ${formatEther(value)} ${currency === 'MAIN' ? this.config.nativeCurrency : 'torn'}`,
level,
time,
};
await this.store.client.rpush(key, JSON.stringify(alert));
return alert;
}
async check() {
await this.config.checkNetwork();
const mainBalance = await this.config.wallet.getBalance();
const tornBalance = await this.config.tokenContract.balanceOf(this.config.wallet.address);
// const mainBalance = BigNumber.from(`${2e18}`).add(1);
// const tornBalance = BigNumber.from(`${50e18}`);
await this._checkBalance(mainBalance, 'MAIN');
await this._checkBalance(tornBalance, 'TORN');
// const mainBalance = BigNumber.from(`${1e18}`).add(1);
// const tornBalance = BigNumber.from(`${45e18}`);
const mainStatus = await this._checkBalance(mainBalance, 'MAIN');
const tornStatus = await this._checkBalance(tornBalance, 'TORN');
if (mainStatus.level === 'CRITICAL') {
throw new RelayerError(mainStatus.message, 'INSUFFICIENT_MAIN_BALANCE');
}
if (tornStatus.level === 'CRITICAL') {
throw new RelayerError(tornStatus.message, 'INSUFFICIENT_TORN_BALANCE');
}
}
}

View File

@ -28,6 +28,7 @@ export class PriceService {
async fetchPrices(tokens: Token[]) {
try {
if (!tokens?.length) return;
const names = tokens.reduce((p, c) => {
p[c.address] = c.symbol.toLowerCase();
return p;

View File

@ -11,6 +11,7 @@ import { Job } from 'bullmq';
import { RelayerJobData } from '../queue';
import { ConfigService } from './config.service';
import { container, injectable } from 'tsyringe';
import { parseJSON } from '../modules/utils';
export type WithdrawalData = {
contract: string,
@ -25,6 +26,15 @@ export type WithdrawalData = {
]
}
class ExecutionError extends Error {
constructor(message: string, code?: string) {
super(message);
this.code = code;
}
code: string;
}
@injectable()
export class TxService {
set currentJob(value: Job) {
@ -39,9 +49,9 @@ export class TxService {
constructor(private config: ConfigService, private priceService: PriceService) {
const { privateKey, rpcUrl, netId } = this.config;
this.txManager = new TxManager({ privateKey, rpcUrl, config: { THROW_ON_REVERT: true } });
this.tornadoProxy = this.config.proxyContract;
this.provider = this.tornadoProxy.provider;
this.txManager = new TxManager({ privateKey, rpcUrl, config: { THROW_ON_REVERT: true }, provider: this.provider });
this.oracle = new GasPriceOracle({
defaultRpc: rpcUrl,
chainId: netId,
@ -77,7 +87,11 @@ export class TxService {
} else throw new Error('Submitted transaction failed');
return receipt;
} catch (e) {
throw new Error(e.message);
const regex = /body=("\{.*}}")/;
if (regex.test(e.message)) {
const { error } = parseJSON(regex.exec(e.message)[1]);
throw new ExecutionError(error.message, 'REVERTED');
} else throw e.message;
}
}