Running parallel URL downloads with a worker pool in Haskell

Question

I'd like to use mapConcurrently from Control.Concurrent.Async to perform parallel downloads with http-conduit. The solution here is not sufficient for my case, because I'd like to process n tasks but throttle the number of concurrent workers to m, where m < n.

Passing mapConcurrently successive chunks of m tasks is not enough either, because the number of active workers will then tend to fall below m: some tasks complete earlier than others, leaving a utilization gap.
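To make the chunking problem concrete, here is a minimal sketch of that approach. The names chunksOf and mapChunked are hypothetical helpers written for this illustration and are not part of async.

```haskell
import Control.Concurrent.Async (mapConcurrently)

-- Hypothetical helper: split a list into consecutive chunks of at most
-- n elements.
chunksOf :: Int -> [a] -> [[a]]
chunksOf n xs
    | n <= 0    = error "chunksOf: chunk size must be positive"
    | null xs   = []
    | otherwise = let (chunk, rest) = splitAt n xs
                  in chunk : chunksOf n rest

-- Hypothetical helper: run the action over one chunk at a time.
-- Each chunk must finish completely before the next one starts, so a
-- single slow task leaves the other m - 1 slots idle until it is done.
mapChunked :: Int -> (a -> IO b) -> [a] -> IO [b]
mapChunked m action tasks =
    concat <$> mapM (mapConcurrently action) (chunksOf m tasks)
```

The results come back in input order, but throughput is bounded by the slowest task in every chunk, which is the utilization gap described above.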

Is there an easy way -- nearly as easy as using mapConcurrently I hope -- to implement a worker-pool concurrently performing a queue of tasks until all tasks are done?

Or is it easier just to keep the Haskell simple and do process-level parallelism with xargs -P?

Recommended answer

Perhaps the simplest solution is to throttle the IO actions using a semaphore before wrapping them in Concurrently, using a helper function like this one:

-- Wrap an action so that it holds one unit of the semaphore while it runs.
withConc :: QSem -> (a -> IO b) -> (a -> Concurrently b)
withConc sem f = \a -> Concurrently
    (bracket_ (waitQSem sem) (signalQSem sem) (f a))

We can use withConc in combination with traverse to perform a throttled concurrent traversal of any Traversable container of tasks:

traverseThrottled :: Int -> (a -> IO b) -> [a] -> IO [b]
traverseThrottled concLevel action tasks = do
    sem <- newQSem concLevel
    runConcurrently (traverse (withConc sem action) tasks)

One disadvantage of this approach is that the use of Concurrently will create as many threads as there are tasks, and only a subset of them will be doing actual work at any given time, thanks to the semaphore.

On the other hand, threads in Haskell are cheap, so I think it is an acceptable solution when the number of tasks is not very large.
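If the number of tasks is large enough that one thread per task becomes a concern, a different technique is to start only a fixed number of worker threads that pull tasks from a shared queue. The following is a minimal sketch of that alternative, not the approach used in this answer; pooledMap is a hypothetical name, and the sketch assumes the stm package alongside async.

```haskell
import Control.Concurrent.Async (replicateConcurrently_)
import Control.Concurrent.STM
    (atomically, modifyTVar', newTVarIO, readTVar, readTVarIO, writeTVar)
import Data.List (sortOn)

-- Hypothetical helper: run the action over all tasks using a fixed
-- number of worker threads. Results are returned in input order.
pooledMap :: Int -> (a -> IO b) -> [a] -> IO [b]
pooledMap workers action tasks = do
    -- Pending tasks, tagged with their position in the input.
    queue   <- newTVarIO (zip [0 :: Int ..] tasks)
    -- Finished results, tagged with the same position.
    results <- newTVarIO []
    let worker = do
            -- Atomically take the next pending task, if any.
            next <- atomically $ do
                pending <- readTVar queue
                case pending of
                    []       -> pure Nothing
                    (t : ts) -> writeTVar queue ts >> pure (Just t)
            case next of
                Nothing     -> pure ()  -- queue drained, worker exits
                Just (i, a) -> do
                    b <- action a
                    atomically (modifyTVar' results ((i, b) :))
                    worker
    -- Start the workers and wait for all of them. If one throws, the
    -- others are cancelled and the exception is rethrown.
    replicateConcurrently_ (max 1 workers) worker
    map snd . sortOn fst <$> readTVarIO results
```

A worker that finishes a task immediately picks up the next one, so all workers stay busy until the queue is empty, and only as many threads exist as there are workers.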

Edit: giving traverseThrottled a more general signature:

import Data.Traversable 
import Control.Concurrent
import Control.Concurrent.Async 
import Control.Exception

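-- Apply the action to every element, with at most concLevel actions
-- running at any one time.
traverseThrottled :: Traversable t => Int -> (a -> IO b) -> t a -> IO (t b)
traverseThrottled concLevel action taskContainer = do
    sem <- newQSem concLevel
    -- Each action acquires the semaphore before it runs and releases it
    -- afterwards, even if it throws.
    let throttledAction = bracket_ (waitQSem sem) (signalQSem sem) . action
    runConcurrently (traverse (Concurrently . throttledAction) taskContainer)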
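As a rough way to check the throttling, the sketch below repeats traverseThrottled and runs it over a simulated task that records how many copies of itself are running at once. fakeDownload and demo are hypothetical names introduced for this illustration; in real use the action would be an http-conduit request instead of a sleep.

```haskell
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (Concurrently (..))
import Control.Concurrent.QSem (newQSem, signalQSem, waitQSem)
import Control.Exception (bracket_)
import Data.IORef (IORef, atomicModifyIORef', newIORef, readIORef)

traverseThrottled :: Traversable t => Int -> (a -> IO b) -> t a -> IO (t b)
traverseThrottled concLevel action taskContainer = do
    sem <- newQSem concLevel
    let throttledAction = bracket_ (waitQSem sem) (signalQSem sem) . action
    runConcurrently (traverse (Concurrently . throttledAction) taskContainer)

-- Hypothetical stand-in for a download. The counter holds
-- (currently running, peak running); the task bumps it on entry,
-- sleeps for 10 ms, and decrements it on exit.
fakeDownload :: IORef (Int, Int) -> Int -> IO Int
fakeDownload counter n = do
    atomicModifyIORef' counter $ \(cur, peak) ->
        let cur' = cur + 1 in ((cur', max peak cur'), ())
    threadDelay 10000
    atomicModifyIORef' counter $ \(cur, peak) -> ((cur - 1, peak), ())
    pure (n * 2)

-- Hypothetical driver: returns the results together with the highest
-- number of simulated downloads observed running at the same time.
demo :: Int -> [Int] -> IO ([Int], Int)
demo m inputs = do
    counter   <- newIORef (0, 0)
    results   <- traverseThrottled m (fakeDownload counter) inputs
    (_, peak) <- readIORef counter
    pure (results, peak)
```

Calling demo 3 [1..10] in GHCi should return the doubled inputs in their original order, together with a peak that does not exceed 3, because the counter is only touched while the semaphore is held.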
