Clojure agents consuming from a queue
Question
I'm trying to figure out the best way to use agents to consume items from a message queue (Amazon SQS). Right now I have a function (process-queue-item) that grabs an item from the queue and processes it.
I want to process these items concurrently, but I can't wrap my head around how to control the agents. Basically, I want to keep all of the agents as busy as possible without pulling too many items from the queue and developing a backlog (I'll have this running on a couple of machines, so items need to be left in the queue until they are really needed).
Can anyone give me some pointers on improving my implementation?
(def active-agents (ref 0))

(defn process-queue-item [_]
  (dosync (alter active-agents inc))
  ;; retrieve item from Message Queue (Amazon SQS) and process
  (dosync (alter active-agents dec)))

(defn -main []
  (let [agents (doall (for [x (range 20)] (agent x)))]
    (loop [loop-count 0]
      (when (< @active-agents 20)
        (doseq [agent agents]
          (when (agent-errors agent)
            (clear-agent-errors agent))
          ;; should skip this agent until later if it is still busy processing (not sure how)
          (send-off agent process-queue-item)))
      ;; (apply await-for (* 10 1000) agents)
      (Thread/sleep 10000)
      (logging/info (str "ACTIVE AGENTS " @active-agents))
      (if (> loop-count 10)
        (do (logging/info (str "done, let's cleanup " loop-count))
            (doseq [agent agents]
              (when (agent-errors agent)
                (clear-agent-errors agent)))
            (apply await agents)
            (shutdown-agents))
        (recur (inc loop-count))))))
Solution
(let [switch  (atom true) ; a switch to stop workers
      workers (doall
                (repeatedly 20 ; 20 workers pulling and processing items from SQS
                  #(future
                     (while @switch
                       ;; retrieve an item from Amazon SQS and process it,
                       ;; e.g. with process-queue-item from the question
                       (process-queue-item nil)))))]
  (Thread/sleep 100000) ; arbitrary rule to decide when to stop ;-)
  (reset! switch false) ; stop!
  (doseq [worker workers] @worker)) ; wait for all workers to finish
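To try the worker pattern above without an SQS account, here is a minimal self-contained sketch. It is not the answerer's code: the in-memory LinkedBlockingQueue stands in for SQS, and the names queue, results, process-item and run-workers are hypothetical. Each worker takes one item at a time, so at most n items are in flight and everything else stays in the queue, which is what the question asks for.

```clojure
(import '(java.util.concurrent LinkedBlockingQueue TimeUnit))

;; Assumption: an in-memory queue of 100 numbers stands in for Amazon SQS.
(def ^LinkedBlockingQueue queue
  (doto (LinkedBlockingQueue.) (.addAll (range 100))))

;; Collects processed results so the outcome can be inspected afterwards.
(def results (atom []))

(defn process-item
  "Hypothetical processing step: doubles the number and records it."
  [item]
  (swap! results conj (* 2 item)))

(defn run-workers
  "Starts n futures; each pulls one item at a time while @switch is true."
  [n switch]
  (doall
    (repeatedly n
      #(future
         (while @switch
           (try
             ;; Wait up to 100 ms for an item; poll returns nil on timeout.
             (when-let [item (.poll queue 100 TimeUnit/MILLISECONDS)]
               (process-item item))
             (catch Exception e
               ;; A failing item should not kill the worker loop.
               (println "worker error:" (.getMessage e)))))))))

(let [switch  (atom true)
      workers (run-workers 4 switch)]
  ;; Demo stop rule: flip the switch once the queue has been drained.
  (while (not (.isEmpty queue))
    (Thread/sleep 50))
  (reset! switch false)
  ;; Dereferencing blocks until each worker has finished its current item.
  (doseq [worker workers] @worker))
```

Futures run on the same thread pool as send-off, so a standalone program should still call (shutdown-agents) at the end to let the JVM exit promptly.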