如何充分依靠I/O适当并行化作业 [英] How to properly parallelise job heavily relying on I/O

查看:82
本文介绍了如何充分依靠I/O适当并行化作业的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在构建一个必须处理大量数据的控制台应用程序.

I'm building a console application that have to process a bunch of data.

基本上,应用程序从数据库中获取引用.对于每个引用,请分析文件的内容并进行一些更改.这些文件是HTML文件,此过程使用RegEx替换进行了大量工作(查找引用并将其转换为链接).然后将结果存储在文件系统中并发送到外部系统.

Basically, the application grabs references from a DB. For each reference, parse the content of the file and make some changes. The files are HTML files, and the process is doing a heavy work with RegEx replacements (find references and transform them into links). The results in then stored on the file system and sent to an external system.

如果我继续该过程,请按顺序进行:

If I resume the process, in a sequential way :

var refs = GetReferencesFromDB(); // ~5000 Datarow returned
foreach(var ref in refs)
{
    var filePath = GetFilePath(ref); // This method looks up in a previously loaded file list
    var html = File.ReadAllText(filePath); // Read html locally, or from a network drive
    var convertedHtml = ParseHtml(html);
    File.WriteAllText(destinationFilePath); // Copy the result locally, or a network drive
    SendToWs(ref, convertedHtml);
}

我的程序可以正常运行,但是速度很慢.这就是为什么我要并行化该过程.

My program is working correctly but is quite slow. That's why I want to parallelise the process.

现在,我添加了AsParallel进行了简单的Parallelization:

By now, I made a simple Parallelization adding AsParallel :

var refs = GetReferencesFromDB().AsParallel(); 
refs.ForAll(ref=>
{
    var filePath = GetFilePath(ref); 
    var html = File.ReadAllText(filePath); 
    var convertedHtml = ParseHtml(html);
    File.WriteAllText(destinationFilePath); 
    SendToWs(ref, convertedHtml);
});

这个简单的更改减少了过程的持续时间(减少了25%的时间).但是,我对并行化的了解是,如果对依赖I/O的资源进行并行化,将不会有太多好处(或更糟的是,更少的好处),因为I/O不会神奇地翻倍.

This simple change decrease the duration of the process (25% less time). However, what I understand with parallelization is that there won't be much benefits (or worse, less benefits) if parallelyzing over resources relying on I/O, because the i/o won't magically doubles.

这就是为什么我认为我应该更改方法,而不是使整个过程并行化,而是创建相关的链式排队任务.

That's why I think I should change my approach not to parallelize the whole process, but to create dependent chained queued tasks.

即,我应该创建像这样的流程:

I.E., I should create a flow like :

排队读取文件.完成后,对ParseHtml进行排队.完成后,Queue都发送到WS并在本地写入.完成后,记录结果.

Queue read file. When finished, Queue ParseHtml. When finished, Queue both send to WS and write locally. When finished, log the result.

但是,我不知道如何实现这种想法.

However, I don't know how to realize such think.

我认为它将以一组消费者/生产者队列结尾,但是我没有找到正确的样本.

I feel it will ends in a set of consumer/producer queues, but I didn't find a correct sample.

此外,我不确定是否会有好处.

And moreover, I'm not sure if there will be benefits.

感谢建议

实际上,如果是rtm,我是使用c#4.5的理想人选:)

In fact, I'm the perfect candidate for using c# 4.5... if only it was rtm :)

另一件事让我觉得它没有正确并行化,就是在资源监视器中,我看到CPU,网络I/O和磁盘I/O的图形不稳定.当一个人高时,其他人则从低到中

Another thing making me thinking it's not correctly parallelized, is that in the resource monitor, I see graphs of CPU, network I/O and disk I/O not stable. when one is high, others are low to medium

推荐答案

您没有在任何代码中利用任何异步I/O API.您正在做的所有事情都受CPU限制,并且您所有的I/O操作都将浪费CPU资源. AsParallel用于计算绑定任务,如果您想利用异步I/O,则需要在< = v4.0中利用基于异步编程模型(APM)的API.这是通过在您正在使用的基于I/O的类上查找BeginXXX/EndXXX方法并在可能的情况下利用这些方法来实现的.

You're not leveraging any async I/O APIs in any of your code. Everything you're doing is CPU bound and all your I/O operations are going to waste CPU resources blocking. AsParallel is for compute bound tasks, if you want to take advantage of async I/O you need to leverage the Asynchronous Programming Model (APM) based APIs today in <= v4.0. This is done by looking for BeginXXX/EndXXX methods on the I/O based classes you're using and leveraging those whenever available.

为入门者阅读此帖子: TPL TaskFactory. FromAsync与带有阻止方法的任务

Read this post for starters: TPL TaskFactory.FromAsync vs Tasks with blocking methods

接下来,在这种情况下,您都不想使用AsParallel. AsParallel启用流传输,这将导致立即为每个项目安排新的任务,但是您在这里不需要/不需要.使用Parallel::ForEach对工作进行分区会更好.

Next, you don't want to use AsParallel in this case anyway. AsParallel enables streaming which will result in an immediately scheduling a new Task per item, but you don't need/want that here. You'd be much better served by partitioning the work using Parallel::ForEach.

让我们看看如何在特定情况下使用此知识来实现​​最大并发性:

Let's see how you can use this knowledge to achieve max concurrency in your specific case:

var refs = GetReferencesFromDB();

// Using Parallel::ForEach here will partition and process your data on separate worker threads
Parallel.ForEach(
    refs,
    ref =>
{ 
    string filePath = GetFilePath(ref);

    byte[] fileDataBuffer = new byte[1048576];

    // Need to use FileStream API directly so we can enable async I/O
    FileStream sourceFileStream = new FileStream(
                                      filePath, 
                                      FileMode.Open,
                                      FileAccess.Read,
                                      FileShare.Read,
                                      8192,
                                      true);

    // Use FromAsync to read the data from the file
    Task<int> readSourceFileStreamTask = Task.Factory.FromAsync(
                                             sourceFileStream.BeginRead
                                             sourceFileStream.EndRead
                                             fileDataBuffer,
                                             fileDataBuffer.Length,
                                             null);

    // Add a continuation that will fire when the async read is completed
    readSourceFileStreamTask.ContinueWith(readSourceFileStreamAntecedent =>
    {
        int soureFileStreamBytesRead;

        try
        {
            // Determine exactly how many bytes were read 
            // NOTE: this will propagate any potential exception that may have occurred in EndRead
            sourceFileStreamBytesRead = readSourceFileStreamAntecedent.Result;
        }
        finally
        {
            // Always clean up the source stream
            sourceFileStream.Close();
            sourceFileStream = null;
        }

        // This is here to make sure you don't end up trying to read files larger than this sample code can handle
        if(sourceFileStreamBytesRead == fileDataBuffer.Length)
        {
            throw new NotSupportedException("You need to implement reading files larger than 1MB. :P");
        }

        // Convert the file data to a string
        string html = Encoding.UTF8.GetString(fileDataBuffer, 0, sourceFileStreamBytesRead);

        // Parse the HTML
        string convertedHtml = ParseHtml(html);

        // This is here to make sure you don't end up trying to write files larger than this sample code can handle
        if(Encoding.UTF8.GetByteCount > fileDataBuffer.Length)
        {
            throw new NotSupportedException("You need to implement writing files larger than 1MB. :P");
        }

        // Convert the file data back to bytes for writing
        Encoding.UTF8.GetBytes(convertedHtml, 0, convertedHtml.Length, fileDataBuffer, 0);

        // Need to use FileStream API directly so we can enable async I/O
        FileStream destinationFileStream = new FileStream(
                                               destinationFilePath,
                                               FileMode.OpenOrCreate,
                                               FileAccess.Write,
                                               FileShare.None,
                                               8192,
                                               true);

        // Use FromAsync to read the data from the file
        Task destinationFileStreamWriteTask = Task.Factory.FromAsync(
                                                  destinationFileStream.BeginWrite,
                                                  destinationFileStream.EndWrite,
                                                  fileDataBuffer,
                                                  0,
                                                  fileDataBuffer.Length,
                                                  null);

        // Add a continuation that will fire when the async write is completed
        destinationFileStreamWriteTask.ContinueWith(destinationFileStreamWriteAntecedent =>
        {
            try
            {
                // NOTE: we call wait here to observe any potential exceptions that might have occurred in EndWrite
                destinationFileStreamWriteAntecedent.Wait();
            }
            finally
            {
                // Always close the destination file stream
                destinationFileStream.Close();
                destinationFileStream = null;
            }
        },
        TaskContinuationOptions.AttachedToParent);

        // Send to external system **concurrent** to writing to destination file system above
        SendToWs(ref, convertedHtml);
    },
    TaskContinuationOptions.AttachedToParent);
});

现在,这里有几点注意事项:

Now, here's few notes:

  1. 这是示例代码,因此我正在使用1MB的缓冲区读取/写入文件.这对于HTML文件来说是多余的,并且浪费了系统资源.您可以降低它以满足您的最大需求,也可以将链接的读/写实现到StringBuilder中,这是我的专职工作,因为我要编写约500行以上的代码来执行异步的链接读/写. :P
  2. 您会注意到,在继续执行读/写任务时,我有TaskContinuationOptions.AttachedToParent.这非常重要,因为它将阻止Parallel::ForEach用来启动工作的工作线程完成,直到所有基础异步调用都完成为止.如果不在这里,那么您将同时启动所有5000个项目的工作,这将污染TPL子系统并产生数千个预定任务,并且根本无法正确扩展.
  3. 我同时调用SendToWs,以将文件写入此处的文件共享.我不知道SendToWs实现的基础是什么,但这听起来也很适合进行异步处理.现在假设它是纯计算工作,因此将在执行时刻录CPU线程.我想让您锻炼一下,从而找出如何最好地利用我向您展示的内容来提高吞吐量的方法.
  4. 这都是自由键入的形式,我的大脑是这里唯一的编译器,SO的语法高亮是我用来确保语法良好的全部.因此,请原谅任何语法错误,如果我将任何事情搞砸了,以至于你弄不清楚它的正负,我都会跟进.
  1. This is sample code so I'm using a 1MB buffer to read/write files. This is excessive for HTML files and wasteful of system resources. You can either lower it to suit your max needs or implement chained reads/writes into a StringBuilder which is an excercise I leave up to you since I'd be writing ~500 more lines of code to do async chained reads/writes. :P
  2. You'll note that on the continuations for the read/write tasks I have TaskContinuationOptions.AttachedToParent. This is very important as it will prevent the worker thread that the Parallel::ForEach starts the work with from completing until all the underlying async calls have completed. If this was not here you would kick off work for all 5000 items concurrently which would pollute the TPL subsystem with thousands of scheduled Tasks and not scale properly at all.
  3. I call SendToWs concurrent to writing the file to the file share here. I don't know what is underlying the implementation of SendToWs, but it too sounds like a good candidate for making async. Right now it's assumed it's pure compute work and, as such, is going to burn a CPU thread while executing. I leave it as an excercise to you to figure out how best to leverage what I've shown you to improve throughput there.
  4. This is all typed free form and my brain was the only compiler here and SO's syntax higlighting is all I used to make sure syntax was good. So, please forgive any syntax errors and let me know if I screwed up anything too badly that you can't make heads or tails of it and I'll follow up.

这篇关于如何充分依靠I/O适当并行化作业的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆