Spark Scala Error while saving DataFrame to Hive


Question


I have framed a DataFrame by combining multiple arrays. When I try to save it into a Hive table, I get an ArrayIndexOutOfBoundsException. Below are the code and the error. I tried adding the case class both outside and inside the main def, but I still get the same error.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext, DataFrame}
import org.apache.spark.ml.feature.RFormula
import java.text._
import java.util.Date
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.spark.ml.regression.LinearRegressionModel
import org.apache.spark.ml.classification.LogisticRegressionModel
import org.apache.spark.ml.classification.DecisionTreeClassificationModel
import org.apache.spark.ml.classification.RandomForestClassificationModel
import org.apache.spark.ml.PipelineModel
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.mllib.evaluation.MulticlassMetrics
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.sql.hive.HiveContext
//case class Rows(col1: String, col2: String, col3: String, col4: String, col5: String, col6: String)
object MLRCreate {
  // case class Row(col1: String, col2: String, col3: String, col4: String, col5: String, col6: String)

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("MLRCreate")
    val sc = new SparkContext(conf)
    val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
    import hiveContext.implicits._
    import hiveContext.sql

    val ReqId = new java.text.SimpleDateFormat("yyyyMMddHHmmss").format(new java.util.Date())
    val dirName = "/user/ec2-user/SavedModels/" + ReqId
    val df = Functions.loadData(hiveContext, args(0), args(1))
    val form = args(1).toLowerCase
    val lbl = form.split("~")

    var lrModel: LinearRegressionModel = null
    val Array(training, test) = df.randomSplit(Array(args(3).toDouble, (1 - args(3).toDouble)), seed = args(4).toInt)
    lrModel = Functions.mlr(training)

    var columnnames = Functions.resultColumns(df).substring(1)
    var columnsFinal = columnnames.split(",")
    columnsFinal = "intercept" +: columnsFinal
    var coeff = lrModel.coefficients.toArray.map(_.toString)
    coeff = lrModel.intercept.toString +: coeff
    var stdErr = lrModel.summary.coefficientStandardErrors.map(_.toString)
    var tval = lrModel.summary.tValues.map(_.toString)
    var pval = lrModel.summary.pValues.map(_.toString)

    var Signif: Array[String] = new Array[String](pval.length)

    for (j <- 0 to pval.length - 1) {
      var sign = pval(j).toDouble
      sign = Math.abs(sign)
      if (sign <= 0.001) {
        Signif(j) = "***"
      } else if (sign <= 0.01) {
        Signif(j) = "**"
      } else if (sign <= 0.05) {
        Signif(j) = "*"
      } else if (sign <= 0.1) {
        Signif(j) = "."
      } else {
        Signif(j) = " "
      }
      println(columnsFinal(j) + "#########" + coeff(j) + "#########" + stdErr(j) + "#########" + tval(j) + "#########" + pval(j) + "########" + Signif)
    }
    case class Row(col1: String, col2: String, col3: String, col4: String, col5: String, col6: String)

    // print(columnsFinali.mkString("#"), coeff.mkString("#"), stdErr.mkString("#"), tval.mkString("#"), pval.mkString("#"))

    val sums = Array(columnsFinal, coeff, stdErr, tval, pval, Signif).transpose
    val rdd = sc.parallelize(sums).map(ys => Row(ys(0), ys(1), ys(2), ys(3), ys(4), ys(5)))
    // val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
    // import hiveContext.implicits._
    // import hiveContext.sql

    val result = rdd.toDF("Name", "Coefficients", "Std_Error", "tValue", "pValue", "Significance")
    result.show()
    result.saveAsTable("iaw_model_summary.IAW_" + ReqId)
    print(ReqId)
    lrModel.save(dirName)
  }
}

And the following is the error I get:

16/05/12 07:17:25 ERROR Executor: Exception in task 2.0 in stage 23.0 (TID 839)
java.lang.ArrayIndexOutOfBoundsException: 1
        at IAWMLRCreate$$anonfun$5.apply(IAWMLRCreate.scala:96)
        at IAWMLRCreate$$anonfun$5.apply(IAWMLRCreate.scala:96)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
        at scala.collection.AbstractIterator.to(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

Solution

I suggest you check the lengths of the arrays you are transposing: columnsFinal, coeff, stdErr, tval, pval, Signif. If any of them is shorter or longer than the others, some of the rows produced by the transpose will be incomplete, and indexing ys(1) through ys(5) on such a row in your map is exactly what throws the ArrayIndexOutOfBoundsException. Scala does not pad with nulls (or anything else) when transposing:

val a1 = Array(1,2,3)

val a2 = Array(5,6)

Array(a1, a2).transpose.foreach(x => println(x.toList))

prints:

List(1, 5)
List(2, 6)
List(3)
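If you want to see which array is short, and to keep the transpose from producing ragged rows while you debug, one option (not part of the original answer; the "NA" filler and the a1/a2 names below are only placeholders for your real arrays) is to print the lengths and pad the shorter arrays with padTo before transposing. A minimal sketch:

// a1/a2 stand in for columnsFinal, coeff, stdErr, tval, pval, Signif
val a1 = Array("1", "2", "3")
val a2 = Array("5", "6")

// Print the lengths first to spot the mismatched array.
Seq(a1, a2).foreach(a => println(a.length))

// Pad the shorter array so every transposed row has the same number of columns.
val maxLen = Seq(a1, a2).map(_.length).max
val padded = Seq(a1, a2).map(_.padTo(maxLen, "NA"))
padded.transpose.foreach(x => println(x.toList))
// List(1, 5)
// List(2, 6)
// List(3, NA)

In your code the first thing to compare is whether the summary arrays (stdErr, tval, pval) have the same length as columnsFinal and coeff, since those two have the intercept prepended.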
