通过链ID从异步python芹菜链获取进度 [英] Get progress from async python celery chain by chain id

查看:37
本文介绍了通过链ID从异步python芹菜链获取进度的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试通过查询每个任务状态来获取任务链的进度.但是当通过它的id检索链时,我得到了一些行为不同的对象.

I'm trying to get the progress of a task chain by querying each task status. But when retrieving the chain by it's id, I get some object that behaves differently.

在tasks.py

from celery import Celery

celery = Celery('tasks')
celery.config_from_object('celeryconfig')

def unpack_chain(nodes): 
    while nodes.parent:
        yield nodes.parent
        nodes = nodes.parent
    yield nodes

@celery.task
def add(num, num2):
    return num + num2

从ipython进行查询时...

In [43]: from celery import chain
In [44]: from tasks import celery, add, unpack_chain
In [45]: c = chain(add.s(3,3), add.s(10).set(countdown=100))
In [46]: m = c.apply_async()
In [47]: a = celery.AsyncResult(m.id)
In [48]: a == m
Out[48]: True
In [49]: a.id == m.id
Out[49]: True
In [50]: [t.status for t in list(unpack_chain(a))]
Out[50]: ['PENDING']
In [51]: [t.status for t in list(unpack_chain(m))]
Out[51]: ['PENDING', 'SUCCESS']

在Redis下使用Python 2.7.3和Celery 3.0.19.

Using Python 2.7.3 and Celery 3.0.19 under Redis.

如您在 50&51 celery.AsyncResult 返回的值与原始链不同.

As you can see in 50 & 51, the value returned by celery.AsyncResult differs from the original chain.

如何通过链ID获得原始链任务列表?

How can I get the original chain tasks list by the chain id?

推荐答案

就像@Hernantz所说的那样,您不能仅从任务ID中恢复父链,您必须遍历队列,这可能会也可能不会取决于您用作经纪人的方式.

Like @Hernantz said, you can't recover the parent chain from just the task ID, you'd have to iterate over your queue which may or may not be possible depending on what you use as a broker.

但是,如果您有最后一个任务ID来进行查找,那么您就拥有了链,您只需要存储所有任务ID并在需要检查它们的状态时重建链即可.您可以使用以下功能:

But if you have the last task id to do the lookup then you have the chain, you just need to store all the task ids and rebuild the chain when you need to examine their status. You can use the following functions:

def store(node):
    id_chain = []
    while node.parent:
      id_chain.append(node.id)
      node = node.parent
    id_chain.append(node.id)
    return id_chain

def restore(id_chain):
    id_chain.reverse()
    last_result = None
    for tid in id_chain:
        result = celery.AsyncResult(tid)
        result.parent = last_result
        last_result = result
    return last_result

当您第一次从链中获取AsyncResult时,请调用存储.在其上调用restore将为您提供 AsyncResult 链之类的链接列表.

Call store when you first get a AsyncResult from chain. Calling restore on that will give you a linked list of AsyncResults like chain gives you.

这篇关于通过链ID从异步python芹菜链获取进度的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆