Spring DSL Integration Flow 在出站适配器出错后发布消息 [英] Spring DSL Integration Flow publishing message after error in outbound adaptor

查看:33
本文介绍了Spring DSL Integration Flow 在出站适配器出错后发布消息的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个集成流程.

IntegrationFlows.from(
            Sftp.inboundAdapter(inboundSftp)
            .localDirectory(this.getlocalDirectory(config.getId()))
            .deleteRemoteFiles(true)
            .autoCreateLocalDirectory(true)
            .filter(new CompositeFileListFilter().addFilter(new LastModifiedLsEntryFileListFilter(config.getRegexFilter())))
            .remoteDirectory(config.getInboundDirectory())
            , e -> e.poller(
                    Pollers.cron(config.getCron())
                    .errorChannel(MessageHeaders.ERROR_CHANNEL).errorHandler((ex) -> {
                        try {

                            this.destroy(String.valueOf(config.getId()));
                            configurationService.removeConfigurationChannelById(config.getId());
                            //loggin here

                            }
                        } catch (Exception ex1) {
                            Logger.getLogger(ExceptionAspect.class.getName()).log(Level.SEVERE, null, ex1);
                        }
                    })
                    .advice(startup.scanRemoteDirectory())
            ))
            .transform(
                file -> util.transform((File) file, config.getSourceEncoding(), config.getTargetEncoding(), doEncoding, doZip))
            .publishSubscribeChannel(s -> s
                .subscribe(f -> {
                    f.handle(Sftp.outboundAdapter(outboundSftp)
                        .useTemporaryFileName(false)
                        .autoCreateDirectory(true)
                        .remoteDirectory(config.getOutboundDirectory()), c -> c.advice(startup.deleteFileAdvice()));

                })
                .subscribe(f -> {

                    if(doArchive) {
                        f.handle(Sftp.outboundAdapter(inboundSftp)
                            .useTemporaryFileName(false)
                            .autoCreateDirectory(true)
                            .remoteDirectory(config.getInboundArchiveDirectory()));
                    } else {
                        f.handle(m -> {});
                    }

                })
                .subscribe(f -> f
                    .handle(m -> {

                        // file transfer logging here
                        if(doArchive) {
                          // file archived logging here
                         }
                        }
                    })
                )
            )
            .get();

在第一个订阅者中,如果文件上传失败,它仍会打印该文件已传输但实际上没有传输的日志.

in 1st subscriber, if file is failed to upload it still printing logs that file is transferred but actually its not.

我所理解的消息将传递给每个订阅者,当第一个订阅者完成其工作时,它将发送给下一个订阅者.

what i understand message will be pass to each on subscriber and when 1st subscriber complete its work it will sent to next one.

就我而言,第一个订阅者实际上无法上传文件.建议不是删除文件.

In my case first subscriber is actually fail to upload file. Advice is not removing file.

我确实尝试了一些东西.

I did try few things.

IntegrationFlows.from(
                Sftp.inboundAdapter(inboundSftp)
               ..........
                ))
                .transform(....
                   )
                .publishSubscribeChannel(s -> s
                    .subscribe(f -> {
                        f.handle(Sftp.outboundAdapter(outboundSftp)
                            .useTemporaryFileName(false)
                            .autoCreateDirectory(true)
                            .remoteDirectory(config.getOutboundDirectory()), c -> c.advice(startup.deleteFileAdvice()));

                    }).publishSubcribeChannel(s -> s
                      .subscribe(f -> {

                        if(doArchive) {
                            f.handle(Sftp.outboundAdapter(inboundSftp)
                                .useTemporaryFileName(false)
                                .autoCreateDirectory(true)
                                .remoteDirectory(config.getInboundArchiveDirectory()));
                        } else {
                            f.handle(m -> {});
                        }

                    })
                    .subscribe(f -> f
                        .handle(m -> {

                            // file transfer logging here
                            if(doArchive) {
                              // file archived logging here
                             }
                            }
                        })
                    )
                    )
                )
                .get();

我尝试在 outboundAdapter 订阅者之后再次发布,但仍然错误地发布.

I try to publish again after outboundAdapter subscriber, but in error it still published.

还尝试在 outboundAdapter 订阅者中添加 .errorChannel,对我不起作用.

Also try to add .errorChannel in outboundAdapter subscriber, didn't work for me.

      .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.2.2.RELEASE)

2020-08-20 11:10:53,265 INFO c.t.i.s.I2SftpApplication - Starting I2SftpApplication on GSEUC5CG8393GR5 with PID 21636 (C:\Users\MuhammadUmair\IdeaProjects\i2sftpinboudservice\target\classes started by MuhammadUmair in C:\Users\MuhammadUmair\IdeaProjects\i2sftpinboudservice)
2020-08-20 11:10:53,269 INFO c.t.i.s.I2SftpApplication - No active profile set, falling back to default profiles: default
2020-08-20 11:10:54,464 INFO o.s.i.c.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2020-08-20 11:10:54,478 INFO o.s.i.c.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2020-08-20 11:10:54,577 INFO o.s.c.s.PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'integrationChannelResolver' of type [org.springframework.integration.support.channel.BeanFactoryChannelResolver] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2020-08-20 11:10:54,584 INFO o.s.c.s.PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'integrationDisposableAutoCreatedBeans' of type [org.springframework.integration.config.annotation.Disposables] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2020-08-20 11:10:54,597 INFO o.s.c.s.PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'org.springframework.integration.config.IntegrationManagementConfiguration' of type [org.springframework.integration.config.IntegrationManagementConfiguration] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2020-08-20 11:10:54,906 INFO o.s.b.w.e.t.TomcatWebServer - Tomcat initialized with port(s): 8080 (http)
2020-08-20 11:10:54,917 INFO o.a.c.h.Http11NioProtocol - Initializing ProtocolHandler ["http-nio-8080"]
2020-08-20 11:10:54,917 INFO o.a.c.c.StandardService - Starting service [Tomcat]
2020-08-20 11:10:54,917 INFO o.a.c.c.StandardEngine - Starting Servlet engine: [Apache Tomcat/9.0.29]
2020-08-20 11:10:55,095 INFO o.a.c.c.C.[.[.[/] - Initializing Spring embedded WebApplicationContext
2020-08-20 11:10:55,095 INFO o.s.w.c.ContextLoader - Root WebApplicationContext: initialization completed in 1705 ms
2020-08-20 11:10:56,920 INFO o.s.s.c.ThreadPoolTaskExecutor - Initializing ExecutorService 'applicationTaskExecutor'
2020-08-20 11:10:57,243 INFO o.s.s.c.ThreadPoolTaskScheduler - Initializing ExecutorService 'taskScheduler'
2020-08-20 11:10:57,415 INFO o.s.i.e.EventDrivenConsumer - Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2020-08-20 11:10:57,416 INFO o.s.i.c.PublishSubscribeChannel - Channel 'application.errorChannel' has 1 subscriber(s).
2020-08-20 11:10:57,416 INFO o.s.i.e.EventDrivenConsumer - started bean '_org.springframework.integration.errorLogger'
2020-08-20 11:10:57,416 INFO o.s.i.e.EventDrivenConsumer - Adding {message-handler:startup.resultFileHandler.serviceActivator} as a subscriber to the 'fromSftpChannel' channel
2020-08-20 11:10:57,416 INFO o.s.i.c.DirectChannel - Channel 'application.fromSftpChannel' has 1 subscriber(s).
2020-08-20 11:10:57,417 INFO o.s.i.e.EventDrivenConsumer - started bean 'startup.resultFileHandler.serviceActivator'
2020-08-20 11:10:57,422 INFO o.s.i.e.SourcePollingChannelAdapter - started bean 'startup.sftpMessageSource.inboundChannelAdapter'
2020-08-20 11:10:57,441 INFO o.a.c.h.Http11NioProtocol - Starting ProtocolHandler ["http-nio-8080"]
2020-08-20 11:10:57,498 INFO o.s.b.w.e.t.TomcatWebServer - Tomcat started on port(s): 8080 (http) with context path ''
2020-08-20 11:10:57,508 INFO c.t.i.s.I2SftpApplication - Started I2SftpApplication in 9.228 seconds (JVM running for 13.133)
Registering an Integration Flow with id : 1
2020-08-20 11:10:57,689 INFO o.s.i.c.DirectChannel - Channel 'application.1.subFlow#2.channel#0' has 1 subscriber(s).
2020-08-20 11:10:57,690 INFO o.s.i.e.EventDrivenConsumer - started bean '1.subFlow#2.org.springframework.integration.config.ConsumerEndpointFactoryBean#1'
2020-08-20 11:10:57,691 INFO o.s.i.e.EventDrivenConsumer - Adding {bridge} as a subscriber to the '1.subFlow#0.channel#0' channel
2020-08-20 11:10:57,691 INFO o.s.i.c.PublishSubscribeChannel - Channel 'application.1.subFlow#0.channel#0' has 1 subscriber(s).
2020-08-20 11:10:57,691 INFO o.s.i.e.EventDrivenConsumer - started bean '1.subFlow#2.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'
2020-08-20 11:10:57,691 INFO o.s.i.c.DirectChannel - Channel 'application.1.subFlow#1.channel#0' has 1 subscriber(s).
2020-08-20 11:10:57,691 INFO o.s.i.e.EventDrivenConsumer - started bean '1.subFlow#1.org.springframework.integration.config.ConsumerEndpointFactoryBean#1'
2020-08-20 11:10:57,691 INFO o.s.i.e.EventDrivenConsumer - Adding {bridge} as a subscriber to the '1.subFlow#0.channel#0' channel
2020-08-20 11:10:57,692 INFO o.s.i.c.PublishSubscribeChannel - Channel 'application.1.subFlow#0.channel#0' has 2 subscriber(s).
2020-08-20 11:10:57,692 INFO o.s.i.e.EventDrivenConsumer - started bean '1.subFlow#1.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'
2020-08-20 11:10:57,693 INFO o.s.i.e.EventDrivenConsumer - Adding {message-handler} as a subscriber to the '1.subFlow#0.channel#1' channel
2020-08-20 11:10:57,694 INFO o.s.i.c.DirectChannel - Channel 'application.1.subFlow#0.channel#1' has 1 subscriber(s).
2020-08-20 11:10:57,694 INFO o.s.i.e.EventDrivenConsumer - started bean '1.subFlow#0.org.springframework.integration.config.ConsumerEndpointFactoryBean#1'
2020-08-20 11:10:57,694 INFO o.s.i.e.EventDrivenConsumer - Adding {bridge} as a subscriber to the '1.subFlow#0.channel#0' channel
2020-08-20 11:10:57,694 INFO o.s.i.e.EventDrivenConsumer - started bean '1.subFlow#0.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'
2020-08-20 11:10:57,694 INFO o.s.i.e.EventDrivenConsumer - Adding {transformer} as a subscriber to the '1.channel#0' channel
2020-08-20 11:10:57,694 INFO o.s.i.c.DirectChannel - Channel 'application.1.channel#0' has 1 subscriber(s).
2020-08-20 11:10:57,694 INFO o.s.i.e.EventDrivenConsumer - started bean '1.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'
2020-08-20 11:10:57,700 INFO o.s.i.e.SourcePollingChannelAdapter - started bean '1.org.springframework.integration.config.SourcePollingChannelAdapterFactoryBean#0'
2020-08-20 11:10:58,017 INFO c.j.jsch - Connecting to *.*.*.114 port 22
2020-08-20 11:10:58,341 INFO c.j.jsch - Connection established
2020-08-20 11:10:58,672 INFO c.j.jsch - Remote version string: SSH-2.0-OpenSSH_7.4
2020-08-20 11:10:58,673 INFO c.j.jsch - Local version string: SSH-2.0-JSCH-0.1.54
2020-08-20 11:10:58,674 INFO c.j.jsch - CheckCiphers: aes256-ctr,aes192-ctr,aes128-ctr,aes256-cbc,aes192-cbc,aes128-cbc,3des-ctr,arcfour,arcfour128,arcfour256
2020-08-20 11:10:58,728 INFO c.j.jsch - CheckKexes: diffie-hellman-group14-sha1,ecdh-sha2-nistp256,ecdh-sha2-nistp384,ecdh-sha2-nistp521
2020-08-20 11:10:58,815 INFO c.j.jsch - CheckSignatures: ecdsa-sha2-nistp256,ecdsa-sha2-nistp384,ecdsa-sha2-nistp521
2020-08-20 11:10:58,823 INFO c.j.jsch - SSH_MSG_KEXINIT sent
2020-08-20 11:10:59,036 INFO c.j.jsch - SSH_MSG_KEXINIT received
2020-08-20 11:10:59,038 INFO c.j.jsch - kex: server: curve25519-sha256,curve25519-sha256@libssh.org,ecdh-sha2-nistp256,ecdh-sha2-nistp384,ecdh-sha2-nistp521,diffie-hellman-group-exchange-sha256,diffie-hellman-group16-sha512,diffie-hellman-group18-sha512,diffie-hellman-group-exchange-sha1,diffie-hellman-group14-sha256,diffie-hellman-group14-sha1,diffie-hellman-group1-sha1
2020-08-20 11:10:59,038 INFO c.j.jsch - kex: server: ssh-rsa,rsa-sha2-512,rsa-sha2-256,ecdsa-sha2-nistp256,ssh-ed25519
2020-08-20 11:10:59,039 INFO c.j.jsch - kex: server: aes256-ctr,aes192-ctr,aes128-ctr
2020-08-20 11:10:59,040 INFO c.j.jsch - kex: server: aes256-ctr,aes192-ctr,aes128-ctr
2020-08-20 11:10:59,040 INFO c.j.jsch - kex: server: hmac-sha2-512,hmac-sha2-256
2020-08-20 11:10:59,041 INFO c.j.jsch - kex: server: hmac-sha2-512,hmac-sha2-256
2020-08-20 11:10:59,042 INFO c.j.jsch - kex: server: none,zlib@openssh.com
2020-08-20 11:10:59,042 INFO c.j.jsch - kex: server: none,zlib@openssh.com
2020-08-20 11:10:59,042 INFO c.j.jsch - kex: server: 
2020-08-20 11:10:59,043 INFO c.j.jsch - kex: server: 
2020-08-20 11:10:59,043 INFO c.j.jsch - kex: client: ecdh-sha2-nistp256,ecdh-sha2-nistp384,ecdh-sha2-nistp521,diffie-hellman-group14-sha1,diffie-hellman-group-exchange-sha256,diffie-hellman-group-exchange-sha1,diffie-hellman-group1-sha1
2020-08-20 11:10:59,044 INFO c.j.jsch - kex: client: ssh-rsa,ssh-dss,ecdsa-sha2-nistp256,ecdsa-sha2-nistp384,ecdsa-sha2-nistp521
2020-08-20 11:10:59,044 INFO c.j.jsch - kex: client: aes128-ctr,aes128-cbc,3des-ctr,3des-cbc,blowfish-cbc,aes192-ctr,aes192-cbc,aes256-ctr,aes256-cbc
2020-08-20 11:10:59,045 INFO c.j.jsch - kex: client: aes128-ctr,aes128-cbc,3des-ctr,3des-cbc,blowfish-cbc,aes192-ctr,aes192-cbc,aes256-ctr,aes256-cbc
2020-08-20 11:10:59,045 INFO c.j.jsch - kex: client: hmac-md5,hmac-sha1,hmac-sha2-256,hmac-sha1-96,hmac-md5-96
2020-08-20 11:10:59,045 INFO c.j.jsch - kex: client: hmac-md5,hmac-sha1,hmac-sha2-256,hmac-sha1-96,hmac-md5-96
2020-08-20 11:10:59,045 INFO c.j.jsch - kex: client: none
2020-08-20 11:10:59,045 INFO c.j.jsch - kex: client: none
2020-08-20 11:10:59,045 INFO c.j.jsch - kex: client: 
2020-08-20 11:10:59,046 INFO c.j.jsch - kex: client: 
2020-08-20 11:10:59,046 INFO c.j.jsch - kex: server->client aes128-ctr hmac-sha2-256 none
2020-08-20 11:10:59,047 INFO c.j.jsch - kex: client->server aes128-ctr hmac-sha2-256 none
2020-08-20 11:10:59,053 INFO c.j.jsch - SSH_MSG_KEX_ECDH_INIT sent
2020-08-20 11:10:59,053 INFO c.j.jsch - expecting SSH_MSG_KEX_ECDH_REPLY
2020-08-20 11:10:59,385 INFO c.j.jsch - ssh_rsa_verify: signature true
2020-08-20 11:10:59,389 INFO o.s.i.s.s.DefaultSftpSessionFactory - The authenticity of host '*.*.*.114' can't be established.
RSA key fingerprint is 1f:3e:c9:5f:37:00:a1:00:ef:50:59:af:42:98:99:e9.
Are you sure you want to continue connecting?
2020-08-20 11:10:59,390 WARN c.j.jsch - Permanently added '*.*.*.114' (RSA) to the list of known hosts.
2020-08-20 11:10:59,390 INFO c.j.jsch - SSH_MSG_NEWKEYS sent
2020-08-20 11:10:59,390 INFO c.j.jsch - SSH_MSG_NEWKEYS received
2020-08-20 11:10:59,395 INFO c.j.jsch - SSH_MSG_SERVICE_REQUEST sent
2020-08-20 11:10:59,718 INFO c.j.jsch - SSH_MSG_SERVICE_ACCEPT received
2020-08-20 11:11:00,047 INFO c.j.jsch - Authentications that can continue: gssapi-with-mic,publickey,keyboard-interactive,password
2020-08-20 11:11:00,047 INFO c.j.jsch - Next authentication method: gssapi-with-mic
2020-08-20 11:11:00,399 INFO c.j.jsch - Authentications that can continue: password
2020-08-20 11:11:00,399 INFO c.j.jsch - Next authentication method: password
2020-08-20 11:11:00,746 INFO c.j.jsch - Authentication succeeded (password).
Total Files: 1
2020-08-20 11:11:27,623 INFO c.t.i.s.s.LastModifiedLsEntryFileListFilter - [OB.xml] old size [null]  increased to [0]...
Total Files: 1
2020-08-20 11:11:29,636 INFO c.t.i.s.s.LastModifiedLsEntryFileListFilter - [OB.xml] old size [0]  increased to [38709]...
Total Files: 1
2020-08-20 11:11:34,256 INFO c.j.jsch - Connecting to *.*.*.115 port 22
2020-08-20 11:11:55,282 INFO c.t.i.c.f.a.LoggingAspect - log message Integration Name=CXML invoice DK Service Name=FileTransferService Source=External Source Interface=/home/umair/accruals/14minute/ Target=NA Target Interface=/home/umair/accruals/15minute/ Content ID=OB.xml Message ID=N/A Category=SUCCESS Timestamp=2020-08-20T11:11:55.281 Message=file OB.xml transferred

根据 Artem Bilan 的建议,我创建了一个演示应用程序并上传到 git repo 这里

as per Artem Bilan suggestion, i have created a demo app and uploaded to git repo here

服务器配置是 这里

定义集成流程 这里

自述文件文件

推荐答案

我发现了你的问题:

@Bean
public Advice deleteFileAdvice() {

    ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
    advice.setOnSuccessExpressionString(AppConstants.SUCCESS_EXPRESSION);
    advice.setTrapException(true);
    return advice;
}

查看其 JavaDocs:

See its JavaDocs:

/**
 * If true, any exception will be caught and null returned.
 * Default false.
 * @param trapException true to trap Exceptions.
 */
public void setTrapException(boolean trapException) {

因此,来自消息处理程序的任何异常,包括 SFTP 连接失败,都将被吞噬.因此,请求消息实际上会移动到您的 publishSubcribeChannel 配置中的下一个订阅者.

So, any exceptions from the message handler, including an SFTP conneciton failure is going to be swallowed. Therefore a request message is really moved to the next subscriber in your publishSubcribeChannel configuration.

这篇关于Spring DSL Integration Flow 在出站适配器出错后发布消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆