Messages lost between Apache Kafka consumer stop and start


Problem Description

I am new to Kafka and am using an Apache Kafka consumer to read messages from a producer. But when I stop the consumer and start it again some time later, all the messages produced in between are lost. How do I handle this scenario? I am using the properties "auto.offset.reset" = "latest" and "enable.auto.commit" = "false".

This is the code I am using. Any help is appreciated.

    import java.util.Arrays;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.json.JSONObject;

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "service");
    // Auto commit is disabled, and no manual commit is made anywhere below.
    props.put("enable.auto.commit", "false");
    props.put("auto.offset.reset", "latest");
    props.put("key.deserializer", KAFKA_DESERIALIER_STRING_KEYVALUE);
    props.put("value.deserializer", KAFKA_DESERIALIER_STRING_KEYVALUE);

    @SuppressWarnings("resource")
    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
    consumer.subscribe(Arrays.asList(topicname));

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            JSONObject jsonObj = new JSONObject(record.value());
            JdbcUtilToUdm.insertdataintodb(args, jsonObj);
        }
    }

Recommended Answer

You have to explicitly call consumer.commitSync() or consumer.commitAsync(), since you disabled auto commit. This is how the consumer group's position in the log is persisted: with no committed offsets at all, a restarted consumer falls back to "auto.offset.reset" = "latest" and jumps straight to the end of the log, which is why everything produced while it was stopped appears lost. You can commit synchronously or asynchronously, depending on which method you need or prefer, and you should commit after the records are processed (so in your case, probably after you have finished all the inserts but before you poll again).
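As a minimal sketch, reusing the poll loop and the JdbcUtilToUdm helper from the question and adding a synchronous commit, it could look like this:

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            // Process each record fully before its offset is committed.
            JSONObject jsonObj = new JSONObject(record.value());
            JdbcUtilToUdm.insertdataintodb(args, jsonObj);
        }
        // Commit the offsets returned by the last poll() only after all
        // inserts have finished, so a restarted consumer resumes from this
        // position instead of falling back to "latest".
        consumer.commitSync();
    }

commitSync() blocks until the broker acknowledges the commit, which is the simpler choice here; commitAsync() avoids that pause at the cost of handling commit failures in a callback.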
