Unexpected behavior in Spring Batch partitioning when using synchronized

Problem description

I am using Spring Batch with partitioning for parallel processing, and Hibernate with Spring Data JPA for database access. In the partitioned step, the reader, processor and writer are step-scoped, so I can inject the partition key and range (from-to) into them. The processor has one synchronized method, which I expected to be run by only one thread at a time, but that is not what happens.

I configured 10 partitions, and all 10 item readers read the correct partition range. The problem is in the item processor. The code below has the same logic as mine.

import org.springframework.batch.item.ItemProcessor;

public class AccountProcessor implements ItemProcessor<Custom, Custom> {

    // Injected repository. The type name AccountRepo is assumed; the original omits this field.
    private AccountRepo accountRepo;

    @Override
    public Custom process(Custom item) {
        createAccount(item);
        return item;
    }

    // Account has unique constraints on username, gender, and email.
    /*
        When one thread executes this method, it creates one account
        and saves it. If the next thread comes in and tries to save the same
        account, it should find the account created by the first thread and
        do an update. That does not happen: findIfExist returns null
        and the thread tries to insert a duplicate row.
    */
    private synchronized void createAccount(Custom item) {
        Account account = accountRepo.findIfExist(item.getUsername(), item.getGender(), item.getEmail());
        if (account == null) {
            // The account does not exist yet: create it with the initial balance.
            account = new Account();
            account.setUsername(item.getUsername());
            account.setGender(item.getGender());
            account.setEmail(item.getEmail());
            account.setMoney(10000);
        } else {
            // The account exists: update its balance.
            account.setMoney(account.getMoney() - 10);
        }
        accountRepo.save(account);
    }
}

The expected result is that only one thread runs this method at any given time, so that no duplicate rows are inserted into the database and DataIntegrityViolationException is avoided.

The actual result is that the second thread cannot find the account created by the first one, tries to create a duplicate and save it to the database, and this causes a DataIntegrityViolationException from the unique constraint.

Since I synchronized the method, threads should execute it one after another: the second thread should wait for the first to finish before running, which means it should be able to find the first account.

I tried many approaches, such as a volatile set containing all unique accounts, calling saveAndFlush to commit as soon as possible, and using a ThreadLocal, but none of them works.

Help needed.

Recommended answer

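Since you made the item processor step-scoped, you don't really need synchronization, as each step execution (one per partition) has its own instance of the processor. For the same reason, the synchronized keyword cannot give you mutual exclusion across partitions here: a synchronized instance method locks on its own instance, and each partition's thread works with a different one.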
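The effect of step scope on synchronized can be reproduced without Spring. The sketch below is plain Java; MonitorDemo, Worker, bothInsideAtOnce and the one-second timeout are illustrative names and values, not part of the original code. Two threads each call a synchronized instance method and wait inside it for the other thread to arrive. With two separate instances both threads get in at once; with one shared instance the second thread is kept out until the first leaves.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class MonitorDemo {

    /** Hypothetical stand-in for a step-scoped processor instance. */
    static final class Worker {
        // A synchronized instance method locks on `this`, not on the class.
        synchronized boolean enterAndWait(CountDownLatch inside) throws InterruptedException {
            inside.countDown();                          // signal "I am inside"
            return inside.await(1, TimeUnit.SECONDS);    // wait for the other thread to be inside too
        }
    }

    /** Returns true when both threads were inside the synchronized method at the same time. */
    static boolean bothInsideAtOnce(Worker a, Worker b) {
        CountDownLatch inside = new CountDownLatch(2);
        boolean[] met = new boolean[2];
        Thread t1 = new Thread(() -> met[0] = call(a, inside));
        Thread t2 = new Thread(() -> met[1] = call(b, inside));
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return met[0] && met[1];
    }

    private static boolean call(Worker w, CountDownLatch inside) {
        try {
            return w.enterAndWait(inside);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Under these assumptions, bothInsideAtOnce(new Worker(), new Worker()) should return true (no mutual exclusion between instances), while passing the same Worker twice should return false after the timeout.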

But it looks like you have a design problem rather than an implementation issue. You are trying to synchronize threads to act in a certain order in a parallel setup. When you decide to go parallel, divide the data into partitions and give each worker (local or remote) a partition to work on, you must accept that these partitions will be processed in an undefined order, and that there should be no relation between the records of different partitions or between the work done by different workers.
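One possible way to meet this requirement, assuming the input can be split by the account's unique key instead of by a row range, is to derive the partition index from that key. All items for one account then land in the same partition and are handled by a single worker. This is only a sketch: KeyPartitioning and partitionFor are hypothetical names, and wiring the result into a Spring Batch Partitioner is not shown.

```java
import java.util.Objects;

final class KeyPartitioning {

    /**
     * Maps an account's unique key to a partition index in [0, gridSize).
     * Equal keys always map to the same index.
     */
    static int partitionFor(String username, String gender, String email, int gridSize) {
        if (gridSize <= 0) {
            throw new IllegalArgumentException("gridSize must be positive");
        }
        int hash = Objects.hash(username, gender, email);
        // floorMod keeps the result non-negative even when the hash is negative.
        return Math.floorMod(hash, gridSize);
    }
}
```

With this kind of assignment, two items that refer to the same account can never be processed by two different partitions, so the cross-partition race described in the question does not arise.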

"When one thread executes this method, it creates one account and saves it. If the next thread comes in and tries to save the same account, it should find the account created by the first thread and do an update. That does not happen: findIfExist returns null and the thread tries to insert a duplicate row."

That's because the transaction of thread 1 may not have been committed yet, so thread 2 won't find the record you think has been inserted by thread 1.
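The visibility problem can be shown with a toy model. The sketch below is not Hibernate or Spring Batch code; Store, Tx and createIfAbsent are hypothetical names, and the model assumes that a transaction's inserts become visible to other transactions only on commit. Even with a global lock around the lookup-and-insert step, the second transaction does not see the first one's pending insert, so both try to insert the same key and the later commit fails.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class CommitVisibilityDemo {

    private static final Object GLOBAL_LOCK = new Object();

    /** Toy store: rows are visible to other transactions only after commit. */
    static final class Store {
        private final Set<String> committed = new HashSet<>();

        synchronized boolean existsCommitted(String key) {
            return committed.contains(key);
        }

        /** Returns false if any key already exists, like a unique-constraint violation. */
        synchronized boolean commit(List<String> pending) {
            for (String key : pending) {
                if (committed.contains(key)) {
                    return false;
                }
            }
            committed.addAll(pending);
            return true;
        }
    }

    /** One transaction holding its own uncommitted inserts. */
    static final class Tx {
        private final Store store;
        private final List<String> pending = new ArrayList<>();

        Tx(Store store) {
            this.store = store;
        }

        /** Mirrors createAccount: look up the key, insert it if it is not visible. */
        void createIfAbsent(String key) {
            synchronized (GLOBAL_LOCK) { // serializes callers, but does not commit anything
                if (!store.existsCommitted(key) && !pending.contains(key)) {
                    pending.add(key);
                }
            }
        }

        boolean commit() {
            return store.commit(pending);
        }
    }
}
```

In this model, if two transactions both call createIfAbsent("alice") before either commits, the first commit succeeds and the second one fails. The lock only orders the calls; it does not make uncommitted data visible.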

It looks like you are trying to create or update accounts in a partitioned setup. I'm not sure this setup is suitable for the problem at hand.

As a side note, I would not call accountRepo.save(account); in an item processor, but rather do that in an item writer.
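The division of responsibilities could look roughly like the sketch below. To keep it self-contained, Processor and Writer are simplified stand-ins for Spring Batch's ItemProcessor and ItemWriter, not the real interfaces, and AccountStore is a hypothetical repository with a single save operation. The processor only transforms the item; the writer persists the whole chunk.

```java
import java.util.List;
import java.util.Locale;

final class ProcessorWriterSplit {

    /** Simplified stand-in for an item processor (not the Spring Batch interface). */
    interface Processor<I, O> {
        O process(I item);
    }

    /** Simplified stand-in for an item writer (not the Spring Batch interface). */
    interface Writer<T> {
        void write(List<? extends T> items);
    }

    /** Hypothetical repository abstraction. */
    interface AccountStore {
        void save(String account);
    }

    /** Pure transformation: no database access in the processor. */
    static final class AccountProcessor implements Processor<String, String> {
        @Override
        public String process(String item) {
            return item.trim().toLowerCase(Locale.ROOT);
        }
    }

    /** All persistence happens here, once per chunk. */
    static final class AccountWriter implements Writer<String> {
        private final AccountStore store;

        AccountWriter(AccountStore store) {
            this.store = store;
        }

        @Override
        public void write(List<? extends String> items) {
            for (String account : items) {
                store.save(account);
            }
        }
    }
}
```

Keeping the save calls in the writer puts every database write for a chunk in one place, which makes the processor easier to test on its own.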

Hope this helps.
