How to properly parallelize worker tasks?


Problem Description

Consider the following code snippet and notice the difference in total runtime between setting numberTasksToSpinOff equal to 1 and then 3, 4, or more (depending on the thread resources on your machine). I notice much longer run times when spinning off more tasks.

I deliberately passed a shared data collection into each worker instance, which every worker task reads from at the same time. I thought that tasks could access a shared data structure without blocking as long as those operations are only reads or enumerations.

My goal is to spin off multiple tasks that iterate over the same shared data structure via read operations and all complete at around the same time, regardless of the number of tasks spun off.

Edit: Please see the second code snippet, where I use Parallel.ForEach() and give each worker its own dataset, so no identical data structure is accessed by different tasks/threads. Yet I still see an unacceptable amount of overhead.

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main(string[] args)
    {
        Console.WriteLine($"Entry Main Function Thread Id: {Thread.CurrentThread.ManagedThreadId}");

        //run
        var task = Task.Run(async () =>
        {
            Console.WriteLine($"Entry RunMe Task Thread Id: {Thread.CurrentThread.ManagedThreadId}");
            await RunMe();

            Console.WriteLine($"Exit RunMe Task Thread Id: {Thread.CurrentThread.ManagedThreadId}");
        });

        task.Wait();

        Console.WriteLine($"Exit Main Function Thread Id: {Thread.CurrentThread.ManagedThreadId}");

        Console.WriteLine("Press key to quit");
        Console.ReadLine();
    }

    private static async Task RunMe()
    {
        var watch = new Stopwatch();
        var numberTasksToSpinOff = 6;
        var numberItems = 20000;
        var random = new Random((int)DateTime.Now.Ticks);
        var dataPoints = Enumerable.Range(1, numberItems).Select(x => random.NextDouble()).ToList();
        var tasks = new List<Task>();
        var workers = new List<Worker>();

        //structure workers
        for (int i = 1; i <= numberTasksToSpinOff; i++)
        {
            workers.Add(new Worker(i, dataPoints));
        }

        //start timer
        watch.Restart();

        //spin off tasks
        foreach (var worker in workers)
        {
            tasks.Add(Task.Run(() =>
            {
                Console.WriteLine($"Entry WorkerId: {worker.WorkerId} -> New Tasks spun off with in Thread Id: {Thread.CurrentThread.ManagedThreadId}");
                worker.DoSomeWork();
                Console.WriteLine($"Exit WorkerId: {worker.WorkerId} -> New Tasks spun off with in Thread Id: {Thread.CurrentThread.ManagedThreadId}");
            }));

        }

        //completion tasks
        await Task.WhenAll(tasks);

        //stop timer
        watch.Stop();

        Console.WriteLine($"Time it took to complete in Milliseconds: {watch.ElapsedMilliseconds}");
    }
}

public class Worker
{
    public int WorkerId { get; set; }
    private List<double> _data;

    public Worker(int workerId, List<double> data)
    {
        WorkerId = workerId;
        _data = data;
    }

    public void DoSomeWork()
    {
        var indexPos = 0;

        foreach (var dp in _data)
        {
            var subSet = _data.Skip(indexPos).Take(_data.Count - indexPos).ToList();
            indexPos++;
        }
    }
}
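One likely contributor to the slowdown (an observation about the snippet above, not part of the original question): DoSomeWork calls .ToList() on every iteration, materializing a fresh copy of the list's tail each time, roughly n²/2 doubles in total. The workload is therefore allocation-heavy rather than purely read-only, so parallel tasks contend for memory bandwidth and the garbage collector even when the shared list itself is never written. A hypothetical variant that performs the same reads without creating copies could look like this (sketch only, summing instead of storing the subset):

```csharp
// Hypothetical rewrite of DoSomeWork: the same pass over the tail of the
// list on each iteration, but accumulating in place instead of calling
// .ToList(). This removes the O(n^2) allocations that make the original
// workload GC- and memory-bandwidth-bound under parallelism.
public void DoSomeWorkAllocationFree()
{
    for (int indexPos = 0; indexPos < _data.Count; indexPos++)
    {
        double sum = 0;
        for (int i = indexPos; i < _data.Count; i++)
        {
            sum += _data[i]; // read-only access, no intermediate list created
        }
    }
}
```

With the copies removed, the per-task work is pure reads over shared memory, which is much closer to the scenario the question describes.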

Second code snippet:

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main(string[] args)
    {
        var watch = new Stopwatch();
        var numberTasksToSpinOff = 1;
        var numberItems = 20000;
        //var random = new Random((int)DateTime.Now.Ticks);
        //var dataPoints = Enumerable.Range(1, numberItems).Select(x => random.NextDouble()).ToList();
        var workers = new List<Worker>();

        //structure workers
        for (int i = 1; i <= numberTasksToSpinOff; i++)
        {
            workers.Add(new Worker(i));
        }

        //start timer
        watch.Restart();

        //parellel work

        if (workers.Any())
        {
            var processorCount = Environment.ProcessorCount;
            var parallelOptions = new ParallelOptions { MaxDegreeOfParallelism = processorCount };
            Parallel.ForEach(workers, parallelOptions, DoSomeWork);
        }

        //stop timer
        watch.Stop();
        Console.WriteLine($"Time it took to complete in Milliseconds: {watch.ElapsedMilliseconds}");

        Console.WriteLine("Press key to quit");
        Console.ReadLine();
    }

    private static void DoSomeWork(Worker worker)
    {
        Console.WriteLine($"WorkerId: {worker.WorkerId} -> New Tasks spun off with in Thread Id: {Thread.CurrentThread.ManagedThreadId}");

        var indexPos = 0;

        foreach (var dp in worker.Data)
        {
            var subSet = worker.Data.Skip(indexPos).Take(worker.Data.Count - indexPos).ToList();
            indexPos++;
        }
    }
}

public class Worker
{
    public int WorkerId { get; set; }
    public List<double> Data { get; set; }

    public Worker(int workerId)
    {
        WorkerId = workerId;

        var numberItems = 20000;
        var random = new Random((int)DateTime.Now.Ticks);
        Data = Enumerable.Range(1, numberItems).Select(x => random.NextDouble()).ToList();

    }
}
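A side note on the second snippet (unrelated to the timing question itself): each Worker constructor seeds its own Random from DateTime.Now.Ticks, so workers constructed in quick succession can receive identical seeds and therefore identical datasets. A sketch of one way around this, drawing all datasets from a single Random instance (the Worker shape is taken from the snippet above; sequential construction, so no thread-safety concern for Random here):

```csharp
// Sketch: fill each worker's dataset from one shared Random so that
// workers built in a tight loop do not end up with the same seed.
var random = new Random();
var workers = new List<Worker>();
for (int i = 1; i <= 6; i++)
{
    var worker = new Worker(i);
    worker.Data = Enumerable.Range(1, 20000)
                            .Select(_ => random.NextDouble())
                            .ToList();
    workers.Add(worker);
}
```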

Answer

Note: the answer below is based on testing and observation, not on definitive knowledge.

The more tasks you spin off, the more overhead you generate, and thus the total execution time also rises. BUT if you look at it from another viewpoint, you will see that the number of "data points" actually processed increases with the number of tasks you spin up (up until you reach the limit of available hardware threads):

The following values were generated on my machine (4C/8T) with 10000 points per list:

  • 1 worker -> 1891 ms -> 5288 p/s
  • 2 workers -> 1921 ms -> 10411 p/s
  • 4 workers -> 2670 ms -> 14981 p/s
  • 8 workers -> 4871 ms -> 16423 p/s
  • 12 workers -> 7449 ms -> 16109 p/s

There you see that until I reach my "core limit" the amount of processed data increases significantly; then, until I reach my "thread limit", it still increases noticeably; but after that it decreases again, because of the rising overhead and the lack of further hardware resources.
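The "p/s" column above is simply total throughput: every worker processes its own full list, so the points processed per second are workers × itemsPerList divided by the elapsed wall-clock time. A minimal sketch of that calculation (variable names assumed, not from the original answer):

```csharp
// Throughput behind the "p/s" figures: total outer-loop iterations across
// all workers divided by elapsed wall-clock time in seconds.
static double PointsPerSecond(int workers, int itemsPerList, long elapsedMs)
{
    return workers * itemsPerList / (elapsedMs / 1000.0);
}

// e.g. PointsPerSecond(2, 10000, 1921) ≈ 10411, matching the table above
```

This is why the elapsed time can rise while throughput still improves: up to the core count, each added worker brings its own list of points along with it.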
