RequestHandlerRetryAdvice cannot be made to work with Ftp.outboundGateway in Spring Integration

Problem description

My situation is similar to the one described in this SO question. The difference is that I use an Ftp.outboundGateway, on which I call the AbstractRemoteFileOutboundGateway.Command.GET command, rather than a WebFlux.outboundGateway. The common problem is that I can't get the defined RequestHandlerRetryAdvice to be used.

The configuration looks like this (stripped down to the relevant parts):

@RestController
@RequestMapping( value = "/somepath" )
public class DownloadController
{
   private DownloadGateway downloadGateway;

   public DownloadController( DownloadGateway downloadGateway )
   {
      this.downloadGateway = downloadGateway;
   }

   @PostMapping( "/downloads" )
   public void download( @RequestParam( "filename" ) String filename )
   {
      Map<String, Object> headers = new HashMap<>();

      downloadGateway.triggerDownload( filename, headers );
   }
}

@MessagingGateway
public interface DownloadGateway
{
   @Gateway( requestChannel = "downloadFiles.input" )
   void triggerDownload( Object value, Map<String, Object> headers );
}

@Configuration
@EnableIntegration
public class FtpDefinition
{
   private FtpProperties ftpProperties;

   public FtpDefinition( FtpProperties ftpProperties )
   {
      this.ftpProperties = ftpProperties;
   }

   @Bean
   public DirectChannel gatewayDownloadsOutputChannel()
   {
      return new DirectChannel();
   }

   @Bean
   public IntegrationFlow downloadFiles( RemoteFileOutboundGatewaySpec<FTPFile, FtpOutboundGatewaySpec> getRemoteFile )
   {
      return f -> f.handle( getRemoteFile, getRetryAdvice() )
                   .channel( "gatewayDownloadsOutputChannel" );
   }

   private Consumer<GenericEndpointSpec<AbstractRemoteFileOutboundGateway<FTPFile>>> getRetryAdvice()
   {
      return e -> e.advice( ( (Supplier<RequestHandlerRetryAdvice>) () -> {
         RequestHandlerRetryAdvice advice = new RequestHandlerRetryAdvice();
         advice.setRetryTemplate( getRetryTemplate() );
         return advice;
      } ).get() );
   }

   private RetryTemplate getRetryTemplate()
   {
      RetryTemplate result = new RetryTemplate();

      FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
      backOffPolicy.setBackOffPeriod( 5000 );

      result.setBackOffPolicy( backOffPolicy );
      return result;
   }

   @Bean
   public RemoteFileOutboundGatewaySpec<FTPFile, FtpOutboundGatewaySpec> getRemoteFile( SessionFactory<FTPFile> sessionFactory )
   {
      return 
         Ftp.outboundGateway( sessionFactory,
                              AbstractRemoteFileOutboundGateway.Command.GET,
                              "payload" )
            .fileExistsMode( FileExistsMode.REPLACE )
            .localDirectoryExpression( "'" + ftpProperties.getLocalDir() + "'" )
            .autoCreateLocalDirectory( true );
   }

   @Bean
   public SessionFactory<FTPFile> ftpSessionFactory()
   {
      DefaultFtpSessionFactory sessionFactory = new DefaultFtpSessionFactory();
      sessionFactory.setHost( ftpProperties.getServers().get( 0 ).getHost() );
      sessionFactory.setPort( ftpProperties.getServers().get( 0 ).getPort() );
      sessionFactory.setUsername( ftpProperties.getServers().get( 0 ).getUser() );
      sessionFactory.setPassword( ftpProperties.getServers().get( 0 ).getPassword() );
      return sessionFactory;
   }
}

@SpringBootApplication
@EnableIntegration
@IntegrationComponentScan
public class FtpTestApplication {

    public static void main(String[] args) {
        SpringApplication.run( FtpTestApplication.class, args );
    }
}

@Configuration
@PropertySource( "classpath:ftp.properties" )
@ConfigurationProperties( prefix = "ftp" )
@Data
public class FtpProperties
{
   @NotNull
   private String localDir;

   @NotNull
   private List<Server> servers;

   @Data
   public static class Server
   {
      @NotNull
      private String host;

      @NotNull
      private int port;

      @NotNull
      private String user;

      @NotNull
      private String password;
   }
}

The controller is mostly there for testing purposes; in the actual implementation there's a poller. My FtpProperties holds a list of servers because, in the actual implementation, I use a DelegatingSessionFactory to pick an instance based on some parameters.

According to Gary Russell's comment, I'd expect a failed download to be retried. But if I interrupt a download server-side (by issuing "Kick user" in a FileZilla instance), I just get an immediate stack trace and no retries:

org.apache.commons.net.ftp.FTPConnectionClosedException: FTP response 421 received.  Server closed connection.
[...]

I also need to upload files, for which I use an Ftp.outboundAdapter. In this case, with the same RetryTemplate, if I interrupt an upload server-side, Spring Integration performs two more attempts with a delay of 5s each and only then logs java.net.SocketException: Connection reset, all as expected.
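The retry behaviour observed for the upload (one initial attempt, two more attempts 5s apart, then the exception surfaces) can be illustrated with a minimal plain-Java sketch. This is not Spring Retry's implementation; the class name FixedBackoffRetry and the method execute are hypothetical, and the three-attempt limit is an assumption chosen to match the behaviour described above.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration only: a hand-rolled fixed-backoff retry loop,
// not the code that RetryTemplate or RequestHandlerRetryAdvice actually runs.
final class FixedBackoffRetry {

    /** Calls the task up to maxAttempts times, sleeping backOffMillis after each failed attempt except the last. */
    static <T> T execute(Callable<T> task, int maxAttempts, long backOffMillis) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return task.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(backOffMillis); // fixed delay before the next attempt
                }
            }
        }
        throw last; // attempts exhausted: rethrow the most recent failure
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        try {
            // Simulates an upload whose connection is always reset by the server.
            execute(() -> {
                calls.incrementAndGet();
                throw new java.net.SocketException("Connection reset");
            }, 3, 10L); // 10 ms here instead of 5000 ms to keep the demo fast
        } catch (Exception e) {
            System.out.println(calls.get() + " attempts, then: " + e.getMessage());
        }
    }
}
```

With three attempts in total, the caller sees the exception only after the third failure, which matches the upload case; in the download case the stack trace appears after the first failure, which suggests the retry logic is never entered at all.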

I tried to debug a little and noticed that right before the first attempt to upload through the Ftp.outboundAdapter, a breakpoint on RequestHandlerRetryAdvice.doInvoke() is hit. But when downloading through the Ftp.outboundGateway, that breakpoint is never hit.

Is there a problem with my configuration? Could someone get the RequestHandlerRetryAdvice to work with Ftp.outboundGateway/AbstractRemoteFileOutboundGateway.Command.GET?

Recommended answer

Sorry for the delay; we are at SpringOne Platform this week.

The problem is that the gateway spec is a bean: the gateway ends up being initialized before the advice is applied.

I changed your code like this...

@Bean
public IntegrationFlow downloadFiles(SessionFactory<FTPFile> sessionFactory) {
    return f -> f.handle(getRemoteFile(sessionFactory), getRetryAdvice())
            .channel("gatewayDownloadsOutputChannel");
}

...

private RemoteFileOutboundGatewaySpec<FTPFile, FtpOutboundGatewaySpec> getRemoteFile(SessionFactory<FTPFile> sessionFactory) {
    return Ftp.outboundGateway(sessionFactory,
            AbstractRemoteFileOutboundGateway.Command.GET,
            "payload")
            .fileExistsMode(FileExistsMode.REPLACE)
            .localDirectoryExpression("'/tmp'")
            .autoCreateLocalDirectory(true);
}

...and it worked.

It's generally better not to deal with Specs directly and to inline them in the flow definition instead...

@Bean
public IntegrationFlow downloadFiles(SessionFactory<FTPFile> sessionFactory) {
    return f -> f.handle(Ftp.outboundGateway(sessionFactory,
            AbstractRemoteFileOutboundGateway.Command.GET,
            "payload")
            .fileExistsMode(FileExistsMode.REPLACE)
            .localDirectoryExpression("'/tmp'")
            .autoCreateLocalDirectory(true), getRetryAdvice())
        .channel("gatewayDownloadsOutputChannel");
}
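
The initialization-order problem named in the answer can be pictured with a toy model in plain Java. This is only an analogy under stated assumptions, not Spring Integration's code: ToyGateway, addAdvice, init, and run are hypothetical names, and the "advice" is reduced to a function that tags the result so its presence is visible.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical toy model: a handler that wraps its advice chain around the
// core logic exactly once, when it is initialized.
final class AdviceOrderSketch {

    static final class ToyGateway {
        private final List<UnaryOperator<String>> adviceChain = new ArrayList<>();
        private UnaryOperator<String> invoker; // built once in init()

        void addAdvice(UnaryOperator<String> advice) {
            adviceChain.add(advice);
        }

        void init() {
            if (invoker != null) {
                return; // already initialized: advice added afterwards is never wrapped in
            }
            UnaryOperator<String> chain = payload -> "GET " + payload;
            for (UnaryOperator<String> advice : adviceChain) {
                UnaryOperator<String> inner = chain;
                chain = p -> advice.apply(inner.apply(p));
            }
            invoker = chain;
        }

        String handle(String payload) {
            init();
            return invoker.apply(payload);
        }
    }

    /** Builds a gateway, optionally initializing it before the advice is added. */
    static String run(boolean initBeforeAdvice) {
        ToyGateway gateway = new ToyGateway();
        if (initBeforeAdvice) {
            gateway.init(); // stands in for a container initializing the bean early
        }
        gateway.addAdvice(result -> result + " [advised]");
        gateway.init();
        return gateway.handle("file.txt");
    }

    public static void main(String[] args) {
        System.out.println("spec as bean (init first): " + run(true));
        System.out.println("spec inlined (advice first): " + run(false));
    }
}
```

In this model the early-initialized gateway never sees the advice, which mirrors the breakpoint on RequestHandlerRetryAdvice.doInvoke() never being hit; creating the spec inside the flow definition corresponds to the second case, where the advice is in place before initialization.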
