Reading millions of small files with C#


Problem description

I have millions of log files generated every day, and I need to read all of them and merge them into a single file in order to do some processing on it in another app.

I'm looking for the fastest way to do this. Currently I'm using threads, tasks, and Parallel like this:

Parallel.For(0, files.Length, new ParallelOptions { MaxDegreeOfParallelism = 100 }, i =>
{
    ReadFiles(files[i]);
});

void ReadFiles(string file)
{
    try
    {
        var txt = File.ReadAllText(file);
        filesTxt.Add(txt);
    }
    catch { }
    GlobalCls.ThreadNo--;
}

foreach (var file in files)
{
    //Int64 index = i;
    //var file = files[index];
    while (Process.GetCurrentProcess().Threads.Count > 100)
    { 
        Thread.Sleep(100);
        Application.DoEvents();
    }
    new Thread(() => ReadFiles(file)).Start();
    GlobalCls.ThreadNo++;
    // Task.Run(() => ReadFiles(file));      
}

The problem is that after a few thousand files have been read, reading gets slower and slower!

Any idea why? And what's the fastest approach to reading millions of small files? Thank you.

Recommended answer

It seems that you are loading the contents of all the files in memory, before writing them back to the single file. This could explain why the process becomes slower over time.
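For contrast, the memory pressure disappears even in a plain sequential merge, because each file's text is written to the output as soon as it is read instead of being accumulated in a shared collection. A minimal sketch (the method name and paths are just placeholders):

using System.IO;

static void MergeFilesSequentially(string[] sourceFilePaths, string targetFilePath)
{
    using (var writer = new StreamWriter(targetFilePath))
    {
        foreach (var filePath in sourceFilePaths)
        {
            // Append immediately; each string becomes collectable right away,
            // so memory usage stays flat no matter how many files there are.
            writer.Write(File.ReadAllText(filePath));
        }
    }
}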

A way to optimize the process is to separate the reading part from the writing part, and do them in parallel. This is called the producer-consumer pattern. It can be implemented with the Parallel class, or with threads, or with tasks, but I will demonstrate instead an implementation based on the powerful TPL Dataflow library, which is particularly suited for jobs like this.

private static async Task MergeFiles(IEnumerable<string> sourceFilePaths,
    string targetFilePath, CancellationToken cancellationToken = default,
    IProgress<int> progress = null)
{
    var readerBlock = new TransformBlock<string, string>(async filePath =>
    {
        return File.ReadAllText(filePath); // Read the small file
    }, new ExecutionDataflowBlockOptions()
    {
        MaxDegreeOfParallelism = 2, // Reading is parallelizable
        BoundedCapacity = 100, // No more than 100 file-paths buffered
        CancellationToken = cancellationToken, // Cancel at any time
    });

    StreamWriter streamWriter = null;

    int filesProcessed = 0;
    var writerBlock = new ActionBlock<string>(text =>
    {
        streamWriter.Write(text); // Append to the target file
        filesProcessed++;
        if (filesProcessed % 10 == 0) progress?.Report(filesProcessed);
    }, new ExecutionDataflowBlockOptions()
    {
        MaxDegreeOfParallelism = 1, // We can't parallelize the writer
        BoundedCapacity = 100, // No more than 100 file-contents buffered
        CancellationToken = cancellationToken, // Cancel at any time
    });

    readerBlock.LinkTo(writerBlock,
        new DataflowLinkOptions() { PropagateCompletion = true });

    // This is a tricky part. We use BoundedCapacity, so we must propagate manually
    // a possible failure of the writer to the reader, otherwise a deadlock may occur.
    PropagateFailure(writerBlock, readerBlock);

    // Open the output stream
    using (streamWriter = new StreamWriter(targetFilePath))
    {
        // Feed the reader with the file paths
        foreach (var filePath in sourceFilePaths)
        {
            var accepted = await readerBlock.SendAsync(filePath,
                cancellationToken); // Cancel at any time
            if (!accepted) break; // This will happen if the reader fails
        }
        readerBlock.Complete();
        await writerBlock.Completion;
    }

    async void PropagateFailure(IDataflowBlock block1, IDataflowBlock block2)
    {
        try { await block1.Completion.ConfigureAwait(false); }
        catch (Exception ex)
        {
            if (block1.Completion.IsCanceled) return; // On cancellation do nothing
            block2.Fault(ex);
        }
    }
}

Usage example:

var cts = new CancellationTokenSource();
var progress = new Progress<int>(value =>
{
    // Safe to update the UI
    Console.WriteLine($"Files processed: {value:#,0}");
});
var sourceFilePaths = Directory.EnumerateFiles(@"C:\SourceFolder", "*.log",
    SearchOption.AllDirectories); // Include subdirectories
await MergeFiles(sourceFilePaths, @"C:\AllLogs.log", cts.Token, progress);

The BoundedCapacity is used to keep the memory usage under control.

If the disk drive is an SSD, you can try reading with a MaxDegreeOfParallelism larger than 2.
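For example, only the reader block's options need to change; the values below are illustrative and worth benchmarking on the actual hardware:

var readerOptions = new ExecutionDataflowBlockOptions()
{
    MaxDegreeOfParallelism = 8, // illustrative value for an SSD; measure to find the sweet spot
    BoundedCapacity = 100,      // keep the buffer bounded so memory stays under control
    CancellationToken = cancellationToken,
};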

For best performance you could consider writing to a different disk drive than the drive containing the source files.

The TPL Dataflow library is available as a package for .NET Framework, and is built-in for .NET Core.
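On .NET Framework the library can be installed from NuGet; the package is named System.Threading.Tasks.Dataflow, e.g.:

dotnet add package System.Threading.Tasks.Dataflow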
