How to create Spark DataFrame from RDD[Row] when Row contains complex types
Question
I have an RDD[HbaseRecord] which contains a custom complex type Name. Both classes are defined below:
class HbaseRecord(
  val uuid: String,
  val timestamp: String,
  val name: Name
)

class Name(
  val firstName: String,
  val middleName: String,
  val lastName: String
)
At some point in my code I want to generate a DataFrame from that RDD, so I can save it as an Avro file. I tried the following:
// I get an object from HBase here
val objectRDD: RDD[HbaseRecord] = ...

// I convert the RDD[HbaseRecord] into RDD[Row]
val rowRDD: RDD[Row] = objectRDD.map(
  hbaseRecord => {
    val uuid: String = hbaseRecord.uuid
    val timestamp: String = hbaseRecord.timestamp
    val name: Name = hbaseRecord.name
    Row(uuid, timestamp, name)
  })
// Here I define the schema
val schema = new StructType()
  .add("uuid", StringType)
  .add("timestamp", StringType)
  .add("name", new StructType()
    .add("firstName", StringType)
    .add("middleName", StringType)
    .add("lastName", StringType))

// Now I try to create a DataFrame using the RDD[Row] and the schema
val dataFrame = sqlContext.createDataFrame(rowRDD, schema)
But I get the following error:
scala.MatchError: (of class java.lang.String)
  at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:255)
  at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:250)
  at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102)
  at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:260)
  at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:250)
  at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102)
  at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:401)
  at org.apache.spark.sql.SQLContext$$anonfun$6.apply(SQLContext.scala:492)
  at org.apache.spark.sql.SQLContext$$anonfun$6.apply(SQLContext.scala:492)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
  at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
  at scala.collection.Iterator$class.foreach(Iterator.scala:727)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
  at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
  at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
  at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
  at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
  at scala.collection.AbstractIterator.to(Iterator.scala:1157)
  at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
  at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
  at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
  at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
  at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
  at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
  at org.apache.spark.scheduler.Task.run(Task.scala:89)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)
I tried removing the complex type from the Row, so it would be Row[String, String], and then there is no error. So I assume the problem is with the complex type.
What am I doing wrong? Or what other approach could I follow to generate that DataFrame with the complex type?
Answer
I just used a simple case class for this instead of a plain class. The name column didn't conform to the schema that was defined: the schema declares it as a struct, but you are passing a Name object, and Spark only knows how to convert a Row into a struct. Convert the name column to a Row and it should work:
val rowRDD: RDD[Row] = objectRDD.map(
  hbaseRecord => {
    val uuid: String = hbaseRecord.uuid
    val timestamp: String = hbaseRecord.timestamp
    val name = Row(hbaseRecord.name.firstName,
      hbaseRecord.name.middleName, hbaseRecord.name.lastName)
    Row(uuid, timestamp, name)
  })
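Alternatively, if you can map your HBase records to case classes, Spark can derive the whole nested schema by reflection and you skip both the hand-written StructType and the manual Row nesting. A minimal sketch, assuming a Spark 1.x-style sqlContext as in the question; the case-class names below are illustrative, not part of the original code:

```scala
import org.apache.spark.rdd.RDD

// Hypothetical case-class equivalents of HbaseRecord and Name.
// Spark infers a StructType (with the nested name struct) from them.
case class NameCC(firstName: String, middleName: String, lastName: String)
case class HbaseRecordCC(uuid: String, timestamp: String, name: NameCC)

// Convert the existing RDD[HbaseRecord] into case-class instances
val recordRDD: RDD[HbaseRecordCC] = objectRDD.map(r =>
  HbaseRecordCC(r.uuid, r.timestamp,
    NameCC(r.name.firstName, r.name.middleName, r.name.lastName)))

// No schema argument needed: createDataFrame uses reflection here
val dataFrame = sqlContext.createDataFrame(recordRDD)
```

From there the DataFrame can be written out as before; in Spark 1.x, writing Avro typically requires the external spark-avro package (e.g. format("com.databricks.spark.avro")). The trade-off is that case classes must be defined at compile time, whereas the Row-plus-StructType approach lets you build the schema dynamically.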