Kafka Consumer - Receiving Messages Inconsistently


Problem description


I can send and receive messages on the command line against a local Kafka installation. I can also send messages through Java code, and those messages show up in the Kafka command prompt. I also have Java code for a Kafka consumer. The code received messages yesterday; this morning, however, it doesn't receive any. The code has not been changed. I am wondering whether the property configuration is quite right or not. Here is my configuration:

Producer:

bootstrap.servers - localhost:9092
group.id - test
key.serializer - StringSerializer.class.getName()
value.serializer - StringSerializer.class.getName()


and the ProducerRecord is set as

ProducerRecord<String, String>("test", "mykey",  "myvalue")

Consumer:

zookeeper.connect - "localhost:2181"
group.id - "test"
zookeeper.session.timeout.ms - 500
zookeeper.sync.time.ms - 250
auto.commit.interval.ms - 1000
key.deserializer - org.apache.kafka.common.serialization.StringDeserializer
value.deserializer - org.apache.kafka.common.serialization.StringDeserializer

and the Java code:

   // Old (high-level) consumer API: one stream for the "test" topic
   String topic = "test";
   Map<String, Integer> topicCount = new HashMap<>();
   topicCount.put(topic, 1);

   Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer
            .createMessageStreams(topicCount);
   List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);

What is missing?

Recommended answer


A number of things could be going on.


First, your consumer's ZooKeeper session timeout is very low, which means the consumer may be experiencing many "soft failures" due to garbage-collection pauses. When this happens, the consumer group rebalances, which can pause consumption. And if this is happening very frequently, the consumer could get into a state where it never consumes messages because it's constantly being rebalanced. I suggest increasing the ZooKeeper session timeout to 30 seconds to see if this resolves the issue. If so, you can experiment with setting it lower.
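As a minimal sketch of that change (assuming the old high-level consumer's property names, as used in the question; the 30-second value is the suggested starting point, not a verified fix):

```java
import java.util.Properties;

public class ConsumerProps {
    public static Properties build() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "test");
        // Raised from 500 ms: such a short timeout makes ordinary GC pauses
        // look like consumer failures and can trigger constant rebalancing.
        props.put("zookeeper.session.timeout.ms", "30000");
        props.put("zookeeper.sync.time.ms", "250");
        props.put("auto.commit.interval.ms", "1000");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build().getProperty("zookeeper.session.timeout.ms"));
    }
}
```

If 30 seconds fixes it, you can then tighten the value back down until failures reappear, and settle just above that point.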


Second, can you confirm new messages are being produced to the "test" topic? Your consumer will only consume new messages that it hasn't committed yet. It's possible the topic doesn't have any new messages.


Third, do you have other consumers in the same consumer group that could be processing the messages? If one consumer is experiencing frequent soft failures, other consumers will be assigned its partitions.


Finally, you're using the "old" consumer, which will eventually be removed. If possible, I suggest moving to the "new" consumer (KafkaConsumer.java), which was made available in Kafka 0.9, although I can't promise this will resolve your issue.
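A minimal sketch of the new consumer, using the broker address, group, and topic from the question (this assumes the `org.apache.kafka:kafka-clients` 0.9+ dependency is on the classpath and a broker is running, so it is an illustration rather than a drop-in replacement):

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class NewConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // The new consumer talks to the brokers directly; no ZooKeeper
        // connection (and no zookeeper.* settings) is needed.
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test"));
            while (true) {
                // Block up to 100 ms waiting for records
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset=%d key=%s value=%s%n",
                            record.offset(), record.key(), record.value());
                }
            }
        }
    }
}
```

Note that group membership and rebalancing are handled by the brokers here, so the ZooKeeper session-timeout concern above does not apply in the same form.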

Hope this helps.

