公牛队列:当一个作业失败时,如何停止队列处理剩余的作业? [英] Bull queue: When a job fails, how to stop queue from processing remaining jobs?

查看:69
本文介绍了公牛队列:当一个作业失败时,如何停止队列处理剩余的作业?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用

预期输出:如果 Type-1 two 在第三次尝试中成功完成.

已完成:Type-1 One失败:Type-1 二失败:Type-1 二已完成:Type-1 二已完成:Type-1 三

预期输出:如果 Type-1 two 在第三次尝试中也失败.

已完成:Type-1 One失败:Type-1 二失败:Type-1 二失败:Type-1 二

我想要的是队列必须停止处理新作业,直到当前作业完成且没有任何故障.如果出现任何失败,失败的作业必须运行一些 x 次.在 x+1 尝试时,它必须清除(删除所有作业)队列.那么如何在队列中实现这种线性行为.

解决方案

bull 中,不可能在任务失败后立即重复相同的任务,然后再选择队列中的下一个任务.

解决方案:

  1. 创建新作业并将其优先级设置为小于当前作业类型的值.
  2. 释放失败的作业(resolve()done())
  3. 这个新工作将被 bull 立即拿起进行处理.

示例代码:在下面的代码中,Job-3 将失败并创建新的作业,依此类推,直到作业的目的"结束.在某个时间点成功.

var Queue = require('bull');让 redisOptions = {redis:{ 端口:6379,主机:'127.0.0.1'}}var myQueue = new Queue('Linear-Queue', redisOptions);myQueue.process('Type-1', function (job, done) {console.log(`Processing Job-${job.id} Attempt: ${job.attemptsMade}`);下载文件(作业,异步函数(错误){如果(错误){等待repeatSameJob(工作,完成);} 别的 {完毕();}});});异步函数 repeatSameJob(job, done) {let newJob = await myQueue.add('Type-1', job.data, { ...{ priority: 1 }, ...job.opts });console.log(`Job-${job.id} 失败.为相同数据创建具有最高优先级的新 Job-${newJob.id}.`);完成(真);}功能下载文件(工作,完成){setTimeout(async() => {完成(job.data.error)}, job.data.time);}myQueue.on('完成',函数(作业,结果){console.log("已完成:作业-" + job.id);});myQueue.on('failed', async function (job, error) {console.log("失败:作业-" + job.id);});让选项 = {removeOnComplete: true,//成功时从队列中删除作业removeOnFail: true//失败时从队列中删除作业}for (让 i = 1; i <= 5; i++) {让错误=假;如果(我 == 3){ 错误 = 真;}设置超时(我 => {让作业数据 = {时间:我* 2000,错误:错误,描述:`Job-${i}`}myQueue.add('Type-1', jobData, options);}, i * 2000, i);}

输出:

I am using bull queue to process some jobs. In the current scenario each job is some kind of an operation. So whenever an operation(job) among a list of operations in the queue fails, queue has to stop processing the remaining jobs(operations).

What have I tried so far?

So i tried to pause the queue when a particular job fails. Next the queue is resumed when it drains. Now when it is resumed the queue does not start from the failed job instead picks up the next job.

var Queue = require('bull');

let redisOptions = {
  redis: { port: 6379, host: '127.0.0.1' },
  limiter: { max: 1, duration: 1000 }
}
var myQueue = new Queue('Linear-Queue', redisOptions);

myQueue.process('Type-1', function (job, done) {
  setTimeout(() => {
    done(job.data.error);
  }, job.data.time);
});

let options = {
  attempts: 3,
  removeOnComplete: false, // removes job from queue on success
  removeOnFail: false // removes job from queue on failure
}

setTimeout(() => {
  myQueue.add('Type-1', { time: 10000, description: "Type-1 One", error: false }, options);
}, 1 * 1000);

setTimeout(() => {
  myQueue.add('Type-1', { time: 5000, description: "Type-1 two", error: true }, options);
}, 2 * 1000);

setTimeout(() => {
  myQueue.add('Type-1', { time: 3000, description: "Type-1 three", error: false }, options);
}, 3 * 1000);


myQueue.on('completed', function (job, result) {
  console.log("Completed: " + job.data.description);
});

myQueue.on('failed', async function (job, error) {
  console.log("Failed: " + job.data.description);
  try {
    await myQueue.pause();
  } catch (error) {
    console.log(error);
  }
});

myQueue.on('drained', async function () {
  try {
    await myQueue.resume();
  } catch (error) {
    console.log(error);
  }
});

Current Output:

Expected Output: if the Type-1 two completes successfully in 3rd attempt.

Completed: Type-1 One
Failed: Type-1 two
Failed: Type-1 two
Completed: Type-1 two
Completed: Type-1 three

Expected Output: if the Type-1 two failed in 3rd attempt as well.

Completed: Type-1 One
Failed: Type-1 two
Failed: Type-1 two
Failed: Type-1 two

All i want is the queue has to stop processing new jobs until current job is completed without any failure. In case of any failure, the failed job has to run some x number of time. At the x+1 attempt it has to clear(delete all jobs) the queue. So how to achieve this linear behavior in queue.

解决方案

In bull Its not possible to repeat the same job immediately after its failure before picking up the next job in the queue.

Solution:

  1. Create new job and set its priority to a value less than the current job type.
  2. Release the failed job (resolve() or done())
  3. This new job will be picked up immediately by the bull for processing.

Sample code: In the below code Job-3 will fail and create new job and so on until "purpose of the job" succeeds at some point of time.

var Queue = require('bull');

let redisOptions = {
  redis: { port: 6379, host: '127.0.0.1' }
}
var myQueue = new Queue('Linear-Queue', redisOptions);

myQueue.process('Type-1', function (job, done) {
  console.log(`Processing Job-${job.id} Attempt: ${job.attemptsMade}`);
  downloadFile(job, async function (error) {
    if (error) {
      await repeatSameJob(job, done);
    } else {
      done();
    }
  });
});

async function repeatSameJob(job, done) {
  let newJob = await myQueue.add('Type-1', job.data, { ...{ priority: 1 }, ...job.opts });
  console.log(`Job-${job.id} failed. Creating new Job-${newJob.id} with highest priority for same data.`);
  done(true);
}

function downloadFile(job, done) {
  setTimeout(async () => {
    done(job.data.error)
  }, job.data.time);
}

myQueue.on('completed', function (job, result) {
  console.log("Completed: Job-" + job.id);
});

myQueue.on('failed', async function (job, error) {
  console.log("Failed: Job-" + job.id);
});

let options = {
  removeOnComplete: true, // removes job from queue on success
  removeOnFail: true // removes job from queue on failure
}

for (let i = 1; i <= 5; i++) {
  let error = false;
  if (i == 3) { error = true; }

  setTimeout(i => {
    let jobData = {
      time: i * 2000,
      error: error,
      description: `Job-${i}`
    }
    myQueue.add('Type-1', jobData, options);
  }, i * 2000, i);
}

Output:

这篇关于公牛队列:当一个作业失败时,如何停止队列处理剩余的作业?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆