无法在Spring Boot中使用Kafka消息 [英] Unable to consume Kafka messages within Spring Boot

查看:542
本文介绍了无法在Spring Boot中使用Kafka消息的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我们有一个Java应用程序,它使用org.apache.kafka.clients.consumer.KafkaConsumer

We have a Java application which consumes Kafka messages, using org.apache.kafka.clients.consumer.KafkaConsumer

我们创建了一个具有Spring-Kafka依赖关系的Spring Boot应用程序,但无法读取新项目中的消息.检查了明显的参数,包括引导服务器的主机名和端口(日志显示可识别的端口),组,主题,以及像原始使用者一样使用Spring Boot的StringDeserializer.这是我们的配置文件:

We have created a Spring Boot application with a Spring-Kafka dependency, but are unable to read the messages within the new project. Have checked the obvious parameters, including hostname and port of the bootstrap servers (which the logs show are recognized), the group, the topic and that Spring Boot, like the original consumer, uses StringDeserializer . Here is our configuration file:

spring:
  kafka:
    bootstrap-servers: hostname1:9092,hostname2:9092
    consumer:
      auto-offset-reset: earliest
      group-id: our_group
      enable-auto-commit: false
      fetch-max-wait: 500
      max-poll-records: 1

kafka:
  topic:
    boot: topic.name 

和接收方:

@Component
public class Receiver {

    private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class);

    private CountDownLatch latch = new CountDownLatch(1);

    public CountDownLatch getLatch() {
        return latch;
    }

    @KafkaListener(topics = "${kafka.topic.boot}")
    public void receive(ConsumerRecord<?, ?> consumerRecord) {
        LOGGER.info("received payload='{}'", consumerRecord.toString());
        latch.countDown();
    }

}

以下是启动Boot应用程序的代码:

Here is the code to start the Boot application:

@SpringBootApplication
public class EmsDemoUsingSpringBootApplication {

    public static void main(String[] args) {
        SpringApplication.run(EmsDemoUsingSpringBootApplication.class, args);
    }
}

正在捕获此异常:

org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.

有什么明显的我忽略了吗? 调试此错误的最佳方法是什么?

Is there anything obvious I have overlooked? What is the best way to debug this?

谢谢

推荐答案

我也遇到了这个问题,结果是我无法连接到服务器.您可以在application.propertiesapplication.yml中更改日志级别以查看更多详细信息.恶魔在日志中.

I had this problem too and what happens was that I could not connect to the server. You can change log level in the application.properties or application.yml to see more details. The demon is in the log..

logging:
  level:
    root: WARN
    org.springframework: INFO
    org.apache.kafka: DEBUG

有人告诉我Kafka无法处理名称查找,并且根据我的经验,要连接的主机几乎总是FQDN名称(包括域名和全部名称).就我而言,我想我没有在虚拟框中设置域,也无法找到我的来宾框,即使我们在同一个子网中并且ping也可以正常工作.

I am told that Kafka is not able to handle name lookup and from my experience, the host to connect should almost always be FQDN names(with domain name and all). In my case, I think I haven't set the domain in my virtual box and it is not possible to find my guest box, even we are in the same subnet and ping works.

此外,我为Kafka零件创建了另一个主类,结果是错误的.这不是一个好习惯,您应该使用@EnableKafka注释应用程序主类,然后将设置放入yml文件中,然后将其加载.不需要其他配置类.

Also, I create another main class for Kafka part and it turns out wrong. It is not good practice and you should annotate the application main class with @EnableKafka and just put the settings in the yml file, and they should be loaded. No need for another configuration class.

我的消费者:

import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class KafkaConsumer {
    @KafkaListener(topics={"testtopic"})
    public void listen(@Payload String message) {
        log.info("Received message is {}", message);
    }
}

我的应用程序:

import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.security.servlet.SecurityAutoConfiguration;
import org.springframework.kafka.annotation.EnableKafka;

@Slf4j
@SpringBootApplication(exclude = { SecurityAutoConfiguration.class })
@EnableKafka    // <----- I only had to add this line
public class SomeApplication {

    public static void main(String[] args) {
        SpringApplication.run(SomeApplication.class, args);
        log.info("Application launched. ");
    }
}

我的配置yml:

logging:
  level:
    root: WARN
    org.springframework: INFO
    org.apache.kafka: DEBUG

spring:
  kafka:
    bootstrap-servers: <FQDN names here:9092>
    consumer:
      group-id: <unique-group-id>
      enable-auto-commit: false # never ack messsage when it is received.
    listener:
      ack-mode: manual # I am responsible to ack the messages

并启动该应用程序.就是这样.

And launch the application. That's all.

这篇关于无法在Spring Boot中使用Kafka消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆