IO完成端口:单独的线程池以处理出队数据包? [英] IO Completion ports: separate thread pool to process the dequeued packets?

查看:107
本文介绍了IO完成端口:单独的线程池以处理出队数据包?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

注意:我已经为此添加了C ++标记,因为a)代码是C ++,并且b)使用C ++的人可能已经使用了IO完成端口.所以请不要大喊.

NOTE: I have added the C++ tag to this because a) the code is C++ and b) people using C++ may well have used IO completion ports. So please don't shout.


我正在使用IO完成端口,并最终完全理解(并经过测试证明)-两者均在 RbMm 的帮助下进行-CreateIoCompletionPort()NumberOfConcurrentThreads参数的含义.


I am playing with IO completion ports, and have eventually fully understood (and tested, to prove) - both with help from RbMm - the meaning of the NumberOfConcurrentThreads parameter within CreateIoCompletionPort().

我有下面的小程序,该程序创建10个线程,所有线程都在完成端口上等待.我告诉完成端口只允许一次运行4个线程(我有四个CPU).然后,我将8个数据包排入端口.如果我的线程函数使ID> 4的数据包出队,则会输出一条消息.为了输出此消息,我必须停止当前运行的四个线程中的至少一个,这是在控制台上输入"1"时发生的.

I have the following small program which creates 10 threads all waiting on the completion port. I tell my completion port to only allow 4 threads to be runnable at once (I have four CPUs). I then enqueue 8 packets to the port. My thread function outputs a message if it dequeues a packet with an ID > 4; in order for this message to be output, I have to stop at least one of the four currently running threads, which happens when I enter '1' at the console.

现在,这都是相当简单的代码.但是,我有一个大问题,那就是如果所有正在处理完成数据包的线程陷入瘫痪,这将意味着不再有更多数据包可以出队和处理. 这就是我在无限循环中所模拟的情况-在控制台输入"1"之前,没有更多数据包出队这一事实凸显了这一潜在问题!

Now this is all fairly simple code. I have one big concern however, and that is that if all of the threads that are processing a completion packet get bogged down, it will mean no more packets can be dequeued and processed. That is what I am simulating with my infinite loop - the fact that no more packets are dequeued until I enter '1' at the console highlights this potential problem!

更好的解决方案是不要让我的四个线程使数据包出队(或与CPU一样多的线程),然后在使一个线程出队时,将该数据包的处理从单独的池中分配给工作线程. /em>,从而消除了IOCP中所有线程陷入阻塞从而不再有数据包出队的风险?

Would a better solution not be to have my four threads dequeuing packets (or as many threads as CPUs), then when one is dequeued, farm the processing of that packet off to a worker thread from a separate pool, thereby removing the risk of all threads in the IOCP being bogged down thus no more packets being dequeued?

我以 all 的身份询问此示例,我看到的IO完成端口代码示例使用的方法与下面显示的方法类似,不是使用单独的线程池,提出.这就是让我认为缺少某些东西的原因,因为我的人数太多了!

I ask this as all the examples of IO completion port code I have seen use a method similar to what I show below, not using a separate thread pool which I propose. This is what makes me think that I am missing something because I am outnumbered!

注意:这是一个有些人为的示例,因为如果其中一个可运行线程之一进入等待状态,Windows将允许使另一个数据包出队.我在代码中用cout注释掉的代码显示了这一点:

Note: this is a somewhat contrived example, because Windows will allow an additional packet to be dequeued if one of the runnable threads enters a wait state; I show this in my code with a commented out cout call:

系统还允许线程在GetQueuedCompletionStatus中等待 如果另一个正在运行的线程关联,则处理完成包 具有相同I/O完成端口的端口进入等待状态 原因,例如SuspendThread函数.当线程进入 等待状态再次开始运行,可能会短暂 活动线程数超过了并发值. 但是, 系统通过不允许任何新的活动状态来快速减少此数目 线程,直到活动线程数降至并发以下 值.

The system also allows a thread waiting in GetQueuedCompletionStatus to process a completion packet if another running thread associated with the same I/O completion port enters a wait state for other reasons, for example the SuspendThread function. When the thread in the wait state begins running again, there may be a brief period when the number of active threads exceeds the concurrency value. However, the system quickly reduces this number by not allowing any new active threads until the number of active threads falls below the concurrency value.

但是我不会在线程函数中调用SuspendThread,并且我不知道cout以外的哪个函数会导致线程进入等待状态我无法预测我的一个或多个线程是否会陷入困境!因此,我对线程池的想法是:至少上下文切换将意味着其他数据包有机会出队!

But I won't be calling SuspendThread in my thread functions, and I don't know which functions other than cout will cause the thread to enter a wait state, thus I can't predict if one or more of my threads will ever get bogged down! Hence my idea of a thread pool; at least context switching would mean that other packets get a chance to be dequeued!

#define _CRT_SECURE_NO_WARNINGS
#include <windows.h>
#include <thread>
#include <vector>
#include <algorithm>
#include <atomic>
#include <ctime>
#include <iostream>

using namespace std;

int main()
{
    HANDLE hCompletionPort1;
    if ((hCompletionPort1 = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 4)) == NULL)
    {
        return -1;
    }
    vector<thread> vecAllThreads;
    atomic_bool bStop(false);

    // Fill our vector with 10 threads, each of which waits on our IOCP.
    generate_n(back_inserter(vecAllThreads), 10, [hCompletionPort1, &bStop] {
        thread t([hCompletionPort1, &bStop]()
        {
            // Thread body
            while (true)
            {
                DWORD dwBytes = 0;
                LPOVERLAPPED pOverlapped = 0;
                ULONG_PTR uKey;
                if (::GetQueuedCompletionStatus(hCompletionPort1, &dwBytes, &uKey, &pOverlapped, INFINITE) == 1)
                {
                    if (dwBytes == 0 && uKey == 0 && pOverlapped == 0)
                        break;  // Special completion packet; end processing.

                    //cout << uKey; // EVEN THIS WILL CAUSE A "wait" which causes MORE THAN 4 THREADS TO ENTER!

                    if (uKey >4) 
                        cout << "Started processing packet ID > 4!" << endl;
                    while (!bStop)
                        ;   // INFINITE LOOP
                }
            }
        });
        return move(t);
    }
    );

    // Queue 8 completion packets to our IOCP...only four will be processed until we set our bool
    for (int i = 1; i <= 8; ++i)
    {
        PostQueuedCompletionStatus(hCompletionPort1, 0, i, new OVERLAPPED);
    }

    while (!bStop)
    {
        int nVal;
        cout << "Enter 1 to cause current processing threads to end: ";
        cin >> nVal;
        bStop = (nVal == 1);
    }
    for (int i = 0; i < 10; ++i)    // Tell all 10 threads to stop processing on the IOCP
    {
        PostQueuedCompletionStatus(hCompletionPort1, 0, 0, 0);  // Special packet marking end of IOCP usage
    }
    for_each(begin(vecAllThreads), end(vecAllThreads), mem_fn(&thread::join));

    return 0;
}


编辑#1

独立线程池"的含义如下:

What I mean by "separate thread pool" is something like the following:

class myThread {
public:
    void SetTask(LPOVERLAPPED pO) { /* start processing pO*/ }
private:
    thread m_thread;    // Actual thread object
};

// The threads in this thread pool are not associated with the IOCP in any way whatsoever; they exist
// purely to be handed a completion packet which they then process!
class ThreadPool
{
public:
    void Initialise() { /* create 100 worker threads and add them to some internal storage*/}
    myThread& GetNextFreeThread() { /* return one of the 100 worker thread we created*/}
} g_threadPool;

我与IOCP关联的四个线程中的每个线程随后更改为

The code that each of my four threads associated with the IOCP then change to

if (::GetQueuedCompletionStatus(hCompletionPort1, &dwBytes, &uKey, &pOverlapped, INFINITE) == 1)
{
    if (dwBytes == 0 && uKey == 0 && pOverlapped == 0)
        break;  // Special completion packet; end processing.

    // Pick a new thread from a pool of pre-created threads and assign it the packet to process
    myThread& thr = g_threadPool.GetNextFreeThread();
    thr.SetTask(pOverlapped);

    // Now, this thread can immediately return to the IOCP; it doesn't matter if the
    // packet we dequeued would take forever to process; that is happening in the 
    // separate thread thr *that will not intefere with packets being dequeued from IOCP!*
}

这样,我不可能在没有更多数据包出队的情况下结束!

This way, there is no possible way that I can end up in the situation where no more packets are being dequeued!

推荐答案

对于是否应使用单独的线程池,似乎存在分歧.显然,正如我发布的示例代码所示,如​​果数据包的处理未进入等待状态,则有可能使数据包停止从IOCP出队.给定的是,无限循环也许是不现实的,但它确实说明了这一点.

It seems there is conflicting opinion on whether a separate thread pool should be used. Clearly, as the sample code I have posted shows, there is potential for packets to stop being dequeued from the IOCP if the processing of the packets does not enter a wait state; given, the infinite loop is perhaps unrealistic but it does demonstrate the point.

这篇关于IO完成端口:单独的线程池以处理出队数据包?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆