Flink count distinct issue

Problem description

We currently use a tumbling window to compute a distinct count. The problem is that when we extend the tumbling window from one day to one month, we can no longer see the distinct count as of now. With a one-month tumbling window, a result is only produced on the 1st of each month, when the window closes. How can I get the distinct count so far (today is March 9)?

package flink.trigger;

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

/**
 * Fires the window every {@code interval} ms of processing time to emit intermediate
 * results, and fires-and-purges it when the window ends.
 */
public class CustomCountDistinctTigger extends Trigger<Object, TimeWindow> {

    // Holds the next periodic fire time. ReducingStateDescriptor needs a ReduceFunction<Long>,
    // not the window's AggregateFunction; keeping the minimum follows the pattern used by
    // Flink's ContinuousProcessingTimeTrigger.
    private final ReducingStateDescriptor<Long> fireTimeState =
            new ReducingStateDescriptor<>("fire-interval", new Min(), LongSerializer.INSTANCE);

    private final long interval;

    public CustomCountDistinctTigger(long interval) {
        this.interval = interval;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        // Ensure there is a timer at the end of the window for the final FIRE_AND_PURGE.
        ctx.registerProcessingTimeTimer(window.maxTimestamp());

        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(fireTimeState);
        if (fireTimestamp.get() == null) {
            // Align the first periodic firing to a multiple of the interval (epoch ms, i.e. UTC).
            long now = ctx.getCurrentProcessingTime();
            long nextFireTimestamp = now - (now % interval) + interval;
            ctx.registerProcessingTimeTimer(nextFireTimestamp);
            fireTimestamp.add(nextFireTimestamp);
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        if (time == window.maxTimestamp()) {
            // End of the window: emit the final result and drop the window contents.
            return TriggerResult.FIRE_AND_PURGE;
        }
        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(fireTimeState);
        Long scheduled = fireTimestamp.get();
        if (scheduled != null && scheduled.longValue() == time) {
            // Periodic firing: emit an intermediate result, keep the contents, schedule the next one.
            fireTimestamp.clear();
            fireTimestamp.add(time + interval);
            ctx.registerProcessingTimeTimer(time + interval);
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        // Delete pending timers and per-window state so nothing leaks after the window is gone.
        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(fireTimeState);
        Long scheduled = fireTimestamp.get();
        if (scheduled != null) {
            ctx.deleteProcessingTimeTimer(scheduled);
        }
        ctx.deleteProcessingTimeTimer(window.maxTimestamp());
        fireTimestamp.clear();
    }

    // Keeps the earliest scheduled fire time.
    private static class Min implements ReduceFunction<Long> {
        @Override
        public Long reduce(Long a, Long b) {
            return Math.min(a, b);
        }
    }
}


Distinct count pipeline:
DataStreamSink<Tuple2<String, Integer>> finalResultStream = keyedStream
                            .flatMap(new KPIDistinctDataFlatMapFunction(inputSchema))
                            .map(new SwapMap())
                            .keyBy(new WordKeySelector())
                            .window(TumblingProcessingTimeWindows.of(org.apache.flink.streaming.api.windowing.time.Time.minutes(5)))
                            // fire an intermediate result every minute (60 * 1000 ms)
                            .trigger(new CustomCountDistinctTigger(60 * 1000))
                            .aggregate(new DistinctCountAggregateFunction())
                            .print("final print");

Recommended answer

You can define a custom Trigger that returns FIRE once a day to emit intermediate results, and then returns FIRE_AND_PURGE at the end of the month to close the window.
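This is a minimal plain-Java sketch, without Flink, of the two timestamps such a trigger manages and of what it returns when each timer goes off. It is a hypothetical illustration, and the class, enum and method names are made up. It assumes processing time in UTC and a window that spans one calendar month. Flink's fixed-size tumbling windows do not align to calendar months, so that part would need a custom WindowAssigner. In a real Trigger, these times would be passed to ctx.registerProcessingTimeTimer(...), and the month end would come from window.maxTimestamp().

```java
import java.time.Duration;
import java.time.Instant;
import java.time.YearMonth;
import java.time.ZoneOffset;

/**
 * Hypothetical, Flink-free sketch of a "fire daily, purge at month end" schedule.
 * All timestamps are epoch milliseconds; calendar logic assumes UTC.
 */
public class MonthlyTriggerSchedule {

    enum Decision { CONTINUE, FIRE, FIRE_AND_PURGE }

    static final long DAY_MS = Duration.ofDays(1).toMillis();

    /** Next daily firing time, aligned to midnight UTC (same arithmetic as the trigger's onElement). */
    static long nextDailyFire(long now) {
        return now - (now % DAY_MS) + DAY_MS;
    }

    /** Last millisecond of the calendar month containing now (the window's max timestamp). */
    static long monthMaxTimestamp(long now) {
        YearMonth month = YearMonth.from(Instant.ofEpochMilli(now).atZone(ZoneOffset.UTC));
        long nextMonthStart = month.plusMonths(1).atDay(1)
                .atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli();
        return nextMonthStart - 1;
    }

    /** What the trigger would return when a timer at the given time goes off. */
    static Decision onTimer(long time, long windowMaxTimestamp, long scheduledDailyFire) {
        if (time == windowMaxTimestamp) {
            return Decision.FIRE_AND_PURGE; // final result, window contents dropped
        }
        if (time == scheduledDailyFire) {
            return Decision.FIRE;           // intermediate result, contents kept
        }
        return Decision.CONTINUE;
    }

    public static void main(String[] args) {
        // Illustrative date: March 9 of an arbitrary year, mid-morning UTC.
        long now = Instant.parse("2021-03-09T10:15:00Z").toEpochMilli();
        long fire = nextDailyFire(now);
        long end = monthMaxTimestamp(now);
        System.out.println(Instant.ofEpochMilli(fire)); // 2021-03-10T00:00:00Z
        System.out.println(Instant.ofEpochMilli(end));  // 2021-03-31T23:59:59.999Z
        System.out.println(onTimer(fire, end, fire));   // FIRE
        System.out.println(onTimer(end, end, fire));    // FIRE_AND_PURGE
    }
}
```

The alignment is computed on epoch milliseconds, so a one-day interval fires at midnight UTC. A job that needs daily results at local midnight would have to shift that arithmetic by its zone offset.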

Every time the Trigger returns FIRE, your window is evaluated by calling the process() method of your ProcessWindowFunction, which can then emit results through the Collector it is given. FIRE_AND_PURGE evaluates the window one last time and then destroys it.
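This sketch illustrates, without Flink, how FIRE and FIRE_AND_PURGE differ for a month-to-date distinct count. It is hypothetical, and the class and method names are made up. A list stands in for the window's buffered contents, and evaluate() plays the role of ProcessWindowFunction.process(), which sees every element still in the window:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;

/** Hypothetical simulation of a window's contents under FIRE and FIRE_AND_PURGE. */
public class DistinctCountWindowSimulation {

    // Stand-in for the elements Flink buffers in the window.
    private final List<String> contents = new ArrayList<>();

    void add(String value) {
        contents.add(value);
    }

    /** Like process(): distinct count over all elements currently in the window. */
    int evaluate() {
        return new HashSet<>(contents).size();
    }

    /** FIRE: evaluate and keep the contents, so later firings include earlier days. */
    int fire() {
        return evaluate();
    }

    /** FIRE_AND_PURGE: evaluate one last time, then drop the contents. */
    int fireAndPurge() {
        int result = evaluate();
        contents.clear();
        return result;
    }

    public static void main(String[] args) {
        DistinctCountWindowSimulation window = new DistinctCountWindowSimulation();
        window.add("alice"); window.add("bob"); window.add("alice"); // day 1
        System.out.println(window.fire());         // 2
        window.add("carol"); window.add("bob");                       // day 2
        System.out.println(window.fire());         // 3 (month to date)
        System.out.println(window.fireAndPurge()); // 3 (end of month)
        System.out.println(window.evaluate());     // 0 (contents purged)
    }
}
```

FIRE leaves the contents in place, so each daily result is a running month-to-date count, and only FIRE_AND_PURGE resets it. Buffering a whole month of raw elements can be expensive. With an incremental aggregate(...), as in the question, Flink keeps only the AggregateFunction's accumulator per window and emits its current result on each FIRE.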

See also the answers to the question "How to display intermediate results in a windowed streaming-etl?", which covers a related topic.
