Spring Batch 集成 Java DSL 和 RunIdIncrementer 不增加 [英] Spring Batch integration Java DSL and RunIdIncrementer not incrementing

查看:20
本文介绍了Spring Batch 集成 Java DSL 和 RunIdIncrementer 不增加的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个 spring boot/integration/batch 可以在 SFTP 上运行和轮询文件.

I have a spring boot/integration/batch which will run and poll files on SFTP.

我希望最终能够使用 RunIdIncrementer<使用相同的参数(基本上相同的文件)重新启动作业(可能是因为应用程序已重新启动,或者由于某些原因我们再次收到相同的文件)/code> 在作业的配置中定义.

I'd like to be able to eventually relaunch a job (could be because application has been restarted or because for some reasons we received the same file again) with the same parameters (same file basically) using RunIdIncrementer defined on Job's Configuration.

不幸的是 run.id=1 没有增加,我得到一个 JobInstanceAlreadyCompleteException

Unfortunately run.id=1 doesn't increment and I get a JobInstanceAlreadyCompleteException

作业配置

@Autowired
private JobBuilderFactory jobBuilders;

@Bean
public Job importOffersJob() {

    Job job = jobBuilders.get("importOffersJob")
            .start(importOffersStep)
            .listener(traceJobExecutionListener())  
            .incrementer(new RunIdIncrementer())
            .build();

    return job;
}

整合

@Bean
public IntegrationFlow ftpInboundFlow() {
    return IntegrationFlows
            .from(Sftp.inboundAdapter(SftpSessionFactory())
                .regexFilter(".*\\.xml.mini$")
                .deleteRemoteFiles(intCfg.getSftpDeleteRemoteFiles())
                .preserveTimestamp(Boolean.TRUE)
                .autoCreateLocalDirectory(Boolean.TRUE)
                .remoteDirectory(intCfg.getSftpRemoteDirectory())
                .localDirectory(new File(intCfg.getSftpLocalDirectory())
            ), 
                e -> e.id("sftpInboundAdapter")
                .poller(Pollers.fixedRate(intCfg.getSftpPollerInMinutes(), TimeUnit.MINUTES).maxMessagesPerPoll(1)))
            .transform(fileToJobLaunchRequestTransformer())         
            .handle(jobLaunchingGw())
            .handle(logger())
            .get();
}

<小时>

public class FileToJobLaunchRequestTransformer implements GenericTransformer<Message<File>, JobLaunchRequest> {

    private final static Logger log = LoggerFactory.getLogger(FileToJobLaunchRequestTransformer.class);

    @Autowired
    @Qualifier("importOffersJob")
    private Job job;

    @Override
    public JobLaunchRequest transform(Message<File> source) {
        log.info("FileToJobLaunchRequestTransformer, source.getPayload().getAbsolutePath(): {}", source.getPayload()
                .getAbsolutePath());

        JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
        jobParametersBuilder.addString("pathToFile", "file:" + source.getPayload().getAbsolutePath());
        //jobParametersBuilder.addString("UUID", UUID.randomUUID().toString());

        JobParameters jobParams = job.getJobParametersIncrementer().getNext(jobParametersBuilder.toJobParameters());

        return new JobLaunchRequest(job, jobParams);
        }
     }

如果我取消注释 jobParametersBuilder.addString("UUID", UUID.randomUUID().toString()); 它正在工作,但我认为关键是能够重用定义在我的工作配置不是吗?

If I uncomment jobParametersBuilder.addString("UUID", UUID.randomUUID().toString()); It's is working but I think the point is to be able to reuse the incrementer defined in my job configuration isn't it ?

(当我在没有集成的情况下运行与简单弹簧启动相同的批次时,增量器正在工作)

(When I was running the same batch as a simple spring boot without integration the incrementer was working)

非常感谢

推荐答案

RunIdIncrementer.getNext():

public JobParameters getNext(JobParameters parameters) {

    JobParameters params = (parameters == null) ? new JobParameters() : parameters;

    long id = params.getLong(key, 0L) + 1;
    return new JobParametersBuilder(params).addLong(key, id).toJobParameters();
}

您每次都在创建一个新的作业参数,因此它将始终返回 1,因为 run.id 不存在.

You are creating a new job parameters each time so it will always return 1 because run.id doesn't exist.

如果您将 jobParametersBuilder 移动到转换器中的一个字段,与 job 一起,它将起作用,但仅适用于您的应用程序的此实例化.下次启动应用程序时,它将从 1 重新开始.

If you move the jobParametersBuilder to a field in the transformer, alongside job, it will work, but only for this instantiation of your app. It will start again at 1 the next time you start your app.

为了在重启后继续存在,您需要在某处保存 run.id 值,或者您需要从存储库中获取最后一个作业参数.

To survive restarts, you need to save off the run.id value somewhere, or you need to get the last job parameters from the repository.

这篇关于Spring Batch 集成 Java DSL 和 RunIdIncrementer 不增加的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆