Spring 集成 MessageQueue 无需轮询 [英] Spring integration MessageQueue without polling

查看:67
本文介绍了Spring 集成 MessageQueue 无需轮询的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想将传入的消息写入消息队列,并让单个专用线程毫不延迟地使用这些消息 - 非常类似于 Spring Integration 在没有轮询器的情况下监听队列

I'd like to write incoming messages into a message queue and have the messages consumed by a single, dedicated thread without a delay - very similar to Spring Integration listen on queue without poller

我试过了:

IntegrationFlows
  .from("inbound")
  .channel(MessageChannels.queue(10_000))
  .bridge(spec -> spec.poller(Pollers.fixedDelay(0).receiveTimeout(Long.MAX_VALUE)))
  .fixedSubscriberChannel()
  .route(inboundRouter())
  .get()

但这会导致任务调度程序线程调度轮询操作,然后将阻塞直到消息可用.这不是我对专用线程"的想法,如果任务调度程序线程也被用于写入队列,并且另一端没有消费者线程,则会导致我的应用程序死锁.

But this causes a task scheduler thread to schedule the polling operation, which will then block until a message is available. This is not my idea of a "dedicated thread" and causes a deadlock in my application if the task scheduler threads are also being used to write into the queue, and then there is no consumer thread left on the other side.

接下来我尝试的是:

IntegrationFlows
  .from("inbound")
  .channel(MessageChannels.queue(10_000))
  .bridge(spec -> spec.poller(Pollers.fixedDelay(0).taskExecutor(Executors.newSingleThreadExecutor()).receiveTimeout(Long.MAX_VALUE)))
  .fixedSubscriberChannel()
  .route(inboundRouter())
  .get()

但是由于 fixedDelay(0),这导致应用程序产生了无数的计划任务.

But this caused the application to spawn a gazillion of scheduled tasks, because of fixedDelay(0).

我遇到的下一个选项是:

Next option I came across was:

IntegrationFlows
  .from("inbound")
  .channel(MessageChannels.executor(Executors.newSingleThreadExecutor()))
  .route(inboundRouter())
  .get()

这似乎按预期工作;我有一个处理所有消息的专用线程.但是,我不再拥有可以监控其统计信息的消息队列(通过 JMX).

This seems to work as intended; I have a dedicated thread that processes all messages. However, I do no longer have a message queue whose statistics I can monitor (via JMX).

那么,有什么方法可以实现我的目标吗?如何实现?

So, is there any way to achieve my goal, and how?

推荐答案

好吧,那么您必须真正为该线程专门提供单独的 TaskScheduler.

Well, then you have to really dedicate separate TaskScheduler for that thread.

不幸的是,框架没有提供(还)明确的 API 来将其注入端点,但无论如何这是可能的:

Unfortunately, the Framework doesn't provide (yet) clear API for injecting that one into the endpoint, but that is possible anyway:

    @Bean
    public IntegrationFlow dedicatedPollingThreadFlow() {
        return IntegrationFlows.from(MessageChannels.queue("myQueueChannel"))
                .bridge(e -> e
                        .poller(Pollers.fixedDelay(0).receiveTimeout(-1))
                        .id("dedicatedPollingConsumer"))
                .channel(c -> c.queue("results"))
                .get();
    }

    @Bean
    public TaskScheduler dedicatedTaskScheduler() {
        return new ThreadPoolTaskScheduler();
    }

    @Bean
    @DependsOn("dedicatedPollingThreadFlow")
    public String dedicatedPollingConsumerConfigurer(
            @Qualifier("dedicatedPollingConsumer") PollingConsumer dedicatedPollingConsumer) {
        dedicatedPollingConsumer.setTaskScheduler(dedicatedTaskScheduler());
        return "";
    }

也要注意 .receiveTimeout(-1).这样它就会执行常规的 BlockingQueue.take() 永远阻塞你的专用线程.

Pay attention to the .receiveTimeout(-1) as well. This way it performs regular BlockingQueue.take() blocking your dedicated thread forever.

从框架的角度来看,允许将 TaskScheduler 与现有的 .poller() 一起注入到 GenericEndpointSpec 中的想法.

The idea from the Framework perspective to let to inject TaskScheduler into the GenericEndpointSpec alongside with the existing .poller().

同时是 JIRA 票.

这篇关于Spring 集成 MessageQueue 无需轮询的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆