Boost.Asio的:它是用每个连接/插座`io_service`一件好事吗? [英] Boost.Asio: Is it a good thing to use a `io_service` per connection/socket?

查看:92
本文介绍了Boost.Asio的:它是用每个连接/插座`io_service`一件好事吗?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想创建一个实现一个线程每个连接模型的应用程序。但每个连接必须是停止的。我曾尝试<一个href=\"http://www.boost.org/doc/libs/1_39_0/doc/html/boost_asio/example/echo/blocking_tcp_echo_server.cpp\"相对=nofollow>这个Boost.Asio的例子它实现了我想要的东西阻塞版本。但是,一点点质疑之后,我发现,没有可靠的方法来阻止这个例子的会话。所以我想实现我自己的。我不得不使用异步函数。因为我想打一个线程只管理一个连接,没有办法控制它的异步工作,采用该线程,我决定用 io_service对象为每个连接/插座/线程。

因此​​,它是一个很好的方法,你知道一个更好的办法?

我的code是在这里,您可以检查并查看它:

 的#include&LT;升压/ asio.hpp&GT;
#包括LT&;升压/ bind.hpp&GT;
#包括LT&;升压/ array.hpp&GT;
#包括LT&;升压/ thread.hpp&GT;
#包括LT&;升压/ scoped_ptr.hpp&GT;
#包括LT&;列表&gt;
#包括LT&;&iostream的GT;
#包括LT&;串GT;
#包括LT&;&的IStream GT;命名空间BA =的boost :: ASIO;
命名空间BS =的boost ::制度;
命名空间B =提振;BA的typedef :: IP :: TCP ::受体acceptor_type;
BA的typedef :: IP :: TCP ::插座socket_type;常量短PORT = 11235;
一流的服务器;//一个连接都有自己的io_service对象和插座
类连接{
保护:
    巴:: io_service对象服务;
    socket_type袜子;
    B ::螺纹*螺纹;
    巴::流缓冲stream_buffer; //读取等
    服务器*服务器;
    无效AsyncReadString(){
        巴:: async_read_until(
            袜子,
            stream_buffer,
            '\\ 0',//空字符是分隔符
            B ::绑定(安培;连接:: ReadHandler,为此,
                巴::占位符::错误
                巴::占位符:: bytes_transferred));
    }
    无效AsyncWriteString(常量标准::字符串&安培; S){
        标准::字符串中newstr = S +'\\ 0'; //添加空字符
        巴:: async_write(
            袜子,
            巴::缓冲液(newstr.c_str(),newstr.size()),
            B ::绑定(安培;连接:: WriteHandler,为此,
                巴::占位符::错误
                巴::占位符:: bytes_transferred));
    }
    虚拟无效会话(){
        AsyncReadString();
        service.run(); //最后运行
    }
    标准::字符串ExtractString(){
        的std :: istream处于(安培; stream_buffer);
        性病::字符串s;
        的std ::函数getline(是,S,'\\ 0');
        返回S;
    }
    虚拟无效ReadHandler(
        常量BS ::错误_ code和; EC,
        的std ::为size_t bytes_transferred){
        如果(!EC){
            性病::法院LT&;&LT; (ExtractString()+\\ n);
            的std :: cout.flush();
            AsyncReadString(); //再读
        }
        其他{
            //什么也不做,这个将在以后删除
        }
    }
    虚拟无效WriteHandler(
        常量BS ::错误_ code和; EC,
        的std ::为size_t bytes_transferred){
    }
上市:
    连接(服务器* S):
        服务(),
        袜子(服务),
        服务器(S),
        螺纹(NULL)
    {}
    socket_type&安培;套接字(){
        返回袜子;
    }
    无效的start(){
        如果(线程)删除线;
        线程=新的B ::线程(
            B ::绑定(安培;连接::会议,本));
    }
    无效加入(){
        如果(线)线程&GT;加入();
    }
    空隙停止(){
        service.stop();
    }
    无效KILLME();
    虚拟〜连接(){
    }
};//服务器也有自己的io_service对象,但它仅用于接受
一流的服务器{
上市:
    性病::名单&LT;连接* GT;连接;
保护:
    巴:: io_service对象服务;
    acceptor_type ACC;
    B ::螺纹*螺纹;
    虚拟无效AcceptHandler(常量BS ::错误_ code和; EC){
        如果(!EC){
            Connections.back() - GT;启动();
            Connections.push_back(新连接(本));
            acc.async_accept(
                Connections.back() - GT;插座()
                B ::绑定(安培;服务器:: AcceptHandler,
                    这个,
                    巴::占位符::错误));
        }
        其他{
            // 没做什么
            //因为新的会话将被删除
            在析构函数自动//
        }
    }
    虚拟无效的ThreadFunc(){
        Connections.push_back(新连接(本));
        acc.async_accept(
            Connections.back() - GT;插座()
            B ::绑定(安培;服务器:: AcceptHandler,
                这个,
                巴::占位符::错误));
        service.run();
    }
上市:
    服务器():
        服务(),
        ACC(服务,BA :: IP :: TCP ::端点(BA :: IP :: TCP :: V4(),PORT)),
        螺纹(NULL)
    {}
    无效的start(){
        如果(线程)删除线;
        线程=新的B ::线程(
            B ::绑定(安培;服务器:: ThreadFunc中,这一点));
    }
    空隙停止(){
        service.stop();
    }
    无效加入(){
        如果(线)线程&GT;加入();
    }
    无效StopAllConnections(){
        为(自动C:连接){
            C-&GT;停止();
        }
    }
    无效JoinAllConnections(){
        为(自动C:连接){
            C-&GT;加入();
        }
    }
    无效KillAllConnections(){
        为(自动C:连接){
            删除C;
        }
        Connections.clear();
    }
    无效KillConnection(连接* C){
        Connections.remove(C);
        删除C;
    }
    虚拟服务器〜(){
        删除线程;
        //连接应该由用户删除(?)
    }
};无效连接:: KILLME(){
    服务器 - &GT; KillConnection(本);
}诠释主(){
    尝试{
        服务器S;
        s.Start();
        的std :: cin.get(); //等待进入
        s.Stop(); //停止监听第一
        s.StopAllConnections(); //中断正在进行的连接
        s.Join(); //等待服务器,应立即返回
        s.JoinAllConnections(); //等待正在进行的连接
        s.KillAllConnections(); //销毁连接对象
        //在范围结束时,服务器将被销毁
    }
    赶上(性病::例外急症){
        的std :: CERR&LT;&LT; 例外:&LT;&LT; e.what()&所述;&下;的std :: ENDL;
        返回1;
    }
    返回0;
}


解决方案

没有。每个连接使用 io_service对象对象绝对是一个味道。特别是因为你还上运行一个专用的线程每个连接。

在这一点上,你要问自己什么也买不同步吗?你可以拥有所有的code同步,并有相同数量的线程等。

显然要多路连接到服务的远较小数目。在实践中有一些明智的车型,如


  1. 一个 io_service对象跟单服务线程(这通常是好)。没有排队的服务任务可能会永远阻塞显著时间或延迟将遭受


  2. 一个 io_service对象与多家执行处理程序的线程。在游泳池的线程数应足以提供服务的最大值。支持的同时的CPU密集型任务数(或再次,等待时间会开始上升)


  3. 每个线程的io_service对象,每个逻辑内核通常一个线程和线程亲和力,所以它大棒来表示的核心。这是理想的高速缓存位置


更新:演示

下面是显示了惯用的风格使用演示选项1 从上面:

<大骨节病> 住在Coliru

 的#include&LT;升压/ array.hpp&GT;
#包括LT&;升压/ asio.hpp&GT;
#包括LT&;升压/ bind.hpp&GT;
#包括LT&;升压/ enable_shared_from_this.hpp&GT;
#包括LT&;升压/ make_shared.hpp&GT;
#包括LT&;升压/ thread.hpp&GT;
#包括LT&;&iostream的GT;
#包括LT&;&的IStream GT;
#包括LT&;列表&gt;
#包括LT&;串GT;命名空间BA =的boost :: ASIO;
命名空间BS =的boost ::制度;
命名空间B =提振;BA的typedef :: IP :: TCP ::受体acceptor_type;
BA的typedef :: IP :: TCP ::插座socket_type;常量短PORT = 11235;//一个连接都有自己的io_service对象和插座
类连接:公共B :: enable_shared_from_this&LT;连接GT&;
{
上市:
    TYPEDEF提高:: shared_ptr的&LT;连接GT&; PTR;
保护:
    socket_type袜子;
    巴::流缓冲stream_buffer; //读取等
    性病::字符串消息;    无效AsyncReadString(){
        性病::法院LT&;&LT; __ preTTY_FUNCTION__&LT;&LT; \\ n;        巴:: async_read_until(
            袜子,
            stream_buffer,
            '\\ 0',//空字符是分隔符
            B ::绑定(安培;连接:: ReadHandler,shared_from_this()
                巴::占位符::错误
                巴::占位符:: bytes_transferred));
    }
    无效AsyncWriteString(常量标准::字符串&安培; S){
        性病::法院LT&;&LT; __ preTTY_FUNCTION__&LT;&LT; \\ n;        消息= S;        巴:: async_write(
            袜子,
            巴::缓冲液(message.c_str(),message.size()+ 1)
            B ::绑定(安培;连接:: WriteHandler,shared_from_this()
                巴::占位符::错误
                巴::占位符:: bytes_transferred));
    }
    标准::字符串ExtractString(){
        性病::法院LT&;&LT; __ preTTY_FUNCTION__&LT;&LT; \\ n;        的std :: istream处于(安培; stream_buffer);
        性病::字符串s;
        的std ::函数getline(是,S,'\\ 0');
        返回S;
    }
    无效ReadHandler(
        常量BS ::错误_ code和; EC,
        的std ::为size_t bytes_transferred)
    {
        性病::法院LT&;&LT; __ preTTY_FUNCTION__&LT;&LT; \\ n;        如果(!EC){
            性病::法院LT&;&LT; (ExtractString()+\\ n);
            的std :: cout.flush();
            AsyncReadString(); //再读
        }
        其他{
            //什么也不做,这个将在以后删除
        }
    }
    无效WriteHandler(常量BS ::错误_ code和; EC,的std ::为size_t bytes_transferred){
        性病::法院LT&;&LT; __ preTTY_FUNCTION__&LT;&LT; \\ n;
    }
上市:
    连接(BA :: io_service对象和放大器; SVC):袜子(SVC){}    虚拟〜连接(){
        性病::法院LT&;&LT; __ preTTY_FUNCTION__&LT;&LT; \\ n;
    }    socket_type&安培;套接字(){返回袜子; }
    空会话(){AsyncReadString(); }
    空隙停止(){sock.cancel(); }
};//服务器也有自己的io_service对象,但它仅用于接受
一流的服务器{
上市:
    性病::名单&LT;提高:: weak_ptr的&LT;连接GT&; &GT; m_connections;
保护:
    巴:: io_service对象_service;
    推动::可选&LT; BA :: io_service对象::工作&GT; _工作;
    acceptor_type _ACC;
    B ::螺纹螺纹;    无效AcceptHandler(常量BS ::错误_ code和; EC,连接:: PTR接受){
        如果(!EC){
            承兑&GT;会议();
            DoAccept();
        }
        其他{
            //什么也不做新的会议将自动被删除
            //析构函数
        }
    }    无效DoAccept(){
        汽车newaccept =提振:: make_shared&LT;连接GT - (_服务);        _acc.async_accept(
            newaccept-&GT;插座()
            B ::绑定(安培;服务器:: AcceptHandler,
                这个,
                巴::占位符::错误
                newaccept
            ));
    }上市:
    服务器():
        _服务(),
        _work(BA :: io_service对象::工作(_service))
        _ACC(_Service,BA :: IP :: TCP ::端点(BA :: IP :: TCP :: V4(),PORT)),
        螺纹(B ::绑定(安培; BA :: io_service对象::来看,&安培; _service))
    {}    〜服务器(){
        性病::法院LT&;&LT; __ preTTY_FUNCTION__&LT;&LT; \\ n;
        停止();
        _work.reset();
        如果(thread.joinable())的Thread.join();
    }    无效的start(){
        性病::法院LT&;&LT; __ preTTY_FUNCTION__&LT;&LT; \\ n;
        DoAccept();
    }    空隙停止(){
        性病::法院LT&;&LT; __ preTTY_FUNCTION__&LT;&LT; \\ n;
        _acc.cancel();
    }    无效StopAllConnections(){
        性病::法院LT&;&LT; __ preTTY_FUNCTION__&LT;&LT; \\ n;
        为(自动C:m_connections){
            如果(自动p值= c.lock())
                P-&GT;停止();
        }
    }
};诠释主(){
    尝试{
        服务器S;
        s.Start();        的std :: CERR&LT;&LT; 关闭在2秒内... \\ N的;
        B :: this_thread :: sleep_for(B ::计时::秒(2));        的std :: CERR&LT;&LT; 停止接受... \\ N的;
        s.Stop();        的std :: CERR&LT;&LT; 关机... \\ N的;
        s.StopAllConnections(); //中断正在进行的连接
    } //服务器的析构函数将加入服务线程
    赶上(性病::例外急症){
        的std :: CERR&LT;&LT; __FUNCTION__&LT;&LT; :&所述;&下; __LINE__&LT;&LT; \\ n;
        的std :: CERR&LT;&LT; 例外:&LT;&LT; e.what()&所述;&下;的std :: ENDL;
        返回1;
    }    的std :: CERR&LT;&LT; 拜拜\\ N的;
}

我修改了的main()来2秒,无需用户干预运行。这是这样我就可以演示它的 住在Coliru (中当然,这是有限WRT客户端进程的数量)。

如果您有很多运行它( 很多)客户端,例如使用

  $时间(一个在{1..1000}; DO(睡眠1 $ RANDOM;回声-e的hello world $ RANDOM \\\\ 0| netcat的本地主机11235) &安培;完成;等待)

您会发现,这两个第二个窗口中处理所有这些:

  $ ./test |排序| uniq的-c |排序-n |尾巴
关闭2秒...
关闭...
再见
      2世界您好28214
      2的Hello World 4554
      2的Hello World 6216
      2的Hello World 7864
      2的Hello World 9966
      2无效服务器::停止()
   1000标准::字符串连接:: ExtractString()
   1001虚拟连接::〜连接()
   2000无效连接:: AsyncReadString()
   2000无效连接:: ReadHandler(常量的boost ::系统::错误_ code和;,的std ::为size_t)

如果你真的发狂,提高 1000 来如 100000 那里,你会得到的东西类似:

  sehe @桌面:/ tmp目录$ ./test |排序| uniq的-c |排序-n |尾巴
关闭2秒...
关闭...
再见
      2的Hello World 5483
      2你好世界579
      2的Hello World 5865
      2你好世界938
      2无效服务器::停止()
      3的Hello World 9613
   1741标准::字符串连接:: ExtractString()
   1742虚拟连接::〜连接()
   3482无效连接:: AsyncReadString()
   3482无效连接:: ReadHandler(常量的boost ::系统::错误_ code和;,的std ::为size_t)

在服务器重复2秒运行。

I want to create an application that implements one-thread-per-connection model. But each connection must be stoppable. I have tried this boost.asio example which implements the blocking version of what I want. But after a little bit questioning I've found out that there is no reliable way to stop the session of that example. So I've tried to implement my own. I had to use asynchronous functions. Since I want to make a thread to manage only one connection and there is no way to control which asynchronous job is employed to which thread, I decided to use io_service for each connection/socket/thread.

So is it a good approach, do you know a better approach?

My code is here so you can examine and review it:

#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/array.hpp>
#include <boost/thread.hpp>
#include <boost/scoped_ptr.hpp>
#include <list>
#include <iostream>
#include <string>
#include <istream>

namespace ba = boost::asio;
namespace bs = boost::system;
namespace b  = boost;

typedef ba::ip::tcp::acceptor acceptor_type;
typedef ba::ip::tcp::socket socket_type;

const short PORT = 11235;
class Server;

// A connection has its own io_service and socket
class Connection {
protected:
    ba::io_service service;
    socket_type sock;
    b::thread *thread;
    ba::streambuf stream_buffer;    // for reading etc
    Server *server;
    void AsyncReadString() {
        ba::async_read_until(
            sock,
            stream_buffer,
            '\0',   // null-char is a delimiter
            b::bind(&Connection::ReadHandler, this,
                ba::placeholders::error,
                ba::placeholders::bytes_transferred));
    }
    void AsyncWriteString(const std::string &s) {
        std::string newstr = s + '\0';  // add a null char
        ba::async_write(
            sock,
            ba::buffer(newstr.c_str(), newstr.size()),
            b::bind(&Connection::WriteHandler, this,
                ba::placeholders::error,
                ba::placeholders::bytes_transferred));
    }
    virtual void Session() {
        AsyncReadString();
        service.run();  // run at last
    }
    std::string ExtractString() {
        std::istream is(&stream_buffer);
        std::string s;
        std::getline(is, s, '\0');
        return s;
    }
    virtual void ReadHandler(
        const bs::error_code &ec,
        std::size_t bytes_transferred) {
        if (!ec) {
            std::cout << (ExtractString() + "\n");
            std::cout.flush();
            AsyncReadString();  // read again
        }
        else {
            // do nothing, "this" will be deleted later
        }
    }
    virtual void WriteHandler(
        const bs::error_code &ec,
        std::size_t bytes_transferred) {
    }
public:
    Connection(Server *s) :
        service(),
        sock(service),
        server(s),
        thread(NULL)
    {  }
    socket_type& Socket() {
        return sock;
    }
    void Start() {
        if (thread) delete thread;
        thread = new b::thread(
            b::bind(&Connection::Session, this));
    }
    void Join() {
        if (thread) thread->join();
    }
    void Stop() {
        service.stop();
    }
    void KillMe();
    virtual ~Connection() {
    }
};

// a server also has its own io_service but it's only used for accepting
class Server {
public:
    std::list<Connection*> Connections;
protected:
    ba::io_service service;
    acceptor_type acc;
    b::thread *thread;
    virtual void AcceptHandler(const bs::error_code &ec) {
        if (!ec) {
            Connections.back()->Start();
            Connections.push_back(new Connection(this));
            acc.async_accept(
                Connections.back()->Socket(),
                b::bind(&Server::AcceptHandler,
                    this,
                    ba::placeholders::error));
        }
        else {
            // do nothing
            // since the new session will be deleted
            // automatically by the destructor
        }
    }
    virtual void ThreadFunc() {
        Connections.push_back(new Connection(this));
        acc.async_accept(
            Connections.back()->Socket(),
            b::bind(&Server::AcceptHandler,
                this,
                ba::placeholders::error));
        service.run();
    }
public:
    Server():
        service(),
        acc(service, ba::ip::tcp::endpoint(ba::ip::tcp::v4(), PORT)),
        thread(NULL)
    {  }
    void Start() {
        if (thread) delete thread;
        thread = new b::thread(
            b::bind(&Server::ThreadFunc, this));
    }
    void Stop() {
        service.stop();
    }
    void Join() {
        if (thread) thread->join();
    }
    void StopAllConnections() {
        for (auto c : Connections) {
            c->Stop();
        }
    }
    void JoinAllConnections() {
        for (auto c : Connections) {
            c->Join();
        }
    }
    void KillAllConnections() {
        for (auto c : Connections) {
            delete c;
        }
        Connections.clear();
    }
    void KillConnection(Connection *c) {
        Connections.remove(c);
        delete c;
    }
    virtual ~Server() {
        delete thread;
        // connection should be deleted by the user (?)
    }
};

void Connection::KillMe() {
    server->KillConnection(this);
}

int main() {
    try {
        Server s;
        s.Start();
        std::cin.get(); // wait for enter
        s.Stop();   // stop listening first
        s.StopAllConnections(); // interrupt ongoing connections
        s.Join();   // wait for server, should return immediately
        s.JoinAllConnections(); // wait for ongoing connections
        s.KillAllConnections(); // destroy connection objects
        // at the end of scope, Server will be destroyed
    }
    catch (std::exception &e) {
        std::cerr << "Exception: " << e.what() << std::endl;
        return 1;
    }
    return 0;
}

解决方案

No. Using an io_service object per connection is definitely a smell. Especially since you're also running each connection on a dedicated thread.

At this point you have to ask yourself what did asynchrony buy you? You can have all the code synchronous and have exactly the same number of threads etc.

Clearly you want to multiplex the connections onto a far smaller number of services. In practice there are a few sensible models like

  1. a single io_service with a single service thread (this is usually good). No tasks queued on the service may ever block for significant time or the latency will suffer

  2. a single io_service with a number of threads executing handlers. The number of threads in the pool should be enough to service the max. number of simultaneous CPU intensive tasks supported (or again, the latency will start to go up)

  3. an io_service per thread, usually one thread per logical core and with thread affinity so that it "sticks" to that core. This can be ideal for cache locality

UPDATE: Demo

Here's a demo that shows the idiomatic style using option 1. from above:

Live On Coliru

#include <boost/array.hpp>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/make_shared.hpp>
#include <boost/thread.hpp>
#include <iostream>
#include <istream>
#include <list>
#include <string>

namespace ba = boost::asio;
namespace bs = boost::system;
namespace b  = boost;

typedef ba::ip::tcp::acceptor acceptor_type;
typedef ba::ip::tcp::socket   socket_type;

const short PORT = 11235;

// A connection has its own io_service and socket
class Connection : public b::enable_shared_from_this<Connection>
{
public:
    typedef boost::shared_ptr<Connection> Ptr;
protected:
    socket_type    sock;
    ba::streambuf  stream_buffer; // for reading etc
    std::string    message;

    void AsyncReadString() {
        std::cout << __PRETTY_FUNCTION__ << "\n";

        ba::async_read_until(
            sock,
            stream_buffer,
            '\0',   // null-char is a delimiter
            b::bind(&Connection::ReadHandler, shared_from_this(),
                ba::placeholders::error,
                ba::placeholders::bytes_transferred));
    }
    void AsyncWriteString(const std::string &s) {
        std::cout << __PRETTY_FUNCTION__ << "\n";

        message = s;

        ba::async_write(
            sock,
            ba::buffer(message.c_str(), message.size()+1),
            b::bind(&Connection::WriteHandler, shared_from_this(),
                ba::placeholders::error,
                ba::placeholders::bytes_transferred));
    }
    std::string ExtractString() {
        std::cout << __PRETTY_FUNCTION__ << "\n";

        std::istream is(&stream_buffer);
        std::string s;
        std::getline(is, s, '\0');
        return s;
    }
    void ReadHandler(
        const bs::error_code &ec,
        std::size_t bytes_transferred) 
    {
        std::cout << __PRETTY_FUNCTION__ << "\n";

        if (!ec) {
            std::cout << (ExtractString() + "\n");
            std::cout.flush();
            AsyncReadString();  // read again
        }
        else {
            // do nothing, "this" will be deleted later
        }
    }
    void WriteHandler(const bs::error_code &ec, std::size_t bytes_transferred) {
        std::cout << __PRETTY_FUNCTION__ << "\n";
    }
public:
    Connection(ba::io_service& svc) : sock(svc) { }

    virtual ~Connection() {
        std::cout << __PRETTY_FUNCTION__ << "\n";
    }

    socket_type& Socket() { return sock;          } 
    void Session()        { AsyncReadString();    } 
    void Stop()           { sock.cancel();        }
};

// a server also has its own io_service but it's only used for accepting
class Server {
public:
    std::list<boost::weak_ptr<Connection> > m_connections;
protected:
    ba::io_service _service;
    boost::optional<ba::io_service::work> _work;
    acceptor_type _acc;
    b::thread thread;

    void AcceptHandler(const bs::error_code &ec, Connection::Ptr accepted) {
        if (!ec) {
            accepted->Session();
            DoAccept();
        }
        else {
            // do nothing the new session will be deleted automatically by the
            // destructor
        }
    }

    void DoAccept() {
        auto newaccept = boost::make_shared<Connection>(_service);

        _acc.async_accept(
            newaccept->Socket(),
            b::bind(&Server::AcceptHandler,
                this,
                ba::placeholders::error,
                newaccept
            ));
    }

public:
    Server():
        _service(),
        _work(ba::io_service::work(_service)),
        _acc(_service, ba::ip::tcp::endpoint(ba::ip::tcp::v4(), PORT)),
        thread(b::bind(&ba::io_service::run, &_service))
    {  }

    ~Server() {
        std::cout << __PRETTY_FUNCTION__ << "\n";
        Stop();
        _work.reset();
        if (thread.joinable()) thread.join();
    }

    void Start() {
        std::cout << __PRETTY_FUNCTION__ << "\n";
        DoAccept();
    }

    void Stop() {
        std::cout << __PRETTY_FUNCTION__ << "\n";
        _acc.cancel();
    }

    void StopAllConnections() {
        std::cout << __PRETTY_FUNCTION__ << "\n";
        for (auto c : m_connections) {
            if (auto p = c.lock())
                p->Stop();
        }
    }
};

int main() {
    try {
        Server s;
        s.Start();

        std::cerr << "Shutdown in 2 seconds...\n";
        b::this_thread::sleep_for(b::chrono::seconds(2));

        std::cerr << "Stop accepting...\n";
        s.Stop();

        std::cerr << "Shutdown...\n";
        s.StopAllConnections(); // interrupt ongoing connections
    } // destructor of Server will join the service thread
    catch (std::exception &e) {
        std::cerr << __FUNCTION__ << ":" << __LINE__ << "\n";
        std::cerr << "Exception: " << e.what() << std::endl;
        return 1;
    }

    std::cerr << "Byebye\n";
}

I modified the main() to run for 2 seconds without user intervention. This is so I can demo it Live On Coliru (of course, it's limited w.r.t the number of client processes).

If you run it with a lot (a lot) of clients, using e.g.

$ time (for a in {1..1000}; do (sleep 1.$RANDOM; echo -e "hello world $RANDOM\\0" | netcat localhost 11235)& done; wait)

You will find that the two second window handles them all:

$ ./test | sort | uniq -c | sort -n | tail
Shutdown in 2 seconds...
Shutdown...
Byebye
      2 hello world 28214
      2 hello world 4554
      2 hello world 6216
      2 hello world 7864
      2 hello world 9966
      2 void Server::Stop()
   1000 std::string Connection::ExtractString()
   1001 virtual Connection::~Connection()
   2000 void Connection::AsyncReadString()
   2000 void Connection::ReadHandler(const boost::system::error_code&, std::size_t)

If you really go berserk and raise 1000 to e.g. 100000 there, you'll get things similar to:

sehe@desktop:/tmp$ ./test | sort | uniq -c | sort -n | tail
Shutdown in 2 seconds...
Shutdown...
Byebye
      2 hello world 5483
      2 hello world 579
      2 hello world 5865
      2 hello world 938
      2 void Server::Stop()
      3 hello world 9613
   1741 std::string Connection::ExtractString()
   1742 virtual Connection::~Connection()
   3482 void Connection::AsyncReadString()
   3482 void Connection::ReadHandler(const boost::system::error_code&, std::size_t)

On repeated 2-second runs of the server.

这篇关于Boost.Asio的:它是用每个连接/插座`io_service`一件好事吗?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆