Why is my TPL Dataflow Pipeline slower in reading huge CSV files compared to just looping?


Problem description

So my requirement is to read multiple CSV files (each having a minimum of a million rows) and then parse each line. Currently, the way I have broken up my pipeline, I am first creating a separate pipeline that just reads a CSV file into a string[], and I plan to create the parsing pipeline later.

But seeing the results of the file-reading pipeline, I am dumbfounded, because it is considerably slower than just looping through the CSV file and then looping through the rows.

static public IPropagatorBlock<string, string[]> CreatePipeline(int batchSize)
{
    var lineBufferBlock = new BufferBlock<string>(
        new DataflowBlockOptions { BoundedCapacity = batchSize });

    var fileReadingBlock = new ActionBlock<string>(async (filePath) =>
    {
        using (var fileStream = File.OpenRead(filePath))
        {
            using (var streamReader = new StreamReader(fileStream, Encoding.UTF8, true, batchSize))
            {
                string line;
                while ((line = streamReader.ReadLine()) != null)
                {
                    var isCompleted = await lineBufferBlock.SendAsync(line);
                    while (!isCompleted)
                    {
                        isCompleted = await lineBufferBlock.SendAsync(line);
                    }
                }
            }
        }
    }, new ExecutionDataflowBlockOptions
    {
        EnsureOrdered = true,
        MaxDegreeOfParallelism = Environment.ProcessorCount
    });

    var fileParsingBlock = new TransformBlock<string, string[]>((line) =>
    {
        return line.Split(",");
    }, new ExecutionDataflowBlockOptions
    {
        EnsureOrdered = true,
        MaxDegreeOfParallelism = Environment.ProcessorCount
    });

    lineBufferBlock.LinkTo(fileParsingBlock,
        new DataflowLinkOptions { PropagateCompletion = true });

    fileReadingBlock.Completion.ContinueWith((task) =>
    {
        lineBufferBlock.Complete();
    });

    return DataflowBlock.Encapsulate(fileReadingBlock, fileParsingBlock);
}

And then I end up using it as follows:

for (int i = 1; i < 5; i++)
{
    var filePath = $"C:\\Users\\File{i}.csv";
    fileReadingPipeline.SendAsync(filePath);
}
fileReadingPipeline.Complete();
while (true)
{
    try
    {
        var outputRows = fileReadingPipeline.Receive();
        foreach (string word in outputRows)
        {
        }
    }
    catch (InvalidOperationException e)
    {
        break;
    }
}

And here is my straight-loop code:

for (int i = 1; i < 5; i++)
{
    var filePath = $"C:\\Users\\File{i}.csv";
    foreach (string row in File.ReadLines(filePath))
    {
        foreach (string word in row.Split(","))
        {
        }
    }
}

The performance difference comes down to about 15 seconds for TPL Dataflow, versus about 5 seconds for the looping code.

EDIT

As per the better suggestions in the comments, I have removed the unnecessary lineBufferBlock from the pipeline, and this is my code now. Performance, however, stays the same.

var fileReadingBlock = new TransformManyBlock<string, string>((filePath) =>
{
    return File.ReadLines(filePath);
}, new ExecutionDataflowBlockOptions
{
    EnsureOrdered = true,
    MaxDegreeOfParallelism = Environment.ProcessorCount
});

var fileParsingBlock = new TransformBlock<string, string[]>((line) =>
{
    return line.Split(",");
}, new ExecutionDataflowBlockOptions
{
    EnsureOrdered = true,
    MaxDegreeOfParallelism = Environment.ProcessorCount
});

fileReadingBlock.LinkTo(fileParsingBlock,
    new DataflowLinkOptions { PropagateCompletion = true });

return DataflowBlock.Encapsulate(fileReadingBlock, fileParsingBlock);

Solution


When you configure a pipeline, you should have in mind the capabilities of the hardware that is going to do the job. The TPL Dataflow is not doing the job by itself; it delegates it to the CPU, the HDD/SSD, the network card etc. For example, when reading files from a hard disk, it is probably futile to instruct the TPL to read data from 8 files concurrently, because the head of the mechanical arm of the HDD cannot be physically located in 8 places at the same time. This boils down to the fact that reading files from filesystems is not particularly parallel-friendly. It is slightly better in the case of SSDs, but you'll have to test it on a case-by-case basis.

Another issue with parallelization is granularity. You want the workload to be chunky, not granular. Otherwise the cost of passing messages from buffer to buffer, and putting memory barriers around each transfer to ensure cross-thread visibility, may negate any benefits you expect from employing parallelism. Tip: splitting a single string into parts is a highly granular operation.

Here is a way to do it:

using static MoreLinq.Extensions.BatchExtension;

var reader = new TransformManyBlock<string, string[]>(filePath =>
{
    return File.ReadLines(filePath).Batch(100, r => r.ToArray());
}, new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 1
});

var parser = new TransformBlock<string[], string[][]>(lines =>
{
    return lines.Select(line => line.Split(",")).ToArray();
}, new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = Environment.ProcessorCount
});

reader.LinkTo(parser, new DataflowLinkOptions { PropagateCompletion = true });

This example uses the Batch operator from the MoreLinq package in order to pass the lines around in batches of 100, instead of passing them one by one. You can find other batching options here.
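If you are on .NET 6 or later, the built-in Enumerable.Chunk operator can stand in for MoreLinq's Batch and saves the extra package dependency. A minimal sketch under that assumption:

var reader = new TransformManyBlock<string, string[]>(filePath =>
{
    // Chunk yields string[] batches directly, so no projection is needed.
    return File.ReadLines(filePath).Chunk(100);
}, new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 1
});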


Update: One more suggestion is to boost the minimum number of threads that the ThreadPool creates on demand (SetMinThreads). Otherwise the ThreadPool will be immediately saturated by the MaxDegreeOfParallelism = Environment.ProcessorCount configuration, which will cause small but noticeable (500 msec) delays, because of the intentional laziness of the ThreadPool's thread-injection algorithm.

ThreadPool.SetMinThreads(Environment.ProcessorCount * 2,
    Environment.ProcessorCount * 2);

It is enough to call this method once at the start of the program.
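For completeness, here is a minimal sketch of how the reader/parser pipeline above could be driven end to end. The file paths and the ActionBlock consumer are illustrative assumptions, not part of the original answer:

// Hypothetical consumer; replace the body with the actual per-row work.
var consumer = new ActionBlock<string[][]>(rows =>
{
    foreach (string[] row in rows) { /* process one parsed CSV row */ }
});
parser.LinkTo(consumer, new DataflowLinkOptions { PropagateCompletion = true });

// Feed the file paths (assumed names, mirroring the question) and await completion.
for (int i = 1; i < 5; i++)
{
    await reader.SendAsync($"C:\\Users\\File{i}.csv");
}
reader.Complete();
await consumer.Completion; // completion flows reader -> parser -> consumer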
