What causes a random crash in boost::coroutine?

Problem description

I have a multithreaded application which uses boost::asio and boost::coroutine via their integration in boost::asio. Every thread has its own io_service object. The only shared state between threads is the connection pools, which are locked with a mutex when a connection is taken from or returned to a pool. When there are not enough connections in a pool, I push an infinite asio::steady_timer into the internal structure of the pool, wait on it asynchronously, and yield from the coroutine function. When another thread returns a connection to the pool, it checks whether there are waiting timers; if so, it takes a waiting timer from the internal structure, gets its io_service object, and posts a lambda that wakes up the timer to resume the suspended coroutine. I get random crashes in the application. I tried to investigate the problem with valgrind. It finds some issues, but I cannot understand them because they happen inside boost::coroutine and boost::asio internals. Here are fragments from my code and from the valgrind output. Can someone see and explain the problem?

Here is the calling code:

template <class ContextsType>
void executeRequests(ContextsType& avlRequestContexts)
{
    AvlRequestDataList allRequests;
    for(auto& requestContext : avlRequestContexts)
    {
        if(!requestContext.pullProvider || !requestContext.toAskGDS())
            continue;

        auto& requests = requestContext.pullProvider->getRequestsData();
        copy(requests.begin(), requests.end(), back_inserter(allRequests));
    }

    if(allRequests.size() == 0)
        return;

    boost::asio::io_service ioService;
    curl::AsioMultiplexer multiplexer(ioService);

    for(auto& request : allRequests)
    {
        using namespace boost::asio;

        spawn(ioService, [&multiplexer, &request](yield_context yield)
        {
            request->prepare(multiplexer, yield);
        });
    }

    while(true)
    {
        try
        {
            VLOG_DEBUG(avlGeneralLogger, "executeRequests: Starting ASIO event loop.");
            ioService.run();
            VLOG_DEBUG(avlGeneralLogger, "executeRequests: ASIO event loop finished.");
            break;
        }
        catch(const std::exception& e)
        {
            VLOG_ERROR(avlGeneralLogger, "executeRequests: Error while executing GDS request: " << e.what());
        }
        catch(...)
        {
            VLOG_ERROR(avlGeneralLogger, "executeRequests: Unknown error while executing GDS request.");
        }
    }
}

Here is the prepare function implementation, which is called in the spawned lambda:

void AvlRequestData::prepareImpl(curl::AsioMultiplexer& multiplexer,
                                 boost::asio::yield_context yield)
{
    auto& ioService = multiplexer.getIoService();
    _connection = _pool.getConnection(ioService, yield);
    _connection->prepareRequest(xmlRequest, xmlResponse, requestTimeoutMS);

    multiplexer.addEasyHandle(_connection->getHandle(),
                              [this](const curl::EasyHandleResult& result)
    {
        if(0 == result.responseCode)
            returnQuota();
        VLOG_DEBUG(lastSeatLogger, "Response " << id << ": " << xmlResponse);
        _pool.addConnection(std::move(_connection));
    });
}


void AvlRequestData::prepare(curl::AsioMultiplexer& multiplexer,
                             boost::asio::yield_context yield)
{
    try
    {
        prepareImpl(multiplexer, yield);
    }
    catch(const std::exception& e)
    {
        VLOG_ERROR(lastSeatLogger, "Error wile preparing request: " << e.what());
        returnQuota();
    }
    catch(...)
    {
        VLOG_ERROR(lastSeatLogger, "Unknown error while preparing request.");
        returnQuota();
    }
}

The returnQuota function is a pure virtual method of the AvlRequestData class; its implementation in the TravelportRequestData class, which is used in all my tests, is the following:

void returnQuota() const override
{
    auto& avlQuotaManager = AvlQuotaManager::getInstance();
    avlQuotaManager.consumeQuotaTravelport(-1);
}

Here are the push and pop methods of the connection pool.

auto AvlConnectionPool::getConnection(
        TimerPtr timer,
        asio::yield_context yield) -> ConnectionPtr
{
    lock_guard<mutex> lock(_mutex);

    while(_connections.empty())
    {
        _timers.emplace_back(timer);
        timer->expires_from_now(
            asio::steady_timer::clock_type::duration::max());

        _mutex.unlock();
        coroutineAsyncWait(*timer, yield);
        _mutex.lock();
    }

    ConnectionPtr connection = std::move(_connections.front());
    _connections.pop_front();

    VLOG_TRACE(defaultLogger, str(format("Getted connection from pool: %s. Connections count %d.")
                                  % _connectionPoolName % _connections.size()));

    ++_connectionsGiven;

    return connection;
}

void AvlConnectionPool::addConnection(ConnectionPtr connection,
                                      Side side /* = Back */)
{
    lock_guard<mutex> lock(_mutex);

    if(Front == side)
        _connections.emplace_front(std::move(connection));
    else
        _connections.emplace_back(std::move(connection));

    VLOG_TRACE(defaultLogger, str(format("Added connection to pool: %s. Connections count %d.")
                                  % _connectionPoolName % _connections.size()));

    if(_timers.empty())
        return;

    auto timer = _timers.back();
    _timers.pop_back();

    auto& ioService = timer->get_io_service();
    ioService.post([timer](){ timer->cancel(); });

    VLOG_TRACE(defaultLogger, str(format("Connection pool %s: Waiting thread resumed.")
                                  % _connectionPoolName));
}

Here is the implementation of coroutineAsyncWait:

inline void coroutineAsyncWait(boost::asio::steady_timer& timer,
                               boost::asio::yield_context yield)
{
    boost::system::error_code ec;
    timer.async_wait(yield[ec]);
    if(ec && ec != boost::asio::error::operation_aborted)
        throw std::runtime_error(ec.message());
}

And finally the first part of the valgrind output:

==8189== Thread 41:
==8189== Invalid read of size 8
==8189== at 0x995F84: void boost::coroutines::detail::trampoline_push_void, void, boost::asio::detail::coro_entry_point, void (anonymous namespace)::executeRequests > >(std::vector<(anonymous namespace)::AvlRequestContext, std::allocator<(anonymous namespace)::AvlRequestContext> >&)::{lambda(boost::asio::basic_yield_context >)#1}>&, boost::coroutines::basic_standard_stack_allocator > >(long) (trampoline_push.hpp:65)
==8189== Address 0x2e3b5528 is not stack'd, malloc'd or (recently) free'd

When I use valgrind with a debugger attached, it stops in the following function in trampoline_push.hpp in the boost::coroutine library.

53│ template< typename Coro >
54│ void trampoline_push_void( intptr_t vp)
55│ {
56│     typedef typename Coro::param_type   param_type;
57│
58│     BOOST_ASSERT( vp);
59│
60│     param_type * param(
61│         reinterpret_cast< param_type * >( vp) );
62│     BOOST_ASSERT( 0 != param);
63│
64│     Coro * coro(
65├>        reinterpret_cast< Coro * >( param->coro) );
66│     BOOST_ASSERT( 0 != coro);
67│
68│     coro->run();
69│ }


Answer

Ultimately I found that when objects need to be deleted, boost::asio doesn't handle it gracefully without proper use of shared_ptr and weak_ptr. When crashes do occur, they are very difficult to debug, because it's hard to look into what the io_service queue is doing at the time of failure.
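
For illustration, here is a minimal, hedged sketch (not taken from the question's code; the Session type and its methods are invented) of guarding a posted handler with a weak_ptr, so that work queued on the io_service can detect that its target object has already been destroyed instead of touching freed memory:

#include <boost/asio.hpp>
#include <iostream>
#include <memory>

// Hypothetical type used only for this illustration.
struct Session : std::enable_shared_from_this<Session>
{
    void queueWork(boost::asio::io_service& io)
    {
        std::weak_ptr<Session> weakSelf = shared_from_this();
        io.post([weakSelf]()
        {
            // The handler may run long after the Session is gone; the
            // weak_ptr lets it find out instead of dereferencing a
            // dangling pointer.
            if(auto self = weakSelf.lock())
                self->doWork();
        });
    }

    void doWork() { std::cout << "session still alive" << std::endl; }
};

int main()
{
    boost::asio::io_service io;
    auto session = std::make_shared<Session>();
    session->queueWork(io);
    io.run();
}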

After building a fully asynchronous client architecture recently and running into random crashing issues, I have a few tips to offer. Unfortunately, I cannot know whether these will solve your issues, but hopefully they provide a good start in the right direction.


  1. Use boost::asio::asio_handler_invoke instead of io_service.post():


auto& ioService = timer->get_io_service();
ioService.post([timer](){ timer->cancel(); });


Using post/dispatch within a coroutine is usually a bad idea. Always use asio_handler_invoke when you are called from a coroutine. In this case, however, you can probably safely call timer->cancel() without posting it to the message loop at all; a sketch of the hook mechanism follows below.
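
As a general, hedged illustration of the asio_handler_invoke hook (this is not code from the question; logging_handler and the printed messages are invented), the hook is a customization point found by argument-dependent lookup, so work dispatched "on behalf of" a handler runs through whatever invocation strategy that handler defines:

#include <boost/asio/handler_invoke_hook.hpp>
#include <iostream>

// Hypothetical handler type used only for this illustration.
struct logging_handler
{
    void operator()() const { std::cout << "handler body" << std::endl; }
};

// ADL-found customization point: anything invoked "on behalf of" a
// logging_handler is routed through here (a real handler might dispatch
// to a strand instead of just logging).
template <typename Function>
void asio_handler_invoke(Function function, logging_handler* /*context*/)
{
    std::cout << "invoking via logging_handler context" << std::endl;
    function();
}

int main()
{
    logging_handler handler;

    // Run the work through the handler's invocation strategy instead of
    // posting it to an io_service.
    auto work = []{ std::cout << "timer->cancel() would go here" << std::endl; };
    using boost::asio::asio_handler_invoke;
    asio_handler_invoke(work, &handler);
}

This is the same mechanism Asio itself uses internally when it needs to run code in a manner consistent with a user-supplied handler.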

Your timers do not appear to use shared_ptr objects. Regardless of what is going on in the rest of your application, there is no way to know for sure when these objects should be destroyed. I would highly recommend using shared_ptr objects for all of your timer objects. Also, any handler that points back into a class should capture shared_from_this() as well; a plain this is quite dangerous if the object is destroyed (it was on the stack) or released by a shared_ptr somewhere else while the handler is still pending. Whatever you do, do not use shared_from_this() in the constructor of an object!
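
A minimal sketch of that advice (the Waiter class and its member names are hypothetical, not from the question, and it assumes a std::chrono-based steady_timer): the timer lives in a shared_ptr and the completion handler captures shared_from_this() instead of a raw this, so both the timer and its owner stay alive until the handler has actually run.

#include <boost/asio.hpp>
#include <boost/asio/steady_timer.hpp>
#include <chrono>
#include <iostream>
#include <memory>

// Hypothetical class used only for this illustration.
class Waiter : public std::enable_shared_from_this<Waiter>
{
public:
    explicit Waiter(boost::asio::io_service& io)
        : _timer(std::make_shared<boost::asio::steady_timer>(io))
    {
    }

    void start()
    {
        _timer->expires_from_now(std::chrono::milliseconds(10));

        // The lambda shares ownership of both the Waiter and the timer,
        // so neither can be destroyed while the handler is still queued.
        auto self = shared_from_this();
        auto timer = _timer;
        _timer->async_wait([self, timer](const boost::system::error_code& ec)
        {
            if(!ec)
                self->onTimeout();
        });
    }

private:
    void onTimeout() { std::cout << "timer fired, object still valid" << std::endl; }

    std::shared_ptr<boost::asio::steady_timer> _timer;
};

int main()
{
    boost::asio::io_service io;
    std::make_shared<Waiter>(io)->start();
    io.run();
}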

If you're getting a crash when a handler within the io_service is being executed, but part of the handler is no longer valid, this is a seriously difficult thing to debug. The handler object that is pumped into the io_service includes any pointers to timers, or pointers to objects that might be necessary to execute the handler.

I highly recommend going overboard with shared_ptr objects wrapped around any asio classes. If the problem goes away, then it's likely an order-of-destruction issue.
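
Applied to the wake-up path in the question, a hedged sketch (it assumes TimerPtr is, or can become, a std::shared_ptr<boost::asio::steady_timer>; wakeWaiter is an invented name):

#include <boost/asio.hpp>
#include <boost/asio/steady_timer.hpp>
#include <memory>

// Assumed alias; the question does not show how TimerPtr is defined.
using TimerPtr = std::shared_ptr<boost::asio::steady_timer>;

// Hypothetical free function illustrating the wake-up with shared ownership.
void wakeWaiter(TimerPtr timer)
{
    auto& ioService = timer->get_io_service();

    // Capturing the shared_ptr by value keeps the timer alive until the
    // posted handler has actually run, even if the pool entry that held it
    // has already been destroyed.
    ioService.post([timer]() { timer->cancel(); });
}

int main()
{
    boost::asio::io_service io;
    auto timer = std::make_shared<boost::asio::steady_timer>(io);
    timer->expires_from_now(boost::asio::steady_timer::clock_type::duration::max());
    wakeWaiter(timer);
    io.run();   // runs the posted cancel; nothing is left dangling
}

If TimerPtr is already a shared_ptr, the existing capture-by-value in addConnection does this for the timer; the suggestion is to apply the same ownership pattern consistently to every asio object a queued handler can touch.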
