从多个来源读取的 Spring 批处理作业 [英] Spring batch Job read from multiple sources
问题描述
如何从多个数据库中读取项目?我已经知道从文件中可以做到这一点.
以下示例适用于从多个文件中读取
How can I read items from multiples databases? I already know that is possible from files.
the following example works for read from multiples files
...
<job id="readMultiFileJob" xmlns="http://www.springframework.org/schema/batch">
<step id="step1">
<tasklet>
<chunk reader="multiResourceReader" writer="flatFileItemWriter"
commit-interval="1" />
</tasklet>
</step>
</job>
...
<bean id="multiResourceReader"
class=" org.springframework.batch.item.file.MultiResourceItemReader">
<property name="resources" value="file:csv/inputs/domain-*.csv" />
<property name="delegate" ref="flatFileItemReader" />
</bean>
...
<小时>
像这样的三个豆子.
three beans like this.
<bean id="database2" class="org.springframework.batch.item.database.JdbcCursorItemReader">
<property name="name" value="database2Reader" />
<property name="dataSource" ref="dataSource2" />
<property name="sql" value="select image from object where image like '%/images/%'" />
<property name="rowMapper">
<bean class="sym.batch.ImagesRowMapper2" />
</property>
</bean>
推荐答案
没有现成的组件可以执行您的要求;唯一的解决方案是编写一个自定义的 ItemReader<>
委托给 JdbcCursorItemReader
(或委托给 HibernateCursorItemReader
或任何通用的 ItemReader
实现).
您需要准备所有必要的内容(数据源、会话、真实数据库阅读器)并将所有委托阅读器绑定到您的自定义阅读器.
There isn't a ready-to-use component that perform what you ask; the only solution is to write a custom ItemReader<>
that delegates to JdbcCursorItemReader
(or to HibernateCursorItemReader
or to any generic ItemReader
implementation).
You need to prepare all necessary stuff (datasource, session, real database readers) and bind all delegated readers to your custom reader.
您需要使用 ItemReader.read()
的回避来模拟循环,并在作业重新启动时保持阅读器和委托状态.
You need to simulate a loop using recusion of ItemReader.read()
and mantain reader and delegates state across job restarts.
class MyItemReader<T> implements ItemReader<T>, ItemStream {
private ItemReader[] delegates;
private int delegateIndex;
private ItemReader<T> currentDelegate;
private ExecutionContext stepExecutionContext;
public void setDelegates(ItemReader[] delegates) {
this.delegates = delegates;
}
@BeforeStep
private void beforeStep(StepExecution stepExecution) {
this.stepExecutionContext = stepExecution.getExecutionContext();
}
public T read() {
T item = null;
if(null != currentDelegate) {
item = currentDelegate.read();
if(null == item) {
((ItemStream)this.currentDelegate).close();
this.currentDelegate = null;
}
}
// Move to next delegate if previous was exhausted!
if(null == item && this.delegateIndex< this.delegates.length) {
this.currentDelegate = this.delegates[this.currentIndex++];
((ItemStream)this.currentDelegate).open(this.stepExecutionContext);
update(this.stepExecutionContext);
// Recurse to read() to simulate loop through delegates
item = read();
}
return item;
}
public void open(ExecutionContext ctx) {
// During open restore last active reader and restore its state
if(ctx.containsKey("index")) {
this.delegateIndex = ctx.getInt("index");
this.currentDelegate = this.delegates[this.delegateIndex];
((ItemStream)this.currentDelegate ).open(ctx);
}
}
public void update(ExecutionContext ctx) {
// Update current delegate index and state
ctx.putInt("index", this.delegateIndex);
if(null != this.currentDelegate) {
((ItemStream)this.currentDelegate).update(ctx);
}
}
public void close(ExecutionContext ctx) {
if(null != this.currentDelegate) {
((ItemStream)this.currentDelegate).close();
}
}
<小时>
<bean id="myItemReader" class=path.to.MyItemReader>
<property name="delegates">
<array>
<ref bean="itemReader1"/>
<ref bean="itemReader2"/>
<ref bean="itemReader3"/>
</array>
</property>
</bean>
<小时>
记得设置属性name;这是让 MyItemReader.read() 正常工作所必需的
Remember to set property name; this is NECESSARY to let MyItemReader.read() works correctly
<bean id="itemReader1" class="JdbcCursorItemReader">
<property name="name" value="itemReader1" />
<!-- Set other properties -->
</bean>
这篇关于从多个来源读取的 Spring 批处理作业的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!