Process files concurrently and asynchronously

Problem description

Looking for some help with best practices on creating a potentially multi-threaded asynchronous application. This application will look through several directories for a certain pattern (configurable per directory). For all of the files it finds in that directory, it will kick off an asynchronous operation for each file (read/write, DB operations, API calls, etc). The directories themselves should be processed concurrently as they are unrelated to each other.

It's my understanding that Task may not always execute on a separate thread. Because this application may have to handle dozens to hundreds of files at any one time, I want to make sure I am maximizing throughput of the application. It's also worth noting that there may or may not be files in the directory when this application runs.

Is simply using Task enough to accomplish this and achieve maximum throughput, or is there some combination of Parallel.ForEach with an asynchronous function that would be better? Below is what I have created so far as a test, and it looks like it's processing one directory at a time on the same thread.

Main

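using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

class Program {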
    static IEnumerable<DirectoryConfig> GetDirectoryConfigs() {
        return new DirectoryConfig[] {
            new DirectoryConfig {
                DirectoryPath = @"PATH_1",
                Token = "*",
                FileProcessor = new FileProcessor()
            },
            new DirectoryConfig {
                DirectoryPath = @"PATH_2",
                Token = "*",
                FileProcessor = new FileProcessor()
            }
        };
    }
    static async Task Main(string[] args) {
        IEnumerable<DirectoryConfig> directoryConfigs = GetDirectoryConfigs();

        List<Task> tasks = new List<Task>();

        foreach(DirectoryConfig config in directoryConfigs) {
            Console.WriteLine("Processing directory {0}", config.DirectoryPath);

            tasks.Add(new DirectoryMonitor().ProcessDirectoryAsync(config));
        }

        await Task.WhenAll(tasks);
    }
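}

// Not shown in the question; these shapes are inferred from how they are used.
interface IFileProcessor {
    Task ProcessAsync(string file);
}

class DirectoryConfig {
    public string DirectoryPath { get; set; }
    public string Token { get; set; }
    public IFileProcessor FileProcessor { get; set; }
}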

DirectoryMonitor

class DirectoryMonitor {
    public Task ProcessDirectoryAsync(DirectoryConfig config) {
        List<Task> tasks = new List<Task>();

        foreach (string file in Directory.GetFiles(config.DirectoryPath, config.Token)) {
            tasks.Add(config.FileProcessor.ProcessAsync(file));
        }

        return Task.WhenAll(tasks);
    }
}

FileProcessor

class FileProcessor : IFileProcessor {
    public async Task ProcessAsync(string file) {
        string fileName = Path.GetFileName(file);
        Console.WriteLine("Processing file {0} on thread {1}", fileName,
            Thread.CurrentThread.ManagedThreadId);
        using (StreamReader reader = new StreamReader(file)) {
            int lineNumber = 0;
            while(!reader.EndOfStream) {
                Console.WriteLine("Reading line {0} of file {1}", ++lineNumber, fileName);
                string line = await reader.ReadLineAsync();

                await DoAsyncWork(line);
            }
        }
    }

    private Task DoAsyncWork(string line) {
        return Task.Delay(1000);
    }
}
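The thread behavior described in the question can be checked directly. A minimal sketch, assuming a console app with no SynchronizationContext; `ThreadDemo` and its methods are hypothetical names. It shows that an async method runs synchronously on the calling thread until it awaits a task that has not completed yet, whereas `Task.Run` explicitly queues its delegate to the thread pool.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper class, for illustration only.
static class ThreadDemo
{
    // Returns the id of the thread that executed the code before the first await.
    public static async Task<int> SynchronousPrefixThreadAsync()
    {
        // This line runs synchronously, on whichever thread called the method.
        int threadBeforeAwait = Environment.CurrentManagedThreadId;

        // Awaiting an incomplete task makes the method return to its caller here;
        // the rest of the method runs later as a continuation.
        await Task.Delay(10);
        return threadBeforeAwait;
    }

    // Task.Run queues the delegate to the thread pool.
    public static Task<bool> RunsOnThreadPoolAsync() =>
        Task.Run(() => Thread.CurrentThread.IsThreadPoolThread);
}
```

In the question's code this is likely why everything appears to run on one thread at first: Main calls ProcessDirectoryAsync, which calls ProcessAsync for each file, and each of those calls runs on the main thread until it awaits something that has not completed yet (at the latest, the Task.Delay in DoAsyncWork). So all files of the first directory are started before the second directory is touched; only the continuations after those awaits run on thread-pool threads.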

Recommended answer

For this kind of job, a powerful tool you could use is the TPL Dataflow library. With it you can create a processing pipeline consisting of many linked blocks, with the data flowing from the first block to the last (cyclic and mesh-like topologies are also possible).

The advantages of this approach are:

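  1. You get data parallelism on top of task parallelism. All blocks work concurrently and independently of each other.
  2. You can configure the level of concurrency (a.k.a. degree of parallelism) of each heterogeneous operation optimally. For example, making API calls may be highly parallelizable, while reading from the hard disk may not be parallelizable at all.
  3. You get advanced options out of the box (BoundedCapacity, CancellationToken, etc.).
  4. You get built-in support for both synchronous and asynchronous delegates.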
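Point 2 can be seen in a small sketch: a hypothetical disk-bound stage is limited to one item at a time, while a hypothetical API stage may run up to eight items concurrently. Task.Delay stands in for the real work (an assumption), and each stage records the highest number of items it ever had in flight, so the configured limits can be observed.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class PerBlockParallelismDemo
{
    // Sends `count` items through two linked stages with different degrees of
    // parallelism and reports the peak concurrency observed in each stage.
    public static async Task<(int DiskMax, int ApiMax, int Processed)> RunAsync(int count)
    {
        int diskActive = 0, diskMax = 0, apiActive = 0, apiMax = 0, processed = 0;

        // Hypothetical disk-bound stage: one item at a time.
        var diskBlock = new TransformBlock<int, int>(async n =>
        {
            EnterAndTrackPeak(ref diskActive, ref diskMax);
            await Task.Delay(5);                    // stand-in for a file read
            Interlocked.Decrement(ref diskActive);
            return n;
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });

        // Hypothetical API stage: up to 8 calls in flight.
        var apiBlock = new ActionBlock<int>(async n =>
        {
            EnterAndTrackPeak(ref apiActive, ref apiMax);
            await Task.Delay(20);                   // stand-in for an API call
            Interlocked.Decrement(ref apiActive);
            Interlocked.Increment(ref processed);
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 });

        diskBlock.LinkTo(apiBlock, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (int i in Enumerable.Range(0, count))
            await diskBlock.SendAsync(i);
        diskBlock.Complete();
        await apiBlock.Completion;

        return (diskMax, apiMax, processed);
    }

    // Increments `active` and raises `peak` if the new value is higher.
    static void EnterAndTrackPeak(ref int active, ref int peak)
    {
        int now = Interlocked.Increment(ref active);
        int seen;
        while (now > (seen = Volatile.Read(ref peak)) &&
               Interlocked.CompareExchange(ref peak, now, seen) != seen)
        {
            // Another thread changed `peak` concurrently; re-read and retry.
        }
    }
}
```

With 20 items, DiskMax should come out as 1 and ApiMax no higher than 8; the exact ApiMax depends on timing.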

Below is how you could rewrite your original code in TPL Dataflow terms. Three blocks are used, two TransformManyBlocks and one ActionBlock.

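// Requires: using System.IO; using System.Threading.Tasks; using System.Threading.Tasks.Dataflow;
// Runs inside an async method, e.g. the question's async Main (GetDirectoryConfigs comes from there).
var directoryBlock = new TransformManyBlock<DirectoryConfig, string>(config =>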
{
    return Directory.GetFiles(config.DirectoryPath, config.Token);
});

var fileBlock = new TransformManyBlock<string, string>(filePath =>
{
    return File.ReadLines(filePath);
});

var lineBlock = new ActionBlock<string>(async line =>
{
    await Task.Delay(1000);
}, new ExecutionDataflowBlockOptions()
{
    MaxDegreeOfParallelism = 4
});

directoryBlock.LinkTo(fileBlock, new DataflowLinkOptions { PropagateCompletion = true });
fileBlock.LinkTo(lineBlock, new DataflowLinkOptions { PropagateCompletion = true });

foreach (DirectoryConfig config in GetDirectoryConfigs())
    await directoryBlock.SendAsync(config);

directoryBlock.Complete();
await lineBlock.Completion;

This example is not very good, since all the work is done by the last block (the lineBlock) while the first two blocks do essentially nothing. It is also not memory-efficient: unless processing the lines happens to be faster than reading them from the disk, all lines of all files of all directories will soon be queued in the input buffer of the ActionBlock. You'll need to configure the blocks with BoundedCapacity to solve this problem.
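A minimal sketch of a bounded variant, assuming the same directory and search-pattern inputs as DirectoryConfig. Instead of a TransformManyBlock, a file-reading ActionBlock pushes each line with SendAsync into a lineBlock configured with BoundedCapacity, so the reader waits whenever the line block already holds its maximum number of lines (queued or in progress). The capacities (10 and 100) and the Task.Delay stand-in for real work are arbitrary choices for illustration.

```csharp
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class BoundedPipeline
{
    // Processes every line of every file in `directory` matching `pattern`
    // with bounded buffers. Returns the number of lines processed.
    public static async Task<int> RunAsync(string directory, string pattern)
    {
        int processed = 0;

        // Consumer: up to 4 lines in flight, at most 100 lines held (queued or in progress).
        var lineBlock = new ActionBlock<string>(async line =>
        {
            await Task.Delay(1);                   // stand-in for DB/API work
            Interlocked.Increment(ref processed);
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4, BoundedCapacity = 100 });

        // Producer: reads one file at a time; SendAsync waits while lineBlock is full.
        var fileBlock = new ActionBlock<string>(async filePath =>
        {
            foreach (string line in File.ReadLines(filePath))
                await lineBlock.SendAsync(line);
        }, new ExecutionDataflowBlockOptions { BoundedCapacity = 10 });

        // The blocks are not linked, so completion (or failure) is propagated by hand.
        _ = fileBlock.Completion.ContinueWith(t =>
        {
            if (t.IsFaulted) ((IDataflowBlock)lineBlock).Fault(t.Exception);
            else lineBlock.Complete();
        }, TaskScheduler.Default);

        foreach (string file in Directory.EnumerateFiles(directory, pattern))
            await fileBlock.SendAsync(file);
        fileBlock.Complete();

        await lineBlock.Completion;
        return processed;
    }
}
```

If the directory contains no matching files, fileBlock completes immediately and the method returns 0, which covers the question's "there may or may not be files" case.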

This example also fails to demonstrate how you could have different blocks for different types of files, and link the directoryBlock to all of them using a different filtering predicate for each link:

// Path.GetExtension includes the leading dot, e.g. ".csv"
directoryBlock.LinkTo(csvBlock, filePath => Path.GetExtension(filePath) == ".csv");
directoryBlock.LinkTo(xlsBlock, filePath => Path.GetExtension(filePath) == ".xls");
directoryBlock.LinkTo(generalFileBlock); // Anything that is neither csv nor xls
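A self-contained sketch of the routing idea, with a BufferBlock standing in for directoryBlock and ActionBlocks that only record what they receive (both are assumptions for illustration). The catch-all link matters: a message rejected by every linked target stays in the source, so the source's Completion, and with it the propagated completion of the targets, never finishes.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class RoutingDemo
{
    // Routes file paths to one block per file type; returns what each block received.
    public static async Task<(string[] Csv, string[] Xls, string[] Other)> RouteAsync(params string[] paths)
    {
        // Each list is touched only by its own single-threaded ActionBlock.
        var csv = new List<string>();
        var xls = new List<string>();
        var other = new List<string>();

        var source = new BufferBlock<string>();
        var csvBlock = new ActionBlock<string>(p => csv.Add(p));
        var xlsBlock = new ActionBlock<string>(p => xls.Add(p));
        var generalFileBlock = new ActionBlock<string>(p => other.Add(p));

        var propagate = new DataflowLinkOptions { PropagateCompletion = true };
        // Extensions compared with the leading dot and case-insensitively.
        source.LinkTo(csvBlock, propagate,
            p => string.Equals(Path.GetExtension(p), ".csv", StringComparison.OrdinalIgnoreCase));
        source.LinkTo(xlsBlock, propagate,
            p => string.Equals(Path.GetExtension(p), ".xls", StringComparison.OrdinalIgnoreCase));
        // Catch-all link: without it an unmatched path would stall completion.
        source.LinkTo(generalFileBlock, propagate);

        foreach (string p in paths)
            await source.SendAsync(p);
        source.Complete();

        await Task.WhenAll(csvBlock.Completion, xlsBlock.Completion, generalFileBlock.Completion);
        return (csv.ToArray(), xls.ToArray(), other.ToArray());
    }
}
```

Links are tried in the order they were created, so the catch-all link should be added last.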

There are also other types of blocks you could use, like the TransformBlock and the BatchBlock. TPL Dataflow is built on the Task Parallel Library (TPL); it is essentially a high-level task generator that, based on declarative configuration, creates and controls the lifecycle of the tasks needed to process a workload of a given type. It is built into .NET Core, and available as a NuGet package (System.Threading.Tasks.Dataflow) for .NET Framework.
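As an example of one of those other block types, a BatchBlock groups individual items into arrays, which suits operations such as a bulk database insert. A minimal sketch; the insert stage is hypothetical and only records the size of each batch it receives.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class BatchingDemo
{
    // Groups incoming lines into arrays of up to `batchSize` items.
    public static async Task<List<int>> BatchSizesAsync(IEnumerable<string> lines, int batchSize)
    {
        var sizes = new List<int>();
        var batchBlock = new BatchBlock<string>(batchSize);

        // Hypothetical bulk-insert stage; it runs one batch at a time (default
        // MaxDegreeOfParallelism = 1), so the List is never accessed concurrently.
        var insertBlock = new ActionBlock<string[]>(batch => sizes.Add(batch.Length));
        batchBlock.LinkTo(insertBlock, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (string line in lines)
            await batchBlock.SendAsync(line);
        batchBlock.Complete();          // emits the final, possibly smaller, batch
        await insertBlock.Completion;
        return sizes;
    }
}
```

Seven lines with a batch size of 3 produce batches of 3, 3 and 1.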
