Celery: clean way of revoking the entire chain from within a task


Question


My question is probably pretty basic, but I still can't find a solution in the official docs. I have defined a Celery chain inside my Django application, performing a set of tasks that depend on each other:

chain(  tasks.apply_fetching_decision.s(x, y),
        tasks.retrieve_public_info.s(z, x, y),
        tasks.public_adapter.s())()

Obviously the second and the third tasks need the output of the parent, that's why I used a chain.

Now the question: I need to programmatically revoke the 2nd and 3rd tasks if a test condition in the 1st task fails. How can I do this cleanly? I know I can revoke the tasks of a chain from within the method where I defined the chain (see this question and this doc), but inside the first task I have no visibility of the subsequent tasks or of the chain itself.

Temporary solution

My current solution is to skip the computation inside the subsequent tasks based on the result of the previous task:

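from celery import shared_task

@shared_task
def retrieve_public_info(result, z, x, y):
    # result is the parent's output, prepended by the chain to (z, x, y)
    if not result:
        return []  # skip the work: the first task said "stop"
    ...

@shared_task
def public_adapter(result):
    # Still runs when the chain should have stopped; it just loops over []
    for r in result:
        ...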

But this "workaround" has some flaws:

  • Adds unnecessary logic to each task (based on predecessor's result), compromising reuse
  • Still executes the subsequent tasks, with all the resulting overhead
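Both the argument passing and the second flaw can be seen in a small, stdlib-only toy model of a chain. This is a sketch under assumptions, not Celery's code: run_chain is a hypothetical helper, and the task bodies are placeholders that follow the chain's call convention (the parent's result comes first, then the signature's own arguments). Even though the first task signals "stop", all three bodies still execute.

```python
# Toy model of a chain (NOT Celery's implementation): each step's return
# value is prepended to the next step's positional arguments.
calls = []  # records which task bodies actually ran


def run_chain(first, *rest):
    """Each step is a (function, args) pair; returns the last step's result."""
    fn, args = first
    result = fn(*args)
    for fn, args in rest:
        result = fn(result, *args)  # the parent's result comes first
    return result


def apply_fetching_decision(x, y):
    calls.append("apply_fetching_decision")
    return False  # hypothetical: the test condition failed


def retrieve_public_info(result, z, x, y):
    calls.append("retrieve_public_info")
    if not result:  # the guard from the workaround
        return []
    return [(z, x, y)]


def public_adapter(result):
    calls.append("public_adapter")
    return [r for r in result]  # loops over an empty list, but still runs


final = run_chain(
    (apply_fetching_decision, (1, 2)),
    (retrieve_public_info, (3, 1, 2)),
    (public_adapter, ()),
)
print(final)  # []
print(calls)  # all three task bodies ran
```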

I haven't experimented much with passing references to the chain into the tasks, for fear of messing things up. I admit I also haven't tried the exception-throwing approach, because I think that choosing not to proceed through the chain can be a functional (thus non-exceptional) scenario...

Thanks for helping!

Solution

I think I found the answer to this issue: this does seem to be the right way to proceed. I wonder why such a common scenario is not documented anywhere, though.

For completeness, here is the basic code snippet:

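@app.task(bind=True)  # bind=True is required so the task receives self
def task1(self, other_args):
    # do_stuff
    if end_chain and self.request.callbacks:  # end_chain: your own stop condition
        # Empty the callback list in place so the remaining chain tasks are not sent
        self.request.callbacks[:] = []
    ...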
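Why does clearing the list work? As far as I understand it, the worker only sends the next step of a chain after the task body has returned, reading it from self.request.callbacks, so anything the task removes in the meantime is never scheduled. The stdlib-only toy below models just that assumption. run_with_callbacks is a hypothetical helper that flattens the remaining steps into one list (real Celery represents them differently), and the stop condition n < 0 is made up for the example.

```python
from types import SimpleNamespace


def run_with_callbacks(steps, first_args):
    """Toy worker loop (hypothetical, not Celery's tracer). Each step is a
    function taking (self, *args); it sees the not-yet-run steps in
    self.request.callbacks and may empty that list to stop the chain."""
    ran = []
    args = first_args
    remaining = list(steps)
    result = None
    while remaining:
        body = remaining.pop(0)
        # Expose the rest of the chain to the task through a fake request.
        task = SimpleNamespace(request=SimpleNamespace(callbacks=remaining))
        result = body(task, *args)
        ran.append(body.__name__)
        # Read the callbacks back only AFTER the body has returned.
        remaining = task.request.callbacks
        args = (result,)
    return result, ran


def task1(self, n):
    end_chain = n < 0  # hypothetical stop condition
    if end_chain and self.request.callbacks:
        self.request.callbacks[:] = []  # drop the rest of the chain
    return n


def task2(self, n):
    return n * 10


def task3(self, n):
    return n + 1


print(run_with_callbacks([task1, task2, task3], (5,)))   # (51, ['task1', 'task2', 'task3'])
print(run_with_callbacks([task1, task2, task3], (-5,)))  # (-5, ['task1'])
```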

Update

I implemented a more elegant way to cope with the issue and I want to share it with you. I am using a decorator called revoke_chain_authority, which revokes the chain automatically, so I don't have to repeat the code described above in every task.

from functools import wraps

class RevokeChainRequested(Exception):
    """Raise inside a task to stop the chain and return return_value."""
    def __init__(self, return_value):
        super().__init__("")
        # Value the task returns once the rest of the chain has been dropped
        self.return_value = return_value


def revoke_chain_authority(a_shared_task):
    """
    @see: https://gist.github.com/bloudermilk/2173940
    @param a_shared_task: a @shared_task(bind=True) celery function.
    @return: the wrapped task function.
    """
    @wraps(a_shared_task)
    def inner(self, *args, **kwargs):
        try:
            return a_shared_task(self, *args, **kwargs)
        except RevokeChainRequested as e:
            # Drop subsequent tasks in chain (if not EAGER mode)
            if self.request.callbacks:
                self.request.callbacks[:] = []
            return e.return_value

    return inner
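On Celery 4.0 and later, my understanding is that a chain's remaining steps travel in the task message and are exposed as self.request.chain, while self.request.callbacks only holds link callbacks, so clearing callbacks alone may not stop a chain there. The variant below clears both; treat it as a sketch to verify against your Celery version. It is exercised with a SimpleNamespace stand-in for the bound task's self so it runs without a broker; the function body, the stop condition and fake_self are hypothetical.

```python
from functools import wraps
from types import SimpleNamespace


class RevokeChainRequested(Exception):
    """Raised by a task to stop the chain and return return_value."""

    def __init__(self, return_value):
        super().__init__("")
        self.return_value = return_value


def revoke_chain_authority(a_shared_task):
    """Variant that clears request.callbacks (as in the answer) and also
    request.chain (assumed to hold the remaining steps on Celery 4+)."""

    @wraps(a_shared_task)
    def inner(self, *args, **kwargs):
        try:
            return a_shared_task(self, *args, **kwargs)
        except RevokeChainRequested as e:
            if getattr(self.request, "callbacks", None):
                self.request.callbacks[:] = []
            # Assumption: on Celery 4+ the rest of a chain lives here.
            if getattr(self.request, "chain", None):
                self.request.chain = None
            return e.return_value

    return inner


@revoke_chain_authority
def apply_fetching_decision(self, latitude, longitude):
    if latitude is None:  # hypothetical stop condition
        raise RevokeChainRequested(False)
    return [latitude, longitude]


# Stand-in for a bound task's self; a real worker provides this object.
fake_self = SimpleNamespace(
    request=SimpleNamespace(callbacks=["next step"], chain=["remaining steps"])
)
print(apply_fetching_decision(fake_self, None, 2.0))           # False
print(fake_self.request.callbacks, fake_self.request.chain)    # [] None
print(apply_fetching_decision.__name__)                        # apply_fetching_decision
```

In a real project the stacking order stays the same as in the answer: @shared_task(bind=True) outermost, @revoke_chain_authority directly above the function.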

This decorator can be used on a shared task as follows:

@shared_task(bind=True)
@revoke_chain_authority
def apply_fetching_decision(self, latitude, longitude):
    # ... compute the fetching decision ...

    if condition:  # your own test condition
        raise RevokeChainRequested(False)

Please note the use of @wraps. It copies the original function's metadata, notably __name__ and __module__, onto the wrapper. Celery derives a task's default name from these, so without @wraps every decorated task would get the same name (that of inner) and Celery would mix up the wrapped tasks (e.g. it would always call the first registered function instead of the right one).
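The name collision can be reproduced without Celery. The toy registry below is hypothetical: it keys functions by "module.function name" (the form of Celery's default task names) and keeps the first function registered under a given key, which mirrors the behavior described above. Without @wraps both wrappers are called inner, so the second task silently resolves to the first.

```python
from functools import wraps

registry = {}


def toy_task(fn):
    """Hypothetical stand-in for a task decorator: registers fn under
    '<module>.<name>' and returns whatever was registered there first."""
    name = f"{fn.__module__}.{fn.__name__}"
    return registry.setdefault(name, fn)


def without_wraps(f):
    def inner(*args, **kwargs):
        return f(*args, **kwargs)
    return inner


def with_wraps(f):
    @wraps(f)  # copies __name__, __module__, __doc__, ... onto inner
    def inner(*args, **kwargs):
        return f(*args, **kwargs)
    return inner


@toy_task
@without_wraps
def task_a():
    return "a"


@toy_task
@without_wraps
def task_b():
    return "b"


@toy_task
@with_wraps
def task_c():
    return "c"


@toy_task
@with_wraps
def task_d():
    return "d"


print(task_b())  # a  (both registered as '<module>.inner'; the first wins)
print(task_d())  # d  (distinct names thanks to @wraps)
```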
