What's wrong with this boost::asio and boost::coroutine usage pattern?


Question

In this question I described a boost::asio and boost::coroutine usage pattern that causes random crashes of my application, and I published an extract from my code together with the valgrind and GDB output.

To investigate the problem further, I created a smaller proof-of-concept application that applies the same pattern. The same problem arises in the smaller program, whose source I publish here.

The code starts a few threads and creates a connection pool with a few dummy connections (both counts are supplied by the user). The additional arguments are unsigned integers that play the role of fake requests. The dummy implementation of the sendRequest function just starts an asynchronous timer that waits for a number of seconds equal to the input number, and yields from the function.

Can someone see the problem with this code and propose a fix for it?

#include "asiocoroutineutils.h"
#include "concurrentqueue.h"

#include <iostream>
#include <thread>

#include <boost/lexical_cast.hpp>

using namespace std;
using namespace boost;
using namespace utils;

#define id this_thread::get_id() << ": "

// ---------------------------------------------------------------------------

/*!
 * \brief This is a fake Connection class
 */
class Connection
{
public:
    Connection(unsigned connectionId)
        : _id(connectionId)
    {
    }

    unsigned getId() const
    {
        return _id;
    }

    void sendRequest(asio::io_service& ioService,
                     unsigned seconds,
                     AsioCoroutineJoinerProxy,
                     asio::yield_context yield)
    {
        cout << id << "Connection " << getId()
             << " Start sending: " << seconds << endl;

        // waiting on this timer is a placeholder for any asynchronous operation
        asio::steady_timer timer(ioService);
        timer.expires_from_now(chrono::seconds(seconds));
        coroutineAsyncWait(timer, yield);

        cout << id << "Connection " << getId()
             << " Received response: " << seconds << endl;
    }

private:
    unsigned _id;
};

typedef std::unique_ptr<Connection> ConnectionPtr;
typedef std::shared_ptr<asio::steady_timer> TimerPtr;

// ---------------------------------------------------------------------------

class ConnectionPool
{
public:
    ConnectionPool(size_t connectionsCount)
    {
        for(size_t i = 0; i < connectionsCount; ++i)
        {
            cout << "Creating connection: " << i << endl;
            _connections.emplace_back(new Connection(i));
        }
    }

    ConnectionPtr getConnection(TimerPtr timer,
                                asio::yield_context& yield)
    {
        lock_guard<mutex> lock(_mutex);

        while(_connections.empty())
        {
            cout << id << "There is no free connection." << endl;

            _timers.emplace_back(timer);
            timer->expires_from_now(
                asio::steady_timer::clock_type::duration::max());

            _mutex.unlock();
            coroutineAsyncWait(*timer, yield);
            _mutex.lock();

            cout << id << "Connection was freed." << endl;
        }

        cout << id << "Getting connection: "
             << _connections.front()->getId() << endl;

        ConnectionPtr connection = std::move(_connections.front());
        _connections.pop_front();
        return connection;
    }

    void addConnection(ConnectionPtr connection)
    {
        lock_guard<mutex> lock(_mutex);

        cout << id << "Returning connection " << connection->getId()
             << " to the pool." << endl;

        _connections.emplace_back(std::move(connection));

        if(_timers.empty())
            return;

        auto timer = _timers.back();
        _timers.pop_back();
        auto& ioService = timer->get_io_service();

        ioService.post([timer]()
        {
            cout << id << "Wake up waiting getConnection." << endl;
            timer->cancel();
        });
    }

private:
    mutex _mutex;
    deque<ConnectionPtr> _connections;
    deque<TimerPtr> _timers;
};

typedef unique_ptr<ConnectionPool> ConnectionPoolPtr;

// ---------------------------------------------------------------------------

class ScopedConnection
{
public:
    ScopedConnection(ConnectionPool& pool,
                     asio::io_service& ioService,
                     asio::yield_context& yield)
        : _pool(pool)
    {
        auto timer = make_shared<asio::steady_timer>(ioService);
        _connection = _pool.getConnection(timer, yield);
    }

    Connection& get()
    {
        return *_connection;
    }

    ~ScopedConnection()
    {
        _pool.addConnection(std::move(_connection));
    }

private:
    ConnectionPool& _pool;
    ConnectionPtr _connection;
};

// ---------------------------------------------------------------------------

void sendRequest(asio::io_service& ioService,
                 ConnectionPool& pool,
                 unsigned seconds,
                 asio::yield_context yield)
{
    cout << id << "Constructing request ..." << endl;

    AsioCoroutineJoiner joiner(ioService);

    ScopedConnection connection(pool, ioService, yield);

    asio::spawn(ioService, bind(&Connection::sendRequest,
                                connection.get(),
                                std::ref(ioService),
                                seconds,
                                AsioCoroutineJoinerProxy(joiner),
                                placeholders::_1));

    joiner.join(yield);

    cout << id << "Processing response ..." << endl;
}

// ---------------------------------------------------------------------------

void threadFunc(ConnectionPool& pool,
                ConcurrentQueue<unsigned>& requests)
{
    try
    {
        asio::io_service ioService;

        while(true)
        {
            unsigned request;
            if(!requests.tryPop(request))
                break;

            cout << id << "Scheduling request: " << request << endl;

            asio::spawn(ioService, bind(sendRequest,
                                        std::ref(ioService),
                                        std::ref(pool),
                                        request,
                                        placeholders::_1));
        }

        ioService.run();
    }
    catch(const std::exception& e)
    {
        cerr << id << "Error: " << e.what() << endl;
    }
}

// ---------------------------------------------------------------------------

int main(int argc, char* argv[])
{
    if(argc < 3)
    {
        cout << "Usage: ./async_request poolSize threadsCount r0 r1 ..."
             << endl;
        return -1;
    }

    try
    {
        auto poolSize = lexical_cast<size_t>(argv[1]);
        auto threadsCount = lexical_cast<size_t>(argv[2]);

        ConcurrentQueue<unsigned> requests;
        for(int i = 3; i < argc; ++i)
        {
            auto request = lexical_cast<unsigned>(argv[i]);
            requests.tryPush(request);
        }

        ConnectionPoolPtr pool(new ConnectionPool(poolSize));

        vector<unique_ptr<thread>> threads;
        for(size_t i = 0; i < threadsCount; ++i)
        {
            threads.emplace_back(
                new thread(threadFunc, std::ref(*pool), std::ref(requests)));
        }

        for_each(threads.begin(), threads.end(), mem_fn(&thread::join));
    }
    catch(const std::exception& e)
    {
        cerr << "Error: " << e.what() << endl;
    }

    return 0;
}
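
As a side note on the ConnectionPool and ScopedConnection classes above, here is a minimal sketch of the same acquire/release idea using only the standard library. It is not the asker's code: `BlockingPool`, `FakeConnection`, and `ScopedFakeConnection` are hypothetical names, and a `std::condition_variable` replaces the timer-based coroutine suspension. That swap blocks the calling thread, so it would not be suitable inside an io_service handler; it only illustrates the locking discipline. One thing it highlights: `std::unique_lock` tracks whether it currently owns the mutex, whereas the `lock_guard` plus manual `_mutex.unlock()` / `_mutex.lock()` combination in getConnection would unlock an unowned mutex if coroutineAsyncWait threw while the mutex was released.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <memory>
#include <mutex>
#include <utility>

// Hypothetical stand-in for the Connection class; only the id matters here.
struct FakeConnection
{
    explicit FakeConnection(unsigned connectionId) : id(connectionId) {}
    unsigned id;
};

// Blocking analogue of ConnectionPool (assumption: threads may block).
class BlockingPool
{
public:
    explicit BlockingPool(std::size_t count)
    {
        for(std::size_t i = 0; i < count; ++i)
            _connections.emplace_back(
                new FakeConnection(static_cast<unsigned>(i)));
    }

    std::unique_ptr<FakeConnection> acquire()
    {
        std::unique_lock<std::mutex> lock(_mutex);
        // wait() releases the mutex while blocked and re-acquires it
        // before returning, so ownership is always tracked by 'lock'.
        _available.wait(lock, [this] { return !_connections.empty(); });
        std::unique_ptr<FakeConnection> c = std::move(_connections.front());
        _connections.pop_front();
        return c;
    }

    void release(std::unique_ptr<FakeConnection> c)
    {
        {
            std::lock_guard<std::mutex> lock(_mutex);
            _connections.push_back(std::move(c));
        }
        _available.notify_one(); // wake one waiting acquire(), if any
    }

    std::size_t freeCount()
    {
        std::lock_guard<std::mutex> lock(_mutex);
        return _connections.size();
    }

private:
    std::mutex _mutex;
    std::condition_variable _available;
    std::deque<std::unique_ptr<FakeConnection>> _connections;
};

// RAII guard mirroring ScopedConnection: acquire on construction,
// return the connection to the pool on destruction.
class ScopedFakeConnection
{
public:
    explicit ScopedFakeConnection(BlockingPool& pool)
        : _pool(pool), _connection(pool.acquire()) {}
    ~ScopedFakeConnection() { _pool.release(std::move(_connection)); }

    ScopedFakeConnection(const ScopedFakeConnection&) = delete;
    ScopedFakeConnection& operator=(const ScopedFakeConnection&) = delete;

    FakeConnection& get() { return *_connection; }

private:
    BlockingPool& _pool;
    std::unique_ptr<FakeConnection> _connection;
};
```

A caller would construct a `ScopedFakeConnection` in a local scope, use `get()` while the scope is active, and rely on the destructor to hand the connection back.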

Here are some helper utilities used by the program above:

#pragma once

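#include <cassert>    // assert
#include <cstddef>    // std::size_t
#include <stdexcept>  // std::runtime_error

#include <boost/asio/steady_timer.hpp>
#include <boost/asio/spawn.hpp>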

namespace utils
{

inline void coroutineAsyncWait(boost::asio::steady_timer& timer,
                               boost::asio::yield_context& yield)
{
    boost::system::error_code ec;
    timer.async_wait(yield[ec]);
    if(ec && ec != boost::asio::error::operation_aborted)
        throw std::runtime_error(ec.message());
}

class AsioCoroutineJoiner
{
public:
    explicit AsioCoroutineJoiner(boost::asio::io_service& io)
        : _timer(io), _count(0) {}

    void join(boost::asio::yield_context yield)
    {
        assert(_count > 0);
        _timer.expires_from_now(
            boost::asio::steady_timer::clock_type::duration::max());
        coroutineAsyncWait(_timer, yield);
    }

    void inc()
    {
        ++_count;
    }

    void dec()
    {
        assert(_count > 0);
        --_count;
        if(0 == _count)
            _timer.cancel();
    }

private:
    boost::asio::steady_timer _timer;
    std::size_t _count;

}; // AsioCoroutineJoiner class

class AsioCoroutineJoinerProxy
{
public:
    AsioCoroutineJoinerProxy(AsioCoroutineJoiner& joiner)
        : _joiner(joiner)
    {
        _joiner.inc();
    }

    AsioCoroutineJoinerProxy(const AsioCoroutineJoinerProxy& joinerProxy)
        : _joiner(joinerProxy._joiner)
    {
        _joiner.inc();
    }

    ~AsioCoroutineJoinerProxy()
    {
        _joiner.dec();
    }

private:
    AsioCoroutineJoiner& _joiner;

}; // AsioCoroutineJoinerProxy class

} // utils namespace
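
The counting behaviour of AsioCoroutineJoiner and AsioCoroutineJoinerProxy can be looked at in isolation. The sketch below is an illustration under stated assumptions, not the helper itself: `CountingJoiner` and `JoinerProxy` are hypothetical names, and a plain callback stands in for the `_timer.cancel()` call. It shows that every copy of the proxy (for example, the copies made when the proxy is passed through `bind`) increments the count, and that the callback fires only when the last copy is destroyed. As in the original, the counter is not atomic, so this assumes all proxies for one joiner live on the same thread.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <utility>

// Hypothetical counter that runs a callback when the count drops to zero.
// The callback plays the role of _timer.cancel() in AsioCoroutineJoiner.
class CountingJoiner
{
public:
    explicit CountingJoiner(std::function<void()> onZero)
        : _onZero(std::move(onZero)), _count(0) {}

    void inc() { ++_count; }

    void dec()
    {
        assert(_count > 0);
        if(--_count == 0)
            _onZero(); // last proxy is gone: wake up whoever is joining
    }

    std::size_t count() const { return _count; }

private:
    std::function<void()> _onZero;
    std::size_t _count;
};

// Mirrors AsioCoroutineJoinerProxy: construction and copy increment,
// destruction decrements.
class JoinerProxy
{
public:
    explicit JoinerProxy(CountingJoiner& joiner) : _joiner(joiner)
    {
        _joiner.inc();
    }

    JoinerProxy(const JoinerProxy& other) : _joiner(other._joiner)
    {
        _joiner.inc();
    }

    JoinerProxy& operator=(const JoinerProxy&) = delete;

    ~JoinerProxy() { _joiner.dec(); }

private:
    CountingJoiner& _joiner;
};
```

With one original proxy and one copy alive, `count()` is 2; the callback runs exactly once, when the second of the two is destroyed.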

For completeness, the last missing part of the code is the ConcurrentQueue class. It is too long to paste here, but if you want you can find it here.

Example usage of the application:


./connectionpooltest 3 3 5 7 8 1 0 9 2 4 3 6

where the first number 3 is the number of fake connections and the second number 3 is the number of threads used. The numbers after them are the fake requests.

The output of valgrind and GDB is the same as in the question mentioned above.

The Boost version used is 1.57. The compiler is GCC 4.8.3. The operating system is CentOS Linux release 7.1.1503.

Answer

It seems that all the valgrind errors occur because the BOOST_USE_VALGRIND macro is not defined, as Tanner Sansbury points out in a comment on this question. Apart from that, the program appears to be correct.
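
One way to confirm whether a given build actually has the macro defined is a small compile-time check. The sketch below is only an illustration: the function names are hypothetical, and it assumes the macro is passed on the compiler command line (for GCC, something like `-DBOOST_USE_VALGRIND`). It says nothing about how the Boost libraries themselves were built; the macro likely needs to be defined consistently there as well, so the Boost.Coroutine documentation is the place to check.

```cpp
#include <iostream>
#include <ostream>

// Hypothetical helper: reports whether this translation unit was compiled
// with BOOST_USE_VALGRIND defined (e.g. via -DBOOST_USE_VALGRIND).
inline bool compiledWithBoostValgrindSupport()
{
#ifdef BOOST_USE_VALGRIND
    return true;
#else
    return false;
#endif
}

// Hypothetical helper: prints a one-line diagnostic to the given stream.
inline void reportValgrindSupport(std::ostream& out)
{
    out << "BOOST_USE_VALGRIND is "
        << (compiledWithBoostValgrindSupport() ? "defined" : "not defined")
        << " in this build" << std::endl;
}
```

Calling `reportValgrindSupport(std::cout)` at the start of main makes it easy to see which configuration a binary under valgrind was built with.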
