在Celery中实现Twisted样式的本地多个延迟回调 [英] Implementing Twisted style local multiple deferred callbacks in Celery

查看:88
本文介绍了在Celery中实现Twisted样式的本地多个延迟回调的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我刚开始使用Celery,并且想知道如何在Celery中实现TWSITED类型的多个延迟回调

I am quite new to using Celery and was wondering how TWSITED type multiple deferred callbacks can be implemented in Celery

我的扭曲代码"使用透视代理,如下所示.我有一个处理程序(服务器),它处理一些事件并返回结果.分派器(客户端)打印使用延迟回调返回的结果.

MY TWISTED CODE uses perspective broker and is as follows. I have a Handler (server) which handles some events and returns the result. The Dispatcher (Client) prints the result returned using a deferred callback.

Handler.py(服务器)

Handler.py (Server)

from twisted.application import service, internet
from twisted.internet import reactor, task
from twisted.spread import pb
from Dispatcher import Event
from Dispatcher import CopyEvent

class ReceiverEvent(pb.RemoteCopy, Event):
    pass
pb.setUnjellyableForClass(CopyEvent, ReceiverEvent)


class Handler(pb.Root):

def remote_eventEnqueue(self, pond):
    d = task.deferLater(reactor,5,handle_event,sender=self)
    return d

def handle_event(sender):
    print "Do Something"
    return "did something"

if __name__ == '__main__':
    h=Handler()
    reactor.listenTCP(8739, pb.PBServerFactory(h))
    reactor.run()

现在是Dispatcher.py(客户端)

Now the Dispatcher.py (Client)

from twisted.spread import pb, jelly
from twisted.python import log
from twisted.internet import reactor
from Event import Event

class CopyEvent(Event, pb.Copyable):
    pass

class Dispatcher:
    def __init__(self, event):
        self.event = event

    def dispatch_event(self, remote):
        d = remote.callRemote("eventEnqueue", self.event)   
        d.addCallback(self.printMessage)

    def printMessage(self, text):
        print text

def main():
    from Handler import CopyEvent
    event = CopyEvent()
    d = Dispatcher(event)
    factory = pb.PBClientFactory()
    reactor.connectTCP("localhost", 8739, factory)
    deferred = factory.getRootObject()
    deferred.addCallback(d.dispatch_event)
    reactor.run()

if __name__ == '__main__':
    main()

我尝试在Celery中实现此功能.

I tried implementing this in Celery.

Handler.py(服务器)

Handler.py (Server)

from celery import Celery

app=Celery('tasks',backend='amqp',broker='amqp://guest@localhost//')

@app.task

def handle_event():
     print "Do Something"
     return "did something"

Dispatcher.py(客户端)

Dispatcher.py (Client)

from Handler import handle_event
from datetime import datetime

def print_message(text):
    print text


t=handle_event.apply_async(countdown=10,link=print_message.s('Done'))  ##HOWTO?

我的确切问题是,如何在Celery中的局部函数(如print_message)上实现延迟回调TWISTED样式.handle_Event方法完成后,它返回的结果是我想使用的另一个回调方法(print_message),它是LOCAL

My exact question is how can one implement deferred callbacks TWISTED style on local functions like print_message in Celery. When handle_Event method is finished it returns result on which I would like to have another callback method (print_message) which is LOCAL

在Celery中执行此操作的其他任何可能的设计工作流程?

Any other possible Design workflow to do this in Celery?

谢谢

JR

推荐答案

好,所以终于知道了.像Twisted样式一样,不可能直接在Celery客户端中添加回调.但是Celery支持任务监视功能,该功能使客户端能够监视各种类型的工作程序事件并在其上添加回调.

Ok, so finally figured it out. It is not quite possible to add callbacks directly in the Celery client like the Twisted style. But Celery supports task monitoring functionality, that enables the client to monitor different kinds of worker events and add callbacks on it.

一个简单的任务监视器(Task_Monitor.py)看起来像这样.(有关详细信息,请参见Celery真实处理文档 http://docs.celeryproject.org/en/latest/userguide/monitoring.html#real-time-processing )

A simple task monitor (Task_Monitor.py) would look something like this. (Details can be found in Celery real processing documentation http://docs.celeryproject.org/en/latest/userguide/monitoring.html#real-time-processing)

Task_Monitor.py

Task_Monitor.py

from celery import Celery

def task_monitor(app):
    state = app.events.State()

    def announce_completed_tasks(event):
        state.event(event)
        task = state.tasks.get(event['uuid'])

        print('TASK SUCCEEDED: %s[%s] %s' % (task.name, task.uuid, task.info(), ))

    with app.connection() as connection:
        recv = app.events.Receiver(connection, handlers={'task-succeeded': announce_completed_tasks})
        recv.capture(limit=None, timeout=None, wakeup=True)

if __name__ == '__main__':
    app = Celery(broker='amqp://guest@REMOTEHOST//')
    task_monitor(app)

Task_Monitor.py必须作为单独的进程(客户端)运行.除了Celery应用程序(服务器端)之外,还需要使用

Task_Monitor.py has to be run as a separate process (client side). Besides the Celery application (server side) needs to be configured using

app.conf.CELERY_SEND_EVENTS = TRUE

或在运行celery时使用-E选项

or using -E option while running celery

以便发送事件以便监视工作人员.

so that it sends events in order for worker to be monitored.

这篇关于在Celery中实现Twisted样式的本地多个延迟回调的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆