Kafka 0.10 Java consumer not reading message from topic


Question

I have a simple Java producer like the one below:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerExample
{
    private final static String TOPIC = "my-example-topi8";
    private final static String BOOTSTRAP_SERVERS = "localhost:8092";

    public static void main( String[] args ) throws Exception {
        Producer<String, byte[]> producer = createProducer();
        for (int i = 0; i < 3000; i++) {
            String msg = "Test Message-" + i;
            final ProducerRecord<String, byte[]> record =
                    new ProducerRecord<String, byte[]>(TOPIC, "key" + i, msg.getBytes());
            producer.send(record).get(); // block until the send is acknowledged
            System.out.println("Sent message " + msg);
        }
        producer.close();
    }

    private static Producer<String, byte[]> createProducer() {
        Properties props = new Properties();
        props.put("metadata.broker.list", BOOTSTRAP_SERVERS);
        props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
        props.put("client.id", "AppFromJava");
        props.put("serializer.class", "kafka.serializer.DefaultEncoder");
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("compression.codec", "snappy");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        return new KafkaProducer<String, byte[]>(props);
    }
}

I am trying to read the data as below:

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ConsumerExample
{
    private final static String TOPIC = "my-example-topi8";
    private final static String BOOTSTRAP_SERVERS = "localhost:8092";

    public static void main( String[] args ) throws Exception {
        Consumer<String, byte[]> consumer = createConsumer();
        start(consumer);
    }

    static void start(Consumer<String, byte[]> consumer) throws InterruptedException {
        final int giveUp = 10;
        int noRecordsCount = 0;

        while (true) {
            final ConsumerRecords<String, byte[]> consumerRecords = consumer.poll(1000);
            if (consumerRecords.count() == 0) {
                noRecordsCount++;
                if (noRecordsCount > giveUp) break;
                else continue;
            }

            // Process the records
            consumerRecords.forEach(record ->
                System.out.printf("%nConsumer Record:(%s, %s, %s)",
                        record.key(), new String(record.value()), record.topic()));

            consumer.commitSync();
            break;
        }
        consumer.close();
        System.out.println("DONE");
    }

    private static Consumer<String, byte[]> createConsumer() {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaExampleConsumer");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "1234");
        props.put("enable.auto.commit", "false");

        // Create the consumer using props and subscribe to the topic.
        final Consumer<String, byte[]> consumer = new KafkaConsumer<String, byte[]>(props);
        consumer.subscribe(Collections.singletonList(TOPIC));
        return consumer;
    }
}

But the consumer is not reading any messages from Kafka. If I add the following at the very start of start():

consumer.poll(0);                                 // join the group and receive partition assignments
consumer.seekToBeginning(consumer.assignment());  // rewind every assigned partition to the first offset

then the consumer starts reading from the topic. But each time the consumer is restarted it reads from the beginning of the topic, which I don't want. If I instead add the following config when starting the consumer:

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

then it reads messages from the topic, but if the consumer is restarted before processing all of them, it does not re-read the unprocessed messages.

Can someone let me know what is going wrong and how I can fix this?

The Kafka broker and ZooKeeper are running with the default configuration.

Answer

Your call to commitSync() acknowledges all of the messages in the batch returned by the last poll(), not each individual message as you process it, which is what I think you are trying to do.

From the documentation:

"The above example uses commitSync to mark all received records as committed. In some cases you may wish to have even finer control over which records have been committed by specifying an offset explicitly. In the example below we commit offset after we finish handling the records in each partition.

 try {
     while (running) {
         ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
         for (TopicPartition partition : records.partitions()) {
             List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
             for (ConsumerRecord<String, String> record : partitionRecords) {
                 System.out.println(record.offset() + ": " + record.value());
             }
             long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
             consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
         }
     }
 } finally {
     consumer.close();
 }

Note: The committed offset should always be the offset of the next message that your application will read. Thus, when calling commitSync(offsets) you should add one to the offset of the last message processed."
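The offset-plus-one rule in that quote can be sketched without a running broker. The sketch below uses plain Java maps in place of Kafka's TopicPartition/OffsetAndMetadata types; the partition names and offset values are made up for illustration:

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetCommitSketch {
    // Build the map of offsets to commit from the last processed offset
    // per partition: the committed offset names the NEXT record to read.
    static Map<String, Long> offsetsToCommit(Map<String, Long> lastProcessed) {
        Map<String, Long> toCommit = new HashMap<>();
        lastProcessed.forEach((partition, offset) -> toCommit.put(partition, offset + 1));
        return toCommit;
    }

    public static void main(String[] args) {
        Map<String, Long> lastProcessed = new HashMap<>();
        lastProcessed.put("my-example-topi8-0", 41L); // last record handled in partition 0
        lastProcessed.put("my-example-topi8-1", 7L);  // last record handled in partition 1
        System.out.println(offsetsToCommit(lastProcessed).get("my-example-topi8-0")); // prints 42
        System.out.println(offsetsToCommit(lastProcessed).get("my-example-topi8-1")); // prints 8
    }
}
```

On restart, a consumer in the same group resumes from the committed offset, so committing lastProcessed + 1 means exactly the unprocessed records are delivered again.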
