Is there a way to delete all the data from a topic or delete the topic before every run?


Question

Is there a way to delete all the data from a topic or delete the topic before every run?

Can I modify the KafkaConfig.scala file to change the logRetentionHours property? Is there a way the messages gets deleted as soon as the consumer reads it?

I am using producers to fetch data from somewhere and send it to a particular topic where a consumer consumes it. Can I delete all the data from that topic on every run? I want only new data in the topic every time. Is there a way to reinitialize the topic somehow?

Answer

I don't think it is supported yet. Take a look at the JIRA issue "Add delete topic support".

To delete manually:

  1. Shut down the cluster
  2. Clean the kafka log dir (specified by the log.dir attribute in the kafka config file) as well as the zookeeper data
  3. Restart the cluster

For any given topic, what you can do is:

  1. Stop kafka
  2. Clean the kafka log specific to the partition. Kafka stores its log files in the format "logDir/topic-partition", so for a topic named "MyTopic" the log for partition id 0 will be stored in /tmp/kafka-logs/MyTopic-0, where /tmp/kafka-logs is specified by the log.dir attribute
  3. Restart kafka

This is NOT a good or recommended approach, but it should work. In the Kafka broker config file, the log.retention.hours.per.topic attribute is used to define the number of hours to keep a log file before deleting it, for a specific topic.

Also, is there a way the messages gets deleted as soon as the consumer reads it?

From the Kafka documentation:

The Kafka cluster retains all published messages—whether or not they have been consumed—for a configurable period of time. For example if the log retention is set to two days, then for the two days after a message is published it is available for consumption, after which it will be discarded to free up space. Kafka's performance is effectively constant with respect to data size so retaining lots of data is not a problem.
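The two-day example corresponds to a single broker setting in server.properties. A minimal sketch — log.retention.hours is the broker-wide property; the exact syntax of the per-topic variant mentioned earlier varies by Kafka version:

```properties
# server.properties — retain log segments for two days (48 hours) before deletion
log.retention.hours=48
```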

In fact the only metadata retained on a per-consumer basis is the position of the consumer in the log, called the "offset". This offset is controlled by the consumer: normally a consumer will advance its offset linearly as it reads messages, but in fact the position is controlled by the consumer and it can consume messages in any order it likes. For example, a consumer can reset to an older offset to reprocess.

For finding the start offset to read, the Kafka 0.8 Simple Consumer example says:

Kafka includes two constants to help, kafka.api.OffsetRequest.EarliestTime() finds the beginning of the data in the logs and starts streaming from there, kafka.api.OffsetRequest.LatestTime() will only stream new messages.

You can also find the example code there for managing the offset at your consumer end.

    public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
                                     long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        // ask for at most one offset at or before whichTime (EarliestTime or LatestTime)
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);

        if (response.hasError()) {
            System.out.println("Error fetching offset data from the broker. Reason: " + response.errorCode(topic, partition));
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }
