使用CompositeItemWriter时,未调用编写器或分类方法 [英] Using CompositeItemWriter the writer or classify method is not getting called
问题描述
我正在使用Spring boot编写一个Spring批处理,并且我需要根据条件写入两个不同的表,因此我尝试CompositeItemWriter
但是,当我调用批处理时,编写器没有被调用。
这是我的作业配置类。
@Configuration
public class JobConfiguration {
...
...
...
@Bean
public JdbcCursorItemReader<Notification> reader() {
JdbcCursorItemReader<Notification> reader = new JdbcCursorItemReader<Notification>();
reader.setDataSource(dataSource);
...
...
reader.setRowMapper(new BeanPropertyRowMapper<>(Notification.class));
return reader;
}
@Bean
public NotificationItemProcessor notificatonProcessor() {
return new NotificationItemProcessor();
}
@Bean
public JdbcBatchItemWriter<Notification> updateWriter() {
JdbcBatchItemWriter<Notification> writer = new JdbcBatchItemWriter<Notification>();
writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Notification>());
...
writer.setDataSource(dataSource);
return writer;
}
/**
* Composite Exchange Writer
* @return
* @throws InstantiationException
* @throws IllegalAccessException
*/
@SuppressWarnings("unchecked")
@Bean
public CompositeItemWriter<Notification> compositeExchangeWriter() throws InstantiationException, IllegalAccessException {
HashMap<String, Object> map = new HashMap<String, Object>();
map.put(ExchangeRouter.INSERT_EXCHANGE_FOR_NOTIFICATION.getActionName(), exchangeWorkflowWriter());
map.put(ExchangeRouter.INSERT_EXCHANGE_FOR_PACK.getActionName(), exchangeWriter());
map.put(ExchangeRouter.DO_NOTHING.getActionName(), doNothing());
return new CompositeItemWriterBuilder(map, ExchangeWriterRouterClassifier.class).build();
}
@Bean
public JdbcBatchItemWriter<Notification> exchangeWorkflowWriter() {
JdbcBatchItemWriter<Notification> writer = new JdbcBatchItemWriter<Notification>();
writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Notification>());
writer.setSql(" INSERT INTO SOME TABLE..");
writer.setDataSource(dataSource);
return writer;
}
@Bean
public JdbcBatchItemWriter<Notification> exchangeWriter() {
JdbcBatchItemWriter<Notification> writer = new JdbcBatchItemWriter<Notification>();
writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Notification>());
writer.setSql("INSERT INTO SOME OTHER TABLE.");
writer.setDataSource(dataSource);
return writer;
}
@Bean
public ItemWriter<Document> doNothing() {
return new DummyWriter();
}
@Bean
public Job generatePdf(JobCompletionNotificationListener listener) throws InstantiationException, IllegalAccessException {
return jobBuilderFactory.get("generatePdf")
.incrementer(new RunIdIncrementer())
.flow(treatStock())
.end()
.build();
}
@Bean
public Step treatStock() throws InstantiationException, IllegalAccessException {
return stepBuilderFactory.get("treatStock")
.<Notification, Notification>chunk(1)
.reader(reader())
.processor(notificatonProcessor())
.writer(compositeExchangeWriter())
.writer(updateWriter())
.build();
}
}
CompositeItemWriter.java
public class CompositeItemWriterBuilder extends CompositeItemBuilder<CompositeItemWriter> {
public CompositeItemWriterBuilder(HashMap<String, Object> matcherMap, Class<?> routerDelegate) throws InstantiationException, IllegalAccessException {
BackToBackPatternClassifier classif = new BackToBackPatternClassifier();
classif.setRouterDelegate(routerDelegate.newInstance());
classif.setMatcherMap(matcherMap);
ClassifierCompositeItemWriter classifier = new ClassifierCompositeItemWriter();
classifier.setClassifier(classif);
this.delegates.add(classifier);
}
public CompositeItemWriterBuilder(List<Object> delegates) {
this.delegates = delegates;
}
@Override
protected Class<?> getCompositeItem() {
return CompositeItemWriter.class;
}
}
CompositeItemBuiler.java
public abstract class CompositeItemBuilder<T> {
protected List<Object> delegates = new ArrayList<Object>();
@SuppressWarnings("unchecked")
public T build() throws InstantiationException, IllegalAccessException {
Object compositeItem = getCompositeItem().newInstance();
Method setDelegates = ReflectionUtils.findMethod(compositeItem.getClass(), "setDelegates", List.class);
ReflectionUtils.invokeMethod(setDelegates,compositeItem, delegates);
return (T) compositeItem;
}
abstract protected Class<?> getCompositeItem();
}
ExchangeWriterRouter分类器.java(未调用分类方法)
public class ExchangeWriterRouterClassifier {
@Classifier
public String classify(Notification notification) {
return notification.getExchangesWorkflow().getRouter().getActionName();
}
}
Spring如何调用分类器? 我是不是遗漏了什么?
推荐答案
我正在尝试CompositeItemWriter,但是,当我调用批处理时,未调用编写器。
问题在您的步骤定义中:
@Bean
public Step treatStock() throws InstantiationException, IllegalAccessException {
return stepBuilderFactory.get("treatStock")
.<Notification, Notification>chunk(1)
.reader(reader())
.processor(notificatonProcessor())
.writer(compositeExchangeWriter())
.writer(updateWriter())
.build();
}
您正在调用writer()
方法两次,因此updateWriter()
将覆盖compositeExchangeWriter()
。您需要使用复合编写器作为已经设置委托编写器的参数来调用该方法一次。
在使用复合编写器时需要注意,如果委托没有实现ItemStream
接口,则需要确保将它们注册为流。有关这方面的更多详细信息,请单击此处:https://docs.spring.io/spring-batch/4.0.x/reference/html/readersAndWriters.html#delegatePatternAndRegistering
Spring如何调用分类器?
正确配置ClassifierCompositeItemWriter
后,Spring Batch将对每个项调用分类器以确定要使用哪个编写器,然后调用相应的编写器来编写该项。
在您的配置中,ClassifierCompositeItemWriter
未在此处正确配置:
@SuppressWarnings("unchecked")
public T build() throws InstantiationException, IllegalAccessException {
Object compositeItem = getCompositeItem().newInstance();
Method setDelegates = ReflectionUtils.findMethod(compositeItem.getClass(), "setDelegates", List.class);
ReflectionUtils.invokeMethod(setDelegates,compositeItem, delegates);
return (T) compositeItem;
}
我不会使用反射来设置委托。问题是您期望方法compositeExchangeWriter
注册ClassifierCompositeItemWriter
,但它的返回类型是CompositeItemWriter
。因此复合编写器不会被视为分类器。
您可以在这里找到ClassifierCompositeItemWriter
的使用示例:https://github.com/spring-projects/spring-batch/blob/master/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/support/ClassifierCompositeItemWriterTests.java#L44
这篇关于使用CompositeItemWriter时,未调用编写器或分类方法的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!