有资质的生产者消费者 [英] Producer consumer with qualifications
问题描述
我是 clojure 的新手,正在尝试了解如何正确使用其并发功能,因此感谢您提出任何批评/建议.所以我试图在 clojure 中编写一个小测试程序,其工作方式如下:
I am new to clojure and am trying to understand how to properly use its concurrency features, so any critique/suggestions is appreciated. So I am trying to write a small test program in clojure that works as follows:
- 有 5 个生产者和 2 个消费者
- 生产者等待一个随机时间,然后将一个数字推送到共享队列中.
- 消费者应该在队列非空后立即从队列中取出一个数字,然后短暂休眠以模拟工作
- 当队列为空时,消费者应该阻塞
- 当队列中有超过 4 个项目时,生产者应该阻塞以防止它变得巨大
这是我对上述每一步的计划:
Here is my plan for each step above:
- 生产者和消费者将是并不真正关心他们的状态(只是零值或其他东西)的代理;我只是在某个时间使用代理发送消费者"或生产者"功能.那么共享队列将是 (def queue (ref [])).也许这应该是一个原子?
推荐答案
这是我的看法.我强调只使用 Clojure 数据结构来看看它是如何工作的.请注意,从 Java 工具箱中获取阻塞队列并在此处使用它是非常常见和惯用的;我认为代码很容易适应.更新:我确实将它改编为java.util.concurrent.LinkedBlockingQueue
,见下文.
Here's my take on it. I made a point of only using Clojure data structures to see how that would work out. Note that it would have been perfectly usual and idiomatic to take a blocking queue from the Java toolbox and use it here; the code would be easy to adapt, I think. Update: I actually did adapt it to java.util.concurrent.LinkedBlockingQueue
, see below.
调用(pro-con)
开始试运行;然后查看 output
的内容以查看是否发生了任何事情,并查看 queue-lengths
的内容是否在给定的范围内.
Call (pro-con)
to start a test run; then have a look at the contents of output
to see if anything happened and queue-lengths
to see if they stayed within the given bound.
更新:为了解释为什么我觉得有必要在下面使用 ensure
(我在 IRC 上被问到这个问题),这是为了防止写倾斜(参见维基百科有关定义的快照隔离的文章).如果我用@queue
代替(ensure queue)
,就有可能有两个或多个生产者检查队列的长度,发现它小于4,然后在队列中放置其他项目,并可能使队列的总长度超过 4,从而打破约束.类似地,两个执行 @queue
的消费者可以接受相同的项目进行处理,然后从队列中弹出两个项目.确保
防止发生这些情况.
Update: To explain why I felt the need to use ensure
below (I was asked about this on IRC), this is to prevent write skew (see the Wikipedia article on Snapshot isolation for a definition). If I substituted @queue
for (ensure queue)
, it would become possible for two or more producers to check the length of the queue, find that it is less than 4, then place additional items on the queue and possibly bring the total length of the queue above 4, breaking the constraint. Similarly, two consumers doing @queue
could accept the same item for processing, then pop two items off the queue. ensure
prevents either of these scenarios from happening.
(def go-on? (atom true))
(def queue (ref clojure.lang.PersistentQueue/EMPTY))
(def output (ref ()))
(def queue-lengths (ref ()))
(def *max-queue-length* 4)
(defn overseer
([] (overseer 20000))
([timeout]
(Thread/sleep timeout)
(swap! go-on? not)))
(defn queue-length-watch [_ _ _ new-queue-state]
(dosync (alter queue-lengths conj (count new-queue-state))))
(add-watch queue :queue-length-watch queue-length-watch)
(defn producer [tag]
(future
(while @go-on?
(if (dosync (let [l (count (ensure queue))]
(when (< l *max-queue-length*)
(alter queue conj tag)
true)))
(Thread/sleep (rand-int 2000))))))
(defn consumer []
(future
(while @go-on?
(Thread/sleep 100) ; don't look at the queue too often
(when-let [item (dosync (let [item (first (ensure queue))]
(alter queue pop)
item))]
(Thread/sleep (rand-int 500)) ; do stuff
(dosync (alter output conj item)))))) ; and let us know
(defn pro-con []
(reset! go-on? true)
(dorun (map #(%1 %2)
(repeat 5 producer)
(iterate inc 0)))
(dorun (repeatedly 2 consumer))
(overseer))
java.util.concurrent.LinkedBlockingQueue
使用 LinkedBlockingQueue
编写的上述版本.请注意代码的总体轮廓如何基本相同,但有些细节实际上更清晰一些.我从这个版本中删除了 queue-lengths
,因为 LBQ
为我们处理了这个限制.
java.util.concurrent.LinkedBlockingQueue
A version of the above written using LinkedBlockingQueue
. Note how the general outline of the code is basically the same, with some details actually being slightly cleaner. I removed queue-lengths
from this version, as LBQ
takes care of that constraint for us.
(def go-on? (atom true))
(def *max-queue-length* 4)
(def queue (java.util.concurrent.LinkedBlockingQueue. *max-queue-length*))
(def output (ref ()))
(defn overseer
([] (overseer 20000))
([timeout]
(Thread/sleep timeout)
(swap! go-on? not)))
(defn producer [tag]
(future
(while @go-on?
(.put queue tag)
(Thread/sleep (rand-int 2000)))))
(defn consumer []
(future
(while @go-on?
;; I'm using .poll on the next line so as not to block
;; indefinitely if we're done; note that this has the
;; side effect that nulls = nils on the queue will not
;; be handled; there's a number of other ways to go about
;; this if this is a problem, see docs on LinkedBlockingQueue
(when-let [item (.poll queue)]
(Thread/sleep (rand-int 500)) ; do stuff
(dosync (alter output conj item)))))) ; and let us know
(defn pro-con []
(reset! go-on? true)
(dorun (map #(%1 %2)
(repeat 5 producer)
(iterate inc 0)))
(dorun (repeatedly 2 consumer))
(overseer))
这篇关于有资质的生产者消费者的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!