春季批处理jpaPagingItemReader为什么不读取某些行? [英] Spring batch jpaPagingItemReader why some rows are not read?

查看:318
本文介绍了春季批处理jpaPagingItemReader为什么不读取某些行?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用Spring Batch(3.0.1.RELEASE)/JPA和HSQLBD服务器数据库. 我需要浏览整个表(使用分页)并更新项目(一个接一个).因此,我使用了jpaPagingItemReader.但是当我运行作业时,我可以看到跳过了一些行,并且跳过的行数等于页面大小.例如,如果我的表有12行并且jpaPagingItemReader.pagesize = 3,则作业将读取:第1,2,3行,然后第7,8,9行(因此跳过第4,5,6行)… 您能告诉我我的代码/配置出了什么问题吗,还是HSQLDB分页有问题? 下面是我的代码:

I 'm using Spring Batch(3.0.1.RELEASE) / JPA and an HSQLBD server database. I need to browse an entire table (using paging) and update items (one by one). So I used a jpaPagingItemReader. But when I run the job I can see that some rows are skipped, and the number of skipped rows is equal to the page size. For i.e. if my table has 12 rows and the jpaPagingItemReader.pagesize = 3 the job will read : lines 1,2,3 then lines 7,8,9 (so skip the lines 4,5,6)… Could you tell me what is wrong in my code/configuration, or maybe it's an issue with HSQLDB paging? Below is my code:

:问题是我的ItemProcessor对POJO实体进行了修改.由于JPAPagingItemReader在每次读取之间进行刷新,因此Entities会更新((这就是我想要的).但是,似乎光标分页也增加了(如在日志中所见:行ID 4、5和6已被删除跳过).如何处理此问题?

: The problem is with my ItemProcessor that performs modification to the POJOs Entities. Since JPAPagingItemReader made a flush between each reading, the Entities are updated ((this is what I want) . But it seems that the cursor paging is also incremented (as can be seen in the log: row ID 4, 5 and 6 have been skipped). How can I manage this issue ?

@Configuration
@EnableBatchProcessing(modular=true)
public class AppBatchConfig {
  @Inject
  private InfrastructureConfiguration infrastructureConfiguration;  
  @Inject private JobBuilderFactory jobs;
  @Inject private StepBuilderFactory steps;

  @Bean  public Job job() {
     return jobs.get("Myjob1").start(step1()).build();
  }
  @Bean  public Step step1() {  
      return steps.get("step1")
                .<SNUserPerCampaign, SNUserPerCampaign> chunk(0)
                .reader(reader()).processor(processor()).build();   
  }
  @Bean(destroyMethod = "")
@JobScope 
public ItemStreamReader<SNUserPerCampaign> reader() String trigramme) {
    JpaPagingItemReader reader = new JpaPagingItemReader();
    reader.setEntityManagerFactory(infrastructureConfiguration.getEntityManagerFactory());
    reader.setQueryString("select t from SNUserPerCampaign t where t.isactive=true");
    reader.setPageSize(3));
    return reader;
}
 @Bean @JobScope
 public ItemProcessor<SNUserPerCampaign, SNUserPerCampaign> processor() {   
     return new MyItemProcessor();
 }
}

@Configuration
@EnableBatchProcessing
public class StandaloneInfrastructureConfiguration implements InfrastructureConfiguration {
 @Inject private EntityManagerFactory emf;  
 @Override
public EntityManagerFactory getEntityManagerFactory() {
    return emf;
}
}  

来自我的ItemProcessor:

from my ItemProcessor:

@Override
public SNUserPerCampaign process(SNUserPerCampaign item) throws Exception {
    //do some stuff …
   //then if (condition) update the Entity pojo :   
   item.setModificationDate(new Timestamp(System.currentTimeMillis());
   item.setIsactive = false;

}

从Spring xml配置文件:

from Spring xml config file:

<tx:annotation-driven transaction-manager="transactionManager" />     
<bean id="transactionManager" class="org.springframework.orm.jpa.JpaTransactionManager">
    <property name="entityManagerFactory" ref="entityManagerFactory" />
</bean>

<bean id="entityManagerFactory" class="org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean">
    <property name="dataSource" ref="dataSource" />
</bean>

<bean id="dataSource" class="org.springframework.jdbc.datasource.DriverManagerDataSource">
    <property name="driverClassName" value="org.hsqldb.jdbcDriver" />
    <property name="url" value="jdbc:hsqldb:hsql://localhost:9001/MYAppDB" />
    <property name="username" value="sa" />
    <property name="password" value="" />
</bean>

跟踪/日志摘要:

11:16:05.728 TRACE MyItemProcessor - item processed: snUserInternalId=1]
11:16:06.038 TRACE MyItemProcessor - item processed: snUserInternalId=2]
11:16:06.350 TRACE MyItemProcessor - item processed: snUserInternalId=3]

11:16:06.674 DEBUG SQL- update SNUSER_CAMPAIGN  set ...etc...
11:16:06.677 DEBUG SQL- update SNUSER_CAMPAIGN  set ...etc...
11:16:06.679 DEBUG SQL- update SNUSER_CAMPAIGN  set ...etc...

11:16:06.681 DEBUG SQL- select ...etc... from  SNUSER_CAMPAIGN snuserperc0_ 

11:16:06.687 TRACE MyItemProcessor - item processed: snUserInternalId=7]
11:16:06.998 TRACE MyItemProcessor - item processed: snUserInternalId=8]
11:16:07.314 TRACE MyItemProcessor - item processed: snUserInternalId=9]

推荐答案

org.springframework.batch.item.database.JpaPagingItemReader创建的是自己的entityManager实例

org.springframework.batch.item.database.JpaPagingItemReader creates is own entityManager instance

(来自org.springframework.batch.item.database.JpaPagingItemReader#doOpen):

(from org.springframework.batch.item.database.JpaPagingItemReader#doOpen) :

entityManager = entityManagerFactory.createEntityManager(jpaPropertyMap);

如果您正在事务中(看起来如此),则不会分离阅读器实体 (来自org.springframework.batch.item.database.JpaPagingItemReader#doReadPage):

If you are within a transaction, as it seems to be, reader entities are not detached (from org.springframework.batch.item.database.JpaPagingItemReader#doReadPage):

    if (!transacted) {
        List<T> queryResult = query.getResultList();
        for (T entity : queryResult) {
            entityManager.detach(entity);
            results.add(entity);
        }//end if
    } else {
        results.addAll(query.getResultList());
        tx.commit();
    }

由于这个原因,当您将某个项目更新为处理器或编写器时,该项目仍由读取者的entityManager管理.

For this reason, when you update an item into processor, or writer, this item is still managed by reader's entityManager.

项目读取器读取下一个数据块时,会将上下文刷新到数据库.

When the item reader reads the next chunk of data, it flushes the context to the database.

因此,如果我们看您的情况,则在第一批数据处理之后,我们进入数据库:

So, if we look at your case, after the first chunk of data processes, we have in database:

|id|active
|1 | false
|2 | false
|3 | false

org.springframework.batch.item.database.JpaPagingItemReader使用限制&偏移量以检索分页数据.因此,读者创建的下一个选择看起来像:

org.springframework.batch.item.database.JpaPagingItemReader uses limit & offset to retrieve paginated data. So the next select created by the reader looks like :

select * from table where active = true offset 3 limits 3. 

阅读器将丢失ID为4,5,6的项目,因为它们现在是数据库检索到的第一行.

Reader will miss the items with id 4,5,6, because they are now the first rows retrieved by database.

作为解决方法,您可以使用jdbc实现(org.springframework.batch.item.database.JdbcPagingItemReader),因为它不使用limit&抵消.它基于已排序的列(通常是id列),因此您不会丢失任何数据. 当然,您将必须将数据更新到writer中(使用JPA或纯JDBC实现)

What you can do, as a workaround, is to use jdbc implementation (org.springframework.batch.item.database.JdbcPagingItemReader) as it does not use limit & offset. It is based on a sorted column (typically the id column), so you will not miss any data. Of course, you will have to update your data into the writer (using either JPA ou pure JDBC implementation)

阅读器会更详细:

@Bean
public ItemReader<? extends Entity> reader() {
    JdbcPagingItemReader<Entity> reader = new JdbcPagingItemReader<Entity>();
    final SqlPagingQueryProviderFactoryBean sqlPagingQueryProviderFactoryBean = new SqlPagingQueryProviderFactoryBean();
    sqlPagingQueryProviderFactoryBean.setDataSource(dataSource);
    sqlPagingQueryProviderFactoryBean.setSelectClause("select *");
    sqlPagingQueryProviderFactoryBean.setFromClause("from <your table name>");
    sqlPagingQueryProviderFactoryBean.setWhereClause("where active = true");
    sqlPagingQueryProviderFactoryBean.setSortKey("id");
    try {
        reader.setQueryProvider(sqlPagingQueryProviderFactoryBean.getObject());
    } catch (Exception e) {
        e.printStackTrace();
    }
    reader.setDataSource(dataSource);
    reader.setPageSize(3);
    reader.setRowMapper(new BeanPropertyRowMapper<Entity>(Entity.class));
    return reader;

这篇关于春季批处理jpaPagingItemReader为什么不读取某些行?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆