Spring批处理jpaPagingItemReader为什么有些行没有被读取? [英] Spring batch jpaPagingItemReader why some rows are not read?

查看:32
本文介绍了Spring批处理jpaPagingItemReader为什么有些行没有被读取?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用 Spring Batch(3.0.1.RELEASE)/JPA 和 HSQLBD 服务器数据库.我需要浏览整个表格(使用分页)并更新项目(一项一项).所以我使用了 jpaPagingItemReader.但是当我运行作业时,我可以看到跳过了一些行,并且跳过的行数等于页面大小.例如,如果我的表有 12 行并且 jpaPagingItemReader.pagesize = 3 作业将读取:第 1、2、3 行然后是第 7、8、9 行(所以跳过第 4、5、6 行)……你能告诉我我的代码/配置有什么问题吗,或者它可能是 HSQLDB 分页的问题?下面是我的代码:

I 'm using Spring Batch(3.0.1.RELEASE) / JPA and an HSQLBD server database. I need to browse an entire table (using paging) and update items (one by one). So I used a jpaPagingItemReader. But when I run the job I can see that some rows are skipped, and the number of skipped rows is equal to the page size. For i.e. if my table has 12 rows and the jpaPagingItemReader.pagesize = 3 the job will read : lines 1,2,3 then lines 7,8,9 (so skip the lines 4,5,6)… Could you tell me what is wrong in my code/configuration, or maybe it's an issue with HSQLDB paging? Below is my code:

:问题出在我的 ItemProcessor 上,它对 POJO 实体进行了修改.由于 JPAPagingItemReader 在每次读取之间进行刷新,因此更新实体((这就是我想要的).但似乎游标分页也增加了(从日志中可以看出:行 ID 4、5 和 6 已跳过.我该如何处理这个问题?

: The problem is with my ItemProcessor that performs modification to the POJOs Entities. Since JPAPagingItemReader made a flush between each reading, the Entities are updated ((this is what I want) . But it seems that the cursor paging is also incremented (as can be seen in the log: row ID 4, 5 and 6 have been skipped). How can I manage this issue ?

@Configuration
@EnableBatchProcessing(modular=true)
public class AppBatchConfig {
  @Inject
  private InfrastructureConfiguration infrastructureConfiguration;  
  @Inject private JobBuilderFactory jobs;
  @Inject private StepBuilderFactory steps;

  @Bean  public Job job() {
     return jobs.get("Myjob1").start(step1()).build();
  }
  @Bean  public Step step1() {  
      return steps.get("step1")
                .<SNUserPerCampaign, SNUserPerCampaign> chunk(0)
                .reader(reader()).processor(processor()).build();   
  }
  @Bean(destroyMethod = "")
@JobScope 
public ItemStreamReader<SNUserPerCampaign> reader() String trigramme) {
    JpaPagingItemReader reader = new JpaPagingItemReader();
    reader.setEntityManagerFactory(infrastructureConfiguration.getEntityManagerFactory());
    reader.setQueryString("select t from SNUserPerCampaign t where t.isactive=true");
    reader.setPageSize(3));
    return reader;
}
 @Bean @JobScope
 public ItemProcessor<SNUserPerCampaign, SNUserPerCampaign> processor() {   
     return new MyItemProcessor();
 }
}

@Configuration
@EnableBatchProcessing
public class StandaloneInfrastructureConfiguration implements InfrastructureConfiguration {
 @Inject private EntityManagerFactory emf;  
 @Override
public EntityManagerFactory getEntityManagerFactory() {
    return emf;
}
}  

来自我的 ItemProcessor:

from my ItemProcessor:

@Override
public SNUserPerCampaign process(SNUserPerCampaign item) throws Exception {
    //do some stuff …
   //then if (condition) update the Entity pojo :   
   item.setModificationDate(new Timestamp(System.currentTimeMillis());
   item.setIsactive = false;

}

来自 Spring xml 配置文件:

from Spring xml config file:

<tx:annotation-driven transaction-manager="transactionManager" />     
<bean id="transactionManager" class="org.springframework.orm.jpa.JpaTransactionManager">
    <property name="entityManagerFactory" ref="entityManagerFactory" />
</bean>

<bean id="entityManagerFactory" class="org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean">
    <property name="dataSource" ref="dataSource" />
</bean>

<bean id="dataSource" class="org.springframework.jdbc.datasource.DriverManagerDataSource">
    <property name="driverClassName" value="org.hsqldb.jdbcDriver" />
    <property name="url" value="jdbc:hsqldb:hsql://localhost:9001/MYAppDB" />
    <property name="username" value="sa" />
    <property name="password" value="" />
</bean>

跟踪/日志汇总:

11:16:05.728 TRACE MyItemProcessor - item processed: snUserInternalId=1]
11:16:06.038 TRACE MyItemProcessor - item processed: snUserInternalId=2]
11:16:06.350 TRACE MyItemProcessor - item processed: snUserInternalId=3]

11:16:06.674 DEBUG SQL- update SNUSER_CAMPAIGN  set ...etc...
11:16:06.677 DEBUG SQL- update SNUSER_CAMPAIGN  set ...etc...
11:16:06.679 DEBUG SQL- update SNUSER_CAMPAIGN  set ...etc...

11:16:06.681 DEBUG SQL- select ...etc... from  SNUSER_CAMPAIGN snuserperc0_ 

11:16:06.687 TRACE MyItemProcessor - item processed: snUserInternalId=7]
11:16:06.998 TRACE MyItemProcessor - item processed: snUserInternalId=8]
11:16:07.314 TRACE MyItemProcessor - item processed: snUserInternalId=9]

推荐答案

org.springframework.batch.item.database.JpaPagingItemReader 创建自己的 entityManager 实例

(来自 org.springframework.batch.item.database.JpaPagingItemReader#doOpen):

entityManager = entityManagerFactory.createEntityManager(jpaPropertyMap);

如果你在一个事务中,就像看起来一样,读者实体不会分离(来自 org.springframework.batch.item.database.JpaPagingItemReader#doReadPage):

If you are within a transaction, as it seems to be, reader entities are not detached (from org.springframework.batch.item.database.JpaPagingItemReader#doReadPage):

    if (!transacted) {
        List<T> queryResult = query.getResultList();
        for (T entity : queryResult) {
            entityManager.detach(entity);
            results.add(entity);
        }//end if
    } else {
        results.addAll(query.getResultList());
        tx.commit();
    }

因此,当您将一个项目更新到处理器或写入器时,该项目仍由读取器的 entityManager 管理.

For this reason, when you update an item into processor, or writer, this item is still managed by reader's entityManager.

当项目读取器读取下一个数据块时,它会将上下文刷新到数据库.

When the item reader reads the next chunk of data, it flushes the context to the database.

所以,如果我们看看你的情况,在第一块数据处理之后,我们在数据库中:

So, if we look at your case, after the first chunk of data processes, we have in database:

|id|active
|1 | false
|2 | false
|3 | false

org.springframework.batch.item.database.JpaPagingItemReader 使用 limit &用于检索分页数据的偏移量.所以读者创建的下一个选择看起来像:

org.springframework.batch.item.database.JpaPagingItemReader uses limit & offset to retrieve paginated data. So the next select created by the reader looks like :

select * from table where active = true offset 3 limits 3. 

Reader 将错过 id 为 4、5、6 的项目,因为它们现在是数据库检索到的第一行.

Reader will miss the items with id 4,5,6, because they are now the first rows retrieved by database.

作为一种解决方法,您可以做的是使用 jdbc 实现(org.springframework.batch.item.database.JdbcPagingItemReader),因为它不使用 limit &抵消.它基于已排序的列(通常是 id 列),因此您不会遗漏任何数据.当然,您必须将数据更新到编写器中(使用 JPA 或纯 JDBC 实现)

What you can do, as a workaround, is to use jdbc implementation (org.springframework.batch.item.database.JdbcPagingItemReader) as it does not use limit & offset. It is based on a sorted column (typically the id column), so you will not miss any data. Of course, you will have to update your data into the writer (using either JPA ou pure JDBC implementation)

阅读器会更详细:

@Bean
public ItemReader<? extends Entity> reader() {
    JdbcPagingItemReader<Entity> reader = new JdbcPagingItemReader<Entity>();
    final SqlPagingQueryProviderFactoryBean sqlPagingQueryProviderFactoryBean = new SqlPagingQueryProviderFactoryBean();
    sqlPagingQueryProviderFactoryBean.setDataSource(dataSource);
    sqlPagingQueryProviderFactoryBean.setSelectClause("select *");
    sqlPagingQueryProviderFactoryBean.setFromClause("from <your table name>");
    sqlPagingQueryProviderFactoryBean.setWhereClause("where active = true");
    sqlPagingQueryProviderFactoryBean.setSortKey("id");
    try {
        reader.setQueryProvider(sqlPagingQueryProviderFactoryBean.getObject());
    } catch (Exception e) {
        e.printStackTrace();
    }
    reader.setDataSource(dataSource);
    reader.setPageSize(3);
    reader.setRowMapper(new BeanPropertyRowMapper<Entity>(Entity.class));
    return reader;

这篇关于Spring批处理jpaPagingItemReader为什么有些行没有被读取?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆