Spark VectorAssembler Error - PySpark 2.3 - Python

Problem description

I am working with pySpark 2.3.0 and have a very simple Spark dataframe I created to test the functionality of VectorAssembler. This is a subset of a larger dataframe where I only picked a few numeric (double data type) columns:

>>>cols = ['index','host_listings_count','neighbourhood_group_cleansed',\
        'bathrooms','bedrooms','beds','square_feet', 'guests_included',\
        'review_scores_rating']
>>>test = df[cols]
>>>test.take(3)

[Row(index=0, host_listings_count=1, neighbourhood_group_cleansed=None, bathrooms=1.5, bedrooms=2.0, beds=3.0, square_feet=None, guests_included=1, review_scores_rating=100.0), Row(index=1, host_listings_count=1, neighbourhood_group_cleansed=None, bathrooms=1.5, bedrooms=2.0, beds=3.0, square_feet=None, guests_included=1, review_scores_rating=100.0), Row(index=2, host_listings_count=1, neighbourhood_group_cleansed=None, bathrooms=1.5, bedrooms=2.0, beds=3.0, square_feet=None, guests_included=1, review_scores_rating=100.0)]

From the above it seems to me that there is nothing wrong with this Spark dataframe. So I then create the assembler as shown below and get the shown error. What could possibly have gone wrong?

>>>from pyspark.ml.feature import VectorAssembler
>>>assembler = VectorAssembler(inputCols=cols, outputCol="features")
>>>output = assembler.transform(test)
>>>output.take(3)

Py4JJavaError: An error occurred while calling o279.collectToPython. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 5.0 failed 1 times, most recent failure: Lost task 0.0 in stage 5.0 (TID 10, localhost, executor driver): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$3: (struct) => vector) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:99) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.spark.SparkException: Values to assemble cannot be null. at org.apache.spark.ml.feature.VectorAssembler$$anonfun$assemble$1.apply(VectorAssembler.scala:160) at org.apache.spark.ml.feature.VectorAssembler$$anonfun$assemble$1.apply(VectorAssembler.scala:143) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35) at org.apache.spark.ml.feature.VectorAssembler$.assemble(VectorAssembler.scala:143) at org.apache.spark.ml.feature.VectorAssembler$$anonfun$3.apply(VectorAssembler.scala:99) at org.apache.spark.ml.feature.VectorAssembler$$anonfun$3.apply(VectorAssembler.scala:98) ... 16 more

Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) at scala.Option.foreach(Option.scala:257) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1925) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1938) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1951) at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:333) at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38) at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply$mcI$sp(Dataset.scala:2768) at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2765) at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2765) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57) at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2788) at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:2765) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:280) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:214) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.spark.SparkException: Failed to execute user defined function($anonfun$3: (struct) => vector) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225) at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:99) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ... 1 more Caused by: org.apache.spark.SparkException: Values to assemble cannot be null. at org.apache.spark.ml.feature.VectorAssembler$$anonfun$assemble$1.apply(VectorAssembler.scala:160) at org.apache.spark.ml.feature.VectorAssembler$$anonfun$assemble$1.apply(VectorAssembler.scala:143) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35) at org.apache.spark.ml.feature.VectorAssembler$.assemble(VectorAssembler.scala:143) at org.apache.spark.ml.feature.VectorAssembler$$anonfun$3.apply(VectorAssembler.scala:99) at org.apache.spark.ml.feature.VectorAssembler$$anonfun$3.apply(VectorAssembler.scala:98) ... 16 more

Solution

The stack trace you posted mentions that the problem is caused by null values in the columns being assembled.
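
If you want to confirm which of the input columns actually contain nulls before fixing them, a quick per-column null count works. This is just a sketch, assuming test and cols are defined as in the question:

from pyspark.sql import functions as F

# Count how many nulls each of the soon-to-be-assembled columns contains
test.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in cols]).show()

In your sample rows, neighbourhood_group_cleansed and square_feet are both None, so those two columns are the likely culprits.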

You need to deal with the null values in the columns listed in cols. Try test.fillna(0, subset=cols) before calling transform, or alternatively filter out the rows that have null values in those columns.
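
For example, a minimal sketch of both options, reusing the test, cols and assembler objects from the question:

# Option 1: replace the nulls with 0 in every column that will be assembled
test_filled = test.fillna(0, subset=cols)
output = assembler.transform(test_filled)

# Option 2: drop every row that has a null in any of the assembled columns
test_no_nulls = test.dropna(subset=cols)
output = assembler.transform(test_no_nulls)

With either approach the assembler only sees non-null numeric values, so the transform should no longer fail with "Values to assemble cannot be null". Whether 0 is a sensible fill value depends on the column (it may be fine for a count, less so for a score like review_scores_rating), so choose between the two accordingly.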
