C ++多线程,简单使用者/生产者线程,LIFO,通知,计数器 [英] C++ multithreading, simple consumer / producer threads, LIFO, notification, counter

查看:63
本文介绍了C ++多线程,简单使用者/生产者线程,LIFO,通知,计数器的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我是多线程编程的新手,我想实现以下功能.

I am new to multi-thread programming, I want to implement the following functionality.

  1. 有2个线程,生产者和消费者.
  2. 消费者仅处理最新值,即后进先出(LIFO).
  3. 生产者有时会以比消费者能更快的更快的速度生成新价值 过程.例如,生产者可能会在1中产生2个新值 毫秒,但处理消费者大约需要5毫秒.
  4. 如果消费者在处理旧的过程中收到新的价值 值,则无需中断.换句话说,消费者将完成当前 首先执行,然后从最新值开始执行.
  1. There are 2 threads, producer and consumer.
  2. Consumer only processes the latest value, i.e., last in first out (LIFO).
  3. Producer sometimes generates new value at a faster rate than consumer can process. For example, producer may generate 2 new value in 1 milli-second, but it approximately takes consumer 5 milli-seconds to process.
  4. If consumer receives a new value in the middle of processing an old value, there is no need to interrupt. In other words, consumer will finish current execution first, then start an execution on the latest value.

这是我的设计过程,如果我做错了,请纠正我.

Here is my design process, please correct me if I am wrong.

  1. 不需要队列,因为只有最新值是 由消费者处理.
  2. 从生产者发送的通知是否自动排队?
  3. 我将改用计数器.
  4. ConsumerThread()最后检查计数器,以确保生产者 不会产生新的价值.
  5. 但是,如果生产者在消费者之前产生了新的价值,会发生什么 进入sleep(),但是在检查了计数器之后??
  1. There is no need for a queue, since only the latest value is processed by consumer.
  2. Is notification sent from producer being queued automatically???
  3. I will use a counter instead.
  4. ConsumerThread() check the counter at the end, to make sure producer doesn't generate new value.
  5. But what happen if producer generates a new value just before consumer goes to sleep(), but after check the counter???

这是一些伪代码.

boost::mutex mutex;
double x;

void ProducerThread() 
{
    {
        boost::scoped_lock lock(mutex);
        x = rand();
        counter++;
    }
    notify(); // wake up consumer thread
}   

void ConsumerThread()
{
    counter = 0; // reset counter, only process the latest value

... do something which takes 5 milli-seconds ...

    if (counter > 0) 
    {
... execute this function again, not too sure how to implement this ...
    } 
    else 
    {
... what happen if producer generates a new value here??? ...
        sleep();
    }
}

谢谢.

推荐答案

如果我正确理解了您的问题,则对于您的特定应用,消费者仅需要处理生产者提供的最新可用价值.换句话说,因为消费者无法跟上生产者的价值,所以价值下降是可以接受的.

If I understood your question correctly, for your particular application, the consumer only needs to process the latest available value provided by the producer. In other words, it's acceptable for values to get dropped because the consumer cannot keep up with the producer.

如果是这样,那么我同意您可以不排队而使用计数器.但是,将需要以原子方式访问共享的计数器和值变量.

If that's the case, then I agree that you can get away without a queue and use a counter. However, the shared counter and value variables will be need to be accessed atomically.

您可以使用 boost::condition_variable 向消费者发出通知,告知已准备好新值.这是一个完整的例子;我让评论做解释.

You can use boost::condition_variable to signal notifications to the consumer that a new value is ready. Here is a complete example; I'll let the comments do the explaining.

#include <boost/thread/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition_variable.hpp>
#include <boost/thread/locks.hpp>
#include <boost/date_time/posix_time/posix_time_types.hpp>

boost::mutex mutex;
boost::condition_variable condvar;
typedef boost::unique_lock<boost::mutex> LockType;

// Variables that are shared between producer and consumer.
double value = 0;
int count = 0;

void producer()
{
    while (true)
    {
        {
            // value and counter must both be updated atomically
            // using a mutex lock
            LockType lock(mutex);
            value = std::rand();
            ++count;

            // Notify the consumer that a new value is ready.
            condvar.notify_one();
        }

        // Simulate exaggerated 2ms delay
        boost::this_thread::sleep(boost::posix_time::milliseconds(200));
    }
}

void consumer()
{
    // Local copies of 'count' and 'value' variables. We want to do the
    // work using local copies so that they don't get clobbered by
    // the producer when it updates.
    int currentCount = 0;
    double currentValue = 0;

    while (true)
    {
        {
            // Acquire the mutex before accessing 'count' and 'value' variables.
            LockType lock(mutex); // mutex is locked while in this scope
            while (count == currentCount)
            {
                // Wait for producer to signal that there is a new value.
                // While we are waiting, Boost releases the mutex so that
                // other threads may acquire it.
                condvar.wait(lock);
            }

            // `lock` is automatically re-acquired when we come out of
            // condvar.wait(lock). So it's safe to access the 'value'
            // variable at this point.
            currentValue = value; // Grab a copy of the latest value
                                  // while we hold the lock.
        }

        // Now that we are out of the mutex lock scope, we work with our
        // local copy of `value`. The producer can keep on clobbering the
        // 'value' variable all it wants, but it won't affect us here
        // because we are now using `currentValue`.
        std::cout << "value = " << currentValue << "\n";

        // Simulate exaggerated 5ms delay
        boost::this_thread::sleep(boost::posix_time::milliseconds(500));
    }
}

int main()
{
    boost::thread c(&consumer);
    boost::thread p(&producer);
    c.join();
    p.join();
}


附录

我最近在考虑这个问题,并且意识到这种解决方案虽然可行,但并不是最佳选择.您的生产者正在使用所有CPU来丢弃一半的计算值.


ADDENDUM

I was thinking about this question recently, and realized that this solution, while it may work, is not optimal. Your producer is using all that CPU just to throw away half of the computed values.

我建议您重新考虑您的设计,并在生产者和消费者之间使用有界的阻塞队列.这样的队列应具有以下特征:

I suggest that you reconsider your design and go with a bounded blocking queue between the producer and consumer. Such a queue should have the following characteristics:

  • 线程安全
  • 队列的大小固定(有界)
  • 如果消费者要弹出下一个项目,但队列为空,则操作将被阻止,直到生产者通知某项目可用为止.
  • 生产者可以检查是否有空间可以推另一个物品并阻塞直到有足够的空间为止.

使用这种类型的队列,您可以有效地限制生产者,以使其不会超过消费者.它还可以确保生产者不会浪费CPU资源来计算将被丢弃的值.

With this type of queue, you can effectively throttle down the producer so that it doesn't outpace the consumer. It also ensures that the producer doesn't waste CPU resources computing values that will be thrown away.

TBB

Libraries such as TBB and PPL provide implementations of concurrent queues. If you want to attempt to roll your own using std::queue (or boost::circular_buffer) and boost::condition_variable, check out this blogger's example.

这篇关于C ++多线程,简单使用者/生产者线程,LIFO,通知,计数器的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆