C++ Low-Latency Threaded Asynchronous Buffered Stream (intended for logging) – Boost

Question

Three while loops below contain code that has been commented out; search for "TAG1", "TAG2", and "TAG3" to find them. I simply want the while loops to wait for the tested condition to become true before proceeding, while using as little CPU as possible. I first tried using Boost condition variables, but there's a race condition. Putting the thread to sleep for 'x' microseconds is inefficient because there is no way to time the wakeup precisely. Finally, boost::this_thread::yield() does not seem to do anything, probably because I only have 2 active threads on a dual-core system. Specifically, how can I make the three tagged areas below run more efficiently while introducing as little unnecessary blocking as possible?
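One common cause of a condition-variable race like this is a lost wake-up. The waiter tests the condition and finds it false. The notifier then changes the state and signals before the waiter has actually blocked, so the signal is lost. The usual remedy is to modify the shared state under the same mutex the waiter uses and to wait with a predicate. The following is a minimal sketch, assuming C++11 `std::mutex` and `std::condition_variable`. Boost's `boost::mutex` and `boost::condition_variable` offer the same `wait(lock, predicate)` form. The `WorkSignal` class is hypothetical.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>

// Hypothetical helper: counts pending work items and lets a worker sleep
// until at least one is available, without losing wake-ups.
class WorkSignal {
public:
    // Producer side: publish new work, then wake the worker.
    void post() {
        {
            std::lock_guard<std::mutex> lock(mtx_);
            ++pending_;  // the state change happens under the mutex...
        }
        cv_.notify_one();  // ...so a waiter cannot test it "between" change and notify
    }

    // Consumer side: block until work exists (or stop() was called), then take it all.
    // Returns the number of items taken; 0 means stopped with nothing left.
    std::size_t waitAndTake() {
        std::unique_lock<std::mutex> lock(mtx_);
        // The predicate is re-checked under the lock, which handles both spurious
        // wake-ups and notifications that were sent before we started waiting.
        cv_.wait(lock, [this] { return pending_ > 0 || stopping_; });
        std::size_t taken = pending_;
        pending_ = 0;
        return taken;
    }

    // Ask the consumer to exit once the pending work is drained.
    void stop() {
        {
            std::lock_guard<std::mutex> lock(mtx_);
            stopping_ = true;
        }
        cv_.notify_all();
    }

private:
    std::mutex mtx_;
    std::condition_variable cv_;
    std::size_t pending_ = 0;
    bool stopping_ = false;
};
```

A worker loop would then be `while (sig.waitAndTake() > 0) { /* write out the buffer */ }`. It has no fixed sleep interval to tune.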

Background

Objective:

I have an application that logs a lot of data. After profiling, I found that much of the time is spent in logging operations (logging text or binary to a file on the local hard disk). My objective is to reduce the latency of logData calls by replacing non-threaded direct write calls with calls to a threaded buffered stream logger.

Options explored:


  • Upgrade the 2005-era slow hard drives to SSDs... possible. The cost is not prohibitive... but it involves a lot of work... 200+ computers would have to be upgraded...

  • Boost ASIO... I don't need all the proactor/networking overhead; I'm looking for something simpler and more lightweight.

Design:


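  • Producer and consumer thread pattern: the application writes data to a buffer, and a background thread writes it to disk some time later. The ultimate goal is for the writeMessage function to return to the application layer as quickly as possible, while the data is still logged correctly and completely, in FIFO order, to the log file later on.

  • Only one application thread and only one writer thread.

  • Based on a ring buffer. The reason for this decision is to use as few locks as possible, ideally... and please correct me if I'm wrong... I don't think I need any.

  • The buffer is a statically-allocated character array, but it could be moved to the heap if needed/desired for performance reasons.

  • The buffer has a start pointer that points to the next character that should be written to the file. The buffer has an end pointer that points to the array index after the last character to be written to the file. The end pointer never passes the start pointer. If a message comes in that is larger than the buffer, the writer waits until the buffer is emptied and then writes the new message directly to the file without putting the over-sized message in the buffer (once the buffer is emptied, the worker thread won't be writing anything, so there is no contention).

  • The writer (worker thread) only updates the ring buffer's start pointer.

  • The main (application) thread only updates the ring buffer's end pointer, and again, it only inserts new data into the buffer when there is space available... otherwise it either waits for space in the buffer to become available or writes directly as described above.

  • The worker thread continuously checks whether there is data to be written (indicated by buffer start pointer != buffer end pointer). If there is no data to be written, the worker thread should ideally go to sleep and wake up once the application thread has inserted something into the buffer (and changed the buffer's end pointer so that it no longer points to the same index as the start pointer). What I have below involves while loops that continuously check that condition. It is a very bad/inefficient way of waiting on the buffer.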
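The design above is the classic single-producer/single-consumer ring buffer: one producer only moves `end`, and one consumer only moves `start`. In standard C++ the two positions still need to be atomic, using `std::atomic` or, before C++11, `boost::atomic`. Plain `char*` members shared between threads are a data race. Release/acquire ordering is what guarantees the consumer sees the copied bytes before it sees the new end position. The following is a minimal sketch, assuming C++11. The class and method names are hypothetical. Like the question's getFreeSpaceInBuffer(), it keeps one slot empty to tell "full" from "empty".

```cpp
#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstring>
#include <vector>

// Hypothetical SPSC byte ring: exactly one producer thread calls tryPush(),
// exactly one consumer thread calls peek()/consume().
class SpscByteRing {
public:
    explicit SpscByteRing(std::size_t capacity) : buf_(capacity), head_(0), tail_(0) {
        assert(capacity > 1);  // one slot always stays empty
    }

    // Producer: copy len bytes in if they fit; returns false if there is not enough space.
    bool tryPush(const char* data, std::size_t len) {
        std::size_t tail = tail_.load(std::memory_order_relaxed);  // only the producer writes tail_
        std::size_t head = head_.load(std::memory_order_acquire);  // consumer's progress
        std::size_t cap = buf_.size();
        std::size_t used = (tail + cap - head) % cap;
        if (len > cap - 1 - used) return false;
        std::size_t first = std::min(len, cap - tail);  // bytes that fit before wrap-around
        std::memcpy(&buf_[tail], data, first);
        std::memcpy(&buf_[0], data + first, len - first);
        tail_.store((tail + len) % cap, std::memory_order_release);  // publish the bytes
        return true;
    }

    // Consumer: contiguous readable region starting at the current head.
    // It may be shorter than all pending data when that data wraps around.
    const char* peek(std::size_t& len) const {
        std::size_t head = head_.load(std::memory_order_relaxed);  // only the consumer writes head_
        std::size_t tail = tail_.load(std::memory_order_acquire);  // see bytes the producer published
        len = (tail >= head) ? tail - head : buf_.size() - head;
        return &buf_[head];
    }

    // Consumer: mark len bytes (from peek) as written, freeing that space.
    void consume(std::size_t len) {
        std::size_t head = head_.load(std::memory_order_relaxed);
        head_.store((head + len) % buf_.size(), std::memory_order_release);
    }

private:
    std::vector<char> buf_;
    std::atomic<std::size_t> head_;  // next byte to write to the file (consumer-owned)
    std::atomic<std::size_t> tail_;  // one past the last queued byte (producer-owned)
};
```

In this layout the worker would loop on `peek()`, then `fwrite()` the returned region, then `consume()`. `writeMessage()` would call `tryPush()` and fall back to waiting, for example on a condition variable, only when it returns false.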

Results:


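  • On my 2009-era dual-core laptop with an SSD, I see that the total write time of the threaded/buffered benchmark vs. direct write is about 1:6 (0.609 sec vs 0.095 sec), but highly variable. Often the buffered write benchmark is actually slower than the direct write. I believe the variability is due to my poor implementation of waiting for space to free up in the buffer, waiting for the buffer to empty, and having the worker thread wait for work to become available. I have measured that some of the while loops iterate more than 10,000 times, and I suspect those cycles are actually competing for hardware resources that the other thread (worker or application) needs to finish the computation being waited on.

  • The output seems to check out. With TEST mode enabled and a small buffer size of 10 as a stress test, I diffed hundreds of MB of output and found it equal to the input.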

Compiled with the current version of Boost (1.55)

Header

    #ifndef BufferedLogStream_h
    #define BufferedLogStream_h

    #include <stdio.h>
    #include <iostream>
    #include <string>
    #include <cstdlib>
    #include <cstring>   // memcpy
    #include "boost/chrono/chrono.hpp"
    #include "boost/thread/thread.hpp"
    #include "boost/thread/locks.hpp"
    #include "boost/thread/mutex.hpp"
    #include "boost/thread/condition_variable.hpp"
    #include <time.h>

    using namespace std;

    #define BENCHMARK_STR_SIZE 128
    #define NUM_BENCHMARK_WRITES 524288
    #define TEST 0
    #define BENCHMARK 1
    #define WORKER_LOOP_WAIT_MICROSEC 20
    #define MAIN_LOOP_WAIT_MICROSEC 10

    #if(TEST)
    #define BUFFER_SIZE 10 
    #else 
    #define BUFFER_SIZE 33554432 //32 MB
    #endif

    class BufferedLogStream {
        public:
            BufferedLogStream();
            void openFile(char* filename);
            void flush();
            void close();
            inline void writeMessage(const char* message, unsigned int length);
            void writeMessage(string message);
            bool operator() () { return start != end; }

        private:
            void threadedWriter();
            inline bool hasSomethingToWrite();
            inline unsigned int getFreeSpaceInBuffer();
            void appendStringToBuffer(const char* message, unsigned int length);

            FILE* fp;
            char* start;
            char* end;
            char* endofringbuffer;
            char ringbuffer[BUFFER_SIZE];
            bool workerthreadkeepalive;
            boost::mutex mtx;
            boost::condition_variable waitforempty;
            boost::mutex workmtx;
            boost::condition_variable waitforwork;

            #if(TEST)
            struct testbuffer {
                int length;
                char message[BUFFER_SIZE * 2];
            };

            public:
                void test();

            private:
                void getNextRandomTest(testbuffer &tb);
                FILE* datatowrite;
            #endif

        #if(BENCHMARK)
            public:
                void runBenchmark();

            private:
                void initBenchmarkString();
                void runDirectWriteBaseline();
                void runBufferedWriteBenchmark();

                char benchmarkstr[BENCHMARK_STR_SIZE];
        #endif
    };

    #if(TEST)
    int main() {
        BufferedLogStream* bl = new BufferedLogStream();
        bl->openFile("replicated.txt");
        bl->test();
        bl->close();
        cout << "Done" << endl;
        cin.get();
        return 0;
    }
    #endif

    #if(BENCHMARK)
    int main() {
        BufferedLogStream* bl = new BufferedLogStream();
        bl->runBenchmark();
        cout << "Done" << endl;
        cin.get();
        return 0;
    }
    #endif //for benchmark

    #endif

Implementation

    #include "BufferedLogStream.h"

    BufferedLogStream::BufferedLogStream() {
        fp = NULL;
        start = ringbuffer;
        end = ringbuffer;
        endofringbuffer = ringbuffer + BUFFER_SIZE;
        workerthreadkeepalive = true;
    }

    void BufferedLogStream::openFile(char* filename) {
        if(fp) close();
        workerthreadkeepalive = true;
        fp = fopen(filename, "w+b"); // open before starting the worker so it sees a valid fp
        boost::thread t2(&BufferedLogStream::threadedWriter, this);
    }

    void BufferedLogStream::flush() {
        fflush(fp); 
    }

    void BufferedLogStream::close() {
        workerthreadkeepalive = false;
        if(!fp) return;
        while(hasSomethingToWrite()) {
            boost::unique_lock<boost::mutex> u(mtx);
            waitforempty.wait_for(u, boost::chrono::microseconds(MAIN_LOOP_WAIT_MICROSEC));
        }
        flush();        
        fclose(fp);             
        fp = NULL;          
    }

    void BufferedLogStream::threadedWriter() {
        while(true) {
            if(start != end) {
                char* currentend = end;
                if(start < currentend) {
                    fwrite(start, 1, currentend - start, fp);
                }
                else if(start > currentend) {
                    if(start != endofringbuffer) fwrite(start, 1, endofringbuffer - start, fp); 
                    fwrite(ringbuffer, 1, currentend - ringbuffer, fp);
                }
                start = currentend;
                waitforempty.notify_one();
            }
            else { //start == end...no work to do
                if(!workerthreadkeepalive) return;
                boost::unique_lock<boost::mutex> u(workmtx);
                waitforwork.wait_for(u, boost::chrono::microseconds(WORKER_LOOP_WAIT_MICROSEC));
            }
        }
    }

    bool BufferedLogStream::hasSomethingToWrite() {
        return start != end;
    }

    void BufferedLogStream::writeMessage(string message) {
        writeMessage(message.c_str(), message.length());
    }

    unsigned int BufferedLogStream::getFreeSpaceInBuffer() {
        if(end > start) return (start - ringbuffer) + (endofringbuffer - end) - 1;
        if(end == start) return BUFFER_SIZE-1;
        return start - end - 1; //case where start > end
    }

    void BufferedLogStream::appendStringToBuffer(const char* message, unsigned int length) {
        if(end + length <= endofringbuffer) { //most common case for appropriately-sized buffer
            memcpy(end, message, length);
            end += length;
        }
        else {
            int lengthtoendofbuffer = endofringbuffer - end;
            if(lengthtoendofbuffer > 0) memcpy(end, message, lengthtoendofbuffer);
            int remainderlength =  length - lengthtoendofbuffer;
            memcpy(ringbuffer, message + lengthtoendofbuffer, remainderlength);
            end = ringbuffer + remainderlength;
        }
    }

    void BufferedLogStream::writeMessage(const char* message, unsigned int length) {
        if(length > BUFFER_SIZE - 1) { //if string is too large for buffer, wait for buffer to empty and bypass buffer, write directly to file
            while(hasSomethingToWrite()) {
                boost::unique_lock<boost::mutex> u(mtx);
                waitforempty.wait_for(u, boost::chrono::microseconds(MAIN_LOOP_WAIT_MICROSEC));
            }
            fwrite(message, 1, length, fp);
        }
        else {
            //wait until there is enough free space to insert new string
            while(getFreeSpaceInBuffer() < length) {
                boost::unique_lock<boost::mutex> u(mtx);
                waitforempty.wait_for(u, boost::chrono::microseconds(MAIN_LOOP_WAIT_MICROSEC));
            }
            appendStringToBuffer(message, length);
        }
        waitforwork.notify_one();
    }

    #if(TEST)
        void BufferedLogStream::getNextRandomTest(testbuffer &tb) {
            tb.length = 1 + (rand() % (int)(BUFFER_SIZE * 1.05));
            for(int i = 0; i < tb.length; i++) {
                tb.message[i] = rand() % 26 + 65;
            }
            tb.message[tb.length] = '\n';
            tb.length++;
            tb.message[tb.length] = '\0';
        }

        void BufferedLogStream::test() {
            cout << "Buffer size is: " << BUFFER_SIZE << endl;
            testbuffer tb;
            datatowrite = fopen("orig.txt", "w+b");
            for(unsigned int i = 0; i < 7000000; i++) {
                if(i % 1000000 == 0) cout << i << endl;
                getNextRandomTest(tb);
                writeMessage(tb.message, tb.length);
                fwrite(tb.message, 1, tb.length, datatowrite);
            }       
            fflush(datatowrite);
            fclose(datatowrite);
        }
    #endif

    #if(BENCHMARK) 
        void BufferedLogStream::initBenchmarkString() {
            for(unsigned int i = 0; i < BENCHMARK_STR_SIZE - 1; i++) {
                benchmarkstr[i] = rand() % 26 + 65;
            }
            benchmarkstr[BENCHMARK_STR_SIZE - 1] = '\n';
        }

        void BufferedLogStream::runDirectWriteBaseline() {
            clock_t starttime = clock();
            fp = fopen("BenchMarkBaseline.txt", "w+b");
            for(unsigned int i = 0; i < NUM_BENCHMARK_WRITES; i++) {
                fwrite(benchmarkstr, 1, BENCHMARK_STR_SIZE, fp);
            }   
            fflush(fp);
            fclose(fp);
            clock_t elapsedtime = clock() - starttime;
            cout << "Direct write baseline took " << ((double) elapsedtime) / CLOCKS_PER_SEC << " seconds." << endl;
        }

        void BufferedLogStream::runBufferedWriteBenchmark() {
            clock_t starttime = clock();
            openFile("BufferedBenchmark.txt");
            cout << "Opened file" << endl;
            for(unsigned int i = 0; i < NUM_BENCHMARK_WRITES; i++) {
                writeMessage(benchmarkstr, BENCHMARK_STR_SIZE);
            }   
            cout << "Wrote" << endl;
            close();
            cout << "Close" << endl;
            clock_t elapsedtime = clock() - starttime;
            cout << "Buffered write took " << ((double) elapsedtime) / CLOCKS_PER_SEC << " seconds." << endl;
        }

        void BufferedLogStream::runBenchmark() {
            cout << "Buffer size is: " << BUFFER_SIZE << endl;
            initBenchmarkString();
            runDirectWriteBaseline();
            runBufferedWriteBenchmark();
        }
    #endif


Update: November 25, 2013

I updated the code below to use boost::condition_variable, specifically the wait_for() method, as recommended by Evgeny Panasyuk. This avoids needlessly checking the same condition over and over again. I am currently seeing the buffered version run in about 1/6th the time of the unbuffered / direct-write version. This is not the ideal case because both cases are limited by the hard disk (in my case a 2010-era SSD). I plan to use the code below in an environment where the hard disk will not be the bottleneck, and most if not all of the time the buffer should have space available to accommodate the writeMessage requests. That brings me to my next question: how big should I make the buffer? I don't mind allocating 32 MB or 64 MB to ensure that it never fills up; the code will be running on systems that can spare that. Intuitively, I feel that it's a bad idea to statically allocate a 32 MB character array. Is it? Anyhow, I expect that when I run the code below in my intended application, the latency of logData() calls will be greatly reduced, which will yield a significant reduction in overall processing time.

If anyone sees any way to make the code below better (faster, more robust, leaner, etc.), please let me know. I appreciate the feedback. Lazin, how would your approach be faster or more efficient than what I have posted below? I kinda like the idea of just having one buffer and making it large enough that it practically never fills up. Then I don't have to worry about reading from different buffers. Evgeny Panasyuk, I like the approach of using existing code whenever possible, especially if it's an existing Boost library. However, I also don't see how the spsc_queue is more efficient than what I have below. I'd rather deal with one large buffer than many smaller ones and have to worry about splitting my input stream on the input and splicing it back together on the output. Your approach would allow me to offload the formatting from the main thread onto the worker thread. That is a clever approach. But I'm not sure yet whether it will save a lot of time, and to realize the full benefit I would have to modify code that I do not own.

//end update

Recommended answer

General solution.

I think you should look at the Nagle algorithm. For one producer and one consumer, it would look like this:


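  • At the beginning, the buffer is empty and the worker thread is idle, waiting for events.

  • The producer writes data into the buffer and notifies the worker thread.

  • The worker thread wakes up and starts the write operation.

  • The producer tries to write another message, but the buffer is in use by the worker, so the producer allocates another buffer and writes the message into it.

  • The producer tries to write another message; I/O is still in progress, so the producer writes the message into the previously allocated buffer.

  • The worker thread finishes writing the buffer to the file, sees that there is another buffer with data, grabs it, and starts writing.

  • The first buffer is then used by the producer for all subsequent messages while the second write operation is in progress.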
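The buffer hand-off in these steps amounts to double buffering. The producer appends to an "active" buffer while the worker writes out the buffer it grabbed earlier. Each time the worker finishes an I/O, it swaps the buffers. The following is a minimal sketch, assuming C++11. The class `BatchingLog` and its methods are hypothetical, and a caller-supplied sink stands in for the file so the example stays self-contained.

```cpp
#include <condition_variable>
#include <mutex>
#include <string>
#include <utility>

// Hypothetical double-buffered log: append() never waits for I/O;
// the worker drains whole batches at a time.
class BatchingLog {
public:
    // Producer: append a message to the active buffer and wake the worker.
    void append(const std::string& msg) {
        {
            std::lock_guard<std::mutex> lock(mtx_);
            active_ += msg;  // memory copy only, no I/O under the lock
        }
        cv_.notify_one();
    }

    // Worker: wait until something is buffered (or stop() was called), swap the
    // active buffer out, and pass the whole batch to sink outside the lock.
    // Returns false once stopped and fully drained.
    template <typename Sink>
    bool drainBatch(Sink&& sink) {
        {
            std::unique_lock<std::mutex> lock(mtx_);
            cv_.wait(lock, [this] { return !active_.empty() || stopping_; });
            if (active_.empty()) return false;  // stopping and nothing left
            std::swap(active_, writing_);       // producer continues in the other buffer
        }
        sink(writing_);    // slow I/O happens without holding the mutex
        writing_.clear();  // ready to be swapped back in next time
        return true;
    }

    void stop() {
        {
            std::lock_guard<std::mutex> lock(mtx_);
            stopping_ = true;
        }
        cv_.notify_all();
    }

private:
    std::mutex mtx_;
    std::condition_variable cv_;
    std::string active_;   // filled by the producer
    std::string writing_;  // owned by the worker while it writes
    bool stopping_ = false;
};
```

A real worker thread would call `drainBatch` in a loop with a sink such as `[fp](const std::string& b) { fwrite(b.data(), 1, b.size(), fp); }`. A single message is handed to the worker as soon as it is idle, while bursts accumulate into large batches. In typical implementations `clear()` keeps a string's capacity, so after warm-up the two buffers are reused rather than reallocated.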

This scheme helps meet the low-latency requirement: a single message is written to disk immediately, while a large number of events is written in large batches for greater throughput.

If your log messages have levels, you can improve this scheme a little. Error messages have high priority (level) and must be saved to disk immediately (because they are rare but very valuable), while debug and trace messages have low priority and can be buffered to save bandwidth (because they are very frequent but not as valuable as error and info messages). So when you write an error message, you wait until the worker thread is done writing it (along with all the messages in the same buffer) and then continue, whereas debug and trace messages are simply written to the buffer.
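One way to make error-level messages wait for the disk while debug and trace messages return immediately is to give every append a sequence number. An error writer then blocks until the worker reports that sequence number as flushed. The following is a minimal sketch, assuming C++11. `LeveledLog`, `Level`, `takeBatch`, and `markFlushed` are hypothetical names. The actual file write, done by the worker between `takeBatch()` and `markFlushed()`, is left out, and so is shutdown handling.

```cpp
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <string>
#include <utility>

enum class Level { Trace, Debug, Info, Error };

class LeveledLog {
public:
    // Producer: queue the message; for Error, block until it has been flushed.
    void append(Level level, const std::string& msg) {
        std::unique_lock<std::mutex> lock(mtx_);
        buffer_ += msg;
        std::uint64_t mySeq = ++appended_;
        workCv_.notify_one();
        if (level == Level::Error) {
            // Everything queued before this message is in the same or an earlier
            // batch, so it is flushed too by the time this wait returns.
            flushedCv_.wait(lock, [&] { return flushed_ >= mySeq; });
        }
    }

    // Worker: take the whole pending batch plus the sequence number it ends at.
    std::pair<std::string, std::uint64_t> takeBatch() {
        std::unique_lock<std::mutex> lock(mtx_);
        workCv_.wait(lock, [this] { return !buffer_.empty(); });
        std::string batch;
        batch.swap(buffer_);
        return std::make_pair(std::move(batch), appended_);
    }

    // Worker: call after the batch has been written (and e.g. fflush'ed).
    void markFlushed(std::uint64_t seq) {
        {
            std::lock_guard<std::mutex> lock(mtx_);
            flushed_ = seq;
        }
        flushedCv_.notify_all();
    }

private:
    std::mutex mtx_;
    std::condition_variable workCv_;
    std::condition_variable flushedCv_;
    std::string buffer_;
    std::uint64_t appended_ = 0;  // sequence number of the last queued message
    std::uint64_t flushed_ = 0;   // sequence number of the last message on disk
};
```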

Threads.

Spawning a worker thread for each application thread is too costly. You should use a single writer thread for each log file. Write buffers must be shared between threads. Each buffer has two pointers: commit_pointer and prepare_pointer. All buffer space between the beginning of the buffer and commit_pointer is available to the worker thread. The buffer space between commit_pointer and prepare_pointer is currently being updated by application threads. Invariant: commit_pointer <= prepare_pointer.

Write operations can be performed in two steps.


  1. Prepare write. This operation reserves space in the buffer.
    • The producer calculates len(message) and atomically updates prepare_pointer;
    • The old prepare_pointer value and len are saved by the producer;

  2. Commit write.
    • The producer writes the message at the beginning of the reserved buffer space (the old prepare_pointer value).
    • The producer busy-waits until commit_pointer equals the old prepare_pointer value it saved in a local variable.
    • The producer commits the write by atomically doing commit_pointer = commit_pointer + len.

To prevent false sharing, len(message) can be rounded up to the cache line size, with the extra space filled with spaces.
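Rounding a length up to a whole number of cache lines is a one-line bit trick when the line size is a power of two. The following is a minimal sketch. The 64-byte line size is an assumption (typical for current x86 CPUs), not something the answer specifies, and `roundUpToLine` is a hypothetical helper.

```cpp
#include <cassert>
#include <cstddef>

// Assumed cache line size (typical on x86); adjust for the target CPU.
const std::size_t kCacheLine = 64;

// Round len up to the next multiple of `line`, which must be a power of two.
std::size_t roundUpToLine(std::size_t len, std::size_t line = kCacheLine) {
    assert(line != 0 && (line & (line - 1)) == 0);
    return (len + line - 1) & ~(line - 1);
}
```

After copying `len` message bytes into a slot of `roundUpToLine(len)` bytes, the padding could be filled with `memset(dst + len, ' ', padded - len)`, as the answer suggests.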

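    // C++11 version of the pseudocode. Assumes one shared buffer that is large
    // enough; wrap-around and "buffer full" handling are omitted.
    #include <atomic>
    #include <cstring>

    void notify_worker_thread();  // assumed: e.g. signals a condition variable

    const std::size_t kBufferSize = 1 << 20;  // illustrative size
    char buffer[kBufferSize];
    std::atomic<char*> prepare_ptr(buffer);  // end of reserved space
    std::atomic<char*> commit_ptr(buffer);   // end of space ready for the worker

    void write(const char* message) {
        std::size_t len = std::strlen(message);  // TODO: round to cache line size
        // Prepare step: atomically reserve [old_prepare_ptr, old_prepare_ptr + len)
        char* old_prepare_ptr = prepare_ptr.load();
        while (!prepare_ptr.compare_exchange_weak(old_prepare_ptr, old_prepare_ptr + len)) {
            // another thread performed a prepare op; old_prepare_ptr now holds
            // the updated value, so just retry
        }
        // Write message into the reserved space
        std::memcpy(old_prepare_ptr, message, len);
        // Commit step: busy-wait until every earlier reservation is committed
        // (commit_ptr == old_prepare_ptr), then advance commit_ptr past ours.
        char* expected = old_prepare_ptr;
        while (!commit_ptr.compare_exchange_weak(expected, old_prepare_ptr + len)) {
            expected = old_prepare_ptr;  // an earlier writer has not committed yet
        }
        notify_worker_thread();
    }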
