目标解析程序返回不存在的分区 [英] Destination resolver returned non-existent partition

查看:21
本文介绍了目标解析程序返回不存在的分区的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用Spring-Kafka消费来自Confluent Kafka的消息,我正在使用RetryTopicConfiguration Bean来配置主题和回退策略。我的应用程序运行正常,但我在日志中看到许多类似下面的警告日志,我想知道我的配置是否不正确。

DeadLetterPublishingRecovererFactory$1 : Destination resolver returned non-existent partition flow-events-retry-0-4, KafkaProducer will determine partition to use for this topic

配置代码

@Bean
    public KafkaTemplate kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public RetryTopicConfiguration myRetryableTopic(KafkaTemplate<String, Object> template) {
        return RetryTopicConfigurationBuilder
                .newInstance()
                .exponentialBackoff(BACKOFF_INITIAL_DELAY_10MINS, BACKOFF_EXPONENTIAL_MULTIPLIER_3, BACKOFF_MAX_DELAY_4HRS)
                .maxAttempts(5)
                .doNotAutoCreateRetryTopics()
                .setTopicSuffixingStrategy(TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE)
                .create(template);
    }

重试主题单独创建,分区为1,复制系数为3。

推荐答案

默认使用与原始主题相同的分区;您可以通过覆盖DeadLetterPublishingRecovererFactory@Bean

来覆盖该行为
@Bean(RetryTopicInternalBeanNames.DEAD_LETTER_PUBLISHING_RECOVERER_FACTORY_BEAN_NAME)
DeadLetterPublishingRecovererFactory factory(DestinationTopicResolver resolver) {
    DeadLetterPublishingRecovererFactory factory = new DeadLetterPublishingRecovererFactory(resolver) {

        @Override
        protected TopicPartition resolveTopicPartition(ConsumerRecord<?, ?> cr, DestinationTopic nextDestination) {
            return new TopicPartition(nextDestination.getDestinationName(), -1); // Kafka Chooses
//          return new TopicPartition(nextDestination.getDestinationName(), 0);  // explict
        }

    };
    factory.setDeadLetterPublishingRecovererCustomizer(dlpr -> {
        // ...
    });
    return factory;
}

如本例所示,您还可以在此处自定义DLPR属性。

/**
 * Creates and returns the {@link TopicPartition}, where the original record should be forwarded.
 * By default, it will use the partition same as original record's partition, in the next destination topic.
 *
 * <p>{@link DeadLetterPublishingRecoverer#checkPartition} has logic to check whether that partition exists,
 * and if it doesn't it sets -1, to allow the Producer itself to assign a partition to the record.</p>
 *
 * <p>Subclasses can inherit from this method to override the implementation, if necessary.</p>
 *
 * @param cr The original {@link ConsumerRecord}, which is to be forwarded to DLT
 * @param nextDestination The next {@link DestinationTopic}, where the consumerRecord is to be forwarded
 * @return An instance of {@link TopicPartition}, specifying the topic and partition, where the cr is to be sent
 */
protected TopicPartition resolveTopicPartition(final ConsumerRecord<?, ?> cr, final DestinationTopic nextDestination) {
    return new TopicPartition(nextDestination.getDestinationName(), cr.partition());
}

这篇关于目标解析程序返回不存在的分区的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆