How to properly parallelise a job that relies heavily on I/O

Problem description

I'm building a console application that has to process a bunch of data.

Basically, the application grabs references from a DB. For each reference, it parses the content of a file and makes some changes. The files are HTML files, and the process does heavy work with RegEx replacements (finding references and transforming them into links). The results are then stored on the file system and sent to an external system.

If I summarize the process, sequentially it looks like this:

var refs = GetReferencesFromDB(); // ~5000 DataRows returned
foreach(var reference in refs) // "ref" is a C# keyword and can't be used as a variable name
{
    var filePath = GetFilePath(reference); // This method looks up in a previously loaded file list
    var html = File.ReadAllText(filePath); // Read html locally, or from a network drive
    var convertedHtml = ParseHtml(html);
    File.WriteAllText(destinationFilePath, convertedHtml); // Copy the result locally, or to a network drive
    SendToWs(reference, convertedHtml);
}

My program is working correctly but is quite slow. That's why I want to parallelise the process.

So far, I've made a simple parallelization by adding AsParallel:

var refs = GetReferencesFromDB().AsParallel(); 
refs.ForAll(reference =>
{
    var filePath = GetFilePath(reference); 
    var html = File.ReadAllText(filePath); 
    var convertedHtml = ParseHtml(html);
    File.WriteAllText(destinationFilePath, convertedHtml); 
    SendToWs(reference, convertedHtml);
});

This simple change decreased the duration of the process (25% less time). However, my understanding of parallelization is that there won't be much benefit (or, worse, even less) from parallelizing over resources that rely on I/O, because the I/O capacity won't magically double.

That's why I think I should change my approach: rather than parallelizing the whole process, I should create chains of dependent, queued tasks.

I.e., I should create a flow like:

Queue the file read. When finished, queue ParseHtml. When finished, queue both the send to the WS and the local write. When finished, log the result.

However, I don't know how to implement such a thing.

I feel it will end up as a set of producer/consumer queues, but I haven't found a suitable sample.

Moreover, I'm not sure there will be any benefit.
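A minimal sketch of the producer/consumer flow described above, assuming .NET 4.0's BlockingCollection<T> and Task.Factory.StartNew. ReadStage and ParseStage are hypothetical stand-ins for the real file read and regex work, and the sketch uses modern top-level statements only so it runs on its own. The idea is that the I/O stage and the CPU stage each get their own workers and bounded queues, so they can overlap instead of every thread doing read, parse and write in turn; whether that beats the plain AsParallel version depends on where the real bottleneck is.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Text.RegularExpressions;
using System.Threading.Tasks;

// Hypothetical stand-ins for the real stages: "reading" a reference just builds a string,
// and "parsing" is one Regex replacement that turns REF-n into a link.
string ReadStage(int reference) => "<p>see REF-" + reference + "</p>";
string ParseStage(string html) => Regex.Replace(html, @"REF-(\d+)", "<a href=\"/ref/$1\">REF-$1</a>");

// read -> parse -> output, each stage connected to the next by a bounded queue.
List<string> RunPipeline(IEnumerable<int> references, int parserCount)
{
    // Bounded queues stop a fast stage from running arbitrarily far ahead of a slow one.
    var readQueue = new BlockingCollection<string>(boundedCapacity: 50);
    var parsedQueue = new BlockingCollection<string>(boundedCapacity: 50);

    // Stage 1 (disk/network I/O in the real program): a single reader.
    Task reader = Task.Factory.StartNew(() =>
    {
        try
        {
            foreach (int reference in references)
                readQueue.Add(ReadStage(reference));
        }
        finally
        {
            readQueue.CompleteAdding(); // tells the parsers no more input is coming
        }
    }, TaskCreationOptions.LongRunning);

    // Stage 2 (CPU-bound regex work): several parsers draining the same queue.
    var parsers = new Task[parserCount];
    for (int i = 0; i < parserCount; i++)
    {
        parsers[i] = Task.Factory.StartNew(() =>
        {
            foreach (string html in readQueue.GetConsumingEnumerable())
                parsedQueue.Add(ParseStage(html));
        }, TaskCreationOptions.LongRunning);
    }

    // Once every parser has finished (or faulted), close the output queue.
    Task.Factory.ContinueWhenAll(parsers, _ => parsedQueue.CompleteAdding());

    // Stage 3 (write locally + SendToWs in the real program): here we just collect results.
    var results = new List<string>();
    foreach (string converted in parsedQueue.GetConsumingEnumerable())
        results.Add(converted);

    Task.WaitAll(parsers); // surfaces any exception thrown by a parser
    reader.Wait();         // ... or by the reader
    return results;
}

List<string> output = RunPipeline(new[] { 1, 2, 3 }, parserCount: 2);
foreach (string line in output)
    Console.WriteLine(line); // order may vary because two parsers run concurrently
```

One caveat of this sketch: if every parser faults while the reader is blocked on a full queue, the reader waits forever; real code would pass a CancellationToken to Add and GetConsumingEnumerable.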

Thanks for any advice.

In fact, I'm the perfect candidate for using C# 5 / .NET 4.5... if only it were RTM :)

[Edit 2] Another thing that makes me think it's not correctly parallelized is that in Resource Monitor, the CPU, network I/O and disk I/O graphs are not stable: when one is high, the others are low to medium.

Recommended answer

You're not leveraging any async I/O APIs anywhere in your code. Everything you're doing is CPU bound, and all your I/O operations are going to waste CPU resources by blocking. AsParallel is for compute-bound tasks; if you want to take advantage of async I/O in .NET 4.0 and earlier, you need to use the APIs based on the Asynchronous Programming Model (APM). You do this by looking for BeginXXX/EndXXX methods on the I/O classes you're using and using those whenever they're available.

Start by reading this post: TPL TaskFactory.FromAsync vs Tasks with blocking methods
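A minimal sketch of wrapping one APM BeginRead/EndRead pair in a Task with TaskFactory.FromAsync (available since .NET 4.0). ReadSmallFileAsync is a hypothetical helper, not a library API; like the larger example further down, it issues a single read and therefore assumes the whole file fits in the buffer and is returned by that one read, which Stream.Read does not guarantee in general.

```csharp
using System;
using System.IO;
using System.Text;
using System.Threading.Tasks;

// Hypothetical helper: wraps one BeginRead/EndRead pair in a Task<int> via FromAsync,
// then decodes the bytes in a continuation. Assumes the file fits in maxBytes.
Task<string> ReadSmallFileAsync(string path, int maxBytes)
{
    var buffer = new byte[maxBytes];
    // useAsync: true asks the OS for asynchronous (overlapped) file I/O
    var stream = new FileStream(path, FileMode.Open, FileAccess.Read, FileShare.Read,
                                bufferSize: 4096, useAsync: true);

    Task<int> readTask = Task<int>.Factory.FromAsync(
        stream.BeginRead, stream.EndRead, buffer, 0, buffer.Length, null);

    // Runs after EndRead has completed; the calling thread is not blocked in the meantime.
    return readTask.ContinueWith(antecedent =>
    {
        try
        {
            // Propagates any exception from EndRead (wrapped in an AggregateException)
            int bytesRead = antecedent.Result;
            return Encoding.UTF8.GetString(buffer, 0, bytesRead);
        }
        finally
        {
            stream.Dispose();
        }
    });
}

string tempPath = Path.GetTempFileName();
File.WriteAllText(tempPath, "<p>hello</p>");
string text = ReadSmallFileAsync(tempPath, 1024).Result;
File.Delete(tempPath);
Console.WriteLine(text);
```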

Next, you don't want to use AsParallel in this case anyway. AsParallel enables streaming, which results in immediately scheduling a new Task per item, but you don't need or want that here. You'd be much better served by partitioning the work using Parallel.ForEach.
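A sketch of explicit partitioning with Parallel.ForEach, assuming a hypothetical ProcessReference that stands in for the per-item work. Partitioner.Create(fromInclusive, toExclusive, rangeSize) hands each worker a contiguous block of indexes, and ParallelOptions.MaxDegreeOfParallelism caps how many workers run at once. Parallel.ForEach over a plain collection already partitions its input by default; the explicit partitioner here only makes the chunking visible.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical per-item work standing in for "read, parse, write, send" on one reference.
int ProcessReference(int reference) => reference % 7;

const int referenceCount = 5000;
long total = 0;
int chunkCount = 0;

// Yields index ranges [0,100), [100,200), ... so each worker pays the
// delegate/scheduling overhead once per 100 items instead of once per item.
var ranges = Partitioner.Create(0, referenceCount, 100);
var options = new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount };

Parallel.ForEach(ranges, options, range =>
{
    long localTotal = 0;
    for (int i = range.Item1; i < range.Item2; i++)
        localTotal += ProcessReference(i);

    // Combine per-range results with atomic operations rather than a lock.
    Interlocked.Add(ref total, localTotal);
    Interlocked.Increment(ref chunkCount);
});

Console.WriteLine(chunkCount + " ranges processed, total = " + total);
```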

Let's see how you can use this knowledge to achieve max concurrency in your specific case:

using System;
using System.IO;
using System.Text;
using System.Threading.Tasks;

var refs = GetReferencesFromDB();

// Using Parallel.ForEach here will partition and process your data on separate worker threads
Parallel.ForEach(
    refs,
    reference => // "ref" is a C# keyword, so it can't be used as a parameter name
{ 
    string filePath = GetFilePath(reference);

    byte[] fileDataBuffer = new byte[1048576];

    // Need to use FileStream API directly so we can enable async I/O
    FileStream sourceFileStream = new FileStream(
                                      filePath, 
                                      FileMode.Open,
                                      FileAccess.Read,
                                      FileShare.Read,
                                      8192,
                                      true);

    // Use FromAsync to read the data from the file
    Task<int> readSourceFileStreamTask = Task<int>.Factory.FromAsync(
                                             sourceFileStream.BeginRead,
                                             sourceFileStream.EndRead,
                                             fileDataBuffer,
                                             0,
                                             fileDataBuffer.Length,
                                             null);

    // Add a continuation that will fire when the async read is completed
    readSourceFileStreamTask.ContinueWith(readSourceFileStreamAntecedent =>
    {
        int sourceFileStreamBytesRead;

        try
        {
            // Determine exactly how many bytes were read 
            // NOTE: this will propagate any potential exception that may have occurred in EndRead
            sourceFileStreamBytesRead = readSourceFileStreamAntecedent.Result;
        }
        finally
        {
            // Always clean up the source stream
            sourceFileStream.Close();
            sourceFileStream = null;
        }

        // This is here to make sure you don't end up trying to read files larger than this sample code can handle
        if(sourceFileStreamBytesRead == fileDataBuffer.Length)
        {
            throw new NotSupportedException("You need to implement reading files larger than 1MB. :P");
        }

        // Convert the file data to a string
        string html = Encoding.UTF8.GetString(fileDataBuffer, 0, sourceFileStreamBytesRead);

        // Parse the HTML
        string convertedHtml = ParseHtml(html);

        // This is here to make sure you don't end up trying to write files larger than this sample code can handle
        if(Encoding.UTF8.GetByteCount(convertedHtml) > fileDataBuffer.Length)
        {
            throw new NotSupportedException("You need to implement writing files larger than 1MB. :P");
        }

        // Convert the file data back to bytes for writing; GetBytes returns how many bytes it produced
        int convertedByteCount = Encoding.UTF8.GetBytes(convertedHtml, 0, convertedHtml.Length, fileDataBuffer, 0);

        // Need to use FileStream API directly so we can enable async I/O
        // (destinationFilePath is assumed to be computed per reference, as in the question)
        FileStream destinationFileStream = new FileStream(
                                               destinationFilePath,
                                               FileMode.Create, // truncates any existing file
                                               FileAccess.Write,
                                               FileShare.None,
                                               8192,
                                               true);

        // Use FromAsync to write the data to the file
        Task destinationFileStreamWriteTask = Task.Factory.FromAsync(
                                                  destinationFileStream.BeginWrite,
                                                  destinationFileStream.EndWrite,
                                                  fileDataBuffer,
                                                  0,
                                                  convertedByteCount, // write only the encoded bytes, not the whole buffer
                                                  null);

        // Add a continuation that will fire when the async write is completed
        destinationFileStreamWriteTask.ContinueWith(destinationFileStreamWriteAntecedent =>
        {
            try
            {
                // NOTE: we call wait here to observe any potential exceptions that might have occurred in EndWrite
                destinationFileStreamWriteAntecedent.Wait();
            }
            finally
            {
                // Always close the destination file stream
                destinationFileStream.Close();
                destinationFileStream = null;
            }
        },
        TaskContinuationOptions.AttachedToParent);

        // Send to external system **concurrent** to writing to destination file system above
        SendToWs(reference, convertedHtml);
    },
    TaskContinuationOptions.AttachedToParent);
});

Now, here are a few notes:

  1. This is sample code, so I'm using a 1MB buffer to read/write files. This is excessive for HTML files and wasteful of system resources. You can either lower it to suit your maximum needs or implement chained reads/writes into a StringBuilder, which is an exercise I leave up to you, since I'd be writing ~500 more lines of code to do async chained reads/writes. :P
  2. You'll note that on the continuations for the read/write tasks I use TaskContinuationOptions.AttachedToParent. This is very important, as it will prevent the worker thread that Parallel.ForEach starts the work with from completing until all the underlying async calls have completed. Without it, you would kick off work for all 5000 items concurrently, which would pollute the TPL subsystem with thousands of scheduled Tasks and not scale properly at all.
  3. I call SendToWs concurrently with writing the file to the file share. I don't know what underlies the implementation of SendToWs, but it too sounds like a good candidate for making async. Right now it's assumed to be pure compute work and, as such, is going to burn a CPU thread while executing. I leave it as an exercise for you to figure out how best to leverage what I've shown you to improve throughput there.
  4. This was all typed free-form; my brain was the only compiler here, and SO's syntax highlighting is all I used to make sure the syntax was good. So please forgive any syntax errors, and let me know if I screwed up anything so badly that you can't make heads or tails of it, and I'll follow up.
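The asker is waiting for the 4.5 release (C# 5 / .NET 4.5, with async/await). Assuming that is available, notes 1 and 2 can be sketched far more compactly. ReadAllTextChainedAsync does the chained async reads into a StringBuilder from note 1, and ProcessAllAsync bounds how many files are in flight with SemaphoreSlim.WaitAsync, which is a different technique swapped in here in place of the answer's AttachedToParent approach. ParseHtml is a hypothetical stand-in, and the demo uses temporary files rather than the real references.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the question's ParseHtml: turn REF-n into a link.
string ParseHtml(string html) => Regex.Replace(html, @"REF-(\d+)", "<a href=\"#$1\">REF-$1</a>");

// Note 1: read a file of any size with chained async reads into a StringBuilder.
// StreamReader decodes UTF-8 correctly even when a character straddles two chunks.
async Task<string> ReadAllTextChainedAsync(string path)
{
    var sb = new StringBuilder();
    var chunk = new char[4096];
    using (var stream = new FileStream(path, FileMode.Open, FileAccess.Read, FileShare.Read, 4096, useAsync: true))
    using (var reader = new StreamReader(stream, Encoding.UTF8))
    {
        int charsRead;
        while ((charsRead = await reader.ReadAsync(chunk, 0, chunk.Length)) > 0)
            sb.Append(chunk, 0, charsRead);
    }
    return sb.ToString();
}

async Task WriteAllTextChainedAsync(string path, string text)
{
    byte[] bytes = new UTF8Encoding(false).GetBytes(text); // UTF-8 without a BOM
    using (var stream = new FileStream(path, FileMode.Create, FileAccess.Write, FileShare.None, 4096, useAsync: true))
        await stream.WriteAsync(bytes, 0, bytes.Length);
}

// Note 2, done differently: a SemaphoreSlim caps how many files are in flight at once.
async Task ProcessAllAsync(IEnumerable<string> sourcePaths, string destinationDir, int maxConcurrency)
{
    using (var throttle = new SemaphoreSlim(maxConcurrency))
    {
        List<Task> work = sourcePaths.Select(async sourcePath =>
        {
            await throttle.WaitAsync(); // waits asynchronously, without blocking a thread
            try
            {
                string html = await ReadAllTextChainedAsync(sourcePath);
                string convertedHtml = ParseHtml(html); // the CPU-bound step
                string destinationPath = Path.Combine(destinationDir, Path.GetFileName(sourcePath));
                await WriteAllTextChainedAsync(destinationPath, convertedHtml);
            }
            finally
            {
                throttle.Release();
            }
        }).ToList();

        await Task.WhenAll(work);
    }
}

// Demo: 20 small files in a temp directory, at most 4 processed at a time.
string root = Path.Combine(Path.GetTempPath(), "io-demo-" + Guid.NewGuid().ToString("N"));
string srcDir = Path.Combine(root, "src");
string dstDir = Path.Combine(root, "dst");
Directory.CreateDirectory(srcDir);
Directory.CreateDirectory(dstDir);

var sources = new List<string>();
for (int i = 0; i < 20; i++)
{
    string path = Path.Combine(srcDir, "page" + i + ".html");
    File.WriteAllText(path, "<p>see REF-" + i + "</p>");
    sources.Add(path);
}

await ProcessAllAsync(sources, dstDir, maxConcurrency: 4);
string sample = File.ReadAllText(Path.Combine(dstDir, "page7.html"));
Console.WriteLine(sample);
```

The same throttle could wrap an async version of SendToWs (note 3), so network and disk work for different files overlap while the total number of in-flight items stays bounded.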
