Apache Flink Streaming window WordCount


Problem description

I have the following code to count words from a socketTextStream. Both cumulative word counts and time-windowed word counts are needed. The program has an issue: cumulateCounts is always the same as the windowed counts. Why does this happen, and what is the correct way to calculate cumulative counts based on the windowed counts?

import java.util.HashMap;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.windowing.helper.Time;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final HashMap<String, Integer> cumulateCounts = new HashMap<String, Integer>();

// Splitter is a user-defined FlatMapFunction emitting (word, 1) tuples (not shown)
final DataStream<Tuple2<String, Integer>> counts = env
            .socketTextStream("localhost", 9999)
            .flatMap(new Splitter())
            .window(Time.of(5, TimeUnit.SECONDS))
            .groupBy(0).sum(1)
            .flatten();

counts.print();

counts.addSink(new SinkFunction<Tuple2<String, Integer>>() {
    @Override
    public void invoke(Tuple2<String, Integer> value) throws Exception {
        String word = value.f0;
        Integer delta_count = value.f1;
        Integer count = cumulateCounts.get(word);
        if (count == null)
            count = 0;
        count = count + delta_count;
        cumulateCounts.put(word, count);
        System.out.println("(" + word + "," + count.toString() + ")");
    }
});
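The `Splitter` flatMap referenced above is not shown in the question; presumably it tokenizes each input line into `(word, 1)` pairs. A minimal plain-Java sketch of that logic (the class name and the whitespace/punctuation splitting rule are assumptions, not the asker's actual code):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the tokenization a Splitter flatMap would perform:
// each input line is lower-cased, split on non-word characters, and emitted
// as (word, 1) pairs, analogous to Tuple2<String, Integer> in Flink.
public class SplitterSketch {
    public static List<Map.Entry<String, Integer>> tokenize(String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String word : line.toLowerCase().split("\\W+")) {
            if (!word.isEmpty()) {
                out.add(new SimpleEntry<>(word, 1));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(tokenize("to be or not to be"));
    }
}
```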


Answer

You should first group by key, and apply the window on the keyed data stream (your code works on Flink 0.9.1, but the new API in Flink 0.10.0 is strict about this):

final DataStream<Tuple2<String, Integer>> counts = env
        .socketTextStream("localhost", 9999)
        .flatMap(new Splitter())
        .groupBy(0)
        .window(Time.of(5, TimeUnit.SECONDS)).sum(1)
        .flatten();

If you apply a window to a non-keyed data stream, there will be only a single-threaded window operator on a single machine (i.e., no parallelism) building the window over the whole stream (in Flink 0.9.1, this global window can be split into sub-windows by groupBy(); however, in Flink 0.10.0 this no longer works). To count words, you want to build a window for each distinct key value, i.e., you first get a sub-stream per key value (via groupBy()) and apply a window operator on each sub-stream (thus, each sub-stream gets its own window operator instance, allowing for parallel execution).
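The per-key tumbling-window semantics can be illustrated with a plain-Java sketch (not Flink code; the bucketing scheme here is only a model of the behavior): each event carries a timestamp, and each key's stream is cut into independent 5-second windows, each emitting its own count.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch of keyed tumbling-window counting: events are bucketed
// per (key, window) pair, where the window index is timestamp / windowSize.
public class KeyedWindowCount {
    public static Map<String, Integer> countWindows(Object[][] events, long windowMillis) {
        Map<String, Integer> counts = new HashMap<>();
        for (Object[] e : events) {
            String word = (String) e[0];
            long ts = ((Number) e[1]).longValue();
            long window = ts / windowMillis;            // tumbling window index
            String bucket = word + "@window" + window;  // one bucket per key per window
            counts.merge(bucket, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        Object[][] events = {
            {"foo", 1000L}, {"foo", 2000L}, {"bar", 3000L}, // window 0 (0-5s)
            {"foo", 6000L}                                  // window 1 (5-10s)
        };
        System.out.println(countWindows(events, 5000L));
        // foo@window0 -> 2, bar@window0 -> 1, foo@window1 -> 1
    }
}
```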

For a global (cumulative) count, you can simply apply a groupBy().sum() construct. First, the stream is split into sub-streams (one for each key value). Second, you compute the sum over each stream. Because the stream is not windowed, the sum is computed cumulatively and updated for each incoming tuple (in more detail, the sum has an initial result value of zero, and the result is updated for each tuple as result += tuple.value). After each invocation of sum, the new current result is emitted.
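This running-sum behaviour can be sketched in plain Java (class and method names here are illustrative, not Flink API): per-key state starts at zero, and every incoming tuple updates the key's total and immediately re-emits it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the per-key running sum that groupBy(0).sum(1) performs:
// for every incoming (word, delta), the state is updated and the new
// cumulative total for that word is emitted downstream.
public class RunningSum {
    private final Map<String, Integer> state = new HashMap<>();

    // Processes one tuple and returns the emitted (word, cumulativeCount).
    public Map.Entry<String, Integer> process(String word, int delta) {
        int updated = state.merge(word, delta, Integer::sum); // result += tuple.value
        return Map.entry(word, updated);
    }

    public static void main(String[] args) {
        RunningSum sum = new RunningSum();
        List<Map.Entry<String, Integer>> emitted = new ArrayList<>();
        // windowed counts arriving from upstream: (foo,2), (bar,1), (foo,1)
        emitted.add(sum.process("foo", 2));
        emitted.add(sum.process("bar", 1));
        emitted.add(sum.process("foo", 1));
        System.out.println(emitted); // [foo=2, bar=1, foo=3]
    }
}
```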

In your code, you should not use your custom sink function but instead do as follows:

counts.groupBy(0).sum(1).print();

