Spring Batch: Assemble a job rather than configuring it (extensible job configuration)

Question

Background

I am working on designing a file-reading layer that can read delimited files and load their contents into a List. I have decided to use Spring Batch because it provides a lot of scalability options that I can leverage for different sets of files depending on their size.

Requirements

  1. I want to design a generic Job API that can be used to read any delimited file.
  2. There should be a single Job structure used for parsing every delimited file. For example, if the system needs to read 5 files, there will be 5 jobs (one for each file). The only ways the 5 jobs will differ from each other are that they will use a different FieldSetMapper, column names, directory path and additional scaling parameters such as commit-interval and throttle-limit.
  3. The user of this API should not need to configure a Spring Batch job, step, chunking, partitioning, etc. on their own when a new file type is introduced in the system.
  4. All that the user needs to do is provide the FieldSetMapper to be used by the job, along with the commit-interval, the throttle-limit and the directory where each type of file will be placed.
  5. There will be one predefined directory per file. Each directory can contain multiple files of the same type and format. A MultiResourcePartitioner will be used to look inside a directory, so the number of partitions = the number of files in the directory.
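
Requirement 5 depends on how MultiResourcePartitioner splits the work. As far as I know, it creates one partition per matched resource, names the partitions partition0, partition1, and so on, and stores each resource's URL in that partition's step execution context under the key fileName (its default key name), which is what #{stepExecutionContext['fileName']} later reads. The plain-Java model below reproduces only that mapping, with no Spring types; PartitionModel is a hypothetical name and each execution context is modeled as a Map.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (no Spring) of how a MultiResourcePartitioner-style
// partitioner turns the files found in one directory into partitions.
// Hypothetical names; each step execution context is modeled as a Map.
class PartitionModel {

    // Key under which each worker step finds its file location.
    static final String KEY_NAME = "fileName";

    // One partition per file: partition0 -> first file, partition1 -> second, ...
    static Map<String, Map<String, String>> partition(List<String> fileUrls) {
        Map<String, Map<String, String>> partitions = new LinkedHashMap<>();
        int i = 0;
        for (String url : fileUrls) {
            Map<String, String> context = new LinkedHashMap<>();
            context.put(KEY_NAME, url);
            partitions.put("partition" + i, context);
            i++;
        }
        return partitions;
    }
}
```

For example, PartitionModel.partition(Arrays.asList("file:/data/a.dat", "file:/data/b.dat")) yields two partitions, partition0 and partition1, one per file.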

My requirement is to build a Spring Batch infrastructure that gives me a unique job I can launch once I have the bits and pieces that will make up the job.

My solution:

I created an abstract configuration class that will be extended by concrete configuration classes (There will be 1 concrete class per file to be read).

    @Configuration
    @EnableBatchProcessing
    public abstract class AbstractFileLoader<T> {

    private static final String FILE_PATTERN = "*.dat";

    @Autowired
    JobBuilderFactory jobs;

    @Autowired
    ResourcePatternResolver resourcePatternResolver;

    public final Job createJob(Step s1, JobExecutionListener listener) {
        return jobs.get(this.getClass().getSimpleName())
                .incrementer(new RunIdIncrementer()).listener(listener)
                .start(s1).build();
    }

    public abstract Job loaderJob(Step s1, JobExecutionListener listener);

    public abstract FieldSetMapper<T> getFieldSetMapper();

    public abstract String getFilesPath();

    public abstract String[] getColumnNames();

    public abstract int getChunkSize();

    public abstract int getThrottleLimit();

    @Bean
    @StepScope
    @Value("#{stepExecutionContext['fileName']}")
    public FlatFileItemReader<T> reader(String file) {
        FlatFileItemReader<T> reader = new FlatFileItemReader<T>();
        String path = file.substring(file.indexOf(":") + 1, file.length());
        FileSystemResource resource = new FileSystemResource(path);
        reader.setResource(resource);
        DefaultLineMapper<T> lineMapper = new DefaultLineMapper<T>();
        lineMapper.setFieldSetMapper(getFieldSetMapper());
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer(",");
        tokenizer.setNames(getColumnNames());
        lineMapper.setLineTokenizer(tokenizer);
        reader.setLineMapper(lineMapper);
        reader.setLinesToSkip(1);
        return reader;
    }

    @Bean
    public ItemProcessor<T, T> processor() {
        // TODO add transformations here
        return null;
    }

    @Bean
    @JobScope
    public ListItemWriter<T> writer() {
        ListItemWriter<T> writer = new ListItemWriter<T>();
        return writer;
    }

    @Bean
    @JobScope
    public Step readStep(StepBuilderFactory stepBuilderFactory,
            ItemReader<T> reader, ItemWriter<T> writer,
            ItemProcessor<T, T> processor, TaskExecutor taskExecutor) {

        final Step readerStep = stepBuilderFactory
                .get(this.getClass().getSimpleName() + " ReadStep:slave")
                .<T, T> chunk(getChunkSize()).reader(reader)
                .processor(processor).writer(writer).taskExecutor(taskExecutor)
                .throttleLimit(getThrottleLimit()).build();

        final Step partitionedStep = stepBuilderFactory
                .get(this.getClass().getSimpleName() + " ReadStep:master")
                .partitioner(readerStep)
                .partitioner(
                        this.getClass().getSimpleName() + " ReadStep:slave",
                        partitioner()).taskExecutor(taskExecutor).build();

        return partitionedStep;

    }

    /*
     * @Bean public TaskExecutor taskExecutor() { return new
     * SimpleAsyncTaskExecutor(); }
     */

    @Bean
    @JobScope
    public Partitioner partitioner() {
        MultiResourcePartitioner partitioner = new MultiResourcePartitioner();
        Resource[] resources;
        try {
            resources = resourcePatternResolver.getResources("file:"
                    + getFilesPath() + FILE_PATTERN);
        } catch (IOException e) {
            throw new RuntimeException(
                    "I/O problems when resolving the input file pattern.", e);
        }
        partitioner.setResources(resources);
        return partitioner;
    }

    @Bean
    @JobScope
    public JobExecutionListener listener(ListItemWriter<T> writer) {
        return new JobCompletionNotificationListener<T>(writer);
    }

    /*
     * Use this if you want the writer to have job scope (JIRA BATCH-2269). Also
     * change the return type of writer to ListItemWriter for this to work.
     */
    @Bean
    public TaskExecutor taskExecutor() {
        return new SimpleAsyncTaskExecutor() {
            @Override
            protected void doExecute(final Runnable task) {
                // gets the jobExecution of the configuration thread
                final JobExecution jobExecution = JobSynchronizationManager
                        .getContext().getJobExecution();
                super.doExecute(new Runnable() {
                    public void run() {
                        JobSynchronizationManager.register(jobExecution);

                        try {
                            task.run();
                        } finally {
                            JobSynchronizationManager.close();
                        }
                    }
                });
            }
        };
    }

}
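
The custom taskExecutor() above exists because, as I understand it, job-scoped beans are resolved through a thread-bound context held by JobSynchronizationManager, so a plain worker thread cannot see them (the comment cites BATCH-2269). The override captures the JobExecution on the submitting thread, registers it on the worker thread for the duration of the task, and closes it afterwards. The same capture/register/close pattern can be modeled in plain Java with a ThreadLocal; ContextPropagation and its CURRENT field are hypothetical stand-ins, not the JobSynchronizationManager API.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Plain-Java model of the capture / re-register / close pattern used by the
// custom SimpleAsyncTaskExecutor. CURRENT is a hypothetical stand-in for the
// thread-bound context that JobSynchronizationManager maintains.
class ContextPropagation {

    static final ThreadLocal<String> CURRENT = new ThreadLocal<>();

    // Wraps a task so that it sees the submitting thread's context.
    static Runnable propagate(Runnable task) {
        final String captured = CURRENT.get();   // read on the submitting thread
        return () -> {
            CURRENT.set(captured);               // "register" on the worker thread
            try {
                task.run();
            } finally {
                CURRENT.remove();                // "close": leave no stale context behind
            }
        };
    }

    // Runs a task on another thread and reports which context it observed.
    static String observedOnWorker(boolean wrap) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            final String[] seen = new String[1];
            Runnable task = () -> seen[0] = CURRENT.get();
            worker.submit(wrap ? propagate(task) : task).get();
            return seen[0];
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
        }
    }
}
```

Without the wrapper the worker thread sees no context at all, which corresponds to a job-scoped bean failing to resolve on a partition thread.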

For the sake of discussion, let's say I have to read invoice data. I can then extend the above class to create an InvoiceLoader:

@Configuration
public class InvoiceLoader extends AbstractFileLoader<Invoice>{

    private class InvoiceFieldSetMapper implements FieldSetMapper<Invoice> {

        public Invoice mapFieldSet(FieldSet f) {
            Invoice invoice = new Invoice();
            invoice.setNo(f.readString("INVOICE_NO"));
            return invoice;
        }
    }

    @Override
    public FieldSetMapper<Invoice> getFieldSetMapper() {
        return new InvoiceFieldSetMapper();
    }

    @Override
    public String getFilesPath() {
        return "I:/CK/invoices/partitions/";
    }

    @Override
    public String[] getColumnNames() {
        return new String[] { "INVOICE_NO", "DATE"};
    }


    @Override
    @Bean(name="invoiceJob")
    public Job loaderJob(Step s1,
            JobExecutionListener listener) {
        return createJob(s1, listener);
    }

    @Override
    public int getChunkSize() {
        return 25254;
    }

    @Override
    public int getThrottleLimit() {
        return 8;
    }

}

Let's say I have one more class called InventoryLoader that extends AbstractFileLoader.

On application startup, I can load these two annotation configurations as follows:

AbstractApplicationContext context1 = new AnnotationConfigApplicationContext(InvoiceLoader.class, InventoryLoader.class);

Elsewhere in my application, two different threads can launch the jobs as follows:

Thread 1:

    JobLauncher jobLauncher1 = context1.getBean(JobLauncher.class);
    Job job1 = context1.getBean("invoiceJob", Job.class);
    JobExecution jobExecution = jobLauncher1.run(job1, jobParams1);

Thread 2:

    JobLauncher jobLauncher1 = context1.getBean(JobLauncher.class);
    Job job1 = context1.getBean("inventoryJob", Job.class);
    JobExecution jobExecution = jobLauncher1.run(job1, jobParams1);

The advantage of this approach is that every time there is a new file to be read, all the developer/user has to do is subclass AbstractFileLoader and implement the required abstract methods, without needing to know the details of how the job is assembled.
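
This is the template method pattern: the base class owns the fixed assembly steps and subclasses only fill in the hooks. A stripped-down, Spring-free sketch of that split follows. DelimitedLoader and InvoiceNumberLoader are hypothetical names; DelimitedLoader plays the role of AbstractFileLoader, and its final load method stands in for the reader/tokenizer/mapper chain (skip one header line, split on commas, name the fields, map each line). Unlike DelimitedLineTokenizer, the sketch does not handle quoted fields or validate the token count.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical Spring-free model of AbstractFileLoader: the base class fixes
// how a delimited file is read; subclasses supply only the variable parts.
abstract class DelimitedLoader<T> {

    // Hooks, analogous to getColumnNames() and getFieldSetMapper().
    protected abstract String[] columnNames();

    protected abstract Function<Map<String, String>, T> mapper();

    // Fixed algorithm: skip the header (like setLinesToSkip(1)), split on ",",
    // name each token after its column, then map the named fields to T.
    public final List<T> load(List<String> lines) {
        String[] names = columnNames();
        Function<Map<String, String>, T> map = mapper();
        List<T> result = new ArrayList<>();
        for (String line : lines.subList(Math.min(1, lines.size()), lines.size())) {
            String[] tokens = line.split(",", -1);
            Map<String, String> fields = new HashMap<>();
            for (int i = 0; i < names.length && i < tokens.length; i++) {
                fields.put(names[i], tokens[i].trim());
            }
            result.add(map.apply(fields));
        }
        return result;
    }
}

// A concrete loader only answers "which columns?" and "how to map them?".
class InvoiceNumberLoader extends DelimitedLoader<String> {
    @Override
    protected String[] columnNames() {
        return new String[] {"INVOICE_NO", "DATE"};
    }

    @Override
    protected Function<Map<String, String>, String> mapper() {
        return fields -> fields.get("INVOICE_NO");
    }
}
```

Because load is final, a subclass cannot change how lines are read, only which columns exist and how a row becomes an object, which is the same division of responsibility the question aims for.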

Questions:

  1. I am new to Spring Batch, so I may have overlooked some of the not-so-obvious issues with this approach, such as shared internal objects in Spring Batch that may cause two jobs running together to fail, or obvious issues such as the scoping of the beans.
  2. Is there a better way to achieve my objective?
  3. The fileName attribute of @Value("#{stepExecutionContext['fileName']}") is always assigned the value I:/CK/invoices/partitions/, which is the value returned by the getFilesPath method in InvoiceLoader, even though the getFilesPath method in InventoryLoader returns a different value.

Recommended answer

One option is passing them as job parameters. For instance (the example is written in Groovy):

@Bean
Job job() {
    jobs.get("myJob").start(step1(null)).build()
}

@Bean
@JobScope
Step step1(@Value('#{jobParameters["commitInterval"]}') commitInterval) {
    steps.get('step1')
            .chunk((int) commitInterval)
            .reader(new IterableItemReader(iterable: [1, 2, 3, 4], name: 'foo'))
            .writer(writer(null))
            .build()
}

@Bean
@JobScope
ItemWriter writer(@Value('#{jobParameters["writerClass"]}') writerClass) {
    applicationContext.classLoader.loadClass(writerClass).newInstance()
}

With MyWriter defined as:

class MyWriter implements ItemWriter<Integer> {

    @Override
    void write(List<? extends Integer> items) throws Exception {
        println "Write $items"
    }
}

Then execute:

def jobExecution = launcher.run(ctx.getBean(Job), new JobParameters([
        commitInterval: new JobParameter(3),
        writerClass: new JobParameter('MyWriter'), ]))

The output is:


INFO: Executing step: [step1]
Write [1, 2, 3]
Write [4]
Feb 24, 2016 2:30:22 PM org.springframework.batch.core.launch.support.SimpleJobLauncher$1 run
INFO: Job: [SimpleJob: [name=myJob]] completed with the following parameters: [{commitInterval=3, writerClass=MyWriter}] and the following status: [COMPLETED]
Status is: COMPLETED, job execution id 0
  #1 step1 COMPLETED
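
The two Write lines follow from chunk-oriented processing: with commitInterval = 3, the step reads items until it has three, passes that chunk to the writer in one write call, and then hands the single leftover item over as a final, shorter chunk. A plain-Java model of just that grouping (no transactions, retries or skips; ChunkModel is a hypothetical name):

```java
import java.util.ArrayList;
import java.util.List;

// Models only how chunk(commitInterval) groups items before each write call.
class ChunkModel {

    static <T> List<List<T>> chunks(List<T> items, int commitInterval) {
        if (commitInterval < 1) {
            throw new IllegalArgumentException("commitInterval must be >= 1");
        }
        List<List<T>> writes = new ArrayList<>();
        List<T> current = new ArrayList<>();
        for (T item : items) {
            current.add(item);
            if (current.size() == commitInterval) {   // chunk is full: "write" it
                writes.add(current);
                current = new ArrayList<>();
            }
        }
        if (!current.isEmpty()) {                     // leftover items form the last chunk
            writes.add(current);
        }
        return writes;
    }
}
```

ChunkModel.chunks(Arrays.asList(1, 2, 3, 4), 3) returns [[1, 2, 3], [4]], matching Write [1, 2, 3] and Write [4] above.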

Full example here.
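
Applied to the question, the same idea would let one job definition serve every file type: the per-file settings (commit interval, throttle limit, directory, mapper class) arrive as job parameters and are resolved when the job-scoped beans are created, much as the writer bean above loads writerClass by name. The plain-Java sketch below models only that resolution step. The parameter keys (commitInterval, throttleLimit, filesPath, mapperClass) and the FileJobSettings and UpperCaseMapper classes are hypothetical; in Spring Batch the values would come from #{jobParameters[...]} expressions rather than a Map, and a FieldSetMapper would take the place of the Function.

```java
import java.util.Map;
import java.util.function.Function;

// Hypothetical, Spring-free model: per-file-type settings resolved from
// string "job parameters" instead of being hard-coded in a subclass.
class FileJobSettings {

    final int commitInterval;
    final int throttleLimit;
    final String filesPath;
    final Function<String, Object> mapper;   // stand-in for a FieldSetMapper

    private FileJobSettings(int commitInterval, int throttleLimit,
                            String filesPath, Function<String, Object> mapper) {
        this.commitInterval = commitInterval;
        this.throttleLimit = throttleLimit;
        this.filesPath = filesPath;
        this.mapper = mapper;
    }

    @SuppressWarnings("unchecked")
    static FileJobSettings fromParameters(Map<String, String> params) {
        try {
            // Same idea as loadClass(writerClass).newInstance() in the answer:
            // instantiate the mapper from a class name passed in at launch time.
            Class<?> type = Class.forName(params.get("mapperClass"));
            Function<String, Object> mapper =
                    (Function<String, Object>) type.getDeclaredConstructor().newInstance();
            return new FileJobSettings(
                    Integer.parseInt(params.get("commitInterval")),
                    Integer.parseInt(params.get("throttleLimit")),
                    params.get("filesPath"),
                    mapper);
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot create mapper " + params.get("mapperClass"), e);
        }
    }
}

// Example mapper class a caller could name in the "mapperClass" parameter.
class UpperCaseMapper implements Function<String, Object> {
    @Override
    public Object apply(String line) {
        return line.toUpperCase();
    }
}
```

The trade-off versus the subclassing approach in the question is that the settings become runtime strings, so a mistyped class name or number only surfaces when the job starts, not at compile time.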
