Spring Batch Multiple Threads

Question

I am writing a Spring Batch job with the idea of scaling it when required. My ApplicationContext looks like this:

@Configuration
@EnableBatchProcessing
@EnableTransactionManagement
@ComponentScan(basePackages = "in.springbatch")
@PropertySource(value = {"classpath:springbatch.properties"})

public class ApplicationConfig {

@Autowired
Environment environment;

@Autowired
private JobBuilderFactory jobs;

@Autowired
private StepBuilderFactory stepBuilderFactory;

@Bean
public Job job() throws Exception {

    return jobs.get("spring_batch")
            .flow(step()).end()
            .build();
}

@Bean(name = "dataSource", destroyMethod = "close")
public DataSource dataSource() {

    BasicDataSource basicDataSource = new BasicDataSource();



    return basicDataSource;
}

@Bean
public JobRepository jobRepository() throws Exception {
    JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
    jobRepositoryFactoryBean.setTransactionManager(transactionManager());
    jobRepositoryFactoryBean.setDataSource(dataSource());
    return jobRepositoryFactoryBean.getObject();
}

 @Bean(name = "batchstep")
 public Step step() throws Exception {

    return stepBuilderFactory.get("batchstep")
            .allowStartIfComplete(true)
            .transactionManager(transactionManager())
            .chunk(2)
            .reader(batchReader())
            .processor(processor())
            .writer(writer())
            .build();

  }



@Bean
ItemReader batchReader() throws Exception {
    System.out.println(Thread.currentThread().getName()+"reader");
    HibernateCursorItemReader<Source> hibernateCursorItemReader = new HibernateCursorItemReader<>();
    hibernateCursorItemReader.setQueryString("from Source");
    hibernateCursorItemReader.setFetchSize(2);
    hibernateCursorItemReader.setSessionFactory(sessionFactory().getObject());


    hibernateCursorItemReader.close();
    return hibernateCursorItemReader;
}

@Bean
 public ItemProcessor processor() {
     return new BatchProcessor();
 }

@Bean
public ItemWriter writer() {
    return new BatchWriter();
}

public TaskExecutor taskExecutor(){

    SimpleAsyncTaskExecutor asyncTaskExecutor=new SimpleAsyncTaskExecutor("spring_batch");
    asyncTaskExecutor.setConcurrencyLimit(5);
    return asyncTaskExecutor;


}
@Bean
public LocalSessionFactoryBean sessionFactory() {
    LocalSessionFactoryBean sessionFactory = new LocalSessionFactoryBean();
    sessionFactory.setDataSource(dataSource());
    sessionFactory.setPackagesToScan(new String[]{"in.springbatch.entity"});
    sessionFactory.setHibernateProperties(hibernateProperties());

    return sessionFactory;
}

@Bean
public PersistenceExceptionTranslationPostProcessor exceptionTranslation() {
    return new PersistenceExceptionTranslationPostProcessor();
}

@Bean
@Autowired
public HibernateTransactionManager transactionManager() {
    HibernateTransactionManager txManager = new HibernateTransactionManager();
    txManager.setSessionFactory(sessionFactory().getObject());

    return txManager;
}

Properties hibernateProperties() {
    return new Properties() {
        {
            setProperty("hibernate.hbm2ddl.auto",       environment.getProperty("hibernate.hbm2ddl.auto"));
            setProperty("hibernate.dialect", environment.getProperty("hibernate.dialect"));
            setProperty("hibernate.globally_quoted_identifiers", "false");

        }
    };
}

}

  1. With the above configuration I am able to read from the DB, process the data, and write to the DB.
  2. I am using a chunk size of 2 and reading 2 records at a time from the cursor using HibernateCursorItemReader. My query selects records by date, picking up the current date's records.
  3. So far I am able to achieve the desired behavior, including restartability: on restart the job only picks up records that were not processed because of a failure in the previous run.

Now my requirement is to make the batch use multiple threads to process the data and write to the DB.

My processor and writer look like this:

@Component
public class BatchProcessor implements ItemProcessor<Source,DestinationDto>{

@Override
public DestinationDto process(Source source) throws Exception {

        System.out.println(Thread.currentThread().getName()+":"+source);
        DestinationDto destination=new DestinationDto();
        destination.setName(source.getName());
        destination.setValue(source.getValue());
        destination.setSourceId(source.getSourceId().toString());

    return destination;
}
}

@Component
public class BatchWriter implements ItemWriter<DestinationDto>{

@Autowired
IBatchDao batchDao;

@Override
public void write(List<? extends DestinationDto> list) throws Exception {
   System.out.println(Thread.currentThread().getName()+":"+list);
    batchDao.saveToDestination((List<DestinationDto>)list);
}
}

I updated my step and added a TaskExecutor as follows:

@Bean(name = "batchstep")
public Step step() throws Exception {

    return stepBuilderFactory.get("batchstep")
            .allowStartIfComplete(true)
            .transactionManager(transactionManager())
            .chunk(1)
            .reader(batchReader())
            .processor(processor())
            .writer(writer())
            .taskExecutor(taskExecutor())
            .build();

  }

After this change my processor is called by multiple threads, but each thread receives the same source data. Is there anything extra I need to do?

Recommended answer

This is a big question.

  1. Your best bet for getting a good answer is to look through the Scaling and Parallel Processing chapter of the Spring Batch documentation.
  2. There might be some multi-threading samples in the Spring Batch examples.
  3. An easy way to thread a Spring Batch job is to create a Future processor: you put all your processing logic in a Future object, and your Spring processor class only adds objects to the Future. Your writer class then waits on the Future to finish before performing the write. Sorry, I don't have a sample to point you to for this, but if you have specific questions I can try to answer them!
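The Future-based approach described in the answer could look roughly like the sketch below. It uses only java.util.concurrent and no Spring types, so it runs on its own. Every name in it (FutureProcessingSketch, FutureProcessor, FutureWriter, run) is hypothetical and chosen for illustration, and the upper-casing step merely stands in for the real mapping from Source to DestinationDto. Treat it as an outline of the idea under those assumptions, not as the answerer's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical names throughout; plain java.util.concurrent, no Spring types.
final class FutureProcessingSketch {

    // Plays the role of the ItemProcessor: it only schedules the work.
    static final class FutureProcessor {
        private final ExecutorService pool;

        FutureProcessor(ExecutorService pool) {
            this.pool = pool;
        }

        Future<String> process(String source) {
            // The actual processing logic lives inside the submitted task.
            Callable<String> work = () -> source.toUpperCase(Locale.ROOT);
            return pool.submit(work);
        }
    }

    // Plays the role of the ItemWriter: it waits on every Future before writing.
    static final class FutureWriter {
        private final List<String> destination = new ArrayList<>();

        void write(List<Future<String>> chunk)
                throws InterruptedException, ExecutionException {
            for (Future<String> future : chunk) {
                destination.add(future.get()); // blocks until this item is processed
            }
        }

        List<String> written() {
            return destination;
        }
    }

    // Items are read on the calling thread, processed on the pool,
    // and written on the calling thread in submission order.
    static List<String> run(List<String> sources)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(5);
        try {
            FutureProcessor processor = new FutureProcessor(pool);
            FutureWriter writer = new FutureWriter();
            List<Future<String>> chunk = new ArrayList<>();
            for (String source : sources) {
                chunk.add(processor.process(source));
            }
            writer.write(chunk);
            return writer.written();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run(List.of("a", "b", "c"))); // prints [A, B, C]
    }
}
```

Because reading stays on a single thread here, each item is handed to exactly one task, and only the processing runs in parallel. Spring Batch's spring-batch-integration module provides AsyncItemProcessor and AsyncItemWriter, which follow the same idea and may be worth checking before hand-rolling something like this.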
