Dynamic Flink window creation by reading the details from Kafka
Problem description
Let's say Kafka messages contain the Flink window size configuration.
I want to read that message from Kafka and create a global window in Flink.
Problem statement:
Can we handle the above scenario by using a BroadcastStream?
Or
Is there any other approach that supports the above case?
Recommended answer
Flink's window API does not support dynamically changing window sizes.
What you'll need to do is implement your own windowing with a process function, in this case a KeyedBroadcastProcessFunction, with the window configuration arriving on the broadcast stream.
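The core of hand-rolled windowing is mapping each timestamp to the end of its window for whatever size is currently configured. Here is a minimal, Flink-free sketch of that arithmetic. The class `WindowMath` and its method `endOfWindow` are hypothetical names for illustration, not part of Flink, and the sketch assumes tumbling windows aligned to the epoch.

```java
/** Hypothetical helper (not part of Flink): maps a timestamp to the end of its window. */
final class WindowMath {
    private WindowMath() {}

    /**
     * Returns the last millisecond of the tumbling window that contains timestampMs.
     * Windows are assumed to be aligned to the epoch: [k * size, (k + 1) * size).
     */
    static long endOfWindow(long timestampMs, long windowSizeMs) {
        if (windowSizeMs <= 0) {
            throw new IllegalArgumentException("windowSizeMs must be positive");
        }
        // floorMod keeps the alignment correct for negative timestamps as well.
        long windowStart = timestampMs - Math.floorMod(timestampMs, windowSizeMs);
        return windowStart + windowSizeMs - 1;
    }

    public static void main(String[] args) {
        long ts = 12_345L;
        System.out.println(endOfWindow(ts, 1_000L)); // 12999
        System.out.println(endOfWindow(ts, 5_000L)); // 14999
    }
}
```

Because the window size is a parameter rather than a constructor-time constant, the same event can be assigned with whichever size was most recently received.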
You can look at the Flink training for an example of how to implement time windows with a KeyedProcessFunction (copied below):
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// KeyedDataPoint is a data type from the Flink training exercises; import it from there.
public class PseudoWindow extends KeyedProcessFunction<String, KeyedDataPoint<Double>, KeyedDataPoint<Integer>> {
    // Keyed, managed state, with an entry for each window.
    // There is a separate MapState object for each sensor.
    private MapState<Long, Integer> countInWindow;

    boolean eventTimeProcessing;
    int durationMsec;

    /**
     * Create the KeyedProcessFunction.
     * @param eventTime whether or not to use event time processing
     * @param durationMsec window length
     */
    public PseudoWindow(boolean eventTime, int durationMsec) {
        this.eventTimeProcessing = eventTime;
        this.durationMsec = durationMsec;
    }

    @Override
    public void open(Configuration config) {
        MapStateDescriptor<Long, Integer> countDesc =
                new MapStateDescriptor<>("countInWindow", Long.class, Integer.class);
        countInWindow = getRuntimeContext().getMapState(countDesc);
    }

    @Override
    public void processElement(
            KeyedDataPoint<Double> dataPoint,
            Context ctx,
            Collector<KeyedDataPoint<Integer>> out) throws Exception {

        // Register a timer for the end of this element's window, then count the element.
        long endOfWindow = setTimer(dataPoint, ctx.timerService());

        Integer count = countInWindow.get(endOfWindow);
        if (count == null) {
            count = 0;
        }
        count += 1;
        countInWindow.put(endOfWindow, count);
    }

    public long setTimer(KeyedDataPoint<Double> dataPoint, TimerService timerService) {
        long time;

        if (eventTimeProcessing) {
            time = dataPoint.getTimeStampMs();
        } else {
            time = System.currentTimeMillis();
        }

        // Last millisecond of the window that contains 'time'.
        long endOfWindow = (time - (time % durationMsec) + durationMsec - 1);

        if (eventTimeProcessing) {
            timerService.registerEventTimeTimer(endOfWindow);
        } else {
            timerService.registerProcessingTimeTimer(endOfWindow);
        }
        return endOfWindow;
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext context, Collector<KeyedDataPoint<Integer>> out) throws Exception {
        // Get the timestamp for this timer and use it to look up the count for that window
        long ts = context.timestamp();
        KeyedDataPoint<Integer> result = new KeyedDataPoint<>(context.getCurrentKey(), ts, countInWindow.get(ts));
        out.collect(result);
        countInWindow.remove(timestamp);
    }
}
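To make the window size dynamic, the fixed durationMsec above would be replaced by a value that can be updated while the job runs. The following is a minimal, Flink-free model of that idea, not the Flink API itself. The class `DynamicWindowCounter` and its methods are hypothetical: `updateWindowSize` stands in for what `processBroadcastElement` would do in a KeyedBroadcastProcessFunction, `add` for `processElement`, and `advanceWatermark` for the timers firing in `onTimer`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical, Flink-free model of a count window whose size can change at runtime. */
final class DynamicWindowCounter {
    // Counts keyed by window end, mirroring the MapState<Long, Integer> in PseudoWindow.
    private final TreeMap<Long, Integer> countInWindow = new TreeMap<>();
    private long windowSizeMs;

    DynamicWindowCounter(long initialWindowSizeMs) {
        updateWindowSize(initialWindowSizeMs);
    }

    /** Stands in for processBroadcastElement: a new window size arrives from the config stream. */
    void updateWindowSize(long newWindowSizeMs) {
        if (newWindowSizeMs <= 0) {
            throw new IllegalArgumentException("window size must be positive");
        }
        this.windowSizeMs = newWindowSizeMs;
    }

    /** Stands in for processElement: counts the event in its window under the current size. */
    long add(long timestampMs) {
        long endOfWindow =
                timestampMs - Math.floorMod(timestampMs, windowSizeMs) + windowSizeMs - 1;
        countInWindow.merge(endOfWindow, 1, Integer::sum);
        return endOfWindow;
    }

    /** Stands in for onTimer: emits and clears every window ending at or before the watermark. */
    List<String> advanceWatermark(long watermarkMs) {
        List<String> results = new ArrayList<>();
        Map<Long, Integer> ready = countInWindow.headMap(watermarkMs, true);
        for (Map.Entry<Long, Integer> entry : ready.entrySet()) {
            results.add(entry.getKey() + "=" + entry.getValue());
        }
        ready.clear(); // the view is backed by countInWindow, so fired windows are removed
        return results;
    }

    public static void main(String[] args) {
        DynamicWindowCounter counter = new DynamicWindowCounter(1_000L);
        counter.add(100L);
        counter.add(900L);
        counter.updateWindowSize(5_000L); // new configuration arrives
        counter.add(1_200L);
        counter.add(4_000L);
        System.out.println(counter.advanceWatermark(999L));   // [999=2]
        System.out.println(counter.advanceWatermark(4_999L)); // [4999=2]
    }
}
```

In this sketch a size change only affects events that arrive afterwards: windows that are already open keep their original end, so old and new windows may overlap in time. Whether that is acceptable, or whether open windows should be closed or re-assigned when the configuration changes, is a design decision the answer above leaves open.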