Number of tuples limit in RDD; reading RDD throws ArrayIndexOutOfBoundsException

Problem Description

For a table containing 25 columns, I tried to convert a DataFrame to an RDD. I then came to know that Scala (until 2.11.8) allows a tuple to have at most 22 elements.

val rdd = sc.textFile("/user/hive/warehouse/myDB.db/myTable/")
rdd: org.apache.spark.rdd.RDD[String] = /user/hive/warehouse/myDB.db/myTable/ MapPartitionsRDD[3] at textFile at <console>:24

Sample Data:

[2017-02-26, 100052-ACC, 100052, 3260, 1005, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000]

Accessing each column:

val rdd3 = rdd.map(elements => {
  val el = elements.split(",")
  (el(0).substring(1,11).toString, el(1).toString, el(2).toInt, el(3).toInt, el(4).toInt,
   el(5).sum.toDouble, el(6).sum.toDouble, el(7).sum.toDouble, el(8).sum.toDouble, el(9).sum.toDouble,
   el(10).sum.toDouble, el(11).sum.toDouble, el(12).sum.toDouble, el(13).sum.toDouble, el(14).sum.toDouble,
   el(15).sum.toDouble, el(16).sum.toDouble, el(17).sum.toDouble, el(18).sum.toDouble, el(19).sum.toDouble,
   el(20).sum.toDouble, el(21).sum.toDouble, el(22).sum.toDouble, el(23).sum.toDouble, el(24).sum.toDouble)
})

It throws an error:

<console>:1: error: too many elements for tuple: 26, allowed: 22

This is a known limitation in Scala (https://issues.scala-lang.org/browse/SI-9572), so I created a case class to work around the problem.

case class HandleMaxTuple(col1: String, col2: String, col3: Int, col4: Int, col5: Int, col6: Double, col7: Double, col8: Double, col9: Double, col10: Double, col11: Double, col12: Double, col13: Double, col14: Double, col15: Double, col16: Double, col17: Double, col18: Double, col19: Double, col20: Double, col21: Double, col22: Double, col23: Double, col24: Double, col25: Double)

Thus the new rdd definition becomes:

val rdd3 = rdd.map(elements => {
  val el = elements.split(",")
  HandleMaxTuple(el(0).substring(1,11).toString, el(1).toString, el(2).toInt, el(3).toInt, el(4).toInt,
    el(5).toDouble, el(6).toDouble, el(7).toDouble, el(8).toDouble, el(9).toDouble,
    el(10).toDouble, el(11).toDouble, el(12).toDouble, el(13).toDouble, el(14).toDouble,
    el(15).toDouble, el(16).toDouble, el(17).toDouble, el(18).toDouble, el(19).toDouble,
    el(20).toDouble, el(21).toDouble, el(22).toDouble, el(23).toDouble, el(24).toDouble)
})

However, when I try to read the contents of the RDD:

rdd.take(2).foreach(println)

it throws a java.lang.ArrayIndexOutOfBoundsException:

Error Stack:

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
  at scala.Option.foreach(Option.scala:257)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
  at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1354)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
  at org.apache.spark.rdd.RDD.take(RDD.scala:1327)
  ... 48 elided
Caused by: java.lang.ArrayIndexOutOfBoundsException: 1

Any idea why it's happening? Any workarounds?

Solution

I have tried to do exactly the same thing with your data using a case class, and I found two problems. First, look at the answer:

package com.scalaspark.stackoverflow
import org.apache.spark.sql.SparkSession

object StackOverFlow {
  def main(args: Array[String]): Unit = {
    
    def parser(lines: String): HandleMaxTuple = {
      // split the CSV line into its 25 fields
      val fields = lines.split(",")
      val c1 = fields(0).substring(0,10).toString()
      val c2 = fields(1).toString()
      // strip whitespace before numeric conversion, otherwise toInt/toDouble throws NumberFormatException
      val c3 = fields(2).replaceAll("\\s","").toInt
      val c4 = fields(3).replaceAll("\\s","").toInt
      val c5 = fields(4).replaceAll("\\s","").toInt
      val c6 = fields(5).replaceAll("\\s","").toDouble
      val c7 = fields(6).replaceAll("\\s","").toDouble
      val c8 = fields(7).replaceAll("\\s","").toDouble
      val c9 = fields(8).replaceAll("\\s","").toDouble
      val c10 = fields(9).replaceAll("\\s","").toDouble
      val c11 = fields(10).replaceAll("\\s","").toDouble
      val c12 = fields(11).replaceAll("\\s","").toDouble
      val c13 = fields(12).replaceAll("\\s","").toDouble
      val c14 = fields(13).replaceAll("\\s","").toDouble
      val c15 = fields(14).replaceAll("\\s","").toDouble
      val c16 = fields(15).replaceAll("\\s","").toDouble
      val c17 = fields(16).replaceAll("\\s","").toDouble
      val c18 = fields(17).replaceAll("\\s","").toDouble
      val c19 = fields(18).replaceAll("\\s","").toDouble
      val c20 = fields(19).replaceAll("\\s","").toDouble
      val c21 = fields(20).replaceAll("\\s","").toDouble
      val c22 = fields(21).replaceAll("\\s","").toDouble
      val c23 = fields(22).replaceAll("\\s","").toDouble
      val c24 = fields(23).replaceAll("\\s","").toDouble
      val c25 = fields(24).replaceAll("\\s","").toDouble

      val handleMaxTuple: HandleMaxTuple = HandleMaxTuple(c1,c2,c3,c4,c5,c6,c7,c8,c9,c10,c11,c12,c13,c14,c15,c16,c17,c18,c19,c20,c21,c22,c23,c24,c25)
      return handleMaxTuple
    }
    val spark = SparkSession
                .builder()
                .appName("number of tuples limit in RDD")
                .master("local[*]")
                .getOrCreate()
                
    val lines = spark.sparkContext.textFile("C:\\Users\\rajnish.kumar\\Desktop\\sampleData.txt", 1)
    lines.foreach(println)
    val parseddata = lines.map(parser)
    parseddata.foreach(println)
  }
  
  case class HandleMaxTuple(col1: String, col2: String, col3: Int, col4: Int, col5: Int, col6: Double, col7: Double, col8: Double, col9: Double, col10: Double, col11: Double, col12: Double, col13: Double, col14: Double, col15: Double, col16: Double, col17: Double, col18: Double, col19: Double, col20: Double, col21: Double, col22: Double, col23: Double, col24: Double, col25: Double)
}

The first problem is that for el(0) you are using substring(), which per the Java doc is:

String substring(int beginIndex, int endIndex)
Returns a new string that is a substring of this string. 

When I go with el(0).substring(1,11) I get java.lang.StringIndexOutOfBoundsException: String index out of range: 11.

So go with el(0).substring(0,10) (as indexes start from zero, not from one).
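
To make the index bounds concrete, here is a small REPL sketch of my own (not from the original answer); it shows how the outcome depends on whether the first field still carries the leading "[" from the sample row:

scala> "[2017-02-26".substring(1, 11)   // 11-char field including "[": valid
res0: String = 2017-02-26

scala> "2017-02-26".substring(1, 11)    // 10-char field without "[": endIndex 11 is past the end
java.lang.StringIndexOutOfBoundsException: String index out of range: 11

scala> "2017-02-26".substring(0, 10)    // safe for a 10-character date field
res1: String = 2017-02-26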

The second problem: you are using toInt and toDouble to convert some fields, but as I can see, all of them start with a space. Beware that this can fail with a NumberFormatException, just as it does in Java:

scala> val i = "foo".toInt
java.lang.NumberFormatException: For input string: "foo"

For more info see https://alvinalexander.com/scala/how-cast-string-to-int-in-scala-string-int-conversion. To correct it, I used .replaceAll("\\s","") to strip all whitespace from each field before converting it to Int or Double.
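
If you would rather not fail the whole job on a single malformed field, here is a minimal defensive sketch; the helper name toCleanDouble and the use of scala.util.Try are my own additions, not part of the original answer:

import scala.util.Try

// Strip all whitespace, then convert; Try turns a malformed field into None
// instead of throwing NumberFormatException.
def toCleanDouble(s: String): Option[Double] =
  Try(s.replaceAll("\\s", "").toDouble).toOption

toCleanDouble(" 0.0000")   // Some(0.0)
toCleanDouble("foo")       // None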

When you run the above sample you will get output like this:

HandleMaxTuple(2017-02-26, 100052-ACC,100052,3260,1005,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0)
