Why don't I see any output from the Kafka Streams reduce method?

Problem description

Given the following code:

KStream<String, Custom> stream =
    builder.stream(Serdes.String(), customSerde, "test_in");   // read the input topic

stream
    .groupByKey(Serdes.String(), customSerde)                  // group records by their existing key
    .reduce(new CustomReducer(), "reduction_state")            // reduce into the "reduction_state" store
    .print(Serdes.String(), customSerde);                      // expected to print every reduction result

I have a println statement inside the apply method of the Reducer, which successfully prints when I expect the reduction to take place. However, the final print statement shown above displays nothing. Likewise, if I use a to method rather than print, I see no messages in the destination topic.
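
For reference, the to variant mentioned above looks roughly like this in the same 0.10.x API (the output topic name test_out is a placeholder, not from the original question):

stream
    .groupByKey(Serdes.String(), customSerde)
    .reduce(new CustomReducer(), "reduction_state")
    .to(Serdes.String(), customSerde, "test_out");  // no messages show up here either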

What do I need after the reduce statement to see the result of the reduction? If one value is pushed to the input, I don't expect to see anything. If a second value with the same key is pushed, I expect the reducer to apply (which it does), and I also expect the result of the reduction to continue to the next step in the processing pipeline. As described, I'm not seeing anything in the subsequent steps of the pipeline, and I don't understand why.

Recommended answer

As of Kafka 0.10.1.0, all aggregation operators use an internal de-duplication cache to reduce the load on the result KTable changelog stream. For example, if you count two records with the same key that arrive directly after each other, the full changelog stream would be <key:1>, <key:2>.

With the new caching feature, the cache receives <key:1> and stores it, but does not send it downstream right away. When <key:2> is computed, it replaces the first entry in the cache. Depending on the cache size, the number of distinct keys, the throughput, and your commit interval, the cache sends entries downstream: either on eviction of a single key's entry, or on a complete flush of the cache (which sends all entries downstream). Thus, the KTable changelog might only show <key:2> (because <key:1> got de-duplicated).
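
As an aside not in the original answer: because the cache is flushed on every commit, shortening the commit interval is another way to see buffered updates sooner. A one-line sketch, added to the application's Properties before constructing KafkaStreams (the one-second value is an arbitrary example):

props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);  // flush the record cache every second (default: 30000 ms)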

You can control the size of the cache via the Streams configuration parameter StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG. If you set the value to zero, you disable caching completely, and the KTable changelog will contain all updates (effectively restoring the pre-0.10.1.0 behavior).
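
Concretely, disabling the cache looks like this (a minimal sketch; the application id and broker address are placeholders, not from the original answer):

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "reduce-example");      // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);          // 0 bytes: caching disabled, every update is forwarded downstream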

The Confluent documentation explains the cache in more detail:

  • http://docs.confluent.io/current/streams/architecture.html#record-caches
  • http://docs.confluent.io/current/streams/developer-guide.html#streams-developer-guide-memory-management
