Spring Batch partitions with a join: RowMapper values are overridden, so I get a single array instead of multiple


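Problem description

I am using Spring Batch to read data from Postgres and write it to MongoDB. In my case, an Employee has 3 different types of email address, 1) home, 2) office and 3) social, all stored in the employee email table.

Since we have almost 10 lakh (about 1 million) employees in the DB, I use a custom partitioner to pull the data from Postgres and join it with Employee_Email (and later Employee_Phone as well), so that the Processor can map it to the Mongo POJO and save it to MongoDB.

The issue is that I need to embed the employee email records into the contact as an array, but with the current logic they are saved as a separate collection.

How can we solve this?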

select * from root.employees c
full outer join root.employee_email ce
on c.employee_id = ce.employee_id
order by c.employee_id limit 1000 offset 0;
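Now, when the data is saved to the DB, only one email is saved; the other two seem to be overwritten.

How should I handle this? It looks like EmployeeRowMapper is overriding all the other email addresses. How can I solve this?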

EmployeeJob.java

@Configuration
public class EmployeeJob {
    private static Logger logger = LoggerFactory.getLogger(EmployeeJob.class);

    private static final Integer CHUNK_SIZE = 1000;

    @Autowired
    private DataSource dataSource;

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public EmployeesPartitions EmployeesPartition() {
        return new EmployeesPartitions();
    }

    @Bean
    public EmployeesJobListener EmployeesJobListener() {
        return new EmployeesJobListener();
    }

    @Bean("readEmployeeJob")
    public Job readEmployeeJob() throws Exception {
        return jobBuilderFactory.get("readEmployeeJob")
                .incrementer(new RunIdIncrementer())
                .start(EmployeeStepOne())
                .listener(EmployeesJobListener())
                .build();
    }

    @Bean
    public SimpleAsyncTaskExecutor simpleAsyncTaskExecutor() {
        SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor("fac-thrd-");
        taskExecutor.setConcurrencyLimit(Runtime.getRuntime().availableProcessors());
        taskExecutor.setThreadGroupName("Employees-Thread");
        taskExecutor.setDaemon(false);
        taskExecutor.setThreadPriority(5);
        return taskExecutor;
    }

    @Bean
    public Step EmployeeStepOne() throws Exception {            
        return stepBuilderFactory.get("EmployeeStepOne")
                .partitioner(slaveStep().getName(), EmployeesPartition())
                .step(slaveStep())
                .gridSize(10)
                .taskExecutor(simpleAsyncTaskExecutor())
                .build();
    }

    // slave step
    @Bean
    public Step slaveStep() throws Exception {
        return stepBuilderFactory.get("slaveStep")
                .<EmployeesDTO, EmployeesDTO>chunk(CHUNK_SIZE)
                .reader(EmployeeReader(null, null))
                .writer(EmployeeWriter())
                .build();
    }


    // Readers
    @Bean(destroyMethod = "")
    @StepScope
    public JdbcCursorItemReader<EmployeesDTO> EmployeeReader(
            @Value("#{stepExecutionContext['limit']}") Long limit,
            @Value("#{stepExecutionContext['offset']}") Long offset) throws Exception {

        String sql = "select * from root.Employees c "
                + "full outer join root.Employee_email ce "
                + "on c.Employee_id = ce.Employee_id "
                + "order by c.Employee_id limit " + limit +" offset "+ offset;
        logger.info("Employees SQL = {} ", sql);
        JdbcCursorItemReader<EmployeesDTO> reader = new JdbcCursorItemReader<>();
        reader.setDataSource(this.dataSource);
        reader.setSql(sql);
        reader.setRowMapper(new EmployeeRowMapper());
        reader.afterPropertiesSet();
        return reader;
    }

    // Processors
    @Bean
    public ItemProcessor<EmployeesDTO, EmployeesDTO> EmployeeProcessor() {
        return new EmployeesProcessor();
    }

    // Writers
    @Bean
    public ItemWriter<EmployeesDTO> EmployeeWriter() {
        return new EmployeeWriter();
    }
}

RowMapper.java

public class EmployeeRowMapper implements RowMapper<Employee> {

    @Override
    public Employee mapRow(ResultSet rs, int rowNum) throws SQLException {
        EmployeeEmail email = new EmployeeEmail(); // populated from this row's email columns (elided below)
        ....
        ....
        ....
        ....
        ....
        List<EmployeeEmail> employeeEmails = new ArrayList<>();
        employeeEmails.add(email);

        Employee dto = Employee.builder()
                .businessTitle(rs.getString(""))
                ...........
                ...........
                ...........
                ...........
                ...........
                ...........
                ...........
                .employeeEmails(employeeEmails)
                .build();

        return dto;
    }
}

Recommended answer

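A RowMapper is used to map a single database row to a POJO. So unless each row contains all the emails in different columns (for example id, name, email1, email2, email3), what you are trying to do won't work.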
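To make the point concrete, here is a minimal plain-Java sketch (no Spring involved) of what per-row mapping does to a joined result. `JoinedRow`, `EmployeeItem` and `mapAll` are hypothetical names invented for this illustration, not types from the question; `mapRow` only mimics the contract of a RowMapper, which sees one row at a time.

```java
import java.util.ArrayList;
import java.util.List;

class PerRowMappingDemo {

    // Hypothetical stand-in for one row of the employees/employee_email join.
    static final class JoinedRow {
        final long employeeId;
        final String name;
        final String email;

        JoinedRow(long employeeId, String name, String email) {
            this.employeeId = employeeId;
            this.name = name;
            this.email = email;
        }
    }

    // Hypothetical stand-in for the item handed to the writer.
    static final class EmployeeItem {
        final long employeeId;
        final String name;
        final List<String> emails;

        EmployeeItem(long employeeId, String name, List<String> emails) {
            this.employeeId = employeeId;
            this.name = name;
            this.emails = emails;
        }
    }

    // Mimics a RowMapper: it receives exactly one row and cannot see
    // the other rows that belong to the same employee.
    static EmployeeItem mapRow(JoinedRow row) {
        List<String> emails = new ArrayList<>();
        emails.add(row.email);
        return new EmployeeItem(row.employeeId, row.name, emails);
    }

    // Mimics the reader calling the mapper once per row.
    static List<EmployeeItem> mapAll(List<JoinedRow> rows) {
        List<EmployeeItem> items = new ArrayList<>();
        for (JoinedRow row : rows) {
            items.add(mapRow(row));
        }
        return items;
    }
}
```

Feeding three joined rows for the same employee through `mapAll` produces three separate items, each carrying a one-element email list, rather than one item with three emails.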

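If you have 3 rows of emails for each item, you need to make your query return only id and name, and use an additional query to grab the emails. This additional query can be done in the mapper itself or in an item processor, as described in the driving query pattern.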
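The shape of the processor variant can be sketched in plain Java as follows. This is only an illustration under assumptions: `EmployeeKey`, `EmployeeDocument` and `EmailEnrichingProcessor` are hypothetical names, and the `emailLookup` function stands in for the second query. In a real job the class would presumably implement Spring Batch's `ItemProcessor`, and the lookup would run a query against the employee email table filtered by `employee_id`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

class DrivingQuerySketch {

    // Hypothetical item produced by the driving query (id and name only).
    static final class EmployeeKey {
        final long employeeId;
        final String name;

        EmployeeKey(long employeeId, String name) {
            this.employeeId = employeeId;
            this.name = name;
        }
    }

    // Hypothetical document to be written, with the emails embedded as a list.
    static final class EmployeeDocument {
        final long employeeId;
        final String name;
        final List<String> emails;

        EmployeeDocument(long employeeId, String name, List<String> emails) {
            this.employeeId = employeeId;
            this.name = name;
            this.emails = emails;
        }
    }

    // Plain-Java stand-in for an item processor that enriches each employee.
    static final class EmailEnrichingProcessor {
        // Assumption: wraps the additional per-employee email query.
        private final Function<Long, List<String>> emailLookup;

        EmailEnrichingProcessor(Function<Long, List<String>> emailLookup) {
            this.emailLookup = emailLookup;
        }

        // One call per employee: fetch all of that employee's emails and
        // attach them to a single output document.
        EmployeeDocument process(EmployeeKey key) {
            List<String> emails = new ArrayList<>(emailLookup.apply(key.employeeId));
            return new EmployeeDocument(key.employeeId, key.name, emails);
        }
    }
}
```

With this shape the reader emits one item per employee, so each written document carries the full email list. A likely side benefit is that limit/offset paging then counts employees rather than joined rows, so an employee's emails cannot be split across two partitions.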
