为什么 commitAsync 无法提交前 2 个偏移量 [英] Why commitAsync fails to commit the first 2 offsets

查看:27
本文介绍了为什么 commitAsync 无法提交前 2 个偏移量的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我遇到了一个奇怪的问题,消费者无法将comitAsync设置为日志的前2个偏移量,我不知道原因.这很奇怪,因为生产者的同一个异步发送的其他消息被消费者接收并成功提交.有人能找到这个问题的根源吗..我在下面引用我的代码和输出示例

I faced a weird problem at which the consumer can not make comitAsync the first 2 offsets of the log and i don't know the reason. It is very weird because the other messages at the same asynchronous send of the producer received and commited succesfuly by the consumer .Can someone find the source of this problem.. I quote my code below and an output example

package com.panos.example;

import kafka.utils.ShutdownableThread;
import org.apache.kafka.clients.consumer.*;

import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.Map;
import java.util.Properties;

public class Consumer extends ShutdownableThread {
    private final KafkaConsumer<Integer, String> consumer;
    private final String topic;

    public Consumer(String topic) {
        super("KafkaConsumerExample", false);
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.75:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        consumer = new KafkaConsumer<Integer, String>(props);
        this.topic = topic;
    }

    @Override
    public void doWork() {

        consumer.subscribe(Collections.singletonList(this.topic));
        try {
            ConsumerRecords<Integer, String> records = consumer.poll(1000);
            long startTime = System.currentTimeMillis();
            if (!records.isEmpty()) {
                System.out.println("C : {} Total No. of records received : {}" + records.count());

                for (ConsumerRecord<Integer, String> record : records) {
                    System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
                    consumer.commitAsync(new ConsumerCallBack(startTime,record.value(), record.offset()));
                }

            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public String name() {
        return null;
    }

    @Override
    public boolean isInterruptible() {
        return false;
    }


    class ConsumerCallBack implements OffsetCommitCallback {

        private final long startTime;
        private String message;
        private final String NewLine = System.getProperty("line.separator");
        private long offset;

        public ConsumerCallBack(long startTime) {
            this.startTime = startTime;
        }

        public ConsumerCallBack(long startTime, String message, long offset) {
            this.startTime = startTime;
            this.message=message;
            this.offset = offset;
        }

        public void onComplete(Map<TopicPartition, OffsetAndMetadata> CurrentOffset,
                               Exception exception) {
            long elapsedTime = System.currentTimeMillis() - startTime;
            if (exception != null) {
                System.out.println("Message : {" + message + "}, committed successfully at offset " + offset +
                        CurrentOffset + "elapsed time :" + elapsedTime);
            } else {
                System.out.println(exception.toString());
               /* JOptionPane.showMessageDialog(new Frame(),
                        "Something Goes Wrong with the Server Please Try again Later.",
                        "Inane error",
                        JOptionPane.ERROR_MESSAGE);*/
            }
        }
    }
}

如您所见,除了前 2 条消息外,所有消息均已成功提交,没有任何异常.为什么会发生这种情况?

As you can see all message committed successfully except the first 2 without any exception. Why this happens?

Received message: (1, Message_1) at offset 160
Received message: (2, Message_2) at offset 161
Received message: (3, Message_3) at offset 162
Received message: (4, Message_4) at offset 163
Message : {Message_3}, committed successfully at offset 162{test-0=OffsetAndMetadata{offset=164, metadata=''}}elapsed time :6
Message : {Message_4}, committed successfully at offset 163{test-0=OffsetAndMetadata{offset=164, metadata=''}}elapsed time :6

推荐答案

如果您使用 commitAsync,可能会发生多个提交被压缩到单个提交消息中的情况.由于偏移量按递增顺序提交,因此偏移量 X 的提交是所有小于 X 的偏移量的隐式提交.在您的情况下,似乎提交或前三个偏移量已完成我对偏移量 3 的单次提交.

If you use commitAsync it can happen that multiple commits are squashed together into a single commit message. As offsets are committed in increasing order, a commit of offset X is an implicit commit for all offsets that are smaller than X. In your case, it seems, that the commits or the first three offsets are done my a single commit of offset 3.

这篇关于为什么 commitAsync 无法提交前 2 个偏移量的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆