Spring Batch 从同一执行和步骤重新启动未完成的作业 [英] Spring Batch restart uncompleted jobs from the same execution and step

查看:18
本文介绍了Spring Batch 从同一执行和步骤重新启动未完成的作业的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我使用以下逻辑重新启动未完成的Spring Batch(例如应用程序异常终止后)作业:

I use the following logic to restart the Spring Batch uncompleted(for example after application abnormal termination) jobs:

public void restartUncompletedJobs() {

    LOGGER.info("Restarting uncompleted jobs");

    try {
        jobRegistry.register(new ReferenceJobFactory(documetPipelineJob));

        List<String> jobs = jobExplorer.getJobNames();
        for (String job : jobs) {
            Set<JobExecution> runningJobs = jobExplorer.findRunningJobExecutions(job);

            for (JobExecution runningJob : runningJobs) {
                runningJob.setStatus(BatchStatus.FAILED);
                runningJob.setEndTime(new Date());
                jobRepository.update(runningJob);
                jobOperator.restart(runningJob.getId());
                LOGGER.info("Job restarted: " + runningJob);
            }
        }
    } catch (Exception e) {
        LOGGER.error(e.getMessage(), e);
    }
}

这工作正常,但有一个副作用——它不会重新启动失败的作业执行,而是创建一个新的执行实例.如何更改此逻辑以从失败的步骤重新开始失败的执行并且不创建新的执行?

This works fine but with one side effect - it doesn't restart the failed job execution but creates a new execution instance. How to change this logic in order to restart the failed execution from the failed step and do not create a new execution ?

更新

当我尝试以下代码时:

public void restartUncompletedJobs() {
try {
    jobRegistry.register(new ReferenceJobFactory(documetPipelineJob));

    List<String> jobs = jobExplorer.getJobNames();
    for (String job : jobs) {

    Set<JobExecution> jobExecutions = jobExplorer.findRunningJobExecutions(job);

    for (JobExecution jobExecution : jobExecutions) {
        jobOperator.restart(jobExecution.getId());
    }
    }
} catch (Exception e) {
    LOGGER.error(e.getMessage(), e);
}
}

它失败并出现以下异常:

it fails with the following exception:

2018-07-30 06:50:47.090 ERROR 1588 --- [           main] c.v.p.d.service.batch.BatchServiceImpl   : Illegal state (only happens on a race condition): job execution already running with name=documetPipelineJob and parameters={ID=826407fa-d3bc-481a-8acb-b9643b849035, inputDir=/home/public/images, STORAGE_TYPE=LOCAL}

org.springframework.batch.core.UnexpectedJobExecutionException: Illegal state (only happens on a race condition): job execution already running with name=documetPipelineJob and parameters={ID=826407fa-d3bc-481a-8acb-b9643b849035, inputDir=/home/public/images, STORAGE_TYPE=LOCAL}
    at org.springframework.batch.core.launch.support.SimpleJobOperator.restart(SimpleJobOperator.java:283) ~[spring-batch-core-4.0.1.RELEASE.jar!/:4.0.1.RELEASE]
    at org.springframework.batch.core.launch.support.SimpleJobOperator$$FastClassBySpringCGLIB$$44ee6049.invoke(<generated>) ~[spring-batch-core-4.0.1.RELEASE.jar!/:4.0.1.RELEASE]
    at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:204) [spring-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:684) [spring-aop-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.batch.core.launch.support.SimpleJobOperator$$EnhancerBySpringCGLIB$$7659d4c.restart(<generated>) ~[spring-batch-core-4.0.1.RELEASE.jar!/:4.0.1.RELEASE]
    at com.example.pipeline.domain.service.batch.BatchServiceImpl.restartUncompletedJobs(BatchServiceImpl.java:143) ~[domain-0.0.1.jar!/:0.0.1]

以下代码在作业存储数据库中创建新的执行:

public void restartUncompletedJobs() {
try {
    jobRegistry.register(new ReferenceJobFactory(documetPipelineJob));

    List<String> jobs = jobExplorer.getJobNames();
    for (String job : jobs) {

    Set<JobExecution> jobExecutions = jobExplorer.findRunningJobExecutions(job);

    for (JobExecution jobExecution : jobExecutions) {

        jobExecution.setStatus(BatchStatus.STOPPED);
        jobExecution.setEndTime(new Date());
        jobRepository.update(jobExecution);

        Long jobExecutionId = jobExecution.getId();
        jobOperator.restart(jobExecutionId);
    }
    }
} catch (Exception e) {
    LOGGER.error(e.getMessage(), e);
}

}

问题是 - 如何在应用程序重启后继续运行旧的未完成的执行而不创建新的执行?

The question is - how to continue to run the old uncompleted executions without creating new ones after application restart?

推荐答案

TL;DR:Spring Batch 将始终创建新的 Job Execution,并且不会重用之前失败的 Job 执行以继续其执行.

TL;DR: Spring Batch will always create new Job Execution and will not reuse a previous failed job execution to continue its execution.

更长的答案:首先你需要了解 Spring Batch 中三个相似但不同的概念:Job、Job Instance、Job Execution

Longer answer: First you need to understand three similar but different concept in Spring Batch: Job, Job Instance, Job Execution

我总是用这个例子:

  • 作业:日终批处理
  • 作业实例:2018-01-01 的日终批处理
  • 作业执行:2018 年 1 月 1 日的日终批处理,执行 #1

概括地说,这就是 Spring Batch 恢复的工作原理:

In high-level, that's how Spring Batch's recovery works:

假设您在第 3 步中第一次执行失败.您可以提交具有相同参数的相同作业(日终批次)(2018-01-01).Spring Batch 将尝试查找提交的 作业实例(End-of-Day Batch for 2018-01-01),发现之前在第 3 步失败了.Spring Batch 然后会创建一个 NEW 执行,[End-Of-Day Batch for 2018-01-01, execution#2],然后从第 3 步开始执行.

Assuming your first execution failed in the step 3. You can submit the same Job (End-of-Day Batch) with same Parameters (2018-01-01). Spring Batch will try to look up last Job Execution (End-Of-Day Batch for 2018-01-01, execution #1) of the submitted Job Instance (End-of-Day Batch for 2018-01-01), and found that it has previously failed in step 3. Spring Batch will then create a NEW execution, [End-Of-Day Batch for 2018-01-01, execution #2], and start the execution from step 3.

因此,按照设计,Spring 试图恢复的是先前失败的作业实例(而不是作业执行).当您重新运行之前失败的执行时,Spring Batch 不会重用执行.

So by design, what Spring trying to recover is a previously failed Job Instance (instead of Job Execution). Spring batch will not reuse execution when you are re-running a previous-failed execution.

这篇关于Spring Batch 从同一执行和步骤重新启动未完成的作业的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆