完成所有任务后运行任务 [英] Running a task after all tasks have been completed

查看:73
本文介绍了完成所有任务后运行任务的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在编写一个应用程序,该应用程序需要并行运行一系列任务,然后运行所有任务的结果的单个任务:

I'm writing an application which needs to run a series of tasks in parallel and then a single task with the results of all the tasks run:

@celery.task
def power(value, expo):
    return value ** expo

@celery.task
def amass(values):
    print str(values)

这是一个非常人为和过于简化的示例,但希望这一点很好。基本上,我有很多项需要通过 power 运行,但是我只想运行 amass 有关所有任务的结果。所有这些都应该异步发生,并且我不需要 amass 方法返回的任何东西。

It's a very contrived and oversimplified example, but hopefully the point comes across well. Basically, I have many items which need to run through power, but I only want to run amass on the results from all of the tasks. All of this should happen asynchronously, and I don't need anything back from the amass method.

有人吗?知道如何在celery上进行设置,以便所有操作都异步执行,并在说完所有步骤后调用带有结果列表的单个回调吗?

Does anyone know how to set this up in celery so that everything is executed asynchronously and a single callback with a list of the results is called after all is said and done?

ve按照亚历山大·阿凡纳西耶夫(Alexander Afanasiev)的建议,将本示例设置为以和弦运行:

I've setup this example to run with a chord as Alexander Afanasiev recommended:

from time import sleep

import random

tasks = []

for i in xrange(10):
    tasks.append(power.s((i, 2)))
    sleep(random.randint(10, 1000) / 1000.0) # sleep for 10-1000ms

callback = amass.s()

r = chord(tasks)(callback)

不幸的是,在上面例如,仅在调用 chord 方法时启动 task 中的所有任务。有没有一种方法可以使每个任务分别开始,然后可以将回调添加到组中以便在一切完成后运行?

Unfortunately, in the above example, all tasks in tasks are started only when the chord method is called. Is there a way that each task can start separately and then I could add a callback to the group to run when everything has finished?

推荐答案

这是一个对我有用的解决方案:

Here's a solution which worked for my purposes:

tasks.py

from time import sleep

import random

@celery.task
def power(value, expo):
    sleep(random.randint(10, 1000) / 1000.0) # sleep for 10-1000ms
    return value ** expo

@celery.task
def amass(results, tasks):
    completed_tasks = []
    for task in tasks:
        if task.ready():
            completed_tasks.append(task)
            results.append(task.get())

    # remove completed tasks
    tasks = list(set(tasks) - set(completed_tasks))

    if len(tasks) > 0:
        # resend the task to execute at least 1 second from now
        amass.delay(results, tasks, countdown=1)
    else:
        # we done
        print results

用例:

tasks = []

for i in xrange(10):
    tasks.append(power.delay(i, 2))

amass.delay([], tasks)

什么应该要做的是尽快异步启动所有任务。将它们全部发布到队列后, amass 任务也将发布到队列中。 amass任务将继续重新发布自身,直到所有其他任务都已完成。

What this should do is start all of the tasks as soon as possible asynchronously. Once they've all been posted to the queue, the amass task will also be posted to the queue. The amass task will keep reposting itself until all of the other tasks have been completed.

这篇关于完成所有任务后运行任务的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆