Kafka 0.9.0 New Java Consumer API fetching duplicate records


Problem description

I am new to Kafka, and I am trying to prototype a simple consumer-producer message queue (traditional queue) model using the Apache Kafka 0.9.0 Java clients.

From the producer process, I push 100 random messages to a topic configured with 3 partitions. This looks fine.

I created 3 consumer threads with the same group id, subscribed to the same topic, with auto-commit enabled. Since all 3 consumer threads subscribe to the same topic, I assumed each consumer would get one partition to consume and would commit the offset logs per partition.

But I am facing a weird problem here: all my messages are duplicated. I get x times more records on the consumer side from each of my threads. Since each consumer thread polls the topic in an infinite loop, I have to kill the process.

I even tried with a single thread, and I still get each record duplicated x times, and it still continues.

Could anyone please help me identify what mistake I am making here?

I am posting my consumer code for your reference.

public class ConsumerDemo {

    public static void main(String[] args) {

        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("Consumer-%d").build();
        ExecutorService executor = Executors.newFixedThreadPool(3, threadFactory);

        executor.submit(new ConsumerThread("topic1", "myThread-1"));
        executor.submit(new ConsumerThread("topic1", "myThread-2"));
        executor.submit(new ConsumerThread("topic1", "myThread-3"));

        //executor shutdown logic is skipped
    }
}

Consumer thread:

public class ConsumerThread implements Runnable {

    private static final String KAFKA_BROKER = "<<IP:port>>";

    private final KafkaConsumer<String, String> consumer;

    public ConsumerThread(String topic, String name) {
        Properties props = new Properties();
        props.put("bootstrap.servers", ConsumerThread.KAFKA_BROKER);
        props.put("group.id", "DemoConsumer");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "6000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        this.consumer = new KafkaConsumer<>(props);
        this.consumer.subscribe(Collections.singletonList(topic));
    }

    public void run() {
        try {
            boolean isRunning = true;
            while (isRunning) {
                ConsumerRecords<String, String> records = consumer.poll(10L);
                System.out.println("Partition Assignment to this Consumer: " + consumer.assignment());
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("Received message from thread : " + Thread.currentThread().getName()
                            + "(" + record.key() + ", " + record.value() + ") at offset " + record.offset());
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.close(); // close even on error, so the group can rebalance promptly
        }
    }
}

Also, very importantly, I am aiming for exactly-once semantics. I know I am 1000 miles away from that. Any help is really appreciated.

Observation: the debug sysout prints all 3 partitions. Does this mean partitions are not being assigned to each consumer?

Partition Assignment to this Consumer: [topic1-1, topic1-0, topic1-2]

Kafka experts, apart from the above problem, I am looking for 2 other inputs:

  1. Please help me understand the mistake in the above code.
  2. In general, how can exactly-once semantics be achieved? Examples would be welcome.
  3. Abnormal scenarios, such as a consumer dying: how do I handle them without losing messages?

Thanks.

Answer

Well, I figured out what was wrong with my code/understanding.

I should have read the Kafka documentation completely before jumping into prototyping.

Here is what I found.

By default Kafka guarantees at-least-once semantics. This means a consumer gets each message at least once (possibly multiple times). I assumed that if I have 3 partitions and create 3 consumers, the Kafka API would take care of assigning each partition to exactly one consumer. This was wrong.

So I manually assigned a partition to each consumer, to make sure my consumer owns the partition and controls the offset, like below:

consumer = new KafkaConsumer<>(props);
TopicPartition partition = new TopicPartition(topic, partitionNum);
consumer.assign(Collections.singletonList(partition));
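Put together, a manual-assignment consumer might look like the sketch below. This is not the answerer's exact code: it reuses the question's placeholder broker address, and the topic name and partition number are illustrative. With assign() there is no consumer-group rebalancing, so no group.id is needed; auto-commit is turned off here in favour of an explicit commit after each processed batch:

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class AssignedConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "<<IP:port>>"); // placeholder, as in the question
        props.put("enable.auto.commit", "false");      // commit manually after processing
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // assign() pins this consumer to one partition; no group management is involved
        TopicPartition partition = new TopicPartition("topic1", 0);
        consumer.assign(Collections.singletonList(partition));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100L);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("(" + record.key() + ", " + record.value()
                            + ") at offset " + record.offset());
                }
                consumer.commitSync(); // commit only after the batch is processed
            }
        } finally {
            consumer.close();
        }
    }
}
```

Note that manual assignment also means manually deciding which process owns which partition; if a consumer dies, nothing reassigns its partition for you.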

Exactly-once scenario: to make sure we consume a message exactly once, we need control over the offset. Although I have not tried it so far, what I learnt from a lot of googling is that the better way is to save the offset along with the data, preferably in the same transaction, so that both the data and the offset are either definitely saved together or rolled back together for a retry.
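The "store the offset with the data" idea can be illustrated without a broker. The sketch below uses a hypothetical in-memory stand-in for a transactional store: each record's value and offset are saved in one atomic step, and a redelivered offset is skipped, so replaying the same batch twice (as an at-least-once consumer may do after a crash) still leaves exactly one copy of each message:

```java
import java.util.ArrayList;
import java.util.List;

public class OffsetStoreSketch {
    // Hypothetical stand-in for a transactional store: the data and the last
    // processed offset are updated in the same synchronized step, so they can
    // never disagree with each other.
    static class TransactionalStore {
        final List<String> data = new ArrayList<>();
        long lastOffset = -1;

        // Saves the value only if this offset has not been processed yet.
        synchronized void saveExactlyOnce(long offset, String value) {
            if (offset <= lastOffset) {
                return; // duplicate delivery: already processed, skip
            }
            data.add(value);      // "commit" the data...
            lastOffset = offset;  // ...and the offset together
        }
    }

    public static void main(String[] args) {
        TransactionalStore store = new TransactionalStore();

        // Simulate at-least-once delivery: the same batch of offsets 0..4
        // arrives twice (e.g. a crash happened before the commit).
        for (int attempt = 0; attempt < 2; attempt++) {
            for (long offset = 0; offset < 5; offset++) {
                store.saveExactlyOnce(offset, "msg-" + offset);
            }
        }

        System.out.println(store.data.size());  // 5, not 10
        System.out.println(store.lastOffset);   // 4
    }
}
```

On restart, a real consumer would read `lastOffset` from the store and `consumer.seek()` to `lastOffset + 1` before polling, so the replayed range stays small.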

Any other solutions are appreciated.

