带有Liberty Profile的JSR 352-如何在ItemReader执行数据库查询时实现检查点 [英] JSR 352 with Liberty Profile - how to implement checkpointing when ItemReader does a DB query
问题描述
我的源表中有10条记录,并且项目计数为3.
I have 10 records in my source table and I am having item count as 3.
我有2个分区来处理这10条记录(即,前5条记录将在第一个分区中处理,其余记录将在第2个分区中处理,而在第二个分区中处理记录时,我抛出异常,因此作业将在第2个块中失败)第二分区.当我重新启动作业时,失败的分区将再次处理所有记录(即第一块和第二块). 重新启动作业应该只处理最后失败的块记录,而不处理该分区中的所有记录.您能指导我如何实现此目标吗?
I have 2 partitions to process these 10 records(i.e first 5 records will be processed in first partition and remaining records processed in 2nd partition while processing records in 2nd partition I am throwing an exception so job will be failed at 2nd chunk of 2nd partition.when I am restarting the job ,failed partition is processing all the records again(that is first chunk and 2nd chunk). Restarting the job should only process from last failed chunk records but not all the records in that partition.Can you please guide me how to achieve this?
我的JSL如下:
<?xml version="1.0" encoding="UTF-8"?>
<job xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://xmlns.jcp.org/xml/ns/javaee"
xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/jobXML_1_0.xsd"
id="readingfrom-db" restartable="true" version="1.0" >
<properties >
<property name="numRec" value="#{jobParameters['numRec']}?:5;"/>
<property name="chunkSize" value="#{jobParameters['chunkSize']}?:3;"/>
<property name="whereclauseFrom" value="#{jobParameters['whereclauseFrom']}?:5;"/>
<property name="whereclauseTo" value="#{jobParameters['whereclauseTo']}?:6;"/>
<property name="dsJNDI" value="#{jobParameters['dsJNDI']}?:jdbc/db2;"/>
<property name="dsJNDI1" value="#{jobParameters['dsJNDI1']}?:jdbc/db2;"/>
<property name="tableName" value="#{jobParameters['tableName']}?:CISDW.AIF1_CH;"/>
<property name="ProcesstableName" value="#{jobParameters['ProcesstableName']}?:CISDW.PROC_AIF1_CH;"/>
</properties>
<step id="runcache" next="readFromDB">
<batchlet ref="com.cdc.runcache.CacheRunnerBatchlet" />
</step>
<step id="readFromDB">
<listeners>
<listener ref="com.cdc.dbreader.LogExceptionListener"/>
</listeners>
<chunk item-count="3" checkpoint-policy="item">
<reader ref="com.cdc.dbreader.DBItemReader">
<properties >
<property name="dsJNDI" value="#{jobProperties['dsJNDI']}"/>
<property name="tableName" value="#{jobProperties['tableName']}"/>
<property name="whereclauseFrom" value="#{partitionPlan['modrec']}"/>
</properties>
</reader>
<processor ref="com.cdc.dbreader.DBItemProcessor" />
<writer ref="com.cdc.dbreader.DBItemWriter">
<properties >
<property name="dsJNDI" value="#{jobProperties['dsJNDI1']}"/>
<property name="tableName" value="#{jobProperties['ProcesstableName']}"/>
</properties>
</writer>
</chunk>
<partition>
<plan partitions="2" threads="2">
<properties partition="0">
<property name="modrec" value="#{jobProperties['whereclauseFrom']}"/>
</properties>
<properties partition="1">
<property name="modrec" value="#{jobProperties['whereclauseTo']}"/>
</properties>
</plan>
</partition>
</step>
</job>
我的物品阅读器如下:
public class DBItemReader implements ItemReader {
@Inject
@BatchProperty
private String dsJNDI;
@Inject
@BatchProperty
private String whereclauseFrom;
@Inject
@BatchProperty
private String tableName;
private Connection conn =null;
private int totalRecords=0;
private DataSource ds = null;
List<RecObj> listRecObj=new ArrayList<RecObj>();
@Override
public Object readItem() throws SQLException {
if (listRecObj.size() == 0) {
return null;
} else {
RecObj rec =null;
Iterator<RecObj> iter =listRecObj.iterator();
while (iter.hasNext()) {
rec = iter.next();
if (Integer.parseInt(rec.getRec()) == 7) {
throw new IllegalStateException("Thrown Error");
}
iter.remove();
return rec;
}
return rec;
}
@Override
public void open(Serializable arg0) throws NamingException, SQLException {
ds = DataSource.class.cast(new InitialContext().lookup(dsJNDI));
// System.out.println("whereclauseFrom: " + whereclauseFrom);
conn = ds.getConnection();
String sql ="";
if(Integer.parseInt(whereclauseFrom) == 5){
sql = "SELECT * FROM " + tableName + " WHERE CAST(REC AS INTEGER) <= "+ whereclauseFrom;
}else if(Integer.parseInt(whereclauseFrom) == 6){
sql = "SELECT * FROM " + tableName + " WHERE CAST(REC AS INTEGER) >= "+ whereclauseFrom;
}
PreparedStatement ps = conn.prepareStatement(sql);
ResultSet rs=ps.executeQuery();
while(rs.next()){
totalRecords++;
String rec=rs.getString("REC");
if(rec != null)
listRecObj.add(new RecObj(rec));
}
rs.close();
}
@Override
public void close() throws SQLException {
conn.close();
}
@Override
public Serializable checkpointInfo() {
return null;
}
}
}
我的作家类如下:
public class DBItemWriter extends AbstractItemWriter implements ItemWriter {
@Inject
@BatchProperty
private String dsJNDI;
@Inject
@BatchProperty
private String tableName;
private DataSource ds = null;
@Override
public void open(Serializable arg0) throws NamingException {
ds = DataSource.class.cast(new InitialContext().lookup(dsJNDI));
}
@Override
public void writeItems(List<java.lang.Object> items) throws BatchUpdateException,SQLException{
Connection conn = ds.getConnection();
String sql = "INSERT INTO "+tableName+ "(MOD_REC) VALUES(?) ";
PreparedStatement ps = conn.prepareStatement(sql);
for (Object obj : items) {
RecObj v = (RecObj)obj;
System.out.println("=======Writer values===="+v.getRec());
ps.setString(1, v.getRec());
ps.addBatch();
}
ps.executeBatch();
ps.clearBatch();
ps.close();
conn.close();
}
}
下面是我的处理器:
public class DBItemProcessor implements ItemProcessor {
Integer count=0;
@Override
public Object processItem(Object arg0) {
count++;
RecObj v=(RecObj)arg0;
String vname=v.getRec();
System.out.println("=========Processer Values==="+vname);
return new RecObj(vname+count);
}
}
下面是我的Bean类
public class RecObj {
private String rec;
public RecObj(String rec) {
this.rec=rec;
}
推荐答案
您需要在阅读器的 checkpointInfo()
中返回检查点值,该值将传递到阅读器的 open()
方法.读取器和批处理容器通过这种方式协同工作,以在重新启动时提供检查点.
You need to return a checkpoint value in your reader's checkpointInfo()
which will be passed into your reader's open()
method on restart. This is how a reader and the batch container coordinate to provide checkpointing on restart.
因此,您可能会遇到类似的情况(查找 CHECKPOINT 注释):
So you could have something like (look for the CHECKPOINT comments):
public class DBItemReader implements ItemReader {
// ...
// CHECKPOINT field defined
private String checkpoint = null;
@Override
public void open(Serializable checkpoint) throws NamingException, SQLException {
// CHECKPOINT-based positioning through query value.
// Initial position = whereclauseFrom, on restart set to checkpoint
String queryVal = (String)(checkpoint == null ? whereclauseFrom : checkpoint);
if(Integer.parseInt(whereclauseFrom) == 5){
sql = "SELECT * FROM " + tableName + " WHERE CAST(REC AS INTEGER) <= "+ queryVal;
}else if(Integer.parseInt(whereclauseFrom) == 6){
sql = "SELECT * FROM " + tableName + " WHERE CAST(REC AS INTEGER) >= "+ queryVal;
}
// ..
}
@Override
public Object readItem() throws SQLException {
if (listRecObj.size() == 0) {
return null;
} else {
RecObj rec =null;
Iterator<RecObj> iter =listRecObj.iterator();
while (iter.hasNext()) {
rec = iter.next();
// CHECKPOINT updated
checkpoint = rec.getRec();
if (Integer.parseInt(rec.getRec()) == 7) {
throw new IllegalStateException("Thrown Error");
}
}
}
// ...
}
@Override
public Serializable checkpointInfo() {
// CHECKPOINT returned at end of chunk
return checkpoint;
}
}
这篇关于带有Liberty Profile的JSR 352-如何在ItemReader执行数据库查询时实现检查点的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!