boost::asio 异步服务器设计 [英] boost::asio async server design

查看:37
本文介绍了boost::asio 异步服务器设计的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

目前我正在使用设计,当服务器读取流的前 4 个字节然后在标头解码后读取 N 个字节.

Currently I'm using design when server reads first 4 bytes of stream then read N bytes after header decoding.

但我发现第一次 async_read 和第二次读取之间的时间是 3-4 毫秒.我刚刚从回调中打印了控制台时间戳以进行测量.我总共发送了 10 个字节的数据.为什么要花这么多时间阅读?

But I found that time between first async_read and second read is 3-4 ms. I just printed in console timestamp from callbacks for measuring. I sent 10 bytes of data in total. Why it takes so much time to read?

我在调试模式下运行它,但我认为调试的 1 个连接是在从套接字读取之间没有 3 毫秒的延迟.也许我需要另一种在数据包"上切割 TCP 流的方法?

I running it in debug mode but I think that 1 connection for debug is not so much to have a 3 ms delay between reads from socket. Maybe I need another approach to cut TCP stream on "packets"?

更新:我在这里发布了一些代码

UPDATE: I post some code here

void parseHeader(const boost::system::error_code& error)
        {
            cout<<"[parseHeader] "<<lib::GET_SERVER_TIME()<<endl;
            if (error) {
                close();
                return;
            }
            GenTCPmsg::header result = msg.parseHeader();
            if (result.error == GenTCPmsg::parse_error::__NO_ERROR__) {
                msg.setDataLength(result.size);
                boost::asio::async_read(*socket, 
                    boost::asio::buffer(msg.data(), result.size),
                    (*_strand).wrap(
                    boost::bind(&ConnectionInterface::parsePacket, shared_from_this(), boost::asio::placeholders::error)));
            } else {
                close();
            }
        }
        void parsePacket(const boost::system::error_code& error)
        {
            cout<<"[parsePacket] "<<lib::GET_SERVER_TIME()<<endl;
            if (error) {
                close();
                return;
            }
            protocol->parsePacket(msg);
            msg.flush();
            boost::asio::async_read(*socket, 
                boost::asio::buffer(msg.data(), config::HEADER_SIZE),
                (*_strand).wrap(
                boost::bind(&ConnectionInterface::parseHeader, shared_from_this(), boost::asio::placeholders::error)));
        }

如您所见,unix 时间戳相差 3-4 毫秒.我想了解为什么 parseHeader 和 parsePacket 之间经过了这么多时间.这不是客户端问题,摘要数据是 10 个字节,但我不能发送更多,延迟正好在调用之间.我正在使用 Flash 客户端版本 11.我所做的只是通过打开的套接字发送 ByteArray.我不确定这会延迟客户端.我一次发送所有 10 个字节.我如何调试实际延迟在哪里?

As you see unix timestamps differ in 3-4 ms. I want to understand why so many time elapse between parseHeader and parsePacket. This is not a client problem, summary data is 10 bytes, but i cant sent much much more, delay is exactly between calls. I'm using flash client version 11. What i do is just send ByteArray through opened socket. I don't sure that delays on client. I send all 10 bytes at once. How can i debug where actual delay is?

推荐答案

有太多的未知数无法从发布的代码中确定延迟的根本原因.不过,可以采取一些方法和注意事项来帮助确定问题:

There are far too many unknowns to identify the root cause of the delay from the posted code. Nevertheless, there are a few approaches and considerations that can be taken to help to identify the problem:

  • 启用处理程序跟踪 适用于 Boost.Asio 1.47+.只需定义 BOOST_ASIO_ENABLE_HANDLER_TRACKING,Boost.Asio 就会将调试输出(包括时间戳)写入标准错误流.这些时间戳可用于帮助过滤由应用程序代码(parseHeader()parsePacket() 等)引入的延迟.
  • 验证字节排序是否得到正确处理.例如,如果协议将标头的 size 字段定义为网络字节顺序中的两个字节,并且服务器将该字段作为原始 short 处理,则在接收到正文大小为<代码>10:
    • 大端机器将调用 async_read 读取 10 个字节.读取操作应该很快完成,因为套接字已经具有可供读取的 10 字节主体.
    • 小端机器将调用 async_read 读取 2560 个字节.读取操作可能会保持未完成状态,因为尝试读取的字节数远远超过预期.
    • Enable handler tracking for Boost.Asio 1.47+. Simply define BOOST_ASIO_ENABLE_HANDLER_TRACKING and Boost.Asio will write debug output, including timestamps, to the standard error stream. These timestamps can be used to help filter out delays introduced by application code (parseHeader(), parsePacket(), etc.).
    • Verify that byte-ordering is being handled properly. For example, if the protocol defines the header's size field as two bytes in network-byte-order and the server is handling the field as a raw short, then upon receiving a message that has a body size of 10:
      • A big-endian machine will call async_read reading 10 bytes. The read operation should complete quickly as the socket already has the 10 byte body available for reading.
      • A little-endian machine will call async_read reading 2560 bytes. The read operation will likely remain outstanding, as far more bytes are trying to be read than is intended.

      这是我开始的一个简单示例:

      Here is a simple example from which I started:

      #include <iostream>
      
      #include <boost/array.hpp>
      #include <boost/asio.hpp>
      #include <boost/bind.hpp>
      #include <boost/date_time/posix_time/posix_time.hpp>
      #include <boost/enable_shared_from_this.hpp>
      #include <boost/make_shared.hpp>
      #include <boost/shared_ptr.hpp>
      
      class tcp_server
        : public boost::enable_shared_from_this< tcp_server >
      {
      private:
      
        enum 
        {
           header_size = 4,
           data_size   = 10,
           buffer_size = 1024,
           max_stamp   = 50
        };
      
        typedef boost::asio::ip::tcp tcp;
      
      public:
      
        typedef boost::array< boost::posix_time::ptime, max_stamp > time_stamps;
      
      public:
      
        tcp_server( boost::asio::io_service& service,
                    unsigned short port )
          : strand_( service ),
            acceptor_( service, tcp::endpoint( tcp::v4(), port ) ),
            socket_( service ),
            index_( 0 )
        {}
      
        /// @brief Returns collection of timestamps.
        time_stamps& stamps()
        {
          return stamps_;
        }
      
        /// @brief Start the server.
        void start()
        {
          acceptor_.async_accept( 
            socket_,
            boost::bind( &tcp_server::handle_accept, this,
                         boost::asio::placeholders::error ) );
        }
      
      private:
      
        /// @brief Accept connection.
        void handle_accept( const boost::system::error_code& error ) 
        {
          if ( error )
          {  
            std::cout << error.message() << std::endl;
            return;
          }
      
          read_header();
        }
      
        /// @brief Read header.
        void read_header()
        {
          boost::asio::async_read(
            socket_,
            boost::asio::buffer( buffer_, header_size ),
            boost::bind( &tcp_server::handle_read_header, this,
                         boost::asio::placeholders::error,
                         boost::asio::placeholders::bytes_transferred ) );
        }
      
        /// @brief Handle reading header.
        void
        handle_read_header( const boost::system::error_code& error,
                            std::size_t bytes_transferred )
        {
          if ( error )
          {  
            std::cout << error.message() << std::endl;
            return;
          }
      
          // If no more stamps can be recorded, then stop the async-chain so
          // that io_service::run can return.
          if ( !record_stamp() ) return;
      
          // Read data.
          boost::asio::async_read(
            socket_,
            boost::asio::buffer( buffer_, data_size ),
            boost::bind( &tcp_server::handle_read_data, this,
                         boost::asio::placeholders::error,
                         boost::asio::placeholders::bytes_transferred ) );
      
        }
      
        /// @brief Handle reading data.
        void handle_read_data( const boost::system::error_code& error,
                               std::size_t bytes_transferred )
        {
          if ( error )
          {  
            std::cout << error.message() << std::endl;
            return;
          }
      
          // If no more stamps can be recorded, then stop the async-chain so
          // that io_service::run can return.
          if ( !record_stamp() ) return;
      
          // Start reading header again.
          read_header();
        }
      
        /// @brief Record time stamp.
        bool record_stamp()
        {
          stamps_[ index_++ ] = boost::posix_time::microsec_clock::local_time();
      
          return index_ < max_stamp;
        }
      
      private:
        boost::asio::io_service::strand strand_;
        tcp::acceptor acceptor_;
        tcp::socket socket_;
        boost::array< char, buffer_size > buffer_;
        time_stamps stamps_;
        unsigned int index_;
      };
      
      
      int main()
      {
        boost::asio::io_service service;
      
        // Create and start the server.
        boost::shared_ptr< tcp_server > server =
          boost::make_shared< tcp_server >( boost::ref(service ), 33333 );  
        server->start();
      
        // Run.  This will exit once enough time stamps have been sampled.
        service.run();
      
        // Iterate through the stamps.
        tcp_server::time_stamps& stamps = server->stamps();
        typedef tcp_server::time_stamps::iterator stamp_iterator;
        using boost::posix_time::time_duration;
        for ( stamp_iterator iterator = stamps.begin() + 1,
                             end      = stamps.end();
              iterator != end;
              ++iterator )
        {
           // Obtain the delta between the current stamp and the previous.
           time_duration delta = *iterator - *(iterator - 1);
           std::cout << "Delta: " << delta.total_milliseconds() << " ms"
                     << std::endl;
        }
        // Calculate the total delta.
        time_duration delta = *stamps.rbegin() - *stamps.begin();
        std::cout <<    "Total" 
                  << "\n  Start: " << *stamps.begin()
                  << "\n  End:   " << *stamps.rbegin()
                  << "\n  Delta: " << delta.total_milliseconds() << " ms"
                  << std::endl;
      }
      

      关于实现的一些注意事项:

      A few notes about the implementation:

      • 只有一个线程(主)和一个异步链read_header->handle_read_header->handle_read_data.这应该可以最大限度地减少准备运行的处理程序等待可用线程的时间.
      • 为了专注于 boost::asio::async_read,通过以下方式将噪音降至最低:
        • 使用预先分配的缓冲区.
        • 不使用 shared_from_this()strand::wrap.
        • 记录时间戳,并执行收集后处理.
        • There is only one thread (main) and one asynchronous chain read_header->handle_read_header->handle_read_data. This should minimize the amount of time a ready-to-run handler spends waiting for an available thread.
        • To focus on boost::asio::async_read, noise is minimized by:
          • Using a pre-allocated buffer.
          • Not using shared_from_this() or strand::wrap.
          • Recording the timestamps, and perform processing post-collection.

          我使用 gcc 4.4.0 和 Boost 1.50 在 CentOS 5.4 上编译.为了驱动数据,我选择使用 netcat 发送 1000 字节:

          I compiled on CentOS 5.4 using gcc 4.4.0 and Boost 1.50. To drive the data, I opted to send 1000 bytes using netcat:

          $ ./a.out > output &
          [1] 18623
          $ echo "$(for i in {0..1000}; do echo -n "0"; done)" | nc 127.0.0.1 33333
          [1]+  Done                    ./a.out >output
          $ tail output
          Delta: 0 ms
          Delta: 0 ms
          Delta: 0 ms
          Delta: 0 ms
          Delta: 0 ms
          Delta: 0 ms
          Total
            Start: 2012-Sep-10 21:22:45.585780
            End:   2012-Sep-10 21:22:45.586716
            Delta: 0 ms

          没有延迟,我通过修改 boost::asio::async_read 调用扩展了示例,将 this 替换为 shared_from_this()> 并用 strand_.wrap() 包裹 ReadHandlers.我运行了更新的示例,仍然没有发现延迟.不幸的是,这是我根据问题中发布的代码所能得到的.

          Observing no delay, I expanded upon the example by modifying the boost::asio::async_read calls, replacing this with shared_from_this() and wrapping the ReadHandlerss with strand_.wrap(). I ran the updated example and still observed no delay. Unfortunately, that is as far as I could get based on the code posted in the question.

          考虑扩展示例,在每次迭代中添加来自真实实现的一部分.例如:

          Consider expanding upon the example, adding in a piece from the real implementation with each iteration. For example:

          • 首先使用 msg 变量的类型来控制缓冲区.
          • 接下来发送有效数据,并引入parseHeader()parsePacket函数.
          • 最后,介绍 lib::GET_SERVER_TIME() 打印.
          • Start with using the msg variable's type to control the buffer.
          • Next, send valid data, and introduce parseHeader() and parsePacket functions.
          • Finally, introduce the lib::GET_SERVER_TIME() print.

          如果示例代码尽可能接近真实代码,并且 boost::asio::async_read 没有观察到延迟,则 ReadHandler s在实际代码中可能已准备好运行,但它们正在等待同步(链)或资源(线程),从而导致延迟:

          If the example code is as close as possible to the real code, and no delay is being observed with boost::asio::async_read, then the ReadHandlers may be ready-to-run in the real code, but they are waiting on synchronization (the strand) or a resource (a thread), resulting in a delay:

          • 如果延迟是与链同步的结果,则考虑Robin的建议,阅读更大的块数据以潜在地减少每条消息所需的读取量.
          • 如果延迟是等待线程的结果,则考虑使用额外的线程调用 io_service::run().
          • If the delay is the result of synchronization with the strand, then consider Robin's suggestion by reading a larger block of data to potentially reduce the amount of reads required per-message.
          • If the delay is the result of waiting for a thread, then consider having an additional thread call io_service::run().

          这篇关于boost::asio 异步服务器设计的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆