如何将boost io_service与优先级队列一起使用? [英] How to use the boost io_service with a priority queue?

查看:78
本文介绍了如何将boost io_service与优先级队列一起使用?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个具有两个功能的程序.一个是循环计时器,另一个是接收一些套接字.

I have a program that have two function. one is a cycle timer, the other one is receiving some sockets.

我发现,如果在触发计时器之前有一个以上的软件包进入,则boost将运行所有套接字句柄,然后运行计时器句柄.

I found that, if there were more then one packages coming in before the timer tirggered, the boost will run all the socket-handles and then run the timer-handle.

我编写了一个简单的代码来模拟这种计时,如下所示:

I wrote a simple code to simulate this timing like below:

#include <iostream>
#include <memory>

#include <boost/asio.hpp>
#include <boost/thread/thread.hpp>
#include <boost/asio/io_service.hpp>
#include <boost/asio/steady_timer.hpp>

std::string get_time()
{
    struct timespec time_spec;
    clock_gettime(CLOCK_REALTIME, &time_spec);
    int h  = (int)(time_spec.tv_sec / 60 / 60 % 24);
    int m  = (int)(time_spec.tv_sec / 60 % 60);
    int s  = (int)(time_spec.tv_sec % 60);
    int ms = (int)(time_spec.tv_nsec / 1000);
    char st[50];
    snprintf(st, 50, "[%02d:%02d:%02d:%06d]", h, m, s, ms);

    return std::string(st);
}

void fA()
{
  std::cout << get_time() << " : fA()" << std::endl;
  boost::this_thread::sleep(boost::posix_time::milliseconds(40));
}

void fB()
{
  std::cout << get_time() << " : fB()" << std::endl;
  boost::this_thread::sleep(boost::posix_time::milliseconds(20));
}

int main(int argc, char *argv[])
{
    boost::asio::io_service io;
    std::shared_ptr<boost::asio::io_service::work> work = std::make_shared<boost::asio::io_service::work>(io);

    std::shared_ptr<boost::asio::steady_timer> t100ms = std::make_shared<boost::asio::steady_timer>(io);
    std::shared_ptr<boost::asio::steady_timer> t80ms = std::make_shared<boost::asio::steady_timer>(io);

    std::cout << get_time() << " : start" << std::endl;

    t100ms->expires_from_now(std::chrono::milliseconds(100));
    t80ms->expires_from_now(std::chrono::milliseconds(80));

    t100ms->async_wait([&](const boost::system::error_code &_error) {
        if(_error.value() == boost::system::errc::errc_t::success) {
            std::cout << get_time() << " : t100ms" << std::endl;
        }
    });
    t80ms->async_wait([&](const boost::system::error_code &_error) {
        if(_error.value() == boost::system::errc::errc_t::success) {
            std::cout << get_time() << " : t80ms" << std::endl;
            io.post(fA);
            io.post(fB);
        }
    });

    io.run();

    return 0;
}

此代码的重用为:

[08:15:40:482721] : start
[08:15:40:562867] : t80ms
[08:15:40:562925] : fA()
[08:15:40:603037] : fB()
[08:15:40:623186] : t100ms

但是,我想要的结果是:

But, the result I want is :

[08:15:40:482721] : start
[08:15:40:562867] : t80ms
[08:15:40:562925] : fA()
[08:15:40:603037] : t100ms
[08:15:40:604037] : fB()

t100ms可以在fA和fB之间运行,该时间比开始的100ms后更接近正确的想要的时间[ 08:15:40:582721 ].

The t100ms could be run between the fA and the fB, which time is more near the correct wantted time [08:15:40:582721] at the 100ms later from the start.

我找到了调用示例,其中提供了优先级队列的示例.

I found a Invocation example, which give an example for a priority queue.

并尝试通过将我的代码添加到此示例中来对其进行修改.

And try to modify it by add my codes into this example.

    ...

    timer.async_wait(pri_queue.wrap(42, middle_priority_handler));


    std::shared_ptr<boost::asio::steady_timer> t100ms = std::make_shared<boost::asio::steady_timer>(io_service);
    std::shared_ptr<boost::asio::steady_timer> t80ms = std::make_shared<boost::asio::steady_timer>(io_service);

    std::cout << get_time() << " : start" << std::endl;

    t100ms->expires_from_now(std::chrono::milliseconds(100));
    t80ms->expires_from_now(std::chrono::milliseconds(80));

    t100ms->async_wait(pri_queue.wrap(100, [&](const boost::system::error_code &_error) {
        if(_error.value() == boost::system::errc::errc_t::success) {
            std::cout << get_time() << " : t100ms" << std::endl;
        }
    }));
    t80ms->async_wait(pri_queue.wrap(100, [&](const boost::system::error_code &_error) {
        if(_error.value() == boost::system::errc::errc_t::success) {
            std::cout << get_time() << " : t80ms" << std::endl;
            io_service.post(pri_queue.wrap(0, fA));
            io_service.post(pri_queue.wrap(0, fB));
        }
    }));

    while (io_service.run_one())

    ...

但是,结果仍然没有显示出来.如下所示:

But, the result still not shown as my mind. It like below:

[08:30:13:868299] : start
High priority handler
Middle priority handler
Low priority handler
[08:30:13:948437] : t80ms
[08:30:13:948496] : fA()
[08:30:13:988606] : fB()
[08:30:14:008774] : t100ms

我在哪里错了?

推荐答案

处理程序按照其发布的顺序运行.

Handlers are run in the order in which they are posted.

80ms到期后,您将立即发布 fA() fB().当然,它们将首先运行,因为t100ms仍在等待处理.

When the 80ms expire, you immediately post both fA() and fB(). Of course, they will run first because t100ms is still pending.

这是您的示例,但经过简化:

Here's your example but much simplified:

在Coliru上直播

Live On Coliru

#include <iostream>
#include <boost/asio.hpp>
#include <thread>
using boost::asio::io_context;
using boost::asio::steady_timer;
using namespace std::chrono_literals;

namespace {
    static auto now = std::chrono::system_clock::now;
    static auto get_time = [start = now()]{
        return "at " + std::to_string((now() - start)/1ms) + "ms:\t";
    };

    void message(std::string msg) {
        std::cout << (get_time() + msg + "\n") << std::flush; // minimize mixing output from threads
    }

    auto make_task = [](auto name, auto duration) {
        return [=] {
            message(name);
            std::this_thread::sleep_for(duration);
        };
    };
}

int main() {
    io_context io;

    message("start");

    steady_timer t100ms(io, 100ms);
    t100ms.async_wait([&](auto ec) {
        message("t100ms " + ec.message());
    });

    steady_timer t80ms(io, 80ms);
    t80ms.async_wait([&](auto ec) {
        message("t80ms " + ec.message());
        post(io, make_task("task A", 40ms));
        post(io, make_task("task B", 20ms));
    });

    io.run();
}

打印

at 0ms: start
at 80ms:        t80ms Success
at 80ms:        task A
at 120ms:       task B
at 140ms:       t100ms Success

一种方法

假设您确实要计时操作,考虑运行多个线程.经过这三个字的更改,输出为:

One Approach

Assuming you're really trying to time the operation, consider running multiple threads. With this three-word change the output is:

at 1ms: start
at 81ms:    t80ms Success
at 81ms:    task A
at 82ms:    task B
at 101ms:   t100ms Success

要仍然对A和B进行序列化,请通过以下方法将它们张贴在链上:

To serialize A and B still, post them on a strand by changing:

post(io, make_task("task A", 40ms));
post(io, make_task("task B", 20ms));

收件人

auto s = make_strand(io);
post(s, make_task("task A", 40ms));
post(s, make_task("task B", 20ms));

现在打印文件

at 0ms: start
at 80ms:        t80ms Success
at 80ms:        task A
at 100ms:       t100ms Success
at 120ms:       task B

(下面有完整列表).

当您不想使用线程时(例如,出于简单性/安全性),另一种方法确实需要队列.我会考虑将其写成简单的方式:

The other approach when you do no wish to use threads (for simplicity/safety e.g.), you will indeed require a queue. I'd consider writing it out as simply:

struct Queue {
    template <typename Ctx>
    Queue(Ctx context) : strand(make_strand(context)) {}

    void add(Task f) {
        post(strand, [this, f=std::move(f)] {
            if (tasks.empty())
                run();
            tasks.push_back(std::move(f));
        });
    }

  private:
    boost::asio::any_io_executor strand;
    std::deque<Task> tasks;

    void run() {
        post(strand, [this] { drain_loop(); });
    }

    void drain_loop() {
        if (tasks.empty()) {
            message("queue empty");
        } else {
            tasks.front()(); // invoke task
            tasks.pop_front();
            run();
        }
    }
};

现在,我们可以安全地选择是否在线程上下文中使用它-因为所有队列操作都在一个链上.

Now we can safely choose whether we want it in a threaded context or not - because all queue operations are on a strand.

int main() {
    thread_pool io; // or io_context io;
    Queue tasks(io.get_executor());

    message("start");

    steady_timer t100ms(io, 100ms);
    t100ms.async_wait([&](auto ec) {
        message("t100ms " + ec.message());
    });

    steady_timer t80ms(io, 80ms);
    t80ms.async_wait([&](auto ec) {
        message("t80ms " + ec.message());
        tasks.add(make_task("task A", 40ms));
        tasks.add(make_task("task B", 40ms));
    });

    io.join(); // or io.run()
}

使用 thread_pool io; :

at 0ms: start
at 80ms:        t80ms Success
at 80ms:        task A
at 100ms:       t100ms Success
at 120ms:       task B
at 160ms:       queue empty

使用 io_context io; (或者当然是 thread_pool io(1); ):

at 0ms: start
at 80ms:        t80ms Success
at 80ms:        task A
at 120ms:       task B
at 160ms:       t100ms Success
at 160ms:       queue empty

这篇关于如何将boost io_service与优先级队列一起使用?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆