Flink: How to handle external app configuration changes in Flink


Question

My requirement is to stream millions of records a day, and the job depends heavily on external configuration parameters. For example, a user can change a setting in the web application at any time, and once the change is made, streaming has to continue with the new application config parameters. These are app-level configurations; we also have some dynamic exclude parameters that every record has to be passed through and filtered by.

I see that Flink doesn't have global state that is shared across all task managers and subtasks. Having a centralized cache is an option, but I would have to read each parameter from the cache, which would increase latency. Please advise on a better approach for handling this kind of scenario and on how other applications handle it. Thanks.

Recommended answer

Updating the configuration of a running streaming application is a common requirement. In Flink's DataStream API, this can be done with a so-called CoFlatMapFunction, which processes two input streams. One of the streams can be a data stream and the other a control stream.

The following example shows how to dynamically adapt a user function that filters strings by length, keeping only those shorter than a threshold that can be changed at runtime.

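import org.apache.flink.streaming.api.checkpoint.Checkpointed
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

val data: DataStream[String] = ???   // the records to filter
val control: DataStream[Int] = ???   // new filter lengths, e.g. pushed by the web application

val filtered: DataStream[String] = data
  // broadcast all control messages to the following CoFlatMap subtasks
  .connect(control.broadcast)
  // process data and control messages
  .flatMap(new DynLengthFilter)


// Checkpointed requires a Serializable state type, hence java.lang.Integer rather than Int
class DynLengthFilter extends CoFlatMapFunction[String, Int, String] with Checkpointed[Integer] {

  // current threshold; with the initial value 0, nothing passes until the first control message arrives
  var length = 0

  // filter strings by length: emit only strings shorter than the current threshold
  override def flatMap1(value: String, out: Collector[String]): Unit = {
    if (value.length < length) {
      out.collect(value)
    }
  }

  // receive new filter length
  override def flatMap2(value: Int, out: Collector[String]): Unit = {
    length = value
  }

  // include the current threshold in every checkpoint
  override def snapshotState(checkpointId: Long, checkpointTimestamp: Long): Integer = length

  // reinstate the threshold when the job recovers from a checkpoint
  override def restoreState(state: Integer): Unit = {
    length = state
  }
}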

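The DynLengthFilter user function implements the Checkpointed interface so that the current filter length is included in checkpoints. In case of a failure, this value is restored automatically. Note that Checkpointed was deprecated in Flink 1.2 in favor of CheckpointedFunction and ListCheckpointed.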
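For Flink versions that no longer ship the Checkpointed interface, the same filter can be sketched with CheckpointedFunction and operator state. This is only an illustration under a few assumptions, not part of the original answer: the class name DynLengthFilterV2 and the state name "filter-length" are made up for this example, and union list state is used on the assumption that every subtask has received the same broadcast value, so any restored entry is valid.

```scala
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction
import org.apache.flink.util.Collector

// Hypothetical variant of DynLengthFilter; the name is an assumption for this sketch.
class DynLengthFilterV2 extends CoFlatMapFunction[String, Int, String] with CheckpointedFunction {

  // current threshold, updated by control messages
  private var length = 0

  // operator state handle, created in initializeState
  @transient private var lengthState: ListState[Integer] = _

  // emit only strings shorter than the current threshold
  override def flatMap1(value: String, out: Collector[String]): Unit =
    if (value.length < length) out.collect(value)

  // receive a new filter length from the control stream
  override def flatMap2(value: Int, out: Collector[String]): Unit =
    length = value

  // write the current threshold into operator state on every checkpoint
  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    lengthState.clear()
    lengthState.add(length)
  }

  // register the state and, after a failure, read the threshold back
  override def initializeState(context: FunctionInitializationContext): Unit = {
    val descriptor = new ListStateDescriptor[Integer]("filter-length", classOf[Integer])
    // union redistribution hands every subtask all snapshotted entries on restore
    lengthState = context.getOperatorStateStore.getUnionListState(descriptor)
    if (context.isRestored) {
      val it = lengthState.get().iterator()
      if (it.hasNext) length = it.next()
    }
  }
}
```

It is wired into the job the same way as before, with data.connect(control.broadcast).flatMap(new DynLengthFilterV2). Because the control stream is broadcast, all parallel subtasks hold the same threshold, which is why taking the first restored entry is enough in this sketch.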
