Spring Batch not processing all records
Problem description
I am using Spring Batch to read records from a PostgreSQL database with a RepositoryItemReader and then write them to a topic. Around 1 million records needed to be processed, but the job did not process all of them. I have set the reader's pageSize to 10,000, the same value as the commit interval (chunk size).
@Bean
public TaskletStep broadcastProductsStep() {
    return stepBuilderFactory.get("broadcastProducts")
            .<Product, Product>chunk(10000)
            .reader(productsReader.repositoryItemReader())
            .processor(productsProcessor)
            .writer(compositeItemWriter)
            .faultTolerant()
            .skip(Exception.class)
            .skipLimit(100000)
            .processorNonTransactional()
            .listener(new SkipListenerProducts())
            .listener(productsChunkListener)
            .build();
}
@Bean
public RepositoryItemReader<Product> repositoryItemReader() {
    RepositoryItemReader<Product> repositoryReader = new RepositoryItemReader<>();
    try {
        repositoryReader.setRepository(skuRepository);
        repositoryReader.setMethodName("findByIsUpdatedAndStatusCodeIn");
        repositoryReader.setPageSize(10000);
        repositoryReader.setSaveState(false);
        // Single method argument: the list of status codes for the IN clause.
        List<List<String>> arguments = new ArrayList<>();
        arguments.add(Stream.of(SkuStatus.RELEASED.getValue().toString(),
                        SkuStatus.BLOCKED.getValue().toString(),
                        SkuStatus.DISCONTINUED.getValue().toString())
                .collect(Collectors.toList()));
        repositoryReader.setArguments(arguments);
        // Paging needs a stable sort order.
        Map<String, Sort.Direction> sorts = new HashMap<>();
        sorts.put("catalog_number", Sort.Direction.ASC);
        repositoryReader.setSort(sorts);
        repositoryReader.afterPropertiesSet();
    } catch (Exception exception) {
        exception.printStackTrace();
    }
    return repositoryReader;
}
@Query(value = "SELECT * FROM CATALOG.PRODUCTS WHERE IS_UPDATED = 'true' AND STATUS_CODE IN (:statusCode)",
        countQuery = "SELECT COUNT(*) FROM CATALOG.PRODUCTS WHERE IS_UPDATED = 'true' AND STATUS_CODE IN (:statusCode)",
        nativeQuery = true)
public Page<Product> findByIsUpdatedAndStatusCodeIn(@Param(value = "statusCode") List<String> statusCode,
        Pageable pageable);
Recommended answer
The problem is probably that you are combining pagination with an update to the column the reader query filters on (IS_UPDATED).
Example with page size = 2 and 6 rows in the database:
- A IS_UPDATED = true
- B IS_UPDATED = true
- C IS_UPDATED = true
- D IS_UPDATED = true
- E IS_UPDATED = true
- F IS_UPDATED = true
The first read fetches the first page and returns rows A and B.
After the writer runs (setting IS_UPDATED to false for A and B), the database contains:
- C IS_UPDATED = true
- D IS_UPDATED = true
- E IS_UPDATED = true
- F IS_UPDATED = true
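The second read moves on to the second page of the rows that still match the query, so it returns E and F rather than C and D. C and D now sit on the first page, which is never read again.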
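The example above can be reproduced without Spring Batch. The following is a minimal sketch in plain Java, assuming an in-memory map in place of the PRODUCTS table and a loop in place of the step; the class and method names (PagingDemo, readPage, run) are hypothetical and only illustrate the offset arithmetic, not the actual RepositoryItemReader internals.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class PagingDemo {

    // Stand-in for "SELECT ... WHERE IS_UPDATED = true ORDER BY name OFFSET page*size LIMIT size".
    static List<String> readPage(Map<String, Boolean> table, int page, int pageSize) {
        return table.entrySet().stream()
                .filter(Map.Entry::getValue)          // only rows still flagged IS_UPDATED = true
                .map(Map.Entry::getKey)
                .skip((long) page * pageSize)         // offset computed from the page number
                .limit(pageSize)
                .collect(Collectors.toList());
    }

    // Reads page after page; after each page the "writer" clears the flag on the rows it received.
    static List<String> run(boolean alwaysReadFirstPage) {
        Map<String, Boolean> table = new LinkedHashMap<>();
        for (String row : List.of("A", "B", "C", "D", "E", "F")) {
            table.put(row, true);
        }
        List<String> processed = new ArrayList<>();
        int page = 0;
        while (true) {
            List<String> rows = readPage(table, alwaysReadFirstPage ? 0 : page, 2);
            if (rows.isEmpty()) {
                break;
            }
            for (String row : rows) {
                table.put(row, false);                // writer sets IS_UPDATED = false
                processed.add(row);
            }
            page++;
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // [A, B, E, F]  (C and D are skipped)
        System.out.println(run(true));  // [A, B, C, D, E, F]
    }
}
```

With an advancing page number the offset is applied to a result set that keeps shrinking, so every second page is lost. Always reading the first page visits every row, provided each processed row stops matching the query.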
Either:
- you should not update the IS_UPDATED column;
- or you create a subclass of RepositoryItemReader in which you override getPage:
@Override
public int getPage() {
    return 0;
}
Option 2 is more resilient to batch crashes and errors, but you have to make sure the writer always sets IS_UPDATED to false; otherwise the reader will loop indefinitely.
Option 2 will also need more tuning if you are using a multithreaded step.
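The infinite-loop risk of option 2 can be illustrated with the same kind of in-memory simulation. This is a rough sketch, not Spring Batch code: the class StuckPageDemo, the method countReads, and the maxReads safety cap are all hypothetical. It assumes that some rows never get their flag cleared, for instance because the item was skipped after an exception.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class StuckPageDemo {

    // Always reads the first page (option 2) and counts how many page reads happen.
    // maxReads is a safety cap so that the demo terminates even when the reader is stuck.
    static int countReads(Set<String> rowsWriterFailsOn, int maxReads) {
        Map<String, Boolean> table = new LinkedHashMap<>();
        for (String row : List.of("A", "B", "C", "D")) {
            table.put(row, true);
        }
        int reads = 0;
        while (reads < maxReads) {
            // First page (size 2) of the rows still flagged IS_UPDATED = true.
            List<String> rows = table.entrySet().stream()
                    .filter(Map.Entry::getValue)
                    .map(Map.Entry::getKey)
                    .limit(2)
                    .collect(Collectors.toList());
            reads++;
            if (rows.isEmpty()) {
                break;                                // nothing left to process
            }
            for (String row : rows) {
                if (!rowsWriterFailsOn.contains(row)) {
                    table.put(row, false);            // flag cleared only on success
                }
            }
        }
        return reads;
    }

    public static void main(String[] args) {
        System.out.println(countReads(Set.of(), 100));    // 3: two data pages plus one empty page
        System.out.println(countReads(Set.of("A"), 100)); // 100: row A keeps coming back until the cap
    }
}
```

In a real job there is no such cap, so a row whose flag is never cleared would be returned on every read. One way to guard against this is to mark failed rows with a separate status so that they stop matching the reader query.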