如何确保以失败的子任务调用Celery和弦回调? [英] How do you ensure a Celery chord callback gets called with failed subtasks?

查看:33
本文介绍了如何确保以失败的子任务调用Celery和弦回调?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在Celery中使用Chord来执行一个回调,当一组并行任务完成执行时,该回调将被调用.具体来说,我有一组函数可以包装对外部API的调用.我想等待所有这些返回,然后再处理结果并在Chord回调中更新数据库.我希望所有API调用结束后执行回调,无论其状态如何.

I am using a Chord in Celery to have a callback that gets called when a Group of parallel tasks finish executing. Specifically, I have a group of functions that wrap calls to an external API. I want to wait for all of these to return before I process the results and update my database in the Chord callback. I would like the callback to execute when all of the API calls have finished, regardless of their status.

我的问题是,仅当组的子任务均未引发异常时才调用回调函数.但是,如果一个子任务引发了异常,则会调用可选的错误处理程序 on_error(),并使用和弦的 task_id 的字符串表示形式.该组中的其余任务仍会继续执行,但永远不会调用该回调.

My problem is that the callback function only gets called if none of the group's subtasks raise an exception. If, however, one subtask raises an exception then an optional error handler on_error() gets called with a string representation of the task_id of the chord. The remaining tasks in the group do continue execution but the callback is never called.

我将通过以下示例对此进行说明:

I'll illustrate this with an example below:

@app.task
def maybe_succeed():
  divisor = randint(0, 10)
  return 1 / divisor

@app.task
def master_task():
 g = group([maybe_succeed.s() for i in range(100)])
 c = g | chord_callback.s()
 return c.delay()

@app.task
def chord_callback(results):
  print 'Made it here!'

在上面的示例中,调用 master_task()将运行该组中的所有任务,但是,由于 maybe_succeed()将会失败(除非您非常幸运!).

In the above example, calling master_task() will run all of the tasks in the group, however, the callback will never get called because one of the maybe_succeed() will fail (unless you're super lucky!).

现在,我正在通过捕获等效于 maybe_succeed()的所有异常来解决此问题,以使和弦永不失败.我想这是一个很好的解决方案,尽管它感觉不合适.

Right now, I'm dealing with this problem by catching all exceptions in my equivalent of maybe_succeed() so that the chord will never fail. I guess this is a fine solution though it doesn't feel right.

所以,我的问题是:有没有一种方法可以执行Celery Chord回调,而不管其组的子任务的返回状态如何?

So, my question is: Is there a way to have a Celery Chord callback execute regardless of the return status of its group's subtasks?

推荐答案

您可以尝试在errback中调用原始回调:

You could try calling the original callback in the errback:

@celery.task
def plus(x, y):
    print(f'Running plus {x}, {y}')
    return x + y


@celery.task
def failure():
    print('Running failure')
    raise ValueError('BAD')


@celery.task
def callme(stuff):
    print('Callback')
    print(f'Callback arg: {stuff}')


@celery.task
def on_chord_error(task_id, extra_info):
    print('ON ERROR CALLBACK')
    print(f'Task ID: {task_id}')
    print(f'Extra info: {extra_info}')
    callme.delay(extra_info)


@celery.task
def chord_test():
    tasks = [plus.s(1, 1), plus.s(2, 2), failure.s(), plus.s(3, 3)]
    callback = callme.s().on_error(on_chord_error.s('extra info'))
    chord(tasks)(callback)

这将导致:

Received task: tasks.plus[b0d084a5-0956-4f13-bf0d-580a3e3cd55e]
Running plus 1, 1
Task tasks.plus[b0d084a5-0956-4f13-bf0d-580a3e3cd55e] succeeded in 0.020222999999532476s: 2
Received task:tasks.plus[44a9d306-a0a5-4a7d-b71d-0fef56fe3481]
Running plus 2, 2
Task tasks.plus[44a9d306-a0a5-4a7d-b71d-0fef56fe3481] succeeded in 0.019981499994173646s: 4
Task tasks.chord_test[b6173c52-aa62-4dad-84f2-f3df2e1efcd1] succeeded in 0.45647509998525493s: None
Received task: tasks.failure[3880e8bd-2a09-4735-bb5f-9a49e992dfee]
Running failure
Task tasks.failure[3880e8bd-2a09-4735-bb5f-9a49e992dfee] raised unexpected: ValueError('BAD',)
Received task: tasks.plus[b3290ce9-fc74-45f2-a820-40bd6dea8473]
Running plus 3, 3
Task tasks.plus[b3290ce9-fc74-45f2-a820-40bd6dea8473] succeeded in 0.016270199994323775s: 6
celery.chord_unlock[0f37fa4d-4f12-4c65-9e08-b69f0cf2afd7]  ETA:[2018-09-14 03:08:58.441070+00:00]
Chord 'dadece86-d399-4e64-b63a-f02a2a3de434' raised: ValueError('BAD',)
Traceback (most recent call last):
   File "/home/flask/.local/lib/python3.6/site-packages/celery/app/builtins.py", line 81, in unlock_chord
    ret = j(timeout=3.0, propagate=True)
   File "/home/flask/.local/lib/python3.6/site-packages/celery/result.py", line 739, in join
   interval=interval, no_ack=no_ack, on_interval=on_interval,
   File "/home/flask/.local/lib/python3.6/site-packages/celery/result.py", line 213, in get
   self.maybe_throw(callback=callback)
   File "/home/flask/.local/lib/python3.6/site-packages/celery/result.py", line 329, in maybe_throw
   self.throw(value, self._to_remote_traceback(tb))
   File "/home/flask/.local/lib/python3.6/site-packages/celery/result.py", line 322, in throw
    self.on_ready.throw(*args, **kwargs)
   File "/home/flask/.local/lib/python3.6/site-packages/vine/promises.py", line 217, in throw
    reraise(type(exc), exc, tb)
   File "/home/flask/.local/lib/python3.6/site-packages/vine/five.py", line 179, in reraise
    raise value
ValueError: BAD
Received task: tasks.on_chord_error[cf3056bc-34ea-4681-87e7-cded53acb958]
Task celery.chord_unlock[0f37fa4d-4f12-4c65-9e08-b69f0cf2afd7] succeeded in 0.12482409999938682s: None
ON ERROR CALLBACK
Task ID: fe3dae19-0641-47fa-9c4d-953b868992e7
Extra info: extra info
Received task: tasks.callme[d6dfd6c0-f0d9-474f-9d98-be43e031de69]
Callback
Callback arg: extra info

这篇关于如何确保以失败的子任务调用Celery和弦回调?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆