boost::asio::async_write - ensure only one outstanding call

Question

According to the documentation:

"The program must ensure that the stream performs no other write operations (such as async_write, the stream's async_write_some function, or any other composed operations that perform writes) until this operation completes."

Does this mean I cannot call boost::asio::async_write a second time until the handler for the first call has been invoked? How does one achieve this and still be asynchronous?

If I have a Send method:

//--------------------------------------------------------------------
void Connection::Send(const std::vector<char> & data)
{
    auto callback = boost::bind(&Connection::OnSend, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred);
    boost::asio::async_write(m_socket, boost::asio::buffer(data), callback);
}

Do I have to change it to something like:

//--------------------------------------------------------------------
void Connection::Send(const std::vector<char> & data)
{
    // Issue a send
    std::lock_guard<std::mutex> lock(m_numPostedSocketIOMutex);
    ++m_numPostedSocketIO;

    m_numPostedSocketIOConditionVariable.wait(lock, [this]() {return m_numPostedSocketIO == 0; });

    auto callback = boost::bind(&Connection::OnSend, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred);
    boost::asio::async_write(m_socket, boost::asio::buffer(data), callback);
}

And if so, then aren't I blocking again after the first call?

Recommended answer

Here is a complete, compilable, and tested example that I researched and got working through trial and error after reading the answer and subsequent edits from RustyX.
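The heart of this answer is a double buffer: Send() appends to the inactive buffer, and a new write is started only when the active buffer is empty, so at most one write is ever in flight. The following is a minimal sketch of that idea using only the standard library. FakeStream, AsyncWrite, CompleteOne and DoubleBufferedSender are hypothetical names invented for illustration; they stand in for the socket and boost::asio::async_write and are not part of Boost.Asio.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a socket. AsyncWrite records the payload and
// stores the completion handler; CompleteOne plays the role of the I/O
// thread finishing that write and invoking the handler.
class FakeStream {
public:
    void AsyncWrite(const std::vector<char>& data, std::function<void()> onDone) {
        // The contract under discussion: no second write while one is pending.
        assert(!pending_ && "only one write may be outstanding");
        written_.emplace_back(data.begin(), data.end());
        pending_ = std::move(onDone);
    }

    // Returns false if there was no outstanding write to complete.
    bool CompleteOne() {
        if (!pending_) return false;
        std::function<void()> handler = std::move(pending_);
        pending_ = nullptr;  // a moved-from std::function is unspecified; reset it
        handler();
        return true;
    }

    const std::vector<std::string>& Written() const { return written_; }

private:
    std::function<void()> pending_;
    std::vector<std::string> written_;
};

class DoubleBufferedSender {
public:
    explicit DoubleBufferedSender(FakeStream& stream) : stream_(stream) {}

    // Never blocks on I/O: it only appends to the inactive buffer.
    void Send(const std::string& data) {
        std::lock_guard<std::mutex> lock(mutex_);
        std::vector<char>& inactive = buffers_[active_ ^ 1];
        inactive.insert(inactive.end(), data.begin(), data.end());
        StartWriteIfIdle();
    }

private:
    // Caller must hold mutex_.
    void StartWriteIfIdle() {
        // A non-empty active buffer means a write is still in flight.
        if (!buffers_[active_].empty()) return;

        active_ ^= 1;  // the buffer that was collecting data becomes active
        stream_.AsyncWrite(buffers_[active_], [this] {
            std::lock_guard<std::mutex> lock(mutex_);
            buffers_[active_].clear();
            // Anything queued while the write was in flight goes out next.
            if (!buffers_[active_ ^ 1].empty()) StartWriteIfIdle();
        });
    }

    FakeStream& stream_;
    std::mutex mutex_;
    std::vector<char> buffers_[2];
    int active_ = 0;
};
```

In this sketch, three back-to-back Send() calls produce one write for the first message and a single combined write for the other two once the first completes, which is the batching behaviour the double buffer is meant to give.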

Connection.h

#pragma once

#include <boost/asio.hpp>

#include <atomic>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <string>
#include <vector>

//--------------------------------------------------------------------
class ConnectionManager;

//--------------------------------------------------------------------
class Connection : public std::enable_shared_from_this<Connection>
{
public:

    typedef std::shared_ptr<Connection> SharedPtr;

    // Ensure all instances are created as shared_ptr in order to fulfill requirements for shared_from_this
    static Connection::SharedPtr Create(ConnectionManager * connectionManager, boost::asio::ip::tcp::socket & socket);

    // Format an error code as a human-readable string for logging
    static std::string ErrorCodeToString(const boost::system::error_code & errorCode);

    Connection(const Connection &) = delete;
    Connection(Connection &&) = delete;
    Connection & operator = (const Connection &) = delete;
    Connection & operator = (Connection &&) = delete;
    ~Connection();

    // We have to defer the start until we are fully constructed because we call shared_from_this()
    void Start();
    void Stop();

    void Send(const std::vector<char> & data);

private:

    static size_t                                           m_nextClientId;

    size_t                                                  m_clientId;
    ConnectionManager *                                     m_owner;
    boost::asio::ip::tcp::socket                            m_socket;
    std::atomic<bool>                                       m_stopped;
    boost::asio::streambuf                                  m_receiveBuffer;
    mutable std::mutex                                      m_sendMutex;
    std::vector<char>                                       m_sendBuffers[2];         // Double buffer
    int                                                     m_activeSendBufferIndex;
    bool                                                    m_sending;

    std::vector<char>                                       m_allReadData;            // Strictly for test purposes

    Connection(ConnectionManager * connectionManager, boost::asio::ip::tcp::socket socket);

    void DoReceive();
    void DoSend();
};

//--------------------------------------------------------------------

Connection.cpp

#include "Connection.h"
#include "ConnectionManager.h"

#include <algorithm>
#include <cstdio>
#include <cstdlib>
#include <sstream>

//--------------------------------------------------------------------
size_t Connection::m_nextClientId(0);

//--------------------------------------------------------------------
Connection::SharedPtr Connection::Create(ConnectionManager * connectionManager, boost::asio::ip::tcp::socket & socket)
{
    return Connection::SharedPtr(new Connection(connectionManager, std::move(socket)));
}

//--------------------------------------------------------------------------------------------------
std::string Connection::ErrorCodeToString(const boost::system::error_code & errorCode)
{
    std::ostringstream debugMsg;
    debugMsg << " Error Category: " << errorCode.category().name() << ". "
             << " Error Message: "  << errorCode.message() << ". ";

    // IMPORTANT - These comparisons only work if you dynamically link boost libraries
    //             Because boost chose to implement boost::system::error_category::operator == by comparing addresses
    //             The addresses are different in one library and the other when statically linking.
    //
    // We use the make_error_code function to get the correct category as well as the error code value.
    // Error code value is not unique and can be duplicated in more than one category.
    if (errorCode == boost::asio::error::make_error_code(boost::asio::error::connection_refused))
    {
        debugMsg << " (Connection Refused)";
    }
    else if (errorCode == boost::asio::error::make_error_code(boost::asio::error::eof))
    {
        debugMsg << " (Remote host has disconnected)";
    }
    else
    {
        debugMsg << " (boost::system::error_code has not been mapped to a meaningful message)";
    }

    return debugMsg.str();
}

//--------------------------------------------------------------------
Connection::Connection(ConnectionManager * connectionManager, boost::asio::ip::tcp::socket socket)
    :
    m_clientId                          (m_nextClientId++)
  , m_owner                             (connectionManager)
  , m_socket                            (std::move(socket))
  , m_stopped                           (false)
  , m_receiveBuffer                     ()
  , m_sendMutex                         ()
  , m_sendBuffers                       ()
  , m_activeSendBufferIndex             (0)
  , m_sending                           (false)
  , m_allReadData                       ()
{
    printf("Client connection with id %zu has been created.\n", m_clientId);
}

//--------------------------------------------------------------------
Connection::~Connection()
{
    // Boost uses RAII, so we don't have anything to do. Let their destructors take care of business
    printf("Client connection with id %zu has been destroyed.\n", m_clientId);
}

//--------------------------------------------------------------------
void Connection::Start()
{
    DoReceive();
}

//--------------------------------------------------------------------
void Connection::Stop()
{
    // The connection object is only kept alive because it is held by shared_ptr and always has a ref count
    // as a consequence of the outstanding async receive call that gets posted every time we receive.
    // Once we stop posting another receive in the receive handler and once our owner releases any references to
    // us, we will get destroyed.
    m_stopped = true;
    m_owner->OnConnectionClosed(shared_from_this());
}

//--------------------------------------------------------------------
void Connection::Send(const std::vector<char> & data)
{
    std::lock_guard<std::mutex> lock(m_sendMutex);

    // Append to the inactive buffer
    std::vector<char> & inactiveBuffer = m_sendBuffers[m_activeSendBufferIndex ^ 1];
    inactiveBuffer.insert(inactiveBuffer.end(), data.begin(), data.end());

    // Kick off a write if none is currently in flight
    DoSend();
}

//--------------------------------------------------------------------
void Connection::DoSend()
{
    // Check if there is an async send in progress
    // An empty active buffer indicates there is no outstanding send
    if (m_sendBuffers[m_activeSendBufferIndex].empty())
    {
        m_activeSendBufferIndex ^= 1;

        std::vector<char> & activeBuffer = m_sendBuffers[m_activeSendBufferIndex];
        auto self(shared_from_this());

        boost::asio::async_write(m_socket, boost::asio::buffer(activeBuffer),
            [self](const boost::system::error_code & errorCode, size_t bytesTransferred)
            {
                std::lock_guard<std::mutex> lock(self->m_sendMutex);

                self->m_sendBuffers[self->m_activeSendBufferIndex].clear();

                if (errorCode)
                {
                    printf("An error occurred while attempting to send data to client id %zu. %s\n", self->m_clientId, ErrorCodeToString(errorCode).c_str());

                    // An error occurred
                    // We do not stop or close on sends, but instead let the receive error out and then close
                    return;
                }

                // Check if there is more to send that has been queued up on the inactive buffer,
                // while we were sending what was on the active buffer
                if (!self->m_sendBuffers[self->m_activeSendBufferIndex ^ 1].empty())
                {
                    self->DoSend();
                }
            });
    }
}

//--------------------------------------------------------------------
void Connection::DoReceive()
{
    auto self(shared_from_this());

    boost::asio::async_read_until(m_socket, m_receiveBuffer, '#',
        [self](const boost::system::error_code & errorCode, size_t bytesRead)
        {
            if (errorCode)
            {
                // Check if the other side hung up
                if (errorCode == boost::asio::error::make_error_code(boost::asio::error::eof))
                {
                    // This is not really an error. The client is free to hang up whenever they like
                    printf("Client %zu has disconnected.\n", self->m_clientId);
                }
                else
                {
                    printf("An error occurred while attempting to receive data from client id %zu. Error Code: %s\n", self->m_clientId, ErrorCodeToString(errorCode).c_str());
                }

                // Notify our masters that we are ready to be destroyed
                self->m_owner->OnConnectionClosed(self);

                // An error occurred, so do not post another receive
                return;
            }

            // Grab the read data
            std::istream stream(&self->m_receiveBuffer);
            std::string data;
            std::getline(stream, data, '#');
            data += "#";

            printf("Received data from client %zu: %s\n", self->m_clientId, data.c_str());

            // Issue the next receive
            if (!self->m_stopped)
            {
                self->DoReceive();
            }
        });
}

//--------------------------------------------------------------------
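The comments in Stop() rely on a lifetime rule: each pending handler captures a shared_ptr to the connection, so the object stays alive until the owner lets go and the last handler has run without posting another one. The following is a minimal standard-library sketch of that rule. HandlerQueue and Session are hypothetical names invented for illustration, with HandlerQueue standing in for the io_service; neither is part of Boost.Asio.

```cpp
#include <functional>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical stand-in for io_service: stores handlers until RunAll().
struct HandlerQueue {
    std::vector<std::function<void()>> handlers;

    void Post(std::function<void()> handler) {
        handlers.push_back(std::move(handler));
    }

    void RunAll() {
        // Take the current batch so handlers may post new work while running.
        std::vector<std::function<void()>> batch = std::move(handlers);
        handlers.clear();
        for (auto& handler : batch) handler();
        // batch is destroyed here, releasing the shared_ptr copies it captured.
    }
};

class Session : public std::enable_shared_from_this<Session> {
public:
    // Instances must be owned by shared_ptr for shared_from_this() to work.
    static std::shared_ptr<Session> Create(HandlerQueue& queue, bool& destroyed) {
        return std::shared_ptr<Session>(new Session(queue, destroyed));
    }

    ~Session() { destroyed_ = true; }

    // Must be called after construction, never from the constructor.
    void Start() {
        auto self(shared_from_this());
        queue_.Post([self] { self->OnEvent(); });  // the handler keeps us alive
    }

    void Stop() { stopped_ = true; }

private:
    Session(HandlerQueue& queue, bool& destroyed)
        : queue_(queue), destroyed_(destroyed) {}

    // Re-arm only while running, mirroring DoReceive() in the listing above.
    void OnEvent() {
        if (!stopped_) Start();
    }

    HandlerQueue& queue_;
    bool& destroyed_;  // test hook: set when the destructor runs
    bool stopped_ = false;
};
```

In this sketch, dropping the owner's pointer does not destroy the session while a handler is queued; it is destroyed only after Stop() has been called and the final handler has run and been released.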

ConnectionManager.h

#pragma once

#include "Connection.h"

// Boost Includes
#include <boost/asio.hpp>

// Standard Includes
#include <thread>
#include <vector>

//--------------------------------------------------------------------
class ConnectionManager
{
public:

    ConnectionManager(unsigned port, size_t numThreads);
    ConnectionManager(const ConnectionManager &) = delete;
    ConnectionManager(ConnectionManager &&) = delete;
    ConnectionManager & operator = (const ConnectionManager &) = delete;
    ConnectionManager & operator = (ConnectionManager &&) = delete;
    ~ConnectionManager();

    void Start();
    void Stop();

    void OnConnectionClosed(Connection::SharedPtr connection);

protected:

    boost::asio::io_service            m_io_service;
    boost::asio::ip::tcp::acceptor     m_acceptor;
    boost::asio::ip::tcp::socket       m_listenSocket;
    std::vector<std::thread>           m_threads;

    mutable std::mutex                 m_connectionsMutex;
    std::vector<Connection::SharedPtr> m_connections;

    boost::asio::deadline_timer        m_timer;

    void IoServiceThreadProc();

    void DoAccept();
    void DoTimer();
};

//--------------------------------------------------------------------

ConnectionManager.cpp

#include "ConnectionManager.h"

#include <boost/date_time/posix_time/posix_time.hpp>

#include <algorithm>
#include <cstdio>
#include <system_error>

//------------------------------------------------------------------------------
ConnectionManager::ConnectionManager(unsigned port, size_t numThreads)
    :
    m_io_service  ()
  , m_acceptor    (m_io_service, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port))
  , m_listenSocket(m_io_service)
  , m_threads     (numThreads)
  , m_timer       (m_io_service)
{
}

//------------------------------------------------------------------------------
ConnectionManager::~ConnectionManager()
{
    Stop();
}

//------------------------------------------------------------------------------
void ConnectionManager::Start()
{
    if (m_io_service.stopped())
    {
        m_io_service.reset();
    }

    DoAccept();

    for (auto & thread : m_threads)
    {
        if (!thread.joinable())
        {
            // std::thread::swap takes an lvalue reference, so move-assign the new thread instead
            thread = std::thread(&ConnectionManager::IoServiceThreadProc, this);
        }
    }

    DoTimer();
}

//------------------------------------------------------------------------------
void ConnectionManager::Stop()
{
    {
        std::lock_guard<std::mutex> lock(m_connectionsMutex);
        m_connections.clear();
    }

    // TODO - Will stopping the io_service be enough to kill all the connections and ultimately have them destroyed?
    //        Remember that the async handlers still hold outstanding references to their shared_ptr
    m_io_service.stop();

    for (auto & thread : m_threads)
    {
        if (thread.joinable())
        {
            thread.join();
        }
    }
}

//------------------------------------------------------------------------------
void ConnectionManager::IoServiceThreadProc()
{
    try
    {
        // Log that we are starting the io_service thread
        printf("io_service socket thread starting.\n");

        // Run the asynchronous callbacks from the socket on this thread
        // until the io_service is stopped from another thread
        m_io_service.run();
    }
    catch (const std::system_error & e)
    {
        printf("System error caught in io_service socket thread. Error Code: %d\n", e.code().value());
    }
    catch (const std::exception & e)
    {
        printf("Standard exception caught in io_service socket thread. Exception: %s\n", e.what());
    }
    catch (...)
    {
        printf("Unhandled exception caught in io_service socket thread.\n");
    }

    printf("io_service socket thread exiting.\n");
}

//------------------------------------------------------------------------------
void ConnectionManager::DoAccept()
{
    m_acceptor.async_accept(m_listenSocket,
        [this](const boost::system::error_code errorCode)
        {
            if (errorCode)
            {
                printf("An error occurred while attempting to accept connections. Error Code: %s\n", Connection::ErrorCodeToString(errorCode).c_str());
                return;
            }

            // Create the connection from the connected socket
            std::lock_guard<std::mutex> lock(m_connectionsMutex);
            Connection::SharedPtr connection = Connection::Create(this, m_listenSocket);
            m_connections.push_back(connection);
            connection->Start();

            DoAccept();
        });
}

//------------------------------------------------------------------------------
void ConnectionManager::OnConnectionClosed(Connection::SharedPtr connection)
{
    std::lock_guard<std::mutex> lock(m_connectionsMutex);

    auto itConnection = std::find(m_connections.begin(), m_connections.end(), connection);
    if (itConnection != m_connections.end())
    {
        m_connections.erase(itConnection);
    }
}

//------------------------------------------------------------------------------
void ConnectionManager::DoTimer()
{
    if (!m_io_service.stopped())
    {
        // Send messages every 30 seconds
        m_timer.expires_from_now(boost::posix_time::seconds(30));
        m_timer.async_wait(
            [this](const boost::system::error_code & errorCode)
            {
                // Do not send or reschedule if the wait was cancelled or failed
                if (errorCode)
                {
                    return;
                }

                std::lock_guard<std::mutex> lock(m_connectionsMutex);
                for (auto & connection : m_connections)
                {
                    connection->Send(std::vector<char>{'b', 'e', 'e', 'p', '#'});
                }

                DoTimer();
            });
    }
}

main.cpp

#include "ConnectionManager.h"

#include <chrono>
#include <thread>

int main()
{
    // Start up the server
    ConnectionManager connectionManager(5000, 2);
    connectionManager.Start();

    // Pretend we are doing other things or just waiting for shutdown
    std::this_thread::sleep_for(std::chrono::minutes(5));

    // Stop the server
    connectionManager.Stop();

    return 0;
}
