Mid-Stream Changing Configuration with Check-Pointed Spark Stream
Problem description
I have a Spark Streaming / DStream app like this:
```scala
// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
  val ssc = new StreamingContext(...)   // new context
  val lines = ssc.socketTextStream(...) // create DStreams
  ...
  ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
  ssc
}

// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...

// Start the context
context.start()
context.awaitTermination()
```
Where my context uses a configuration file from which I can pull items with methods like `appConf.getString`. So I actually use:

```scala
val context = StreamingContext.getOrCreate(
  appConf.getString("spark.checkpointDirectory"),
  () => createStreamContext(sparkConf, appConf))
```

where `val sparkConf = new SparkConf()...`.
If I stop my app and change configuration in the app file, these changes are not picked up unless I delete the checkpoint directory contents. For example, I would like to change `spark.streaming.kafka.maxRatePerPartition` or `spark.windowDurationSecs` dynamically. (EDIT: I kill the app, change the configuration file and then restart the app.) How can I dynamically change these settings, or enforce a configuration change, without trashing my checkpoint directory (which is about to include checkpoints for state info)?

Solution

> How can I dynamically change these settings or enforce a configuration change without trashing my checkpoint directory?
If we dive into the code for `StreamingContext.getOrCreate`:

```scala
def getOrCreate(
    checkpointPath: String,
    creatingFunc: () => StreamingContext,
    hadoopConf: Configuration = SparkHadoopUtil.get.conf,
    createOnError: Boolean = false
  ): StreamingContext = {
  val checkpointOption = CheckpointReader.read(
    checkpointPath, new SparkConf(), hadoopConf, createOnError)
  checkpointOption.map(new StreamingContext(null, _, null)).getOrElse(creatingFunc())
}
```
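The key consequence can be sketched in plain Scala (a dependency-free sketch with a stand-in `Ctx` type, not Spark's real classes): the creating function, and therefore any fresh configuration it reads, only runs when no checkpoint data was recovered.

```scala
// Stand-in sketch of the getOrCreate decision above (hypothetical
// Ctx type, no Spark dependency needed): recovered checkpoint data
// always wins over the creating function.
case class Ctx(source: String)

def getOrCreateSketch(recovered: Option[Ctx], creatingFunc: () => Ctx): Ctx =
  recovered.getOrElse(creatingFunc())

getOrCreateSketch(Some(Ctx("checkpoint")), () => Ctx("fresh")) // checkpoint wins; creatingFunc never runs
getOrCreateSketch(None, () => Ctx("fresh"))                    // only now does creatingFunc run
```

This is why edits to the app's configuration file go unnoticed after a restart: with a checkpoint present, the code path that reads `appConf` is skipped entirely.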
You can see that if `CheckpointReader` has checkpointed data in the checkpoint path, it uses `new SparkConf()` as a parameter, as the overload doesn't allow passing a custom-created `SparkConf`. By default, `SparkConf` will load any settings declared either as an environment variable or passed to the classpath:

```scala
class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {

  import SparkConf._

  /** Create a SparkConf that loads defaults from system properties and the classpath */
  def this() = this(true)
  // ...
}
```
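As a rough, dependency-free illustration of that default loading (a sketch, not the real `SparkConf`, which also handles deprecated keys, validation and logging), the no-arg constructor effectively copies every JVM system property whose key starts with `spark.` into the new configuration:

```scala
// Sketch of SparkConf's default loading: pick up every JVM system
// property whose key starts with "spark.". Settings passed with
// `spark-submit --conf key=value` surface on the driver this way.
object SparkConfDefaultsSketch {
  def loadSparkDefaults(): Map[String, String] =
    sys.props.toMap.filter { case (key, _) => key.startsWith("spark.") }

  def main(args: Array[String]): Unit = {
    // Simulate a setting supplied at submit time rather than in code.
    sys.props("spark.streaming.kafka.maxRatePerPartition") = "100"
    println(loadSparkDefaults()("spark.streaming.kafka.maxRatePerPartition"))
  }
}
```

So a setting supplied at launch time is visible even to the `new SparkConf()` that `getOrCreate` constructs internally.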
So one way of achieving what you want is, instead of creating a `SparkConf` object in the code, to pass the parameters via `spark.driver.extraClassPath` and `spark.executor.extraClassPath` to `spark-submit`.
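For example (a hypothetical invocation; the class name, jar and paths below are placeholders for your own application), the settings and the directory holding the application config file can be supplied at submit time instead of being baked into code:

```shell
# All names and paths below are placeholders for your own application.
spark-submit \
  --class com.example.MyStreamingApp \
  --conf spark.streaming.kafka.maxRatePerPartition=100 \
  --conf spark.driver.extraClassPath=/etc/myapp/conf \
  --conf spark.executor.extraClassPath=/etc/myapp/conf \
  myapp-assembly.jar
```

If `appConf` is loaded from the classpath (as Typesafe Config does by default, for instance), editing the file under `/etc/myapp/conf` and resubmitting picks up the new values for settings read that way, without deleting the checkpoint directory.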