Looping Async Task List in C#
Problem description
I am trying to parse data from several websites continuously. I would like this action to be performed individually in a loop in an asynchronous manner until the program is closed. I am not sure what the structure should be for this kind of logic.
Right now I am following this pattern.

    public async Task ParseAll(List<Site> SiteList)
    {
        List<Task> TaskList = new List<Task>();
        foreach (Site s in SiteList)
        {
            TaskList.Add(s.ParseData());
        }
        await Task.WhenAll(TaskList);
    }

The issue is that if I construct a loop around this method, the sites that are updated first have to wait until the whole list is finished before the method can run again. Theoretically, what I would like to do is just put each site back on the bottom of the TaskList when it finishes its ParseData method, but I am not sure if that's possible, or if that's the best way.
Solution
Quoting the question: "Theoretically, what I would like to do is just put each site back on the bottom of the TaskList when it finished its ParseData"
Looks like you need to maintain a queue of sites to be processed. Below is my take on this, using SemaphoreSlim. This way you can also limit the number of concurrent tasks to be less than the actual number of sites, or add new sites on-the-fly. A CancellationToken is used to stop the processing from outside. The use of async void is justified here IMO, since QueueSiteAsync keeps track of the tasks it starts.

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;

    namespace AsyncLoop
    {
        class Program
        {
            public class Site
            {
                public string Url { get; set; }

                public async Task ParseDataAsync(CancellationToken token)
                {
                    // simulate download and parse
                    int delay = new Random(Environment.TickCount).Next(100, 1000);
                    await Task.Delay(delay, token);
                    Console.WriteLine("Processed: #{0}, delay: {1}", this.Url, delay);
                }
            }

            object _lock = new Object();
            HashSet<Task> _pending = new HashSet<Task>(); // sites in progress
            SemaphoreSlim _semaphore;

            async void QueueSiteAsync(Site site, CancellationToken token)
            {
                Func<Task> processSiteAsync = async () =>
                {
                    await _semaphore.WaitAsync(token).ConfigureAwait(false);
                    try
                    {
                        await site.ParseDataAsync(token);
                        QueueSiteAsync(site, token);
                    }
                    finally
                    {
                        _semaphore.Release();
                    }
                };

                var task = processSiteAsync();
                lock (_lock)
                    _pending.Add(task);
                try
                {
                    await task;
                    lock (_lock)
                        _pending.Remove(task);
                }
                catch
                {
                    if (!task.IsCanceled && !task.IsFaulted)
                        throw; // non-task error, re-throw

                    // leave the faulted task in the pending list and exit
                    // ProcessAllSites will pick it up
                }
            }

            public async Task ProcessAllSites(
                Site[] sites, int maxParallel, CancellationToken token)
            {
                _semaphore = new SemaphoreSlim(Math.Min(sites.Length, maxParallel));

                // start all sites
                foreach (var site in sites)
                    QueueSiteAsync(site, token);

                // wait for cancellation
                try
                {
                    await Task.Delay(Timeout.Infinite, token);
                }
                catch (OperationCanceledException)
                {
                }

                // wait for pending tasks
                Task[] tasks;
                lock (_lock)
                    tasks = _pending.ToArray();
                await Task.WhenAll(tasks);
            }

            // testing
            static void Main(string[] args)
            {
                // cancel processing in 10s
                var cts = new CancellationTokenSource(millisecondsDelay: 10000);
                var sites = Enumerable.Range(0, count: 10).Select(i =>
                    new Site { Url = i.ToString() });
                try
                {
                    new Program().ProcessAllSites(
                        sites.ToArray(),
                        maxParallel: 5,
                        token: cts.Token).Wait();
                }
                catch (AggregateException ex)
                {
                    foreach (var innerEx in ex.InnerExceptions)
                        Console.WriteLine(innerEx.Message);
                }
            }
        }
    }

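For comparison, the "put each site back when it finishes" idea can also be sketched without async void, by giving every site its own loop and sharing one SemaphoreSlim to cap concurrency. This is a simplified variant, not the code from the answer above: PerSiteLoop, ParseOnceAsync, LoopSiteAsync and RunAsync are hypothetical names, and the 50 ms delay merely stands in for real download-and-parse work.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class PerSiteLoop
{
    // Hypothetical stand-in for Site.ParseDataAsync: simulates one parse pass.
    public static async Task ParseOnceAsync(string url, CancellationToken token)
    {
        await Task.Delay(50, token);
    }

    // Re-parses one site until cancellation; returns how many passes completed.
    public static async Task<int> LoopSiteAsync(
        string url, SemaphoreSlim gate, CancellationToken token)
    {
        int passes = 0;
        try
        {
            while (true)
            {
                // Throws OperationCanceledException once the token is cancelled.
                await gate.WaitAsync(token);
                try
                {
                    await ParseOnceAsync(url, token);
                    passes++;
                }
                finally
                {
                    gate.Release(); // only reached after a successful WaitAsync
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Cancellation is the expected way to leave the loop.
        }
        return passes;
    }

    // Starts one loop per site and completes when all loops have stopped.
    public static async Task<int[]> RunAsync(
        IEnumerable<string> urls, int maxParallel, CancellationToken token)
    {
        using (var gate = new SemaphoreSlim(maxParallel))
        {
            var loops = urls.Select(u => LoopSiteAsync(u, gate, token)).ToList();
            return await Task.WhenAll(loops);
        }
    }

    public static async Task Main()
    {
        using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(1)))
        {
            int[] passes = await RunAsync(new[] { "a", "b", "c" }, 2, cts.Token);
            Console.WriteLine("Passes per site: " + string.Join(", ", passes));
        }
    }
}
```

Because each loop is an ordinary Task, a failure in one site surfaces through Task.WhenAll instead of needing a pending-task set. The trade-off is that adding sites on-the-fly means starting another loop and tracking its task yourself.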
You may also want to separate download and parsing into separate pipelines, check this for more details.
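The link behind "this" did not survive, so the following is only a guess at what a download/parse split could look like. It is a minimal sketch that assumes System.Threading.Channels (part of .NET Core 3.0 and later), which may not be the technique the answer pointed to; DownloadParsePipeline, DownloadAsync and Parse are hypothetical names with fake work inside.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class DownloadParsePipeline
{
    // Hypothetical download stage: returns fake page content for a URL.
    static async Task<string> DownloadAsync(string url, CancellationToken token)
    {
        await Task.Delay(20, token);
        return "<html>" + url + "</html>";
    }

    // Hypothetical parse stage: here it only measures the page length.
    static int Parse(string page) => page.Length;

    // Runs one pass over the URLs: downloads feed a channel, parsing drains it.
    public static async Task<List<int>> RunAsync(
        IReadOnlyList<string> urls, CancellationToken token)
    {
        // Bounded channel: the downloader waits when the parser falls behind.
        var pages = Channel.CreateBounded<string>(capacity: 4);

        Task producer = Task.Run(async () =>
        {
            try
            {
                foreach (string url in urls)
                {
                    string page = await DownloadAsync(url, token);
                    await pages.Writer.WriteAsync(page, token);
                }
            }
            finally
            {
                pages.Writer.Complete(); // lets the consumer loop end
            }
        });

        var results = new List<int>();
        while (await pages.Reader.WaitToReadAsync(token))
        {
            while (pages.Reader.TryRead(out string page))
                results.Add(Parse(page));
        }

        await producer; // surfaces any exception from the download stage
        return results;
    }
}
```

This handles a single pass. For continuous processing the producer would loop over the sites until cancellation, and CPU-heavy parsing could be moved to its own Task.Run consumer.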