Why is this KStream/KTable topology propagating records that don't pass the filter?


Problem description

I have the following topology:

  1. Creates a state store.
  2. Filters the records according to SOME_CONDITION, maps their values to a new entity, and finally publishes them to another topic, STATIONS_LOW_CAPACITY_TOPIC.

However, I am seeing this on STATIONS_LOW_CAPACITY_TOPIC:

�   null
�   null
�   null
�   {"id":140,"latitude":"40.4592351","longitude":"-3.6915330",...}
�   {"id":137,"latitude":"40.4591366","longitude":"-3.6894151",...}
�   null

That is to say, it is as if the records that didn't pass the filter were also being published to STATIONS_LOW_CAPACITY_TOPIC. How is this possible? How can I prevent them from being published?

This is the Kafka Streams code:

kStream.groupByKey().reduce({ _, newValue -> newValue },
                Materialized.`as`<Int, Station, KeyValueStore<Bytes, ByteArray>>(STATIONS_STORE)
                        .withKeySerde(Serdes.Integer())
                        .withValueSerde(stationSerde))
                .filter { _, value -> SOME_CONDITION }
                .mapValues { station ->
                    Stats(XXX)
                }
                .toStream().to(STATIONS_LOW_CAPACITY_TOPIC, Produced.with(Serdes.Integer(), stationStatsSerde))

UPDATE: I've simplified the topology and printed the resulting table. For some reason, the final KTable also contains null-valued records corresponding to upstream records that didn't pass the filter:

kStream.groupByKey().reduce({ _, newValue -> newValue },
                Materialized.`as`<Int, BiciMadStation, KeyValueStore<Bytes, ByteArray>>(STATIONS_STORE)
                        .withKeySerde(Serdes.Integer())
                        .withValueSerde(stationSerde))
                .filter { _, value ->
                    val conditionResult = (SOME_CONDITION)
                    println(conditionResult)
                    conditionResult
                }
                .print()

Logs:

false
[KTABLE-FILTER-0000000002]: 1, (null<-null)
false
[KTABLE-FILTER-0000000002]: 2, (null<-null)
false
[KTABLE-FILTER-0000000002]: 3, (null<-null)
false
[KTABLE-FILTER-0000000002]: 4, (null<-null)
true
[KTABLE-FILTER-0000000002]: 5, (Station(id=5, latitude=40.4285524, longitude=-3.7025875, ...)<-null)

Recommended answer

The answer was in the javadoc of KTable.filter(...):

Note that filter for a changelog stream works different to record stream filters, because records with null values (so-called tombstone records) have delete semantics. Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded directly if required (i.e., if there is anything to be deleted). Furthermore, for each record that gets dropped (i.e., do not satisfy the given predicate) a tombstone record is forwarded.

That explains why I'm seeing null-valued (tombstone) records sent downstream.
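The two semantics described in the javadoc can be illustrated with a small model that does not depend on Kafka. This is a hypothetical sketch, not the library's implementation: `Change`, `ktableFilter` and `kstreamFilter` are names made up for this example, and the model always forwards a tombstone for a dropped or deleted key, ignoring the "only if there is anything to be deleted" check. Fed with five stations where only station 5 satisfies the condition, the table-style filter emits four tombstones followed by station 5, mirroring the log above, while the stream-style filter emits only station 5.

```kotlin
// Hypothetical, simplified model of the two filter semantics quoted from the
// KTable.filter javadoc. No Kafka classes are used; all names are illustrative.

// One update for a key; value == null means "tombstone" (delete this key).
data class Change<V : Any>(val key: Int, val value: V?)

// Changelog (KTable-style) semantics: a record that fails the predicate is not
// dropped but replaced by a tombstone, so any previous value for the key is
// deleted downstream. Incoming tombstones bypass the predicate.
// Simplification: a tombstone is always forwarded, even if nothing was stored.
fun <V : Any> ktableFilter(updates: List<Change<V>>, predicate: (V) -> Boolean): List<Change<V>> =
    updates.map { change ->
        val value = change.value
        if (value != null && predicate(value)) change else Change<V>(change.key, null)
    }

// Record-stream (KStream-style) semantics: a record that fails the predicate
// is simply discarded and nothing is forwarded for it.
fun <V : Any> kstreamFilter(updates: List<Change<V>>, predicate: (V) -> Boolean): List<Change<V>> =
    updates.filter { change ->
        val value = change.value
        value != null && predicate(value)
    }

fun main() {
    // Stations 1..5 as in the question's log; only station 5 passes.
    val updates = (1..5).map { id -> Change(id, "Station(id=$id)") }
    val isLowCapacity: (String) -> Boolean = { it == "Station(id=5)" }

    println("KTable.filter model:")
    ktableFilter(updates, isLowCapacity).forEach { println("${it.key}, ${it.value}") }
    // 1, null / 2, null / 3, null / 4, null / 5, Station(id=5)

    println("KStream.filter model:")
    kstreamFilter(updates, isLowCapacity).forEach { println("${it.key}, ${it.value}") }
    // 5, Station(id=5)
}
```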

To avoid these tombstones, I converted the KTable to a KStream and then applied the filter:

kStream.groupByKey().reduce({ _, newValue -> newValue },
                Materialized.`as`<Int, Stations, KeyValueStore<Bytes, ByteArray>>(STATIONS_STORE)
                        .withKeySerde(Serdes.Integer())
                        .withValueSerde(stationSerde))
                // Convert the changelog (KTable) into a record stream first...
                .toStream()
                // ...so that KStream.filter simply drops non-matching records
                // instead of forwarding a tombstone for each of them
                .filter { _, value -> SOME_CONDITION }
                .mapValues { station ->
                    StationStats(station.id, station.latitude, station.longitude, ...)
                }
                .to(STATIONS_LOW_CAPACITY_TOPIC, Produced.with(Serdes.Integer(), stationStatsSerde))

Result:

4   {"id":4,"latitude":"40.4302937","longitude":"-3.7069171",...}
5   {"id":5,"latitude":"40.4285524","longitude":"-3.7025875",...}
...
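The trade-off of this workaround can be sketched with a similar Kafka-free model. It is a hypothetical illustration (the names `Change`, `streamThenFilter`, `filterThenStream` and `materialize` are invented here) and assumes the reduced table forwards every update downstream, i.e. no record caching. Filtering after `toStream()` forwards only matching records, which is what the result above shows. The flip side of giving up the delete semantics is that when a station that previously matched stops matching, nothing is sent for it, so a consumer that materializes STATIONS_LOW_CAPACITY_TOPIC as a table would keep a stale entry; with the KTable filter, the forwarded tombstone would remove it.

```kotlin
// Hypothetical, Kafka-free model of the workaround and its trade-off.
// Assumption: the reduced table forwards every update (no record caching),
// so toStream() yields one (key, latest value) pair per input record.

// One update for a key; value == null means "tombstone" (delete this key).
data class Change<V : Any>(val key: Int, val value: V?)

// reduce + toStream + filter: updates that fail the predicate are discarded.
fun <V : Any> streamThenFilter(updates: List<Change<V>>, predicate: (V) -> Boolean): List<Change<V>> =
    updates.filter { change ->
        val value = change.value
        value != null && predicate(value)
    }

// reduce + KTable.filter + toStream: failing updates become tombstones.
fun <V : Any> filterThenStream(updates: List<Change<V>>, predicate: (V) -> Boolean): List<Change<V>> =
    updates.map { change ->
        val value = change.value
        if (value != null && predicate(value)) change else Change<V>(change.key, null)
    }

// What a downstream consumer treating the output topic as a table would hold:
// a non-null value upserts the key, a tombstone deletes it.
fun <V : Any> materialize(records: List<Change<V>>): Map<Int, V> {
    val view = linkedMapOf<Int, V>()
    for (record in records) {
        val value = record.value
        if (value == null) {
            view.remove(record.key)
        } else {
            view[record.key] = value
        }
    }
    return view
}

fun main() {
    // "low" stands in for SOME_CONDITION being true; the last update means
    // station 5 later stops satisfying the condition.
    val updates = listOf(Change(4, "low"), Change(5, "low"), Change(5, "ok"))
    val isLow: (String) -> Boolean = { it == "low" }

    println(materialize(streamThenFilter(updates, isLow))) // {4=low, 5=low}: stale entry for 5
    println(materialize(filterThenStream(updates, isLow))) // {4=low}: tombstone removed 5
}
```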
