Luigi: how to pass arguments to dependencies using luigi.build interface?


Problem description

Consider a situation where a task depends on another through a dynamic dependency:

import luigi
from luigi import Task, TaskParameter, IntParameter

class TaskA(Task):
    parent = TaskParameter()
    arg = IntParameter(default=0)
    def requires(self):
        return self.parent()
    def run(self):
        print(f"task A arg = {self.arg}")

class TaskB(Task):
    arg = IntParameter(default=0)
    def run(self):
        print(f"task B arg = {self.arg}")

if __name__ == "__main__":
    luigi.run(["TaskA", "--parent", "TaskB", "--arg", "1", "--TaskB-arg", "2"])

(Notice the default arg=0 Parameter).

Using the luigi.run() interface, this works. As you can see, TaskA is given two arguments: parent=TaskB and arg=1. Furthermore, TaskB is given the argument arg=2 through the --TaskB-arg syntax.


Scheduled 2 tasks of which:
* 1 ran successfully:
    - 1 TaskB(arg=2)
* 1 failed:
    - 1 TaskA(parent=TaskB, arg=1)

This progress looks :( because there were failed tasks

===== Luigi Execution Summary =====

(In this example a task fails because TaskB does not write its output to a file that TaskA can read, but that is only to keep the example short. The important point is that both TaskA and TaskB are passed the correct arg.)

My problem now is: how do I do the exact same thing using the luigi.build() interface? There are two reasons why I want to do this. First, the source code says that luigi.run() shouldn't be used. Second, I can't call luigi.run() more than once per process, but I can do so with luigi.build(). This is important because I want to do something like:

if __name__ == "__main__":
    for i in range(3):
        luigi.run(["TaskA", "--parent", "TaskB", "--arg", f"{i}", "--TaskB-arg", f"{i}"])

However, if you try this, you get the error:

Pid(s) {10084} already running

So, in the luigi.build() interface you're supposed to pass it a list of the tasks instantiated with their parameters:

if __name__ == "__main__":
    for i in range(3):
        luigi.build([TaskA(parent=TaskB, arg=i)])

This does what's expected with regard to TaskA, but TaskB takes the default arg=0.

So the question is: how do I pass arguments to dependencies using the luigi.build() interface?

Here are the things I've tried that don't work:

A)

if __name__ == "__main__":
    for i in range(3):
        luigi.build([TaskA(parent=TaskB, arg=i), TaskB(arg=i)])

This doesn't work because two instances of TaskB are run: one with the default (wrong) arg, which TaskA depends on, and one with the correct arg, which TaskA doesn't depend on.

B)

if __name__ == "__main__":
    for i in range(3):
        luigi.build([TaskA(parent=TaskB(arg=i), arg=i)])

TypeError: 'TaskB' object is not callable

C)

if __name__ == "__main__":
    for i in range(3):
        luigi.build([TaskA(parent=TaskB, arg=i)], "--TaskB-arg", f"{i}")

Getting desperate. I tried something like the old interface, but it doesn't work:

AttributeError: 'str' object has no attribute 'create_remote_scheduler'

Solution

I believe that your problem is that you are passing the parent as a class and not a Task object. Try to pass it like this:

luigi.build([TaskA(parent=TaskB(arg=i), ...)])

Edit: You may then need to modify TaskA, because you have

def requires(self):
    return self.parent()

which constructs the parent as a TaskB object with default params.
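
The difference between passing the class and passing an instance can be seen without Luigi at all. The sketch below is a plain-Python illustration only: `TaskB` here is a hypothetical stand-in class with a single `arg` attribute, not a real `luigi.Task`. It shows that calling the class builds a fresh object with the default argument, while calling an already-built instance raises the `TypeError` from attempt B.

```python
class TaskB:
    """Hypothetical stand-in for the TaskB task; not a luigi.Task."""

    def __init__(self, arg=0):
        self.arg = arg


# Case 1: the parent is the class itself.
# Calling it (as `self.parent()` does) builds a new object with arg=0,
# so any value intended for the dependency is never used.
parent = TaskB
default_instance = parent()

# Case 2: the parent is an instance that already carries arg=2.
# Calling it again is an error, because instances of TaskB are not callable.
parent = TaskB(arg=2)
try:
    parent()
    message = None
except TypeError as exc:
    message = str(exc)

if __name__ == "__main__":
    print(f"class called with no arguments -> arg = {default_instance.arg}")
    print(f"instance called -> {message}")
```

So if an instance is passed as the parent, requires would have to return `self.parent` rather than `self.parent()`.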

Edit2: This design pattern is actually not encouraged. If you are running with multiple workers, the task instance will not be pickled and unpickled correctly. I would recommend creating a new ParameterizedTaskParameter (or some better name) that pickles a task instance and stores it the way an object parameter does.
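
A different approach, not the one proposed in the answer above, avoids passing a task instance altogether: keep passing the class as the parent, add a second parameter to TaskA that carries the value meant for the dependency, and forward it inside requires. The sketch below shows only the shape of that idea using plain-Python stand-ins, so it runs without Luigi installed. The `Task` base class and the `parent_arg` name are assumptions introduced for illustration. In real Luigi code the attributes would be declared with `TaskParameter()` and `IntParameter()` as in the question.

```python
class Task:
    """Hypothetical stand-in for luigi.Task: keyword arguments become attributes."""

    defaults = {}

    def __init__(self, **kwargs):
        # Merge class-level defaults with the values supplied by the caller.
        for name, value in {**self.defaults, **kwargs}.items():
            setattr(self, name, value)


class TaskB(Task):
    defaults = {"arg": 0}


class TaskA(Task):
    # parent_arg is an illustrative name for the value forwarded to the parent.
    defaults = {"arg": 0, "parent_arg": 0}

    def requires(self):
        # Build the dependency here, passing its argument explicitly
        # instead of relying on the dependency's default.
        return self.parent(arg=self.parent_arg)


if __name__ == "__main__":
    for i in range(3):
        task = TaskA(parent=TaskB, arg=i, parent_arg=i)
        dep = task.requires()
        print(f"TaskA arg = {task.arg}, {type(dep).__name__} arg = {dep.arg}")
```

With Luigi itself, the corresponding call would presumably look like `luigi.build([TaskA(parent=TaskB, arg=i, parent_arg=i)])`. Every value is then an ordinary parameter of TaskA, so no task instance has to be stored as a parameter.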
