Spring Boot Kafka Consumer 不消费,Kafka Listener 不触发 [英] Spring Boot Kafka Consumer not consuming, Kafka Listener not triggering

查看:90
本文介绍了Spring Boot Kafka Consumer 不消费,Kafka Listener 不触发的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试构建一个简单的 Spring Boot Kafka Consumer 来使用来自 kafka 主题的消息,但是没有消息被使用,因为 KafkaListener 方法没有被触发.

I am trying to build a simple spring boot Kafka Consumer to consume messages from a kafka topic, however no messages get consumed as the KafkaListener method is not getting triggered.

我在其他答案中看到,以确保 AUTO_OFFSET_RESET_CONFIG 设置为最早",并且 GROUP_ID_CONFIG 是唯一的,我这样做了,但是仍然没有触发 KafkaListenerMethod.该应用程序只是启动并且不执行任何操作:

I saw in other answers to make sure that AUTO_OFFSET_RESET_CONFIG is set to "earliest" and that the GROUP_ID_CONFIG is unique which I did, however still the KafkaListenerMethod is not triggering. The application simply starts and doesn't do anything:

Application Started

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.1.4.RELEASE)

这是另一个gradle项目的子项目,这个子项目的build.gradle如下(代码中正确提供了MAIN_CLASS_PATH):

This is a sub project of another gradle project and build.gradle for this subproject is as below (MAIN_CLASS_PATH has been correctly provided in the code):

apply plugin: 'application'

mainClassName = <MAIN_CLASS_PATH>

dependencies {
    compile "org.springframework.kafka:spring-kafka:${SpringKafkaVersion}"
    compile "org.springframework.boot:spring-boot-starter:${SpringBootVersion}"
    compile group: 'org.springframework', name: 'spring-tx', version: '5.2.2.RELEASE'
}

Java 类:

Start.java:

Start.java:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Start {
    public static void main(String[] args) {
        try {
            System.out.println("Application Started");
            SpringApplication.run(Start.class, args);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

KafkaConsumerConfig.java:

KafkaConsumerConfig.java:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.HashMap;
import java.util.UUID;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        HashMap<String, Object> props = new HashMap<>();
        props.put(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                <KAFKA_SERVER_ADDRESS>);
        props.put(
                ConsumerConfig.GROUP_ID_CONFIG,
                UUID.randomUUID().toString());
        props.put(
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        props.put(
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String>
    kafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

KafkaConsumer.java

KafkaConsumer.java

@Service
public class KafkaConsumer {

    @KafkaListener(topics = <TOPIC_NAME>)
    public void consume(@Payload ConsumerRecord<String, String> message) {
        System.out.println("Consumed message: " + message);
    }
}

我的代码中正确提供了 KAFKA_SERVER_ADDRESS 和 TOPIC_NAME.此外,我还检查了该主题实际上已经包含消息.

The KAFKA_SERVER_ADDRESS and TOPIC_NAME have been correctly provided within my code. Also I have checked that the topic actually contains messages in it already.

关于为什么这不消耗来自 kafka 主题的任何消息的任何想法?

Any ideas as to why this doesn't consume any messages from a kafka topic?

推荐答案

问题是 kafka 连接问题.从 kafka 服务器消费的机器不允许从 kafka 服务器消费.将 kafka 服务器节点中的消费机器列入白名单解决了该问题.

The issue was a kafka connectivity issue. The machine consuming from the kafka server was not permitted to consume from the kafka server. WhiteListing the consuming machine in the kafka server nodes solved the issue.

如果kafka服务器url可以解析但由于权限不足而无法连接,则kafka消费者不会记录.这不是 spring kafka 特有的,但即使是普通的 kafka 客户端消费者也没有记录这个.

If the kafka server url can be resolved but can not be connected to due to insufficient permissions, the kafka consumer doesn't log that. This is not specific to spring kafka, but even the normal kafka clients consumer wasn't logging this.

这篇关于Spring Boot Kafka Consumer 不消费,Kafka Listener 不触发的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆