Spring Kafka JsonDesirialization MessageConversionException无法解析类名找不到类 [英] Spring Kafka JsonDesirialization MessageConversionException failed to resolve class name Class not found

查看:246
本文介绍了Spring Kafka JsonDesirialization MessageConversionException无法解析类名找不到类的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有两个应该通过Kafka进行通信的服务. 我们将第一个服务称为 WriteService ,将第二个服务称为 QueryService .

I have two services that should communicate via Kafka. Let's call the first service WriteService and the second service QueryService.

WriteService 端,我为生产者提供了以下配置.

On the WriteService side, I have the following configuration for producers.

@Configuration
public class KafkaProducerConfiguration {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // list of host:port pairs used for establishing the initial connections to the Kakfa cluster
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                JsonSerializer.class);

        return props;
    }

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

我正在尝试发送类com.example.project.web.routes.dto.RouteDto

QueryService 端,使用者配置定义如下.

On the QueryService side, the consumer configuration is defined as follows.

@Configuration
@EnableKafka
public class KafkaConsumerConfiguration {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.groupid}")
    private String serviceGroupId;

    @Value("${spring.kafka.consumer.trusted-packages}")
    private String trustedPackage;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                JsonDeserializer.class);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, serviceGroupId);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, trustedPackage);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return props;
    }

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        return factory;
    }

}

侦听器具有以下定义.有效负载类具有完全限定的名称-com.example.project.clientqueryview.module.routes.messaging.kafka.RouteDto

The listener has the following definition. The payload class has the fully qualified name - com.example.project.clientqueryview.module.routes.messaging.kafka.RouteDto

@KafkaListener(topics = "${spring.kafka.topics.routes}",
            containerFactory = "kafkaListenerContainerFactory")
    public void listenForRoute(ConsumerRecord<String, RouteDto> cr,
                               @Payload RouteDto payload) {
        logger.info("Logger 1 [JSON] received key {}: Type [{}] | Payload: {} | Record: {}", cr.key(),
                typeIdHeader(cr.headers()), payload, cr.toString());
    }

    private static String typeIdHeader(Headers headers) {
        return StreamSupport.stream(headers.spliterator(), false)
                .filter(header -> header.key().equals("__TypeId__"))
                .findFirst().map(header -> new String(header.value())).orElse("N/A");
    }

发送邮件时,出现以下错误

When a message is sent, I get the following error

起因: org.springframework.messaging.converter.MessageConversionException: 无法解析类名.找不到课程 [com.example.project.web.routes.dto.RouteDto];嵌套的例外是 java.lang.ClassNotFoundException: com.example.project.web.routes.dto.RouteDto

Caused by: org.springframework.messaging.converter.MessageConversionException: failed to resolve class name. Class not found [com.example.project.web.routes.dto.RouteDto]; nested exception is java.lang.ClassNotFoundException: com.example.project.web.routes.dto.RouteDto

该错误很明显.但是我不明白为什么默认情况下它具有此行为.我不希望在不同的服务中使用相同的程序包,这根本没有任何意义.

The error is clear enough. However I cannot understand why does it has this behaviour by default. I don't expect to have the same package in different services, that makes no sense at all.

我还没有找到一种禁用此功能并使用提供给侦听器的类的方法,该类带有@Payload

I haven't found a way to disable this and use a class provided to the listener, annotated with @Payload

如何解决此问题,而无需手动配置映射器?

How this could be solved, without manually configuring the mapper?

推荐答案

如果您使用的是spring-kafka-2.2.x,则可以通过JsonDeserializer

If you are using spring-kafka-2.2.x you can disable the default header by overloaded constructors of JsonDeserializer docs

从2.2版开始,您可以使用具有boolean useHeadersIfPresent(默认为true)的重载构造函数之一,将反序列化器显式配置为使用提供的目标类型并忽略标头中的类型信息.以下示例显示了如何执行此操作:

Starting with version 2.2, you can explicitly configure the deserializer to use the supplied target type and ignore type information in headers by using one of the overloaded constructors that have a boolean useHeadersIfPresent (which is true by default). The following example shows how to do so:

DefaultKafkaConsumerFactory<String, Object> cf = new DefaultKafkaConsumerFactory<>(props,
    new IntegerDeserializer(), new JsonDeserializer<>(Cat1.class, false));

如果使用较低版本,请使用MessageConverter(您可能会在spring-kafka-2.1.x及更高版本中看到此问题)

If with lower version use the MessageConverter (you might see this problem from spring-kafka-2.1.x and above)

适用于Apache Kafka的Spring提供了MessagingMessageConverter实现及其StringJsonMessageConverter和BytesJsonMessageConverter定制的MessageConverter抽象.您可以直接将MessageConverter注入到KafkaTemplate实例中,并使用@ KafkaListener.containerFactory()属性的AbstractKafkaListenerContainerFactory bean定义.以下示例显示了如何执行此操作:

Spring for Apache Kafka provides a MessageConverter abstraction with the MessagingMessageConverter implementation and its StringJsonMessageConverter and BytesJsonMessageConverter customization. You can inject the MessageConverter into a KafkaTemplate instance directly and by using AbstractKafkaListenerContainerFactory bean definition for the @KafkaListener.containerFactory() property. The following example shows how to do so:

@Bean
 public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, RouteDto> factory =
    new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setMessageConverter(new StringJsonMessageConverter());
return factory;
  }

  @KafkaListener(topics = "jsonData",
            containerFactory = "kafkaJsonListenerContainerFactory")
    public void jsonListener(RouteDto dto) {
     ...
   }

注意:只有在方法级别声明@KafkaListener批注时,才能实现此类型推断.在类级别的@KafkaListener中,有效负载类型用于选择要调用的@KafkaHandler方法,因此在选择该方法之前,必须已经对其进行了转换.

这篇关于Spring Kafka JsonDesirialization MessageConversionException无法解析类名找不到类的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆