作业完成时的Kue回调 [英] Kue callback when job is completed

查看:106
本文介绍了作业完成时的Kue回调的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我的主Node实例派生一个工作进程,该工作进程通过IPC接受消息(使用内置Node process.send() process .on('message'... )是包含有关要添加到 Kue 。然后处理这些作业。

My main Node instance forks a worker process, which accepts messages over IPC (using the built-in Node process.send() and process.on('message'...) which are objects containing information about new jobs to add to Kue. It then processes those jobs.

我的主Node实例调用如下代码:

My main Node instance calls something like this:

worker.send({jobType:'filesystem', operation: 'delete', path: fileDir});

,工作实例执行以下操作:

and the worker instance does something like this:

jobs.create(message.jobType, message).save();

jobs.process('filesystem', function(job, done) {
    fs.delete(job.data.path, function(err) {
        done(err);
    });
});

并且作业成功完成。

当作业完成时,如何在主Node实例中获得类似回调的功能d?我如何从工作程序实例中将一些结果返回到主Node实例?

How can I get callback-like functionality in my main Node instance when the job is completed? How can I return some results to the main Node instance from the worker instance?

推荐答案

我相信我已经解决了这个问题,但是我如果任何人都可以改进我的解决方案或提供更好的解决方案,将无法解决该问题。

I believe I solved this, but I'll leave the question unsolved in case anyone can improve upon my solution or provide a better one.

当您使用Kue在单独的流程中处理作业时,您可以在工作完成后不只是执行回调。这是两个过程之间进行通信的示例。我本来希望使用Kue自动提供每个作业的ID(我相信这是它在Redis中收到的ID),但是app.js在将作业发送给工人之前需要知道该作业的ID,以便可以在收到消息时匹配ID。

When you're using Kue to process jobs in a separate process, you can't simply execute a callback when the job is finished. This is an example of communication between the two processes. I would have liked to have used the id that Kue provides each job automatically (which I believe is the same id it receives in Redis) but app.js needs to know the id of the job BEFORE it gets sent to the worker so that it can match the id when it receives a message.

app.js

var child = require('child_process');
var async = require('async');

var worker = child.fork("./worker.js");

//When a message is received, search activeJobs for it, call finished callback, and delete the job
worker.on('message', function(m) {
    for(var i = 0; i < activeJobs.length; i++) {
        if(m.jobId == activeJobs[i].jobId) {
            activeJobs[i].finished(m.err, m.results);
            activeJobs.splice(i,1);
            break;
        }
    }
});

// local job system
var newJobId = 0;
var activeJobs = [];

function Job(input, callback) {
    this.jobId = newJobId;
    input.jobId = newJobId;
    newJobId++;
    activeJobs.push(this);

    worker.send(input);

    this.finished = function(err, results) {
        callback(err, results);
    }
}


var deleteIt = function(req, res) {
    async.series([
        function(callback) {
            // An *EXAMPLE* asynchronous task that is passed off to the worker to be processed
            // and requires a callback (because of async.series)
            new Job({
                jobType:'filesystem',
                title:'delete project directory',
                operation: 'delete',
                path: '/deleteMe'
            }, function(err) {
                callback(err);
            });
        },
        //Delete it from the database
        function(callback) {
            someObject.remove(function(err) {
                callback(err);
            });
        },
    ],
    function(err) {
        if(err) console.log(err);
    });
};

worker.js

var kue = require('kue');
var fs = require('fs-extra');

var jobs = kue.createQueue();

//Jobs that are sent arrive here
process.on('message', function(message) {
    if(message.jobType) {
        var job = jobs.create(message.jobType, message).save();
    } else {
        console.error("Worker:".cyan + " [ERROR] No jobType specified, message ignored".red);
    }
});

jobs.process('filesystem', function(job, done) {

    if(job.data.operation == 'delete') {
        fs.delete(job.data.path, function(err) {
            notifyFinished(job.data.jobId, err);
            done(err);
        });
    }
});

function notifyFinished(id, error, results) {
    process.send({jobId: id, status: 'finished', error: error, results: results});
}

https://gist.github.com/winduptoy/4991718

这篇关于作业完成时的Kue回调的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆