Multi producer/consumer threads with condition variables... why don't they work well?


Problem description


Hi everyone,
I tried to improve my previous question, but there were so many changes that I am opening a new question to ask you for a solution.

I have to manage 5 threads that perform image processing. I want them to work like an assembly line, in which each thread waits for an available image from the previous thread, works on it, and puts it in an output queue for the next thread.

But these operations must run concurrently across the threads. For example:

After the first thread releases the first image, the second thread starts processing it, the third waits for the second, and so on... not sequential processing.

Following your suggestions, I introduced CONDITION VARIABLES and a critical section in my code, to manage the "produced item" condition on the producer side of every thread, because in this program every thread is a producer and a consumer at the same time.

I am posting my code here and asking for your help.

The current situation is: the program gives different output every time I run it. The threads don't complete all the processing steps, or, if they do complete all 10 steps (a fixed number just for testing), they do them sequentially: the first thread finishes, then the second starts processing, and so on.

Why does this happen? Please help me fix this...

This is the code:

#include <opencv\cv.h>
#include <opencv\highgui.h>
#include <opencv2\highgui\highgui.hpp>
#include <stdio.h>
#include <stdlib.h>
#include <windows.h>
#include <process.h>
#include <queue>

using namespace std;
using namespace cv;

template<typename T>
class coda_concorr
{
private:
	std::queue<T> la_coda;
	HANDLE mutex;
	
public:	
	bool elemento;
	coda_concorr()
	{
		mutex = CreateMutex(NULL,FALSE,NULL);
		}
	~coda_concorr()
	{}
	void push(T& data)
	{
		WaitForSingleObject(mutex,INFINITE);
		la_coda.push(data);
		ReleaseMutex(mutex);
		}
	bool vuota() const
	{
		WaitForSingleObject(mutex,INFINITE);
		bool RetCode = la_coda.empty();
		ReleaseMutex(mutex);
		return RetCode;
	}
	bool try_pop(T& popped)
    {
		WaitForSingleObject(mutex,INFINITE);
		while (la_coda.empty()){
            ReleaseMutex(mutex);
			return false;
        }
		WaitForSingleObject(mutex,INFINITE);
		popped = la_coda.front();
        la_coda.pop();
        ReleaseMutex(mutex);
		return true;
    }
};


struct Args
{
	coda_concorr<cv::Mat> in;
	coda_concorr<cv::Mat> *out; //puntatore a coda successiva
};


CONDITION_VARIABLE NonVuoto1;
CONDITION_VARIABLE NonVuoto2;
CONDITION_VARIABLE NonVuoto3;
CONDITION_VARIABLE NonVuoto4;
CRITICAL_SECTION  Lock;

bool stop;

//initial populating of queue
void puts (void* param){
	Args* arg = (Args*)param;
	int i=0;
    Mat image;	
	while(true){		
		EnterCriticalSection(&Lock);
		while(!stop && !arg->in.vuota()){
		if(stop==true){
			LeaveCriticalSection(&Lock);
			break;
			}
		arg->in.try_pop(image);
		arg->out->push(image);
		i++;		
		LeaveCriticalSection(&Lock);
		WakeConditionVariable(&NonVuoto1);
		}
	}
	//fine
	LeaveCriticalSection(&Lock);
	cout<<endl<<"Thread (PUSH) terminato con "<<i<<" elaborazioni."<<endl;
	_endthread();
}

//greyscale
void grey (void *param){
	Mat temp1,temp2;
	int add = 0;
	Args* arg = (Args*)param;
	while(true){
		EnterCriticalSection(&Lock);
		//se vuoto
		while(arg->in.vuota() && !stop){
		     SleepConditionVariableCS(&NonVuoto1,&Lock,INFINITE);
			}
			if(stop==true){
			LeaveCriticalSection(&Lock);
			break;
			}
		arg->in.try_pop(temp1);
		cvtColor(temp1,temp2,CV_BGR2GRAY);
		arg->out->push(temp2);
		add++;		
		cout<<endl<<"grey ha fatto: "<<add<<endl;
		LeaveCriticalSection(&Lock);
		WakeConditionVariable(&NonVuoto2);
		}
	//fine	
	cout<<endl<<"Thread (GREY) terminato con "<<add<<" elaborazioni."<<endl;
	_endthread();
}

//threshold funct
void soglia(void *param){
	Mat temp1a,temp2a;
	int add=0;
	Args* arg = (Args*)param;
	while(true){
		EnterCriticalSection(&Lock);
		if(arg->in.vuota() && !stop){
			 SleepConditionVariableCS(&NonVuoto2,&Lock,INFINITE);
			}
		    if(stop==true){
			LeaveCriticalSection(&Lock);
			break;
			     }
		arg->in.try_pop(temp1a);
		threshold(temp1a,temp2a,128,255,THRESH_BINARY);
		arg->out->push(temp2a);
		add++;
		cout<<endl<<"soglia ha fatto: "<<add<<endl;
		LeaveCriticalSection(&Lock);
		WakeConditionVariable(&NonVuoto3);
		}
		//fine 	
	 cout<<endl<<"Thread (SOGLIA) terminato con "<<add<<" elaborazioni."<<endl;
	 _endthread();
}

//erose/dilate
void finitura(void *param){
	Mat temp1b,temp2b,temp2c;
	int add = 0;
	Args* arg = (Args*)param;
	//come consumatore
	while(true){
		EnterCriticalSection(&Lock);
		while(arg->in.vuota() && !stop){
			 SleepConditionVariableCS(&NonVuoto3,&Lock,INFINITE);
			}
		if(stop==TRUE){
			LeaveCriticalSection(&Lock);
			break;
			}	
		arg->in.try_pop(temp1b);
		erode(temp1b,temp2b,cv::Mat());
		dilate(temp2b,temp2c,Mat());
		arg->out->push(temp2c);
		add++;
		cout<<endl<<"erode ha fatto: "<<add<<endl;
		LeaveCriticalSection(&Lock);
		WakeConditionVariable(&NonVuoto4);
		}
	 //fine	
	 cout<<endl<<"Thread (ERODE) terminato con "<<add<<" elaborazioni."<<endl;
	_endthread();
}

//countour funct
void contorno (void *param){
	Mat temp;
	int add=0;
	Args* arg = (Args*)param;
	//come consumatore
	while(true){
		EnterCriticalSection(&Lock);
		while(arg->in.vuota() && stop == false){
			 SleepConditionVariableCS(&NonVuoto4,&Lock,INFINITE);
			}
		if(stop==TRUE){
			LeaveCriticalSection(&Lock);
			break;
			}	
	//esegue pop
	arg->in.try_pop(temp);
	//trova i contorni
	vector<vector<Point>> contorni;
	findContours(temp,contorni,CV_RETR_LIST, CV_CHAIN_APPROX_SIMPLE);
	//disegna i contoni in un'immagine
	Mat dst(temp.size(), CV_8UC3, Scalar(0,0,0));
	Scalar colors[3];
	colors[0] = Scalar(255,0,0);
	colors[1] = Scalar(0,255,0);
	colors[2] = Scalar(0,0,255);
	for (size_t idx = 0; idx < contorni.size(); idx++){
		drawContours(dst,contorni,idx,colors[idx %3]);
		}

	//come produttore
	arg->out->push(dst);
	add++;
	cout<<endl<<"cont ha fatto: "<<add<<endl;
	LeaveCriticalSection(&Lock);
	}
    cout<<endl<<"Thread (CONTOUR) terminato con "<<add<<" elaborazioni."<<endl;
   _endthread();
}

//main
int main()
{
	coda_concorr<cv::Mat> ingresso;
	coda_concorr<cv::Mat> uscita;


	InitializeConditionVariable(&NonVuoto1);
	InitializeConditionVariable(&NonVuoto2);
	InitializeConditionVariable(&NonVuoto3);
	InitializeConditionVariable(&NonVuoto4);
	InitializeCriticalSection(&Lock);

	
	//counter var
	LARGE_INTEGER count1, count2, freq;
	double elapsed;
	
        //just temp var for test use
	Mat temp[10];
	Mat out;
	
	//queues
	Args dati0,dati1,dati2,dati3,dati4;
	
	
	//start counter
	QueryPerformanceFrequency(&freq);	
	QueryPerformanceCounter (&count1);
		
	for(int i=0;i<9;i++){
		temp[i] = imread("C:/OPENCV/Test/imgtest/bird1.jpg",1);
		ingresso.push(temp[i]);
	}

	//next queue pointer
	dati0.in=ingresso;
	dati0.out=&dati1.in;
	dati1.out=&dati2.in;
	dati2.out=&dati3.in;
	dati3.out=&dati4.in;
	dati4.out=&uscita;	

	//handle
	HANDLE handle0,handle1,handle2,handle3,handle4;
	
	//threads
	handle0 = (HANDLE) _beginthread(puts,0,&dati0);
	handle1 = (HANDLE) _beginthread(grey,0,&dati1);
	handle2 = (HANDLE) _beginthread(soglia,0,&dati2);
	handle3 = (HANDLE) _beginthread(finitura,0,&dati3);
	handle4 = (HANDLE) _beginthread(contorno,0,&dati4);
	
	_putws(L"Press ENTER to stop elaboration...");
    getchar();
 
    EnterCriticalSection(&Lock);
    stop = true;
    LeaveCriticalSection(&Lock);
	
	//wakeup condition
	WakeAllConditionVariable(&NonVuoto1);
	WakeAllConditionVariable(&NonVuoto2);
	WakeAllConditionVariable(&NonVuoto3);
	WakeAllConditionVariable(&NonVuoto4);

	//join
	WaitForSingleObject(handle0,INFINITE);
	WaitForSingleObject(handle1,INFINITE);
	WaitForSingleObject(handle2,INFINITE);
	WaitForSingleObject(handle3,INFINITE);
	WaitForSingleObject(handle4,INFINITE);

	//close counter and output
	QueryPerformanceCounter (&count2);
	//calcolo tempo trascorso
	elapsed = (count2.QuadPart - count1.QuadPart) * 1000.0 / freq.QuadPart;
	cout <<endl<<"Tempo di esecuzione approssimativo: " <<elapsed<<" ms."<<endl;

	system("PAUSE");
	
return 0;
}







Please....help me...I'm going crazy...

Recommended answer

To be honest, I haven't checked all of your code. I only looked at the synchronization primitives you use in coda_concorr, and that part is already suspicious. The same is true for your thread functions, which use ugly locking and antipatterns like "try". You are certainly not using the right synchronization primitives. Depending on the circumstances, your solution may have not only technical problems (bugs) but huge conceptual ones too. Let's say we overlook the conceptual problems and focus only on the bugs of your current solution. What you need here is a single producer, single consumer blocking message queue class that you can put between your threads. I guess coda_concorr wants to be that. To give you a boost, here is one such blocking queue that has several optimizations and uses the right synchronization primitives on Windows:
// single/multi producer, single consumer blocking queue without upper
// limit on queue size so producers never block!
// WARNING!!! UNTESTED CODE!!! PROVIDED AS IS WITHOUT ANY WARRANTY!!!
#include <windows.h>
#include <cassert>
#include <vector>

template <typename T>
class BlockingQueue
{
public:
    BlockingQueue()
    {
        m_AddedItemEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
        InitializeCriticalSection(&m_Lock);
        m_SwappedItemIndex = 0;
    }

    ~BlockingQueue()
    {
        assert(m_Items.empty() && m_SwappedItemIndex >= m_SwappedItems.size());
        CloseHandle(m_AddedItemEvent);
        DeleteCriticalSection(&m_Lock);
    }

    // Never blocks, can be called from any number of producer threads. Multiple producer threads
    // are allowed here just because I don't know any optimizations that would involve the
    // restriction of the number of threads for some performance gain... It works well with one
    // thread too so there is no problem.
    void Produce(const T& item)
    {
        EnterCriticalSection(&m_Lock);
        m_Items.push_back(item);
        SetEvent(m_AddedItemEvent);
        LeaveCriticalSection(&m_Lock);
    }

    // It may block, call it only from the consumer thread.
    void Consume(T& item)
    {
        // Several implementations possible here. This one exploits that we have only
        // one consumer thread so it is able to consume items in batch with less locking.
        // And of course this implementation may be buggy because I have just put it together.
        if (m_SwappedItemIndex >= m_SwappedItems.size())
        {
            m_SwappedItemIndex = 0;
            m_SwappedItems.clear();
            WaitForSingleObject(m_AddedItemEvent, INFINITE);
            EnterCriticalSection(&m_Lock);
            ResetEvent(m_AddedItemEvent);
            m_Items.swap(m_SwappedItems);
            LeaveCriticalSection(&m_Lock);
        }

        item = m_SwappedItems[m_SwappedItemIndex++];
    }

private:
    CRITICAL_SECTION m_Lock;
    HANDLE m_AddedItemEvent;
    std::vector<T> m_Items;
    // m_SwappedItems is accessed only by the consumer so we don't have to hold the lock.
    // After some time the size of allocated storage of m_Items and m_SwappedItems reaches
    // an upper bound and from that point the queue performs no allocation, just swapping
    // the data pointers in the two containers. Using simple array (vector) instead of
    // fancy containers like queue and list makes the locality of reference better
    // (less cache misses).
    std::vector<T> m_SwappedItems;
    size_t m_SwappedItemIndex;
};
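
For readers who are not tied to the Win32 API, the same idea can be sketched with the standard C++11 threading library. This is only an illustration, not the queue from the answer: the class name PortableBlockingQueue and the helper DemoProduceConsume are made up for the example, and it uses a plain std::queue without the batch-swapping optimization. It gives every queue its own lock instead of one global lock, and the wait re-checks its condition in a loop (wait with a predicate behaves like while (!pred()) wait(lock)), whereas one of the thread functions in the question tests the condition with a single if.

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical name: a portable, simplified counterpart of BlockingQueue.
template <typename T>
class PortableBlockingQueue
{
public:
    // Never blocks; may be called from any number of producer threads.
    void Produce(const T& item)
    {
        {
            std::lock_guard<std::mutex> guard(m_Lock);
            m_Items.push(item);
        }
        m_NotEmpty.notify_one();
    }

    // Blocks until an item is available, then removes and returns it.
    T Consume()
    {
        std::unique_lock<std::mutex> guard(m_Lock);
        // The predicate is re-checked after every wakeup, so spurious
        // wakeups and stale notifications are harmless.
        m_NotEmpty.wait(guard, [this] { return !m_Items.empty(); });
        T item = m_Items.front();
        m_Items.pop();
        return item;
    }

private:
    std::mutex m_Lock;                  // protects m_Items only
    std::condition_variable m_NotEmpty;
    std::queue<T> m_Items;
};

// Usage sketch: one producer thread, the caller consumes `count` items.
inline std::vector<int> DemoProduceConsume(int count)
{
    PortableBlockingQueue<int> queue;
    std::thread producer([&queue, count] {
        for (int i = 0; i < count; ++i)
            queue.Produce(i);
    });
    std::vector<int> received;
    for (int i = 0; i < count; ++i)
        received.push_back(queue.Consume());
    producer.join();
    return received;
}
```

With one producer and one consumer the items come out in the order they were put in, which is the property an assembly line between two stages needs.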





Now let's look at the high-level design of your solution. (I won't delve into the details, and I won't write several books' worth about multithreading here.) Facts: multithreading speeds up the execution of your program if the hardware supports it and your program uses it correctly. Making a program multithreaded "just because" it is said to speed up execution (and because it is fashionable these days) has no point. In that case you are only creating headaches for yourself, and you sacrifice sequential execution, deterministic behaviour, and ease of debugging. (Of course, in some cases you create a background thread not to gain speed but to convert synchronous calls into asynchronous ones, but that is another story...) Your problem may well be suited to multithreading, but that should be verified by measurement, since most of the heavy lifting will probably be done by OpenCV. You should write your program so that it can be executed both sequentially (single threaded) and in multithreaded mode. Switching between the two modes can happen at compile time (with a #define) or at runtime (with a bool variable). This is important because it lets you compare the sequential execution time with the multithreaded one on a specific machine with a specific number of cores (and optionally hyperthreads). Let's say you have a machine with 8 cores and your multithreaded version runs at 6x the speed of the sequential one; in that case you have done pretty well with multithreading! (I said only 6x and not 8x because the rest of the computation capacity is spent waiting on locks and managing hardware resources shared between the cores, such as memory and cache.) Another good thing about having both a single threaded and a multithreaded version is that, in my opinion, it helps many people organize and design clearer multithreaded code, and it forces a clear separation of individual jobs.
In case of multithreading, always think about separating your tasks into job classes that will be executed "on a thread somewhere", and not about threads that randomly execute thread functions here and there. If your code is clear, then the only difference between the single threaded and multithreaded versions of your program is the central core logic that executes the jobs, which is only a small piece of code.
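
A minimal sketch of the runtime switch and the timing comparison, assuming standard C++11 threads instead of the Win32 API. The function names SumSquares and MeasureMs and the toy workload are invented for the illustration; the real jobs would be the image operations.

```cpp
#include <algorithm>
#include <chrono>
#include <cstddef>
#include <numeric>
#include <thread>
#include <vector>

// The same work in both modes, selected by a runtime bool.
inline long long SumSquares(const std::vector<int>& values,
                            bool multithreaded, unsigned thread_count)
{
    auto sum_range = [&values](std::size_t begin, std::size_t end) {
        long long sum = 0;
        for (std::size_t i = begin; i < end; ++i)
            sum += static_cast<long long>(values[i]) * values[i];
        return sum;
    };

    // Sequential mode: everything runs on the calling thread.
    if (!multithreaded || thread_count <= 1)
        return sum_range(0, values.size());

    // Multithreaded mode: each worker sums one contiguous chunk.
    std::vector<long long> partial(thread_count, 0);
    std::vector<std::thread> workers;
    const std::size_t chunk = (values.size() + thread_count - 1) / thread_count;
    for (unsigned t = 0; t < thread_count; ++t)
    {
        const std::size_t begin = std::min(values.size(), t * chunk);
        const std::size_t end = std::min(values.size(), begin + chunk);
        workers.emplace_back([&partial, &sum_range, t, begin, end] {
            partial[t] = sum_range(begin, end);
        });
    }
    for (std::thread& worker : workers)
        worker.join();
    return std::accumulate(partial.begin(), partial.end(), 0LL);
}

// Wall-clock time of one call, in milliseconds.
template <typename Work>
double MeasureMs(Work&& work)
{
    const auto start = std::chrono::steady_clock::now();
    work();
    const auto stop = std::chrono::steady_clock::now();
    return std::chrono::duration<double, std::milli>(stop - start).count();
}
```

Calling MeasureMs once with the flag off and once with it on, on the same input and the same machine, gives the two numbers whose ratio is the speedup discussed above. On a workload this small the threaded version may well be slower, which is exactly why it has to be measured.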

After the introduction, let's talk a bit about your problem, which is quite similar to many other multithreading problems. You have items to process, and processing consists of performing several operations (one after the other) on each item. Let's first talk about the single threaded version, with no multithreading involved! Even in this case you have several choices, because you have many items and many operations to perform on each item. One solution is to take one item and process it by performing every operation on it, one after the other. The other option is to perform the first operation on every item, then the second operation on every item, and so on. This second solution may not be obvious at first, and many people would start with the first option, but the second is often better! The reason is that performing the first operation many times is faster than performing operation1, then operation2, then operation3, and so on, because the code of the same operation usually accesses more or less the same memory areas, which results in fewer cache misses. Of course, sometimes you cannot afford to process all items in one batch by performing operation1 on all items, then operation2 on all items, and so on, because this means that all items become ready almost at once. Sometimes you have to balance between the two solutions by separating your items into smaller batches: first you execute operation1 on one smaller set of items, then you execute operation2 on this smaller set, and so on. You can strike this balance if another system needs regular output from this one, but this is already optimization...
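
The two loop orders and the batched compromise can be sketched as follows. The item type (int), the Operation alias and both function names are assumptions chosen to keep the example short; in the real program an item would be an image and an operation would be a call such as a colour conversion or a threshold.

```cpp
#include <algorithm>
#include <cstddef>
#include <functional>
#include <vector>

using Operation = std::function<int(int)>;   // hypothetical: item type is int

// Option 1: take one item, run every operation on it, then take the next item.
inline void ProcessItemByItem(std::vector<int>& items,
                              const std::vector<Operation>& ops)
{
    for (int& item : items)
        for (const Operation& op : ops)
            item = op(item);
}

// Option 2, batched: within each batch, run operation1 on all items of the
// batch, then operation2 on all of them, and so on. A batch size equal to
// items.size() (or 0 here) gives the pure "one operation at a time" order.
inline void ProcessInBatches(std::vector<int>& items,
                             const std::vector<Operation>& ops,
                             std::size_t batch_size)
{
    if (batch_size == 0)
        batch_size = items.size();
    for (std::size_t begin = 0; begin < items.size(); begin += batch_size)
    {
        const std::size_t end = std::min(begin + batch_size, items.size());
        for (const Operation& op : ops)
            for (std::size_t i = begin; i < end; ++i)
                items[i] = op(items[i]);
    }
}
```

Both functions produce the same results; only the order in which code and data are touched differs, so which one is faster on a given machine is something to measure rather than assume.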

Multithreaded version: your solution performs each operation in parallel on its own thread. We could make it more colorful by executing each operation on several threads, namely on a thread pool. The other approach to the same problem (similar to the single threaded version) is to first perform operation1 on all items on a thread pool (with multiple threads) and then perform operation2 on all items on a thread pool (usually the very same one). This latter solution has several benefits over the first version, if not only benefits (as in the single threaded case). Performing the same operation on multiple items on multiple threads results in fewer cache misses and better locality of reference. Let's talk about the other advantages of this solution over the one that executes X operations on Y threads:

If you execute only one operation at a time on multiple threads, processing all of your items, you can easily control the number of threads in the thread pool. Usually you want only one thread pool globally in your system, and that thread pool usually contains only as many threads as there are cores in your system (depending on the type of jobs, it is sometimes not a good idea to create threads for the hyperthreads of your cores). All you do is put together a lot of jobs (let's say 1000 jobs that perform operation1 on all items) and feed them into the thread pool; some time later the thread pool will have processed all of them in some random order. When all jobs are done, you put together another batch of jobs that perform operation2 on all items and feed it into the thread pool. Some time later all of those jobs are done as well. All you have is a master/main thread and several worker threads in your thread pool. Another thing I like about this solution is that making it single threaded is super easy. You create a thread pool that works this way: it has no worker threads, and when the main thread calls the AddJobs() method of the thread pool, AddJobs() executes all jobs immediately on the main thread and returns only after that! If you execute all operations in parallel like you do, you have far fewer options for controlling the number of threads! You have to create at least as many threads as there are operations; if you have a lot of cores you can maybe create more than one thread for some operations, but that's all!
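
The "pool with zero worker threads" idea might look roughly like this. SimpleJobPool and the Job alias are hypothetical names, and the threaded branch is deliberately simplified: it starts its workers per batch and joins them inside AddJobs(), whereas a real pool would keep its threads alive and, as discussed in this answer, leave the waiting to the queue that collects the results.

```cpp
#include <atomic>
#include <cstddef>
#include <functional>
#include <thread>
#include <vector>

using Job = std::function<void()>;

// Hypothetical minimal pool; the name and interface are assumptions.
class SimpleJobPool
{
public:
    // worker_count == 0 selects the single threaded mode.
    explicit SimpleJobPool(unsigned worker_count) : m_WorkerCount(worker_count) {}

    // Executes every job and returns when all of them have finished.
    void AddJobs(const std::vector<Job>& jobs)
    {
        if (m_WorkerCount == 0)
        {
            for (const Job& job : jobs)
                job();               // runs on the calling (main) thread
            return;
        }

        // Simplification: workers are created per batch and pick the next
        // unclaimed job index until none is left.
        std::atomic<std::size_t> next(0);
        std::vector<std::thread> workers;
        for (unsigned t = 0; t < m_WorkerCount; ++t)
        {
            workers.emplace_back([&jobs, &next] {
                for (;;)
                {
                    const std::size_t i = next.fetch_add(1);
                    if (i >= jobs.size())
                        break;
                    jobs[i]();
                }
            });
        }
        for (std::thread& worker : workers)
            worker.join();
    }

private:
    unsigned m_WorkerCount;
};
```

The calling code is identical in both modes; only the constructor argument changes, which is what makes the sequential and the multithreaded runs directly comparable.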
Let's continue the discussion of the "single operation on a single thread pool" solution:
The code involved in performing an operation will usually access "global" code and data that is used by the code of operation1, operation2, and operationX alike. These shared things can be global functions and global variables/objects. They all increase the number of cache misses. The most obvious shared data between these operations is the queue that connects two operations. If you use the better solution and execute only operation1 on a thread pool, then the queue is only written by many threads, and later it is only read by another thread. The queue is still accessed by many threads, but only on its producer side, and maybe later only on its consumer side, although usually the items are taken out all at once by the main/coordinator thread! It never happens that the producer and consumer sides are accessed by threads in parallel! This allows for very nice optimizations in the queue!
When you put together a large number of jobs (let's say the jobs that execute operation1 on all items) and put them into the thread pool for processing (you do this from the main thread), you somehow have to be able to wait on the main thread until the thread pool has finished processing all jobs. Putting this wait code into the thread pool is a bad idea; never complicate clean conceptual units like a thread pool with stupid operations like WaitForAll()!!! What I usually do is put a "wait for all" operation into my queue implementation that holds the items between two operations. I do the following on my main thread:

- prepare the queue between operation1 and operation2 to receive the items produced by operation1 (easy, because no threads are accessing the queue at this point)
- prepare the jobs that perform operation1 on all items and put them into the thread pool for processing
- wait for the queue between operation1 and operation2 to receive all produced items
- take the results of operation1 out of the queue (easy, because no threads are accessing the queue at this point)
- prepare the queue between operation2 and operation3 to receive the items produced by operation2 (easy, because no threads are accessing the queue at this point)
- prepare the jobs (from the results we just took out of the queue) that perform operation2 on all items and put them into the thread pool for processing
- ...

Here is something that looks like a queue I was talking about:

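// WARNING!!! UNTESTED CODE!!! PROVIDED AS IS WITHOUT ANY WARRANTY!!!
// This is just "illustration" of the optimized "queue" I was talking about.
template <typename T>
class ExampleQueue
{
public:
    ExampleQueue()
    {
        m_PrevIndex = -1;
        m_FinishedCount = 0;
        m_MaxItems = 0;
        m_FinishedEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
    }

    ~ExampleQueue()
    {
        CloseHandle(m_FinishedEvent);
    }

    // Call from the main thread while no worker thread is using the queue.
    void PrepareForProduction(int max_items)
    {
        m_PrevIndex = -1;
        m_FinishedCount = 0;
        m_MaxItems = (LONG)max_items;
        m_Items.resize((size_t)max_items);
        ResetEvent(m_FinishedEvent);
    }

    // Can be called from many worker threads: each call claims its own slot.
    void Produce(const T& item)
    {
        LONG index = InterlockedIncrement(&m_PrevIndex);
        m_Items[(size_t)index] = item;
        // Signal only after every slot has been written, not merely claimed;
        // otherwise a slower producer could still be writing an earlier slot.
        if (InterlockedIncrement(&m_FinishedCount) == m_MaxItems)
            SetEvent(m_FinishedEvent);
    }

    // Blocks the main thread until all expected items have been produced.
    // Assumed helper: the text describes this wait, the listing did not show it.
    void WaitForAllItems()
    {
        WaitForSingleObject(m_FinishedEvent, INFINITE);
    }

    // Call from the main thread after WaitForAllItems() has returned.
    void GetAllItems(std::vector<T>& items)
    {
        m_Items.swap(items);
        m_Items.clear();
    }

private:
    volatile LONG m_PrevIndex;
    volatile LONG m_FinishedCount;
    LONG m_MaxItems;
    std::vector<T> m_Items;
    HANDLE m_FinishedEvent;
};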





Another piece of advice: make your multithreading code object oriented (lock, event, queue, thread, thread pool, job, and job group classes...).

In multithreading there is usually no single good solution. You have to profile your program and tune the number of threads and the size of the processed batches so that it performs well on a given piece of hardware with a given amount of resources.

A small example program that uses BlockingQueue (I also fixed the BlockingQueue implementation, as it was buggy):

class CJob
{
public:
    virtual void Execute() = 0;
};


BlockingQueue<CJob*> g_Queue1;
BlockingQueue<CJob*> g_Queue2;


class COperation2 : public CJob
{
public:
    virtual void Execute() override
    {
        printf("operation2\n");
        delete this;
    }
};

class COperation1 : public CJob
{
public:
    virtual void Execute() override
    {
        printf("operation1\n");
        g_Queue2.Produce(new COperation2);
        delete this;
    }
};


// a generic thread function for each thread
DWORD WINAPI ThreadFunc(LPVOID param)
{
    BlockingQueue<CJob*>& queue = *(BlockingQueue<CJob*>*)param;
    for (;;)
    {
        CJob* job;
        queue.Consume(job);
        // We signal the thread to exit by placing a NULL pointer in the queue.
        if (!job)
            break;
        job->Execute();
    }
    return 0;
}


void ManageThreads()
{
    DWORD thread_id;

    HANDLE thread1 = CreateThread(NULL, 0, ThreadFunc, &g_Queue1, 0, &thread_id);
    // we don't handle serious problems correctly, just catching them with assert in the debugger...
    assert(thread1);

    HANDLE thread2 = CreateThread(NULL, 0, ThreadFunc, &g_Queue2, 0, &thread_id);
    assert(thread2);

    g_Queue1.Produce(new COperation1);
    g_Queue1.Produce(new COperation1);
    g_Queue1.Produce(new COperation1);

    // asking the thread1 to exit and waiting for it....
    g_Queue1.Produce(NULL);
    WaitForSingleObject(thread1, INFINITE);
    CloseHandle(thread1);

    // asking the thread2 to exit and waiting for it....
    // thread 1 has already placed the last job in the queue of thread2 so it's safe to put in the NULL
    g_Queue2.Produce(NULL);
    WaitForSingleObject(thread2, INFINITE);
    CloseHandle(thread2);
}
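
The same two-stage shutdown sequence can be tried out without the Win32 API. The sketch below is an illustration under several assumptions: it uses standard C++11 threads, the items are non-negative ints, the value -1 (kStop) plays the role of the NULL job pointer, and IntQueue and RunTwoStagePipeline are hypothetical names.

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

const int kStop = -1;   // hypothetical sentinel, stands in for the NULL job

// Minimal blocking queue of ints, one lock and one condition per queue.
class IntQueue
{
public:
    void Produce(int value)
    {
        {
            std::lock_guard<std::mutex> guard(m_Lock);
            m_Items.push(value);
        }
        m_NotEmpty.notify_one();
    }

    int Consume()
    {
        std::unique_lock<std::mutex> guard(m_Lock);
        m_NotEmpty.wait(guard, [this] { return !m_Items.empty(); });
        const int value = m_Items.front();
        m_Items.pop();
        return value;
    }

private:
    std::mutex m_Lock;
    std::condition_variable m_NotEmpty;
    std::queue<int> m_Items;
};

// Stage 1 doubles each value, stage 2 adds one and stores the result.
inline std::vector<int> RunTwoStagePipeline(const std::vector<int>& inputs)
{
    IntQueue queue1;
    IntQueue queue2;
    std::vector<int> results;   // written by stage 2 only, read after join

    std::thread stage1([&] {
        for (;;)
        {
            const int value = queue1.Consume();
            if (value == kStop)
                break;
            queue2.Produce(value * 2);
        }
    });
    std::thread stage2([&] {
        for (;;)
        {
            const int value = queue2.Consume();
            if (value == kStop)
                break;
            results.push_back(value + 1);
        }
    });

    for (int value : inputs)
        queue1.Produce(value);

    // Stop stage 1 first; once it has exited, everything it produced is
    // already in queue2, so the sentinel for stage 2 cannot overtake a job.
    queue1.Produce(kStop);
    stage1.join();
    queue2.Produce(kStop);
    stage2.join();
    return results;
}
```

While stage 2 is still working on one value, stage 1 can already process the next one, which is the overlapping, assembly-line behaviour the question asks for.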









The exact same example as the previous one, but written in an object-oriented way without global variables:

class Thread
{
public:
    Thread()
    {
        m_hThread = NULL;
    }
    ~Thread()
    {
        if (m_hThread)
        {
            assert(!IsRunning());
            CloseHandle(m_hThread);
        }
    }
    void Start()
    {
        assert(!IsRunning());
        DWORD thread_id;
        m_hThread = CreateThread(NULL, 0, StaticThreadProc, this, 0, &thread_id);
        assert(m_hThread);
    }
    void WaitForExit()
    {
        assert(m_hThread);
        WaitForSingleObject(m_hThread, INFINITE);
    }
protected:
    virtual void Run() = 0;
private:
    bool IsRunning()
    {
        return m_hThread && WaitForSingleObject(m_hThread, 0)==WAIT_TIMEOUT;
    }
    static DWORD WINAPI StaticThreadProc(LPVOID param)
    {
        Thread* thread = (Thread*)param;
        thread->Run();
        return 0;
    }
private:
    HANDLE m_hThread;
};


class CJob
{
public:
    virtual void Execute() = 0;
};


// A general purpose job executor worker thread.
class JobExecutorThread : public Thread
{
public:
    JobExecutorThread()
    {
        m_ExitRequested = false;
    }
    bool AddJob(CJob* job)
    {
        if (m_ExitRequested)
            return false;
        assert(job);
        m_JobQueue.Produce(job);
        return true;
    }
    void RequestExit()
    {
        assert(!m_ExitRequested);
        if (!m_ExitRequested)
        {
            // Remember the request so that AddJob() rejects jobs from now on.
            m_ExitRequested = true;
            m_JobQueue.Produce(NULL);
        }
    }
protected:
    virtual void Run() override
    {
        for (;;)
        {
            CJob* job;
            m_JobQueue.Consume(job);
            if (job)
                job->Execute();
            else
                break;
        }
    }
private:
    bool m_ExitRequested;
    BlockingQueue<CJob*> m_JobQueue;
};


class COperation2 : public CJob
{
public:
    virtual void Execute() override
    {
        printf("operation2\n");
        delete this;
    }
};

class COperation1 : public CJob
{
public:
    COperation1(JobExecutorThread* operation2_thread)
        : m_Operation2Thread(operation2_thread)
    {}
    virtual void Execute() override
    {
        printf("operation1\n");
        m_Operation2Thread->AddJob(new COperation2);
        delete this;
    }
private:
    JobExecutorThread* m_Operation2Thread;
};


void ManageThreads()
{
    JobExecutorThread operation1_thread;
    JobExecutorThread operation2_thread;
    operation1_thread.Start();
    operation2_thread.Start();

    operation1_thread.AddJob(new COperation1(&operation2_thread));
    operation1_thread.AddJob(new COperation1(&operation2_thread));
    operation1_thread.AddJob(new COperation1(&operation2_thread));

    // asking the thread1 to exit and waiting for it....
    operation1_thread.RequestExit();
    operation1_thread.WaitForExit();

    // asking the thread2 to exit and waiting for it....
    // thread 1 has already placed the last job in the queue of thread2 so its safe to put in the NULL
    operation2_thread.RequestExit();
    operation2_thread.WaitForExit();
}





The object-oriented example may look a bit longer, but it contains no global variables, and most of the boilerplate is reusable code that would reside in a central library; reusability pays off well if you use threading in several places in your program. Another benefit of the object-oriented approach is that the platform-specific stuff (thread creation, etc.) is wrapped into classes that can be ported easily to other platforms, so you don't have to touch the core logic (the ManageThreads() function/method) that coordinates the work of the threads.
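
As a rough illustration of the portability point, the Thread wrapper could be re-implemented on top of std::thread while keeping the same Start()/WaitForExit()/Run() shape, so code written against it would not need to change. PortableThread and CountingThread are hypothetical names, and this is a sketch under that assumption rather than a drop-in replacement for the class above.

```cpp
#include <cassert>
#include <thread>

// Hypothetical portable counterpart of the Thread class.
class PortableThread
{
public:
    PortableThread() {}
    virtual ~PortableThread()
    {
        // Derived classes must call WaitForExit() before they are destroyed,
        // otherwise Run() could still be using members that no longer exist.
        assert(!m_Thread.joinable());
    }

    void Start()
    {
        assert(!m_Thread.joinable());
        m_Thread = std::thread([this] { Run(); });
    }

    void WaitForExit()
    {
        if (m_Thread.joinable())
            m_Thread.join();
    }

protected:
    virtual void Run() = 0;

private:
    PortableThread(const PortableThread&);             // not copyable
    PortableThread& operator=(const PortableThread&);
    std::thread m_Thread;
};

// Example worker: sums 1..limit on its own thread.
class CountingThread : public PortableThread
{
public:
    explicit CountingThread(int limit) : m_Limit(limit), m_Sum(0) {}
    ~CountingThread() override { WaitForExit(); }

    // Valid after WaitForExit() has returned.
    int Sum() const { return m_Sum; }

protected:
    void Run() override
    {
        for (int i = 1; i <= m_Limit; ++i)
            m_Sum += i;
    }

private:
    int m_Limit;
    int m_Sum;
};
```

Only the wrapper changes between platforms; a job executor built on top of it would stay as it is.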

