How to periodically flush a C# FileStream to the disk?


Question

Context:


I am implementing a logging mechanism for a Web API project that writes serialized objects to a file from multiple methods, which in turn is read by an external process (nxLog to be more accurate). The application is hosted on IIS and uses 18 worker processes. The App pool is recycled once a day. The expected load on the services that will incorporate the logging methods is 10,000 req/s. In short, this is a classic producer/consumer problem with multiple producers (the methods that produce logs) and one consumer (the external process that reads from the log files). Update: Each process uses multiple threads as well.


I used a BlockingCollection to store the data (and solve the race condition) and a long-running task that writes the data from the collection to the disk.


To write to the disk I am using a StreamWriter and a FileStream.
Because the write frequency is almost constant (as I said, 10,000 writes per second), I decided to keep the streams open for the entire lifetime of the application pool and periodically write logs to the disk. I rely on the App Pool recycle and my DI framework to dispose of my logger daily. Also note that this class will be a singleton, because I didn't want more than one thread dedicated to writing from my thread pool.


Apparently the FileStream object will not write to the disk until it is disposed. Now, I don't want the FileStream to wait an entire day before it writes to the disk. The memory required to hold all those serialized objects would be tremendous, not to mention that any crash of the application or the server would cause data loss or a corrupted file.

Now my question:

How can I have the underlying streams (FileStream and StreamWriter) write to the disk periodically without disposing them? My original assumption was that the FileStream would write to the disk once it exceeded its buffer size, which is 4K by default.
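For reference, the two stream layers can be flushed without disposing either of them: StreamWriter.Flush() pushes the writer's character buffer down into the FileStream, and FileStream.Flush(true) additionally asks the OS to write its own file buffers to disk. A minimal sketch (the file path here is illustrative, not from the original post):

```csharp
using System;
using System.IO;

class FlushSketch
{
    static void Main()
    {
        // Hypothetical path, for illustration only
        var path = Path.Combine(Path.GetTempPath(), "flush-demo.txt");

        using (var fs = new FileStream(path, FileMode.Create, FileAccess.Write, FileShare.ReadWrite))
        using (var sw = new StreamWriter(fs))
        {
            sw.WriteLine("buffered line"); // sits in the StreamWriter buffer
            sw.Flush();                    // moves it into the FileStream / OS cache
            fs.Flush(true);                // flushToDisk: true also flushes the OS buffers
        }

        Console.WriteLine(File.ReadAllText(path).Trim());
    }
}
```

Neither Flush call closes or disposes the streams, so they can stay open for the lifetime of the process, which is exactly what the answer below builds on.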


UPDATE: The inconsistencies mentioned in the answer have been fixed.

Code:

public class EventLogger: IDisposable, ILogger
{
    private readonly BlockingCollection<List<string>> _queue;
    private readonly Task _consumerTask;
    private FileStream _fs;
    private StreamWriter _sw;
    private string _logFilePath;
    public EventLogger()
    {            
        OpenFile();
        _queue = new BlockingCollection<List<string>>(50);
        _consumerTask = Task.Factory.StartNew(Write, CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default);
    }
    private void OpenFile()
    {
        _fs?.Dispose();
        _sw?.Dispose();            
        _logFilePath = $"D:\\Log\\log{DateTime.Now:yyyyMMdd}{System.Diagnostics.Process.GetCurrentProcess().Id}.txt";
        _fs = new FileStream(_logFilePath, FileMode.Append, FileAccess.Write, FileShare.ReadWrite);
        _sw = new StreamWriter(_fs);
    }
    public void Dispose()
    {            
         _queue?.CompleteAdding();
         _consumerTask?.Wait();            
         _sw?.Dispose();
         _fs?.Dispose();
         _queue?.Dispose();            

    }
    public void Log(List<string> list)
    {
        try
        {               
            _queue.TryAdd(list, 100);               

        }
        catch (Exception e)
        {
            LogError(LogLevel.Error, e);
        }
    }
    private void Write()
    {
        foreach (List<string> items in _queue.GetConsumingEnumerable())
        {               
            items.ForEach(item =>
            {                    
                _sw?.WriteLine(item);                    
            });
        }

    }
}

Answer


There are a few "inconsistencies" with your question.


The application is hosted on IIS and uses 18 worker processes



_logFilePath = $"D:\Log\log{DateTime.Now.ToString(yyyyMMdd)}{System.Diagnostic.Process.GetCurrentProcess().Id}.txt";



writes serialized objects to a file from multiple methods


Putting all of this together, you seem to have a single-threaded situation as opposed to a multi-threaded one. And since there is a separate log per process, there is no contention problem or need for synchronization. What I mean to say is, I don't see why the BlockingCollection is needed at all. It's possible that you forgot to mention that there are multiple threads within your web process. I will make that assumption here.


Another problem is that your code does not compile:

  1. The class name is Logger but the EventLogger function looks like a constructor.
  2. Some more incorrect string syntax, etc.


Putting all that aside, if you really have a contention situation and want to write to the same log via multiple threads or processes, your class seems to have most of what you need. I have modified your class to do some more things. The chief items to note are below:

  1. Fixed all the syntax errors, making assumptions where necessary
  2. Added a timer which calls Flush periodically. This needs a lock object so as not to interrupt the write operation
  3. Used an explicit buffer size in the StreamWriter constructor. You should heuristically determine which size works best for you. Also, you should disable AutoFlush on the StreamWriter so your writes hit the buffer instead of the file, providing better performance.

Below is the code with the changes:

public class EventLogger : IDisposable, ILogger {
    private readonly BlockingCollection<List<string>> _queue;
    private readonly Task _consumerTask;
    private FileStream _fs;
    private StreamWriter _sw;
    private System.Timers.Timer _timer;
    private object streamLock = new object();

    private const int MAX_BUFFER = 16 * 1024;      // 16K
    private const int FLUSH_INTERVAL = 10 * 1000;  // 10 seconds

    public EventLogger() {
        OpenFile();
        _queue = new BlockingCollection<List<string>>(50);
        _consumerTask = Task.Factory.StartNew(Write, CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default);
        SetupFlushTimer(); // the timer must be created here, or the periodic flush never runs
    }

    void SetupFlushTimer() {
        _timer = new System.Timers.Timer(FLUSH_INTERVAL);
        _timer.AutoReset = true;
        _timer.Elapsed += TimedFlush;
        _timer.Start(); // Elapsed never fires unless the timer is started
    }

    void TimedFlush(Object source, System.Timers.ElapsedEventArgs e) {
        lock (streamLock) { // take the same lock as Write() so a flush never interrupts a write
            _sw?.Flush();
        }
    }

    private void OpenFile() {
        _fs?.Dispose();
        _sw?.Dispose();
        var _logFilePath = $"D:\\Log\\log{DateTime.Now.ToString("yyyyMMdd")}{System.Diagnostics.Process.GetCurrentProcess().Id}.txt";
        _fs = new FileStream(_logFilePath, FileMode.Append, FileAccess.Write, FileShare.ReadWrite);
        _sw = new StreamWriter(_fs, Encoding.Default, MAX_BUFFER); // TODO: use the correct encoding here
        _sw.AutoFlush = false;
    }

    public void Dispose() {
        _timer.Elapsed -= TimedFlush;
        _timer.Dispose();

        _queue?.CompleteAdding();
        _consumerTask?.Wait();
        _sw?.Dispose();
        _fs?.Dispose();
        _queue?.Dispose();

    }
    public void Log(List<string> list) {
        try {
            _queue.TryAdd(list, 100);

        } catch (Exception e) {
            LogError(LogLevel.Error, e);
        }
    }

    private void Write() {
        foreach (List<string> items in _queue.GetConsumingEnumerable()) {
            lock (streamLock) {
                items.ForEach(item => {
                    _sw?.WriteLine(item);
                });
            }
        }

    }
}


There are 4 factors controlling the performance of this mechanism, and it is important to understand their relationship. The example below will hopefully make it clear:

  • average size of List<string> is 50 Bytes
  • Calls/sec is 10,000
  • MAX_BUFFER is 1024 * 1024 Bytes (1 Meg)


You are producing 500,000 bytes of data per second, so a 1 MB buffer can hold only about 2 seconds' worth of data. That is, even if FLUSH_INTERVAL is set to 10 seconds, the buffer will AutoFlush every 2 seconds (on average) when it runs out of buffer space.
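The arithmetic behind that claim, as a quick sketch using the numbers from the example above:

```csharp
using System;

class BufferMath
{
    static void Main()
    {
        const int avgEntryBytes = 50;        // average size of a List<string> entry
        const int callsPerSecond = 10_000;   // expected load
        const int maxBuffer = 1024 * 1024;   // MAX_BUFFER of 1 MB

        // Incoming data rate
        int bytesPerSecond = avgEntryBytes * callsPerSecond;       // 500,000 B/s

        // How long until the StreamWriter buffer fills and forces a flush
        double secondsToFill = (double)maxBuffer / bytesPerSecond; // ~2.1 s

        Console.WriteLine($"{bytesPerSecond} bytes/s, buffer fills in about {secondsToFill:F1} s");
    }
}
```

So with these numbers the effective flush interval is governed by the buffer size, not by FLUSH_INTERVAL.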


Also remember that blindly increasing MAX_BUFFER will not help, since the actual flush operation will take longer due to the bigger buffer size.


The main thing to understand is that when there is a difference between the incoming data rate (into your EventLog class) and the outgoing data rate (to the disk), you will either need an infinitely sized buffer (assuming a continuously running process) or you will have to slow down your average incoming rate to match the average outgoing rate.
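The bounded BlockingCollection in the class above already provides one way to apply that slow-down: when the queue is full, TryAdd with a timeout throttles producers instead of buffering without limit. A rough sketch of that behavior (the capacity and values here are illustrative, not from the original post):

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

class BackpressureSketch
{
    static void Main()
    {
        // Capacity of 2 for demonstration; the logger above uses 50
        var queue = new BlockingCollection<List<string>>(2);

        queue.TryAdd(new List<string> { "a" }, 100); // succeeds
        queue.TryAdd(new List<string> { "b" }, 100); // succeeds; queue is now full

        // With no consumer draining the queue, the next producer is throttled:
        // TryAdd waits up to 100 ms, then returns false instead of blocking forever.
        bool accepted = queue.TryAdd(new List<string> { "c" }, 100);
        Console.WriteLine(accepted); // False
    }
}
```

Dropping (or retrying) the entry when TryAdd returns false is what keeps the average incoming rate bounded by what the consumer can write out.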

