Spring Batch - How to generate parallel steps based on params created in a previous step
Problem description
I am trying to use job parameters created in a tasklet to create the steps that run after the tasklet has executed.
A tasklet tries to find some files (findFiles()), and if it finds any, it saves the file names to a list of strings.
In the tasklet I pass the data as follows:
chunkContext.getStepContext().getStepExecution().getExecutionContext().put("files", fileNames);
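One detail worth keeping in mind: the call above writes to the execution context of the step, which is a separate object from the job-level execution context that an expression such as jobExecutionContext reads from. Spring Batch provides an ExecutionContextPromotionListener for copying selected keys from one to the other. The following is only a plain-Java model of that idea, using two maps; the class and method names are hypothetical and this is not the Spring Batch API.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of step-scoped vs. job-scoped context data.
// Hypothetical names; this is NOT the Spring Batch API.
class ContextPromotionSketch {

    // Copies the listed keys from the step-level map into the job-level map,
    // skipping keys that the step never set.
    static void promote(Map<String, Object> stepContext,
                        Map<String, Object> jobContext,
                        String... keys) {
        for (String key : keys) {
            if (stepContext.containsKey(key)) {
                jobContext.put(key, stepContext.get(key));
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Object> jobContext = new HashMap<>();
        Map<String, Object> stepContext = new HashMap<>();

        // What the tasklet does: store the list in the step-level context.
        stepContext.put("files", Arrays.asList("sample-data1.csv", "sample-data2.csv"));

        // A later step that only looks at the job-level context sees nothing yet.
        System.out.println(jobContext.get("files"));

        // After promotion the list is visible at job level.
        promote(stepContext, jobContext, "files");
        System.out.println(jobContext.get("files"));
    }
}
```

In this model the first lookup returns null and the second returns the list, which mirrors why a value stored only at step level is not automatically available to the steps that follow.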
The next step is a parallel flow in which a simple reader-processor-writer step is executed for each file (if you are interested in how I got there, please see my previous question: Spring Batch - Looping a reader/processor/writer step).
When the job readFilesJob() is built, a flow is initially created using a "fake" list of files, because the real list of files is only known after the tasklet has been executed.
How do I configure the job so that the tasklet is executed first and the parallel flow is then executed using the list of files generated by the tasklet?
I think it comes down to getting the list of file names loaded with the correct data at the correct moment during runtime... but how?
Here is my simplified configuration:
@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

    private static final String FLOW_NAME = "flow1";
    private static final String PLACE_HOLDER = "empty";

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    // Placeholder list: the real file names are not known when the job is built.
    public List<String> files = Arrays.asList(PLACE_HOLDER);

    @Bean
    public Job readFilesJob() throws Exception {
        // One step per file; at this point the list only contains the placeholder.
        List<Step> steps = files.stream().map(file -> createStep(file)).collect(Collectors.toList());
        FlowBuilder<Flow> flowBuilder = new FlowBuilder<>(FLOW_NAME);
        Flow flow = flowBuilder
                .start(findFiles())
                .next(createParallelFlow(steps))
                .build();
        return jobBuilderFactory.get("readFilesJob")
                .start(flow)
                .end()
                .build();
    }

    // Wraps each step in its own flow and runs all flows in parallel.
    private static Flow createParallelFlow(List<Step> steps) {
        SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
        taskExecutor.setConcurrencyLimit(steps.size());
        List<Flow> flows = steps.stream()
                .map(step ->
                        new FlowBuilder<Flow>("flow_" + step.getName())
                                .start(step)
                                .build())
                .collect(Collectors.toList());
        return new FlowBuilder<SimpleFlow>("parallelStepsFlow").split(taskExecutor)
                .add(flows.toArray(new Flow[flows.size()]))
                .build();
    }

    // reader(...) and writer(...) are not shown in this simplified configuration.
    private Step createStep(String fileName) {
        return stepBuilderFactory.get("readFile" + fileName)
                .chunk(100)
                .reader(reader(fileName))
                .writer(writer(fileName))
                .build();
    }

    // Wraps the FileFinder tasklet in a step so that it can start the flow.
    private Step findFiles() {
        return stepBuilderFactory.get("findFiles")
                .tasklet(new FileFinder())
                .build();
    }
}
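To make the timing issue concrete, here is a minimal plain-Java sketch with no Spring Batch involved. It models the fact that the bean method readFilesJob() evaluates the files list when the job is built, before the tasklet has run. All names (BuildTimeVsRunTime, buildEagerly, buildDeferred) are hypothetical and only illustrate the difference between reading the list at build time and reading it when the work actually starts.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;

// Plain-Java model of the build-time vs. run-time problem.
// Hypothetical names; this is NOT Spring Batch code.
class BuildTimeVsRunTime {

    // Stands in for the "files" field of the configuration class.
    static List<String> files = Arrays.asList("empty");

    // Eager: the step names are derived while the "job" is being built.
    static List<String> buildEagerly() {
        return files.stream().map(f -> "readFile" + f).collect(Collectors.toList());
    }

    // Deferred: the list is only read when the supplier is invoked.
    static Supplier<List<String>> buildDeferred() {
        return () -> files.stream().map(f -> "readFile" + f).collect(Collectors.toList());
    }

    // Stands in for the tasklet that discovers the real file names.
    static void findFiles() {
        files = Arrays.asList("sample-data1.csv", "sample-data2.csv");
    }

    public static void main(String[] args) {
        List<String> eager = buildEagerly();               // built before the tasklet runs
        Supplier<List<String>> deferred = buildDeferred();
        findFiles();                                       // the "tasklet" runs afterwards
        System.out.println(eager);          // prints [readFileempty]
        System.out.println(deferred.get()); // prints [readFilesample-data1.csv, readFilesample-data2.csv]
    }
}
```

The eager variant is stuck with the placeholder, while the deferred variant sees the real names because it reads the list only after the finder has run.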
Research
The question and answer from How to safely pass params from Tasklet to step when running parallel jobs suggest the usage of a construct like this in the reader/writer:
@Value("#{jobExecutionContext[filePath]}") String filePath
However, because of the way the steps are created in the createParallelFlow() method, I really hope it is possible to pass the fileName as a string to the reader/writer. So even though the answer to that question might solve my problem, it is not the solution I am looking for. But please do not refrain from correcting me if I am wrong.
I am using the file names example to clarify the problem. My problem is not actually about reading multiple files from a directory. My question really boils down to the idea of generating data at runtime and passing it to the next, dynamically generated step(s).
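Stripped of Spring Batch, the pattern described here looks roughly like the following plain-Java sketch: a first stage produces the work items at runtime, and only then is one parallel task created per item. The class and method names are hypothetical, and the "processing" is a placeholder string operation rather than a real reader/processor/writer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Plain-Java sketch of "produce data at runtime, then fan out one task per item".
// Hypothetical names; this is NOT Spring Batch code.
class RuntimeFanOut {

    // Stage 1: discover the work items at runtime (stand-in for the tasklet).
    static List<String> findFiles() {
        return Arrays.asList("sample-data1.csv", "sample-data2.csv");
    }

    // Stage 2: create one task per item, only after stage 1 has returned.
    static List<String> processInParallel(List<String> fileNames) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, fileNames.size()));
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (String fileName : fileNames) {
                futures.add(pool.submit(() -> "processed " + fileName));
            }
            List<String> results = new ArrayList<>();
            for (Future<String> future : futures) {
                results.add(future.get()); // waits for the task; results keep input order
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for tasks", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("A task failed", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(processInParallel(findFiles()));
    }
}
```

The point of the sketch is the ordering: the number of parallel tasks is decided from the list returned by the first stage, not from anything known when the program was wired together.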
Added a simplified version of the FileFinder tasklet:
@Component
public class FileFinder implements Tasklet, InitializingBean {

    // Initialized up front so that add(...) below does not throw a NullPointerException.
    private final List<String> fileNames = new ArrayList<>();

    public List<String> getFileNames() {
        return fileNames;
    }

    @PostConstruct
    public void afterPropertiesSet() {
        // read the file names and store them in the list
        fileNames.add("sample-data1.csv");
        fileNames.add("sample-data2.csv");
    }

    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        // Execution of methods that will find the file names and put them in the list...
        chunkContext.getStepContext().getStepExecution().getExecutionContext().put("files", fileNames);
        return RepeatStatus.FINISHED;
    }
}
Recommended answer
I'm not sure I understood your problem correctly, but as far as I can see, you need to have the list of file names before you build your job dynamically.
You could do it like this:
@Component
public class MyJobSetup {

    List<String> fileNames;

    public List<String> getFileNames() {
        return fileNames;
    }

    @PostConstruct
    public void afterPropertiesSet() {
        // read the file names and store them in the list
        fileNames = ....;
    }
}
After that, you can inject this bean into your job configuration bean:
@Configuration
@EnableBatchProcessing
@Import(MyJobSetup.class)
public class BatchConfiguration {

    private static final String FLOW_NAME = "flow1";
    private static final String PLACE_HOLDER = "empty";

    @Autowired
    private MyJobSetup jobSetup; // <--- Inject
    // By the time it is injected, the @PostConstruct method of MyJobSetup has already run

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    public List<String> files = Arrays.asList(PLACE_HOLDER);

    @Bean
    public Job readFilesJob() throws Exception {
        List<Step> steps = jobSetup.getFileNames() // get the list of files
                .stream() // as stream
                .map(file -> createStep(file)) // map...
                .collect(Collectors.toList()); // and create the list of steps