How should I use .tasklet() / .chunk() to finish a job successfully?


Question

I use Spring Batch to clone a table from a source database to a target database. The job is started manually from the service layer using jobLauncher, with parameters passed in.

Everything works, but with the current configuration (below) and .chunk(10) in the step definition, only 10 rows are cloned and the job fails with Caused by: java.sql.SQLException: Result set already closed.

How should I define the step so that it simply reads the whole table from the source database and writes it to the target database?

@Configuration
@EnableBatchProcessing
public class DatasetProcessingContext {

    private static final String OVERRIDEN_BY_JOB_PARAMETER = null;
    private static final String DATASET_PROCESSING_STEP = "datasetProcessingStep";
    private static final String DATASET_PROCESSING_JOB = "datasetProcessingJob";

    public static final String SUBSYSTEM = "subsystem";
    public static final String SQL = "sql";
    public static final String SOURCE_DATASOURCE = "sourceDatasource";
    public static final String INSERT_QUERY = "insertQuery";
    public static final String TARGET_DATASOURCE = "targetDatasource";

    @Autowired
    @Qualifier(DEV_DATA_SOURCE)
    private DataSource devDataSource;

    //set of datasources

    @Autowired
    private PlatformTransactionManager transactionManager;

    @SuppressWarnings("MismatchedQueryAndUpdateOfCollection")
    @Autowired
    private Map<String, TableMessageDataRowMapper> tableMessageDataRowMappers;

    @SuppressWarnings("MismatchedQueryAndUpdateOfCollection")
    @Autowired
    private Map<String, TableMessageDataPreparedStatementSetter> messageDataPreparedStatementSetters;

    @Autowired
    private JobBuilderFactory jobsFactory;

    @Autowired
    private StepBuilderFactory stepsFactory;

    @Bean
    public JobRepository jobRepository() throws Exception {
        return new MapJobRepositoryFactoryBean(transactionManager).getObject();
    }

    @Bean
    public JobRegistry jobRegistry() {
        return new MapJobRegistry();
    }

    @Bean
    public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor() {
        JobRegistryBeanPostProcessor postProcessor = new JobRegistryBeanPostProcessor();
        postProcessor.setJobRegistry(jobRegistry());
        return postProcessor;
    }

    @Bean
    public JobLauncher jobLauncher() throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository());
        return jobLauncher;
    }

    @Bean
    public static StepScope stepScope() {
        return new StepScope();
    }

    @Bean
    @SuppressWarnings("unchecked")
    @Scope(value = "step", proxyMode = ScopedProxyMode.INTERFACES)
    public ItemStreamReader jdbcReader(@Value("#{jobParameters['" + SUBSYSTEM + "']}") String subsystem,
                                       @Value("#{jobParameters['" + SQL + "']}") String sql,
                                       @Value("#{jobParameters['" + SOURCE_DATASOURCE + "']}") String sourceDatasource) {

        JdbcCursorItemReader jdbcCursorItemReader = new JdbcCursorItemReader();
        jdbcCursorItemReader.setDataSource(getDataSourceFromEnum(TargetDataSource.valueOf(sourceDatasource)));
        jdbcCursorItemReader.setSql(sql);
        jdbcCursorItemReader.setRowMapper((RowMapper) tableMessageDataRowMappers
                .get(subsystem + TABLE_MESSAGE_DATA_ROW_MAPPER));

        return jdbcCursorItemReader;
    }

    @Bean
    @SuppressWarnings("unchecked")
    @Scope(value = "step", proxyMode = ScopedProxyMode.INTERFACES)
    public ItemWriter jdbcWriter(@Value("#{jobParameters['" + SUBSYSTEM + "']}") String subsystem,
                                 @Value("#{jobParameters['" + INSERT_QUERY + "']}") String insertQuery,
                                 @Value("#{jobParameters['" + TARGET_DATASOURCE + "']}") String targetDatasource) {

        JdbcBatchItemWriter jdbcWriter = new JdbcBatchItemWriter();
        jdbcWriter.setDataSource(getDataSourceFromEnum(TargetDataSource.valueOf(targetDatasource)));
        jdbcWriter.setSql(insertQuery);
        jdbcWriter.setItemPreparedStatementSetter(messageDataPreparedStatementSetters
                .get(subsystem + TABLE_MESSAGE_DATA_PREPARED_STATEMENT_SETTER));

        return jdbcWriter;
    }

    @Bean
    @SuppressWarnings("unchecked")
    public Step datasetProcessingStep() {

        return stepsFactory.get(DATASET_PROCESSING_STEP)
                // should I create Tasklet or chunk with some CompletionPolicy?
                .chunk(10)
                .reader(jdbcReader(OVERRIDEN_BY_JOB_PARAMETER, OVERRIDEN_BY_JOB_PARAMETER, OVERRIDEN_BY_JOB_PARAMETER))
                .writer(jdbcWriter(OVERRIDEN_BY_JOB_PARAMETER, OVERRIDEN_BY_JOB_PARAMETER, OVERRIDEN_BY_JOB_PARAMETER))
                .allowStartIfComplete(true)
                .build();
    }

    @Bean
    public Job datasetProcessingJob() {

        return jobsFactory.get(DATASET_PROCESSING_JOB).start(datasetProcessingStep()).build();
    }

    // Helper methods such as getDataSourceFromEnum(...) are not shown in the original post.
}

Recommended answer

Using .chunk(new DefaultResultCompletionPolicy()) in the step definition works for my case. This policy returns true from isComplete(RepeatContext context, RepeatStatus result) when the result is null or signals that processing should not continue, which is what happens once the reader runs out of rows, i.e. when the ResultSet is exhausted.
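To make the difference between the two policies concrete, here is a minimal sketch in plain Java. It is a simplified model and not Spring Batch code: ChunkPolicyDemo, CompletionPolicy, fixedSize, untilExhausted and chunkSizes are hypothetical names invented for this illustration. It only shows where chunk boundaries (and therefore commits) would fall under a count-based policy versus a result-based one.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Simplified, hypothetical model of a chunk-oriented step. These types only
// imitate the idea behind completion policies; they are not Spring Batch classes.
class ChunkPolicyDemo {

    /** Decides whether the current chunk is finished after one read attempt. */
    interface CompletionPolicy {
        boolean isComplete(int itemsInChunk, boolean readerExhausted);
    }

    /** Models .chunk(n): close the chunk after n items, or when the input ends. */
    static CompletionPolicy fixedSize(int n) {
        return (count, exhausted) -> exhausted || count >= n;
    }

    /** Models a result-based policy: close the chunk only when the reader is exhausted. */
    static CompletionPolicy untilExhausted() {
        return (count, exhausted) -> exhausted;
    }

    /** Reads every row and returns the size of each chunk handed to the writer. */
    static List<Integer> chunkSizes(List<String> rows, CompletionPolicy policy) {
        Iterator<String> reader = rows.iterator();
        List<Integer> sizes = new ArrayList<>();
        boolean exhausted = false;
        while (!exhausted) {
            List<String> chunk = new ArrayList<>();
            boolean complete = false;
            while (!complete) {
                // A null item marks the end of the input, like a reader returning null.
                String item = reader.hasNext() ? reader.next() : null;
                if (item == null) {
                    exhausted = true;
                } else {
                    chunk.add(item);
                }
                complete = policy.isComplete(chunk.size(), exhausted);
            }
            if (!chunk.isEmpty()) {
                sizes.add(chunk.size()); // this is where "write + commit" would happen
            }
        }
        return sizes;
    }

    public static void main(String[] args) {
        List<String> rows = IntStream.range(0, 25)
                .mapToObj(i -> "row-" + i)
                .collect(Collectors.toList());
        System.out.println(chunkSizes(rows, fixedSize(10)));    // prints [10, 10, 5]
        System.out.println(chunkSizes(rows, untilExhausted())); // prints [25]
    }
}
```

In this model the result-based policy turns the whole table into a single chunk. Applied to the real step, that would mean one write and one commit at the end, with every row held in memory until then. That may be why the cursor is no longer closed after the first 10 rows, but it is likely to be a poor fit for very large tables.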
