Spring Batch partitioning of DB not working properly


Problem description

I have configured the following job, which reads from the database and writes to files, partitioning the data on the basis of a sequence column.

// Job configuration

@Bean
public Job job(JobBuilderFactory jobBuilderFactory) throws Exception {
    Flow masterFlow1 = new FlowBuilder<Flow>("masterFlow1").start(masterStep()).build();
    // Flow-based jobs need two build() calls: one for the flow, one for the job
    return jobBuilderFactory.get("Partition-Job")
            .incrementer(new RunIdIncrementer())
            .start(masterFlow1)
            .build()
            .build();
}

@Bean
public Step masterStep() throws Exception {
    // Master step: fans the worker step out over the partitions
    return stepBuilderFactory.get(MASTERPPREPAREDATA)
            //.listener(customSEL)
            .partitioner(STEPPREPAREDATA, new DBPartitioner())
            .step(prepareDataForS1())
            .gridSize(gridSize)
            .taskExecutor(new SimpleAsyncTaskExecutor("Thread"))
            .build();
}

@Bean
public Step prepareDataForS1() throws Exception {
    // Worker step: chunk-oriented read from the DB, write to a partition-specific file
    return stepBuilderFactory.get(STEPPREPAREDATA)
            //.listener(customSEL)
            .<InputData, InputData>chunk(chunkSize)
            .reader(JDBCItemReader(0, 0))
            .writer(writer(null))
            .build();
}

@Bean(destroyMethod = "")
@StepScope
public JdbcCursorItemReader<InputData> JDBCItemReader(
        @Value("#{stepExecutionContext[startingIndex]}") int startingIndex,
        @Value("#{stepExecutionContext[endingIndex]}") int endingIndex) {
    JdbcCursorItemReader<InputData> ir = new JdbcCursorItemReader<>();
    ir.setDataSource(batchDataSource);
    ir.setMaxItemCount(DBPartitioner.partitionSize);
    ir.setSaveState(false);
    ir.setRowMapper(new InputDataRowMapper());
    // Exclusive bounds: selects rows with startingIndex < SEQ < endingIndex
    ir.setSql("SELECT * FROM FIF_INPUT fi WHERE fi.SEQ > ? AND fi.SEQ < ?");
    ir.setPreparedStatementSetter(ps -> {
        ps.setInt(1, startingIndex);
        ps.setInt(2, endingIndex);
    });
    return ir;
}

@Bean
@StepScope
public FlatFileItemWriter<InputData> writer(@Value("#{stepExecutionContext[index]}") String index) {
    System.out.println("writer initialized: " + index);
    // Create writer instance
    FlatFileItemWriter<InputData> writer = new FlatFileItemWriter<>();

    // Set output file location (one file per partition index)
    writer.setResource(new FileSystemResource(batchDirectory + relativeInputDirectory + index + inputFileForS1));

    // Do not append: each run creates a fresh output file
    writer.setAppendAllowed(false);

    // Name field values sequence based on object properties
    writer.setLineAggregator(customLineAggregator);
    return writer;
}

The partitioner used for partitioning the database is written separately in another file, as follows:

// PartitionDb.java

public class DBPartitioner implements Partitioner {

    public static int partitionSize;
    private static Log log = LogFactory.getLog(DBPartitioner.class);

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        log.debug("START: Partition, grid size: " + gridSize);

        Map<String, ExecutionContext> partitionMap = new HashMap<>();
        int startingIndex = -1;
        int endSize = partitionSize + 1;

        for (int i = 0; i < gridSize; i++) {
            ExecutionContext ctxMap = new ExecutionContext();
            ctxMap.putInt("startingIndex", startingIndex);
            ctxMap.putInt("endingIndex", endSize);
            ctxMap.put("index", i);
            startingIndex = endSize - 1;
            endSize += partitionSize;
            partitionMap.put("Thread:-" + i, ctxMap);
        }
        log.debug("END: Created Partitions of size: " + partitionMap.size());
        return partitionMap;
    }
}

This executes, but even after partitioning on the basis of the sequence I get the same rows in multiple files, which is wrong since I am providing a different set of data for each partition. Can anyone tell me what's wrong? I am using HikariCP for DB connection pooling and Spring Batch 4.

Answer

"This executes, but even after partitioning on the basis of the sequence I get the same rows in multiple files, which is wrong since I am providing a different set of data for each partition."

I'm not sure your partitioner is working properly. A quick test shows that it does not provide different sets of data as you claim:

DBPartitioner dbPartitioner = new DBPartitioner();
Map<String, ExecutionContext> partition = dbPartitioner.partition(5);
for (String s : partition.keySet()) {
    System.out.println(s + " : " + partition.get(s));
}

This prints:

Thread:-0 : {endingIndex=1, index=0, startingIndex=-1}
Thread:-1 : {endingIndex=1, index=1, startingIndex=0}
Thread:-2 : {endingIndex=1, index=2, startingIndex=0}
Thread:-3 : {endingIndex=1, index=3, startingIndex=0}
Thread:-4 : {endingIndex=1, index=4, startingIndex=0}

As you can see, almost all partitions end up with the same startingIndex and endingIndex. This is because partitionSize is a public static field that was never set before the call, so it defaults to 0: endSize starts at 1 and endSize += partitionSize never moves it forward.
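If the intent is one contiguous, non-overlapping SEQ range per partition, a minimal sketch could look like the following. RangePartitioner is a hypothetical name, partitionSize is passed in explicitly instead of being read from a static field, and the exclusive bounds match the reader's "SEQ > ? AND SEQ < ?" query (it also assumes SEQ starts at 1 and has no gaps):

import java.util.HashMap;
import java.util.Map;

import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;

public class RangePartitioner implements Partitioner {

    private final int partitionSize;

    // Passing the size in makes the partitioner testable in isolation
    public RangePartitioner(int partitionSize) {
        this.partitionSize = partitionSize;
    }

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        Map<String, ExecutionContext> partitions = new HashMap<>();
        for (int i = 0; i < gridSize; i++) {
            ExecutionContext context = new ExecutionContext();
            // With exclusive bounds, partition i selects SEQ in [i*partitionSize + 1, (i+1)*partitionSize]
            context.putInt("startingIndex", i * partitionSize);
            context.putInt("endingIndex", (i + 1) * partitionSize + 1);
            context.putString("index", String.valueOf(i));
            partitions.put("partition" + i, context);
        }
        return partitions;
    }
}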

I recommend you unit test your partitioner before using it in a partitioned step.
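For example, a quick JUnit 4 test of the sketch above (all names are illustrative) can assert that consecutive partitions cover disjoint, contiguous ranges:

import static org.junit.Assert.assertEquals;

import java.util.Map;

import org.junit.Test;
import org.springframework.batch.item.ExecutionContext;

public class RangePartitionerTest {

    @Test
    public void partitionsShouldCoverDisjointContiguousRanges() {
        Map<String, ExecutionContext> partitions = new RangePartitioner(100).partition(5);

        assertEquals(5, partitions.size());
        for (int i = 0; i < 5; i++) {
            ExecutionContext context = partitions.get("partition" + i);
            // Partition i should select SEQ in [100*i + 1, 100*(i+1)]
            assertEquals(i * 100, context.getInt("startingIndex"));
            assertEquals((i + 1) * 100 + 1, context.getInt("endingIndex"));
        }
    }
}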
