无法 pickle <class 'abc.class_name'>:在 abc 上查找属性 class_name 失败 [英] Can't pickle <class 'abc.class_name'>: attribute lookup class_name on abc failed
问题描述
我在尝试根据字典("cmdList")中定义的依赖关系创建依赖关系(子任务)时遇到上述错误.例如,"BDX010"是"BDX020"的依赖关系.我使用的是 Python 3.7.
I'm getting the above error when I try to create dependencies (subtasks) based on the dependency relationships defined in a dictionary ("cmdList"). For instance, "BDX010" is a dependency of "BDX020". I'm using Python 3.7.
请参阅底部的堆栈跟踪以获取确切的错误消息.
Please see the stack trace at the bottom for the exact error message.
import luigi
from helpers import SQLTask
import helpers
import logging
import time
# Reporting periods (YYYYMM strings) and run labels handed to the tasks below.
acctDate = '201904'
ssisDate = '201905'
runDesc0xx = 'prod period 4 test2'
runDesc9xx = 'test2'

# Year and month components of the accounting period.
YY, MM = acctDate[:4], acctDate[4:6]

# Folder prefix shared by every SQL script path in cmdList.
bdx_sql = 'r:\\1.SQL\\BDX_SQL\\'

# Maps a query key to a pair: (script path plus command-line arguments,
# key of the query this one depends on). An empty dependency key marks a
# query without a dependency.
cmdList = {
    'BDX010': (
        f'"{bdx_sql}BDX_001_NI_DM 010.sql" -S LWVPDBSQLC070 ',
        '',
    ),
    'BDX020': (
        f'"{bdx_sql}BDX_001_NI_DM 020.sql" ',
        'BDX010',
    ),
    'BDX022a': (
        f'"{bdx_sql}BDX_022_P038_All_Final_CatAdj 010.sql" -S LWVPDBSQLC070 ',
        'BDX020',
    ),
    'BDX022b': (
        f'"{bdx_sql}BDX_022_P038_All_Final_CatAdj 020.sql" -S LWVPDBSQLC070 -v Year1={YY} MM={MM}',
        'BDX022a',
    ),
    'BDX022c': (
        f'"{bdx_sql}BDX_022_P038_All_Final_CatAdj 030.sql" -v Year={YY} Month={MM}',
        'BDX022b',
    ),
}
class BDX_Task(SQLTask):
    """A single query step, described by one entry of ``cmdList``.

    Parameters:
        acctDate:  accounting period string.
        ssisDate:  second period string (defaults to None).
        queryKey:  key of this query in ``cmdList``.
        queryCmd:  script path and arguments for this query.
        runDesc:   free-text run label, used in ``trans_id``.
        dependQry: key of the query this one depends on, or '' for none.
    """
    acctDate = luigi.Parameter()
    ssisDate = luigi.Parameter(default=None)
    queryKey = luigi.Parameter()
    queryCmd = luigi.Parameter()
    runDesc = luigi.Parameter()
    dependQry = luigi.Parameter()

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Per-instance id combining the query key and the run label.
        self.trans_id = f"00903_BDX_Query_{self.queryKey}__{self.runDesc}"

    def requires(self):
        """Return the task for ``dependQry`` in a one-element list, or ``[]``.

        The dependency's class is looked up by name in the module globals,
        so it must already have been registered there under that key.
        """
        upstream_key = self.dependQry
        commands = dict(cmdList)
        if upstream_key == '' or upstream_key not in commands:
            return []

        upstream_cmd, upstream_parent = commands[upstream_key]
        task_cls = globals()[upstream_key]
        upstream_task = task_cls(
            acctDate=self.acctDate,
            ssisDate=self.ssisDate,
            queryKey=upstream_key,
            queryCmd=upstream_cmd,
            runDesc=self.runDesc,
            dependQry=upstream_parent,
        )
        return [upstream_task]

    def run(self):
        """Build the argument string, wait five seconds, print it, touch the target."""
        cli_args = f""" -i {self.queryCmd} """
        time.sleep(5)
        print(cli_args)
        self.get_target().touch()
class BDX_Query_0XX(SQLTask):
    """Fan-out task that requires one ``BDX_Task`` subclass per ``cmdList`` entry.

    Parameters:
        acctDate: accounting period string, passed through to each query task.
        ssisDate: second period string, passed through to each query task.
        runDesc:  free-text run label, used in ``trans_id``.
    """
    acctDate = luigi.Parameter()
    ssisDate = luigi.Parameter()
    runDesc = luigi.Parameter()

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.trans_id = "00902_BDX_Query_0XX__" + self.runDesc  # static.

    def requires(self):
        """Yield one task per ``cmdList`` entry.

        For every key a new subclass of ``BDX_Task`` named after the key is
        created with ``type()`` and stored in the module globals under that
        name, then instantiated with the entry's command and dependency key.
        The classes therefore exist only once this generator has been run.
        """
        # NOTE(review): the classes built here are the ones the traceback
        # reports as 'abc.<key>' in the PicklingError.
        for key, (command, parent_key) in cmdList.items():
            task_cls = type(key, (BDX_Task,), {})
            globals()[key] = task_cls
            yield task_cls(
                acctDate=self.acctDate,
                ssisDate=self.ssisDate,
                queryKey=key,
                queryCmd=command,
                runDesc=self.runDesc,
                dependQry=parent_key,
            )

    def run(self):
        """Touch the target returned by ``get_target()``."""
        self.get_target().touch()
class BDX_Query_Main(SQLTask):
    """Entry-point task: requires a single ``BDX_Query_0XX`` for the period.

    Parameters:
        acctDate: accounting period string; defaults to the module-level
            ``acctDate``.
        ssisDate: second period string; defaults to the module-level
            ``ssisDate``.
    """
    # Inside the class body the right-hand names still resolve to the
    # module-level constants, because the class attributes are not bound yet.
    acctDate = luigi.Parameter(default=acctDate)
    ssisDate = luigi.Parameter(default=ssisDate)  # one month lag/later than acctDate
    trans_id = "09000_Metaclass test" + "__" + runDesc9xx  # static.

    def requires(self):
        """Return the ``BDX_Query_0XX`` task for this period in a one-element list."""
        return [
            BDX_Query_0XX(
                acctDate=self.acctDate,
                ssisDate=self.ssisDate,
                runDesc=runDesc0xx,
            )
        ]

    def run(self):
        """Touch the target returned by ``get_target()``."""
        self.get_target().touch()
# Script entry point: hand control to luigi's command-line runner.
if __name__ == "__main__":
    luigi.run()
堆栈跟踪:
DEBUG: Checking if BDX_Query_Main(acctDate=201904, ssisDate=201905) is complete
DEBUG: Checking if BDX_Query_0XX(acctDate=201904, ssisDate=201905, runDesc=prod period 4 test2) is complete
INFO: Informed scheduler that task BDX_Query_Main_201904_201905_444c47aebc has status PENDING
DEBUG: BDX_Task.__init__ called for queryKey ="BDX010"
DEBUG: BDX_Task.__init__ called for queryKey ="BDX020"
DEBUG: BDX_Task.__init__ called for queryKey ="BDX022a"
DEBUG: BDX_Task.__init__ called for queryKey ="BDX022b"
DEBUG: BDX_Task.__init__ called for queryKey ="BDX022c"
DEBUG: Checking if BDX010(acctDate=201904, ssisDate=201905, queryKey=BDX010, queryCmd="r:\1.SQL\BDX_SQL\BDX_001_NI_DM 010.sql" -S LWVPDBSQLC070 , runDesc=prod period 4 test2, dependQry=) is complete
DEBUG: Checking if BDX020(acctDate=201904, ssisDate=201905, queryKey=BDX020, queryCmd="r:\1.SQL\BDX_SQL\BDX_001_NI_DM 020.sql" , runDesc=prod period 4 test2, dependQry=BDX010) is complete
DEBUG: Checking if BDX022a(acctDate=201904, ssisDate=201905, queryKey=BDX022a, queryCmd="r:\1.SQL\BDX_SQL\BDX_022_P038_All_Final_CatAdj 010.sql" -S LWVPDBSQLC070 , runDesc=prod period 4 test2, dependQry=BDX020) is complete
DEBUG: Checking if BDX022b(acctDate=201904, ssisDate=201905, queryKey=BDX022b, queryCmd="r:\1.SQL\BDX_SQL\BDX_022_P038_All_Final_CatAdj 020.sql" -S LWVPDBSQLC070 -v Year1=2019 MM=04, runDesc=prod period 4 test2, dependQry=BDX022a) is complete
DEBUG: Checking if BDX022c(acctDate=201904, ssisDate=201905, queryKey=BDX022c, queryCmd="r:\1.SQL\BDX_SQL\BDX_022_P038_All_Final_CatAdj 030.sql" -v Year=2019 Month=04, runDesc=prod period 4 test2, dependQry=BDX022b) is complete
INFO: Informed scheduler that task BDX_Query_0XX_201904_prod_period_4_te_201905_73ccfa7be3 has status PENDING
INFO: Informed scheduler that task BDX022c_201904_BDX022b__r__1_SQL_BDX_SQ_5c6660ab25 has status PENDING
INFO: Informed scheduler that task BDX022b_201904_BDX022a__r__1_SQL_BDX_SQ_c0677e7954 has status PENDING
INFO: Informed scheduler that task BDX022a_201904_BDX020__r__1_SQL_BDX_SQ_784cf5b40a has status PENDING
INFO: Informed scheduler that task BDX020_201904_BDX010__r__1_SQL_BDX_SQ_d37e4e46a2 has status PENDING
INFO: Informed scheduler that task BDX010_201904___r__1_SQL_BDX_SQ_9d353a8cd3 has status PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 5 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 7
INFO: Worker Worker(salt=751624561, workers=5, host=LWVPWEACT001, username=i805649, pid=4108) was stopped. Shutting down Keep-Alive thread
Traceback (most recent call last):
File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.2.4\helpers\pydev\pydevd.py", line 1664, in <module>
main()
File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.2.4\helpers\pydev\pydevd.py", line 1658, in main
globals = debugger.run(setup['file'], None, None, is_module)
File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.2.4\helpers\pydev\pydevd.py", line 1068, in run
pydev_imports.execfile(file, globals, locals) # execute the script
File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.2.4\helpers\pydev\_pydev_imps\_pydev_execfile.py", line 18, in execfile
exec(compile(contents+"\n", file, 'exec'), glob, loc)
File "R:/1.PY/DataPipeLine/run_test.py", line 178, in <module>
luigi.run()
File "C:\ProgramData\Anaconda3\lib\site-packages\luigi\interface.py", line 192, in run
return _run(*args, **kwargs)['success']
File "C:\ProgramData\Anaconda3\lib\site-packages\luigi\interface.py", line 209, in _run
return _schedule_and_run([cp.get_task_obj()], worker_scheduler_factory)
File "C:\ProgramData\Anaconda3\lib\site-packages\luigi\interface.py", line 172, in _schedule_and_run
success &= worker.run()
File "C:\ProgramData\Anaconda3\lib\site-packages\luigi\worker.py", line 1184, in run
self._run_task(get_work_response.task_id)
File "C:\ProgramData\Anaconda3\lib\site-packages\luigi\worker.py", line 996, in _run_task
task_process.start()
File "C:\ProgramData\Anaconda3\lib\multiprocessing\process.py", line 112, in start
self._popen = self._Popen(self)
File "C:\ProgramData\Anaconda3\lib\multiprocessing\context.py", line 223, in _Popen
return _default_context.get_context().Process._Popen(process_obj)
File "C:\ProgramData\Anaconda3\lib\multiprocessing\context.py", line 322, in _Popen
return Popen(process_obj)
File "C:\ProgramData\Anaconda3\lib\multiprocessing\popen_spawn_win32.py", line 65, in __init__
reduction.dump(process_obj, to_child)
File "C:\ProgramData\Anaconda3\lib\multiprocessing\reduction.py", line 60, in dump
ForkingPickler(file, protocol).dump(obj)
**_pickle.PicklingError: Can't pickle <class 'abc.BDX010'>: attribute lookup BDX010 on abc failed**
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "C:\ProgramData\Anaconda3\lib\multiprocessing\spawn.py", line 105, in spawn_main
exitcode = _main(fd)
File "C:\ProgramData\Anaconda3\lib\multiprocessing\spawn.py", line 115, in _main
self = reduction.pickle.load(from_parent)
EOFError: Ran out of input
推荐答案
当使用 ABC 元类动态创建类时,类的 __module__ 会变成 abc;当 worker 进程尝试查找该任务时,它会到抽象基类模块 abc 中去找,而那里当然不存在这个类.
When creating classes dynamically with a meta-class of ABC, the module becomes abc, and when a worker tries to find the task it goes to the abstract base class module and tries to find it there, but of course it does not exist.
要解决此问题,请通过手动重置__module__
变量,确保luigi知道在哪里可以找到构建该类的代码.
To solve this, make sure luigi knows where to find the code that builds the class by manually resetting the __module__
variable.
将行更改为:
klass = type(queryKey, (BDX_Task,),{'__module__':__name__})
据我所知,这仅是Windows上的问题.
As far as I know, this is only a problem on Windows.
对不起,我很愚蠢.如果新进程刚刚导入了模块,还必须确保重新创建并添加所有自定义类.
Sorry, stupid of me. You must also make sure that all the custom classes get re-created and added when a new process merely imports the module.
# Run this first, at module level and outside any other logic, so that it
# also runs when a process merely imports the module. Each cmdList key gets
# a BDX_Task subclass of the same name, stored in the module globals, with
# __module__ set to this module's name instead of 'abc'.
for queryKey in cmdList:
    globals()[queryKey] = type(queryKey, (BDX_Task,), {'__module__': __name__})
# Then your requires function can look like:
class BDX_Query_0XX(SQLTask):
    # ...
    def requires(self):
        """Yield one task per ``cmdList`` entry.

        The class for each key is fetched from the module globals, where it
        is expected to be registered already, instead of being created here.
        """
        for key, (command, parent_key) in cmdList.items():
            task_cls = globals()[key]
            yield task_cls(
                acctDate=self.acctDate,
                ssisDate=self.ssisDate,
                queryKey=key,
                queryCmd=command,
                runDesc=self.runDesc,
                dependQry=parent_key,
            )
这篇关于"无法 pickle <class 'abc.class_name'>:在 abc 上查找属性 class_name 失败"的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!