Apache Spark:"SparkException:任务不可序列化"在RDD的Spark-shell中手动构建 [英] Apache Spark: "SparkException: Task not serializable" in spark-shell for RDD constructed manually

查看:68
本文介绍了Apache Spark:"SparkException:任务不可序列化"在RDD的Spark-shell中手动构建的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有以下代码从事件中检测出最常用的顶级域.我用它通过Spark SQL获取日期.

I have the following code to detect most used top level domain from events. I use it to get date via Spark SQL.

功能本身经过测试,可以正常工作.我使用Amazon EMR和spark-shell.当spark将任务几乎立即发送到节点时,我收到了很长的堆栈跟踪信息,并且最终收到"SparkException:任务不可序列化",没有任何具体说明.这是怎么回事?

Functions themselves are tested and work fine. I use Amazon EMR and spark-shell. When spark sends tasks to nodes, almost immediately, I receive a long stack trace and "SparkException: Task not serializable" in the end without anything specific. What's the deal here?

import scala.io.Source
val suffixesStr = 
    Source.fromURL("https://publicsuffix.org/list/public_suffix_list.dat").mkString
val suffList = 
    suffixesStr.lines.filter(line => !line.startsWith("//") && line.trim() != "")
val suffListRDD = sc.parallelize(suffList.toList).collect()

 val cleanDomain = (domain: String) => {
  var secLevelSuffix = 
    suffListRDD.find(suffix => domain.endsWith("."+suffix) && suffix.contains("."))
  var regex = """[^.]+\.[^.]+$""".r
  if (!secLevelSuffix.isEmpty){
    regex = """[^.]+\.[^.]+\.[^.]+$""".r
  }
  var cleanDomain = regex.findFirstMatchIn(domain).map(_ group 0)
  cleanDomain.getOrElse("")
}

val getDomain = (url: String) => {
  val domain = """(?i)^(?:(?:https?):\/\/)?(?:(?:www|www1|www2|www3)\.)?([^:?#/\s]+)""".r.findFirstMatchIn(url).map(_ group 1)
  var res = domain.getOrElse("")
  res = res.toLowerCase()
  if (res.contains("google.com")){
    res = res.replace("google.com.br", "google.com")
  }else{
    res = cleanDomain(res)
  }
  res
}

sqlContext.udf.register("getDomain", getDomain)
val domains = sqlContext.sql("SELECT count(*) c, domain from (SELECT getDomain(page_url) as domain FROM events) t group by domain order by c desc")
domains.take(20).foreach(println)

推荐答案

像在这种情况下那样以编程方式定义RDD时,请不要忘记将不会复制到工作程序节点的内容标记为 @transient.

When you define an RDD programatically like in this case, don't forget to mark things that won't be replicated to worker nodes as @transient.

在这种情况下:

@transient val suffixesStr = ...
@transient val suffList = ...

这篇关于Apache Spark:"SparkException:任务不可序列化"在RDD的Spark-shell中手动构建的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
相关文章
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆