使用 Spring 批处理文件项阅读器进行多线程处理 [英] Multi-threading with Spring batch File Item Reader

查看:71
本文介绍了使用 Spring 批处理文件项阅读器进行多线程处理的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在 Spring Batch 中,我试图读取一个 CSV 文件,并希望将每一行分配给一个单独的线程并进行处理.我试图通过使用 TaskExecutor 来实现它,但是所有线程都在一次选择同一行.我也尝试使用 Partioner 来实现这个概念,也发生了同样的事情.请参阅下面我的配置 Xml.

In a Spring Batch I am trying to read a CSV file and want to assign each row to a separate thread and process it. I have tried to achieve it by using TaskExecutor, but what is happening all the thread is picking the same row at a time. I also tried to implement the concept using Partioner, there also same thing happening. Please see below my Configuration Xml.

步骤说明

    <step id="Step2">
        <tasklet task-executor="taskExecutor">
            <chunk reader="reader" processor="processor" writer="writer" commit-interval="1" skip-limit="1">
            </chunk>
        </tasklet> 
    </step>

              <bean id="reader" class="org.springframework.batch.item.file.FlatFileItemReader">
<property name="resource" value="file:cvs/user.csv" />

<property name="lineMapper">
    <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
      <!-- split it -->
      <property name="lineTokenizer">
            <bean
          class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
            <property name="names" value="userid,customerId,ssoId,flag1,flag2" />
        </bean>
      </property>
      <property name="fieldSetMapper">   

          <!-- map to an object -->
          <bean
            class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper">
            <property name="prototypeBeanName" value="user" />
          </bean>           
      </property>

      </bean>
  </property>

       </bean>

      <bean id="taskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor">
 <property name="concurrencyLimit" value="4"/>   

我尝试过不同类型的任务执行器,但它们的行为方式都相同.如何将每一行分配给单独的线程?

I have tried with different types of task executor, but all of them are behaving in same way. How can I assign each row to a separate thread?

推荐答案

FlatFileItemReader 不是线程安全的.在您的示例中,您可以尝试将 CSV 文件拆分为较小的 CSV 文件,然后使用 MultiResourcePartitioner 来处理每一个.这可以分两步完成,一个用于拆分原始文件(如 10 个较小的文件),另一个用于处理拆分的文件.这样您就不会遇到任何问题,因为每个文件都将由一个线程处理.

FlatFileItemReader is not thread-safe. In your example you can try to split the CSV file to smaller CSV files and then use a MultiResourcePartitioner to process each one of them. This can be done in 2 steps, one for splitting the original file(like 10 smaller files) and the other for processing splitted files.This way you won't have any issues since each file will be processed by one thread.

示例:

<batch:job id="csvsplitandprocess">
     <batch:step id="step1" next="step2master">
    <batch:tasklet>
        <batch:chunk reader="largecsvreader" writer="csvwriter" commit-interval="500">
        </batch:chunk>
    </batch:tasklet>
    </batch:step>
    <batch:step id="step2master">
    <partition step="step2" partitioner="partitioner">
        <handler grid-size="10" task-executor="taskExecutor"/>
    </partition>
</batch:step>
</batch:job>

<batch:step id="step2">
    <batch:tasklet>
        <batch:chunk reader="smallcsvreader" writer="writer" commit-interval="100">
        </batch:chunk>
    </batch:tasklet>
</batch:step>


<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
            <property name="corePoolSize" value="10" />
            <property name="maxPoolSize" value="10" />
    </bean>

<bean id="partitioner" 
class="org.springframework.batch.core.partition.support.MultiResourcePartitioner">
<property name="resources" value="file:cvs/extracted/*.csv" />
</bean>

替代分区的替代方案可能是自定义线程安全读取器,它会为每一行创建一个线程,但分区可能是您的最佳选择

The alternative instead of partitioning might be a Custom Thread-safe Reader who will create a thread for each line, but probably partitioning is your best choice

这篇关于使用 Spring 批处理文件项阅读器进行多线程处理的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆