celery - chaining groups and subtasks -> out of order execution

Question

I have something like the following:

from celery import chain, group

# task1, task2 and task3 are existing Celery tasks
group1 = group(task1.si(), task1.si(), task1.si())
group2 = group(task2.si(), task2.si(), task2.si())

workflow = chain(group1, group2, task3.si())

The intuitive interpretation is that task3 should only execute after all tasks in group 2 have finished.

In reality, task3 executes while group1 has started but has not yet completed.

What am I doing wrong?

Answer

So as it turns out, in Celery you cannot chain two groups together directly.
I suspect this is because a group chained with a task automatically becomes a chord
(Celery docs: http://docs.celeryproject.org/en/latest/userguide/canvas.html):


Chaining a group together with another task will automatically upgrade it to be a chord.

Groups return a parent task. When two groups are chained together, I suspect that when the first group completes, the chord starts the callback "task", and that this "task" is actually the "parent task" of the second group. I further suspect that this parent task completes as soon as it has finished kicking off all the subtasks within the group, and as a result the next item after the second group is executed without waiting for those subtasks to finish.
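The suspected mechanism can be modelled in plain Python. The sketch below is a thread-pool toy, not Celery's implementation: a "parent" that returns as soon as it has submitted the group members lets the next step start early, whereas a chord-style barrier that waits on every member does not. The function name `run_workflow` and the member names are hypothetical.

```python
import threading
from concurrent.futures import ThreadPoolExecutor, wait


def run_workflow(use_barrier):
    """Toy model of 'group, then task3'; returns the order in which events happened."""
    log = []
    lock = threading.Lock()
    release = threading.Event()  # group members cannot finish until this is set

    def member(name):
        release.wait()  # stands in for a slow subtask
        with lock:
            log.append(f"{name} finished")

    with ThreadPoolExecutor(max_workers=3) as pool:
        # The "parent task": submit every member and return immediately.
        futures = [pool.submit(member, n) for n in ("a", "b", "c")]

        if use_barrier:
            # Chord-style barrier: let the members run, then block until all are done.
            release.set()
            wait(futures)

        with lock:
            log.append("task3 started")

        # Without a barrier the members are still blocked here; let them finish now.
        release.set()

    # Leaving the with-block waits for any outstanding members.
    return log


print(run_workflow(use_barrier=False))
print(run_workflow(use_barrier=True))
```

Without the barrier, "task3 started" is always the first entry; with it, it is always the last, which matches the ordering the question expected.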

To demonstrate this, here is some sample code. You'll need a running Celery worker already.

# celery_experiment.py

import logging
import random
import time

from celery import group, shared_task
from celery.signals import task_postrun, task_prerun

random.seed()

logging.basicConfig(level=logging.DEBUG)

### HANDLERS ###
# These run inside the worker process and log each task's 'id' kwarg.
@task_prerun.connect
def task_starting_handler(sender=None, task_id=None, task=None, args=None, kwargs=None, **kwds):
    label = (kwargs or {}).get('id')
    if label is not None:
        logging.info('[%s] starting', label)

@task_postrun.connect
def task_finished_handler(sender=None, task_id=None, task=None, args=None, kwargs=None, retval=None, state=None, **kwds):
    label = (kwargs or {}).get('id')
    if label is not None:
        logging.info('[%s] finished', label)


def random_sleep(id):
    slp = random.randint(1, 3)
    logging.info('[%s] sleep for %s secs', id, slp)
    time.sleep(slp)

# shared_task is used instead of the old celery.task decorator,
# which newer Celery releases no longer provide
@shared_task
def thing(id):
    logging.info('[%s] begin', id)
    random_sleep(id)
    logging.info('[%s] end', id)


def exec_exp():
    st = thing.si(id='st')
    st_arr = [thing.si(id='st_arr1_a'), thing.si(id='st_arr1_b'), thing.si(id='st_arr1_c')]
    st_arr2 = [thing.si(id='st_arr2_a'), thing.si(id='st_arr2_b')]
    st2 = thing.si(id='st2')
    st3 = thing.si(id='st3')
    st4 = thing.si(id='st4')

    grp1 = group(st_arr)
    grp2 = group(st_arr2)

    # chn can chain two groups together because they are separated by a single subtask (st2)
    chn = (st | grp1 | st2 | grp2 | st3 | st4)

    # in chn2 you can't chain two groups together: st3 will start before grp2 finishes
    #chn2 = (st | st2 | grp1 | grp2 | st3 | st4)

    r = chn.apply_async()
    #r2 = chn2.apply_async()
