简单的 Kafka 消费者没有收到消息 [英] Simple Kafka Consumer not receiving messages

查看:42
本文介绍了简单的 Kafka 消费者没有收到消息的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我是 Kafka 的新手,正在运行一个简单的 kafka 消费者/生产者示例,如 KafkaConsumerKafkaProducer.当我从终端运行消费者时,消费者正在接收消息,但我无法使用 Java 代码进行侦听.我也在 StackoverFlow 上搜索过类似的问题(链接:Link1Link2) 并尝试了该解决方案,但似乎没有任何效果为我工作.Kafka版本:kafka_2.10-0.10.2.1,pom中使用了对应的maven依赖.

I am a newbie to Kafka and running a simple kafka consumer/producer example as given on KafkaConsumer and KafkaProducer. When I am running consumer from terminal, consumer is receiving messages but I am not able to listen using Java code. I have searched for similar issues on StackoverFlow also (Links: Link1, Link2) and tried that solutions but nothing seems to be working for me. Kafka Version: kafka_2.10-0.10.2.1 and corresponding maven dependency is used in pom.

生产者和消费者的 Java 代码:

Java Code for producer and consumer:

public class SimpleProducer {
public static void main(String[] args) throws InterruptedException {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9094");
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("batch.size", 16384);
    props.put("linger.ms", 1);
    props.put("buffer.memory", 33554432);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    Producer<String, String> producer = new KafkaProducer<>(props);
    for(int i = 0; i < 10; i++)
        producer.send(new ProducerRecord<String, String>("topic3", Integer.toString(i), Integer.toString(i)));

    producer.close();

}}

public class SimpleConsumer {

public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9094");
    props.put("group.id", "test");
    props.put("zookeeper.connect", "localhost:2181");
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "1000");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("topic3", "topic2"));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records)
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
}}

启动卡夫卡:bin/kafka-server-start.sh config/server.properties(我已经在属性文件中设置了端口、brokerid)

Starting kafka: bin/kafka-server-start.sh config/server.properties (I have already set port, brokerid in properties file)

推荐答案

首先使用以下方法检查所有可用的组:

First check what all the groups are available by using :

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

然后使用下面的 cmd 检查您的主题属于哪个组:

Then check for which group your topic belongs by using below cmd :

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group <your group name> --describe

一旦您找到您的主题和关联的组名(如果它不属于默认组,只需将 group.id 替换为您的组)然后尝试使用以下道具并让我知道它是否有效:

Once you find your topic and associated group name (just replace group.id with your group if it not belongs to default group) then try with below prop and let me know if it works :

  props.put("bootstrap.servers", "localhost:9092");
  props.put("group.id", "test-consumer-group"); // default topic name
  props.put("enable.auto.commit", "true");
  props.put("auto.commit.interval.ms", "1000");
  props.put("session.timeout.ms", "30000");
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
  KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

  //Kafka Consumer subscribes list of topics here.
  consumer.subscribe(Arrays.asList(topicName));  // replace you topic name

  //print the topic name

  java.util.Map<String,java.util.List<PartitionInfo>> listTopics = consumer.listTopics();
  System.out.println("list of topic size :" + listTopics.size());

  for(String topic : listTopics.keySet()){
      System.out.println("topic name :"+topic);
  }

这篇关于简单的 Kafka 消费者没有收到消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆