How to Dynamically Create a Luigi Task
Question
I am building a wrapper for Luigi Tasks and ran into a snag with the Register class: it is actually an ABC metaclass, and the resulting class is not picklable when I create a dynamic type.
The following code is, more or less, what I'm using to build the dynamic class.
class TaskWrapper(object):
    '''Luigi Spark Factory from the provided JobClass

    Args:
        JobClass(ScrubbedClass): The job to wrap
        options: Options as passed into the JobClass
    '''

    def __new__(self, JobClass, **options):
        # Validate we have a good job
        valid_classes = (
            ScrubbedClass01,
            # ScrubbedClass02,
            # ScrubbedClass03,
        )
        if any(vc == JobClass for vc in valid_classes) or not issubclass(JobClass, valid_classes):
            raise TypeError('Job is not the correct class: {}'.format(JobClass))

        # Build a luigi task class dynamically
        luigi_identifier = 'Task'
        job_name = JobClass.__name__
        job_name = job_name.replace('Pail', '')
        if not job_name.endswith(luigi_identifier):
            job_name += luigi_identifier

        LuigiTask = type(job_name, (PySparkTask, ), {})

        for k, v in options.items():
            setattr(LuigiTask, k, luigi.Parameter())

        def main(self, sc, *args):
            job = JobClass(**options)
            return job._run()

        LuigiTask.main = main
        return LuigiTask
When I run my calling function, however, I get:
PicklingError: Can't pickle <class 'abc.ScrubbedNameTask'>: attribute lookup abc.ScrubbedNameTask failed
Calling function:
def create_task(JobClass, **options):
    LuigiTask = TaskWrapper(JobClass, **options)

    # Add parameters
    parameters = {
        d: options.get(d)
        for d in dir(LuigiTask)
        if not d.startswith('_')
        if isinstance(getattr(LuigiTask, d), luigi.Parameter)
        if d in options
    }

    task = LuigiTask(**parameters)
    return task
Recommended answer
When a class is created dynamically and its metaclass is an ABC metaclass, the class's __module__ becomes abc. When a worker then tries to find the task, it looks in the abstract base class module, where of course the task does not exist.
To solve this, make sure Luigi knows where to find the code that builds the class by manually resetting the __module__ variable.
Change the line to:
LuigiTask = type(job_name, (PySparkTask, ), {'__module__':__name__})
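The effect can be reproduced without Luigi. The following is a minimal sketch using only the standard library; BaseTask, build_task_class, and try_pickle are hypothetical names introduced for illustration, and BaseTask merely stands in for a Luigi task base class whose metaclass derives from abc.ABCMeta. It compares the __module__ of a dynamically created class with and without the override, and shows that pickling the class without the override fails.

```python
import abc
import pickle


class BaseTask(metaclass=abc.ABCMeta):
    """Hypothetical stand-in for a task base class with an ABC metaclass."""


def build_task_class(name, set_module):
    """Create a subclass of BaseTask dynamically with type().

    When set_module is True, __module__ is set explicitly to this module's
    name, mirroring the fix in the answer.
    """
    namespace = {'__module__': __name__} if set_module else {}
    return type(name, (BaseTask,), namespace)


def try_pickle(cls):
    """Return None if the class pickles, otherwise the error message."""
    try:
        pickle.dumps(cls)
    except (pickle.PicklingError, AttributeError) as exc:
        return str(exc)
    return None


broken = build_task_class('BrokenTask', set_module=False)
fixed = build_task_class('FixedTask', set_module=True)

# Without the override, __module__ is inherited from the frame in which the
# metaclass calls type.__new__, which is not the module that built the class.
print('broken.__module__:', broken.__module__)
print('fixed.__module__: ', fixed.__module__)

# Pickling a class stores a reference (module + name), so the lookup fails
# when __module__ points at a module that does not contain the class.
print('pickling broken  :', try_pickle(broken))
```

Note that pickle resolves classes by reference, so for pickle itself to succeed the class must also be reachable under its own name in the module that __module__ points to. The sketch therefore only demonstrates the failing case and the corrected __module__ value; whether Luigi needs more than the __module__ override in a given setup is not something this example verifies.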
As far as I know, this is only a problem on Windows.