Task not serializable in Scala


Problem description

In my application, I'm using the parallelize method to save an Array to a file.

Here is the code:

val sourceRDD = sc.textFile(inputPath + "/source")
val destinationRDD = sc.textFile(inputPath + "/destination")

val source_primary_key = sourceRDD.map(rec => (rec.split(",")(0).toInt, rec))
val destination_primary_key = destinationRDD.map(rec => (rec.split(",")(0).toInt, rec))

val extra_in_source = source_primary_key.subtractByKey(destination_primary_key)
val extra_in_destination = destination_primary_key.subtractByKey(source_primary_key)

val source_subtract = source_primary_key.subtract(destination_primary_key)
val Destination_subtract = destination_primary_key.subtract(source_primary_key)

val exact_bestmatch_src = source_subtract.subtractByKey(extra_in_source).sortByKey(true).map(rec => (rec._2))
val exact_bestmatch_Dest = Destination_subtract.subtractByKey(extra_in_destination).sortByKey(true).map(rec => (rec._2))

val exact_bestmatch_src_p = exact_bestmatch_src.map(rec => (rec.split(",")(0).toInt))

// collect() returns the same Array as the deprecated RDD.toArray()
val primary_key_distinct = exact_bestmatch_src_p.distinct.collect()

for (i <- primary_key_distinct) {

  var dummyVar: String = ""
  val src = exact_bestmatch_src.filter(line => line.split(",")(0).toInt.equals(i))
  var dest = exact_bestmatch_Dest.filter(line => line.split(",")(0).toInt.equals(i)).collect()

  // src is an RDD, so this loop compiles to src.foreach(...): the whole body
  // is a closure that Spark serializes and runs on the executors
  for (print1 <- src) {

    var sourceArr: Array[String] = print1.split(",")
    var exactbestMatchCounter: Int = 0
    var index: Array[Int] = new Array[Int](1)

    println(print1 + "source")

    for (print2 <- dest) {

      var bestMatchCounter = 0
      var i: Int = 0

      println(print1 + "source + destination" + print2)

      for (i <- 0 until sourceArr.length) {
        if (print1.split(",")(i).equals(print2.split(",")(i))) {
          bestMatchCounter += 1
        }
      }
      if (exactbestMatchCounter < bestMatchCounter) {
        exactbestMatchCounter = bestMatchCounter
        dummyVar = print2
        index +:= exactbestMatchCounter //9,8,9
      }
    }

    var z = index.zipWithIndex.maxBy(_._1)._2

    if (exactbestMatchCounter >= 0) {
      var samparr: Array[String] = new Array[String](4)
      samparr +:= print1 + "  BEST_MATCH  " + dummyVar
      var deletedest: Array[String] = new Array[String](1)
      deletedest = dest.take(z) ++ dest.drop(1)
      dest = deletedest
      // sc (SparkContext) is not serializable and can only be used on the driver;
      // referencing it inside the foreach closure is what raises "Task not serializable"
      val myFile = sc.parallelize((samparr)).saveAsTextFile(outputPath)
    }
  }
}

I have used the parallelize method, and I even tried the following to save it as a file:

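// textFile expects a path; samparr.toString() is something like
// "[Ljava.lang.String;@1b2c3d4", not the contents of the array
val myFile = sc.textFile(samparr.toString())
val finalRdd = myFile
finalRdd.coalesce(1).saveAsTextFile(outputPath)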

But it keeps throwing this error:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
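Why the error occurs: `src` is an RDD, so `for (print1 <- src) { ... }` desugars to `src.foreach(...)`. Spark has to serialize the whole loop body and ship it to the executors, and that body calls `sc.parallelize(...)`. `SparkContext` is not serializable and can only be used on the driver, so the closure cannot be serialized. The second attempt fails for the same reason, since `sc.textFile` is still called inside that loop. The stdlib-only sketch below reproduces the mechanism with plain Java serialization, roughly the check Spark performs on a closure before running a job. `FakeContext` and `ClosureSerializationDemo` are hypothetical names, and the sketch assumes Scala 2.12 or later, where function literals are serializable.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for SparkContext: like the real class,
// it does not implement java.io.Serializable.
class FakeContext { val appName = "demo" }

object ClosureSerializationDemo {
  // Java-serialize an object into memory and report whether that succeeded.
  def isSerializable(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(obj); true }
    catch { case _: NotSerializableException => false }
    finally out.close()
  }

  def main(args: Array[String]): Unit = {
    val ctx = new FakeContext
    val outputPath = "/tmp/out" // hypothetical path

    // Captures only a String, so the closure serializes fine.
    val good: String => String = line => outputPath + "/" + line
    // Captures ctx, which is not Serializable, just like `sc` inside src.foreach.
    val bad: String => String = line => ctx.appName + line

    println(isSerializable(good)) // true
    println(isSerializable(bad))  // false
  }
}
```

The takeaway is that every value a closure references must be serializable; driver-only objects such as the SparkContext have to stay outside any code that runs inside an RDD operation.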

Recommended answer

In the end, this piece of code let me save the array to a file:

import java.io.PrintWriter

// runs on the driver and writes to the driver's local filesystem
new PrintWriter(outputPath) { write(array.mkString(" ")); close() }
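This works because `PrintWriter` is used on the driver, outside any RDD closure. A fuller driver-side variant of the question's loop, offered as a hedged sketch: bring the matched records to the driver (for example with `exact_bestmatch_src.collect()` and `exact_bestmatch_Dest.collect()`, which assumes they fit in driver memory), do the matching with plain Scala collections, and write the result once. The names `DriverSideBestMatch`, `matchingFields`, `bestMatch` and `writeLines` are hypothetical, and the matching rule (most equal comma-separated fields among records with the same key, first candidate wins a tie) is a simplification of the question's counters, not the asker's exact logic.

```scala
import java.io.PrintWriter

object DriverSideBestMatch {
  // Number of positions where two comma-separated records have equal fields.
  def matchingFields(a: String, b: String): Int =
    a.split(",").zip(b.split(",")).count { case (x, y) => x == y }

  // Candidate with the most matching fields (the first one wins on ties),
  // or None when there are no candidates.
  def bestMatch(source: String, candidates: Seq[String]): Option[String] =
    if (candidates.isEmpty) None
    else Some(candidates.maxBy(c => matchingFields(source, c)))

  // Write one line per record, closing the file even if writing fails.
  def writeLines(path: String, lines: Seq[String]): Unit = {
    val out = new PrintWriter(path)
    try lines.foreach(line => out.println(line)) finally out.close()
  }

  def main(args: Array[String]): Unit = {
    // In the Spark job these would come from collect() on the two RDDs.
    val src  = Seq("1,a,b,c", "1,a,x,c")
    val dest = Seq("1,a,b,z", "1,a,x,c")
    val lines = src.map { s =>
      // Only compare against destination records with the same primary key.
      val sameKey = dest.filter(_.split(",")(0) == s.split(",")(0))
      s + "  BEST_MATCH  " + bestMatch(s, sameKey).getOrElse("")
    }
    writeLines("best_match.txt", lines) // hypothetical output path
  }
}
```

If the output should still be written by Spark, calling `sc.parallelize(lines).saveAsTextFile(outputPath)` once on the driver, after the loop, also avoids the error. Note that `saveAsTextFile` fails when `outputPath` already exists, so it cannot be called once per record with the same path as in the question.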
