Fail to set up multiple bindings to the same destination and group on the consumer side using Spring Cloud Stream


Problem description

I intend to set up multiple bindings for Kafka consumer resiliency. More specifically, the backup listener has the same destination and group as the main listener; only the broker IPs differ. The backup listener's autoStartup is turned off at startup, but will be turned on programmatically when failover occurs.

However, the microservice throws the following exception when it launches:

org.springframework.cloud.stream.binder.BinderException: Exception thrown while starting consumer: 
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:461) ~[spring-cloud-stream-3.0.2.RELEASE.jar:3.0.2.RELEASE]
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:90) ~[spring-cloud-stream-3.0.2.RELEASE.jar:3.0.2.RELEASE]
    at org.springframework.cloud.stream.binder.AbstractBinder.bindConsumer(AbstractBinder.java:143) ~[spring-cloud-stream-3.0.2.RELEASE.jar:3.0.2.RELEASE]
    at org.springframework.cloud.stream.binding.BindingService.doBindConsumer(BindingService.java:169) [spring-cloud-stream-3.0.2.RELEASE.jar:3.0.2.RELEASE]
    at org.springframework.cloud.stream.binding.BindingService.bindConsumer(BindingService.java:126) [spring-cloud-stream-3.0.2.RELEASE.jar:3.0.2.RELEASE]
    at org.springframework.cloud.stream.binding.AbstractBindableProxyFactory.createAndBindInputs(AbstractBindableProxyFactory.java:112) [spring-cloud-stream-3.0.2.RELEASE.jar:3.0.2.RELEASE]
    at org.springframework.cloud.stream.binding.InputBindingLifecycle.doStartWithBindable(InputBindingLifecycle.java:58) [spring-cloud-stream-3.0.2.RELEASE.jar:3.0.2.RELEASE]
    at java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:608) ~[na:1.8.0_181]
    at org.springframework.cloud.stream.binding.AbstractBindingLifecycle.start(AbstractBindingLifecycle.java:57) ~[spring-cloud-stream-3.0.2.RELEASE.jar:3.0.2.RELEASE]
    at org.springframework.cloud.stream.binding.InputBindingLifecycle.start(InputBindingLifecycle.java:34) [spring-cloud-stream-3.0.2.RELEASE.jar:3.0.2.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182) ~[spring-context-5.2.3.RELEASE.jar:5.2.3.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53) ~[spring-context-5.2.3.RELEASE.jar:5.2.3.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360) ~[spring-context-5.2.3.RELEASE.jar:5.2.3.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158) ~[spring-context-5.2.3.RELEASE.jar:5.2.3.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122) ~[spring-context-5.2.3.RELEASE.jar:5.2.3.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:894) ~[spring-context-5.2.3.RELEASE.jar:5.2.3.RELEASE]
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.finishRefresh(ServletWebServerApplicationContext.java:162) ~[spring-boot-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:553) ~[spring-context-5.2.3.RELEASE.jar:5.2.3.RELEASE]
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:141) ~[spring-boot-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:747) ~[spring-boot-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397) ~[spring-boot-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:315) ~[spring-boot-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) ~[spring-boot-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1215) ~[spring-boot-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at com.example.demo.DemoApplication.main(DemoApplication.java:14) ~[classes/:na]
Caused by: org.springframework.beans.factory.support.BeanDefinitionOverrideException: Invalid bean definition with name 'my-kafka-topic-name.my-kafka-consumer-group-name.errors.recoverer' defined in null: Cannot register bean definition [Root bean: class [org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer]; scope=; abstract=false; lazyInit=null; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=null; factoryMethodName=null; initMethodName=null; destroyMethodName=null] for bean 'my-kafka-topic-name.my-kafka-consumer-group-name.errors.recoverer.errors.recoverer': There is already [Root bean: class [org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer]; scope=; abstract=false; lazyInit=null; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=null; factoryMethodName=null; initMethodName=null; destroyMethodName=null] bound.
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.registerBeanDefinition(DefaultListableBeanFactory.java:927) ~[spring-beans-5.2.3.RELEASE.jar:5.2.3.RELEASE]
    at org.springframework.context.support.GenericApplicationContext.registerBeanDefinition(GenericApplicationContext.java:323) ~[spring-context-5.2.3.RELEASE.jar:5.2.3.RELEASE]
    at org.springframework.context.support.GenericApplicationContext.registerBean(GenericApplicationContext.java:471) ~[spring-context-5.2.3.RELEASE.jar:5.2.3.RELEASE]
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.registerErrorInfrastructure(AbstractMessageChannelBinder.java:681) ~[spring-cloud-stream-3.0.2.RELEASE.jar:3.0.2.RELEASE]
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.registerErrorInfrastructure(AbstractMessageChannelBinder.java:633) ~[spring-cloud-stream-3.0.2.RELEASE.jar:3.0.2.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.createConsumerEndpoint(KafkaMessageChannelBinder.java:644) ~[spring-cloud-stream-binder-kafka-3.0.2.RELEASE.jar:3.0.2.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.createConsumerEndpoint(KafkaMessageChannelBinder.java:148) ~[spring-cloud-stream-binder-kafka-3.0.2.RELEASE.jar:3.0.2.RELEASE]
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:407) ~[spring-cloud-stream-3.0.2.RELEASE.jar:3.0.2.RELEASE]
    ... 24 common frames omitted

Configuration:

spring:
  cloud:
    function:
      definition: mainListener;backupListener;
    stream:
      default-binder: my-main-binder
      function:
        bindings:
          mainListener-in-0: event-in
          backupListener-in-0: backup-event-in
      bindings:
        event-in:                                 # main listener
          binder: my-main-binder
          consumer:
            qulifer: local
            use-native-decoding: true
          destination: my-kafka-topic-name
          group: my-kafka-consumer-group-name
        backup-event-in:                          # backup listener with autoStartup turned off
          binder: my-backup-binder
          consumer:
            autoStartup: false
            use-native-decoding: true
          destination: my-kafka-topic-name        # same destination and group as the main listener
          group: my-kafka-consumer-group-name
      binders:
        my-main-binder:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: ip_1; ip_2;
        my-backup-binder:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: backup_ip_1; backup_ip_2;

Dependencies:
spring-cloud-function-context: 3.0.2.RELEASE
spring-cloud-stream-binder-kafka: 3.0.2.RELEASE

The accepted answer to another post indicates that a Qualifier property would be added to address this issue, but I cannot find it in the org.springframework.cloud.stream.binder.ConsumerProperties class in spring-cloud-stream:3.0.2.RELEASE.

Exception thrown while starting consumer (cannot assign the same group name to different channels in a microservice)

https://github.com/garyrussell/spring-cloud-stream/commit/5b87b8cae494ae9568d924f64adc436374a67ea7

Update #1: After setting allow-bean-definition-overriding to true as recommended, I see a different exception:

org.springframework.cloud.stream.binder.BinderException: Exception thrown while starting consumer: 
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:461) ~[spring-cloud-stream-3.0.2.RELEASE.jar:3.0.2.RELEASE]
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:90) ~[spring-cloud-stream-3.0.2.RELEASE.jar:3.0.2.RELEASE]
    at org.springframework.cloud.stream.binder.AbstractBinder.bindConsumer(AbstractBinder.java:143) ~[spring-cloud-stream-3.0.2.RELEASE.jar:3.0.2.RELEASE]
    at org.springframework.cloud.stream.binding.BindingService.doBindConsumer(BindingService.java:169) [spring-cloud-stream-3.0.2.RELEASE.jar:3.0.2.RELEASE]
    at org.springframework.cloud.stream.binding.BindingService.bindConsumer(BindingService.java:126) [spring-cloud-stream-3.0.2.RELEASE.jar:3.0.2.RELEASE]
    at org.springframework.cloud.stream.binding.AbstractBindableProxyFactory.createAndBindInputs(AbstractBindableProxyFactory.java:112) [spring-cloud-stream-3.0.2.RELEASE.jar:3.0.2.RELEASE]
    at org.springframework.cloud.stream.binding.InputBindingLifecycle.doStartWithBindable(InputBindingLifecycle.java:58) [spring-cloud-stream-3.0.2.RELEASE.jar:3.0.2.RELEASE]
    at java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:608) ~[na:1.8.0_181]
    at org.springframework.cloud.stream.binding.AbstractBindingLifecycle.start(AbstractBindingLifecycle.java:57) ~[spring-cloud-stream-3.0.2.RELEASE.jar:3.0.2.RELEASE]
    at org.springframework.cloud.stream.binding.InputBindingLifecycle.start(InputBindingLifecycle.java:34) [spring-cloud-stream-3.0.2.RELEASE.jar:3.0.2.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182) ~[spring-context-5.2.3.RELEASE.jar:5.2.3.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53) ~[spring-context-5.2.3.RELEASE.jar:5.2.3.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360) ~[spring-context-5.2.3.RELEASE.jar:5.2.3.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158) ~[spring-context-5.2.3.RELEASE.jar:5.2.3.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122) ~[spring-context-5.2.3.RELEASE.jar:5.2.3.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:894) ~[spring-context-5.2.3.RELEASE.jar:5.2.3.RELEASE]
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.finishRefresh(ServletWebServerApplicationContext.java:162) ~[spring-boot-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:553) ~[spring-context-5.2.3.RELEASE.jar:5.2.3.RELEASE]
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:141) ~[spring-boot-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:747) ~[spring-boot-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397) ~[spring-boot-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:315) ~[spring-boot-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) ~[spring-boot-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1215) ~[spring-boot-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at com.example.demo.DemoApplication.main(DemoApplication.java:14) ~[classes/:na]
Caused by: java.lang.IllegalStateException: Only one LastSubscriberMessageHandler is allowed
    at org.springframework.cloud.stream.binder.BinderErrorChannel.subscribe(BinderErrorChannel.java:44) ~[spring-cloud-stream-3.0.2.RELEASE.jar:3.0.2.RELEASE]
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.registerErrorInfrastructure(AbstractMessageChannelBinder.java:712) ~[spring-cloud-stream-3.0.2.RELEASE.jar:3.0.2.RELEASE]
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.registerErrorInfrastructure(AbstractMessageChannelBinder.java:633) ~[spring-cloud-stream-3.0.2.RELEASE.jar:3.0.2.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.createConsumerEndpoint(KafkaMessageChannelBinder.java:644) ~[spring-cloud-stream-binder-kafka-3.0.2.RELEASE.jar:3.0.2.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.createConsumerEndpoint(KafkaMessageChannelBinder.java:148) ~[spring-cloud-stream-binder-kafka-3.0.2.RELEASE.jar:3.0.2.RELEASE]
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:407) ~[spring-cloud-stream-3.0.2.RELEASE.jar:3.0.2.RELEASE]
    ... 24 common frames omitted
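
The `Caused by` line of this second trace points at `BinderErrorChannel.subscribe`, which refuses a second `LastSubscriberMessageHandler`. The following is a minimal plain-Java sketch of that kind of guard, assuming both bindings end up subscribing to one shared error channel once bean overriding is allowed. The names `ErrorChannel`, `Handler` and `FinalHandler` are hypothetical stand-ins, not Spring's source.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model only: class and method names are hypothetical, not Spring's source.
class SingleFinalHandlerDemo {

    interface Handler {
        void handle(String message);
    }

    // Marker for a handler that is meant to run after all other subscribers.
    interface FinalHandler extends Handler {
    }

    // Hypothetical error channel that accepts at most one final handler.
    static final class ErrorChannel {
        private final List<Handler> subscribers = new ArrayList<>();
        private FinalHandler finalHandler;

        void subscribe(Handler handler) {
            if (handler instanceof FinalHandler) {
                if (finalHandler != null) {
                    throw new IllegalStateException("Only one final handler is allowed");
                }
                finalHandler = (FinalHandler) handler;
            }
            subscribers.add(handler);
        }
    }

    // Subscribes a final handler for each binding to the same channel
    // and reports whether the second subscription was rejected.
    static boolean secondFinalHandlerRejected() {
        ErrorChannel shared = new ErrorChannel();
        FinalHandler mainHandler = message -> { };
        FinalHandler backupHandler = message -> { };
        shared.subscribe(mainHandler);
        try {
            shared.subscribe(backupHandler);
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("Second final handler rejected: " + secondFinalHandlerRejected());
    }
}
```

Under this assumption, allowing bean overriding removes the name clash but leaves two bindings competing for the single final-handler slot on the same channel.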

Recommended answer

That configuration is not currently supported by default; the error infrastructure bean names are based on the group and destination, so two bindings that share both produce the same bean names.

If you don't mind the same error infrastructure being used for both consumers (which sounds OK in this case), you can set

spring.main.allow-bean-definition-overriding=true
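
To make the name collision concrete, here is a minimal plain-Java sketch. It assumes the recoverer bean name follows the `<destination>.<group>.errors.recoverer` pattern visible in the first stack trace. The `Registry` class is a hypothetical stand-in for a bean registry with an overriding switch, not Spring's actual bean factory.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative model only: not Spring's actual bean factory.
class ErrorBeanNameDemo {

    // Name pattern taken from the stack trace: <destination>.<group>.errors.recoverer
    static String recovererBeanName(String destination, String group) {
        return destination + "." + group + ".errors.recoverer";
    }

    // Hypothetical stand-in for a bean registry with an "allow overriding" switch.
    static final class Registry {
        private final boolean allowOverriding;
        private final Map<String, Object> beans = new LinkedHashMap<>();

        Registry(boolean allowOverriding) {
            this.allowOverriding = allowOverriding;
        }

        void register(String name, Object bean) {
            if (beans.containsKey(name) && !allowOverriding) {
                throw new IllegalStateException("Bean already registered: " + name);
            }
            beans.put(name, bean); // a later registration replaces the earlier one
        }

        int size() {
            return beans.size();
        }
    }

    // Registers one recoverer per binding and returns how many beans remain.
    static int bindBoth(boolean allowOverriding) {
        Registry registry = new Registry(allowOverriding);
        String destination = "my-kafka-topic-name";
        String group = "my-kafka-consumer-group-name";
        // The main and backup bindings differ only in their binder,
        // and the binder is not part of the bean name.
        registry.register(recovererBeanName(destination, group), "recoverer for event-in");
        registry.register(recovererBeanName(destination, group), "recoverer for backup-event-in");
        return registry.size();
    }

    public static void main(String[] args) {
        try {
            bindBoth(false);
        } catch (IllegalStateException e) {
            System.out.println("Overriding disabled: " + e.getMessage());
        }
        System.out.println("Overriding enabled, beans registered: " + bindBoth(true));
    }
}
```

With overriding enabled, the second registration replaces the first, which is why both consumers end up sharing one set of error infrastructure. Note that Update #1 in the question reports a different exception (Only one LastSubscriberMessageHandler is allowed) on 3.0.2.RELEASE after enabling this setting, so the sketch covers only the bean-name collision.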
