Windows API线程池简单示例 [英] Windows API Thread Pool simple example

查看:154
本文介绍了Windows API线程池简单示例的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述



我不设法找到一个简单的例子下面的任务。例如,我的程序需要在一个巨大的std :: vector中增加一个值,所以我想并行的做。它需要在程序的整个生命周期做一堆的时间。我知道如何做,使用CreateThread在每次调用的例程,但我不设法摆脱CreateThread与ThreadPool。



这是我所做的:

  $ b public:
Thread(){}
virtual void run()= 0; //我可以继承一个IncrementVectorThread
};
class IncrementVectorThread:public Thread {
public:
IncrementVectorThread(int threadID,int nbThreads,std :: vector< int>& vec):id(threadID),nb(nbThreads) myvec(vec){};

virtual void run(){
for(int i =(myvec.size()* id)/ nb; i<(myvec.size()*(id + 1)) / nb; i ++)
myvec [i] ++; //,让我们假设myvec是正确的大小
}
int id,nb;
std :: vector< int> & myvec;&
};

类ThreadGroup:public std :: vector< Thread *> {
public:
ThreadGroup(){
pool = CreateThreadpool(NULL);
InitializeThreadpoolEnvironment(& cbe);
cleanupGroup = CreateThreadpoolCleanupGroup();
SetThreadpoolCallbackPool(& cbe,pool);
SetThreadpoolCallbackCleanupGroup(& cbe,cleanupGroup,NULL);
threadCount = 0;
}
〜ThreadGroup(){
CloseThreadpool(pool);
}
PTP_POOL pool;
TP_CALLBACK_ENVIRON cbe;
PTP_CLEANUP_GROUP cleanupGroup;
volatile long threadCount;
};


static VOID CALLBACK runFunc(
PTP_CALLBACK_INSTANCE Instance,
PVOID Context,
PTP_WORK Work){

ThreadGroup& thread = *((ThreadGroup *)Context);
long id = InterlockedIncrement(&(thread.threadCount));
DWORD tid =(id-1)%thread.size();
thread [tid] - > run();
}

void run_threads(ThreadGroup * thread_group){
SetThreadpoolThreadMaximum(thread_group-> pool,thread_group-> size());
SetThreadpoolThreadMinimum(thread_group-> pool,thread_group-> size());

TP_WORK * worker = CreateThreadpoolWork(runFunc,(void *)thread_group,& thread_group-> cbe);
thread_group-> threadCount = 0;
for(int i = 0; i< thread_group-> size(); i ++){
SubmitThreadpoolWork(worker);
}
WaitForThreadpoolWorkCallbacks(worker,FALSE);
CloseThreadpoolWork(worker);
}

void main(){

ThreadGroup group;
std :: vector< int> vec(10000,0);
for(int i = 0; i <10; i ++)
group.push_back(new IncrementVectorThread(i,10,vec));

run_threads(& group);
run_threads(& group);
run_threads(& group);

//现在,vec应该是== std :: vector< int>(10000,3);
}

所以,如果我理解得很好:

- 命令CreateThreadpool创建一束Threads(因此,对CreateThreadpoolWork的调用很便宜,因为它不调用CreateThread)

- 我可以有我想要的线程池数量(如果我想做一个线程IncrementVector,一个用于我的DecrementVector线程,我可以)。

- 如果我需要将我的增量向量任务分成10个线程,而不是调用10次CreateThread,我创建一个单一的工人,并提交它10次到具有相同参数的ThreadPool(因此,我需要线程ID在回调中知道我的std :: vector的哪个部分增量)。这里我找不到线程ID,因为函数GetCurrentThreadId()返回线程的实际ID(即,像1528,不是0..nb_launched_threads之间的东西)。



最后,我不知道我理解的概念:我真的需要一个工人,而不是10如果我把我的std :: vector分成10个线程? / p>

谢谢!

解决方案



关于线程池的整个想法是你不关心它有多少线程。你只是把大量的工作抛入线程池,让操作系统决定如何执行每个块。
所以,如果你创建和提交10块,操作系统可以使用池中的1到10个线程。



你不应该关心那些线程标识。不要打扰线程ID,最小或最大线程数,或这样的东西。



如果你不关心线程标识,那么你如何管理向量的什么部分改变呢?简单。在创建线程池之前,将计数器初始化为零。在回调函数中,调用 InterlockedIncrement 来检索和增加计数器。对于每个提交的工作项,您将获得一个连续的整数。


[EDIT: thanks to MSalters answer and Raymond Chen's answer to InterlockedIncrement vs EnterCriticalSection/counter++/LeaveCriticalSection, the problem is solved and the code below is working properly. This should provide an interesting simple example of Thread Pool use in Windows]

I don't manage to find a simple example of the following task. My program, for example, needs to increment the values in a huge std::vector by one, so I want to do that in parallel. It needs to do that a bunch of times across the lifetime of the program. I know how to do that using CreateThread at each call of the routine but I don't manage to get rid of the CreateThread with the ThreadPool.

Here is what I do :

class Thread {
public:
    Thread(){}
    virtual void run() = 0 ; // I can inherit an "IncrementVectorThread"
};
class IncrementVectorThread: public Thread {
public:
   IncrementVectorThread(int threadID, int nbThreads, std::vector<int> &vec) : id(threadID), nb(nbThreads), myvec(vec) { };

   virtual void run() {
        for (int i=(myvec.size()*id)/nb; i<(myvec.size()*(id+1))/nb; i++)
          myvec[i]++; //and let's assume myvec is properly sized
    }
   int id, nb;
   std::vector<int> &myvec;
};

class ThreadGroup : public std::vector<Thread*> {
public:
    ThreadGroup() { 
         pool = CreateThreadpool(NULL);
         InitializeThreadpoolEnvironment(&cbe);
         cleanupGroup = CreateThreadpoolCleanupGroup();
         SetThreadpoolCallbackPool(&cbe, pool);
         SetThreadpoolCallbackCleanupGroup(&cbe, cleanupGroup, NULL);
         threadCount = 0;
    }
    ~ThreadGroup() {
         CloseThreadpool(pool);
}
    PTP_POOL pool;
    TP_CALLBACK_ENVIRON cbe;
    PTP_CLEANUP_GROUP cleanupGroup;
    volatile long threadCount;
} ;


static VOID CALLBACK runFunc(
                PTP_CALLBACK_INSTANCE Instance,
                PVOID Context,
                PTP_WORK Work) {

   ThreadGroup &thread = *((ThreadGroup*) Context);
   long id = InterlockedIncrement(&(thread.threadCount));
   DWORD tid = (id-1)%thread.size();
   thread[tid]->run();
}

void run_threads(ThreadGroup* thread_group) {
    SetThreadpoolThreadMaximum(thread_group->pool, thread_group->size());
    SetThreadpoolThreadMinimum(thread_group->pool, thread_group->size());

    TP_WORK *worker = CreateThreadpoolWork(runFunc, (void*) thread_group, &thread_group->cbe);
    thread_group->threadCount = 0;
    for (int i=0; i<thread_group->size(); i++) {
        SubmitThreadpoolWork(worker);
     }  
     WaitForThreadpoolWorkCallbacks(worker,FALSE);  
     CloseThreadpoolWork(worker);   
}       

void main() {

   ThreadGroup group;
   std::vector<int> vec(10000, 0);
   for (int i=0; i<10; i++)
      group.push_back(new IncrementVectorThread(i, 10, vec));

   run_threads(&group);
   run_threads(&group);
   run_threads(&group);

   // now, vec should be == std::vector<int>(10000, 3);       
}

So, if I understood well :
- the command CreateThreadpool creates a bunch of Threads (hence, the call to CreateThreadpoolWork is cheap as it doesn't call CreateThread)
- I can have as many thread pools as I want (if I want to do a thread pool for "IncrementVector" and one for my "DecrementVector" threads, I can).
- if I need to divide my "increment vector" task into 10 threads, instead of calling 10 times CreateThread, I create a single "worker", and Submit it 10 times to the ThreadPool with the same parameter (hence, I need the thread ID in the callback to know which part of my std::vector to increment). Here I couldn't find the thread ID, since the function GetCurrentThreadId() returns the real ID of the thread (ie., something like 1528, not something between 0..nb_launched_threads).

Finally, I am not sure I understood the concept well : do I really need a single worker and not 10 if I split my std::vector into 10 threads ?

Thanks!

解决方案

You're roughly right up to the last point.

The whole idea about a thread pool is that you don't care how many threads it has. You just throw a lot of work into the thread pool, and let the OS determine how to execute each chunk. So, if you create and submit 10 chunks, the OS may use between 1 and 10 threads from the pool.

You should not care about those thread identities. Don't bother with thread ID's, minimum or maximum number of threads, or stuff like that.

If you don't care about thread identities, then how do you manage what part of the vector to change? Simple. Before creating the threadpool, initialize a counter to zero. In the callback function, call InterlockedIncrement to retrieve and increment the counter. For each submitted work item, you'll get a consecutive integer.

这篇关于Windows API线程池简单示例的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆