如何动态创建Luigi任务 [英] How to Dynamically create a Luigi Task

查看:74
本文介绍了如何动态创建Luigi任务的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在为Luigi Tasks创建包装器,但遇到了 Register 类实际上是ABC元类,并且在创建动态type时无法选择.

I am building a wrapper for Luigi Tasks and I ran into a snag with the Register class that's actually an ABC metaclass and not being pickable when I create a dynamic type.

下面的代码或多或少是我用来开发动态类的代码.

The following code, more or less, is what I'm using to develop the dynamic class.

class TaskWrapper(object):
    '''Luigi Spark Factory from the provided JobClass

    Args:
        JobClass(ScrubbedClass): The job to wrap
        options: Options as passed into the JobClass
    '''

    def __new__(self, JobClass, **options):
        # Validate we have a good job
        valid_classes = (
            ScrubbedClass01,
            # ScrubbedClass02,
            # ScrubbedClass03,
        )
        if any(vc == JobClass for vc in valid_classes) or not issubclass(JobClass, valid_classes):
            raise TypeError('Job is not the correct class: {}'.format(JobClass))

        # Build a luigi task class dynamically
        luigi_identifier = 'Task'
        job_name = JobClass.__name__
        job_name = job_name.replace('Pail', '')
        if not job_name.endswith(luigi_identifier):
            job_name += luigi_identifier

        LuigiTask = type(job_name, (PySparkTask, ), {})

        for k, v in options.items():
            setattr(LuigiTask, k, luigi.Parameter())

        def main(self, sc, *args):
            job = JobClass(**options)
            return job._run()

        LuigiTask.main = main

        return LuigiTask

但是,当我运行调用函数时,会得到PicklingError: Can't pickle <class 'abc.ScrubbedNameTask'>: attribute lookup abc.ScrubbedNameTask failed.

When I run my calling function, however, I get PicklingError: Can't pickle <class 'abc.ScrubbedNameTask'>: attribute lookup abc.ScrubbedNameTask failed.

调用函数:

def create_task(JobClass, **options):
    LuigiTask = TaskWrapper(JobClass, **options)
    # Add parameters
    parameters = {
        d: options.get(d)
        for d in dir(LuigiTask)
        if not d.startswith('_')
        if isinstance(getattr(LuigiTask, d), luigi.Parameter)
        if d in options
    }

    task = LuigiTask(**parameters)
    return task

推荐答案

使用元类ABC动态创建类时,模块将变为abc,并且当工作人员尝试查找任务时,它将转到抽象基类模块并尝试在其中找到它,但是它当然不存在.

When creating classes dynamically with a meta-class of ABC, the module becomes abc, and when a worker tries to find the task it goes to the abstract base class module and tries to find it there, but of course it does not exist.

要解决此问题,请通过手动重置__module__变量,确保luigi知道在哪里可以找到构建该类的代码.

To solve this, make sure luigi know where to find the code that build the class by manually resetting the __module__ variable.

将行更改为:

LuigiTask = type(job_name, (PySparkTask, ), {'__module__':__name__})

据我所知,这仅是Windows上的问题.

As far as I know, this is only a problem on Windows.

这篇关于如何动态创建Luigi任务的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆