Kafka 0.9.0 New Java Consumer API fetching duplicate records


Question

I am new to Kafka, and I am trying to prototype a simple consumer-producer message queue (a traditional queue) using the Apache Kafka 0.9.0 Java clients.

From the producer process, I push 100 random messages to a topic configured with 3 partitions. This part looks fine.

I created 3 consumer threads with the same group id, subscribed to the same topic, with auto-commit enabled. Since all 3 consumer threads are subscribed to the same topic, I assumed each consumer would get one partition to consume and would commit the offset log for its own partition.

But I am facing a weird problem here: all my messages are duplicated. I get x times more records on the consumer side from each of my threads. Since each consumer thread polls the topic in an infinite loop, I have to kill the process.

I even tried with a single thread and still get duplicate records x times over, and it keeps going.

Could anyone please help me identify what mistake I am making here?

I am posting my consumer code for your reference.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

import com.google.common.util.concurrent.ThreadFactoryBuilder; // Guava

public class ConsumerDemo {

    public static void main(String[] args) {

        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("Consumer-%d").build();
        ExecutorService executor = Executors.newFixedThreadPool(3, threadFactory);

        executor.submit(new ConsumerThread("topic1", "myThread-1"));
        executor.submit(new ConsumerThread("topic1", "myThread-2"));
        executor.submit(new ConsumerThread("topic1", "myThread-3"));

        // executor shutdown logic is skipped
    }
}

The consumer thread:

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerThread implements Runnable {

    private static final String KAFKA_BROKER = "<<IP:port>>";

    private final KafkaConsumer<String, String> consumer;

    public ConsumerThread(String topic, String name) {
        Properties props = new Properties();
        props.put("bootstrap.servers", ConsumerThread.KAFKA_BROKER);
        props.put("group.id", "DemoConsumer");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "6000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        this.consumer = new KafkaConsumer<>(props);
        this.consumer.subscribe(Collections.singletonList(topic));
    }

    public void run() {
        try {
            boolean isRunning = true;
            while (isRunning) {
                ConsumerRecords<String, String> records = consumer.poll(10L);
                System.out.println("Partition Assignment to this Consumer: " + consumer.assignment());
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("Received message from thread " + Thread.currentThread().getName()
                            + ": (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.close(); // close even if the loop exits with an exception
        }
    }
}

Also, very importantly, I am aiming for exactly-once semantics. I know I am a thousand miles away from that. Any help is really appreciated.

Observation: the debug sysout prints all 3 partitions for each thread. Does this mean the partitions are not being assigned one per consumer?

Partition Assignment to this Consumer: [topic1-1, topic1-0, topic1-2]

Kafka experts, apart from the problem above, I am looking for input on 2 other points.

  1. Please help me understand the mistake in the code above.
  2. In general, how can exactly-once semantics be achieved? An example would help, if possible.
  3. Failure scenarios such as consumer downtime: how do I handle them without losing messages?

Thanks in advance.

Answer

Well, I figured out what was wrong with my code/understanding.

I should have read the Kafka documentation completely before jumping into prototyping.

Here is what I found.

By default, Kafka guarantees at-least-once semantics. This means a consumer gets each message at least once (possibly multiple times). I had assumed that if I have 3 partitions and create 3 consumers, the Kafka API would take care of assigning exactly one partition to each consumer. That was wrong.
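The at-least-once behavior can be illustrated without a broker. The sketch below (plain Java, hypothetical names, no Kafka dependency) simulates a consumer that commits its position only periodically, the way auto-commit does; if it crashes between commits, the restart resumes from the last committed offset and replays the records processed after it, producing duplicates:

```java
import java.util.ArrayList;
import java.util.List;

public class AtLeastOnceDemo {

    /** Consume `log` starting at `lastCommitted`, committing every `commitInterval`
     *  records; stop ("crash") after processing `crashAfter` records. Returns the
     *  new committed offset; processed records are appended to `out`. */
    static long run(List<String> log, long lastCommitted,
                    int commitInterval, int crashAfter, List<String> out) {
        long committed = lastCommitted;
        int processedThisRun = 0;
        for (long offset = lastCommitted; offset < log.size(); offset++) {
            out.add(log.get((int) offset));      // "process" the record
            processedThisRun++;
            if (processedThisRun % commitInterval == 0) {
                committed = offset + 1;          // periodic auto-commit
            }
            if (processedThisRun == crashAfter) {
                return committed;                // crash before the next commit
            }
        }
        return log.size();                       // clean shutdown commits everything
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        for (int i = 0; i < 10; i++) log.add("msg-" + i);

        List<String> seen = new ArrayList<>();
        // First run: commits every 4 records, crashes after processing 6.
        long committed = run(log, 0, 4, 6, seen);
        System.out.println("committed after crash: " + committed); // 4
        // Restart resumes from the last commit: msg-4 and msg-5 are replayed.
        run(log, committed, 4, Integer.MAX_VALUE, seen);
        System.out.println("total processed: " + seen.size());     // 12, not 10
    }
}
```

The replayed records are exactly those processed after the last commit, which is why at-least-once delivery means duplicates, never losses.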

So I manually assigned a partition to each consumer, to make sure that exactly one consumer owns each partition and controls its offset, like below:

consumer = new KafkaConsumer<>(props);
TopicPartition partition = new TopicPartition(topic, partitionNum);
consumer.assign(Collections.singletonList(partition));

Exactly-once scenario: to make sure we consume a message exactly once, we need control over the offset. Although I have not tried it yet, from a lot of googling I learned that a better way is to save the offset together with the data, preferably in the same transaction, so that data and offset are either both saved or both rolled back for a retry.
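One broker-free way to sketch that idea (plain Java, all names hypothetical): keep the processed data and the last stored offset in one atomic unit, and on redelivery skip anything at or below the stored offset. At-least-once delivery then collapses to exactly-once processing:

```java
import java.util.ArrayList;
import java.util.List;

public class OffsetWithDataStore {
    // Stand-in for one transactional store: data and offset live together,
    // so a "commit" updates both atomically (here, inside one synchronized call).
    private final List<String> data = new ArrayList<>();
    private long lastOffset = -1;

    /** Process a record exactly once: duplicates (offset <= lastOffset) are skipped. */
    public synchronized boolean process(long offset, String value) {
        if (offset <= lastOffset) {
            return false;            // duplicate delivery: already stored
        }
        data.add(value);             // save the data ...
        lastOffset = offset;         // ... and the offset in the same "transaction"
        return true;
    }

    public synchronized long lastOffset() { return lastOffset; }
    public synchronized int size() { return data.size(); }

    public static void main(String[] args) {
        OffsetWithDataStore store = new OffsetWithDataStore();
        // At-least-once delivery: offsets 0..4, then 3..5 redelivered after a retry.
        long[] deliveries = {0, 1, 2, 3, 4, 3, 4, 5};
        for (long offset : deliveries) {
            store.process(offset, "msg-" + offset);
        }
        System.out.println(store.size());        // 6: each record stored once
        System.out.println(store.lastOffset());  // 5
    }
}
```

With a real database, the restart path would read `lastOffset` back and `seek()` the consumer to `lastOffset + 1` before polling; the skip check covers any records redelivered anyway.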

Any other solutions are appreciated.
