How to recursively chain a Celery task that returns a list into a group?
Question
I started with this question: How to chain a Celery task that returns a list into a group?
But I want to expand twice. So in my use case I have:
- Task A: determine the total number of items for a given date
- Task B: download the 1000 metadata entries for that date
- Task C: download the content for one item
So each step I'm expanding the number of items of the next step. I can do it by looping through the results in my task and calling .delay()
on the next task function. But I thought I'd try to not make my main tasks do that. Instead they'd return a list of tuples - each tuple would then be expanded into the arguments for a call to the next function.
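The "list of tuples expanded into arguments" idea can be sketched in plain Python, with no Celery involved (the `stage_*` names here are hypothetical, not the real task names):

```python
# Plain-Python sketch of the fan-out described above. Each stage returns
# a list of argument tuples, and each tuple is *-expanded into the
# arguments of one call to the next stage.

def stage_a(upper):
    # like Task A / task_range: produce argument tuples for the next stage
    return [(i, i) for i in range(upper)]

def stage_b(x, y):
    # like Task B / add: expand each input into further argument tuples
    return [(x, y + j) for j in range(x + y)]

def stage_c(c1, c2):
    # like Task C: the leaf work, one call per tuple
    return f'{c1}{c2}'

results = [stage_c(*t2) for t1 in stage_a(2) for t2 in stage_b(*t1)]
```

Celery's `group` plays the role of the list comprehensions here, running the expanded calls in parallel instead of sequentially.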
The above question has an answer that appears to meet my need, but I can't work out the correct way of chaining it for a two level expansion.
Here is a very cut down example of my code:
from celery import group
from celery.task import subtask
from celery.utils.log import get_task_logger

from .celery import app

logger = get_task_logger(__name__)


@app.task
def task_range(upper=10):
    # wrap in list to make JSON serializer work
    return list(zip(range(upper), range(upper)))


@app.task
def add(x, y):
    logger.info(f'x is {x} and y is {y}')
    char = chr(ord('a') + x)
    char2 = chr(ord('a') + x*2)
    result = x + y
    logger.info(f'result is {result}')
    return list(zip(char * result, char2 * result))


@app.task
def combine_log(c1, c2):
    logger.info(f'combine log is {c1}{c2}')


@app.task
def dmap(args_iter, celery_task):
    """
    Takes an iterator of argument tuples and queues them up for celery to run with the function.
    """
    logger.info(f'in dmap, len iter: {len(args_iter)}')
    callback = subtask(celery_task)
    run_in_parallel = group(callback.clone(args) for args in args_iter)
    return run_in_parallel.delay()
I've then tried various ways to make my nested mapping work. First, a one level mapping works fine, so:
pp = (task_range.s() | dmap.s(add.s()))
pp(2)
Produces the kind of results I'd expect, so I'm not totally off.
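For reference, this is my reading of what `pp(2)` computes, simulated with plain functions instead of Celery tasks (no broker involved):

```python
# Simulating pp = (task_range.s() | dmap.s(add.s())) called as pp(2):
# dmap calls add once per argument tuple, as the group() does.

def task_range(upper=10):
    return list(zip(range(upper), range(upper)))

def add(x, y):
    char = chr(ord('a') + x)
    char2 = chr(ord('a') + x * 2)
    result = x + y
    return list(zip(char * result, char2 * result))

outputs = [add(x, y) for x, y in task_range(2)]
# outputs[0] is from add(0, 0): result is 0, so the zip is empty
# outputs[1] is from add(1, 1): zip('bb', 'cc')
```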
But when I try to add another level:
ppp = (task_range.s() | dmap.s(add.s() | dmap.s(combine_log.s())))
Then in the worker I see the error:
[2019-11-23 22:34:12,024: ERROR/ForkPoolWorker-2] Task proj.tasks.dmap[e92877a9-85ce-4f16-88e3-d6889bc27867] raised unexpected: TypeError("add() missing 2 required positional arguments: 'x' and 'y'",)
Traceback (most recent call last):
  File "/home/hdowner/.venv/play_celery/lib/python3.6/site-packages/celery/app/trace.py", line 385, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/home/hdowner/.venv/play_celery/lib/python3.6/site-packages/celery/app/trace.py", line 648, in __protected_call__
    return self.run(*args, **kwargs)
  File "/home/hdowner/dev/playground/celery/proj/tasks.py", line 44, in dmap
    return run_in_parallel.delay()
  File "/home/hdowner/.venv/play_celery/lib/python3.6/site-packages/celery/canvas.py", line 186, in delay
    return self.apply_async(partial_args, partial_kwargs)
  File "/home/hdowner/.venv/play_celery/lib/python3.6/site-packages/celery/canvas.py", line 1008, in apply_async
    args=args, kwargs=kwargs, **options))
  File "/home/hdowner/.venv/play_celery/lib/python3.6/site-packages/celery/canvas.py", line 1092, in _apply_tasks
    **options)
  File "/home/hdowner/.venv/play_celery/lib/python3.6/site-packages/celery/canvas.py", line 578, in apply_async
    dict(self.options, **options) if options else self.options))
  File "/home/hdowner/.venv/play_celery/lib/python3.6/site-packages/celery/canvas.py", line 607, in run
    first_task.apply_async(**options)
  File "/home/hdowner/.venv/play_celery/lib/python3.6/site-packages/celery/canvas.py", line 229, in apply_async
    return _apply(args, kwargs, **options)
  File "/home/hdowner/.venv/play_celery/lib/python3.6/site-packages/celery/app/task.py", line 532, in apply_async
    check_arguments(*(args or ()), **(kwargs or {}))
TypeError: add() missing 2 required positional arguments: 'x' and 'y'
And I'm not sure why changing the argument to dmap() from a plain task signature to a chain changes how the arguments get passed into add(). My impression was that it shouldn't; it just means the return value of add() would get passed on. But apparently that is not the case ...
Answer
Turns out the problem is that the clone() method on a chain instance does not pass the arguments through at some point - see https://stackoverflow.com/a/53442344/3189 for the full details. If I use the method in that answer, my dmap() code becomes:
from copy import deepcopy  # needed by clone_signature below


@app.task
def dmap(args_iter, celery_task):
    """
    Takes an iterator of argument tuples and queues them up for celery to run with the function.
    """
    callback = subtask(celery_task)
    run_in_parallel = group(clone_signature(callback, args) for args in args_iter)
    return run_in_parallel.delay()


def clone_signature(sig, args=(), kwargs=(), **opts):
    """
    Turns out that a chain clone() does not copy the arguments properly - this
    clone does.
    From: https://stackoverflow.com/a/53442344/3189
    """
    if sig.subtask_type and sig.subtask_type != "chain":
        raise NotImplementedError(
            "Cloning only supported for Tasks and chains, not {}".format(sig.subtask_type)
        )
    clone = sig.clone()
    if hasattr(clone, "tasks"):
        task_to_apply_args_to = clone.tasks[0]
    else:
        task_to_apply_args_to = clone
    args, kwargs, opts = task_to_apply_args_to._merge(args=args, kwargs=kwargs, options=opts)
    task_to_apply_args_to.update(args=args, kwargs=kwargs, options=deepcopy(opts))
    return clone
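To see why attaching the args to the first task in the chain matters, here is a toy model of what clone_signature does, using plain dicts and lists rather than real Celery signatures:

```python
from copy import deepcopy

# Toy model: a "signature" is a dict with an 'args' tuple, and a "chain"
# is a list of signatures. Cloning a chain with new args should attach
# them to the FIRST task in the chain - which is what clone_signature
# does, where a plain chain.clone() effectively dropped them.

def clone_with_args(chain_sig, new_args):
    clone = deepcopy(chain_sig)
    first = clone[0]
    # mirror the _merge step: new args are prepended to any existing args
    first['args'] = tuple(new_args) + tuple(first['args'])
    return clone

pipeline = [{'task': 'add', 'args': ()}, {'task': 'dmap', 'args': ()}]
cloned = clone_with_args(pipeline, (1, 2))
# cloned[0] now carries (1, 2); the original pipeline is untouched
```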
And then when I do:

ppp = (task_range.s() | dmap.s(add.s() | dmap.s(combine_log.s())))

everything works as expected.