是否可以从火花流检查点恢复广播值 [英] Is it possible to recover an broadcast value from Spark-streaming checkpoint
问题描述
在我的电光流媒体项目中,我使用HBase-电光录制PV/UV。然后,当我关闭应用程序并重启它时,在检查点恢复时出现以下异常:
16/03/02 10:17:21错误HBaseContext:无法从广播获取配置 java.lang.ClassCastException:[B不能强制转换为org.apache.partk.SerializableWritable 在com.paitao.xmlife.contrib.hbase.HBaseContext.getConf(HBaseContext.scala:645) 在com.paitao.xmlife.contrib.hbase.HBaseContext.com$paitao$xmlife$contrib$hbase$HBaseContext$$hbaseForeachPartition(HBaseContext.scala:627) 在com.paitao.xmlife.contrib.hbase.HBaseContext$$anonfun$com$paitao$xmlife$contrib$hbase$HBaseContext$$bulkMutation$1.apply(HBaseContext.scala:457) 在com.paitao.xmlife.contrib.hbase.HBaseContext$$anonfun$com$paitao$xmlife$contrib$hbase$HBaseContext$$bulkMutation$1.apply(HBaseContext.scala:457) 在org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:898) 在org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:898) 在org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1839) 在org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1839) 在org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 在org.apache.spark.scheduler.Task.run(Task.scala:88) 在org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 在java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 在java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 在java.lang.Thread.run(Thread.java:745)
我检查了HBaseContext的代码,它使用广播来存储HBase配置。
class HBaseContext(@transient sc: SparkContext,
@transient config: Configuration,
val tmpHdfsConfgFile: String = null) extends Serializable with Logging {
@transient var credentials = SparkHadoopUtil.get.getCurrentUserCredentials()
@transient var tmpHdfsConfiguration: Configuration = config
@transient var appliedCredentials = false
@transient val job = Job.getInstance(config)
TableMapReduceUtil.initCredentials(job)
// <-- broadcast for HBaseConfiguration here !!!
var broadcastedConf = sc.broadcast(new SerializableWritable(config))
var credentialsConf = sc.broadcast(new SerializableWritable(job.getCredentials()))
...
当检查点恢复时,它尝试在其getConf函数中访问此广播值:
if (tmpHdfsConfiguration == null) {
try {
tmpHdfsConfiguration = configBroadcast.value.value
} catch {
case ex: Exception => logError("Unable to getConfig from broadcast", ex)
}
}
然后引发异常。我的问题是:是否可以在Spark应用程序中从检查点恢复广播值?我们是否有其他解决方案可以在恢复后重新广播值?
感谢您的反馈!
推荐答案
遵循以下方法
- 创建火花上下文。
- 初始化广播变量。
- 使用上述Spark上下文并传递已初始化的广播变量,使用检查点目录创建流上下文。
当流作业开始时,检查点目录中没有数据,它将初始化广播变量。
重新启动流时,它将从检查点目录恢复广播变量。
这篇关于是否可以从火花流检查点恢复广播值的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!