Can't pickle <class 'abc.class_name'>: attribute lookup class_name on abc failed

Problem description

I get the error above when I try to create dependencies (subtasks) from the dependency relationships defined in a dictionary ("cmdList"). For instance, "BDX010" is a dependency of "BDX020". I'm using Python 3.7.

Please see the stack trace at the bottom for the exact error message.

import luigi
from helpers import SQLTask
import helpers
import logging 
import time

acctDate = '201904'
ssisDate = '201905'
runDesc0xx = 'prod period 4 test2'  
runDesc9xx = 'test2'  

YY = acctDate[:4]
MM = acctDate[4:6]


bdx_sql = 'r:\\1.SQL\\BDX_SQL\\'
cmdList = {
        'BDX010': (f'"{bdx_sql}BDX_001_NI_DM 010.sql" -S LWVPDBSQLC070 ',''),
        'BDX020': (f'"{bdx_sql}BDX_001_NI_DM 020.sql"  ','BDX010'),
        'BDX022a': (f'"{bdx_sql}BDX_022_P038_All_Final_CatAdj 010.sql"  -S LWVPDBSQLC070 ','BDX020'),
        'BDX022b': (f'"{bdx_sql}BDX_022_P038_All_Final_CatAdj 020.sql"  -S LWVPDBSQLC070  -v Year1={YY} MM={MM}','BDX022a'),
        'BDX022c': (f'"{bdx_sql}BDX_022_P038_All_Final_CatAdj 030.sql"  -v Year={YY} Month={MM}', 'BDX022b'),
    }

class BDX_Task(SQLTask):
    acctDate = luigi.Parameter()
    ssisDate = luigi.Parameter(default=None)
    queryKey = luigi.Parameter()
    queryCmd = luigi.Parameter()
    runDesc = luigi.Parameter()
    dependQry = luigi.Parameter()


    def __init__(self, *args, **kwargs):
        super(BDX_Task, self).__init__(*args, **kwargs)
        self.trans_id = f"00903_BDX_Query_{self.queryKey}__{self.runDesc}"

    def requires(self):
        cmdListComb = dict(cmdList)

        if self.dependQry != '' and self.dependQry in cmdListComb:
            dep_cmd, dep_dep_key = cmdListComb[self.dependQry]

            klass = globals()[self.dependQry]
            return [klass(         
                acctDate = self.acctDate,
                ssisDate = self.ssisDate,
                queryKey = self.dependQry,
                queryCmd = dep_cmd,
                runDesc = self.runDesc,
                dependQry = dep_dep_key
            )]
        else:
            return []

    def run(self):

        strQuery_and_args = f""" -i {self.queryCmd} """
        time.sleep(5)
        print(strQuery_and_args)
        self.get_target().touch()


class BDX_Query_0XX(SQLTask):
    acctDate = luigi.Parameter()
    ssisDate = luigi.Parameter()  
    runDesc = luigi.Parameter()


    def __init__(self, *args, **kwargs):
        super(BDX_Query_0XX, self).__init__(*args, **kwargs)

        self.trans_id =  "00902_BDX_Query_0XX" + "__" + self.runDesc  # static.


    def requires(self):
        for queryKey, (queryCmd, dependQry) in cmdList.items():
            klass = type(queryKey, (BDX_Task,),{})
            globals()[queryKey] = klass
            yield klass(
                acctDate = self.acctDate,
                ssisDate = self.ssisDate,
                queryKey = queryKey,
                queryCmd = queryCmd,
                runDesc = self.runDesc,  
                dependQry = dependQry
            )

    def run(self):
        self.get_target().touch()



class BDX_Query_Main(SQLTask):
    acctDate = luigi.Parameter(default=acctDate)
    ssisDate = luigi.Parameter(default=ssisDate)  # one month lag/later than acctDate
    # runDesc = globals().runDesc

    trans_id = "09000_Metaclass test" + "__" + runDesc9xx  # static.

    def requires(self):
        YY = self.acctDate[:4]
        MM = self.acctDate[4:6]
        acctDate = self.acctDate
        ssisDate = self.ssisDate

        return [BDX_Query_0XX( acctDate=self.acctDate, ssisDate = self.ssisDate, runDesc = runDesc0xx )
               ]

    def run(self):
        self.get_target().touch()


if __name__ == '__main__':
    luigi.run()

Stack trace:

DEBUG: Checking if BDX_Query_Main(acctDate=201904, ssisDate=201905) is complete
DEBUG: Checking if BDX_Query_0XX(acctDate=201904, ssisDate=201905, runDesc=prod period 4 test2) is complete
INFO: Informed scheduler that task   BDX_Query_Main_201904_201905_444c47aebc   has status   PENDING
DEBUG: BDX_Task.__init__ called for queryKey ="BDX010"
DEBUG: BDX_Task.__init__ called for queryKey ="BDX020"
DEBUG: BDX_Task.__init__ called for queryKey ="BDX022a"
DEBUG: BDX_Task.__init__ called for queryKey ="BDX022b"
DEBUG: BDX_Task.__init__ called for queryKey ="BDX022c"
DEBUG: Checking if BDX010(acctDate=201904, ssisDate=201905, queryKey=BDX010, queryCmd="r:\1.SQL\BDX_SQL\BDX_001_NI_DM 010.sql" -S LWVPDBSQLC070 , runDesc=prod period 4 test2, dependQry=) is complete
DEBUG: Checking if BDX020(acctDate=201904, ssisDate=201905, queryKey=BDX020, queryCmd="r:\1.SQL\BDX_SQL\BDX_001_NI_DM 020.sql"  , runDesc=prod period 4 test2, dependQry=BDX010) is complete
DEBUG: Checking if BDX022a(acctDate=201904, ssisDate=201905, queryKey=BDX022a, queryCmd="r:\1.SQL\BDX_SQL\BDX_022_P038_All_Final_CatAdj 010.sql"  -S LWVPDBSQLC070 , runDesc=prod period 4 test2, dependQry=BDX020) is complete
DEBUG: Checking if BDX022b(acctDate=201904, ssisDate=201905, queryKey=BDX022b, queryCmd="r:\1.SQL\BDX_SQL\BDX_022_P038_All_Final_CatAdj 020.sql"  -S LWVPDBSQLC070  -v Year1=2019 MM=04, runDesc=prod period 4 test2, dependQry=BDX022a) is complete
DEBUG: Checking if BDX022c(acctDate=201904, ssisDate=201905, queryKey=BDX022c, queryCmd="r:\1.SQL\BDX_SQL\BDX_022_P038_All_Final_CatAdj 030.sql"  -v Year=2019 Month=04, runDesc=prod period 4 test2, dependQry=BDX022b) is complete
INFO: Informed scheduler that task   BDX_Query_0XX_201904_prod_period_4_te_201905_73ccfa7be3   has status   PENDING
INFO: Informed scheduler that task   BDX022c_201904_BDX022b__r__1_SQL_BDX_SQ_5c6660ab25   has status   PENDING
INFO: Informed scheduler that task   BDX022b_201904_BDX022a__r__1_SQL_BDX_SQ_c0677e7954   has status   PENDING
INFO: Informed scheduler that task   BDX022a_201904_BDX020__r__1_SQL_BDX_SQ_784cf5b40a   has status   PENDING
INFO: Informed scheduler that task   BDX020_201904_BDX010__r__1_SQL_BDX_SQ_d37e4e46a2   has status   PENDING
INFO: Informed scheduler that task   BDX010_201904___r__1_SQL_BDX_SQ_9d353a8cd3   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 5 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 7
INFO: Worker Worker(salt=751624561, workers=5, host=LWVPWEACT001, username=i805649, pid=4108) was stopped. Shutting down Keep-Alive thread
Traceback (most recent call last):
  File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.2.4\helpers\pydev\pydevd.py", line 1664, in <module>
    main()
  File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.2.4\helpers\pydev\pydevd.py", line 1658, in main
    globals = debugger.run(setup['file'], None, None, is_module)
  File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.2.4\helpers\pydev\pydevd.py", line 1068, in run
    pydev_imports.execfile(file, globals, locals)  # execute the script
  File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.2.4\helpers\pydev\_pydev_imps\_pydev_execfile.py", line 18, in execfile
    exec(compile(contents+"\n", file, 'exec'), glob, loc)
  File "R:/1.PY/DataPipeLine/run_test.py", line 178, in <module>
    luigi.run()
  File "C:\ProgramData\Anaconda3\lib\site-packages\luigi\interface.py", line 192, in run
    return _run(*args, **kwargs)['success']
  File "C:\ProgramData\Anaconda3\lib\site-packages\luigi\interface.py", line 209, in _run
    return _schedule_and_run([cp.get_task_obj()], worker_scheduler_factory)
  File "C:\ProgramData\Anaconda3\lib\site-packages\luigi\interface.py", line 172, in _schedule_and_run
    success &= worker.run()
  File "C:\ProgramData\Anaconda3\lib\site-packages\luigi\worker.py", line 1184, in run
    self._run_task(get_work_response.task_id)
  File "C:\ProgramData\Anaconda3\lib\site-packages\luigi\worker.py", line 996, in _run_task
    task_process.start()
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\process.py", line 112, in start
    self._popen = self._Popen(self)
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\context.py", line 223, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\context.py", line 322, in _Popen
    return Popen(process_obj)
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\popen_spawn_win32.py", line 65, in __init__
    reduction.dump(process_obj, to_child)
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <class 'abc.BDX010'>: attribute lookup BDX010 on abc failed

Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\spawn.py", line 105, in spawn_main
    exitcode = _main(fd)
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\spawn.py", line 115, in _main
    self = reduction.pickle.load(from_parent)
EOFError: Ran out of input

Recommended answer

When a class is created dynamically and its metaclass derives from ABCMeta, the class's __module__ becomes abc rather than the module that called type(). When a worker process then tries to locate the task class, the lookup goes to the abc (abstract base class) module, where the class of course does not exist.
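
The effect can be reproduced without luigi. The following is a minimal sketch, assuming only that the task base class uses a metaclass derived from abc.ABCMeta (which the abc.BDX010 in the error message suggests). Base, Dynamic, and pickle_error are hypothetical names used for illustration:

```python
import abc
import pickle


class Base(metaclass=abc.ABCMeta):
    """Stand-in for a task base class whose metaclass derives from ABCMeta."""


# Created with type(): no class body runs in this module, so nothing here
# sets __module__ for the new class.
Dynamic = type("Dynamic", (Base,), {})


def pickle_error(obj):
    """Return the message pickle raises for obj, or None if obj pickles."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError) as exc:
        return str(exc)
    return None


print(Dynamic.__module__)
print(pickle_error(Dynamic()))
```

pickle stores a class only as a module name plus a class name, so a wrong __module__ is enough to make the lookup fail even though the class object itself is perfectly usable.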

To solve this, make sure luigi knows where to find the code that builds the class by setting the __module__ attribute manually.

Change the line to:

klass = type(queryKey, (BDX_Task,),{'__module__':__name__})
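
As a rough check of what that change does, the sketch below sets __module__ explicitly and binds the class under its own name in that module, then round-trips the class through pickle. To stay independent of how the snippet is run, it uses a throwaway module called demo_tasks (a hypothetical name) where the real script would use __name__ and globals(). Base is a stand-in for BDX_Task:

```python
import abc
import pickle
import sys
import types

# Hypothetical module standing in for the script that defines the tasks.
tasks_module = types.ModuleType("demo_tasks")
sys.modules[tasks_module.__name__] = tasks_module


class Base(metaclass=abc.ABCMeta):
    """Stand-in for BDX_Task."""


# Set __module__ explicitly and bind the class under its own name there.
klass = type("BDX010", (Base,), {"__module__": tasks_module.__name__})
setattr(tasks_module, klass.__name__, klass)

# pickle stores the reference "demo_tasks.BDX010" and resolves it on load.
restored = pickle.loads(pickle.dumps(klass))
print(klass.__module__, restored is klass)
```

Both halves matter: the __module__ value tells pickle where to look, and the binding in that module is what it finds there.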

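As far as I know, this is only a problem on Windows, where multiprocessing spawns each worker process and has to pickle the task to send it to the child (see popen_spawn_win32.py in the stack trace).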
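
Whether a given machine is affected depends on the multiprocessing start method, which can be checked directly. This is only a small diagnostic sketch. Windows supports spawn only, while the default on other platforms may differ between Python versions:

```python
import multiprocessing

# "spawn" starts a fresh interpreter and pickles the process object
# (including the task) to send it to the child; "fork" copies the
# parent's memory instead, so nothing has to be looked up by name.
start_method = multiprocessing.get_start_method()
print(start_method)
```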

Sorry, my mistake. You must also make sure that all custom classes are re-created and registered when a new process simply imports the module.

# Run this at module level (after BDX_Task is defined, outside any other logic)
# so that it also runs whenever the module is imported:
for queryKey in cmdList.keys():
    globals()[queryKey] = type(queryKey, (BDX_Task,), {'__module__': __name__})

# Then your requires function can look like:
class BDX_Query_0XX(SQLTask):

    # ...

    def requires(self):
        for queryKey, (queryCmd, dependQry) in cmdList.items():
            yield globals()[queryKey](
                acctDate = self.acctDate,
                ssisDate = self.ssisDate,
                queryKey = queryKey,
                queryCmd = queryCmd,
                runDesc = self.runDesc,  
                dependQry = dependQry
            )
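
Why the classes have to exist at import time can be simulated in a single process. The sketch below is an illustration under stated assumptions, not luigi's actual mechanism: demo_pipeline is a throwaway module standing in for the script, cmd_list is a reduced table with made-up values, and removing the classes from the module imitates a child process in which requires() never ran:

```python
import abc
import pickle
import sys
import types

# Hypothetical stand-ins for the real script and its command table.
tasks_module = types.ModuleType("demo_pipeline")
sys.modules[tasks_module.__name__] = tasks_module
cmd_list = {"BDX010": ("first.sql", ""), "BDX020": ("second.sql", "BDX010")}


class Base(metaclass=abc.ABCMeta):
    """Stand-in for BDX_Task."""


def register_task_classes():
    """Do what the module-level loop does on every import of the script."""
    for key in cmd_list:
        cls = type(key, (Base,), {"__module__": tasks_module.__name__})
        setattr(tasks_module, key, cls)


register_task_classes()
payload = pickle.dumps(tasks_module.BDX020())

# Imitate a child that imported the module but never created the classes.
for key in cmd_list:
    delattr(tasks_module, key)
try:
    pickle.loads(payload)
    loaded_without_classes = True
except (AttributeError, pickle.UnpicklingError):
    loaded_without_classes = False

# Imitate a child whose import re-ran the registration loop.
register_task_classes()
restored = pickle.loads(payload)
print(loaded_without_classes, type(restored).__name__)
```

Creating the classes inside requires() only helps the parent process. Moving the loop to module level means any process that imports the script can resolve the class names again.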
