spring batch - Partition Step to roll back all the previous chunk commits when a chunk fails


Problem Description

I am using Spring Batch to process multiple files with a MultiResourcePartitioner, and all the item readers and writers are step-scoped. Each step runs an individual file and commits to the database at an interval of 1000 items. When any error occurs during processing, all the previous commits need to be rolled back and the step must fail, so that none of the file's contents end up in the database.

  • Using transaction propagation NESTED.

  • Setting the commit interval of the chunk to Integer.MAX_VALUE; this does not work because the files contain a large number of items and the job fails with heap-space errors.

Is there any other way to have a transaction at the step level?

I have the sample XML configuration shown below:

<bean id="filepartitioner" class="org.springframework.batch.core.partition.support.MultiResourcePartitioner">
    <property name="resources" value="classpath:${filepath}" />
</bean>

<bean id="fileItemReader" scope="step" autowire-candidate="false" parent="itemReaderParent">
        <property name="resource" value="#{stepExecutionContext[fileName]}" />
</bean>

<step id="step1" xmlns="http://www.springframework.org/schema/batch">
    <tasklet transaction-manager="ratransactionManager"   >
        <chunk writer="jdbcItenWriter" reader="fileItemReader" processor="itemProcessor" commit-interval="800" retry-limit="3">
         <retryable-exception-classes>
        <include class="org.springframework.dao.DeadlockLoserDataAccessException"/>
     </retryable-exception-classes>
    </chunk>
    <listeners>
        <listener ref="customStepExecutionListener">
        </listener>
    </listeners>
    </tasklet>
    <fail on="FAILED"/>
</step>

UPDATES:

It seems that the main table (where the direct inserts happen) is referenced by other tables and materialized views. If I delete the data in this table to remove stale records using a processed-column indicator, the data spooled via the MVs will show old data. I think a staging table is needed for my requirement.

To implement a staging data table for this requirement:

  • Create another parallel step that polls the database and writes out the data whose processed column value is Y.

  • Transfer the data at the end of each successful file completion using a step listener (afterStep method); see the sketch after this list.

Or any other suggestions.
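A rough sketch of the step-listener option above, assuming the partitioned step stores the current file name in the step execution context under fileName (as the question's configuration does) and that a JdbcTemplate is wired in elsewhere; the staging_table/main_table names and their columns are hypothetical:

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.listener.StepExecutionListenerSupport;
import org.springframework.jdbc.core.JdbcTemplate;

// Hypothetical listener: after a file's step finishes, move its staged rows
// into the main table on success, or discard them on failure.
public class StagingTransferListener extends StepExecutionListenerSupport {

    private final JdbcTemplate jdbc;

    public StagingTransferListener(JdbcTemplate jdbc) {
        this.jdbc = jdbc;
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        String fileName = stepExecution.getExecutionContext().getString("fileName");
        if ("COMPLETED".equals(stepExecution.getExitStatus().getExitCode())) {
            // Ideally run these two statements in one transaction
            // (e.g. via TransactionTemplate) so the transfer is atomic.
            jdbc.update("insert into main_table (a, b, c) "
                    + "select a, b, c from staging_table where file_name = ?", fileName);
            jdbc.update("delete from staging_table where file_name = ?", fileName);
        } else {
            // Failed file: drop its staged rows so the next run starts clean.
            jdbc.update("delete from staging_table where file_name = ?", fileName);
        }
        return stepExecution.getExitStatus();
    }
}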

Recommended Answer

In general I agree with @MichaelLange's approach. But perhaps a separate table is too much... You can add an extra column, completed, to your import table; if it is set to "false", the record belongs to the file that is currently being processed (or whose processing failed). After you have processed a file, you issue a simple update on this table (it should not fail, since you have no constraints on this column):

update import_table set completed = true where file_name = 'file001_chunk1.txt'

Before processing a file you should remove the "stale" records:

delete from import_table where file_name = 'file001_chunk1.txt'

This solution would be faster and easier to implement than nested transactions. Perhaps with this approach you will face table locks, but with an appropriate choice of isolation level this can be minimised. Optionally, you may wish to create a view over this table that filters out the non-completed records (add an index on the completed column):

create view import_view as select a, b, c from import_table where completed = true
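As a minimal sketch, this lifecycle can live in a StepExecutionListener such as the customStepExecutionListener referenced in the question's configuration. It assumes the fileName entry that the MultiResourcePartitioner puts into the step execution context and an injected JdbcTemplate; the class name and SQL details are illustrative only:

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.jdbc.core.JdbcTemplate;

// Hypothetical listener around one file's step: clean up stale rows before
// the run, flip the completed flag only once the whole file has succeeded.
public class CompletedFlagListener implements StepExecutionListener {

    private final JdbcTemplate jdbc;

    public CompletedFlagListener(JdbcTemplate jdbc) {
        this.jdbc = jdbc;
    }

    @Override
    public void beforeStep(StepExecution stepExecution) {
        // Remove leftovers of an earlier failed attempt at this file.
        String fileName = stepExecution.getExecutionContext().getString("fileName");
        jdbc.update("delete from import_table where file_name = ?", fileName);
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        String fileName = stepExecution.getExecutionContext().getString("fileName");
        if ("COMPLETED".equals(stepExecution.getExitStatus().getExitCode())) {
            // A single unconstrained update; rows committed by earlier chunks
            // stay invisible through import_view until this flag flips.
            jdbc.update("update import_table set completed = true where file_name = ?", fileName);
        }
        return stepExecution.getExitStatus();
    }
}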

In general I think nested transactions are not possible in this case, as chunks can be processed in parallel threads, each thread holding its own transaction context. The transaction manager will not be able to start a nested transaction in a new thread, even if you somehow manage to create a "main transaction" in the "top" job thread.

Yet another approach is a continuation of the "temporary table" idea. What the import process should do is create the import tables and name them according to, e.g., the date:

import_table_2011_10_01
import_table_2011_10_02
import_table_2011_10_05
...
etc

and a "super-view" that unions all of these tables:

create view import_table as
select * from import_table_2011_10_01
union
select * from import_table_2011_10_02
union
select * from import_table_2011_10_05

After an import succeeds, the "super-view" should be re-created to include the new table.
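A sketch of that re-creation step, assuming the current list of daily table names is known and a JdbcTemplate is available; the helper class and the CREATE OR REPLACE syntax (which varies by database) are assumptions:

import java.util.List;
import org.springframework.jdbc.core.JdbcTemplate;

// Hypothetical helper: rebuild the "super-view" over all daily import tables.
public class ImportViewRebuilder {

    private final JdbcTemplate jdbc;

    public ImportViewRebuilder(JdbcTemplate jdbc) {
        this.jdbc = jdbc;
    }

    public void rebuild(List<String> dailyTables) {
        StringBuilder sql = new StringBuilder("create or replace view import_table as ");
        for (int i = 0; i < dailyTables.size(); i++) {
            if (i > 0) {
                // UNION ALL: the daily tables are disjoint, so skip the
                // duplicate elimination a plain UNION would force.
                sql.append(" union all ");
            }
            sql.append("select * from ").append(dailyTables.get(i));
        }
        // Swaps the view definition in place, with no window
        // during which the view does not exist.
        jdbc.execute(sql.toString());
    }
}

This would be called after the new daily table has been loaded, e.g. rebuild(Arrays.asList("import_table_2011_10_01", "import_table_2011_10_02")).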

With this approach you will have difficulties with foreign keys that reference the import table.

Yet another approach is to use a separate database for the import and then feed the imported data from the import DB into the main one (e.g. by transferring the binary data).
