是否可以从火花流检查点恢复广播值 [英] Is it possible to recover an broadcast value from Spark-streaming checkpoint

查看:17
本文介绍了是否可以从火花流检查点恢复广播值的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在我的电光流媒体项目中,我使用HBase-电光录制PV/UV。然后,当我关闭应用程序并重启它时,在检查点恢复时出现以下异常:

16/03/02 10:17:21错误HBaseContext:无法从广播获取配置 java.lang.ClassCastException:[B不能强制转换为org.apache.partk.SerializableWritable 在com.paitao.xmlife.contrib.hbase.HBaseContext.getConf(HBaseContext.scala:645) 在com.paitao.xmlife.contrib.hbase.HBaseContext.com$paitao$xmlife$contrib$hbase$HBaseContext$$hbaseForeachPartition(HBaseContext.scala:627) 在com.paitao.xmlife.contrib.hbase.HBaseContext$$anonfun$com$paitao$xmlife$contrib$hbase$HBaseContext$$bulkMutation$1.apply(HBaseContext.scala:457) 在com.paitao.xmlife.contrib.hbase.HBaseContext$$anonfun$com$paitao$xmlife$contrib$hbase$HBaseContext$$bulkMutation$1.apply(HBaseContext.scala:457) 在org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:898) 在org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:898) 在org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1839) 在org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1839) 在org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 在org.apache.spark.scheduler.Task.run(Task.scala:88) 在org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 在java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 在java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 在java.lang.Thread.run(Thread.java:745)

我检查了HBaseContext的代码,它使用广播来存储HBase配置。

class HBaseContext(@transient sc: SparkContext,
               @transient config: Configuration,
               val tmpHdfsConfgFile: String = null) extends Serializable with Logging {

    @transient var credentials = SparkHadoopUtil.get.getCurrentUserCredentials()
    @transient var tmpHdfsConfiguration: Configuration = config
    @transient var appliedCredentials = false
    @transient val job = Job.getInstance(config)

    TableMapReduceUtil.initCredentials(job)
    // <-- broadcast for HBaseConfiguration here !!!
    var broadcastedConf = sc.broadcast(new SerializableWritable(config))
    var credentialsConf = sc.broadcast(new SerializableWritable(job.getCredentials()))
    ...

当检查点恢复时,它尝试在其getConf函数中访问此广播值:

if (tmpHdfsConfiguration == null) {
  try {
    tmpHdfsConfiguration = configBroadcast.value.value
  } catch {
    case ex: Exception => logError("Unable to getConfig from broadcast", ex)
  }
}

然后引发异常。我的问题是:是否可以在Spark应用程序中从检查点恢复广播值?我们是否有其他解决方案可以在恢复后重新广播值?

感谢您的反馈!

推荐答案

遵循以下方法

  1. 创建火花上下文。
  2. 初始化广播变量。
  3. 使用上述Spark上下文并传递已初始化的广播变量,使用检查点目录创建流上下文。

当流作业开始时,检查点目录中没有数据,它将初始化广播变量。

重新启动流时,它将从检查点目录恢复广播变量。

这篇关于是否可以从火花流检查点恢复广播值的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆