春批多线程文件读取 [英] spring batch multi thread file reading
本文介绍了春批多线程文件读取的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
在Spring批处理中,我尝试读取CSV文件,并希望将每行分配给单独的线程并对其进行处理。我已经尝试过使用Task Executor来实现它,如果我没有使用作业参数获取文件名,它是有效的。如果我通过作业参数,因为scope="step"
所有线程都在从文件中读取同一行。如果我更改scope="job"
会不会解决?如果会,请给出解决的方法?目前,我收到如下错误:
原因:java.lang.IllegalStateException:没有为作用域名‘job’注册作用域
请帮忙.
查找下面的Job.xml
<job id="partitionJob" xmlns="http://www.springframework.org/schema/batch" restartable="true">
<step id="step" allow-start-if-complete="true">
<partition step="step2" partitioner="partitioner">
<handler grid-size="3" task-executor="taskExecutor" />
</partition>
</step>
</job>
<bean id="partitioner" class="com.range.part.RangePartitioner">
</bean>
<bean id="taskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor" />
<step id="step2" xmlns="http://www.springframework.org/schema/batch">
<tasklet transaction-manager="transactionManager">
<chunk reader="itemReader" writer="cutomitemWriter" processor="itemProcessor" commit-interval="100" />
</tasklet>
</step>
<bean id="itemProcessor" class="com.range.processor.UserProcessor" scope="step">
<property name="threadName" value="#{stepExecutionContext[name]}"/>
</bean>
<bean id="itemReader" class="org.springframework.batch.item.file.FlatFileItemReader" scope="job">
<property name="resource" value="file:#{jobParameters[file]}">
</property>
<!-- <property name="linesToSkip" value="1"/> -->
<property name="lineMapper">
<bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
<property name="lineTokenizer">
<bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
<property name="delimiter" value="," />
<!-- <property name="names" value="transactionBranch,batchEntryDate,batchNo,channelID,CountryCode" />-->
</bean>
</property>
<property name="fieldSetMapper">
<bean class="com.fieldset.FieldsetMapper">
</bean>
</property>
</bean>
</property>
</bean>
<bean id="cutomitemWriter" class="com.range.processor.customitemWritter">
</bean>
推荐答案
我在想一种可以在上面使用分区程序的方法。在分区程序级别,我们可以读取文件(通过使用任何CSV阅读器或Spring Reader也可以),然后处理每一行。
每行都将添加到分区程序的队列(映射)中,因此它符合您的要求。
我在这里发布了代码供您参考
公共类LinePartiator实现分区程序{
@Value("#{jobParameters['fileName']}")
private String fileName;
Map<String, ExecutionContext> queue = new HashMap<>();
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
BufferedReader reader = new BufferedReader(new FileReader(this.fileName));
List<String> lines = new ArrayList<>();
int count = 0;
while ((line = reader.readLine()) != null) {
ExecutionContext value = new ExecutionContext();
value.put("lineContent", line);
value.put("lineCount", count+1);
queue.put(++count, value);
}
return queue;
}
}
如上代码,您可以将Reader替换为任何CSV Reader或Spring Reader,将简化的映射字段替换为POJO对象。
如果您需要完整的程序,请告诉我,我会为您写信并上传。
谢谢, Nghia
--更新为使用1000项目阅读器构建分区程序的示例
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
try {
Map<String, ExecutionContext> queue = new HashMap<>();
List<List<String>> trunks = new ArrayList<>();
// read and store data to a list of trunk
int chunkSize = 1000;
int count = 1;
try (BufferedReader br = new BufferedReader(new FileReader("your file"))) {
String line;
List items = null;
while ((line = br.readLine()) != null) {
if (count % chunkSize == 0) {
items = new ArrayList();
trunks.add(items);
}
items.add(line);
}
}
// add to queue to start prorcessing
for (int i=0; i<trunks.size(); i++) {
ExecutionContext value = new ExecutionContext();
value.put("items", trunks.get(i));
queue.put("trunk"+i, value);
}
return queue;
}
catch (Exception e) {
// handle exception
}
}
这篇关于春批多线程文件读取的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
查看全文