使用boost :: asio时是否需要实现阻塞? [英] Do I need to implement blocking when using boost::asio?
问题描述
我的问题是,如果我在多个线程上运行io_service :: run(),是否需要在这些异步函数上实现阻塞?
My question is, if I run io_service::run () on multiple threads, do I need to implement blocking on these asynchronous functions?
示例:
int i = 0;
int j = 0;
void test_timer(boost::system::error_code ec)
{
//I need to lock up here ?
if (i++ == 10)
{
j = i * 10;
}
timer.expires_at(timer.expires_at() + boost::posix_time::milliseconds(500));
timer.async_wait(&test_timer);
}
void threadMain()
{
io_service.run();
}
int main()
{
boost::thread_group workers;
timer.async_wait(&test_timer);
for (int i = 0; i < 5; i++){
workers.create_thread(&threadMain);
}
io_service.run();
workers.join_all();
return 0;
}
推荐答案
异步的定义是它是非阻塞的.
The definition of async is that it is non-blocking.
如果您要问我是否必须同步从不同线程对共享对象的访问",则该问题无关紧要,答案取决于为共享对象记录的线程安全性.
If you mean to ask "do I have to synchronize access to shared objects from different threads" - that question is unrelated and the answer depends on the thread-safety documented for the object you are sharing.
对于Asio,基本上(粗略地概述),您需要将并发访问(从多个线程并发)同步到所有类型,除了boost::asio::io_context
¹,².
For Asio, basically (rough summary) you need to synchronize concurrent access (concurrent as in: from multiple threads) to all types except boost::asio::io_context
¹,².
您的示例使用多个运行io服务的线程,这意味着处理程序可以在这些线程中的任何一个上运行.这意味着您实际上是在共享全局对象,而实际上他们需要保护.
Your sample uses multiple threads running the io service, meaning handlers run on any of those threads. This means that effectively you're sharing the globals and indeed they need protection.
但是 因为您的应用程序逻辑(异步调用链)指示只有一个操作待处理,而共享计时器对象上的下一个异步操作为总是从该链中进行调度,访问是逻辑上全部来自单个线程(称为隐式链.请参见
However Because your application logic (the async call chain) dictates that only one operation is ever pending, and the next async operation on the shared timer object is always scheduled from within that chain, the access is logically all from a single thread (called an implicit strand. See Why do I need strand per connection when using boost::asio?
最简单的方法:
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <iostream>
boost::asio::io_service io_service;
boost::asio::deadline_timer timer { io_service };
struct state_t {
int i = 0;
int j = 0;
} state;
void test_timer(boost::system::error_code ec)
{
if (ec != boost::asio::error::operation_aborted) {
{
if (state.i++ == 10) {
state.j = state.i * 10;
if (state.j > 100)
return; // stop after 5 seconds
}
}
timer.expires_at(timer.expires_at() + boost::posix_time::milliseconds(50));
timer.async_wait(&test_timer);
}
}
int main()
{
boost::thread_group workers;
timer.expires_from_now(boost::posix_time::milliseconds(50));
timer.async_wait(&test_timer);
for (int i = 0; i < 5; i++){
workers.create_thread([] { io_service.run(); });
}
workers.join_all();
std::cout << "i = " << state.i << std::endl;
std::cout << "j = " << state.j << std::endl;
}
注意,我从主线程中删除了
io_service::run()
,因为它与join()
是多余的(除非您确实想要 6 运行处理程序的线程,而不是5).
Note I removed the
io_service::run()
from the main thread as it is redundant with thejoin()
(unless you really wanted 6 threads running the handlers, not 5).
打印
i = 11
j = 110
注意
这里潜伏着一个陷阱.说,您不想像我一样以固定的数量保释,但是想要停止,您会很想这样做:
Caveat
There's a pitfall lurking here. Say, you didn't want to bail at a fixed number, like I did, but want to stop, you'd be tempted to do:
timer.cancel();
来自main
的
.这是不合法的,因为deadline_timer
对象是 not 线程安全的.您需要
from main
. That's not legal, because the deadline_timer
object is not thread safe. You'd need to either
- 使用全局
atomic_bool
发出终止请求的信号 - 将
timer.cancel()
发布到与计时器异步链相同的 strand 上.但是,只有一个显式链,因此,如果不更改代码以使用显式链,就无法做到这一点.
- use a global
atomic_bool
to signal the request for termination - post the
timer.cancel()
on the same strand as the timer async chain. However, there is only an explicit strand, so you can't do it without changing the code to use an explicit strand.
让我们通过拥有两个带有各自隐式链的计时器使事情变得复杂.这意味着仍然不需要同步对计时器实例的访问,但是必须对i
和j
进行访问.
Let's complicate things by having two timers, with their own implicit strands. This means access to the timer instances still need not be synchronized, but access to i
and j
does need to be.
注意:在此演示中,我使用
synchronized_value<>
进行修饰.您可以使用mutex
和lock_guard
手动编写类似的逻辑.
Note In this demo I use
synchronized_value<>
for elegance. You can write similar logic manually usingmutex
andlock_guard
.
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <boost/thread/synchronized_value.hpp>
#include <iostream>
boost::asio::io_service io_service;
struct state {
int i = 0;
int j = 0;
};
boost::synchronized_value<state> shared_state;
struct TimerChain {
boost::asio::deadline_timer _timer;
TimerChain() : _timer{io_service} {
_timer.expires_from_now(boost::posix_time::milliseconds(50));
resume();
}
void resume() {
_timer.async_wait(boost::bind(&TimerChain::test_timer, this, _1));
};
void test_timer(boost::system::error_code ec)
{
if (ec != boost::asio::error::operation_aborted) {
{
auto state = shared_state.synchronize();
if (state->i++ == 10) {
state->j = state->i * 10;
}
if (state->j > 100) return; // stop after some iterations
}
_timer.expires_at(_timer.expires_at() + boost::posix_time::milliseconds(50));
resume();
}
}
};
int main()
{
boost::thread_group workers;
TimerChain timer1;
TimerChain timer2;
for (int i = 0; i < 5; i++){
workers.create_thread([] { io_service.run(); });
}
workers.join_all();
auto state = shared_state.synchronize();
std::cout << "i = " << state->i << std::endl;
std::cout << "j = " << state->j << std::endl;
}
打印
i = 12
j = 110
添加显式子线
现在添加它们非常简单:
Adding The Explicit Strands
Now it's pretty straight-forward to add them:
struct TimerChain {
boost::asio::io_service::strand _strand;
boost::asio::deadline_timer _timer;
TimerChain() : _strand{io_service}, _timer{io_service} {
_timer.expires_from_now(boost::posix_time::milliseconds(50));
resume();
}
void resume() {
_timer.async_wait(_strand.wrap(boost::bind(&TimerChain::test_timer, this, _1)));
};
void stop() { // thread safe
_strand.post([this] { _timer.cancel(); });
}
// ...
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <boost/thread/synchronized_value.hpp>
#include <iostream>
boost::asio::io_service io_service;
struct state {
int i = 0;
int j = 0;
};
boost::synchronized_value<state> shared_state;
struct TimerChain {
boost::asio::io_service::strand _strand;
boost::asio::deadline_timer _timer;
TimerChain() : _strand{io_service}, _timer{io_service} {
_timer.expires_from_now(boost::posix_time::milliseconds(50));
resume();
}
void resume() {
_timer.async_wait(_strand.wrap(boost::bind(&TimerChain::test_timer, this, _1)));
};
void stop() { // thread safe
_strand.post([this] { _timer.cancel(); });
}
void test_timer(boost::system::error_code ec)
{
if (ec != boost::asio::error::operation_aborted) {
{
auto state = shared_state.synchronize();
if (state->i++ == 10) {
state->j = state->i * 10;
}
}
// continue indefinitely
_timer.expires_at(_timer.expires_at() + boost::posix_time::milliseconds(50));
resume();
}
}
};
int main()
{
boost::thread_group workers;
TimerChain timer1;
TimerChain timer2;
for (int i = 0; i < 5; i++){
workers.create_thread([] { io_service.run(); });
}
boost::this_thread::sleep_for(boost::chrono::seconds(10));
timer1.stop();
timer2.stop();
workers.join_all();
auto state = shared_state.synchronize();
std::cout << "i = " << state->i << std::endl;
std::cout << "j = " << state->j << std::endl;
}
打印
i = 400
j = 110
¹(或使用旧名称boost::asio::io_service
)
²生命周期突变在这方面不被视为成员操作(即使对于线程安全对象,您也必须手动同步共享对象的构造/破坏)
² lifetime mutations are not considered member operations in this respect (you have to manually synchronize construction/destruction of shared objects even for thread-safe objects)
这篇关于使用boost :: asio时是否需要实现阻塞?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!