收到消息后如何停止轮询?弹簧集成 [英] How to stop polling after a message is received? Spring Integration

查看:40
本文介绍了收到消息后如何停止轮询?弹簧集成的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想轮询目录中的文件并在找到文件后停止轮询.我对 Spring 框架很陌生,其中很多仍然很混乱.在做了一些研究之后,我发现了几种这样做的方法,但对其中任何一种都没有任何运气.

I want to poll for a file in a directory and stop the polling once the file is found. I am very new to Spring framework and a lot of it still is very confusing. After doing some research, I found out a couple of ways of doing this but haven't any luck with any of them.

其中一种方法是使用如图所示的控制总线 这里.但是,似乎轮询仅在 2 秒后停止.我不确定如何包含仅在收到文件时停止的条件.

One of the ways is using a control bus as shown here. However, it just seems that the polling just stops after 2 seconds. I am not sure how to include the condition to stop only when a file is received.

另一种方法是使用智能轮询"作为回答here.答案中的链接很旧,但它指向此处的官方 Spring 文档:智能投票.通过这篇文章,我了解了AbstractMessageSourceAdviceSimpleActiveIdleMessageSourceAdvice.后者似乎适合我的目标并且是最容易实现的,所以我决定试一试.我的代码如下:

Another way is to use "Smart Polling" as answered here. The link in the answer is old but it points to the official Spring docs here: Smart Polling. Through the article, I learned about AbstractMessageSourceAdvice and SimpleActiveIdleMessageSourceAdvice. The latter seems to suit my goal and would be the simplest to implement, so I decided to give that a go. My codes are as below:

IntegrationConfig.java

package com.example.springexample;

import java.io.File;

import org.aopalliance.aop.Advice;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.aop.SimpleActiveIdleMessageSourceAdvice;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.file.FileReadingMessageSource;
import org.springframework.integration.file.filters.SimplePatternFileListFilter;
import org.springframework.integration.util.DynamicPeriodicTrigger;
import org.springframework.messaging.MessageChannel;

@Configuration
@EnableIntegration
public class IntegrationConfig {

    @Bean
    public IntegrationFlow advised() {
        return IntegrationFlows.from("fileInputChannel")
                .handle("runBatchScript", "run", c -> c.advice(stopPollingAdvice()))
                .get();
    }

    @Bean
    public MessageChannel fileInputChannel() {
        return new DirectChannel();
    }

    @Bean
    @InboundChannelAdapter(value = "fileInputChannel", poller = @Poller(fixedDelay = "1000"))
    public MessageSource<File> fileReadingMessageSource() {
        FileReadingMessageSource source = new FileReadingMessageSource();
        source.setDirectory(new File("."));
        source.setFilter(new SimplePatternFileListFilter("*.bat"));
        return source;
    }

    @Bean
    public RunBatchScript runBatchScript() {
        return new RunBatchScript();
    }

    @Bean
    public Advice stopPollingAdvice() {
        DynamicPeriodicTrigger trigger = new DynamicPeriodicTrigger(10000);
        SimpleActiveIdleMessageSourceAdvice advice = new SimpleActiveIdleMessageSourceAdvice(trigger);
        advice.setActivePollPeriod(60000);
        return advice;
    }
}

RunBatchScript.java

package com.example.springexample;

import java.io.IOException;
import java.util.Date;
import java.util.logging.Logger;

public class RunBatchScript {

    Logger logger = Logger.getLogger(RunBatchScript.class.getName());

    public void run() throws IOException {
        logger.info("Running the batch script at " + new Date());
        Runtime.getRuntime().exec("cmd.exe /c simplebatchscript.bat");
        logger.info("Finished running the batch script at " + new Date());
    }
}

SpringExampleApplication.java

package com.example.springexample;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringExampleApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringExampleApplication.class, args);
    }

}

我使用了这个this 作为我的代码的基础.但是,它似乎不起作用,因为轮询器仍然每 1 秒轮询一次,而不是新的 10 秒或 60 秒轮询一次.此外,我不确定如何实际停止轮询器.我尝试将 null 放入 SimpleActiveIdleMessageSource 的构造函数中,但它只返回 NullPointerException.

I used this and this as the base for my codes. However, it doesn't seem to be working as the poller still polls every 1 second instead of the new 10 seconds or 60 seconds. Moreover, I am not sure how to actually stop the poller. I tried putting null into the constructor for SimpleActiveIdleMessageSource but it just returns NullPointerException.

运行应用程序时的输出:

The output when I run the application:

2020-03-15 13:57:46.081  INFO 37504 --- [ask-scheduler-1] c.example.springexample.RunBatchScript   : Running the batch script at Sun Mar 15 13:57:46 SRET 2020
2020-03-15 13:57:46.084  INFO 37504 --- [ask-scheduler-1] c.example.springexample.RunBatchScript   : Finished running the batch script at Sun Mar 15 13:57:46 SRET 2020
2020-03-15 13:57:47.085  INFO 37504 --- [ask-scheduler-2] c.example.springexample.RunBatchScript   : Running the batch script at Sun Mar 15 13:57:47 SRET 2020
2020-03-15 13:57:47.087  INFO 37504 --- [ask-scheduler-2] c.example.springexample.RunBatchScript   : Finished running the batch script at Sun Mar 15 13:57:47 SRET 2020
2020-03-15 13:57:48.089  INFO 37504 --- [ask-scheduler-1] c.example.springexample.RunBatchScript   : Running the batch script at Sun Mar 15 13:57:48 SRET 2020
2020-03-15 13:57:48.092  INFO 37504 --- [ask-scheduler-1] c.example.springexample.RunBatchScript   : Finished running the batch script at Sun Mar 15 13:57:48 SRET 2020
2020-03-15 13:57:49.093  INFO 37504 --- [ask-scheduler-3] c.example.springexample.RunBatchScript   : Running the batch script at Sun Mar 15 13:57:49 SRET 2020
2020-03-15 13:57:49.096  INFO 37504 --- [ask-scheduler-3] c.example.springexample.RunBatchScript   : Finished running the batch script at Sun Mar 15 13:57:49 SRET 2020

非常感谢您对某些代码的任何帮助.

Any help with some code is greatly appreciated.

推荐答案

您应该将 SimpleActiveIdleMessageSourceAdvice 应用到 @InboundChannelAdapter.此外,SimpleActiveIdleMessageSourceAdvice 的触发器应该与用于轮询文件的触发器相同:

You should apply SimpleActiveIdleMessageSourceAdvice to @InboundChannelAdapter. Also , the trigger of SimpleActiveIdleMessageSourceAdvice should be the same as the trigger that is used to poll the files:

    @Bean
    @EndpointId("fileInboundChannelAdapter")
    @InboundChannelAdapter(value = "fileInputChannel", poller = @Poller("fileReadingMessageSourcePollerMetadata"))
    public MessageSource<File> fileReadingMessageSource() {
        FileReadingMessageSource source = new FileReadingMessageSource();
        source.setDirectory(new File("."));
        source.setFilter(new SimplePatternFileListFilter("*.bat"));
        return source;
    }

    @Bean
    public PollerMetadata fileReadingMessageSourcePollerMetadata() {
        PollerMetadata meta = new PollerMetadata();

        DynamicPeriodicTrigger trigger = new DynamicPeriodicTrigger(1000);

        SimpleActiveIdleMessageSourceAdvice advice = new SimpleActiveIdleMessageSourceAdvice(trigger);
        advice.setActivePollPeriod(60000);

        meta.setTrigger(trigger);
        meta.setAdviceChain(List.of(advice));
        meta.setMaxMessagesPerPoll(1);
        return meta;
    }

请注意 SimpleActiveIdleMessageSourceAdvice 只是在下次轮询文件时更改.您可以将其设置为一个非常大的数字,例如几千年后,这可以以某种方式实现您在有生之年不再轮询文件的意图.但是轮询文件的调度程序线程仍然处于活动状态.

Please note that SimpleActiveIdleMessageSourceAdvice just change the next time to poll files. You can set it to a very large number such as several thousand years later which can somehow achieve your intention which never poll the file again in your lifetime. But the scheduler thread that poll the file still active.

如果你真的想关闭这个调度器线程,你可以向控制总线发送一个关闭信号.

If you really want to shut down this scheduler thread too, you can send a shut down signal to the control bus.

首先定义一个控制总线:

First define a control bus :

    @Bean
    public IntegrationFlow controlBusFlow() {
        return IntegrationFlows.from("controlBus")
                  .controlBus()
                  .get();
    }

然后实现一个AbstractMessageSourceAdvice,它在轮询文件后向控制总线发送关闭信号:

Then implements an AbstractMessageSourceAdvice that send a shutdown signal to the control bus after a file is polled :

@Service
public class StopPollingAdvice extends AbstractMessageSourceAdvice{

    @Lazy
    @Qualifier("controlBus")
    @Autowired
    private MessageChannel controlBusChannel;


    @Override
    public boolean beforeReceive(MessageSource<?> source) {
        return super.beforeReceive(source);
    }

    @Override
    public Message<?> afterReceive(Message<?> result, MessageSource<?> source) {
        Message operation = MessageBuilder.withPayload("@fileInboundChannelAdapter.stop()").build();
        controlBusChannel.send(operation);
        return result;
    }
}

并将轮询文件的 PollerMetadata 更改为:

and change the PollerMetadata that poll files to :

@Bean
public PollerMetadata fileReadingMessageSourcePollerMetadata(StopPollingAdvice stopPollingAdvice) {
    PollerMetadata meta = new PollerMetadata(); 
    meta.setTrigger(new PeriodicTrigger(1000));
    meta.setAdviceChain(List.of(stopPollingAdvice));
    meta.setMaxMessagesPerPoll(1);
    return meta;
}

这篇关于收到消息后如何停止轮询?弹簧集成的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆