Using CompositeItemWriter, the writer or classify method is not getting called

Problem description

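I am writing a Spring Batch job with Spring Boot, and I need to write to two different tables depending on a condition, so I am trying CompositeItemWriter. However, when I run the batch, the writer is not called.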

Here is my job configuration class.

@Configuration
public class JobConfiguration {

    ...
    ...
    ...

    @Bean
    public JdbcCursorItemReader<Notification> reader() {
        JdbcCursorItemReader<Notification> reader = new JdbcCursorItemReader<Notification>();
        reader.setDataSource(dataSource);

    ...
    ...
        reader.setRowMapper(new BeanPropertyRowMapper<>(Notification.class));
        return reader;
    }

    @Bean
    public NotificationItemProcessor notificatonProcessor() {
        return new NotificationItemProcessor();
    }

    @Bean
    public JdbcBatchItemWriter<Notification> updateWriter() {
        JdbcBatchItemWriter<Notification> writer = new JdbcBatchItemWriter<Notification>();
        writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Notification>());
        ...
        writer.setDataSource(dataSource);
        return writer;
    }


    /**
     * Composite Exchange Writer
     * @return
     * @throws InstantiationException
     * @throws IllegalAccessException
     */
    @SuppressWarnings("unchecked")
    @Bean
    public CompositeItemWriter<Notification> compositeExchangeWriter() throws InstantiationException, IllegalAccessException {
        HashMap<String, Object> map = new HashMap<String, Object>();
        map.put(ExchangeRouter.INSERT_EXCHANGE_FOR_NOTIFICATION.getActionName(), exchangeWorkflowWriter());
        map.put(ExchangeRouter.INSERT_EXCHANGE_FOR_PACK.getActionName(), exchangeWriter());
        map.put(ExchangeRouter.DO_NOTHING.getActionName(), doNothing());
        return new CompositeItemWriterBuilder(map, ExchangeWriterRouterClassifier.class).build();
    }

    @Bean
    public JdbcBatchItemWriter<Notification> exchangeWorkflowWriter() {
        JdbcBatchItemWriter<Notification> writer = new JdbcBatchItemWriter<Notification>();
        writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Notification>());

        writer.setSql(" INSERT INTO SOME TABLE..");

        writer.setDataSource(dataSource);
        return writer;
    }

    @Bean
    public JdbcBatchItemWriter<Notification> exchangeWriter() {
        JdbcBatchItemWriter<Notification> writer = new JdbcBatchItemWriter<Notification>();
        writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Notification>());

        writer.setSql("INSERT INTO SOME OTHER TABLE.");

        writer.setDataSource(dataSource);
        return writer;
    }

    @Bean
    public ItemWriter<Document> doNothing() {
        return new DummyWriter();
    }

    @Bean
    public Job generatePdf(JobCompletionNotificationListener listener) throws InstantiationException, IllegalAccessException {
        return jobBuilderFactory.get("generatePdf")
                .incrementer(new RunIdIncrementer())
                .flow(treatStock())
                .end()
                .build();
    }

    @Bean
    public Step treatStock() throws InstantiationException, IllegalAccessException {
        return stepBuilderFactory.get("treatStock")
                .<Notification, Notification>chunk(1)
                .reader(reader())
                .processor(notificatonProcessor())
                .writer(compositeExchangeWriter())
                .writer(updateWriter())
                .build();
    }

}

CompositeItemWriterBuilder.java

public class CompositeItemWriterBuilder extends CompositeItemBuilder<CompositeItemWriter> {

    public CompositeItemWriterBuilder(HashMap<String, Object> matcherMap, Class<?> routerDelegate) throws InstantiationException, IllegalAccessException {
        BackToBackPatternClassifier classif = new BackToBackPatternClassifier();
        classif.setRouterDelegate(routerDelegate.newInstance());
        classif.setMatcherMap(matcherMap);

        ClassifierCompositeItemWriter classifier = new ClassifierCompositeItemWriter();
        classifier.setClassifier(classif);

        this.delegates.add(classifier);

    }

    public CompositeItemWriterBuilder(List<Object> delegates) {
        this.delegates = delegates;
    }

    @Override
    protected Class<?> getCompositeItem() {
        return CompositeItemWriter.class;
    }
}

CompositeItemBuilder.java

public abstract class CompositeItemBuilder<T> {

    protected List<Object> delegates = new ArrayList<Object>();

    @SuppressWarnings("unchecked")
    public T build() throws InstantiationException, IllegalAccessException {

        Object compositeItem = getCompositeItem().newInstance();
        Method setDelegates = ReflectionUtils.findMethod(compositeItem.getClass(), "setDelegates", List.class);
        ReflectionUtils.invokeMethod(setDelegates,compositeItem, delegates);

        return (T) compositeItem;
    }

    abstract protected Class<?> getCompositeItem();
}

ExchangeWriterRouterClassifier.java (the classify method is not called)

public class ExchangeWriterRouterClassifier {

    @Classifier
    public String classify(Notification notification) {
        return notification.getExchangesWorkflow().getRouter().getActionName();
    }

}

How does Spring call the classifier? Am I missing something?

Recommended answer

"I am trying CompositeItemWriter. However, when I run the batch, the writer is not called."

The problem is in your step definition:

@Bean
public Step treatStock() throws InstantiationException, IllegalAccessException {
    return stepBuilderFactory.get("treatStock")
            .<Notification, Notification>chunk(1)
            .reader(reader())
            .processor(notificatonProcessor())
            .writer(compositeExchangeWriter())
            .writer(updateWriter())
            .build();
}
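
You are calling the writer() method twice, so updateWriter() overrides compositeExchangeWriter(). You need to call the method once, passing a composite writer whose delegate writers have already been set.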
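
The override can be illustrated with a small plain-Java model. This is only a sketch: `StepSketch` and `composite()` are hypothetical stand-ins invented for the illustration, not Spring Batch classes, and the model assumes that writer() simply stores its argument.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java model only: StepSketch and composite() are hypothetical
// stand-ins, not Spring Batch classes.
class WriterOverrideDemo {

    /** Stand-in for a step builder whose writer() simply stores its argument. */
    static class StepSketch<T> {
        private Consumer<List<T>> writer;

        StepSketch<T> writer(Consumer<List<T>> writer) {
            this.writer = writer; // a second call replaces the first writer
            return this;
        }

        void run(List<T> chunk) {
            writer.accept(chunk);
        }
    }

    /** Stand-in for a composite writer: forwards the chunk to every delegate in order. */
    static <T> Consumer<List<T>> composite(List<Consumer<List<T>>> delegates) {
        return chunk -> delegates.forEach(delegate -> delegate.accept(chunk));
    }

    /** Mirrors the question: two writer() calls, so only the last writer runs. */
    static List<String> writerCalledTwice() {
        List<String> log = new ArrayList<>();
        new StepSketch<String>()
                .writer(chunk -> log.add("exchange:" + chunk))
                .writer(chunk -> log.add("update:" + chunk))
                .run(Arrays.asList("n1"));
        return log;
    }

    /** Mirrors the fix: one writer() call with a composite wrapping both writers. */
    static List<String> writerCalledOnceWithComposite() {
        List<String> log = new ArrayList<>();
        Consumer<List<String>> exchangeWriter = chunk -> log.add("exchange:" + chunk);
        Consumer<List<String>> updateWriter = chunk -> log.add("update:" + chunk);
        List<Consumer<List<String>>> delegates = Arrays.asList(exchangeWriter, updateWriter);
        new StepSketch<String>()
                .writer(composite(delegates))
                .run(Arrays.asList("n1"));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(writerCalledTwice());             // [update:[n1]]
        System.out.println(writerCalledOnceWithComposite()); // [exchange:[n1], update:[n1]]
    }
}
```

In the first run the exchange writer never sees the chunk, which matches the symptom described in the question.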

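One thing to be aware of when using a composite writer: the delegates are not wired directly into the step, so any delegate that implements the ItemStream interface may need to be registered on the step as a stream explicitly. More details on this are here: https://docs.spring.io/spring-batch/4.0.x/reference/html/readersAndWriters.html#delegatePatternAndRegistering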

"How does Spring call the classifier?"

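Once the ClassifierCompositeItemWriter is configured correctly, Spring Batch calls the classifier for each item to determine which writer to use, and then calls the corresponding writer to write that item.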
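
The routing idea can be sketched in plain Java. This is a model under stated assumptions, not the Spring Batch implementation: the `Writer` interface, `writeChunk`, the action names and the "action:id" item format are all hypothetical, and grouping items per writer before writing is a choice made for this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java model of classifier-based routing; all names are hypothetical.
class ClassifierRoutingDemo {

    /** Stand-in for an item writer. */
    interface Writer<T> {
        void write(List<T> items);
    }

    /** Classifies every item, groups items by action name, then calls the matching writer. */
    static <T> void writeChunk(List<T> chunk,
                               Function<T, String> classifier,
                               Map<String, Writer<T>> writersByAction) {
        Map<String, List<T>> itemsByAction = new LinkedHashMap<>();
        for (T item : chunk) {
            itemsByAction.computeIfAbsent(classifier.apply(item), action -> new ArrayList<>()).add(item);
        }
        for (Map.Entry<String, List<T>> entry : itemsByAction.entrySet()) {
            Writer<T> writer = writersByAction.get(entry.getKey());
            if (writer == null) {
                throw new IllegalStateException("No writer registered for action " + entry.getKey());
            }
            writer.write(entry.getValue());
        }
    }

    /** Routes three sample items to two writers and returns what each writer received. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        Map<String, Writer<String>> writersByAction = new HashMap<>();
        writersByAction.put("notification", items -> log.add("notification table <- " + items));
        writersByAction.put("pack", items -> log.add("pack table <- " + items));

        // Assumed item format "action:id"; the classifier returns the part before the colon.
        Function<String, String> classifier = item -> item.substring(0, item.indexOf(':'));

        writeChunk(Arrays.asList("notification:1", "pack:2", "notification:3"), classifier, writersByAction);
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

The classifier here plays the role that the classify method of ExchangeWriterRouterClassifier plays in the question: it returns an action name, and the map decides which writer handles it.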

In your configuration, the ClassifierCompositeItemWriter is not configured correctly here:

@SuppressWarnings("unchecked")
public T build() throws InstantiationException, IllegalAccessException {

    Object compositeItem = getCompositeItem().newInstance();
    Method setDelegates = ReflectionUtils.findMethod(compositeItem.getClass(), "setDelegates", List.class);
    ReflectionUtils.invokeMethod(setDelegates,compositeItem, delegates);

    return (T) compositeItem;
}

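I would not use reflection to set the delegates. The problem is that you expect the compositeExchangeWriter method to register a ClassifierCompositeItemWriter, but its return type is CompositeItemWriter, so the composite writer is not treated as a classifier.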
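
One general Java reason to be wary of setting delegates reflectively from a `List<Object>`, as the builder in the question does, is that the compiler can no longer check the element types. The sketch below illustrates that pitfall in plain Java; `Composite` and `Writer` are hypothetical stand-ins, and this is an illustration rather than the reasoning stated in the answer.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration; Composite and Writer are hypothetical stand-ins.
class ReflectiveDelegatesDemo {

    interface Writer {
        void write(String item);
    }

    static class Composite {
        private List<Writer> delegates = new ArrayList<>();

        public void setDelegates(List<Writer> delegates) {
            this.delegates = delegates;
        }

        public void write(String item) {
            for (Writer delegate : delegates) { // implicit cast to Writer happens here
                delegate.write(item);
            }
        }
    }

    /** Returns true if a wrongly typed delegate is accepted at setup and only fails at write time. */
    static boolean reflectiveSetupFailsOnlyAtWriteTime() {
        Composite composite = new Composite();
        List<Object> untyped = new ArrayList<>();
        untyped.add("not a writer"); // wrong element type, invisible to the compiler

        try {
            // Generics are erased at runtime, so this call succeeds despite the bad element.
            Method setDelegates = Composite.class.getDeclaredMethod("setDelegates", List.class);
            setDelegates.invoke(composite, untyped);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }

        try {
            composite.write("n1");
            return false;
        } catch (ClassCastException expected) {
            return true; // the mistake surfaces only when the first item is written
        }
    }

    public static void main(String[] args) {
        System.out.println(reflectiveSetupFailsOnlyAtWriteTime());
    }
}
```

Calling a typed setter directly, with a correctly typed list, lets the compiler reject this kind of mistake before the job runs.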

You can find an example of how to use ClassifierCompositeItemWriter here: https://github.com/spring-projects/spring-batch/blob/master/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/support/ClassifierCompositeItemWriterTests.java#L44