How to safely pass params from Tasklet to step when running parallel jobs


Question

I am trying to safely pass params from a tasklet to a step in the same job.

My job consists of three tasklets (step1, step2, step3) run one after another, followed by step4 (a reader, processor, and writer).

This job is executed many times in parallel.

In step1, inside the tasklet, I obtain a param (hashId) via a web service, then pass it along my chain until it reaches my reader (in step4).

In step3 I create a new param called filePath, which is based on hashId, and I send it over to step4 (the reader) as the file resource location.

I am using the StepExecution to pass these params (hashId and filePath).

I tried three ways of doing it via the tasklet.

To pass the param (hashId from step1 into step2, and from step2 into step3) I do this:

chunkContext.getStepContext()
        .getStepExecution()
        .getExecutionContext()
        .put("hashId", hashId);

In step3 (the DownloadFileTasklet) I populate filePath based on hashId and pass it this way to my last step (the reader, processor, and writer):

public class DownloadFileTasklet implements Tasklet, StepExecutionListener {
..

    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {

        String hashId = (String) chunkContext.getStepContext()
                .getStepExecution()
                .getJobExecution()
                .getExecutionContext()
                .get("hashId");

        ...

        filePath = "...hashId.csv";
        // I used an ExecutionContextPromotionListener here in order to promote those keys

        chunkContext.getStepContext()
                .getStepExecution()
                .getExecutionContext()
                .put("filePath", filePath);

        logger.info(filePath + " for hashId=" + hashId);
        return RepeatStatus.FINISHED;
    }

    @Override
    public void beforeStep(StepExecution stepExecution) {
        this.stepExecution = stepExecution;
    }
}
@Override
public void beforeStep(StepExecution stepExecution) {
    this.stepExecution = stepExecution;
}

Note that I print the hashId and filePath values right before that step (step3) finishes. According to the logs they are consistent and populated as expected.

I also added logging within my reader to see the params I get:

@Bean
    @StepScope
    public ItemStreamReader<MyDTO> reader(@Value("#{jobExecutionContext[filePath]}") String filePath) {
        logger.info("test filePath=" + filePath);

        return itemReader;
    }

When I execute this job ~10 times, I can see that the filePath param is populated with other jobs' filePath values when executing in parallel.

This is how I promote the job's keys with the ExecutionContextPromotionListener:

Job definition:

 @Bean
    public Job processFileJob() throws Exception {
        return this.jobs.get("processFileJob")
                .start(step1)
                .next(step2)
                .next(downloadFileTaskletStep())   // step3
                .next(processSnidFileStep())       // step4
                .build();
    }

Step3 definition:

  public Step downloadFileTaskletStep() {
        return this.steps.get("downloadFileTaskletStep")
                .tasklet(downloadFileTasklet())
                .listener(executionContextPromotionListener())
                .build();
    }


  @Bean
    public org.springframework.batch.core.listener.ExecutionContextPromotionListener executionContextPromotionListener() {
        ExecutionContextPromotionListener executionContextPromotionListener = new ExecutionContextPromotionListener();
        executionContextPromotionListener.setKeys(new String[]{"filePath"});
        return executionContextPromotionListener;
    }
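As a rough illustration of what this listener does (a plain-Java sketch with hypothetical names; the maps stand in for the real ExecutionContext objects), promotion amounts to copying only the keys configured via setKeys from the step's context into the job's context when the step finishes:

```java
import java.util.HashMap;
import java.util.Map;

public class PromotionSketch {

    // Mimics the listener's afterStep behavior: copy only the configured
    // keys from the step-level context into the job-level context.
    static void promote(Map<String, Object> stepCtx, Map<String, Object> jobCtx, String... keys) {
        for (String key : keys) {
            if (stepCtx.containsKey(key)) {
                jobCtx.put(key, stepCtx.get(key));
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Object> stepCtx = new HashMap<>();
        Map<String, Object> jobCtx = new HashMap<>();
        stepCtx.put("filePath", "/files/abc.csv");
        stepCtx.put("hashId", "abc");

        // Only "filePath" is listed, as in the setKeys(...) call above.
        promote(stepCtx, jobCtx, "filePath");

        System.out.println(jobCtx.containsKey("filePath")); // true
        System.out.println(jobCtx.containsKey("hashId"));   // false: unlisted keys are not promoted
    }
}
```

One consequence worth noting: a key that is never written to the step context, or never listed in setKeys, will simply not appear in the job context.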

Same results: the threads are messing up the params.

I can track the results via the Spring Batch database table batch_job_execution_context, column short_context:

Here you can see that the filePath, which is built from the hashId, is not identical to the original hashId:

// incorrect record

{"map":[{"entry":[{"string":"totalRecords","int":5},{"string":"segmentId","long":13},{"string":["filePath","/etc/mydir/services/notification_processor/files/2015_04_22/f1c7b0f2180b7e266d36f87fcf6fb7aa.csv"]},{"string":["hashId","20df39d201fffc7444423cfdf2f43789"]}]}]}

Now if we check the other records they seem fine, but there are always one or two that are messed up.

// correct records

{"map":[{"entry":[{"string":"totalRecords","int":5},{"string":"segmentId","long":13},{"string":["filePath","\/etc\/mydir\/services\/notification_processor\/files\/2015_04_22\/**c490c8282628b894727fc2a4d6fc0cb5**.csv"]},{"string":["hashId","**c490c8282628b894727fc2a4d6fc0cb5**"]}]}]}

{"map":[{"entry":[{"string":"totalRecords","int":5},{"string":"segmentId","long":13},{"string":["filePath","\/etc\/mydir\/services\/notification_processor\/files\/2015_04_22\/**2b21d3047208729192b87e90e4a868e4**.csv"]},{"string":["hashId","**2b21d3047208729192b87e90e4a868e4**"]}]}]}   

Any idea why I have these threading issues?

Answer

To review your attempted methods:


  • Method 1 - Editing JobParameters: JobParameters are immutable in a job, so attempting to modify them during job execution should not be attempted.
  • Method 2 - Editing JobParameters v2: Method 2 is really the same as method 1; you're only getting the reference to the JobParameters a different way.
  • Method 3 - Using the ExecutionContextPromotionListener: This is the correct way, but you're doing things incorrectly. The ExecutionContextPromotionListener looks at the step's ExecutionContext and copies the keys you specify over to the job's ExecutionContext. You're adding the keys directly to the job's ExecutionContext, which is a bad idea.

So in short, method 3 is the closest to correct, but you should be adding the properties you want to share to the step's ExecutionContext and then configure the ExecutionContextPromotionListener to promote the appropriate keys to the job's ExecutionContext.

The code would be updated as follows:

chunkContext.getStepContext()
            .getStepExecution()
            .getExecutionContext()
            .put("filePath", filePath);
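To see why this arrangement keeps parallel executions apart, here is a minimal end-to-end sketch (plain Java; the maps stand in for the Spring Batch contexts, and all names are hypothetical). The tasklet writes to its own step context, the promotion listener copies the configured key into that job's context, and the step-scoped reader resolves filePath from the job context of its own JobExecution, so two executions cannot see each other's values:

```java
import java.util.HashMap;
import java.util.Map;

public class CorrectedFlowSketch {

    // Step3 writes to its own step-level context (not the job context directly).
    static Map<String, Object> runDownloadTasklet(String hashId) {
        Map<String, Object> stepContext = new HashMap<>();
        stepContext.put("filePath", "/files/" + hashId + ".csv");
        return stepContext;
    }

    // The promotion listener copies the configured keys into the job context.
    static void promote(Map<String, Object> stepContext, Map<String, Object> jobContext, String... keys) {
        for (String key : keys) {
            jobContext.put(key, stepContext.get(key));
        }
    }

    // The @StepScope reader binds #{jobExecutionContext['filePath']} per job execution.
    static String resolveReaderFilePath(Map<String, Object> jobContext) {
        return (String) jobContext.get("filePath");
    }

    public static void main(String[] args) {
        // Two parallel job executions: each has its OWN job-level context,
        // so promoted values never bleed between them.
        Map<String, Object> jobCtxA = new HashMap<>();
        Map<String, Object> jobCtxB = new HashMap<>();

        promote(runDownloadTasklet("aaa111"), jobCtxA, "filePath");
        promote(runDownloadTasklet("bbb222"), jobCtxB, "filePath");

        System.out.println(resolveReaderFilePath(jobCtxA)); // /files/aaa111.csv
        System.out.println(resolveReaderFilePath(jobCtxB)); // /files/bbb222.csv
    }
}
```

This is only a model of the mechanism, not Spring Batch itself, but it captures the key point of the answer: write to the step's ExecutionContext and let the listener do the copy into the per-execution job context.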
