使用boost的async_write的异步tcp服务器会导致错误的文件描述符 [英] Asynchronous tcp server using boost's async_write results in bad file descriptor

查看:120
本文介绍了使用boost的async_write的异步tcp服务器会导致错误的文件描述符的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

首先,我不是英语为母语的人,所以我可能会犯一些语法错误,对此感到抱歉...

First of all, I'm not a native English speaker, so I would probably make some grammar mistakes, sorry for that...

我正在尝试使用C ++和Boost创建一个异步TCP服务器.我已经成功地接受了客户并从他们那里收到消息,但是我无法回复他们的消息.我要实现的是在TCPServer类上具有一种方法,该方法可答复所有连接的客户端.我已经创建了一个这样做的方法,但是当我调用TCPServer :: write时,我得到了一个错误文件描述符".TcpConnectionHandler :: handle_write错误参数上发生错误.

I'm trying to create an asynchronous TCP server using C++ and Boost. I have had success accepting clients and receiving messages from them, but I can't reply to their messages. What I want to achieve is having a method on the TCPServer class that replies to all the connected clients. I have created a method for doing so, but when I call to TCPServer::write I get a "Bad file descriptor" error on the TcpConnectionHandler::handle_write error argument.

您能帮我弄清楚我做错了什么吗?

Could you help me figuring out what I'm doing wrong?

tcp_server.h

#ifndef TCP_SERVER_
#define TCP_SERVER_

#include <iostream>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/function.hpp>
#include <boost/thread.hpp>
#include <boost/enable_shared_from_this.hpp>

namespace my_project
{
  class TcpConnectionHandler : public boost::enable_shared_from_this<TcpConnectionHandler>
  {
    public:

      TcpConnectionHandler(std::string log_prefix, boost::asio::io_service& io_service, boost::function<void(std::string&)> received_message_callback);

      boost::asio::ip::tcp::socket& socket();

      void start();

      void write(const std::string& message);

    private:

      void writeImpl(const std::string& message);

      void write();

      void handle_read(const boost::system::error_code& error, size_t bytes_transferred);

      void handle_write(const boost::system::error_code& error, size_t bytes_transferred);

      boost::asio::ip::tcp::socket socket_;
      boost::asio::streambuf message_;
      std::string log_prefix_;
      boost::function<void(std::string&)> received_message_callback_;

      std::deque<std::string> outbox_;
      boost::asio::io_service& io_service_;
      boost::asio::io_service::strand strand_;
  };

  class TcpServer
  {
    public:

      TcpServer(std::string log_prefix, unsigned int port, boost::function<void(std::string&)> received_message_callback);
      ~TcpServer();

      void start();

      void write(std::string content);

    private:

      void start_accept();

      void handle_accept(boost::shared_ptr<TcpConnectionHandler> connection, const boost::system::error_code& error);

      boost::shared_ptr<TcpConnectionHandler> connection_;
      boost::asio::io_service io_service_;
      boost::asio::ip::tcp::acceptor acceptor_;
      std::string log_prefix_;
      boost::function<void(std::string&)> received_message_callback_;
      boost::condition_variable connection_cond_;
      boost::mutex connection_mutex_;
      bool client_connected_;
      boost::thread *io_thread_;                     /**< Thread to run boost.asio io_service. */
  };

} // namespace my_project

#endif // #ifndef TCP_SERVER_

tcp_server.cpp

#include "tcp_server.h"
//#include "easylogging++.h"
//#include "utils.h"

namespace my_project
{
  // TcpConnectionHandler

  TcpConnectionHandler::TcpConnectionHandler(std::string log_prefix, boost::asio::io_service& io_service, boost::function<void(std::string&)> received_message_callback) : io_service_(io_service), strand_(io_service_), socket_(io_service), outbox_()
  {
    log_prefix_ = log_prefix;
    received_message_callback_ = received_message_callback;
  }

  boost::asio::ip::tcp::socket& TcpConnectionHandler::socket()
  {
    return socket_;
  }

  void TcpConnectionHandler::start()
  {
    async_read_until(socket_,
        message_,
        "\r\n",
        boost::bind(&TcpConnectionHandler::handle_read,
                    shared_from_this(),
                    boost::asio::placeholders::error,
                    boost::asio::placeholders::bytes_transferred));
  }

  void TcpConnectionHandler::write(const std::string& message)
  {
    strand_.post(boost::bind(&TcpConnectionHandler::writeImpl, this, message));
  }

  void TcpConnectionHandler::writeImpl(const std::string& message)
  {
    outbox_.push_back( message );
    if ( outbox_.size() > 1 ) {
        // outstanding async_write
        return;
    }

    this->write();
  }

  void TcpConnectionHandler::write()
  {
    const std::string& message = outbox_[0];
    boost::asio::async_write(
            socket_,
            boost::asio::buffer( message.c_str(), message.size() ),
            strand_.wrap(
                boost::bind(
                    &TcpConnectionHandler::handle_write,
                    this,
                    boost::asio::placeholders::error,
                    boost::asio::placeholders::bytes_transferred
                    )
                )
            );
  }

  void TcpConnectionHandler::handle_read(const boost::system::error_code& error, size_t bytes_transferred)
  {
    // Check for client disconnection
    if ((boost::asio::error::eof == error) || (boost::asio::error::connection_reset == error))
    {
      //LOG(ERROR) << log_prefix_ << " TCP/IP client disconnected!";
      return;
    }

    // Convert stream to string
    std::istream stream(&message_);
    std::istreambuf_iterator<char> eos;
    std::string message_str(std::istreambuf_iterator<char>(stream), eos);

    //LOG(DEBUG) << log_prefix_ << " communication object received message: " << getPrintableMessage(message_str);

    std::istringstream iss(message_str);

    std::string msg;
    std::getline(iss, msg, '\r'); // Consumes from the streambuf.

    // Discard the rest of the message from buffer
    message_.consume(message_.size());

    if (!error)
    {
      received_message_callback_(msg);
      start();
    }
    else
    {
      // TODO: Handler here the error
    }
  }

  void TcpConnectionHandler::handle_write(const boost::system::error_code& error, size_t bytes_transferred)
  {
    outbox_.pop_front();

    if ( error ) {
        std::cerr << "could not write: " << boost::system::system_error(error).what() << std::endl;
        return;
    }

    if ( !outbox_.empty() ) {
        // more messages to send
        this->write();
    }
  }


  // TcpServer

  TcpServer::TcpServer(std::string log_prefix, unsigned int port, boost::function<void(std::string&)> received_message_callback) : acceptor_(io_service_, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port)), client_connected_(false), io_thread_(NULL)
  {

    log_prefix_ = log_prefix;
    received_message_callback_ = received_message_callback;

    start_accept();

    // Run io_service in secondary thread
    io_thread_ = new boost::thread(boost::bind(&boost::asio::io_service::run, &io_service_));

  }

  TcpServer::~TcpServer()
  {
      if (io_thread_)
      {
        io_service_.stop();
        io_thread_->interrupt();
        io_thread_->join();
        delete io_thread_;
      }
  }

  void TcpServer::start()
  {
    // Wait until client is connected to our TCP server. (condition variable)
    boost::unique_lock<boost::mutex> lock(connection_mutex_);

    while (!client_connected_)
    {
      //LOG(INFO) << "Waiting for " << log_prefix_ << " client to establish connection...";

      connection_cond_.wait(lock);
    }

    //LOG(INFO) << log_prefix_ << " client successfully connected.";
  }

  void TcpServer::write(std::string content)
  {
    connection_->write(content);
  }

  void TcpServer::start_accept()
  {
    // Create a new connection handler
    connection_.reset(new TcpConnectionHandler(log_prefix_, acceptor_.get_io_service(), received_message_callback_));

    // Asynchronous accept operation and wait for a new connection.
    acceptor_.async_accept(connection_->socket(),
        boost::bind(&TcpServer::handle_accept, this, connection_,
        boost::asio::placeholders::error));

    //LOG(DEBUG) << log_prefix_ << " communication object started asynchronous TCP/IP connection acceptance.";
  }


  void TcpServer::handle_accept(boost::shared_ptr<TcpConnectionHandler> connection, const boost::system::error_code& error)
  {
    if (!error)
    {
      //LOG(INFO) << log_prefix_ << " client connected!";
      connection->start();
      boost::mutex::scoped_lock lock(connection_mutex_);
      client_connected_ = true;
      connection_cond_.notify_one();
      //LOG(INFO) << log_prefix_ << " client connection accepted";
    }

    start_accept();
  }
}

提前谢谢!

推荐答案

我稍微修改了代码与Boost 1.74.0兼容.

然后我在ASAN上运行它:

Then I ran it with ASAN:

=================================================================
==22695==ERROR: AddressSanitizer: heap-use-after-free on address 0x61b000000080 at pc 0x5571c3b61379 bp 0x7ffddce81980 sp 0x7ffddce81970
READ of size 8 at 0x61b000000080 thread T0
    #0 0x5571c3b61378 in boost::asio::detail::strand_executor_service::strand_impl::~strand_i...
    #1 0x5571c3c02599 in std::_Sp_counted_ptr<boost::asio::detail::strand_executor_service::s...
    #2 0x5571c3b874e9 in std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release() /usr...
    #3 0x5571c3b6d429 in std::__shared_count<(__gnu_cxx::_Lock_policy)2>::~__shared_count() /...
    #4 0x5571c3b5ff84 in std::__shared_ptr<boost::asio::detail::strand_executor_service::stra...
    #5 0x5571c3b5ffeb in std::shared_ptr<boost::asio::detail::strand_executor_service::strand...
    #6 0x5571c3b7d8d0 in boost::asio::strand<boost::asio::execution::any_executor<boost::asio...
    #7 0x5571c3c0da6c in void std::destroy_at<boost::asio::strand<boost::asio::execution::any...
    #8 0x5571c3c0b761 in void std::allocator_traits<std::allocator<boost::asio::strand<boost:...
    #9 0x5571c3c01847 in std::_Sp_counted_ptr_inplace<boost::asio::strand<boost::asio::execut...
    #10 0x5571c3b874e9 in std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release() /us...
    #11 0x5571c3b6d429 in std::__shared_count<(__gnu_cxx::_Lock_policy)2>::~__shared_count() ...
    #12 0x5571c3b25690 in std::__shared_ptr<void, (__gnu_cxx::_Lock_policy)2>::~__shared_ptr(...
    #13 0x5571c3b256f7 in std::shared_ptr<void>::~shared_ptr() /usr/include/c++/10/bits/share...
    #14 0x5571c3b25769 in boost::asio::execution::detail::any_executor_base::destroy_shared(b...
    #15 0x5571c3b24ef2 in boost::asio::execution::detail::any_executor_base::~any_executor_ba...
    #16 0x5571c3b6a0f7 in boost::asio::execution::any_executor<boost::asio::execution::contex...
    #17 0x5571c3bbf447 in my_project::TcpConnectionHandler::~TcpConnectionHandler() /home/seh...
    #18 0x5571c3bbf4e1 in void boost::checked_delete<my_project::TcpConnectionHandler>(my_pro...
    #19 0x5571c3c020a9 in boost::detail::sp_counted_impl_p<my_project::TcpConnectionHandler>:...
    #20 0x5571c3b5bf6b in boost::detail::sp_counted_base::release() /home/sehe/custom/boost_1...
    #21 0x5571c3b5c537 in boost::detail::shared_count::~shared_count() /home/sehe/custom/boos...
    #22 0x5571c3b6a324 in boost::shared_ptr<my_project::TcpConnectionHandler>::~shared_ptr() ...
    #23 0x5571c3b1d48e in my_project::TcpServer::~TcpServer() /home/sehe/Projects/stackoverfl...
    #24 0x5571c3b1e837 in main /home/sehe/Projects/stackoverflow/test.cpp:266
    #25 0x7ff1f049fbf6 in __libc_start_main (/lib/x86_64-linux-gnu/libc.so.6+0x21bf6)
    #26 0x5571c3b18b99 in _start (/home/sehe/Projects/stackoverflow/sotest+0x182b99)

0x61b000000080 is located 0 bytes inside of 1640-byte region [0x61b000000080,0x61b0000006e8)
freed by thread T0 here:
    #0 0x7ff1f1eb4407 in operator delete(void*, unsigned long) (/usr/lib/x86_64-linux-gnu/lib...
    #1 0x5571c3c003f4 in boost::asio::detail::strand_executor_service::~strand_executor_servi...
    #2 0x5571c3b35e4a in boost::asio::detail::service_registry::destroy(boost::asio::executio...
    #3 0x5571c3b3578b in boost::asio::detail::service_registry::destroy_services() /home/sehe...
    #4 0x5571c3b37a4f in boost::asio::execution_context::destroy() /home/sehe/custom/boost_1_...
    #5 0x5571c3b3788a in boost::asio::execution_context::~execution_context() /home/sehe/cust...
    #6 0x5571c3b54621 in boost::asio::io_context::~io_context() /home/sehe/custom/boost_1_74_...
    #7 0x5571c3b1d43b in my_project::TcpServer::~TcpServer() /home/sehe/Projects/stackoverflo...
    #8 0x5571c3b1e837 in main /home/sehe/Projects/stackoverflow/test.cpp:266
    #9 0x7ff1f049fbf6 in __libc_start_main (/lib/x86_64-linux-gnu/libc.so.6+0x21bf6)

previously allocated by thread T0 here:
    #0 0x7ff1f1eb33a7 in operator new(unsigned long) (/usr/lib/x86_64-linux-gnu/libasan.so.6+...
    #1 0x5571c3bc0faa in boost::asio::execution_context::service* boost::asio::detail::servic...
    #2 0x5571c3b3623a in boost::asio::detail::service_registry::do_use_service(boost::asio::e...
    #3 0x5571c3bb74f9 in boost::asio::detail::strand_executor_service& boost::asio::detail::s...
    #4 0x5571c3bab8e7 in boost::asio::detail::strand_executor_service& boost::asio::use_servi...
    #5 0x5571c3ba085f in std::shared_ptr<boost::asio::detail::strand_executor_service::strand...
    #6 0x5571c3b92744 in boost::asio::strand<boost::asio::execution::any_executor<boost::asio...
    #7 0x5571c3b7d844 in boost::asio::strand<boost::asio::execution::any_executor<boost::asio...
    #8 0x5571c3b18e8a in my_project::TcpConnectionHandler::TcpConnectionHandler(std::__cxx11:...
    #9 0x5571c3b1dbe4 in my_project::TcpServer::start_accept() /home/sehe/Projects/stackoverf...
    #10 0x5571c3b1c775 in my_project::TcpServer::TcpServer(std::__cxx11::basic_string<char, s...
    #11 0x5571c3b1e7b9 in main /home/sehe/Projects/stackoverflow/test.cpp:266
    #12 0x7ff1f049fbf6 in __libc_start_main (/lib/x86_64-linux-gnu/libc.so.6+0x21bf6)

SUMMARY: AddressSanitizer: heap-use-after-free /home/sehe/custom/boost_1_74_0/boost/asio/detail/impl/strand_executor_service.ipp:88 in boost::asio::detail::strand_executor_service::strand_impl::~strand_impl()
Shadow bytes around the buggy address:
  0x0c367fff7fc0: 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
  0x0c367fff7fd0: 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
  0x0c367fff7fe0: 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
  0x0c367fff7ff0: 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
  0x0c367fff8000: fa fa fa fa fa fa fa fa fa fa fa fa fa fa fa fa
=>0x0c367fff8010:[fd]fd fd fd fd fd fd fd fd fd fd fd fd fd fd fd
  0x0c367fff8020: fd fd fd fd fd fd fd fd fd fd fd fd fd fd fd fd
  0x0c367fff8030: fd fd fd fd fd fd fd fd fd fd fd fd fd fd fd fd
  0x0c367fff8040: fd fd fd fd fd fd fd fd fd fd fd fd fd fd fd fd
  0x0c367fff8050: fd fd fd fd fd fd fd fd fd fd fd fd fd fd fd fd
  0x0c367fff8060: fd fd fd fd fd fd fd fd fd fd fd fd fd fd fd fd
Shadow byte legend (one shadow byte represents 8 application bytes):
  Addressable:           00
  Partially addressable: 01 02 03 04 05 06 07 
  Heap left redzone:       fa
  Freed heap region:       fd
  Stack left redzone:      f1
  Stack mid redzone:       f2
  Stack right redzone:     f3
  Stack after return:      f5
  Stack use after scope:   f8
  Global redzone:          f9
  Global init order:       f6
  Poisoned by user:        f7
  Container overflow:      fc
  Array cookie:            ac
  Intra object redzone:    bb
  ASan internal:           fe
  Left alloca redzone:     ca
  Right alloca redzone:    cb
  Shadow gap:              cc
==22695==ABORTING

如您所见,〜TcpServer 析构函数使连接在 io_service 被销毁后使用.重新排序成员将解决此问题:

As you can see, ~TcpServer destructor causes the connections to use the io_service after it was destroyed. Reordering members will fix this:

boost::shared_ptr<TcpConnectionHandler> connection_;
boost::asio::io_service io_service_;
boost::asio::ip::tcp::acceptor acceptor_;

应该是

boost::asio::io_service io_service_;
boost::shared_ptr<TcpConnectionHandler> connection_;
boost::asio::ip::tcp::acceptor acceptor_;

更多

要真正收听,您似乎想/wait/退出服务,而不是 stop() interrupt()线程?>

还请注意,不需要动态分配线程.为什么C ++程序员应尽量减少对"new"的使用?

TcpServer::~TcpServer() {
    if (io_thread_.joinable()) {
        //io_service_.stop();
        //io_thread_.interrupt();
        io_thread_.join();
    }
}

终身问题:shared_from_this?

boost::bind(&TcpConnectionHandler::handle_write, this,
                        boost::asio::placeholders::error,
                        boost::asio::placeholders::bytes_transferred)

该片段无法捕获共享指针,这导致破坏了 TcpConnectionHandler .(当 socket 被破坏时,它被关闭).实际上,您的问题是 UB ,因为它是免费使用的,但是您很幸运";不会看到崩溃,从而导致您以无效的句柄的形式看到UB.

This piece fails to capture the shared pointer, which leads to TcpConnectionHandler being destructed. (When the socket is destructed, it is closed). Actually, your problem is UB because it's use-after-free but you're "lucky" to not see a crash, which leads you to see the UB in the form of an invalid handle.

将它们都修复两次:

post(executor_,
     boost::bind(&TcpConnectionHandler::writeImpl, shared_from_this(), message));

还有

async_read_until(socket_, message_, "\r\n",
     boost::bind(&TcpConnectionHandler::handle_read,
                 shared_from_this(),
                 boost::asio::placeholders::error,
                 boost::asio::placeholders::bytes_transferred));

实时演示

我通过以下方法成功地对其进行了测试:

Live Demo

I tested it successfully with the following:

#define BOOST_BIND_NO_PLACEHOLDERS
#ifndef TCP_SERVER_
#define TCP_SERVER_

#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/function.hpp>
#include <boost/thread.hpp>
#include <iostream>

namespace my_project {
class TcpConnectionHandler
        : public boost::enable_shared_from_this<TcpConnectionHandler> {
  public:
    TcpConnectionHandler(
        std::string log_prefix, boost::asio::any_io_executor executor,
        boost::function<void(std::string&)> received_message_callback);

    boost::asio::ip::tcp::socket& socket();

    void start();
    void write(const std::string& message);

  private:
    void writeImpl(const std::string& message);
    void write();
    void handle_read(const boost::system::error_code& error,
                     size_t bytes_transferred);
    void handle_write(const boost::system::error_code& error,
                      size_t bytes_transferred);

    boost::asio::any_io_executor executor_;
    boost::asio::ip::tcp::socket socket_;
    boost::asio::streambuf message_;
    std::string log_prefix_;
    boost::function<void(std::string&)> received_message_callback_;

    std::deque<std::string> outbox_;
};

class TcpServer {
  public:
    TcpServer(std::string log_prefix, unsigned int port,
              boost::function<void(std::string&)> received_message_callback);
    ~TcpServer();

    void start();

    void write(std::string content);

  private:
    void start_accept();

    void handle_accept(boost::shared_ptr<TcpConnectionHandler> connection,
                       const boost::system::error_code& error);

    boost::asio::io_service io_service_;
    boost::shared_ptr<TcpConnectionHandler> connection_;
    boost::asio::ip::tcp::acceptor acceptor_;
    std::string log_prefix_;
    boost::function<void(std::string&)> received_message_callback_;
    boost::condition_variable connection_cond_;
    boost::mutex connection_mutex_;
    bool client_connected_;
    boost::thread io_thread_; /**< Thread to run boost.asio io_service. */
};

} // namespace my_project

#endif // #ifndef TCP_SERVER_

//#include "tcp_server.h"
//#include "easylogging++.h"
//#include "utils.h"

namespace my_project {
// TcpConnectionHandler
TcpConnectionHandler::TcpConnectionHandler(
    std::string log_prefix, boost::asio::any_io_executor executor,
    boost::function<void(std::string&)> received_message_callback)
    : executor_(make_strand(executor)),
      socket_(executor_),
      log_prefix_(log_prefix),
      received_message_callback_(received_message_callback)
{ }

boost::asio::ip::tcp::socket& TcpConnectionHandler::socket() { return socket_; }

void TcpConnectionHandler::start() {
    async_read_until(socket_, message_, "\r\n",
         boost::bind(&TcpConnectionHandler::handle_read,
                     shared_from_this(),
                     boost::asio::placeholders::error,
                     boost::asio::placeholders::bytes_transferred));
}

void TcpConnectionHandler::write(const std::string& message) {
    post(executor_,
         boost::bind(&TcpConnectionHandler::writeImpl, shared_from_this(), message));
}

void TcpConnectionHandler::writeImpl(const std::string& message) {
    outbox_.push_back(message);
    if (outbox_.size() > 1) {
        // outstanding async_write
        return;
    }

    write();
}

void TcpConnectionHandler::write() {
    const std::string& message = outbox_[0];
    boost::asio::async_write(
        socket_, boost::asio::buffer(message.c_str(), message.size()),
        boost::asio::bind_executor(
            executor_,
            boost::bind(&TcpConnectionHandler::handle_write, shared_from_this(),
                        boost::asio::placeholders::error,
                        boost::asio::placeholders::bytes_transferred)));
}

void TcpConnectionHandler::handle_read(const boost::system::error_code& error,
                                       size_t /*bytes_transferred*/) {

    std::cerr << __FUNCTION__ << ": " << error.message() << "\n";
    // Check for client disconnection
    if ((boost::asio::error::eof == error) ||
        (boost::asio::error::connection_reset == error)) {
        // LOG(ERROR) << log_prefix_ << " TCP/IP client disconnected!";
        return;
    }

    // Convert stream to string
    std::istream stream(&message_);
    std::istreambuf_iterator<char> eos;
    std::string message_str(std::istreambuf_iterator<char>(stream), eos);

    // LOG(DEBUG) << log_prefix_ << " communication object received message: "
    // << getPrintableMessage(message_str);

    std::istringstream iss(message_str);

    std::string msg;
    std::getline(iss, msg, '\r'); // Consumes from the streambuf.

    // Discard the rest of the message from buffer
    message_.consume(message_.size());

    if (!error) {
        received_message_callback_(msg);
        start();
    } else {
        // TODO: Handler here the error
    }
}

void TcpConnectionHandler::handle_write(const boost::system::error_code& error,
                                        size_t /*bytes_transferred*/) {
    outbox_.pop_front();

    if (error) {
        std::cerr << "could not write: "
                  << boost::system::system_error(error).what() << std::endl;
        return;
    }

    if (!outbox_.empty()) {
        // more messages to send
        write();
    }
}

// TcpServer

TcpServer::TcpServer(
    std::string log_prefix, unsigned int port,
    boost::function<void(std::string&)> received_message_callback)
        : acceptor_(io_service_, boost::asio::ip::tcp::endpoint(
                                     boost::asio::ip::tcp::v4(), port)),
          client_connected_(false) {

    log_prefix_ = log_prefix;
    received_message_callback_ = received_message_callback;

    start_accept();

    // Run io_service in secondary thread
    io_thread_ = boost::thread(boost::bind(&boost::asio::io_service::run, &io_service_));
}

TcpServer::~TcpServer() {
    if (io_thread_.joinable()) {
        //io_service_.stop();
        //io_thread_.interrupt();
        io_thread_.join();
    }
}

void TcpServer::start() {
    // Wait until client is connected to our TCP server. (condition variable)
    boost::unique_lock<boost::mutex> lock(connection_mutex_);

    while (!client_connected_) {
        // LOG(INFO) << "Waiting for " << log_prefix_ << " client to establish
        // connection...";

        connection_cond_.wait(lock);
    }

    // LOG(INFO) << log_prefix_ << " client successfully connected.";
}

void TcpServer::write(std::string content) { connection_->write(content); }

void TcpServer::start_accept() {
    // Create a new connection handler
    connection_.reset(new TcpConnectionHandler(
        log_prefix_, acceptor_.get_executor(), received_message_callback_));

    // Asynchronous accept operation and wait for a new connection.
    acceptor_.async_accept(connection_->socket(),
                           boost::bind(&TcpServer::handle_accept, this,
                                       connection_,
                                       boost::asio::placeholders::error));

    // LOG(DEBUG) << log_prefix_ << " communication object started asynchronous
    // TCP/IP connection acceptance.";
}

void TcpServer::handle_accept(
    boost::shared_ptr<TcpConnectionHandler> connection,
    const boost::system::error_code& error) {
    std::cerr << __FUNCTION__ << ": " << error.message() << "\n";
    if (!error) {
        // LOG(INFO) << log_prefix_ << " client connected!";
        connection->start();
        boost::mutex::scoped_lock lock(connection_mutex_);
        client_connected_ = true;
        connection_cond_.notify_one();
        // LOG(INFO) << log_prefix_ << " client connection accepted";
    }

    start_accept();
}
} // namespace my_project

int main() {
    my_project::TcpServer s("demo", 6868, [](std::string& s) {
        std::cout << "Received msg: " << std::quoted(s) << "\n";
    });
}

例如,作为客户

cat test.cpp | netcat -Cw 0 localhost 6868

像打印全部一样

Received msg: "//#define BOOST_BIND_GLOBAL_PLACEHOLDERS"
Received msg: "#include <boost/thread.hpp>"
Received msg: "std::string& message);"
Received msg: "td::string> outbox_;"
Received msg: ":shared_ptr<TcpConnectionHandler> connection_;"
Received msg: "#include \"utils.h\""
Received msg: "ConnectionHandler::start() {"
Received msg: "::writeImpl(const std::string& message) {"
Received msg: "s(),"
Received msg: "       // LOG(ERROR) << log_prefix_ << \" TCP/IP client disconnected!\";"
Received msg: " // Consumes from the streambuf."
Received msg: "e: \""
Received msg: "nt_connected_(false) {"
Received msg: "d to our TCP server. (condition variable)"
Received msg: "ection handler"
Received msg: "nication object started asynchronous"
Received msg: "ond_.notify_one();"

如您所见,您必须修复必须考虑到 bytes_received 的问题,但我现在将其留给您.

As you can see you have to fix that you need to take bytes_received into account, but I'll leave that to you for now.

哦.我注意到另一件事.如果仅打算接受1个连接,则不要重置 connection _ ,因为这意味着 write 会作用于未连接的实例.也许您想保留已连接客户端的列表?

Oh. I noticed another thing. If you intend to only accept 1 connection, don't reset the connection_, because that means that write acts on an unconnected instance. Perhaps you wanted to keep a list of connected clients?

这是一个简单的重做,用 list< weak_ptr< TcpConnectionHandler>替换单个 connection _ 成员.>connections _; .它甚至内置了基本的垃圾收集.

Here's a simple rework to replace the single connection_ member with a list<weak_ptr<TcpConnectionHandler> > connections_;. It even has a basic garbage collection built in.

消息将广播到所有连接的客户端.

Message are broadcast to all connected clients.

在Coliru上直播

Live On Coliru

//#define BOOST_BIND_GLOBAL_PLACEHOLDERS
#define BOOST_BIND_NO_PLACEHOLDERS
#ifndef TCP_SERVER_
#define TCP_SERVER_

#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/function.hpp>
#include <boost/thread.hpp>
#include <iostream>

namespace my_project {
    namespace ph = boost::asio::placeholders;
    using boost::system::error_code;
    using boost::asio::ip::tcp;
    using Executor = boost::asio::executor;
    //using Executor = boost::asio::any_io_executor; // boost 1.74.0
    using Callback = boost::function<void(std::string const&)>;

    class TcpConnectionHandler : public boost::enable_shared_from_this<TcpConnectionHandler> {
      public:
        TcpConnectionHandler(Executor executor, Callback callback);

        tcp::socket& socket() { return socket_; }

        void start();
        void write(std::string const& message);

        ~TcpConnectionHandler() { std::cerr << __FUNCTION__ << "\n"; }
      private:
        void writeImpl(const std::string& message);
        void write_loop();
        void handle_read(error_code error, size_t bytes_transferred);
        void handle_write(error_code error, size_t bytes_transferred);

        Executor executor_;
        tcp::socket socket_;
        boost::asio::streambuf message_;
        Callback received_message_callback_;

        std::list<std::string> outbox_;
    };

    using ConnectionPtr = boost::shared_ptr<TcpConnectionHandler>;
    using ConnectionHandle = boost::weak_ptr<TcpConnectionHandler>;

    class TcpServer {
      public:
        TcpServer(unsigned short port, Callback callback);
        ~TcpServer();

        void write(std::string const& content);

      private:
        void start_accept();

        void handle_accept(ConnectionPtr connection,
                           error_code error);

        boost::asio::io_service io_service_;
        std::list<ConnectionHandle> connections_;
        tcp::acceptor acceptor_;
        Callback received_message_callback_;
        boost::thread io_thread_; /**< Thread to run boost.asio io_service. */
    };

} // namespace my_project

#endif // #ifndef TCP_SERVER_

//#include "tcp_server.h"
//#include "easylogging++.h"
//#include "utils.h"

namespace my_project {
    // TcpConnectionHandler
    TcpConnectionHandler::TcpConnectionHandler(Executor executor, Callback callback)
        : executor_(make_strand(executor)),
          socket_(executor_),
          received_message_callback_(callback)
    { }

    void TcpConnectionHandler::start() {
        async_read_until(socket_, message_, "\r\n",
             boost::bind(&TcpConnectionHandler::handle_read,
                         shared_from_this(), ph::error, ph::bytes_transferred));
    }

    void TcpConnectionHandler::write(const std::string& message) {
        //std::cerr << __FUNCTION__ << ": " << message.length() << "\n";
        post(executor_, boost::bind(&TcpConnectionHandler::writeImpl, shared_from_this(), message));
    }

    void TcpConnectionHandler::writeImpl(const std::string& message) {
        outbox_.push_back(message);
        if (outbox_.size() == 1)
            write_loop();
    }

    void TcpConnectionHandler::write_loop() {
        boost::asio::async_write(
            socket_, boost::asio::buffer(outbox_.front()),
            boost::asio::bind_executor(
                executor_,
                boost::bind(&TcpConnectionHandler::handle_write, shared_from_this(),
                            ph::error, ph::bytes_transferred)));
    }

    void TcpConnectionHandler::handle_read(error_code error, size_t bytes_transferred) {
        auto f = buffers_begin(message_.data()), l = f + bytes_transferred;
        message_.consume(bytes_transferred);

        std::istringstream iss(std::string(f, l));
        for (std::string msg; std::getline(iss, msg, '\n');) {
            if (msg.back() == '\r') {
                msg.pop_back();
            }
            received_message_callback_(msg);
        }; // Consumes from the streambuf.


        if (!error) {
            start();
        } else {
            std::cerr << __FUNCTION__ << ": " << error.message() << "\n";
        }
    }

    void TcpConnectionHandler::handle_write(error_code error, size_t /*bytes_transferred*/) {
        outbox_.pop_front();

        if (error) {
            std::cerr << "could not write: " << error.message() << std::endl;
        } else if (!outbox_.empty()) {
            // more messages to send
            write_loop();
        }
    }

    // TcpServer
    TcpServer::TcpServer(unsigned short port, Callback callback)
       : acceptor_(io_service_, { {}, port }),
         received_message_callback_(callback)
    {
        start_accept();
        io_thread_ = boost::thread(boost::bind(&boost::asio::io_service::run, &io_service_));
    }

    TcpServer::~TcpServer() {
        if (io_thread_.joinable()) {
            //io_service_.stop();
            //io_thread_.interrupt();
            io_thread_.join();
        }
    }

    void TcpServer::write(std::string const& content) {
        for (auto& handle : connections_)
            if (ConnectionPtr con = handle.lock())
                con->write(content);
    }

    void TcpServer::start_accept() {
        // optionally garbage-collect connection handles
        connections_.remove_if(std::mem_fn(&ConnectionHandle::expired));

        // Create a new connection handler
        auto connection_ = boost::make_shared<TcpConnectionHandler>(
            acceptor_.get_executor(), received_message_callback_);

        // Asynchronous accept operation and wait for a new connection.
        acceptor_.async_accept(connection_->socket(),
            boost::bind(&TcpServer::handle_accept, this, connection_, ph::error));
    }

    void TcpServer::handle_accept(boost::shared_ptr<TcpConnectionHandler> connection, error_code error) {
        //std::cerr << __FUNCTION__ << ": " << error.message() << "\n";
        if (!error) {
            connections_.push_back(connection);
            connection->start();
        }

        start_accept();
    }
} // namespace my_project

int main() {
    my_project::TcpServer server(6868, [&server](std::string const& msg) {
        std::cout << "Broadcasting msg: " << std::quoted(msg) << "\n";
        server.write(msg + "\r\n");
    });
}

使用模拟客户端:

(for a in 1 2 3; do echo -e "Foo $a\nBar $a\nQux $a" | nc -Cw1 localhost 6868 | while read line; do echo "Job $a: $line"; done& done;)

在服务器上打印:

Broadcasting msg: "Foo 1"
Broadcasting msg: "Foo 2"
Broadcasting msg: "Bar 1"
Broadcasting msg: "Bar 2"
Broadcasting msg: "Foo 3"
Broadcasting msg: "Qux 1"
Broadcasting msg: "Qux 2"
Broadcasting msg: "Bar 3"
Broadcasting msg: "Qux 3"
handle_read: End of file
~TcpConnectionHandler
handle_read: End of file
~TcpConnectionHandler
handle_read: End of file
~TcpConnectionHandler

然后客户端打印(取决于时间):

And the clients print (depending on timings):

Job 1: Foo 1
Job 2: Bar 1
Job 1: Bar 1
Job 2: Foo 2
Job 1: Foo 2
Job 3: Foo 3
Job 2: Qux 1
Job 1: Qux 1
Job 3: Bar 2
Job 2: Foo 3
Job 1: Foo 3
Job 3: Bar 3
Job 1: Bar 2
Job 2: Bar 2
Job 3: Qux 2
Job 1: Bar 3
Job 2: Bar 3
Job 3: Qux 3
Job 1: Qux 2
Job 2: Qux 2
Job 1: Qux 3
Job 2: Qux 3

这篇关于使用boost的async_write的异步tcp服务器会导致错误的文件描述符的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆