How to create Spark DataFrame from RDD[Row] when Row contains complex types

Problem description

I have an RDD[HbaseRecord] which contains a custom complex type Name. Both classes are defined below:

class HbaseRecord(
    val uuid: String,
    val timestamp: String,
    val name: Name
)

class Name(
    val firstName: String,
    val middleName: String,
    val lastName: String
)

At some point in my code I want to generate a DataFrame from that RDD, so I can save it as an avro file. I tried the following:

// I get an object from HBase here
val objectRDD: RDD[HbaseRecord] = ...

// I convert the RDD[HbaseRecord] into RDD[Row]
val rowRDD: RDD[Row] = objectRDD.map(
    hbaseRecord => {
      val uuid: String = hbaseRecord.uuid
      val timestamp: String = hbaseRecord.timestamp
      val name: Name = hbaseRecord.name

      Row(uuid, timestamp, name)
    })

// Here I define the schema
val schema = new StructType()
  .add("uuid", StringType)
  .add("timestamp", StringType)
  .add("name", new StructType()
    .add("firstName", StringType)
    .add("middleName", StringType)
    .add("lastName", StringType))
//Now I try to create a Dataframe using the RDD[Row] and the schema
val dataFrame = sqlContext.createDataFrame(rowRDD, schema)

However, I get the following error:

scala.MatchError: (of class java.lang.String)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:255)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:250)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:260)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:250)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:401)
    at org.apache.spark.sql.SQLContext$$anonfun$6.apply(SQLContext.scala:492)
    at org.apache.spark.sql.SQLContext$$anonfun$6.apply(SQLContext.scala:492)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

I tried removing the complex type from the Row, so it would be Row[String, String], and then there is no error. So I assume the problem is with the complex type.
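For reference, a minimal sketch of the flat variant just described (same objectRDD as above; the two-column schema is only for illustration):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructType}

// Flat version: only String fields, no nested struct.
val flatRowRDD: RDD[Row] = objectRDD.map { hbaseRecord =>
  Row(hbaseRecord.uuid, hbaseRecord.timestamp)
}

val flatSchema = new StructType()
  .add("uuid", StringType)
  .add("timestamp", StringType)

// This succeeds because each value in the Row matches its declared StringType.
val flatDataFrame = sqlContext.createDataFrame(flatRowRDD, flatSchema)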

What am I doing wrong? Or what other approach could I follow to generate that DataFrame with the complex type?

Recommended answer

I just used a simple case class for this instead of a class. The name column didn't conform to the schema that was defined. Convert the name column to a Row and it should work.

val rowRDD: RDD[Row] = objectRDD.map(
    hbaseRecord => {
      val uuid: String = hbaseRecord.uuid
      val timestamp: String = hbaseRecord.timestamp
      val name = Row(hbaseRecord.name.firstName,
                     hbaseRecord.name.middleName, hbaseRecord.name.lastName)
      Row(uuid, timestamp, name)
    })
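With name now represented as a nested Row, the original schema-based call should no longer hit the MatchError. A minimal sketch of the remaining steps, assuming the schema defined in the question and the databricks spark-avro package (the output path is a placeholder):

// Build the DataFrame from the corrected RDD[Row] and the schema from the question.
val dataFrame = sqlContext.createDataFrame(rowRDD, schema)

// Save as Avro with the spark-avro package (Spark 1.x style); the path is a placeholder.
dataFrame.write
  .format("com.databricks.spark.avro")
  .save("/tmp/hbase_records_avro")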

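Alternatively, as mentioned above, defining the records as case classes lets Spark infer the nested schema by reflection, so no manual StructType or Row building is needed. A sketch using hypothetical case-class names (define them at top level, not inside a method, so reflection can see them):

// Hypothetical case-class equivalents of HbaseRecord and Name.
case class NameCC(firstName: String, middleName: String, lastName: String)
case class HbaseRecordCC(uuid: String, timestamp: String, name: NameCC)

import sqlContext.implicits._

// Map the existing records into case classes; the nested struct is inferred automatically.
val recordDF = objectRDD.map { r =>
  HbaseRecordCC(r.uuid, r.timestamp,
    NameCC(r.name.firstName, r.name.middleName, r.name.lastName))
}.toDF()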