Partitioned Job can't stop by itself after finishing? Spring Batch
Problem description
I wrote a Job of two Steps, one of which is a partitioning step. The partitioning step uses a TaskExecutorPartitionHandler and runs 5 slave steps in threads. The Job is launched from the main() method. But it does not stop after every slave ItemReader has returned null (the end-of-input signal). Even after the program runs past the last line of code in the main() method (which is System.out.println("Finished")), the process won't exit; it hangs in memory doing nothing. I have to press the stop button on Eclipse's panel to kill it.
The following is the content of the JobExecution returned by JobLauncher.run(), showing a successful Job run:
JobExecution: id=0, version=2, startTime=Fri Nov 27 06:05:23 CST 2015, endTime=Fri Nov 27 06:05:39 CST 2015, lastUpdated=Fri Nov 27 06:05:39 CST 2015, status=COMPLETED, exitStatus=exitCode=COMPLETED;exitDescription=, job=[JobInstance: id=0, version=0, Job=[jobCensoredPages]], jobParameters=[{}]
7217
Finished
Why does a Spring Batch program still hang after a successful Job run? Please point me to where I should look. I suspect the multithreaded part managed by Spring Batch is not shutting down.
The simple job-run code (note: the launcher now receives the JobParameters that are built, instead of a second, unused instance):

Job job = (Job) context.getBean("jobPages");
try {
    JobParameters p = new JobParametersBuilder().toJobParameters();
    JobExecution result = launcher.run(job, p);
    System.out.println(result.toString());
} catch (Exception e) {
    e.printStackTrace();
}
context.getBean("idSet");
AtomicInteger n = (AtomicInteger) context.getBean("pageCount");
System.out.println(n.get());
System.out.println("Finished");
Configuration for the Partitioner and PartitionHandler:
@Bean @Autowired
public PartitionHandler beanPartitionHandler(
        TaskExecutor beanTaskExecutor,
        @Qualifier("beanStepSlave") Step beanStepSlave) throws Exception {
    TaskExecutorPartitionHandler h = new TaskExecutorPartitionHandler();
    h.setGridSize(5);
    h.setTaskExecutor(beanTaskExecutor);
    h.setStep(beanStepSlave);
    h.afterPropertiesSet();
    return h;
}
@Bean
public TaskExecutor beanTaskExecutor() {
    ThreadPoolTaskExecutor e = new ThreadPoolTaskExecutor();
    e.setMaxPoolSize(5);
    e.setCorePoolSize(5);
    e.afterPropertiesSet();
    return e;
}
The master step and its slave step:
@Bean
public Step beanStepMaster(
        Step beanStepSlave,
        Partitioner beanPartitioner,
        PartitionHandler beanPartitionHandler) throws Exception {
    // The handler already carries the slave step (setStep above), so the
    // builder only needs the partitioner and the handler. The original code
    // referenced an undefined "partitionHandler" variable and called
    // .partitioner(beanStepSlave) redundantly.
    return stepBuilderFactory().get("stepMaster")
            .partitioner("stepSlave", beanPartitioner)
            .partitionHandler(beanPartitionHandler)
            .build();
}
@Bean @Autowired
public Step beanStepSlave(
        ItemReader<String> beanReaderTest,
        ItemProcessor<String, String> beanProcessorTest,
        ItemWriter<String> beanWriterTest) throws Exception {
    return stepBuilderFactory().get("stepSlave")
            .<String, String>chunk(1)
            .reader(beanReaderTest)
            .processor(beanProcessorTest)
            .writer(beanWriterTest)
            .build();
}
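The reader, processor, and writer beans are not shown in the question. Because five slave steps share the reader across threads, it must be thread-safe and must signal the end of its data by returning null. A minimal, hypothetical stand-in (names assumed; in the real job such a class would implement org.springframework.batch.item.ItemReader<String>):

```java
import java.util.Arrays;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-in for beanReaderTest: hands out items until exhausted,
// then returns null, which is how a chunk-oriented step knows to finish.
public class QueueBackedReader {
    private final Queue<String> items;

    public QueueBackedReader(String... values) {
        // ConcurrentLinkedQueue makes concurrent read() calls safe.
        this.items = new ConcurrentLinkedQueue<>(Arrays.asList(values));
    }

    // In the real job this would be: @Override public String read()
    public String read() {
        return items.poll(); // poll() returns null when empty = end of input
    }
}
```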
My pom.xml file
<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>RELEASE</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-core</artifactId>
        <version>4.2.3.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
        <version>4.2.3.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-tx</artifactId>
        <version>4.2.3.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.batch</groupId>
        <artifactId>spring-batch-core</artifactId>
        <version>RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.retry</groupId>
        <artifactId>spring-retry</artifactId>
        <version>1.1.2.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-beans</artifactId>
        <version>RELEASE</version>
    </dependency>
</dependencies>
Answer

I also had difficulty with my partitioned Spring Batch application hanging on completion when I used a ThreadPoolTaskExecutor. In addition, I saw that the executor was not allowing the work of all the partitions to finish.
I found two ways of solving those issues.
The first solution is using a SimpleAsyncTaskExecutor instead of a ThreadPoolTaskExecutor. If you do not mind the extra overhead in re-creating threads, this is a simple fix.
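A sketch of the first fix, assuming it replaces the beanTaskExecutor bean defined in the question (the thread-name prefix and concurrency limit here are illustrative choices, not from the original):

```java
import org.springframework.context.annotation.Bean;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;

@Bean
public TaskExecutor beanTaskExecutor() {
    // SimpleAsyncTaskExecutor starts a fresh thread per task and keeps no
    // pool, so no worker threads survive to keep the JVM alive.
    SimpleAsyncTaskExecutor e = new SimpleAsyncTaskExecutor("partition-");
    // Cap concurrent slave executions to mirror the gridSize of 5.
    e.setConcurrencyLimit(5);
    return e;
}
```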
The second solution is creating a JobExecutionListener that calls shutdown on the ThreadPoolTaskExecutor.
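The shutdown matters because ThreadPoolTaskExecutor's worker threads are non-daemon by default, so a live pool keeps the JVM running even after main() returns. The effect can be reproduced with a plain java.util.concurrent pool (a self-contained sketch, independent of Spring):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class PoolShutdownDemo {
    // Runs some "partition" tasks, then shuts the pool down.
    // Returns true once the pool has fully terminated.
    static boolean runAndShutdown() throws InterruptedException {
        // Executors.newFixedThreadPool creates non-daemon threads; while the
        // pool is alive, the JVM process stays up after main() returns.
        ExecutorService pool = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 5; i++) {
            final int partition = i;
            pool.submit(() -> System.out.println("partition " + partition + " done"));
        }
        // Without this call the process hangs exactly as described above.
        pool.shutdown();
        return pool.awaitTermination(10, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("terminated=" + runAndShutdown());
    }
}
```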
I created a JobExecutionListener like this:
@Bean
public JobExecutionListener jobExecutionListener(ThreadPoolTaskExecutor executor) {
    return new JobExecutionListener() {
        private ThreadPoolTaskExecutor taskExecutor = executor;

        @Override
        public void beforeJob(JobExecution jobExecution) {
        }

        @Override
        public void afterJob(JobExecution jobExecution) {
            taskExecutor.shutdown();
        }
    };
}
and added it to my Job definition like this:
@Bean
public Job partitionedJob() {
    return jobBuilders.get("partitionedJob")
            .listener(jobExecutionListener(taskExecutor()))
            .start(partitionedStep())
            .build();
}