如何将返回列表的芹菜任务链接到组中? [英] How to chain a Celery task that returns a list into a group?

查看:77
本文介绍了如何将返回列表的芹菜任务链接到组中?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想从Celery任务返回的列表中创建一个组,以便对于任务结果集中的每个项目,将一个任务添加到该组中。

I want to create a group from a list returned by a Celery task, so that for each item in the task result set, one task will be added to the group.

这是一个简单的代码示例来说明用例。 ??? 应该是上一个任务的结果。

Here's a simple code example to explain the use case. The ??? should be the result from the previous task.

@celery.task
def get_list(amount):
    # In reality, fetch a list of items from a db
    return [i for i in range(amount)]

@celery.task
def process_item(item):
    #do stuff
    pass

process_list = (get_list.s(10) | group(process_item.s(i) for i in ???))

我可能无法正确处理此问题,但是我很确定从任务中调用任务并不安全:

I'm probably not approaching this correctly, but I'm pretty sure it's not safe to call tasks from within tasks:

@celery.task
def process_list():
    for i in get_list.delay().get():
        process_item.delay(i)

我不需要秒任务的结果。

I don't need the result from the seconds task.

推荐答案

您可以得到使用中间任务的这种行为。这是创建类似于您建议的地图方法的演示。

You can get this kind of behavior using an intermediate task. Here's a demonstration of creating a "map" like method that works like you've suggested.

from celery import task, subtask, group

@task
def get_list(amount):
    return [i for i in range(amount)]

@task
def process_item(item):
    # do stuff
    pass

@task
def dmap(it, callback):
    # Map a callback over an iterator and return as a group
    callback = subtask(callback)
    return group(callback.clone([arg,]) for arg in it)()

# runs process_item for each item in the return of get_list 
process_list = (get_list.s(10) | dmap.s(process_item.s()))

当我在类似问题上向他寻求帮助时,请索勒姆(Sole Solem)给我这个建议。

Credit to Ask Solem for giving me this suggestion when I asked him for help on a similar issue.

这篇关于如何将返回列表的芹菜任务链接到组中?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆