Spring Batch querying with state changes


Problem Description


I am using Spring Boot 1.5.7 with Spring Data JPA and Spring Batch. I use JpaPagingItemReader<T> to read entities and JpaItemWriter<T> to write them. What I am aiming to do is read data from a certain database table, convert it to a different format and write it back to different tables (I read raw JSON strings, deserialize them and insert them into their specific tables).

I don't plan to delete the data I read after processing it; instead I just want to mark it as processed. The question is: will JpaPagingItemReader handle reads well if I make the query something like this:

    @Bean
    public ItemReader<RdJsonStore> reader(){
        JpaPagingItemReader<RdJsonStore> reader = new JpaPagingItemReader<>();
        reader.setEntityManagerFactory(entityManagerFactory);
        reader.setQueryString("select e from RdJsonStore e "+
                              "where e.jsonStoreProcessedPointer is null");
        reader.setPageSize(rawDataProperties.getBatchProcessingSize());
        return reader;
    }

So it would read a row only if there is no pointer to it yet. I would insert a pointer after processing an entry (in batches; e.g. I process 1000 entries and post all their ids to the pointer table).

Can an ItemWriter (and the JPA one) handle the data read if I make changes to the returned data on the run like this (the set of entries the query matches gets reduced with every batch)?

If the pointer solution is not applicable, how should I design the DB-to-DB batch job?

My source table looks like this:

Solution

If you look at the code of JpaPagingItemReader, in the method doReadPage(), you will notice this line,

Query query = createQuery().setFirstResult(getPage() * getPageSize()).setMaxResults(getPageSize());

where createQuery() is defined as:

    private Query createQuery() {
        if (queryProvider == null) {
            return entityManager.createQuery(queryString);
        }
        else {
            return queryProvider.createQuery();
        }
    }

So you see that the query is created and executed afresh for each page, but the page number is not recalculated for the new (smaller) data set, and page number recalculation wouldn't make sense either.

getPageSize() always returns the value set in the configuration, and getPage() returns the last calculated page number (previously processed page + 1). So if the data is shrinking, your program would only work correctly if the page number calculation were also done afresh, i.e. if you always started at page = 0. That doesn't happen with JpaPagingItemReader, so you will lose data, as M Deinum pointed out in the comments.
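The effect can be reproduced in a few lines of plain Java (an in-memory sketch, not Spring Batch itself): each "page" query runs against whatever is still unprocessed, while the offset keeps advancing, so records fall into the gap.

```java
import java.util.ArrayList;
import java.util.List;

// Simulates JpaPagingItemReader's paging over a result set that shrinks
// because processed rows drop out of the WHERE clause between pages.
public class ShrinkingPageDemo {

    // Reads "pages" of size pageSize from the unprocessed list, removing each
    // chunk after reading it (as marking rows processed would). Returns the
    // records that were actually read.
    static List<String> readAll(List<String> unprocessed, int pageSize) {
        List<String> seen = new ArrayList<>();
        for (int page = 0; ; page++) {
            // The query runs afresh each page, but the offset keeps
            // advancing, like setFirstResult(page * pageSize).
            int from = page * pageSize;
            if (from >= unprocessed.size()) {
                break;
            }
            int to = Math.min(from + pageSize, unprocessed.size());
            List<String> chunk = new ArrayList<>(unprocessed.subList(from, to));
            seen.addAll(chunk);
            unprocessed.removeAll(chunk); // processed rows vanish from the result set
        }
        return seen;
    }

    public static void main(String[] args) {
        List<String> table = new ArrayList<>(List.of("r1", "r2", "r3", "r4", "r5", "r6"));
        // r3 and r4 are skipped: after page 0 removed r1 and r2,
        // the offset for page 1 (= 2) already points past them.
        System.out.println(readAll(table, 2)); // prints [r1, r2, r5, r6]
    }
}
```

With six rows and a page size of 2, only four rows are ever read: every page after the first starts two rows too far into the shrunken result set.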

Also, as per my understanding, the addition of new data will work OK (provided new records are added at the end as per the sorting keys, even though the data is usually assumed to be locked for the duration of the job run).

I think marking a row as PROCESSED during the current job run serves no purpose, since that is already taken care of by the framework (a record does not get processed twice within a run).

What you might need is marking a record as PROCESSED for the next job run. That can be handled by updating a separate flag which is not part of the WHERE clause (during the job run), and then, at the end of the job, updating the flag which is part of the WHERE clause (the one you use in your WHERE clause to indicate processed records).
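A minimal in-memory sketch of that two-flag design (the field names processed and processedThisRun are hypothetical, and the paging is simulated rather than done with Spring Batch):

```java
import java.util.ArrayList;
import java.util.List;

// Two-flag idea: flip only the flag the WHERE clause ignores during the run,
// then promote it at job end so the next run sees a filtered result set.
public class TwoFlagDemo {

    static class Row {
        final String id;
        boolean processed;        // part of the reader's WHERE clause
        boolean processedThisRun; // NOT part of the WHERE clause
        Row(String id) { this.id = id; }
    }

    // Mirrors "where e.processed = false" -- the result set the reader pages over.
    static List<Row> unprocessed(List<Row> table) {
        List<Row> out = new ArrayList<>();
        for (Row r : table) {
            if (!r.processed) out.add(r);
        }
        return out;
    }

    static void runJob(List<Row> table, int pageSize) {
        // During the run only processedThisRun changes, so the WHERE-clause
        // result set (and hence every page offset) stays stable.
        for (int page = 0; ; page++) {
            List<Row> result = unprocessed(table);
            int from = page * pageSize;
            if (from >= result.size()) break;
            int to = Math.min(from + pageSize, result.size());
            for (Row r : result.subList(from, to)) {
                r.processedThisRun = true;
            }
        }
        // End of job: promote the run flag into the WHERE-clause flag
        // so the *next* job run skips these records.
        for (Row r : table) {
            if (r.processedThisRun) r.processed = true;
        }
    }

    public static void main(String[] args) {
        List<Row> table = new ArrayList<>();
        for (int i = 1; i <= 5; i++) table.add(new Row("r" + i));
        runJob(table, 2);
        long done = table.stream().filter(r -> r.processed).count();
        System.out.println(done + " of " + table.size() + " rows processed");
    }
}
```

Because the WHERE-clause flag never changes mid-run, every row is visited exactly once; the end-of-job promotion could be a final Tasklet step issuing a single bulk UPDATE in a real Spring Batch job.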
