Spring Batch: problems (mixed data) when converting to multithread


Question

Maybe this is a recurring question, but I need an answer tailored to my context.

I'm using Spring Batch 3.0.1.RELEASE.

I have a simple job with some steps. One step is a chunk like this:

    <batch:tasklet transaction-manager="myTransactionManager">
        <batch:chunk reader="myReader" processor="myProcessor" writer="myWriter"
                     commit-interval="${commit.interval}"/>
    </batch:tasklet>

    <bean id="myProcessor" class="org.springframework.batch.item.support.CompositeItemProcessor" scope="step">
        <property name="delegates">
            <list>
                <bean class="...MyFirstProcessor"/>
                <bean class="...MySecondProcessor"/>
            </list>
        </property>
    </bean>

  • Reader: JdbcCursorItemReader
  • Processor: CompositeItemProcessor with my delegates
  • Writer: CompositeItemWriter with my delegates

With this configuration, my job works perfectly.

Now I want to convert this to a multi-threaded job. Following the documentation on basic multi-threaded jobs, I added a SimpleAsyncTaskExecutor to the tasklet, but it failed.

I have read that JdbcCursorItemReader does not work properly with multi-threaded execution (is that right?). I changed the reader to a JdbcPagingItemReader, and it has been a nightmare: the job does not fail and the writing phase completes, but data gets mixed between threads, so the customer data is no longer correct or coherent (customers end up with services, addresses, etc. that belong to others).

So, why does this happen? How can I convert this to a multi-threaded job?

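  • Are the composite processor and writer suitable for multi-threading?
  • How could I make a custom thread-safe composite processor?
  • Could it be the JDBC reader? Is there any thread-safe JDBC reader for multi-threaded use?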

I'm really stuck and confused by this, so any help would be much appreciated. Thanks a lot.

Well, the right fix for my issue is to design the job for multi-threaded, thread-safe execution from the beginning. It is common to practice first with single-threaded step execution in order to understand Spring Batch concepts; but once you move past that phase, considerations such as immutable objects and thread-safe lists, maps, etc. have to come up.

The fix I applied to the job in its current state is the workaround described next. After testing Martin's suggestions and taking Michael's guidelines into account, I finally fixed my issue as well as I could. The following steps aren't good practice, but I couldn't rebuild my job from the beginning:

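  • Change the itemReader to a JdbcPagingItemReader with saveState set to false.
  • Replace List with CopyOnWriteArrayList.
  • Replace HashMap with ConcurrentHashMap.
  • In each delegated processor, get a new instance of every bean property (fortunately, there was only one injected bean) by passing in the context (implementing ApplicationContextAware) and fetching a separate instance of the bean from it (every injected bean is configured with scope="prototype").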
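The two collection swaps in the list above can be sketched with plain JDK classes. This is only an illustration: the class name SharedStateSketch, its fields, and the customer IDs are made up, since the question does not show the job's real beans. Several threads update a shared map and list, and no update is lost.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical shared state, standing in for the lists and maps held by the job's beans. */
final class SharedStateSketch {
    // Both collections can be mutated from several threads at once.
    private final Map<String, Integer> servicesPerCustomer = new ConcurrentHashMap<>();
    private final List<String> processedItems = new CopyOnWriteArrayList<>();

    void record(String customerId) {
        // merge() is atomic on ConcurrentHashMap; a get() followed by a put() would not be.
        servicesPerCustomer.merge(customerId, 1, Integer::sum);
        processedItems.add(customerId);
    }

    int processedCount() {
        return processedItems.size();
    }

    int totalServices() {
        int total = 0;
        for (int count : servicesPerCustomer.values()) {
            total += count;
        }
        return total;
    }

    /** Calls record() from several threads and returns the populated state. */
    static SharedStateSketch runConcurrently(int threads, int callsPerThread) {
        SharedStateSketch state = new SharedStateSketch();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                for (int i = 0; i < callsPerThread; i++) {
                    state.record("customer-" + (i % 10));
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return state;
    }

    public static void main(String[] args) {
        SharedStateSketch state = runConcurrently(4, 500);
        System.out.println(state.processedCount() + " items, " + state.totalServices() + " services");
    }
}
```

These classes make each individual operation safe, but a sequence of several operations on them is still not atomic, which is one reason the swap is a workaround and not a redesign.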

So, if the delegated bean was:

    <bean class="...MyProcessor">
        <property name="otherBean" ref="otherBeanID" />
    </bean>

Change it to:

    <bean class="...MyProcessor">
        <property name="otherBean" value="otherBeanID" />
    </bean>

And, inside MyProcessor, get a separate instance of otherBeanID from the context; otherBeanID must be configured with scope="prototype".
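The idea behind this lookup can be sketched without Spring. In the sketch below, a Supplier stands in for asking the ApplicationContext for the prototype-scoped bean by name; OtherBean, its describe method, and the customer names are hypothetical, because the original classes are not shown.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class PrototypeLookupSketch {

    /** Hypothetical mutable helper, standing in for the bean registered as otherBeanID. */
    static final class OtherBean {
        private String currentCustomer; // per-item state: unsafe if one instance is shared

        String describe(String customer) {
            currentCustomer = customer;
            return "services of " + currentCustomer;
        }
    }

    /** Holds a lookup instead of a shared OtherBean, so every item gets its own instance. */
    static final class MyProcessor {
        private final Supplier<OtherBean> otherBeanLookup;

        MyProcessor(Supplier<OtherBean> otherBeanLookup) {
            this.otherBeanLookup = otherBeanLookup;
        }

        String process(String customer) {
            // In Spring this would be a lookup of the prototype bean in the context.
            OtherBean helper = otherBeanLookup.get();
            return helper.describe(customer);
        }
    }

    /** Processes the given customers and returns how many OtherBean instances were created. */
    static int instancesCreated(String... customers) {
        AtomicInteger created = new AtomicInteger();
        MyProcessor processor = new MyProcessor(() -> {
            created.incrementAndGet();
            return new OtherBean();
        });
        for (String customer : customers) {
            processor.process(customer);
        }
        return created.get();
    }

    public static void main(String[] args) {
        System.out.println(instancesCreated("alice", "bob", "carol"));
    }
}
```

Because the helper is created inside process and never stored in a field, two threads calling process at the same time cannot see each other's customer.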

As I said before, this is not good style, but it was my best option, and I can assert that each thread has its own distinct item instance and its own instance of the other bean.

It proves that some classes were not designed well for correct multi-threaded execution.

Martin, Michael, thanks for your support.

I hope it helps someone.

Recommended answer

You have asked a lot in your question (in the future, please break this type of question up into multiple, more specific questions). However, item by item:

Is JdbcCursorItemReader thread-safe?
As the documentation states, it is not. The reason is that JdbcCursorItemReader wraps a single ResultSet, which is not thread safe.

Are the composite processor and writer right for multithread?
The CompositeItemProcessor provided by Spring Batch is considered thread safe as long as the delegate ItemProcessor implementations are thread safe as well. You provide no code for your implementations or their configurations, so I can't verify their thread safety. However, given the symptoms you are describing, my hunch is that there is some form of thread safety issue in your code.

You also don't identify which ItemWriter implementations you are using or how they are configured, so there may be thread-related issues there as well.

If you update your question with more information about your implementations and configurations, we can provide more insight.

How could I make a custom thread-safe composite processor?
There are two things to consider when implementing any ItemProcessor:

  1. Make it thread safe: Following basic thread safety rules (read the book Java Concurrency In Practice for the bible on the topic) will allow you to scale your components by just adding a task executor.
  2. Make it idempotent: During skip/retry processing, items may be re-processed. Making your ItemProcessor implementation idempotent prevents side effects from these multiple trips through the processor.
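A minimal sketch of both points, using only the JDK and not Spring Batch: the process method below is a hypothetical stand-in for an ItemProcessor whose result depends only on its argument. It keeps per-item data in local variables, so it can be called from many threads and called again for the same item without side effects.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class StatelessProcessorSketch {

    /** Stateless and idempotent: no fields are read or written, so calls cannot interfere. */
    static String process(int customerId) {
        String address = "address-" + customerId; // per-item data lives in local variables only
        return "customer-" + customerId + "/" + address;
    }

    /** Processes ids 0..count-1 on a thread pool; returns how many results did not match their input. */
    static int countMismatches(int count, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (int id = 0; id < count; id++) {
                final int customerId = id;
                Callable<String> task = () -> process(customerId);
                futures.add(pool.submit(task));
            }
            int mismatches = 0;
            for (int id = 0; id < count; id++) {
                String expected = "customer-" + id + "/address-" + id;
                if (!expected.equals(futures.get(id).get())) {
                    mismatches++;
                }
            }
            return mismatches;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("mismatches: " + countMismatches(1000, 8));
    }
}
```

A processor that instead stored the current customer in an instance field would be open to the kind of mixing described in the question, because another thread could overwrite the field between the write and the read.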

Maybe could it be the JDBC reader: Is there any thread-safe JDBC reader for multi-thread?
As you have noted, the JdbcPagingItemReader is thread safe and is documented as such. When using multiple threads, each chunk is executed in its own thread. If you've configured the page size to match the commit-interval, each page is processed in the same thread.
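To make the page-per-thread idea concrete, here is a simplified JDK-only model. It is not how JdbcPagingItemReader is implemented; PagingReaderSketch and its in-memory list of rows are assumptions for illustration. Each worker takes a whole page under a lock and then processes that page on its own thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class PagingReaderSketch {
    private final List<Integer> rows; // stands in for a table read in sort-key order
    private final int pageSize;
    private int nextOffset;           // guarded by the lock on this reader

    PagingReaderSketch(List<Integer> rows, int pageSize) {
        this.rows = List.copyOf(rows);
        this.pageSize = pageSize;
    }

    /** Hands out the next page, or an empty list when the rows are exhausted. */
    synchronized List<Integer> readPage() {
        int end = Math.min(nextOffset + pageSize, rows.size());
        List<Integer> page = rows.subList(nextOffset, end);
        nextOffset = end;
        return page;
    }

    /** Reads rowCount rows with several threads; true if every row was seen exactly once. */
    static boolean everyRowReadOnce(int rowCount, int pageSize, int threads) {
        List<Integer> rows = new ArrayList<>();
        for (int i = 0; i < rowCount; i++) {
            rows.add(i);
        }
        PagingReaderSketch reader = new PagingReaderSketch(rows, pageSize);
        Map<Integer, Integer> timesSeen = new ConcurrentHashMap<>();

        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                List<Integer> page;
                while (!(page = reader.readPage()).isEmpty()) {
                    for (Integer row : page) {
                        timesSeen.merge(row, 1, Integer::sum); // the whole page stays on this thread
                    }
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        if (timesSeen.size() != rowCount) {
            return false;
        }
        for (int count : timesSeen.values()) {
            if (count != 1) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(everyRowReadOnce(1000, 10, 4));
    }
}
```

The lock only protects the hand-off of pages. Whatever processes the rows afterwards still has to be thread safe on its own, which is where the question's mixed data most likely came from.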

Other options for scaling a single step
While you went down the path of implementing a single, multi-threaded step, there may be better options. Spring Batch provides 5 core scaling options:

  1. Multithreaded step - What you are trying right now.
  2. Parallel Steps - Using Spring Batch's split functionality you can execute multiple steps in parallel. Given that you're working with a composite ItemProcessor and composite ItemWriters in the same step, this may be something to explore (breaking your current composite scenarios into multiple, parallel steps).
  3. Async ItemProcessor/ItemWriters - This option allows you to execute the processor logic in a different thread. The processor spins the thread off and returns a Future to the AsyncItemWriter, which blocks until the Future returns the item to be written.
  4. Partitioning - This is the division of the data into blocks called partitions that are processed in parallel by child steps. Each partition is processed by an actual, independent step, so using step scoped components can prevent thread safety issues (each step gets its own instance). Partition processing can be performed either locally via threads or remotely across multiple JVMs.
  5. Remote Chunking - This option farms the processor logic out to other JVM processes. It really should only be used if the ItemProcessor logic is the bottleneck in the flow.
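The reason partitioning sidesteps thread safety issues can be shown in a few lines of plain Java. This is a sketch of the principle, not of Spring Batch's partitioning API: SumWorker is a hypothetical component that is deliberately not thread safe, and it stays correct because every partition creates its own instance.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class PartitioningSketch {

    /** Deliberately not thread safe; safe here because each partition owns its own instance. */
    static final class SumWorker {
        private long total;

        void accept(int value) {
            total += value;
        }

        long total() {
            return total;
        }
    }

    /** Splits 0..itemCount-1 into contiguous partitions and sums each one on its own thread. */
    static long sumInPartitions(int itemCount, int partitions) {
        ExecutorService pool = Executors.newFixedThreadPool(partitions);
        try {
            int size = (itemCount + partitions - 1) / partitions; // ceiling division
            List<Future<Long>> results = new ArrayList<>();
            for (int p = 0; p < partitions; p++) {
                final int from = Math.min(p * size, itemCount);
                final int to = Math.min(from + size, itemCount);
                Callable<Long> task = () -> {
                    SumWorker worker = new SumWorker(); // one instance per partition, never shared
                    for (int i = from; i < to; i++) {
                        worker.accept(i);
                    }
                    return worker.total();
                };
                results.add(pool.submit(task));
            }
            long sum = 0;
            for (Future<Long> result : results) {
                sum += result.get();
            }
            return sum;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumInPartitions(1000, 4));
    }
}
```

Sharing one SumWorker between the tasks would reintroduce the race. Giving each partition its own instance is the same effect the answer attributes to step scoped components.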

You can read about all of these options in the documentation for Spring Batch here: http://docs.spring.io/spring-batch/trunk/reference/html/scalability.html

Thread safety is a complex problem. Just adding multiple threads to code that used to work in a single-threaded environment will typically uncover issues in your code.
