Running parallel URL downloads with a worker pool in Haskell
Question
I'd like to use mapConcurrently from Control.Concurrent.Async to perform parallel downloads with http-conduit. The solution here is not sufficient for my case, because I'd like to process n tasks but limit the number of concurrent workers to m, where m < n.
Passing successive chunks of m tasks to mapConcurrently is not enough either: each chunk has to finish before the next one starts, so the number of active workers tends to drop below m as some tasks complete earlier than others, leaving a utilization gap.
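For reference, the chunked approach described above might look like the following minimal sketch. The names chunksOf and mapChunked are hypothetical helpers introduced here for illustration; only mapConcurrently comes from the async package. Because mapM waits for a whole chunk before starting the next, one slow task in a chunk leaves the other m - 1 workers idle.

```haskell
import Control.Concurrent.Async (mapConcurrently)

-- Hypothetical helper: split a list into consecutive chunks of at most m elements.
chunksOf :: Int -> [a] -> [[a]]
chunksOf m xs
  | m <= 0    = error "chunksOf: chunk size must be positive"
  | null xs   = []
  | otherwise = let (chunk, rest) = splitAt m xs
                in chunk : chunksOf m rest

-- Hypothetical helper: run each chunk concurrently, but only start the next
-- chunk once every task in the current one has finished.
mapChunked :: Int -> (a -> IO b) -> [a] -> IO [b]
mapChunked m action tasks =
  concat <$> mapM (mapConcurrently action) (chunksOf m tasks)
```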
Is there an easy way (nearly as easy as using mapConcurrently, I hope) to implement a worker pool that concurrently works through a queue of tasks until all tasks are done?
Or is it easier to keep the Haskell simple and get process-level parallelism with xargs -P?
Recommended answer
Perhaps the simplest solution is to throttle the IO actions with a semaphore before wrapping them in Concurrently, using a helper function like this one:
-- Acquire the semaphore before running the action and release it afterwards, even on exceptions.
withConc :: QSem -> (a -> IO b) -> (a -> Concurrently b)
withConc sem f = \a -> Concurrently
    (bracket_ (waitQSem sem) (signalQSem sem) (f a))
We can use withConc in combination with traverse to perform a throttled concurrent traversal of any Traversable container of tasks:
traverseThrottled :: Int -> (a -> IO b) -> [a] -> IO [b]
traverseThrottled concLevel action tasks = do
    sem <- newQSem concLevel
    runConcurrently (traverse (withConc sem action) tasks)
One disadvantage of this approach is that Concurrently creates as many threads as there are tasks, and because of the semaphore only a subset of them will be doing actual work at any given time.
On the other hand, threads in Haskell are cheap, so I think this is an acceptable solution when the number of tasks is not very large.
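If the number of tasks is large enough that one thread per task becomes a concern, an alternative is a pool with a fixed number of worker threads that pull tasks from a shared queue. The following is only a sketch of that idea, not part of the original answer: pooledMap is a hypothetical name, the queue is a plain list in an IORef, and results are tagged with their index so the input order can be restored at the end. It relies on replicateConcurrently_ from the async package, which also rethrows an exception from any worker and cancels the others.

```haskell
import Control.Concurrent.Async (replicateConcurrently_)
import Data.IORef (atomicModifyIORef', newIORef, readIORef)
import Data.List (sortOn)

-- Hypothetical helper: run the tasks on a fixed number of worker threads.
pooledMap :: Int -> (a -> IO b) -> [a] -> IO [b]
pooledMap workers action tasks = do
  queue   <- newIORef (zip [0 :: Int ..] tasks)  -- remaining (index, task) pairs
  results <- newIORef []                         -- finished (index, result) pairs
  let worker = do
        -- Atomically take the next task off the queue, if there is one.
        next <- atomicModifyIORef' queue $ \q -> case q of
          []       -> ([], Nothing)
          (t : ts) -> (ts, Just t)
        case next of
          Nothing     -> pure ()                 -- queue drained: this worker exits
          Just (i, a) -> do
            b <- action a
            atomicModifyIORef' results $ \rs -> ((i, b) : rs, ())
            worker                               -- go back for another task
  -- Start the workers (at least one) and wait until all of them have exited.
  replicateConcurrently_ (max 1 workers) worker
  -- Restore the original task order.
  map snd . sortOn fst <$> readIORef results
```

Here at most `workers` threads exist at any time, and a worker that finishes early immediately picks up the next task, so there is no utilization gap between batches.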
Edit: giving traverseThrottled a more general signature:
import Data.Traversable
import Control.Concurrent
import Control.Concurrent.Async
import Control.Exception
traverseThrottled :: Traversable t => Int -> (a -> IO b) -> t a -> IO (t b)
traverseThrottled concLevel action taskContainer = do
    -- The semaphore starts with concLevel units, one per allowed concurrent action.
    sem <- newQSem concLevel
    -- bracket_ releases the unit even if the action throws.
    let throttledAction = bracket_ (waitQSem sem) (signalQSem sem) . action
    runConcurrently (traverse (Concurrently . throttledAction) taskContainer)
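To check that the throttling behaves as intended, a small experiment along the following lines may help. It repeats the definition of traverseThrottled so that the block is self-contained; fakeDownload and demo are hypothetical names, the URLs are placeholders, and the "download" is just a short sleep that tracks how many calls are in flight at once.

```haskell
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (Concurrently (..))
import Control.Concurrent.QSem (newQSem, signalQSem, waitQSem)
import Control.Exception (bracket_)
import Data.IORef (IORef, atomicModifyIORef', newIORef, readIORef)

traverseThrottled :: Traversable t => Int -> (a -> IO b) -> t a -> IO (t b)
traverseThrottled concLevel action taskContainer = do
  sem <- newQSem concLevel
  let throttledAction = bracket_ (waitQSem sem) (signalQSem sem) . action
  runConcurrently (traverse (Concurrently . throttledAction) taskContainer)

-- Hypothetical stand-in for a real download. The counter holds
-- (calls currently running, highest number seen running at once).
fakeDownload :: IORef (Int, Int) -> String -> IO Int
fakeDownload counter url =
  bracket_ enter leave $ do
    threadDelay 20000            -- pretend to work for 20 ms
    pure (length url)            -- pretend result: the length of the URL
  where
    enter = atomicModifyIORef' counter $ \(cur, peak) ->
      let cur' = cur + 1 in ((cur', max peak cur'), ())
    leave = atomicModifyIORef' counter $ \(cur, peak) ->
      ((cur - 1, peak), ())

-- Run ten fake downloads with a limit of three and report the results
-- together with the peak number of simultaneously running downloads.
demo :: IO ([Int], Int)
demo = do
  counter <- newIORef (0, 0)
  let urls = ["http://example.com/" ++ show i | i <- [1 :: Int .. 10]]
  sizes <- traverseThrottled 3 (fakeDownload counter) urls
  (_, peak) <- readIORef counter
  pure (sizes, peak)
```

The peak returned by demo should never exceed the limit passed to traverseThrottled. In real use, the action would be an actual download function, for example simpleHttp from http-conduit's Network.HTTP.Conduit module.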