Launch JobLaunchRequest for each new file in AWS S3 with Spring Batch Integration

Question

I'm following the docs: Spring Batch Integration combined with Spring Integration AWS for polling AWS S3.

But the batch execution for each file is not working in some situations.

The AWS S3 polling works correctly: when I put a new file in the bucket, or when I start the application and there are already files in the bucket, the application syncs them to the local directory:

    @Bean
    public S3SessionFactory s3SessionFactory(AmazonS3 pAmazonS3) {
        return new S3SessionFactory(pAmazonS3);
    }

    @Bean
    public S3InboundFileSynchronizer s3InboundFileSynchronizer(S3SessionFactory pS3SessionFactory) {
        S3InboundFileSynchronizer synchronizer = new S3InboundFileSynchronizer(pS3SessionFactory);
        synchronizer.setPreserveTimestamp(true);
        synchronizer.setDeleteRemoteFiles(false);
        synchronizer.setRemoteDirectory("remote-bucket");
        //synchronizer.setFilter(new S3PersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "simpleMetadataStore"));
        return synchronizer;
    }

    @Bean
    @InboundChannelAdapter(value = IN_CHANNEL_NAME, poller = @Poller(fixedDelay = "30"))
    public S3InboundFileSynchronizingMessageSource s3InboundFileSynchronizingMessageSource(
            S3InboundFileSynchronizer pS3InboundFileSynchronizer) {
        S3InboundFileSynchronizingMessageSource messageSource = new S3InboundFileSynchronizingMessageSource(pS3InboundFileSynchronizer);
        messageSource.setAutoCreateLocalDirectory(true);
        messageSource.setLocalDirectory(new FileSystemResource("files").getFile());
        //messageSource.setLocalFilter(new FileSystemPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "fsSimpleMetadataStore"));
        return messageSource;
    }

    @Bean("s3filesChannel")
    public PollableChannel s3FilesChannel() {
        return new QueueChannel();
    }

I created the FileMessageToJobRequest following the tutorial. I won't post its code here because it is the same as in the docs.
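
For reference, the transformer shown in the Spring Batch Integration reference guide looks roughly like this (a sketch of the documented class, reproduced here for context rather than the exact code used in this project):

    import java.io.File;

    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.JobParametersBuilder;
    import org.springframework.batch.integration.launch.JobLaunchRequest;
    import org.springframework.integration.annotation.Transformer;
    import org.springframework.messaging.Message;

    public class FileMessageToJobRequest {

        private Job job;
        private String fileParameterName;

        public void setFileParameterName(String fileParameterName) {
            this.fileParameterName = fileParameterName;
        }

        public void setJob(Job job) {
            this.job = job;
        }

        @Transformer
        public JobLaunchRequest toRequest(Message<File> message) {
            // Pass the path of the locally synced file as a job parameter
            JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
            jobParametersBuilder.addString(fileParameterName, message.getPayload().getAbsolutePath());
            return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
        }
    }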

So I created the IntegrationFlow and FileMessageToJobRequest beans:

    @Bean
    public IntegrationFlow integrationFlow(
            S3InboundFileSynchronizingMessageSource pS3InboundFileSynchronizingMessageSource) {
        return IntegrationFlows.from(pS3InboundFileSynchronizingMessageSource, 
                         c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(1)))
                .transform(fileMessageToJobRequest())
                .handle(jobLaunchingGateway())
                .log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload")
                .get();
    }

    @Bean
    public FileMessageToJobRequest fileMessageToJobRequest() {
        FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
        fileMessageToJobRequest.setFileParameterName("input.file.name");
        fileMessageToJobRequest.setJob(delimitedFileJob);
        return fileMessageToJobRequest;
    }

So I think the problem is in the JobLaunchingGateway.

If I create it like this:

    @Bean
    public JobLaunchingGateway jobLaunchingGateway() {
        SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
        simpleJobLauncher.setJobRepository(jobRepository);
        simpleJobLauncher.setTaskExecutor(new SyncTaskExecutor());
        JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(simpleJobLauncher);

        return jobLaunchingGateway;
    }

Case 1 (the bucket is empty when the application starts):

  • I upload a new file to AWS S3;
  • the polling works and the file appears in the local directory;
  • but the transform/job is not triggered.

Case 2 (the bucket already has one file when the application starts):

  • The job is launched:
    2021-01-12 13:32:34.451  INFO 1955 --- [ask-scheduler-1] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=arquivoDelimitadoJob]] launched with the following parameters: [{input.file.name=files/FILE1.csv}]
    2021-01-12 13:32:34.524  INFO 1955 --- [ask-scheduler-1] o.s.batch.core.job.SimpleStepHandler     : Executing step: [delimitedFileJob]

  • If I add a second file to S3, the job is not launched, just like in case 1.

Case 3 (the bucket has more than one file):

  • The files are synced correctly to the local directory;
  • but the job is executed only once, for the last file.

So following the docs I changed my Gateway to:

    @Bean
    @ServiceActivator(inputChannel = IN_CHANNEL_NAME, poller = @Poller(fixedRate="1000"))
    public JobLaunchingGateway jobLaunchingGateway() {
        SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
        simpleJobLauncher.setJobRepository(jobRepository);
        simpleJobLauncher.setTaskExecutor(new SyncTaskExecutor());

        //JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(jobLauncher());
        JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(simpleJobLauncher);
        //jobLaunchingGateway.setOutputChannel(replyChannel());
        jobLaunchingGateway.setOutputChannel(s3FilesChannel());
        return jobLaunchingGateway;
    }

With this new gateway implementation, if I put a new file in S3 the application reacts, but it doesn't transform the message and gives this error:

    Caused by: java.lang.IllegalArgumentException: The payload must be of type JobLaunchRequest. Object of class [java.io.File] must be an instance of class org.springframework.batch.integration.launch.JobLaunchRequest
    

And if there are two files in the bucket when the application starts, FILE1.csv and FILE2.csv, the job runs correctly for FILE1.csv but gives the error above for FILE2.csv.

What's the correct way to implement something like this?

To be clear, I want to receive thousands of CSV files in this bucket, read and process them with Spring Batch, but I also need to pick up every new file from S3 as soon as possible.

Thanks in advance.

Answer

The JobLaunchingGateway indeed expects only a JobLaunchRequest as the payload.

Since you have that @InboundChannelAdapter(value = IN_CHANNEL_NAME, poller = @Poller(fixedDelay = "30")) on the S3InboundFileSynchronizingMessageSource bean definition, it is really wrong to also have @ServiceActivator(inputChannel = IN_CHANNEL_NAME) on that JobLaunchingGateway without a FileMessageToJobRequest transformer in between.

Your integrationFlow looks OK to me, but then you really need to remove that @InboundChannelAdapter from the S3InboundFileSynchronizingMessageSource bean and rely fully on the c.poller() configuration; see the sketch below.
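
With that first option, the message source bean from the question simply loses its annotation and the flow's poller alone drives it. A minimal sketch, reusing the beans already defined in the question (the filters, channels and the plain JobLaunchingGateway bean without @ServiceActivator are assumed to stay as shown there):

    @Bean // no @InboundChannelAdapter here; the flow's poller below drives this source
    public S3InboundFileSynchronizingMessageSource s3InboundFileSynchronizingMessageSource(
            S3InboundFileSynchronizer pS3InboundFileSynchronizer) {
        S3InboundFileSynchronizingMessageSource messageSource =
                new S3InboundFileSynchronizingMessageSource(pS3InboundFileSynchronizer);
        messageSource.setAutoCreateLocalDirectory(true);
        messageSource.setLocalDirectory(new FileSystemResource("files").getFile());
        return messageSource;
    }

    @Bean
    public IntegrationFlow integrationFlow(
            S3InboundFileSynchronizingMessageSource pS3InboundFileSynchronizingMessageSource) {
        // This poller is now the only one polling the source, so each synced file
        // becomes exactly one message for the transformer and then the gateway.
        return IntegrationFlows.from(pS3InboundFileSynchronizingMessageSource,
                        c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(1)))
                .transform(fileMessageToJobRequest())
                .handle(jobLaunchingGateway())
                .log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload")
                .get();
    }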

Another way is to keep that @InboundChannelAdapter, but then start the IntegrationFlow from IN_CHANNEL_NAME instead of from the MessageSource, for example as sketched below.
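
A sketch of that second option, assuming IN_CHANNEL_NAME is the channel the @InboundChannelAdapter already writes to (note that if that channel is the QueueChannel from the question, the consuming endpoint also needs its own poller, or the channel can be declared as a DirectChannel instead):

    @Bean
    public IntegrationFlow integrationFlow() {
        // The @InboundChannelAdapter keeps feeding IN_CHANNEL_NAME;
        // the flow just consumes from that channel instead of polling the source itself.
        return IntegrationFlows.from(IN_CHANNEL_NAME)
                .transform(fileMessageToJobRequest())
                .handle(jobLaunchingGateway())
                .get();
    }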

Since you have several pollers against the same S3 source, and both of them are based on the same local directory, it is not a surprise to see so many unexpected situations.
