通讯/黑白两个线程在一个常见的​​数据结构。设计问题 [英] Communication b/w two threads over a common datastructure. Design Issue

查看:140
本文介绍了通讯/黑白两个线程在一个常见的​​数据结构。设计问题的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我现在有两个线程生产者和消费者。生产者是,在一个deque型抗静电容器插入数据的静态方法,并通过通知消费者的boost :: condition_variable 一个对象已插入双端队列对象。那么消费者从双端队列式读取数据和通信使用从container.The两个线程中删除的boost :: condition_variable

下面是正在发生的事情的抽象。这是code,为消费者和生产者

  //静态方法:这是制片人。不同类别使用此方法将数据添加到容器
    无效Ç:: add_data工具(OBJ一)
    {
        尝试
        {
            INT A = MyContainer.size();
            UpdateTextBoxA(当前大小为+一);
            UpdateTextBoxB(运行);
            MyContainer.push_back(一);
            condition_consumer.notify_one(); //这种情况是静态成员
            UpdateTextBoxB(停止);
        }
        赶上(性病::例外急症)
        {
            标准::字符串ERR = e.what();
        }
    } // end方法
    //消费者方法 - 在一个单独的独立线程中运行
    无效Ç:: READ_DATA()
    {
        而(真)
        {
            提高::互斥:: scoped_lock的锁(mutex_c);
            而(MyContainer.size()!= 0)
            {
                尝试
                {
                    物镜一个= MyContainer.front();
                    ....
                    ....
                    ....
                    MyContainer.pop_front();
                }
                赶上(性病::例外急症)
                {
                    标准::字符串ERR = e.what();
                }
            }
            condition_consumer.wait(锁);
        }    } // end方法

现在被插入在 deque的键入对象中的对象是非常快的约500对象second.While运行此我注意到TextBoxB总是以停止,而我相信这是假设的运行和罢之间切换。另外很慢。什么我可能还没有考虑和可能是做错了?任何建议


解决方案

1),你应该做的 MyContainer.push_back(一); 下的互斥体 - 否则,你会得到< STRONG>数据竞争,这是不确定的行为(+你可能需要保护 MyContainer.size(); 按互斥过,这取决于它的类型和C + + ISO /编译器版本使用)。

2)无效Ç:: READ_DATA()应该是:

 无效Ç:: READ_DATA()
{
    scoped_lock的SLOCK(mutex_c);
    而(真)//你可能还需要一些退出条件/机制
    {
        condition_consumer.wait(SLOCK,[&安培] {返回MyContainer.empty();});
        //在该行MyContainer.empty()== false,并且SLOCK被锁定
        //所以你可以从弹出deque的价值
    }
}

3)您与逻辑的混合并发队列逻辑生产/消费。相反,你可以隔离并发队列部分独立的实体:

现场演示

  // C ++ 98
模板&LT; typename的T&GT;
类concurrent_queue
{
    队列&LT; T&GT; q;
    可变互斥米;
    可变condition_variable℃;
上市:
    无效的push(const的T&amp; T公司)
    {
        (lock_guard&所述;互斥&GT(M)),
            q.push(T)
            c.notify_one();
    }
    无效流行(T&放大器;结果)
    {
        unique_lock&LT;&互斥GT; U(M);
        而(q.empty())
            c.wait(U);
        结果= q.front();
        q.pop();
    }
};



  

感谢您的答复。你能解释一下第二个参数的条件等待语句 [&安培] {返回MyContainer.empty();}


有是<第二个版本href=\"http://www.boost.org/doc/libs/1_53_0/doc/html/thread/synchronization.html#thread.synchronization.condvar_ref.condition_variable.wait_$p$pdicate\"相对=nofollow> condition_variable ::等待这需要predicate作为第二个参数中。它基本上等待而predicate是假的,有助于忽略虚假唤醒,

[&安培] {返回MyContainer.empty();} - 这是的 lambda函数。这是C ++ 11的新功能 - 它可以就地来定义的功能。如果你没有C ++ 11则只是使独立predicate或使用的一个参数版本等待带手动while循环:

 而(MyContainer.empty())condition_consumer.wait(锁定);



  

在你的第三个问题,你建议我应该隔离整个队列,而我加入到队列方法的一个问题是静态的,消费者(读者队列)的一个单独的线程将永远运行下去。你能告诉我这是为什么在我的设计缺陷?


有没有问题始终运行或静态。你甚至可以让静态concurrent_queue&LT; T&GT;成员 - 如果你的设计要求。

缺陷是多线程的同步结合其他方面的工作。但是,当你拥有的 concurrent_queue - 所有的同步隔离里面的那个原始和code这生产/消耗数据没有上锁,并等待污染:

  concurrent_queue&LT; INT&GT; C;
螺纹生产者([放大器;]
{
    的for(int i = 0; i = 100;!++我)
        c.push(ⅰ);
});
螺纹消费([安培]
{
    INT X;
    做{
        c.pop(X);
        性病::法院LT&;&LT;点¯x所述&;&下;的std :: ENDL;
    }而(X = 11!);
});
producer.join();
consumer.join();

正如你所看到的,有 PUSH / POP 和code没有手动同步干净多了。

此外,当您去耦在这样的方式您的组件 - 您可以在隔离测试。此外,他们正变得越来越可重复使用的。

I currently have two threads a producer and a consumer. The producer is a static methods that inserts data in a Deque type static container and informs the consumer through boost::condition_variable that an object has been inserted in the deque object . The consumer then reads data from the Deque type and removes it from the container.The two threads communicate using boost::condition_variable

Here is an abstract of what is happening. This is the code for the consumer and producer

    //Static Method : This is the producer. Different classes add data to the container using this method
    void C::Add_Data(obj a)
    {
        try
        {       
            int a = MyContainer.size();
            UpdateTextBoxA("Current Size is " + a);
            UpdateTextBoxB("Running"); 
            MyContainer.push_back(a);
            condition_consumer.notify_one(); //This condition is static member
            UpdateTextBoxB("Stopped");                 
        }
        catch (std::exception& e)
        {
            std::string err = e.what();
        }
    }//end method


    //Consumer Method - Runs in a separate independent thread
    void C::Read_Data()
    {
        while(true)
        {
            boost::mutex::scoped_lock lock(mutex_c);
            while(MyContainer.size()!=0)
            {
                try
                {
                    obj a = MyContainer.front();
                    ....
                    ....
                    ....
                    MyContainer.pop_front();
                }
                catch (std::exception& e)
                {
                    std::string err = e.what();
                }
            }
            condition_consumer.wait(lock);
        }

    }//end method

Now the objects being inserted in the Deque type object are very fast about 500 objects a second.While running this I noticed that TextBoxB was always at "Stopped" while I believe it was suppose to toggle between "Running" and "Stoped". Plus very slow. Any suggestions on what I might have not considered and might be doing wrong ?

解决方案

1) You should do MyContainer.push_back(a); under mutex - otherwise you would get data race, which is undefined behaviour (+ you may need to protect MyContainer.size(); by mutex too, depending on it's type and C++ISO/Compiler version you use).

2) void C::Read_Data() should be:

void C::Read_Data()
{
    scoped_lock slock(mutex_c);
    while(true) // you may also need some exit condition/mechanism
    {
        condition_consumer.wait(slock,[&]{return !MyContainer.empty();});
        // at this line MyContainer.empty()==false and slock is locked
        // so you may pop value from deque
    }
}

3) You are mixing logic of concurrent queue with logic of producing/consuming. Instead you may isolate concurrent queue part to stand-alone entity:

LIVE DEMO

// C++98
template<typename T>
class concurrent_queue
{
    queue<T> q;
    mutable mutex m;
    mutable condition_variable c;
public:
    void push(const T &t)
    {
        (lock_guard<mutex>(m)),
            q.push(t),
            c.notify_one();
    }
    void pop(T &result)
    {
        unique_lock<mutex> u(m);
        while(q.empty())
            c.wait(u);
        result = q.front();
        q.pop();
    }
};


Thanks for your reply. Could you explain the second parameter in the conditional wait statement [&]{return !MyContainer.empty();}

There is second version of condition_variable::wait which takes predicate as second paramter. It basically waits while that predicate is false, helping to "ignore" spurious wake-ups.

[&]{return !MyContainer.empty();} - this is lambda function. It is new feature of C++11 - it allows to define functions "in-place". If you don't have C++11 then just make stand-alone predicate or use one-argument version of wait with manual while loop:

while(MyContainer.empty()) condition_consumer.wait(lock);


One question in your 3rd point you suggested that I should Isolate the entire queue while My adding to the queue method is static and the consumer(queue reader) runs forever in a separate thread. Could you tell me why is that a flaw in my design?

There is no problem with "runs forever" or with static. You can even make static concurrent_queue<T> member - if your design requires that.

Flaw is that multithreaded synchronization is coupled with other kind of work. But when you have concurrent_queue - all synchronization is isolated inside that primitive, and code which produces/consumes data is not polluted with locks and waits:

concurrent_queue<int> c;
thread producer([&]
{
    for(int i=0;i!=100;++i)
        c.push(i);
});
thread consumer([&]
{
    int x;
    do{
        c.pop(x);
        std::cout << x << std::endl;
    }while(x!=11);
});
producer.join();
consumer.join();

As you can see, there is no "manual" synchronization of push/pop, and code is much cleaner.

Moreover, when you decouple your components in such way - you may test them in isolation. Also, they are becoming more reusable.

这篇关于通讯/黑白两个线程在一个常见的​​数据结构。设计问题的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆