从龙卷风中调用芹菜任务 [英] invoke celery task from tornado

查看:73
本文介绍了从龙卷风中调用芹菜任务的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

某人如何从龙卷风中调用芹菜任务,并通过回调获取结果?



帖子声称某人必须仅通过RabbitMQ发出一条消息,然后该任务才能执行。这是有道理的,但是有人可以在python中举个例子(在龙卷风中甚至更好,带有回调)?就个人而言,我将mongodb用作我的消息代理,但是我也可以切换到Redis或RabbitMQ。.



编辑:为澄清起见,我想要一个带有回调的示例。例如,此龙卷风代码

  TestTask.delay(callback = self._on_celery_response)
...
def _on_celery_response(自己,结果):
打印你好,来自_on_celery_repsonse,结果

不起作用。我的TestTask是:

  class TestTask(Task):
name = tornadoServer.Test
def run(self,callback = None,** kwargs):
结果= {'结果':来自龙卷风调用的芹菜任务的问候}}
,如果回调函数不是None:
subtask( callback).delay(result)
返回结果

以及回溯:

 文件 /home/hymloth/Desktop/DJANGO/NOO1/tornadoServer/tornado/stack_context.py,行183,包装后的
callback(* args,** kwargs)
文件 /home/hymloth/Desktop/DJANGO/NOO1/tornadoServer/asyncmongo/connection.py,第183行,在_parse_response中
callback(response)
文件 /home/hymloth/Desktop/DJANGO/NOO1/tornadoServer/asyncmongo/cursor.py,第399行,位于_handle_response
orig_callback(result ['data'],error = None)
文件 /home/hymloth/Desktop/DJANGO/NOO1/tornadoServer/basic_auth_handlers.py,第66行,位于_on_response
celer中y_tasks.TestTask.delay(self._on_celery_response)
文件 /usr/local/lib/python2.6/dist-packages/celery-2.2.7-py2.6.egg/celery/task/base.py ,第338行,延迟
返回self.apply_async(args,kwargs)
文件 /usr/local/lib/python2.6/dist-packages/celery-2.2.7-py2.6 .egg / celery / task / base.py,第460行,位于apply_async
**选项中)
文件 /usr/local/lib/python2.6/dist-packages/celery-2.2。 7-py2.6.egg / celery / app / amqp.py,第230行,在delay_task
send(body,exchange = exchange,** extract_msg_options(kwargs))
文件 / usr / local / lib / python2.6 / dist-packages / kombu-1.1.6-py2.6.egg / kombu / compat.py,第101行,在发送
中返回self.publish(* args,** kwargs)
文件 /usr/local/lib/python2.6/dist-packages/kombu-1.1.6-py2.6.egg/kombu/messaging.py,行124,在发布$ b $中b压缩,标题)
文件 /usr/local/lib/python2.6/dist-packages/kombu-1.1.6-py2.6.egg/kombu/messaging.py,第147行,在_prepare中
body)=编码(body,serial izer = serializer)
文件 /usr/local/lib/python2.6/dist-packages/kombu-1.1.6-py2.6.egg/kombu/serialization.py,行119,编码
有效负载=编码器(数据)
文件 /usr/lib/python2.6/copy_reg.py,第70行,在_reduce_ex
中引发TypeError,无法腌制%s对象 %base .__ name__
TypeError:无法腌制实例方法对象

如果没有

解决方案

回调对象也应该是celery Task或您的代码不起作用。 / p>

如果您的回调函数不必是芹菜任务,则可以在任务体内使用信号。



http://docs.python.org/library/signal.html


How can someone invoke a celery task from tornado, and get the result via a callback?

This post claims that someone must simply put a message via RabbitMQ and then the task shall be executed. This makes sense, but can someone give an example in python (even better in tornado, with a callback)? Personally, I use mongodb as my message broker, but I can switch to Redis or RabbitMQ as well..

EDIT: To clarify things, I want an example with a callback. For example, this tornado code

TestTask.delay(callback = self._on_celery_response) 
...
def _on_celery_response(self, result):
    print "hello from _on_celery_repsonse" , result

does not work. My TestTask is:

class TestTask(Task):
    name = "tornadoServer.Test"
    def run(self, callback=None,  **kwargs):
        result = {'result': "hello from celery task invoked by tornado"}
        if callback is not None:
            subtask(callback).delay(result)
        return result

and the traceback:

    File "/home/hymloth/Desktop/DJANGO/NOO1/tornadoServer/tornado/stack_context.py", line 183, in wrapped
    callback(*args, **kwargs)
  File "/home/hymloth/Desktop/DJANGO/NOO1/tornadoServer/asyncmongo/connection.py", line 183, in _parse_response
    callback(response)
  File "/home/hymloth/Desktop/DJANGO/NOO1/tornadoServer/asyncmongo/cursor.py", line 399, in _handle_response
    orig_callback(result['data'], error=None)
  File "/home/hymloth/Desktop/DJANGO/NOO1/tornadoServer/basic_auth_handlers.py", line 66, in _on_response
    celery_tasks.TestTask.delay(self._on_celery_response)
  File "/usr/local/lib/python2.6/dist-packages/celery-2.2.7-py2.6.egg/celery/task/base.py", line 338, in delay
    return self.apply_async(args, kwargs)
  File "/usr/local/lib/python2.6/dist-packages/celery-2.2.7-py2.6.egg/celery/task/base.py", line 460, in apply_async
    **options)
  File "/usr/local/lib/python2.6/dist-packages/celery-2.2.7-py2.6.egg/celery/app/amqp.py", line 230, in delay_task
    send(body, exchange=exchange, **extract_msg_options(kwargs))
  File "/usr/local/lib/python2.6/dist-packages/kombu-1.1.6-py2.6.egg/kombu/compat.py", line 101, in send
    return self.publish(*args, **kwargs)
  File "/usr/local/lib/python2.6/dist-packages/kombu-1.1.6-py2.6.egg/kombu/messaging.py", line 124, in publish
    compression, headers)
  File "/usr/local/lib/python2.6/dist-packages/kombu-1.1.6-py2.6.egg/kombu/messaging.py", line 147, in _prepare
    body) = encode(body, serializer=serializer)
  File "/usr/local/lib/python2.6/dist-packages/kombu-1.1.6-py2.6.egg/kombu/serialization.py", line 119, in encode
    payload = encoder(data)
  File "/usr/lib/python2.6/copy_reg.py", line 70, in _reduce_ex
    raise TypeError, "can't pickle %s objects" % base.__name__
TypeError: can't pickle instancemethod objects

The Task works ok without the callback.. Any suggestions?

解决方案

The callback object should be a celery Task too or your code doesn't work.

You can use signals inside the task body if your callback function don't have to be a celery task.

http://docs.python.org/library/signal.html

这篇关于从龙卷风中调用芹菜任务的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆