如何在运行并行作业时安全地将params从Tasklet传递到步骤 [英] How to safely pass params from Tasklet to step when running parallel jobs
问题描述
我试图安全地将params从tasklet传递到同一个工作中的一个步骤。
I am trying to pass safely params from tasklet to a step in the same job.
我的工作一个接一个地包含3个tasklet(step1,step2,step3)最后一步step4(处理器,读者,作家)
My job consist 3 tasklets(step1,step2,step3) one after another and in the end a step4(processor,reader,writer)
此作业正在多次并行执行。
this job is being executed many times in parallel.
在tasklet里面的第一步我通过web服务评估param(hashId),而不是把它传遍我的链,直到我的读者(在第4步)
In step1 inside the tasklet I am evaluating param(hashId) via web service) than I am passing it all over my chain till my reader (which on step 4)
在第3步中,我创建了一个名为:filePath的新param,它基于hashid,我将它作为文件资源位置发送到step4(读者)
In step 3 I am creating new param called: filePath which is based on hashid and I send it over to step4(the reader) as a file resource location
I我正在使用stepExecution来传递这个参数(hashId和filePath)。
I am using stepExecution to pass this param(hashId and filePath).
我通过tasklet尝试了3种方法:
I tried 3 ways doing it via the tasklet:
将param(hashId从step1传递到step2)从步骤2到步骤3)我这样做:
to pass the param(hashId from step1 into step2 and from step2 into step 3) I am doing this:
chunkContext.getStepContext()
.getStepExecution()
.getExecutionContext()
.put("hashId", hashId);
在第4步中,我将基于hashId填充filePath并将其传递给我的最后一步(这是读者处理器和作者)
In step4 I am populating filePath based on hashId and pass it this way to my last step(which is reader processor and a writer)
public class DownloadFileTasklet implements Tasklet, StepExecutionListener {
..
@Override
public RepeatStatus execute(ChunkContext chunkContext, ExecutionContext
executionContext) throws IOException {
String hashId = chunkContext.getStepContext().getStepExecution().getJobExecution().getExecutionContext().get("hashId");
...
filepath="...hashId.csv";
//I used here executionContextPromotionListener in order to promote those keys
chunkContext.getStepContext()
.getStepExecution()
.getExecutionContext()
.put("filePath", filePath);
}
logger.info("filePath + "for hashId=" + hashId);
}
@Override
public void beforeStep(StepExecution stepExecution) {
this.stepExecution = stepExecution;
}
请注意我在完成该步骤之前打印hashId和filePath值(步骤3)。按日志它们是一致的并按预期填充
Pay attention that I am printing hashId and filePath values right before I am finished that step(step3). by the logs they are consistent and populated as expected
我还添加了日志在我的读者看到记录我得到的参数。
I also added logs within my reader to see log the params that I get.
@Bean
@StepScope
public ItemStreamReader<MyDTO> reader(@Value("#{jobExecutionContext[filePath]}") String filePath) {
logger.info("test filePath="+filePath+");
return itemReader;
}
当我执行这个工作~10次我可以看到param filePath值在并行执行时填充其他作业filePath值
When I execute this job ~10 times I can see that the param filePath value is populated with other jobs filePath values when executing in parallel
这是我使用executionContextPromotionListener提升作业键的方式:
This is how I promote the job's keys with executionContextPromotionListener:
作业定义:
@Bean
public Job processFileJob() throws Exception {
return this.jobs.get("processFileJob").
start.(step1).
next(step2)
next(downloadFileTaskletStep()). //step3
next(processSnidFileStep()).build(); //step4
}
第3步定义
public Step downloadFileTaskletStep() {
return this.steps.get("downloadFileTaskletStep").tasklet(downloadFileTasklet()).listener(executionContextPromotionListener()).build();
}
@Bean
public org.springframework.batch.core.listener.ExecutionContextPromotionListener executionContextPromotionListener() {
ExecutionContextPromotionListener executionContextPromotionListener = new ExecutionContextPromotionListener();
executionContextPromotionListener.setKeys(new String[]{"filePath"});
return executionContextPromotionListener;
}
同样的结果线程弄乱了参数
Same results threads messing the params
我可以通过spring批处理数据库表跟踪结果:batch_job_execution_context.short_context:
I can track the results via spring batch database table: batch_job_execution_context.short_context:
在这里你可以看到由hashid构建的filePatch不完全相同到原点hashId
//错误记录///
here you can see the the filePatch which built by the hashid is not identical to the origin hashId //incorrect record///
{map:[{entry:[{string:totalRecords INT:5},{ 串: segmentId, 长:13},{ 串:[ 文件路径,在/ etc / MYDIR /服务/ notification_processor /文件/ 2015_04_22 / <强> f1c7b0f2180b7e266d36f87fcf6fb7aa .csv]},{string:[hashId, 20df39d201fffc7444423cfdf2f43789 ]}]}]}
{"map":[{"entry":[{"string":"totalRecords","int":5},{"string":"segmentId","long":13},{"string":["filePath","/etc/mydir/services/notification_processor/files/2015_04_22/f1c7b0f2180b7e266d36f87fcf6fb7aa.csv"]},{"string":["hashId","20df39d201fffc7444423cfdf2f43789"]}]}]}
现在,如果我们检查其他记录,他们似乎很好。但总是有一两个搞砸了
Now if we check other records they seems good. but always one or two messed up
//正确的记录
{"map":[{"entry":[{"string":"totalRecords","int":5},{"string":"segmentId","long":13},{"string":["filePath","\/etc\/mydir\/services\/notification_processor\/files\/2015_04_22\/**c490c8282628b894727fc2a4d6fc0cb5**.csv"]},{"string":["hashId","**c490c8282628b894727fc2a4d6fc0cb5**"]}]}]}
{"map":[{"entry":[{"string":"totalRecords","int":5},{"string":"segmentId","long":13},{"string":["filePath","\/etc\/mydir\/services\/notification_processor\/files\/2015_04_22\/**2b21d3047208729192b87e90e4a868e4**.csv"]},{"string":["hashId","**2b21d3047208729192b87e90e4a868e4**"]}]}]}
我知道为什么会遇到这些线程问题?
Any idea why I have those Threading issues?
推荐答案
要查看您尝试过的方法:
To review your attempted methods:
- 方法1 - 编辑
JobParameters
工作参数
在作业中是不可变的,因此不应尝试在作业执行期间尝试修改它们。 - 方法2 - 编辑
JobParameters
v2
方法2与方法1非常相似,您只是以不同的方式获取对JobParameters
的引用。 / li>
- 方法3 - 使用
ExecutionContextPromotionListener
。这是正确的方法,但你做错了。ExecutionContextPromotionListener
查看步骤ExecutionContext
并将指定的键复制到作业的的ExecutionContext
。你将密钥直接添加到Job ExecutionContext这是一个坏主意。
- Method 1 - Editing
JobParameters
JobParameters
are immutable in a job so attempting to modify them during job execution should not be attempted. - Method 2 - Editing
JobParameters
v2 Method 2 is really the same as method 1, you're only going at getting the reference to theJobParameters
a different way. - Method 3 - Using the
ExecutionContextPromotionListener
. This is the correct way, but you're doing things incorrectly. TheExecutionContextPromotionListener
looks at the step'sExecutionContext
and copies the keys you specify over to the job'sExecutionContext
. You're adding the keys directly to the Job ExecutionContext which is a bad idea.
所以简而言之,方法3是最接近的正确,但您应该将要共享的属性添加到步骤的 ExecutionContext
,然后配置 ExecutionContextPromotionListener
以进行提升Job的 ExecutionContext
的相应键。
So in short, Method 3 is the closest to correct, but you should be adding the properties you want to share to the step's ExecutionContext
and then configure the ExecutionContextPromotionListener
to promote the appropriate keys to the Job's ExecutionContext
.
代码将更新如下:
chunkContext.getStepContext()
.getStepExecution()
.getExecutionContext()
.put("filePath", filePath);
这篇关于如何在运行并行作业时安全地将params从Tasklet传递到步骤的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!