提高ASIO async_write:如何才能不交错async_write电话? [英] boost asio async_write : how to not interleaving async_write calls?

查看:177
本文介绍了提高ASIO async_write:如何才能不交错async_write电话?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

下面是我实现的:


  • 客户端A发送一个消息客户端B

  • 服务器进程 async_read 数据适量和消息
    将等待来自客户端的新数据(为了不阻止客户端A)

  • 之后服务器将处理信息(可能是做一个MySQL
    查询),然后用 async_write 将消息发送到客户端B。

问题是,如果客户端发送消息的真快, async_writes 将交织previous async_write处理程序被调用之前。

有没有一种简单的方法来避免这个问题?

编辑1:
如果一个客户端C只是客户端A后,将消息发送给客户端B,同样的问题应该出现......

编辑2:
这将工作?因为它似乎阻止,我不知道在哪里...

 命名空间结构{
  类User {
  上市:
    用户(提高:: ASIO :: io_service对象和放大器; io_service对象,提高:: ASIO :: SSL ::背景和放大器;上下文):
      m_socket(io_service对象,上下文),m_strand(io_service对象),is_writing(假){}    ssl_socket&安培; getSocket(){
      返回m_socket;
    }    提高:: ASIO ::链getStrand(){
      返回m_strand;
    }    无效的push(性病::字符串str){
      m_strand.post(升压::绑定(安培;结构::用户:: strand_push,对此,STR));
    }    无效strand_push(性病::字符串str){      性病::法院LT&;< 推&所述;&下;提高:: this_thread :: get_id()<<的std :: ENDL;
      m_queue.push(STR);
      如果(!is_writing){
        写();
        性病::法院LT&;< 会写<<的std :: ENDL;
      }
      性病::法院LT&;< 已经写入<<的std :: ENDL;
    }    无效的write(){
      性病::法院LT&;< 写作<<的std :: ENDL;
      is_writing = TRUE;
      性病::字符串str = m_queue.front();
      提高:: ASIO :: async_write(m_socket,
                               升压:: ASIO ::缓​​冲液(str.c_str(),str.size()),
                               提高::绑定(安培;结构::用户发送::这个)
                               );
    }    空隙发送(){
      性病::法院LT&;< 发送<<的std :: ENDL;
      m_queue.pop();
      如果(!m_queue.empty()){
        写();
        返回;
      }
      其他
        is_writing = FALSE;
      性病::法院LT&;< 做派<<的std :: ENDL;
    }  私人的:
    ssl_socket m_socket;
    提高:: ASIO ::链m_strand;
    的std ::队列<标准::字符串> m_queue;
    布尔is_writing;
  };
}#万一


解决方案

  

有没有一种简单的方法来避免这个问题?


是,为每一个客户端传出队列。检查在 async_write 完成处理队列的大小,如果不为零,再掀 async_write 操作。这里是一个样本

 的#include<升压/ asio.hpp>
#包括LT&;升压/ bind.hpp>#包括LT&;&双端GT;
#包括LT&;&iostream的GT;
#包括LT&;串GT;一流的连接
{
上市:
    连接(
            提高:: ASIO :: io_service对象和放大器; io_service对象
            ):
        _io_service(io_service对象)
        _strand(_io_service)
        _socket(_io_service)
        _outbox()
    {    }    无效写入(
            常量标准::字符串&安培;信息
            )
    {
        _strand.post(
                提高::绑定(
                    &安培;连接:: writeImpl,
                    这个,
                    信息
                    )
                );
    }私人的:
    无效writeImpl(
            常量标准::字符串&安培;信息
            )
    {
        _outbox.push_back(消息);
        如果(_outbox.size()→1){
            //优秀async_write
            返回;
        }        这 - >写();
    }    无效的write()
    {
        常量标准::字符串&安培;消息= _outbox [0];
        提高:: ASIO :: async_write(
                _插座,
                升压:: ASIO ::缓​​冲液(message.c_str(),message.size()),
                _strand.wrap(
                    提高::绑定(
                        &安培;连接:: writeHandler,
                        这个,
                        提高:: ASIO ::占位符::错误
                        提高:: ASIO ::占位符:: bytes_transferred
                        )
                    )
                );
    }    无效writeHandler(
            常量的boost ::系统::错误_ code&放;错误,
            常量为size_t bytesTransferred
            )
    {
        _outbox.pop_front();        如果(错误){
            的std :: CERR<< 不能写:<<提高::系统:: SYSTEM_ERROR(错误)。什么()<<的std :: ENDL;
            返回;
        }        如果(!_outbox.empty()){
            //更多的消息发送
            这 - >写();
        }
    }
私人的:
    的typedef的std :: deque的<标准::字符串>发件箱;私人的:
    提高:: ASIO :: io_service对象和放大器; _io_service;
    提高:: ASIO :: io_service对象::链_strand;
    提高:: ASIO ::知识产权:: TCP ::插座_socket;
    发件箱_outbox;
};INT
主要()
{
    提高:: ASIO :: io_service对象io_service对象;
    连接符(io_service对象);
}

一些关键点


  • 提高:: ASIO :: io_service对象::股保护访问连接:: _发件箱

  • 的处理程序是由连接出动::写(),因为它是公共

这不是明显给我,如果你在你的问题在​​使用的例子类似的做法,因为所有的方法都是公开的。

Here's my implementation :

  • Client A send a message for Client B
  • Server process the message by async_read the right amount of data and will wait for new data from Client A (in Order not to block Client A)
  • Afterwards Server will process the information (probably do a mysql query) and then send the message to Client B with async_write.

The problem is, if Client A send message really fast, async_writes will interleave before the previous async_write handler is called.

Is there a simple way to avoid this problem ?

EDIT 1 : If a Client C sends a message to Client B just after Client A, the same issue should appear...

EDIT 2 : This would work ? because it seems to block, I don't know where...

 namespace structure {                                                              
  class User {                                                                     
  public:                                                                          
    User(boost::asio::io_service& io_service, boost::asio::ssl::context& context) :
      m_socket(io_service, context), m_strand(io_service), is_writing(false) {}    

    ssl_socket& getSocket() {                                                      
      return m_socket;                                                             
    }                                                                              

    boost::asio::strand getStrand() {                                              
      return m_strand;                                                             
    }                                                                              

    void push(std::string str) {                                                   
      m_strand.post(boost::bind(&structure::User::strand_push, this, str));        
    }                                                                              

    void strand_push(std::string str) {                                            

      std::cout << "pushing: " << boost::this_thread::get_id() << std::endl;       
      m_queue.push(str);                                                           
      if (!is_writing) {                                                           
        write();                                                                   
        std::cout << "going to write" << std::endl;                                
      }                                                                            
      std::cout << "Already writing" << std::endl;                                 
    }                                                                              

    void write() {                                                                 
      std::cout << "writing" << std::endl;                                         
      is_writing = true;                                                           
      std::string str = m_queue.front();                                           
      boost::asio::async_write(m_socket,                                           
                               boost::asio::buffer(str.c_str(), str.size()),       
                               boost::bind(&structure::User::sent, this)           
                               );                                                  
    }                                                                              

    void sent() {                                                                  
      std::cout << "sent" << std::endl;                                            
      m_queue.pop();                                                               
      if (!m_queue.empty()) {                                                      
        write();                                                                   
        return;                                                                    
      }                                                                            
      else                                                                         
        is_writing = false;                                                        
      std::cout << "done sent" << std::endl;                                       
    }                                          

  private:                                     
    ssl_socket          m_socket;              
    boost::asio::strand m_strand;              
    std::queue<std::string>     m_queue;       
    bool                        is_writing;    
  };                                           
}                                              

#endif

解决方案

Is there a simple way to avoid this problem ?

Yes, maintain an outgoing queue for each client. Inspect the queue size in the async_write completion handler, if non-zero, start another async_write operation. Here is a sample

#include <boost/asio.hpp>
#include <boost/bind.hpp>

#include <deque>
#include <iostream>
#include <string>

class Connection
{
public:
    Connection(
            boost::asio::io_service& io_service
            ) :
        _io_service( io_service ),
        _strand( _io_service ),
        _socket( _io_service ),
        _outbox()
    {

    }

    void write( 
            const std::string& message
            )
    {
        _strand.post(
                boost::bind(
                    &Connection::writeImpl,
                    this,
                    message
                    )
                );
    }

private:
    void writeImpl(
            const std::string& message
            )
    {
        _outbox.push_back( message );
        if ( _outbox.size() > 1 ) {
            // outstanding async_write
            return;
        }

        this->write();
    }

    void write()
    {
        const std::string& message = _outbox[0];
        boost::asio::async_write(
                _socket,
                boost::asio::buffer( message.c_str(), message.size() ),
                _strand.wrap(
                    boost::bind(
                        &Connection::writeHandler,
                        this,
                        boost::asio::placeholders::error,
                        boost::asio::placeholders::bytes_transferred
                        )
                    )
                );
    }

    void writeHandler(
            const boost::system::error_code& error,
            const size_t bytesTransferred
            )
    {
        _outbox.pop_front();

        if ( error ) {
            std::cerr << "could not write: " << boost::system::system_error(error).what() << std::endl;
            return;
        }

        if ( !_outbox.empty() ) {
            // more messages to send
            this->write();
        }
    }


private:
    typedef std::deque<std::string> Outbox;

private:
    boost::asio::io_service& _io_service;
    boost::asio::io_service::strand _strand;
    boost::asio::ip::tcp::socket _socket;
    Outbox _outbox;
};

int
main()
{
    boost::asio::io_service io_service;
    Connection foo( io_service );
}

some key points

  • the boost::asio::io_service::strand protects access to Connection::_outbox
  • a handler is dispatched from Connection::write() since it is public

it wasn't obvious to me if you were using similar practices in the example in your question since all methods are public.

这篇关于提高ASIO async_write:如何才能不交错async_write电话?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆