使用Java更新kafka中特定主题的TTL [英] Update TTL for a particular topic in kafka using Java

查看:614
本文介绍了使用Java更新kafka中特定主题的TTL的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

更新主题的TTL,使记录在主题中保留10天.我只需要通过保留所有其他主题TTL相同的当前配置来对特定主题进行此操作,就必须使用java进行此操作,因为我是通过Java将主题推到kafka的.我正在设置以下属性以将主题推送到kafka

Update TTL for a topic so records stay in the topic for 10 days. I have to do this for a particular topic only by Leaving all other topics TTL's the same, current configuration, I have to do this using java because I am pushing a topic to kafka through Java. I am setting following properties for pushing a topic to kafka

Properties props = new Properties();
props.put("bootstrap.servers", KAFKA_SERVERS);
props.put("acks", ACKS);
props.put("retries", RETRIES);
props.put("linger.ms", new Integer(LINGER_MS));
props.put("buffer.memory", new Integer(BUFFER_MEMORY));
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

推荐答案

您可以使用AdminClient,在获取当前配置(仅用于测试)的一小段代码之后,然后更新"retention.ms在名为"test"的主题上进行配置.

You can do that using the AdminClient, following a snippet of code that get the current configuration (just for testing) and then update the "retention.ms" config on the topic named "test".

Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

AdminClient adminClient = AdminClient.create(props);

ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "test");

// get the current topic configuration
DescribeConfigsResult describeConfigsResult  =
        adminClient.describeConfigs(Collections.singleton(resource));

Map<ConfigResource, Config> config = describeConfigsResult.all().get();

System.out.println(config);

// create a new entry for updating the retention.ms value on the same topic
ConfigEntry retentionEntry = new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, "50000");
Map<ConfigResource, Config> updateConfig = new HashMap<ConfigResource, Config>();
updateConfig.put(resource, new Config(Collections.singleton(retentionEntry)));

AlterConfigsResult alterConfigsResult = adminClient.alterConfigs(updateConfig);
alterConfigsResult.all();

describeConfigsResult  = adminClient.describeConfigs(Collections.singleton(resource));

config = describeConfigsResult.all().get();

System.out.println(config);

adminClient.close();

这篇关于使用Java更新kafka中特定主题的TTL的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆