有资质的生产者消费者 [英] Producer consumer with qualifications

查看:21
本文介绍了有资质的生产者消费者的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我是 clojure 的新手,正在尝试了解如何正确使用其并发功能,因此感谢您提出任何批评/建议.所以我试图在 clojure 中编写一个小测试程序,其工作方式如下:

I am new to clojure and am trying to understand how to properly use its concurrency features, so any critique/suggestions is appreciated. So I am trying to write a small test program in clojure that works as follows:

  1. 有 5 个生产者和 2 个消费者
  2. 生产者等待一个随机时间,然后将一个数字推送到共享队列中.
  3. 消费者应该在队列非空后立即从队列中取出一个数字,然后短暂休眠以模拟工作
  4. 当队列为空时,消费者应该阻塞
  5. 当队列中有超过 4 个项目时,生产者应该阻塞以防止它变得巨大

这是我对上述每一步的计划:

Here is my plan for each step above:

  1. 生产者和消费者将是并不真正关心他们的状态(只是零值或其他东西)的代理;我只是在某个时间使用代理发送消费者"或生产者"功能.那么共享队列将是 (def queue (ref [])).也许这应该是一个原子?

  • 在生产者"代理函数中,简单地 (Thread/sleep (rand-int 1000)) 然后 (dosync (alter queue conj (rand-int 100))) 推入队列.
  • 我正在考虑让消费者代理使用 add-watcher 来观察队列的变化.虽然不确定这一点......它会在任何变化时唤醒消费者,即使变化来自消费者拉一些东西(可能使它变空).也许在观察者函数中检查这一点就足够了.我看到的另一个问题是,如果所有消费者都很忙,那么当生产者向队列添加新东西时会发生什么?观看的事件是否在某个消费者代理上排队或消失?
  • 见上
  • 我真的不知道该怎么做.我听说 clojure 的 seque 可能有用,但我找不到足够的文档来说明如何使用它,而且我的初始测试似乎不起作用(抱歉,我身上没有代码了)
  • 推荐答案

    这是我的看法.我强调只使用 Clojure 数据结构来看看它是如何工作的.请注意,从 Java 工具箱中获取阻塞队列并在此处使用它是非常常见和惯用的;我认为代码很容易适应.更新:我确实将它改编为java.util.concurrent.LinkedBlockingQueue,见下文.

    Here's my take on it. I made a point of only using Clojure data structures to see how that would work out. Note that it would have been perfectly usual and idiomatic to take a blocking queue from the Java toolbox and use it here; the code would be easy to adapt, I think. Update: I actually did adapt it to java.util.concurrent.LinkedBlockingQueue, see below.

    调用(pro-con)开始试运行;然后查看 output 的内容以查看是否发生了任何事情,并查看 queue-lengths 的内容是否在给定的范围内.

    Call (pro-con) to start a test run; then have a look at the contents of output to see if anything happened and queue-lengths to see if they stayed within the given bound.

    更新:为了解释为什么我觉得有必要在下面使用 ensure(我在 IRC 上被问到这个问题),这是为了防止写倾斜(参见维基百科有关定义的快照隔离的文章).如果我用@queue代替(ensure queue),就有可能有两个或多个生产者检查队列的长度,发现它小于4,然后在队列中放置其他项目,并可能使队列的总长度超过 4,从而打破约束.类似地,两个执行 @queue 的消费者可以接受相同的项目进行处理,然后从队列中弹出两个项目.确保防止发生这些情况.

    Update: To explain why I felt the need to use ensure below (I was asked about this on IRC), this is to prevent write skew (see the Wikipedia article on Snapshot isolation for a definition). If I substituted @queue for (ensure queue), it would become possible for two or more producers to check the length of the queue, find that it is less than 4, then place additional items on the queue and possibly bring the total length of the queue above 4, breaking the constraint. Similarly, two consumers doing @queue could accept the same item for processing, then pop two items off the queue. ensure prevents either of these scenarios from happening.

    (def go-on? (atom true))
    (def queue (ref clojure.lang.PersistentQueue/EMPTY))
    (def output (ref ()))
    (def queue-lengths (ref ()))
    (def *max-queue-length* 4)
    
    (defn overseer
      ([] (overseer 20000))
      ([timeout]
         (Thread/sleep timeout)
         (swap! go-on? not)))
    
    (defn queue-length-watch [_ _ _ new-queue-state]
      (dosync (alter queue-lengths conj (count new-queue-state))))
    
    (add-watch queue :queue-length-watch queue-length-watch)
    
    (defn producer [tag]
      (future
       (while @go-on?
         (if (dosync (let [l (count (ensure queue))]
                       (when (< l *max-queue-length*)
                         (alter queue conj tag)
                         true)))
           (Thread/sleep (rand-int 2000))))))
    
    (defn consumer []
      (future
       (while @go-on?
         (Thread/sleep 100)       ; don't look at the queue too often
         (when-let [item (dosync (let [item (first (ensure queue))]
                                   (alter queue pop)
                                   item))]
           (Thread/sleep (rand-int 500))         ; do stuff
           (dosync (alter output conj item)))))) ; and let us know
    
    (defn pro-con []
      (reset! go-on? true)
      (dorun (map #(%1 %2)
                  (repeat 5 producer)
                  (iterate inc 0)))
      (dorun (repeatedly 2 consumer))
      (overseer))
    

    java.util.concurrent.LinkedBlockingQueue

    使用 LinkedBlockingQueue 编写的上述版本.请注意代码的总体轮廓如何基本相同,但有些细节实际上更清晰一些.我从这个版本中删除了 queue-lengths,因为 LBQ 为我们处理了这个限制.

    java.util.concurrent.LinkedBlockingQueue

    A version of the above written using LinkedBlockingQueue. Note how the general outline of the code is basically the same, with some details actually being slightly cleaner. I removed queue-lengths from this version, as LBQ takes care of that constraint for us.

    (def go-on? (atom true))
    (def *max-queue-length* 4)
    (def queue (java.util.concurrent.LinkedBlockingQueue. *max-queue-length*))
    (def output (ref ()))
    
    (defn overseer
      ([] (overseer 20000))
      ([timeout]
         (Thread/sleep timeout)
         (swap! go-on? not)))
    
    (defn producer [tag]
      (future
       (while @go-on?
         (.put queue tag)
         (Thread/sleep (rand-int 2000)))))
    
    (defn consumer []
      (future
       (while @go-on?
         ;; I'm using .poll on the next line so as not to block
         ;; indefinitely if we're done; note that this has the
         ;; side effect that nulls = nils on the queue will not
         ;; be handled; there's a number of other ways to go about
         ;; this if this is a problem, see docs on LinkedBlockingQueue
         (when-let [item (.poll queue)]
           (Thread/sleep (rand-int 500)) ; do stuff
           (dosync (alter output conj item)))))) ; and let us know
    
    (defn pro-con []
      (reset! go-on? true)
      (dorun (map #(%1 %2)
                  (repeat 5 producer)
                  (iterate inc 0)))
      (dorun (repeatedly 2 consumer))
      (overseer))
    

    这篇关于有资质的生产者消费者的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

    查看全文
    登录 关闭
    扫码关注1秒登录
    发送“验证码”获取 | 15天全站免登陆