Unexpected behavior in Spring Batch partition when using synchronized

Problem description

I am using Spring Batch with partitioning to do parallel processing, and Hibernate with Spring Data JPA for the database. For the partitioned step, the reader, processor and writer are step-scoped, so I can inject the partition key and range (from-to) into them. In the processor I have one synchronized method, and I expected this method to be run by only one thread at a time, but that is not the case.

I configured 10 partitions, and all 10 item readers read the correct partitioned range. The problem is in the item processor. The code below has the same logic I use.

public class accountProcessor implements ItemProcessor<Custom, Custom> {
    @Override
    public Custom process(Custom item) {
        createAccount(item);
        return item;
    }

    // Account has a unique constraint on username, gender and email.
    /*
        When one thread executes this method, it creates one account
        and saves it. If the next thread comes in and tries to save the same
        account, it should find the account created by the first thread and
        do an update. That does not happen: findIfExist returns null
        and the second thread tries to insert duplicate data.
    */
    private synchronized void createAccount(Custom item) {
        Account account = accountRepo.findIfExist(item.getUsername(), item.getGender(), item.getEmail());
        if (account == null) {
            // account doesn't exist yet: create it
            account = new Account();
            account.setUsername(item.getUsername());
            account.setGender(item.getGender());
            account.setEmail(item.getEmail());
            account.setMoney(10000);
        } else {
            // account exists: update its balance
            account.setMoney(account.getMoney() - 10);
        }
        accountRepo.save(account);
    }
}

The expected result is that only one thread runs this method at any given time, so that there are no duplicate insertions in the database and no DataIntegrityViolationException.

The actual result is that the second thread can't find the first account, tries to create a duplicate account and saves it to the database, which causes a DataIntegrityViolationException (unique constraint violation).

Since I synchronized the method, threads should execute it one after another: the second thread should wait for the first to finish and only then run, which means it should be able to find the first account.

I tried many approaches, such as a volatile set containing all unique accounts, calling saveAndFlush to commit as soon as possible, and using ThreadLocal. None of these work.

Any help would be appreciated.

Recommended answer

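Since you made the item processor step-scoped, you don't really need synchronization, as each step execution (that is, each partition) will have its own instance of the processor. A synchronized instance method locks on its own instance only, so threads working on different partitions never compete for the same lock.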
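To make the point about per-instance locks concrete, here is a minimal plain-Java sketch with no Spring involved. The class names MonitorDemo and Processor are hypothetical stand-ins for a step-scoped processor, and the one-second timeout is an arbitrary choice. It measures how many threads are inside a synchronized method at the same time, once with two separate instances and once with a single shared instance.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class MonitorDemo {

    // Hypothetical stand-in for a step-scoped processor (not Spring Batch code).
    static class Processor {
        private final AtomicInteger inside;
        private final AtomicInteger max;
        private final CountDownLatch bothInside;

        Processor(AtomicInteger inside, AtomicInteger max, CountDownLatch bothInside) {
            this.inside = inside;
            this.max = max;
            this.bothInside = bothInside;
        }

        // synchronized locks on "this", i.e. on this particular instance only.
        synchronized void createAccount() throws InterruptedException {
            int now = inside.incrementAndGet();
            max.accumulateAndGet(now, Math::max);
            bothInside.countDown();
            // Stay inside for up to one second, waiting for the other thread to enter too.
            bothInside.await(1, TimeUnit.SECONDS);
            inside.decrementAndGet();
        }
    }

    // Returns the highest number of threads seen inside createAccount() at once.
    static int maxConcurrency(boolean sharedInstance) {
        AtomicInteger inside = new AtomicInteger();
        AtomicInteger max = new AtomicInteger();
        CountDownLatch bothInside = new CountDownLatch(2);
        Processor first = new Processor(inside, max, bothInside);
        Processor second = sharedInstance ? first : new Processor(inside, max, bothInside);

        Thread t1 = new Thread(() -> run(first));
        Thread t2 = new Thread(() -> run(second));
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return max.get();
    }

    private static void run(Processor p) {
        try {
            p.createAccount();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("separate instances: " + maxConcurrency(false));
        System.out.println("shared instance:    " + maxConcurrency(true));
    }
}
```

With a shared instance the reported value is 1, because the second thread cannot enter until the first leaves. With separate instances both threads are inside together, so the value is 2 unless a thread start is delayed beyond the timeout. The second case corresponds to one step-scoped processor per partition.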

But it looks like you have a design problem rather than an implementation issue. You are trying to synchronize threads so that they act in a certain order in a parallel setup. When you decide to go parallel, divide the data into partitions and give each worker (local or remote) a partition to work on, you have to accept that these partitions will be processed in an undefined order, and that there should be no relation between the records of different partitions or between the work done by different workers.

"When one thread executes this method, it creates one account and saves it. If the next thread comes in and tries to save the same account, it should find the account created by the first thread and do an update. That does not happen: findIfExist returns null and the second thread tries to insert duplicate data."

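That's because the transaction of thread 1 may not have been committed yet, so thread 2 won't find the record you think has been inserted by thread 1. In a chunk-oriented step the transaction is committed only after the whole chunk has been written, so a save made in the processor stays invisible to other transactions until then.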
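The visibility problem can be illustrated with a toy model. This is only a sketch: ToyDatabase and Tx are hypothetical classes that mimic a database where a row becomes visible to other transactions only after commit. It is not how Hibernate or any real database is implemented.

```java
import java.util.HashSet;
import java.util.Set;

public class VisibilityDemo {

    // Toy store: rows become visible to other transactions only after commit.
    static class ToyDatabase {
        private final Set<String> committed = new HashSet<>();

        Tx begin() {
            return new Tx(this);
        }
    }

    // Toy transaction: sees its own pending rows plus everything already committed.
    static class Tx {
        private final ToyDatabase db;
        private final Set<String> pending = new HashSet<>();

        Tx(ToyDatabase db) {
            this.db = db;
        }

        boolean exists(String username) {
            return pending.contains(username) || db.committed.contains(username);
        }

        void insert(String username) {
            pending.add(username);
        }

        void commit() {
            db.committed.addAll(pending);
            pending.clear();
        }
    }

    // Returns { found before thread 1 commits, found after thread 1 commits }.
    static boolean[] simulate() {
        ToyDatabase db = new ToyDatabase();
        Tx thread1 = db.begin();
        Tx thread2 = db.begin();

        thread1.insert("alice");                        // saved, but not committed
        boolean beforeCommit = thread2.exists("alice"); // thread 2 cannot see it yet

        thread1.commit();
        boolean afterCommit = thread2.exists("alice");  // now it is visible

        return new boolean[] { beforeCommit, afterCommit };
    }

    public static void main(String[] args) {
        boolean[] result = simulate();
        System.out.println("found before commit: " + result[0]); // false
        System.out.println("found after commit:  " + result[1]); // true
    }
}
```

In this model the lookup by the second transaction returns nothing until the first one commits, no matter how the two threads are ordered by a lock. That matches the null returned by findIfExist in the question.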

It looks like you are trying to create or update accounts with a partitioned setup. I'm not sure this setup is suitable for the problem at hand.

As a side note, I would not call accountRepo.save(account); in an item processor, but rather do that in an item writer.

Hope this helps.
