如何处理SpringBatch中ItemReader之后的逻辑相关行? [英] How to process logically related rows after ItemReader in SpringBatch?

查看:7
本文介绍了如何处理SpringBatch中ItemReader之后的逻辑相关行?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

方案

为简单起见,假设我有一个返回25行的ItemReader。

  1. 前10行属于学生A

  2. 接下来的5个属于学生B

  3. 和剩下的10个属于学生C

我想逻辑地将它们聚合在一起,比如说按StudentId,然后将它们展平,以结束每个学生一行。

问题

如果我理解正确,将提交间隔设置为5将执行以下操作:

  1. 向处理器发送5行(它将聚合这些行或执行我告诉它的任何业务逻辑)。
  2. 处理后将写入5行。
  3. 然后它将在接下来的5行中再次执行此操作,依此类推。
如果这是真的,那么在接下来的五个月里,我将不得不检查已经编写的文件,将它们聚合到我当前正在处理的文件中,然后重新编写它们。

我个人不会那样做。

  1. 在Spring Batch中处理此类情况的最佳实践是什么?

备用

有时我觉得编写一个常规的Spring JDBC主程序要容易得多,这样我就可以完全控制我想要做的事情了。但是,我希望利用作业的作业存储库状态监视、重新启动、跳过、作业和步骤侦听器的功能.

我的Spring批量编码

我的module-context.xml

   <?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:batch="http://www.springframework.org/schema/batch"
    xsi:schemaLocation="http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.1.xsd
    http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd">

    <description>Example job to get you started. It provides a skeleton for a typical batch application.</description>

    <batch:job id="job1">
        <batch:step id="step1"  >           
            <batch:tasklet transaction-manager="transactionManager" start-limit="100" >             
                 <batch:chunk reader="attendanceItemReader"
                              processor="attendanceProcessor" 
                              writer="attendanceItemWriter" 
                              commit-interval="10" 
                 />

            </batch:tasklet>
        </batch:step>
    </batch:job> 

    <bean id="attendanceItemReader" class="org.springframework.batch.item.database.JdbcCursorItemReader"> 
        <property name="dataSource">
            <ref bean="sourceDataSource"/>
        </property> 
        <property name="sql"                                                    
                  value="select s.student_name ,s.student_id ,fas.attendance_days ,fas.attendance_value from K12INTEL_DW.ftbl_attendance_stumonabssum fas inner join k12intel_dw.dtbl_students s on fas.student_key = s.student_key inner join K12INTEL_DW.dtbl_schools ds on fas.school_key = ds.school_key inner join k12intel_dw.dtbl_school_dates dsd on fas.school_dates_key = dsd.school_dates_key where dsd.rolling_local_school_yr_number = 0 and ds.school_code = ? and s.student_activity_indicator = 'Active' and fas.LOCAL_GRADING_PERIOD = 'G1' and s.student_current_grade_level = 'Gr 9' order by s.student_id"/>
        <property name="preparedStatementSetter" ref="attendanceStatementSetter"/>           
        <property name="rowMapper" ref="attendanceRowMapper"/> 
    </bean> 

    <bean id="attendanceStatementSetter" class="edu.kdc.visioncards.preparedstatements.AttendanceStatementSetter"/>

    <bean id="attendanceRowMapper" class="edu.kdc.visioncards.rowmapper.AttendanceRowMapper"/>

    <bean id="attendanceProcessor" class="edu.kdc.visioncards.AttendanceProcessor" />  

    <bean id="attendanceItemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter"> 
        <property name="resource" value="file:target/outputs/passthrough.txt"/> 
        <property name="lineAggregator"> 
            <bean class="org.springframework.batch.item.file.transform.PassThroughLineAggregator" /> 
        </property> 
    </bean> 

</beans>

我的读者支持类。

一个PreparedStatementSetter

package edu.kdc.visioncards.preparedstatements;

import java.sql.PreparedStatement;
import java.sql.SQLException;

import org.springframework.jdbc.core.PreparedStatementSetter;

public class AttendanceStatementSetter implements PreparedStatementSetter {

    public void setValues(PreparedStatement ps) throws SQLException {

        ps.setInt(1, 7);

    }

}

和RowMapper

package edu.kdc.visioncards.rowmapper;

import java.sql.ResultSet;
import java.sql.SQLException;

import org.springframework.jdbc.core.RowMapper;

import edu.kdc.visioncards.dto.AttendanceDTO;

public class AttendanceRowMapper<T> implements RowMapper<AttendanceDTO> {

    public static final String STUDENT_NAME = "STUDENT_NAME";
    public static final String STUDENT_ID = "STUDENT_ID";
    public static final String ATTENDANCE_DAYS = "ATTENDANCE_DAYS";
    public static final String ATTENDANCE_VALUE = "ATTENDANCE_VALUE";

    public AttendanceDTO mapRow(ResultSet rs, int rowNum) throws SQLException {

        AttendanceDTO dto = new AttendanceDTO();
        dto.setStudentId(rs.getString(STUDENT_ID));
        dto.setStudentName(rs.getString(STUDENT_NAME));
        dto.setAttDays(rs.getInt(ATTENDANCE_DAYS));
        dto.setAttValue(rs.getInt(ATTENDANCE_VALUE));

        return dto;
    }
}

我的处理器

package edu.kdc.visioncards;

import java.util.HashMap;
import java.util.Map;

import org.springframework.batch.item.ItemProcessor;

import edu.kdc.visioncards.dto.AttendanceDTO;

public class AttendanceProcessor implements ItemProcessor<AttendanceDTO, Map<Integer, AttendanceDTO>> {

    private Map<Integer, AttendanceDTO> map = new HashMap<Integer, AttendanceDTO>();

    public Map<Integer, AttendanceDTO> process(AttendanceDTO dto) throws Exception {

        if(map.containsKey(new Integer(dto.getStudentId()))){

            AttendanceDTO attDto = (AttendanceDTO)map.get(new Integer(dto.getStudentId()));
            attDto.setAttDays(attDto.getAttDays() + dto.getAttDays());
            attDto.setAttValue(attDto.getAttValue() + dto.getAttValue());

        }else{
            map.put(new Integer(dto.getStudentId()), dto);
        }
        return map;
    }

}

我对上面代码的关注

在处理器中,我创建了一个HashMap,在处理行时,我检查Map中是否已经有该学员,如果没有,我会添加它。如果它已经存在,我将获取它,获取我感兴趣的值,并将它们与我当前正在处理的行相加。

之后,Spring Batch Framework根据我的配置写入文件

我的问题如下:

  1. 我不想让它给作者。我想处理所有剩余的行。如何在内存中保留为需要通过同一处理器的下一组行创建的映射呢?每次通过AttendanceProcessor处理一行时,都会初始化Map。我是否应该将地图初始化放入静电挡路?

推荐答案

在我的应用程序中,我创建了一个CollectingJdbcCursorItemReader,它扩展了标准JdbcCursorItemReader并执行您需要的功能。在内部,它使用MyCollectingRowMapper:将多个相关行映射到一个对象的标准RowMapper的扩展。

这里是ItemReader的代码,CollectingRowMapper接口的代码,以及它的抽象实现,可以在我的another answer中找到。

import java.sql.ResultSet;
import java.sql.SQLException;

import org.springframework.batch.item.ReaderNotOpenException;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.jdbc.core.RowMapper;

/**
 * A JdbcCursorItemReader that uses a {@link CollectingRowMapper}.
 * Like the superclass this reader is not thread-safe.
 * 
 * @author Pino Navato
 **/
public class CollectingJdbcCursorItemReader<T> extends JdbcCursorItemReader<T> {

    private CollectingRowMapper<T> rowMapper;
    private boolean firstRead = true;


    /**
     * Accepts a {@link CollectingRowMapper} only.
     **/
    @Override
    public void setRowMapper(RowMapper<T> rowMapper) {
        this.rowMapper = (CollectingRowMapper<T>)rowMapper;
        super.setRowMapper(rowMapper);
     }


    /**
     * Read next row and map it to item.
     **/
    @Override
    protected T doRead() throws Exception {
        if (rs == null) {
            throw new ReaderNotOpenException("Reader must be open before it can be read.");
        }

        try {
            if (firstRead) {
                if (!rs.next()) {  //Subsequent calls to next() will be executed by rowMapper
                    return null;
                }
                firstRead = false;
            } else if (!rowMapper.hasNext()) {
                return null;
            }
            T item = readCursor(rs, getCurrentItemCount());
            return item;
        }
        catch (SQLException se) {
            throw getExceptionTranslator().translate("Attempt to process next row failed", getSql(), se);
        }
    }

    @Override
    protected T readCursor(ResultSet rs, int currentRow) throws SQLException {
        T result = super.readCursor(rs, currentRow);
        setCurrentItemCount(rs.getRow());
        return result;
    }

}

您可以像经典JdbcCursorItemReader一样使用它:唯一的要求是您向它提供CollectingRowMapper而不是经典的RowMapper

这篇关于如何处理SpringBatch中ItemReader之后的逻辑相关行?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆