Spring Batch在Tasklet中执行动态生成的步骤 [英] Spring batch execute dynamically generated steps in a tasklet

查看:424
本文介绍了Spring Batch在Tasklet中执行动态生成的步骤的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个春季批处理工作,可以完成以下工作...

I have a spring batch job that does the following...

步骤1.创建需要处理的对象列表

Step 1. Creates a list of objects that need to be processed

步骤2.根据步骤1中创建的对象列表中的项目数,创建步骤列表.

Step 2. Creates a list of steps depending on how many items are in the list of objects created in step 1.

步骤3.尝试执行步骤2中创建的步骤列表中的步骤.

Step 3. Tries to executes the steps from the list of steps created in step 2.

正在执行的x个步骤在下面的executeDynamicStepsTasklet()中完成.尽管代码运行没有任何错误,但似乎没有做任何事情.我在该方法中所拥有的看起来正确吗?

The executing x steps is done below in executeDynamicStepsTasklet(). While the code runs without any errors it does not seem to be doing anything. Does what I have in that method look correct?

谢谢

/* * */

@Configuration
public class ExportMasterListCsvJobConfig {

public static final String JOB_NAME = "exportMasterListCsv";
@Autowired
public JobBuilderFactory jobBuilderFactory;

@Autowired
public StepBuilderFactory stepBuilderFactory;

@Value("${exportMasterListCsv.generateMasterListRows.chunkSize}") 
public int chunkSize;

@Value("${exportMasterListCsv.generateMasterListRows.masterListSql}") 
public String masterListSql;

@Autowired
public DataSource onlineStagingDb;

@Value("${out.dir}") 
public String outDir;

@Value("${exportMasterListCsv.generatePromoStartDateEndDateGroupings.promoStartDateEndDateSql}") 
private String promoStartDateEndDateSql;


private List<DivisionIdPromoCompStartDtEndDtGrouping> divisionIdPromoCompStartDtEndDtGrouping;

private List<Step> dynamicSteps = Collections.synchronizedList(new ArrayList<Step>()) ;


@Bean
public Job exportMasterListCsvJob(
        @Qualifier("createJobDatesStep") Step createJobDatesStep,
        @Qualifier("createDynamicStepsStep") Step createDynamicStepsStep,
        @Qualifier("executeDynamicStepsStep") Step executeDynamicStepsStep) {

    return jobBuilderFactory.get(JOB_NAME)
            .flow(createJobDatesStep)
            .next(createDynamicStepsStep)
            .next(executeDynamicStepsStep)
            .end().build();
}   


@Bean
public Step executeDynamicStepsStep(
        @Qualifier("executeDynamicStepsTasklet")  Tasklet executeDynamicStepsTasklet) {

    return  stepBuilderFactory
                .get("executeDynamicStepsStep")
                .tasklet(executeDynamicStepsTasklet)
                .build();               
}

@Bean
public Tasklet executeDynamicStepsTasklet() {

    return new Tasklet() {

        @Override
        public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
            FlowStep flowStep = new FlowStep(createParallelFlow());
            SimpleJobBuilder jobBuilder = jobBuilderFactory.get("myNewJob").start(flowStep);
            return RepeatStatus.FINISHED;
        }
    };
}

public Flow createParallelFlow() {
    SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
    taskExecutor.setConcurrencyLimit(1); 

    List<Flow> flows = dynamicSteps.stream()
            .map(step -> new FlowBuilder<Flow>("flow_" + step.getName()).start(step).build())
            .collect(Collectors.toList());

    return new FlowBuilder<SimpleFlow>("parallelStepsFlow")
            .split(taskExecutor)
            .add(flows.toArray(new Flow[flows.size()]))
            .build();
}

@Bean

public Step createDynamicStepsStep(
        @Qualifier("createDynamicStepsTasklet")  Tasklet createDynamicStepsTasklet) {

    return  stepBuilderFactory
                .get("createDynamicStepsStep")
                .tasklet(createDynamicStepsTasklet)
                .build();               
}   

@Bean
@JobScope
public Tasklet createDynamicStepsTasklet() {

    return new Tasklet() {

        @Override
        public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
            for (DivisionIdPromoCompStartDtEndDtGrouping grp: divisionIdPromoCompStartDtEndDtGrouping){

                System.err.println("grp: " + grp);

                String stepName = "stp_" + grp;

                String fileName = grp + FlatFileConstants.EXTENSION_CSV;

                Step dynamicStep = 
                        stepBuilderFactory.get(stepName)
                        .<MasterList,MasterList> chunk(10)
                        .reader(queryStagingDbReader(
                                grp.getDivisionId(), 
                                grp.getRpmPromoCompDetailStartDate(), 
                                grp.getRpmPromoCompDetailEndDate()))
                        .writer(masterListFileWriter(fileName))                                
                        .build(); 

                dynamicSteps.add(dynamicStep);

            } 
            System.err.println("createDynamicStepsTasklet dynamicSteps: " + dynamicSteps);
            return RepeatStatus.FINISHED;
        }
    };
}


public FlatFileItemWriter<MasterList> masterListFileWriter(String fileName) {
    FlatFileItemWriter<MasterList> writer = new FlatFileItemWriter<>();
    writer.setResource(new FileSystemResource(new File(outDir, fileName )));
    writer.setHeaderCallback(masterListFlatFileHeaderCallback());
    writer.setLineAggregator(masterListFormatterLineAggregator());
    return writer;
}

因此,现在我有了一个需要执行的动态步骤的列表,我相信它们在StepScope中.有人可以建议我如何执行它们

So now I have a list of dynamic steps that need to be executed and I believe that they are in StepScope. Can someone advise me on how to execute them

推荐答案

这将不起作用.您的Tasklet只是以FlowStep作为第一步来创建作业.使用jobBuilderfactory只会创建作业.它不会启动它.方法名开始"可能会引起误解,因为这仅定义了第一步.但这并不能启动工作.

This will not work. Your Tasklet just creates a job with a FlowStep as first Step. Using the jobBuilderfactory just creates the job. it does not launch it. The methodname "start" may be misleading, since this only defines the first step. But it does not launch the job.

启动作业后,您将无法更改其结构(步骤和子步骤).因此,不可能根据步骤1中计算出的结果在步骤2中配置流程步骤.(当然,您可以在springbatch结构内部做一些更深的修改,然后直接修改Bean,等等……但是您不这样做.不想这么做).

You cannot change the structure of a job (its steps and substeps) once it is started. Therefore, it is not possible to configure a flowstep in step 2 based on things that are calculated in step 1. (of course you could do some hacking deeper inside the springbatch structure and directly modify the beans and so ... but you don't want to do that).

我建议您使用一种带有适当的postConstruct方法的"SetupBean",该方法将注入配置您的工作的类中.该"SetupBean"负责计算正在处理的对象的列表.

I suggest, that you use a kind of "SetupBean" with an appropriate postConstruct method which is injected into your class that configures your job. This "SetupBean" is responsible to calculate the list of objects being processed.

@Component
public class SetUpBean {

  private List<Object> myObjects;

  @PostConstruct
  public afterPropertiesSet() {
    myObjects = ...;
  }

  public List<Object> getMyObjects() {
   return myObjects;
  }
}

@Configuration
public class JobConfiguration {

   @Autowired
   private JobBuilderFactory jobBuilderFactory;

   @Autowired
   private StepBuilderFactory stepBuilderFactory;

   @Autowired
   private SetUpBean setup;

   ... 
}

这篇关于Spring Batch在Tasklet中执行动态生成的步骤的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆