How to delete data which has already been consumed by a consumer? (Kafka)


Problem description

I am doing data replication in Kafka, but the size of the Kafka log files increases very quickly; it reaches 5 GB in a day. As a solution to this problem, I want to delete processed data immediately. I am using the deleteRecords method of AdminClient to delete records up to an offset, but when I look at the log file, the data corresponding to that offset is not deleted.

RecordsToDelete recordsToDelete = RecordsToDelete.beforeOffset(offset);
TopicPartition topicPartition = new TopicPartition(topicName, partition);
Map<TopicPartition, RecordsToDelete> deleteConf = new HashMap<>();
deleteConf.put(topicPartition, recordsToDelete);
adminClient.deleteRecords(deleteConf);

I don't want suggestions like (log.retention.hours, log.retention.bytes, log.segment.bytes, log.cleanup.policy=delete)

Because I just want to delete the data already consumed by the consumer. With those settings, I would also delete data that has not been consumed.

What do you suggest?

Answer

You didn't do anything wrong. The code you provided works; I've tested it. Just in case I've overlooked something in your code, mine is:

public void deleteMessages(String topicName, int partitionIndex, int beforeIndex) {
    TopicPartition topicPartition = new TopicPartition(topicName, partitionIndex);
    Map<TopicPartition, RecordsToDelete> deleteMap = new HashMap<>();
    deleteMap.put(topicPartition, RecordsToDelete.beforeOffset(beforeIndex));
    kafkaAdminClient.deleteRecords(deleteMap);
}
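For context, here is a minimal, self-contained sketch of how such a method might be wired up with an AdminClient. The broker address, topic name, and offset are placeholders, not from the original post, and the program only contacts a broker if you pass its address as an argument:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.common.TopicPartition;

public class DeleteRecordsExample {

    // Deletes all records before `beforeOffset` in the given partition and
    // returns the new log start offset ("low watermark") the broker reports.
    static long deleteBefore(AdminClient adminClient, String topic,
                             int partition, long beforeOffset) throws Exception {
        TopicPartition tp = new TopicPartition(topic, partition);
        Map<TopicPartition, RecordsToDelete> deleteMap = new HashMap<>();
        deleteMap.put(tp, RecordsToDelete.beforeOffset(beforeOffset));
        // deleteRecords is asynchronous; block on the per-partition future.
        return adminClient.deleteRecords(deleteMap)
                .lowWatermarks().get(tp).get().lowWatermark();
    }

    public static void main(String[] args) throws Exception {
        if (args.length < 1) return; // pass the broker address to actually run
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, args[0]);
        try (AdminClient adminClient = AdminClient.create(props)) {
            System.out.println(deleteBefore(adminClient, "my-topic", 0, 42L));
        }
    }
}
```

Checking the returned low watermark is a quick way to confirm the broker actually advanced the log start offset: records below it are logically gone even if the segment file on disk still holds the bytes until the cleaner runs.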

I've used group: 'org.apache.kafka', name: 'kafka-clients', version: '2.0.0'

So check if you are targeting the right partition (0 for the first one)
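If you are unsure how many partitions the topic has, AdminClient can list them. This is a sketch under the same assumptions as above (bootstrap address and topic name passed as arguments; nothing here is from the original post):

```java
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;

public class ListPartitionsExample {
    public static void main(String[] args) throws Exception {
        if (args.length < 2) return; // usage: <bootstrap-servers> <topic>
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, args[0]);
        try (AdminClient adminClient = AdminClient.create(props)) {
            TopicDescription desc = adminClient
                    .describeTopics(Collections.singletonList(args[1]))
                    .all().get().get(args[1]);
            // Partition ids start at 0, so a topic with N partitions
            // has valid indexes 0 .. N-1.
            desc.partitions().forEach(p ->
                    System.out.println("partition " + p.partition()));
        }
    }
}
```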

Check your broker version: https://kafka.apache.org/20/javadoc/index.html?org/apache/kafka/clients/admin/AdminClient.html says:

This operation is supported by brokers with version 0.11.0.0

Produce the messages from the same application, to be sure you're connected properly.
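For example, a bare-bones producer to verify connectivity. The topic name and key/value are placeholders, and the program only runs against a broker whose address you pass in:

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProduceExample {
    public static void main(String[] args) {
        if (args.length < 1) return; // pass the broker address to actually run
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, args[0]);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        // try-with-resources closes (and flushes) the producer on exit.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("my-topic", "key-1", "hello"));
        }
    }
}
```

If this producer succeeds while deleteRecords appears to do nothing, the admin client is most likely pointed at a different cluster or partition.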

There is one more option you can consider: cleanup.policy=compact. If your message keys repeat, you could benefit from it, not just because older messages for a key will be deleted automatically, but because a message with a null payload deletes all messages for that key. Just don't forget to set delete.retention.ms and min.compaction.lag.ms to values small enough. In that case you can consume a message and then produce a null payload for the same key (but be cautious with this approach, since it can also delete messages with that key that you haven't consumed).
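A sketch of that tombstone idea, with a hypothetical topic and key (the ProducerRecord constructor accepts a null value, which is what marks the key for deletion during compaction):

```java
import org.apache.kafka.clients.producer.ProducerRecord;

public class TombstoneExample {
    public static void main(String[] args) {
        // A tombstone is a record with a non-null key and a null value.
        // Under cleanup.policy=compact the log cleaner eventually removes
        // every earlier record with the same key (once delete.retention.ms
        // has elapsed), so producing this after consuming "key-1" purges it.
        ProducerRecord<String, String> tombstone =
                new ProducerRecord<>("my-topic", "key-1", null);
        System.out.println("tombstone value = " + tombstone.value());
        // producer.send(tombstone);  // send with a configured KafkaProducer
    }
}
```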
