克隆芹菜链 [英] Cloning a celery chain

查看:117
本文介绍了克隆芹菜链的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个有趣的问题,试图克隆一个芹菜链供小组使用,我的预期用例是group([chain.clone(args=args) for args in it])之类的东西,但是它一直抱怨没有足够的参数.

I have an interesting issue attempting to clone a celery chain for use in a group, my intended use case is something like group([chain.clone(args=args) for args in it]) however it keeps complaining about not having enough arguments.

我已使用以下内容对此进行了细分

I have broken this down using the below

在名为tasks.py

@app.task
def add(x,y):
    return x+y

然后从python shell中

and then from the python shell

>>> from tasks import add
>>> chain=add.s()|add.s(1)
>>> chain
magic_carpet.celery.add() | add(1)
>>> chain.args
()
>>> chain.delay(2,2)
<AsyncResult: fcc97c30-4700-47a6-aeb6-ffca19a1446f>
>>> cloned_chain=chain.clone(args=(2,))
>>> cloned_chain.args
()
>>> cloned_chain.delay(2)
Traceback (most recent call last):
  File "<console>", line 1, in <module>
  File "/home/bjorn/.local/share/virtualenvs/magic_carpet-PeFVEcL-/lib/python3.6/site-packages/celery/canvas.py", line 179, in delay
    return self.apply_async(partial_args, partial_kwargs)
  File "/home/bjorn/.local/share/virtualenvs/magic_carpet-PeFVEcL-/lib/python3.6/site-packages/celery/canvas.py", line 557, in apply_async
    dict(self.options, **options) if options else self.options))
  File "/home/bjorn/.local/share/virtualenvs/magic_carpet-PeFVEcL-/lib/python3.6/site-packages/celery/canvas.py", line 584, in run
    first_task.apply_async(**options)
  File "/home/bjorn/.local/share/virtualenvs/magic_carpet-PeFVEcL-/lib/python3.6/site-packages/celery/canvas.py", line 218, in apply_async
    return _apply(args, kwargs, **options)
  File "/home/bjorn/.local/share/virtualenvs/magic_carpet-PeFVEcL-/lib/python3.6/site-packages/celery/app/task.py", line 513, in apply_async
    check_arguments(*(args or ()), **(kwargs or {}))
TypeError: add() missing 1 required positional argument: 'y'
>>> 

很明显,clone不会替换链的克隆副本中的args,但是我不确定为什么_chain类的克隆方法已实现为

obviously, clone isn't replacing the args in the cloned copy of the chain, but I'm uncertain why, the _chain class has the clone method implemented documented as

>>> from celery.canvas import _chain
>>> help(_chain.clone)

Help on function clone in module celery.canvas:

clone(self, *args, **kwargs)
    Create a copy of this signature.

    Arguments:
        args (Tuple): Partial args to be prepended to the existing args.
        kwargs (Dict): Partial kwargs to be merged with existing kwargs.
        options (Dict): Partial options to be merged with
            existing options.

阅读芹菜来源,我看不出会导致这种情况的任何明显原因.

Reading the celery source I see nothing obvious that would cause this.

当前正在运行Celery 4.2.1和Python 3.6.6

Currently running Celery 4.2.1 and Python 3.6.6

此功能是否因某种原因而被破坏,不受支持,或者我是难以置信的钝器并且做错了什么事?

Is this functionality broken somehow, unsupported, or am I being incredibly obtuse and doing something wrong?

推荐答案

因此,事实证明,核心问题是克隆通过回旋过程调用构造函数进行链接以创建新实例.此构造函数不接受任何应用于该链的args或kwargs,而是将它们默认为空值,从而导致它们丢失.

So it turns out that the core issue is that via a roundabout course, clone calls the constructor to chain to create the new instance. this constructor does not accept any args or kwargs to apply to the chain and instead defaults them to empty values causing them to get lost.

目前,我的解决方案是通过创建自己的克隆方法(该方法修改链中第一个任务的参数)来解决此问题.设置args属性时似乎也可以使用,但是,如果再次在芹菜中克隆该链,则存储在其中的值将丢失.

My solution at this point is to work around the issue by creating my own clone method that modifies the arguments to the first task in the chain. while setting the args attribute appears to also work, if the chain gets cloned again within celery the values stored within it are lost.

我的克隆方法目前支持克隆taskschains,尽管添加对groups的支持将是一个简单的扩展

my clone method currently supports cloning tasks and chains, though adding support for groups would be a trivial extension

def clone_signature(sig, args=(), kwargs=(), **opts):
    if sig.subtask_type and sig.subtask_type != "chain":
        raise NotImplementedError(
            "Cloning only supported for Tasks and chains, not %s" % sig.subtask_type
        )
    clone = sig.clone()
    if hasattr(clone, "tasks"):
        t = clone.tasks[0]
    else:
        t = clone
    args, kwargs, opts = t._merge(args=args, kwargs=kwargs, options=opts)
    t.update(args=args, kwargs=kwargs, options=deepcopy(opts))
    return clone

这篇关于克隆芹菜链的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆