Apache Flink Streaming window WordCount

Question

I have the following code to count words from a socketTextStream. Both cumulative word counts and time-windowed word counts are needed. The program has an issue: cumulateCounts is always the same as the windowed counts. Why does this happen? What is the correct way to calculate cumulative counts based on the windowed counts?

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final HashMap<String, Integer> cumulateCounts = new HashMap<String, Integer>();

final DataStream<Tuple2<String, Integer>> counts = env
            .socketTextStream("localhost", 9999)
            .flatMap(new Splitter())
            .window(Time.of(5, TimeUnit.SECONDS))
            .groupBy(0).sum(1)
            .flatten();

counts.print();

counts.addSink(new SinkFunction<Tuple2<String, Integer>>() {
    @Override
    public void invoke(Tuple2<String, Integer> value) throws Exception {
        String word = value.f0;
        Integer delta_count = value.f1;
        Integer count = cumulateCounts.get(word);
        if (count == null)
            count = 0;
        count = count + delta_count;
        cumulateCounts.put(word, count);
        System.out.println("(" + word + "," + count.toString() + ")");
    }
});
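The Splitter class is not shown in the question; a typical implementation (an assumption here, modeled on Flink's standard WordCount example) tokenizes each line and emits one (word, 1) tuple per word:

```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

// Hypothetical Splitter, not part of the question: splits each input line
// into words and emits a (word, 1) tuple for every word found.
public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
        for (String word : line.toLowerCase().split("\\W+")) {
            if (!word.isEmpty()) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }
}
```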

Answer

You should first group by the key, and apply the window on the keyed data stream (your code happens to work on Flink 0.9.1, but the new API in Flink 0.10.0 is strict about this):

final DataStream<Tuple2<String, Integer>> counts = env
        .socketTextStream("localhost", 9999)
        .flatMap(new Splitter())
        .groupBy(0)
        .window(Time.of(5, TimeUnit.SECONDS)).sum(1)
        .flatten();

If you apply a window on a non-keyed data stream, there will be only a single-threaded window operator on a single machine (i.e., no parallelism) building the window over the whole stream (in Flink 0.9.1, this global window can be split into sub-windows by groupBy() -- however, in Flink 0.10.0 this no longer works). To count words, you want to build a window for each distinct key value, i.e., you first get a sub-stream per key value (via groupBy()) and apply a window operator on each sub-stream (thus, each sub-stream gets its own window operator instance, allowing for parallel execution).

For a global (cumulative) count, you can simply apply a groupBy().sum() construct. First, the stream is split into sub-streams (one for each key value). Second, the sum is computed over each sub-stream. Because the stream is not windowed, the sum is computed cumulatively and updated for each incoming tuple (in more detail, the sum starts with an initial result value of zero and the result is updated for each tuple as result += tuple.value). After each invocation of sum, the new current result is emitted.
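The rolling-sum semantics described above can be illustrated with a small plain-Java simulation (not Flink code; the class and method names are made up for illustration). State holds the running sum per key, and an updated result is emitted for every incoming tuple:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simulates Flink's rolling groupBy(0).sum(1): one updated (key, sum)
// result is emitted per incoming tuple, not one result per window.
public class RollingSumDemo {
    static List<String> rollingSum(List<Map.Entry<String, Integer>> input) {
        Map<String, Integer> state = new HashMap<>();   // running sum per key
        List<String> emitted = new ArrayList<>();
        for (Map.Entry<String, Integer> t : input) {
            int updated = state.merge(t.getKey(), t.getValue(), Integer::sum);
            emitted.add("(" + t.getKey() + "," + updated + ")");
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> input = List.of(
                Map.entry("flink", 1), Map.entry("window", 1), Map.entry("flink", 1));
        rollingSum(input).forEach(System.out::println);
        // prints (flink,1) then (window,1) then (flink,2)
    }
}
```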

In your code, you should not use your custom sink function, but instead do the following:

counts.groupBy(0).sum(1).print();
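Putting both together, the windowed counts and the cumulative counts can be derived from the same source stream. This is a sketch only, composed from the constructs already shown in the answer against the Flink 0.10-era API:

```java
// Sketch: both result streams from one tokenized source stream.
DataStream<Tuple2<String, Integer>> words = env
        .socketTextStream("localhost", 9999)
        .flatMap(new Splitter());

// 5-second windowed counts per word
words.groupBy(0)
     .window(Time.of(5, TimeUnit.SECONDS)).sum(1)
     .flatten()
     .print();

// cumulative counts per word, updated on every incoming tuple
words.groupBy(0).sum(1).print();
```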
