Spring Cloud 流 kafka 暂停/恢复活页夹 [英] Spring cloud stream kafka pause/resume binders
问题描述
我们使用的是 spring cloude stream 2.0 &Kafka 作为消息代理.
对于目标系统(数据库或第 3 方 API)不可用的情况,我们已经实现了一个可以停止应用程序上下文的断路器,如下所示:当目标系统关闭时停止Spring Cloud Stream @StreamListener 监听
We are using spring cloude stream 2.0 & Kafka as a message broker.
We've implemented a circuit breaker which stops the Application context, for cases where the target system (DB or 3rd party API) is unavilable, as suggested here: Stop Spring Cloud Stream @StreamListener from listening when target system is down
现在在 spring cloud stream 2.0 中有一种方法可以使用执行器来管理 binder 的生命周期:绑定可视化和控制
Now in spring cloud stream 2.0 there is a way to manage the lifecycle of binder using actuator: Binding visualization and control
是否可以从代码中控制 binder 生命周期,这意味着如果目标服务器关闭,暂停
binder,以及当它启动时,resume
?
Is it possible to control the binder lifecycle from the code, means in case target server is down, to pause
the binder, and when it's up, to resume
?
推荐答案
抱歉,我误读了您的问题.
Sorry, I misread your question.
您可以自动连接 BindingsEndpoint
,但不幸的是,它的 State
枚举是私有的,因此您无法以编程方式调用 changeState()
.
You can auto wire the BindingsEndpoint
but, unfortunately, its State
enum is private so you can't call changeState()
programmatically.
我已经为此打开了一个问题.
编辑
你可以用反射来做,但它有点丑......
You can do it with reflection, but it's a bit ugly...
@SpringBootApplication
@EnableBinding(Sink.class)
public class So53476384Application {
public static void main(String[] args) {
SpringApplication.run(So53476384Application.class, args);
}
@Autowired
BindingsEndpoint binding;
@Bean
public ApplicationRunner runner() {
return args -> {
Class<?> clazz = ClassUtils.forName("org.springframework.cloud.stream.endpoint.BindingsEndpoint$State",
So53476384Application.class.getClassLoader());
ReflectionUtils.doWithMethods(BindingsEndpoint.class, method -> {
try {
method.invoke(this.binding, "input", clazz.getEnumConstants()[2]); // PAUSE
}
catch (InvocationTargetException e) {
e.printStackTrace();
}
}, method -> method.getName().equals("changeState"));
};
}
@StreamListener(Sink.INPUT)
public void listen(String in) {
}
}
这篇关于Spring Cloud 流 kafka 暂停/恢复活页夹的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!