Can't apply a pandas_udf in pyspark
Question
I'm trying out some pyspark-related experiments in a Jupyter notebook attached to an AWS EMR instance. I have a Spark dataframe which reads data from S3 and then filters out some things. Printing the schema using df1.printSchema() outputs this:
root
|-- idvalue: string (nullable = true)
|-- locationaccuracyhorizontal: float (nullable = true)
|-- hour: integer (nullable = true)
|-- day: integer (nullable = true)
|-- date: date (nullable = true)
|-- is_weekend: boolean (nullable = true)
|-- locationlatrad: float (nullable = true)
|-- locationlonrad: float (nullable = true)
|-- epochtimestamp: integer (nullable = true)
I'm trying to apply a pandas_udf to this dataframe (examples here). My udf is:
from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf(df1.schema, PandasUDFType.GROUPED_MAP)
def normalize(pdf):
    # Center and scale the hour column within each group
    hour = pdf.hour
    return pdf.assign(hour=(hour - hour.mean()) / hour.std())
And I'm calling it like this:
df2 = df1.groupBy('idvalue') \
    .apply(normalize).show()
Unfortunately this throws an error:
An error occurred while calling o522.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 11.0 failed 4 times, most recent failure: Lost task 0.3 in stage 11.0 (TID 31, x.x.x.x, executor 7): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/local/lib64/python3.6/site-packages/pandas/core/indexes/accessors.py", line 256, in _make_accessor
return maybe_to_datetimelike(data)
File "/usr/local/lib64/python3.6/site-packages/pandas/core/indexes/accessors.py", line 82, in maybe_to_datetimelike
"datetimelike index".format(type(data)))
TypeError: cannot convert an object of type <class 'pandas.core.series.Series'> to a datetimelike index
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/mnt1/yarn/usercache/livy/appcache/application_1555045880196_0210/container_1555045880196_0210_01_000013/pyspark.zip/pyspark/worker.py", line 372, in main
process()
File "/mnt1/yarn/usercache/livy/appcache/application_1555045880196_0210/container_1555045880196_0210_01_000013/pyspark.zip/pyspark/worker.py", line 367, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/mnt1/yarn/usercache/livy/appcache/application_1555045880196_0210/container_1555045880196_0210_01_000013/pyspark.zip/pyspark/serializers.py", line 283, in dump_stream
for series in iterator:
File "/mnt1/yarn/usercache/livy/appcache/application_1555045880196_0210/container_1555045880196_0210_01_000013/pyspark.zip/pyspark/serializers.py", line 301, in load_stream
yield [self.arrow_to_pandas(c) for c in pa.Table.from_batches([batch]).itercolumns()]
File "/mnt1/yarn/usercache/livy/appcache/application_1555045880196_0210/container_1555045880196_0210_01_000013/pyspark.zip/pyspark/serializers.py", line 301, in <listcomp>
yield [self.arrow_to_pandas(c) for c in pa.Table.from_batches([batch]).itercolumns()]
File "/mnt1/yarn/usercache/livy/appcache/application_1555045880196_0210/container_1555045880196_0210_01_000013/pyspark.zip/pyspark/serializers.py", line 271, in arrow_to_pandas
s = _check_series_convert_date(s, from_arrow_type(arrow_column.type))
File "/mnt1/yarn/usercache/livy/appcache/application_1555045880196_0210/container_1555045880196_0210_01_000013/pyspark.zip/pyspark/sql/types.py", line 1692, in _check_series_convert_date
return series.dt.date
File "/usr/local/lib64/python3.6/site-packages/pandas/core/generic.py", line 3610, in __getattr__
return object.__getattribute__(self, name)
File "/usr/local/lib64/python3.6/site-packages/pandas/core/accessor.py", line 54, in __get__
return self.construct_accessor(instance)
File "/usr/local/lib64/python3.6/site-packages/pandas/core/indexes/accessors.py", line 258, in _make_accessor
raise AttributeError("Can only use .dt accessor with "
AttributeError: Can only use .dt accessor with datetimelike values
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)
at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:172)
at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:122)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2039)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2027)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2026)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2026)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:966)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2260)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2209)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2198)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:777)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:365)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3384)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2545)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2545)
at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3365)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3364)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2545)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2759)
at org.apache.spark.sql.Dataset.getRows(Dataset.scala:255)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:292)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/local/lib64/python3.6/site-packages/pandas/core/indexes/accessors.py", line 256, in _make_accessor
return maybe_to_datetimelike(data)
File "/usr/local/lib64/python3.6/site-packages/pandas/core/indexes/accessors.py", line 82, in maybe_to_datetimelike
"datetimelike index".format(type(data)))
TypeError: cannot convert an object of type <class 'pandas.core.series.Series'> to a datetimelike index
I don't understand why it's throwing a datetime-related error. None of the operations I'm doing are related to datetimes. Any help is appreciated.
Answer
I think pandas_udf doesn't support all Spark types yet, and it seems like it's having trouble with your date column.
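If the date column really is the culprit, one thing to try (a sketch, not a confirmed fix; the cast and the normalize_cast name are my additions, not part of the original answer) is to cast that column to a timestamp, or drop it, before grouping, so the Arrow deserializer never has to convert it back into Python date objects:

from pyspark.sql import functions as F
from pyspark.sql.functions import pandas_udf, PandasUDFType

# Assumption: downstream code can accept a timestamp in place of a date
df1_cast = df1.withColumn('date', F.col('date').cast('timestamp'))

@pandas_udf(df1_cast.schema, PandasUDFType.GROUPED_MAP)
def normalize_cast(pdf):
    # Same normalization as before, only the input schema differs
    hour = pdf.hour
    return pdf.assign(hour=(hour - hour.mean()) / hour.std())

df1_cast.groupBy('idvalue').apply(normalize_cast).show()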
One issue with any udf is that all of the data has to be materialized for the udf, even if the udf ignores the values, which can lead to problems like this one, or at a minimum to degraded performance. All else being equal, you should try to reduce the number of columns you pass into the udf, for example by adding a select before the groupBy:
df2 = df1.select('idvalue', 'hour').groupBy('idvalue').apply(normalize).show()
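One caveat with that snippet: normalize was declared with df1.schema as its output type, so once the input is trimmed to two columns the UDF's return schema has to be trimmed to match (and hour becomes a double after normalization). A minimal sketch of what that could look like; the normalize_small name and the explicit StructType are my additions, not part of the original answer:

from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# The output schema must describe exactly the columns the UDF returns
small_schema = StructType([
    StructField('idvalue', StringType()),
    StructField('hour', DoubleType()),
])

@pandas_udf(small_schema, PandasUDFType.GROUPED_MAP)
def normalize_small(pdf):
    hour = pdf.hour
    return pdf.assign(hour=(hour - hour.mean()) / hour.std())

df1.select('idvalue', 'hour').groupBy('idvalue').apply(normalize_small).show()

Also note that .show() returns None, so if you want to keep the result in df2, drop the .show() from the assignment.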