2 step windowed aggregation with Kafka Streams DSL

Question

Suppose I have a stream "stream-1" consisting of one datapoint every second, and I'd like to compute a derived stream "stream-5" that contains the sum over a hopping window of 5 seconds, and another stream "stream-10" that is based on "stream-5" and contains the sum over a hopping window of 10 seconds. The aggregation needs to be done per key, and I'd like to be able to run each step in a different process. It is not a problem in itself if stream-5 and stream-10 contain updates for the same key/timestamp (so I don't necessarily need How to send final kafka-streams aggregation result of a time windowed KTable?), as long as the last values are correct.

Is there an (easy) way to solve this using the high-level Kafka Streams DSL? So far I don't see an elegant way to deal with the intermediate updates that the aggregation produces on stream-5.

I know the intermediate updates can be controlled to some extent with the cache.max.bytes.buffering and commit.interval.ms settings, but I don't think any setting can guarantee in all cases that no intermediate values will be produced. Also, I could try converting "stream-5" to a KTable on read, with the timestamp as part of the key, but it seems a KTable does not support windowed operations the way a KStream does.
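For reference, a minimal sketch of how those two settings are applied (the concrete values are arbitrary examples, not recommendations):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "two-step-aggregation");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// A larger record cache and a longer commit interval suppress more of the
// intermediate updates, but neither guarantees that none are emitted.
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30000);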

This is what I have so far; it fails because of the intermediate aggregate values on stream-5:

Reducer<DataPoint> sum = new Reducer<DataPoint>() {
    @Override
    public DataPoint apply(DataPoint x, DataPoint y) {
        // Sum two datapoints, keeping the timestamp of the first one.
        return new DataPoint(x.timestamp, x.value + y.value);
    }
};

// Drops the window from the key so the result can be written back
// under the plain record key.
KeyValueMapper<Windowed<String>, DataPoint, String> strip =
        new KeyValueMapper<Windowed<String>, DataPoint, String>() {
    @Override
    public String apply(Windowed<String> wKey, DataPoint arg1) {
        return wKey.key();
    }
};

KStream<String, DataPoint> s1 = builder.stream("stream-1");

// First step: per-key sums over 5-second windows.
s1.groupByKey()
       .reduce(sum, TimeWindows.of(5000).advanceBy(5000))
       .toStream()
       .selectKey(strip)
       .to("stream-5");

KStream<String, DataPoint> s5 = builder.stream("stream-5");

// Second step: per-key sums over 10-second windows, fed from stream-5.
s5.groupByKey()
       .reduce(sum, TimeWindows.of(10000).advanceBy(10000))
       .toStream()
       .selectKey(strip)
       .to("stream-10");

Now if stream-1 contains the inputs (the key is always just KEY):

KEY {"timestamp":0,"value":1.0}
KEY {"timestamp":1000,"value":1.0}
KEY {"timestamp":2000,"value":1.0}
KEY {"timestamp":3000,"value":1.0}
KEY {"timestamp":4000,"value":1.0}
KEY {"timestamp":5000,"value":1.0}
KEY {"timestamp":6000,"value":1.0}
KEY {"timestamp":7000,"value":1.0}
KEY {"timestamp":8000,"value":1.0}
KEY {"timestamp":9000,"value":1.0}

stream-5 contains the correct (final) values:

KEY {"timestamp":0,"value":1.0}
KEY {"timestamp":0,"value":2.0}
KEY {"timestamp":0,"value":3.0}
KEY {"timestamp":0,"value":4.0}
KEY {"timestamp":0,"value":5.0}
KEY {"timestamp":5000,"value":1.0}
KEY {"timestamp":5000,"value":2.0}
KEY {"timestamp":5000,"value":3.0}
KEY {"timestamp":5000,"value":4.0}
KEY {"timestamp":5000,"value":5.0}

but stream-10 is wrong (the final value should be 10.0) because it also takes the intermediate values on stream-5 into account:

KEY {"timestamp":0,"value":1.0}
KEY {"timestamp":0,"value":3.0}
KEY {"timestamp":0,"value":6.0}
KEY {"timestamp":0,"value":10.0}
KEY {"timestamp":0,"value":15.0}
KEY {"timestamp":0,"value":21.0}
KEY {"timestamp":0,"value":28.0}
KEY {"timestamp":0,"value":36.0}
KEY {"timestamp":0,"value":45.0}
KEY {"timestamp":0,"value":55.0}

Answer

The problem is that the results of all aggregations are KTables, which means the records produced to their output topic represent a changelog. When you subsequently load that topic as a stream, a downstream aggregation ends up double counting: it sums every intermediate update instead of only the final value per window (the values 1, 3, 6, ..., 55 above are exactly the running sums of all ten intermediate updates).

Instead, you need to load the intermediate topic as a table, not a stream. However, you will then not be able to use windowed aggregations on it, as those are only available on streams.

You can use the following pattern to accomplish a windowed aggregation over tables instead of streams:

https://cwiki.apache.org/confluence/display/KAFKA/Windowed+aggregations+over+successively+increasing+timed+windows

If you want to run each step in a separate process, you can adapt that pattern; just remember to load the intermediate tables using builder.table(), not builder.stream().
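Continuing from the definitions in the question, here is a minimal sketch of what the adapted second step could look like. It assumes the first step keys its output as "<key>@<windowStart>" so that each 5-second window occupies its own table row; that key format and the re-stamped DataPoint are assumptions for this sketch, not part of the original code:

// Load stream-5 as a changelog (table), not as a record stream.
KTable<String, DataPoint> t5 = builder.table("stream-5");

KTable<String, DataPoint> t10 = t5
    .groupBy(new KeyValueMapper<String, DataPoint, KeyValue<String, DataPoint>>() {
        @Override
        public KeyValue<String, DataPoint> apply(String key, DataPoint dp) {
            // Re-key each 5-second result onto its enclosing 10-second window.
            String baseKey = key.split("@")[0];
            long windowStart = (dp.timestamp / 10000) * 10000;
            return new KeyValue<>(baseKey + "@" + windowStart,
                    new DataPoint(windowStart, dp.value));
        }
    })
    // The subtractor retracts a row's previous value whenever the upstream
    // window updates it; this is what prevents the double counting.
    // (Depending on the Kafka version, reduce() may also need a store name.)
    .reduce(sum, new Reducer<DataPoint>() {
        @Override
        public DataPoint apply(DataPoint x, DataPoint y) {
            return new DataPoint(x.timestamp, x.value - y.value);
        }
    });

t10.toStream().to("stream-10");

With table semantics, a later update for the same 5-second window replaces the earlier value instead of being added on top of it, so the final sums come out correct.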

