将数据从 Dataframe 传递到现有 ML VectorIndexerModel 时出错 [英] Error when passing data from a Dataframe into an existing ML VectorIndexerModel
问题描述
我有一个数据框,我想用它对现有模型进行预测.使用模型的转换方法时出错.
I have a Dataframe which I want to use for prediction with an existing model. I get an error when using the transform method of my model.
这就是我处理训练数据的方式.
This is how I process the trainingdata.
forecast.printSchema()
我的数据框的架构:
root
|-- PM10: double (nullable = false)
|-- rain_3h: double (nullable = false)
|-- is_rain: double (nullable = false)
|-- wind_deg: double (nullable = false)
|-- wind_speed: double (nullable = false)
|-- humidity: double (nullable = false)
|-- is_newYear: double (nullable = false)
|-- season: double (nullable = false)
|-- is_rushHour: double (nullable = false)
|-- PM10_average: double (nullable = false)
打印第一行
forecast.show(5)
+----+-------+-------+--------+----------+--------+----------+------+-----------+------------+
|PM10|rain_3h|is_rain|wind_deg|wind_speed|humidity|is_newYear|season|is_rushHour|PM10_average|
+----+-------+-------+--------+----------+--------+----------+------+-----------+------------+
| 1.1| 1.0| 0.0| 15.0048| 7.27| 0.0| 0.0| 0.0| 0.0| 1.2|
| 1.1| 1.0| 0.0| 15.0048| 7.27| 0.0| 0.0| 0.0| 0.0| 1.2|
| 1.1| 1.0| 0.0| 15.0048| 7.27| 0.0| 0.0| 0.0| 0.0| 1.2|
| 1.1| 1.0| 0.0| 15.0048| 7.27| 0.0| 0.0| 0.0| 0.0| 1.2|
| 1.1| 1.0| 0.0| 15.0048| 7.27| 0.0| 0.0| 0.0| 0.0| 1.2|
+----+-------+-------+--------+----------+--------+----------+------+-----------+------------+
only showing top 5 rows
准备功能
assembler = VectorAssembler(
inputCols=["rain_3h", "is_rain", "wind_deg", "wind_speed", "humidity", "is_newYear", "season", "is_rushHour", "PM10_average"],
outputCol="features")
output = assembler.transform(forecast)
output.registerTempTable("output")
features = spark.sql("SELECT features, PM10 as label FROM output")
features.printSchema()
+--------------------+-----+
| features|label|
+--------------------+-----+
|(9,[0,2,3,8],[1.0...| 1.1|
|(9,[0,2,3,8],[1.0...| 1.1|
|(9,[0,2,3,8],[1.0...| 1.1|
|(9,[0,2,3,8],[1.0...| 1.1|
|(9,[0,2,3,8],[1.0...| 1.1|
+--------------------+-----+
only showing top 5 rows
并将数据传递给模型
model = PipelineModel.load(path)
predict = model.transform(features)
predict.printSchema()
root
|-- features: vector (nullable = true)
|-- label: double (nullable = false)
|-- indexedFeatures: vector (nullable = true)
|-- prediction: double (nullable = true)
predict.show(5)
导致此错误:
17/09/16 19:12:25 WARN Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/hdp/current/spark2-client/python/pyspark/sql/dataframe.py", line 287, in show
print(self._jdf.showString(n, truncate))
File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py", line 1133, in __call__
File "/usr/hdp/current/spark2-client/python/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.3-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o235.showString.
: org.apache.spark.SparkException: Failed to execute user defined function($anonfun$11: (vector) => vector)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply5_1$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at org.apache.spark.sql.execution.TakeOrderedAndProjectExec$$anonfun$executeCollect$1.apply(limit.scala:132)
at org.apache.spark.sql.execution.TakeOrderedAndProjectExec$$anonfun$executeCollect$1.apply(limit.scala:132)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at
scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
at org.apache.spark.sql.execution.TakeOrderedAndProjectExec.executeCollect(limit.scala:132)
at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2193)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2546)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2192)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2199)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1935)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1934)
at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2576)
at org.apache.spark.sql.Dataset.head(Dataset.scala:1934)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2149)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:239)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.NoSuchElementException: key not found: 1.0
at scala.collection.MapLike$class.default(MapLike.scala:228)
at scala.collection.AbstractMap.default(Map.scala:59)
at scala.collection.MapLike$class.apply(MapLike.scala:141)
at scala.collection.AbstractMap.apply(Map.scala:59)
at org.apache.spark.ml.feature.VectorIndexerModel$$anonfun$10.apply(VectorIndexer.scala:339)
at org.apache.spark.ml.feature.VectorIndexerModel$$anonfun$10.apply(VectorIndexer.scala:317)
at org.apache.spark.ml.feature.VectorIndexerModel$$anonfun$11.apply(VectorIndexer.scala:362)
at org.apache.spark.ml.feature.VectorIndexerModel$$anonfun$11.apply(VectorIndexer.scala:362)
... 33 more
推荐答案
发生这种情况是因为 PipelineModel
包括 VectorIndexerModel
和 features
在标记为分类的列之一.您可以轻松地重现相同的错误,如下所示:
This happens because PipelineModel
includes VectorIndexerModel
and features
contain unseen levels in one of the columns marked as categorical. You can easily reproduce the same error as follows:
val train = Seq((1L, Vectors.dense(0.0))).toDF("id", "foo")
val test = Seq((1L, Vectors.dense(1.0))).toDF("id", "foo")
new VectorIndexer().setInputCol("foo").setOutputCol("bar")
.fit(train).transform(test).first
截至今天 VectorIndexer
(Spark 2.2) Spark 不支持处理 VectorIndexer
(与 StringIndexer
一样)但此功能 计划用于未来.
As of today VectorIndexer
(Spark 2.2) Spark doesn't support handling unseen levels in VectorIndexer
(as it does with StringIndexer
) but this functionality is planned for the future.
编辑:
在 Spark 2.3 中你可以使用 handleInvalid
,例如:
In Spark 2.3 you can use handleInvalid
, for example:
new VectorIndexer()
.setInputCol("foo").setOutputCol("bar")
.setHandleInvalid("keep")
.fit(train).transform(test).first
这篇关于将数据从 Dataframe 传递到现有 ML VectorIndexerModel 时出错的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!