Why does std::condition_variable make scheduling unfair?


Problem description


I'm trying to create a simple pool object, which allocates access to a set of shared resources more or less fairly to any threads that ask for it. On Windows, I would typically have an array of mutexes and call WaitForMultipleObjects with bWaitAll=FALSE (see windows_pool_of_n_t below). But I'm hoping to someday port this to other OSes, so I'd like to stick with the standard. A deque of resources, with a condition_variable on size()!=0, seemed like the obvious solution (see pool_of_n_t below).


But for reasons I don't understand, that code serializes thread access. I'm not expecting strict fairness, but this is pretty much the worst possible case - the thread that had the lock last time always gets the lock the next time. It's not that std::mutex fails to match Windows' more-or-less fair scheduling: using just a mutex without the condition variable works as expected, although only for a pool of one, of course (see pool_of_one_t below).


Can anyone explain this? Is there a way around this?

Results:

C:\temp\stdpool>bin\stdpool.exe
pool:pool_of_one_t
thread 0:19826 ms
thread 1:19846 ms
thread 2:19866 ms
thread 3:19886 ms
thread 4:19906 ms
thread 5:19926 ms
thread 6:19946 ms
thread 7:19965 ms
thread 8:19985 ms
thread 9:20004 ms
pool:windows_pool_of_n_t(1)
thread 0:19819 ms
thread 1:19838 ms
thread 2:19858 ms
thread 3:19878 ms
thread 4:19898 ms
thread 5:19918 ms
thread 6:19938 ms
thread 7:19958 ms
thread 8:19978 ms
thread 9:19997 ms
pool:pool_of_n_t(1)
thread 9:3637 ms
thread 0:4538 ms
thread 6:7558 ms
thread 4:9779 ms
thread 8:9997 ms
thread 2:13058 ms
thread 1:13997 ms
thread 3:17076 ms
thread 5:17995 ms
thread 7:19994 ms
pool:windows_pool_of_n_t(2)
thread 1:9919 ms
thread 0:9919 ms
thread 2:9939 ms
thread 3:9939 ms
thread 5:9958 ms
thread 4:9959 ms
thread 6:9978 ms
thread 7:9978 ms
thread 9:9997 ms
thread 8:9997 ms
pool:pool_of_n_t(2)
thread 2:6019 ms
thread 0:7882 ms
thread 4:8102 ms
thread 5:8182 ms
thread 1:8382 ms
thread 8:8742 ms
thread 7:9162 ms
thread 9:9641 ms
thread 3:9802 ms
thread 6:10201 ms
pool:windows_pool_of_n_t(5)
thread 4:3978 ms
thread 3:3978 ms
thread 2:3979 ms
thread 0:3980 ms
thread 1:3980 ms
thread 9:3997 ms
thread 7:3999 ms
thread 6:3999 ms
thread 5:4000 ms
thread 8:4001 ms
pool:pool_of_n_t(5)
thread 2:3080 ms
thread 0:3498 ms
thread 8:3697 ms
thread 3:3699 ms
thread 6:3797 ms
thread 7:3857 ms
thread 1:3978 ms
thread 4:4039 ms
thread 9:4057 ms
thread 5:4059 ms

Code:

#include <iostream>
#include <deque>
#include <vector>
#include <mutex>
#include <thread>
#include <sstream>
#include <chrono>
#include <iomanip>
#include <cassert>
#include <condition_variable>
#include <windows.h>

using namespace std;

class pool_t {
    public:
        virtual void check_in(int resource) = 0;
        virtual int check_out() = 0;
        virtual string pool_name() = 0;
};

class pool_of_one_t : public pool_t {
    mutex lock;

public:
    virtual void check_in(int resource) {
        lock.unlock();
    }

    virtual int check_out() {
        lock.lock();
        return 0;
    }

    virtual string pool_name() {
        return "pool_of_one_t";
    }

};


class windows_pool_of_n_t : public pool_t {
    vector<HANDLE> resources;

public:
    windows_pool_of_n_t(int size) {
        for (int i=0; i < size; ++i)
            resources.push_back(CreateMutex(NULL, FALSE, NULL));
    }

    ~windows_pool_of_n_t() {
        for (auto resource : resources)
            CloseHandle(resource);
    }

    virtual void check_in(int resource) {
        ReleaseMutex(resources[resource]);
    }

    virtual int check_out() {
        DWORD result = WaitForMultipleObjects(resources.size(),
                resources.data(), FALSE, INFINITE);
        assert(result >= WAIT_OBJECT_0 
                && result < WAIT_OBJECT_0+resources.size());

        return result - WAIT_OBJECT_0;
    }

    virtual string pool_name() {
        ostringstream name;
        name << "windows_pool_of_n_t(" << resources.size() << ")";
        return name.str();
    }
};

class pool_of_n_t : public pool_t {
    deque<int> resources;
    mutex lock;
    condition_variable not_empty;

public:
    pool_of_n_t(int size) {
        for (int i=0; i < size; ++i)
            check_in(i);
    }

    virtual void check_in(int resource) {
        unique_lock<mutex> resources_guard(lock);
        resources.push_back(resource);
        resources_guard.unlock();
        not_empty.notify_one();
    }

    virtual int check_out() {
        unique_lock<mutex> resources_guard(lock);
        // Block until at least one resource has been checked in.
        not_empty.wait(resources_guard,
                [this](){return resources.size() > 0;});
        auto resource = resources.front();
        resources.pop_front();
        bool notify_others = resources.size() > 0;
        resources_guard.unlock();
        // Notify outside the lock so a woken thread can acquire it at once.
        if (notify_others)
            not_empty.notify_one();

        return resource;
    }

    virtual string pool_name() {
        ostringstream name;
        name << "pool_of_n_t(" << resources.size() << ")";
        return name.str();
    }
};


void worker_thread(int id, pool_t& resource_pool)
{
    auto start_time = chrono::system_clock::now();
    for (int i=0; i < 100; ++i) {
        auto resource = resource_pool.check_out();
        this_thread::sleep_for(chrono::milliseconds(20));
        resource_pool.check_in(resource);
        this_thread::yield();
    }

    static mutex cout_lock;
    {
        unique_lock<mutex> cout_guard(cout_lock);
        cout << "thread " << id << ":"
            << chrono::duration_cast<chrono::milliseconds>(
                    chrono::system_clock::now() - start_time).count()
            << " ms" << endl;
    }
}

void test_it(pool_t& resource_pool)
{
    cout << "pool:" << resource_pool.pool_name() << endl;
    vector<thread> threads;
    for (int i=0; i < 10; ++i)
        threads.push_back(thread(worker_thread, i, ref(resource_pool)));
    for (auto& thread : threads)
        thread.join();

}

int main(int argc, char* argv[])
{
    test_it(pool_of_one_t());
    test_it(windows_pool_of_n_t(1));
    test_it(pool_of_n_t(1));
    test_it(windows_pool_of_n_t(2));
    test_it(pool_of_n_t(2));
    test_it(windows_pool_of_n_t(5));
    test_it(pool_of_n_t(5));

    return 0;
}


Answer


I ran your test pool:pool_of_n_t(2) on Linux, and the problem is in

this_thread::yield();


See the results on my machine for the test pool:pool_of_n_t(2):


1) this_thread::yield():

$ ./a.out
pool:pool_of_n_t(2)
thread 0, run for:2053 ms
thread 9, run for:3721 ms
thread 5, run for:4830 ms
thread 6, run for:6854 ms
thread 3, run for:8229 ms
thread 4, run for:8353 ms
thread 7, run for:9441 ms
thread 2, run for:9482 ms
thread 1, run for:10127 ms
thread 8, run for:10426 ms


2) The same test when I replace this_thread::yield() with pthread_yield():

$ ./a.out                                                               
pool:pool_of_n_t(2)
thread 0, run for:7922 ms
thread 3, run for:8853 ms
thread 4, run for:8854 ms
thread 1, run for:9077 ms
thread 5, run for:9364 ms
thread 9, run for:9446 ms
thread 7, run for:9594 ms
thread 2, run for:9615 ms
thread 8, run for:10170 ms
thread 6, run for:10416 ms


It is much fairer. You assume that this_thread::yield() actually gives the CPU to another thread, but it does not.


This is the disassembly of this_thread::yield for gcc 4.8:

(gdb) disassembl this_thread::yield
Dump of assembler code for function std::this_thread::yield():
   0x0000000000401fb2 <+0>: push   %rbp
   0x0000000000401fb3 <+1>: mov    %rsp,%rbp
   0x0000000000401fb6 <+4>: pop    %rbp
   0x0000000000401fb7 <+5>: retq   
End of assembler dump.


I don't see any rescheduling there: the body is empty apart from the stack-frame bookkeeping.


And this is the disassembly of pthread_yield:

(gdb) disassemble pthread_yield
Dump of assembler code for function pthread_yield:
   0x0000003149c084c0 <+0>: jmpq   0x3149c05448 <sched_yield@plt>
End of assembler dump.
(gdb) disassemble sched_yield
Dump of assembler code for function sched_yield:
   0x00000031498cf520 <+0>: mov    $0x18,%eax
   0x00000031498cf525 <+5>: syscall 
   0x00000031498cf527 <+7>: cmp    $0xfffffffffffff001,%rax
   0x00000031498cf52d <+13>:    jae    0x31498cf530 <sched_yield+16>
   0x00000031498cf52f <+15>:    retq   
   0x00000031498cf530 <+16>:    mov    0x2bea71(%rip),%rcx        # 0x3149b8dfa8
   0x00000031498cf537 <+23>:    xor    %edx,%edx
   0x00000031498cf539 <+25>:    sub    %rax,%rdx
   0x00000031498cf53c <+28>:    mov    %edx,%fs:(%rcx)
   0x00000031498cf53f <+31>:    or     $0xffffffffffffffff,%rax
   0x00000031498cf543 <+35>:    jmp    0x31498cf52f <sched_yield+15>
End of assembler dump.
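
One way around it, then, is to call the OS scheduler's yield directly wherever std::this_thread::yield() compiles to nothing. A minimal sketch, assuming a Linux target; the wrapper name yield_cpu is mine, not from the original code:

#include <thread>
#if defined(__linux__)
#include <sched.h>   // for sched_yield(2)
#endif

// Hypothetical helper, not part of the original code: fall back to the
// kernel's yield where this_thread::yield() may be a no-op.
inline void yield_cpu()
{
#if defined(__linux__)
    sched_yield();               // issues the real sched_yield syscall
#else
    std::this_thread::yield();   // may compile to a no-op on some builds
#endif
}

Calling yield_cpu() from worker_thread in place of this_thread::yield() should reproduce the fairer pthread_yield() numbers above.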
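
Alternatively, if you want fairness that does not depend on how yield() behaves, here is a sketch of a ticket-based pool that hands resources out in strict FIFO order of arrival. This is my own illustration, not from the original answer:

#include <condition_variable>
#include <deque>
#include <mutex>

class fair_pool_t {
    std::deque<int> resources;
    std::mutex lock;
    std::condition_variable turn;
    unsigned long next_ticket = 0;   // next ticket to hand out
    unsigned long now_serving = 0;   // ticket allowed to take a resource

public:
    explicit fair_pool_t(int size) {
        for (int i = 0; i < size; ++i)
            resources.push_back(i);
    }

    int check_out() {
        std::unique_lock<std::mutex> guard(lock);
        unsigned long my_ticket = next_ticket++;
        // Wait until it is our turn AND a resource is available.
        turn.wait(guard, [&] {
            return my_ticket == now_serving && !resources.empty();
        });
        ++now_serving;
        int resource = resources.front();
        resources.pop_front();
        turn.notify_all();   // let the next ticket holder re-check
        return resource;
    }

    void check_in(int resource) {
        std::lock_guard<std::mutex> guard(lock);
        resources.push_back(resource);
        turn.notify_all();   // a resource became available
    }
};

Strict FIFO trades throughput for fairness: every check-in wakes all waiters so that the right ticket holder can run, which is acceptable for a handful of threads like this test.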

