Enriching SparkContext without incurring in serialization issues

Problem description

I am trying to use Spark to process data that comes from HBase tables. This blog post gives an example of how to use NewHadoopAPI to read data from any Hadoop InputFormat.
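
(For context, the bare read looks roughly like the sketch below, assuming the usual HBase TableInputFormat plumbing and a placeholder table name; the helper further down wraps exactly this call.)

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.spark.SparkContext

// Rough sketch of the newAPIHadoopRDD call described in that post;
// "my_table" is a placeholder table name.
def rawHBaseRDD(sc: SparkContext) = {
  val conf = HBaseConfiguration.create()
  conf.set(TableInputFormat.INPUT_TABLE, "my_table")

  sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
    classOf[ImmutableBytesWritable], classOf[Result])
}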

What I have done

Since I will need to do this many times, I was trying to use implicits to enrich SparkContext, so that I can get an RDD from a given set of columns in HBase. I have written the following helper:

trait HBaseReadSupport {
  implicit def toHBaseSC(sc: SparkContext) = new HBaseSC(sc)

  implicit def bytes2string(bytes: Array[Byte]) = new String(bytes)
}


final class HBaseSC(sc: SparkContext) extends Serializable {
  def extract[A](data: Map[String, List[String]], result: Result, interpret: Array[Byte] => A) =
    data map { case (cf, columns) =>
      val content = columns map { column =>
        val cell = result.getColumnLatestCell(cf.getBytes, column.getBytes)

        column -> interpret(CellUtil.cloneValue(cell))
      } toMap

      cf -> content
    }

  def makeConf(table: String) = {
    val conf = HBaseConfiguration.create()

    conf.setBoolean("hbase.cluster.distributed", true)
    conf.setInt("hbase.client.scanner.caching", 10000)
    conf.set(TableInputFormat.INPUT_TABLE, table)

    conf
  }

  def hbase[A](table: String, data: Map[String, List[String]])
    (interpret: Array[Byte] => A) =

    sc.newAPIHadoopRDD(makeConf(table), classOf[TableInputFormat],
      classOf[ImmutableBytesWritable], classOf[Result]) map { case (key, row) =>
        Bytes.toString(key.get) -> extract(data, row, interpret)
      }

}

It can be used like this:

val rdd = sc.hbase[String](table, Map(
  "cf" -> List("col1", "col2")
))

In this case we get an RDD of (String, Map[String, Map[String, String]]), where the first component is the rowkey and the second is a map whose keys are column families and whose values are maps whose keys are columns and whose values are the cell values.
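
For example (a hypothetical consumption sketch, reusing the cf/col1/col2 names from the call above), the nested structure can be unpacked like this:

// rdd: RDD[(String, Map[String, Map[String, String]])]
rdd.collect() foreach { case (rowkey, families) =>
  val col1 = families("cf")("col1")   // cell value for column col1 in family cf
  val col2 = families("cf")("col2")
  println(s"$rowkey: col1=$col1, col2=$col2")
}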

Where it fails

Unfortunately, it seems that my job gets a reference to sc, which is itself not serializable by design. What I get when I run the job is

Exception in thread "main" org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: org.apache.spark.SparkContext
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)

I can remove the helper classes and use the same logic inline in my job and everything runs fine. But I want to get something which I can reuse instead of writing the same boilerplate over and over.

By the way, the issue is not specific to implicits; even using a plain function of sc exhibits the same problem.
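
Presumably what happens (a stripped-down sketch with hypothetical names, not my actual code) is that the closure passed to map calls an instance method, so Scala closes over this, and serializing this drags the SparkContext field along with it:

import org.apache.spark.SparkContext

// Stripped-down reproduction (hypothetical names): calling an instance method
// from inside the closure captures `this`, and with it the non-transient sc.
final class CapturingHelper(val sc: SparkContext) extends Serializable {
  def double(n: Int) = n * 2          // instance method used inside the closure

  def doubledLengths(path: String) =
    sc.textFile(path) map { line =>
      double(line.length)             // pulls `this` (and therefore sc) into the task
    }
  // doubledLengths(...).count() fails with the same
  // java.io.NotSerializableException: org.apache.spark.SparkContext
}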

For comparison, the following helper to read TSV files (I know it's broken as it does not support quoting and so on, never mind) seems to work fine:

trait TsvReadSupport {
  implicit def toTsvRDD(sc: SparkContext) = new TsvRDD(sc)
}

final class TsvRDD(val sc: SparkContext) extends Serializable {
  def tsv(path: String, fields: Seq[String], separator: Char = '\t') = sc.textFile(path) map { line =>
    val contents = line.split(separator).toList

    (fields, contents).zipped.toMap
  }
}

How can I encapsulate the logic to read rows from HBase without unintentionally capturing the SparkContext?

Solution

Just add the @transient annotation to the sc variable:

final class HBaseSC(@transient val sc: SparkContext) extends Serializable {
  ...
}

and make sure sc is not used within the extract function, since it won't be available on workers.
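
To make "not available" concrete: a @transient field is skipped during serialization and comes back as null in the copy that executors deserialize, so it can only be touched on the driver side, e.g. while building the RDD. A minimal sketch with hypothetical names:

import org.apache.spark.SparkContext

// Sketch (hypothetical names): the @transient context is fine to use on the
// driver, but any reference to it inside the task body would see null at runtime.
final class DriverOnlyHelper(@transient val sc: SparkContext) extends Serializable {
  def lineLengths(path: String) =
    sc.textFile(path) map { line =>   // driver side: sc is still the live context here
      // worker side: referencing sc here would compile, but the deserialized
      // copy of this helper carries sc == null
      line.length
    }
}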

If it's necessary to access the Spark context from within a distributed computation, the rdd.context function might be used:

val rdd = sc.newAPIHadoopRDD(...)
rdd map {
  case (k, v) => 
    val ctx = rdd.context
    ....
}
