Why is this KStream/KTable topology propagating records that don't pass the filter?


Problem description

I have the following topology:

  1. Create a state store
  2. Filter records based on SOME_CONDITION, map their values to a new entity, and finally publish those records to another topic, STATIONS_LOW_CAPACITY_TOPIC

However I am seeing this on the STATIONS_LOW_CAPACITY_TOPIC:

�   null
�   null
�   null
�   {"id":140,"latitude":"40.4592351","longitude":"-3.6915330",...}
�   {"id":137,"latitude":"40.4591366","longitude":"-3.6894151",...}
�   null

That is to say, it's as if it were also publishing to the STATIONS_LOW_CAPACITY_TOPIC topic those records that didn't pass the filter. How is this possible? How can I prevent them from being published?

This is the Kafka Streams code:

// 1. Reduce into the STATIONS_STORE state store, keeping the latest value per key
kStream.groupByKey().reduce({ _, newValue -> newValue },
                Materialized.`as`<Int, Station, KeyValueStore<Bytes, ByteArray>>(STATIONS_STORE)
                        .withKeySerde(Serdes.Integer())
                        .withValueSerde(stationSerde))
                // 2. Keep only the records matching SOME_CONDITION, map them to a new entity...
                .filter { _, value -> SOME_CONDITION }
                .mapValues { station ->
                    Stats(XXX)
                }
                // ...and publish them to STATIONS_LOW_CAPACITY_TOPIC
                .toStream().to(STATIONS_LOW_CAPACITY_TOPIC, Produced.with(Serdes.Integer(), stationStatsSerde))

UPDATE: I've simplified the topology and printed the resulting table. For some reason the final KTable also contains null-valued records corresponding to upstream records that didn't pass the filter:

kStream.groupByKey().reduce({ _, newValue -> newValue },
                Materialized.`as`<Int, BiciMadStation, KeyValueStore<Bytes, ByteArray>>(STATIONS_STORE)
                        .withKeySerde(Serdes.Integer())
                        .withValueSerde(stationSerde))
                .filter { _, value ->
                    val conditionResult = (SOME_CONDITION)
                    println(conditionResult)
                    conditionResult
                }
                .print()

Log:

false
[KTABLE-FILTER-0000000002]: 1, (null<-null)
false
[KTABLE-FILTER-0000000002]: 2, (null<-null)
false
[KTABLE-FILTER-0000000002]: 3, (null<-null)
false
[KTABLE-FILTER-0000000002]: 4, (null<-null)
true
[KTABLE-FILTER-0000000002]: 5, (Station(id=5, latitude=40.4285524, longitude=-3.7025875, ...)<-null)

Recommended answer

The answer was in the javadoc of KTable.filter(...):

Note that filter for a changelog stream works differently than record stream filters, because records with null values (so-called tombstone records) have delete semantics. Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded directly if required (i.e., if there is anything to be deleted). Furthermore, for each record that gets dropped (i.e., does not satisfy the given predicate) a tombstone record is forwarded.

That explains why I'm seeing null-valued (tombstone) records sent downstream.
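
To make that concrete, below is a minimal, self-contained sketch (my own illustration, not part of the original post) that drives a topology of the same shape with TopologyTestDriver from kafka-streams-test-utils, assuming the TestInputTopic/TestOutputTopic API available since Kafka 2.4. The topic names, the plain String values, and the length-based predicate are stand-ins for the question's Station entities and SOME_CONDITION:

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.TopologyTestDriver
import org.apache.kafka.streams.kstream.Consumed
import org.apache.kafka.streams.kstream.Materialized
import org.apache.kafka.streams.kstream.Produced
import org.apache.kafka.streams.state.KeyValueStore
import java.util.Properties

fun main() {
    val builder = StreamsBuilder()

    // Same shape as the topology in the question: reduce into a KTable, then filter the KTable itself.
    builder.stream("stations", Consumed.with(Serdes.Integer(), Serdes.String()))
            .groupByKey()
            .reduce({ _, newValue -> newValue },
                    Materialized.`as`<Int, String, KeyValueStore<Bytes, ByteArray>>("stations-store")
                            .withKeySerde(Serdes.Integer())
                            .withValueSerde(Serdes.String()))
            .filter { _, value -> value.length > 3 }   // stand-in for SOME_CONDITION
            .toStream()
            .to("low-capacity", Produced.with(Serdes.Integer(), Serdes.String()))

    val props = Properties().apply {
        put(StreamsConfig.APPLICATION_ID_CONFIG, "ktable-filter-demo")
        put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234") // never contacted by the test driver
        put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0)    // forward every table update immediately
    }

    TopologyTestDriver(builder.build(), props).use { driver ->
        val input = driver.createInputTopic("stations",
                Serdes.Integer().serializer(), Serdes.String().serializer())
        val output = driver.createOutputTopic("low-capacity",
                Serdes.Integer().deserializer(), Serdes.String().deserializer())

        input.pipeInput(1, "no")      // fails the predicate
        input.pipeInput(2, "passes")  // satisfies the predicate

        // Prints [KeyValue(1, null), KeyValue(2, passes)]: the record dropped by the
        // KTable filter still reaches the output topic as a null-valued tombstone.
        println(output.readKeyValuesToList())
    }
}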

To avoid this, I converted the KTable to a KStream and then applied the filter:

kStream.groupByKey().reduce({ _, newValue -> newValue },
                Materialized.`as`<Int, Stations, KeyValueStore<Bytes, ByteArray>>(STATIONS_STORE)
                        .withKeySerde(Serdes.Integer())
                        .withValueSerde(stationSerde))
                // Convert the changelog to a record stream first, so dropped records are
                // simply discarded instead of being forwarded as tombstones
                .toStream()
                .filter { _, value -> SOME_CONDITION }
                .mapValues { station ->
                    StationStats(station.id, station.latitude, station.longitude, ...)
                }
                .to(STATIONS_LOW_CAPACITY_TOPIC, Produced.with(Serdes.Integer(), stationStatsSerde))

Result:

4   {"id":4,"latitude":"40.4302937","longitude":"-3.7069171",...}
5   {"id":5,"latitude":"40.4285524","longitude":"-3.7025875",...}
...
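
For comparison, here is how the test-driver sketch from earlier in this answer would behave once the topology is rewired this way (again my own illustration, reusing the same hypothetical topic names and predicate); only the topology definition changes:

// Same test harness as the sketch above; only the topology wiring changes.
builder.stream("stations", Consumed.with(Serdes.Integer(), Serdes.String()))
        .groupByKey()
        .reduce({ _, newValue -> newValue },
                Materialized.`as`<Int, String, KeyValueStore<Bytes, ByteArray>>("stations-store")
                        .withKeySerde(Serdes.Integer())
                        .withValueSerde(Serdes.String()))
        .toStream()                                // leave the changelog before filtering
        .filter { _, value -> value.length > 3 }   // now an ordinary record-stream filter
        .to("low-capacity", Produced.with(Serdes.Integer(), Serdes.String()))

// Piping the same two records as before, output.readKeyValuesToList() now prints
// [KeyValue(2, passes)]: no tombstone is emitted for the record that failed the predicate.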
