Spring Cloud Stream Kafka暂停/恢复活页夹 [英] Spring cloud stream kafka pause/resume binders

查看:68
本文介绍了Spring Cloud Stream Kafka暂停/恢复活页夹的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我们正在使用Spring Cloude流 2.0 &Kafka作为消息代理.
对于目标系统(DB或第三方API)无法使用的情况,我们已实现了一个断路器,该断路器可停止应用程序上下文,如此处所示:解决方案

对不起,我没看懂你的问题.

您可以自动连接 BindingsEndpoint ,但是不幸的是,它的 State 枚举是私有的,因此您不能以编程方式调用 changeState().

我已经为此打开了一个问题./p>

编辑

您可以通过反射来做到这一点,但这有点难看...

  @SpringBootApplication@EnableBinding(Sink.class)公共类So53476384Application {公共静态void main(String [] args){SpringApplication.run(So53476384Application.class,args);}@AutowiredBindingsEndpoint绑定;@豆公共ApplicationRunnerRunner(){返回参数->{类<?>clazz = ClassUtils.forName("org.springframework.cloud.stream.endpoint.BindingsEndpoint $ State",So53476384Application.class.getClassLoader());ReflectionUtils.doWithMethods(BindingsEndpoint.class,method-> {尝试 {method.invoke(this.binding,"input",clazz.getEnumConstants()[2]);//暂停}catch(InvocationTargetException e){e.printStackTrace();}},方法->method.getName().equals("changeState")));};}@StreamListener(Sink.INPUT)公共无效的listen(String in){}} 

We are using spring cloude stream 2.0 & Kafka as a message broker.
We've implemented a circuit breaker which stops the Application context, for cases where the target system (DB or 3rd party API) is unavilable, as suggested here: Stop Spring Cloud Stream @StreamListener from listening when target system is down

Now in spring cloud stream 2.0 there is a way to manage the lifecycle of binder using actuator: Binding visualization and control


Is it possible to control the binder lifecycle from the code, means in case target server is down, to pause the binder, and when it's up, to resume?

解决方案

Sorry, I misread your question.

You can auto wire the BindingsEndpoint but, unfortunately, its State enum is private so you can't call changeState() programmatically.

I have opened an issue for this.

EDIT

You can do it with reflection, but it's a bit ugly...

@SpringBootApplication
@EnableBinding(Sink.class)
public class So53476384Application {

    public static void main(String[] args) {
        SpringApplication.run(So53476384Application.class, args);
    }

    @Autowired
    BindingsEndpoint binding;

    @Bean
    public ApplicationRunner runner() {
        return args -> {
            Class<?> clazz = ClassUtils.forName("org.springframework.cloud.stream.endpoint.BindingsEndpoint$State",
                    So53476384Application.class.getClassLoader());
            ReflectionUtils.doWithMethods(BindingsEndpoint.class, method -> {
                try {
                    method.invoke(this.binding, "input", clazz.getEnumConstants()[2]); // PAUSE
                }
                catch (InvocationTargetException e) {
                    e.printStackTrace();
                }
            }, method -> method.getName().equals("changeState"));
        };
    }

    @StreamListener(Sink.INPUT)
    public void listen(String in) {

    }

}

这篇关于Spring Cloud Stream Kafka暂停/恢复活页夹的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆