正确的清除与暂停协程 [英] Proper cleanup with a suspended coroutine

查看:236
本文介绍了正确的清除与暂停协程的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我不知道什么清理最好的(干净的,最难把事情弄糟)方法是在这种情况下。

I'm wondering what the best (cleanest, hardest to mess up) method for cleanup is in this situation.

void MyClass::do_stuff(boost::asio::yield_context context) {
  while (running_) {
    uint32_t data = async_buffer->Read(context);
    // do other stuff
  }
}

Read是一个呼叫哪个异步等待直到存在要读出的数据,然后返回该数据。如果我想删除的MyClass这种情况下,我怎么能保证我这样做是否正确?比方说,异步等在这里通过deadline_timer的async_wait执行。如果我取消的情况下,我还是要等待线程执行完其他东西之前,我知道事情是在一个良好的状态(我不能加入线程,因为它是属于IO服务线程是,还可以处理其他的作业)。我可以做这样的事情:

Read is a call which asynchronously waits until there is data to be read, then returns that data. If I want to delete this instance of MyClass, how can I make sure I do so properly? Let's say that the asynchronous wait here is performed via a deadline_timer's async_wait. If I cancel the event, I still have to wait for the thread to finish executing the "other stuff" before I know things are in a good state (I can't join the thread, as it's a thread that belongs to the io service that may also be handling other jobs). I could do something like this:

MyClass::~MyClass() {
  running_ = false;
  read_event->CancelEvent(); // some way to cancel the deadline_timer the Read is waiting on
  boost::mutex::scoped_lock lock(finished_mutex_);
  if (!finished_) {
    cond_.wait(lock);
  }
  // any other cleanup
}

void MyClass::do_stuff(boost::asio::yield_context context) {
  while (running_) {
    uint32_t data = async_buffer->Read(context);
    // do other stuff
  }
  boost::mutex::scoped_lock lock(finished_mutex_);
  finished_ = true;
  cond.notify();
}

但我希望让这些stackful协程尽可能容易使用,它不是简单的让人们认识到存在这种情况,将需要做,以确保一切正确清理的东西。有没有更好的办法?就是我试图在更根本的层面来这里做不对吗?

But I'm hoping to make these stackful coroutines as easy to use as possible, and it's not straightforward for people to recognize that this condition exists and what would need to be done to make sure things are cleaned up properly. Is there a better way? Is what I'm trying to do here wrong at a more fundamental level?

此外,事件(我有什么是基本相同坦纳的回答这里)我需要取消在我不得不保留一些额外的状态的方式(一个真正取消对正常的取消用于触发事件) - 如果有逻辑的多件等待了同一事件这是不恰当的。很想听听是否有与协同程序中使用挂起/恢复一个更好的方式来异步事件模型。

Also, for the event (what I have is basically the same as Tanner's answer here) I need to cancel it in a way that I'd have to keep some extra state (a true cancel vs. the normal cancel used to fire the event) -- which wouldn't be appropriate if there were multiple pieces of logic waiting on that same event. Would love to hear if there's a better way to model the asynchronous event to be used with a coroutine suspend/resume.

感谢。

编辑:谢谢@Sehe,在花了一个工作的例子了一枪,我觉得这说明了什么,我在得到:

Thanks @Sehe, took a shot at a working example, I think this illustrates what I'm getting at:

class AsyncBuffer {
public:
  AsyncBuffer(boost::asio::io_service& io_service) :
  write_event_(io_service) {
    write_event_.expires_at(boost::posix_time::pos_infin);
  }

  void Write(uint32_t data) {
    buffer_.push_back(data);
    write_event_.cancel();
  }

  uint32_t Read(boost::asio::yield_context context) {
    if (buffer_.empty()) {
      write_event_.async_wait(context);
    }
    uint32_t data = buffer_.front();
    buffer_.pop_front();
    return data;
  }

protected:
  boost::asio::deadline_timer write_event_;
  std::list<uint32_t> buffer_;
};

class MyClass {
public:
  MyClass(boost::asio::io_service& io_service) :
      running_(false), io_service_(io_service), buffer_(io_service) {
  }

  void Run(boost::asio::yield_context context) {
    while (running_) {
      boost::system::error_code ec;
      uint32_t data = buffer_.Read(context[ec]);
      // do something with data
    }
  }

  void Write(uint32_t data) {
    buffer_.Write(data);
  }

  void Start() {
    running_ = true;
    boost::asio::spawn(io_service_, boost::bind(&MyClass::Run, this, _1));
  }

protected:
  boost::atomic_bool running_;
  boost::asio::io_service& io_service_;
  AsyncBuffer buffer_;
};

所以在这里,让我们说,缓冲区是空的,MyClass的::运行当前处于暂停状态,而拨打电话来阅读,因此是一个的等待事件触发恢复这方面一个deadline_timer.async_wait。它的时间来破坏的MyClass这种情况下,那么,我们如何确保它被干净地完成。

So here, let's say that the buffer is empty and MyClass::Run is currently suspended while making a call to Read, so there's a deadline_timer.async_wait that's waiting for the event to fire to resume that context. It's time to destroy this instance of MyClass, so how do we make sure that it gets done cleanly.

推荐答案

一个更典型的方法是使用的boost :: enable_shared_from_this MyClass的,并运行方法为绑定到共享指针。

A more typical approach would be to use boost::enable_shared_from_this with MyClass, and run the methods as bound to the shared pointer.

升压绑定支持绑定到的boost :: shared_ptr的&LT; MyClass的方式&gt; 透明

Boost Bind supports binding to boost::shared_ptr<MyClass> transparently.

这样,您就可以自动拥有的析构函数,只有当最后一个用户运行消失

This way, you can automatically have the destructor run only when the last user disappears.

如果您创建一个SSCCE,我很高兴地周围改变它,以示我的意思。

If you create a SSCCE, I'm happy to change it around, to show what I mean.

更新

要在SSCCEE:一些言论:

To the SSCCEE: Some remarks:


  • 我想象中的运行服务的IO线程池

  • 其中
  • 的方法 MyClass的调用到 AsyncBuffer 成员函数直接不是线程安全的。其实没有取消生产者线程外的事件 [1] ,因为制片人已经进入缓冲区 ING。这可以用一个链(在当前的设置我看不出怎么MyClass的可能会是线程安全的)来缓解。另外,看活动对象模式(对此坦纳具有优良的答案 [2] 上SO)。

  • I imagined a pool of threads running the IO service
  • The way in which MyClass calls into AsyncBuffer member functions directly is not threadsafe. There is actually no thread safe way to cancel the event outside the producer thread[1], since the producer already access the buffer for Writeing. This could be mitigated using a strand (in the current setup I don't see how MyClass would likely be threadsafe). Alternatively, look at the active object pattern (for which Tanner has an excellent answer[2] on SO).

我选择的是链的做法在这里,为了简单起见,所以我们做的:

I chose the strand approach here, for simplicity, so we do:

void MyClass::Write(uint32_t data) {
    strand_.post(boost::bind(&AsyncBuffer::Write, &buffer_, data));
}


  • 您问

  • You ask

    此外,事件(我有什么是基本相同坦纳的回答在这里)我要取消它,我不得不保留一些额外的状态的方式(一个真正取消对正常取消用于触发事件)

    该状态的最自然的地方是平时的deadline_timer:它的截止日期的。停止缓冲器由复位计时器完成

    The most natural place for this state is the usual for the deadline_timer: it's deadline. Stopping the buffer is done by resetting the timer:

    void AsyncBuffer::Stop() { // not threadsafe!
        write_event_.expires_from_now(boost::posix_time::seconds(-1));
    }
    

    这一次取消计时器,但由于检测的截止日期是在过去。

    This at once cancels the timer, but is detectable because the deadline is in the past.

    下面是使用AA组IO服务线程,一制片人协同程序产生随机数和一个狙击手线程是斯奈普斯的 MyClass的::运行一个简单的演示在2秒钟后协同程序。主线程是狙击手主题。

    Here's a simple demo with a a group of IO service threads, one "producer coroutine" that produces random numbers and a "sniper thread" that snipes the MyClass::Run coroutine after 2 seconds. The main thread is the sniper thread.

    看它的 <大骨节病> 住在Coliru

    #include <boost/asio.hpp>
    #include <boost/asio/spawn.hpp>
    #include <boost/asio/async_result.hpp>
    #include <boost/bind.hpp>
    #include <boost/thread.hpp>
    #include <boost/atomic.hpp>
    #include <list>
    #include <iostream>
    
    // for refcounting:
    #include <boost/enable_shared_from_this.hpp>
    #include <boost/make_shared.hpp>
    
    namespace asio = boost::asio;
    
    class AsyncBuffer {
        friend class MyClass;
    protected:
        AsyncBuffer(boost::asio::io_service &io_service) : write_event_(io_service) {
            write_event_.expires_at(boost::posix_time::pos_infin);
        }
    
        void Write(uint32_t data) {
            buffer_.push_back(data);
            write_event_.cancel();
        }
    
        uint32_t Read(boost::asio::yield_context context) {
            if (buffer_.empty()) {
                boost::system::error_code ec;
                write_event_.async_wait(context[ec]);
    
                if (ec != boost::asio::error::operation_aborted || write_event_.expires_from_now().is_negative())
                {
                    if (context.ec_)
                        *context.ec_ = boost::asio::error::operation_aborted;
                    return 0;
                }
            }
    
            uint32_t data = buffer_.front();
            buffer_.pop_front();
            return data;
        }
    
        void Stop() {
            write_event_.expires_from_now(boost::posix_time::seconds(-1));
        }
    
    private:
        boost::asio::deadline_timer write_event_;
        std::list<uint32_t> buffer_;
    };
    
    class MyClass : public boost::enable_shared_from_this<MyClass> {
        boost::atomic_bool stopped_;
    public:
        MyClass(boost::asio::io_service &io_service) : stopped_(false), buffer_(io_service), strand_(io_service) {}
    
        void Run(boost::asio::yield_context context) {
            while (!stopped_) {
                boost::system::error_code ec;
    
                uint32_t data = buffer_.Read(context[ec]);
    
                if (ec == boost::asio::error::operation_aborted)
                    break;
    
                // do something with data
                std::cout << data << " " << std::flush;
            }
            std::cout << "EOF\n";
        }
    
        bool Write(uint32_t data) { 
            if (!stopped_) {
                strand_.post(boost::bind(&AsyncBuffer::Write, &buffer_, data));
            }
            return !stopped_;
        }
    
        void Start() {
            if (!stopped_) {
                stopped_ = false;
                boost::asio::spawn(strand_, boost::bind(&MyClass::Run, shared_from_this(), _1));
            }
        }
    
        void Stop() {
            stopped_ = true;
            strand_.post(boost::bind(&AsyncBuffer::Stop, &buffer_));
        }
    
        ~MyClass() { 
            std::cout << "MyClass destructed because no coroutines hold a reference to it anymore\n";
        }
    
    protected:
        AsyncBuffer buffer_;
        boost::asio::strand strand_;
    };
    
    int main()
    {
        boost::thread_group tg;
        asio::io_service svc;
    
        {
            // Start the consumer:
            auto instance = boost::make_shared<MyClass>(svc); 
            instance->Start();
    
            // Sniper in 2 seconds :)
            boost::thread([instance]{ 
                    boost::this_thread::sleep_for(boost::chrono::seconds(2));
                    instance->Stop();
                    }).detach();
    
            // Start the producer:
            auto producer_coro = [instance, &svc](asio::yield_context c) { // a bound function/function object in C++03
                asio::deadline_timer tim(svc);
    
                while (instance->Write(rand())) {
                    tim.expires_from_now(boost::posix_time::milliseconds(200));
                    tim.async_wait(c);
                }
            };
    
            asio::spawn(svc, producer_coro);
    
            // Start the service threads:
            for(size_t i=0; i < boost::thread::hardware_concurrency(); ++i)
                tg.create_thread(boost::bind(&asio::io_service::run, &svc));
        }
    
        // now `instance` is out of scope, it will selfdestruct after the snipe
        // completed
        boost::this_thread::sleep_for(boost::chrono::seconds(3)); // wait longer than the snipe
        std::cout << "This is the main thread _after_ MyClass self-destructed correctly\n";
    
        // cleanup service threads
        tg.join_all();
    }
    


    [1] 逻辑线程,这可能是大干快上不同的线程恢复协程


    [1] logical thread, this could be a coroutine that gets resumed on different threads

    [2] boost::asio并主动对象

    这篇关于正确的清除与暂停协程的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

  • 查看全文
    登录 关闭
    扫码关注1秒登录
    发送“验证码”获取 | 15天全站免登陆