How to throttle concurrent async web requests


Question

I often need to make a large number of web requests without overloading the network.

I currently do this by running synchronous requests in parallel, using ThreadPool.SetMinThreads and MaxDegreeOfParallelism to specify exactly how many requests run concurrently.

This works just fine, but it feels wrong.

I would really like to use async methods, but I can't work out how to limit the number of concurrent requests.

A simplified example of my parallel approach (using a WebClient, with no error handling for brevity):

Private Function SearchSitesForKeywordInParallel(ByVal keyword As String, ByVal sites As String(), ByVal maxConcurrency As Integer) As String()
    Dim po As New ParallelOptions
    po.MaxDegreeOfParallelism = maxConcurrency
    Threading.ThreadPool.SetMinThreads(maxConcurrency, 2)
    Dim sitesContainingKeyword As New Concurrent.ConcurrentBag(Of String)

    Parallel.For(0, sites.Length, po, Sub(i)
                                          ' Each iteration blocks a worker thread on a synchronous download
                                          Using wc As New Net.WebClient
                                              wc.Proxy = Nothing
                                              Dim pageSource As String = wc.DownloadString(sites(i))
                                              If pageSource.Contains(keyword) Then
                                                  sitesContainingKeyword.Add(sites(i))
                                              End If
                                          End Using
                                      End Sub)
    Return sitesContainingKeyword.ToArray
End Function

This is a blocking function, which is what I require. I have tested the WebClient.DownloadStringAsync method in a regular For loop, and it fires all the requests pretty much at once, overloading the network.

What I would like to do is initially make X requests, then make a new one as each response comes back.

I am fairly sure tasks are the way to go, and I am positive I have read some very nice implementations in C#, but my C# experience is limited, and I have a hard time translating C# lambdas to VB.NET.

I am also limited to VS2010 and .NET 4, so the niceties of .NET 4.5 async/await are not an option for me.

Any help is very much appreciated.

Recommended answer

You can do this asynchronously in VB.NET using the AsyncEnumerator class (http://stackoverflow.com/questions/7107013/do-we-still-need-richters-asyncenumerator) from the Wintellect Powerthreading library, which you can get from NuGet.

This gives you some of the functionality of Await but works in VS2010 with .NET 2.0 to 4.0, while giving you an upgrade path to the 4.5 async features.

The downside is that the WebClient async methods require an EAP-to-APM shim based on Task<> to be used with AsyncEnumerator, so the code is quite a lot more complicated.

The simplest way to control the number of concurrent requests is to initiate X async operations, then initiate another each time one completes.

Example code:

Imports System
Imports System.Collections.Generic
Imports System.Linq
Imports System.Runtime.CompilerServices
Imports System.Threading.Tasks
Imports System.Net
Imports Wintellect.Threading.AsyncProgModel

Module TaskExtension
    REM http://msdn.microsoft.com/en-us/library/hh873178.aspx
    <Extension()>
    Public Function AsApm(Of T1)(ByVal task As Task(Of T1), callback As AsyncCallback, state As Object) As IAsyncResult
        If (task Is Nothing) Then
            Throw New ArgumentNullException("task")
        End If
        Dim tcs = New TaskCompletionSource(Of T1)(state)
        task.ContinueWith(Sub(t As Task(Of T1))
                              If (t.IsFaulted) Then
                                  tcs.TrySetException(t.Exception.InnerExceptions)
                              ElseIf t.IsCanceled Then
                                  tcs.TrySetCanceled()
                              Else
                                  tcs.TrySetResult(t.Result)
                              End If
                              If (Not callback Is Nothing) Then
                                  callback(tcs.Task)
                              End If
                          End Sub, TaskScheduler.Default)
        Return tcs.Task
    End Function
End Module

Module ApmAsyncDownload
    Public Function DownloadStringAsync(url As Uri) As Task(Of String)
        Dim tcs As New TaskCompletionSource(Of String)
        Dim wc As New WebClient()
        AddHandler wc.DownloadStringCompleted, Sub(s As Object, e As System.Net.DownloadStringCompletedEventArgs)
                                                   If (Not (e.Error Is Nothing)) Then
                                                       tcs.TrySetException(e.Error)
                                                   ElseIf e.Cancelled Then
                                                       tcs.TrySetCanceled()
                                                   Else
                                                       tcs.TrySetResult(e.Result)
                                                   End If
                                               End Sub
        wc.DownloadStringAsync(url)
        Return tcs.Task
    End Function
    Public Function BeginDownloadString(url As Uri, callback As AsyncCallback, state As Object) As IAsyncResult
        Return DownloadStringAsync(url).AsApm(callback, state)
    End Function
    Public Function EndDownloadString(asyncResult As IAsyncResult) As String
        ' Explicit cast so this also compiles with Option Strict On
        Dim castToTask As Task(Of String) = DirectCast(asyncResult, Task(Of String))
        Return castToTask.Result
    End Function
End Module

Public Class AsyncIterators
    Private Shared Iterator Function SearchUrl(ae As AsyncEnumerator(Of Boolean), keyword As String, uri As Uri) As IEnumerator(Of Int32)
        ae.Result = False
        ApmAsyncDownload.BeginDownloadString(uri, ae.End(0, AddressOf ApmAsyncDownload.EndDownloadString), Nothing)
        Yield 1
        If (ae.IsCanceled()) Then
            Return
        End If
        Try
            Dim page As String = ApmAsyncDownload.EndDownloadString(ae.DequeueAsyncResult)
            ae.Result = page.Contains(keyword)
        Catch ex As AggregateException
            ' The download failed or was cancelled; leave ae.Result as False for this URL
        End Try
    End Function
    Public Shared Iterator Function SearchIterator(ae As AsyncEnumerator(Of List(Of String)), keyword As String, urls As List(Of Uri)) As IEnumerator(Of Int32)
        ae.Result = New List(Of String)
        'Control how many searches are started asynchronously
        Dim startSearches = Math.Min(3, urls.Count)
        Dim enumerator = urls.GetEnumerator
        Dim toBeCompleted = urls.Count
        Do Until (toBeCompleted <= 0)
            While (startSearches > 0)
                If enumerator.MoveNext Then
                    Dim subAe = New AsyncEnumerator(Of Boolean)()
                    subAe.SyncContext = Nothing
                    subAe.BeginExecute(SearchUrl(subAe, keyword, enumerator.Current), ae.End(0, Function(ar As IAsyncResult) As AsyncEnumerator.EndObjectXxx
                                                                                                    subAe.EndExecute(ar)
                                                                                                End Function), enumerator.Current)
                End If
                startSearches = startSearches - 1
            End While
            'Wait for first async search to complete
            Yield 1
            toBeCompleted = toBeCompleted - 1
            If (ae.IsCanceled()) Then
                Exit Do
            End If
            'Get result of the search and add to results
            Dim result = ae.DequeueAsyncResult()
            Dim completedAe = AsyncEnumerator(Of Boolean).FromAsyncResult(result)
            If (completedAe.EndExecute(result)) Then
                Dim uri As Uri = DirectCast(result.AsyncState, Uri)
                ae.Result.Add(uri.OriginalString)
            End If
            'Start 1 more search
            startSearches = startSearches + 1
        Loop
    End Function
End Class

Module Module1
    Sub Main()
        Dim searchAe = New AsyncEnumerator(Of List(Of String))()
        searchAe.SyncContext = Nothing
        Dim urlStrings = New List(Of String) From {"http://www.google.com", "http://www.yahoo.com", "http://www.dogpile.com"}
        Dim uris = urlStrings.Select(Function(urlString As String) As Uri
                                         Return New Uri(urlString)
                                     End Function).ToList()
        For Each Str As String In searchAe.EndExecute(searchAe.BeginExecute(AsyncIterators.SearchIterator(searchAe, "search", uris), Nothing, Nothing))
            Console.WriteLine(Str)
        Next
        Console.ReadKey()
    End Sub
End Module

And I now see what you mean about translating C# lambdas!
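
If a blocking call is acceptable, as the question says it is, the "initiate X operations, then start another each time one completes" idea can also be sketched with nothing beyond .NET 4, using WebClient.DownloadStringAsync gated by a SemaphoreSlim. This is not the AsyncEnumerator approach above but a swapped-in, simpler technique. The names ThrottledSearch and SearchSitesThrottled are hypothetical, and the sketch assumes it is called from a thread without a SynchronizationContext (a console application or a worker thread). On a UI thread the completion events would be posted back to the blocked caller and the loop would deadlock.

```vbnet
Imports System
Imports System.Collections.Concurrent
Imports System.Net
Imports System.Threading

Module ThrottledSearch
    ' Hypothetical helper, not part of the original answer.
    ' Assumption: the caller has no SynchronizationContext (console app or worker thread).
    Public Function SearchSitesThrottled(ByVal keyword As String, ByVal sites As String(), ByVal maxConcurrency As Integer) As String()
        Dim matches As New ConcurrentBag(Of String)
        If sites.Length = 0 Then
            Return matches.ToArray()
        End If

        ' At most maxConcurrency downloads may be in flight at any time
        Dim throttle As New SemaphoreSlim(maxConcurrency, maxConcurrency)
        ' Signalled once per finished download so the caller can block until all are done
        Dim allDone As New CountdownEvent(sites.Length)

        For Each site As String In sites
            ' Blocks here while maxConcurrency requests are already running
            throttle.Wait()

            ' Copy the loop variable so each handler sees its own site
            Dim currentSite As String = site
            Dim wc As New WebClient()
            wc.Proxy = Nothing
            AddHandler wc.DownloadStringCompleted,
                Sub(sender As Object, e As DownloadStringCompletedEventArgs)
                    Try
                        ' e.Result is only read when the download succeeded
                        If e.Error Is Nothing AndAlso Not e.Cancelled AndAlso e.Result.Contains(keyword) Then
                            matches.Add(currentSite)
                        End If
                    Finally
                        wc.Dispose()
                        ' Free a slot, which lets the loop start the next request
                        throttle.Release()
                        allDone.Signal()
                    End Try
                End Sub
            wc.DownloadStringAsync(New Uri(currentSite))
        Next

        ' Wait for the requests that are still in flight
        allDone.Wait()
        Return matches.ToArray()
    End Function
End Module
```

Only the calling thread blocks here. The downloads themselves run asynchronously, so no thread pool thread is tied up per request, unlike the Parallel.For version in the question. Failed or cancelled downloads are silently treated as "no match", mirroring the empty Catch in the AsyncEnumerator example.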
