Spring Cloud 流 kafka 暂停/恢复活页夹 [英] Spring cloud stream kafka pause/resume binders

查看:55
本文介绍了Spring Cloud 流 kafka 暂停/恢复活页夹的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我们使用的是 spring cloude stream 2.0 &Kafka 作为消息代理.
对于目标系统(数据库或第 3 方 API)不可用的情况,我们已经实现了一个可以停止应用程序上下文的断路器,如下所示:当目标系统关闭时停止Spring Cloud Stream @StreamListener 监听

We are using spring cloude stream 2.0 & Kafka as a message broker.
We've implemented a circuit breaker which stops the Application context, for cases where the target system (DB or 3rd party API) is unavilable, as suggested here: Stop Spring Cloud Stream @StreamListener from listening when target system is down

现在在 spring cloud stream 2.0 中有一种方法可以使用执行器来管理 binder 的生命周期:绑定可视化和控制

Now in spring cloud stream 2.0 there is a way to manage the lifecycle of binder using actuator: Binding visualization and control


是否可以从代码中控制 binder 生命周期,这意味着如果目标服务器关闭,暂停 binder,以及当它启动时,resume?


Is it possible to control the binder lifecycle from the code, means in case target server is down, to pause the binder, and when it's up, to resume?

推荐答案

抱歉,我误读了您的问题.

Sorry, I misread your question.

您可以自动连接 BindingsEndpoint,但不幸的是,它的 State 枚举是私有的,因此您无法以编程方式调用 changeState().

You can auto wire the BindingsEndpoint but, unfortunately, its State enum is private so you can't call changeState() programmatically.

我已经为此打开了一个问题.

编辑

你可以用反射来做,但它有点丑......

You can do it with reflection, but it's a bit ugly...

@SpringBootApplication
@EnableBinding(Sink.class)
public class So53476384Application {

    public static void main(String[] args) {
        SpringApplication.run(So53476384Application.class, args);
    }

    @Autowired
    BindingsEndpoint binding;

    @Bean
    public ApplicationRunner runner() {
        return args -> {
            Class<?> clazz = ClassUtils.forName("org.springframework.cloud.stream.endpoint.BindingsEndpoint$State",
                    So53476384Application.class.getClassLoader());
            ReflectionUtils.doWithMethods(BindingsEndpoint.class, method -> {
                try {
                    method.invoke(this.binding, "input", clazz.getEnumConstants()[2]); // PAUSE
                }
                catch (InvocationTargetException e) {
                    e.printStackTrace();
                }
            }, method -> method.getName().equals("changeState"));
        };
    }

    @StreamListener(Sink.INPUT)
    public void listen(String in) {

    }

}

这篇关于Spring Cloud 流 kafka 暂停/恢复活页夹的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆