Kafka 0.10 Java consumer not reading message from topic

Problem Description

I have a simple Java producer like the one below:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Renamed from "Producer" so it doesn't shadow org.apache.kafka.clients.producer.Producer
public class SimpleProducer
{
    private final static String TOPIC = "my-example-topi8";
    private final static String BOOTSTRAP_SERVERS = "localhost:8092";

    public static void main( String[] args ) throws Exception {
        Producer<String, byte[]> producer = createProducer();
        for (int i = 0; i < 3000; i++) {
            String msg = "Test Message-" + i;
            final ProducerRecord<String, byte[]> record =
                    new ProducerRecord<String, byte[]>(TOPIC, "key" + i, msg.getBytes());
            producer.send(record).get();
            System.out.println("Sent message " + msg);
        }
        producer.close();
    }

    private static Producer<String, byte[]> createProducer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
        props.put("client.id", "AppFromJava");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        // The old Scala-client properties (metadata.broker.list, serializer.class,
        // key.serializer.class, compression.codec) are ignored by the new
        // KafkaProducer; its compression setting is compression.type.
        props.put("compression.type", "snappy");
        return new KafkaProducer<String, byte[]>(props);
    }
}

I am trying to read the data as below:

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

// Renamed from "Consumer" so it doesn't shadow org.apache.kafka.clients.consumer.Consumer
public class SimpleConsumer
{
    private final static String TOPIC = "my-example-topi8";
    private final static String BOOTSTRAP_SERVERS = "localhost:8092";

    public static void main( String[] args ) throws Exception {
        Consumer<String, byte[]> consumer = createConsumer();
        start(consumer);
    }

    static void start(Consumer<String, byte[]> consumer) throws InterruptedException {
        final int giveUp = 10;
        int noRecordsCount = 0;

        while (true) {
            final ConsumerRecords<String, byte[]> consumerRecords = consumer.poll(1000);
            if (consumerRecords.count() == 0) {
                noRecordsCount++;
                if (noRecordsCount > giveUp) break;
                else continue;
            }

            consumerRecords.forEach(record -> {
                // Process the record
                System.out.printf("\nConsumer Record:(%s, %s, %s)",
                        record.key(), new String(record.value()), record.topic());
            });

            consumer.commitSync();
            break;
        }
        consumer.close();
        System.out.println("DONE");
    }

    private static Consumer<String, byte[]> createConsumer() {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaExampleConsumer");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "1234");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        // Create the consumer using props.
        final Consumer<String, byte[]> consumer = new KafkaConsumer<String, byte[]>(props);
        consumer.subscribe(Collections.singletonList(TOPIC));
        return consumer;
    }
}

But the consumer is not reading any messages from Kafka. If I add the following at the very beginning of start():

consumer.poll(0);
consumer.seekToBeginning(consumer.assignment());

then the consumer starts reading from the topic. But then each time the consumer is restarted it reads messages from the start of the topic, which I don't want. If I add the following config when starting the consumer:

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

then it reads messages from the topic, but if the consumer is restarted before processing all the messages, it does not read the unprocessed messages.
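(Side note: one way to keep the seekToBeginning workaround from rewinding on every restart is to seek only for partitions the group has never committed an offset for. This is only a sketch — it assumes the consumer built above and a reachable broker, so it is not runnable standalone:)

```java
// Sketch: rewind only on the very first run of this consumer group.
// Assumes `consumer` is the Consumer<String, byte[]> created above and
// that org.apache.kafka.common.TopicPartition is imported.
consumer.poll(0); // trigger partition assignment
for (TopicPartition tp : consumer.assignment()) {
    // committed() returns null when this group has never committed
    // an offset for the partition
    if (consumer.committed(tp) == null) {
        consumer.seekToBeginning(Collections.singletonList(tp));
    }
}
```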

Can someone let me know what is going wrong and how I can fix this?

The Kafka broker and ZooKeeper are running with the default configuration.

Answer

Your call to commitSync() acknowledges all messages in the batch returned by the last poll(), rather than committing each message individually as you process it, which is what I think you are trying to do.

From the docs:

"The above example uses commitSync to mark all received records as committed. In some cases you may wish to have even finer control over which records have been committed by specifying an offset explicitly. In the example below we commit offset after we finish handling the records in each partition.

 try {
     while(running) {
         ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
         for (TopicPartition partition : records.partitions()) {
             List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
             for (ConsumerRecord<String, String> record : partitionRecords) {
                 System.out.println(record.offset() + ": " + record.value());
             }
             long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
             consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
         }
     }
 } finally {
   consumer.close();
 }

Note: The committed offset should always be the offset of the next message that your application will read. Thus, when calling commitSync(offsets) you should add one to the offset of the last message processed. "
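That "+1" rule is the crux of the restart behavior you're seeing: the committed offset is the position the consumer resumes from, not the last record handled. Here is a minimal broker-free sketch of that arithmetic (the class name and offset values are made up for illustration):

```java
import java.util.Arrays;
import java.util.List;

public class CommitOffsetMath {
    // The offset to commit after processing a batch from one partition:
    // one past the last processed record, because the committed offset is
    // where the consumer resumes reading after a restart.
    static long offsetToCommit(List<Long> processedOffsets) {
        return processedOffsets.get(processedOffsets.size() - 1) + 1;
    }

    public static void main(String[] args) {
        // Records with offsets 5, 6 and 7 were processed from a partition.
        List<Long> batch = Arrays.asList(5L, 6L, 7L);
        // Committing 8 means a restarted consumer polls record 8 next:
        // nothing already handled is re-read, and nothing is skipped.
        System.out.println("commit " + offsetToCommit(batch));
    }
}
```

Applied to your loop, this means calling commitSync(Map) per partition (as in the docs example above) instead of the bare commitSync(), so a crash mid-batch only replays records after the last per-partition commit.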
