Spark DataFrame not respecting schema and considering everything as String


Problem description

I am facing a problem which I have failed to get over for ages now.

1. I am on Spark 1.4 and Scala 2.10. I cannot upgrade at this moment (big distributed infrastructure).

I have a file with a few hundred columns, only 2 of which are strings and all the rest are Long. I want to convert this data into a Label/Features dataframe.

I have been able to get it into LibSVM format.

I just cannot get it into a Label/Features format.

Reasons:

1. I am not able to use toDF() as shown here: https://spark.apache.org/docs/1.5.1/ml-ensemble.html

val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt").toDF()

It is not supported in 1.4.

So I first converted the txt file into a DataFrame, where I used something like this:

def getSFColumnDType(columnName: String): StructField = {
    // the two known string columns keep StringType, everything else is Long
    if ((columnName == "strcol1") || (columnName == "strcol2"))
        StructField(columnName, StringType, false)
    else
        StructField(columnName, LongType, false)
}
def getDataFrameFromTxtFile(sc: SparkContext, staticfeatures_filepath: String, schemaConf: String): DataFrame = {
    val sfRDD = sc.textFile(staticfeatures_filepath)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)

    // reads a space delimited string from the application.properties file
    val schemaString = readConf(Array(schemaConf)).get(schemaConf).getOrElse("")

    // Generate the schema based on the schema string
    val schema =
      StructType(
        schemaString.split(" ").map(fieldName => getSFColumnDType(fieldName)))

    // every field of each Row is a String produced by split(",")
    val data = sfRDD
      .map(line => line.split(","))
      .map(p => Row.fromSeq(p.toSeq))

    var df = sqlContext.createDataFrame(data, schema)

    //schemaString.split(" ").drop(4)
    //.map(s => df = convertColumn(df, s, "int"))

    return df
}

When I do df.na.drop() followed by df.printSchema() on the returned dataframe, I get a perfect schema, like this:

root
 |-- rand_entry: long (nullable = false)
 |-- strcol1: string (nullable = false)
 |-- label: long (nullable = false)
 |-- strcol2: string (nullable = false)
 |-- f1: long (nullable = false)
 |-- f2: long (nullable = false)
 |-- f3: long (nullable = false)
and so on till around f300

But - the sad part is that whatever I try to do with the df (see below), I always get a ClassCastException (java.lang.String cannot be cast to java.lang.Long):

val featureColumns = Array("f1","f2",....."f300")
assertEquals(-99,df.select("f1").head().getLong(0))
assertEquals(-99,df.first().get(4))
val transformeddf = new VectorAssembler()
        .setInputCols(featureColumns)
        .setOutputCol("features")
        .transform(df)

So - the bad part is - even though the schema says Long, the df is still internally treating everything as a String.
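For example, even a plain lookup on the df built above shows the raw value is still a String (column index 4 is f1 in the schema printed earlier):

println(df.first().get(4).getClass) // class java.lang.String, even though printSchema reports long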

EDIT

Adding a simple example.

Say I have a file like this:

1,A,20,P,-99,1,0,0,8,1,1,1,1,131153
1,B,23,P,-99,0,1,0,7,1,1,0,1,65543
1,C,24,P,-99,0,1,0,9,1,1,1,1,262149
1,D,7,P,-99,0,0,0,8,1,1,1,1,458759

sf-schema=f0 strCol1 f1 strCol2 f2 f3 f4 f5 f6 f7 f8 f9 f10 f11

(The column names really do not matter, so you can disregard these details.)

All I am trying to do is create a Label/Features kind of dataframe where my 3rd column becomes the label and the 5th to 11th columns become a feature [Vector] column, so that I can follow the rest of the steps in https://spark.apache.org/docs/latest/ml-classification-regression.html#tree-ensembles.

I have also cast the columns as suggested by zero323:

val types = Map("strCol1" -> "string", "strCol2" -> "string")
        .withDefault(_ => "bigint")
df = df.select(df.columns.map(c => df.col(c).cast(types(c)).alias(c)): _*)
df = df.drop("f0")
df = df.drop("strCol1")
df = df.drop("strCol2")

But the assert and the VectorAssembler still fail. featureColumns = Array("f2","f3",....."f11") - this is the whole sequence I want to run once I have my df:

    var transformeddf = new VectorAssembler()
    .setInputCols(featureColumns)
    .setOutputCol("features")
    .transform(df)

    transformeddf.show(2)

    transformeddf = new StringIndexer()
    .setInputCol("f1")
    .setOutputCol("indexedF1")
    .fit(transformeddf)
    .transform(transformeddf)

    transformeddf.show(2)

    transformeddf = new VectorIndexer()
    .setInputCol("features")
    .setOutputCol("indexedFeatures")
    .setMaxCategories(5)
    .fit(transformeddf)
    .transform(transformeddf)

The exception trace from ScalaIDE - just as it hits the VectorAssembler - is below:

java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Long
    at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:110)
    at scala.math.Numeric$LongIsIntegral$.toDouble(Numeric.scala:117)
    at org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToDouble$5.apply(Cast.scala:364)
    at org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToDouble$5.apply(Cast.scala:364)
    at org.apache.spark.sql.catalyst.expressions.Cast.eval(Cast.scala:436)
    at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:118)
    at org.apache.spark.sql.catalyst.expressions.CreateStruct$$anonfun$eval$2.apply(complexTypes.scala:75)
    at org.apache.spark.sql.catalyst.expressions.CreateStruct$$anonfun$eval$2.apply(complexTypes.scala:75)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.apache.spark.sql.catalyst.expressions.CreateStruct.eval(complexTypes.scala:75)
    at org.apache.spark.sql.catalyst.expressions.CreateStruct.eval(complexTypes.scala:56)
    at org.apache.spark.sql.catalyst.expressions.ScalaUdf$$anonfun$2.apply(ScalaUdf.scala:72)
    at org.apache.spark.sql.catalyst.expressions.ScalaUdf$$anonfun$2.apply(ScalaUdf.scala:70)
    at org.apache.spark.sql.catalyst.expressions.ScalaUdf.eval(ScalaUdf.scala:960)
    at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:118)
    at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:68)
    at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:52)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$3.apply(SparkPlan.scala:143)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$3.apply(SparkPlan.scala:143)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
    at org.apache.spark.scheduler.Task.run(Task.scala:70)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

Answer

You get a ClassCastException because this is exactly what should happen. The schema argument is not used for automatic casting (some DataSources may use a schema this way, but not methods like createDataFrame). It only declares the types of the values that are stored in the rows. It is your responsibility to pass data that matches the schema, not the other way around.

While the DataFrame shows the schema you've declared, it is validated only at runtime, hence the runtime exception. If you want to transform the data to specific types, you have to cast the data explicitly.
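For example (a minimal sketch with illustrative names, using the same RDD[Row] API as in the question): building the DataFrame and printing the schema both succeed, and the mismatch only surfaces when an action actually reads a value:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// The schema claims LongType, but the Row really holds Strings
val badRows = sc.parallelize(Seq(Row("1", "20")))
val badSchema = StructType(Seq(
  StructField("label", LongType, false),
  StructField("f1", LongType, false)))

val badDf = sqlContext.createDataFrame(badRows, badSchema)
badDf.printSchema()                  // reports long, exactly as declared
badDf.select("f1").head().getLong(0) // java.lang.ClassCastException, only at runtime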

1. First read all columns as StringType:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val rows = sc.textFile(staticfeatures_filepath)
  .map(line => Row.fromSeq(line.split(",")))

val schema = StructType(
  schemaString.split(" ").map(
    columnName => StructField(columnName, StringType, false)
  )
)

val df = sqlContext.createDataFrame(rows, schema)

2. Next cast the selected columns to the desired type:

    import org.apache.spark.sql.functions.col
    import org.apache.spark.sql.types.{LongType, StringType}

    val types = Map("strcol1" -> StringType, "strcol2" -> StringType)
      .withDefault(_ => LongType)

    val casted = df.select(df.columns.map(c => col(c).cast(types(c)).alias(c)): _*)

3. Use the assembler:

    val transformeddf = new VectorAssembler()
      .setInputCols(featureColumns)
      .setOutputCol("features")
      .transform(casted)
    

You can also skip steps 1 and 2 entirely by using spark-csv:

    // The schema as originally defined (mixed String / Long types)
    val schema = StructType(
      schemaString.split(" ").map(fieldName => getSFColumnDType(fieldName)))

    val df = sqlContext
      .read.schema(schema)
      .format("com.databricks.spark.csv")
      .option("header", "false")
      .load(staticfeatures_filepath)
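
Either way, a quick sanity check before handing the frame to the VectorAssembler (a minimal sketch, assuming the column layout from the example above, where f2 holds -99 in the first row; use df instead of casted if you went the spark-csv route):

    casted.printSchema()                            // f2 ... f11 now report long
    println(casted.select("f2").head().getLong(0))  // -99, instead of a ClassCastException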
    

See also: Correctly reading the types from file in PySpark.
