通讯/黑白两个线程在一个常见的数据结构。设计问题 [英] Communication b/w two threads over a common datastructure. Design Issue
问题描述
我现在有两个线程生产者和消费者。生产者是,在一个deque型抗静电容器插入数据的静态方法,并通过通知消费者的boost :: condition_variable
一个对象已插入双端队列对象。那么消费者从双端队列式读取数据和通信使用从container.The两个线程中删除的boost :: condition_variable
下面是正在发生的事情的抽象。这是code,为消费者和生产者
//静态方法:这是制片人。不同类别使用此方法将数据添加到容器
无效Ç:: add_data工具(OBJ一)
{
尝试
{
INT A = MyContainer.size();
UpdateTextBoxA(当前大小为+一);
UpdateTextBoxB(运行);
MyContainer.push_back(一);
condition_consumer.notify_one(); //这种情况是静态成员
UpdateTextBoxB(停止);
}
赶上(性病::例外急症)
{
标准::字符串ERR = e.what();
}
} // end方法
//消费者方法 - 在一个单独的独立线程中运行
无效Ç:: READ_DATA()
{
而(真)
{
提高::互斥:: scoped_lock的锁(mutex_c);
而(MyContainer.size()!= 0)
{
尝试
{
物镜一个= MyContainer.front();
....
....
....
MyContainer.pop_front();
}
赶上(性病::例外急症)
{
标准::字符串ERR = e.what();
}
}
condition_consumer.wait(锁);
} } // end方法
现在被插入在 deque的
键入对象中的对象是非常快的约500对象second.While运行此我注意到TextBoxB总是以停止,而我相信这是假设的运行和罢之间切换。另外很慢。什么我可能还没有考虑和可能是做错了?任何建议
1),你应该做的 MyContainer.push_back(一);
下的互斥体 - 否则,你会得到< STRONG>数据竞争,这是不确定的行为(+你可能需要保护 MyContainer.size();
按互斥过,这取决于它的类型和C + + ISO /编译器版本使用)。
2)无效Ç:: READ_DATA()
应该是:
无效Ç:: READ_DATA()
{
scoped_lock的SLOCK(mutex_c);
而(真)//你可能还需要一些退出条件/机制
{
condition_consumer.wait(SLOCK,[&安培] {返回MyContainer.empty();});
//在该行MyContainer.empty()== false,并且SLOCK被锁定
//所以你可以从弹出deque的价值
}
}
3)您与逻辑的混合并发队列逻辑生产/消费。相反,你可以隔离并发队列部分独立的实体:
// C ++ 98
模板&LT; typename的T&GT;
类concurrent_queue
{
队列&LT; T&GT; q;
可变互斥米;
可变condition_variable℃;
上市:
无效的push(const的T&amp; T公司)
{
(lock_guard&所述;互斥&GT(M)),
q.push(T)
c.notify_one();
}
无效流行(T&放大器;结果)
{
unique_lock&LT;&互斥GT; U(M);
而(q.empty())
c.wait(U);
结果= q.front();
q.pop();
}
};
感谢您的答复。你能解释一下第二个参数的条件等待语句
[&安培] {返回MyContainer.empty();}
块引用>有是<第二个版本href=\"http://www.boost.org/doc/libs/1_53_0/doc/html/thread/synchronization.html#thread.synchronization.condvar_ref.condition_variable.wait_$p$pdicate\"相对=nofollow> condition_variable ::等待这需要predicate作为第二个参数中。它基本上等待而predicate是假的,有助于忽略虚假唤醒,
[&安培] {返回MyContainer.empty();}
- 这是的 lambda函数。这是C ++ 11的新功能 - 它可以就地来定义的功能。如果你没有C ++ 11则只是使独立predicate或使用的一个参数版本等待
带手动while循环:而(MyContainer.empty())condition_consumer.wait(锁定);
在你的第三个问题,你建议我应该隔离整个队列,而我加入到队列方法的一个问题是静态的,消费者(读者队列)的一个单独的线程将永远运行下去。你能告诉我这是为什么在我的设计缺陷?
块引用>有没有问题始终运行或
静态
。你甚至可以让静态concurrent_queue&LT; T&GT;成员
- 如果你的设计要求。缺陷是多线程的同步结合其他方面的工作。但是,当你拥有的 concurrent_queue - 所有的同步隔离里面的那个原始和code这生产/消耗数据没有上锁,并等待污染:
concurrent_queue&LT; INT&GT; C;
螺纹生产者([放大器;]
{
的for(int i = 0; i = 100;!++我)
c.push(ⅰ);
});
螺纹消费([安培]
{
INT X;
做{
c.pop(X);
性病::法院LT&;&LT;点¯x所述&;&下;的std :: ENDL;
}而(X = 11!);
});
producer.join();
consumer.join();正如你所看到的,有
PUSH / POP
和code没有手动同步干净多了。此外,当您去耦在这样的方式您的组件 - 您可以在隔离测试。此外,他们正变得越来越可重复使用的。
I currently have two threads a producer and a consumer. The producer is a static methods that inserts data in a Deque type static container and informs the consumer through
boost::condition_variable
that an object has been inserted in the deque object . The consumer then reads data from the Deque type and removes it from the container.The two threads communicate usingboost::condition_variable
Here is an abstract of what is happening. This is the code for the consumer and producer
//Static Method : This is the producer. Different classes add data to the container using this method void C::Add_Data(obj a) { try { int a = MyContainer.size(); UpdateTextBoxA("Current Size is " + a); UpdateTextBoxB("Running"); MyContainer.push_back(a); condition_consumer.notify_one(); //This condition is static member UpdateTextBoxB("Stopped"); } catch (std::exception& e) { std::string err = e.what(); } }//end method //Consumer Method - Runs in a separate independent thread void C::Read_Data() { while(true) { boost::mutex::scoped_lock lock(mutex_c); while(MyContainer.size()!=0) { try { obj a = MyContainer.front(); .... .... .... MyContainer.pop_front(); } catch (std::exception& e) { std::string err = e.what(); } } condition_consumer.wait(lock); } }//end method
Now the objects being inserted in the
Deque
type object are very fast about 500 objects a second.While running this I noticed that TextBoxB was always at "Stopped" while I believe it was suppose to toggle between "Running" and "Stoped". Plus very slow. Any suggestions on what I might have not considered and might be doing wrong ?解决方案1) You should do
MyContainer.push_back(a);
under mutex - otherwise you would get data race, which is undefined behaviour (+ you may need to protectMyContainer.size();
by mutex too, depending on it's type and C++ISO/Compiler version you use).2)
void C::Read_Data()
should be:void C::Read_Data() { scoped_lock slock(mutex_c); while(true) // you may also need some exit condition/mechanism { condition_consumer.wait(slock,[&]{return !MyContainer.empty();}); // at this line MyContainer.empty()==false and slock is locked // so you may pop value from deque } }
3) You are mixing logic of concurrent queue with logic of producing/consuming. Instead you may isolate concurrent queue part to stand-alone entity:
// C++98 template<typename T> class concurrent_queue { queue<T> q; mutable mutex m; mutable condition_variable c; public: void push(const T &t) { (lock_guard<mutex>(m)), q.push(t), c.notify_one(); } void pop(T &result) { unique_lock<mutex> u(m); while(q.empty()) c.wait(u); result = q.front(); q.pop(); } };
Thanks for your reply. Could you explain the second parameter in the conditional wait statement
[&]{return !MyContainer.empty();}
There is second version of condition_variable::wait which takes predicate as second paramter. It basically waits while that predicate is false, helping to "ignore" spurious wake-ups.
[&]{return !MyContainer.empty();}
- this is lambda function. It is new feature of C++11 - it allows to define functions "in-place". If you don't have C++11 then just make stand-alone predicate or use one-argument version ofwait
with manual while loop:while(MyContainer.empty()) condition_consumer.wait(lock);
One question in your 3rd point you suggested that I should Isolate the entire queue while My adding to the queue method is static and the consumer(queue reader) runs forever in a separate thread. Could you tell me why is that a flaw in my design?
There is no problem with "runs forever" or with
static
. You can even makestatic concurrent_queue<T> member
- if your design requires that.Flaw is that multithreaded synchronization is coupled with other kind of work. But when you have concurrent_queue - all synchronization is isolated inside that primitive, and code which produces/consumes data is not polluted with locks and waits:
concurrent_queue<int> c; thread producer([&] { for(int i=0;i!=100;++i) c.push(i); }); thread consumer([&] { int x; do{ c.pop(x); std::cout << x << std::endl; }while(x!=11); }); producer.join(); consumer.join();
As you can see, there is no "manual" synchronization of
push/pop
, and code is much cleaner.Moreover, when you decouple your components in such way - you may test them in isolation. Also, they are becoming more reusable.
这篇关于通讯/黑白两个线程在一个常见的数据结构。设计问题的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!