芹菜停止执行链 [英] Celery stop execution of a chain
问题描述
我有一个定期执行的check_orders任务。它组成了一组任务,以便我可以确定执行任务所花的时间,并在完成所有任务后执行一些操作(这是res.join [1]和grouped_subs的目的)。链接的任务。
I have a check_orders task that's executed periodically. It makes a group of tasks so that I can time how long executing the tasks took, and perform something when they're all done (this is the purpose of res.join [1] and grouped_subs) The tasks that are grouped are pairs of chained tasks.
我想要的是当第一个任务不满足条件(失败)时不执行链中的第二个任务。我一生都无法解决这个问题,我觉得这对工作队列管理器来说是非常基本的功能。当我尝试[2]之后我注释掉的东西(引发异常,删除回调)...由于某种原因,我们在check_orders中卡在join()上(破坏了组)。对于所有这些任务,我都尝试将ignore_result设置为False,但仍然无法使用。
What I want is for when the first task doesn't meet a condition (fails) don't execute the second task in the chain. I can't figure this out for the life of me and I feel this is pretty basic functionality for a job queue manager. When I try the things I have commented out after [2] (raising exceptions, removing callbacks)... we get stuck on the join() in check_orders for some reason (it breaks the group). I've tried setting ignore_result to False as well for all these tasks but it still doesn't work.
@task(ignore_result=True)
def check_orders():
# check all the orders and send out appropriate notifications
grouped_subs = []
for thingy in things:
...
grouped_subs.append(chain(is_room_open.subtask((args_sub_1, )),
notify.subtask((args_sub_2, ), immutable=True)))
res = group(grouped_subs).apply_async()
res.join() #[1]
logger.info('Done checking orders at %s' % current_task.request.id))
@task(ignore_result=True)
def is_room_open(args_sub_1):
#something time consuming
if http_req_and_parse(args_sub_1):
# go on and do the notify task
return True
else:
# [2]
# STOP THE CHAIN SOMEHOW! Don't execute the rest of the chain, how?
# None of the following things work:
# is_room_open.update_state(state='FAILURE')
# raise celery.exceptions.Ignore()
# raise Exception('spam', 'eggs')
# current_task.request.callbacks[:] = []
@task(ignore_result=True)
def notify(args_sub_2):
# something else time consuming, only do this if the first part of the chain
# passed a test (the chained tasks before this were 'successful'
notify_user(args_sub_2)
推荐答案
在我看来,这是一个普通的用例,无法获得足够的爱
In my opinion this is a common use-case that doesn't get enough love in the documentation.
假设您想中途中断链,同时仍将成功报告为已完成任务的状态,并且不发送任何错误日志或其他信息(否则您可以引发一个异常),那么一种实现方法是:
Assuming you want to abort a chain mid-way while still reporting SUCCESS as status of the completed tasks, and not sending any error log or whatnot (else you can just raise an exception) then a way to accomplish this is:
@app.task(bind=True) # Note that we need bind=True for self to work
def task1(self, other_args):
#do_stuff
if end_chain:
self.request.callbacks = None
return
#Other stuff to do if end_chain is False
所以在您的示例中:
@app.task(ignore_result=True, bind=True)
def is_room_open(self, args_sub_1):
#something time consuming
if http_req_and_parse(args_sub_1):
# go on and do the notify task
return True
else:
self.request.callbacks = None
会工作。请注意,可以使用快捷方式代替
,如@ abbasov-alexander ignore_result = True
和 subtask()
。 ()
Will work. Note that instead of ignore_result=True
and subtask()
you can use the shortcut .si()
as stated by @abbasov-alexander
编辑为与EAGER模式配合使用,如@PhilipGarnero在评论中所建议。
Edited to work with EAGER mode, as suggested by @PhilipGarnero in the comments.
这篇关于芹菜停止执行链的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!