如何在使用 Spring 创建期间配置 kafka 主题保留策略? [英] How to configure kafka topic retention policy during creation with Spring?

查看:71
本文介绍了如何在使用 Spring 创建期间配置 kafka 主题保留策略?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我需要在创建期间配置特定主题的保留策略.我试图寻找解决方案,我只能找到如下命令级别的更改命令

<块引用>

./bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic --config reserved.ms=1680000

谁能告诉我一种在创建过程中配置它的方法,例如 xml 或 spring-mvc 中的属性文件配置.

解决方案

Spring Kafka 允许您通过在应用程序上下文中声明 @Bean 来创建新主题.这将需要应用程序上下文中类型为 KafkaAdmin 的 bean,如果使用 Spring Boot,它将自动创建.您可以按如下方式定义主题:

@Bean公共新话题 myTopic() {return TopicBuilder.name("my-topic").分区(4).replicas(3).config(TopicConfig.RETENTION_MS_CONFIG, "1680000").建造();}

如果您不使用 Spring Boot,则还必须定义 KafkaAdmin bean:

@Bean公共 KafkaAdmin 管理员(){映射<字符串,对象>configs = new HashMap<>();configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");返回新的 KafkaAdmin(configs);}

如果要编辑现有主题的配置,则必须使用 AdminClient,这是更改主题中 retention.ms 的代码段级别:

Mapconfig = new HashMap<>();config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");AdminClient 客户端 = AdminClient.create(config);ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "new-topic");//更新retention.ms 值ConfigEntry留存条目=新的ConfigEntry(TopicConfig.RETENTION_MS_CONFIG,1680000");映射updateConfig = new HashMap<>();updateConfig.put(resource, new Config(Collections.singleton(retentionEntry)));AlterConfigOp op = new AlterConfigOp(retentionEntry, AlterConfigOp.OpType.SET);映射<ConfigResource, Collection<AlterConfigOp>>configs = new HashMap<>(1);configs.put(resource, Arrays.asList(op));AlterConfigsResult alterConfigsResult = client.incrementalAlterConfigs(configs);alterConfigsResult.all();

可以使用这个 @PostConstruct 方法自动设置配置,该方法接受 NewTopic bean.

<预><代码>@自动连线私人集话题;@PostConstructpublic void reconfigureTopics() 抛出 ExecutionException,InterruptedException {尝试 (最终 AdminClient adminClient = AdminClient.create(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers))) {adminClient.incrementalAlterConfigs(topics.stream().filter(topic -> topic.configs() != null).collect(Collectors.toMap(主题 ->new ConfigResource(ConfigResource.Type.TOPIC, topic.name()),主题 ->topic.configs().entrySet().溪流().map(e -> new ConfigEntry(e.getKey(), e.getValue())).peek(ce -> log.debug("configuring {} {} = {}", topic.name(), ce.name(), ce.value())).map(ce -> new AlterConfigOp(ce, AlterConfigOp.OpType.SET)).collect(Collectors.toList())))).全部().得到();}}

I need to configure retention policy of a particular topic during creation. I tried to look for solution i could only find command level alter command as below

./bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic --config retention.ms=1680000

Can someone let me know a way to configure it during creation, something like xml or properties file configuration in spring-mvc.

解决方案

Spring Kafka lets you create new topics by declaring @Beans in your application context. This will require a bean of type KafkaAdmin in the application context, which will be created automatically if using Spring Boot. You could define your topic as follows:

@Bean
public NewTopic myTopic() {
    return TopicBuilder.name("my-topic")
            .partitions(4)
            .replicas(3)
            .config(TopicConfig.RETENTION_MS_CONFIG, "1680000")
            .build();
}

If you are not using Spring Boot, you'll additionally have to define the KafkaAdmin bean:

@Bean
public KafkaAdmin admin() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
    return new KafkaAdmin(configs);
}

If you want to edit the configuration of an existing topic, you'll have to use the AdminClient, here's the snippet to change the retention.ms at a topic level:

Map<String, Object> config = new HashMap<>();                
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
                         
AdminClient client = AdminClient.create(config);
                         
ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "new-topic");
            
// Update the retention.ms value
ConfigEntry retentionEntry = new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, "1680000");
Map<ConfigResource, Config> updateConfig = new HashMap<>();
updateConfig.put(resource, new Config(Collections.singleton(retentionEntry)));

AlterConfigOp op = new AlterConfigOp(retentionEntry, AlterConfigOp.OpType.SET);
Map<ConfigResource, Collection<AlterConfigOp>> configs = new HashMap<>(1);
configs.put(resource, Arrays.asList(op));

AlterConfigsResult alterConfigsResult = client.incrementalAlterConfigs(configs);
        alterConfigsResult.all();

The configuration can be set up automatically using this @PostConstruct method that takes in NewTopic beans.


    @Autowired
    private Set<NewTopic> topics;

    @PostConstruct
    public void reconfigureTopics() throws ExecutionException, InterruptedException {

        try (final AdminClient adminClient = AdminClient.create(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers))) {
            adminClient.incrementalAlterConfigs(topics.stream()
                .filter(topic -> topic.configs() != null)
                .collect(Collectors.toMap(
                    topic -> new ConfigResource(ConfigResource.Type.TOPIC, topic.name()),
                    topic -> topic.configs().entrySet()
                        .stream()
                        .map(e -> new ConfigEntry(e.getKey(), e.getValue()))
                        .peek(ce -> log.debug("configuring {} {} = {}", topic.name(), ce.name(), ce.value()))
                        .map(ce -> new AlterConfigOp(ce, AlterConfigOp.OpType.SET))
                        .collect(Collectors.toList())
                )))
                .all()
                .get();
        }

    }

这篇关于如何在使用 Spring 创建期间配置 kafka 主题保留策略?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆