运行线性回归 Scala 2.12 时不可序列化异常 [英] Not serializable exception while running linear regression on Scala 2.12

查看:25
本文介绍了运行线性回归 scala 2.12 时不可序列化异常的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

使用 Scala 2.12.3 在本地模式下运行以下 Spark MLlib 代码时,遇到了 lambda 不可序列化(not serializable)的错误

While running the following Spark MLlib code in local mode with Scala 2.12.3, I encountered a "lambda not serializable" error.

任何输入将不胜感激?(转移到 Scala 2.11 对我来说不是一个选择)你能告诉我我能做些什么来避免这个问题吗?谢谢你

Any input would be much appreciated. (Moving to Scala 2.11 is not an option for me.) Can you please let me know what I can do to avoid this issue? Thank you.

import java.io.FileWriter

import org.apache.spark.SparkConf
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.DoubleType
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.TimestampType

import java.util.concurrent.atomic.AtomicBoolean


/**
 * Repeatedly trains a linear-regression pipeline on a CSV of connection-pool
 * statistics, saves the fitted model, scores a second CSV with it and writes
 * the average prediction to a result file.
 *
 * The Spark session is created eagerly when the object is initialised and runs
 * in local mode with two worker threads.
 */
object MLAnalyzer {

  // Imported locally so the object does not rely on additional top-of-file imports.
  import org.apache.spark.ml.PipelineModel
  import org.apache.spark.ml.feature.StandardScaler
  import org.apache.spark.sql.DataFrame

  // NOTE(review): "deploy-mode" is not a `spark.`-prefixed property; confirm
  // whether this setting has any effect or should be `spark.submit.deployMode`.
  val conf: SparkConf = new SparkConf()
    .setMaster("local[2]")
    .set("deploy-mode", "client")
    .set("spark.driver.bindAddress", "127.0.0.1")
    .set("spark.broadcast.compress", "false")
    .setAppName("local-spark-kafka-consumer-client")

  val spark: SparkSession = SparkSession
    .builder()
    .config(conf)
    .getOrCreate()

  private val TrainingPath =
    "/home/vagrant/Desktop/Workspaces/SparkMachineLearning/sparkML/src/main/resources/train_pooling.csv"
  private val TestPath =
    "/home/vagrant/Desktop/Workspaces/SparkMachineLearning/sparkML/src/main/resources/test_pooling.csv"
  private val ResultPath =
    "/home/vagrant/Desktop/Workspaces/SparkMachineLearning/sparkML/src/main/resources/result.csv"
  private val ModelPath =
    "file:///home/vagrant/Downloads/medium-articles-master/titanic_spark/training_batch/src/main/resources/poolSessionModelRecent.model"

  /** Column layout of the training CSV (which is read with a header row). */
  private val TrainingSchema = StructType(
    Array(
      StructField("PACKAGE_KEY", StringType),
      StructField("MOST_IDLE", IntegerType),
      StructField("MAX_WAIT", IntegerType),
      StructField("IDLE_COUNT", IntegerType),
      StructField("APPLICATION", StringType),
      StructField("LONGEST_WAIT", IntegerType),
      StructField("TIMEOUTS", IntegerType),
      StructField("LAST_ACCESS", TimestampType),
      StructField("MOST_ACTIVE", IntegerType),
      StructField("MAX_ACTIVE", IntegerType),
      StructField("MAX_IDLE", IntegerType),
      StructField("ACTIVE_COUNT", IntegerType),
      StructField("FACTOR_LOAD", DoubleType)))

  /** Column layout of the test CSV (read without a header row): features only, no label. */
  private val TestSchema = StructType(
    Array(
      StructField("IDLE_COUNT", IntegerType),
      StructField("TIMEOUTS", IntegerType),
      StructField("ACTIVE_COUNT", IntegerType)))

  /** Program entry point; starts the endless train/score loop. */
  def main(args: Array[String]): Unit = {
    process()
  }

  /**
   * Runs the train -> save -> score -> write-result cycle forever, pausing
   * 100 ms before each cycle. This method never returns normally.
   *
   * Both CSV files are re-read on every cycle; the pipeline definition itself
   * does not change between cycles, so it is built once up front.
   */
  def process(): Unit = {
    val pipeline = buildPipeline()

    while (true) {
      Thread.sleep(100)

      // Fit the pipeline on the current training data and persist the model.
      val model = pipeline.fit(loadTrainingData())
      model.write.overwrite().save(ModelPath)

      // No RMSE is computed here: the test CSV carries no FACTOR_LOAD label
      // column, so there is nothing to compare the predictions against.
      val predictions = predict(model, loadTestData())
      if (predictions.nonEmpty) {
        writeAverage(predictions.sum / predictions.length)
        println("completed modelling process")
      }
    }
  }

  /** Reads the training CSV, drops the columns that are not used, and replaces numeric nulls with 0. */
  private def loadTrainingData(): DataFrame =
    spark.read
      .option("header", "true")
      .schema(TrainingSchema)
      .csv(TrainingPath)
      .drop(
        "PACKAGE_KEY",
        "MOST_IDLE",
        "MAX_IDLE",
        "MOST_ACTIVE",
        "LAST_ACCESS",
        "APPLICATION",
        "MAX_WAIT")
      .na.fill(0)

  /** Reads the test CSV and replaces numeric nulls with 0. */
  private def loadTestData(): DataFrame =
    spark.read
      .schema(TestSchema)
      .csv(TestPath)
      .na.fill(0)

  /**
   * Builds the unfitted pipeline: assemble three count columns into a vector,
   * standardise it (mean and standard deviation), then fit an elastic-net
   * linear regression against FACTOR_LOAD.
   */
  private def buildPipeline(): Pipeline = {
    val assembler = new VectorAssembler()
      .setInputCols(Array("IDLE_COUNT", "TIMEOUTS", "ACTIVE_COUNT"))
      .setOutputCol("features_intermediate")

    val scaler = new StandardScaler()
      .setWithMean(true)
      .setWithStd(true)
      .setInputCol("features_intermediate")
      .setOutputCol("features")

    val regression = new LinearRegression()
      .setMaxIter(100)
      .setRegParam(0.1)
      .setElasticNetParam(0.8)
      .setLabelCol("FACTOR_LOAD")

    new Pipeline().setStages(Array(assembler, scaler, regression))
  }

  /** Applies the fitted model to `data` and returns the values of the "prediction" column. */
  private def predict(model: PipelineModel, data: DataFrame): Array[Double] = {
    import spark.implicits._
    model.transform(data).select("prediction").map(_.getDouble(0)).collect()
  }

  /** Overwrites the result file with `avg`; the writer is closed even if writing fails. */
  private def writeAverage(avg: Double): Unit = {
    val writer = new FileWriter(ResultPath)
    try {
      writer.append(avg.toString)
      writer.flush()
    } finally {
      writer.close()
    }
  }
}

给我以下错误

Caused by: java.io.NotSerializableException: scala.runtime.LazyRef
Serialization stack:
    - object not serializable (class: scala.runtime.LazyRef, value: LazyRef thunk)
    - element of array (index: 2)
    - array (class [Ljava.lang.Object;, size 3)
    - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
    - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.apache.spark.sql.catalyst.expressions.ScalaUDF, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic org/apache/spark/sql/catalyst/expressions/ScalaUDF.$anonfun$f$2:(Lscala/Function1;Lorg/apache/spark/sql/catalyst/expressions/Expression;Lscala/runtime/LazyRef;Lorg/apache/spark/sql/catalyst/InternalRow;)Ljava/lang/Object;, instantiatedMethodType=(Lorg/apache/spark/sql/catalyst/InternalRow;)Ljava/lang/Object;, numCaptured=3])
    - writeReplace data (class: java.lang.invoke.SerializedLambda)
    - object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF$$Lambda$2280/878458383, org.apache.spark.sql.catalyst.expressions.ScalaUDF$$Lambda$2280/878458383@65af23c0)
    - field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF, name: f, type: interface scala.Function1)
    - object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF, UDF(named_struct(IDLE_COUNT_double_vecAssembler_bc4ee3d99e56, cast(coalesce(IDLE_COUNT#1732, 0) as double), TIMEOUTS_double_vecAssembler_bc4ee3d99e56, cast(coalesce(TIMEOUTS#1735, 0) as double), ACTIVE_COUNT_double_vecAssembler_bc4ee3d99e56, cast(coalesce(ACTIVE_COUNT#1740, 0) as double))))
    - field (class: org.apache.spark.sql.catalyst.expressions.Alias, name: child, type: class org.apache.spark.sql.catalyst.expressions.Expression)
    - object (class org.apache.spark.sql.catalyst.expressions.Alias, UDF(named_struct(IDLE_COUNT_double_vecAssembler_bc4ee3d99e56, cast(coalesce(IDLE_COUNT#1732, 0) as double), TIMEOUTS_double_vecAssembler_bc4ee3d99e56, cast(coalesce(TIMEOUTS#1735, 0) as double), ACTIVE_COUNT_double_vecAssembler_bc4ee3d99e56, cast(coalesce(ACTIVE_COUNT#1740, 0) as double))) AS features_intermediate#1839)
    - element of array (index: 0)

推荐答案

升级到 Scala 2.12.8 解决了这个问题.但不确定根本原因.

Upgrading to Scala 2.12.8 solved the issue. I am not sure about the root cause, though.

这篇关于运行线性回归 scala 2.12 时不可序列化异常的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆