From 7557158835a47da476ae51b79c8cf0fd3f3a19f4 Mon Sep 17 00:00:00 2001
From: smart_ex <sergei.smart@gmail.com>
Date: Tue, 5 Jul 2022 22:19:38 +1000
Subject: [PATCH] error handling

---
 src/queue/index.ts             | 14 ++++++++++----
 src/queue/relayer.processor.ts | 18 +++++++++++++++++-
 src/queue/worker.ts            |  5 +++--
 src/services/tx.service.ts     | 10 +++++++---
 4 files changed, 37 insertions(+), 10 deletions(-)

diff --git a/src/queue/index.ts b/src/queue/index.ts
index 577b512..1243d16 100644
--- a/src/queue/index.ts
+++ b/src/queue/index.ts
@@ -32,6 +32,7 @@ export class PriceQueueHelper {
   _queue: Queue<PriceJobData, PriceJobReturn, 'updatePrice'>;
   _worker: Worker<PriceJobData, PriceJobReturn, 'updatePrice'>;
   _scheduler: QueueScheduler;
+  interval = 30000;
 
   constructor(private store?: RedisStore) {}
 
@@ -70,7 +71,7 @@ export class PriceQueueHelper {
   async addRepeatable(tokens: PriceJobData) {
     await this.queue.add('updatePrice', tokens, {
       repeat: {
-        every: 30000,
+        every: this.interval,
         immediately: true,
       },
     });
@@ -89,7 +90,11 @@ export class RelayerQueueHelper {
     if (!this._queue) {
       this._queue = new Queue<RelayerJobData, RelayerJobReturn, RelayerJobType>(this.config.queueName, {
         connection: this.store.client,
-        defaultJobOptions: { stackTraceLimit: 100 },
+        defaultJobOptions: {
+          stackTraceLimit: 100,
+          attempts: 3,
+          backoff: 1000,
+        },
       });
     }
     return this._queue;
@@ -120,8 +125,9 @@ export class HealthQueueHelper {
   private _queue: Queue<HealthJobData, HealthJobReturn, 'checkHealth'>;
   private _worker: Worker<HealthJobData, HealthJobReturn, 'checkHealth'>;
   private _scheduler: QueueScheduler;
+  interval = 30000;
 
-  constructor(private store?: RedisStore, private config?: ConfigService) {}
+  constructor(private store?: RedisStore) {}
 
   get scheduler(): QueueScheduler {
     if (!this._scheduler) {
@@ -155,7 +161,7 @@ export class HealthQueueHelper {
   async addRepeatable() {
     await this.queue.add('checkHealth', null, {
       repeat: {
-        every: 30000,
+        every: this.interval,
         immediately: true,
       },
     });
diff --git a/src/queue/relayer.processor.ts b/src/queue/relayer.processor.ts
index e48bd14..c8ef1e5 100644
--- a/src/queue/relayer.processor.ts
+++ b/src/queue/relayer.processor.ts
@@ -1,6 +1,19 @@
 import { RelayerProcessor } from './index';
 import { getTxService } from '../services';
 import { JobStatus } from '../types';
+import { UnrecoverableError } from 'bullmq';
+import { ExecutionError } from '../services/tx.service';
+
+class RevertError extends UnrecoverableError {
+  code: string;
+
+  constructor(message: string, code: string) {
+    super(message);
+    this.name = this.constructor.name;
+    this.code = code;
+    Object.setPrototypeOf(this, new.target.prototype);
+  }
+}
 
 export const relayerProcessor: RelayerProcessor = async (job) => {
   try {
@@ -13,7 +26,10 @@ export const relayerProcessor: RelayerProcessor = async (job) => {
     const txData = await txService.prepareTxData(withdrawalData);
     return await txService.sendTx(txData);
   } catch (e) {
-    await job.update({ ...job.data, status: JobStatus.FAILED });
+    if (e instanceof ExecutionError && e.code === 'REVERTED') {
+      await job.update({ ...job.data, status: JobStatus.FAILED });
+      throw new RevertError(e.message, e.code);
+    }
     throw e;
   }
 };
diff --git a/src/queue/worker.ts b/src/queue/worker.ts
index 41ded48..9116298 100644
--- a/src/queue/worker.ts
+++ b/src/queue/worker.ts
@@ -27,8 +27,9 @@ export const relayerWorker = async () => {
   });
   relayer.worker.on('failed', (job, error) => {
     healthService.saveError(error, job.id);
-    console.log(error);
+    console.log(`Failed job ${job.id}: `, error);
   });
+  relayer.scheduler.on('stalled', (jobId, prev) => console.log({ jobId, prev }));
 };
 
 export const healthWorker = async () => {
@@ -42,6 +43,6 @@ export const healthWorker = async () => {
     console.log(`Job ${job.name} completed with result: `, result);
   });
   health.worker.on('failed', (job, error) => {
-    console.log(error);
+    console.log(`Failed job ${job.id}: `, error);
   });
 };
diff --git a/src/services/tx.service.ts b/src/services/tx.service.ts
index 9a4867b..242a2ee 100644
--- a/src/services/tx.service.ts
+++ b/src/services/tx.service.ts
@@ -19,7 +19,7 @@ export type WithdrawalData = {
   args: [BytesLike, BytesLike, string, string, BigNumberish, BigNumberish];
 };
 
-class ExecutionError extends Error {
+export class ExecutionError extends Error {
   constructor(message: string, code?: string) {
     super(message);
     this.code = code;
@@ -83,7 +83,9 @@ export class TxService {
         });
       if (receipt.status === 1) {
         await this.updateJobData({ status: JobStatus.CONFIRMED });
-      } else throw new ExecutionError('Submitted transaction failed', 'REVERTED');
+      } else {
+        throw new ExecutionError('Submitted transaction failed', 'REVERTED');
+      }
       return receipt;
     } catch (e) {
       const regex = /body=("\{.*}}")/;
@@ -138,7 +140,9 @@ export class TxService {
   async getGasPrice(): Promise<BigNumber> {
     const gasPrices = await this.oracle.gasPrices();
     let gasPrice = gasPrices['fast'];
-    if ('maxFeePerGas' in gasPrices) gasPrice = gasPrices['maxFeePerGas'];
+    if ('maxFeePerGas' in gasPrices) {
+      gasPrice = gasPrices['maxFeePerGas'];
+    }
     return parseUnits(String(gasPrice), 'gwei');
   }
 }