Clojure生产者消费者 [英] Clojure Producer Consumer

查看:149
本文介绍了Clojure生产者消费者的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在学习clojure,并通过一个生产者消费者示例尝试它的并发和有效性。

I am learning clojure and try out its concurrency and effectiveness via a producer consumer example.

这样做,感觉很不错,不得不使用ref和deref,也观看和不喜欢。

Did that and it felt pretty akward with having to use ref and deref and also watch and unwatch.

我试图检查其他代码段;但是除了使用Java Condition await()和signal()方法以及Java Lock之外,还有更好的方法来重新考虑这一点。我不想在Java中使用任何东西。

I tried to check other code snippet; but is there a better way of re factoring this other than using Java Condition await() and signal() methods along with Java Lock. I did not want to use anything in Java.

这里是代码;我想我会在这里使用我的许多错误...

here is the code; i guess I would have made many mistakes here with my usage...

;a simple producer class
(ns my.clojure.producer
    (:use  my.clojure.consumer)
  (:gen-class)
  )

 (def tasklist( ref (list) )) ;this is declared as a global variable; to make this 
    ;mutable we need to use the fn ref


(defn gettasklist[]
    (deref tasklist) ;we need to use deref fn to return the task list
  )

(def testagent (agent 0)); create an agent


(defn emptytasklist[akey aref old-val new-val]

   (doseq [item (gettasklist)]
        (println(str "item is") item)
        (send testagent consume item)
        (send testagent increment item)
     )
      (. java.lang.Thread sleep 1000)
   (dosync ; adding a transaction for this is needed to reset

      (remove-watch tasklist "key123"); removing the watch on the tasklist so that it does not
                                       ; go to a recursive call
      (ref-set tasklist (list ) ) ; we need to make it as a ref to reassign
      (println (str "The number of tasks now remaining is=")  (count (gettasklist)))

     )
  (add-watch tasklist "key123" emptytasklist)
 )
(add-watch tasklist "key123" emptytasklist)

  (defn addtask [task] 
    (dosync ; adding a transaction for this is needed to refset
      ;(println (str "The number of tasks before") (count (gettasklist)))
      (println (str "Adding a task") task)
      (ref-set tasklist (conj (gettasklist) task )) ; we need to make it as a ref to reassign
      ;(println (str "The number of tasks after") (count (gettasklist)))
     )
  )

以下是消费者代码

(ns my.clojure.consumer
  )
(defn consume[c item]

  (println  "In the consume method:Item is " c item  )
  item 
)
(defn increment [c n] 
  (println "parmeters are" c n)
  (+ c n)
  )

这里是测试代码(我已经使用maven运行clojure代码和使用NetBeans编辑,因为这更熟悉我来自Java - 文件夹结构和pom - https://github.com/alexcpn/clojure-evolve

And here is the test code ( I have used maven to run clojure code and used NetBeans to edit as this is more familiar to me coming from Java - folder structure and pom at - https://github.com/alexcpn/clojure-evolve

(ns my.clojure.Testproducer
        (:use my.clojure.producer)
        (:use clojure.test)
      (:gen-class)
  )

(deftest test-addandcheck

  (addtask 1)
  (addtask 2)
  (is(= 0 (count (gettasklist))))
   (println (str "The number of tasks are") (count (gettasklist)))

 )

如果任何人都可以轻轻地重构这些,以便我可以阅读和理解代码,大;否则我想我将学习更多

If anybody can refactor this lightly so that I can read and understand the code then it will be great; Else I guess I will have to learn more

编辑-1

使用全局任务列表并通过反引用(deref)使其可用于其他函数,并通过ref再次使其可变,不是clojure中的方式;

I guess using a global task list and making it available to other functions by de-referencing it (deref) and again making it mutable by ref is not the way in clojure;

因此,更改addTask方法以直接将传入任务发送给代理

So changing the addTask method to directly send the incoming tasks to an agent

(defn addtask [task] 
    (dosync ; adding a transaction for this is needed to refset

      (println (str "Adding a task") task)

        ;(ref-set tasklist (conj (gettasklist) task )) ; we need to make it as a ref to reassign
       (def testagent (agent 0)); create an agent
       (send testagent consume task)
       (send testagent increment task)

     )

但是当我测试它

(deftest test-addandcheck
  (loop [task 0]
    (when ( < task 100)
        (addtask task)
      (recur (inc task))))

  (is(= 0 (count (gettasklist))))
   (println (str "The number of tasks are") (count (gettasklist)))

 )

得到Java拒绝执行异常 - 这是很好,如果你做Java线程,因为你完全控制。但是从clojure这看起来很奇怪,特别是因为你没有自己选择ThreadPool stratergy

after sometime the I am getting Java rejected execution exception -- This is fine if you do Java threads, because you take full control. But from clojure this looks odd, especially since you are not selecting the ThreadPool stratergy yourself

Adding a task 85
Exception in thread "pool-1-thread-4" java.util.concurrent.RejectedExecutionExce
ption
        at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution
(ThreadPoolExecutor.java:1759)
        at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.jav
a:767)
        at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.ja
va:658)
        at clojure.lang.Agent$Action.execute(Agent.java:56)
        at clojure.lang.Agent$Action.doRun(Agent.java:95)
        at clojure.lang.Agent$Action.run(Agent.java:106)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExec
utor.java:885)Adding a task 86
Adding a task 87


推荐答案

消费者将使用 lamina渠道进行最简单(最有效)的消费。

I think modeling producer & consumer in clojure would be done the easiest (and most efficiently) using lamina channels.

这篇关于Clojure生产者消费者的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆