Spring Batch with SimpleAsyncTaskExecutor is not saving to DB

Question

I have a simple Spring Boot application with an endpoint that invokes a Spring Batch Job asynchronously through the SimpleAsyncTaskExecutor configured in the JobLauncher bean.

The Spring Batch Job starts asynchronously and runs without errors, but nothing is saved to the database.

If I remove the SimpleAsyncTaskExecutor, the data is saved.

This is my BatchConfigurer. Here I configure the JobLauncher with a SimpleAsyncTaskExecutor. If I remove the line simpleJobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor()); // (1), the data is saved.

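import javax.annotation.PostConstruct;
import javax.persistence.EntityManagerFactory;
import javax.sql.DataSource;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.core.configuration.annotation.BatchConfigurer;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.explore.support.JobExplorerFactoryBean;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.batch.BatchProperties;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.stereotype.Component;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.util.StringUtils;

@Component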
public class CustomBatchConfiguration implements BatchConfigurer {

private static final Log LOGGER = LogFactory.getLog(CustomBatchConfiguration.class);

@Autowired
private BatchProperties properties;

@Autowired
private DataSource dataSource;

@Autowired
private EntityManagerFactory entityManagerFactory;

private PlatformTransactionManager transactionManager;

private JobRepository jobRepository;

private JobLauncher jobLauncher;

private JobExplorer jobExplorer;

/**
 * Registers {@link JobRepository} bean.
 */
@Override
public JobRepository getJobRepository() {
    return this.jobRepository;
}

/**
 * Registers {@link PlatformTransactionManager} bean.
 */
@Override
public PlatformTransactionManager getTransactionManager() {
    return this.transactionManager;
}

/**
 * Registers {@link JobLauncher} bean.
 */
@Override
public JobLauncher getJobLauncher() {
    return this.jobLauncher;
}

/**
 * Registers {@link JobExplorer} bean. This bean is actually created in
 * {@link BatchConfig}.
 */
@Override
public JobExplorer getJobExplorer() throws Exception {
    return this.jobExplorer;
}

/**
 * Initializes Spring Batch components.
 */
@PostConstruct
public void initialize() {
    try {
        this.transactionManager = createTransactionManager();
        this.jobRepository = createJobRepository();
        this.jobLauncher = createJobLauncher();
        this.jobExplorer = createJobExplorer();
    } catch (Exception ex) {
        throw new IllegalStateException("Unable to initialize Spring Batch", ex);
    }
}

private JobExplorer createJobExplorer() throws Exception {
    JobExplorerFactoryBean jobExplorerFactoryBean = new JobExplorerFactoryBean();
    jobExplorerFactoryBean.setDataSource(this.dataSource);
    String tablePrefix = this.properties.getTablePrefix();

    if (StringUtils.hasText(tablePrefix)) {
        jobExplorerFactoryBean.setTablePrefix(tablePrefix);
    }

    jobExplorerFactoryBean.afterPropertiesSet();
    return jobExplorerFactoryBean.getObject();
}

private JobLauncher createJobLauncher() throws Exception {
    SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();

    simpleJobLauncher.setJobRepository(getJobRepository());
    simpleJobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor()); // (1)
    simpleJobLauncher.afterPropertiesSet();

    return simpleJobLauncher;
}

private JobRepository createJobRepository() throws Exception {
    JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();

    jobRepositoryFactoryBean.setDatabaseType("db2");
    jobRepositoryFactoryBean.setDataSource(this.dataSource);

    if (this.entityManagerFactory != null) {
        LOGGER.warn("JPA does not support custom isolation levels, so locks may not be taken when launching Jobs");
        jobRepositoryFactoryBean.setIsolationLevelForCreate("ISOLATION_DEFAULT");
    }

    String tablePrefix = this.properties.getTablePrefix();
    if (StringUtils.hasText(tablePrefix)) {
        jobRepositoryFactoryBean.setTablePrefix(tablePrefix);
    }

    jobRepositoryFactoryBean.setTransactionManager(getTransactionManager());
    jobRepositoryFactoryBean.afterPropertiesSet();

    return jobRepositoryFactoryBean.getObject();
}

private PlatformTransactionManager createTransactionManager() {
    return new DataSourceTransactionManager(dataSource);
}

}

And this is my BatchConfiguration.

@Autowired(required = true)
private MyItemReader myItemReader;

@Autowired(required = true)
private MyItemProcessor myItemProcessor;

@Autowired(required = true)
private MyItemWriter myItemWriter;


@Bean
public Step myStep(TaskExecutor taskExecutor) {
    return stepBuilderFactory.get("myStepName")
            .<SomeWrapper, SomeWrapper>chunk(1)
            .reader(myItemReader)
            .processor(myItemProcessor)
            .writer(myItemWriter)
            .build();
}

@Bean(name = "myJob")
public Job myJob(Step myStep) {
    return jobBuilderFactory.get("myJobName").incrementer(new RunIdIncrementer())
            .flow(myStep).end().build();
}

What am I missing?

Thanks in advance.

Recommended answer

I did the following to solve my problem:

  1. In CustomBatchConfiguration, I changed createTransactionManager() from

private PlatformTransactionManager createTransactionManager() {
    return new DataSourceTransactionManager(dataSource);
}

to

private PlatformTransactionManager createTransactionManager() {
    if (this.entityManagerFactory != null) {
        return new JpaTransactionManager(this.entityManagerFactory);
    }

    return new DataSourceTransactionManager(this.dataSource);
}

That is, when an EntityManagerFactory is available, the batch infrastructure now uses Spring's JpaTransactionManager.

  2. I changed the JobLauncher TaskExecutor from

simpleJobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());

to

ThreadPoolTaskExecutor threadPoolExecutor = new ThreadPoolTaskExecutor();
threadPoolExecutor.setMaxPoolSize(springBatchJobThreadPollMaxSize);
threadPoolExecutor.afterPropertiesSet();

TaskExecutor taskExecutor = new DelegatingSecurityContextAsyncTaskExecutor(threadPoolExecutor);
simpleJobLauncher.setTaskExecutor(taskExecutor); // hand the wrapped executor to the launcher

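This creates a more robust thread pool and propagates the Spring Security context from the launching thread to the pooled threads. Note that ThreadPoolTaskExecutor defaults to a core pool size of 1 and an unbounded queue, so the maximum pool size only takes effect if the core pool size is raised or the queue capacity is bounded.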
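The effect of wrapping the executor can be illustrated without Spring. The following is a minimal plain-JDK sketch, not Spring Security's implementation: the `CONTEXT` thread-local, the `delegating` helper, and the value `"user-42"` are hypothetical stand-ins for the SecurityContext, the DelegatingSecurityContextAsyncTaskExecutor wrapper, and an authenticated user. It shows that state bound to the launching thread is invisible on a freshly started thread unless the executor copies it across.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

public class ContextPropagationSketch {

    // Hypothetical stand-in for a thread-bound context such as a security context.
    static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    // Starts one new thread per task, in the spirit of SimpleAsyncTaskExecutor.
    static final Executor NEW_THREAD_PER_TASK = task -> new Thread(task).start();

    // Wraps an executor so that each task sees the context of the submitting thread.
    static Executor delegating(Executor delegate) {
        return task -> {
            String captured = CONTEXT.get();   // read on the submitting thread
            delegate.execute(() -> {
                CONTEXT.set(captured);         // bind on the worker thread
                try {
                    task.run();
                } finally {
                    CONTEXT.remove();          // avoid leaking into reused threads
                }
            });
        };
    }

    // Runs a task on the given executor and returns the context that task observed.
    static String contextSeenBy(Executor executor) {
        return CompletableFuture.supplyAsync(CONTEXT::get, executor).join();
    }

    public static void main(String[] args) {
        CONTEXT.set("user-42");
        System.out.println(contextSeenBy(NEW_THREAD_PER_TASK));             // prints "null"
        System.out.println(contextSeenBy(delegating(NEW_THREAD_PER_TASK))); // prints "user-42"
    }
}
```

The same reasoning applies to anything else bound to the request thread: a job launched on another thread starts without it unless it is passed across explicitly.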

  3. I used a Tasklet instead of Spring Batch's chunk-oriented approach. Details here.
