Luigi - Overriding Task requires/input

Question

I am using luigi to execute a chain of tasks, like so:

import luigi


class Task1(luigi.Task):
    stuff = luigi.Parameter()

    def output(self):
        return luigi.LocalTarget('test.json')

    def run(self):
        with self.output().open('w') as f:
            # Parameters are instance attributes, so use self.stuff
            f.write(self.stuff)


class Task2(luigi.Task):
    stuff = luigi.Parameter()

    def requires(self):
        return Task1(stuff=self.stuff)

    def output(self):
        return luigi.LocalTarget('something-else.json')

    def run(self):
        with self.output().open('w') as f:
            f.write(self.stuff)

This works exactly as desired when I start the entire workflow like so:

luigi.build([Task2(stuff='stuff')])

When using luigi.build you can also run multiple tasks by explicitly passing arguments, as per this example in the documentation.

However, in my situation, I would also like to be able to run the business logic of Task2 completely independently of its involvement in the workflow. This works fine for tasks that do not implement requires, as per this example.

My question is: how can I run this method both as part of the workflow and on its own? Obviously, I could just add a new private method like _my_custom_run, which takes the data and returns the result, and then use this method in run. But it feels like something that should be baked into the framework, which makes me think I am misunderstanding Luigi's best practices (I am still learning the framework). Any advice is appreciated, thanks!

Recommended answer

It sounds like you want dynamic requirements. Using the pattern shown in that example, you could read a config or pass a parameter with arbitrary data, and yield only the tasks that you want to require based on the fields in the config.

# tasks.py
import luigi
import json
import time


class Parameterizer(luigi.Task):
    params = luigi.Parameter() # Arbitrary JSON

    def output(self):
        return luigi.LocalTarget('./config.json')

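    def run(self):
        with self.output().open('w') as f:
            # params is a JSON string: parse it first so the file holds
            # a JSON object rather than a double-encoded string
            json.dump(json.loads(self.params), f)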

class Task1(luigi.Task):
    stuff = luigi.Parameter()

    def output(self):
        return luigi.LocalTarget('{}'.format(self.stuff[:6]))

    def run(self):
        with self.output().open('w') as f:
            f.write(self.stuff)


class Task2(luigi.Task):
    stuff = luigi.Parameter()
    params = luigi.Parameter()


    def output(self):
        return luigi.LocalTarget('{}'.format(self.stuff[6:]))

    def run(self):

        config = Parameterizer(params=self.params)
        yield config

        with config.output().open() as f:
            parameters = json.load(f)

        if parameters["runTask1"]:
            yield Task1(stuff=self.stuff)
        else:
            pass
        with self.output().open('w') as f:
            f.write(self.stuff)

if __name__ == '__main__':
    # Luigi treats a task as complete once its output exists, so delete
    # config.json between runs; otherwise Parameterizer is skipped and
    # the first run's config is reused.
    print("Trying to run with Task1...")
    luigi.build([Task2(stuff="Task 1Task 2", params='{"runTask1":true}')], local_scheduler=True)

    time.sleep(10)

    print("Trying to run WITHOUT Task1...")
    luigi.build([Task2(stuff="Task 1Did just task 2", params='{"runTask1":false}')], local_scheduler=True)

(Just run python tasks.py to execute it.)

We could easily imagine mapping more than one parameter to more than one task, or applying custom tests before allowing various tasks to execute. We could also rewrite this to take the params from luigi.Config.

Also note the following control flow from Task2:

    if parameters["runTask1"]:
        yield Task1(stuff=self.stuff)
    else:
        pass

Here we could run an alternative task, or dynamically call tasks as we saw in the example from the luigi repo. For example:

    if parameters["runTask1"]:
        yield Task1(stuff=self.stuff)
    else:
        # self.stuff is a string (it is not automatically parsed to int),
        # so this comprehension iterates over its characters
        data_dependent_deps = [Task1(stuff=x) for x in self.stuff]
        yield data_dependent_deps

This may be a bit more involved than a simple run_standalone() method, but I think it's the closest thing to what you were looking for in the documented luigi patterns.

Source: https://luigi.readthedocs.io/en/stable/tasks.html?highlight=dynamic#dynamic-dependencies
