Why reading broadcast variable in Spark Streaming got exception after days of running?


Problem description


I am using Spark Streaming (Spark v1.6.0) together with HBase (HBase v1.1.2) in my project, and the HBase configuration is transferred among executors with a broadcast variable. The Spark Streaming application works at first, but after about two days an exception appears.

  val hBaseContext: HBaseContext = new HBaseContext(sc, HBaseCock.hBaseConfiguration())
  private def _materialDStream(dStream: DStream[(String, Int)], columnName: String, batchSize: Int) = {
    hBaseContext.streamBulkIncrement[(String, Int)](
      dStream,
      hTableName,
      (t) => {
        val rowKey = t._1
        val incVal = t._2
        val increment = new Increment(Bytes.toBytes(rowKey))
        increment.addColumn(Bytes.toBytes(hFamily), Bytes.toBytes(columnName), incVal)
        increment
      }, batchSize)
  }

The whole source file for HBaseContext can be found in HBaseContext.scala; some snippets are included below.

After days of running, the exception appears with the following stack trace:

Unable to getConfig from broadcast
16/02/01 10:08:10 ERROR Executor: Exception in task 3.0 in stage 187175.0 (TID 561527)

The logic is as follows:

  1. Create HBaseContext with a config and broadcast the config (saving the config to a file if a file path is specified)
  2. Before connecting to HBase, first check whether the field config is null; if so, restore it from the specified file, or, if no file path is specified, restore it from the broadcast variable.

The problem happens when the configuration is restored from the broadcast variable: the exception is thrown while reading the value from the broadcast, in "configBroadcast.value.value".

I guess that Spark Streaming does not restore broadcast variables when the master fails and getOrCreate() is used to obtain the StreamingContext instance. I am also curious why, in the HBaseContext.scala source code, a file is preferred over the broadcast variable for restoring values. So what is the best practice for using broadcast variables in Spark Streaming? Do I need to store them in files, say files in HDFS?

class HBaseContext(@transient sc: SparkContext,
                   @transient config: Configuration,
                   val tmpHdfsConfgFile: String = null) extends Serializable {

  @transient var tmpHdfsConfiguration: Configuration = config

  val broadcastedConf = sc.broadcast(new SerializableWritable(config))

  if (tmpHdfsConfgFile != null && config != null) {
    // save config to file
  }

  private def getConf(configBroadcast: Broadcast[SerializableWritable[Configuration]]): Configuration = {
    if (tmpHdfsConfiguration == null && tmpHdfsConfgFile != null) {
      // read config from file into tmpHdfsConfiguration
    }
    if (tmpHdfsConfiguration == null) {
      try {
        // Exception happens here!!!
        tmpHdfsConfiguration = configBroadcast.value.value
      } catch {
        case ex: Exception =>
          println("Unable to getConfig from broadcast")
      }
    }
    tmpHdfsConfiguration
  }
}
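
For context, here is a minimal sketch, assuming a Hadoop FileSystem reachable from the executors, of what the two elided comments above ("save config to file" and "read config from file") could look like; the helper names and the tmpPath parameter are illustrative, not the actual HBaseContext code. Configuration implements Writable, so it can be serialized with write()/readFields():

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical helpers, not part of HBaseContext.
def saveConfToHdfs(config: Configuration, tmpPath: String): Unit = {
  val fs = FileSystem.newInstance(config)
  val out = fs.create(new Path(tmpPath), true) // overwrite any stale copy
  try config.write(out) finally out.close()
}

def readConfFromHdfs(tmpPath: String): Configuration = {
  val fs = FileSystem.newInstance(new Configuration())
  val in = fs.open(new Path(tmpPath))
  val restored = new Configuration(false) // start empty, without loading defaults
  try restored.readFields(in) finally in.close()
  restored
}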

Solution

Broadcast variables are reset when the Spark job is restarted for some reason, or when the driver is re-associated with a new attempt after a job failure.

In the case of a streaming job, to use a broadcast variable one should initialize the broadcast from the SparkContext just before creating the StreamingContext. This ensures the broadcast variables are available when the streaming starts.

JavaSparkContext javaSparkContext = createSparkContext();

Broadcast<BroadcastContext> broadcastContext = getBroadcastContext(javaSparkContext);

JavaStreamingContext javaStreamingContext = JavaStreamingContext.getOrCreate(sparkCheckPointDir,
                () -> processor.create(sparkCheckPointDir, javaSparkContext));
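
For the Scala setting of the question, the same ordering would look roughly like the sketch below, assuming Spark 1.6 APIs; the checkpoint directory, batch interval, and app name are illustrative assumptions, and HBaseCock.hBaseConfiguration() is the question's own helper:

import org.apache.spark.{SerializableWritable, SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Create the SparkContext and the broadcast BEFORE the StreamingContext,
// so the broadcast exists whether the streaming context is created fresh
// or recovered from the checkpoint.
val sc = new SparkContext(new SparkConf().setAppName("hbase-streaming"))
val broadcastedConf = sc.broadcast(new SerializableWritable(HBaseCock.hBaseConfiguration()))

val checkpointDir = "hdfs:///tmp/streaming-checkpoint" // illustrative path

def createContext(): StreamingContext = {
  val ssc = new StreamingContext(sc, Seconds(10)) // illustrative batch interval
  ssc.checkpoint(checkpointDir)
  // ... build the DStreams that read broadcastedConf.value.value ...
  ssc
}

val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()

The key point of the pattern is that the broadcast is bound to the long-lived SparkContext rather than created inside the checkpointed streaming code.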
