NodeJS Bull: stop the queue jobs when a job fails
Question
I have multiple Bull queues in my NodeJS project, each of which runs only if the previous queue was executed successfully.
I'm trying to verify some email addresses here.
- Check the email format (formatQueue)
- Check email existence using the npm email-existence package (existenceQueue)
The formatQueue is the less time-consuming process: it runs a RegEx and validates the email format. The email-existence package, however, takes around 5-10 seconds to complete.
formatQueue and existenceQueue work properly when there are few jobs, like 20-100. But when I add more than that, around 1000 jobs at a time, existenceQueue fails with the error below:
myemail@email.com job stalled more than allowable limit
I checked the issues HERE and HERE. I thought the process was taking too long to respond, so I added a limiter as referred to HERE, but that did not help.
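For reference, the "stalled more than allowable limit" message relates to Bull's stalled-job check: a job whose lock expires without being renewed (for example because the event loop is blocked) counts as stalled, and it is failed once it has stalled more than maxStalledCount times. A minimal sketch of the queue options involved follows; the option names are Bull's, but every numeric value is an illustrative assumption, not a tuned recommendation.

```javascript
// Options object for Bull's Queue constructor (third argument when a Redis URL
// is passed as the second). All numeric values are illustrative assumptions.
const existenceQueueOptions = {
  limiter: {
    max: 5,          // at most 5 jobs processed...
    duration: 10000, // ...per 10-second window (milliseconds)
  },
  settings: {
    lockDuration: 60000,    // ms before a job lock expires if it is not renewed
    stalledInterval: 30000, // ms between checks for stalled jobs
    maxStalledCount: 2,     // stalls tolerated before the job is failed
  },
};

// Hypothetical usage, assuming Bull is installed and Redis is reachable:
// const Queue = require('bull');
// const existenceQueue = new Queue('existence', 'redis-db-url', existenceQueueOptions);
```

Whether raising lockDuration or lowering the limiter rate helps depends on why the lock is not being renewed, so this is a starting point for experimentation only.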
If a job in any of the queues fails, the next job is not processed. Processing stops there and the other jobs stay in the waiting state.
My code is similar to the code below. Please help me with this issue.
Queue.js
var Queue = require('bull');

var formatQueue = new Queue('format', "redis-db-url");
var existenceQueue = new Queue('existence', "redis-db-url");
// ------------ function for adding to queue ------------
module.exports.addToQueue = (emails) => {
    emails.forEach(element => {
        formatQueue.add(element, { attempts: 3, backoff: 1000 });
    });
}
// ------------ Queue Process -------------
// Format Test Process
formatQueue.process(function(job, done){
    FormatTest.validate(job.data, (err, data) => {
        if(err) done();
        else{
            job.data = data;
            done();
        }
    });
});
// Existence Test Process
existenceQueue.process(function(job, done){
    ExistenceTest.validate(job.data, (err, data) => {
        if(err) done();
        else{
            job.data = data;
            done();
        }
    });
});
// ------------ On Complete Handlers ------------
formatQueue.on('completed', function(job){
    if(job.data.is_well_format){
        existenceQueue.add(job.data, { attempts: 3, backoff: 1000 });
    }else QueueModel.lastStep(job.data)
});
existenceQueue.on('completed', function(job){
    QueueModel.lastStep(job.data)
});
// ------------ To update the email ------------
module.exports.lastStep = (data) => {
    Emails.updateEmail(data, (err, updated) => {
        if(!err) {
            formatQueue.clean('completed');
            existenceQueue.clean('completed');
        }
    })
}
--------- UPDATE ---------
The processor was taking too long to respond, so the job was getting stalled or was failing, since I was using a timeout.
I'm now trying to run the process in a separate processor file, as described in the bull documentation. I've added the file as below.
// -------- Queue.js ----------
formatQueue.process(__dirname+"/processors/format-worker.js");
// On Complete Handler
formatQueue.on('completed', function(job, result){
    console.log(result, "Format-Complete-job"); // result is undefined
    if(job.data.is_well_format){
        existenceQueue.add(job.data, { attempts: 3, backoff: 1000 });
    }else QueueModel.lastStep(job.data)
});
// -------- Queue.js ends ---------
// format-worker.js
Validator = require("../../validators");
module.exports = (job) => {
    Validator.Format.validate(job.data, (data) => {
        job.data = data;
        return Promise.resolve(data);
    });
}
Previously, in the job completed handler, I used to get the job data with the updated job parameters. Now I'm not getting the updated job data, and the second parameter described in the documentation, i.e. result, is undefined.
How can I get the updated job data in this case?
Recommended answer
Try repeatable jobs:
var Queue = require('bull');

var formatQueue = new Queue('format', "redis-db-url");
var existenceQueue = new Queue('existence', "redis-db-url");
// ------------ function for adding to queue ------------
module.exports.addToQueue = (emails) => {
    emails.forEach(element => {
        let jobOptions = {
            repeat: {
                every: 10 * 1000, // Run job every 10 seconds for example
                limit: 3 // Maximum number of times a job can repeat.
            },
            jobId: someUniqueId, // important, do not forget this (someUniqueId is a placeholder for an ID you supply)
            removeOnComplete: true, // removes job from queue on success (if required)
            removeOnFail: true // removes job from queue on failure (if required)
        }
        formatQueue.add(element, jobOptions);
    });
}
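// ------------ Queue Process -------------
// Format Test Process
formatQueue.process(function (job, done) {
    FormatTest.validate(job.data, (err, data) => {
        if (err) {
            // Done with error: pass an Error so the job is marked as failed
            done(err instanceof Error ? err : new Error(String(err)));
        } else {
            job.data = data;
            // Done without any error: data is passed on as the job result
            done(null, data);
        }
    });
});
// Existence Test Process
existenceQueue.process(function (job, done) {
    ExistenceTest.validate(job.data, (err, data) => {
        if (err) {
            // Done with error: pass an Error so the job is marked as failed
            done(err instanceof Error ? err : new Error(String(err)));
        } else {
            job.data = data;
            // Done without any error: data is passed on as the job result
            done(null, data);
        }
    });
});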
// ------------ On Complete Handlers ------------
formatQueue.on('completed', function (job) {
    if (job.data.is_well_format) {
        let jobOptions = {
            repeat: {
                every: 10 * 1000, // Run job every 10 seconds for example
                limit: 3 // Maximum number of times a job can repeat.
            },
            jobId: someUniqueId, // important do not forget this
            removeOnComplete: true, // removes job from queue on success (if required)
            removeOnFail: true // removes job from queue on failure (if required)
        }
        existenceQueue.add(job.data, jobOptions);
    } else QueueModel.lastStep(job.data)
});
existenceQueue.on('completed', function (job) {
    QueueModel.lastStep(job.data)
});
// ------------ To update the email ------------
module.exports.lastStep = (data) => {
    Emails.updateEmail(data, (err, updated) => {
        if (!err) {
            formatQueue.clean('completed');
            existenceQueue.clean('completed');
        }
    })
}
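Regarding the update in the question: result is undefined there because the exported processor function itself returns nothing; the return Promise.resolve(data) sits inside the validator callback, so it never reaches Bull. Below is a minimal sketch of a processor that returns the promise instead, assuming the validator calls back with a single data argument as in the question. The validate stub is hypothetical and only stands in for Validator.Format.validate.

```javascript
// Hypothetical stand-in for Validator.Format.validate from the question:
// it calls back asynchronously with a copy of the data plus is_well_format.
function validate(data, callback) {
  setImmediate(() => {
    const isWellFormat = /^[^\s@]+@[^\s@]+\.[^\s@]+$/.test(data.email);
    callback(Object.assign({}, data, { is_well_format: isWellFormat }));
  });
}

// Promise-based processor: the value the returned promise resolves with
// is what Bull hands to the 'completed' handler as `result`.
function formatProcessor(job) {
  return new Promise((resolve) => {
    validate(job.data, (data) => resolve(data));
  });
}

// In format-worker.js this would be exported as:
// module.exports = formatProcessor;
```

With a processor like this, it is probably safer for the completed handler to read is_well_format from result rather than from job.data, since assigning to job.data inside a separate processor file may not be reflected on the job object seen by the handler.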