Spring Cloud Stream Kafka暂停/恢复活页夹 [英] Spring cloud stream kafka pause/resume binders
问题描述
我们正在使用Spring Cloude流 2.0 &Kafka作为消息代理.
对于目标系统(DB或第三方API)无法使用的情况,我们已实现了一个断路器,该断路器可停止应用程序上下文,如此处所示:绑定可视化和控件
是否有可能通过代码控制活页夹的生命周期,是否意味着在目标服务器关闭的情况下暂停
活页夹,以及在启动时恢复 resume
?
对不起,我没看懂你的问题.
您可以自动连接 BindingsEndpoint
,但是不幸的是,它的 State
枚举是私有的,因此您不能以编程方式调用 changeState()
.
我已经为此打开了一个问题./p>
编辑
您可以通过反射来做到这一点,但这有点难看...
@SpringBootApplication@EnableBinding(Sink.class)公共类So53476384Application {公共静态void main(String [] args){SpringApplication.run(So53476384Application.class,args);}@AutowiredBindingsEndpoint绑定;@豆公共ApplicationRunnerRunner(){返回参数->{类<?>clazz = ClassUtils.forName("org.springframework.cloud.stream.endpoint.BindingsEndpoint $ State",So53476384Application.class.getClassLoader());ReflectionUtils.doWithMethods(BindingsEndpoint.class,method-> {尝试 {method.invoke(this.binding,"input",clazz.getEnumConstants()[2]);//暂停}catch(InvocationTargetException e){e.printStackTrace();}},方法->method.getName().equals("changeState")));};}@StreamListener(Sink.INPUT)公共无效的listen(String in){}}
We are using spring cloude stream 2.0 & Kafka as a message broker.
We've implemented a circuit breaker which stops the Application context, for cases where the target system (DB or 3rd party API) is unavilable, as suggested here: Stop Spring Cloud Stream @StreamListener from listening when target system is down
Now in spring cloud stream 2.0 there is a way to manage the lifecycle of binder using actuator: Binding visualization and control
Is it possible to control the binder lifecycle from the code, means in case target server is down, to pause
the binder, and when it's up, to resume
?
Sorry, I misread your question.
You can auto wire the BindingsEndpoint
but, unfortunately, its State
enum is private so you can't call changeState()
programmatically.
I have opened an issue for this.
EDIT
You can do it with reflection, but it's a bit ugly...
@SpringBootApplication
@EnableBinding(Sink.class)
public class So53476384Application {
public static void main(String[] args) {
SpringApplication.run(So53476384Application.class, args);
}
@Autowired
BindingsEndpoint binding;
@Bean
public ApplicationRunner runner() {
return args -> {
Class<?> clazz = ClassUtils.forName("org.springframework.cloud.stream.endpoint.BindingsEndpoint$State",
So53476384Application.class.getClassLoader());
ReflectionUtils.doWithMethods(BindingsEndpoint.class, method -> {
try {
method.invoke(this.binding, "input", clazz.getEnumConstants()[2]); // PAUSE
}
catch (InvocationTargetException e) {
e.printStackTrace();
}
}, method -> method.getName().equals("changeState"));
};
}
@StreamListener(Sink.INPUT)
public void listen(String in) {
}
}
这篇关于Spring Cloud Stream Kafka暂停/恢复活页夹的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!