Spring batch DB to JSON files


Question

This question might seem to be a duplicate of this one, but it is not.

My requirement is to read data from the database using JdbcPagingItemReader, process individual records for some additional processing, and, in the writer, create an individual JSON file for each processed item, with the file name id_of_record_json_file.txt.

For example, if the reader reads 100 records, then 100 JSON files have to be created.

What is the best way to do this? Can we use Spring Batch for it?

Update 1:

As per @Mahmoud's answer, a tasklet can be used. I have also tried implementing a custom ItemWriter in a chunk-oriented step, and this also seems to work:

    @Override
    public void write(final List<? extends Person> persons) throws Exception {
        for (Person person : persons) {
            // one file per processed record, named after the record id as required
            objectMapper.writeValue(new File("D:/cp/" + person.getId() + "_json_file.txt"), person);
        }
    }
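
For reference, a minimal sketch of how such a custom writer could be plugged into a chunk-oriented step; the chunk size and the personJsonWriter() bean name are assumptions for illustration, not from the original post:

    @Bean
    public Step step(StepBuilderFactory steps) {
        return steps.get("step")
                .<Person, Person>chunk(10)  // arbitrary chunk size, an assumption
                .reader(itemReader())       // the JdbcPagingItemReader from the requirement
                .writer(personJsonWriter()) // hypothetical bean wrapping the write method above
                .build();
    }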

Answer

Using a chunk-oriented tasklet won't work, because there would be a single item writer whose resource is set upfront and stays fixed for the entire step. Using a composite item writer might work, but you would need to know how many distinct writers to create and configure upfront.
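
To make that constraint concrete, here is a minimal sketch using Spring Batch's ClassifierCompositeItemWriter, assuming the set of target files is known ahead of time; prebuiltWriters() is a hypothetical helper that would build one pre-configured writer per expected id:

    ClassifierCompositeItemWriter<Person> compositeWriter = new ClassifierCompositeItemWriter<>();
    // hypothetical: a map holding one pre-configured FlatFileItemWriter per known id
    Map<Integer, FlatFileItemWriter<Person>> writersById = prebuiltWriters();
    compositeWriter.setClassifier(person -> writersById.get(person.getId()));

Each delegate would also have to be registered on the step as a stream so that open/update/close are called on it, which is exactly the upfront configuration that becomes impractical when the record ids are only known at runtime.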

The most straightforward option I see is to use a tasklet, something like:

import java.util.Collections;
import java.util.HashMap;

import javax.sql.DataSource;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.builder.JdbcPagingItemReaderBuilder;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.builder.FlatFileItemWriterBuilder;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;

@Configuration
@EnableBatchProcessing
public class MyJob {

    @Bean
    public JdbcPagingItemReader<Person> itemReader() {
        return new JdbcPagingItemReaderBuilder<Person>()
                .name("personItemReader")
                .dataSource(dataSource())
                .beanRowMapper(Person.class)
                .selectClause("select *")
                .fromClause("from person")
                .sortKeys(new HashMap<String, Order>() {{ put("id", Order.DESCENDING);}})
                .build();
    }

    @Bean
    public Job job(JobBuilderFactory jobs, StepBuilderFactory steps) {
        return jobs.get("job")
                .start(steps.get("step")
                        .tasklet(new MyTasklet(itemReader()))
                        .build())
                .build();
    }
    
    private static class MyTasklet implements Tasklet {

        private boolean readerInitialized;
        private JdbcPagingItemReader<Person> itemReader;

        public MyTasklet(JdbcPagingItemReader<Person> itemReader) {
            this.itemReader = itemReader;
        }

        @Override
        public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
            ExecutionContext executionContext = chunkContext.getStepContext().getStepExecution().getExecutionContext();
            if (!readerInitialized) {
                itemReader.open(executionContext);
                readerInitialized = true;
            }
            Person person = itemReader.read();
            if (person == null) {
                itemReader.close();
                return RepeatStatus.FINISHED;
            }
            // process the item
            process(person);
            // write the item in its own file (dynamically generated at runtime)
            write(person, executionContext);
            // save current state in execution context: in case of restart after failure, the job would resume where it left off.
            itemReader.update(executionContext);
            return RepeatStatus.CONTINUABLE;
        }

        private void process(Person person) {
            // do something with the item
        }
        
        private void write(Person person, ExecutionContext executionContext) throws Exception {
            FlatFileItemWriter<Person> itemWriter = new FlatFileItemWriterBuilder<Person>()
                    .resource(new FileSystemResource("person" + person.getId() + ".csv"))
                    .name("personItemWriter")
                    .delimited()
                    .names("id", "name")
                    .build();
            itemWriter.open(executionContext);
            itemWriter.write(Collections.singletonList(person));
            itemWriter.close();
        }
        
    }

    public static void main(String[] args) throws Exception {
        ApplicationContext context = new AnnotationConfigApplicationContext(MyJob.class);
        JobLauncher jobLauncher = context.getBean(JobLauncher.class);
        Job job = context.getBean(Job.class);
        jobLauncher.run(job, new JobParameters());
    }

    @Bean
    public DataSource dataSource() {
        EmbeddedDatabase embeddedDatabase = new EmbeddedDatabaseBuilder()
                .setType(EmbeddedDatabaseType.H2)
                .addScript("/org/springframework/batch/core/schema-drop-h2.sql")
                .addScript("/org/springframework/batch/core/schema-h2.sql")
                .build();
        JdbcTemplate jdbcTemplate = new JdbcTemplate(embeddedDatabase);
        jdbcTemplate.execute("create table person (id int primary key, name varchar(20));");
        for (int i = 1; i <= 10; i++) {
            jdbcTemplate.execute(String.format("insert into person values (%s, 'foo%s');", i, i));
        }
        return embeddedDatabase;
    }

    static class Person {
        private int id;
        private String name;

        public Person() {
        }

        public int getId() {
            return id;
        }

        public void setId(int id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public String toString() {
            return "Person{id=" + id + ", name='" + name + '\'' + '}';
        }
    }

}

This example reads 10 persons from a db table and generates 10 CSV files (person1.csv, person2.csv, etc.).
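
The tasklet above writes CSV files for illustration. Since the question asks for JSON, the write method could instead use Spring Batch's JsonFileItemWriter (available since Spring Batch 4.1, in org.springframework.batch.item.json). A sketch, with the caveat that this writer wraps each file's content in a JSON array:

    private void write(Person person, ExecutionContext executionContext) throws Exception {
        // one JSON file per record, following the person{id} naming used above
        JsonFileItemWriter<Person> itemWriter = new JsonFileItemWriterBuilder<Person>()
                .name("personItemWriter")
                .resource(new FileSystemResource("person" + person.getId() + ".json"))
                .jsonObjectMarshaller(new JacksonJsonObjectMarshaller<>())
                .build();
        itemWriter.open(executionContext);
        itemWriter.write(Collections.singletonList(person));
        itemWriter.close();
    }

Alternatively, Jackson's ObjectMapper could be used directly, as in the question's snippet, to produce a plain JSON object per file without the array wrapper.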
