Java concurrency for periodic database batch insert
Question
Scenario: One thread is called up to thousands of times per second to insert into the same table, and it currently performs these inserts one by one.
Goal: Do periodic batch inserts instead to improve performance.
I am trying to use a TimerTask so that, whenever the thread's saveItem method is called, the object being saved is added to a list instead, and the list is then written as a single batch insert every 2 seconds or so.
My first thought was to have two lists, call them toSave and toSaveBackup. When the thread's saveItem method is called to save something, the item is added to the toSave list. Once the TimerTask kicks off and needs to save everything to the database, it sets an AtomicBoolean flag, saveInProgress, to true. saveItem checks this flag and adds to toSaveBackup instead of toSave while saveInProgress is true. When the batch save is complete, all items in toSaveBackup are moved to the toSave list, probably inside a synchronized block on the lists.
Is this a reasonable approach? Or is there a better best practice? My googling skills have failed me, so any help is welcome.
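Other info:
- All of these inserts go to the same table
- The inserts are driven by incoming MQTT messages, so I can't combine them into batches before this point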
Update: A tweak on CKing's answer below achieved the desired approach: a TimerTask runs every 100 ms and checks the size of the saveQueue and how long it has been since a batch was last saved. If either of these values exceeds its configured limit (save every 2 seconds, or every 1000 records, etc.), we save. A LinkedBlockingQueue is used to simplify synchronization.
Thanks again to everyone for the help!
Recommended answer
It looks like your primary objective is to wait for a predefined amount of time and then trigger an insert. While an insert is in progress, you want other insert requests to wait until it completes. After the insert is complete, you want to repeat the same process for the next batch of insert requests.
With that understanding in mind, I would propose the following solution. You don't need two separate lists to achieve your goal. Also note that I am proposing an old-fashioned solution for the sake of explanation; I cover some other APIs you can use at the end. Here goes:
- Define a Timer and a TimerTask that will run every N seconds.
- Define an ArrayList that will be used for queuing up insert requests sent to the saveItem method.
- The saveItem method can define a synchronized block around this ArrayList. Items are added to the ArrayList within this synchronized block as and when saveItem is called.
- On the other side of the equation, the TimerTask should have a synchronized block on the same ArrayList inside its run method. It should insert all the records present in the ArrayList at that moment into the database. Once the insert is complete, the TimerTask should clear the ArrayList and finally leave the synchronized block.
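The four steps above can be sketched roughly as follows. This is a minimal illustration, not a definitive implementation: the class name TimedBatchSaver, the String item type, and the insertBatch placeholder (which stands in for the real batch insert) are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;

/** Sketch: queue items in a shared ArrayList and flush them as one batch on a timer. */
class TimedBatchSaver {
    // Shared resource; every access happens inside synchronized (pending).
    private final List<String> pending = new ArrayList<>();
    private final Timer timer = new Timer("batch-saver", true); // daemon timer thread

    /** Called by the message-handling thread; only appends to the list. */
    void saveItem(String item) {
        synchronized (pending) {
            pending.add(item);
        }
    }

    /** Schedules a flush every periodMillis milliseconds. */
    void start(long periodMillis) {
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                flush();
            }
        }, periodMillis, periodMillis);
    }

    /** Inserts everything queued so far, then clears the list, all under the lock. */
    int flush() {
        synchronized (pending) {
            if (pending.isEmpty()) {
                return 0;
            }
            int count = pending.size();
            insertBatch(new ArrayList<>(pending));
            pending.clear();
            return count;
        }
    }

    /** Hypothetical placeholder for the real database batch insert. */
    protected void insertBatch(List<String> batch) {
        System.out.println("inserting " + batch.size() + " rows");
    }

    void stop() {
        timer.cancel();
    }
}
```

Because the lock is held for the whole insert, callers of saveItem block until the batch has been written, which matches the waiting behaviour described above.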
You no longer need to explicitly track whether an insert is in progress, or create a copy of your ArrayList while an insert is in progress. The ArrayList becomes the shared resource in this case.
If you also want size to be a deciding factor for proceeding with inserts, you can do this:
- Define an int called waitAttempts in the TimerTask. This field indicates the number of consecutive wake-ups for which the TimerTask should do nothing if the size of the list is not big enough.
- Every time the TimerTask wakes up, it can do something like if (waitAttempts % 3 == 0 || list.size() > 10) { insert data } else { increment waitAttempts and do nothing; exit the synchronized block and the run method }. You can change 3 and 10 to whatever numbers suit your throughput requirements.
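One possible way to adjust the waitAttempts check is sketched below. It is an assumption-laden variant rather than the exact condition quoted above: the class name ThresholdFlushPolicy and its two constructor parameters are hypothetical, and the counter is reset after every flush so that a small batch is written after a bounded number of skipped wake-ups.

```java
/** Sketch: decide on each timer wake-up whether to flush, by list size or by skipped wake-ups. */
class ThresholdFlushPolicy {
    private final int maxWaitAttempts; // e.g. 3 skipped wake-ups at most
    private final int sizeThreshold;   // e.g. flush as soon as more than 10 items are queued
    private int waitAttempts = 0;      // consecutive wake-ups that did not flush

    ThresholdFlushPolicy(int maxWaitAttempts, int sizeThreshold) {
        this.maxWaitAttempts = maxWaitAttempts;
        this.sizeThreshold = sizeThreshold;
    }

    /** Call once per wake-up while holding the lock on the list. */
    boolean shouldFlush(int listSize) {
        if (listSize == 0) {
            waitAttempts = 0; // nothing queued, so nothing is waiting
            return false;
        }
        if (listSize > sizeThreshold || waitAttempts >= maxWaitAttempts) {
            waitAttempts = 0; // start counting again after a flush
            return true;
        }
        waitAttempts++;
        return false;
    }
}
```

The TimerTask's run method would call shouldFlush(list.size()) inside its synchronized block and only insert and clear the list when it returns true.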
Note: Intrinsic locking was used here as a means of explaining the approach. One can always take this approach and implement it using modern constructs such as a BlockingQueue, which would eliminate the need to synchronize manually on the ArrayList. I would also recommend using Executors.newSingleThreadScheduledExecutor() instead of a TimerTask, as it ensures that only one thread is running at any given time and that there won't be an overlap of threads. Also, the logic for waitAttempts is indicative and will need to be adjusted to work correctly.
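A rough sketch of the BlockingQueue and scheduled-executor variant mentioned in the note might look like this. The class name QueueBatchSaver, the String item type, the maxBatchSize parameter, and the insertBatch placeholder are assumptions for illustration; only the java.util.concurrent calls are real APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Sketch: producers enqueue without explicit locking; one scheduled thread drains in batches. */
class QueueBatchSaver {
    // Default capacity is Integer.MAX_VALUE, so the queue is effectively unbounded.
    private final BlockingQueue<String> saveQueue = new LinkedBlockingQueue<>();
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    private final int maxBatchSize;

    QueueBatchSaver(int maxBatchSize) {
        this.maxBatchSize = maxBatchSize;
    }

    /** Thread-safe enqueue; no synchronized block is needed. */
    void saveItem(String item) {
        saveQueue.add(item);
    }

    /** Runs a flush after each delay; the next run starts only after the previous one ends. */
    void start(long periodMillis) {
        scheduler.scheduleWithFixedDelay(
                this::flushSafely, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    private void flushSafely() {
        try {
            flush();
        } catch (RuntimeException e) {
            // An exception escaping the task would suppress all later executions.
            e.printStackTrace();
        }
    }

    /** Moves up to maxBatchSize queued items into a local list and inserts them. */
    int flush() {
        List<String> batch = new ArrayList<>();
        saveQueue.drainTo(batch, maxBatchSize);
        if (!batch.isEmpty()) {
            insertBatch(batch);
        }
        return batch.size();
    }

    /** Hypothetical placeholder for the real database batch insert. */
    protected void insertBatch(List<String> batch) {
        System.out.println("inserting " + batch.size() + " rows");
    }

    void stop() {
        scheduler.shutdown();
    }
}
```

Unlike the synchronized ArrayList version, saveItem does not wait while a batch is being written, because drainTo moves the items into a local list before the insert starts.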