为什么commitAsync无法提交前两个偏移量 [英] Why commitAsync fails to commit the first 2 offsets

查看:105
本文介绍了为什么commitAsync无法提交前两个偏移量的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我遇到了一个奇怪的问题,那就是消费者无法使comitAsync日志的前两个偏移量,而且我不知道原因.这很奇怪,因为在生产者的同一异步发送中的其他消息已由消费者接收并成功提交.有人可以找到此问题的根源.我在下面引用我的代码和输出示例

I faced a weird problem at which the consumer can not make comitAsync the first 2 offsets of the log and i don't know the reason. It is very weird because the other messages at the same asynchronous send of the producer received and commited succesfuly by the consumer .Can someone find the source of this problem.. I quote my code below and an output example

package com.panos.example;

import kafka.utils.ShutdownableThread;
import org.apache.kafka.clients.consumer.*;

import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.Map;
import java.util.Properties;

public class Consumer extends ShutdownableThread {
    private final KafkaConsumer<Integer, String> consumer;
    private final String topic;

    public Consumer(String topic) {
        super("KafkaConsumerExample", false);
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.75:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        consumer = new KafkaConsumer<Integer, String>(props);
        this.topic = topic;
    }

    @Override
    public void doWork() {

        consumer.subscribe(Collections.singletonList(this.topic));
        try {
            ConsumerRecords<Integer, String> records = consumer.poll(1000);
            long startTime = System.currentTimeMillis();
            if (!records.isEmpty()) {
                System.out.println("C : {} Total No. of records received : {}" + records.count());

                for (ConsumerRecord<Integer, String> record : records) {
                    System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
                    consumer.commitAsync(new ConsumerCallBack(startTime,record.value(), record.offset()));
                }

            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public String name() {
        return null;
    }

    @Override
    public boolean isInterruptible() {
        return false;
    }


    class ConsumerCallBack implements OffsetCommitCallback {

        private final long startTime;
        private String message;
        private final String NewLine = System.getProperty("line.separator");
        private long offset;

        public ConsumerCallBack(long startTime) {
            this.startTime = startTime;
        }

        public ConsumerCallBack(long startTime, String message, long offset) {
            this.startTime = startTime;
            this.message=message;
            this.offset = offset;
        }

        public void onComplete(Map<TopicPartition, OffsetAndMetadata> CurrentOffset,
                               Exception exception) {
            long elapsedTime = System.currentTimeMillis() - startTime;
            if (exception != null) {
                System.out.println("Message : {" + message + "}, committed successfully at offset " + offset +
                        CurrentOffset + "elapsed time :" + elapsedTime);
            } else {
                System.out.println(exception.toString());
               /* JOptionPane.showMessageDialog(new Frame(),
                        "Something Goes Wrong with the Server Please Try again Later.",
                        "Inane error",
                        JOptionPane.ERROR_MESSAGE);*/
            }
        }
    }
}

您可以看到除前两个消息外,所有消息均已成功提交,没有任何异常.为什么会这样?

As you can see all message committed successfully except the first 2 without any exception. Why this happens?

Received message: (1, Message_1) at offset 160
Received message: (2, Message_2) at offset 161
Received message: (3, Message_3) at offset 162
Received message: (4, Message_4) at offset 163
Message : {Message_3}, committed successfully at offset 162{test-0=OffsetAndMetadata{offset=164, metadata=''}}elapsed time :6
Message : {Message_4}, committed successfully at offset 163{test-0=OffsetAndMetadata{offset=164, metadata=''}}elapsed time :6

推荐答案

如果使用commitAsync,则可能会将多个提交压缩在一起成为一条提交消息.当偏移量以递增的顺序提交时,偏移量X的提交是所有小于X的偏移量的隐式提交.在您的情况下,似乎是一次偏移量3的提交或前三个偏移量完成了

If you use commitAsync it can happen that multiple commits are squashed together into a single commit message. As offsets are committed in increasing order, a commit of offset X is an implicit commit for all offsets that are smaller than X. In your case, it seems, that the commits or the first three offsets are done my a single commit of offset 3.

这篇关于为什么commitAsync无法提交前两个偏移量的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆