来自自定义模块的函数在 PySpark 中不起作用,但在交互模式下输入时它们起作用 [英] Functions from custom module not working in PySpark, but they work when inputted in interactive mode

查看:30
本文介绍了来自自定义模块的函数在 PySpark 中不起作用,但在交互模式下输入时它们起作用的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我编写了一个模块,其中包含作用于 PySpark DataFrames 的函数.他们对 DataFrame 中的列进行转换,然后返回一个新的 DataFrame.以下是代码示例,已缩短为仅包含其中一个功能:

I have a module that I've written containing functions that act on PySpark DataFrames. They do a transformation on columns in the DataFrame and then return a new DataFrame. Here is an example of the code, shortened to include only one of the functions:

from pyspark.sql import functions as F
from pyspark.sql import types as t

import pandas as pd
import numpy as np

metadta=pd.DataFrame(pd.read_csv("metadata.csv"))  # this contains metadata on my dataset

def str2num(text):
    if type(text)==None or text=='' or text=='NULL' or text=='null':
        return 0
    elif len(text)==1:
        return ord(text)
    else:
        newnum=''
        for lettr in text:
            newnum=newnum+str(ord(lettr))
        return int(newnum)

str2numUDF = F.udf(lambda s: str2num(s), t.IntegerType())

def letConvNum(df):    # df is a PySpark DataFrame
    #Get a list of columns that I want to transform, using the metadata Pandas DataFrame
    chng_cols=metadta[(metadta.comments=='letter conversion to num')].col_name.tolist()
    for curcol in chng_cols:
        df=df.withColumn(curcol, str2numUDF(df[curcol]))
    return df

这是我的模块,将其命名为 mymodule.py.如果我启动 PySpark shell,并执行以下操作:

So that is my module, call it mymodule.py. If I start the PySpark shell, and I do the following:

import mymodule as mm
myf=sqlContext.sql("select * from tablename lim 10")

我检查了 myf (PySpark DataFrame),没问题.我通过尝试使用 str2num 函数来检查我是否确实导入了 mymodule:

I check myf (PySpark DataFrame) and it is ok. I check that I have actually imported mymodule, by trying to use the str2num function:

mm.str2num('a')
97

所以它实际上是在导入模块.那么如果我试试这个:

So it actually is importing the module. Then if I try this:

df2=mm.letConvNum(df)

并执行此操作以检查它是否有效:

And do this to check that it worked:

df2.show()

它尝试执行操作,但随后崩溃:

It tries to perform the action, but then it crashes:

    16/03/10 16:10:44 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 365)
    org.apache.spark.api.python.PythonException: Traceback (most recent call last):
      File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/worker.py", line 98, in main
        command = pickleSer._read_with_length(infile)
      File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
        return self.loads(obj)
      File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 422, in loads
        return pickle.loads(obj)
      File "test2.py", line 16, in <module>
        str2numUDF=F.udf(lambda s: str2num(s), t.IntegerType())
      File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/sql/functions.py", line 1460, in udf
        return UserDefinedFunction(f, returnType)
      File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/sql/functions.py", line 1422, in __init__
        self._judf = self._create_judf(name)
      File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/sql/functions.py", line 1430, in _create_judf
        pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command, self)
      File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2317, in _prepare_for_python_RDD
        [x._jbroadcast for x in sc._pickled_broadcast_vars],
    AttributeError: 'NoneType' object has no attribute '_pickled_broadcast_vars'

            at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
            at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
            at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
            at org.apache.spark.sql.execution.BatchPythonEvaluation$$anonfun$doExecute$1.apply(python.scala:397)
            at org.apache.spark.sql.execution.BatchPythonEvaluation$$anonfun$doExecute$1.apply(python.scala:362)
            at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:710)
            at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:710)
            at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
            at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
            at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
            at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
            at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
            at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
            at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
            at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
            at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
            at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
            at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
            at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
            at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
            at org.apache.spark.scheduler.Task.run(Task.scala:88)
            at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
            at java.lang.Thread.run(Thread.java:745)
    16/03/10 16:10:44 ERROR TaskSetManager: Task 0 in stage 1.0 failed 1 times; aborting job
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "/usr/hdp/2.3.4.0-3485/spark/python/pyspark/sql/dataframe.py", line 256, in show
        print(self._jdf.showString(n, truncate))
      File "/usr/hdp/2.3.4.0-3485/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
      File "/usr/hdp/2.3.4.0-3485/spark/python/pyspark/sql/utils.py", line 36, in deco
        return f(*a, **kw)
      File "/usr/hdp/2.3.4.0-3485/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
    py4j.protocol.Py4JJavaError: An error occurred while calling o7299.showString.
    : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 365, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
      File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/worker.py", line 98, in main
        command = pickleSer._read_with_length(infile)
      File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
        return self.loads(obj)
      File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 422, in loads
        return pickle.loads(obj)
      File "test2.py", line 16, in <module>
        str2numUDF=F.udf(lambda s: str2num(s), t.IntegerType())
      File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/sql/functions.py", line 1460, in udf
        return UserDefinedFunction(f, returnType)
      File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/sql/functions.py", line 1422, in __init__
        self._judf = self._create_judf(name)
      File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/sql/functions.py", line 1430, in _create_judf
        pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command, self)
      File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2317, in _prepare_for_python_RDD
        [x._jbroadcast for x in sc._pickled_broadcast_vars],
    AttributeError: 'NoneType' object has no attribute '_pickled_broadcast_vars'

            at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
            at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
            at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
            at org.apache.spark.sql.execution.BatchPythonEvaluation$$anonfun$doExecute$1.apply(python.scala:397)
            at org.apache.spark.sql.execution.BatchPythonEvaluation$$anonfun$doExecute$1.apply(python.scala:362)
            at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:710)
            at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:710)
            at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
            at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
            at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
            at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
            at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
            at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
            at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
            at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
            at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
            at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
            at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
            at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
            at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
            at org.apache.spark.scheduler.Task.run(Task.scala:88)
            at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
            at java.lang.Thread.run(Thread.java:745)

    Driver stacktrace:
            at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
            at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
            at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
            at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
            at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
            at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
            at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
            at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
            at scala.Option.foreach(Option.scala:236)
            at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
            at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
            at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
            at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
            at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
            at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
            at org.apache.spark.SparkContext.runJob(SparkContext.scala:1824)
            at org.apache.spark.SparkContext.runJob(SparkContext.scala:1837)
            at org.apache.spark.SparkContext.runJob(SparkContext.scala:1850)
            at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:215)
            at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:207)
            at org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1385)
            at org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1385)
            at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
            at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1903)
            at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1384)
            at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1314)
            at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1377)
            at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:178)
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:497)
            at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
            at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
            at py4j.Gateway.invoke(Gateway.java:259)
            at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
            at py4j.commands.CallCommand.execute(CallCommand.java:79)
            at py4j.GatewayConnection.run(GatewayConnection.java:207)
            at java.lang.Thread.run(Thread.java:745)
    Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
      File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/worker.py", line 98, in main
        command = pickleSer._read_with_length(infile)
      File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
        return self.loads(obj)
      File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 422, in loads
        return pickle.loads(obj)
      File "test2.py", line 16, in <module>
        str2numUDF=F.udf(lambda s: str2num(s), t.IntegerType())
      File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/sql/functions.py", line 1460, in udf
        return UserDefinedFunction(f, returnType)
      File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/sql/functions.py", line 1422, in __init__
        self._judf = self._create_judf(name)
      File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/sql/functions.py", line 1430, in _create_judf
        pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command, self)
      File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2317, in _prepare_for_python_RDD
        [x._jbroadcast for x in sc._pickled_broadcast_vars],
    AttributeError: 'NoneType' object has no attribute '_pickled_broadcast_vars'

            at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
            at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
            at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
            at org.apache.spark.sql.execution.BatchPythonEvaluation$$anonfun$doExecute$1.apply(python.scala:397)
            at org.apache.spark.sql.execution.BatchPythonEvaluation$$anonfun$doExecute$1.apply(python.scala:362)
            at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:710)
            at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:710)
            at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
            at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
            at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
            at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
            at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
            at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
            at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
            at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
            at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
            at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
            at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
            at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
            at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
            at org.apache.spark.scheduler.Task.run(Task.scala:88)
            at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
            ... 1 more

作为检查,我打开了一个干净的外壳,而不是导入模块,我只是在交互式外壳中定义了 str2num 函数和 UDF.然后我输入了最后一个函数的内容,并做了同样的最后检查:

As a check, I opened a clean shell and instead of importing the module, I just defined the str2num function and the UDF in the interactive shell. I then typed in the contents of the last function, and did the same final check:

df2.show()

这一次,我得到了我期待的转换后的 DataFrame.

This time, I get back the transformed DataFrame I was expecting.

为什么当函数以交互方式输入时有效,而从模块读入时无效?我知道它正在读取模块,因为常规函数 str2num 起作用.

Why does it work when the functions are inputted interactively but not when they are read in from a module? I know it is reading the module, as the regular function str2num works.

推荐答案

我遇到了同样的错误并跟踪了堆栈跟踪.

I had the same error and followed the stack trace.

就我而言,我正在构建一个 Egg 文件,然后通过 --py-files 选项将其传递给 spark.

In my case, I was building an Egg file and then passing it to spark via the --py-files option.

关于错误,我认为归结为当您调用 F.udf(str2num, t.IntegerType()) 时,会创建一个 UserDefinedFunction 实例在 Spark 运行之前,它有一个对某些 SparkContext 的空引用,称之为 sc.当您运行 UDF 时,会引用 sc._pickled_broadcast_vars,这会在您的输出中引发 AttributeError.

Concerning the error, I think it boils down to the fact that when you call F.udf(str2num, t.IntegerType()) a UserDefinedFunction instance is created before Spark is running, so it has an empty reference to some SparkContext, call it sc. When you run the UDF, sc._pickled_broadcast_vars is referenced and this throws the AttributeError in your output.

我的解决方法是避免在 Spark 运行之前创建 UDF(因此有一个活动的 SparkContext.在您的情况下,您可以更改

My work around is to avoid creating the UDF until Spark is running (and hence there is an active SparkContext. In your case, you could just change your definition of

def letConvNum(df):    # df is a PySpark DataFrame
    #Get a list of columns that I want to transform, using the metadata Pandas DataFrame
    chng_cols=metadta[(metadta.comments=='letter conversion to num')].col_name.tolist()

    str2numUDF = F.udf(str2num, t.IntegerType()) # create UDF on demand
    for curcol in chng_cols:
        df=df.withColumn(curcol, str2numUDF(df[curcol]))
    return df

注意:我还没有真正测试过上面的代码,但我自己的代码中的变化是相似的,一切正常.

Note: I haven't actually tested the code above, but the change in my own code was similar and everything worked fine.

另外,对于感兴趣的读者,请参阅UserDefinedFunction 的 Spark 代码

Also, for the interested reader, see the Spark code for UserDefinedFunction

这篇关于来自自定义模块的函数在 PySpark 中不起作用,但在交互模式下输入时它们起作用的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆