How to delete data that has already been consumed by the consumer? (Kafka)


Question


I am doing data replication in Kafka. However, the Kafka log files grow very quickly; their size reaches 5 GB in a day. As a solution to this problem, I want to delete the processed data immediately. I am using the deleteRecords method of AdminClient to delete records up to an offset. But when I look at the log file, the data corresponding to that offset is not deleted.

// Delete all records before the given offset on this partition
RecordsToDelete recordsToDelete = RecordsToDelete.beforeOffset(offset);
TopicPartition topicPartition = new TopicPartition(topicName, partition);
Map<TopicPartition, RecordsToDelete> deleteConf = new HashMap<>();
deleteConf.put(topicPartition, recordsToDelete);
adminClient.deleteRecords(deleteConf);

I don't want suggestions based on broker-level retention settings (log.retention.hours, log.retention.bytes, log.segment.bytes, log.cleanup.policy=delete),

because I only want to delete data that the consumer has already consumed. With those settings, data that has not yet been consumed would be deleted as well.

What are your suggestions?

Solution

Try this

// deleteRecords() is asynchronous: it returns one future per partition.
DeleteRecordsResult result = adminClient.deleteRecords(recordsToDelete);
Map<TopicPartition, KafkaFuture<DeletedRecords>> lowWatermarks = result.lowWatermarks();
try {
    for (Map.Entry<TopicPartition, KafkaFuture<DeletedRecords>> entry : lowWatermarks.entrySet()) {
        // get() blocks until the deletion has completed; lowWatermark() is the new earliest offset.
        System.out.println(entry.getKey().topic() + " " + entry.getKey().partition() + " " + entry.getValue().get().lowWatermark());
    }
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}
adminClient.close();

In this code you need to call entry.getValue().get().lowWatermark(): adminClient.deleteRecords(recordsToDelete) returns a map of futures, so you have to wait for each future to complete by calling get().
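For a more complete picture, here is a minimal, self-contained sketch (not the original poster's code) that ties this back to the question: it reads the offsets a given consumer group has committed and then deletes everything below them. The bootstrap server address ("localhost:9092") and group id ("my-group") are placeholder assumptions.

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DeletedRecords;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;

public class DeleteConsumedRecords {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Properties props = new Properties();
        // Placeholder broker address; adjust for your cluster.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient adminClient = AdminClient.create(props)) {
            // Look up what the consumer group has already committed ("my-group" is a placeholder).
            Map<TopicPartition, OffsetAndMetadata> committed =
                    adminClient.listConsumerGroupOffsets("my-group")
                               .partitionsToOffsetAndMetadata()
                               .get();

            // Ask the brokers to delete everything below each committed offset.
            Map<TopicPartition, RecordsToDelete> recordsToDelete = new HashMap<>();
            committed.forEach((tp, offsetAndMetadata) ->
                    recordsToDelete.put(tp, RecordsToDelete.beforeOffset(offsetAndMetadata.offset())));

            Map<TopicPartition, KafkaFuture<DeletedRecords>> lowWatermarks =
                    adminClient.deleteRecords(recordsToDelete).lowWatermarks();

            // Wait for each future; the low watermark is the new earliest available offset per partition.
            for (Map.Entry<TopicPartition, KafkaFuture<DeletedRecords>> entry : lowWatermarks.entrySet()) {
                System.out.println(entry.getKey() + " -> " + entry.getValue().get().lowWatermark());
            }
        }
    }
}

Note that deleteRecords advances the log start offset; the data on disk may only shrink once whole log segments fall below that offset, so the file size does not necessarily drop immediately.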

