How to recursively chain a Celery task that returns a list into a group?

Question

I started from this question: How to chain a Celery task that returns a list into a group?

But I want to expand twice. So in my use case I have:


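  • Task A: determines the total number of items for a given date

  • Task B: downloads 1000 metadata entries for that date

  • Task C: downloads the content of one item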

So each step I'm expanding the number of items of the next step. I can do it by looping through the results in my task and calling .delay() on the next task function. But I thought I'd try to not make my main tasks do that. Instead they'd return a list of tuples - each tuple would then be expanded into the arguments for a call to the next function.
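The "return a list of tuples, expand each tuple into a call" idea can be sketched in plain Python, without Celery. This is only an illustration under assumptions: plan_pages(), describe_page() and expand() are hypothetical names that do not appear in the original code, and the list comprehension stands in for whatever would actually queue the calls.

```python
from typing import Callable, Iterable, List, Tuple

PAGE_SIZE = 1000  # mirrors the "1000 metadata entries" handled per Task B call


def plan_pages(date: str, total_items: int) -> List[Tuple[str, int]]:
    """Hypothetical Task A result: one (date, offset) tuple per Task B call."""
    return [(date, offset) for offset in range(0, total_items, PAGE_SIZE)]


def describe_page(date: str, offset: int) -> str:
    """Hypothetical stand-in for Task B; it only reports what it would request."""
    return f"{date}: entries {offset}-{offset + PAGE_SIZE - 1}"


def expand(args_list: Iterable[Tuple], func: Callable) -> list:
    """Call func once per tuple, unpacking the tuple as positional arguments."""
    return [func(*args) for args in args_list]


pages = plan_pages("2019-11-23", 2500)
print(pages)
# [('2019-11-23', 0), ('2019-11-23', 1000), ('2019-11-23', 2000)]
print(expand(pages, describe_page))
```

The stage functions stay free of dispatch logic: they only compute argument tuples, and a single generic helper decides how the next stage gets called.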

The above question has an answer that appears to meet my need, but I can't work out the correct way of chaining it for a two level expansion.

Here is a very cut down example of my code:

from celery import group
from celery.task import subtask
from celery.utils.log import get_task_logger

from .celery import app

logger = get_task_logger(__name__)

@app.task
def task_range(upper=10):
    # wrap in list to make JSON serializer work
    return list(zip(range(upper), range(upper)))

@app.task
def add(x, y):
    logger.info(f'x is {x} and y is {y}')
    char = chr(ord('a') + x)
    char2 = chr(ord('a') + x*2)
    result = x + y
    logger.info(f'result is {result}')
    return list(zip(char * result, char2 * result))

@app.task
def combine_log(c1, c2):
    logger.info(f'combine log is {c1}{c2}')

@app.task
def dmap(args_iter, celery_task):
    """
    Takes an iterator of argument tuples and queues them up for celery to run with the function.
    """
    logger.info(f'in dmap, len iter: {len(args_iter)}')
    callback = subtask(celery_task)
    run_in_parallel = group(callback.clone(args) for args in args_iter)
    return run_in_parallel.delay()

I've then tried various ways to make my nested mapping work. First, a one level mapping works fine, so:

pp = (task_range.s() | dmap.s(add.s()))
pp(2)

Produces the kind of results I'd expect, so I'm not totally off.

But when I try to add another level:

ppp = (task_range.s() | dmap.s(add.s() | dmap.s(combine_log.s())))

Then in the worker I see the error:

[2019-11-23 22:34:12,024: ERROR/ForkPoolWorker-2] Task proj.tasks.dmap[e92877a9-85ce-4f16-88e3-d6889bc27867] raised unexpected: TypeError("add() missing 2 required positional arguments: 'x' and 'y'",)
Traceback (most recent call last):
  File "/home/hdowner/.venv/play_celery/lib/python3.6/site-packages/celery/app/trace.py", line 385, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/home/hdowner/.venv/play_celery/lib/python3.6/site-packages/celery/app/trace.py", line 648, in __protected_call__
    return self.run(*args, **kwargs)
  File "/home/hdowner/dev/playground/celery/proj/tasks.py", line 44, in dmap
    return run_in_parallel.delay()
  File "/home/hdowner/.venv/play_celery/lib/python3.6/site-packages/celery/canvas.py", line 186, in delay
    return self.apply_async(partial_args, partial_kwargs)
  File "/home/hdowner/.venv/play_celery/lib/python3.6/site-packages/celery/canvas.py", line 1008, in apply_async
    args=args, kwargs=kwargs, **options))
  File "/home/hdowner/.venv/play_celery/lib/python3.6/site-packages/celery/canvas.py", line 1092, in _apply_tasks
    **options)
  File "/home/hdowner/.venv/play_celery/lib/python3.6/site-packages/celery/canvas.py", line 578, in apply_async
    dict(self.options, **options) if options else self.options))
  File "/home/hdowner/.venv/play_celery/lib/python3.6/site-packages/celery/canvas.py", line 607, in run
    first_task.apply_async(**options)
  File "/home/hdowner/.venv/play_celery/lib/python3.6/site-packages/celery/canvas.py", line 229, in apply_async
    return _apply(args, kwargs, **options)
  File "/home/hdowner/.venv/play_celery/lib/python3.6/site-packages/celery/app/task.py", line 532, in apply_async
    check_arguments(*(args or ()), **(kwargs or {}))
TypeError: add() missing 2 required positional arguments: 'x' and 'y'

And I'm not sure why changing the argument to dmap() from a plain task signature to a chain changes how the arguments get passed into add(). My impression was that it shouldn't, it just means the return value of add() would get passed on. But apparently that is not the case ...
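For reference, the data flow that the two-level pipeline is meant to produce can be sketched in plain Python. This is a hedged illustration, not Celery code: the task bodies are copied from the example above with the logging removed, combine_log() returns its string instead of logging it, and dmap_local() and two_level() are hypothetical stand-ins for the dmap task and the chain.

```python
def task_range(upper=10):
    return list(zip(range(upper), range(upper)))


def add(x, y):
    char = chr(ord('a') + x)
    char2 = chr(ord('a') + x * 2)
    result = x + y
    return list(zip(char * result, char2 * result))


def combine_log(c1, c2):
    # Returns the combined string so the result can be inspected.
    return f'{c1}{c2}'


def dmap_local(args_iter, func):
    """Local stand-in for dmap: one call per argument tuple, run serially."""
    return [func(*args) for args in args_iter]


def two_level(upper):
    # First expansion: one add() call per (x, y) pair from task_range().
    first_level = dmap_local(task_range(upper), add)
    # Second expansion: one combine_log() call per pair returned by each add().
    return [dmap_local(pairs, combine_log) for pairs in first_level]


print(two_level(3))
# [[], ['bc', 'bc'], ['ce', 'ce', 'ce', 'ce']]
```

Each add() call receives its two arguments from the first expansion, and only its return value feeds the second expansion. That is the behaviour the chain passed to dmap() is expected to reproduce.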

Recommended answer

Turns out the problem is that the clone() method on a chain instance does not pass the arguments through at some point - see https://stackoverflow.com/a/53442344/3189 for the full details. If I use the method in that answer, my dmap() code becomes:

from copy import deepcopy  # used by clone_signature() below

@app.task
def dmap(args_iter, celery_task):
    """
    Takes an iterator of argument tuples and queues them up for celery to run with the function.
    """
    callback = subtask(celery_task)
    run_in_parallel = group(clone_signature(callback, args) for args in args_iter)
    return run_in_parallel.delay()


def clone_signature(sig, args=(), kwargs=(), **opts):
    """
    Turns out that a chain clone() does not copy the arguments properly - this
    clone does.
    From: https://stackoverflow.com/a/53442344/3189
    """
    if sig.subtask_type and sig.subtask_type != "chain":
        raise NotImplementedError(
            "Cloning only supported for Tasks and chains, not {}".format(sig.subtask_type)
        )
    clone = sig.clone()
    if hasattr(clone, "tasks"):
        task_to_apply_args_to = clone.tasks[0]
    else:
        task_to_apply_args_to = clone
    args, kwargs, opts = task_to_apply_args_to._merge(args=args, kwargs=kwargs, options=opts)
    task_to_apply_args_to.update(args=args, kwargs=kwargs, options=deepcopy(opts))
    return clone

Then I do:

ppp = (task_range.s() | dmap.s(add.s() | dmap.s(combine_log.s())))

and everything works as expected.
