将 Spring Cloud Stream 与 Kafka 一起使用时,如何正常关闭应用程序? [英] How to perform a graceful application shutdown when using Spring Cloud Stream with Kafka?

查看:121
本文介绍了将 Spring Cloud Stream 与 Kafka 一起使用时,如何正常关闭应用程序?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个使用 Spring Cloud Stream (v1.3.0) 和 Kafka (v1.1.6) 的 Spring Boot (v.1.57) 应用程序.我希望能够优雅地关闭它,即在关闭时,所有流侦听器(即用 @StreamListener 注释)应该:

I have a spring boot (v.1.57) application which uses Spring Cloud Stream (v1.3.0) and Kafka (v1.1.6). I want to be able to gracefully shut it down, i.e., when shutting down, all stream listeners (i.e., annotated with @StreamListener) should:

  1. 停止轮询新消息
  2. 完成工作
  3. 将偏移量提交给 Kafka

我注意到 ContainerProperties 中有一个名为shutdownTimeout"的属性(默认设置为 10000 毫秒),因此我尝试通过反射扩展 ConcurrentKafkaListenerContainerFactoryConfigurer 类(因为它有 @ConditionalOnMissingBean 注释)将其修改为 30000像这样:

I noticed that there's a property called 'shutdownTimeout' in ContainerProperties (which is set to a default of 10000ms) so I've tried to modify it to 30000 by extending ConcurrentKafkaListenerContainerFactoryConfigurer class (Since it has a @ConditionalOnMissingBean annotation) via reflection like so:

@Slf4j
@Component
public class BehalfConcurrentKafkaListenerContainerFactoryConfigurer extends ConcurrentKafkaListenerContainerFactoryConfigurer {

    @Autowired
    private KafkaProperties kproperties;

    @Override
    public void configure(ConcurrentKafkaListenerContainerFactory<Object, Object> listenerContainerFactory,
                          ConsumerFactory<Object, Object> consumerFactory) {
        PropertyAccessor myAccessor = PropertyAccessorFactory.forDirectFieldAccess(this);
        myAccessor.setPropertyValue("properties", kproperties);

        ContainerProperties containerProperties = listenerContainerFactory
                .getContainerProperties();
        super.configure(listenerContainerFactory, consumerFactory);
        containerProperties.setShutdownTimeout(30000);
    }
}

但是没有成功.还尝试将它 (shutdownTimeout: 30000) 放在 application.yml 中的 spring cloud 流绑定器设置下,但它再次没有帮助.

But it wasn't successful. Also tried putting it (shutdownTimeout: 30000) in application.yml under the spring cloud stream binder settings, but again it didn't help.

有什么方法可以控制关​​机过程并实现我的目标吗?

Is there any way to control the shutdown process and achieve my goals?

推荐答案

EDIT

不再需要做这个反射 hack;只需将 ListenerContainerCustomizer @Bean 添加到应用程序上下文即可.参见 此处.

It is no longer necessary to do this reflection hack; just add a ListenerContainerCustomizer @Bean to the application context. See here.

EDIT_END

不再支持spring-kafka 1.1.x;你应该使用 1.3.9 和引导 1.5.x.

spring-kafka 1.1.x is no longer supported; you should be using 1.3.9 with boot 1.5.x.

当前的 Boot 1.5.x 版本是 1.5.21.

The current Boot 1.5.x version is 1.5.21.

您应该立即升级.

但是,所有这些项目都有更新的版本.

However, there a much newer versions of all of these projects.

Spring Cloud Stream 不使用该工厂或引导属性来创建其容器;它没有公开在容器上配置该属性的机制.

Spring Cloud Stream doesn't use that factory, or the boot properties, to create its containers; it doesn't expose a mechanism to configure that property on the container.

Spring Cloud Stream 2.1 添加了 ListenerContainerCustomizer,它允许您通过设置任何属性来自定义绑定容器.

Spring Cloud Stream 2.1 added the ListenerContainerCustomizer which allows you to customize the binding container by setting any properties on it.

我建议您升级到 Boot 2.1.6 和 Spring Cloud Stream Germantown (2.2.0).

I suggest you upgrade to Boot 2.1.6 and Spring Cloud Stream Germantown (2.2.0).

编辑

这有点麻烦,但它应该可以工作,直到您可以升级到更新的流版本...

This is a bit of a hack, but it should work until you can upgrade to a newer stream release...

@SpringBootApplication
@EnableBinding(Sink.class)
public class So56883620Application {

    public static void main(String[] args) {
        SpringApplication.run(So56883620Application.class, args).close();
    }

    private final CountDownLatch latch = new CountDownLatch(1);

    @StreamListener(Sink.INPUT)
    public void listen(String in) throws InterruptedException {
        this.latch.countDown();
        System.out.println(in);
        Thread.sleep(6_000);
        System.out.println("exiting");
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
        return args -> {
            IntStream.range(0,2).forEach(i -> template.send("mytopic", ("foo" + i).getBytes()));
            // wait for listener to start
            this.latch.await(10, TimeUnit.SECONDS);
            System.out.println("Shutting down");
        };
    }

    @Bean
    public SmartLifecycle bindingFixer(BindingService bindingService) {
        return new SmartLifecycle() {

            @Override
            public int getPhase() {
                return Integer.MAX_VALUE;
            }

            @Override
            public void stop() {
                // no op
            }

            @Override
            public void start() {
                @SuppressWarnings("unchecked")
                Map<String, Binding<?>> consumers = (Map<String, Binding<?>>) new DirectFieldAccessor(bindingService)
                        .getPropertyValue("consumerBindings");
                @SuppressWarnings("unchecked")
                Binding<?> inputBinding = ((List<Binding<?>>) consumers.get("input")).get(0);
                ((AbstractMessageListenerContainer<?, ?>) new DirectFieldAccessor(inputBinding)
                        .getPropertyValue("lifecycle.messageListenerContainer"))
                                .getContainerProperties().setShutdownTimeout(30_000L);
            }

            @Override
            public boolean isRunning() {
                return false;
            }

            @Override
            public void stop(Runnable callback) {
                callback.run();
            }

            @Override
            public boolean isAutoStartup() {
                return true;
            }
        };
    }

}

这篇关于将 Spring Cloud Stream 与 Kafka 一起使用时,如何正常关闭应用程序?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆