Wrapping pyspark Pipeline.__init__ and decorators

Problem Description

I am trying to wrap pyspark's Pipeline.__init__ constructor, and monkey patch in the newly wrapped constructor. However, I am running into an error that seems to have something to do with the way Pipeline.__init__ uses decorators.

Here is the code that actually does the monkey patch:

    from pyspark.ml import Pipeline

    def monkeyPatchPipeline():
      oldInit = Pipeline.__init__

      def newInit(self, **keywordArgs):
        oldInit(self, stages=keywordArgs["stages"])

      Pipeline.__init__ = newInit

However, when I run a simple program:

    import PythonSparkCombinatorLibrary
    from pyspark.ml import Pipeline
    from pyspark.ml.classification import LogisticRegression
    from pyspark.ml.feature import HashingTF, Tokenizer

    PythonSparkCombinatorLibrary.TransformWrapper.monkeyPatchPipeline()
    tokenizer = Tokenizer(inputCol="text", outputCol="words")
    hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
    lr = LogisticRegression(maxIter=10, regParam=0.001)

    pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

I get this error:

    Traceback (most recent call last):
      File "C:\<my path>\PythonApplication1\main.py", line 26, in <module>
        pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
      File "C:\<my path>\PythonApplication1\PythonSparkCombinatorLibrary.py", line 36, in newInit
        oldInit(self, stages=keywordArgs["stages"])
      File "C:\<pyspark_path>\pyspark\__init__.py", line 98, in wrapper
        return func(*args, **kwargs)
      File "C:\<pyspark_path>\pyspark\ml\pipeline.py", line 63, in __init__
        kwargs = self.__init__._input_kwargs
    AttributeError: 'function' object has no attribute '_input_kwargs'

Looking into the pyspark interface, I see that Pipeline.__init__ looks like this:

    @keyword_only
    def __init__(self, stages=None):
        """
        __init__(self, stages=None)
        """
        if stages is None:
            stages = []
        super(Pipeline, self).__init__()
        kwargs = self.__init__._input_kwargs
        self.setParams(**kwargs)

And noting the @keyword_only decorator, I inspected that code as well:

    def keyword_only(func):
        """
        A decorator that forces keyword arguments in the wrapped method
        and saves actual input keyword arguments in `_input_kwargs`.
        """
        @wraps(func)
        def wrapper(*args, **kwargs):
            if len(args) > 1:
                raise TypeError("Method %s forces keyword arguments." % func.__name__)
            wrapper._input_kwargs = kwargs
            return func(*args, **kwargs)
        return wrapper

I'm totally confused both about how this code works in the first place, and also why it seems to cause problems with my own wrapper. I see that wrapper is adding a _input_kwargs field to itself, but how is Pipeline.__init__ able to read that field with self.__init__._input_kwargs? And why doesn't the same thing happen when I wrap Pipeline.__init__ again?

Answer

Decorators 101. A decorator is a higher-order function which takes a function as its first (and typically only) argument and returns a function. The @ annotation is just syntactic sugar for a simple function call, so the following

    @decorator
    def decorated(x):
        ...

can be rewritten as, for example:

    def decorated_(x):
        ...

    decorated = decorator(decorated_)

So Pipeline.__init__ is actually a functools.wraps-decorated wrapper which captures the original __init__ (the func argument of keyword_only) as a part of its closure. When it is called, it stores the received kwargs as an attribute of itself. Basically what happens here can be simplified to:

    def f(**kwargs):
        f._input_kwargs = kwargs  # f is in the current scope

    hasattr(f, "_input_kwargs")
    ## False

    f(foo=1, bar="x")
    hasattr(f, "_input_kwargs")
    ## True
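
To see why the monkey patch breaks this, the simplified model can be extended with a second function standing in for newInit (the names f and g are illustrative, not from pyspark):

    def g(**kwargs):
        f(**kwargs)  # the attribute still lands on f, not on g

    g(foo=2)
    hasattr(g, "_input_kwargs")
    ## False

In the Pipeline case, self.__init__ resolves to the outer newInit, which plays the role of g here, so the attribute lookup fails.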

When you further wrap (decorate) __init__, the external function won't have _input_kwargs attached, hence the error. If you want to make it work, you have to apply the same process used by the original __init__ to your own version, for example with the same decorator:

    @keyword_only
    def newInit(self, **keywordArgs):
        oldInit(self, stages=keywordArgs["stages"])
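
This works because keyword_only now attaches _input_kwargs to newInit itself, which is exactly where self.__init__ points inside the wrapped constructor.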

But as I mentioned in the comments, you should rather consider subclassing.
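
For illustration, a minimal sketch of what that subclassing could look like (WrappedPipeline is a hypothetical name, and this assumes the same stages-only use case as above):

    from pyspark import keyword_only
    from pyspark.ml import Pipeline

    class WrappedPipeline(Pipeline):
        """Hypothetical subclass; an alternative to monkey patching."""

        @keyword_only
        def __init__(self, stages=None):
            # Custom logic can go here before delegating to the parent.
            super(WrappedPipeline, self).__init__(stages=stages)

Because the override is itself decorated with @keyword_only, the parent constructor can still find _input_kwargs through self.__init__, and

    pipeline = WrappedPipeline(stages=[tokenizer, hashingTF, lr])

behaves like the plain Pipeline while leaving the original class untouched.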
