C ++:Boost.Asio:在新线程中启动SSL Server会话 [英] C++: Boost.Asio: Start SSL Server session in a new thread

查看:98
本文介绍了C ++:Boost.Asio:在新线程中启动SSL Server会话的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我基于此服务器示例,我已经完成了所有通信协议.服务器应该从多个客户端的多个连接中接收多个连接,所以我想将会话彼此分开,希望我可以用std::thread做到这一点.

I wrote a pair of server/client programs based on this example for the server and I'm done of all communication protocols. The server is supposed to receive multiple connections from multiple connections from multiple client, so I want to separate the sessions from each other, and I'm hoping I could do that with std::thread.

这看起来很简单,但我根本不知道该怎么做.在线上的所有示例似乎都显示了如何并行运行功能,但似乎没有显示如何在新线程中创建对象.

This looks to be easy, but I don't know how to do it at all. All examples online seem to show how to run a function in parallel, but it doesn't seem to show how to create an object in a new thread.

我已经发表了一些评论,以解释我对该会话机制的理解.

I've put some comments to explain my understanding for this session mechanism.

我要使用的代码如下:

class server
{
public:
  server(boost::asio::io_service& io_service, unsigned short port)
    : io_service_(io_service),
      acceptor_(io_service,
          boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port)),
      context_(io_service, boost::asio::ssl::context::sslv23)
  {
    //some code...


    //notice the next lines here create the session object, and then recurs that to receive more connections
    session* new_session = new session(io_service_, context_);

    //this is called to accept more connections if available, the callback function is called with start() to start the session
    acceptor_.async_accept(new_session->socket(),
        boost::bind(&server::handle_accept, this, new_session,
          boost::asio::placeholders::error));
  }

  void handle_accept(session* new_session, const boost::system::error_code& error)
  {
    if (!error)
    {
      //so the session starts here, and another object is created waiting for another session
      new_session->start();
      new_session = new session(io_service_, context_);
      //now this function is, again, a call back function to make use of new_session, the new object that's waiting for a connection
      acceptor_.async_accept(new_session->socket(),
          boost::bind(&server::handle_accept, this, new_session,
            boost::asio::placeholders::error));
    }
    else
    {
      delete new_session;
    }
  }

private:
  boost::asio::io_service& io_service_;
  boost::asio::ip::tcp::acceptor acceptor_;
  boost::asio::ssl::context context_;
};

如何在新的std::thread中创建这些会话?

How can I create these sessions in new std::thread?

如果您需要任何其他信息,请询问.谢谢.

If you require any additional information, please ask. Thanks.

推荐答案

我为

I've reworked the example for the linked answer mixed with your sample code.

它演示了相同的原理,但是在硬件支持的尽可能多的线程(即thread::hardware_concurrency)上运行io_service.

It demonstrates the same principle but running the io_service on as many threads as your hardware has support for (i.e. thread::hardware_concurrency).

这里的收获是

  • (共享的)对象生存期
  • 线程安全

大多数Asio对象不是线程安全的.因此,您需要同步访问它们.老式的互斥(例如std::mutex等)在这种情况下不能很好地工作(因为您确实不想锁定每个完成处理程序,并且您 eeeeeeee 不想持有一个跨异步调用¹.

Most Asio objects are not thread-safe. Therefore you need synchronize access to them. Old fashioned mutual exclusion (std::mutex etc.) don't work well in this scenario (because you really don't want to lock on every completion handler and you reeeeeeally don't want to hold a lock across asynchronous calls ¹.

在这种情况下,Boost Asio具有strand的概念:

Boost Asio has the concept of strands for this situation:

  • http://www.boost.org/doc/libs/1_58_0/doc/html/boost_asio/reference/io_service__strand.html
  • Why do I need strand per connection when using boost::asio?

我选择了最简单的解决方案,使所有操作都在一个套接字"上(ssl流/连接/会话,或者您可以在逻辑上进行引用).

I elected the simplest solution to make all operations on a "socket" (ssl stream/connection/session or however you would refer to this logically) on a strand.

此外,我还对acceptor_的所有访问权进行了序列化.

And besides that I made all the access to acceptor_ serialized on its own strand.

混合解决方案可能会将所有连接移至io_service +池,并将侦听器(Server)保留在单独的io_service上,这可能是它自己的隐式链

A hybrid solution might move all the connections on a io_service+pool and keep the listener (Server) on a separate io_service which could then be it's own implicit strand

注意:关于关机顺序:

  • 我明确销毁了Server,因此我们可以根据需要将acceptor_停在其strand(!!)上.
  • pool线程在所有连接都关闭之前不会完成.如果要对此进行控制,请再次查看链接的答案(其中显示了如何使弱指针跟踪连接).或者,您可以将会话中的所有异步操作与超时一起使用,并检查Server的关闭信号.
  • I made the destruction of Server explicit so we can stop the acceptor_ on its strand(!!) as required.
  • The pool threads will not complete until all connections have been closed. If you want control over this, see the linked answer again (which shows how to keep weak pointers tracking the connections). Alternatively, you can accompany all the asynchronous operations in the session with timeouts and check the Server for shutdown signal.

在Coliru上直播

#include <boost/array.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/make_shared.hpp>
#include <boost/thread.hpp>
#include <iostream>
#include <istream>
#include <list>
#include <string>

namespace bs  = boost::system;
namespace ba  = boost::asio;
namespace bas = ba::ssl;

using ba::ip::tcp;
using SslContext = boost::shared_ptr<bas::context>;

typedef ba::ip::tcp::acceptor    acceptor_type;
typedef bas::stream<tcp::socket> stream_type;

const short PORT = 26767;

class Session : public boost::enable_shared_from_this<Session>
{
public:
    typedef boost::shared_ptr<Session> Ptr;

    Session(ba::io_service& svc, SslContext ctx) : strand_(svc), ctx_(ctx), stream(svc, *ctx) { }

    virtual ~Session() {
        std::cout << __PRETTY_FUNCTION__ << "\n";
    }

    stream_type::lowest_layer_type& socket() { return stream.lowest_layer(); } 
    void start()                             { AsyncReadString();          } 
    void Stop()                              { stream.shutdown();            } 

protected:
    ba::io_service::strand strand_;
    SslContext             ctx_;
    stream_type            stream;
    ba::streambuf          stream_buffer;
    std::string            message;

    void AsyncReadString() {
        std::cout << __PRETTY_FUNCTION__ << "\n";

        ba::async_read_until(
            stream,
            stream_buffer,
            '\0', // null-char is a delimiter
            strand_.wrap(
                boost::bind(&Session::ReadHandler, shared_from_this(),
                    ba::placeholders::error,
                    ba::placeholders::bytes_transferred)));
    }
    void AsyncWriteString(const std::string &s) {
        std::cout << __PRETTY_FUNCTION__ << "\n";

        message = s;

        ba::async_write(
            stream,
            ba::buffer(message.c_str(), message.size()+1),
            strand_.wrap(
                boost::bind(&Session::WriteHandler, shared_from_this(),
                         ba::placeholders::error,
                         ba::placeholders::bytes_transferred)));
    }

    std::string ExtractString() {
        std::cout << __PRETTY_FUNCTION__ << "\n";

        std::istream is(&stream_buffer);
        std::string s;
        std::getline(is, s, '\0');
        return s;
    }

    void ReadHandler(const bs::error_code &ec, std::size_t /*bytes_transferred*/) 
    {
        std::cout << __PRETTY_FUNCTION__ << "\n";

        if (!ec) {
            std::cout << (ExtractString() + "\n");
            std::cout.flush();
            AsyncReadString();  // read again
        }
        else {
            // do nothing, "this" will be deleted later
        }
    }

    void WriteHandler(const bs::error_code &/*ec*/, std::size_t /*bytes_transferred*/) {
        std::cout << __PRETTY_FUNCTION__ << "\n";
    }
};

class Server : public boost::enable_shared_from_this<Server>
{
  public:
    Server(ba::io_service& io_service, unsigned short port) :
        strand_  (io_service),
        acceptor_(io_service, tcp::endpoint(tcp::v4(), port)),
        context_ (boost::make_shared<bas::context>(io_service, bas::context::sslv23))
    {
        //
    }

    void start_accept() {
        auto new_session = boost::make_shared<Session>(strand_.get_io_service(), context_);

        acceptor_.async_accept(new_session->socket(),
                strand_.wrap(boost::bind(&Server::handle_accept, this, new_session, ba::placeholders::error)));
    }

    void stop_accept() {
        auto keep = shared_from_this();
        strand_.post([keep] { keep->acceptor_.close(); });
    }

    void handle_accept(Session::Ptr new_session, const bs::error_code& error)
    {
        if (!error) {
            new_session->start();
            start_accept(); // uses `acceptor_` safely because of the strand_
        }
    }

    ~Server() {
        std::cout << __PRETTY_FUNCTION__ << "\n";
    }

  private:
    ba::io_service::strand strand_;
    tcp::acceptor          acceptor_;
    SslContext             context_;
};

int main() {
    ba::io_service svc;
    boost::thread_group pool;

    {
        auto s = boost::make_shared<Server>(svc, PORT);
        s->start_accept();

        for (auto i = 0u; i<boost::thread::hardware_concurrency(); ++i)
            pool.create_thread([&]{svc.run();});

        std::cerr << "Shutdown in 10 seconds...\n";
        boost::this_thread::sleep_for(boost::chrono::seconds(10)); // auto-shutdown in 10s

        std::cerr << "Shutdown...\n";
    } // destructor of Server // TODO thread-safe

    pool.join_all();
}

哪些印刷品

$ (for a in {1..20000}; do (sleep 1.$RANDOM; echo -e "hello world $RANDOM\\0" | netcat localhost 6767)& done)

$ time ./test | sort | uniq -c | sort -n | tail
Shutdown in 10 seconds...
Shutdown...
      1 Server::~Server()
      1 void Session::AsyncReadString()virtual Session::~Session()
      1 void Session::AsyncReadString()void Session::ReadHandler(const boost::system::error_code&, std::size_t)
      1 void Session::ReadHandler(const boost::system::error_code&, std::size_t)void Session::AsyncReadString()
      3 
   4523 void Session::AsyncReadString()
   4524 void Session::ReadHandler(const boost::system::error_code&, std::size_t)
   4526 virtual Session::~Session()

real    0m10.128s
user    0m0.430s
sys 0m0.262s


¹异步的全部目的是避免阻塞可能花费更长"时间的IO操作.而且锁定的想法是永远不要持有锁定更长"的时间,否则它们会破坏可扩展性


¹ The whole point of asynchrony is to avoid blocking on IO operations that may take "longer". And the idea of locking is to never hold locks for "longer" amounts of time or they will kill scalability

这篇关于C ++:Boost.Asio:在新线程中启动SSL Server会话的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆