Why is Spark throwing an ArrayIndexOutOfBoundsException for empty attributes?


Problem description

Context

I am using Spark 1.5.

I have a file records.txt which is Ctrl-A delimited, and in that file index 31 holds the subscriber_id. For some records the subscriber_id is empty.

Here is a record whose subscriber_id (UK8jikahasjp23) is NOT empty; it sits one position before the last attribute:

99^A2013-12-11^A23421421412^qweqweqw2222^A34232432432^A365633049^A1^A6yudgfdhaf9923^AAC^APrimary DTV^AKKKR DATA+ PVR3^AGrundig^AKKKR PVR3^AKKKR DATA+ PVR3^A127b146^APVR3^AYes^ANo^ANo^ANo^AYes^AYes^ANo^A2017-08-07 21:27:30.000000^AYes^ANo^ANo^A6yudgfdhaf9923^A7290921396551747605^A2013-12-11 16:00:03.000000^A7022497306379992936^AUK8jikahasjp23^A

Here is a record whose subscriber_id is empty:

23^A2013-12-11^A23421421412^qweqweqw2222^A34232432432^A365633049^A1^A6yudgfdhaf9923^AAC^APrimary DTV^AKKKR DATA+ PVR3^AGrundig^AKKKR PVR3^AKKKR DATA+ PVR3^A127b146^APVR3^AYes^ANo^ANo^ANo^AYes^AYes^ANo^A2017-08-07 21:27:30.000000^AYes^ANo^ANo^A6yudgfdhaf9923^A7290921396551747605^A2013-12-11 16:00:03.000000^A7022497306379992936^A^A
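For reference, the field layout can be checked directly on a sample line (a minimal sketch in plain Scala; `sample` is a hypothetical name, and the string is one of the records above pasted in, abbreviated here):

// Hypothetical sketch: split a sample record on Ctrl-A (\u0001) and inspect index 31.
val sample: String = "99\u00012013-12-11\u0001..." // paste a full record here

val fields = sample.split("\\u0001")
println(s"field count: ${fields.length}")
println(s"index 31:    ${fields.lift(31).getOrElse("<missing>")}")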

Problem

I am getting java.lang.ArrayIndexOutOfBoundsException for the records with an empty subscriber_id.

Why is Spark throwing java.lang.ArrayIndexOutOfBoundsException for empty values of the field subscriber_id?


16/08/20 10:22:18 WARN scheduler.TaskSetManager: Lost task 31.0 in stage 8.0 : java.lang.ArrayIndexOutOfBoundsException: 31

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.hive.HiveContext
import org.slf4j.LoggerFactory

case class CustomerCard(accountNumber: String, subscriber_id: String, subscriptionStatus: String)

object CustomerCardProcess {
  val log = LoggerFactory.getLogger(this.getClass.getName)

  def doPerform(sc: SparkContext, sqlContext: HiveContext, custCardRDD: RDD[String]): DataFrame = {
    import sqlContext.implicits._

    log.info("doCustomerCardProcess method started")

    // Split each line on the Ctrl-A (\u0001) delimiter.
    val splitRDD  = custCardRDD.map(elem => elem.split("\\u0001"))
    // Pick accountNumber (index 3), subscriber_id (index 31) and subscriptionStatus (index 8).
    val schemaRDD = splitRDD.map(arr => CustomerCard(arr(3).trim, arr(31).trim, arr(8).trim))

    schemaRDD.toDF().registerTempTable("customer_card")
    val custCardDF = sqlContext.sql(
      """
        |SELECT
        |accountNumber,
        |subscriber_id
        |FROM
        |customer_card
        |WHERE
        |subscriptionStatus IN('AB', 'AC', 'PC')
        |AND accountNumber IS NOT NULL AND LENGTH(accountNumber) > 0
      """.stripMargin)

    log.info("doCustomerCardProcess method ended")
    custCardDF
  }
}

Error


13/09/12 23:22:18 WARN scheduler.TaskSetManager: Lost task 31.0 in stage 8.0 (TID 595, : java.lang.ArrayIndexOutOfBoundsException: 31
    at com.org.CustomerCardProcess$$anonfun$2.apply(CustomerCardProcess.scala:23)
    at com.org.CustomerCardProcess$$anonfun$2.apply(CustomerCardProcess.scala:23)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.insertAll(BypassMergeSortShuffleWriter.java:118)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

Could anyone help me fix this issue?

Recommended answer

With its default limit, the split function discards all empty fields at the end of the split line, so the trailing empty subscriber_id is dropped and the resulting array never reaches index 31. So change the line

val splitRDD = custCardRDD.map(elem => elem.split("\\u0001"))

to

val splitRDD = custCardRDD.map(elem => elem.split("\\u0001", -1))

The limit of -1 tells split to keep all trailing empty fields.
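The difference is easy to verify outside Spark (a minimal sketch in plain Scala on a toy three-field line; no Spark required):

// With the default limit, String.split drops trailing empty strings;
// with a limit of -1 it keeps them.
val line = "a\u0001b\u0001\u0001" // fields: "a", "b", "", ""

println(line.split("\\u0001").length)     // 2 -- trailing empty fields dropped
println(line.split("\\u0001", -1).length) // 4 -- trailing empty fields kept

As an extra safeguard (an addition beyond the fix above, not required by it), lines that still come up short can be filtered out before indexing:

// Hypothetical defensive variant of splitRDD: skip malformed lines
// instead of letting arr(31) throw.
val splitRDD = custCardRDD
  .map(elem => elem.split("\\u0001", -1))
  .filter(arr => arr.length > 31)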
