C# Process Files Concurrently And Asynchronously

Question

Looking for some help with best practices on creating a potentially multi-threaded asynchronous application. This application will look through several directories for a certain pattern (configurable per directory). For all of the files it finds in that directory, it will kick off an asynchronous operation for each file (read/write, DB operations, API calls, etc.). The directories themselves should be processed concurrently, as they are unrelated to each other.

It's my understanding that Task may not always execute on a separate thread. Because this application may have to handle dozens to hundreds of files at any one time, I want to make sure I am maximizing throughput of the application. It's also worth noting that there may or may not be files in the directory when this application runs.

Is simply using Task enough to accomplish this and achieve maximum throughput, or would some combination of Parallel.ForEach with an asynchronous function be better? Below is what I have created so far as a test, and it looks like it's processing one directory at a time on the same thread.
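On the thread question: an async method runs synchronously on the calling thread until it reaches an await on a task that has not completed yet, and only then returns its Task to the caller. In the code below, ProcessDirectoryAsync is not itself async, so Directory.GetFiles and the start of each ProcessAsync call (up to its first incomplete await) would run one after another on the thread that calls them, which may explain seeing one directory handled at a time on the same thread. A minimal sketch of that behavior; AsyncThreadDemo, WorkAsync and WorkOnPoolAsync are hypothetical names:

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical demo: which thread runs each part of an async method.
public static class AsyncThreadDemo
{
    // Managed thread ID observed before the first await.
    public static int ThreadBeforeAwait;

    public static async Task WorkAsync()
    {
        // Synchronous part: runs on the caller's thread before WorkAsync returns.
        ThreadBeforeAwait = Environment.CurrentManagedThreadId;

        // Task.Delay has not completed yet, so WorkAsync returns an incomplete
        // Task here and the rest of the method runs later as a continuation.
        await Task.Delay(100);

        // Without a SynchronizationContext (e.g. a console app) the continuation
        // usually runs on a thread-pool thread, but that is not guaranteed.
        Console.WriteLine($"Resumed on thread {Environment.CurrentManagedThreadId}");
    }

    // Task.Run queues the whole method, including its synchronous part, to the thread pool.
    public static Task WorkOnPoolAsync() => Task.Run(() => WorkAsync());
}
```

In the question's Main, writing Task.Run(() => new DirectoryMonitor().ProcessDirectoryAsync(config)) would similarly start each directory's Directory.GetFiles call on the thread pool; whether that helps depends on how much synchronous work happens before the first incomplete await.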

Main

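using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

class Program {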
    static IEnumerable<DirectoryConfig> GetDirectoryConfigs() {
        return new DirectoryConfig[] {
            new DirectoryConfig {
                DirectoryPath = @"PATH_1",
                Token = "*",
                FileProcessor = new FileProcessor()
            },
            new DirectoryConfig {
                DirectoryPath = @"PATH_2",
                Token = "*",
                FileProcessor = new FileProcessor()
            }
        };
    }
    static async Task Main(string[] args) {
        IEnumerable<DirectoryConfig> directoryConfigs = GetDirectoryConfigs();

        List<Task> tasks = new List<Task>();

        foreach(DirectoryConfig config in directoryConfigs) {
            Console.WriteLine("Processing directory {0}", config.DirectoryPath);

            tasks.Add(new DirectoryMonitor().ProcessDirectoryAsync(config));
        }

        await Task.WhenAll(tasks);
    }
}

DirectoryMonitor

class DirectoryMonitor {
    public Task ProcessDirectoryAsync(DirectoryConfig config) {
        List<Task> tasks = new List<Task>();

        foreach (string file in Directory.GetFiles(config.DirectoryPath, config.Token)) {
            tasks.Add(config.FileProcessor.ProcessAsync(file));
        }

        return Task.WhenAll(tasks);
    }
}

FileProcessor

class FileProcessor : IFileProcessor {
    public async Task ProcessAsync(string file) {
        string fileName = Path.GetFileName(file);
        Console.WriteLine("Processing file {0} on thread {1}", fileName, Thread.CurrentThread.ManagedThreadId);
        using (StreamReader reader = new StreamReader(file)) {
            int lineNumber = 0;
            while(!reader.EndOfStream) {
                Console.WriteLine("Reading line {0} of file {1}", ++lineNumber, fileName);
                string line = await reader.ReadLineAsync();

                await DoAsyncWork(line);
            }
        }
    }

    private Task DoAsyncWork(string line) {
        return Task.Delay(1000);
    }
}
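The snippets above use DirectoryConfig and IFileProcessor without defining them. Judging only from how they are used (the object initializers in GetDirectoryConfigs, the property reads in DirectoryMonitor, and FileProcessor.ProcessAsync), minimal definitions might look like this; the exact types and defaults are assumptions:

```csharp
using System.Threading.Tasks;

// Assumed interface, inferred from FileProcessor.ProcessAsync(string) in the question.
public interface IFileProcessor
{
    Task ProcessAsync(string file);
}

// Assumed shape, inferred from the object initializers in GetDirectoryConfigs()
// and from how DirectoryMonitor reads the properties.
public class DirectoryConfig
{
    public string DirectoryPath { get; set; } = "";   // directory to scan
    public string Token { get; set; } = "";           // search pattern passed to Directory.GetFiles
    public IFileProcessor FileProcessor { get; set; } // processor applied to each file found
}
```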

Answer

For this kind of job a powerful tool you could use is the TPL Dataflow library. With this tool you can create a processing pipeline consisting of many linked blocks, with the data flowing from the first block to the last (circles and meshes are also possible).

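The advantages of this approach are:

  1. You get data parallelism on top of task parallelism. All blocks work concurrently and independently of each other.
  2. You can optimally configure the level of concurrency (aka degree of parallelism) of each heterogeneous operation. For example, making API calls may be highly parallelizable, while reading from the hard disk may not be parallelizable at all.
  3. You get advanced options out of the box (BoundedCapacity, CancellationToken, etc.).
  4. You get built-in support for both synchronous and asynchronous delegates.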
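Points 2 and 4 can be sketched with two blocks: a TransformBlock with a synchronous delegate limited to one item at a time (standing in for disk reads) feeding an ActionBlock with an asynchronous delegate allowed to run up to eight items at once (standing in for API calls). The class name, delays and limits are illustrative assumptions; the code records the highest concurrency each block actually reached.

```csharp
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical two-stage pipeline with a different degree of parallelism per block.
public static class PerBlockParallelismDemo
{
    public static async Task<(int MaxRead, int MaxApi, int Processed)> RunAsync(int itemCount)
    {
        int activeRead = 0, maxRead = 0, activeApi = 0, maxApi = 0, processed = 0;

        // "Disk" stage: synchronous delegate, one item at a time.
        var readBlock = new TransformBlock<int, int>(item =>
        {
            InterlockedMax(ref maxRead, Interlocked.Increment(ref activeRead));
            Thread.Sleep(5); // stand-in for a blocking disk read
            Interlocked.Decrement(ref activeRead);
            return item;
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });

        // "API" stage: asynchronous delegate, up to 8 items at once.
        var apiBlock = new ActionBlock<int>(async item =>
        {
            InterlockedMax(ref maxApi, Interlocked.Increment(ref activeApi));
            await Task.Delay(20); // stand-in for an API call
            Interlocked.Decrement(ref activeApi);
            Interlocked.Increment(ref processed);
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 });

        readBlock.LinkTo(apiBlock, new DataflowLinkOptions { PropagateCompletion = true });

        for (int i = 0; i < itemCount; i++)
            await readBlock.SendAsync(i);

        readBlock.Complete();
        await apiBlock.Completion;
        return (maxRead, maxApi, processed);
    }

    // Lock-free "max": raises target to value if value is larger.
    private static void InterlockedMax(ref int target, int value)
    {
        int current;
        while (value > (current = Volatile.Read(ref target)))
        {
            if (Interlocked.CompareExchange(ref target, value, current) == current)
                return;
        }
    }
}
```

Each block gets its own ExecutionDataflowBlockOptions, so the limit on one stage does not constrain the other.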

Below is how you could rewrite your original code in TPL Dataflow terms. Three blocks are used: two TransformManyBlocks and one ActionBlock.

using System.IO;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var directoryBlock = new TransformManyBlock<DirectoryConfig, string>(config =>
{
    return Directory.GetFiles(config.DirectoryPath, config.Token);
});

var fileBlock = new TransformManyBlock<string, string>(filePath =>
{
    return File.ReadLines(filePath);
});

var lineBlock = new ActionBlock<string>(async line =>
{
    await Task.Delay(1000);
}, new ExecutionDataflowBlockOptions()
{
    MaxDegreeOfParallelism = 4
});

directoryBlock.LinkTo(fileBlock, new DataflowLinkOptions { PropagateCompletion = true });
fileBlock.LinkTo(lineBlock, new DataflowLinkOptions { PropagateCompletion = true });

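foreach (DirectoryConfig config in GetDirectoryConfigs()) {
    await directoryBlock.SendAsync(config);
}

directoryBlock.Complete();   // no more directories; completion propagates through the links
await lineBlock.Completion;  // finishes after every line has been processed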
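To try the pipeline above without the question's types, here is a self-contained variant. It assumes directories are passed as (Folder, Pattern) tuples instead of DirectoryConfig and that the per-line work is a short Task.Delay followed by a counter increment; LinePipeline and CountLinesAsync are hypothetical names.

```csharp
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical self-contained variant of the three-block pipeline above.
public static class LinePipeline
{
    // Directories are (Folder, Pattern) tuples here instead of DirectoryConfig (an assumption).
    public static async Task<int> CountLinesAsync(IEnumerable<(string Folder, string Pattern)> directories)
    {
        int lineCount = 0;
        var propagate = new DataflowLinkOptions { PropagateCompletion = true };

        // One directory in, many file paths out.
        var directoryBlock = new TransformManyBlock<(string Folder, string Pattern), string>(
            dir => Directory.GetFiles(dir.Folder, dir.Pattern));

        // One file path in, many lines out.
        var fileBlock = new TransformManyBlock<string, string>(
            filePath => File.ReadLines(filePath));

        // Per-line work (here just a short delay and a counter), up to 4 lines at a time.
        var lineBlock = new ActionBlock<string>(async line =>
        {
            await Task.Delay(10);
            Interlocked.Increment(ref lineCount);
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

        directoryBlock.LinkTo(fileBlock, propagate);
        fileBlock.LinkTo(lineBlock, propagate);

        foreach (var dir in directories)
            await directoryBlock.SendAsync(dir);

        directoryBlock.Complete();   // completion flows directory -> file -> line block
        await lineBlock.Completion;
        return lineCount;
    }
}
```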

This example is not very good since all the work is done by the last block (the lineBlock), and the first two blocks are doing essentially nothing. It is also not memory-efficient since all lines of all files of all directories will soon become queued in the input buffer of the ActionBlock, unless processing the lines happens to be faster than reading them from the disk. You'll need to configure the blocks with BoundedCapacity to solve this problem.
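A minimal sketch of the BoundedCapacity fix, reduced to a single slow ActionBlock fed by a fast producer. With BoundedCapacity set, the block accepts at most that many messages, and SendAsync waits until there is room instead of letting the queue grow (Post, by contrast, would return false for a full block). BackpressureDemo and its parameters are hypothetical; the code samples InputCount after each send to show that the queue stays within the bound.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo: a fast producer feeding a slow, bounded ActionBlock.
public static class BackpressureDemo
{
    public static async Task<(int PeakQueued, int Processed)> RunAsync(int itemCount, int capacity)
    {
        int processed = 0;

        // Slow consumer; the default MaxDegreeOfParallelism of 1 makes processed++ safe.
        var slowBlock = new ActionBlock<int>(async _ =>
        {
            await Task.Delay(5);
            processed++;
        }, new ExecutionDataflowBlockOptions { BoundedCapacity = capacity });

        int peakQueued = 0;
        for (int i = 0; i < itemCount; i++)
        {
            // SendAsync completes only after the block has accepted the item,
            // i.e. once there is room under BoundedCapacity.
            await slowBlock.SendAsync(i);
            peakQueued = Math.Max(peakQueued, slowBlock.InputCount);
        }

        slowBlock.Complete();
        await slowBlock.Completion;
        return (peakQueued, processed);
    }
}
```

In the three-block pipeline, the same option would go on each block's ExecutionDataflowBlockOptions, with SendAsync used to feed the first block.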

This example also fails to demonstrate how you could have different blocks for different types of files, and link the directoryBlock to all of them using a different filtering predicate for each link:

// Path.GetExtension returns the extension including the leading dot
directoryBlock.LinkTo(csvBlock, filePath => Path.GetExtension(filePath) == ".csv");
directoryBlock.LinkTo(xlsBlock, filePath => Path.GetExtension(filePath) == ".xls");
directoryBlock.LinkTo(generalFileBlock); // Anything that is neither csv nor xls
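A runnable sketch of that routing idea, assuming a BufferBlock of file paths as the source in place of directoryBlock and simple counting ActionBlocks in place of csvBlock, xlsBlock and generalFileBlock (RoutingDemo and its counters are hypothetical). The last link has no predicate and catches everything else; without such a catch-all, a path matching no predicate would stay in the source block and stall the messages queued behind it.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical routing demo: file paths are sent to different blocks by extension.
public static class RoutingDemo
{
    public static async Task<(int Csv, int Xls, int Other)> RouteAsync(string[] filePaths)
    {
        int csv = 0, xls = 0, other = 0;

        // Each target has the default MaxDegreeOfParallelism of 1, so plain ++ is safe.
        var source = new BufferBlock<string>();
        var csvBlock = new ActionBlock<string>(_ => csv++);
        var xlsBlock = new ActionBlock<string>(_ => xls++);
        var generalFileBlock = new ActionBlock<string>(_ => other++);

        var propagate = new DataflowLinkOptions { PropagateCompletion = true };
        // Links are tried in the order they were created.
        source.LinkTo(csvBlock, propagate, p => HasExtension(p, ".csv"));
        source.LinkTo(xlsBlock, propagate, p => HasExtension(p, ".xls"));
        source.LinkTo(generalFileBlock, propagate); // catch-all

        foreach (var path in filePaths)
            source.Post(path);

        source.Complete();
        await Task.WhenAll(csvBlock.Completion, xlsBlock.Completion, generalFileBlock.Completion);
        return (csv, xls, other);
    }

    // Case-insensitive comparison; GetExtension includes the dot (".csv").
    private static bool HasExtension(string path, string extension) =>
        string.Equals(Path.GetExtension(path), extension, StringComparison.OrdinalIgnoreCase);
}
```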

There are also other types of blocks you could use, like the TransformBlock and the BatchBlock. TPL Dataflow is built on the Task Parallel Library (TPL), and it is essentially a high-level task generator that creates and controls the lifecycle of the tasks needed to process a workload of a given type, based on declarative configuration. It is built into .NET Core and available as a package (System.Threading.Tasks.Dataflow) for .NET Framework.
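As an illustration of one of those other block types, a BatchBlock can group lines into arrays, for example so that a database write handles several rows per call. A sketch under that assumption (BatchingDemo and BatchSizesAsync are hypothetical), recording the size of each batch the downstream block receives:

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo: BatchBlock groups individual lines into arrays.
public static class BatchingDemo
{
    public static async Task<List<int>> BatchSizesAsync(IEnumerable<string> lines, int batchSize)
    {
        var sizes = new List<int>();
        var batchBlock = new BatchBlock<string>(batchSize);

        // Single-threaded consumer (default MaxDegreeOfParallelism = 1), so List<int> is safe.
        // A real consumer might write the whole batch to a database here.
        var writeBlock = new ActionBlock<string[]>(batch => sizes.Add(batch.Length));
        batchBlock.LinkTo(writeBlock, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var line in lines)
            await batchBlock.SendAsync(line);

        // Completing the block flushes any leftover items as a final, smaller batch.
        batchBlock.Complete();
        await writeBlock.Completion;
        return sizes;
    }
}
```

With seven lines and a batch size of 3, the recorded sizes are 3, 3 and 1.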
