boost::asio async_receive_from UDP endpoint shared between threads?


Question


Boost asio specifically allows multiple threads to call the run() method on an io_service. This seems like a great way to create a multithreaded UDP server. However, I've hit a snag that I'm struggling to get an answer to.

Looking at a typical async_receive_from call:

m_socket->async_receive_from(
        boost::asio::buffer(m_recv_buffer),
        m_remote_endpoint,
        boost::bind(
            &udp_server::handle_receive,
            this,
            boost::asio::placeholders::error,
            boost::asio::placeholders::bytes_transferred));

The remote endpoint and message buffer are not passed through to the handler, but are at a higher scope level (member variable in my example). The code to handle the UDP message when it arrives will look something like:

void dns_server::handle_receive(const boost::system::error_code &error, std::size_t size)
{
    // process message
    blah(m_recv_buffer, size);

    // send something back
    respond(m_remote_endpoint);
}

If there are multiple threads running, how does the synchronisation work? Having a single endpoint and receive buffer shared between the threads implies that asio waits for a handler to complete within a single thread before calling the handler on another thread if a message arrives in the meantime. That seems to negate the point of allowing multiple threads to call run in the first place.

If I want concurrent serving of requests, it looks like I need to hand off the work packets, along with a copy of the endpoint, to a separate thread, allowing the handler method to return immediately so that asio can get on and pass another message, in parallel, to another of the threads that called run().

That seems more than somewhat nasty. What am I missing here?

Solution

Having a single endpoint and receive buffer shared between the threads implies that asio waits for a handler to complete within a single thread

If you mean "when running the service with a a single thread" then this is correct.

Otherwise, this isn't the case. Instead, Asio just says that behaviour is "undefined" when you invoke operations concurrently on the same service object (i.e. the socket, not the io_service).

That seems to negate the point of allowing multiple threads to call run in the first place.

Not unless processing takes a considerable amount of time.

The first paragraphs of the introduction to the Timer.5 sample read like a good exposition of this topic.

Session

To separate the request-specific data (buffer and endpoint) you want some notion of a session. A popular mechanism in Asio is either bound shared_ptrs or a shared-from-this session class (Boost.Bind supports binding to boost::shared_ptr instances directly).
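
As a minimal sketch of the second idiom (using the udp_session type from the demo below, but with a hypothetical handle_receive member): the shared_ptr itself can stand in for this, so the bind both selects the handler and keeps the session alive until the handler has run.

boost::shared_ptr<udp_session> session = boost::make_shared<udp_session>(this);
socket_.async_receive_from(
        boost::asio::buffer(session->recv_buffer_), session->remote_endpoint_,
        boost::bind(&udp_session::handle_receive,
            session, // the shared_ptr acts as "this" and pins buffer/endpoint
            boost::asio::placeholders::error,
            boost::asio::placeholders::bytes_transferred));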

Strand

To avoid concurrent, unsynchronized access to the shared m_socket, you can either add locks or use the strand approach as documented in the Timer.5 sample mentioned above.
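
The demo below takes the strand route. For contrast, a lock-based variant would serialize every initiating call on the shared socket with a mutex, sketched here for the send path only (hypothetical names; the receive path would take the same lock, and each session still owns its own buffer and endpoint, so the handlers themselves need no locking):

boost::mutex socket_mutex_; // guards all initiating operations on socket_

void enqueue_response(shared_session const& session) {
    boost::lock_guard<boost::mutex> lock(socket_mutex_);
    socket_.async_send_to(asio::buffer(session->message), session->remote_endpoint_,
            bind(&udp_session::handle_sent, session,
                asio::placeholders::error,
                asio::placeholders::bytes_transferred));
}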

Demo

Here, for your enjoyment, is the Daytime.6 asynchronous UDP daytime server, modified to work with many IO service threads.

Note that, logically, there's still only a single IO thread (the strand) so we don't violate the socket class's documented thread-safety.

However, unlike the official sample, the responses may get queued out of order, depending on the time taken by the actual processing in udp_session::handle_request.

Note:

  • a udp_session class to hold the buffer and remote endpoint per request
  • a pool of threads, able to scale the load of the actual processing (not the IO) over multiple cores.

#include <ctime>
#include <iostream>
#include <string>
#include <boost/array.hpp>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/make_shared.hpp>
#include <boost/asio.hpp>
#include <boost/thread.hpp>

using namespace boost;
using asio::ip::udp;
using system::error_code;

std::string make_daytime_string()
{
    using namespace std; // For time_t, time and ctime;
    time_t now = time(0);
    return ctime(&now);
}

class udp_server; // forward declaration

struct udp_session : enable_shared_from_this<udp_session> {

    udp_session(udp_server* server) : server_(server) {}

    void handle_request(const error_code& error);

    void handle_sent(const error_code& ec, std::size_t) {
        // here response has been sent
        if (ec) {
            std::cout << "Error sending response to " << remote_endpoint_ << ": " << ec.message() << "\n";
        }
    }

    udp::endpoint remote_endpoint_;
    array<char, 100> recv_buffer_;
    std::string message;
    udp_server* server_;
};

class udp_server
{
    typedef shared_ptr<udp_session> shared_session;
  public:
    udp_server(asio::io_service& io_service)
        : socket_(io_service, udp::endpoint(udp::v4(), 1313)), 
          strand_(io_service)
    {
        receive_session();
    }

  private:
    void receive_session()
    {
        // our session to hold the buffer + endpoint
        auto session = make_shared<udp_session>(this);

        socket_.async_receive_from(
                asio::buffer(session->recv_buffer_), 
                session->remote_endpoint_,
                strand_.wrap(
                    bind(&udp_server::handle_receive, this,
                        session, // keep-alive of buffer/endpoint
                        asio::placeholders::error,
                        asio::placeholders::bytes_transferred)));
    }

    void handle_receive(shared_session session, const error_code& ec, std::size_t /*bytes_transferred*/) {
        // now, handle the current session on any available pool thread
        socket_.get_io_service().post(bind(&udp_session::handle_request, session, ec));

        // immediately accept new datagrams
        receive_session();
    }

    void enqueue_response(shared_session const& session) {
        // called from a pool thread; initiating async_send_to here would
        // race with the receive initiated on the strand, so hop over first
        strand_.post(bind(&udp_server::enqueue_response_strand, this, session));
    }

    void enqueue_response_strand(shared_session const& session) {
        socket_.async_send_to(asio::buffer(session->message), session->remote_endpoint_,
                strand_.wrap(bind(&udp_session::handle_sent, 
                        session, // keep-alive of buffer/endpoint
                        asio::placeholders::error,
                        asio::placeholders::bytes_transferred)));
    }

    udp::socket  socket_;
    asio::strand strand_; // serializes all access to socket_

    friend struct udp_session;
};

void udp_session::handle_request(const error_code& error)
{
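    // note: error::message_size only means the datagram was larger than
    // our 100-byte receive buffer and was truncated; we still respond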
    if (!error || error == asio::error::message_size)
    {
        message = make_daytime_string(); // let's assume this might be slow

        // let the server coordinate actual IO
        server_->enqueue_response(shared_from_this());
    }
}

int main()
{
    try {
        asio::io_service io_service;
        udp_server server(io_service);

        thread_group group;
        for (unsigned i = 0; i < thread::hardware_concurrency(); ++i)
            group.create_thread(bind(&asio::io_service::run, ref(io_service)));

        group.join_all();
    }
    catch (std::exception& e) {
        std::cerr << e.what() << std::endl;
    }
}

Closing thoughts

Interestingly, in most cases you'll see the single-thread version performing just as well, and there's no reason to complicate the design.

Alternatively, you could use a single-threaded io_service dedicated to the IO, and an old-fashioned worker pool to do the background processing of the requests, if that is indeed the CPU-intensive part. Firstly, this simplifies the design; secondly, it might improve throughput on the IO tasks because there is no longer any need to coordinate the tasks posted on the strand.
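
A minimal sketch of that layout, assuming the same Boost libraries as the demo above (all names are illustrative): one io_service owned by a single dedicated IO thread, and a second io_service acting as a plain task queue for the worker pool.

#include <boost/asio.hpp>
#include <boost/thread.hpp>

int main()
{
    boost::asio::io_service io_svc;     // owns the socket; run by one thread only
    boost::asio::io_service worker_svc; // plain task queue for the pool threads
    boost::asio::io_service::work work(worker_svc); // keeps pool threads alive

    // a server constructed against io_svc would have its receive handler
    // worker_svc.post() the slow handle_request, and handle_request would
    // io_svc.post() the async_send_to back, so only the IO thread ever
    // touches the socket and no strand is needed

    boost::thread io_thread([&io_svc] { io_svc.run(); });

    boost::thread_group workers;
    for (unsigned i = 0; i < boost::thread::hardware_concurrency(); ++i)
        workers.create_thread([&worker_svc] { worker_svc.run(); });

    io_thread.join();
    worker_svc.stop();
    workers.join_all();
}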
