diff --git a/src/app/index.ts b/src/app/index.ts index b49a1fd..0789f05 100644 --- a/src/app/index.ts +++ b/src/app/index.ts @@ -13,6 +13,7 @@ const server = createServer(); server.listen(port, '0.0.0.0', async (err, address) => { if (err) throw err; await configService.init(); + await configService.clearRedisState(); await getJobService().setupRepeatableJobs(); await getNotifierService().subscribe(); diff --git a/src/modules/contracts.ts b/src/modules/contracts.ts index 3350396..d697cf0 100644 --- a/src/modules/contracts.ts +++ b/src/modules/contracts.ts @@ -6,11 +6,12 @@ import { TornadoProxyABI__factory, } from '../../contracts'; import { providers } from 'ethers'; -import { multiCallAddress, netId, offchainOracleAddress, mainnetRpcUrl, rpcUrl } from '../config'; +import { mainnetRpcUrl, multiCallAddress, netId, offchainOracleAddress, rpcUrl } from '../config'; export function getProvider(isStatic = true, customRpcUrl?: string, chainId = netId) { - if (isStatic) return new providers.StaticJsonRpcProvider(customRpcUrl || rpcUrl, chainId); - else return new providers.JsonRpcProvider(customRpcUrl || rpcUrl, chainId); + const url = customRpcUrl || rpcUrl; + if (isStatic) return new providers.StaticJsonRpcProvider(url, chainId); + else return new providers.JsonRpcProvider(url, chainId); } @@ -33,6 +34,3 @@ export const getMultiCallContract = () => { export const getTornTokenContract = (tokenAddress: string) => { return ERC20Abi__factory.connect(tokenAddress, getProvider(true, mainnetRpcUrl)); }; -// export const getAggregatorContract = () => { -// return AggregatorAbi__factory.connect(aggregatorAddress, getProvider()); -// }; diff --git a/src/services/config.service.ts b/src/services/config.service.ts index 9454ab9..2e0aceb 100644 --- a/src/services/config.service.ts +++ b/src/services/config.service.ts @@ -24,6 +24,7 @@ import { getAddress } from 'ethers/lib/utils'; import { BigNumber, providers, Wallet } from 'ethers'; import { container, singleton } from 'tsyringe'; import { GasPrice } from 'gas-price-oracle/lib/types'; +import { RedisStore } from '../modules/redis'; type relayerQueueName = `relayer_${availableIds}` @@ -50,7 +51,7 @@ export class ConfigService { balances: { MAIN: { warn: string; critical: string; }; TORN: { warn: string; critical: string; }; }; - constructor() { + constructor(private store: RedisStore) { this.netIdKey = `netId${this.netId}`; this.queueName = `relayer_${this.netId}`; this.isLightMode = ![1, 5].includes(netId); @@ -132,6 +133,11 @@ export class ConfigService { } } + async clearRedisState() { + const queueKeys = (await this.store.client.keys('bull:*')).filter(s => s.indexOf('relayer') === -1); + await this.store.client.del(queueKeys); + } + getInstance(address: string) { return this.addressMap.get(getAddress(address)); } diff --git a/src/services/health.service.ts b/src/services/health.service.ts index 1dabbea..065e189 100644 --- a/src/services/health.service.ts +++ b/src/services/health.service.ts @@ -29,7 +29,7 @@ export class HealthService { private async _checkBalance(value, currency: 'MAIN' | 'TORN') { let level = 'OK'; const type = 'BALANCE'; - const key = 'alerts'; + const key = 'alerts:list'; const time = new Date().getTime(); if (value.lt(this.config.balances[currency].critical)) { level = 'CRITICAL'; diff --git a/src/services/job.service.ts b/src/services/job.service.ts index d09eac1..460f33d 100644 --- a/src/services/job.service.ts +++ b/src/services/job.service.ts @@ -49,15 +49,8 @@ export class JobService { } async setupRepeatableJobs() { - await this._clearSchedulerJobs(); await this.price.addRepeatable(this.config.tokens); await this.health.addRepeatable(); - // await this.schedulerQ.add('checkBalance', null, { - // repeat: { - // every: 30000, - // immediately: true, - // }, - // }); } } diff --git a/src/services/notifier.service.ts b/src/services/notifier.service.ts index 43ed455..d1b9358 100644 --- a/src/services/notifier.service.ts +++ b/src/services/notifier.service.ts @@ -34,25 +34,27 @@ export class NotifierService { async processAlert(message: string) { const alert = JSON.parse(message); - const [a, b] = alert.type.split('_'); - if (alert.level === 'OK') { - this.store.client.srem('alerts:sent', ...['WARN', 'CRITICAL'].map(l => `${a}_${b}_${l}`)); - } else { - await this.send(alert.message, alert.level); - this.store.client.sadd('alerts:sent', alert.type); + const [a, b, c] = alert.type.split('_'); + const isSent = await this.store.client.sismember('alerts:sent', `${a}_${b}_${c}`); + if (!isSent) { + if (alert.level === 'OK') { + this.store.client.srem('alerts:sent', ...['WARN', 'CRITICAL'].map(c => `${a}_${b}_${c}`)); + } else { + await this.send(alert.message, alert.level); + this.store.client.sadd('alerts:sent', alert.type); + } } } async subscribe() { const sub = await this.store.subscriber; - sub.subscribe('__keyspace@0__:alerts', 'rpush'); + sub.subscribe('__keyspace@0__:alerts:list', 'rpush'); sub.on('message', async (channel, event) => { if (event === 'rpush') { - const messages = await this.store.client.brpop('alerts', 10); - while (messages.length) { - const [, message] = messages.splice(0, 2); - await this.processAlert(message); - } + const messages = await this.store.client.rpop('alerts:list', 10); + messages?.forEach(message => { + this.processAlert(message); + }); } }); }