How to use ClassifierCompositeItemProcessor in Spring Batch and write data into same table for Insert and Upsert?

Problem description

I went through the link - https://github.com/spring-projects/spring-batch/blob/master/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/support/ClassifierCompositeItemProcessorTests.java, but did not get much of a clue from it.

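I am trying to replace ETL Informatica mapping logic with Spring Batch. I want to separate Status=I and Status=U records into two separate processors and then perform lookups and data massaging on each. Status=I records should then be written directly into the table. Status=U records need further complex logic (lookups, massaging, and match-and-merge) and should then be upserted into the same table.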

I have tried a POC in which I want to segregate the records in the processor.

CustomerClassifier.java

public class CustomerClassifier implements Classifier<Customer, ItemProcessor<Customer, Customer>> {

    private ItemProcessor<Customer, Customer> insertCustomerProcessor;
    private ItemProcessor<Customer, Customer> updateCustomerProcessor;
    
    // Parameter names now match the fields; previously each field was assigned to itself and stayed null
    public CustomerClassifier(ItemProcessor<Customer, Customer> insertCustomerProcessor, ItemProcessor<Customer, Customer> updateCustomerProcessor) {
        this.insertCustomerProcessor = insertCustomerProcessor;
        this.updateCustomerProcessor = updateCustomerProcessor;
    }
    
    @Override
    public ItemProcessor<Customer, Customer> classify(Customer customer) {
        return customer.getStatus().equals("I") ? insertCustomerProcessor : updateCustomerProcessor;
    }
}

OddCustomerProcessor.java

public class OddCustomerProcessor implements ItemProcessor<Customer, Customer> {

    @Override
    public Customer process(Customer item) throws Exception {
        Customer customer = new Customer();
        // Perform some massaging and lookups here
        customer.setId(item.getId());
        customer.setFirstName(item.getFirstName());
        customer.setLastName(item.getLastName());
        customer.setBirthdate(item.getBirthdate());
        customer.setStatus(item.getStatus());
        return customer;
    }
}

EvenCustomerProcessor.java

public class EvenCustomerProcessor implements ItemProcessor<Customer, Customer> {

    @Override
    public Customer process(Customer item) throws Exception {
        Customer customer = new Customer();
        // Perform some massaging and lookups here
        customer.setId(item.getId());
        customer.setFirstName(item.getFirstName());
        customer.setLastName(item.getLastName());
        customer.setBirthdate(item.getBirthdate());
        customer.setStatus(item.getStatus());
        return customer;
    }
}

CustomLineAggregator.java

public class CustomLineAggregator implements LineAggregator<Customer> {
    private ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public String aggregate(Customer item) {
        try {
            return objectMapper.writeValueAsString(item);
        } catch (Exception e) {
            throw new RuntimeException("Unable to serialize Customer", e);
        }
    }
}

Customer.java

@Data
@AllArgsConstructor
@Builder
@NoArgsConstructor
public class Customer {
    private Long id;
    private String firstName;
    private String lastName;
    private String birthdate;
    private String status;
}

Error -

The method setClassifier(Classifier<? super Customer,ItemProcessor<?,? extends Customer>>) in the type ClassifierCompositeItemProcessor<Customer,Customer> is not applicable for the arguments (CustomerClassifier)

Configuration

@Configuration
public class JobConfiguration {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    
    @Autowired
    private DataSource dataSource;
    
    @Bean
    public JdbcPagingItemReader<Customer> customerPagingItemReader(){
        // reading database records using JDBC in a paging fashion
        JdbcPagingItemReader<Customer> reader = new JdbcPagingItemReader<>();
        reader.setDataSource(this.dataSource);
        reader.setFetchSize(1000);
        reader.setRowMapper(new CustomerRowMapper());
        
        // Sort Keys
        Map<String, Order> sortKeys = new HashMap<>();
        sortKeys.put("id", Order.ASCENDING);
        
        // MySQL implementation of a PagingQueryProvider using database specific features.
        MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
        queryProvider.setSelectClause("id, firstName, lastName, birthdate");
        queryProvider.setFromClause("from customer");
        queryProvider.setSortKeys(sortKeys);
        
        reader.setQueryProvider(queryProvider);
        
        return reader;
    }
    
    @Bean
    public EvenCustomerProcessor evenCustomerProcessor() {
        return new EvenCustomerProcessor();
    }
    
    @Bean
    public OddCustomerProcessor oddCustomerProcessor() {
        return new OddCustomerProcessor();
    }
    
    @Bean
    public JdbcBatchItemWriter<Customer> customerItemWriter(){
        JdbcBatchItemWriter<Customer> batchItemWriter = new JdbcBatchItemWriter<>();
        batchItemWriter.setDataSource(dataSource);
        batchItemWriter.setSql(""); // Query Goes here
        return batchItemWriter;
    }
    
    @Bean
    public ClassifierCompositeItemProcessor<Customer, Customer> classifierCustomerCompositeItemProcessor() throws Exception{
        ClassifierCompositeItemProcessor<Customer, Customer> itemProcessor = new ClassifierCompositeItemProcessor<>();
        // This is the call the compiler rejects with the error shown above
        itemProcessor.setClassifier(new CustomerClassifier(evenCustomerProcessor(), oddCustomerProcessor()));
        return itemProcessor;
    }
    
    @Bean
    public Step step1() throws Exception {
        return stepBuilderFactory.get("step1")
                .<Customer, Customer> chunk(10)
                .reader(customerPagingItemReader())
                .processor(classifierCustomerCompositeItemProcessor())
                .writer(customerItemWriter())
                .build();
    }
    
    @Bean
    public Job job() throws Exception {
        return jobBuilderFactory.get("job")
                .start(step1())
                .build();
    }
}

Recommended answer

You can remove the CustomerClassifier and define the composite item processor as follows:

@Bean
public ClassifierCompositeItemProcessor<Customer, Customer> classifierCustomerCompositeItemProcessor(
        EvenCustomerProcessor evenCustomerProcessor,
        OddCustomerProcessor oddCustomerProcessor
) {
    ClassifierCompositeItemProcessor<Customer, Customer> itemProcessor = new ClassifierCompositeItemProcessor<>();
    itemProcessor.setClassifier(new Classifier<Customer, ItemProcessor<?, ? extends Customer>>() {
        @Override
        public ItemProcessor<?, ? extends Customer> classify(Customer customer) {
            return customer.getStatus().equals("I") ? evenCustomerProcessor : oddCustomerProcessor;
        }
    });
    return itemProcessor;
}
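
The reason the anonymous classifier is accepted while CustomerClassifier is rejected appears to be generic invariance: the setter in the error message asks for a classifier whose second type argument is exactly ItemProcessor<?, ? extends Customer>, and ItemProcessor<Customer, Customer> is not the same type argument even though it is assignable to it. The sketch below reproduces this in plain Java. Its nested ItemProcessor, Classifier and Customer types are simplified stand-ins written for this illustration (not the Spring classes), and route is a hypothetical method that only borrows the parameter type from the error message.

```java
public class ClassifierGenericsSketch {

    // Stand-ins that mirror the shape of the Spring types, so the sketch
    // compiles without Spring Batch on the classpath (assumption, not the real API).
    interface ItemProcessor<I, O> { O process(I item) throws Exception; }
    interface Classifier<C, T> { T classify(C classifiable); }

    static class Customer {
        final String status;
        Customer(String status) { this.status = status; }
    }

    // Hypothetical method with the same parameter type as the setClassifier
    // signature quoted in the compiler error.
    static ItemProcessor<?, ? extends Customer> route(
            Classifier<? super Customer, ItemProcessor<?, ? extends Customer>> classifier,
            Customer customer) {
        return classifier.classify(customer);
    }

    static final ItemProcessor<Customer, Customer> INSERT = item -> item;
    static final ItemProcessor<Customer, Customer> UPDATE = item -> item;

    // Declared like CustomerClassifier in the question: the second type argument
    // is ItemProcessor<Customer, Customer>, which does not match the parameter above.
    static final Classifier<Customer, ItemProcessor<Customer, Customer>> NARROW =
            c -> "I".equals(c.status) ? INSERT : UPDATE;

    // Declared like the anonymous classifier in the answer: the second type
    // argument is exactly the one the method asks for.
    static final Classifier<Customer, ItemProcessor<?, ? extends Customer>> WIDE =
            c -> "I".equals(c.status) ? INSERT : UPDATE;

    public static void main(String[] args) {
        // route(NARROW, new Customer("I")); // rejected by the compiler
        System.out.println(route(WIDE, new Customer("I")) == INSERT);
        System.out.println(route(WIDE, new Customer("U")) == UPDATE);
    }
}
```

Under the same reasoning, keeping a named CustomerClassifier class should also work if it is declared as implementing Classifier<Customer, ItemProcessor<?, ? extends Customer>> and its classify method returns that type.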

Then update your step definition as follows:

@Bean
public Step step1() throws Exception {
    return stepBuilderFactory.get("step1")
            .<Customer, Customer> chunk(10)
            .reader(customerPagingItemReader())
            .processor(classifierCustomerCompositeItemProcessor(evenCustomerProcessor(), oddCustomerProcessor()))
            .writer(customerItemWriter())
            .build();
}
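
The step above still leaves the writer's SQL empty, so here is a minimal sketch of what the insert and upsert statements for the same table might look like. It assumes MySQL (the question uses MySqlPagingQueryProvider), a customer table whose columns are named after the Customer fields, a status column, and a primary or unique key on id. None of these are confirmed by the question, and the class and method names are hypothetical.

```java
public class CustomerSqlSketch {

    // Plain insert for Status=I rows. Table and column names are assumptions.
    static final String INSERT_SQL =
            "INSERT INTO customer (id, firstName, lastName, birthdate, status) "
          + "VALUES (:id, :firstName, :lastName, :birthdate, :status)";

    // MySQL upsert for Status=U rows: inserts the row, or updates the existing
    // row when the key on id (assumed to be primary or unique) already exists.
    static final String UPSERT_SQL = INSERT_SQL
          + " ON DUPLICATE KEY UPDATE firstName = VALUES(firstName), "
          + "lastName = VALUES(lastName), birthdate = VALUES(birthdate), "
          + "status = VALUES(status)";

    // Picks the statement by status, mirroring the classifier's "I" check.
    static String sqlFor(String status) {
        return "I".equals(status) ? INSERT_SQL : UPSERT_SQL;
    }

    public static void main(String[] args) {
        System.out.println(sqlFor("I"));
        System.out.println(sqlFor("U"));
    }
}
```

Either string could be passed to batchItemWriter.setSql(...). The named parameters would additionally need an item SQL parameter source provider on the writer, for example BeanPropertyItemSqlParameterSourceProvider. Because one JdbcBatchItemWriter holds a single statement, using both statements would mean two writers, possibly routed through a ClassifierCompositeItemWriter. A simpler option may be to use the upsert statement for every record.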
