Celery: launch chord callback after its associated body

Question

When I launch a list of chord()s, each containing a group of tasks and a callback, each callback is only called after all of the task groups have finished, even the tasks that are not in its own chord.

Here is the code for a better explanation:

import time

from celery import Celery, group, chord

app = Celery('tasks')
app.config_from_object('celeryconfig')

@app.task(name='SHORT_TASK')
def short_task(t):
    time.sleep(t)
    return t

@app.task(name='FINISH_GROUP')
def finish_group(res, nb):
    # Chord callback: res is the list of results from the group.
    print("Pipe #{} finished".format(nb))
    return True

@app.task
def main(total):
    for nb in range(1, total+1):
        short_tasks = [short_task.si(i) for i in [0.5, 0.75, 1]]

        # One independent chord per pipe: a group of short tasks,
        # then its own callback.
        chord(
            group(short_tasks),
            finish_group.s(nb)
        ).apply_async()

For example, I launch it with 5 items:

In [5]: main.delay(5)
Out[5]: <AsyncResult: db1f97f0-ff7a-4651-b2f9-11e27a001480>

Result:

[2017-11-06 13:50:38,374: INFO/MainProcess] Received task: tasks.main[6da738b5-4eae-4de4-9ac5-1dc67d210f1d]  
[2017-11-06 13:50:38,409: INFO/MainProcess] Received task: SHORT_TASK[9581f9e0-1128-4b87-ae6b-16f238b2337e]  
[2017-11-06 13:50:38,411: INFO/MainProcess] Received task: SHORT_TASK[579dc498-3770-4385-a25a-06173fbe639c]  
[2017-11-06 13:50:38,412: INFO/MainProcess] Received task: SHORT_TASK[bfafb943-46d8-42e3-941f-b48a9c8e0186]  
[2017-11-06 13:50:38,414: INFO/MainProcess] Received task: SHORT_TASK[a1208f06-250f-48ac-b3df-45c4525fe8eb]  
[2017-11-06 13:50:38,416: INFO/MainProcess] Received task: SHORT_TASK[86ee7408-9d61-4909-bce8-c42cf691e9c2]  
[2017-11-06 13:50:38,416: INFO/MainProcess] Received task: SHORT_TASK[e2bb22c0-1d20-4da7-91d9-45b7ed8bfc6f]  
[2017-11-06 13:50:38,419: INFO/MainProcess] Received task: SHORT_TASK[7551199b-4690-45dd-a434-3911861f0093]  
[2017-11-06 13:50:38,420: INFO/MainProcess] Received task: SHORT_TASK[362d18f4-2252-4a31-ad21-4a2d192fd22e]  
[2017-11-06 13:50:38,421: INFO/MainProcess] Received task: SHORT_TASK[7561c33b-7020-4feb-b054-3919e4ae31c2]  
[2017-11-06 13:50:38,423: INFO/MainProcess] Received task: SHORT_TASK[3ac997f5-6d0f-43b6-ab15-a6827a26665f]  
[2017-11-06 13:50:38,423: INFO/MainProcess] Received task: SHORT_TASK[8b2ebb3a-293c-4bb8-88a3-5242750a082e]  
[2017-11-06 13:50:38,423: INFO/ForkPoolWorker-3] Task tasks.main[6da738b5-4eae-4de4-9ac5-1dc67d210f1d] succeeded in 0.048569423001026735s: None
[2017-11-06 13:50:38,424: INFO/MainProcess] Received task: SHORT_TASK[efd68688-ec4a-418d-83b9-a55fd6cc1541]  
[2017-11-06 13:50:38,427: INFO/MainProcess] Received task: SHORT_TASK[4e82540e-f935-4288-828f-c6f66f84139a]  
[2017-11-06 13:50:38,427: INFO/MainProcess] Received task: SHORT_TASK[a94e4ec4-adcb-4a0f-b184-b36650105ed5]  
[2017-11-06 13:50:38,427: INFO/MainProcess] Received task: SHORT_TASK[0d4b5e24-7aaa-4eb8-8e54-70e769dfdb39]  
[2017-11-06 13:50:38,918: INFO/ForkPoolWorker-2] Task SHORT_TASK[9581f9e0-1128-4b87-ae6b-16f238b2337e] succeeded in 0.5051485379808582s: 0.5
[2017-11-06 13:50:38,926: INFO/ForkPoolWorker-3] Task SHORT_TASK[a1208f06-250f-48ac-b3df-45c4525fe8eb] succeeded in 0.5012409449846018s: 0.5
[2017-11-06 13:50:39,165: INFO/ForkPoolWorker-1] Task SHORT_TASK[579dc498-3770-4385-a25a-06173fbe639c] succeeded in 0.7524393269850407s: 0.75
[2017-11-06 13:50:39,445: INFO/ForkPoolWorker-4] Task SHORT_TASK[bfafb943-46d8-42e3-941f-b48a9c8e0186] succeeded in 1.031865488999756s: 1
[2017-11-06 13:50:39,448: INFO/MainProcess] Received task: FINISH_GROUP[4506631f-f9cc-4e9e-a9e7-9a59c8f7c998]  
[2017-11-06 13:50:39,668: INFO/ForkPoolWorker-1] Task SHORT_TASK[7551199b-4690-45dd-a434-3911861f0093] succeeded in 0.501304400007939s: 0.5
[2017-11-06 13:50:39,672: INFO/ForkPoolWorker-2] Task SHORT_TASK[86ee7408-9d61-4909-bce8-c42cf691e9c2] succeeded in 0.7513346789928619s: 0.75
[2017-11-06 13:50:39,932: INFO/ForkPoolWorker-3] Task SHORT_TASK[e2bb22c0-1d20-4da7-91d9-45b7ed8bfc6f] succeeded in 1.0058077470166609s: 1
[2017-11-06 13:50:39,936: INFO/MainProcess] Received task: FINISH_GROUP[f143d581-799b-45ff-9e11-edc0bb88006a]  
[2017-11-06 13:50:40,175: INFO/ForkPoolWorker-2] Task SHORT_TASK[3ac997f5-6d0f-43b6-ab15-a6827a26665f] succeeded in 0.502920284983702s: 0.5
[2017-11-06 13:50:40,198: INFO/ForkPoolWorker-4] Task SHORT_TASK[362d18f4-2252-4a31-ad21-4a2d192fd22e] succeeded in 0.752579735009931s: 0.75
[2017-11-06 13:50:40,685: INFO/ForkPoolWorker-3] Task SHORT_TASK[8b2ebb3a-293c-4bb8-88a3-5242750a082e] succeeded in 0.7518302960088477s: 0.75
[2017-11-06 13:50:40,701: INFO/ForkPoolWorker-4] Task SHORT_TASK[4e82540e-f935-4288-828f-c6f66f84139a] succeeded in 0.5013290829956532s: 0.5
[2017-11-06 13:50:40,715: INFO/ForkPoolWorker-1] Task SHORT_TASK[7561c33b-7020-4feb-b054-3919e4ae31c2] succeeded in 1.0464465210097842s: 1
[2017-11-06 13:50:40,715: WARNING/ForkPoolWorker-1] Pipe #1 finished
[2017-11-06 13:50:40,716: INFO/ForkPoolWorker-1] Task FINISH_GROUP[4506631f-f9cc-4e9e-a9e7-9a59c8f7c998] succeeded in 0.000513697013957426s: True
[2017-11-06 13:50:40,716: WARNING/ForkPoolWorker-1] Pipe #2 finished
[2017-11-06 13:50:40,717: INFO/ForkPoolWorker-1] Task FINISH_GROUP[f143d581-799b-45ff-9e11-edc0bb88006a] succeeded in 0.0003622350050136447s: True
[2017-11-06 13:50:40,718: INFO/MainProcess] Received task: FINISH_GROUP[fc9be8c2-99f7-46b2-a810-47023e0a072a]  
[2017-11-06 13:50:40,718: WARNING/ForkPoolWorker-1] Pipe #3 finished
[2017-11-06 13:50:40,718: INFO/ForkPoolWorker-1] Task FINISH_GROUP[fc9be8c2-99f7-46b2-a810-47023e0a072a] succeeded in 0.00038264598697423935s: True
[2017-11-06 13:50:41,215: INFO/ForkPoolWorker-2] Task SHORT_TASK[efd68688-ec4a-418d-83b9-a55fd6cc1541] succeeded in 1.0379863310081419s: 1
[2017-11-06 13:50:41,219: INFO/MainProcess] Received task: FINISH_GROUP[6a4dc66e-2232-4bad-9d85-9fbc63b8b847]  
[2017-11-06 13:50:41,221: WARNING/ForkPoolWorker-2] Pipe #4 finished
[2017-11-06 13:50:41,222: INFO/ForkPoolWorker-2] Task FINISH_GROUP[6a4dc66e-2232-4bad-9d85-9fbc63b8b847] succeeded in 0.0018843600118998438s: True
[2017-11-06 13:50:41,440: INFO/ForkPoolWorker-3] Task SHORT_TASK[a94e4ec4-adcb-4a0f-b184-b36650105ed5] succeeded in 0.7531412789830938s: 0.75
[2017-11-06 13:50:41,708: INFO/ForkPoolWorker-4] Task SHORT_TASK[0d4b5e24-7aaa-4eb8-8e54-70e769dfdb39] succeeded in 1.005872479028767s: 1
[2017-11-06 13:50:41,711: INFO/MainProcess] Received task: FINISH_GROUP[388ee1c3-b80c-41af-bbfd-29b968e90aff]  
[2017-11-06 13:50:41,712: WARNING/ForkPoolWorker-3] Pipe #5 finished
[2017-11-06 13:50:41,712: INFO/ForkPoolWorker-3] Task FINISH_GROUP[388ee1c3-b80c-41af-bbfd-29b968e90aff] succeeded in 0.0005500270053744316s: True

I launched a single Celery worker with concurrency 4 (prefork).

We can see at the beginning that 15 SHORT_TASK tasks are received, then the worker executes them, and only afterwards are the FINISH_GROUP tasks called.

Is it possible to launch each FINISH_GROUP task just after its associated SHORT_TASK tasks have finished, rather than waiting for all the other, unrelated SHORT_TASK runs?

Maybe my canvas is not correct, or my Celery configuration is wrong; I don't know.

Thanks for your help!

Answer

Your test is biased: since you are using only one worker, time.sleep() blocks its pool processes, so even with a concurrency of 4 no more than 4 tasks can make progress at a time.
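
To see the effect in isolation, here is a minimal sketch (plain Python rather than Celery; the 4-process pool stands in for the prefork pool) showing how blocking sleeps serialize the work:

import time
from concurrent.futures import ProcessPoolExecutor

def short_task(t):
    # Blocks its pool process for the whole duration, exactly like
    # time.sleep() blocks a prefork worker process.
    time.sleep(t)
    return t

if __name__ == '__main__':
    start = time.monotonic()
    with ProcessPoolExecutor(max_workers=4) as pool:
        # 15 one-second jobs on 4 processes run in ~4 waves, not ~1 second.
        list(pool.map(short_task, [1] * 15))
    print('elapsed: {:.1f}s'.format(time.monotonic() - start))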

Is it possible to launch each FINISH_GROUP task just after its associated SHORT_TASK tasks have finished, rather than waiting for all the other, unrelated SHORT_TASK runs?

Currently you are not waiting for other short_task runs to finish; they are all scheduled for execution at the same time. Since you are using a sleep, finish_group gets called once all short_task runs of its respective chord have ended.

Your current execution looks like:

| chord 1      | chord 2      | chord 3      |
|--------------|--------------|--------------|
| short_task 1 |              |              |  |
|              | short_task 1 |              |  |
|              |              | short_task 1 |  |
| short_task 2 |              |              |  |
|              | short_task 2 |              |  |
|              |              | short_task 2 |  |
| short_task 3 |              |              |  v
|              | short_task 3 |              |  execution order
|              |              | short_task 3 |
| finish_group |              |              |
|              | finish_group |              |
|              |              | finish_group |

If you remove the sleep, add more workers, or use gevent, it should look like this:

| chord 1          | chord 2          | chord 3          |
|------------------|------------------|------------------|
| short_task 1,2,3 | short_task 1,2,3 | short_task 1,2,3 |
| finish_group     | finish_group     | finish_group     |

You should see that tasks on the same line appear in the log in slightly different orders (depending on which worker picked them up first), but finish_group will still come last.
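
For reference, one way to get more parallelism out of the prefork pool would be to raise the worker's concurrency (assuming the app module is tasks, as in the question):

$ celery worker -A tasks --loglevel=info --concurrency=20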

Note that the group() wrapper is not needed when using chord; a plain list of tasks works:

chord(
    short_tasks,
    finish_group.s(nb)
)


Same code but with gevent:

import gevent
from celery import Celery, chord

app = Celery('tasks', broker='redis://localhost/4', backend='redis://localhost/5')


@app.task()
def short_task(nb, i):
    print('TEST: start short_task({}, {})'.format(nb, i))
    gevent.sleep(1)  # yields to other greenlets instead of blocking the worker
    print('TEST: end   short_task({}, {})'.format(nb, i))
    return i


@app.task(name='FINISH_GROUP')
def finish_group(results, nb):
    print('TEST: finish_group({}) -> {}'.format(nb, results))


@app.task
def main(total):
    for nb in range(1, total+1):
        short_tasks = [short_task.si(nb, i) for i in range(3)]

        chord(short_tasks, finish_group.s(nb)).apply_async()

Launched with:

$ celery worker -A celery_test --loglevel=debug --concurrency=20 -P gevent 2>&1 | grep TEST
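
With the worker running, the chords were presumably triggered the same way as in the question; the output below matches a call such as:

In [1]: main.delay(3)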

The output is scrambled because the execution is parallel.

[2017-11-06 16:40:08,085] TEST: start short_task(1, 0)
[2017-11-06 16:40:08,088] TEST: start short_task(1, 1)
[2017-11-06 16:40:08,091] TEST: start short_task(1, 2)
[2017-11-06 16:40:08,092] TEST: start short_task(2, 0)
[2017-11-06 16:40:08,094] TEST: start short_task(2, 1)
[2017-11-06 16:40:08,096] TEST: start short_task(2, 2)
[2017-11-06 16:40:08,100] TEST: start short_task(3, 0)
[2017-11-06 16:40:08,101] TEST: start short_task(3, 1)
[2017-11-06 16:40:08,103] TEST: start short_task(3, 2)
# ^ all short_task runs have started at the same time

[2017-11-06 16:40:09,085] TEST: end   short_task(1, 0)
[2017-11-06 16:40:09,089] TEST: end   short_task(1, 1)
[2017-11-06 16:40:09,093] TEST: end   short_task(1, 2)
[2017-11-06 16:40:09,106] TEST: end   short_task(2, 0)
[2017-11-06 16:40:09,106] TEST: end   short_task(2, 1)
[2017-11-06 16:40:09,107] TEST: end   short_task(2, 2)
[2017-11-06 16:40:09,107] TEST: end   short_task(3, 0)
[2017-11-06 16:40:09,108] TEST: end   short_task(3, 1)
[2017-11-06 16:40:09,108] TEST: end   short_task(3, 2)
# ^ total execution takes only 1 second since the 9 greenlets slept concurrently

[2017-11-06 16:40:09,115] TEST: finish_group(1) -> [0, 1, 2]
[2017-11-06 16:40:09,126] TEST: finish_group(2) -> [2, 1, 0]
[2017-11-06 16:40:09,128] TEST: finish_group(3) -> [0, 1, 2]
# ^ the order of results is mixed, depending on which greenlet finished first
