Luigi: how to pass arguments to dependencies using luigi.build interface?
Problem description
Consider a situation where a task depends on another through a dynamic dependency:
import luigi
from luigi import Task, TaskParameter, IntParameter

class TaskA(Task):
    parent = TaskParameter()
    arg = IntParameter(default=0)

    def requires(self):
        return self.parent()

    def run(self):
        print(f"task A arg = {self.arg}")

class TaskB(Task):
    arg = IntParameter(default=0)

    def run(self):
        print(f"task B arg = {self.arg}")

if __name__ == "__main__":
    luigi.run(["TaskA", "--parent", "TaskB", "--arg", "1", "--TaskB-arg", "2"])
(Notice the default arg=0 parameter.)
Using the luigi.run() interface, this works. As you can see, TaskA is given two arguments: parent=TaskB and arg=1. Furthermore, TaskB is also given the argument arg=2 by using the syntax --TaskB-arg.
Scheduled 2 tasks of which:
* 1 ran successfully:
- 1 TaskB(arg=2)
* 1 failed:
- 1 TaskA(parent=TaskB, arg=1)
This progress looks :( because there were failed tasks
===== Luigi Execution Summary =====
(In this example the run fails because TaskB does not write its output to a file that TaskA can read. That's just to keep the example short. The important point is that both TaskA and TaskB are passed the correct arg.)
My problem now is: how do I do the exact same thing, but using the luigi.build() interface? There are two reasons why I want to do this. First, the source code says that luigi.run() shouldn't be used. Second, I can't call luigi.run() more than once per process, but I can do so with luigi.build(). This is important because I want to do something like:
if __name__ == "__main__":
    for i in range(3):
        luigi.run(["TaskA", "--parent", "TaskB", "--arg", f"{i}", "--TaskB-arg", f"{i}"])
However, if you try this you get the error:
Pid(s) {10084} already running
So, in the luigi.build() interface you're supposed to pass it a list of the tasks instantiated with their parameters:
if __name__ == "__main__":
    for i in range(3):
        luigi.build([TaskA(parent=TaskB, arg=i)])
This does what's expected with regard to TaskA, but TaskB takes the default arg=0.
So the question is: how do I pass arguments to dependencies using the luigi.build() interface?
Here are things that I've tried that don't work:
A)
if __name__ == "__main__":
    for i in range(3):
        luigi.build([TaskA(parent=TaskB, arg=i), TaskB(arg=i)])
This doesn't work because two instances of TaskB are run: one with the default (wrong) arg, which TaskA depends on, and one with the correct arg, which TaskA doesn't depend on.
B)
if __name__ == "__main__":
    for i in range(3):
        luigi.build([TaskA(parent=TaskB(arg=i), arg=i)])
TypeError: 'TaskB' object is not callable
C)
if __name__ == "__main__":
    for i in range(3):
        luigi.build([TaskA(parent=TaskB, arg=i)], "--TaskB-arg", f"{i}")
Getting desperate. I tried something like the old interface, but it doesn't work:
AttributeError: 'str' object has no attribute 'create_remote_scheduler'
I believe that your problem is that you are passing the parent as a class and not a Task object. Try to pass it like this:
luigi.build([TaskA(parent=TaskB(arg=i), ...)])
Edit: You may then need to modify TaskA, because you have
def requires(self):
    return self.parent()
which constructs the parent as a TaskB object with default params.
Edit2: This design pattern is actually not encouraged. If you are running with multiple workers, then the task instance will not pickle and unpickle correctly. I would recommend creating a new ParameterizedTaskParameter (or some better name) that pickles a task instance and stores it as an object parameter.