Spring Kafka该类不在受信任的软件包中 [英] Spring Kafka The class is not in the trusted packages

查看:46
本文介绍了Spring Kafka该类不在受信任的软件包中的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在库更新之前的Spring Boot/Kafka应用程序中,我使用了以下类 org.telegram.telegrambots.api.objects.Update 以便将消息发布到Kafka主题.现在,我使用以下 org.telegram.telegrambots.meta.api.objects.Update .如您所见-它们具有不同的程序包.

In my Spring Boot/Kafka application before the library update, I used the following class org.telegram.telegrambots.api.objects.Update in order to post messages to the Kafka topic. Right now I use the following org.telegram.telegrambots.meta.api.objects.Update. As you may see - they have different packages.

应用程序重新启动后,我遇到了以下问题:

After application restart I ran into the following issue:

[org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] o.s.kafka.listener.LoggingErrorHandler : Error while processing: null

org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition telegram.fenix.bot.update-0 at offset 4223. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalArgumentException: The class 'org.telegram.telegrambots.api.objects.Update' is not in the trusted packages: [java.util, java.lang, org.telegram.telegrambots.meta.api.objects]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:139) ~[spring-kafka-2.1.8.RELEASE.jar!/:2.1.8.RELEASE]
at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:113) ~[spring-kafka-2.1.8.RELEASE.jar!/:2.1.8.RELEASE]
at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:221) ~[spring-kafka-2.1.8.RELEASE.jar!/:2.1.8.RELEASE]
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:967) ~[kafka-clients-1.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.access$3300(Fetcher.java:93) ~[kafka-clients-1.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1144) ~[kafka-clients-1.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1400(Fetcher.java:993) ~[kafka-clients-1.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:527) ~[kafka-clients-1.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:488) ~[kafka-clients-1.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1155) ~[kafka-clients-1.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115) ~[kafka-clients-1.1.0.jar!/:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:699) ~[spring-kafka-2.1.8.RELEASE.jar!/:2.1.8.RELEASE]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_171]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_171]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_171]

这是我的配置:

@EnableAsync
@Configuration
public class ApplicationConfig {

    @Bean
    public StringJsonMessageConverter jsonConverter() {
        return new StringJsonMessageConverter();
    }

}

@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> producerConfigs() {

        Map<String, Object> props = new HashMap<>();

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 15000000);

        return props;
    }

    @Bean
    public ProducerFactory<String, Update> updateProducerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, Update> updateKafkaTemplate() {
        return new KafkaTemplate<>(updateProducerFactory());
    }

}

@Configuration
public class KafkaConsumerConfig {

    @Value("${kafka.consumer.max.poll.interval.ms}")
    private String kafkaConsumerMaxPollIntervalMs;

    @Value("${kafka.consumer.max.poll.records}")
    private String kafkaConsumerMaxPollRecords;

    @Value("${kafka.topic.telegram.fenix.bot.update.consumer.concurrency}")
    private Integer updateConsumerConcurrency;

    @Bean
    public ConsumerFactory<String, String> consumerFactory(KafkaProperties kafkaProperties) {
        return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties(), new StringDeserializer(), new JsonDeserializer<>(String.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(KafkaProperties kafkaProperties) {

        kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, kafkaConsumerMaxPollIntervalMs);
        kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaConsumerMaxPollRecords);

        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        factory.setConsumerFactory(consumerFactory(kafkaProperties));

        return factory;
    }

    @Bean
    public ConsumerFactory<String, Update> updateConsumerFactory(KafkaProperties kafkaProperties) {
        return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties(), new StringDeserializer(), new JsonDeserializer<>(Update.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Update> updateKafkaListenerContainerFactory(KafkaProperties kafkaProperties) {

        kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, kafkaConsumerMaxPollIntervalMs);
        kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaConsumerMaxPollRecords);

        ConcurrentKafkaListenerContainerFactory<String, Update> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        factory.setConsumerFactory(updateConsumerFactory(kafkaProperties));
        factory.setConcurrency(updateConsumerConcurrency);

        return factory;
    }

}

application.properties

application.properties

spring.kafka.bootstrap-servers=${kafka.host}:${kafka.port}
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.group-id=postfenix
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

如何解决此问题并使Kafka将旧消息反序列化为新消息?

How to solve this issue and let Kafka deserialize old messages into the new ones ?

已更新

这是我的听众

@Component
public class UpdateConsumer {

    @KafkaListener(topics = "${kafka.topic.update}", containerFactory = "updateKafkaListenerContainerFactory")
    public void onUpdateReceived(ConsumerRecord<String, Update> consumerRecord, Acknowledgment ack) {

        //do some logic here

        ack.acknowledge();
    }

}

推荐答案

请参见文档.

从2.1版开始,类型信息可以在记录标题中传送,从而可以处理多种类型.此外,可以使用Kafka属性配置序列化器/解串器.

Starting with version 2.1, type information can be conveyed in record Headers, allowing the handling of multiple types. In addition, the serializer/deserializer can be configured using Kafka properties.

JsonSerializer.ADD_TYPE_INFO_HEADERS(默认为true);设置为false可在JsonSerializer上禁用此功能(设置addTypeInfo属性).

JsonSerializer.ADD_TYPE_INFO_HEADERS (default true); set to false to disable this feature on the JsonSerializer (sets the addTypeInfo property).

JsonDeserializer.KEY_DEFAULT_TYPE;如果没有标题信息,则为密钥的反序列化的后备类型.

JsonDeserializer.KEY_DEFAULT_TYPE; fallback type for deserialization of keys if no header information is present.

JsonDeserializer.VALUE_DEFAULT_TYPE;如果没有标题信息,则为值的反序列化的后备类型.

JsonDeserializer.VALUE_DEFAULT_TYPE; fallback type for deserialization of values if no header information is present.

JsonDeserializer.TRUSTED_PACKAGES(默认java.util,java.lang);逗号分隔的允许反序列化的封装模式列表;*表示全部反序列化.

JsonDeserializer.TRUSTED_PACKAGES (default java.util, java.lang); comma-delimited list of package patterns allowed for deserialization; * means deserialize all.

默认情况下,序列化程序会将类型信息添加到标题中.

By default, the serializer will add type information to the headers.

请参见

类似地,您可以禁用在标头中发送类型信息的JsonSerializer默认行为:

Similarly, you can disable the JsonSerializer default behavior of sending type information in headers:

spring.kafka.producer.value-serializer = org.springframework.kafka.support.serializer.JsonSerializer spring.kafka.producer.properties.spring.json.add.type.headers = false

或者您可以将类型映射添加到入站邮件转换器中,以将源类型映射到目标类型.

Or you can add type mapping to the inbound message converter to map the source type to the destination type.

编辑

话虽如此,您使用的是哪个版本?

Having said that, what version are you using?

这篇关于Spring Kafka该类不在受信任的软件包中的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
相关文章
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆