Celery chord with group of chains with exception in chain

Problem description

I am using celery to run a chord with a group of chains. When all tasks (chains...) in the group complete successfully, the chord callback is fired and things work as I expect them to. However, when a task in the group fails, in which case I do not expect the chord callback to be called, chord_unlock loops endlessly. How do I avoid the chord_unlock loop in case of failure of one of the chains in the group?

Here is my code:

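import logging

from celery import chain, chord, group

# `app` is the project's Celery application instance, defined elsewhere.

@app.task
def test1():
    logging.info("test1")
    raise Exception()  # deliberate failure in the first task of the first chain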

@app.task
def test2():
    logging.info("test2")

@app.task
def test3():
    logging.info("test3")

@app.task
def test4():
    logging.info("test4")

@app.task
def cb(id=None):  # cb.si() passes no arguments, so id needs a default
    logging.info("cb")

def test():
    # Two chains run in parallel as the chord header; cb is the chord callback.
    chains = [chain(test1.si(), test2.si()), chain(test3.si(), test4.si())]
    chord(group(chains))(cb.si())

And the logs:

[2018-09-09 15:15:12,933: INFO/MainProcess] Received task: projecttasks.tasks.test[e332ee64-84b3-4f3f-bb84-de83fe03b758]
[2018-09-09 15:15:12,973: INFO/MainProcess] Received task: projecttasks.tasks.test1[5a8191fd-a9c6-430f-bb46-61a111766776]
[2018-09-09 15:15:12,977: INFO/ForkPoolWorker-1] test1
[2018-09-09 15:15:12,986: INFO/MainProcess] Received task: projecttasks.tasks.test3[22c2ac6d-1bca-41f8-b617-2d7ee5b20d4d]
[2018-09-09 15:15:13,006: INFO/MainProcess] Received task: celery.chord_unlock[872672ca-a627-4fe9-b81b-07b4bbc2c2c2]  ETA:[2018-09-09 15:15:13.978298+00:00]
[2018-09-09 15:15:13,015: ERROR/ForkPoolWorker-1] Task projecttasks.tasks.test1[5a8191fd-a9c6-430f-bb46-61a111766776] raised unexpected: Exception()
Traceback (most recent call last):
   File "/usr/local/lib/python2.7/dist-packages/celery/app/trace.py", line 382, in trace_task
     R = retval = fun(*args, **kwargs)
   File "/usr/local/lib/python2.7/dist-packages/celery/app/trace.py", line 641, in __protected_call__
     return self.run(*args, **kwargs)
   File "/project/projecttasks/tasks.py", line 185, in test1
     raise Exception()
Exception
[2018-09-09 15:15:13,017: INFO/ForkPoolWorker-2] Task projecttasks.tasks.test[e332ee64-84b3-4f3f-bb84-de83fe03b758] succeeded in 0.0738198000472s: None
[2018-09-09 15:15:13,021: INFO/ForkPoolWorker-2] test3
[2018-09-09 15:15:13,029: INFO/MainProcess] Received task: projecttasks.tasks.test4[6f791b45-0c9f-4b0c-984d-387429b39fad]
[2018-09-09 15:15:13,034: INFO/ForkPoolWorker-1] test4
[2018-09-09 15:15:13,042: INFO/ForkPoolWorker-2] Task projecttasks.tasks.test3[22c2ac6d-1bca-41f8-b617-2d7ee5b20d4d] succeeded in 0.0202670998406s: None
[2018-09-09 15:15:13,045: INFO/ForkPoolWorker-1] Task projecttasks.tasks.test4[6f791b45-0c9f-4b0c-984d-387429b39fad] succeeded in 0.0111124999821s: None
[2018-09-09 15:15:14,789: INFO/MainProcess] Received task: celery.chord_unlock[872672ca-a627-4fe9-b81b-07b4bbc2c2c2]  ETA:[2018-09-09 15:15:15.785804+00:00]
[2018-09-09 15:15:14,790: INFO/ForkPoolWorker-2] Task celery.chord_unlock[872672ca-a627-4fe9-b81b-07b4bbc2c2c2] retry: Retry in 1s
[2018-09-09 15:15:15,953: INFO/ForkPoolWorker-2] Task celery.chord_unlock[872672ca-a627-4fe9-b81b-07b4bbc2c2c2] retry: Retry in 1s
[2018-09-09 15:15:15,958: INFO/MainProcess] Received task: celery.chord_unlock[872672ca-a627-4fe9-b81b-07b4bbc2c2c2]  ETA:[2018-09-09 15:15:16.952066+00:00]
[2018-09-09 15:15:18,792: INFO/ForkPoolWorker-2] Task celery.chord_unlock[872672ca-a627-4fe9-b81b-07b4bbc2c2c2] retry: Retry in 1s
[2018-09-09 15:15:18,795: INFO/MainProcess] Received task: celery.chord_unlock[872672ca-a627-4fe9-b81b-07b4bbc2c2c2]  ETA:[2018-09-09 15:15:19.790674+00:00]
[2018-09-09 15:15:20,726: INFO/ForkPoolWorker-2] Task celery.chord_unlock[872672ca-a627-4fe9-b81b-07b4bbc2c2c2] retry: Retry in 1s
[2018-09-09 15:15:20,728: INFO/MainProcess] Received task: celery.chord_unlock[872672ca-a627-4fe9-b81b-07b4bbc2c2c2]  ETA:[2018-09-09 15:15:21.724806+00:00]
[2018-09-09 15:15:22,797: INFO/ForkPoolWorker-2] Task celery.chord_unlock[872672ca-a627-4fe9-b81b-07b4bbc2c2c2] retry: Retry in 1s
[2018-09-09 15:15:22,799: INFO/MainProcess] Received task: celery.chord_unlock[872672ca-a627-4fe9-b81b-07b4bbc2c2c2]  ETA:[2018-09-09 15:15:23.795726+00:00]
[2018-09-09 15:15:24,801: INFO/ForkPoolWorker-2] Task celery.chord_unlock[872672ca-a627-4fe9-b81b-07b4bbc2c2c2] retry: Retry in 1s
[2018-09-09 15:15:24,802: INFO/MainProcess] Received task: celery.chord_unlock[872672ca-a627-4fe9-b81b-07b4bbc2c2c2]  ETA:[2018-09-09 15:15:25.799817+00:00]
[2018-09-09 15:15:25,952: INFO/ForkPoolWorker-2] Task celery.chord_unlock[872672ca-a627-4fe9-b81b-07b4bbc2c2c2] retry: Retry in 1s
[2018-09-09 15:15:25,953: INFO/MainProcess] Received task: celery.chord_unlock[872672ca-a627-4fe9-b81b-07b4bbc2c2c2]  ETA:[2018-09-09 15:15:26.951535+00:00]
[2018-09-09 15:15:28,809: INFO/ForkPoolWorker-2] Task celery.chord_unlock[872672ca-a627-4fe9-b81b-07b4bbc2c2c2] retry: Retry in 1s
[2018-09-09 15:15:28,814: INFO/MainProcess] Received task: celery.chord_unlock[872672ca-a627-4fe9-b81b-07b4bbc2c2c2]  ETA:[2018-09-09 15:15:29.807464+00:00]
[2018-09-09 15:15:30,723: INFO/ForkPoolWorker-2] Task celery.chord_unlock[872672ca-a627-4fe9-b81b-07b4bbc2c2c2] retry: Retry in 1s
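
One possible reading of these logs, offered as an assumption rather than confirmed Celery behaviour: the chord seems to wait on the final task of each chain, and test2 is never received once test1 raises, so its result never leaves the pending state and chord_unlock keeps retrying. The toy model below is plain Python with no Celery involved; `run_chain`, `poll_until_ready` and the state names are hypothetical and only illustrate that reasoning.

```python
from typing import Callable, Dict, List, Optional, Tuple

PENDING, SUCCESS, FAILURE = "PENDING", "SUCCESS", "FAILURE"

Step = Tuple[str, Callable[[], None]]


def run_chain(steps: List[Step], states: Dict[str, str]) -> None:
    """Run steps in order and stop at the first exception."""
    for name, func in steps:
        try:
            func()
        except Exception:
            states[name] = FAILURE
            return  # later steps are never started and stay PENDING
        states[name] = SUCCESS


def poll_until_ready(
    states: Dict[str, str], watched: List[str], max_polls: int
) -> Optional[int]:
    """Return the poll number at which every watched step has left PENDING.

    Returns None if that never happens within max_polls attempts.
    """
    for attempt in range(1, max_polls + 1):
        if all(states[name] != PENDING for name in watched):
            return attempt
    return None


def ok() -> None:
    pass


def boom() -> None:
    raise Exception()


states = {name: PENDING for name in ("test1", "test2", "test3", "test4")}
run_chain([("test1", boom), ("test2", ok)], states)
run_chain([("test3", ok), ("test4", ok)], states)

# Watch only the last step of each chain, as assumed above.
polls = poll_until_ready(states, ["test2", "test4"], max_polls=5)
```

In this model `polls` stays `None` because test2 is stuck in `PENDING`. Without the `max_polls` bound the loop would never finish, which mirrors the repeated "Retry in 1s" lines.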

Recommended answer

I had a similar problem: I was calling a group of chains and wanted a callback to run once all the tasks had finished, regardless of the result. A chord with a callback plus error handling (on_error) seemed like the perfect solution. Unfortunately, it worked for a list of plain tasks but not for a list of chains. An ugly workaround is to wrap each chain in a wrapper task:

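from celery import chain, chord, signature

# `celery_app` is the project's Celery application instance, defined elsewhere.

@celery_app.task
def task_one():
    return 'OKIDOKI'

@celery_app.task
def task_two(text):
    return f'{text} YOUPI'

@celery_app.task
def task_three(text):
    return f'{text} MAKAPAKA'

@celery_app.task
def task_exception(text):
    # Deliberately fails to simulate an error in the middle of a chain.
    raise KeyError(f'{text} Ups')

@celery_app.task(ignore_result=True)
def task_wrapper(*args, **kwargs):
    # Runs the wrapped chain inline (apply() executes it in the current process),
    # so the chord header only ever contains plain tasks.
    if 'job' in kwargs:
        # With a JSON serializer the chain arrives as a plain dict;
        # signature() rebuilds a Signature from it.
        signature(kwargs['job']).apply()

@celery_app.task(ignore_result=True)
def callback_task(*args, **kwargs):
    return (args, kwargs, 'Yeah')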

def test():
    chains = []

    tasks = [
        task_one.s(),
        task_two.s(),
        task_exception.s(),
        task_three.s(),
    ]
    chains.append(task_wrapper.s(job=chain(*tasks)))

    tasks = [
        task_one.s(),
        task_two.s(),
        task_three.s(),
    ]
    chains.append(task_wrapper.s(job=chain(*tasks)))

    chord(chains, callback_task.s()).apply_async()
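
The wrapper idea can be pictured without a broker. The sketch below is plain Python, not Celery: `run_chain`, `wrapped` and `run_all_then_callback` are hypothetical helpers. Catching the exception inside the wrapper is an assumption added here so that the callback always receives an outcome for every chain; the answer's `task_wrapper` does not do this.

```python
from typing import Any, Callable, Dict, List


def task_one() -> str:
    return 'OKIDOKI'


def task_two(text: str) -> str:
    return f'{text} YOUPI'


def task_three(text: str) -> str:
    return f'{text} MAKAPAKA'


def task_exception(text: str) -> str:
    raise KeyError(f'{text} Ups')


def run_chain(steps: List[Callable[..., Any]]) -> Any:
    """Call the first step with no arguments, then feed each result forward."""
    result = steps[0]()
    for step in steps[1:]:
        result = step(result)
    return result


def wrapped(steps: List[Callable[..., Any]]) -> Dict[str, Any]:
    """Run a chain and report its outcome instead of raising."""
    try:
        return {'ok': True, 'value': run_chain(steps)}
    except Exception as exc:
        return {'ok': False, 'error': repr(exc)}


def run_all_then_callback(
    chains: List[List[Callable[..., Any]]],
    callback: Callable[[List[Dict[str, Any]]], Any],
) -> Any:
    """Run every wrapped chain, then hand all outcomes to the callback."""
    outcomes = [wrapped(steps) for steps in chains]
    return callback(outcomes)


outcomes = run_all_then_callback(
    [
        [task_one, task_two, task_exception, task_three],
        [task_one, task_two, task_three],
    ],
    callback=lambda results: results,
)
```

Here the callback runs even though the first chain fails, because each wrapper turns the failure into an ordinary return value that the callback can inspect.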
