Spark VectorAssembler Error - PySpark 2.3 - Python
Problem description
I am working with PySpark 2.3.0 and have a very simple Spark DataFrame that I created to test the functionality of VectorAssembler. It is a subset of a larger DataFrame, from which I picked only a few numeric (double data type) columns:
>>>cols = ['index','host_listings_count','neighbourhood_group_cleansed',\
'bathrooms','bedrooms','beds','square_feet', 'guests_included',\
'review_scores_rating']
>>>test = df[cols]
>>>test.take(3)
[Row(index=0, host_listings_count=1, neighbourhood_group_cleansed=None, bathrooms=1.5, bedrooms=2.0, beds=3.0, square_feet=None, guests_included=1, review_scores_rating=100.0), Row(index=1, host_listings_count=1, neighbourhood_group_cleansed=None, bathrooms=1.5, bedrooms=2.0, beds=3.0, square_feet=None, guests_included=1, review_scores_rating=100.0), Row(index=2, host_listings_count=1, neighbourhood_group_cleansed=None, bathrooms=1.5, bedrooms=2.0, beds=3.0, square_feet=None, guests_included=1, review_scores_rating=100.0)]
From the above, it seems to me that there is nothing wrong with this Spark DataFrame. So I then created the assembler as shown below and got the error shown. What could have gone wrong?
>>>from pyspark.ml.feature import VectorAssembler
>>>assembler = VectorAssembler(inputCols=cols, outputCol="features")
>>>output = assembler.transform(test)
>>>output.take(3)
Py4JJavaError: An error occurred while calling o279.collectToPython. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 5.0 failed 1 times, most recent failure: Lost task 0.0 in stage 5.0 (TID 10, localhost, executor driver): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$3: (struct) => vector) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:99) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.spark.SparkException: Values to assemble cannot be null. 
at org.apache.spark.ml.feature.VectorAssembler$$anonfun$assemble$1.apply(VectorAssembler.scala:160) at org.apache.spark.ml.feature.VectorAssembler$$anonfun$assemble$1.apply(VectorAssembler.scala:143) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35) at org.apache.spark.ml.feature.VectorAssembler$.assemble(VectorAssembler.scala:143) at org.apache.spark.ml.feature.VectorAssembler$$anonfun$3.apply(VectorAssembler.scala:99) at org.apache.spark.ml.feature.VectorAssembler$$anonfun$3.apply(VectorAssembler.scala:98) ... 16 more
Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) at scala.Option.foreach(Option.scala:257) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1925) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1938) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1951) at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:333) at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38) at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply$mcI$sp(Dataset.scala:2768) at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2765) at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2765) at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57) at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2788) at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:2765) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:280) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:214) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.spark.SparkException: Failed to execute user defined function($anonfun$3: (struct) => vector) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at 
org.apache.spark.scheduler.Task.run(Task.scala:99) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ... 1 more Caused by: org.apache.spark.SparkException: Values to assemble cannot be null. at org.apache.spark.ml.feature.VectorAssembler$$anonfun$assemble$1.apply(VectorAssembler.scala:160) at org.apache.spark.ml.feature.VectorAssembler$$anonfun$assemble$1.apply(VectorAssembler.scala:143) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35) at org.apache.spark.ml.feature.VectorAssembler$.assemble(VectorAssembler.scala:143) at org.apache.spark.ml.feature.VectorAssembler$$anonfun$3.apply(VectorAssembler.scala:99) at org.apache.spark.ml.feature.VectorAssembler$$anonfun$3.apply(VectorAssembler.scala:98) ... 16 more
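The stack trace you posted shows that the problem is caused by null values in the columns being assembled ("Caused by: org.apache.spark.SparkException: Values to assemble cannot be null."). In the rows you printed, neighbourhood_group_cleansed and square_feet are None, so VectorAssembler fails on those rows.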
You need to deal with the null values in your cols columns. Try test.fillna(0, subset=cols) before calling transform, or alternatively, filter out the rows that have null values in those columns.