2-step windowed aggregation with Kafka Streams DSL


Problem description

Suppose I have a stream "stream-1" consisting of 1 datapoint every second, and I'd like to calculate a derived stream "stream-5" which contains the sum over a hopping window of 5 seconds, and another stream "stream-10" which is based off "stream-5" and contains the sum over a hopping window of 10 seconds. The aggregation needs to be done for each key separately, and I'd like to be able to run each step in a different process. It is not a problem in itself if stream-5 and stream-10 contain updates for the same key/timestamp (so I don't necessarily need How to send final kafka-streams aggregation result of a time windowed KTable?) as long as the last values are correct.

Is there an (easy) way to solve this using the high-level Kafka Streams DSL? So far I fail to see an elegant way to deal with the intermediate updates produced on stream-5 by the aggregation.

I know the intermediate updates can be controlled to some extent with the cache.max.bytes.buffering and commit.interval.ms settings, but I don't think any setting can guarantee that no intermediate values will be produced in all cases. Also, I could try converting "stream-5" to a KTable on read, with the timestamp as part of the key, but it seems KTable does not support windowing operations the way KStreams does.
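
For reference, a minimal sketch of how those two settings are typically set (the values and application id here are illustrative assumptions; as noted above, they only reduce how often intermediate updates are emitted, they do not eliminate them):

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-aggregator");   // hypothetical app id
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // assumed broker address
// A larger cache and a longer commit interval mean fewer intermediate
// updates get flushed downstream, but this is not a guarantee: a cache
// eviction or a commit still emits the current partial result.
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30 * 1000);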

This is what I have so far, which fails due to the intermediate aggregate values on stream-5:

Reducer<DataPoint> sum = new Reducer<DataPoint>() {
    @Override
    public DataPoint apply(DataPoint x, DataPoint y) {
        return new DataPoint(x.timestamp, x.value + y.value);
    }
};

// Drops the window from the key, keeping only the original record key.
KeyValueMapper<Windowed<String>, DataPoint, String> strip =
        new KeyValueMapper<Windowed<String>, DataPoint, String>() {
    @Override
    public String apply(Windowed<String> wKey, DataPoint arg1) {
        return wKey.key();
    }
};

KStream<String, DataPoint> s1 = builder.stream("stream-1");

// Step 1: per-key sum over 5-second windows.
s1.groupByKey()
  .reduce(sum, TimeWindows.of(5000).advanceBy(5000))
  .toStream()
  .selectKey(strip)
  .to("stream-5");

KStream<String, DataPoint> s5 = builder.stream("stream-5");

// Step 2: per-key sum over 10-second windows, fed by stream-5.
s5.groupByKey()
  .reduce(sum, TimeWindows.of(10000).advanceBy(10000))
  .toStream()
  .selectKey(strip)
  .to("stream-10");

Now if stream-1 contains the inputs (the key is just KEY):

KEY {"timestamp":0,"value":1.0}
KEY {"timestamp":1000,"value":1.0}
KEY {"timestamp":2000,"value":1.0}
KEY {"timestamp":3000,"value":1.0}
KEY {"timestamp":4000,"value":1.0}
KEY {"timestamp":5000,"value":1.0}
KEY {"timestamp":6000,"value":1.0}
KEY {"timestamp":7000,"value":1.0}
KEY {"timestamp":8000,"value":1.0}
KEY {"timestamp":9000,"value":1.0}

stream-5 contains the correct (final) values:

KEY {"timestamp":0,"value":1.0}
KEY {"timestamp":0,"value":2.0}
KEY {"timestamp":0,"value":3.0}
KEY {"timestamp":0,"value":4.0}
KEY {"timestamp":0,"value":5.0}
KEY {"timestamp":5000,"value":1.0}
KEY {"timestamp":5000,"value":2.0}
KEY {"timestamp":5000,"value":3.0}
KEY {"timestamp":5000,"value":4.0}
KEY {"timestamp":5000,"value":5.0}

but stream-10 is wrong (the final value should be 10.0) because it also takes into account the intermediate values on stream-5:

KEY {"timestamp":0,"value":1.0}
KEY {"timestamp":0,"value":3.0}
KEY {"timestamp":0,"value":6.0}
KEY {"timestamp":0,"value":10.0}
KEY {"timestamp":0,"value":15.0}
KEY {"timestamp":0,"value":21.0}
KEY {"timestamp":0,"value":28.0}
KEY {"timestamp":0,"value":36.0}
KEY {"timestamp":0,"value":45.0}
KEY {"timestamp":0,"value":55.0}

Answer

The problem is that the results of all aggregations are KTables, which means the records produced to their output topics represent a changelog. However, when you subsequently load them as a stream, the downstream aggregation will double-count them.

Instead, you need to load the intermediate topics as tables, not streams. However, you will then not be able to use windowed aggregations on them, as those are only available on streams.

You can use the following pattern to accomplish a windowed aggregation over tables instead of streams:

https://cwiki.apache.org/confluence/display/KAFKA/Windowed+aggregations+over+successively+increasing+timed+windows

If you want to run each step in a separate process you can adapt that pattern; just remember to load the intermediate tables using builder.table(), not builder.stream().
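
A minimal sketch of the second step using that pattern, written against the same era of the Streams API as the code above. It assumes, unlike the code above, that the first step keeps the window start in the key when writing to "stream-5" (e.g. "KEY@5000"), so that every 5-second window is a distinct table row; the key format and store names are made up for illustration:

// Load the intermediate topic as a table: each key holds the latest
// (eventually final) sum for one 5-second window.
KTable<String, DataPoint> t5 = builder.table("stream-5", "stream-5-store");

KTable<String, DataPoint> t10 = t5
    // Re-key each 5-second-window row to the 10-second window containing it,
    // e.g. "KEY@0" and "KEY@5000" both map to "KEY@0".
    .groupBy((key, value) -> {
        String[] parts = key.split("@");   // assumed "KEY@windowStart" key format
        long start10 = (Long.parseLong(parts[1]) / 10000) * 10000;
        return KeyValue.pair(parts[0] + "@" + start10, value);
    })
    // When a 5-second window is updated, the subtractor first retracts the
    // old value, so intermediate updates replace rather than double-count.
    .reduce(
        (agg, v) -> new DataPoint(agg.timestamp, agg.value + v.value),   // adder
        (agg, v) -> new DataPoint(agg.timestamp, agg.value - v.value),   // subtractor
        "stream-10-store");

t10.to("stream-10");

Because the input is a table, a second update for the same 5-second window replaces the first one in the downstream sum instead of being added on top of it, which is exactly the double-counting the KStream-based version suffers from.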
