Spring Batch - Looping a reader/processor/writer step


Problem Description



ANSWER

Based on the accepted answer's code, the following adjustment to that code worked for me:

// helper method to create a split flow out of a List of steps
private static Flow createParallelFlow(List<Step> steps) {
    SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
    taskExecutor.setConcurrencyLimit(steps.size());         

    Flow[] flows = new Flow[steps.size()];
    for (int i = 0; i < steps.size(); i++) {
        flows[i] = new FlowBuilder<SimpleFlow>(steps.get(i).getName()).start(steps.get(i)).build();
    }           

    return new FlowBuilder<SimpleFlow>("parallelStepsFlow")
        .split(taskExecutor)                
        .add(flows)
        .build();
}
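
For context, a steps list with one fully built Step per query is created first (see the refined question below), and the resulting split flow is attached to the job exactly as in the accepted answer. A minimal wiring sketch, where createStepsForQueries() is a hypothetical helper that builds that list:

// minimal wiring sketch; createStepsForQueries() is a hypothetical helper
// that builds one Step per query, as discussed below
@Bean
public Job subsetJob() throws Exception {
    List<Step> steps = createStepsForQueries();
    return jobBuilderFactory.get("subsetJob")
            .start(createParallelFlow(steps))
            .end()
            .build();
}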

EDIT

I have updated the question to a version that correctly loops, but as the application will scale, being able to process in parallel is important, and I still don't know how to do that dynamically at runtime with Java config...

Refined question: How do I create a reader-processor-writer dynamically at runtime for, say, 5 different situations (5 queries means a loop of 5, as it is configured now)?

My LoopDecider looks like this:

public class LoopDecider implements JobExecutionDecider {

    private static final Logger LOG = LoggerFactory.getLogger(LoopDecider.class);
    private static final String COMPLETED = "COMPLETED";
    private static final String CONTINUE = "CONTINUE";
    private static final String ALL = "queries";
    private static final String COUNT = "count";

    private int currentQuery;
    private int limit;

    @SuppressWarnings("unchecked")
    @Override
    public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
        List<String> allQueries = (List<String>) jobExecution.getExecutionContext().get(ALL);
        this.limit = allQueries.size();
        // store the current query index in the job execution context for the step to pick up
        jobExecution.getExecutionContext().put(COUNT, currentQuery);
        if (++currentQuery >= limit) {
            return new FlowExecutionStatus(COMPLETED);
        } else {
            LOG.info("Looping for query: " + allQueries.get(currentQuery - 1));
            return new FlowExecutionStatus(CONTINUE);
        }       
    }

}

Based on a list of queries (HQL queries) I want a reader-processor-writer for each query. My current configuration looks like this:

Job

@Bean
public Job subsetJob() throws Exception {
    LoopDecider loopDecider = new LoopDecider();        
    FlowBuilder<Flow> flowBuilder = new FlowBuilder<>(FLOW_NAME);
    Flow flow = flowBuilder
            .start(createHQL())
            .next(extractData())
            .next(loopDecider)
            .on("CONTINUE")
            .to(extractData())
            .from(loopDecider)
            .on("COMPLETED")                
            .end()
            .build();       

    return jobBuilderFactory.get("subsetJob")               
            .start(flow)                
            .end()
            .build();
}
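
The createHQL() step itself is not shown in the question. Presumably it seeds the job execution context with the list of queries that LoopDecider later reads under the "queries" key; a minimal hypothetical tasklet doing that might look like this (the query strings are placeholders):

// hypothetical stand-in for the createHQL() step referenced above: a tasklet
// that stores the HQL query list in the job execution context under "queries"
public Step createHQL() {
    return stepBuilderFactory.get("createHQL")
            .tasklet((contribution, chunkContext) -> {
                List<String> queries = Arrays.asList("from Customer", "from Order"); // placeholders
                chunkContext.getStepContext().getStepExecution()
                        .getJobExecution().getExecutionContext()
                        .put("queries", queries);
                return RepeatStatus.FINISHED;
            })
            .build();
}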

Step

public Step extractData(){
    return stepBuilderFactory.get("extractData")
            .chunk(100_000)
            .reader(reader())
            .processor(processor())
            .writer(writer())
            .build();
}

Reader

public HibernateCursorItemReader reader(){      
    CustomHibernateCursorItemReader reader = new CustomHibernateCursorItemReader();
    reader.setSessionFactory(HibernateUtil.getSessionFactory());        
    reader.setUseStatelessSession(false);
    return reader;
}

Processor

public DynamicRecordProcessor processor(){
    return new DynamicRecordProcessor();
}

Writer

public FlatFileItemWriter writer(){
    CustomFlatFileItemWriter writer = new CustomFlatFileItemWriter();               
    writer.setLineAggregator(new DelimitedLineAggregator(){{
        setDelimiter(TARGET_DELIMITER);
        setFieldExtractor(new PassThroughFieldExtractor());
        }}
    );
    return writer;
}

Currently the process works fine for a single query. However, I actually have a list of queries.

My initial idea was to loop the step, passing it the list of queries, and read-process-write for each query. This would also be ideal for parallel chunking.

However, when I add the list of queries as a parameter to the extractData step and create a step for each query, a list of steps is returned instead of the expected single step. The job then complains that it expects a single step, not a list of steps.

Another idea was to create a custom MultiHibernateCursorItemReader along the same lines as the MultiResourceItemReader; however, I am really looking for a more out-of-the-box solution.

@Bean
public List<Step> extractData(@Value("#{jobExecutionContext[HQL]}") List<String> queries){
    List<Step> steps = new ArrayList<Step>();
    for (String query : queries) {
        steps.add(stepBuilderFactory.get("extractData")
            .chunk(100_000)
            .reader(reader(query))
            .processor(processor())
            .writer(writer(query))
            .build());
    }
    return steps;
}

Question
How do I loop the step and integrate it into the job?

Solution

Don't instantiate your Steps, Readers, Processors and Writers as Spring beans. There is no need to do so. Only your job instance has to be a Spring bean.

So just remove the @Bean and @StepScope configuration from your step, reader, writer and processor creator methods and instantiate them where needed.

There is only one catch: you have to call afterPropertiesSet() manually. E.g.:

// @Bean -> delete
// @StepScope -> delete
public FlatFileItemWriter writer(@Value("#{jobExecutionContext[fileName]}") String fileName){
    FlatFileItemWriter writer = new FlatFileItemWriter();
    writer.setResource(new FileSystemResource(new File(TARGET_LOCATION + fileName + TARGET_FILE_EXTENSION)));       
    writer.setLineAggregator(new DelimitedLineAggregator(){{
        setDelimiter(TARGET_DELIMITER);
        setFieldExtractor(new PassThroughFieldExtractor());
        }}
    );

    // ------- ADD!!
    writer.afterPropertiesSet();

    return writer;
}

This way, your step, reader and writer instances will be "step scoped" automatically, since you instantiate them explicitly for every step.

Let me know if my answer is not clear enough; I will then add a more detailed example.

EDIT

A simple example:

@Configuration
public class MyJobConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;


    List<String> filenames = Arrays.asList("file1.txt", "file2.txt");

    @Bean
    public Job myJob() throws Exception {

        // createStep declares a checked exception, hence a plain loop rather than a stream
        List<Step> steps = new ArrayList<>();
        for (String filename : filenames) {
            steps.add(createStep(filename));
        }

        return jobBuilderFactory.get("subsetJob")
            .start(createParallelFlow(steps))
            .end()
            .build();
    }


    // helper method to create a step
    private Step createStep(String filename) throws Exception {
        return stepBuilderFactory.get("convertStepFor" + filename) // !!! step name has to be unique
            .chunk(100_000)
            .reader(createFileReader(new FileSystemResource(new File(filename)), new YourInputLineMapper()))
            .processor(new YourConversionProcessor())
            .writer(createFileWriter(new FileSystemResource(new File("converted_" + filename)), new YourOutputLineAggregator()))
            .build();
    }


    // helper method to create a split flow out of a List of steps
    private static Flow createParallelFlow(List<Step> steps) {
        SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
        taskExecutor.setConcurrencyLimit(steps.size());

        List<Flow> flows = steps.stream() // we have to convert the steps to flows
            .map(step -> //
                    new FlowBuilder<Flow>("flow_" + step.getName()) //
                    .start(step) //
                    .build()) //
            .collect(Collectors.toList());

        return new FlowBuilder<SimpleFlow>("parallelStepsFlow").split(taskExecutor) //
            .add(flows.toArray(new Flow[flows.size()])) //
            .build();
    }


    // helper methods to create filereader and filewriters
    public static <T> ItemReader<T> createFileReader(Resource source, LineMapper<T> lineMapper) throws Exception {
        FlatFileItemReader<T> reader = new FlatFileItemReader<>();

        reader.setEncoding("UTF-8");
        reader.setResource(source);
        reader.setLineMapper(lineMapper);
        reader.afterPropertiesSet();

        return reader;
    }

    public static <T> ItemWriter<T> createFileWriter(Resource target, LineAggregator<T> aggregator) throws Exception {
        FlatFileItemWriter<T> writer = new FlatFileItemWriter<>();

        writer.setEncoding("UTF-8");
        writer.setResource(target);
        writer.setLineAggregator(aggregator);

        writer.afterPropertiesSet();
        return writer;
    }
}
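
For the HQL scenario from the question, the same pattern might look like the sketch below. Like the example above, it assumes the list of queries is known when the job bean is built. HibernateUtil and DynamicRecordProcessor come from the question; createStepForQuery, createHibernateReader, the output file names and the PassThroughLineAggregator (standing in for the question's delimited writer setup) are hypothetical, while createParallelFlow and createFileWriter are the helpers defined above.

// hypothetical adaptation of the answer's pattern to the question's HQL queries:
// one step per query, each with its own reader and writer instance
@Bean
public Job hqlSubsetJob() throws Exception {
    List<String> queries = Arrays.asList("from Customer", "from Order"); // placeholder queries

    List<Step> steps = new ArrayList<>();
    for (int i = 0; i < queries.size(); i++) {
        steps.add(createStepForQuery(queries.get(i), i));
    }

    return jobBuilderFactory.get("hqlSubsetJob")
            .start(createParallelFlow(steps))
            .end()
            .build();
}

private Step createStepForQuery(String query, int index) throws Exception {
    return stepBuilderFactory.get("extractDataStep_" + index) // step name must be unique
            .chunk(100_000)
            .reader(createHibernateReader(query))
            .processor(new DynamicRecordProcessor())
            .writer(createFileWriter(new FileSystemResource(new File("result_" + index + ".txt")),
                    new PassThroughLineAggregator<>()))
            .build();
}

// mirrors the question's reader() method, plus the query string and the
// manual afterPropertiesSet() call the answer requires
private HibernateCursorItemReader createHibernateReader(String query) throws Exception {
    HibernateCursorItemReader reader = new HibernateCursorItemReader();
    reader.setSessionFactory(HibernateUtil.getSessionFactory());
    reader.setQueryString(query);
    reader.setUseStatelessSession(false);
    reader.afterPropertiesSet();
    return reader;
}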
