How to achieve Asynchrony instead of Parallelism in F#


Question

(Sticking to a common example: asynchronously fetching many web pages)

How would I spin off multiple (hundreds of) web page requests asynchronously, and then wait for all of them to complete before going on to the next step? Async.AsParallel processes only a few requests at a time, controlled by the number of cores on the CPU. Grabbing a web page is not a CPU-bound operation. Not satisfied with the speedup from Async.AsParallel, I am looking for alternatives.

I tried to connect the dots between Async.StartAsTask and Task[].WaitAll. Instinctively, I wrote the following code, but it does not compile.

let processItemsConcurrently (items : int seq) = 
  let tasks = items |> Seq.map (fun item -> Async.StartAsTask(fetchAsync item))
  Tasks.Task.WaitAll(tasks) 

How would you approach this?

Recommended answer

Async.Parallel is almost definitely the right choice here. Not sure what you're unhappy with; the strength of F# asyncs lies more in asynchronous computing than in task-parallel CPU-bound work (which is better suited to Tasks and the .NET 4.0 TPL). Here's a full example:

open System.Diagnostics 
open System.IO
open System.Net
open Microsoft.FSharp.Control.WebExtensions 

let sites = [|
    "http://bing.com"
    "http://google.com"
    "http://cnn.com"
    "http://stackoverflow.com"
    "http://yahoo.com"
    "http://msdn.com"
    "http://microsoft.com"
    "http://apple.com"
    "http://nfl.com"
    "http://amazon.com"
    "http://ebay.com"
    "http://expedia.com"
    "http://twitter.com"
    "http://reddit.com"
    "http://hulu.com"
    "http://youtube.com"
    "http://wikipedia.org"
    "http://live.com"
    "http://msn.com"
    "http://wordpress.com"
    |]

let print s = 
    // careful, don't create a synchronization bottleneck by printing
    //printf "%s" s
    ()

let printSummary info fullTimeMs =
    Array.sortInPlaceBy (fun (i,_,_) -> i) info
//  for i, size, time in info do
//      printfn "%2d  %7d  %5d" i size time
    let longest = info |> Array.map (fun (_,_,time) -> time) |> Array.max
    printfn "longest request took %dms" longest
    let bytes = info |> Array.sumBy (fun (_,size,_) -> float size)
    let seconds = float fullTimeMs / 1000.
    printfn "sucked down %7.2f KB/s" (bytes / 1024.0 / seconds)

let FetchAllSync() =
    let allsw = Stopwatch.StartNew()
    let info = sites |> Array.mapi (fun i url ->
        let sw = Stopwatch.StartNew()
        print "S"
        let req = WebRequest.Create(url) 
        use resp = req.GetResponse()
        use stream = resp.GetResponseStream()
        use reader = new StreamReader(stream,
                            System.Text.Encoding.UTF8, true, 4096) 
        print "-"
        let contents = reader.ReadToEnd()
        print "r"
        i, contents.Length, sw.ElapsedMilliseconds)
    let time = allsw.ElapsedMilliseconds 
    printSummary info time
    time, info |> Array.sumBy (fun (_,size,_) -> size)

let FetchAllAsync() =
    let allsw = Stopwatch.StartNew()
    let info = sites |> Array.mapi (fun i url -> async {
        let sw = Stopwatch.StartNew()
        print "S"
        let req = WebRequest.Create(url) 
        use! resp = req.AsyncGetResponse()
        use stream = resp.GetResponseStream()
        use reader = new AsyncStreamReader(stream, // F# PowerPack
                           System.Text.Encoding.UTF8, true, 4096) 
        print "-"
        let! contents = reader.ReadToEnd()  // in F# PowerPack
        print "r"
        return i, contents.Length, sw.ElapsedMilliseconds })
                    |> Async.Parallel 
                    |> Async.RunSynchronously 
    let time = allsw.ElapsedMilliseconds 
    printSummary info time
    time, info |> Array.sumBy (fun (_,size,_) -> size)

// By default, I think .NET limits you to 2 open connections at once
ServicePointManager.DefaultConnectionLimit <- sites.Length 

for i in 1..3 do // to warmup and show variance
    let time1,r1 = FetchAllSync()
    printfn "Sync took %dms, result was %d" time1 r1
    let time2,r2 = FetchAllAsync()
    printfn "Async took %dms, result was %d  (speedup=%2.2f)" 
        time2 r2 (float time1/ float time2)
    printfn ""

On my 4-core box, this consistently gives a nearly 4x speedup.
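For comparison with the snippet in the question, here is a minimal sketch of both shapes. It assumes a hypothetical `fetchAsync` that only simulates a request with `Async.Sleep`, so it is not the real fetch from the example above. The Task-based version likely fails to compile in the question because `Task.WaitAll` expects a `Task[]`, while `Seq.map` produces a lazy `seq<Task<int>>`.

```fsharp
open System.Diagnostics
open System.Threading.Tasks

// Hypothetical stand-in for a real page fetch: waits 200 ms without
// holding a thread, then returns a made-up "size" for the item.
let fetchAsync (item: int) : Async<int> =
    async {
        do! Async.Sleep 200
        return item * 10
    }

// Option 1: stay in F# async. Async.Parallel starts every computation
// and yields all results, in input order, once the last one finishes.
let processItemsConcurrently (items: int seq) : int[] =
    items
    |> Seq.map fetchAsync
    |> Async.Parallel
    |> Async.RunSynchronously

// Option 2: the Task-based shape from the question. The lazy sequence is
// forced into an array (which starts every task), then upcast to Task[]
// because Task.WaitAll does not accept seq<Task<int>>.
let processItemsWithTasks (items: int seq) : int[] =
    let tasks =
        items
        |> Seq.map (fun item -> Async.StartAsTask(fetchAsync item))
        |> Seq.toArray
    let plainTasks : Task[] = tasks |> Array.map (fun t -> t :> Task)
    Task.WaitAll(plainTasks)
    tasks |> Array.map (fun t -> t.Result)

let sw = Stopwatch.StartNew()
let results = processItemsConcurrently (seq { 1 .. 100 })
printfn "%d results in %dms" results.Length sw.ElapsedMilliseconds
```

Because the simulated requests wait on timers rather than on threads, the 100 items should complete in roughly the time of one delay instead of 100 delays, regardless of core count. Real requests are additionally subject to connection limits and network variance.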

Edit

In reply to your comment, I've updated the code. You're right: I've added more sites and am not seeing the expected speedup (it is still holding steady around 4x). I've started adding a little debugging output above, and will continue investigating to see if something else is throttling the connections...

Edit

Edited the code again. Well, I found what might be the bottleneck. Here's the implementation of AsyncReadToEnd in the PowerPack:

type System.IO.StreamReader with
   member s.AsyncReadToEnd () = 
       FileExtensions.UnblockViaNewThread (fun () -> s.ReadToEnd())

In other words, it just blocks a threadpool thread and reads synchronously. Argh!!! Let me see if I can work around that.
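To illustrate the difference, here is a minimal sketch of a read loop that does not block a thread while waiting for data. It relies on `Stream.AsyncRead`, the FSharp.Core extension method; `readAllBytesAsync` is a hypothetical helper written for this illustration, not a PowerPack API, and a `MemoryStream` stands in for a real response stream.

```fsharp
open System.IO
open System.Text

// Hypothetical helper: reads a stream to its end with Stream.AsyncRead,
// so no thread is held while an individual read is pending.
let readAllBytesAsync (stream: Stream) : Async<byte[]> =
    async {
        let buffer : byte[] = Array.zeroCreate 4096
        use collected = new MemoryStream()
        // Recursive async loop: keep reading until AsyncRead reports 0 bytes.
        let rec loop () =
            async {
                let! count = stream.AsyncRead(buffer, 0, buffer.Length)
                if count > 0 then
                    collected.Write(buffer, 0, count)
                    return! loop ()
            }
        do! loop ()
        return collected.ToArray()
    }

// Usage with an in-memory stream standing in for a network stream.
let source = new MemoryStream(Encoding.UTF8.GetBytes("hello, async"))
let bytes = readAllBytesAsync source |> Async.RunSynchronously
let text = Encoding.UTF8.GetString(bytes)
printfn "%s" text
```

Collecting raw bytes and decoding once at the end sidesteps the problem of multi-byte characters being split across buffer boundaries, at the cost of holding the whole body in memory.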

Edit

Ok, the AsyncStreamReader in the PowerPack does the right thing, and I'm using that now.

However, the key issue seems to be variance.

When you hit, say, cnn.com, most of the time the result comes back in around 500ms. But every once in a while you get that one request that takes 4s, and this can kill the apparent async perf, since the overall time is the time of the unluckiest request.

Running the program above, I see speedups from about 2.5x to 9x on my 2-core box at home. It is highly variable, though. It's still possible there's some bottleneck in the program that I've missed, but I think the variance-of-the-web may account for all of what I'm seeing at this point.
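The effect of a single slow request can be sketched with simple arithmetic. This is an idealized model with made-up latencies, not measurements from the program above: it assumes the synchronous run costs the sum of all request times, while a fully concurrent run costs only the slowest one.

```fsharp
// Idealized model: sync time = sum of latencies, async time = max latency.
// All latencies below are hypothetical values in milliseconds.
let speedup (latencies: int[]) : float =
    let sequentialMs = Array.sum latencies   // requests run back to back
    let concurrentMs = Array.max latencies   // bounded by the slowest request
    float sequentialMs / float concurrentMs

let allFast = Array.create 20 500                               // 20 requests, 500 ms each
let oneSlow = Array.append (Array.create 19 500) [| 4000 |]     // one 4 s straggler

printfn "all fast: %.3fx" (speedup allFast)   // 10000 / 500  = 20
printfn "one slow: %.3fx" (speedup oneSlow)   // 13500 / 4000 = 3.375
```

Under these assumptions, one 4s straggler among twenty requests cuts the apparent speedup from 20x to under 3.5x, which is consistent with the wide spread described above.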
