c ++是否存在多个生产者单消费者无锁队列? [英] Does a multiple producer single consumer lock-free queue exist for c++?

查看:973
本文介绍了c ++是否存在多个生产者单消费者无锁队列?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我读的越多,我越来越困惑。我会认为找到一个正式的正确的mpsc队列在c ++中实现是非常简单的。

The more I read the more confused I become... I would have thought it trivial to find a formally correct mpsc queue implemented in c++.

发现另一个刺刀,进一步的研究似乎表明有一些问题,如ABA或其他微妙的种族条件。

Every time I find another stab at it, further research seems to suggest there are issues such as ABA or other subtle race conditions.

很多人谈到垃圾收集的需要。这是我想避免的。

Many talk of the need for garbage collection. This is something I want to avoid.

是否有一个公认的正确的开源实现?

Is there an accepted correct open source implementation out there?

推荐答案

您可能想检查破坏者;它在C ++中可用: http://lmax-exchange.github.io/disruptor/

You may want to check disruptor; it's available in C++ here: http://lmax-exchange.github.io/disruptor/

您还可以找到它的工作原理 here on stackoverflow 基本上它是没有锁定的循环缓冲区,优化为在固定大小的插槽中的线程之间传递FIFO消息。

You can also find explanation how it works here on stackoverflow Basically it's circular buffer with no locking, optimized for passing FIFO messages between threads in a fixed-size slots.

这里有两个我觉得有用的实现:无锁多生产者多消费者队列在环缓冲区@ NatSys实验室。博客

另一个无锁循环数组队列的实现
@ CodeProject

Here are two implementations which I found useful: Lock-free Multi-producer Multi-consumer Queue on Ring Buffer @ NatSys Lab. Blog and
Yet another implementation of a lock-free circular array queue @ CodeProject

注意:

如果你不喜欢google版本的复杂性,这里是类似于我的东西 - 它更简单,但我把它作为一个练习让读者使其工作(它是更大的项目的一部分,目前不可移植)。整个想法是保持数据的cirtular缓冲区和一小组计数器来标识用于写/写和读/读的插槽。由于每个计数器在其自己的高速缓存行中,并且(通常)每个在消息的实时中仅被原子地更新一次,所以它们可以在没有任何同步的情况下被读取。在 post_done 中写线程之间存在一个潜在的争用点,它需要FIFO保证。选择计数器(head_,wrtn_,rdng_,tail_)以确保正确性 FIFO,因此丢弃FIFO也将需要改变计数器(并且在不牺牲正确性的情况下可能难以做到)。对于一个消费者的场景,可以稍微提高性能,但我不会打扰 - 如果找到多个读者的其他用例,则必须撤消它。

If you don't like the complexity of google version, here is something similar from me - it's much simpler, but I leave it as an exercise to the reader to make it work (it's part of larger project, not portable at the moment). The whole idea is to maintain cirtular buffer for data and a small set of counters to identify slots for writing/written and reading/read. Since each counter is in its own cache line, and (normally) each is only atomically updated once in the live of a message, they can all be read without any synchronisation. There is one potential contention point between writing threads in post_done, it's required for FIFO guarantee. Counters (head_, wrtn_, rdng_, tail_) were selected to ensure correctness and FIFO, so dropping FIFO would also require change of counters (and that might be difficult to do without sacrifying correctness). It is possible to slightly improve performance for scenarios with one consumer, but I would not bother - you would have to undo it if other use cases with multiple readers are found.

在我的机器延迟看起来像下面(百分位左边,平均值在这个百分位右边,单位是微秒,由rdtsc测量):

On my machine latency looks like following (percentile on left, mean within this percentile on right, unit is microsecond, measured by rdtsc):

    total=1000000 samples, avg=0.24us
    50%=0.214us, avg=0.093us
    90%=0.23us, avg=0.151us
    99%=0.322us, avg=0.159us
    99.9%=15.566us, avg=0.173us

单轮询消费者,即工作线程在紧环循环中调用wheel.read(),并检查是否不为空(例如滚动到底部)。等待消费者(CPU利用率低得多)会等待事件(获取... 函数),这会由于上下文切换而增加大约1-2us的平均延迟。

These results are for single polling consumer, i.e. worker thread calling wheel.read() in tight loop and checking if not empty (scroll to bottom for example). Waiting consumers (much lower CPU utilization) would wait on event (one of acquire... functions), this adds about 1-2us to average latency due to context switch.

由于阅读中只有极少的争论,消费者在工作者线程数量上表现非常好。我的机器上有3个主题:

Since there is verly little contention on read, consumers scale very well with number of worker threads, e.g. for 3 threads on my machine:

    total=1500000 samples, avg=0.07us
    50%=0us, avg=0us
    90%=0.155us, avg=0.016us
    99%=0.361us, avg=0.038us
    99.9%=8.723us, avg=0.044us

欢迎补丁:)

// Copyright (c) 2011-2012, Bronislaw (Bronek) Kozicki
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#pragma once

#include <core/api.hxx>
#include <core/wheel/exception.hxx>

#include <boost/noncopyable.hpp>
#include <boost/type_traits.hpp>
#include <boost/lexical_cast.hpp>
#include <typeinfo>

namespace core { namespace wheel
{
  struct bad_size : core::exception
  {
    template<typename T> explicit bad_size(const T&, size_t m)
      : core::exception(std::string("Slot capacity exceeded, sizeof(")
                  + typeid(T).name()
                  + ") = "
                  + boost::lexical_cast<std::string>(sizeof(T))
                  + ", capacity = "
                  + boost::lexical_cast<std::string>(m)
                  )
    {}
  };        

  // inspired by Disruptor
  template <typename Header>
  class wheel : boost::noncopyable
  {
    __declspec(align(64))
    struct slot_detail
    {
      // slot write: (memory barrier in wheel) > post_done > (memory barrier in wheel)
      // slot read:  (memory barrier in wheel) > read_done > (memory barrier in wheel)

      // done writing or reading, must update wrtn_ or tail_ in wheel, as appropriate
      template <bool Writing>
      void done(wheel* w)
      {
        if (Writing)
          w->post_done(sequence);
        else
          w->read_done();
      }

      // cache line for sequence number and header
      long long sequence;
      Header header;

      // there is no such thing as data type with variable size, but we need it to avoid thrashing
      // cache - so we invent one. The memory is reserved in runtime and we simply go beyond last element.
      // This is well into UB territory! Using template parameter for this is not good, since it
      // results in this small implementation detail leaking to all possible user interfaces.
      __declspec(align(8))
      char data[8];
    };

    // use this as a storage space for slot_detail, to guarantee 64 byte alignment
    _declspec(align(64))
    struct slot_block { long long padding[8]; };

  public:
    // wrap slot data to outside world
    template <bool Writable>
    class slot
    {
      template<typename> friend class wheel;

      slot& operator=(const slot&); // moveable but non-assignable

      // may only be constructed by wheel
      slot(slot_detail* impl, wheel<Header>* w, size_t c)
        : slot_(impl) , wheel_(w) , capacity_(c)
      {}

    public:
      slot(slot&& s)
        : slot_(s.slot_) , wheel_(s.wheel_) , capacity_(s.capacity_)
      {
        s.slot_ = NULL;
      }

      ~slot()
      {
        if (slot_)
        {
          slot_->done<Writable>(wheel_);
        }
      }

      // slot accessors - use Header to store information on what type is actually stored in data
      bool empty() const          { return !slot_; }
      long long sequence() const  { return slot_->sequence; }
      Header& header()            { return slot_->header; }
      char* data()                { return slot_->data; }

      template <typename T> T& cast()
      {
        static_assert(boost::is_pod<T>::value, "Data type must be POD");
        if (sizeof(T) > capacity_)
          throw bad_size(T(), capacity_);
        if (empty())
          throw no_data();
        return *((T*) data());
      }

    private:
      slot_detail*    slot_;
      wheel<Header>*  wheel_;
      const size_t    capacity_;
    };

  private:
    // dynamic size of slot, with extra capacity, expressed in 64 byte blocks
    static size_t sizeof_slot(size_t s)
    {
      size_t m = sizeof(slot_detail);
      // add capacity less 8 bytes already within sizeof(slot_detail)
      m += max(8, s) - 8;
      // round up to 64 bytes, i.e. alignment of slot_detail
      size_t r = m & ~(unsigned int)63;
      if (r < m)
        r += 64;
      r /= 64;
      return r;
    }

    // calculate actual slot capacity back from number of 64 byte blocks
    static size_t slot_capacity(size_t s)
    {
      return s*64 - sizeof(slot_detail) + 8;
    }

    // round up to power of 2
    static size_t round_size(size_t s)
    {
      // enfore minimum size
      if (s <= min_size)
        return min_size;

      // find rounded value
      --s;
      size_t r = 1;
      while (s)
      {
        s >>= 1;
        r <<= 1;
      };
      return r;
    }

    slot_detail& at(long long sequence)
    {
      // find index from sequence number and return slot at found index of the wheel
      return *((slot_detail*) &wheel_[(sequence & (size_ - 1)) * blocks_]);
    }

  public:
    wheel(size_t capacity, size_t size)
      : head_(0) , wrtn_(0) , rdng_(0) , tail_(0) , event_()
      , blocks_(sizeof_slot(capacity)) , capacity_(slot_capacity(blocks_)) , size_(round_size(size))
    {
      static_assert(boost::is_pod<Header>::value, "Header type must be POD");
      static_assert(sizeof(slot_block) == 64, "This was unexpected");

      wheel_ = new slot_block[size_ * blocks_];
      // all slots must be initialised to 0
      memset(wheel_, 0, size_ * 64 * blocks_);
      active_ = 1;
    }

    ~wheel()
    {
      stop();
      delete[] wheel_;
    }

    // all accessors needed
    size_t capacity() const { return capacity_; }   // capacity of a single slot
    size_t size() const     { return size_; }       // number of slots available
    size_t queue() const    { return (size_t)head_ - (size_t)tail_; }
    bool active() const     { return active_ == 1; }

    // enough to call it just once, to fine tune slot capacity
    template <typename T>
    void check() const
    {
      static_assert(boost::is_pod<T>::value, "Data type must be POD");
      if (sizeof(T) > capacity_)
        throw bad_size(T(), capacity_);
    }

    // stop the wheel - safe to execute many times
    size_t stop()
    {
      InterlockedExchange(&active_, 0);
      // must wait for current read to complete
      while (rdng_ != tail_)
        Sleep(10);

      return size_t(head_ - tail_);
    }

    // return first available slot for write
    slot<true> post()
    {
      if (!active_)
        throw stopped();

      // the only memory barrier on head seq. number we need, if not overflowing
      long long h = InterlockedIncrement64(&head_);
      while(h - (long long) size_ > tail_)
      {
        if (InterlockedDecrement64(&head_) == h - 1)
          throw overflowing();

        // protection against case of race condition when we are overflowing
        // and two or more threads try to post and two or more messages are read,
        // all at the same time. If this happens we must re-try, otherwise we
        // could have skipped a sequence number - causing infinite wait in post_done
        Sleep(0);
        h = InterlockedIncrement64(&head_);
      }

      slot_detail& r = at(h);
      r.sequence = h;

      // wrap in writeable slot
      return slot<true>(&r, this, capacity_);
    }

    // return first available slot for write, nothrow variant
    slot<true> post(std::nothrow_t)
    {
      if (!active_)
        return slot<true>(NULL, this, capacity_);

      // the only memory barrier on head seq. number we need, if not overflowing
      long long h = InterlockedIncrement64(&head_);
      while(h - (long long) size_ > tail_)
      {
        if (InterlockedDecrement64(&head_) == h - 1)
          return slot<true>(NULL, this, capacity_);

        // must retry if race condition described above
        Sleep(0);
        h = InterlockedIncrement64(&head_);
      }

      slot_detail& r = at(h);
      r.sequence = h;

      // wrap in writeable slot
      return slot<true>(&r, this, capacity_);
    }

    // read first available slot for read
    slot<false> read()
    {
      slot_detail* r = NULL;
      // compare rdng_ and wrtn_ early to avoid unnecessary memory barrier
      if (active_ && rdng_ < wrtn_)
      {
        // the only memory barrier on reading seq. number we need
        const long long h = InterlockedIncrement64(&rdng_);
        // check if this slot has been written, step back if not
        if (h > wrtn_)
          InterlockedDecrement64(&rdng_);
        else
          r = &at(h);
      }

      // wrap in readable slot
      return slot<false>(r , this, capacity_);
    }

    // waiting for new post, to be used by non-polling clients
    void acquire()
    {
      event_.acquire();
    }

    bool try_acquire()
    {
      return event_.try_acquire();
    }

    bool try_acquire(unsigned long timeout)
    {
      return event_.try_acquire(timeout);
    }

    void release()
    {}

  private:
    void post_done(long long sequence)
    {
      const long long t = sequence - 1;

      // the only memory barrier on written seq. number we need
      while(InterlockedCompareExchange64(&wrtn_, sequence, t) != t)
        Sleep(0);

      // this is outside of critical path for polling clients
      event_.set();
    }

    void read_done()
    {
      // the only memory barrier on tail seq. number we need
      InterlockedIncrement64(&tail_);
    }

    // each in its own cache line
    // head_ - wrtn_ = no. of messages being written at this moment
    // rdng_ - tail_ = no. of messages being read at the moment
    // head_ - tail_ = no. of messages to read (including those being written and read)
    // wrtn_ - rdng_ = no. of messages to read (excluding those being written or read)
    __declspec(align(64)) volatile long long head_; // currently writing or written
    __declspec(align(64)) volatile long long wrtn_; // written
    __declspec(align(64)) volatile long long rdng_; // currently reading or read
    __declspec(align(64)) volatile long long tail_; // read
    __declspec(align(64)) volatile long active_;    // flag switched to 0 when stopped

    __declspec(align(64))
    api::event event_;          // set when new message is posted
    const size_t blocks_;       // number of 64-byte blocks in a single slot_detail
    const size_t capacity_;     // capacity of data() section per single slot. Initialisation depends on blocks_
    const size_t size_;         // number of slots available, always power of 2
    slot_block* wheel_;
  };
}}

这里是轮询消费者工作线程的样子:

Here is what polling consumer worker thread may look like:

  while (wheel.active())
  {
    core::wheel::wheel<int>::slot<false> slot = wheel.read();
    if (!slot.empty())
    {
      Data& d = slot.cast<Data>();
      // do work
    }
    // uncomment below for waiting consumer, saving CPU cycles
    // else
    //   wheel.try_acquire(10);
  }

已编辑添加的消费者示例

这篇关于c ++是否存在多个生产者单消费者无锁队列?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆