将消息转换为作业,使其与批处理进行 Spring 集成 [英] Convert Message to Job to make it Spring Integration with Batch Processing

查看:31
本文介绍了将消息转换为作业,使其与批处理进行 Spring 集成的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试以批处理方式使用 Spring Integration 处理一系列文件.我有这个非常旧的 xml,它试图将消息转换为作业

I am trying to process a series of files using Spring Integration in a batch fashion. I have this very old xml which tries to convert the messages into jobs

    <int:transformer ref="messageToJobTransformer"/>
    <batch-int:job-launching-gateway job-launcher="jobLauncher"/>

messageToJobTransformer 是一个可以将 Message 转换为 Job 的类.问题是我不知道这个文件现在在哪里,我也不想要一个 xml 配置.我希望它是纯 Java DSL.这是我的简单配置.

The messageToJobTransformer is a class which can convert a Message into a Job. The problem is I don't know where this file is now neither I want a xml config. I want it to be pure Java DSL. Here is my simple config.

return IntegrationFlows.from(Files.inboundAdapter(directory)
                    .preventDuplicates()
                    .patternFilter("*.txt")))
                    .handle(jobLaunchingGw())                       
                    .get();

这是我的网关 bean.

And here is my bean for the gateway.

@Autowired
private JobLauncher jobLauncher;

@Bean
public MessageHandler jobLaunchingGw() {
    return new JobLaunchingGateway(jobLauncher);   
}

更新批处理配置类.

@Configuration
    @EnableBatchProcessing
public class BatchConfig

{

@Autowired
private JobBuilderFactory jobs;

@Autowired
private StepBuilderFactory steps;




@Bean
        public ItemReader<String> reader(@Value({jobParameters['input.file.name']}") String filename) throws MalformedURLException
    {
        FlatFileItemReader<String> reader = new FlatFileItemReader<String>();       
        return reader;
    }
@Bean
public Job job() throws MalformedURLException
{
    return jobs.get("job").start(step()).build();
}

@Bean
public Step step() throws MalformedURLException
{
    return steps.get("step").<String, String> chunk(5).reader(reader())
    .writer(writer()).build();
}

@Bean
public ItemWriter<String> writer(@Value("#{jobParameters['input.file.name']}")
{
    FlatFileItemWriter writer = new FlatFileItemWriter();
    return writer;
}



}

推荐答案

您的问题不清楚.JobLaunchingGateway 期望 JobLaunchRequest 作为 payload.

Your question isn't clear. The JobLaunchingGateway expects JobLaunchRequest as a payload.

由于您的集成流程从 Files.inboundAdapter(directory) 开始,我可以假设您有一些作业定义.所以,这里你需要的是一些可以解析文件并返回 JobLaunchRequest 的类.

Since your Integration Flow begins from the Files.inboundAdapter(directory), I can assume that you that you have there some Job definitions. So, what you need here is some class which can parse the file and return JobLaunchRequest.

Spring Batch 参考手册中的类似内容:

Something like this from the Spring Batch Reference Manual:

public class FileMessageToJobRequest {
    private Job job;
    private String fileParameterName;

    public void setFileParameterName(String fileParameterName) {
        this.fileParameterName = fileParameterName;
    }

    public void setJob(Job job) {
        this.job = job;
    }

    @Transformer
    public JobLaunchRequest toRequest(Message<File> message) {
        JobParametersBuilder jobParametersBuilder =
            new JobParametersBuilder();

        jobParametersBuilder.addString(fileParameterName,
            message.getPayload().getAbsolutePath());

        return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
    }
}

在将该类定义为 @Bean 之后,您可以在 .handle(jobLaunchingGw()).

After the definition that class as a @Bean you can use it from the .transform() EIP-method just before your .handle(jobLaunchingGw()).

更新

@Bean
public FileMessageToJobRequest fileMessageToJobRequest(Job job) {
     FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
     fileMessageToJobRequest.setJob(job);
     fileMessageToJobRequest.setfileParameterName("file");
     return fileMessageToJobRequest;
}
...

@Bean
public IntegrationFlow flowToBatch(FileMessageToJobRequest fileMessageToJobRequest) {

      return IntegrationFlows
            .from(Files.inboundAdapter(directory)
                    .preventDuplicates()
                    .patternFilter("*.txt")))
            .transform(fileMessageToJobRequest)
            .handle(jobLaunchingGw())                       
                    .get();

}

这篇关于将消息转换为作业,使其与批处理进行 Spring 集成的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆