Looping Async Task List in C#

Question

I am trying to parse data from several websites continuously. I would like this action to be performed for each site individually, in a loop and asynchronously, until the program is closed. I am not sure what the structure should be for this kind of logic.

Right now I am following this pattern.

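public async Task ParseAll(List<Site> SiteList)
{
    List<Task> TaskList = new List<Task>();

    foreach (Site s in SiteList)
    {
        // start parsing this site; ParseData is assumed to return a Task
        TaskList.Add(s.ParseData());
    }

    // completes only when every site has finished parsing
    await Task.WhenAll(TaskList);
}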

The issue is that if I construct a loop around this method, then the sites that finish first have to wait until the whole list is finished before the method can run again. Theoretically, what I would like to do is just put each site back on the bottom of the TaskList when it finishes its ParseData method, but I am not sure if that's possible, or if that's the best way.

Solution

"Theoretically, what I would like to do is just put each site back on the bottom of the TaskList when it finished its ParseData"

Looks like you need to maintain a queue of sites to be processed. Below is my take on this, using SemaphoreSlim. This way you can also limit the number of concurrent tasks to fewer than the actual number of sites, or add new sites on the fly. A CancellationToken is used to stop the processing from outside. The use of async void is justified here IMO, because QueueSiteAsync keeps track of the tasks it starts.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace AsyncLoop
{
    class Program
    {
        public class Site
        {
            public string Url { get; set; }
            public async Task ParseDataAsync(CancellationToken token)
            {
                // simulate download and parse
                int delay = new Random(Environment.TickCount).Next(100, 1000);
                await Task.Delay(delay, token);
                Console.WriteLine("Processed: #{0}, delay: {1}", this.Url, delay);
            }
        }

        object _lock = new Object();
        HashSet<Task> _pending = new HashSet<Task>(); // sites in progress
        SemaphoreSlim _semaphore;

        async void QueueSiteAsync(Site site, CancellationToken token)
        {
            Func<Task> processSiteAsync = async () =>
            {
                await _semaphore.WaitAsync(token).ConfigureAwait(false);
                try 
                {           
                    await site.ParseDataAsync(token);
                    QueueSiteAsync(site, token);
                }
                finally
                {
                    _semaphore.Release();
                }
            };

            var task = processSiteAsync();
            lock (_lock)
                _pending.Add(task);
            try
            {
                await task;
                lock (_lock)
                    _pending.Remove(task);
            }
            catch
            {
                if (!task.IsCanceled && !task.IsFaulted)
                    throw; // non-task error, re-throw

                // leave the faulted task in the pending list and exit
                // ProcessAllSites will pick it up
            }
        }

        public async Task ProcessAllSites(
            Site[] sites, int maxParallel, CancellationToken token)
        {
            _semaphore = new SemaphoreSlim(Math.Min(sites.Length, maxParallel));

            // start all sites
            foreach (var site in sites)
                QueueSiteAsync(site, token);

            // wait for cancellation
            try
            {
                await Task.Delay(Timeout.Infinite, token);
            }
            catch (OperationCanceledException)
            {
            }

            // wait for pending tasks
            Task[] tasks;
            lock (_lock)
                tasks = _pending.ToArray();
            await Task.WhenAll(tasks);
        }

        // testing
        static void Main(string[] args)
        {
            // cancel processing in 10s
            var cts = new CancellationTokenSource(millisecondsDelay: 10000); 
            var sites = Enumerable.Range(0, count: 10).Select(i => 
                new Site { Url = i.ToString() });
            try
            {
                new Program().ProcessAllSites(
                    sites.ToArray(), 
                    maxParallel: 5, 
                    token: cts.Token).Wait();
            }
            catch (AggregateException ex)
            {
                foreach (var innerEx in ex.InnerExceptions)
                    Console.WriteLine(innerEx.Message);
            }
        }
    }
}
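
For comparison, the re-queuing idea can also be sketched without async void, by giving every site its own loop that shares one SemaphoreSlim. This is only a minimal sketch under stated assumptions, not the code above rewritten: PerSiteLoop, RunLoopAsync and RunAllAsync are hypothetical names, and each unit of work is passed in as a delegate so the block stands on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class PerSiteLoop
{
    // Hypothetical helper: repeats one unit of work until cancellation,
    // holding a semaphore slot only while that work is running.
    public static async Task RunLoopAsync(
        Func<CancellationToken, Task> parseOnce,
        SemaphoreSlim throttle,
        CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            await throttle.WaitAsync(token);
            try
            {
                await parseOnce(token);
            }
            finally
            {
                throttle.Release();
            }
        }
    }

    // Starts one loop per work item; completes once every loop has stopped.
    public static async Task RunAllAsync(
        IEnumerable<Func<CancellationToken, Task>> work,
        int maxParallel,
        CancellationToken token)
    {
        using (var throttle = new SemaphoreSlim(maxParallel))
        {
            var loops = work
                .Select(w => RunLoopAsync(w, throttle, token))
                .ToList();
            try
            {
                // WhenAll finishes only after all loops have ended,
                // so the semaphore is not disposed while still in use.
                await Task.WhenAll(loops);
            }
            catch (OperationCanceledException)
            {
                // expected when the token is cancelled
            }
        }
    }
}
```

With the Site class above, the work items could be built as sites.Select(s => (Func<CancellationToken, Task>)s.ParseDataAsync). Note that in this sketch a non-cancellation exception in one loop is only rethrown after the other loops have stopped, which in practice means after cancellation.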

You may also want to split downloading and parsing into separate pipelines; check this for more details.
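
One way to picture such a split is a two-stage producer/consumer arrangement. The sketch below uses System.Threading.Channels, which is an assumption on my part (it needs .NET Core 3.0 or later and C# 8) and not necessarily what the reference above described. DownloadParsePipeline, RunAsync and the download and parse delegates are hypothetical names used only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class DownloadParsePipeline
{
    // Stage 1 downloads pages and writes them to a bounded channel;
    // stage 2 reads from the channel and parses each page.
    public static async Task<List<string>> RunAsync(
        IEnumerable<string> urls,
        Func<string, CancellationToken, Task<string>> download,
        Func<string, string> parse,
        CancellationToken token)
    {
        // The bound makes the downloader wait when the parser falls behind.
        var pages = Channel.CreateBounded<string>(4);

        var producer = Task.Run(async () =>
        {
            try
            {
                foreach (var url in urls)
                {
                    var page = await download(url, token);
                    await pages.Writer.WriteAsync(page, token);
                }
            }
            finally
            {
                // Always signal the end so the consumer loop can exit.
                pages.Writer.Complete();
            }
        });

        var results = new List<string>();
        await foreach (var page in pages.Reader.ReadAllAsync(token))
        {
            results.Add(parse(page));
        }

        await producer; // rethrows any download failure
        return results;
    }
}
```

Because the two stages only meet at the channel, a slow download does not block parsing of pages that have already arrived, and each stage can be throttled or scaled separately.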
