如何在使用Spring创建过程中配置kafka主题保留策略? [英] How to configure kafka topic retention policy during creation with Spring?
问题描述
我需要在创建过程中配置特定主题的保留策略.我试图寻找解决方案,但只能找到如下所示的命令级别alter命令
I need to configure retention policy of a particular topic during creation. I tried to look for solution i could only find command level alter command as below
./bin/kafka-topics.sh --zookeeper本地主机:2181-更改--topic my-topic --configtention.ms = 1680000
./bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic --config retention.ms=1680000
有人可以让我知道一种在创建过程中进行配置的方法,例如spring-mvc中的xml或属性文件配置.
Can someone let me know a way to configure it during creation, something like xml or properties file configuration in spring-mvc.
推荐答案
Spring Kafka允许您通过在应用程序上下文中声明 @Bean
来创建新主题.这将需要在应用程序上下文中类型为 KafkaAdmin
的bean,如果使用Spring Boot,则会自动创建该bean.您可以按如下方式定义主题:
Spring Kafka lets you create new topics by declaring @Bean
s in your application context. This will require a bean of type KafkaAdmin
in the application context, which will be created automatically if using Spring Boot. You could define your topic as follows:
@Bean
public NewTopic myTopic() {
return TopicBuilder.name("my-topic")
.partitions(4)
.replicas(3)
.config(TopicConfig.RETENTION_MS_CONFIG, "1680000")
.build();
}
如果您不使用Spring Boot,则还必须定义 KafkaAdmin
bean:
If you are not using Spring Boot, you'll additionally have to define the KafkaAdmin
bean:
@Bean
public KafkaAdmin admin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
return new KafkaAdmin(configs);
}
如果要编辑现有主题的配置,则必须使用 AdminClient
,这是在主题上更改 retention.ms
的代码段级别:
If you want to edit the configuration of an existing topic, you'll have to use the AdminClient
, here's the snippet to change the retention.ms
at a topic level:
Map<String, Object> config = new HashMap<>();
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
AdminClient client = AdminClient.create(config);
ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "new-topic");
// Update the retention.ms value
ConfigEntry retentionEntry = new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, "1680000");
Map<ConfigResource, Config> updateConfig = new HashMap<>();
updateConfig.put(resource, new Config(Collections.singleton(retentionEntry)));
AlterConfigOp op = new AlterConfigOp(retentionEntry, AlterConfigOp.OpType.SET);
Map<ConfigResource, Collection<AlterConfigOp>> configs = new HashMap<>(1);
configs.put(resource, Arrays.asList(op));
AlterConfigsResult alterConfigsResult = client.incrementalAlterConfigs(configs);
alterConfigsResult.all();
可以使用此 @PostConstruct
方法自动设置配置,该方法采用 NewTopic
bean.
The configuration can be set up automatically using this @PostConstruct
method that takes in NewTopic
beans.
@Autowired
private Set<NewTopic> topics;
@PostConstruct
public void reconfigureTopics() throws ExecutionException, InterruptedException {
try (final AdminClient adminClient = AdminClient.create(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers))) {
adminClient.incrementalAlterConfigs(topics.stream()
.filter(topic -> topic.configs() != null)
.collect(Collectors.toMap(
topic -> new ConfigResource(ConfigResource.Type.TOPIC, topic.name()),
topic -> topic.configs().entrySet()
.stream()
.map(e -> new ConfigEntry(e.getKey(), e.getValue()))
.peek(ce -> log.debug("configuring {} {} = {}", topic.name(), ce.name(), ce.value()))
.map(ce -> new AlterConfigOp(ce, AlterConfigOp.OpType.SET))
.collect(Collectors.toList())
)))
.all()
.get();
}
}
这篇关于如何在使用Spring创建过程中配置kafka主题保留策略?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!