如何将boost io_service与优先级队列一起使用? [英] How to use the boost io_service with a priority queue?
问题描述
我有一个具有两个功能的程序.一个是循环计时器,另一个是接收一些套接字.
I have a program that have two function. one is a cycle timer, the other one is receiving some sockets.
我发现,如果在触发计时器之前有一个以上的软件包进入,则boost将运行所有套接字句柄,然后运行计时器句柄.
I found that, if there were more then one packages coming in before the timer tirggered, the boost will run all the socket-handles and then run the timer-handle.
我编写了一个简单的代码来模拟这种计时,如下所示:
I wrote a simple code to simulate this timing like below:
#include <iostream>
#include <memory>
#include <boost/asio.hpp>
#include <boost/thread/thread.hpp>
#include <boost/asio/io_service.hpp>
#include <boost/asio/steady_timer.hpp>
std::string get_time()
{
struct timespec time_spec;
clock_gettime(CLOCK_REALTIME, &time_spec);
int h = (int)(time_spec.tv_sec / 60 / 60 % 24);
int m = (int)(time_spec.tv_sec / 60 % 60);
int s = (int)(time_spec.tv_sec % 60);
int ms = (int)(time_spec.tv_nsec / 1000);
char st[50];
snprintf(st, 50, "[%02d:%02d:%02d:%06d]", h, m, s, ms);
return std::string(st);
}
void fA()
{
std::cout << get_time() << " : fA()" << std::endl;
boost::this_thread::sleep(boost::posix_time::milliseconds(40));
}
void fB()
{
std::cout << get_time() << " : fB()" << std::endl;
boost::this_thread::sleep(boost::posix_time::milliseconds(20));
}
int main(int argc, char *argv[])
{
boost::asio::io_service io;
std::shared_ptr<boost::asio::io_service::work> work = std::make_shared<boost::asio::io_service::work>(io);
std::shared_ptr<boost::asio::steady_timer> t100ms = std::make_shared<boost::asio::steady_timer>(io);
std::shared_ptr<boost::asio::steady_timer> t80ms = std::make_shared<boost::asio::steady_timer>(io);
std::cout << get_time() << " : start" << std::endl;
t100ms->expires_from_now(std::chrono::milliseconds(100));
t80ms->expires_from_now(std::chrono::milliseconds(80));
t100ms->async_wait([&](const boost::system::error_code &_error) {
if(_error.value() == boost::system::errc::errc_t::success) {
std::cout << get_time() << " : t100ms" << std::endl;
}
});
t80ms->async_wait([&](const boost::system::error_code &_error) {
if(_error.value() == boost::system::errc::errc_t::success) {
std::cout << get_time() << " : t80ms" << std::endl;
io.post(fA);
io.post(fB);
}
});
io.run();
return 0;
}
此代码的重用为:
[08:15:40:482721] : start
[08:15:40:562867] : t80ms
[08:15:40:562925] : fA()
[08:15:40:603037] : fB()
[08:15:40:623186] : t100ms
但是,我想要的结果是:
But, the result I want is :
[08:15:40:482721] : start
[08:15:40:562867] : t80ms
[08:15:40:562925] : fA()
[08:15:40:603037] : t100ms
[08:15:40:604037] : fB()
t100ms可以在fA和fB之间运行,该时间比开始的100ms后更接近正确的想要的时间[ 08:15:40:582721
].
The t100ms could be run between the fA and the fB, which time is more near the correct wantted time [08:15:40:582721
] at the 100ms later from the start.
我找到了调用示例,其中提供了优先级队列的示例.
I found a Invocation example, which give an example for a priority queue.
并尝试通过将我的代码添加到此示例中来对其进行修改.
And try to modify it by add my codes into this example.
...
timer.async_wait(pri_queue.wrap(42, middle_priority_handler));
std::shared_ptr<boost::asio::steady_timer> t100ms = std::make_shared<boost::asio::steady_timer>(io_service);
std::shared_ptr<boost::asio::steady_timer> t80ms = std::make_shared<boost::asio::steady_timer>(io_service);
std::cout << get_time() << " : start" << std::endl;
t100ms->expires_from_now(std::chrono::milliseconds(100));
t80ms->expires_from_now(std::chrono::milliseconds(80));
t100ms->async_wait(pri_queue.wrap(100, [&](const boost::system::error_code &_error) {
if(_error.value() == boost::system::errc::errc_t::success) {
std::cout << get_time() << " : t100ms" << std::endl;
}
}));
t80ms->async_wait(pri_queue.wrap(100, [&](const boost::system::error_code &_error) {
if(_error.value() == boost::system::errc::errc_t::success) {
std::cout << get_time() << " : t80ms" << std::endl;
io_service.post(pri_queue.wrap(0, fA));
io_service.post(pri_queue.wrap(0, fB));
}
}));
while (io_service.run_one())
...
但是,结果仍然没有显示出来.如下所示:
But, the result still not shown as my mind. It like below:
[08:30:13:868299] : start
High priority handler
Middle priority handler
Low priority handler
[08:30:13:948437] : t80ms
[08:30:13:948496] : fA()
[08:30:13:988606] : fB()
[08:30:14:008774] : t100ms
我在哪里错了?
推荐答案
处理程序按照其发布的顺序运行.
Handlers are run in the order in which they are posted.
80ms到期后,您将立即发布 fA()
和 fB()
.当然,它们将首先运行,因为t100ms仍在等待处理.
When the 80ms expire, you immediately post both fA()
and fB()
. Of course, they will run first because t100ms is still pending.
这是您的示例,但经过简化:
Here's your example but much simplified:
Live On Coliru
#include <iostream>
#include <boost/asio.hpp>
#include <thread>
using boost::asio::io_context;
using boost::asio::steady_timer;
using namespace std::chrono_literals;
namespace {
static auto now = std::chrono::system_clock::now;
static auto get_time = [start = now()]{
return "at " + std::to_string((now() - start)/1ms) + "ms:\t";
};
void message(std::string msg) {
std::cout << (get_time() + msg + "\n") << std::flush; // minimize mixing output from threads
}
auto make_task = [](auto name, auto duration) {
return [=] {
message(name);
std::this_thread::sleep_for(duration);
};
};
}
int main() {
io_context io;
message("start");
steady_timer t100ms(io, 100ms);
t100ms.async_wait([&](auto ec) {
message("t100ms " + ec.message());
});
steady_timer t80ms(io, 80ms);
t80ms.async_wait([&](auto ec) {
message("t80ms " + ec.message());
post(io, make_task("task A", 40ms));
post(io, make_task("task B", 20ms));
});
io.run();
}
打印
at 0ms: start
at 80ms: t80ms Success
at 80ms: task A
at 120ms: task B
at 140ms: t100ms Success
一种方法
假设您确实要计时操作,考虑运行多个线程.经过这三个字的更改,输出为:
One Approach
Assuming you're really trying to time the operation, consider running multiple threads. With this three-word change the output is:
at 1ms: start
at 81ms: t80ms Success
at 81ms: task A
at 82ms: task B
at 101ms: t100ms Success
要仍然对A和B进行序列化,请通过以下方法将它们张贴在链上:
To serialize A and B still, post them on a strand by changing:
post(io, make_task("task A", 40ms));
post(io, make_task("task B", 20ms));
收件人
auto s = make_strand(io);
post(s, make_task("task A", 40ms));
post(s, make_task("task B", 20ms));
现在打印文件
at 0ms: start
at 80ms: t80ms Success
at 80ms: task A
at 100ms: t100ms Success
at 120ms: task B
(下面有完整列表).
当您不想使用线程时(例如,出于简单性/安全性),另一种方法确实需要队列.我会考虑将其写成简单的方式:
The other approach when you do no wish to use threads (for simplicity/safety e.g.), you will indeed require a queue. I'd consider writing it out as simply:
struct Queue {
template <typename Ctx>
Queue(Ctx context) : strand(make_strand(context)) {}
void add(Task f) {
post(strand, [this, f=std::move(f)] {
if (tasks.empty())
run();
tasks.push_back(std::move(f));
});
}
private:
boost::asio::any_io_executor strand;
std::deque<Task> tasks;
void run() {
post(strand, [this] { drain_loop(); });
}
void drain_loop() {
if (tasks.empty()) {
message("queue empty");
} else {
tasks.front()(); // invoke task
tasks.pop_front();
run();
}
}
};
现在,我们可以安全地选择是否在线程上下文中使用它-因为所有队列操作都在一个链上.
Now we can safely choose whether we want it in a threaded context or not - because all queue operations are on a strand.
int main() {
thread_pool io; // or io_context io;
Queue tasks(io.get_executor());
message("start");
steady_timer t100ms(io, 100ms);
t100ms.async_wait([&](auto ec) {
message("t100ms " + ec.message());
});
steady_timer t80ms(io, 80ms);
t80ms.async_wait([&](auto ec) {
message("t80ms " + ec.message());
tasks.add(make_task("task A", 40ms));
tasks.add(make_task("task B", 40ms));
});
io.join(); // or io.run()
}
使用 thread_pool io;
:
at 0ms: start
at 80ms: t80ms Success
at 80ms: task A
at 100ms: t100ms Success
at 120ms: task B
at 160ms: queue empty
使用 io_context io;
(或者当然是 thread_pool io(1);
):
at 0ms: start
at 80ms: t80ms Success
at 80ms: task A
at 120ms: task B
at 160ms: t100ms Success
at 160ms: queue empty
这篇关于如何将boost io_service与优先级队列一起使用?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!