C++: Boost.Asio: Start SSL Server session in a new thread
Question
I wrote a pair of server/client programs, basing the server on this example, and I have finished all the communication protocols. The server is supposed to accept multiple connections from multiple clients, so I want to separate the sessions from each other, and I'm hoping I can do that with std::thread.
This looks easy, but I don't know how to do it at all. All the examples online seem to show how to run a function in parallel, but none seem to show how to create an object in a new thread.
I've put some comments to explain my understanding for this session mechanism.
The code I want to use is the following:
class server
{
public:
    server(boost::asio::io_service& io_service, unsigned short port)
        : io_service_(io_service),
          acceptor_(io_service,
              boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port)),
          context_(io_service, boost::asio::ssl::context::sslv23)
    {
        //some code...
        //notice the next lines here create the session object, and then recurs that to receive more connections
        session* new_session = new session(io_service_, context_);
        //this is called to accept more connections if available, the callback function is called with start() to start the session
        acceptor_.async_accept(new_session->socket(),
            boost::bind(&server::handle_accept, this, new_session,
                boost::asio::placeholders::error));
    }

    void handle_accept(session* new_session, const boost::system::error_code& error)
    {
        if (!error)
        {
            //so the session starts here, and another object is created waiting for another session
            new_session->start();
            new_session = new session(io_service_, context_);
            //now this function is, again, a call back function to make use of new_session, the new object that's waiting for a connection
            acceptor_.async_accept(new_session->socket(),
                boost::bind(&server::handle_accept, this, new_session,
                    boost::asio::placeholders::error));
        }
        else
        {
            delete new_session;
        }
    }

private:
    boost::asio::io_service& io_service_;
    boost::asio::ip::tcp::acceptor acceptor_;
    boost::asio::ssl::context context_;
};
How can I create these sessions in a new std::thread?
If you require any additional information, please ask. Thanks.
Recommended answer
I've reworked the example from the linked answer, mixed with your sample code.
It demonstrates the same principle, but runs the io_service on as many threads as your hardware supports (i.e. thread::hardware_concurrency).
The takeaways here are:
- (shared) object lifetimes
- thread safety
Most Asio objects are not thread-safe, so you need to synchronize access to them. Old-fashioned mutual exclusion (std::mutex etc.) doesn't work well in this scenario, because you really don't want to lock in every completion handler, and you reeeeeeally don't want to hold a lock across asynchronous calls ¹.
Boost Asio has the concept of strands for this situation:
- http://www.boost.org/doc/libs/1_58_0/doc/html/boost_asio/reference/io_service__strand.html
- Why do I need strand per connection when using boost::asio?
I chose the simplest solution: run all operations on a "socket" (ssl stream/connection/session, or however you would refer to this logically) on a strand.
Besides that, I serialized all access to acceptor_ on its own strand.
A hybrid solution might move all the connections onto an io_service+pool and keep the listener (Server) on a separate io_service, which could then be its own implicit strand.
Note on the shutdown sequence:
- I made the destruction of Server explicit so we can stop the acceptor_ on its strand (!!) as required.
- The pool threads will not complete until all connections have been closed. If you want control over this, see the linked answer again (which shows how to keep weak pointers tracking the connections). Alternatively, you can accompany all the asynchronous operations in the session with timeouts and check the Server for a shutdown signal.
#include <boost/array.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/make_shared.hpp>
#include <boost/thread.hpp>
#include <iostream>
#include <istream>
#include <list>
#include <string>
namespace bs = boost::system;
namespace ba = boost::asio;
namespace bas = ba::ssl;
using ba::ip::tcp;
using SslContext = boost::shared_ptr<bas::context>;
typedef ba::ip::tcp::acceptor acceptor_type;
typedef bas::stream<tcp::socket> stream_type;
const short PORT = 26767;
class Session : public boost::enable_shared_from_this<Session>
{
public:
    typedef boost::shared_ptr<Session> Ptr;

    Session(ba::io_service& svc, SslContext ctx) : strand_(svc), ctx_(ctx), stream(svc, *ctx) { }

    virtual ~Session() {
        std::cout << __PRETTY_FUNCTION__ << "\n";
    }

    stream_type::lowest_layer_type& socket() { return stream.lowest_layer(); }
    void start() { AsyncReadString(); }
    void Stop() { stream.shutdown(); }

protected:
    ba::io_service::strand strand_;
    SslContext ctx_;
    stream_type stream;
    ba::streambuf stream_buffer;
    std::string message;

    void AsyncReadString() {
        std::cout << __PRETTY_FUNCTION__ << "\n";
        ba::async_read_until(
            stream,
            stream_buffer,
            '\0', // null-char is a delimiter
            strand_.wrap(
                boost::bind(&Session::ReadHandler, shared_from_this(),
                    ba::placeholders::error,
                    ba::placeholders::bytes_transferred)));
    }

    void AsyncWriteString(const std::string &s) {
        std::cout << __PRETTY_FUNCTION__ << "\n";
        message = s;
        ba::async_write(
            stream,
            ba::buffer(message.c_str(), message.size()+1),
            strand_.wrap(
                boost::bind(&Session::WriteHandler, shared_from_this(),
                    ba::placeholders::error,
                    ba::placeholders::bytes_transferred)));
    }

    std::string ExtractString() {
        std::cout << __PRETTY_FUNCTION__ << "\n";
        std::istream is(&stream_buffer);
        std::string s;
        std::getline(is, s, '\0');
        return s;
    }

    void ReadHandler(const bs::error_code &ec, std::size_t /*bytes_transferred*/)
    {
        std::cout << __PRETTY_FUNCTION__ << "\n";
        if (!ec) {
            std::cout << (ExtractString() + "\n");
            std::cout.flush();
            AsyncReadString(); // read again
        }
        else {
            // do nothing, "this" will be deleted later
        }
    }

    void WriteHandler(const bs::error_code &/*ec*/, std::size_t /*bytes_transferred*/) {
        std::cout << __PRETTY_FUNCTION__ << "\n";
    }
};
class Server : public boost::enable_shared_from_this<Server>
{
public:
    Server(ba::io_service& io_service, unsigned short port) :
        strand_  (io_service),
        acceptor_(io_service, tcp::endpoint(tcp::v4(), port)),
        context_ (boost::make_shared<bas::context>(io_service, bas::context::sslv23))
    {
        //
    }

    void start_accept() {
        auto new_session = boost::make_shared<Session>(strand_.get_io_service(), context_);
        acceptor_.async_accept(new_session->socket(),
            strand_.wrap(boost::bind(&Server::handle_accept, this, new_session, ba::placeholders::error)));
    }

    void stop_accept() {
        auto keep = shared_from_this();
        strand_.post([keep] { keep->acceptor_.close(); });
    }

    void handle_accept(Session::Ptr new_session, const bs::error_code& error)
    {
        if (!error) {
            new_session->start();
            start_accept(); // uses `acceptor_` safely because of the strand_
        }
    }

    ~Server() {
        std::cout << __PRETTY_FUNCTION__ << "\n";
    }

private:
    ba::io_service::strand strand_;
    tcp::acceptor acceptor_;
    SslContext context_;
};
int main() {
    ba::io_service svc;
    boost::thread_group pool;

    {
        auto s = boost::make_shared<Server>(svc, PORT);
        s->start_accept();

        for (auto i = 0u; i<boost::thread::hardware_concurrency(); ++i)
            pool.create_thread([&]{svc.run();});

        std::cerr << "Shutdown in 10 seconds...\n";
        boost::this_thread::sleep_for(boost::chrono::seconds(10)); // auto-shutdown in 10s

        std::cerr << "Shutdown...\n";
    } // destructor of Server // TODO thread-safe

    pool.join_all();
}
Which prints:
$ (for a in {1..20000}; do (sleep 1.$RANDOM; echo -e "hello world $RANDOM\\0" | netcat localhost 26767)& done)
$ time ./test | sort | uniq -c | sort -n | tail
Shutdown in 10 seconds...
Shutdown...
1 Server::~Server()
1 void Session::AsyncReadString()virtual Session::~Session()
1 void Session::AsyncReadString()void Session::ReadHandler(const boost::system::error_code&, std::size_t)
1 void Session::ReadHandler(const boost::system::error_code&, std::size_t)void Session::AsyncReadString()
3
4523 void Session::AsyncReadString()
4524 void Session::ReadHandler(const boost::system::error_code&, std::size_t)
4526 virtual Session::~Session()
real 0m10.128s
user 0m0.430s
sys 0m0.262s
¹ The whole point of asynchrony is to avoid blocking on IO operations that may take "longer". And the idea of locking is to never hold locks for "longer" amounts of time, or they will kill scalability.