Clojure生产者消费者 [英] Clojure Producer Consumer
问题描述
我正在学习clojure,并通过一个生产者消费者示例尝试它的并发和有效性。
I am learning clojure and try out its concurrency and effectiveness via a producer consumer example.
这样做,感觉很不错,不得不使用ref和deref,也观看和不喜欢。
Did that and it felt pretty akward with having to use ref and deref and also watch and unwatch.
我试图检查其他代码段;但是除了使用Java Condition await()和signal()方法以及Java Lock之外,还有更好的方法来重新考虑这一点。我不想在Java中使用任何东西。
I tried to check other code snippet; but is there a better way of re factoring this other than using Java Condition await() and signal() methods along with Java Lock. I did not want to use anything in Java.
这里是代码;我想我会在这里使用我的许多错误...
here is the code; i guess I would have made many mistakes here with my usage...
;a simple producer class
(ns my.clojure.producer
(:use my.clojure.consumer)
(:gen-class)
)
(def tasklist( ref (list) )) ;this is declared as a global variable; to make this
;mutable we need to use the fn ref
(defn gettasklist[]
(deref tasklist) ;we need to use deref fn to return the task list
)
(def testagent (agent 0)); create an agent
(defn emptytasklist[akey aref old-val new-val]
(doseq [item (gettasklist)]
(println(str "item is") item)
(send testagent consume item)
(send testagent increment item)
)
(. java.lang.Thread sleep 1000)
(dosync ; adding a transaction for this is needed to reset
(remove-watch tasklist "key123"); removing the watch on the tasklist so that it does not
; go to a recursive call
(ref-set tasklist (list ) ) ; we need to make it as a ref to reassign
(println (str "The number of tasks now remaining is=") (count (gettasklist)))
)
(add-watch tasklist "key123" emptytasklist)
)
(add-watch tasklist "key123" emptytasklist)
(defn addtask [task]
(dosync ; adding a transaction for this is needed to refset
;(println (str "The number of tasks before") (count (gettasklist)))
(println (str "Adding a task") task)
(ref-set tasklist (conj (gettasklist) task )) ; we need to make it as a ref to reassign
;(println (str "The number of tasks after") (count (gettasklist)))
)
)
以下是消费者代码
(ns my.clojure.consumer
)
(defn consume[c item]
(println "In the consume method:Item is " c item )
item
)
(defn increment [c n]
(println "parmeters are" c n)
(+ c n)
)
这里是测试代码(我已经使用maven运行clojure代码和使用NetBeans编辑,因为这更熟悉我来自Java - 文件夹结构和pom - https://github.com/alexcpn/clojure-evolve
And here is the test code ( I have used maven to run clojure code and used NetBeans to edit as this is more familiar to me coming from Java - folder structure and pom at - https://github.com/alexcpn/clojure-evolve
(ns my.clojure.Testproducer
(:use my.clojure.producer)
(:use clojure.test)
(:gen-class)
)
(deftest test-addandcheck
(addtask 1)
(addtask 2)
(is(= 0 (count (gettasklist))))
(println (str "The number of tasks are") (count (gettasklist)))
)
如果任何人都可以轻轻地重构这些,以便我可以阅读和理解代码,大;否则我想我将学习更多
If anybody can refactor this lightly so that I can read and understand the code then it will be great; Else I guess I will have to learn more
编辑-1
使用全局任务列表并通过反引用(deref)使其可用于其他函数,并通过ref再次使其可变,不是clojure中的方式;
I guess using a global task list and making it available to other functions by de-referencing it (deref) and again making it mutable by ref is not the way in clojure;
因此,更改addTask方法以直接将传入任务发送给代理
So changing the addTask method to directly send the incoming tasks to an agent
(defn addtask [task]
(dosync ; adding a transaction for this is needed to refset
(println (str "Adding a task") task)
;(ref-set tasklist (conj (gettasklist) task )) ; we need to make it as a ref to reassign
(def testagent (agent 0)); create an agent
(send testagent consume task)
(send testagent increment task)
)
但是当我测试它
(deftest test-addandcheck
(loop [task 0]
(when ( < task 100)
(addtask task)
(recur (inc task))))
(is(= 0 (count (gettasklist))))
(println (str "The number of tasks are") (count (gettasklist)))
)
得到Java拒绝执行异常 - 这是很好,如果你做Java线程,因为你完全控制。但是从clojure这看起来很奇怪,特别是因为你没有自己选择ThreadPool stratergy
after sometime the I am getting Java rejected execution exception -- This is fine if you do Java threads, because you take full control. But from clojure this looks odd, especially since you are not selecting the ThreadPool stratergy yourself
Adding a task 85
Exception in thread "pool-1-thread-4" java.util.concurrent.RejectedExecutionExce
ption
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution
(ThreadPoolExecutor.java:1759)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.jav
a:767)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.ja
va:658)
at clojure.lang.Agent$Action.execute(Agent.java:56)
at clojure.lang.Agent$Action.doRun(Agent.java:95)
at clojure.lang.Agent$Action.run(Agent.java:106)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExec
utor.java:885)Adding a task 86
Adding a task 87
推荐答案
消费者将使用 lamina渠道进行最简单(最有效)的消费。
I think modeling producer & consumer in clojure would be done the easiest (and most efficiently) using lamina channels.
这篇关于Clojure生产者消费者的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!