Spring Kafka@DltHandler注释方法在非阻塞重试实现中未正确接收标头 [英] Spring Kafka @DltHandler annotated method is not receiving headers correctly in Non-Blocking retries implementation

查看:24
本文介绍了Spring Kafka@DltHandler注释方法在非阻塞重试实现中未正确接收标头的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试用Spring Kafka实现一个非阻塞的Retries。根据文档here,在完成在@KafkaListnener中设置的所有尝试之后,我们可以设置一个处理程序方法来处理来自DLT主题的消息。我打算捕获DLT处理程序方法上的一些标头,如以下代码所示:

@DltHandler
    fun processaDlt(
        @Payload mensagem: String,
        @Header("event") eventName: String,
        @Header(KafkaHeaders.ORIGINAL_OFFSET) offset: String,
        @Header(KafkaHeaders.EXCEPTION_FQCN) descException: String,
        @Header(KafkaHeaders.EXCEPTION_STACKTRACE) stacktrace: String,
        @Header(KafkaHeaders.EXCEPTION_MESSAGE) errorMessage: String
    ) {

但是,有些标头没有正确发送,或者根本没有发送。我已经尝试了一些标头的值,比如KafkaHeaders.DLT_ORIGING_OFFSET、KafkaHeaders.OFFSET等。我在Spring Kafka代码中看到,一些标头以字符串";kafka_";为前缀,并且我在重试失败后转发到xpto-DLT主题的消息上看到了这些值,但一些标头的值被截断,如:

卡夫卡_原始-偏移量:�,卡夫卡_原始-分区:,卡夫卡_原始-时间戳:{�xr,卡夫卡_原始-时间戳-类型:创建时间,卡夫卡_原始-主题:xpto-主题,重试_主题-尝试:,重试_主题-回退-时间戳:{�@,重试_主题-原始-时间戳:{�*xr

我的Listen方法使用原始消息,代码如下:

@KafkaListener(topics = xpto-topic, groupId = my-group-to-xpto)
fun listen(@Payload mensagem: String,
           @Header("event") event: String,
           @Header(KafkaHeaders.OFFSET) offset: Long,
           @Header(KafkaHeaders.CONSUMER)  consumer: KafkaConsumer<String, String>,
           @Header(KafkaHeaders.RECEIVED_TIMESTAMP) timestamp: Long
) {

毕竟,使用@DltHandler注释的方法可以接受哪些标头?为什么某些值的值被截断?

操作:

  • 上面的代码是用带有Spring Boot的Kotlin编写的
  • 我使用的是Spring Boot 2.5.4-Release和:
编译‘org.springframework.kafka:spring-kafka:2.7.6’

编译‘org.apache.kafka:kafka-客户端:2.8.0’

推荐答案

这些标头由byte[]转换而来。

偏移量似乎存在转换问题-当我将参数声明为long时,它返回零。

这对我来说很好...

@RetryableTopic(attempts = "1")
@KafkaListener(id = "so69229529", topics = "so69229529")
void listen(String in) {
    throw new RuntimeException();
}

@DltHandler
void handler(Message<?> msg,
        @Header(KafkaHeaders.ORIGINAL_OFFSET) byte[] offset,
        @Header(KafkaHeaders.EXCEPTION_FQCN) String descException,
        @Header(KafkaHeaders.EXCEPTION_STACKTRACE) String stacktrace,
        @Header(KafkaHeaders.EXCEPTION_MESSAGE) String errorMessage) {
    System.out.println(msg);
    System.out.println(ByteBuffer.wrap(offset).getLong());
    System.out.println(descException);
    System.out.println(stacktrace);
    System.out.println(errorMessage);
}
4
org.springframework.kafka.listener.ListenerExecutionFailedException
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.springframework.kafka.listener.TimestampedException: Listener method 'void com.example.demo.So69229529Application.listen(java.lang.String)' threw exception; nested exception is java.lang.RuntimeException; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'void com.example.demo.So69229529Application.listen(java.lang.String)' threw exception; nested exception is java.lang.RuntimeException
...
Caused by: java.lang.RuntimeException
...
Listener failed; nested exception is org.springframework.kafka.listener.TimestampedException: Listener method 'void com.example.demo.So69229529Application.listen(java.lang.String)' threw exception; nested exception is java.lang.RuntimeException; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'void com.example.demo.So69229529Application.listen(java.lang.String)' threw exception; nested exception is java.lang.RuntimeException

https://github.com/spring-projects/spring-kafka/issues/1951

这篇关于Spring Kafka@DltHandler注释方法在非阻塞重试实现中未正确接收标头的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆