Flink count distinct issue

Problem description

We currently use a tumbling window to count distinct values. The problem is that when we extend the tumbling window from one day to one month, we can no longer get the distinct count as of now: with a 1-month tumbling window, a result is produced only when a window closes, i.e. on the 1st of each month. How can I get the current distinct count for this month so far (today is Mar 9)?

package flink.trigger;

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

/**
 * Processing-time trigger: FIRE every interval ms (emit the current result and keep the
 * window contents) and FIRE_AND_PURGE when the window ends.
 */
public class CustomCountDistinctTigger extends Trigger<Object, TimeWindow> {

    // Timestamp of the next intermediate fire. The state combines Long timestamps,
    // so it needs a ReduceFunction<Long> that keeps the earliest one.
    private final ReducingStateDescriptor<Long> timeState =
            new ReducingStateDescriptor<>("fire-interval", new Min(), LongSerializer.INSTANCE);
    private final long interval;

    public CustomCountDistinctTigger(long interval) {
        this.interval = interval;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(timeState);

        if (fireTimestamp.get() == null) {
            // First element of this window: schedule the final fire at the window end and
            // the first intermediate fire at the next multiple of interval.
            ctx.registerProcessingTimeTimer(window.maxTimestamp());
            long now = ctx.getCurrentProcessingTime();
            long nextFireTimestamp = Math.min(now - (now % interval) + interval, window.maxTimestamp());
            ctx.registerProcessingTimeTimer(nextFireTimestamp);
            fireTimestamp.add(nextFireTimestamp);
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        if (time == window.maxTimestamp()) {
            // Window end: emit the final result and drop the window contents.
            return TriggerResult.FIRE_AND_PURGE;
        }
        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(timeState);
        Long scheduled = fireTimestamp.get();
        if (scheduled != null && scheduled == time) {
            // Intermediate fire: emit the current result, keep the contents, schedule the next one.
            long next = Math.min(time + interval, window.maxTimestamp());
            fireTimestamp.clear();
            fireTimestamp.add(next);
            ctx.registerProcessingTimeTimer(next);
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        // Called when the window is removed: delete pending timers and the trigger state.
        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(timeState);
        Long scheduled = fireTimestamp.get();
        if (scheduled != null) {
            ctx.deleteProcessingTimeTimer(scheduled);
            fireTimestamp.clear();
        }
        ctx.deleteProcessingTimeTimer(window.maxTimestamp());
    }

    /** Keeps the earliest of two timestamps. */
    private static class Min implements ReduceFunction<Long> {
        private static final long serialVersionUID = 1L;

        @Override
        public Long reduce(Long a, Long b) {
            return Math.min(a, b);
        }
    }
}


Distinct count pipeline:
DataStreamSink<Tuple2<String, Integer>> finalResultStream = keyedStream
                            .flatMap(new KPIDistinctDataFlatMapFunction(inputSchema))
                            .map(new SwapMap())
                            .keyBy(new WordKeySelector())
                            .window(TumblingProcessingTimeWindows.of(org.apache.flink.streaming.api.windowing.time.Time.minutes(5)))
                            // emit an intermediate result every minute (60 * 1000 ms)
                            .trigger(new CustomCountDistinctTigger(60 * 1000))
                            .aggregate(new DistinctCountAggregateFunction())
                            .print("final print");
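To see when the trigger above fires, its timer arithmetic can be modeled in plain Java without Flink. This is a sketch under stated assumptions: processing time, a window starting at epoch millisecond 0, and a final fire at windowEnd - 1 (the value TimeWindow.maxTimestamp() returns). FireScheduleSketch and fireTimes are hypothetical names, not Flink APIs. It also shows why the interval must be shorter than the window: an interval of 1 * 60 * 6000 evaluates to 360,000 ms (6 minutes), so with a 5-minute window no intermediate FIRE happens before the window ends.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model (hypothetical, not Flink code) of the trigger's firing schedule. */
class FireScheduleSketch {

    /**
     * Returns the processing times at which the trigger would fire for one window,
     * given the processing time of the window's first element. Intermediate fires are
     * aligned to multiples of interval; the last entry is the end-of-window fire.
     */
    static List<Long> fireTimes(long windowEnd, long firstElementTime, long interval) {
        long maxTimestamp = windowEnd - 1;  // same as TimeWindow.maxTimestamp()
        List<Long> fires = new ArrayList<>();
        // Round down to the interval, then step forward, as the trigger's onElement does.
        long next = firstElementTime - (firstElementTime % interval) + interval;
        while (next < maxTimestamp) {
            fires.add(next);                // intermediate FIRE: emit, keep contents
            next += interval;
        }
        fires.add(maxTimestamp);            // FIRE_AND_PURGE at the window end
        return fires;
    }

    public static void main(String[] args) {
        long windowEnd = 5 * 60 * 1000L;    // a 5-minute window [0, 300000)
        long firstElement = 12_345L;        // processing time of the first element
        System.out.println(fireTimes(windowEnd, firstElement, 60 * 1000L));
        // [60000, 120000, 180000, 240000, 299999]
        System.out.println(fireTimes(windowEnd, firstElement, 1 * 60 * 6000L));
        // [299999]: no intermediate fire before the window closes
    }
}
```

With a 1-minute interval the window reports four intermediate counts before its final one; with a 6-minute interval only the final count at the window end is emitted.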

Recommended answer

You can define a custom Trigger that returns FIRE once a day to emit intermediate results, and then returns FIRE_AND_PURGE at the end of the month to close the window.
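As a rough illustration of the timestamps such a trigger would register, here is a plain-Java sketch (no Flink dependency) using java.time. It assumes days and months are counted in one fixed time zone (UTC in the example) and that the window's last timestamp is end - 1, as with TimeWindow.maxTimestamp(). CalendarTimers, monthEnd, nextMidnight and nextTimer are hypothetical helper names; in a real Trigger, the value of nextTimer would be passed to ctx.registerProcessingTimeTimer.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;

/** Hypothetical helpers for a daily-FIRE, month-end-FIRE_AND_PURGE trigger. */
class CalendarTimers {

    /** Exclusive end of the calendar month containing nowMillis, in epoch millis. */
    static long monthEnd(long nowMillis, ZoneId zone) {
        LocalDate today = Instant.ofEpochMilli(nowMillis).atZone(zone).toLocalDate();
        return today.withDayOfMonth(1).plusMonths(1).atStartOfDay(zone).toInstant().toEpochMilli();
    }

    /** Epoch millis of the next local midnight after nowMillis. */
    static long nextMidnight(long nowMillis, ZoneId zone) {
        LocalDate today = Instant.ofEpochMilli(nowMillis).atZone(zone).toLocalDate();
        return today.plusDays(1).atStartOfDay(zone).toInstant().toEpochMilli();
    }

    /**
     * Next timer to register: the daily FIRE, capped at the month window's last
     * timestamp (monthEnd - 1), where the trigger would return FIRE_AND_PURGE.
     */
    static long nextTimer(long nowMillis, ZoneId zone) {
        long windowMaxTimestamp = monthEnd(nowMillis, zone) - 1;
        return Math.min(nextMidnight(nowMillis, zone), windowMaxTimestamp);
    }

    public static void main(String[] args) {
        ZoneId utc = ZoneOffset.UTC;
        long mar9 = ZonedDateTime.of(2021, 3, 9, 10, 0, 0, 0, utc).toInstant().toEpochMilli();
        long mar31 = ZonedDateTime.of(2021, 3, 31, 10, 0, 0, 0, utc).toInstant().toEpochMilli();
        System.out.println(Instant.ofEpochMilli(nextTimer(mar9, utc)));   // 2021-03-10T00:00:00Z
        System.out.println(Instant.ofEpochMilli(nextTimer(mar31, utc)));  // 2021-03-31T23:59:59.999Z
    }
}
```

For an arbitrary instant on Mar 9 the next timer is the following midnight (a daily FIRE); on the last day of the month it is capped at one millisecond before the month ends, which is where the trigger would close the window.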

Every time the Trigger returns FIRE, your window is evaluated by calling the process() method of your ProcessWindowFunction, which can then emit results through the Collector it is given. If the window uses an AggregateFunction instead, as in the question, each FIRE emits the result of getResult() for the current accumulator. FIRE_AND_PURGE evaluates the window one last time and then destroys it.
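The difference between FIRE and FIRE_AND_PURGE can be illustrated with a plain-Java model of one month-long window whose accumulator is a set of distinct keys. This is not Flink code, and the set-based accumulator is an assumption (the question's DistinctCountAggregateFunction is not shown); MonthToDateDistinct and its methods are hypothetical names. fire() returns the current distinct count and keeps the state, like a daily FIRE; fireAndPurge() returns the count one last time and clears the state, like the month-end FIRE_AND_PURGE.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical model of one month window with a distinct-count accumulator. */
class MonthToDateDistinct {
    private final Set<String> seen = new HashSet<>();

    /** Adds one element's key to the window (duplicates are ignored by the set). */
    void add(String key) {
        seen.add(key);
    }

    /** Like FIRE: report the distinct count so far and keep the state. */
    int fire() {
        return seen.size();
    }

    /** Like FIRE_AND_PURGE: report the distinct count, then discard the state. */
    int fireAndPurge() {
        int result = seen.size();
        seen.clear();
        return result;
    }

    public static void main(String[] args) {
        MonthToDateDistinct march = new MonthToDateDistinct();
        march.add("alice");
        march.add("bob");
        System.out.println("end of Mar 1 (FIRE): " + march.fire());                    // 2
        march.add("bob");
        march.add("carol");
        System.out.println("end of Mar 2 (FIRE): " + march.fire());                    // 3
        march.add("dave");
        System.out.println("end of Mar 9 (FIRE): " + march.fire());                    // 4
        System.out.println("end of Mar 31 (FIRE_AND_PURGE): " + march.fireAndPurge()); // 4
    }
}
```

The Mar 9 line is the distinct count as of now that the question asks for: each daily FIRE reports the month-to-date value without resetting it, and only the month-end FIRE_AND_PURGE starts over.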

See also the answers to the question "How to display intermediate results in a windowed streaming-etl?", which cover a related topic.
