boost::asio async server design


Problem description


Currently I'm using a design where the server reads the first 4 bytes of the stream, then reads N bytes after decoding the header.

But I found that the time between the first async_read and the second read is 3-4 ms. I just printed timestamps to the console from the callbacks to measure it. I sent 10 bytes of data in total. Why does it take so much time to read?

I'm running it in debug mode, but I don't think one connection under the debugger is enough to cause a 3 ms delay between reads from the socket. Maybe I need another approach to cut the TCP stream into "packets"?

UPDATE: I've posted some code here:

// Completion handler for the 4-byte header read: decode the header,
// then issue an async_read for the variable-size body.
void parseHeader(const boost::system::error_code& error)
{
    cout << "[parseHeader] " << lib::GET_SERVER_TIME() << endl;
    if (error) {
        close();
        return;
    }
    GenTCPmsg::header result = msg.parseHeader();
    if (result.error == GenTCPmsg::parse_error::__NO_ERROR__) {
        msg.setDataLength(result.size);
        boost::asio::async_read(*socket,
            boost::asio::buffer(msg.data(), result.size),
            (*_strand).wrap(
                boost::bind(&ConnectionInterface::parsePacket, shared_from_this(),
                            boost::asio::placeholders::error)));
    } else {
        close();
    }
}

// Completion handler for the body read: dispatch the completed message,
// then issue an async_read for the next header.
void parsePacket(const boost::system::error_code& error)
{
    cout << "[parsePacket] " << lib::GET_SERVER_TIME() << endl;
    if (error) {
        close();
        return;
    }
    protocol->parsePacket(msg);
    msg.flush();
    boost::asio::async_read(*socket,
        boost::asio::buffer(msg.data(), config::HEADER_SIZE),
        (*_strand).wrap(
            boost::bind(&ConnectionInterface::parseHeader, shared_from_this(),
                        boost::asio::placeholders::error)));
}

As you can see, the unix timestamps differ by 3-4 ms. I want to understand why so much time elapses between parseHeader and parsePacket. This is not a client problem: the total data is only 10 bytes (I can't send much more than that), and the delay is exactly between the calls. I'm using Flash client version 11. All I do is send a ByteArray through the opened socket. I don't believe the delay is on the client; I send all 10 bytes at once. How can I debug where the actual delay is?

Solution

There are far too many unknowns to identify the root cause of the delay from the posted code. Nevertheless, there are a few approaches and considerations that can be taken to help to identify the problem:

  • Enable handler tracking (http://www.boost.org/doc/libs/1_51_0/doc/html/boost_asio/overview/core/handler_tracking.html) for Boost.Asio 1.47+. Simply define BOOST_ASIO_ENABLE_HANDLER_TRACKING and Boost.Asio will write debug output, including timestamps, to the standard error stream. These timestamps can be used to help filter out delays introduced by application code (parseHeader(), parsePacket(), etc.). A setup sketch follows this list.
  • Verify that byte-ordering is being handled properly (a sketch of this also follows the list). For example, if the protocol defines the header's size field as two bytes in network-byte-order and the server is handling the field as a raw short, then upon receiving a message that has a body size of 10:
    • A big-endian machine will call async_read reading 10 bytes. The read operation should complete quickly, as the socket already has the 10-byte body available for reading.
    • A little-endian machine will call async_read reading 2560 bytes. The read operation will likely remain outstanding, as far more bytes are being requested than were intended.
  • Use tracing tools such as strace, ltrace, etc.
  • Modify Boost.Asio, adding timestamps throughout the callstack. Boost.Asio is shipped as a header-file only library. Thus, users may modify it to provide as much verbosity as desired. While not the cleanest or easiest of approaches, adding a print statement with timestamps throughout the callstack may help provide visibility into timing.
  • Try duplicating the behavior in a short, simple, self-contained example. Start with the simplest of examples to determine if the delay is systematic. Then, iteratively expand upon the example so that it becomes closer to the real code with each iteration.
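
For the handler-tracking suggestion, the macro must be defined before any Boost.Asio header is included (or passed on the compiler command line). A minimal sketch:

#define BOOST_ASIO_ENABLE_HANDLER_TRACKING // or compile with -DBOOST_ASIO_ENABLE_HANDLER_TRACKING
#include <boost/asio.hpp>
// Every asynchronous operation and handler invocation is now logged to
// stderr as lines of the form @asio|<timestamp>|<action>|<description>,
// which can be correlated against the application's own timestamps.

For the byte-ordering suggestion, here is a hedged sketch of extracting a two-byte size field sent in network byte order. The header layout and the parse_body_size helper are assumptions for illustration, not the question's actual GenTCPmsg format:

#include <arpa/inet.h> // ntohs (POSIX; use <winsock2.h> on Windows)
#include <cstring>

// Hypothetical helper: the first two bytes of the header hold the body
// length in network byte order.
unsigned short parse_body_size( const char* header )
{
  unsigned short raw;
  std::memcpy( &raw, header, sizeof raw );
  // ntohs converts to host order: bytes 0x00 0x0A yield 10 on any machine.
  // Reinterpreting them directly as a raw short on a little-endian host
  // would instead yield 0x0A00 == 2560, causing the oversized read
  // described above.
  return ntohs( raw );
}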

Here is a simple example from which I started:

#include <iostream>

#include <boost/array.hpp>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/make_shared.hpp>
#include <boost/shared_ptr.hpp>

class tcp_server
  : public boost::enable_shared_from_this< tcp_server >
{
private:

  enum 
  {
     header_size = 4,
     data_size   = 10,
     buffer_size = 1024,
     max_stamp   = 50
  };

  typedef boost::asio::ip::tcp tcp;

public:

  typedef boost::array< boost::posix_time::ptime, max_stamp > time_stamps;

public:

  tcp_server( boost::asio::io_service& service,
              unsigned short port )
    : strand_( service ),
      acceptor_( service, tcp::endpoint( tcp::v4(), port ) ),
      socket_( service ),
      index_( 0 )
  {}

  /// @brief Returns collection of timestamps.
  time_stamps& stamps()
  {
    return stamps_;
  }

  /// @brief Start the server.
  void start()
  {
    acceptor_.async_accept( 
      socket_,
      boost::bind( &tcp_server::handle_accept, this,
                   boost::asio::placeholders::error ) );
  }

private:

  /// @brief Accept connection.
  void handle_accept( const boost::system::error_code& error ) 
  {
    if ( error )
    {  
      std::cout << error.message() << std::endl;
      return;
    }

    read_header();
  }

  /// @brief Read header.
  void read_header()
  {
    boost::asio::async_read(
      socket_,
      boost::asio::buffer( buffer_, header_size ),
      boost::bind( &tcp_server::handle_read_header, this,
                   boost::asio::placeholders::error,
                   boost::asio::placeholders::bytes_transferred ) );
  }

  /// @brief Handle reading header.
  void
  handle_read_header( const boost::system::error_code& error,
                      std::size_t bytes_transferred )
  {
    if ( error )
    {  
      std::cout << error.message() << std::endl;
      return;
    }

    // If no more stamps can be recorded, then stop the async-chain so
    // that io_service::run can return.
    if ( !record_stamp() ) return;

    // Read data.
    boost::asio::async_read(
      socket_,
      boost::asio::buffer( buffer_, data_size ),
      boost::bind( &tcp_server::handle_read_data, this,
                   boost::asio::placeholders::error,
                   boost::asio::placeholders::bytes_transferred ) );

  }

  /// @brief Handle reading data.
  void handle_read_data( const boost::system::error_code& error,
                         std::size_t bytes_transferred )
  {
    if ( error )
    {  
      std::cout << error.message() << std::endl;
      return;
    }

    // If no more stamps can be recorded, then stop the async-chain so
    // that io_service::run can return.
    if ( !record_stamp() ) return;

    // Start reading header again.
    read_header();
  }

  /// @brief Record time stamp.
  bool record_stamp()
  {
    stamps_[ index_++ ] = boost::posix_time::microsec_clock::local_time();

    return index_ < max_stamp;
  }

private:
  boost::asio::io_service::strand strand_;
  tcp::acceptor acceptor_;
  tcp::socket socket_;
  boost::array< char, buffer_size > buffer_;
  time_stamps stamps_;
  unsigned int index_;
};


int main()
{
  boost::asio::io_service service;

  // Create and start the server.
  boost::shared_ptr< tcp_server > server =
    boost::make_shared< tcp_server >( boost::ref(service ), 33333 );  
  server->start();

  // Run.  This will exit once enough time stamps have been sampled.
  service.run();

  // Iterate through the stamps.
  tcp_server::time_stamps& stamps = server->stamps();
  typedef tcp_server::time_stamps::iterator stamp_iterator;
  using boost::posix_time::time_duration;
  for ( stamp_iterator iterator = stamps.begin() + 1,
                       end      = stamps.end();
        iterator != end;
        ++iterator )
  {
     // Obtain the delta between the current stamp and the previous.
     time_duration delta = *iterator - *(iterator - 1);
     std::cout << "Delta: " << delta.total_milliseconds() << " ms"
               << std::endl;
  }
  // Calculate the total delta.
  time_duration delta = *stamps.rbegin() - *stamps.begin();
  std::cout <<    "Total" 
            << "\n  Start: " << *stamps.begin()
            << "\n  End:   " << *stamps.rbegin()
            << "\n  Delta: " << delta.total_milliseconds() << " ms"
            << std::endl;
}

A few notes about the implementation:

  • There is only one thread (main) and one asynchronous chain read_header->handle_read_header->handle_read_data. This should minimize the amount of time a ready-to-run handler spends waiting for an available thread.
  • To focus on boost::asio::async_read, noise is minimized by:
    • Using a pre-allocated buffer.
    • Not using shared_from_this() or strand::wrap.
    • Recording the timestamps, and performing the processing post-collection.

I compiled on CentOS 5.4 using gcc 4.4.0 and Boost 1.50. To drive the data, I opted to send 1000 bytes using netcat:

$ ./a.out > output &
[1] 18623
$ echo "$(for i in {0..1000}; do echo -n "0"; done)" | nc 127.0.0.1 33333
[1]+  Done                    ./a.out >output
$ tail output
Delta: 0 ms
Delta: 0 ms
Delta: 0 ms
Delta: 0 ms
Delta: 0 ms
Delta: 0 ms
Total
  Start: 2012-Sep-10 21:22:45.585780
  End:   2012-Sep-10 21:22:45.586716
  Delta: 0 ms

Observing no delay, I expanded upon the example by modifying the boost::asio::async_read calls, replacing this with shared_from_this() and wrapping the ReadHandlers with strand_.wrap(). I ran the updated example and still observed no delay. Unfortunately, that is as far as I could get based on the code posted in the question.
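
A hedged sketch of that modification, reconstructed from the description above rather than copied from the tested code; the header read becomes:

boost::asio::async_read(
  socket_,
  boost::asio::buffer( buffer_, header_size ),
  strand_.wrap(
    boost::bind( &tcp_server::handle_read_header, shared_from_this(),
                 boost::asio::placeholders::error,
                 boost::asio::placeholders::bytes_transferred ) ) );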

Consider expanding upon the example, adding in a piece from the real implementation with each iteration. For example:

  • Start with using the msg variable's type to control the buffer (a sketch follows this list).
  • Next, send valid data, and introduce the parseHeader() and parsePacket() functions.
  • Finally, introduce the lib::GET_SERVER_TIME() print.
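
As a hedged illustration of the first step, the example's pre-allocated boost::array buffer would be swapped for the message object. Only identifiers that appear in the question's code are used here; the rest of the GenTCPmsg interface is unknown, so treat this as a sketch:

// In read_header(), read into the message's own storage instead of buffer_:
boost::asio::async_read(
  socket_,
  boost::asio::buffer( msg.data(), config::HEADER_SIZE ),
  boost::bind( &tcp_server::handle_read_header, this,
               boost::asio::placeholders::error,
               boost::asio::placeholders::bytes_transferred ) );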

If the example code is as close as possible to the real code, and no delay is being observed with boost::asio::async_read, then the ReadHandlers may be ready-to-run in the real code, but they are waiting on synchronization (the strand) or a resource (a thread), resulting in a delay:

  • If the delay is the result of synchronization with the strand, then consider Robin's suggestion of reading a larger block of data to potentially reduce the number of reads required per message.
  • If the delay is the result of waiting for a thread, then consider having an additional thread call io_service::run() (a sketch follows this list).
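
A minimal sketch of the extra-thread variant, assuming the tcp_server example above; the single extra worker thread is illustrative, and boost/thread.hpp is required:

#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/thread.hpp>

int main()
{
  boost::asio::io_service service;

  // ... create and start the server exactly as in the example above ...

  // Run the io_service on a second thread in addition to main, so that a
  // ready-to-run handler does not have to wait for the only thread.
  boost::thread worker(
      boost::bind( &boost::asio::io_service::run, &service ) );
  service.run();
  worker.join();
}

Note that with more than one thread calling run(), handlers may execute concurrently, which is exactly the situation the strand is meant to serialize.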
