diff --git a/src/queue.js b/src/queue.js index ee1c4ca..dbd2ad4 100644 --- a/src/queue.js +++ b/src/queue.js @@ -5,7 +5,15 @@ const { redisUrl } = require('./config') const { status } = require('./constants') const redis = new Redis(redisUrl) -const queue = new Queue('proofs', redisUrl) +const queue = new Queue('proofs', redisUrl, { + lockDuration: 300000, // Key expiration time for job locks. + lockRenewTime: 30000, // Interval on which to acquire the job lock + stalledInterval: 30000, // How often check for stalled jobs (use 0 for never checking). + maxStalledCount: 3, // Max amount of times a stalled job will be re-processed. + guardInterval: 5000, // Poll interval for delayed jobs and added jobs. + retryProcessDelay: 5000, // delay before processing next job in case of internal error. + drainDelay: 5, // A timeout for when the queue is in drained state (empty waiting for jobs). +}) async function postJob({ type, request }) { const id = uuid() @@ -17,7 +25,9 @@ async function postJob({ type, request }) { status: status.QUEUED, ...request, // proof, args, ?contract }, - // { removeOnComplete: true }, + { + //removeOnComplete: true + }, ) await redis.set(`job:${id}`, job.id) return id @@ -30,6 +40,9 @@ async function getJob(uuid) { async function getJobStatus(uuid) { const job = await getJob(uuid) + if (!job) { + return null + } return { ...job.data, failedReason: job.failedReason, diff --git a/src/status.js b/src/status.js index d049533..3dc03d8 100644 --- a/src/status.js +++ b/src/status.js @@ -25,7 +25,7 @@ function index(req, res) { async function getJob(req, res) { const status = await queue.getJobStatus(req.params.id) - return res.json(status) + return status ? res.json(status) : res.status(400).json({ error: "The job doesn't exist" }) } module.exports = { diff --git a/src/worker.js b/src/worker.js index 7183b47..8016a72 100644 --- a/src/worker.js +++ b/src/worker.js @@ -11,7 +11,7 @@ const tornadoABI = require('../abis/tornadoABI.json') const miningABI = require('../abis/mining.abi.json') const swapABI = require('../abis/swap.abi.json') const { queue } = require('./queue') -const { poseidonHash2, getInstance, fromDecimals } = require('./utils') +const { poseidonHash2, getInstance, fromDecimals, sleep } = require('./utils') const { jobType, status } = require('./constants') const { netId, @@ -264,7 +264,21 @@ async function submitTx(job, retry = 0) { } catch (e) { // todo this could result in duplicated error logs // todo handle a case where account tree is still not up to date (wait and retry)? - throw new Error(`Revert by smart contract ${e.message}`) + if ( + job.data.type !== jobType.TORNADO_WITHDRAW && + (e.message.indexOf('Outdated account merkle root') !== -1 || + e.message.indexOf('Outdated tree update merkle root') !== -1) + ) { + if (retry < 5) { + await sleep(3000) + console.log('Tree is still not up to date, resubmitting') + await submitTx(job, retry + 1) + } else { + throw new Error('Tree update retry limit exceeded') + } + } else { + throw new Error(`Revert by smart contract ${e.message}`) + } } }