Why don't I see any output from the Kafka Streams reduce method?

Question

Given the following code:

// Read the input topic with a String key serde and a custom value serde
KStream<String, Custom> stream =
    builder.stream(Serdes.String(), customSerde, "test_in");

stream
    .groupByKey(Serdes.String(), customSerde)        // group records by key
    .reduce(new CustomReducer(), "reduction_state")  // reduce into a KTable backed by the "reduction_state" store
    .print(Serdes.String(), customSerde);            // print each update that reaches this step

I have a println statement inside the apply method of the Reducer, which successfully prints out when I expect the reduction to take place. However, the final print statement shown above displays nothing. Likewise, if I use a to method rather than print, I see no messages in the destination topic.

What do I need after the reduce statement to see the result of the reduction? If only one value is pushed to the input, I don't expect to see anything. If a second value with the same key is pushed, I expect the reducer to apply (which it does), and I also expect the result of the reduction to continue to the next step in the processing pipeline. As described, I'm not seeing anything in subsequent steps of the pipeline, and I don't understand why.

Answer

As of Kafka 0.10.1.0, all aggregation operators use an internal de-duplication cache to reduce the load on the result KTable changelog stream. For example, if you count two records with the same key and process them directly after each other, the full changelog stream would be <key:1>, <key:2>.

With the new caching feature, the cache receives <key:1> and stores it, but does not send it downstream right away. When <key:2> is computed, it replaces the first entry in the cache. Depending on the cache size, the number of distinct keys, the throughput, and your commit interval, the cache sends entries downstream. This happens either on cache eviction of a single key entry or as a complete flush of the cache (sending all entries downstream). Thus, the KTable changelog might only show <key:2> (because <key:1> got de-duplicated).
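Because the cache is flushed as part of every commit, one way to see reduction results downstream sooner, while keeping caching enabled, is to shorten the commit interval. A minimal sketch, where the application id and broker address are placeholder assumptions:

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
// Placeholder application id and broker address for illustration.
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "reduce-demo");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// The cache is flushed on commit, so a shorter commit interval
// forwards cached aggregation results downstream more often.
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);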

You can control the size of the cache via the Streams configuration parameter StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG. If you set the value to zero, you disable caching completely, and the KTable changelog will contain all updates (effectively providing the pre-0.10.1.0 behavior).
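For example, to disable the cache entirely so that every update from the reduce reaches the print statement (or the destination topic), set the buffer size to zero on the same properties object as above:

// Zero bytes of buffering disables the de-duplication cache,
// so every single update is sent downstream immediately.
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);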

The Confluent documentation contains a section explaining the cache in more detail.
