Spark DataFrame not respecting schema and considering everything as String
Problem description
I am facing a problem that I have been unable to get past for ages.
I am on Spark 1.4 and Scala 2.10. I cannot upgrade at the moment (big distributed infrastructure).
I have a file with a few hundred columns, only 2 of which are strings; all the rest are Long. I want to convert this data into a Label/Features DataFrame.
I have been able to get it into LibSVM format.
I just cannot get it into a Label/Features format.
The reason I cannot use toDF() as shown here https://spark.apache.org/docs/1.5.1/ml-ensemble.html
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt").toDF()
is that it is not supported in 1.4.
So I first converted the text file into a DataFrame, using something like this:
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// Declare the two string columns as StringType and every other column as LongType
def getSFColumnDType(columnName: String): StructField = {
  if ((columnName == "strcol1") || (columnName == "strcol2"))
    StructField(columnName, StringType, false)
  else
    StructField(columnName, LongType, false)
}

def getDataFrameFromTxtFile(sc: SparkContext, staticfeatures_filepath: String, schemaConf: String): DataFrame = {
  val sfRDD = sc.textFile(staticfeatures_filepath)
  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
  // reads a space delimited string from application.properties file
  val schemaString = readConf(Array(schemaConf)).get(schemaConf).getOrElse("")
  // Generate the schema based on the string of schema
  val schema =
    StructType(
      schemaString.split(" ").map(fieldName => getSFColumnDType(fieldName)))
  // Split each line on commas and wrap the resulting fields in a Row
  val data = sfRDD
    .map(line => line.split(","))
    .map(p => Row.fromSeq(p.toSeq))
  var df = sqlContext.createDataFrame(data, schema)
  //schemaString.split(" ").drop(4)
  //.map(s => df = convertColumn(df, s, "int"))
  return df
}
When I call df.na.drop() and df.printSchema() on this returned DataFrame, I get a perfect schema like this:
root
|-- rand_entry: long (nullable = false)
|-- strcol1: string (nullable = false)
|-- label: long (nullable = false)
|-- strcol2: string (nullable = false)
|-- f1: long (nullable = false)
|-- f2: long (nullable = false)
|-- f3: long (nullable = false)
and so on till around f300
But the sad part is that whatever I try to do with the df (see below), I always get a ClassCastException (java.lang.String cannot be cast to java.lang.Long):
val featureColumns = Array("f1","f2",....."f300")
assertEquals(-99,df.select("f1").head().getLong(0))
assertEquals(-99,df.first().get(4))
val transformeddf = new VectorAssembler()
  .setInputCols(featureColumns)
  .setOutputCol("features")
  .transform(df)
So the bad part is that even though the schema says Long, the df is still internally treating everything as String.
Edit
Adding a simple example.
Say I have a file like this:
1,A,20,P,-99,1,0,0,8,1,1,1,1,131153
1,B,23,P,-99,0,1,0,7,1,1,0,1,65543
1,C,24,P,-99,0,1,0,9,1,1,1,1,262149
1,D,7,P,-99,0,0,0,8,1,1,1,1,458759
and
sf-schema=f0 strCol1 f1 strCol2 f2 f3 f4 f5 f6 f7 f8 f9 f10 f11
(The column names really do not matter, so you can disregard these details.)
All I am trying to do is create a Label/Features kind of DataFrame where my 3rd column becomes the label and the 5th to 11th columns become a features [Vector] column, so that I can follow the rest of the steps in https://spark.apache.org/docs/latest/ml-classification-regression.html#tree-ensembles.
I have also cast the columns, as suggested by zero323:
val types = Map("strCol1" -> "string", "strCol2" -> "string")
  .withDefault(_ => "bigint")
df = df.select(df.columns.map(c => df.col(c).cast(types(c)).alias(c)): _*)
df = df.drop("f0")
df = df.drop("strCol1")
df = df.drop("strCol2")
But the assert and the VectorAssembler still fail.
featureColumns = Array("f2","f3",....."f11")
This is the whole sequence I want to run once I have my df:
var transformeddf = new VectorAssembler()
  .setInputCols(featureColumns)
  .setOutputCol("features")
  .transform(df)
transformeddf.show(2)
transformeddf = new StringIndexer()
  .setInputCol("f1")
  .setOutputCol("indexedF1")
  .fit(transformeddf)
  .transform(transformeddf)
transformeddf.show(2)
transformeddf = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(5)
  .fit(transformeddf)
  .transform(transformeddf)
The exception trace from ScalaIDE, raised as soon as it hits the VectorAssembler, is below:
java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Long
at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:110)
at scala.math.Numeric$LongIsIntegral$.toDouble(Numeric.scala:117)
at org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToDouble$5.apply(Cast.scala:364)
at org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToDouble$5.apply(Cast.scala:364)
at org.apache.spark.sql.catalyst.expressions.Cast.eval(Cast.scala:436)
at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:118)
at org.apache.spark.sql.catalyst.expressions.CreateStruct$$anonfun$eval$2.apply(complexTypes.scala:75)
at org.apache.spark.sql.catalyst.expressions.CreateStruct$$anonfun$eval$2.apply(complexTypes.scala:75)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.sql.catalyst.expressions.CreateStruct.eval(complexTypes.scala:75)
at org.apache.spark.sql.catalyst.expressions.CreateStruct.eval(complexTypes.scala:56)
at org.apache.spark.sql.catalyst.expressions.ScalaUdf$$anonfun$2.apply(ScalaUdf.scala:72)
at org.apache.spark.sql.catalyst.expressions.ScalaUdf$$anonfun$2.apply(ScalaUdf.scala:70)
at org.apache.spark.sql.catalyst.expressions.ScalaUdf.eval(ScalaUdf.scala:960)
at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:118)
at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:68)
at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:52)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$3.apply(SparkPlan.scala:143)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$3.apply(SparkPlan.scala:143)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Recommended answer
You get a ClassCastException because this is exactly what should happen. The schema argument is not used for automatic casting (some DataSources may use the schema this way, but not methods like createDataFrame). It only declares the types of the values stored in the rows. It is your responsibility to pass data that matches the schema, not the other way around.
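The failure mode can be reproduced without Spark. The following is only a minimal sketch: SchemaMismatchSketch, toRow, readLong and failsAsLong are hypothetical names, and a plain Seq[Any] stands in for a Row. It illustrates that reading a field "as Long" is an unboxing cast rather than a conversion, which is why a String field fails in BoxesRunTime.unboxToLong, the same frame that appears at the top of the trace in the question.

```scala
object SchemaMismatchSketch {
  // Hypothetical stand-in for a Row: values are held as Any, exactly as passed in.
  def toRow(line: String): Seq[Any] = line.split(",").toSeq

  // Reading a field "as Long" is an unboxing cast, not a String-to-Long conversion.
  def readLong(row: Seq[Any], i: Int): Long = row(i).asInstanceOf[Long]

  // True when reading field i as Long throws because the stored value is not a Long.
  def failsAsLong(row: Seq[Any], i: Int): Boolean =
    try { readLong(row, i); false }
    catch { case _: ClassCastException => true }

  def main(args: Array[String]): Unit = {
    val row = toRow("1,A,20,P,-99")
    // Field 4 is the String "-99", so the unboxing cast fails.
    println(failsAsLong(row, 4))
    // After an explicit conversion the same read succeeds.
    val fixed: Seq[Any] = row.updated(4, "-99".toLong)
    println(readLong(fixed, 4))
  }
}
```

Declaring a column as LongType changes nothing about what is stored in it; only an explicit conversion such as toLong, or a cast on the DataFrame, does.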
While the DataFrame shows the schema you've declared, it is validated only at runtime, hence the runtime exception. If you want to transform the data to specific types, you have to cast it explicitly.
First read all columns as StringType:
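import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Every field produced by split is a String, so every column is declared StringType
val rows = sc.textFile(staticfeatures_filepath)
  .map(line => Row.fromSeq(line.split(",")))
val schema = StructType(
  schemaString.split(" ").map(
    columnName => StructField(columnName, StringType, false)
  )
)
val df = sqlContext.createDataFrame(rows, schema)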
Next, cast the selected columns to the desired types:
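import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{LongType, StringType}

// The two string columns keep StringType; any other column name falls back to LongType
val types = Map("strcol1" -> StringType, "strcol2" -> StringType)
  .withDefault(_ => LongType)
val casted = df.select(df.columns.map(c => col(c).cast(types(c)).alias(c)): _*)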
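The types lookup relies on Map.withDefault, which may be unfamiliar. Here is a small sketch of how it behaves. To keep it runnable without Spark, plain type-name strings stand in for the DataType objects, and WithDefaultSketch is a hypothetical name.

```scala
object WithDefaultSketch {
  // Type-name strings stand in for Spark's StringType / LongType (an assumption for illustration).
  val types: Map[String, String] =
    Map("strcol1" -> "string", "strcol2" -> "string").withDefault(_ => "bigint")

  def main(args: Array[String]): Unit = {
    println(types("strcol1"))  // explicit entry is returned
    println(types("f1"))       // missing key falls back to the default
    println(types.get("f1"))   // get does not consult the default
  }
}
```

Only apply, i.e. types(c), uses the default. get and contains still treat the key as absent, so the cast expression needs to use types(c) for the fallback to take effect.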
Use the assembler:
val transformeddf = new VectorAssembler()
  .setInputCols(featureColumns)
  .setOutputCol("features")
  .transform(casted)
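A different route, not part of the steps above, is to make the data match the declared schema before any Row is built, by parsing each field to the JVM type of its column. The sketch below shows only the conversion logic in plain Scala. ParseBeforeRowSketch, stringColumns, convert and convertLine are hypothetical names, and the column names are assumed from the example in the question.

```scala
object ParseBeforeRowSketch {
  // Assumed names of the string columns; every other column is parsed as Long.
  val stringColumns: Set[String] = Set("strCol1", "strCol2")

  // Convert one raw field to the JVM type that the schema declares for its column.
  // toLong throws NumberFormatException on malformed input; this sketch does not handle that.
  def convert(columnName: String, raw: String): Any =
    if (stringColumns.contains(columnName)) raw else raw.trim.toLong

  // Convert a comma-separated line, pairing fields with column names by position.
  def convertLine(columnNames: Seq[String], line: String): Seq[Any] =
    columnNames.zip(line.split(",").toSeq).map { case (name, raw) => convert(name, raw) }

  def main(args: Array[String]): Unit = {
    val names = Seq("f0", "strCol1", "f1", "strCol2", "f2")
    println(convertLine(names, "1,A,20,P,-99"))
  }
}
```

Under these assumptions the converted sequence could be passed to Row.fromSeq in place of the raw split result, and the original mixed Long/String schema would then describe what the rows actually contain.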
You can simplify steps 1 and 2 using spark-csv:
// As originally
val schema = StructType(
  schemaString.split(" ").map(fieldName => getSFColumnDType(fieldName)))
val df = sqlContext
  .read.schema(schema)
  .format("com.databricks.spark.csv")
  .option("header", "false")
  .load(staticfeatures_filepath)