Moving processed files in remote S(ftp) using Java DSL

Problem description

I'm trying to move files on a remote SFTP server once the batch has successfully processed them, using Spring Integration and the Java DSL.

What would be the best way to achieve that?

  1. Add a step to the batch that moves the remote files?
  2. Or use the FTP outbound gateway with the MV command?

I tend to prefer the second solution and let the batch focus on the logic only, but I'm having a hard time implementing it with the Java DSL.

I've read http://docs.spring.io/spring-integration/reference/html/ftp.html#ftp-outbound-gateway and tried to implement it like this:

@Bean
public MessageHandler ftpOutboundGateway() {
    return Sftp.outboundGateway(SftpSessionFactory(), 
            AbstractRemoteFileOutboundGateway.Command.MV, "payload")
            .localDirectory(new File("/home/blabla/"))
            .get();

}

@Bean
public IntegrationFlow ftpInboundFlow() {
    return IntegrationFlows
            .from(
                Sftp.inboundAdapter(SftpSessionFactory())
                .regexFilter(".*\\.xml.mini$")
                ...             
               , 
                e -> e.id("sftpInboundAdapter")
                .poller(
                        Pollers.fixedRate(intCfg.getSftpPollerInMinutes(), TimeUnit.MINUTES)
                        .maxMessagesPerPoll(-1)
                        .advice(retryAdvice())
                        )
            )
            .enrichHeaders( h -> h
                    .header(FileHeaders.REMOTE_DIRECTORY,"/home/filedrop/")
                    .header(FileHeaders.REMOTE_FILE, "/home/filedrop/OFFERS.xml.mini")
                    .header(FileHeaders.RENAME_TO, "/home/filedrop/done/OFFERS.xml.mini")
            )
            .transform(fileToJobLaunchRequestTransformer())         
            .handle(jobLaunchingGw())
            .transform(jobExecutionToFileStringTransformer())
            .handle(ftpOutboundGateway())
            .handle(logger())
            .get();
}

I know my headers should be dynamic, but I don't know how to do that, so for now I use the name of an existing file. I get this error message (it is trying to delete the file in the destination directory!):

Caused by: org.springframework.messaging.MessagingException: Failed to execute on session; nested exception is org.springframework.core.NestedIOException: Failed to delete file /home/filedrop/done/OFFERS.xml.mini; nested exception is org.springframework.core.NestedIOException: Failed to remove file: 2: No such file
    at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:343)
    at org.springframework.integration.file.remote.RemoteFileTemplate.rename(RemoteFileTemplate.java:290)
    at org.springframework.integration.file.remote.gateway.AbstractRemoteFileOutboundGateway.doMv(AbstractRemoteFileOutboundGateway.java:482)
    at org.springframework.integration.file.remote.gateway.AbstractRemoteFileOutboundGateway.handleRequestMessage(AbstractRemoteFileOutboundGateway.java:400)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:99)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    ... 94 more
 Caused by: org.springframework.core.NestedIOException: Failed to delete file /home/filedrop/done/OFFERS.xml.mini; nested exception is org.springframework.core.NestedIOException: Failed to remove file: 2: No such file
    at org.springframework.integration.sftp.session.SftpSession.rename(SftpSession.java:211)
    at org.springframework.integration.file.remote.RemoteFileTemplate$3.doInSessionWithoutResult(RemoteFileTemplate.java:300)
    at org.springframework.integration.file.remote.SessionCallbackWithoutResult.doInSession(SessionCallbackWithoutResult.java:34)
    at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:334)
    ... 100 more
 Caused by: org.springframework.core.NestedIOException: Failed to remove file: 2: No such file
    at org.springframework.integration.sftp.session.SftpSession.remove(SftpSession.java:83)
    at org.springframework.integration.sftp.session.SftpSession.rename(SftpSession.java:205)
    ... 103 more

Thanks for your help!

EDIT: here is the working flow. I have since simplified it a lot, but this is the solution to my previous problem:

@Bean
public IntegrationFlow ftpInboundFlow() {
    return IntegrationFlows
            .from(
                Sftp.inboundAdapter(SftpSessionFactory())
                .regexFilter(".*\\.xml$")
                ...
                , 
                e -> e.id("sftpInboundAdapter")
                .poller(Pollers.fixedRate(intCfg.getSftpPollerInMinutes(), TimeUnit.MINUTES)
                        .maxMessagesPerPoll(-1)
                        )
            )
            .enrichHeaders( h -> h
                    // headers necessary for moving remote files (ftpOutboundGateway)
                    .headerExpression(FileHeaders.RENAME_TO, "'/home/blabla/done/' + payload.getName()")
                    .headerExpression(FileHeaders.REMOTE_FILE, "payload.getName()")
                    .header(FileHeaders.REMOTE_DIRECTORY,"/home/blabla/")
                    // headers necessary for moving local files (fileOutboundGateway_MoveToProcessedDirectory)
                    .headerExpression(FileHeaders.ORIGINAL_FILE,  "payload.getAbsolutePath()" )
                    .headerExpression(FileHeaders.FILENAME,  "payload.getName()")
            )
            .transform(fileToJobLaunchRequestTransformer())         
            .handle(jobLaunchingGw(), e-> e.advice(retryAdvice()))

            .<JobExecution, Boolean>route(p -> BatchStatus.COMPLETED.equals(p.getStatus()),
                    mapping -> mapping
                            // job completed: move the remote file, then the local copy
                            .subFlowMapping("true", sf -> sf
                                    .handle(org.springframework.batch.core.JobExecution.class,
                                            (p, h) -> myServiceActivator.jobExecutionToString(p,
                                                    (String) h.get(FileHeaders.REMOTE_DIRECTORY),
                                                    (String) h.get(FileHeaders.REMOTE_FILE)))
                                    .handle(ftpOutboundGateway())
                                    .handle(Boolean.class,
                                            (p, h) -> myServiceActivator.BooleanToString(p,
                                                    (String) h.get(FileHeaders.FILENAME)))
                                    .handle(fileOutboundGateway_MoveToProcessedDirectory()))
                            // job did not complete: discard the message
                            .subFlowMapping("false", sf -> sf
                                    .channel("nullChannel")))

            .handle(logger())
            .get();
}

@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata poller() {
    return Pollers.fixedRate(500).get();
}


@Bean
public MessageHandler ftpOutboundGateway() {
    return Sftp
            .outboundGateway(SftpSessionFactory(),
                    AbstractRemoteFileOutboundGateway.Command.MV,
                    "payload")
            .renameExpression("headers['file_renameTo']").get();
}
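
To make the dynamic headers in the flow above more concrete, here is a plain-Java sketch of the values the header expressions compute for one polled file. It does not use Spring at all. The class name, the method `moveHeaders`, and the local path in `main` are hypothetical. The header name `file_renameTo` is taken from the gateway bean above; the other two names are assumed to be the string values behind `FileHeaders.REMOTE_DIRECTORY` and `FileHeaders.REMOTE_FILE`.

```java
import java.io.File;
import java.util.LinkedHashMap;
import java.util.Map;

class MoveHeadersSketch {

    // "file_renameTo" appears in the gateway bean above; the other two
    // literals are assumed to match the corresponding FileHeaders constants.
    static final String REMOTE_DIRECTORY = "file_remoteDirectory";
    static final String REMOTE_FILE = "file_remoteFile";
    static final String RENAME_TO = "file_renameTo";

    /**
     * Computes, for one polled file, the same values as the expressions
     * "payload.getName()" and "'<doneDir>' + payload.getName()".
     */
    static Map<String, String> moveHeaders(File payload, String remoteDir, String doneDir) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put(REMOTE_DIRECTORY, remoteDir);
        headers.put(REMOTE_FILE, payload.getName());
        headers.put(RENAME_TO, doneDir + payload.getName());
        return headers;
    }

    public static void main(String[] args) {
        // Hypothetical local copy of a file fetched by the inbound adapter.
        File polled = new File("/tmp/local/OFFERS.xml");
        moveHeaders(polled, "/home/blabla/", "/home/blabla/done/")
                .forEach((name, value) -> System.out.println(name + " = " + value));
    }
}
```

Because the values are derived from the payload of each message, every polled file gets its own rename target instead of the hard-coded OFFERS.xml.mini used in the first attempt.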

Recommended answer

Perhaps you don't have permission to do the rename, or the rename failed for some other reason; this exception comes from an attempt to remove the "to" file because the initial rename failed. Turn on DEBUG logging and you should see this log message:

if (logger.isDebugEnabled()){
    logger.debug("Initial File rename failed, possibly because file already exists. Will attempt to delete file: "
            + pathTo + " and execute rename again.");
}
try {
    this.remove(pathTo);

Since the failure is on this remove() operation, your failure indicates that the rename failed due to some other reason (because clearly the "to" file does not exist).
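
The following self-contained sketch imitates that rename, delete, retry sequence with an in-memory stand-in for the remote server, to show how a failed rename can surface as a "Failed to delete file" error. `FakeSession` and its methods are hypothetical and are not the Spring Integration classes. The missing target directory is only one assumed cause of the initial failure; the real cause in the question is not known.

```java
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

class RenameFallbackSketch {

    /** Hypothetical in-memory stand-in for a remote session. */
    static class FakeSession {
        final Set<String> files = new HashSet<>();
        final Set<String> directories = new HashSet<>();

        // Server-side rename: fails if the source is missing, the target
        // directory does not exist, or the target file already exists.
        void rawRename(String from, String to) throws IOException {
            String parent = to.substring(0, to.lastIndexOf('/') + 1);
            if (!files.contains(from)) throw new IOException("No such file: " + from);
            if (!directories.contains(parent)) throw new IOException("No such directory: " + parent);
            if (files.contains(to)) throw new IOException("File exists: " + to);
            files.remove(from);
            files.add(to);
        }

        void remove(String path) throws IOException {
            if (!files.remove(path)) throw new IOException("Failed to remove file: No such file");
        }

        // Rename with fallback: on any failure, assume the target exists,
        // delete it, and try again.
        void rename(String from, String to) throws IOException {
            try {
                rawRename(from, to);
            } catch (IOException initialFailure) {
                try {
                    remove(to);
                } catch (IOException removeFailure) {
                    // Only the delete failure is reported; the initial cause is hidden.
                    throw new IOException("Failed to delete file " + to, removeFailure);
                }
                rawRename(from, to);
            }
        }
    }

    public static void main(String[] args) {
        FakeSession session = new FakeSession();
        session.directories.add("/home/filedrop/");
        session.files.add("/home/filedrop/OFFERS.xml.mini");
        try {
            // Assumed cause: the directory /home/filedrop/done/ does not exist.
            session.rename("/home/filedrop/OFFERS.xml.mini", "/home/filedrop/done/OFFERS.xml.mini");
        } catch (IOException e) {
            System.out.println(e.getMessage() + " <- caused by: " + e.getCause().getMessage());
        }
    }
}
```

In this sketch the reported message names the delete step, while the actual problem (here, the missing directory) only shows up if the first failure is logged, which is why the DEBUG output matters.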
