Kafka Streams - first WordCount example doesn't count correctly on the first run

Problem description

I'm studying Kafka Streams and I have a problem with the first example of WordCount in Java 8, taken from the documentation.

I'm using the latest available versions of Kafka Streams and Kafka Connect, and the WordCount lambda expressions example.

These are the steps I follow: I create an input topic in Kafka, and an output one. I start the streaming app and then load the input topic by producing some words from a .txt file.
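For reference, the topology follows the documented WordCount lambda example. The sketch below is a condensed version, not my exact code; the application ID, broker address and topic names (yourInputTopic / yourOutputTopic) are placeholders:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

import java.util.Arrays;
import java.util.Properties;

public class WordCountSketch {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-example");        // placeholder application.id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");        // placeholder broker address
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> textLines = builder.stream("yourInputTopic"); // placeholder topic name
        final KTable<String, Long> wordCounts = textLines
            .flatMapValues(line -> Arrays.asList(line.toLowerCase().split("\\W+"))) // split each line into words
            .groupBy((key, word) -> word)                                           // re-key by word
            .count();                                                               // running count per word

        // every update to the count table is forwarded to the output topic as <word, count>
        wordCounts.toStream().to("yourOutputTopic", Produced.with(Serdes.String(), Serdes.Long()));

        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

The counts are published as a stream of <word, count> updates, so the values I see in the output topic are running totals per word.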

On the first count, I see the words grouped correctly in the output topic, but the counts are wrong. If I then reinsert the same words, the subsequent counts, added on top of the previous incorrect ones, are all correct.

If I look at the input topic dump with a console consumer, it is loaded properly and there is no dirty data.

Why is the count wrong the first time?

Example [FIRST DATA]: (input Topic in Kafka) hi hi mike mike test

(The streaming app is running)

(output Topic) hi 12 mike 4 test 3 (seemingly random counts)

[SUCCESSIVE DATA - posting the same words to the input topic]

(output Topic) hi 14 mike 6 test 4

[NEW ATTEMPT]

(output Topic) hi 16 mike 8 test 5

And so on...

Answer

The WordCount demo in Apache Kafka has the following lines:

// setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
// Note: To re-run the demo, you need to use the offset reset tool:
// https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

This means that, when you restart the app, it will read its input topic from the very beginning ("earliest") iff there are no existing consumer offsets for the WordCount app stored in Kafka. An app's consumer offsets expire in Kafka after a certain amount of app inactivity, the default is 24 hours (cf. the offsets.retention.minutes broker configuration).
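If you want to check this from code rather than from the command line, you can ask the broker whether committed offsets still exist for the app's consumer group, whose group id equals the application.id. Below is a minimal sketch, assuming a recent Kafka admin client, a broker at localhost:9092 and an application.id of wordcount-example (both placeholders):

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Map;
import java.util.Properties;

public class CheckCommittedOffsets {
    public static void main(final String[] args) throws Exception {
        final Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // assumed broker address

        try (AdminClient admin = AdminClient.create(props)) {
            // A Kafka Streams app commits offsets under a group id equal to its application.id.
            final Map<TopicPartition, OffsetAndMetadata> offsets = admin
                .listConsumerGroupOffsets("wordcount-example")                    // assumed application.id
                .partitionsToOffsetAndMetadata()
                .get();

            if (offsets.isEmpty()) {
                // No committed offsets: on restart the app falls back to auto.offset.reset ("earliest" here)
                System.out.println("No committed offsets found - the app will re-read the input topic from the beginning.");
            } else {
                offsets.forEach((tp, om) -> System.out.println(tp + " -> committed offset " + om.offset()));
            }
        }
    }
}

If the result is empty (for example because the offsets expired), the auto.offset.reset setting shown above kicks in and the app re-reads the whole input topic.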

I could imagine that the following happened:

  • You experimented with Kafka earlier and fed test data into the input topic.
  • You then took a break of more than 24 hours before resuming the experiments.
  • Now, on restart, the app fell back to re-reading the input topic from the very beginning, thereby picking up the older test input data, which "inflated" the counts (for example, hi 12 on the first run even though the fresh input only contains two hi).

If I look at the input topic dump with a console consumer, it is loaded properly and there is no dirty data.

You can verify my hypothesis above by looking at the input topic again with the console consumer while adding the CLI option --from-beginning (see https://kafka.apache.org/documentation/#quickstart_consume).

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic yourInputTopic --from-beginning

This will show you all the available data in the topic "yourInputTopic" -- minus any data that might have been purged from the Kafka topics in the meantime (the default broker configuration will purge data that is older than 7 days, cf. log.retention.hours).
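If you are unsure what retention actually applies to your input topic, you can also query its effective configuration from Java; a small sketch, again with placeholder broker address and topic name:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigResource;

import java.util.Collections;
import java.util.Properties;

public class ShowTopicRetention {
    public static void main(final String[] args) throws Exception {
        final Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // assumed broker address

        try (AdminClient admin = AdminClient.create(props)) {
            final ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "yourInputTopic");
            final Config config = admin.describeConfigs(Collections.singleton(topic))
                                       .all().get()
                                       .get(topic);
            // retention.ms falls back to the broker-level log.retention.hours (168 h = 7 days by default)
            System.out.println("retention.ms = " + config.get("retention.ms").value());
        }
    }
}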
