Reading millions of small files with C#
Problem description
I have millions of log files generated every day, and I need to read all of them and merge them into a single file in order to do some processing on it in another app.
I'm looking for the fastest way to do this. Currently I'm using threads, tasks, and Parallel like this:
Parallel.For(0, files.Length, new ParallelOptions { MaxDegreeOfParallelism = 100 }, i =>
{
    ReadFiles(files[i]);
});
void ReadFiles(string file)
{
    try
    {
        var txt = File.ReadAllText(file);
        filesTxt.Add(txt);
    }
    catch { }
    GlobalCls.ThreadNo--;
}
Or:
foreach (var file in files)
{
    //Int64 index = i;
    //var file = files[index];
    while (Process.GetCurrentProcess().Threads.Count > 100)
    {
        Thread.Sleep(100);
        Application.DoEvents();
    }
    new Thread(() => ReadFiles(file)).Start();
    GlobalCls.ThreadNo++;
    // Task.Run(() => ReadFiles(file));
}
The problem is that after reading a few thousand files, the reading gets slower and slower!
Any idea why? And what is the fastest approach to reading millions of small files? Thank you.
Recommended answer
It seems that you are loading the contents of all the files in memory before writing them back to the single file. This could explain why the process becomes slower over time.
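To make the contrast concrete, here is a minimal sketch of a purely sequential streaming merge (the files array and targetFilePath below are assumed placeholder names, not part of your code). It writes each file's content to the output as soon as it is read, so only one file is held in memory at a time:

using (var writer = new StreamWriter(targetFilePath))
{
    foreach (var file in files)
    {
        writer.Write(File.ReadAllText(file)); // Only one file's content in memory at a time
    }
}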
A way to optimize the process further is to separate the reading part from the writing part, and do them in parallel. This is called the producer-consumer pattern. It can be implemented with the Parallel class, with threads, or with tasks, but I will demonstrate instead an implementation based on the powerful TPL Dataflow library, which is particularly suited for jobs like this.
private static async Task MergeFiles(IEnumerable<string> sourceFilePaths,
    string targetFilePath, CancellationToken cancellationToken = default,
    IProgress<int> progress = null)
{
    var readerBlock = new TransformBlock<string, string>(filePath =>
    {
        return File.ReadAllText(filePath); // Read the small file
    }, new ExecutionDataflowBlockOptions()
    {
        MaxDegreeOfParallelism = 2, // Reading is parallelizable
        BoundedCapacity = 100, // No more than 100 file-paths buffered
        CancellationToken = cancellationToken, // Cancel at any time
    });

    StreamWriter streamWriter = null;

    int filesProcessed = 0;
    var writerBlock = new ActionBlock<string>(text =>
    {
        streamWriter.Write(text); // Append to the target file
        filesProcessed++;
        if (filesProcessed % 10 == 0) progress?.Report(filesProcessed);
    }, new ExecutionDataflowBlockOptions()
    {
        MaxDegreeOfParallelism = 1, // We can't parallelize the writer
        BoundedCapacity = 100, // No more than 100 file-contents buffered
        CancellationToken = cancellationToken, // Cancel at any time
    });

    readerBlock.LinkTo(writerBlock,
        new DataflowLinkOptions() { PropagateCompletion = true });

    // This is a tricky part. We use BoundedCapacity, so we must propagate manually
    // a possible failure of the writer to the reader, otherwise a deadlock may occur.
    PropagateFailure(writerBlock, readerBlock);

    // Open the output stream
    using (streamWriter = new StreamWriter(targetFilePath))
    {
        // Feed the reader with the file paths
        foreach (var filePath in sourceFilePaths)
        {
            var accepted = await readerBlock.SendAsync(filePath,
                cancellationToken); // Cancel at any time
            if (!accepted) break; // This will happen if the reader fails
        }
        readerBlock.Complete();
        await writerBlock.Completion;
    }

    async void PropagateFailure(IDataflowBlock block1, IDataflowBlock block2)
    {
        try { await block1.Completion.ConfigureAwait(false); }
        catch (Exception ex)
        {
            if (block1.Completion.IsCanceled) return; // On cancellation do nothing
            block2.Fault(ex);
        }
    }
}
Usage example:
var cts = new CancellationTokenSource();
var progress = new Progress<int>(value =>
{
    // Safe to update the UI
    Console.WriteLine($"Files processed: {value:#,0}");
});
var sourceFilePaths = Directory.EnumerateFiles(@"C:\SourceFolder", "*.log",
    SearchOption.AllDirectories); // Include subdirectories
await MergeFiles(sourceFilePaths, @"C:\AllLogs.log", cts.Token, progress);
The BoundedCapacity is used to keep the memory usage under control.
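As a rough illustration (the file size here is an assumption, not a measurement): with BoundedCapacity = 100 and log files averaging, say, 50 KB each, the buffered file contents are capped at about 100 × 50 KB ≈ 5 MB, no matter how many millions of files are fed in.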
If the disk drive is an SSD, you can try reading with a MaxDegreeOfParallelism larger than 2.
For best performance, you could consider writing to a different disk drive than the drive containing the source files.
The TPL Dataflow library is available as a package for .NET Framework, and is built-in for .NET Core.
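For reference, on .NET Framework the package can be added from NuGet (the package id is System.Threading.Tasks.Dataflow):

dotnet add package System.Threading.Tasks.Dataflow

The code above also assumes a using System.Threading.Tasks.Dataflow; directive at the top of the file.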