SimpleAsyncTaskExecutor在处理完成后尝试读取记录 [英] SimpleAsyncTaskExecutor trying to read record after completion of processing

查看:252
本文介绍了SimpleAsyncTaskExecutor在处理完成后尝试读取记录的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在我的spring批处理项目中,我需要从一个表的行列表中读取,创建一个4块并进行处理,然后写入另一个表.我已经实现了SimpleAsyncTaskExecutor以允许并行处理块,但是我发现在处理记录集中的所有记录之后,Spring Batch试图继续读取下一个结果并失败.超过跳过级别后,显然会中止作业.

In my spring batch project, I need to read from a list of rows from a table, create a chunk of 4 and process and then write to another table. I have implemented SimpleAsyncTaskExecutor to allow for parallel processing of chunks, but I find that after all records in the recordset are processed, Spring Batch is trying to continue reading next lot of result and failing. After it exceeds the skip level, it obviously aborts the job.

我的查询是-为什么在处理完该集中的所有记录后,批次将继续寻找下一条记录?

My query is - why will be the batch continue to look for next record after completion of processing all records in the set?

我最后得到的错误是:

org.springframework.batch.core.step.item.FaultTolerantChunkProvider- 跳过输入失败 org.springframework.jdbc.UncategorizedSQLException:尝试处理 下一行失败;未分类的SQLException for SQL

org.springframework.batch.core.step.item.FaultTolerantChunkProvider - Skipping failed input org.springframework.jdbc.UncategorizedSQLException: Attempt to process next row failed; uncategorized SQLException for SQL

下面是我的批处理XML

Below is my batch xml

<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns="http://www.springframework.org/schema/batch"
    xmlns:beans="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:util="http://www.springframework.org/schema/util"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
        http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.1.xsd
        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">

    <beans:import resource="../launch-context.xml" />

    <beans:bean id="wsStudentItemReader"
        class="org.springframework.batch.item.database.JdbcCursorItemReader"    
        scope="step">
        <beans:property name="dataSource" ref="rptDS" />
        <beans:property name="sql"
            value="SELECT * FROM STUDENTS WHERE BATCH_ID=?" />
        <beans:property name="preparedStatementSetter">
            <beans:bean class="com.test.BatchDtSetter"
                autowire="byName">
                <beans:property name="batchId" value="#{jobParameters[batchId]}" /> 
            </beans:bean>
        </beans:property>
        <beans:property name="rowMapper" ref="wsRowMapper" />
    </beans:bean>


 <beans:bean id="outputWriter"
        class="org.springframework.batch.item.support.ClassifierCompositeItemWriter">
        <beans:property name="classifier" ref="writerClassifier" >      
        </beans:property>       
 </beans:bean>  

<beans:bean id="writerClassifier"
    class="com.test.WriterClassifier">
    <beans:property name="codeFailWriter" ref="failJdbcBatchItemWriter" />
    <beans:property name="codePassWriter" ref="passJdbcBatchItemWriter"></beans:property>
</beans:bean>

    <beans:bean id="failJdbcBatchItemWriter"
        class="org.springframework.batch.item.database.JdbcBatchItemWriter">
        <beans:property name="dataSource" ref="rptDS" />
        <beans:property name="sql"
            value="DELETE FROM STUDENTS WHERE BATCH_ID=?" />
        <beans:property name="itemPreparedStatementSetter" ref="FailStatusSetter" />
    </beans:bean>

<beans:bean id="FailStatusSetter" class="com.test.FailStatusSetter" />

    <beans:bean id="passJdbcBatchItemWriter"
        class="com.test.PassBatchItemWriter">
    </beans:bean>

    <beans:bean id="WSListnr"
        class="com.test.WSBatchListnr">
        <beans:property name="dataSource" ref="rptDS" />
    </beans:bean>


    <beans:bean id="wsRowMapper" class="com.test.WSReqMapper" />
    <beans:bean id="wsReqPrcsr" 
         class="com.test.WSReqProc">
         <beans:property name="dataSource" ref="rptDS" />
    </beans:bean>
    <beans:bean id="wsReqPrepStmtSetter" class="com.test.wsStudentSetter" />

    <step id="initiateStep">
        <tasklet ref="initiateStepTask" />
    </step>
    <step id="wsStudentGenStep">
        <tasklet task-executor="taskExecutor">
            <chunk reader="wsStudentItemReader" processor="wsReqPrcsr"
                writer="outputWriter" commit-interval="4" skip-limit="20">
                <skippable-exception-classes>
                    <include class="java.lang.Exception" />
                </skippable-exception-classes>
            </chunk>
            <listeners> 
                <listener ref="WSListnr" />
            </listeners>            
        </tasklet>
    </step>
    <job id="wsStudent">
        <step id="wsStudentFileGenIntialStep" parent="initiateStep"
            next="wsStudentFileGenStep" />
        <step id="wsStudentFileGenStep" parent="wsStudentGenStep" />
    </job>


    <beans:bean id="initiateStepTask" class="com.test.Initializer"
        scope="step">
    </beans:bean>

    <beans:bean id="taskExecutor"
       class="org.springframework.core.task.SimpleAsyncTaskExecutor">
       <beans:property name="concurrencyLimit" value="2"/>
   </beans:bean>

</beans:beans>

推荐答案

使用JdbcPagingItemReader解决了该问题.该阅读器将同步读取,并且还要求按特定列对数据进行排序.

The issue is resolved by using JdbcPagingItemReader. This reader will synchronize the reads and also requires data to be ordered by a specific column.

这篇关于SimpleAsyncTaskExecutor在处理完成后尝试读取记录的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆