Dynamic Flink window creation by reading the details from Kafka
Problem description
Suppose a Kafka message contains the Flink window size configuration.
I want to read the message from Kafka and create a global window in Flink.
Problem statement:
Can we handle the above scenario by using a BroadcastStream?
Or is there another approach that would support this case?
Recommended answer
Flink's window API does not support dynamically changing window sizes.
Instead, you'll need to implement your own windowing with a process function. For this case, that means a KeyedBroadcastProcessFunction, with the window configuration broadcast to it.
The Flink training includes an example of how to implement time windows with a KeyedProcessFunction (copied below):
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
// KeyedDataPoint comes from the Flink training exercises; import it from your own project.

public class PseudoWindow extends KeyedProcessFunction<String, KeyedDataPoint<Double>, KeyedDataPoint<Integer>> {
    // Keyed, managed state, with an entry for each window.
    // There is a separate MapState object for each sensor.
    private MapState<Long, Integer> countInWindow;

    boolean eventTimeProcessing;
    int durationMsec;

    /**
     * Create the KeyedProcessFunction.
     * @param eventTime whether or not to use event time processing
     * @param durationMsec window length
     */
    public PseudoWindow(boolean eventTime, int durationMsec) {
        this.eventTimeProcessing = eventTime;
        this.durationMsec = durationMsec;
    }

    @Override
    public void open(Configuration config) {
        MapStateDescriptor<Long, Integer> countDesc =
                new MapStateDescriptor<>("countInWindow", Long.class, Integer.class);
        countInWindow = getRuntimeContext().getMapState(countDesc);
    }

    @Override
    public void processElement(
            KeyedDataPoint<Double> dataPoint,
            Context ctx,
            Collector<KeyedDataPoint<Integer>> out) throws Exception {

        // Register a timer for the end of this element's window, then count the element in it.
        long endOfWindow = setTimer(dataPoint, ctx.timerService());

        Integer count = countInWindow.get(endOfWindow);
        if (count == null) {
            count = 0;
        }
        count += 1;
        countInWindow.put(endOfWindow, count);
    }

    public long setTimer(KeyedDataPoint<Double> dataPoint, TimerService timerService) {
        long time;

        if (eventTimeProcessing) {
            time = dataPoint.getTimeStampMs();
        } else {
            time = System.currentTimeMillis();
        }
        // Last millisecond of the window that contains `time`.
        long endOfWindow = (time - (time % durationMsec) + durationMsec - 1);

        if (eventTimeProcessing) {
            timerService.registerEventTimeTimer(endOfWindow);
        } else {
            timerService.registerProcessingTimeTimer(endOfWindow);
        }
        return endOfWindow;
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext context, Collector<KeyedDataPoint<Integer>> out) throws Exception {
        // Get the timestamp for this timer and use it to look up the count for that window
        long ts = context.timestamp();
        KeyedDataPoint<Integer> result = new KeyedDataPoint<>(context.getCurrentKey(), ts, countInWindow.get(ts));
        out.collect(result);
        // The window has fired, so drop its state.
        countInWindow.remove(timestamp);
    }
}
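To connect the example above with the question, here is a minimal plain-Java sketch of the same end-of-window arithmetic when the window length can change at runtime. It is not Flink code and not the training's implementation: the class DynamicWindowCounter and its methods updateDuration, add, and fire are hypothetical names used only for illustration, and the in-memory maps stand in for Flink's broadcast and keyed state. It assumes non-negative timestamps in milliseconds.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of a per-key counting window whose length can be changed at runtime.
class DynamicWindowCounter {
    // Current window length in ms; stands in for a value kept in Flink broadcast state.
    private long durationMsec;
    // key -> (end-of-window timestamp -> count); stands in for keyed MapState.
    private final Map<String, TreeMap<Long, Integer>> counts = new HashMap<>();

    DynamicWindowCounter(long initialDurationMsec) {
        updateDuration(initialDurationMsec);
    }

    // A new window configuration arrived (for example, read from Kafka).
    void updateDuration(long newDurationMsec) {
        if (newDurationMsec <= 0) {
            throw new IllegalArgumentException("window duration must be positive");
        }
        this.durationMsec = newDurationMsec;
    }

    // Assign an event to a window using the duration in effect right now.
    // Returns the end-of-window timestamp, which is also when a timer would fire.
    long add(String key, long timestampMsec) {
        long endOfWindow = timestampMsec - (timestampMsec % durationMsec) + durationMsec - 1;
        counts.computeIfAbsent(key, k -> new TreeMap<>()).merge(endOfWindow, 1, Integer::sum);
        return endOfWindow;
    }

    // Emit and clear every window of this key that ends at or before the given time.
    List<String> fire(String key, long timeMsec) {
        List<String> results = new ArrayList<>();
        TreeMap<Long, Integer> windows = counts.get(key);
        if (windows == null) {
            return results;
        }
        while (!windows.isEmpty() && windows.firstKey() <= timeMsec) {
            Map.Entry<Long, Integer> window = windows.pollFirstEntry();
            results.add(key + "@" + window.getKey() + "=" + window.getValue());
        }
        return results;
    }
}
```

In a KeyedBroadcastProcessFunction, the role of updateDuration would fall to processBroadcastElement (storing the new configuration in broadcast state), add to processElement (reading that state and registering a timer), and fire to onTimer. Note one consequence of this design: windows opened under the old duration keep their original end timestamps, so old and new windows can overlap for a while after a configuration change. Whether that is acceptable depends on the application.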