如何配置Spring Boot Kafka客户端以使其不尝试连接 [英] How to configure Spring Boot Kafka client so it does not try to connect

查看:135
本文介绍了如何配置Spring Boot Kafka客户端以使其不尝试连接的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

这与Is there a "Circuit Breaker" for Spring Boot Kafka client?有关,但我仍然认为这是一个不同的问题:)

我们需要配置Spring Boot Kafka客户端,以便它根本不会尝试连接。

用例是,在测试环境中,我们没有运行Kafka,但我们仍然需要构建完整的Spring Boot上下文,因此使该Bean以配置文件为条件是不起作用的。我们不在乎BEY是否未连接,但我们需要它存在。

问题是不成功的连接尝试需要大约30-40秒,我们的测试会显著减慢。

哪种configuration parameters或哪种组合完全禁止连接尝试,或至少强制客户端仅尝试一次?

多次重试连接的代码为:

@Bean
public KafkaAdmin.NewTopics topics() {
    return new KafkaAdmin.NewTopics(
            TopicBuilder.name("MyTopic").build()
    );
}

它反复生成此警告:

WARN ... org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.0.1:29092) could not be established. Broker may not be available.

以下代码仅尝试连接一次:

@Bean
public ReactiveKafkaConsumerTemplate<String, MyEvent> myConsumer(KafkaProperties properties) {
    return createConsumer(properties, "MyTopic", "MyConsumerGroup");
}

public <E> ReactiveKafkaConsumerTemplate<String, E> createConsumer(KafkaProperties properties, String topic, String consumerGroup) {
    final Map<String, Object> map = configureKafkaProperties(properties, consumerGroup);

    return new ReactiveKafkaConsumerTemplate<>(
        ReceiverOptions.<String, E>create(map)
            .subscription(List.of(topic)));
}

制作

WARN 7268 ... org.apache.kafka.clients.NetworkClient   : Connection to node -1 (localhost/127.0.0.1:29092) could not be established. Broker may not be available.

我也尝试过设置属性
spring.kafka.admin.fail-fast=true
但这似乎完全没有效果。

推荐答案

Spring Boot自动配置默认情况下将连接到代理以创建任何NewTopicBean。您可以将其autoCreate属性设置为False。

    /**
     * Set to false to suppress auto creation of topics during context initialization.
     * @param autoCreate boolean flag to indicate creating topics or not during context initialization
     * @see #initialize()
     */
    public void setAutoCreate(boolean autoCreate) {

编辑

若要获取对KafkaAdmin的引用,只需将其作为参数添加到任何Bean定义。

例如

@Bean
public KafkaAdmin.NewTopics topics(KafkaAdmin admin) {
    admin.setAutoCreate(false);
    return new KafkaAdmin.NewTopics(
            TopicBuilder.name("MyTopic").build()
    );
}

另请参阅KafkaAdmin.initialize()

    /**
     * Call this method to check/add topics; this might be needed if the broker was not
     * available when the application context was initialized, and
     * {@link #setFatalIfBrokerNotAvailable(boolean) fatalIfBrokerNotAvailable} is false,
     * or {@link #setAutoCreate(boolean) autoCreate} was set to false.
     * @return true if successful.
     * @see #setFatalIfBrokerNotAvailable(boolean)
     * @see #setAutoCreate(boolean)
     */
    public final boolean initialize() {

使用@KafkaListener设置autoStartup = "false"以防止使用者在上下文初始化时启动。

使用Reactive,只是不要订阅receive*()方法返回的Flux(这是触发消费者创建的内容)。

这篇关于如何配置Spring Boot Kafka客户端以使其不尝试连接的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆