From 8bc5b7be9ebed4d81f44d2a1b1fd02b2d15de642 Mon Sep 17 00:00:00 2001
From: smart_ex <sergei.smart@gmail.com>
Date: Wed, 18 May 2022 18:18:10 +1000
Subject: [PATCH] wip. relayer job flow. server api

---
 .eslintrc.json                        |  3 +-
 .nvmrc                                |  1 +
 package.json                          | 15 +++--
 src/api/routes.ts                     | 54 -----------------
 src/{api => app}/index.ts             | 13 +++-
 src/{api => app}/plugins/validator.ts | 23 +++----
 src/app/routes.ts                     | 61 +++++++++++++++++++
 src/{api => app}/schema.ts            | 47 ++++++++++++---
 src/{api => app}/server.ts            |  8 ++-
 src/config.ts                         |  8 +--
 src/modules/EnsResolver.ts            | 25 --------
 src/modules/contracts.ts              | 32 +++++-----
 src/modules/ensResolver.ts            | 19 ++++++
 src/modules/index.ts                  |  3 +-
 src/modules/readJSON.ts               | 12 ----
 src/modules/redis.ts                  |  4 +-
 src/queue/index.ts                    | 50 +++++++++++++---
 src/queue/priceProcessor.ts           |  8 ---
 src/queue/relayerProcessor.ts         |  6 +-
 src/queue/schedulerProcessor.ts       | 15 +++++
 src/queue/worker.ts                   | 21 +++++--
 src/services/ConfigService.ts         | 86 ++++++++++++++++++++-------
 src/services/JobService.ts            | 70 ++++++++++++++++++++++
 src/services/PriceService.ts          | 14 ++++-
 src/services/TxService.ts             | 84 ++++++++++++++++++++++----
 src/services/index.ts                 |  3 +-
 src/types.ts                          |  4 +-
 src/worker.ts                         |  4 +-
 tsconfig.json                         |  2 +-
 29 files changed, 485 insertions(+), 210 deletions(-)
 create mode 100644 .nvmrc
 delete mode 100644 src/api/routes.ts
 rename src/{api => app}/index.ts (55%)
 rename src/{api => app}/plugins/validator.ts (70%)
 create mode 100644 src/app/routes.ts
 rename src/{api => app}/schema.ts (62%)
 rename src/{api => app}/server.ts (67%)
 delete mode 100644 src/modules/EnsResolver.ts
 create mode 100644 src/modules/ensResolver.ts
 delete mode 100644 src/modules/readJSON.ts
 delete mode 100644 src/queue/priceProcessor.ts
 create mode 100644 src/queue/schedulerProcessor.ts
 create mode 100644 src/services/JobService.ts

diff --git a/.eslintrc.json b/.eslintrc.json
index 301960a..402c892 100644
--- a/.eslintrc.json
+++ b/.eslintrc.json
@@ -6,7 +6,8 @@
   },
   "extends": [
     "eslint:recommended",
-    "plugin:@typescript-eslint/recommended"
+    "plugin:@typescript-eslint/recommended",
+    "plugin:security/recommended"
   ],
   "parser": "@typescript-eslint/parser",
   "parserOptions": {
diff --git a/.nvmrc b/.nvmrc
new file mode 100644
index 0000000..04c9e2b
--- /dev/null
+++ b/.nvmrc
@@ -0,0 +1 @@
+v16.15
diff --git a/package.json b/package.json
index 529b0f2..a33cfe4 100644
--- a/package.json
+++ b/package.json
@@ -1,10 +1,10 @@
 {
   "name": "relay",
-  "version": "4.1.3",
+  "version": "5.0.0",
   "description": "Relayer for Tornado.cash privacy solution. https://tornado.cash",
   "scripts": {
-    "dev:app": "nodemon --watch './src/**/*.ts' --exec ts-node src/api/index.ts",
-    "server": "node src/api/server.ts",
+    "dev:app": "nodemon --watch './src/**/*.ts' --exec ts-node src/app/index.ts",
+    "server": "node src/app/server.ts",
     "worker": "node src/worker",
     "treeWatcher": "node src/treeWatcher",
     "priceWatcher": "node src/priceWatcher",
@@ -19,14 +19,15 @@
   "author": "tornado.cash",
   "license": "MIT",
   "dependencies": {
+    "@fastify/cors": "^7.0.0",
+    "@fastify/helmet": "^8.0.1",
+    "@fastify/sensible": "^4.1.0",
     "bullmq": "^1.80.6",
     "dotenv": "^8.2.0",
     "eth-ens-namehash": "^2.0.8",
     "ethers": "^5.6.4",
     "fastify": "^3.28.0",
-    "fastify-cors": "^6.0.3",
-    "fixed-merkle-tree": "^0.7.3",
-    "gas-price-oracle": "^0.3.5",
+    "gas-price-oracle": "^0.4.6",
     "ioredis": "^4.14.1",
     "json-schema-to-ts": "^2.2.0",
     "node-fetch": "^2.6.7",
@@ -40,12 +41,14 @@
   "devDependencies": {
     "@typechain/ethers-v5": "^10.0.0",
     "@types/ioredis": "^4.28.10",
+    "@types/uuid": "^8.3.4",
     "@typescript-eslint/eslint-plugin": "^5.20.0",
     "@typescript-eslint/parser": "^5.20.0",
     "chai": "^4.2.0",
     "eslint": "^8.14.0",
     "eslint-config-prettier": "^6.12.0",
     "eslint-plugin-prettier": "^3.1.4",
+    "eslint-plugin-security": "^1.5.0",
     "mocha": "^8.1.3",
     "nodemon": "^2.0.15",
     "ts-node": "^10.7.0",
diff --git a/src/api/routes.ts b/src/api/routes.ts
deleted file mode 100644
index 88ca840..0000000
--- a/src/api/routes.ts
+++ /dev/null
@@ -1,54 +0,0 @@
-import { FastifyInstance } from 'fastify';
-import { statusSchema, withdrawBodySchema } from './schema';
-import { FromSchema } from 'json-schema-to-ts';
-import { rewardAccount, tornadoServiceFee } from '../config';
-import { version } from '../../package.json';
-import { configService } from '../services';
-
-export function mainHandler(server: FastifyInstance, options, next) {
-  server.get('/',
-    async (req, res) => {
-      res.send('hello fellows');
-    });
-
-  server.get('/status',
-    { schema: statusSchema },
-    async (req, res) => {
-      server.log.info(req.method, 'status');
-      res.send({
-        rewardAccount,
-        instances: configService.instances,
-        netId: configService.netId,
-        ethPrices: {
-          dai: '488750716084282',
-          cdai: '10750196909100',
-          usdc: '488744421966526',
-          usdt: '486409579105158',
-          wbtc: '14586361452511510343',
-          torn: '18624781058055820',
-        },
-        tornadoServiceFee,
-        miningServiceFee: 0,
-        version,
-        health: {
-          status: true,
-          error: '',
-        },
-        currentQueue: 0,
-      });
-    });
-  next();
-}
-
-export function relayerHandler(server: FastifyInstance, options, next) {
-  server.get('/jobs/:id',
-    async (req, res) => {
-      res.send({});
-    });
-  server.post<{ Body: FromSchema<typeof withdrawBodySchema> }>('/tornadoWithdraw',
-    async (req, res) => {
-      console.log(req.body);
-      res.send({});
-    });
-  next();
-}
diff --git a/src/api/index.ts b/src/app/index.ts
similarity index 55%
rename from src/api/index.ts
rename to src/app/index.ts
index 0a57d59..76aa5a4 100644
--- a/src/api/index.ts
+++ b/src/app/index.ts
@@ -2,14 +2,23 @@ import createServer from './server';
 import { utils } from 'ethers';
 import { port, rewardAccount } from '../config';
 import { version } from '../../package.json';
+import { configService, getJobService } from '../services';
 
 
 if (!utils.isAddress(rewardAccount)) {
   throw new Error('No REWARD_ACCOUNT specified');
 }
 const server = createServer();
-
-server.listen(port, '0.0.0.0', (err, address) => {
+server.listen(port, '0.0.0.0', async (err, address) => {
   if (err) throw err;
+  await configService.init();
+  await getJobService().setupRepeatableJobs();
   console.log(`Relayer ${version} started on port ${address}`);
 });
+
+process
+  .on('uncaughtException', (e) => {
+    console.log('uncaughtException', e);
+    process.exit(1);
+  });
+
diff --git a/src/api/plugins/validator.ts b/src/app/plugins/validator.ts
similarity index 70%
rename from src/api/plugins/validator.ts
rename to src/app/plugins/validator.ts
index 8f7f54c..0cde28b 100644
--- a/src/api/plugins/validator.ts
+++ b/src/app/plugins/validator.ts
@@ -2,6 +2,7 @@ import Ajv from 'ajv';
 import fp from 'fastify-plugin';
 import { rewardAccount } from '../../config';
 import { getAddress, isAddress } from 'ethers/lib/utils';
+import { configService } from '../../services';
 
 export default fp(async server => {
   const ajv = new Ajv();
@@ -17,16 +18,16 @@ export default fp(async server => {
     errors: true,
   });
 
-  // ajv.addKeyword('isKnownContract', {
-  //   validate: (schema, data) => {
-  //     try {
-  //       return !!getInstance(data);
-  //     } catch (e) {
-  //       return false;
-  //     }
-  //   },
-  //   errors: true,
-  // });
+  ajv.addKeyword('isKnownContract', {
+    validate: (schema, data) => {
+      try {
+        return !!configService.getInstance(data);
+      } catch (e) {
+        return false;
+      }
+    },
+    errors: true,
+  });
 
   ajv.addKeyword('isFeeRecipient', {
     validate: (schema, data) => {
@@ -39,7 +40,7 @@ export default fp(async server => {
     errors: true,
   });
 
-  server.setValidatorCompiler(({ schema, method, url, httpPart }) => {
+  server.setValidatorCompiler(({ schema }) => {
     return ajv.compile(schema);
   });
   console.log('validator plugin registered');
diff --git a/src/app/routes.ts b/src/app/routes.ts
new file mode 100644
index 0000000..636d86e
--- /dev/null
+++ b/src/app/routes.ts
@@ -0,0 +1,61 @@
+import { FastifyInstance } from 'fastify';
+import { jobsSchema, statusSchema, withdrawBodySchema, withdrawSchema } from './schema';
+import { FromSchema } from 'json-schema-to-ts';
+import { rewardAccount, tornadoServiceFee } from '../config';
+import { version } from '../../package.json';
+import { configService, getJobService, getPriceService } from '../services';
+import { JobType } from '../types';
+
+const priceService = getPriceService();
+const jobService = getJobService();
+
+export function mainHandler(server: FastifyInstance, options, next) {
+  server.get('/',
+    async (req, res) => {
+      res.type('text/html')
+        .send('<h1>This is <a href=https://tornado.cash>tornado.cash</a> Relayer service.' +
+          ' Check the <a href=/v1/status>/status</a> for settings</h1>');
+    });
+
+  server.get('/status',
+    { schema: statusSchema },
+    async (req, res) => {
+      const ethPrices = await priceService.getPrices();
+      const currentQueue = await jobService.getQueueCount();
+      console.log(currentQueue);
+      res.send({
+        rewardAccount,
+        instances: configService.instances,
+        netId: configService.netId,
+        ethPrices,
+        tornadoServiceFee,
+        miningServiceFee: 0,
+        version,
+        health: {
+          status: true,
+          error: '',
+        },
+        currentQueue,
+      });
+    });
+  next();
+}
+
+export function relayerHandler(server: FastifyInstance, options, next) {
+  server.get<{ Params: { id: string } }>('/jobs/:id',
+    { schema: jobsSchema },
+    async (req, res) => {
+      const job = await jobService.getJob(req.params.id);
+      if (!job) return server.httpErrors.notFound();
+      res.send({ ...job.data, failedReason: job.failedReason });
+    });
+
+  server.post<{ Body: FromSchema<typeof withdrawBodySchema> }>('/tornadoWithdraw',
+    { schema: withdrawSchema },
+    async (req, res) => {
+      console.log(req.body);
+      const id = await jobService.postJob(JobType.TORNADO_WITHDRAW, req.body);
+      res.send({ id });
+    });
+  next();
+}
diff --git a/src/api/schema.ts b/src/app/schema.ts
similarity index 62%
rename from src/api/schema.ts
rename to src/app/schema.ts
index c69c29f..b04a173 100644
--- a/src/api/schema.ts
+++ b/src/app/schema.ts
@@ -1,10 +1,23 @@
-const addressType = { type: 'string', pattern: '^0x[a-fA-F0-9]{40}$', isAddress: true } as const;
+const addressType = {
+  type: 'string',
+  pattern: '^0x[a-fA-F0-9]{40}$',
+  isAddress: true,
+} as const;
 const proofType = { type: 'string', pattern: '^0x[a-fA-F0-9]{512}$' } as const;
 // const encryptedAccountType = { type: 'string', pattern: '^0x[a-fA-F0-9]{392}$' } as const;
 const bytes32Type = { type: 'string', pattern: '^0x[a-fA-F0-9]{64}$' } as const;
 const instanceType = { ...addressType, isKnownContract: true } as const;
 const relayerType = { ...addressType, isFeeRecipient: true } as const;
 
+export const idParamsSchema = {
+  type: 'object',
+  properties: {
+    id: { type: 'string', format: 'uuid' },
+  },
+  required: ['id'],
+  additionalProperties: false,
+} as const;
+
 export const withdrawBodySchema = {
   type: 'object',
   properties: {
@@ -14,22 +27,40 @@ export const withdrawBodySchema = {
       type: 'array',
       maxItems: 6,
       minItems: 6,
-      items: [bytes32Type, bytes32Type, addressType, relayerType, bytes32Type, bytes32Type],
+      items: [
+        bytes32Type,
+        bytes32Type,
+        addressType,
+        relayerType,
+        bytes32Type,
+        bytes32Type,
+      ],
     },
   },
   additionalProperties: false,
   required: ['proof', 'contract', 'args'],
 } as const;
 
+export const jobsResponseSchema = {
+  ...withdrawBodySchema,
+  properties: {
+    id: { type: 'string' },
+    status: { type: 'string' },
+    ...withdrawBodySchema.properties,
+    failedReason: { type: 'string' },
+  },
+} as const;
+
+export const jobsSchema = {
+  params: idParamsSchema,
+  response: {
+    200: jobsResponseSchema,
+  },
+};
 export const withdrawSchema = {
   body: withdrawBodySchema,
   response: {
-    200: {
-      type: 'object',
-      properties: {
-        jobId: { type: 'string', format: 'uuid' },
-      },
-    },
+    200: idParamsSchema,
   },
 };
 const statusResponseSchema = {
diff --git a/src/api/server.ts b/src/app/server.ts
similarity index 67%
rename from src/api/server.ts
rename to src/app/server.ts
index c65de94..8706491 100644
--- a/src/api/server.ts
+++ b/src/app/server.ts
@@ -1,5 +1,8 @@
 import fastify from 'fastify';
-import cors from 'fastify-cors';
+import cors from '@fastify/cors';
+import fastifySensible from '@fastify/sensible';
+import helmet from '@fastify/helmet';
+
 import validator from './plugins/validator';
 import { mainHandler, relayerHandler } from './routes';
 
@@ -12,7 +15,10 @@ function createServer() {
   });
   server.register(cors);
   server.register(validator);
+  server.register(helmet, { contentSecurityPolicy: false, frameguard: true });
+  server.register(fastifySensible);
   server.register(mainHandler);
+  server.register(mainHandler, { prefix: '/v1' });
   server.register(relayerHandler, { prefix: '/v1' });
 
 
diff --git a/src/config.ts b/src/config.ts
index 820c622..1ec8381 100644
--- a/src/config.ts
+++ b/src/config.ts
@@ -1,4 +1,4 @@
-import { jobType } from './types';
+import { JobType } from './types';
 import tornConfig, { availableIds } from 'torn-token';
 
 require('dotenv').config();
@@ -22,10 +22,10 @@ export const rewardAccount = process.env.REWARD_ACCOUNT;
 export const governanceAddress = '0x5efda50f22d34F262c29268506C5Fa42cB56A1Ce';
 export const tornadoGoerliProxy = '0x454d870a72e29d5E5697f635128D18077BD04C60';
 export const gasLimits = {
-  [jobType.TORNADO_WITHDRAW]: 390000,
+  [JobType.TORNADO_WITHDRAW]: 390000,
   WITHDRAW_WITH_EXTRA: 700000,
-  [jobType.MINING_REWARD]: 455000,
-  [jobType.MINING_WITHDRAW]: 400000,
+  [JobType.MINING_REWARD]: 455000,
+  [JobType.MINING_WITHDRAW]: 400000,
 };
 export const minimumBalance = '1000000000000000000';
 export const baseFeeReserve = Number(process.env.BASE_FEE_RESERVE_PERCENTAGE);
diff --git a/src/modules/EnsResolver.ts b/src/modules/EnsResolver.ts
deleted file mode 100644
index 48a587b..0000000
--- a/src/modules/EnsResolver.ts
+++ /dev/null
@@ -1,25 +0,0 @@
-import { Provider } from '@ethersproject/providers';
-
-export default class EnsResolver {
-  addresses: Map<string, string>;
-  provider: Provider;
-
-  constructor(provider: Provider) {
-    this.addresses = new Map<string, string>();
-    this.provider = provider;
-  }
-
-  async resolve(domain: string) {
-    try {
-      if (!this.addresses.has(domain)) {
-        const resolved = await this.provider.resolveName(domain);
-        this.addresses.set(domain, resolved);
-      }
-      return this.addresses.get(domain);
-    } catch (e) {
-      console.log(e);
-      return null;
-    }
-  }
-}
-
diff --git a/src/modules/contracts.ts b/src/modules/contracts.ts
index 40dc995..dfa5c7a 100644
--- a/src/modules/contracts.ts
+++ b/src/modules/contracts.ts
@@ -1,31 +1,33 @@
 import {
-  AggregatorAbi__factory,
   MulticallAbi__factory,
-  OffchainOracleAbi__factory, ProxyLightABI__factory,
+  OffchainOracleAbi__factory,
+  ProxyLightABI__factory,
   TornadoProxyABI__factory,
 } from '../../contracts';
 import { providers } from 'ethers';
-import { aggregatorAddress, httpRpcUrl, multiCallAddress, netId, offchainOracleAddress } from '../config';
-import { configService } from '../services';
+import { httpRpcUrl, multiCallAddress, netId, offchainOracleAddress, oracleRpcUrl } from '../config';
 
-export function getProvider() {
-  return new providers.StaticJsonRpcProvider(httpRpcUrl, netId);
+export function getProvider(isStatic = true, rpcUrl?: string) {
+  if (isStatic) return new providers.StaticJsonRpcProvider(rpcUrl || httpRpcUrl, netId);
+  else return new providers.JsonRpcProvider(rpcUrl || httpRpcUrl, netId);
 }
 
-export const getTornadoProxyContract = () => {
-  return TornadoProxyABI__factory.connect(configService.proxyAddress, getProvider());
+export const getTornadoProxyContract = (proxyAddress: string) => {
+  return TornadoProxyABI__factory.connect(proxyAddress, getProvider());
 };
-export const getTornadoProxyLightContract = () => {
-  return ProxyLightABI__factory.connect(configService.proxyAddress, getProvider());
-};
-export const getAggregatorContract = () => {
-  return AggregatorAbi__factory.connect(aggregatorAddress, getProvider());
+export const getTornadoProxyLightContract = (proxyAddress: string) => {
+  return ProxyLightABI__factory.connect(proxyAddress, getProvider());
 };
 
+
 export const getOffchainOracleContract = () => {
-  return OffchainOracleAbi__factory.connect(offchainOracleAddress, getProvider());
+  return OffchainOracleAbi__factory.connect(offchainOracleAddress, getProvider(true, oracleRpcUrl));
 };
 
 export const getMultiCallContract = () => {
-  return MulticallAbi__factory.connect(multiCallAddress, getProvider());
+  return MulticallAbi__factory.connect(multiCallAddress, getProvider(true, oracleRpcUrl));
 };
+
+// export const getAggregatorContract = () => {
+//   return AggregatorAbi__factory.connect(aggregatorAddress, getProvider());
+// };
diff --git a/src/modules/ensResolver.ts b/src/modules/ensResolver.ts
new file mode 100644
index 0000000..c98fee4
--- /dev/null
+++ b/src/modules/ensResolver.ts
@@ -0,0 +1,19 @@
+import { getProvider } from './contracts';
+
+const addresses = new Map<string, string>();
+const provider = getProvider();
+
+async function resolve(domain: string) {
+  try {
+    if (!addresses.has(domain)) {
+      const resolved = await provider.resolveName(domain);
+      addresses.set(domain, resolved);
+    }
+    return addresses.get(domain);
+  } catch (e) {
+    console.log(e);
+    return null;
+  }
+}
+
+export { resolve };
diff --git a/src/modules/index.ts b/src/modules/index.ts
index 4a9ef30..1c58ffa 100644
--- a/src/modules/index.ts
+++ b/src/modules/index.ts
@@ -1,3 +1,2 @@
 export { default as redis } from './redis';
-export { default as EnsResolver } from './EnsResolver';
-export { default as readJSON } from './readJSON';
+export { resolve } from './ensResolver';
diff --git a/src/modules/readJSON.ts b/src/modules/readJSON.ts
deleted file mode 100644
index 5b16320..0000000
--- a/src/modules/readJSON.ts
+++ /dev/null
@@ -1,12 +0,0 @@
-import fs from 'fs/promises';
-import path from 'path';
-
-export default async (pathToFile: string) => {
-  try {
-    const file = await fs.readFile(path.resolve(__dirname, pathToFile), { encoding: 'utf8' });
-    return JSON.parse(file);
-  } catch (e) {
-    console.log(e);
-    return null;
-  }
-};
diff --git a/src/modules/redis.ts b/src/modules/redis.ts
index 004142c..017bd97 100644
--- a/src/modules/redis.ts
+++ b/src/modules/redis.ts
@@ -4,7 +4,9 @@ import { redisUrl } from '../config';
 const redisClient = new Redis(redisUrl, { maxRetriesPerRequest: null });
 const redisSubscriber = new Redis(redisUrl, { maxRetriesPerRequest: null });
 
-export const getClient = () => redisClient;
+export const getClient = () => redisClient.on('error', (error) => {
+  throw error;
+});
 export const getSubscriber = () => redisSubscriber;
 
 export default { getClient, getSubscriber };
diff --git a/src/queue/index.ts b/src/queue/index.ts
index fbc7c5b..4fdf100 100644
--- a/src/queue/index.ts
+++ b/src/queue/index.ts
@@ -1,14 +1,48 @@
-import { Queue, Worker } from 'bullmq';
+import { Job, Processor, Queue, Worker } from 'bullmq';
 import { redis } from '../modules';
-import { priceProcessor } from './priceProcessor';
-import { Token } from '../types';
-import { netId } from '../config';
+import { JobStatus, JobType, Token } from '../types';
 import { relayerProcessor } from './relayerProcessor';
+import { WithdrawalData } from '../services/TxService';
+import { schedulerProcessor } from './schedulerProcessor';
+import { configService } from '../services';
+import { BigNumber } from 'ethers';
 
 const connection = redis.getClient();
 
-export const priceQueue = new Queue<Token[], any>('price', { connection });
-export const getPriceWorker = () => new Worker(priceQueue.name, priceProcessor, { connection });
+export type SchedulerJobProcessors = {
+  updatePrices: Processor,
+  checkBalance: Processor
+}
+
+type SchedulerJobName = keyof SchedulerJobProcessors
+type SchedulerJobData = Token[] | null
+type SchedulerJobReturn = Record<string, string> | { balance: BigNumber, isEnought: boolean }
+type RelayerJobData = WithdrawalData & { id: string, status: JobStatus, type: JobType }
+type RelayerJobReturn = void
+
+// export interface SchedulerProcessor {
+//   <U extends SchedulerJobName>(job: Job<SchedulerJobData, SchedulerJobReturn, U>): SchedulerJobProcessors[U];
+//
+// }
+
+
+export interface RelayerProcessor {
+  (job: Job<RelayerJobData, RelayerJobReturn, JobType>): Promise<void>;
+}
+
+export const schedulerQueue = new Queue<Token[], any, SchedulerJobName>('scheduler', {
+  connection,
+  defaultJobOptions: {
+    removeOnFail: 10,
+    removeOnComplete: 10,
+  },
+});
+export const getSchedulerWorker = () => new Worker<SchedulerJobData, SchedulerJobReturn, SchedulerJobName>(schedulerQueue.name, (job) => schedulerProcessor(job), {
+  connection,
+  concurrency: 3,
+});
+
+export const relayerQueue = new Queue<RelayerJobData, RelayerJobReturn, JobType>(configService.queueName, { connection });
+export const getRelayerWorker = () => new Worker<RelayerJobData, RelayerJobReturn, JobType>(relayerQueue.name, relayerProcessor, { connection });
+
 
-export const relayerQueue = new Queue(`relayer_${netId}`, { connection });
-export const getRelayerWorker = () => new Worker(relayerQueue.name, relayerProcessor, { connection });
diff --git a/src/queue/priceProcessor.ts b/src/queue/priceProcessor.ts
deleted file mode 100644
index 99d9642..0000000
--- a/src/queue/priceProcessor.ts
+++ /dev/null
@@ -1,8 +0,0 @@
-import { priceService } from '../services';
-import { Job } from 'bullmq';
-
-export const priceProcessor = async (job: Job) => {
-  const prices = await priceService.fetchPrices(job.data);
-  console.log(job.name, prices);
-  return prices;
-};
diff --git a/src/queue/relayerProcessor.ts b/src/queue/relayerProcessor.ts
index 311ee92..1ee3e28 100644
--- a/src/queue/relayerProcessor.ts
+++ b/src/queue/relayerProcessor.ts
@@ -1,6 +1,6 @@
-import { Job } from 'bullmq';
+import { RelayerProcessor } from './index';
+
+export const relayerProcessor: RelayerProcessor = async (job) => {
 
-export const relayerProcessor = async (job: Job) => {
   console.log(job.data);
-  return {};
 };
diff --git a/src/queue/schedulerProcessor.ts b/src/queue/schedulerProcessor.ts
new file mode 100644
index 0000000..3e9ad38
--- /dev/null
+++ b/src/queue/schedulerProcessor.ts
@@ -0,0 +1,15 @@
+import { configService, getPriceService } from '../services';
+import { Processor } from 'bullmq';
+
+export const schedulerProcessor: Processor = async (job) => {
+  switch (job.name) {
+  case 'updatePrices': {
+    const result = await getPriceService().fetchPrices(job.data);
+    return result;
+  }
+  case 'checkBalance': {
+    console.log(job.data);
+    return await configService.getBalance();
+  }
+  }
+};
diff --git a/src/queue/worker.ts b/src/queue/worker.ts
index 837d808..a30e077 100644
--- a/src/queue/worker.ts
+++ b/src/queue/worker.ts
@@ -1,11 +1,22 @@
-import { getPriceWorker, getRelayerWorker } from './';
+import { getRelayerWorker, getSchedulerWorker } from './';
+import { configService, getPriceService } from '../services';
 
 
-export default async () => {
-  const priceWorker = getPriceWorker();
-  priceWorker.on('completed', (job, result) => console.log(result));
-  priceWorker.on('failed', (job, error) => console.log(error));
+export const schedulerWorker = async () => {
+  await configService.init();
+  const priceService = getPriceService();
+  const schedulerWorkerWorker = getSchedulerWorker();
+  console.log('price worker');
+  schedulerWorkerWorker.on('active', () => console.log('worker active'));
+  schedulerWorkerWorker.on('completed', async (job, result) => {
+    if (job.name === 'updatePrices') {
+      // await priceService.savePrices(result);
+    }
+  });
+  schedulerWorkerWorker.on('failed', (job, error) => console.log(error));
+};
 
+export const relayerWorker = async () => {
   const relayerWorker = getRelayerWorker();
   relayerWorker.on('completed', (job, result) => console.log(result));
   relayerWorker.on('failed', (job, error) => console.log(error));
diff --git a/src/services/ConfigService.ts b/src/services/ConfigService.ts
index d3bc04d..572b37c 100644
--- a/src/services/ConfigService.ts
+++ b/src/services/ConfigService.ts
@@ -1,18 +1,28 @@
-import { httpRpcUrl, instances, netId, privateKey, torn, tornadoGoerliProxy, tornToken } from '../config';
+import {
+  httpRpcUrl,
+  instances,
+  minimumBalance,
+  netId,
+  privateKey,
+  torn,
+  tornadoGoerliProxy,
+  tornToken,
+} from '../config';
 import { Token } from '../types';
 import { getProvider, getTornadoProxyContract, getTornadoProxyLightContract } from '../modules/contracts';
-import { EnsResolver } from '../modules';
+import { resolve } from '../modules';
 import { ProxyLightABI, TornadoProxyABI } from '../../contracts';
 import { availableIds, netIds, NetInstances } from '../../../torn-token';
 import { getAddress } from 'ethers/lib/utils';
+import { providers, Wallet } from 'ethers';
 
-const resolver = new EnsResolver(getProvider());
-
+type relayerQueueName = `relayer_${availableIds}`
 
 export class ConfigService {
   static instance: ConfigService;
   netId: availableIds;
   netIdKey: netIds;
+  queueName: relayerQueueName;
   tokens: Token[];
   privateKey: string;
   rpcUrl: string;
@@ -21,14 +31,32 @@ export class ConfigService {
   addressMap = new Map<string, InstanceProps>();
   isLightMode: boolean;
   instances: NetInstances;
+  provider: providers.StaticJsonRpcProvider;
+  wallet: Wallet;
 
   constructor() {
     this.netId = netId;
     this.netIdKey = `netId${this.netId}`;
+    this.queueName = `relayer_${this.netId}`;
     this.isLightMode = ![1, 5].includes(netId);
     this.privateKey = privateKey;
     this.rpcUrl = httpRpcUrl;
     this.instances = instances[this.netIdKey];
+    this.provider = getProvider(false);
+    this.wallet = new Wallet(this.privateKey, this.provider);
+    this._fillInstanceMap();
+  }
+
+  get proxyContract(): TornadoProxyABI | ProxyLightABI {
+    return this._proxyContract;
+  }
+
+  get proxyAddress(): string {
+    return this._proxyAddress;
+  }
+
+  private _fillInstanceMap() {
+    if (!this.instances) throw new Error('config mismatch, check your environment variables');
     for (const [currency, { instanceAddress, symbol, decimals }] of Object.entries(this.instances)) {
       Object.entries(instanceAddress).forEach(([amount, address]) => {
           if (address) {
@@ -44,30 +72,36 @@ export class ConfigService {
     }
   }
 
-  get proxyContract(): TornadoProxyABI | ProxyLightABI {
-    return this._proxyContract;
-  }
-
-  get proxyAddress(): string {
-    return this._proxyAddress;
+  private async _checkNetwork() {
+    try {
+      await this.provider.getNetwork();
+    } catch (e) {
+      throw new Error(`Could not detect network, check your rpc url: ${this.rpcUrl}`);
+    }
   }
 
   async init() {
-    if (this.isLightMode) {
-      this._proxyAddress = await resolver.resolve(torn.tornadoProxyLight.address);
-      this._proxyContract = getTornadoProxyLightContract();
-    } else {
-      if (this.netIdKey === 'netId1') {
-        this._proxyAddress = await resolver.resolve(torn.tornadoRouter.address);
+    try {
+      await this._checkNetwork();
+      if (this.isLightMode) {
+        this._proxyAddress = await resolve(torn.tornadoProxyLight.address);
+        this._proxyContract = getTornadoProxyLightContract(this._proxyAddress);
       } else {
         this._proxyAddress = tornadoGoerliProxy;
+        if (this.netId === 1) {
+          this._proxyAddress = await resolve(torn.tornadoRouter.address);
+        }
+        this._proxyContract = getTornadoProxyContract(this._proxyAddress);
+        this.tokens = [tornToken, ...Object.values(torn.instances['netId1'])]
+          .map<Token>(el => (el.tokenAddress && {
+            address: getAddress(el.tokenAddress),
+            ...el,
+          })).filter(Boolean);
+        console.log(
+          `Configuration completed\n-- netId: ${this.netId}\n-- rpcUrl: ${this.rpcUrl}`);
       }
-      this._proxyContract = getTornadoProxyContract();
-      this.tokens = [tornToken, ...Object.values(torn.instances['netId1'])]
-        .map<Token>(el => ({
-          address: getAddress(el.tokenAddress),
-          ...el,
-        })).filter(e => e.address);
+    } catch (e) {
+      console.error(`${this.constructor.name} Error:`, e.message);
     }
   }
 
@@ -75,7 +109,13 @@ export class ConfigService {
     return this.addressMap.get(getAddress(address));
   }
 
-  public static getServiceInstance(): ConfigService {
+  async getBalance() {
+    const balance = await this.wallet.getBalance();
+    const isEnougth = balance.gt(minimumBalance);
+    return { balance, isEnougth };
+  }
+
+  public static getServiceInstance() {
     if (!ConfigService.instance) {
       ConfigService.instance = new ConfigService();
     }
diff --git a/src/services/JobService.ts b/src/services/JobService.ts
new file mode 100644
index 0000000..a72a6f2
--- /dev/null
+++ b/src/services/JobService.ts
@@ -0,0 +1,70 @@
+import { v4 } from 'uuid';
+import { JobStatus, JobType } from '../types';
+import { relayerQueue, schedulerQueue } from '../queue';
+import { WithdrawalData } from './TxService';
+import { getClient } from '../modules/redis';
+import { Job } from 'bullmq';
+import { configService } from './index';
+
+export class JobService {
+  store: ReturnType<typeof getClient>;
+
+  constructor() {
+    this.store = getClient();
+  }
+
+  async postJob(type: JobType, data: WithdrawalData) {
+    const id = v4();
+
+    const job = await relayerQueue.add(
+      type,
+      {
+        id,
+        type,
+        status: JobStatus.QUEUED,
+        ...data,
+      },
+      {},
+    );
+    this.save(job);
+    return id;
+  }
+
+  save(job: Job) {
+    return this.store.set(`job:${job.data.id}`, job.id);
+  }
+
+  async getJob(id: string) {
+    const key = 'job:' + id;
+    console.log(key);
+    const jobId = await this.store.get(key);
+    return await relayerQueue.getJob(jobId);
+  }
+
+  async getQueueCount() {
+    return await relayerQueue.getJobCountByTypes('active', 'waiting', 'delayed');
+  }
+
+  private async _clearSchedulerJobs() {
+    const jobs = await schedulerQueue.getJobs();
+    await Promise.all(jobs.map(job => schedulerQueue.remove(job.id)));
+  }
+
+  async setupRepeatableJobs() {
+    await this._clearSchedulerJobs();
+    await schedulerQueue.add('updatePrices', configService.tokens, {
+      repeat: {
+        every: 30000,
+        immediately: true,
+      },
+    });
+    await schedulerQueue.add('checkBalance', null, {
+      repeat: {
+        every: 30000,
+        immediately: true,
+      },
+    });
+  }
+}
+
+export default () => new JobService();
diff --git a/src/services/PriceService.ts b/src/services/PriceService.ts
index f5c06db..9653936 100644
--- a/src/services/PriceService.ts
+++ b/src/services/PriceService.ts
@@ -47,9 +47,17 @@ export class PriceService {
     return prices;
   }
 
-  async getPrice(symbol: string) {
-    return await redisClient.hget('prices', symbol);
+  async getPrice(currency: string) {
+    return await redisClient.hget('prices', currency);
+  }
+
+  async getPrices() {
+    return await redisClient.hgetall('prices');
+  }
+
+  async savePrices(prices: Record<string, string>) {
+    await redisClient.hset('prices', prices);
   }
 }
 
-export default new PriceService();
+export default () => new PriceService();
diff --git a/src/services/TxService.ts b/src/services/TxService.ts
index bf17e91..ce6e95f 100644
--- a/src/services/TxService.ts
+++ b/src/services/TxService.ts
@@ -1,17 +1,38 @@
 import { TxManager } from 'tx-manager';
 import { configService } from './index';
 import { ProxyLightABI, TornadoProxyABI } from '../../contracts';
-import { parseEther } from 'ethers/lib/utils';
-import { gasLimits } from '../config';
+import { formatEther, parseEther, parseUnits } from 'ethers/lib/utils';
+import { gasLimits, httpRpcUrl, tornadoServiceFee } from '../config';
+import { BigNumber, BigNumberish, BytesLike } from 'ethers';
+import { JobType } from '../types';
+import getPriceService from './PriceService';
+import { GasPriceOracle } from 'gas-price-oracle';
+
+export type WithdrawalData = {
+  contract: string,
+  proof: BytesLike,
+  args: [
+    BytesLike,
+    BytesLike,
+    string,
+    string,
+    BigNumberish,
+    BigNumberish
+  ]
+}
 
 export class TxService {
   txManager: TxManager;
   tornadoProxy: TornadoProxyABI | ProxyLightABI;
+  priceService: ReturnType<typeof getPriceService>;
+  oracle: GasPriceOracle;
 
   constructor() {
-    const { privateKey, rpcUrl, proxyContract } = configService;
+    const { privateKey, rpcUrl, netId } = configService;
     this.txManager = new TxManager({ privateKey, rpcUrl });
-    this.tornadoProxy = proxyContract;
+    this.tornadoProxy = configService.proxyContract;
+    this.oracle = new GasPriceOracle({ defaultRpc: httpRpcUrl, chainId: netId });
+    this.priceService = getPriceService();
   }
 
   async init() {
@@ -26,21 +47,60 @@ export class TxService {
       .on('mined', receipt => console.log('Mined in block', receipt.blockNumber))
       .on('confirmations', confirmations => console.log({ confirmations }));
 
-    console.log(receipt);
-    await Promise.resolve();
+    return receipt;
   }
 
-  private async prepareCallData(data) {
-    // const calldata = this.tornadoProxy.interface.encodeFunctionData('withdraw', );
-
+  private async prepareCallData(data: WithdrawalData) {
+    const { contract, proof, args } = data;
+    const calldata = this.tornadoProxy.interface.encodeFunctionData('withdraw', [contract, proof, ...args]);
     return {
       value: data.args[5],
       to: this.tornadoProxy.address,
-      data: [],
+      data: calldata,
       gasLimit: gasLimits['WITHDRAW_WITH_EXTRA'],
     };
   }
+
+  async checkTornadoFee({ args, contract }: WithdrawalData) {
+    const { currency, amount, decimals } = configService.getInstance(contract);
+    const [fee, refund] = [args[4], args[5]].map(BigNumber.from);
+    const gasPrice = await this.getGasPrice();
+    const ethPrice = await this.priceService.getPrice(currency);
+    const operationCost = gasPrice.mul((gasLimits[JobType.TORNADO_WITHDRAW]));
+
+    const serviceFee = parseUnits(amount, decimals)
+      .mul(tornadoServiceFee * 1e10)
+      .div(100 * 1e10);
+
+    let desiredFee = operationCost.add(serviceFee);
+    if (currency !== 'eth') {
+      desiredFee = operationCost
+        .add(refund)
+        .mul(10 ** decimals)
+        .div(ethPrice)
+        .add(serviceFee);
+    }
+    console.log(
+      {
+        sentFee: formatEther(fee),
+        desiredFee: formatEther(desiredFee),
+        serviceFee: formatEther(serviceFee),
+      },
+    );
+    if (fee.lt(desiredFee)) {
+      throw new Error('Provided fee is not enough. Probably it is a Gas Price spike, try to resubmit.');
+    }
+
+  }
+
+  async getGasPrice(): Promise<BigNumber> {
+    const { baseFeePerGas = 0 } = await this.tornadoProxy.provider.getBlock('latest');
+    // const gasPrice = await this.tornadoProxy.provider.getGasPrice();
+    if (baseFeePerGas) return baseFeePerGas;
+    const { fast = 0 } = await this.oracle.gasPrices();
+    return parseUnits(String(fast), 'gwei');
+  }
+
 }
 
-
-export default new TxService();
+export default () => new TxService();
diff --git a/src/services/index.ts b/src/services/index.ts
index adf202f..e0c0eec 100644
--- a/src/services/index.ts
+++ b/src/services/index.ts
@@ -1,3 +1,4 @@
-export { default as priceService } from './PriceService';
 export { default as configService } from './ConfigService';
+export { default as getPriceService } from './PriceService';
+export { default as getJobService } from './JobService';
 export { default as txService } from './TxService';
diff --git a/src/types.ts b/src/types.ts
index 1105dc5..44bc08c 100644
--- a/src/types.ts
+++ b/src/types.ts
@@ -1,10 +1,10 @@
-export enum jobType {
+export enum JobType {
   TORNADO_WITHDRAW = 'TORNADO_WITHDRAW',
   MINING_REWARD = 'MINING_REWARD',
   MINING_WITHDRAW = 'MINING_WITHDRAW',
 }
 
-export enum jobStatus {
+export enum JobStatus {
   QUEUED = 'QUEUED',
   ACCEPTED = 'ACCEPTED',
   SENT = 'SENT',
diff --git a/src/worker.ts b/src/worker.ts
index 38112c3..45be544 100644
--- a/src/worker.ts
+++ b/src/worker.ts
@@ -1,3 +1,3 @@
-import initWorker from './queue/worker';
+import { relayerWorker, schedulerWorker } from './queue/worker';
 
-initWorker();
+schedulerWorker();
diff --git a/tsconfig.json b/tsconfig.json
index 4ce74e0..17bd9dc 100644
--- a/tsconfig.json
+++ b/tsconfig.json
@@ -4,7 +4,7 @@
       "es6",
       "es2020"
     ],
-    "target": "es2017",
+    "target": "es2020",
     "module": "commonjs",
     "moduleResolution": "node",
     "outDir": "./build",