任务不可序列化Flink [英] Task not serializable Flink

查看:242
本文介绍了任务不可序列化Flink的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试做一点修改的flink中的pagerank基本示例(仅在读取输入文件时,其他所有内容都是相同的),我收到错误,因为任务不可序列化,并且下面是输出错误的一部分

I am trying to do the pagerank Basic example in flink with little bit of modification(only in reading the input file, everything else is the same) i am getting the error as Task not serializable and below is the part of the output error

atorg.apache.flink.api.scala.ClosureCleaner $ .ensureSerializable(ClosureCleaner.scala:179) 在org.apache.flink.api.scala.ClosureCleaner $ .clean(ClosureCleaner.scala:171)

atorg.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:179) at org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:171)

下面是我的代码

object hpdb {

  def main(args: Array[String]) {

    val env = ExecutionEnvironment.getExecutionEnvironment

    val maxIterations = 10000

    val DAMPENING_FACTOR: Double = 0.85

    val EPSILON: Double = 0.0001

    val outpath = "/home/vinoth/bigdata/assign10/pagerank.csv"

    val links = env.readCsvFile[Tuple2[Long,Long]]("/home/vinoth/bigdata/assign10/ppi.csv",
                fieldDelimiter = "\t", includedFields = Array(1,4)).as('sourceId,'targetId).toDataSet[Link]//source and target

    val pages = env.readCsvFile[Tuple1[Long]]("/home/vinoth/bigdata/assign10/ppi.csv",
      fieldDelimiter = "\t", includedFields = Array(1)).as('pageId).toDataSet[Id]//Pageid

    val noOfPages = pages.count()

    val pagesWithRanks = pages.map(p => Page(p.pageId, 1.0 / noOfPages))

    val adjacencyLists = links
      // initialize lists ._1 is the source id and ._2 is the traget id
      .map(e => AdjacencyList(e.sourceId, Array(e.targetId)))
      // concatenate lists
      .groupBy("sourceId").reduce {
      (l1, l2) => AdjacencyList(l1.sourceId, l1.targetIds ++ l2.targetIds)
    }

    // start iteration

    val finalRanks = pagesWithRanks.iterateWithTermination(maxIterations) {
     // **//the output shows error here**     
     currentRanks =>
        val newRanks = currentRanks
          // distribute ranks to target pages
          .join(adjacencyLists).where("pageId").equalTo("sourceId") {
          (page, adjacent, out: Collector[Page]) =>
            for (targetId <- adjacent.targetIds) {
              out.collect(Page(targetId, page.rank / adjacent.targetIds.length))
            }
        }

          // collect ranks and sum them up

          .groupBy("pageId").aggregate(SUM, "rank")
          // apply dampening factor
         //**//the output shows error here** 
           .map { p =>
          Page(p.pageId, (p.rank * DAMPENING_FACTOR) + ((1 - DAMPENING_FACTOR) / pages.count()))
        }

        // terminate if no rank update was significant
        val termination = currentRanks.join(newRanks).where("pageId").equalTo("pageId") {
          (current, next, out: Collector[Int]) =>
            // check for significant update
            if (math.abs(current.rank - next.rank) > EPSILON) out.collect(1)
        }

        (newRanks, termination)
    }

    val result = finalRanks

    // emit result
    result.writeAsCsv(outpath, "\n", " ")

    env.execute()

    }
}

任何朝着正确方向提供帮助的人都受到高度赞赏吗?谢谢.

Any help in the right direction is highly appreciated? Thank you.

推荐答案

问题是您从MapFunction内部引用了DataSet pages.这是不可能的,因为DataSet只是数据流的逻辑表示,不能在运行时访问.

The problem is that you reference the DataSet pages from within a MapFunction. This is not possible, since a DataSet is only the logical representation of a data flow and cannot be accessed at runtime.

解决此问题所需要做的就是将val pagesCount = pages.count值分配给变量pagesCount,并在MapFunction中引用该变量.

What you have to do to solve this problem is to assign the val pagesCount = pages.count value to a variable pagesCount and refer to this variable in your MapFunction.

pages.count实际执行的操作是触发数据流图的执行,以便可以计算pages中的元素数.然后将结果返回到您的程序.

What pages.count actually does, is to trigger the execution of the data flow graph, so that the number of elements in pages can be counted. The result is then returned to your program.

这篇关于任务不可序列化Flink的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆