在 Spring Cloud Stream Kafka 中正确管理 DLQ [英] Correctly manage DLQ in Spring Cloud Stream Kafka

查看:52
本文介绍了在 Spring Cloud Stream Kafka 中正确管理 DLQ的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想使用 kafka 在 Spring Cloud Stream 中管理一个 DLQ.

application.yaml

服务器:端口:8091尤里卡:客户:服务网址:默认区域:http://IP:8761/eureka春天:应用:名称:员工-消费者云:溪流:卡夫卡:粘合剂:经纪人:IP:9092绑定:问候语:目的地:问候内容类型:应用程序/json问候:目的地:问候内容类型:应用程序/json绑定:问候:消费者:enableDlq: 真dlqName: 死了卡夫卡:消费者:组 ID:A

正如您在我的配置中看到的,我启用了 dlq 并为 dlq 主题设置了一个名称.

为了测试 DLQ 行为,我对某些消息抛出异常

我的监听器组件

@StreamListener("问候结束")public void handleGreetingsInput(@Payload Greetings greetings) 抛出异常 {logger.info("问候语输入 -> {}", 问候语);if (greetings.getMessage().equals("ciao")) {throw new Exception("eer");}}

这样,等于 "ciao" 的消息抛出异常,在日志中我看到它被处理了 3 次

2018-07-09 13:19:57.256 INFO 1 --- [container-0-C-1] com.mitro.service.GreetingsListener :问候输入 ->com.mitro.model.Greetings@3da9d701[timestamp=0,message=ciao]2018-07-09 13:19:58.259 INFO 1 --- [container-0-C-1] com.mitro.service.GreetingsListener :问候输入 ->com.mitro.model.Greetings@5bd62aaf[timestamp=0,message=ciao]2018-07-09 13:20:00.262 INFO 1 --- [container-0-C-1] com.mitro.service.GreetingsListener :问候输入 ->com.mitro.model.Greetings@c26f92b[timestamp=0,message=ciao]2018-07-09 13:20:00.266 错误 1 ​​--- [container-0-C-1] osintegration.handler.LoggingHandler:org.springframework.messaging.MessagingException:调用 com.mitro.service.GreetingsListener 时抛出异常#handleGreetingsInput[1 args];嵌套异常是 java.lang.Exception: eer, failedMessage=GenericMessage [payload=byte[32], headers={kafka_offset=3, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@510302cb, deliveryAttempt=3,kafka_timestampType=CREATE_TIME,kafka_receivedMessageKey=null,kafka_receivedPartitionId=0,contentType=application/json,kafka_receivedTopic=greetings-out,kafka_receivedTimestamp=15311482}3972

这对我来说很好,但我不明白为什么会创建一个名为 dead-out 的主题(请看下图).

我做错了什么?

编辑 1:(仍然没有为 DLQ 创建主题)

服务器:端口:8091尤里卡:客户:服务网址:默认区域:http://IP:8761/eureka春天:应用:名称:员工-消费者云:溪流:卡夫卡:流:粘合剂:serdeError:sendToDlq粘合剂:经纪人:IP:9092自动创建主题:true绑定:问候:目的地:问候内容类型:应用程序/json消费者:enableDql: 真dlqName: 死了autoCommitOnError: 真自动提交偏移量:真绑定:问候:目的地:问候内容类型:应用程序/json消费者:enableDlq: 真dlqName: 死了autoCommitOnError: 真自动提交偏移量:真卡夫卡:消费者:组 ID:A

解决方案

看起来你的属性颠倒了;通用属性 - 目的地、内容类型 - 必须在 spring.cloud.stream.bindings 下.kafka 特定的属性(enableDlq、dlqName)必须在 spring.clound.stream.kafka.bindings 下.

你把它们颠倒了.

编辑

您的(修改后的)配置有两个问题.

  1. typo enableDql 而不是 enableDlq
  2. 无组 - 您不能拥有匿名消费者的 DLQ:

<块引用>

Caused by: java.lang.IllegalArgumentException: DLQ 支持不适用于匿名订阅

这很好用:

弹簧:应用:名称:员工-消费者云:溪流:卡夫卡:粘合剂:经纪人:本地主机:9092自动创建主题:true绑定:输入:消费者:enableDlq: 真dlqName: 死了autoCommitOnError: 真自动提交偏移量:真绑定:输入:群:so51247113目的地:问候内容类型:应用程序/json

@SpringBootApplication@EnableBinding(Sink.class)公共类 So51247113Application {公共静态无效主(字符串 [] args){SpringApplication.run(So51247113Application.class, args);}@StreamListener(Sink.INPUT)公共无效输入(字符串输入){System.out.println(in);throw new RuntimeException("fail");}@KafkaListener(id = "foo", topic = "dead-out")public void dlq(Message in) {System.out.println("DLQ:" + in);}}

I want manage a DLQ in Spring Cloud Stream using kafka.

application.yaml

server:
    port: 8091
eureka:
    client:
        serviceUrl:
            defaultZone: http://IP:8761/eureka
spring:
    application:
        name: employee-consumer
    cloud:
        stream:
            kafka:
                binder:
                    brokers: IP:9092
                bindings:
                    greetings-in:
                        destination: greetings
                        contentType: application/json
                    greetings-out:
                        destination: greetings
                        contentType: application/json
            bindings:
                greetings-out:
                    consumer:
                        enableDlq: true
                        dlqName: dead-out
    kafka:
      consumer:
        group-id: A

As you can see in my configuration I enable dlq and set a name to the dlq topic.

To test DLQ behaviour I throw an exception on certain messages

My listener component

@StreamListener("greetings-out")
    public void handleGreetingsInput(@Payload Greetings greetings) throws Exception {
        logger.info("Greetings input -> {}", greetings);
        if (greetings.getMessage().equals("ciao")) {
            throw new Exception("eer");
        }
    }

In this way, the message that is equal to "ciao" throws an exception and in logs I see that it gets processed three times

2018-07-09 13:19:57.256  INFO 1 --- [container-0-C-1] com.mitro.service.GreetingsListener      : Greetings input -> com.mitro.model.Greetings@3da9d701[timestamp=0,message=ciao]
2018-07-09 13:19:58.259  INFO 1 --- [container-0-C-1] com.mitro.service.GreetingsListener      : Greetings input -> com.mitro.model.Greetings@5bd62aaf[timestamp=0,message=ciao]
2018-07-09 13:20:00.262  INFO 1 --- [container-0-C-1] com.mitro.service.GreetingsListener      : Greetings input -> com.mitro.model.Greetings@c26f92b[timestamp=0,message=ciao]
2018-07-09 13:20:00.266 ERROR 1 --- [container-0-C-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessagingException: Exception thrown while invoking com.mitro.service.GreetingsListener#handleGreetingsInput[1 args]; nested exception is java.lang.Exception: eer, failedMessage=GenericMessage [payload=byte[32], headers={kafka_offset=3, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@510302cb, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTopic=greetings-out, kafka_receivedTimestamp=1531142397248}]

This is fine for me, but I don't understand why a topic called dead-out is created as result (take a look at the image below).

What am I doing wrong?

EDIT 1: (still doesn't create topic for DLQ)

server:
    port: 8091
eureka:
    client:
        serviceUrl:
            defaultZone: http://IP:8761/eureka
spring:
    application:
        name: employee-consumer
    cloud:
        stream:
            kafka:
                streams:
                    binder:
                        serdeError: sendToDlq
                binder:
                    brokers: IP:9092
                    auto-create-topics: true
                bindings:
                    greetings-out:
                        destination: greetings-out
                        contentType: application/json
                        consumer:
                          enableDql: true
                          dlqName: dead-out
                          autoCommitOnError: true
                          autoCommitOffset: true
            bindings:
                greetings-out:
                    destination: greetings-out
                    contentType: application/json
                    consumer:
                        enableDlq: true
                        dlqName: dead-out
                        autoCommitOnError: true
                        autoCommitOffset: true
    kafka:
      consumer:
        group-id: A

解决方案

Looks like your properties are reversed; the common properties - destination, contentType - must be under spring.cloud.stream.bindings. The kafka-specific properties (enableDlq, dlqName) must be under spring.clound.stream.kafka.bindings.

You have them reversed.

EDIT

There are two problems with your (modified) config.

  1. typo enableDql instead of enableDlq
  2. no group - you can't have a DLQ with an anonymous consumer:

Caused by: java.lang.IllegalArgumentException: DLQ support is not available for anonymous subscriptions

This works fine:

spring:
  application:
    name: employee-consumer
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092
          auto-create-topics: true
        bindings:
          input:
            consumer:
              enableDlq: true
              dlqName: dead-out
              autoCommitOnError: true
              autoCommitOffset: true
      bindings:
        input:
          group: so51247113
          destination: greetings-out
          contentType: application/json

and

@SpringBootApplication
@EnableBinding(Sink.class)
public class So51247113Application {

    public static void main(String[] args) {
        SpringApplication.run(So51247113Application.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void in(String in) {
        System.out.println(in);
        throw new RuntimeException("fail");
    }

    @KafkaListener(id = "foo", topics = "dead-out")
    public void dlq(Message<?> in) {
        System.out.println("DLQ:" + in);
    }

}

这篇关于在 Spring Cloud Stream Kafka 中正确管理 DLQ的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆