如何在牛市队列中收听已完成的事件 - 仅针对当前作业 [英] How to listen to the completed event in bull queue - just for the current job

查看:50
本文介绍了如何在牛市队列中收听已完成的事件 - 仅针对当前作业的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

当运行下面的代码时,它会打印出许多结果.我怀疑 completed 事件监听了当前队列实例中所有以前的工作.

When running the code below it prints to many results. my suspicious is that the completed event listen to all the previous jobs made in the current queue instance.

如何管理已完成事件以仅侦听当前作业完成情况?

How can I manage the completed event to listen just to the current job completion?

生产者创建一个带有默认数字 id 的作业,并监听全局完成,以便在作业完成时返回响应.

The producer creates a job with a default numerical id and listens to the global completion in order to return a response when the job is done.

const BullQ = require('bull');

let bullQ = BullQ('my-first-queue', {
  redis: {
    host: process.env.REDISHOST || 'localhost',
    password: process.env.REDISPASSWORD || ''
  }
});

app.get('/search/:term', async (req, res) => {
  const job = await bullQ.add({
    searchTerm: req.params.term
  });

  // Listen to the global completion of the queue in order to return result.
  bullQ.on('global:completed', (jobId, result) => {
    // Check if id is a number without any additions
    if (/^\d+$/.test(jobId) && !res.headersSent) {
      console.log(`Producer get: Job ${jobId} completed! Result: ${result}`);
      res.json(`Job is completed with result: ${result}`);
    }
  });
});

consumer.js

消费者有两个角色.1.按照书本应有的方式消费工作2. 根据上一个作业的结果创建新作业.

consumer.js

The consumer has 2 roles. 1. To consume the jobs as it should be by the book 2. To create new jobs based on the result of the last job.

const BullQ = require('bull');

let bullQ = BullQ('my-first-queue', {
  redis: {
    host: process.env.REDISHOST || 'localhost',
    password: process.env.REDISPASSWORD || ''
  }
});

bullQ.process((job, done) => {
  // Simulate asynchronous server request.
  setTimeout(async () => {
    // Done the first job and return an answer to the producer after the timeout.
    done(null, `Search result for ${job.data.searchTerm}`);
    // next job run
    if (counter < 10) {
      // For the first run the id is just a number if not changed via the jobId in JobOpts,
      // the next time the job id will be set to {{id}}_next_{{counter}} we need only the first number in order not to end with a long and not clear concatenated string.
      let jobID = (/^\d+$/.test(job.id)) ? job.id : job.id.replace(/[^\d].*/,'');
      await createNextJob(jobID, ++counter);
    }
  }, 100);
});

// Create next job and add it to the queue.
// Listen to the completed jobs (locally)
const createNextJob = async (id, counter) => {
  const nextJob = bullQ.add({
    searchTerm: "Next job"
  }, {
    jobId: `${id}_next_${counter}`
  });

  await bullQ.on('completed', (job, result) => {
    job.finished();
    console.log(`Consumer(next): Job ${job.id} completed! Result: ${result}`);
  });
};

推荐答案

您可以 await job.finished() 以获取特定于 queue.add().

You can await job.finished() to get your result specific to a job object returned by queue.add().

这是一个简单的、可运行的示例,没有 Express 来说明:

Here's a simple, runnable example without Express to illustrate:

const Queue = require("bull"); // "bull": "^3.22.6"

const sleep = (ms=1000) =>
  new Promise(resolve => setTimeout(resolve, ms))
;

const queue = new Queue("test", process.env.REDIS_URL);
queue.process(4, async job => {
  await sleep(job.data.seconds * 1000); // waste time
  return Promise.resolve(`job ${job.id} complete!`);
});

(async () => {
  const job = await queue.add({seconds: 5});
  const result = await job.finished();
  console.log(result); // => job 42 complete!
  await queue.close();
})();

使用快递:

const Queue = require("bull");
const express = require("express");

const sleep = (ms=1000) =>
  new Promise(resolve => setTimeout(resolve, ms))
;

const queue = new Queue("test", process.env.REDIS_URL);
queue.process(4, async job => {
  await sleep(job.data.seconds * 1000);
  return Promise.resolve(`job ${job.id} complete!`);
});

const app = express();
app
  .set("port", process.env.PORT || 5000)
  .get("/", async (req, res) => {
    try {
      const job = await queue.add({
        seconds: Math.abs(+req.query.seconds) || 10,
      });
      const result = await job.finished();
      res.send(result);
    }
    catch (err) {
      res.status(500).send({error: err.message});
    }
  })
  .listen(app.get("port"), () => 
    console.log("app running on port", app.get("port"))
  )
;

样品运行:

$ curl localhost:5000?seconds=2
job 42 complete!

这篇关于如何在牛市队列中收听已完成的事件 - 仅针对当前作业的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆