Dynamic Flink window creation by reading the details from Kafka

Question

Kafka messages contain the Flink window size configuration.

I want to read these messages from Kafka and create a global window in Flink.

Problem statement:

Can we handle the above scenario by using a BroadcastStream?

Is there any other approach that would support this case?

Recommended answer

Flink's window API does not support dynamically changing window sizes.

What you'll need to do is implement your own windowing using a process function. In this case that means a KeyedBroadcastProcessFunction, where the window configuration is broadcast and the keyed data stream is connected to that broadcast stream.

The Flink training includes an example of how to implement time windows with a KeyedProcessFunction (copied below):

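import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// KeyedDataPoint is a helper type from the Flink training exercises; it is not part of Flink itself.
public class PseudoWindow extends KeyedProcessFunction<String, KeyedDataPoint<Double>, KeyedDataPoint<Integer>> {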
    // Keyed, managed state, with an entry for each window.
    // There is a separate MapState object for each sensor.
    private MapState<Long, Integer> countInWindow;

    boolean eventTimeProcessing;
    int durationMsec;

    /**
     * Create the KeyedProcessFunction.
     * @param eventTime whether or not to use event time processing
     * @param durationMsec window length
     */
    public PseudoWindow(boolean eventTime, int durationMsec) {
        this.eventTimeProcessing = eventTime;
        this.durationMsec = durationMsec;
    }

    @Override
    public void open(Configuration config) {
        MapStateDescriptor<Long, Integer> countDesc =
                new MapStateDescriptor<>("countInWindow", Long.class, Integer.class);
        countInWindow = getRuntimeContext().getMapState(countDesc);
    }

    @Override
    public void processElement(
            KeyedDataPoint<Double> dataPoint,
            Context ctx,
            Collector<KeyedDataPoint<Integer>> out) throws Exception {

        long endOfWindow = setTimer(dataPoint, ctx.timerService());

        Integer count = countInWindow.get(endOfWindow);
        if (count == null) {
            count = 0;
        }
        count += 1;
        countInWindow.put(endOfWindow, count);
    }

    public long setTimer(KeyedDataPoint<Double> dataPoint, TimerService timerService) {
        long time;

        if (eventTimeProcessing) {
            time = dataPoint.getTimeStampMs();
        } else {
            time = System.currentTimeMillis();
        }
        long endOfWindow = (time - (time % durationMsec) + durationMsec - 1);

        if (eventTimeProcessing) {
            timerService.registerEventTimeTimer(endOfWindow);
        } else {
            timerService.registerProcessingTimeTimer(endOfWindow);
        }
        return endOfWindow;
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext context, Collector<KeyedDataPoint<Integer>> out) throws Exception {
        // Get the timestamp for this timer and use it to look up the count for that window
        long ts = context.timestamp();
        KeyedDataPoint<Integer> result = new KeyedDataPoint<>(context.getCurrentKey(), ts, countInWindow.get(ts));
        out.collect(result);
        countInWindow.remove(timestamp);
    }
} 
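
To see what changes when the window length arrives at runtime instead of through the constructor, here is a minimal, dependency-free sketch of the same bookkeeping that PseudoWindow does. It is not Flink code: the class name DynamicWindowSketch and its methods are hypothetical, and each method only stands in for the KeyedBroadcastProcessFunction callback named in its comment (processBroadcastElement, processElement, onTimer). It assumes non-negative event timestamps and a single key.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical plain-Java model of a pseudo-window whose length can change at runtime. */
final class DynamicWindowSketch {
    // Current window length; stands in for the value kept in broadcast state.
    private long durationMsec;

    // Window end timestamp -> element count; stands in for MapState<Long, Integer>.
    private final TreeMap<Long, Integer> countInWindow = new TreeMap<>();

    DynamicWindowSketch(long initialDurationMsec) {
        updateDuration(initialDurationMsec);
    }

    /** Stands in for processBroadcastElement: apply a window size read from Kafka. */
    void updateDuration(long newDurationMsec) {
        if (newDurationMsec <= 0) {
            throw new IllegalArgumentException("window length must be positive");
        }
        this.durationMsec = newDurationMsec;
    }

    /** Stands in for processElement: assign the event to a window and count it. */
    long add(long eventTimeMsec) {
        // Same window-end arithmetic as PseudoWindow.setTimer, using the current length.
        long endOfWindow = eventTimeMsec - (eventTimeMsec % durationMsec) + durationMsec - 1;
        countInWindow.merge(endOfWindow, 1, Integer::sum);
        return endOfWindow;
    }

    /**
     * Stands in for the timers firing: returns {windowEnd, count} for every window
     * whose end is at or before the watermark, and removes those windows.
     */
    List<long[]> advanceWatermark(long watermarkMsec) {
        List<long[]> fired = new ArrayList<>();
        Map<Long, Integer> ready = countInWindow.headMap(watermarkMsec, true);
        for (Map.Entry<Long, Integer> e : ready.entrySet()) {
            fired.add(new long[] {e.getKey(), e.getValue()});
        }
        ready.clear(); // headMap is a view, so this also clears the backing map
        return fired;
    }
}
```

In this sketch a size change only affects elements that arrive afterwards. Windows opened under the old size keep their original end timestamps, so old and new windows can overlap for a while. Whether that is acceptable, or whether open windows should be re-bucketed or flushed when the configuration changes, is a design decision the original answer does not address.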
