无法在 Spring Boot 中使用 Kafka 消息 [英] Unable to consume Kafka messages within Spring Boot

查看:80
本文介绍了无法在 Spring Boot 中使用 Kafka 消息的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我们有一个使用 org.apache.kafka.clients.consumer.KafkaConsumer

我们创建了一个具有 Spring-Kafka 依赖项的 Spring Boot 应用程序,但无法读取新项目中的消息.检查了明显的参数,包括引导服务器的主机名和端口(日志显示可以识别)、组、主题以及 Spring Boot 与原始使用者一样使用 StringDeserializer .这是我们的配置文件:

We have created a Spring Boot application with a Spring-Kafka dependency, but are unable to read the messages within the new project. Have checked the obvious parameters, including hostname and port of the bootstrap servers (which the logs show are recognized), the group, the topic and that Spring Boot, like the original consumer, uses StringDeserializer . Here is our configuration file:

spring:
  kafka:
    bootstrap-servers: hostname1:9092,hostname2:9092
    consumer:
      auto-offset-reset: earliest
      group-id: our_group
      enable-auto-commit: false
      fetch-max-wait: 500
      max-poll-records: 1

kafka:
  topic:
    boot: topic.name 

和接收者:

@Component
public class Receiver {

    private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class);

    private CountDownLatch latch = new CountDownLatch(1);

    public CountDownLatch getLatch() {
        return latch;
    }

    @KafkaListener(topics = "${kafka.topic.boot}")
    public void receive(ConsumerRecord<?, ?> consumerRecord) {
        LOGGER.info("received payload='{}'", consumerRecord.toString());
        latch.countDown();
    }

}

这是启动启动应用程序的代码:

Here is the code to start the Boot application:

@SpringBootApplication
public class EmsDemoUsingSpringBootApplication {

    public static void main(String[] args) {
        SpringApplication.run(EmsDemoUsingSpringBootApplication.class, args);
    }
}

正在捕获此异常:

org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.

有什么明显我忽略的地方吗?调试此问题的最佳方法是什么?

Is there anything obvious I have overlooked? What is the best way to debug this?

谢谢

推荐答案

我也遇到了这个问题,结果是我无法连接到服务器.您可以在 application.propertiesapplication.yml 中更改日志级别以查看更多详细信息.恶魔在日志中..

I had this problem too and what happens was that I could not connect to the server. You can change log level in the application.properties or application.yml to see more details. The demon is in the log..

logging:
  level:
    root: WARN
    org.springframework: INFO
    org.apache.kafka: DEBUG

我被告知 Kafka 无法处理名称查找,根据我的经验,要连接的主机几乎总是 FQDN 名称(包括域名和所有名称).就我而言,我想我没有在我的虚拟机中设置域,即使我们在同一个子网中并且 ping 有效,也无法找到我的访客机.

I am told that Kafka is not able to handle name lookup and from my experience, the host to connect should almost always be FQDN names(with domain name and all). In my case, I think I haven't set the domain in my virtual box and it is not possible to find my guest box, even we are in the same subnet and ping works.

此外,我为 Kafka 部分创建了另一个主类,但结果是错误的.这不是一个好习惯,你应该用 @EnableKafka 注释应用程序主类,并将设置放在 yml 文件中,它们应该被加载.不需要另一个配置类.

Also, I create another main class for Kafka part and it turns out wrong. It is not good practice and you should annotate the application main class with @EnableKafka and just put the settings in the yml file, and they should be loaded. No need for another configuration class.

我的消费者:

import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class KafkaConsumer {
    @KafkaListener(topics={"testtopic"})
    public void listen(@Payload String message) {
        log.info("Received message is {}", message);
    }
}

我的申请:

import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.security.servlet.SecurityAutoConfiguration;
import org.springframework.kafka.annotation.EnableKafka;

@Slf4j
@SpringBootApplication(exclude = { SecurityAutoConfiguration.class })
@EnableKafka    // <----- I only had to add this line
public class SomeApplication {

    public static void main(String[] args) {
        SpringApplication.run(SomeApplication.class, args);
        log.info("Application launched. ");
    }
}

我的配置 yml:

logging:
  level:
    root: WARN
    org.springframework: INFO
    org.apache.kafka: DEBUG

spring:
  kafka:
    bootstrap-servers: <FQDN names here:9092>
    consumer:
      group-id: <unique-group-id>
      enable-auto-commit: false # never ack messsage when it is received.
    listener:
      ack-mode: manual # I am responsible to ack the messages

并启动应用程序.仅此而已.

And launch the application. That's all.

这篇关于无法在 Spring Boot 中使用 Kafka 消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆