在创建数据帧时如何解决scala.MatchError [英] How to resolve scala.MatchError when creating a Data Frame

查看:2327
本文介绍了在创建数据帧时如何解决scala.MatchError的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有文本文件具有复杂的结构化行。我正在使用客户转换器,它将给定的字符串(行)转换为Pojo类(countryInfo)。转换后,我正在建设DF。 POJO类有一个字段,它是一个Custome类型列表(GlobalizedPlayTimeWindows)。我创建了一个与此GlobalizedPlayTimeWindows匹配的Struct,并尝试将现有的Custom Type转换为Struct,但是仍然会遇到错误。



我创建的StructType:

  import org.apache.spark。 sql.types._ 

val PlayTimeWindow =
StructType(
StructField(startTime,DateType,true)::
StructField(endTime,DateType,真的):: Nil)


val globalizedPlayTimeWindows =
StructType(
StructField(countries,ArrayType(StringType,true),true)::
StructField(purchase,ArrayType(PlayTimeWindow,true),true)::
StructField(rental,ArrayType(PlayTimeWindow,true),true)::
StructField(free ArrayType(PlayTimeWindow,true),true)::
StructField(download,ArrayType(PlayTimeWindow,true),true)::
StructField(advertisement,ArrayType(PlayTimeWindow,true) )::
StructField(playTypeIds,ArrayType(PlayTimeWindow,true),true)::
StructField(benefitIds,MapType(StringType,ArrayType ayTimeWindow,true),true),true):: Nil)



val schema = StructType(
StructField(id,StringType,true) :
StructField(jazzCount,IntegerType,true)::
StructField(rockCount,IntegerType,true)::
StructField(classicCount,IntegerType,true)::
StructField(nonclassicCount,IntegerType,true)::
StructField(musicType,StringType,true)::
StructField(playType,ArrayType(globalizedPlayTimeWindows,true),true) :: Nil)

创建数据框架:

  val mappingFile = sc.textFile(s3:// input .....)

val inputData = mappingFile.map(x => {
val countryInfo = MappingUtils.getCountryInfo(x)

val id = countryInfo.getId

val musicType = if(countryInfo.getmusicType!= null&& ; StringUtils.isNotBlank(countryInfo.getmusicType)countryInfo.getmusicType elseUNKOWN_TYPE


val classicWestern = if(countryInfo.getClassic()!= null&& countryInfo.getClassic。 size()> 0)true else false

var nonclassicCount:Int = 0
var classicCount:Int = 0

如果(classicWestern){
classicCount = 1
} else {
nonclassicCount = 1
}


val jazzrock = if(countryInfo.getmusicType()!= null& & countryInfo.getmusicType!=JAZZ)true else false
var jazzCount:Int = 0
var rockCount:Int = 0

如果(jazzrock){
jazzCount = 1
} else {
rockCount = 1
}

val playType = if(countryInfo.getPlayTimeWi ndows!= null&& countryInfo.getPlayTimeWindows.size> 0){countryInfo.getPlayTimeWindows.asScala.toList} else null

(id,jazzCount,rockCount,classicCount,nonclassicCount,musicType,playType)
})map {case(id,jazzCount ,rockCount,classicCount,nonclassicCount,musicType,playType)=> Row(id,jazzCount,rockCount,classicCount,nonclassicCount,musicType,playType)
} .persist(DISK_ONLY)

val inputDataDF = sqlContext.createDataFrame(inputData,schema)

inputDataDF.printSchema:

  root 
| - id:string(nullable = true)
| - jazzCount:integer(nullable = true)
| - rockCount:integer(nullable = true)
| - classicCount:integer(nullable = true)
| - nonclassicCount:integer(nullable = true)
| - musicType:string(nullable = true)
| playType:array(nullable = true)
| | - element:struct(containsNull = true)
| | | - countries:array(nullable = true)
| | | | - element:string(containsNull = true)
| | | - purchase:array(nullable = true)
| | | | - 元素:struct(containsNull = true)
| | | | | - startTime:date(nullable = true)
| | | | | - endTime:date(nullable = true)
| | | - rental:array(nullable = true)
| | | | - element:struct(containsNull = true)
| | | | | - startTime:date(nullable = true)
| | | | | - endTime:date(nullable = true)
| | | - free:array(nullable = true)
| | | | - element:struct(containsNull = true)
| | | | | - startTime:date(nullable = true)
| | | | | - endTime:date(nullable = true)
| | | - 下载:array(nullable = true)
| | | | - element:struct(containsNull = true)
| | | | | - startTime:date(nullable = true)
| | | | | - endTime:date(nullable = true)
| | | - 广告:array(nullable = true)
| | | | - element:struct(containsNull = true)
| | | | | - startTime:date(nullable = true)
| | | | | - endTime:date(nullable = true)
| | | - playTypeIds:array(nullable = true)
| | | | - element:struct(containsNull = true)
| | | | | - startTime:date(nullable = true)
| | | | | - endTime:date(nullable = true)
| | | - benefitIds:map(nullable = true)
| | | | - key:string
| | | | - value:array(valueContainsNull = true)
| | | | | - element:struct(containsNull = true)
| | | | | | - startTime:date(nullable = true)
| | | | | | - endTime:date(nullable = true)

结构的等效POJO:

  @Data 
public GlobalizedPlayTimeWindows(

private final列表< String>国家;

私人最终列表< PlayTimeWindow>购买;

私人最终列表< PlayTimeWindow>租赁;

私人最终列表< PlayTimeWindow>免费;

私人最终列表< PlayTimeWindow>下载;

私人最终列表< PlayTimeWindow>广告;

私人最终列表< PlayTimeWindow> preorderExclusive;

私人最终地图< ; String,List< PlayTimeWindow>> playTypeIds;

}

@Data
public class PlayTimeWindow {

private final Date startTime ;

私人最终日期endTime;
}

错误我得到:

  org.apache.spark.SparkException :由于阶段失败,作业中止:阶段12.0中的任务0失败4次,最近失败:阶段12.0中丢失任务0.3(TID 393,ip-172-31-14-43.ec2.internal):scala.MatchError: GlobalizedPlayTimeWindows(Countries = [US],purchase = null,rental = null,free = null,download = null,advertisement = null,preorderExclusive = null,playTypeIds = null)(类com.model.global.GlobalizedPlayTimeWindows) apache.spark.sql.catalyst.CatalystTypeConverters $ org.apache.spark.sql.catalyst.CatalystTypeConverters $ StructConverter.toCatalystImpl(CatalystTypeConverters.scala:255)在org.apache.spark中的$ StructConverter.toCatalystImpl(CatalystTypeConverters.scala:250)。 sql.catalyst.CatalystTypeConverters $ CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102)在org.apache.spark.sql.catalyst.CatalystTypeConverters $ ArrayConverter $$ anonfun $ toCatalystImpl $ 2.apply(CatalystTypeConverters.scala:163)at scala.collection。 TraversableLike $$ anonfun $ map $ 1.apply(TraversableLike.scala:244)at scala.collection.T raversableLike $$ anonfun $ map $ 1.apply(TraversableLike.scala:244)at scala.collection.immutable.List.foreach(List.scala:318)at scala.collection.TraversableLike $ class.map(TraversableLike.scala:244)在org.apache.spark.sql.catalyst.CatalystTypeConverters上的org.apache.spark.sql.catalyst.CatalystTypeConverters $ ArrayConverter.toCatalystImpl(CatalystTypeConverters.scala:163)上的scala.collection.AbstractTraversable.map(Traversable.scala:105)在org.apache.spark.sql.catalyst.CatalystTypeConverters $ StructConverter.toCatalystImpl中的org.apache.spark.sql.catalyst.CatalystTypeConverters $ CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102)上的$ ArrayConverter.toCatalystImpl(CatalystTypeConverters.scala:153) (CatalystTypeConverters.scala:260)在org.apache.spark.sql.catalyst.CatalystTypeConverters $ CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala)上的org.apache.spark.sql.catalyst.CatalystTypeConverters $ StructConverter.toCatalystImpl(CatalystTypeConverters.scala:250) :102)在org.apa在org.apache.spark.sql.SQLContext $$ anonfun $ 6.apply(SQLContext.scala:492),org.apark.sql.catalyst.CatalystTypeConverters $$ anonfun $ createToCatalystConverter $ 2.apply(CatalystTypeConverters.scala:401) apache.spark.sql.SQLContext $$ anonfun $ 6.apply(SQLContext.scala:492)at scala.collection.Iterator $$ anon $ 11.next(Iterator.scala:328)at scala.collection.Iterator $$ anon $ 11。下一个(Iterator.scala:328)at scala.collection.Iterator $$ anon $ 10.next(Iterator.scala:312)at scala.collection.Iterator $ class.foreach(Iterator.scala:727)at scala.collection.AbstractIterator在.a.achlection.generic.Growable $ class。$ plus $ plus $ eq(Growable.scala:48)at scala.collection.mutable.ArrayBuffer。$ plus $ plus $ eq(ArrayBuffer .scala:103)at scala.collection.mutable.ArrayBuffer。$ plus $ plus $ eq(ArrayBuffer.scala:47)at scala.collection.TraversableOnce $ class.to(TraversableOnce.scala:273)at scala.collection.AbstractIterator .to(Iterator.scala:1157)at scala.collection.TraversableOnce $ class.toBuffe在scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)at scala.collection.TraversableOnce $ class.toArray(TraversableOnce.scala:252)at scala.collection.AbstractIterator.toArray(Iterator)中的r(TraversableOnce.scala:265) .scala:1157)在org.apache.spark.sql.execution.SparkPlan $$ anonfun $ 5.apply(SparkPlan)上的org.apache.spark.sql.execution.SparkPlan $$ anonfun $ 5.apply(SparkPlan.scala:212) .scala:212)在org.apache.spark.SparkContext $$ anonfun $ runJob $ 5.apply(SparkContext.scala:1858)在org.apache.spark.SparkContext $$ anonfun $ runJob $ 5.apply(SparkContext.scala:1858 )在org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)在org.apache.spark.scheduler.Task.run(Task.scala:89)在org.apache.spark.executor.Executor $ java.lang java.util.concurrent.ThreadPoolExecutor $ Worker.run(ThreadPoolExecutor.java:617)上的java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)上的TaskRunner.run(Executor.scala:213) .Thread.run(Thread.java:745)驱动程序堆栈跟踪:在org.apa cheespark.scheduler.DAGScheduler.org $ apache $ spark $ scheduler $ DAGScheduler $$ failJobAndIndependentStages(DAGScheduler.scala:1431)在org.apache.spark.scheduler.DAGScheduler $$ anonfun $ abortStage $ 1.apply(DAGScheduler.scala: 1419)在org.apache.spark.scheduler.DAGScheduler $$ anonfun $ abortStage $ 1.apply(DAGScheduler.scala:1418)at scala.collection.mutable.ResizableArray $ class.foreach(ResizableArray.scala:59)at scala.collection 。org.apache.spark.scheduler.DAGScheduler中的org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)中的.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)$ anonfun $ handleTaskSetFailed $ 1.apply (DAGScheduler.scala:799)在org.apache.spark.scheduler.DAGScheduler $$ anonfun $ handleTaskSetFailed $ 1.apply(DAGScheduler.scala:799)at orga.apache的scala.Option.foreach(Option.scala:236)。 spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)在org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)在org.apache.spa在org.apache.spark.util.EventLoop $$ anon $ 1.run()中的org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)上的rk.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)在org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)在org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)在org.apache.spark.SparkContext的EventLoop.scala:48) org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:212)在org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)上的.runJob(SparkContext.scala:1845)。 apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:165)在org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174)在org.apache.spark.sql.DataFrame $ $ anonfun $ org $ apache $ spark $ sql $ DataFrame $$执行$ 1 $ 1.apply(DataFrame.scala:1538)在org.apache.spark.sql.DataFrame $$ anonfun $ org $ apache $ spark $ sql $ DataFrame $ $ execute $ 1 $ 1.apply(DataFrame.scala:1538)在org.apache.spark .sql.execution.SQLExecution $ .withNewExecutionId(SQLExecution.scala:56)在org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2125)在org.apache.spark.sql.DataFrame.org $ apache $ spark $ sql $ DataFrame $$在org.apache.spark.sql.DataFrame.org上执行$ 1(DataFrame.scala:1537)$ apache $ spark $ sql $ DataFrame $$ collect(DataFrame.scala:1544)在org.apache .spark.sql.DataFrame $$ anonfun $ head $ 1.apply(DataFrame.scala:1414)在org.apache.spark.sql.DataFrame $$ anonfun $ head $ 1.apply(DataFrame.scala:1413)在org.apache在org.apache.spark.sql.DataFrame.take(DataFrame.scala)上的org.apache.spark.sql.DataFrame.head(DataFrame.scala:1413)上的.spark.sql.DataFrame.withCallback(DataFrame.scala:2138) :org.apache.spark.sql.DataFrame.showString(DataFrame.scala:171)在org.apache.spark.sql.DataFrame.show(DataFrame.scala:394))在org.apache.spark.sql中。 DataFrame.show(DataFrame.scala:355)at org.apache.spark.sql.DataFrame.show(DataFrame.scala:363)at $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$$$$$ c57e c8bf9b0d5f6161b97741d596ff0 $$$$ wC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC。< init>(< console>:163)at $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$$$$$ c57ec8bf9b0d5f6161b97741d596ff0 $$$$ wC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC。< init>(<控制台>:168)在$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$$$$$ c57ec8bf9b0d5f6161b97741d596ff0 $$$$ wC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC。< init>(<控制台>:170)在$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$$$$$ c57ec8bf9b0d5f6161b97741d596ff0 $$$$ wC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC。< init>(< console>:172)$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$$$$$ c57ec8bf9b0d5f6161b97741d596ff0 $$$$ wC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC。< init>(< console&g t;:174)at $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$$$$$ c57ec8bf9b0d5f6161b97741d596ff0 $$$$ wC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC。< init>(< console> ;: 176)at $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$$$$$ c57ec8bf9b0d5f6161b97741d596ff0 $$$$ wC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC。< init>(< console>:178)at $ iwC $ $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC。< init>(< console> :180)at $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $ $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC。< init>( < console>:182)在$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $ $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC。< init>(< console>:184)在$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $ $ iwC $$ iwC $$ iwC。< init>(< console>:186)at $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $ $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $ iwC $$ iwC $ iwC $$ iwC $$ iwC $ $ iwC $$ iwC $$ iwC $$ iwC $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC。< init> (< console>:190)$ iwC $$ i wC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $ $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC。< init>(<控制台>:192)在$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $ $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC。< init>(< console>:194)at $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC。< init>(< console> :196)at $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $ $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC。< init>(< console>:198)at $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC。< init>(< console>:200)at $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $ $ iwC $$ iwC $$ iwC。< init>(<控制台>:202)在$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $ $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC。< init>(< console>:204)at $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $ $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC。< init>(< console>:206)$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $ $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC。& lt $ init>(< console>:208)$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $ $ iwC $$ iwC $$ iwC $$ iwC。< init>(<控制台>:210)在$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $ $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC。< init>(< console>:212)at $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $ $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC。< init>(<控制台>:214)在$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $ $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC。< init>(< console>:216)at $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $ $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC。< init>(< console>:218)at $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC。< init>(< console>:220)at $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC。< init>(< console>:222)在$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC。< init>(< console>:224)at $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC。< init>(< console>:226)at $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC。< init>(< console> ;: 228)at $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC。< init>(<控制台>:230)在$ iwC $$ iwC $$ iwC $$ i wC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $ $ iwC $$ iwC。< init>(< console>:232)at $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC。< init>(< console>:234)at $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC。< init>(< console>:236)at $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC。< init>(< console>:238)at $ iwC $$ iwC $$ iwC $$ iwC $ $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC。< init>(< console> ;:240)at $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $ $ iwC $$ iwC。< init>(< console>:242)在$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC。< init>(< console>:244)at $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC。< init>(< c onsole>:246)at $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC。< init>(< console>:248)$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC。 < init>(< console>:250)$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC。< ; $ iwC上的init>(< console>:252)$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC。< init> < console>:254)在$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC。< init>(< console>:256)在$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC。< init>(<控制台>:258)在$ iwC $$ iwC $$ iwC $ $ iwC $$ iwC $$ iwC $$ iwC $$ iwC。< init>(< console>:260)在$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC。< ; $ iwC上的init>(< console>:262)$ iwC $$ iwC $$ iwC $$ iwC $$ iwC。< init>(<控制台>:264)在$ iwC $$ iwC $$ iwC $$ iwC $$ iwC。< init>(< console>:266)at $ iwC $$ iwC $$ iwC $$ iwC。< init>(< console>:268)at $ iwC $$ iwC $$ iwC。< init>(< console>:270)在$ iwC $$ iwC。< init>(<在< init>(< console>:276)< init>(<控制台>:280)中的$ iwC。< init>(< console> < clinit>(< console>)在< init>(< console>:7)at。< clinit>(< console>)在$ print(<控制台>)在sun.reflect。在sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)上的NativeMethodAccessorImpl.invoke0(Native Method)在sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAttributeImpl.java:43),在java.lang.reflect.Method.invoke(Method。在org.apache.spark.repl.SparkIMain $ org.apache.spark.repl.SparkIMain $ ReadEvalPrint.call(SparkIMain.scala:1065)在org.apache.spark.repl.SparkIMain $ Request.loadAndRun(SparkIMain.scala:1346)在org.apache在org.apache.spark.repl.SparkIMain.interpret(SparkIMain。)上的org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)上的.spark.repl.SparkIMain.loadAndRunReq $ 1(SparkIMain.scala:840)。 scala:819)在org.apache.zeppelin.spark.SparkInterpreter.interpretInput(SparkInterpreter.ja va:664)在org.apache.zeppelin.spark.SparkInterpreter.interpret(SparkInterpreter.java:629)在org.apache.zeppelin.spark.SparkInterpreter.interpret(SparkInterpreter.java:622)在org.apache.zeppelin.interpreter org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer $ InterpretJob.jobRun(RemoteInterpreterServer.java)中的org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:93)上的.ClassloaderInterpreter.interpret(ClassloaderInterpreter.java:57) :276)在org.apache.zeppelin.scheduler.Job.run(Job.java:170)在org.apache.zeppelin.scheduler.FIFOScheduler $ 1.run(FIFOScheduler.java:118)在java.util.concurrent.Executors $ java.util.concurrent.ScheduledThreadPoolExecutor的$ RunnableAdapter.call(Executors.java:511)java.util.concurrent.FutureTask.run(FutureTask.java:266)$ ScheduledFutureTask.access $ 201(ScheduledThreadPoolExecutor.java:180)在java .util.concurrent.ScheduledThreadPoolExecutor $ ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)在java.ut java.lang.Thread.run(Thread.java:745)上的java.util.concurrent.ThreadPoolExecutor $ Worker.run(ThreadPoolExecutor.java:617)上的il.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

还尝试从inputData隐式的toDF:



inputData.toDF.printSchema但是得到错误:

  java.lang.UnsupportedOperationException:类型为com.model.global.GlobalizedPlayTimeWindows的模式在org.apache.spark.sql.catalyst.ScalaReflection的org.apache.spark.sql.catalyst.ScalaReflection $ class.schemaFor(ScalaReflection.scala:718)不支持$ org.SchemaFor(ScalaReflection.scala:30)在org在org.apache.spark的org.apache.spark.sql.catalyst.ScalaReflection $ .schemaFor(ScalaReflection.scala:30)中的.apache.spark.sql.catalyst.ScalaReflection $ class.schemaFor(ScalaReflection.scala:667)。 sql.catalyst.ScalaReflection $$ anonfun $ schemaFor $ 1.apply(ScalaReflection.scala:693)在org.apache.spark.sql.catalyst.ScalaReflection $$ ano nfun $ schemaFor $ 1.apply(ScalaReflection.scala:691)at scala.collection.TraversableLike $$ anonfun $ map $ 1.apply(TraversableLike.scala:244)at 


解决方案

可以 - 减少长时间的讨论,这里是一个工作的解决方案。基本上你有两个独立的问题:


  1. 您希望Spark能够将任意Java类解析为DataFrame - 不是这样,Spark只能解析特定类型,一般是:Scala集合;原始数据 java.sql.Date ;和 scala.Product 的所有子类 - 例如所有案例类和元组。所以 - 如评论中所讨论的,首先要做的是将现有的结构转换成这样的类型。


  2. 你的 code>不符合您的Java类 - 有一些区别:




    • 模式的 playType GlobalizedPlayTimeWindows 数组,而您的代码创建了一个单个项目,而不是数组

    • globalizedPlayTimeWindows schema包含 benefitIds 在Java中不存在class

    • playTypeIds schema是一个 Array ,而Java类中具有相同名称的字段是一个地图


所以 - 我更正了所有这些(改变了模式以匹配数据,您可以选择修改这些不同的方式,只要他们匹配),并完成Java类转换为案例类:

  //更正模式:
val PlayTimeWindow =
StructType(
StructField(startTime,DateType,true)::
StructField(endTime,DateType,true):: Nil)

val globalizedPlayTimeWindows =
StructType(
StructField(countries,ArrayType(StringType,true),true)::
StructField(purchase,ArrayType(PlayTimeWindow,true),true) ::
StructField(rental,ArrayType(PlayTimeWindow,true),true)::
StructField(free,ArrayType(PlayTimeWindow,true),true)::
StructField Download,ArrayType(PlayTimeWindow,true),true)::
StructField(广告,ArrayType(PlayTimeWindow,true),true)::
StructField(preorderExclusive,ArrayType(PlayTimeWindow, true)::
StructField(playTypeIds,MapType(StringType,ArrayType(PlayTimeWindow,true),true),true)::
Nil)

val schema = StructType(
StructField(id,StringType,true)::
Str uctField(jazzCount,IntegerType,true)::
StructField(rockCount,IntegerType,true)::
StructField(classicCount,IntegerType,true)::
StructField nonclassicCount,IntegerType,true)::
StructField(musicType,StringType,true)::
StructField(playType,globalizedPlayTimeWindows,true):: Nil)

//注意使用java.sql.Date,java.util.Date不支持
case类PlayTimeWindowScala(startTime:java.sql.Date,endTime:java.sql.Date)

case class GlobalizedPlayTimeWindowsScala(countries:List [String],
purchase:List [PlayTimeWindowScala],
rental:List [PlayTimeWindowScala],
free:List [PlayTimeWindowScala],
下载:列表[PlayTimeWindowScala],
广告:列表[PlayTime WindowScala],
preorderExclusive:List [PlayTimeWindowScala],
playTypeIds:Map [String,List [PlayTimeWindowScala]]

//一些转换方法:
def toSqlDate (jDate:java.util.Date):java.sql.Date = new java.sql.Date(jDate.getTime)

import scala.collection.JavaConverters._

def toScalaWindowList(l:java.util.List [PlayTimeWindow]):List [PlayTimeWindowScala] = {
l.asScala.map(javaWindow => PlayTimeWindowScala(toSqlDate(javaWindow.startTime),toSqlDate(javaWindow.endTime)))toList
}

def toScalaGlobalizedWindows(javaObj:GlobalizedPlayTimeWindows):GlobalizedPlayTimeWindowsScala = {
GlobalizedPlayTimeWindowsScala b $ b javaObj.countries.asScala.toList,
toScalaWindowList(javaObj.purchase),
toScalaWindowList(javaObj.rental),
toScalaWindowList(javaObj.free),
toScalaWindowList javaObj.download),
toScalaWindowList(javaObj.advertisement),
toScalaWindowList(javaObj.preorderExclusive),
javaObj.playTypeIds.asScala.mapValues(toScalaWindowList).toMap



val parsedJavaData:RDD [(String,Int,Int,Int,Int,String,GlobalizedPlayTimeWindows)] = mappingFile.map(x => {
//你的代码生成元组
})

//转换为Scala对象并转换成一行:
val inputData = parsedJavaData.map {
case(id,jazzCount,rockCount ,classicCount,nonclassi cCount,musicType,javaPlayType)=>
val scalaPlayType = toScalaGlobalizedWindows(javaPlayType)
Row(id, jazzCount, rockCount, classicCount, nonclassicCount, musicType, scalaPlayType)
}

// now - this works
val inputDataDF = sqlContext.createDataFrame(inputData, schema)


I have text file which has complex structured row. I am using customer converter which converts the given string(line) to Pojo class(countryInfo). After converting, I am building DF. The POJO class has a field which is a List of Custome Type(GlobalizedPlayTimeWindows). I created a Struct which matches this GlobalizedPlayTimeWindows and trying to convert the existing Custom Type to the Struct but keep getting error.

StructType I created :

import org.apache.spark.sql.types._

  val PlayTimeWindow =
    StructType(
      StructField("startTime", DateType, true) ::
        StructField("endTime", DateType, true) :: Nil)


  val globalizedPlayTimeWindows =
    StructType(
                StructField( "countries", ArrayType(StringType, true), true )  ::
        StructField( "purchase", ArrayType(PlayTimeWindow, true), true )  ::
        StructField( "rental", ArrayType(PlayTimeWindow, true), true )  ::
        StructField( "free", ArrayType(PlayTimeWindow, true), true )  ::
        StructField( "download", ArrayType(PlayTimeWindow, true), true )  ::
        StructField( "advertisement", ArrayType(PlayTimeWindow, true), true )  ::
        StructField( "playTypeIds", ArrayType(PlayTimeWindow, true), true )  ::
        StructField( "benefitIds", MapType(StringType, ArrayType(PlayTimeWindow, true), true), true)  :: Nil)



  val schema =    StructType(
     StructField("id", StringType, true) ::
      StructField("jazzCount", IntegerType, true) ::
      StructField("rockCount", IntegerType, true) ::
      StructField("classicCount", IntegerType, true) ::
      StructField("nonclassicCount", IntegerType, true) ::
      StructField("musicType", StringType, true) ::
      StructField( "playType", ArrayType(globalizedPlayTimeWindows, true), true) :: Nil)

Data frame creation :

val mappingFile = sc.textFile("s3://input.....")

val inputData = mappingFile.map(x=> {
    val countryInfo = MappingUtils.getCountryInfo(x)

    val id = countryInfo.getId

    val musicType = if(countryInfo.getmusicType != null && StringUtils.isNotBlank(countryInfo.getmusicType)) countryInfo.getmusicType else "UNKOWN_TYPE"


    val classicWestern = if (countryInfo.getClassic() != null && countryInfo.getClassic.size() > 0) true  else false

    var nonclassicCount : Int = 0
    var  classicCount : Int = 0

    if (classicWestern) {
      classicCount = 1
    } else {
      nonclassicCount = 1
    }


    val jazzrock = if (countryInfo.getmusicType() != null && countryInfo.getmusicType != "JAZZ") true  else false
    var jazzCount : Int = 0
    var  rockCount : Int = 0

    if (jazzrock) {
      jazzCount = 1
    } else {
      rockCount = 1
    }

    val playType = if(countryInfo.getPlayTimeWindows != null && countryInfo.getPlayTimeWindows.size > 0 ) { countryInfo.getPlayTimeWindows.asScala.toList } else null

  (id, jazzCount, rockCount, classicCount, nonclassicCount, musicType ,playType)
  }).map{case (id, jazzCount, rockCount, classicCount, nonclassicCount, musicType,playType) => Row(id, jazzCount, rockCount, classicCount, nonclassicCount, musicType,playType)
  }.persist(DISK_ONLY)

 val inputDataDF = sqlContext.createDataFrame(inputData, schema)

inputDataDF.printSchema :

root 
|-- id: string (nullable = true) 
|-- jazzCount: integer (nullable = true) 
|-- rockCount: integer (nullable = true) 
|-- classicCount: integer (nullable = true) 
|-- nonclassicCount: integer (nullable = true) 
|-- musicType: string (nullable = true) 
|-- playType: array (nullable = true) 
| |-- element: struct (containsNull = true) 
| | |-- countries: array (nullable = true) 
| | | |-- element: string (containsNull = true) 
| | |-- purchase: array (nullable = true) 
| | | |-- element: struct (containsNull = true) 
| | | | |-- startTime: date (nullable = true) 
| | | | |-- endTime: date (nullable = true) 
| | |-- rental: array (nullable = true) 
| | | |-- element: struct (containsNull = true) 
| | | | |-- startTime: date (nullable = true) 
| | | | |-- endTime: date (nullable = true) 
| | |-- free: array (nullable = true) 
| | | |-- element: struct (containsNull = true) 
| | | | |-- startTime: date (nullable = true) 
| | | | |-- endTime: date (nullable = true) 
| | |-- download: array (nullable = true) 
| | | |-- element: struct (containsNull = true) 
| | | | |-- startTime: date (nullable = true) 
| | | | |-- endTime: date (nullable = true) 
| | |-- advertisement: array (nullable = true) 
| | | |-- element: struct (containsNull = true) 
| | | | |-- startTime: date (nullable = true) 
| | | | |-- endTime: date (nullable = true) 
| | |-- playTypeIds: array (nullable = true) 
| | | |-- element: struct (containsNull = true) 
| | | | |-- startTime: date (nullable = true) 
| | | | |-- endTime: date (nullable = true) 
| | |-- benefitIds: map (nullable = true) 
| | | |-- key: string 
| | | |-- value: array (valueContainsNull = true) 
| | | | |-- element: struct (containsNull = true) 
| | | | | |-- startTime: date (nullable = true) 
| | | | | |-- endTime: date (nullable = true) 

Struct's equivalent POJO :

@Data
public GlobalizedPlayTimeWindows(

    private final List<String> countries;

    private final List<PlayTimeWindow> purchase;

    private final List<PlayTimeWindow> rental;

    private final List<PlayTimeWindow> free;

    private final List<PlayTimeWindow> download;

    private final List<PlayTimeWindow> advertisement;

    private final List<PlayTimeWindow> preorderExclusive;

    private final Map<String, List<PlayTimeWindow>> playTypeIds;

}

@Data
public class PlayTimeWindow {

    private final Date startTime;

    private final Date endTime;
}

The error I am getting :

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 12.0 failed 4 times, most recent failure: Lost task 0.3 in stage 12.0 (TID 393, ip-172-31-14-43.ec2.internal): scala.MatchError: GlobalizedPlayTimeWindows(countries=[US], purchase=null, rental=null, free=null, download=null, advertisement=null, preorderExclusive=null, playTypeIds=null) (of class com.model.global.GlobalizedPlayTimeWindows) at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:255) at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:250) at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102) at org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter$$anonfun$toCatalystImpl$2.apply(CatalystTypeConverters.scala:163) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.immutable.List.foreach(List.scala:318) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter.toCatalystImpl(CatalystTypeConverters.scala:163) at org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter.toCatalystImpl(CatalystTypeConverters.scala:153) at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102) at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:260) at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:250) at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102) at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:401) at org.apache.spark.sql.SQLContext$$anonfun$6.apply(SQLContext.scala:492) at org.apache.spark.sql.SQLContext$$anonfun$6.apply(SQLContext.scala:492) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$10.next(Iterator.scala:312) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) at scala.collection.AbstractIterator.to(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212) at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) at org.apache.spark.scheduler.Task.run(Task.scala:89) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858) at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:212) at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:165) at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174) at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1538) at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1538) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56) at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2125) at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$execute$1(DataFrame.scala:1537) at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$collect(DataFrame.scala:1544) at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1414) at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1413) at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2138) at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1413) at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1495) at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:171) at org.apache.spark.sql.DataFrame.show(DataFrame.scala:394) at org.apache.spark.sql.DataFrame.show(DataFrame.scala:355) at org.apache.spark.sql.DataFrame.show(DataFrame.scala:363) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$$$$$c57ec8bf9b0d5f6161b97741d596ff0$$$$wC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:163) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$$$$$c57ec8bf9b0d5f6161b97741d596ff0$$$$wC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:168) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$$$$$c57ec8bf9b0d5f6161b97741d596ff0$$$$wC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:170) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$$$$$c57ec8bf9b0d5f6161b97741d596ff0$$$$wC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:172) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$$$$$c57ec8bf9b0d5f6161b97741d596ff0$$$$wC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:174) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$$$$$c57ec8bf9b0d5f6161b97741d596ff0$$$$wC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:176) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$$$$$c57ec8bf9b0d5f6161b97741d596ff0$$$$wC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:178) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:180) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:182) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:184) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:186) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:188) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:190) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:192) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:194) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:196) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:198) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:200) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:202) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:204) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:206) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:208) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:210) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:212) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:214) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:216) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:218) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:220) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:222) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:224) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:226) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:228) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:230) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:232) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:234) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:236) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:238) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:240) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:242) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:244) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:246) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:248) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:250) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:252) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:254) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:256) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:258) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:260) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:262) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:264) at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:266) at $iwC$$iwC$$iwC$$iwC.<init>(<console>:268) at $iwC$$iwC$$iwC.<init>(<console>:270) at $iwC$$iwC.<init>(<console>:272) at $iwC.<init>(<console>:274) at <init>(<console>:276) at .<init>(<console>:280) at .<clinit>(<console>) at .<init>(<console>:7) at .<clinit>(<console>) at $print(<console>) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:483) at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065) at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346) at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819) at org.apache.zeppelin.spark.SparkInterpreter.interpretInput(SparkInterpreter.java:664) at org.apache.zeppelin.spark.SparkInterpreter.interpret(SparkInterpreter.java:629) at org.apache.zeppelin.spark.SparkInterpreter.interpret(SparkInterpreter.java:622) at org.apache.zeppelin.interpreter.ClassloaderInterpreter.interpret(ClassloaderInterpreter.java:57) at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:93) at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:276) at org.apache.zeppelin.scheduler.Job.run(Job.java:170) at org.apache.zeppelin.scheduler.FIFOScheduler$1.run(FIFOScheduler.java:118) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)

Also tried to do implicit toDF from inputData :

inputData.toDF.printSchema but getting error :

java.lang.UnsupportedOperationException: Schema for type com.model.global.GlobalizedPlayTimeWindows is not supported at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:718) at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:30) at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:667) at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:30) at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:693) at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:691) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at 

解决方案

OK - to cut the long discussion short, here's a working solution. Basically you had two separate issues here:

  1. You expected Spark to be able to parse an arbitrary Java class into a DataFrame - that is not the case, Spark can only parse specific types, which are generally: Scala collections; Primitives; java.sql.Date; and any subclass of scala.Product - all case classes and tuples, for instance. So - as discussed in comments, the first thing to do is to convert your existing structure into such types.

  2. Your schema didn't match your Java class either - there were a few differences:

    • Schema's playType was an Array of GlobalizedPlayTimeWindows, while your code created a single item and not an array
    • globalizedPlayTimeWindows schema contained benefitIds which doesn't exist in the Java class
    • playTypeIds schema was an Array, while the field with the same name in the Java class was a Map

So - I corrected all these (changed the schema to match the data, you can choose to fix these differently as long as they match) and completed the conversion of the Java classes into case classes:

// corrected schemas:
val PlayTimeWindow =
  StructType(
    StructField("startTime", DateType, true) ::
      StructField("endTime", DateType, true) :: Nil)

val globalizedPlayTimeWindows =
  StructType(
    StructField( "countries", ArrayType(StringType, true), true )  ::
      StructField( "purchase", ArrayType(PlayTimeWindow, true), true )  ::
      StructField( "rental", ArrayType(PlayTimeWindow, true), true )  ::
      StructField( "free", ArrayType(PlayTimeWindow, true), true )  ::
      StructField( "download", ArrayType(PlayTimeWindow, true), true )  ::
      StructField( "advertisement", ArrayType(PlayTimeWindow, true), true )  ::
      StructField( "preorderExclusive", ArrayType(PlayTimeWindow, true), true )  ::
      StructField( "playTypeIds", MapType(StringType, ArrayType(PlayTimeWindow, true), true), true )  ::
      Nil)

val schema =    StructType(
  StructField("id", StringType, true) ::
    StructField("jazzCount", IntegerType, true) ::
    StructField("rockCount", IntegerType, true) ::
    StructField("classicCount", IntegerType, true) ::
    StructField("nonclassicCount", IntegerType, true) ::
    StructField("musicType", StringType, true) ::
    StructField( "playType", globalizedPlayTimeWindows, true) :: Nil)

// note the use of java.sql.Date, java.util.Date not supported
case class PlayTimeWindowScala(startTime: java.sql.Date, endTime: java.sql.Date)

case class GlobalizedPlayTimeWindowsScala (countries: List[String],
                                           purchase: List[PlayTimeWindowScala],
                                           rental: List[PlayTimeWindowScala],
                                           free: List[PlayTimeWindowScala],
                                           download: List[PlayTimeWindowScala],
                                           advertisement: List[PlayTimeWindowScala],
                                           preorderExclusive: List[PlayTimeWindowScala],
                                           playTypeIds: Map[String, List[PlayTimeWindowScala]])

// some conversion methods:
def toSqlDate(jDate: java.util.Date): java.sql.Date = new java.sql.Date(jDate.getTime)

import scala.collection.JavaConverters._

def toScalaWindowList(l: java.util.List[PlayTimeWindow]): List[PlayTimeWindowScala] = {
  l.asScala.map(javaWindow => PlayTimeWindowScala(toSqlDate(javaWindow.startTime), toSqlDate(javaWindow.endTime))).toList
}

def toScalaGlobalizedWindows(javaObj: GlobalizedPlayTimeWindows): GlobalizedPlayTimeWindowsScala = {
  GlobalizedPlayTimeWindowsScala(
    javaObj.countries.asScala.toList,
    toScalaWindowList(javaObj.purchase),
    toScalaWindowList(javaObj.rental),
    toScalaWindowList(javaObj.free),
    toScalaWindowList(javaObj.download),
    toScalaWindowList(javaObj.advertisement),
    toScalaWindowList(javaObj.preorderExclusive),
    javaObj.playTypeIds.asScala.mapValues(toScalaWindowList).toMap
  )
}

val parsedJavaData: RDD[(String, Int, Int, Int, Int, String, GlobalizedPlayTimeWindows)] = mappingFile.map(x => {
   // your code producing the tuple
})

// convert to Scala objects and into a Row:
val inputData = parsedJavaData.map{
  case (id, jazzCount, rockCount, classicCount, nonclassicCount, musicType, javaPlayType) =>
    val scalaPlayType = toScalaGlobalizedWindows(javaPlayType)
    Row(id, jazzCount, rockCount, classicCount, nonclassicCount, musicType, scalaPlayType)
}

// now - this works
val inputDataDF = sqlContext.createDataFrame(inputData, schema)

这篇关于在创建数据帧时如何解决scala.MatchError的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆