Simple Kafka Consumer not receiving messages
Question
I am new to Kafka and am running the simple consumer/producer example given in the KafkaConsumer and KafkaProducer documentation. When I run the consumer from the terminal, it receives messages, but I cannot receive them from Java code.
I have also searched for similar issues on Stack Overflow (links: Link1, Link2) and tried those solutions, but nothing seems to work for me.
Kafka version: kafka_2.10-0.10.2.1
The corresponding Maven dependency is used in the pom.
Java code for the producer and consumer:
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SimpleProducer {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9094");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<>(props);
        // Send ten records whose key and value are both the loop index.
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<String, String>("topic3", Integer.toString(i), Integer.toString(i)));
        }
        producer.close();
    }
}
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SimpleConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9094");
        props.put("group.id", "test");
        props.put("zookeeper.connect", "localhost:2181");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("topic3", "topic2"));
        // Poll forever and print every record that arrives.
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }
    }
}
Starting Kafka:
bin/kafka-server-start.sh config/server.properties
(I have already set the port and broker ID in the properties file.)
Recommended answer
First, list all available consumer groups:
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
Then check which group is associated with your topic by describing each group with the command below:
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group <your group name> --describe
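The commands above use localhost:9092, while the question's clients connect to localhost:9094 and mention a port set in config/server.properties, so it may help to confirm which address the broker actually listens on. The following is only a rough sketch under stated assumptions: BrokerAddress and fromServerProperties are hypothetical names (not part of Kafka), the sample text and its values are illustrative, and it only understands the plain port key. A broker configured through listeners would need different parsing.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical helper, not part of the Kafka client API.
class BrokerAddress {

    // Derives "host:port" from the text of a broker properties file.
    // Assumption: the broker is configured with the plain "port" key.
    static String fromServerProperties(String text, String host) throws IOException {
        Properties p = new Properties();
        p.load(new StringReader(text));
        // Fall back to 9092, Kafka's default port, when no port is set.
        String port = p.getProperty("port", "9092");
        return host + ":" + port.trim();
    }

    public static void main(String[] args) throws IOException {
        // Illustrative content; in practice read config/server.properties.
        String sample = "broker.id=1\nport=9094\n";
        System.out.println(fromServerProperties(sample, "localhost"));
    }
}
```

Whatever address this yields is the one to pass to --bootstrap-server and to bootstrap.servers in both clients.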
Once you have found your topic and its associated group name, try the properties below and let me know if it works (if the topic does not belong to the default group, just replace group.id with your group):
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-consumer-group"); // default group name
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
// Subscribe the consumer to the list of topics.
consumer.subscribe(Arrays.asList(topicName)); // replace topicName with your topic name
// Print the names of the topics visible to this consumer.
java.util.Map<String, java.util.List<PartitionInfo>> listTopics = consumer.listTopics();
System.out.println("list of topic size :" + listTopics.size());
for (String topic : listTopics.keySet()) {
    System.out.println("topic name :" + topic);
}
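One thing worth checking alongside the properties above is that the producer and the consumer point at the same broker: the question's code uses localhost:9094, while this answer uses localhost:9092. The sketch below is a minimal illustration using only java.util.Properties, so it runs without the Kafka client on the classpath. ConsumerSettings, build, and sameBrokers are hypothetical names introduced here, not Kafka APIs.

```java
import java.util.Properties;

// Hypothetical helper, not part of the Kafka client API.
class ConsumerSettings {

    // Builds the consumer properties from the answer, with the two values
    // that must match your cluster passed in explicitly.
    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    // True when both clients are configured with the same broker list.
    static boolean sameBrokers(Properties producer, Properties consumer) {
        String p = producer.getProperty("bootstrap.servers");
        return p != null && p.equals(consumer.getProperty("bootstrap.servers"));
    }

    public static void main(String[] args) {
        Properties producer = new Properties();
        producer.put("bootstrap.servers", "localhost:9094"); // port from the question
        Properties consumer = build("localhost:9092", "test-consumer-group"); // values from the answer
        System.out.println("same brokers: " + sameBrokers(producer, consumer));
    }
}
```

The Properties object returned by build can be passed to the KafkaConsumer constructor exactly as in the snippet above. If sameBrokers reports false, adjust bootstrap.servers on one side before looking further.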