GenericRowWithSchema exception in casting ArrayBuffer to HashSet in DataFrame to RDD from Hive table


Problem Description


I have a Hive table in parquet format that was generated using

create table myTable (var1 int, var2 string, var3 int, var4 string, var5 array<struct<a:int,b:string>>) stored as parquet;

I am able to verify that it was filled -- here is a sample value

[1, "abcdef", 2, "ghijkl", ArrayBuffer([1, "hello"])]

I wish to put this into a Spark RDD of the form

((1,"abcdef"), ((2,"ghijkl"), Set((1,"hello"))))

Now, using spark-shell (I get the same problem in spark-submit), I made a test RDD with these values

scala> val tempRDD = sc.parallelize(Seq(((1,"abcdef"),((2,"ghijkl"), ArrayBuffer[(Int,String)]((1,"hello"))))))
tempRDD: org.apache.spark.rdd.RDD[((Int, String), ((Int, String), scala.collection.mutable.ArrayBuffer[(Int, String)]))] = ParallelCollectionRDD[44] at parallelize at <console>:85

Using an iterator, I can cast the ArrayBuffer to a HashSet in the following new RDD:

scala> val tempRDD2 = tempRDD.map(a => (a._1, (a._2._1, { var tempHashSet = new HashSet[(Int,String)]; a._2._2.foreach(a => tempHashSet = tempHashSet ++ HashSet(a)); tempHashSet } )))
tempRDD2: org.apache.spark.rdd.RDD[((Int, String), ((Int, String), scala.collection.immutable.HashSet[(Int, String)]))] = MapPartitionsRDD[46] at map at <console>:87

scala> tempRDD2.collect.foreach(println)
((1,abcdef),((2,ghijkl),Set((1,hello))))
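
(As an aside, the mutable-var loop is equivalent to simply calling toSet on the buffer; a minimal sketch, assuming an immutable Set is acceptable in place of the explicit HashSet:)

// Same conversion, written more concisely (illustrative)
val tempRDD2b = tempRDD.map { case (k, (m, buf)) => (k, (m, buf.toSet)) }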

But when I attempt to do the EXACT SAME THING using a DataFrame from a HiveContext / SQLContext, I get the following error:

scala> val hc = new HiveContext(sc)
scala> import hc._
scala> import hc.implicits._

scala> val tempHiveQL = hc.sql("""select var1, var2, var3, var4, var5 from myTable""")

scala> val tempRDDfromHive = tempHiveQL.map(a => ((a(0).toString.toInt, a(1).toString), ((a(2).toString.toInt, a(3).toString), a(4).asInstanceOf[ArrayBuffer[(Int,String)]] )))

scala> val tempRDD3 = tempRDDfromHive.map(a => (a._1, (a._2._1, { var tempHashSet = new HashSet[(Int,String)]; a._2._2.foreach(a => tempHashSet = tempHashSet ++ HashSet(a)); tempHashSet } )))
tempRDD3: org.apache.spark.rdd.RDD[((Int, String), ((Int, String), scala.collection.immutable.HashSet[(Int, String)]))] = MapPartitionsRDD[47] at map at <console>:91

scala> tempRDD3.collect.foreach(println)
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 14.0 failed 1 times, most recent failure: Lost task 1.0 in stage 14.0 (TID 5211, localhost): java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to scala.Tuple2
       at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1$$anonfun$apply$1.apply(<console>:91)
       at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
       at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:91)
       at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:91)
       at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
       at scala.collection.Iterator$class.foreach(Iterator.scala:727)
       at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
       at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
       at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
       at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
       at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
       at scala.collection.AbstractIterator.to(Iterator.scala:1157)
       at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
       at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
       at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
       at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
       at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
       at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
       at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1503)
       at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1503)
       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
       at org.apache.spark.scheduler.Task.run(Task.scala:64)
       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
       at java.lang.Thread.run(Thread.java:724)

Driver stacktrace:
       at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203)
       at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
       at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1191)
       at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
       at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191)
       at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
       at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
       at scala.Option.foreach(Option.scala:236)
       at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
       at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
       at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
       at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

Note that I get this same error "GenericRowWithSchema cannot be cast to scala.Tuple2" when I run this in a compiled program using spark-submit. The program crashes at RUN TIME when it encounters the conversion step, and I had no compiler errors.
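
The run-time-only failure is consistent with JVM type erasure: the element type in asInstanceOf[ArrayBuffer[(Int, String)]] is never checked at the cast site, so nothing fails until an element is actually used as a Tuple2. A minimal sketch of the same effect (illustrative, not from the original program):

// The erased cast "succeeds"; the ClassCastException only appears when an element is touched
val rows: Seq[Any] = Seq(org.apache.spark.sql.Row(1, "hello"))
val pretendTuples = rows.asInstanceOf[Seq[(Int, String)]]   // no error here
// pretendTuples.foreach { case (k, v) => println(k) }      // would throw the ClassCastException here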

It seems very strange to me that my artificially generated RDD "tempRDD" would work with the conversion, whereas the Hive query DataFrame->RDD did not. I checked, and both of the RDDs have the same form:

scala> tempRDD
org.apache.spark.rdd.RDD[((Int, String), ((Int, String), scala.collection.mutable.ArrayBuffer[(Int, String)]))] = MapPartitionsRDD[21] at map at DataFrame.scala:776

scala> tempRDDfromHive
org.apache.spark.rdd.RDD[((Int, String), ((Int, String), scala.collection.mutable.ArrayBuffer[(Int, String)]))] = ParallelCollectionRDD[25] at parallelize at <console>:70

The only difference is where their last step originated. I even tried persisting, checkpointing, and materializing these RDDs before running the steps for tempRDD2 and tempRDD3. All got the same error message.
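
For readers unfamiliar with those operations, such attempts would look roughly like the following sketch (the checkpoint directory is hypothetical):

// Illustrative persist / checkpoint / materialize sequence
sc.setCheckpointDir("/tmp/spark-checkpoints")
tempRDDfromHive.persist()      // cache the RDD
tempRDDfromHive.checkpoint()   // mark it for checkpointing (lineage truncation)
tempRDDfromHive.count()        // force materialization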

I also read through related Stack Overflow questions and Apache Spark Jira issues, and from those I attempted casting the ArrayBuffer as an Iterator instead, but that also failed on the second step with the same error.

Does anyone know how to properly convert ArrayBuffers to HashSets for DataFrames originating from Hive tables? Since the error seems to occur only with the Hive table version, I'm tempted to think this is an issue with Spark/Hive integration in Spark SQL.

Any ideas?

Thanks in advance.

[edited] BTW, my Spark version is 1.3.0 CDH.

[edited: here are the printSchema results]

scala> tempRDDfromHive.printSchema()
root
 |-- var1: integer (nullable = true)
 |-- var2: string (nullable = true)
 |-- var3: integer (nullable = true)
 |-- var4: string (nullable = true)
 |-- var5: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- a: integer (nullable = true)
 |    |    |-- b: string (nullable = true)

Solution

What you actually get during the map phase is not an ArrayBuffer[(Int, String)] but an ArrayBuffer[Row], hence the error. Ignoring the other columns, what you need is something like this:

import org.apache.spark.sql.Row

tempHiveQL.map((a: Row) =>
    a.getAs[Seq[Row]](4).map{case Row(k: Int, v: String) => (k, v)}.toSet)
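
For completeness, a hedged sketch of how the remaining columns could be carried along at the same time, producing the ((Int, String), ((Int, String), Set[(Int, String)])) shape from the question (assuming the column order matches the query above):

// Rebuild the full nested-tuple structure, reading var5 as Seq[Row] instead of casting it
val tempRDDfromHiveFixed = tempHiveQL.map((a: Row) =>
  ((a.getInt(0), a.getString(1)),
   ((a.getInt(2), a.getString(3)),
    a.getAs[Seq[Row]](4).map { case Row(k: Int, v: String) => (k, v) }.toSet)))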

It looks like this issue has been fixed in Spark 1.5.0.
