Spring batch: Assemble a job rather than configuring it (Extensible job configuration)


Background

I am working on designing a file reading layer that can read delimited files and load them into a List. I have decided to use Spring Batch because it provides many scalability options, which I can leverage for different sets of files depending on their size.

The requirements

  1. I want to design a generic Job API that can be used to read any delimited file.
  2. There should be a single Job structure used for parsing every delimited file. For example, if the system needs to read 5 files, there will be 5 jobs (one for each file). The only way the 5 jobs differ from each other is that they use a different FieldSetMapper, column names, directory path, and additional scaling parameters such as commit-interval and throttle-limit.
  3. The user of this API should not need to configure a Spring Batch job, step, chunking, partitioning, etc. on their own when a new file type is introduced into the system.
  4. All the user needs to do is provide the FieldSetMapper to be used by the job, along with the commit-interval, throttle-limit, and the directory where each type of file will be placed.
  5. There will be one predefined directory per file type. Each directory can contain multiple files of the same type and format. A MultiResourcePartitioner will be used to look inside a directory. The number of partitions = the number of files in the directory.

My requirement is to build a Spring Batch infrastructure that gives me a unique job I can launch once I have the bits and pieces that will make up the job.

My solution:

I created an abstract configuration class that will be extended by concrete configuration classes (there will be one concrete class per file type to be read).

    @Configuration
    @EnableBatchProcessing
    public abstract class AbstractFileLoader<T> {

    private static final String FILE_PATTERN = "*.dat";

    @Autowired
    JobBuilderFactory jobs;

    @Autowired
    ResourcePatternResolver resourcePatternResolver;

    public final Job createJob(Step s1, JobExecutionListener listener) {
        return jobs.get(this.getClass().getSimpleName())
                .incrementer(new RunIdIncrementer()).listener(listener)
                .start(s1).build();
    }

    public abstract Job loaderJob(Step s1, JobExecutionListener listener);

    public abstract FieldSetMapper<T> getFieldSetMapper();

    public abstract String getFilesPath();

    public abstract String[] getColumnNames();

    public abstract int getChunkSize();

    public abstract int getThrottleLimit();

    @Bean
    @StepScope
    public FlatFileItemReader<T> reader(
            @Value("#{stepExecutionContext['fileName']}") String file) {
        FlatFileItemReader<T> reader = new FlatFileItemReader<T>();
        String path = file.substring(file.indexOf(":") + 1, file.length());
        FileSystemResource resource = new FileSystemResource(path);
        reader.setResource(resource);
        DefaultLineMapper<T> lineMapper = new DefaultLineMapper<T>();
        lineMapper.setFieldSetMapper(getFieldSetMapper());
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer(",");
        tokenizer.setNames(getColumnNames());
        lineMapper.setLineTokenizer(tokenizer);
        reader.setLineMapper(lineMapper);
        reader.setLinesToSkip(1);
        return reader;
    }

    @Bean
    public ItemProcessor<T, T> processor() {
        // TODO add transformations here; pass items through unchanged for now.
        // (Returning null from a @Bean method registers a null bean and breaks
        // the autowiring of the processor into readStep.)
        return new PassThroughItemProcessor<T>();
    }

    @Bean
    @JobScope
    public ListItemWriter<T> writer() {
        ListItemWriter<T> writer = new ListItemWriter<T>();
        return writer;
    }

    @Bean
    @JobScope
    public Step readStep(StepBuilderFactory stepBuilderFactory,
            ItemReader<T> reader, ItemWriter<T> writer,
            ItemProcessor<T, T> processor, TaskExecutor taskExecutor) {

        final Step readerStep = stepBuilderFactory
                .get(this.getClass().getSimpleName() + " ReadStep:slave")
                .<T, T> chunk(getChunkSize()).reader(reader)
                .processor(processor).writer(writer).taskExecutor(taskExecutor)
                .throttleLimit(getThrottleLimit()).build();

        final Step partitionedStep = stepBuilderFactory
                .get(this.getClass().getSimpleName() + " ReadStep:master")
                .partitioner(readerStep)
                .partitioner(
                        this.getClass().getSimpleName() + " ReadStep:slave",
                        partitioner()).taskExecutor(taskExecutor).build();

        return partitionedStep;

    }

    /*
     * @Bean public TaskExecutor taskExecutor() { return new
     * SimpleAsyncTaskExecutor(); }
     */

    @Bean
    @JobScope
    public Partitioner partitioner() {
        MultiResourcePartitioner partitioner = new MultiResourcePartitioner();
        Resource[] resources;
        try {
            resources = resourcePatternResolver.getResources("file:"
                    + getFilesPath() + FILE_PATTERN);
        } catch (IOException e) {
            throw new RuntimeException(
                    "I/O problems when resolving the input file pattern.", e);
        }
        partitioner.setResources(resources);
        return partitioner;
    }

    @Bean
    @JobScope
    public JobExecutionListener listener(ListItemWriter<T> writer) {
        return new JobCompletionNotificationListener<T>(writer);
    }

    /*
     * Use this if you want the writer to have job scope (JIRA BATCH-2269). Also
     * change the return type of writer to ListItemWriter for this to work.
     */
    @Bean
    public TaskExecutor taskExecutor() {
        return new SimpleAsyncTaskExecutor() {
            @Override
            protected void doExecute(final Runnable task) {
                // gets the jobExecution of the configuration thread
                final JobExecution jobExecution = JobSynchronizationManager
                        .getContext().getJobExecution();
                super.doExecute(new Runnable() {
                    public void run() {
                        JobSynchronizationManager.register(jobExecution);

                        try {
                            task.run();
                        } finally {
                            JobSynchronizationManager.close();
                        }
                    }
                });
            }
        };
    }

}

Let's say I have to read Invoice data for the sake of discussion. I can therefore extend the above class to create an InvoiceLoader:

@Configuration
public class InvoiceLoader extends AbstractFileLoader<Invoice> {

    private class InvoiceFieldSetMapper implements FieldSetMapper<Invoice> {

        public Invoice mapFieldSet(FieldSet f) {
            Invoice invoice = new Invoice();
            invoice.setNo(f.readString("INVOICE_NO"));
            return invoice;
        }
    }

    @Override
    public FieldSetMapper<Invoice> getFieldSetMapper() {
        return new InvoiceFieldSetMapper();
    }

    @Override
    public String getFilesPath() {
        return "I:/CK/invoices/partitions/";
    }

    @Override
    public String[] getColumnNames() {
        return new String[] { "INVOICE_NO", "DATE"};
    }


    @Override
    @Bean(name="invoiceJob")
    public Job loaderJob(Step s1,
            JobExecutionListener listener) {
        return createJob(s1, listener);
    }

    @Override
    public int getChunkSize() {
        return 25254;
    }

    @Override
    public int getThrottleLimit() {
        return 8;
    }

}

Let's say I have one more class called InventoryLoader that extends AbstractFileLoader; a hypothetical sketch of it is shown below.
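
The original question does not show this class, so the following is purely a hypothetical sketch that mirrors InvoiceLoader; the entity, column names, directory path, and tuning values are all assumptions:

@Configuration
public class InventoryLoader extends AbstractFileLoader<Inventory> {

    private class InventoryFieldSetMapper implements FieldSetMapper<Inventory> {

        public Inventory mapFieldSet(FieldSet f) {
            Inventory inventory = new Inventory();
            // Assumed column name and property; adjust to the real Inventory entity.
            inventory.setItemNo(f.readString("ITEM_NO"));
            return inventory;
        }
    }

    @Override
    public FieldSetMapper<Inventory> getFieldSetMapper() {
        return new InventoryFieldSetMapper();
    }

    @Override
    public String getFilesPath() {
        return "I:/CK/inventory/partitions/"; // assumed directory
    }

    @Override
    public String[] getColumnNames() {
        return new String[] { "ITEM_NO", "QUANTITY" }; // assumed columns
    }

    @Override
    @Bean(name = "inventoryJob")
    public Job loaderJob(Step s1, JobExecutionListener listener) {
        return createJob(s1, listener);
    }

    @Override
    public int getChunkSize() {
        return 1000; // assumed tuning value
    }

    @Override
    public int getThrottleLimit() {
        return 8; // assumed tuning value
    }
}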

On application startup, I can load these two annotation configurations as follows:

AbstractApplicationContext context1 = new AnnotationConfigApplicationContext(InvoiceLoader.class, InventoryLoader.class);

Somewhere else in my application, two different threads can launch the jobs as follows:

Thread 1:

    JobLauncher jobLauncher1 = context1.getBean(JobLauncher.class);
    Job job1 = context1.getBean("invoiceJob", Job.class);
    JobExecution jobExecution = jobLauncher1.run(job1, jobParams1);

Thread 2:

    JobLauncher jobLauncher1 = context1.getBean(JobLauncher.class);
    Job job1 = context1.getBean("inventoryJob", Job.class);
    JobExecution jobExecution = jobLauncher1.run(job1, jobParams1);
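
The jobParams1 object is not defined in the original snippets. A minimal sketch of how it might be built is shown below; the parameter name is an assumption, included only so each launch carries distinct identifying parameters:

    JobParameters jobParams1 = new JobParametersBuilder()
            // Hypothetical parameter: a changing value keeps each job instance unique.
            .addLong("run.millis", System.currentTimeMillis())
            .toJobParameters();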

The advantage of this approach is that every time there is a new file to be read, all the developer/user has to do is subclass AbstractFileLoader and implement the required abstract methods, without needing to get into the details of how to assemble the job.

The questions:

  1. I am new to Spring Batch, so I may have overlooked some not-so-obvious issues with this approach, such as shared internal objects in Spring Batch that could cause two jobs running together to fail, or obvious issues such as the scoping of the beans.
  2. Is there a better way to achieve my objective?
  3. The fileName attribute of the @Value("#{stepExecutionContext['fileName']}") expression is always assigned the value I:/CK/invoices/partitions/, which is the value returned by the getFilesPath method in InvoiceLoader, even though the getFilesPath method in InventoryLoader returns a different value.

Solution

One option is to pass these values as job parameters. For instance (the answer's example is written in Groovy):

@Bean
Job job() {
    jobs.get("myJob").start(step1(null)).build()
}

@Bean
@JobScope
Step step1(@Value('#{jobParameters["commitInterval"]}') commitInterval) {
    steps.get('step1')
            .chunk((int) commitInterval)
            .reader(new IterableItemReader(iterable: [1, 2, 3, 4], name: 'foo'))
            .writer(writer(null))
            .build()
}

@Bean
@JobScope
ItemWriter writer(@Value('#{jobParameters["writerClass"]}') writerClass) {
    applicationContext.classLoader.loadClass(writerClass).newInstance()
}

With MyWriter:

class MyWriter implements ItemWriter<Integer> {

    @Override
    void write(List<? extends Integer> items) throws Exception {
        println "Write $items"
    }
}

Then executed with:

def jobExecution = launcher.run(ctx.getBean(Job), new JobParameters([
        commitInterval: new JobParameter(3),
        writerClass: new JobParameter('MyWriter'), ]))

Output is:

INFO: Executing step: [step1]
Write [1, 2, 3]
Write [4]
Feb 24, 2016 2:30:22 PM org.springframework.batch.core.launch.support.SimpleJobLauncher$1 run
INFO: Job: [SimpleJob: [name=myJob]] completed with the following parameters: [{commitInterval=3, writerClass=MyWriter}] and the following status: [COMPLETED]
Status is: COMPLETED, job execution id 0
  #1 step1 COMPLETED

Full example here.
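
Applied to the question's Java design, the same late-binding idea would let abstract getters such as getChunkSize() be replaced by job parameters. The following is only a sketch under that assumption, not code from the answer; it imagines a job-scoped step inside AbstractFileLoader<T>, and the commitInterval parameter name follows the Groovy example above:

@Bean
@JobScope
public Step readStep(StepBuilderFactory stepBuilderFactory,
        ItemReader<T> reader, ItemWriter<T> writer,
        @Value("#{jobParameters['commitInterval']}") Long commitInterval) {
    // The chunk size now comes from the launch-time job parameters
    // instead of an abstract method on the configuration class.
    return stepBuilderFactory.get("readStep")
            .<T, T> chunk(commitInterval.intValue())
            .reader(reader)
            .writer(writer)
            .build();
}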
