芹菜-链内组 [英] Celery - group inside a chain

查看:62
本文介绍了芹菜-链内组的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想在一条链中使用一组(或大块),例如:

I want to use a group (or chunks) inside a chain, like:

chain(getRange.s(3),  GROUP() , xsum.s() )

其中 GROUP()是一组 double()任务,即 group(double(0),double(1),double(2)).在中发布了类似的问题如何将一个将列表返回到一个组的Celery任务链接起来?,但没有说明如何将输出从组传递到该链中的下一个任务.

Where GROUP() is a group of double() tasks, i.e. group(double(0),double(1),double(2)). A similar question was posted in How to chain a Celery task that returns a list into a group? but it's not explained how to pass the output from group to the next task in the chain.

@task
def getRange(x):
    return range(x)

@task
def double(nr):
    return nr*2

@task
def xsum(list):
    return sum(list)

推荐答案

我不认为有一种方法可以对单个链中的当前基元执行此操作.像您提到的问题中那样传递回调将使您无法在组任务完成时进行收听.您可以得到的最接近的是:

I don't believe there is a way to do that with the current primitives in a single chain. Passing callbacks like in the question you mention won't allow you to listen to when the group tasks have finished. The closest you can get is something like:

@task
def get_range(x):
  return range(x)

@task
def mapper(nr):
  return nr * 2

@task
def reducer(nrs):
  return sum(nrs)

@task
def double_then_sum(nrs):
  return (
    group([mapper.s(nr) for nr in nrs]) |
    reducer.s()
  )()

ar = (get_range.s(3) | double_then_sum.s())() # call the procedure
ar.result.result # get the result

否则,您可以尝试使用动态链接,这将导致更简单的解决方案,或者如果您不需要分组任务并行运行,则只需使用 map .

Otherwise you could try using dynamic chaining, which would lead to a simpler solution, or just use map if you don't need your grouped tasks to run in parallel.

这篇关于芹菜-链内组的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆