Spring Batch Partitioning: inject stepExecutionContext parameter in itemReader


Question

I am trying to learn Spring Batch with a Partitioner.

The issue is that I need to set the filenames dynamically from the Partitioner implementation and then read them in the itemReader. But the filename that arrives in the itemReader is null.

My Spring Batch configuration:

@Bean
@StepScope
public ItemReader<Transaction> itemReader(@Value("#{stepExecutionContext[filename]}") String filename) 
    throws UnexpectedInputException, ParseException {
    FlatFileItemReader<Transaction> reader = new FlatFileItemReader<Transaction>();
    DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
    String[] tokens = { "username", "userid", "transactiondate", "amount" };
    tokenizer.setNames(tokens);
    reader.setResource(new ClassPathResource(
        "input/"+filename));
    DefaultLineMapper<Transaction> lineMapper = new DefaultLineMapper<Transaction>();
    lineMapper.setLineTokenizer(tokenizer);
    lineMapper.setFieldSetMapper(new RecordFieldSetMapper());
    reader.setLinesToSkip(1);
    reader.setLineMapper(lineMapper);
    return reader;
}
@Bean(name = "partitioningJob")  
public Job partitioningJob() throws UnexpectedInputException, MalformedURLException, ParseException {  
    return jobs.get("partitioningJob").listener(jobListener()).start(partitionStep()).build();  
}  

@Bean 
public Step partitionStep() throws UnexpectedInputException, MalformedURLException, ParseException {  
    return steps.get("partitionStep").partitioner(step2()).partitioner("step2", partitioner()).gridSize(2).taskExecutor(taskExecutor).build();  
}  

@Bean 
public Step step2() throws UnexpectedInputException, MalformedURLException, ParseException {  
    return steps.get("step2").<Transaction, Transaction> chunk(1).reader(itemReader(null)).processor(itemProcessor()).writer(itemWriter(marshaller(),null)).build();  
}  

@Bean 
public TransactionPartitioner partitioner() {  
    TransactionPartitioner partitioner = new TransactionPartitioner();  
    return partitioner;  
}                           

@Bean 
public JobListener jobListener() {  
   return new JobListener();  
} 

@Bean
public TaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    taskExecutor.setMaxPoolSize(2);
    taskExecutor.setQueueCapacity(2);
    taskExecutor.setCorePoolSize(2);
    taskExecutor.afterPropertiesSet();
    return taskExecutor;
}

And my TransactionPartitioner class is:

public class TransactionPartitioner implements Partitioner {  

public Map<String, ExecutionContext> partition(int range) {  
    Map<String, ExecutionContext> result = new HashMap<String, ExecutionContext>();  
    for (int i = 1; i <= range; i++) {  
        ExecutionContext exContext = new ExecutionContext();  
        exContext.put("filename", "input"+i+".csv");
        exContext.put("name", "Thread" + i);  
        result.put("partition" + i, exContext);  
    }       
    return result;  
}  
}

Is this not the right way to do it? Please advise.

Here is the stack trace:

  18:23:39.060 [main] DEBUG org.springframework.batch.core.job.AbstractJob - Upgrading JobExecution status: StepExecution: id=1, version=2, name=partitionStep, status=FAILED, exitStatus=FAILED, readCount=0, filterCount=0, writeCount=0 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=0, rollbackCount=0, exitDescription=org.springframework.batch.core.JobExecutionException: Partition handler returned an unsuccessful step
    at org.springframework.batch.core.partition.support.PartitionStep.doExecute(PartitionStep.java:112)
    at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:200)
    at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148)
    at org.springframework.batch.core.job.AbstractJob.handleStep(AbstractJob.java:392)
    at org.springframework.batch.core.job.SimpleJob.doExecute(SimpleJob.java:135)
    at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:306)
    at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:135)
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
    at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:128)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
    at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$PassthruAdvice.invoke(SimpleBatchConfiguration.java:127)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:207)
    at com.sun.proxy.$Proxy19.run(Unknown Source)
    at org.baeldung.spring_batch_intro.App.main(App.java:24)
; org.springframework.batch.item.ItemStreamException: Failed to initialize the reader
    at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.open(AbstractItemCountingItemStreamItemReader.java:147)
    at org.springframework.batch.item.support.CompositeItemStream.open(CompositeItemStream.java:96)
    at org.springframework.batch.core.step.tasklet.TaskletStep.open(TaskletStep.java:310)
    at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:197)
    at org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler$1.call(TaskExecutorPartitionHandler.java:139)
    at org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler$1.call(TaskExecutorPartitionHandler.java:136)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Input resource must exist (reader is in 'strict' mode): class path resource [input/null]
    at org.springframework.batch.item.file.FlatFileItemReader.doOpen(FlatFileItemReader.java:251)
    at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.open(AbstractItemCountingItemStreamItemReader.java:144)
    ... 9 more

As per @Sabir's suggestion, I checked my data. The step execution context table looks like this:

| STEP_EXECUTION_ID | SHORT_CONTEXT | SERIALIZED_CONTEXT |
| 1 | {"map":[{"entry":[{"string":"SimpleStepExecutionSplitter.GRID_SIZE","long":2},{"string":["batch.stepType","org.springframework.batch.core.partition.support.PartitionStep"]}]}]} | NULL |
| 2 | {"map":[{"entry":[{"string":["filename","input2.csv"]},{"string":["name","Thread2"]}]}]} | NULL |
| 3 | {"map":[{"entry":[{"string":["filename","input1.csv"]},{"string":["name","Thread1"]}]}]} | |

Here is the complete code: https://drive.google.com/file/d/0Bziay9b2ceLbUXdTRnZoSjRfR2s/view?usp=sharing

Recommended answer

I went through your code and tried to run it.

Currently the file name is not being bound at step scope.

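You have two configuration files:

  1. SpringConfig - contains the Spring-related configuration beans

  2. SpringBatchConfig - contains the Spring Batch related beans

The first one carries the annotations @EnableBatchProcessing and @Configuration.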

But the itemReader is defined in the other config file, which does not carry either of these annotations.

You should have @Configuration on the other file too.

OR

You can add both annotations to the SpringBatchConfig config file and omit them from SpringConfig.

Without this, the configuration is not processed properly: the itemReader is not treated as step scoped (i.e. the @StepScope annotation has no effect), the value is not bound at step level, and hence you get null for the filename.
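As a minimal sketch of the second option, assuming the Spring Batch 4-style Java config used in the question, the class that declares the reader could look roughly like this. To keep the sketch self-contained it reads plain String lines with PassThroughLineMapper instead of the Transaction type and RecordFieldSetMapper from the question; that substitution, and returning the concrete FlatFileItemReader type, are my assumptions rather than part of the original code.

```java
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.PassThroughLineMapper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;

// Both annotations sit on the class that declares the step-scoped reader.
// With @Configuration present, Spring intercepts calls between @Bean methods,
// so a call such as itemReader(null) inside a step definition should hand back
// the step-scoped proxy rather than run the method body with filename == null.
@Configuration
@EnableBatchProcessing
public class SpringBatchConfig {

    @Bean
    @StepScope
    public FlatFileItemReader<String> itemReader(
            // Resolved per partition from the context the Partitioner filled in.
            @Value("#{stepExecutionContext['filename']}") String filename) {
        FlatFileItemReader<String> reader = new FlatFileItemReader<>();
        reader.setResource(new ClassPathResource("input/" + filename));
        reader.setLinesToSkip(1);                          // skip the CSV header row
        reader.setLineMapper(new PassThroughLineMapper()); // assumption: raw lines only
        return reader;
    }
}
```

With the reader resolved per step execution, each partition should open its own file (input1.csv, input2.csv) instead of input/null.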
