Java concurrency for periodic database batch insert

Question

Scenario: One thread is being called up to thousands of times per second to do inserts to the same table and is currently doing them one-by-one.

Goal: Do periodic batch inserts instead to improve performance.

I'm trying to use a TimerTask instead: as the thread's saveItem method gets called, the objects being saved are added to a list, and every 2 seconds or so they are combined into a batch insert.

My first thought was to have two Lists, call them toSave and toSaveBackup. When the thread's saveItem method is called to save something, it is added to the toSave list. Once the TimerTask kicks off and needs to save everything to the database, it sets an AtomicBoolean flag saveInProgress to true. saveItem checks this flag and adds to toSaveBackup instead of toSave while saveInProgress is true. When the batch save is complete, all items in toSaveBackup will be moved to the toSave list, probably with a synchronized block on the lists.

Is this a reasonable approach, or is there a better practice? My googling skills have failed me, so any help is welcome.

Additional information:


  • All of these inserts go to the same table

  • The inserts are driven by receiving MQTT messages, so I can't combine them into batches before this point

Update: A tweak on CKing's answer below achieved the desired approach: a TimerTask runs every 100 ms and checks the size of the saveQueue and how long it has been since a batch was saved. If either value exceeds its configured limit (e.g. save every 2 seconds or every 1000 records), then we save. A LinkedBlockingQueue is used to simplify synchronization.
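A minimal sketch of the design described in the update. It assumes a generic item type and a caller-supplied `batchInsert` callback in place of the real database code. The class name `PeriodicBatchSaver` and the `flushIfDue`/`stop` methods are hypothetical and do not come from the original post. A Timer wakes every 100 ms, and a batch is saved when the queue holds at least `maxBatchSize` records or when `maxDelayMillis` has passed since the last save:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Sketch of "check every 100 ms, save on size or age"; names are illustrative.
class PeriodicBatchSaver<T> {
    private final LinkedBlockingQueue<T> saveQueue = new LinkedBlockingQueue<>();
    private final Consumer<List<T>> batchInsert; // hypothetical stand-in for the DB batch insert
    private final int maxBatchSize;              // e.g. 1000 records
    private final long maxDelayNanos;            // e.g. 2 seconds
    private final Timer timer = new Timer("batch-saver", true); // daemon timer thread
    private long lastSaveNanos = System.nanoTime();             // guarded by "this"

    PeriodicBatchSaver(Consumer<List<T>> batchInsert, int maxBatchSize, long maxDelayMillis) {
        this.batchInsert = batchInsert;
        this.maxBatchSize = maxBatchSize;
        this.maxDelayNanos = TimeUnit.MILLISECONDS.toNanos(maxDelayMillis);
    }

    void start() {
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                try {
                    flushIfDue();
                } catch (RuntimeException e) {
                    // An uncaught exception here would kill the Timer thread.
                    System.err.println("batch insert failed: " + e);
                }
            }
        }, 100, 100); // wake up every 100 ms
    }

    // Called by the MQTT-driven thread; the queue handles synchronization.
    void saveItem(T item) {
        saveQueue.add(item);
    }

    // Saves one batch if enough records are waiting or the last save is too old.
    synchronized void flushIfDue() {
        long now = System.nanoTime();
        if (saveQueue.size() >= maxBatchSize || now - lastSaveNanos >= maxDelayNanos) {
            List<T> batch = new ArrayList<>();
            saveQueue.drainTo(batch, maxBatchSize); // at most one batch per tick
            if (!batch.isEmpty()) {
                batchInsert.accept(batch);
            }
            lastSaveNanos = now;
        }
    }

    // Stops the timer and saves whatever is still queued.
    synchronized void stop() {
        timer.cancel();
        List<T> rest = new ArrayList<>();
        saveQueue.drainTo(rest);
        if (!rest.isEmpty()) {
            batchInsert.accept(rest);
        }
    }
}
```

For example, `new PeriodicBatchSaver<>(batch -> insertBatch(batch), 1000, 2000)` would match the "every 2 seconds or every 1000 records" limits. Here `insertBatch` is a placeholder for your own JDBC code. Because `saveItem` only touches the `LinkedBlockingQueue`, the MQTT thread never waits on the database.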

Thanks again for everyone's help!

Recommended answer

It looks like your primary objective is to wait for a predefined amount of time and then trigger an insert. When an insert is in progress, you want other insert requests to wait until the insert is complete. After the insert is complete, you want to repeat the same process for the next insert requests.

I would propose the following solution with the above understanding in mind. You don't need two separate lists to achieve your goal. Also note that I am proposing an old-fashioned solution for the sake of explanation; I cover some other APIs you can use at the end of my explanation. Here goes:


  1. Define a Timer and a TimerTask that will run every N seconds.
  2. Define an ArrayList that will be used to queue up the insert requests sent to the saveItem method.
  3. The saveItem method can define a synchronized block around this ArrayList. You can add items to the ArrayList within this synchronized block as and when saveItem is called.
  4. On the other side of the equation, the TimerTask should also have a synchronized block on the same ArrayList inside its run method. It should insert all the records present in the ArrayList at that moment into the database. Once the insert is complete, the TimerTask should clear the ArrayList and finally exit the synchronized block.

You will no longer need to explicitly monitor if an insert is in progress or create a copy of your ArrayList when an insert is in progress. Your ArrayList becomes the shared resource in this case.
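The four steps above might look like the following sketch, which uses a plain `Timer`/`TimerTask` and intrinsic locking. As before, `batchInsert` is a hypothetical stand-in for the database code and the class name is made up. The callback receives the shared list itself, so it must finish using the list before returning, because the list is cleared right afterwards:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.function.Consumer;

// Sketch of the four steps with intrinsic locking; names are illustrative.
class SynchronizedListBatcher<T> {
    // Step 2: the shared list that queues up insert requests.
    private final List<T> pending = new ArrayList<>();
    // Hypothetical stand-in for the real database batch insert.
    private final Consumer<List<T>> batchInsert;
    // Step 1: the Timer (daemon thread) that drives the TimerTask.
    private final Timer timer = new Timer("batch-insert", true);

    SynchronizedListBatcher(Consumer<List<T>> batchInsert) {
        this.batchInsert = batchInsert;
    }

    void start(long periodMillis) {
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                try {
                    insertPending();
                } catch (RuntimeException e) {
                    // Keep the Timer thread alive; the records stay queued for the next run.
                    System.err.println("batch insert failed: " + e);
                }
            }
        }, periodMillis, periodMillis);
    }

    // Step 3: saveItem adds to the list inside a synchronized block.
    void saveItem(T item) {
        synchronized (pending) {
            pending.add(item);
        }
    }

    // Step 4: insert everything queued so far, then clear the list, all under
    // the same lock, so saveItem callers wait until the insert is complete.
    void insertPending() {
        synchronized (pending) {
            if (!pending.isEmpty()) {
                batchInsert.accept(pending);
                pending.clear();
            }
        }
    }

    void stop() {
        timer.cancel();
        insertPending(); // save anything left over
    }
}
```

One consequence of this design: the TimerTask holds the lock for the whole database insert, so any saveItem call that arrives during an insert blocks until that insert finishes. That is the "wait until the insert is complete" behaviour described above, but it also means the MQTT thread stalls for the duration of each insert.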

If you also want the size of the list to be a deciding factor for proceeding with inserts, you can do this:


  1. Define an int called waitAttempts in the TimerTask. This field counts the consecutive wake-ups for which the TimerTask should do nothing because the list is not yet big enough.
  2. Every time the TimerTask wakes up, it can do something like if (waitAttempts % 3 == 0 || list.size() > 10) { insert data } else { increment waitAttempts and do nothing; exit the synchronized block and the run method }. You can change 3 and 10 to whatever numbers suit your throughput requirements.
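The waitAttempts idea could be sketched as follows. Taken literally, `waitAttempts % 3 == 0` is true on the very first wake-up, and the counter is only incremented when nothing is inserted, so the check would pass and insert on every wake-up. This version counts every wake-up and resets the counter after each insert. It is one possible adjustment, not the author's code. `maxWaitAttempts` and `sizeThreshold` take the place of the 3 and 10, and `batchInsert` is again a hypothetical stand-in for the database call:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TimerTask;
import java.util.function.Consumer;

// One possible adjustment of the waitAttempts idea; names are illustrative.
class WaitAttemptsBatcher<T> {
    private final List<T> pending = new ArrayList<>();
    private final Consumer<List<T>> batchInsert; // hypothetical database stand-in
    private final int maxWaitAttempts; // the "3": insert at least every Nth wake-up
    private final int sizeThreshold;   // the "10": insert early once the list is bigger
    private int waitAttempts = 0;      // guarded by the lock on pending

    WaitAttemptsBatcher(Consumer<List<T>> batchInsert, int maxWaitAttempts, int sizeThreshold) {
        this.batchInsert = batchInsert;
        this.maxWaitAttempts = maxWaitAttempts;
        this.sizeThreshold = sizeThreshold;
    }

    void saveItem(T item) {
        synchronized (pending) {
            pending.add(item);
        }
    }

    // Body of TimerTask.run(): called once per wake-up.
    void onWakeUp() {
        synchronized (pending) {
            waitAttempts++;
            if (pending.size() > sizeThreshold || waitAttempts >= maxWaitAttempts) {
                if (!pending.isEmpty()) {
                    batchInsert.accept(pending);
                    pending.clear();
                }
                waitAttempts = 0; // start counting again after each insert
            }
            // Otherwise do nothing and wait for the next wake-up.
        }
    }

    TimerTask asTimerTask() {
        return new TimerTask() {
            @Override
            public void run() {
                onWakeUp();
            }
        };
    }
}
```

With `maxWaitAttempts = 3` and a one-second timer period, a slow trickle of records is saved roughly every third second. A burst of more than `sizeThreshold` records is saved on the next wake-up. The task could be scheduled with, for example, `new Timer(true).schedule(batcher.asTimerTask(), 1000, 1000)`.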

Note: intrinsic locking was used as a means of explaining the approach. One can always take this approach and implement it using modern constructs such as a BlockingQueue, which would eliminate the need to synchronize manually on the ArrayList. I would also recommend using Executors.newSingleThreadScheduledExecutor() instead of a TimerTask, as it ensures that only one thread runs at any given time and that runs won't overlap. Also, the waitAttempts logic is only indicative and will need to be adjusted to work correctly.
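Here is a sketch of the modern variant mentioned in the note. A `LinkedBlockingQueue` replaces the synchronized `ArrayList`, and a single-threaded `ScheduledExecutorService` replaces the Timer. The class name, the `maxBatchSize` cap, and the `batchInsert` callback are assumptions made for illustration:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// BlockingQueue + single-threaded scheduler instead of ArrayList + TimerTask.
class ScheduledBatchInserter<T> implements AutoCloseable {
    private final BlockingQueue<T> queue = new LinkedBlockingQueue<>();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final Consumer<List<T>> batchInsert; // hypothetical database stand-in
    private final int maxBatchSize;              // cap on records per insert

    ScheduledBatchInserter(Consumer<List<T>> batchInsert, int maxBatchSize, long periodMillis) {
        this.batchInsert = batchInsert;
        this.maxBatchSize = maxBatchSize;
        // Fixed delay: the next run is timed from the end of the previous one.
        scheduler.scheduleWithFixedDelay(this::drainAndInsert, periodMillis, periodMillis,
                TimeUnit.MILLISECONDS);
    }

    // No manual locking: the queue is thread-safe.
    void saveItem(T item) {
        queue.add(item);
    }

    // Drains the queue in chunks of at most maxBatchSize and inserts each chunk.
    private void drainAndInsert() {
        try {
            List<T> batch = new ArrayList<>();
            while (queue.drainTo(batch, maxBatchSize) > 0) {
                batchInsert.accept(batch);
                batch = new ArrayList<>();
            }
        } catch (RuntimeException e) {
            // An uncaught exception would suppress all later scheduled runs.
            System.err.println("batch insert failed: " + e);
        }
    }

    // Stops the scheduler, then saves whatever is still queued.
    @Override
    public void close() {
        scheduler.shutdown();
        try {
            scheduler.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        drainAndInsert();
    }
}
```

`scheduleWithFixedDelay` times each run from the end of the previous one, and the executor has a single thread, so two batch inserts never run at once. Catching the exception inside the task matters here as well: according to the `ScheduledExecutorService` documentation, if any execution of a periodic task throws, subsequent executions are suppressed.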
