Mid-Stream Changing Configuration with Check-Pointed Spark Stream

This article describes how to handle changing configuration mid-stream with a check-pointed Spark stream; hopefully it is a useful reference for anyone facing the same problem.

Problem Description

I have a Spark streaming / DStream app like this:

// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
  val ssc = new StreamingContext(...)   // new context
  val lines = ssc.socketTextStream(...) // create DStreams
  ...
  ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
  ssc
}

// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...

// Start the context
context.start()
context.awaitTermination()

My context uses a configuration file from which I can pull items with methods like appConf.getString, so I actually use:

val context = StreamingContext.getOrCreate(
    appConf.getString("spark.checkpointDirectory"), 
    () => createStreamContext(sparkConf, appConf))

where val sparkConf = new SparkConf()....
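
For context, here is a minimal sketch of what createStreamContext might look like (the body below is an assumption reconstructed from the snippets above, not the poster's actual code; appConf is assumed to be a Typesafe Config instance, which the appConf.getString style suggests, and the key names are illustrative):

import com.typesafe.config.Config
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical factory matching the call site above.
def createStreamContext(sparkConf: SparkConf, appConf: Config): StreamingContext = {
  // Batch duration read from the app config file (key name is illustrative)
  val ssc = new StreamingContext(sparkConf, Seconds(appConf.getLong("spark.batchDurationSecs")))
  val lines = ssc.socketTextStream(appConf.getString("socket.host"), appConf.getInt("socket.port"))
  // ... build the rest of the DStream graph here ...
  ssc.checkpoint(appConf.getString("spark.checkpointDirectory"))
  ssc
}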

If I stop my app and change configuration in the app file, these changes are not picked up unless I delete the checkpoint directory contents. For example, I would like to change spark.streaming.kafka.maxRatePerPartition or spark.windowDurationSecs dynamically. (EDIT: I kill the app, change the configuration file and then restart the app.) How can I dynamically change these settings or enforce a configuration change without trashing my checkpoint directory (which is about to include checkpoints for state info)?

Solution

How can I dynamically change these settings or enforce a configuration change without trashing my checkpoint directory?

If you dive into the code for StreamingContext.getOrCreate:

def getOrCreate(
    checkpointPath: String,
    creatingFunc: () => StreamingContext,
    hadoopConf: Configuration = SparkHadoopUtil.get.conf,
    createOnError: Boolean = false
  ): StreamingContext = {
    val checkpointOption = CheckpointReader.read(
      checkpointPath, new SparkConf(), hadoopConf, createOnError)
    checkpointOption.map(new StreamingContext(null, _, null)).getOrElse(creatingFunc())
}

You can see that when CheckpointReader finds checkpointed data at the checkpoint path, the StreamingContext is rebuilt with a fresh new SparkConf(), since this overload doesn't allow passing a custom-created SparkConf. By default, SparkConf will load any settings declared as system properties or found on the classpath:

class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {

  import SparkConf._

  /** Create a SparkConf that loads defaults from system properties and the classpath */
  def this() = this(true)
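
As a quick illustration of that default behavior (a hedged sketch; the property name and value are just examples):

import org.apache.spark.SparkConf

// Any JVM system property starting with "spark." (for example, one set
// via spark-submit --conf) is loaded by the no-arg constructor.
System.setProperty("spark.streaming.kafka.maxRatePerPartition", "100")
val conf = new SparkConf() // equivalent to new SparkConf(loadDefaults = true)
assert(conf.get("spark.streaming.kafka.maxRatePerPartition") == "100")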

So one way of achieving what you want is, instead of creating a SparkConf object in code, to pass the parameters via spark.driver.extraClassPath and spark.executor.extraClassPath to spark-submit.
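
For example (a hedged sketch; the class name, JAR, and values are placeholders), settings supplied at launch become JVM system properties, so the new SparkConf() built inside getOrCreate will see them on restart:

spark-submit \
  --class com.example.MyStreamingApp \
  --conf spark.streaming.kafka.maxRatePerPartition=100 \
  --conf spark.driver.extraClassPath=/etc/myapp/conf \
  my-streaming-app.jar

The extraClassPath route works the same way: rather than hardcoding values in the app, you put them where the freshly constructed configuration can find them at startup.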
