使用boost :: asio时是否需要实现阻塞? [英] Do I need to implement blocking when using boost::asio?

查看:76
本文介绍了使用boost :: asio时是否需要实现阻塞?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我的问题是,如果我在多个线程上运行io_service :: run(),是否需要在这些异步函数上实现阻塞?

My question is, if I run io_service::run () on multiple threads, do I need to implement blocking on these asynchronous functions?

示例:

int i = 0;
int j = 0;

void test_timer(boost::system::error_code ec)
{
    //I need to lock up here ?
    if (i++ == 10)
    {
        j = i * 10;
    }
    timer.expires_at(timer.expires_at() + boost::posix_time::milliseconds(500));
    timer.async_wait(&test_timer);
}

void threadMain()
{
    io_service.run();
}

int main()
{
    boost::thread_group workers;
    timer.async_wait(&test_timer);

    for (int i = 0; i < 5; i++){
        workers.create_thread(&threadMain);
    }

    io_service.run();
    workers.join_all();
    return 0;
}

推荐答案

异步的定义是它是非阻塞的.

The definition of async is that it is non-blocking.

如果您要问我是否必须同步从不同线程对共享对象的访问",则该问题无关紧要,答案取决于为共享对象记录的线程安全性.

If you mean to ask "do I have to synchronize access to shared objects from different threads" - that question is unrelated and the answer depends on the thread-safety documented for the object you are sharing.

对于Asio,基本上(粗略地概述),您需要将并发访问(从多个线程并发)同步到所有类型,除了boost::asio::io_context¹,².

For Asio, basically (rough summary) you need to synchronize concurrent access (concurrent as in: from multiple threads) to all types except boost::asio::io_context¹,².

您的示例使用多个运行io服务的线程,这意味着处理程序可以在这些线程中的任何一个上运行.这意味着您实际上是在共享全局对象,而实际上他们需要保护.

Your sample uses multiple threads running the io service, meaning handlers run on any of those threads. This means that effectively you're sharing the globals and indeed they need protection.

但是 因为您的应用程序逻辑(异步调用链)指示只有一个操作待处理,而共享计时器对象上的下一个异步操作为总是从该链中进行调度,访问是逻辑上全部来自单个线程(称为隐式链.请参见

However Because your application logic (the async call chain) dictates that only one operation is ever pending, and the next async operation on the shared timer object is always scheduled from within that chain, the access is logically all from a single thread (called an implicit strand. See Why do I need strand per connection when using boost::asio?

最简单的方法:

在Coliru上直播

#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <iostream>

boost::asio::io_service io_service;
boost::asio::deadline_timer timer { io_service };

struct state_t {
    int i = 0;
    int j = 0;
} state;

void test_timer(boost::system::error_code ec)
{
    if (ec != boost::asio::error::operation_aborted) {
        {
            if (state.i++ == 10) {
                state.j = state.i * 10;
                if (state.j > 100)
                    return; // stop after 5 seconds
            }
        }
        timer.expires_at(timer.expires_at() + boost::posix_time::milliseconds(50));
        timer.async_wait(&test_timer);
    }
}

int main()
{
    boost::thread_group workers;
    timer.expires_from_now(boost::posix_time::milliseconds(50));
    timer.async_wait(&test_timer);

    for (int i = 0; i < 5; i++){
        workers.create_thread([] { io_service.run(); });
    }

    workers.join_all();
    std::cout << "i = " << state.i << std::endl;
    std::cout << "j = " << state.j << std::endl;
}

注意,我从主线程中删除了io_service::run(),因为它与join()是多余的(除非您确实想要 6 运行处理程序的线程,而不是5).

Note I removed the io_service::run() from the main thread as it is redundant with the join() (unless you really wanted 6 threads running the handlers, not 5).

打印

i = 11
j = 110

注意

这里潜伏着一个陷阱.说,您不想像我一样以固定的数量保释,但是想要停止,您会很想这样做:

Caveat

There's a pitfall lurking here. Say, you didn't want to bail at a fixed number, like I did, but want to stop, you'd be tempted to do:

timer.cancel();

来自main

.这是合法的,因为deadline_timer对象是 not 线程安全的.您需要

from main. That's not legal, because the deadline_timer object is not thread safe. You'd need to either

  • 使用全局atomic_bool发出终止请求的信号
  • timer.cancel()发布到与计时器异步链相同的 strand 上.但是,只有一个显式链,因此,如果不更改代码以使用显式链,就无法做到这一点.
  • use a global atomic_bool to signal the request for termination
  • post the timer.cancel() on the same strand as the timer async chain. However, there is only an explicit strand, so you can't do it without changing the code to use an explicit strand.

让我们通过拥有两个带有各自隐式链的计时器使事情变得复杂.这意味着仍然不需要同步对计时器实例的访问,但是必须对ij 进行访问.

Let's complicate things by having two timers, with their own implicit strands. This means access to the timer instances still need not be synchronized, but access to i and j does need to be.

注意:在此演示中,我使用synchronized_value<>进行修饰.您可以使用mutexlock_guard手动编写类似的逻辑.

Note In this demo I use synchronized_value<> for elegance. You can write similar logic manually using mutex and lock_guard.

在Coliru上直播

#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <boost/thread/synchronized_value.hpp>
#include <iostream>

boost::asio::io_service io_service;

struct state {
    int i = 0;
    int j = 0;
};

boost::synchronized_value<state> shared_state;

struct TimerChain {
    boost::asio::deadline_timer _timer;

    TimerChain() : _timer{io_service} {
        _timer.expires_from_now(boost::posix_time::milliseconds(50));
        resume();
    }

    void resume() {
        _timer.async_wait(boost::bind(&TimerChain::test_timer, this, _1));
    };

    void test_timer(boost::system::error_code ec)
    {
        if (ec != boost::asio::error::operation_aborted) {
            {
                auto state = shared_state.synchronize();
                if (state->i++ == 10) {
                    state->j = state->i * 10;
                }
                if (state->j > 100) return; // stop after some iterations
            }
            _timer.expires_at(_timer.expires_at() + boost::posix_time::milliseconds(50));
            resume();
        }
    }
};

int main()
{
    boost::thread_group workers;
    TimerChain timer1;
    TimerChain timer2;

    for (int i = 0; i < 5; i++){
        workers.create_thread([] { io_service.run(); });
    }

    workers.join_all();
    auto state = shared_state.synchronize();
    std::cout << "i = " << state->i << std::endl;
    std::cout << "j = " << state->j << std::endl;
}

打印

i = 12
j = 110

添加显式子线

现在添加它们非常简单:

Adding The Explicit Strands

Now it's pretty straight-forward to add them:

struct TimerChain {
    boost::asio::io_service::strand _strand;
    boost::asio::deadline_timer _timer;

    TimerChain() : _strand{io_service}, _timer{io_service} {
        _timer.expires_from_now(boost::posix_time::milliseconds(50));
        resume();
    }

    void resume() {
        _timer.async_wait(_strand.wrap(boost::bind(&TimerChain::test_timer, this, _1)));
    };

    void stop() { // thread safe
        _strand.post([this] { _timer.cancel(); });
    }

    // ...

在Coliru上直播

#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <boost/thread/synchronized_value.hpp>
#include <iostream>

boost::asio::io_service io_service;

struct state {
    int i = 0;
    int j = 0;
};

boost::synchronized_value<state> shared_state;

struct TimerChain {
    boost::asio::io_service::strand _strand;
    boost::asio::deadline_timer _timer;

    TimerChain() : _strand{io_service}, _timer{io_service} {
        _timer.expires_from_now(boost::posix_time::milliseconds(50));
        resume();
    }

    void resume() {
        _timer.async_wait(_strand.wrap(boost::bind(&TimerChain::test_timer, this, _1)));
    };

    void stop() { // thread safe
        _strand.post([this] { _timer.cancel(); });
    }

    void test_timer(boost::system::error_code ec)
    {
        if (ec != boost::asio::error::operation_aborted) {
            {
                auto state = shared_state.synchronize();
                if (state->i++ == 10) {
                    state->j = state->i * 10;
                }
            }
            // continue indefinitely
            _timer.expires_at(_timer.expires_at() + boost::posix_time::milliseconds(50));
            resume();
        }
    }
};

int main()
{
    boost::thread_group workers;
    TimerChain timer1;
    TimerChain timer2;

    for (int i = 0; i < 5; i++){
        workers.create_thread([] { io_service.run(); });
    }

    boost::this_thread::sleep_for(boost::chrono::seconds(10));
    timer1.stop();
    timer2.stop();

    workers.join_all();

    auto state = shared_state.synchronize();
    std::cout << "i = " << state->i << std::endl;
    std::cout << "j = " << state->j << std::endl;
}

打印

i = 400
j = 110


¹(或使用旧名称boost::asio::io_service)

²生命周期突变在这方面不被视为成员操作(即使对于线程安全对象,您也必须手动同步共享对象的构造/破坏)

² lifetime mutations are not considered member operations in this respect (you have to manually synchronize construction/destruction of shared objects even for thread-safe objects)

这篇关于使用boost :: asio时是否需要实现阻塞?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆