卡夫卡消费者 - 不一致地接收消息 [英] Kafka Consumer - receiving messages Inconsistently

查看:24
本文介绍了卡夫卡消费者 - 不一致地接收消息的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我可以在命令行上针对 Kafka 位置安装发送和接收消息.我还可以通过 Java 代码发送消息.这些消息显示在 Kafka 命令提示符中.我还有一个供 Kafka 消费者使用的 Java 代码.代码昨天收到消息.但是,今天早上它没有收到任何消息.代码没有改变.我想知道属性配置是否不太正确.这是我的配置:

I can send and receive messages on command line against a Kafka location installation. I also can send messages through a Java code. And those messages are showing up in a Kafka command prompt. I also have a Java code for the Kafka consumer. The code received message yesterday. It doesn't receive any messages this morning, however. The code has not been changed. I am wondering whether the property configuration isn't quite right nor not. Here is my configuration:

制作人:

bootstrap.servers - localhost:9092
group.id - test
key.serializer - StringSerializer.class.getName()
value.serializer - StringSerializer.class.getName()

并且 ProducerRecord 设置为

and the ProducerRecord is set as

ProducerRecord<String, String>("test", "mykey",  "myvalue")

消费者:

zookeeper.connect - "localhost:2181"
group.id - "test"
zookeeper.session.timeout.ms - 500
zookeeper.sync.time.ms - 250
auto.commit.interval.ms - 1000
key.deserializer - org.apache.kafka.common.serialization.StringDeserializer
value.deserializer - org.apache.kafka.common.serialization.StringDeserializer

对于 Java 代码:

and for Java code:

   Map<String, Integer> topicCount = new HashMap<>();
   topicCount.put("test", 1);

   Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer
            .createMessageStreams(topicCount);
   List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);

缺少什么?

推荐答案

可能会发生很多事情.

首先,您的消费者的 ZooKeeper 会话超时非常低,这意味着由于垃圾收集暂停,消费者可能会遇到许多软故障".发生这种情况时,消费者组将重新平衡,从而可以暂停消费.如果这种情况发生得非常频繁,消费者可能会进入一种从不消费消息的状态,因为它不断地被重新平衡.我建议将 ZooKeeper 会话超时增加到 30 秒,看看这是否能解决问题.如果是这样,您可以尝试将其调低.

First, your consumer's ZooKeeper session timeout is very low, which means the consumer may be experiencing many "soft failures" due to garbage collection pauses. When this happens, the consumer group will rebalance, which can pause consumption. And if this is happening very frequently, the consumer could get into a state where it never consumes messages because it's constantly being rebalanced. I suggest increasing the ZooKeeper session timeout to 30 seconds to see if this resolves the issue. If so, you can experiment setting it lower.

第二,您能否确认正在向测试"主题生成新消息?您的消费者只会消费尚未提交的新消息.该主题可能没有任何新消息.

Second, can you confirm new messages are being produced to the "test" topic? Your consumer will only consume new messages that it hasn't committed yet. It's possible the topic doesn't have any new messages.

第三,同一个消费者组中是否有其他消费者可以处理消息?如果一个消费者经常遇到软故障,其他消费者将被分配到它的分区.

Third, do you have other consumers in the same consumer group that could be processing the messages? If one consumer is experiencing frequent soft failures, other consumers will be assigned its partitions.

最后,您正在使用最终将被删除的旧"消费者.如果可能,我建议转移到 Kafka 0.9 中提供的新"消费者(KafkaConsumer.java).虽然我不能保证这会解决您的问题.

Finally, you're using the "old" consumer which will eventually be removed. If possible, I suggest moving to the "new" consumer (KafkaConsumer.java) which was made available in Kafka 0.9. Although I can't promise this will resolve your issue.

希望这会有所帮助.

这篇关于卡夫卡消费者 - 不一致地接收消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆