NodeJS Bull停止作业上的队列作业失败 [英] NodeJS Bull Stop the queue jobs on a job failed

查看:313
本文介绍了NodeJS Bull停止作业上的队列作业失败的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我的NodeJS项目中有多个公牛队列,如果执行了先前的队列,这些队列将运行成功地. 我正在尝试在此处验证一些电子邮件地址.

I have multiple Bull Queues in my NodeJS project which will run if previous queue is executed successfully. I'm trying to verify some email addresses here.

  1. 检查电子邮件格式(formatQueue)

  1. Check the Email format (formatQueue)

使用npm email-existence程序包(existenceQueue)的电子邮件存在 p>

Email Existence using npm email-existence package (existenceQueue)

formatQueue花费的时间更少,可以运行RegEx并验证Email格式.但是email-existence程序包大约需要5到10秒才能完成.

The formatQueue is less time taking process, which wil run the RegEx and validate the Email format. but The email-existence package takes around 5-10 seconds to complete.

formatQueueexistenceQueue可以正常工作.但是当我一次添加超过1000个作业时,existenceQueue失败并显示以下错误

formatQueue and existenceQueue works properly if there are less jobs like 20-100. but when I Add more than that around 1000 jobs at a time, existenceQueue failes with below error

myemail@email.com job stalled more than allowable limit

我检查了问题这里

I checked the issue HERE and HERE, I thought the process is taking too long to respond, so added limiter as refered HERE. But that does not help me.

如果任何队列中的作业失败,则它不处理下一个作业.它将在那里停止,其他作业将保持在waiting状态.

If a job in any of the queue fails, Its not processing the next job. It will stop there and the other jobs will stay in waiting state.

我的代码类似于以下代码.请帮助我解决这个问题.

My code is something similar to below code. please help me with the issue.

Queue.js

var formatQueue = new Queue('format', "redis-db-url");
var existenceQueue = new Queue('existence', "redis-db-url");

// ------------ function for adding to queue ------------
module.exports.addToQueue = (emails) => {
    emails.forEach(element => {
        formatQueue.add(element, { attempts: 3, backoff: 1000 });
    });
}

// ------------ Queue Process -------------

// Format Test Process
formatQueue.process(function(job, done){
    FormatTest.validate(job.data, (err, data) => {
        if(err) done();
        else{
            job.data = data;
            done();
        }
    });
});

// Existence Test Process
formatQueue.process(function(job, done){
    ExistenceTest.validate(job.data, (err, data) => {
        if(err) done();
        else{
            job.data = data;
            done();
        }
    });
});


// ------------ On Cmplete Handlers ------------
formatQueue.on('completed', function(job){
    if(job.data.is_well_format){
        existenceQueue.add(job.data, { attempts: 3, backoff: 1000 });
    }else QueueModel.lastStep(job.data)
});

existenceQueue.on('completed', function(job){
    QueueModel.lastStep(job.data)
});


// ------------ To update the emaile ------------
module.exports.lastStep = (data) => {
    Emails.updateEmail(data, (err, updated) => {
        if(!err) {
            formatQueue.clean('completed');
            existenceQueue.clean('completed');
        }
    })
}

---------更新---------

处理器花费了太多时间来响应,因此由于我正在使用超时,因此作业变得stalled或失败.

The processor was taking too much time to respond so the job was getting stalled or getting failed since i was using timeout.

我试图在bull文档中的其他processor文件itsef中运行process,我已如下添加文件.

I'm trying to run the process in different processor file itsef as its in bull documentation, I've added the file as below.

// -------- Queue.js ----------

formatQueue.process(__dirname+"/processors/format-worker.js");


// On Cmplete Handler

formatQueue.on('completed', function(job, result){
    console.log(result, "Format-Complete-job"); // result is undefined
    if(job.data.is_well_format){
        existenceQueue.add(job.data, { attempts: 3, backoff: 1000 });
    }else QueueModel.lastStep(job.data)
});

// -------- Queue.js ends ---------

//format-worker.js
Validator = require("../../validators");
module.exports = (job) => {
    Validator.Format.validate(job.data, (data) => {
        job.data = data;
        return Promise.resolve(data);
    });
}

现在我以前使用的Job Complete上完成了,我曾经使用更新的Job参数来获取Job数据.现在,我没有更新的工作数据.文档中的第二个参数,即resultundefined. 现在,在这种情况下,如何获取更新的作业数据.

Now On Job complete which i was using before, I used to get job data with updated job parameters. Now I'm not getting updated job data. and the second parameter which is there in the documentation i.e result is undefined. Now how can I get the updated job data in this case.

推荐答案

尝试可重复的工作

var formatQueue = new Queue('format', "redis-db-url");
var existenceQueue = new Queue('existence', "redis-db-url");

// ------------ function for adding to queue ------------
module.exports.addToQueue = (emails) => {
  emails.forEach(element => {
    let jobOptions = {
      repeat: {
        every: 10 * 1000, // Run job every 10 seconds for example
        limit: 3 // Maximum number of times a job can repeat.
      },
      jobId: someUniqueId, // important do not forget this
      removeOnComplete: true, // removes job from queue on success (if required)
      removeOnFail: true // removes job from queue on failure (if required)
    }

    formatQueue.add(element, jobOptions);
  });
}

// ------------ Queue Process -------------

// Format Test Process
formatQueue.process(function (job, done) {
  FormatTest.validate(job.data, (err, data) => {
    if (err) {
      // Done with error
      done(true);
    } else {
      job.data = data;
      // Done without any error
      done(false);
    }
  });
});

// Existence Test Process
existenceQueue.process(function (job, done) {
  ExistenceTest.validate(job.data, (err, data) => {
    if (err) {
      // Done with error
      done(true);
    } else {
      job.data = data;
      // Done without any error
      done(false);
    }
  });
});


// ------------ On Complete Handlers ------------
formatQueue.on('completed', function (job) {
  if (job.data.is_well_format) {
    let jobOptions = {
      repeat: {
        every: 10 * 1000, // Run job every 10 seconds for example
        limit: 3 // Maximum number of times a job can repeat.
      },
      jobId: someUniqueId, // important do not forget this
      removeOnComplete: true, // removes job from queue on success (if required)
      removeOnFail: true // removes job from queue on failure (if required)
    }

    existenceQueue.add(job.data, jobOptions);
  } else QueueModel.lastStep(job.data)
});

existenceQueue.on('completed', function (job) {
  QueueModel.lastStep(job.data)
});


// ------------ To update the email ------------
module.exports.lastStep = (data) => {
  Emails.updateEmail(data, (err, updated) => {
    if (!err) {
      formatQueue.clean('completed');
      existenceQueue.clean('completed');
    }
  })
}

这篇关于NodeJS Bull停止作业上的队列作业失败的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆