Kafka Streams word count application

Problem description

I'm playing around with the Kafka Streams API (Kafka version: 0.10.2.0), trying to make a simple word count example work: Wordcount App gist. I'm running both a console producer and a console consumer:

./kafka-console-producer.sh --topic input-topic --broker-list localhost:9092

./kafka-console-consumer.sh --topic output-topic --bootstrap-server localhost:9092 --from-beginning

When I start the application everything seems to be working fine, but when I type some strings into the console producer, the consumer receives nothing at all. If I change the app to simply apply toUpperCase to the input, the consumer receives the stream (converted to upper case) just fine:

// The following code works fine:
val uppercasedWithMapValues: KStream[String, String] = textLines.mapValues(_.toUpperCase())
uppercasedWithMapValues.to("output-topic")

Does anyone know why I'm receiving nothing in the word count example? Should I specify a deserializer on the consumer? In my last test the console consumer processed the messages I sent through the console producer but didn't display them; see the output below:

➜  bin ./kafka-console-consumer.sh \
           --topic output-topic \
           --bootstrap-server localhost:9092 \
           --from-beginning
[2017-08-02 07:48:20,187] WARN Error while fetching metadata with correlation id 2 : {output-topic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2017-08-02 07:48:20,197] WARN The following subscribed topics are not assigned to any members in the group console-consumer-91651 : [output-topic] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)

^CProcessed a total of 7 messages

Recommended answer

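The KStream version works because a KStream does not use caching. The word count, however, is an aggregation that produces a KTable, and KTable updates are cached. For a KTable you either have to wait a bit, until the cache is flushed on the next commit, or set cache.max.bytes.buffering to 0 (but not in production code!).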
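The effect described in the answer can be illustrated with a toy model in plain Scala (no Kafka dependency). The names ToyRecordCache and wordCount are hypothetical, and the model is a simplifying assumption rather than Kafka's actual cache implementation: it keeps only the latest count per word, forwards buffered records downstream when a commit happens or when more than maxEntries keys are buffered (real Kafka bounds the cache in bytes), and therefore forwards every single update when the bound is 0.

```scala
import scala.collection.mutable

// Toy model, NOT Kafka's real cache: buffers the latest count per key and
// forwards records downstream only on commit() or when more than maxEntries
// keys are buffered (Kafka itself bounds the cache in bytes, not entries).
final class ToyRecordCache(maxEntries: Int) {
  // Insertion-ordered map; re-inserting on update keeps the least recently
  // updated key at the head, so it is the first one evicted.
  private val buffered = mutable.LinkedHashMap.empty[String, Long]
  // Everything that has "reached the output topic" so far.
  val forwarded: mutable.ArrayBuffer[(String, Long)] = mutable.ArrayBuffer.empty[(String, Long)]

  def put(key: String, value: Long): Unit = {
    buffered.remove(key) // a newer count replaces the older, unsent one
    buffered.put(key, value)
    // Over the bound: evict (forward) least recently updated entries.
    while (buffered.size > maxEntries) {
      val (k, v) = buffered.head
      buffered.remove(k)
      forwarded += ((k, v))
    }
  }

  // A commit flushes all buffered entries downstream.
  def commit(): Unit = {
    forwarded ++= buffered
    buffered.clear()
  }
}

object ToyRecordCache {
  // Counts words the way a KTable aggregation would, sending every updated
  // count through the toy cache; returns what reached the output topic.
  def wordCount(words: Seq[String], maxEntries: Int, commitAtEnd: Boolean): List[(String, Long)] = {
    val counts = mutable.Map.empty[String, Long]
    val cache = new ToyRecordCache(maxEntries)
    for (w <- words) {
      val updated = counts.getOrElse(w, 0L) + 1
      counts(w) = updated
      cache.put(w, updated)
    }
    if (commitAtEnd) cache.commit()
    cache.forwarded.toList
  }
}
```

For the input Seq("hello", "kafka", "hello"), a cache of 10 entries without a commit forwards nothing at all, which matches the empty console consumer; after a commit it forwards only (kafka,1) and (hello,2); with maxEntries = 0 every intermediate update, including (hello,1), is forwarded. In a real Kafka Streams application the corresponding settings are cache.max.bytes.buffering (StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) and commit.interval.ms (StreamsConfig.COMMIT_INTERVAL_MS_CONFIG). Setting the cache to 0 is handy while experimenting, but it gives up the deduplication that reduces downstream traffic in production.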
