python芹菜:如何将任务附加到旧链 [英] python celery: How to append a task to an old chain

查看:144
本文介绍了python芹菜:如何将任务附加到旧链的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

 从任务导入t1 ,t2,t3 
从芹菜进口链
res = chain(t1.s(123)| t2.s()| t3.s())()
res.get()

我如何将其他任务附加到这个特定的链? p>

  res.append(t2.s())

我的目标是确保链条按照我在代码中指定的顺序执行。
如果一个任务在我的链中失败,以下任务不会执行。



知道我在指定队列中使用超大的任务。 / p>

解决方案

所有信息都包含在消息中。



在运输中,也许在世界的另一边,或者它们可能被中间处理器消耗。因此,在发送邮件之后不可能修改邮件。



请参阅 http://docs.celeryproject.org/en/latest/userguide/tasks.html#state


我的目标是确保链条按照我在代码中指定的相同顺序执行。如果一个任务在我的链中失败,则下列任务不会执行。


您可以确定,订单是作为消息的一部分发送,如果任何任务失败,它将不会继续



现在,如果您真的希望能够在运行时添加任务,那么您可以将
的信息存储在数据库中,并使任务本身检查和调用新任务。
这样做有一些挑战:



1)如果成功,链中的第一个任务将调用下一个任务,
下一个任务将调用下一个任务,然后等等。



2)如果将此任务添加到此进程,如果第一个任务已执行,会发生什么?
或第二个,或第三个?



所以你可能会猜到这将需要一些重的同步才能工作。



我想一个更简单的解决方案是创建一个任务,等待一个任务完成
然后应用回调:



<$来自芹菜进口子任务的$ $ $ $ $ $ $ $ $ $ $ $ $ $ $ $ $ $ $ $ $ $ $ $ $ $ $ $ $ $ $ $ $ $ $ $
$
result = from_serializable(result)
如果不是result.ready():
raise self.retry(countdown = 1)
如果任务.successful():
子任务(回调).delay(result.get())
else:
如果errback:
子任务(errback)()


def add_to_chain(result,callback,errback = None):
callback = callback.clone()#不要修改调用者
new_result = callback.freeze()#sets id for回调,返回AsyncResult
new_result.parent = result
after_task.delay(result.serializable(),callback,errback)
return new_result

那么你可以这样使用:

 从任务导入t1,t2,t3 

res =(t1.s(123)| t2.s()| t3.s())()
res = add_to_chain(t2.s())

注意:



bind = True 是即将推出的3.1版本的新版本,对于旧版本
你必须删除自我参数,并使用 current_task.retry (从芹菜导入current_task 获取此)。



Signature.freeze 也是3.1中的新增功能,可以在旧版本中使用
相同:

 从芹菜进口uuid 

def freeze(sig,_id = None):
opts = sig .options
try:
tid = opts ['task_id']
除了KeyError:
tid = opts ['task_id'] = _id或uuid()
return sig.AsyncResult(tid)


I keep in my database, the reference to a chain.

from tasks import t1, t2, t3
from celery import chain
res = chain(t1.s(123)|t2.s()|t3.s())()
res.get()

How can I append an other task to this particular chain ?

res.append(t2.s())

My goal is to be sure that chains are executed in the same order I specified in my code. And if a task fail in my chain, the following tasks are not executed.

For know I'm using super big tasks in a specify queue.

解决方案

All the information is contained in the message.

Messages can be in transit, maybe on the other side of the world, or they may be consumed by an intermediate processor. For this reason it's not possible to modify a message after it has been sent.

See http://docs.celeryproject.org/en/latest/userguide/tasks.html#state

My goal is to be sure that chains are executed in the same order I specified in my code. And > if a task fail in my chain, the following tasks are not executed.

You can be sure of that, the order is sent as part of the message and it will not continue should any of the tasks fail.

Now, if you really want to be able to add tasks at runtime then you could store the information in a database and have the task itself check that and call the new tasks. There are some challenges when doing this though:

1) The first task in the chain will call the next task if it succeeds, then the next task will call the next task after that and so on.

2) If you add a task to this process, what happens if the first task already executed? or the second, or the third?

So as you may guess this will require some heavy synchronization to be able to work.

I guess an easier solution would be to create a task that waits for one task to complete then apply a callback:

from celery import subtask
from celery.result import from_serializable

@app.task(bind=True)
def after_task(self, result, callback, errback=None):
    result = from_serializable(result)
    if not result.ready():
        raise self.retry(countdown=1)
    if task.successful():
        subtask(callback).delay(result.get())
    else:
        if errback:
            subtask(errback)()


def add_to_chain(result, callback, errback=None):
    callback = callback.clone()     # do not modify caller
    new_result = callback.freeze()  # sets id for callback, returns AsyncResult
    new_result.parent = result
    after_task.delay(result.serializable(), callback, errback)
    return new_result

Then you can use it like this:

from tasks import t1, t2, t3

res = (t1.s(123) | t2.s() | t3.s())()
res = add_to_chain(t2.s())

NOTES:

bind=True is new in the upcoming 3.1 version, for older versions you must remove the self argument and use current_task.retry (get this from celery import current_task).

Signature.freeze is also new in 3.1, to do the same in older versions you can use:

from celery import uuid

def freeze(sig, _id=None):
    opts = sig.options
    try:
        tid = opts['task_id']
    except KeyError:
        tid = opts['task_id'] = _id or uuid()
    return sig.AsyncResult(tid)

这篇关于python芹菜:如何将任务附加到旧链的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆