确保boost :: deadline_timer不接受新的等待,除非先前的等待已过期 [英] Ensure no new wait is accepted by boost::deadline_timer unless previous wait is expired

查看:112
本文介绍了确保boost :: deadline_timer不接受新的等待,除非先前的等待已过期的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试实现由C ++代码控制的硬件设备的同步操作.

假定可以在其中执行Open/Close的两种类型的设备. 我需要实现的是为Specified Duration打开一种类型的设备.第二类设备也是如此.

我已经用boost::deadline_timer编写了代码:

#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/thread.hpp>
#include <boost/asio.hpp>


class Test : public std::enable_shared_from_this <Test>
{
public:
    Test() :io_(), timerOne_(io_),timerTwo_(io_){}
    void Open(int num);
    void Close(int num);
    void TimedOpen(int num, int dur);
    void Run();
private:
    boost::asio::io_service io_;
    boost::asio::deadline_timer timerOne_;
    boost::asio::deadline_timer timerTwo_;
};

void Test::Open(int type)
{
    std::cout << "Open for Number : " << type << std::endl;
}

void Test::Close(int type)
{
    std::cout << "Close for Number : " << type << std::endl;
}

void Test::TimedOpen(int type, int dur)
{
    switch (type)
    {
    case 1:
    {
              io_.reset();
              auto fn = std::bind(&Test::Open, shared_from_this(), std::placeholders::_1);
              fn(type);
              timerOne_.expires_from_now(boost::posix_time::seconds(dur));
              timerOne_.async_wait(std::bind(&Test::Close, shared_from_this(), type));
              Run();
              std::cout << "Function Exiting" << std::endl;
              std::cout << "-----------------------------------------------" << std::endl;
              return;
    }

    case 2:
    {
              io_.reset();
              auto fn = std::bind(&Test::Open, shared_from_this(), std::placeholders::_1);
              fn(type);
              timerTwo_.expires_from_now(boost::posix_time::seconds(dur));
              timerTwo_.async_wait(std::bind(&Test::Close, shared_from_this(), type));
              Run();
              std::cout << "Function Exiting" << std::endl;
              std::cout << "-----------------------------------------------" << std::endl;
              return;
    }

    }

}

void Test::Run()
{
    boost::thread th(boost::bind(&boost::asio::io_service::run, &io_));
}

int main()
{
    auto t = std::make_shared<Test>();
    t->TimedOpen(1, 60);
    t->TimedOpen(2, 30);
    t->TimedOpen(1, 5);
    t->TimedOpen(2, 2);
    char line[128];
    while (std::cin.getline(line, 128))
    {
        if (strcmp(line, "\n")) break;
    }
    return 0;
}

输出为:

Open for Number : 1
Function Exiting
-----------------------------------------------
Open for Number : 2
Function Exiting
-----------------------------------------------
Open for Number : 1
Close for Number : 1
Function Exiting
-----------------------------------------------
Open for Number : 2
Close for Number : 2
Function Exiting
-----------------------------------------------
Close for Number : 2
Close for Number : 1

对于timerOne_

它不等待先前的wait到期,即,一旦执行t->TimedOpen(1, 5),先前的动作t->TimedOpen(1, 60)被取消.

所以Close for Number : 1出现在输出中,而无需等待t->TimedOpen(1, 60).

我要实现的是,如果multiple waits are encountered对于任何类型的timer,则所有操作都应排队,即

如果我输入:

t->TimedOpen(1, 60);
t->TimedOpen(1, 10);
t->TimedOpen(1, 5);

它应该在60+10+5秒内执行TimedOpen Operation.目前仅执行5秒钟.另外,它应该是非阻塞的,即我不能使用wait() instead of async_wait().

我该如何实现?

摘要: 我的要求是安排对boost::deadline_timer()的操作,即除非先前的等待时间到期,否则对它的多个操作都将排队.

解决方案

就像在评论中提到的那样,您希望每个类型"都有队列.

让每种类型的队列都称为会话".

通过在单个strand¹上链接来自单个队列的所有异步等待,您可以获得有效的序列化(也避免了队列/会话上的同步).

唯一棘手的问题是在没有任何人在飞行时开始异步等待.不变的是,异步操作在iff !queue_.empty()进行中:

struct Session : std::enable_shared_from_this<Session> {
    Session(boost::asio::io_service &io, int type) : strand_(io), timer_(io), type(type) {}

    void Enqueue(int duration) {
        auto This = shared_from_this();
        strand_.post([This,duration,this] { 
                std::cout << "t0 + " << std::setw(4) << mark() << "ms Enqueue for Number: "  << type <<  " (dur:"  << duration       <<  ")\n";
                This->queue_.push(duration);
                if (This->queue_.size() == 1)
                    This->Wait();
            });
    }

  private:
    boost::asio::strand strand_;
    boost::asio::deadline_timer timer_;
    int type;
    std::queue<int> queue_;

    void Close() {
        assert(!queue_.empty());
        std::cout << "t0 + " << std::setw(4) << mark() << "ms Close for Number :  "  << type <<  " (dur:"  << queue_.front() <<  ") (depth " << queue_.size() << ")\n";

        queue_.pop();
        Wait();
    }
    void Wait() {
        if (!queue_.empty()) {
            std::cout << "t0 + " << std::setw(4) << mark() << "ms Open for Number :   "  << type <<  " (dur:"  << queue_.front() <<  ") (depth " << queue_.size() << ")\n";
            timer_.expires_from_now(boost::posix_time::milliseconds(queue_.front()));
            timer_.async_wait(strand_.wrap(std::bind(&Session::Close, shared_from_this())));
        }
    }
};

现在Test类变得更加简单(事实上,它根本不需要共享",但是我把这个细节留给了读者,这是众所周知的练习):

class Test : public std::enable_shared_from_this<Test> {
    using guard = boost::lock_guard<boost::mutex>;
public:
    Test() : io_(), work_(boost::asio::io_service::work(io_)) {
        io_thread = boost::thread([this] { io_.run(); });
    }

    void TimedOpen(int num, int duration);

    void Stop() {
        {
            guard lk(mx_);
            if (work_) work_.reset();
        }
        io_thread.join();
    }

    ~Test() {
        Stop();

        guard lk(mx_);
        timers_ex_.clear();
    }

private:
    mutable boost::mutex mx_;
    boost::asio::io_service io_;
    boost::optional<boost::asio::io_service::work> work_;
    std::map<int, std::shared_ptr<Session> > timers_ex_;
    boost::thread io_thread;
};

void Test::TimedOpen(int type, int duration) {
    guard lk(mx_);

    auto &session = timers_ex_[type];
    if (!session) session = std::make_shared<Session>(io_, type);

    session->Enqueue(duration);
}

如您所见,我已经

  • 推断为任意数量的类型
  • 使操作具有线程安全性
  • 添加了自t0
  • 以来的相对时间戳(以毫秒为单位)
  • 修复了完全损坏的io_service寿命.现在,施工开始了. work_变量在空闲时保持活动状态.
  • Stop()将其关闭(首先耗尽会话队列).
  • 销毁隐式调用Stop()

这是一个测试运行:

在Coliru上直播

int main() {
    auto t = std::make_shared<Test>();
    t->TimedOpen(1, 300);
    t->TimedOpen(2, 150);
    t->TimedOpen(1,  50);
    t->TimedOpen(2,  20);

    boost::this_thread::sleep_for(boost::chrono::milliseconds(400));
    std::cout << "================\n";
    t->TimedOpen(1,  50);
    t->TimedOpen(2,  20);
    t->TimedOpen(1, 300);
    t->TimedOpen(2, 150);

    t->Stop();
}

打印

t0 +    0ms Enqueue for Number: 1 (dur:300)
t0 +    0ms Open for Number :   1 (dur:300) (depth 1)
t0 +    0ms Enqueue for Number: 2 (dur:150)
t0 +    0ms Open for Number :   2 (dur:150) (depth 1)
t0 +    0ms Enqueue for Number: 1 (dur:50)
t0 +    0ms Enqueue for Number: 2 (dur:20)
t0 +  150ms Close for Number :  2 (dur:150) (depth 2)
t0 +  150ms Open for Number :   2 (dur:20) (depth 1)
t0 +  170ms Close for Number :  2 (dur:20) (depth 1)
t0 +  300ms Close for Number :  1 (dur:300) (depth 2)
t0 +  300ms Open for Number :   1 (dur:50) (depth 1)
t0 +  350ms Close for Number :  1 (dur:50) (depth 1)
================
t0 +  400ms Enqueue for Number: 1 (dur:50)
t0 +  400ms Open for Number :   1 (dur:50) (depth 1)
t0 +  400ms Enqueue for Number: 2 (dur:20)
t0 +  400ms Open for Number :   2 (dur:20) (depth 1)
t0 +  400ms Enqueue for Number: 1 (dur:300)
t0 +  400ms Enqueue for Number: 2 (dur:150)
t0 +  420ms Close for Number :  2 (dur:20) (depth 2)
t0 +  420ms Open for Number :   2 (dur:150) (depth 1)
t0 +  450ms Close for Number :  1 (dur:50) (depth 2)
t0 +  450ms Open for Number :   1 (dur:300) (depth 1)
t0 +  570ms Close for Number :  2 (dur:150) (depth 1)
t0 +  750ms Close for Number :  1 (dur:300) (depth 1)


¹为什么我需要使用boost :: asio吗?

I am trying to achieve synchronization operation for hardware devices controlled by my C++ code.

Suppose Two types of devices are there on which I can perform Open/Close. What I need to achieve is Open one type of device for Specified Duration. Same is true for Second type Of device.

I have written code with boost::deadline_timer:

#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/thread.hpp>
#include <boost/asio.hpp>


class Test : public std::enable_shared_from_this <Test>
{
public:
    Test() :io_(), timerOne_(io_),timerTwo_(io_){}
    void Open(int num);
    void Close(int num);
    void TimedOpen(int num, int dur);
    void Run();
private:
    boost::asio::io_service io_;
    boost::asio::deadline_timer timerOne_;
    boost::asio::deadline_timer timerTwo_;
};

void Test::Open(int type)
{
    std::cout << "Open for Number : " << type << std::endl;
}

void Test::Close(int type)
{
    std::cout << "Close for Number : " << type << std::endl;
}

void Test::TimedOpen(int type, int dur)
{
    switch (type)
    {
    case 1:
    {
              io_.reset();
              auto fn = std::bind(&Test::Open, shared_from_this(), std::placeholders::_1);
              fn(type);
              timerOne_.expires_from_now(boost::posix_time::seconds(dur));
              timerOne_.async_wait(std::bind(&Test::Close, shared_from_this(), type));
              Run();
              std::cout << "Function Exiting" << std::endl;
              std::cout << "-----------------------------------------------" << std::endl;
              return;
    }

    case 2:
    {
              io_.reset();
              auto fn = std::bind(&Test::Open, shared_from_this(), std::placeholders::_1);
              fn(type);
              timerTwo_.expires_from_now(boost::posix_time::seconds(dur));
              timerTwo_.async_wait(std::bind(&Test::Close, shared_from_this(), type));
              Run();
              std::cout << "Function Exiting" << std::endl;
              std::cout << "-----------------------------------------------" << std::endl;
              return;
    }

    }

}

void Test::Run()
{
    boost::thread th(boost::bind(&boost::asio::io_service::run, &io_));
}

int main()
{
    auto t = std::make_shared<Test>();
    t->TimedOpen(1, 60);
    t->TimedOpen(2, 30);
    t->TimedOpen(1, 5);
    t->TimedOpen(2, 2);
    char line[128];
    while (std::cin.getline(line, 128))
    {
        if (strcmp(line, "\n")) break;
    }
    return 0;
}

The Output is:

Open for Number : 1
Function Exiting
-----------------------------------------------
Open for Number : 2
Function Exiting
-----------------------------------------------
Open for Number : 1
Close for Number : 1
Function Exiting
-----------------------------------------------
Open for Number : 2
Close for Number : 2
Function Exiting
-----------------------------------------------
Close for Number : 2
Close for Number : 1

For timerOne_ It does not wait for previous wait to expire i.e. as soon as t->TimedOpen(1, 5) is executed the previous action t->TimedOpen(1, 60) is cancelled.

So Close for Number : 1 appears in output without waiting for t->TimedOpen(1, 60).

What I want to achieve is that if multiple waits are encountered for any type of timer, all the operations should be queued i.e.

If I type:

t->TimedOpen(1, 60);
t->TimedOpen(1, 10);
t->TimedOpen(1, 5);

It should do TimedOpen Operation for 60+10+5 seconds. Currently it does only for 5 secs. Also It should be non blocking i.e. I can not use wait() instead of async_wait().

How do I achieve it?

Summary: My requirement is to schedule operations on a boost::deadline_timer() i.e. multiple operations on it will be queued unless previous wait is expired.

解决方案

Like was mentioned in a comment, you will want to have queues per "type".

Let's name the per-type queue a "session".

By chaining all async waits from a single queue on a single strand¹ you get effective serialization (also avoids synchronization on the queue/session).

The only tricky bit is to start async wait when none is in flight. The invariant is that async operations are in flight iff !queue_.empty():

struct Session : std::enable_shared_from_this<Session> {
    Session(boost::asio::io_service &io, int type) : strand_(io), timer_(io), type(type) {}

    void Enqueue(int duration) {
        auto This = shared_from_this();
        strand_.post([This,duration,this] { 
                std::cout << "t0 + " << std::setw(4) << mark() << "ms Enqueue for Number: "  << type <<  " (dur:"  << duration       <<  ")\n";
                This->queue_.push(duration);
                if (This->queue_.size() == 1)
                    This->Wait();
            });
    }

  private:
    boost::asio::strand strand_;
    boost::asio::deadline_timer timer_;
    int type;
    std::queue<int> queue_;

    void Close() {
        assert(!queue_.empty());
        std::cout << "t0 + " << std::setw(4) << mark() << "ms Close for Number :  "  << type <<  " (dur:"  << queue_.front() <<  ") (depth " << queue_.size() << ")\n";

        queue_.pop();
        Wait();
    }
    void Wait() {
        if (!queue_.empty()) {
            std::cout << "t0 + " << std::setw(4) << mark() << "ms Open for Number :   "  << type <<  " (dur:"  << queue_.front() <<  ") (depth " << queue_.size() << ")\n";
            timer_.expires_from_now(boost::posix_time::milliseconds(queue_.front()));
            timer_.async_wait(strand_.wrap(std::bind(&Session::Close, shared_from_this())));
        }
    }
};

Now the Test class becomes much simpler (in fact it doesn't need to be "shared" at all, but I've left that detail as the proverbial exercise for the reader):

class Test : public std::enable_shared_from_this<Test> {
    using guard = boost::lock_guard<boost::mutex>;
public:
    Test() : io_(), work_(boost::asio::io_service::work(io_)) {
        io_thread = boost::thread([this] { io_.run(); });
    }

    void TimedOpen(int num, int duration);

    void Stop() {
        {
            guard lk(mx_);
            if (work_) work_.reset();
        }
        io_thread.join();
    }

    ~Test() {
        Stop();

        guard lk(mx_);
        timers_ex_.clear();
    }

private:
    mutable boost::mutex mx_;
    boost::asio::io_service io_;
    boost::optional<boost::asio::io_service::work> work_;
    std::map<int, std::shared_ptr<Session> > timers_ex_;
    boost::thread io_thread;
};

void Test::TimedOpen(int type, int duration) {
    guard lk(mx_);

    auto &session = timers_ex_[type];
    if (!session) session = std::make_shared<Session>(io_, type);

    session->Enqueue(duration);
}

As you can see I've

  • extrapolated to any number of types
  • made operations thread-safe
  • added relative timestamps in milliseconds since t0
  • fixed the completely broken io_service lifetime. Now, construction starts the service. The work_ variable keeps it alive when idle.
  • Stop() shuts it down (draining the session queues first).
  • Destruction calls Stop() implicitly

Here's a test run:

Live On Coliru

int main() {
    auto t = std::make_shared<Test>();
    t->TimedOpen(1, 300);
    t->TimedOpen(2, 150);
    t->TimedOpen(1,  50);
    t->TimedOpen(2,  20);

    boost::this_thread::sleep_for(boost::chrono::milliseconds(400));
    std::cout << "================\n";
    t->TimedOpen(1,  50);
    t->TimedOpen(2,  20);
    t->TimedOpen(1, 300);
    t->TimedOpen(2, 150);

    t->Stop();
}

Prints

t0 +    0ms Enqueue for Number: 1 (dur:300)
t0 +    0ms Open for Number :   1 (dur:300) (depth 1)
t0 +    0ms Enqueue for Number: 2 (dur:150)
t0 +    0ms Open for Number :   2 (dur:150) (depth 1)
t0 +    0ms Enqueue for Number: 1 (dur:50)
t0 +    0ms Enqueue for Number: 2 (dur:20)
t0 +  150ms Close for Number :  2 (dur:150) (depth 2)
t0 +  150ms Open for Number :   2 (dur:20) (depth 1)
t0 +  170ms Close for Number :  2 (dur:20) (depth 1)
t0 +  300ms Close for Number :  1 (dur:300) (depth 2)
t0 +  300ms Open for Number :   1 (dur:50) (depth 1)
t0 +  350ms Close for Number :  1 (dur:50) (depth 1)
================
t0 +  400ms Enqueue for Number: 1 (dur:50)
t0 +  400ms Open for Number :   1 (dur:50) (depth 1)
t0 +  400ms Enqueue for Number: 2 (dur:20)
t0 +  400ms Open for Number :   2 (dur:20) (depth 1)
t0 +  400ms Enqueue for Number: 1 (dur:300)
t0 +  400ms Enqueue for Number: 2 (dur:150)
t0 +  420ms Close for Number :  2 (dur:20) (depth 2)
t0 +  420ms Open for Number :   2 (dur:150) (depth 1)
t0 +  450ms Close for Number :  1 (dur:50) (depth 2)
t0 +  450ms Open for Number :   1 (dur:300) (depth 1)
t0 +  570ms Close for Number :  2 (dur:150) (depth 1)
t0 +  750ms Close for Number :  1 (dur:300) (depth 1)


¹ Why do I need strand per connection when using boost::asio?

这篇关于确保boost :: deadline_timer不接受新的等待,除非先前的等待已过期的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆