Number of tuples limit in RDD; reading RDD throws ArrayIndexOutOfBoundsException


Problem description

I tried converting the DataFrame of a table with 25 columns into an RDD. In the process I learned that Scala (up to 2.11.8) limits tuples to a maximum of 22 elements.

val rdd = sc.textFile("/user/hive/warehouse/myDB.db/myTable/")
rdd: org.apache.spark.rdd.RDD[String] = /user/hive/warehouse/myDB.db/myTable/ MapPartitionsRDD[3] at textFile at <console>:24
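(For context: the cap exists because the standard library only defines the tuple classes Tuple1 through Tuple22, so a 23-element tuple literal is rejected at compile time. A minimal sketch:)

// a 22-element literal compiles -- it is a Tuple22...
val ok = (1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22)
// ...but adding a 23rd element fails to compile:
// val tooMany = (1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23)
// => error: too many elements for tuple: 23, allowed: 22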

Sample data:

[2017-02-26, 100052-ACC, 100052, 3260, 1005, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000]

Accessing each column:

val rdd3 = rdd.map(elements => {
  val el = elements.split(",")
  (el(0).substring(1, 11), el(1), el(2).toInt, el(3).toInt, el(4).toInt,
    el(5).toDouble, el(6).toDouble, el(7).toDouble, el(8).toDouble, el(9).toDouble,
    el(10).toDouble, el(11).toDouble, el(12).toDouble, el(13).toDouble, el(14).toDouble,
    el(15).toDouble, el(16).toDouble, el(17).toDouble, el(18).toDouble, el(19).toDouble,
    el(20).toDouble, el(21).toDouble, el(22).toDouble, el(23).toDouble, el(24).toDouble)
})

It throws an error:

<console>:1: error: too many elements for tuple: 26, allowed: 22

This is a known limitation in Scala (https://issues.scala-lang.org/browse/SI-9572), so I created a case class to work around the problem (since Scala 2.11, case classes, unlike tuples, are no longer capped at 22 fields).

case class HandleMaxTuple(col1: String, col2: String, col3: Int, col4: Int, col5: Int,
  col6: Double, col7: Double, col8: Double, col9: Double, col10: Double,
  col11: Double, col12: Double, col13: Double, col14: Double, col15: Double,
  col16: Double, col17: Double, col18: Double, col19: Double, col20: Double,
  col21: Double, col22: Double, col23: Double, col24: Double, col25: Double)

Thus the new RDD definition becomes:

val rdd3 = rdd.map(elements => {
  val el = elements.split(",")
  HandleMaxTuple(el(0).substring(1, 11), el(1), el(2).toInt, el(3).toInt, el(4).toInt,
    el(5).toDouble, el(6).toDouble, el(7).toDouble, el(8).toDouble, el(9).toDouble,
    el(10).toDouble, el(11).toDouble, el(12).toDouble, el(13).toDouble, el(14).toDouble,
    el(15).toDouble, el(16).toDouble, el(17).toDouble, el(18).toDouble, el(19).toDouble,
    el(20).toDouble, el(21).toDouble, el(22).toDouble, el(23).toDouble, el(24).toDouble)
})

However, when I try to read the contents of the RDD:

rdd3.take(2).foreach(println)

it throws a java.lang.ArrayIndexOutOfBoundsException.

Stack trace:

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
  at scala.Option.foreach(Option.scala:257)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
  at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1354)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
  at org.apache.spark.rdd.RDD.take(RDD.scala:1327)
  ... 48 elided
Caused by: java.lang.ArrayIndexOutOfBoundsException: 1

Any idea why this is happening? Any workarounds?

Recommended answer

Hi, I tried to do exactly the same thing with your data using a case class, and I ran into two problems. First, take a look at the working code:

package com.scalaspark.stackoverflow

import org.apache.spark.sql.SparkSession

object StackOverFlow {
  def main(args: Array[String]): Unit = {

    // Parses one comma-separated line into the 25-field case class.
    def parser(line: String): HandleMaxTuple = {
      val fields = line.split(",")
      val c1 = fields(0).substring(0, 10)               // index starts at zero, not one
      val c2 = fields(1)
      val c3 = fields(2).replaceAll("\\s", "").toInt    // strip the leading space before converting
      val c4 = fields(3).replaceAll("\\s", "").toInt
      val c5 = fields(4).replaceAll("\\s", "").toInt
      val c6 = fields(5).replaceAll("\\s", "").toDouble
      val c7 = fields(6).replaceAll("\\s", "").toDouble
      val c8 = fields(7).replaceAll("\\s", "").toDouble
      val c9 = fields(8).replaceAll("\\s", "").toDouble
      val c10 = fields(9).replaceAll("\\s", "").toDouble
      val c11 = fields(10).replaceAll("\\s", "").toDouble
      val c12 = fields(11).replaceAll("\\s", "").toDouble
      val c13 = fields(12).replaceAll("\\s", "").toDouble
      val c14 = fields(13).replaceAll("\\s", "").toDouble
      val c15 = fields(14).replaceAll("\\s", "").toDouble
      val c16 = fields(15).replaceAll("\\s", "").toDouble
      val c17 = fields(16).replaceAll("\\s", "").toDouble
      val c18 = fields(17).replaceAll("\\s", "").toDouble
      val c19 = fields(18).replaceAll("\\s", "").toDouble
      val c20 = fields(19).replaceAll("\\s", "").toDouble
      val c21 = fields(20).replaceAll("\\s", "").toDouble
      val c22 = fields(21).replaceAll("\\s", "").toDouble
      val c23 = fields(22).replaceAll("\\s", "").toDouble
      val c24 = fields(23).replaceAll("\\s", "").toDouble
      val c25 = fields(24).replaceAll("\\s", "").toDouble

      HandleMaxTuple(c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13,
        c14, c15, c16, c17, c18, c19, c20, c21, c22, c23, c24, c25)
    }

    val spark = SparkSession
      .builder()
      .appName("number of tuples limit in RDD")
      .master("local[*]")
      .getOrCreate()

    val lines = spark.sparkContext.textFile("C:\\Users\\rajnish.kumar\\Desktop\\sampleData.txt", 1)
    lines.foreach(println)
    val parseddata = lines.map(parser)
    parseddata.foreach(println)
  }

  case class HandleMaxTuple(col1: String, col2: String, col3: Int, col4: Int, col5: Int,
    col6: Double, col7: Double, col8: Double, col9: Double, col10: Double,
    col11: Double, col12: Double, col13: Double, col14: Double, col15: Double,
    col16: Double, col17: Double, col18: Double, col19: Double, col20: Double,
    col21: Double, col22: Double, col23: Double, col24: Double, col25: Double)
}

The first problem is that for el(0) you are using substring(), which per the Java docs is:

String substring(int beginIndex, int endIndex)
Returns a new string that is a substring of this string. 

When I use el(0).substring(1,11), I get java.lang.StringIndexOutOfBoundsException: String index out of range: 11.

So go with el(0).substring(0,10) instead (the index starts from zero, not from one).
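Whether (1, 11) or (0, 10) is the right slice depends on whether the leading '[' from the sample row survives the split; a quick check of both cases:

val withBracket = "[2017-02-26"   // field 0 as it appears in the sample data
withBracket.substring(1, 11)      // "2017-02-26" -- drops the bracket, needs 11 characters
val noBracket = "2017-02-26"
noBracket.substring(0, 10)        // "2017-02-26" -- the whole 10-character string
// noBracket.substring(1, 11) would throw StringIndexOutOfBoundsException: 11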

The second problem: you are using toInt and toDouble for some of the field conversions, but as far as I can see all of those fields start with a space, so beware that this can fail with a NumberFormatException, just as it would in Java:

scala> val i = "foo".toInt
java.lang.NumberFormatException: For input string: "foo"

For more info, see https://alvinalexander.com/scala/how-cast-string-to-int-in-scala-string-int-conversion. To correct it, I used .replaceAll("\\s", ""), which removes the whitespace in front of each number before converting to Int and Double.
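A minimal sketch of the fix on a couple of the sample fields (trim would work just as well for leading/trailing whitespace):

// " 3260".toInt would throw java.lang.NumberFormatException: For input string: " 3260"
" 3260".replaceAll("\\s", "").toInt       // 3260
" 0.0000".replaceAll("\\s", "").toDouble  // 0.0
" 3260".trim.toInt                        // 3260 -- trim also handles leading/trailing spaces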

Hope this helps; when you run the above sample you will get this output:

HandleMaxTuple(2017-02-26, 100052-ACC,100052,3260,1005,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0)
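As for the java.lang.ArrayIndexOutOfBoundsException: 1 itself: split(",") returns a one-element array for any line that contains no comma (an empty or trailing line under the warehouse directory, for example), so el(1) is immediately out of bounds. A minimal guard, assuming such malformed lines are the cause:

// drop any line that does not split into exactly 25 fields instead of crashing
val parseddata = lines
  .filter(_.split(",").length == 25)
  .map(parser)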

