Is using std::async many times for small tasks performance friendly?

Question

To give some background information, I am processing a saved file, and after using a regular expression to split the file into its component objects, I then need to process each object's data based on which type of object it is.

My current thought is to use parallelism to get a little bit of a performance gain, as loading each object is independent of the others. So I was going to define a LoadObject function accepting a std::string for each type of object I'm going to be handling, and then call std::async as follows:

#include <fstream>
#include <future>
#include <iterator>
#include <regex>
#include <stdexcept>
#include <string>
#include <vector>

void LoadFromFile( const std::string& szFileName )
{
     static const std::regex regexObject( "=== ([^=]+) ===\\n((?:.|\\n)*)\\n=== END \\1 ===", std::regex_constants::ECMAScript | std::regex_constants::optimize );

     std::ifstream inFile( szFileName );
     inFile.exceptions( std::ifstream::failbit | std::ifstream::badbit );

     std::string szFileData( (std::istreambuf_iterator<char>(inFile)), (std::istreambuf_iterator<char>()) );

     inFile.close();

     std::vector<std::future<void>> vecFutures;

     for( std::sregex_iterator itObject( szFileData.cbegin(), szFileData.cend(), regexObject ), end; itObject != end; ++itObject )
     {
          // Determine what type of object we're loading:
          if( (*itObject)[1] == "Type1" )
          {
               vecFutures.emplace_back( std::async( LoadType1, (*itObject)[2].str() ) );
          }
          else if( (*itObject)[1] == "Type2" )
          {
               vecFutures.emplace_back( std::async( LoadType2, (*itObject)[2].str() ) );
          }
          else
          {
               throw std::runtime_error( "Unexpected type encountered whilst reading data file." );
          }
     }

     // Make sure all our tasks completed:
     for( auto& future : vecFutures )
     {
           future.get();
     }
}

Note that there will be more than 2 types in the application (this was just a short example) and potentially thousands of objects in the file to be read.

I am aware that creating too many threads is often a bad thing for performance once it exceeds the maximum hardware concurrency, due to context switching. However, if my memory serves me correctly, the C++ runtime is supposed to monitor the number of threads created and schedule std::async appropriately (I believe in Microsoft's case their ConcRT library is responsible for this?), so the above code may still result in a performance improvement?
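
For reference, the "maximum hardware concurrency" mentioned above can be queried portably; a minimal sketch (note that std::thread::hardware_concurrency may return 0 when the value is not computable):

#include <cstdio>
#include <thread>

int main() {
    // Number of concurrent threads the hardware supports; 0 means "unknown".
    unsigned int nThreads = std::thread::hardware_concurrency();
    std::printf( "Hardware supports %u concurrent threads\n", nThreads );
}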

Thanks in advance!

Answer

the C++ runtime is supposed to monitor the number of threads created and schedule std::async appropriately

No. If the asynchronous tasks are in fact run asynchronously (rather than deferred) then all that's required is that they are run as if on a new thread. It is perfectly valid for a new thread to be created and started for every task, without any regard for the hardware's limited capacity for parallelism.
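
A quick way to observe this is to request std::launch::async explicitly and print which thread each task runs on; a minimal sketch (the task count of 32 is arbitrary):

#include <cstdio>
#include <functional>
#include <future>
#include <thread>
#include <vector>

int main() {
    std::vector<std::future<void>> tasks;

    // With std::launch::async each task runs as if on its own new thread,
    // regardless of how many hardware threads are actually available.
    for( int i = 0; i < 32; ++i )
    {
        tasks.push_back( std::async( std::launch::async, [i] {
            std::printf( "task %d ran on thread %zu\n", i,
                         std::hash<std::thread::id>()( std::this_thread::get_id() ) );
        } ) );
    }

    for( auto& task : tasks )
        task.get();
}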

There is a note:

[ Note: If this policy is specified together with other policies, such as when using a policy value of launch::async | launch::deferred, implementations should defer invocation or the selection of the policy when no more concurrency can be effectively exploited. —end note ]

However, this is non-normative, and in any case it indicates that once no more concurrency can be exploited the tasks may become deferred, and therefore get executed when someone waits on the result, rather than still being asynchronous and running immediately after one of the previous asynchronous tasks finishes, as would be desirable for maximum parallelism.

That is, if we have 10 long running tasks and the implementation can only execute 4 in parallel, then the first 4 will be asynchronous and then the last 6 may be deferred. Waiting on the futures in sequence would execute the deferred tasks on a single thread in sequence, eliminating parallel execution for those tasks.
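
Whether a particular future actually ended up deferred can be probed with a zero-length wait; a minimal sketch using the default launch policy:

#include <chrono>
#include <cstdio>
#include <future>

int main() {
    // With the default policy the implementation chooses async or deferred.
    auto f = std::async( []{ return 42; } );

    // A zero-length wait tells us whether the task was deferred; a deferred
    // task only runs when get() (or wait()) is finally called on the future.
    if( f.wait_for( std::chrono::seconds(0) ) == std::future_status::deferred )
        std::printf( "task was deferred\n" );
    else
        std::printf( "task is running (or already finished) asynchronously\n" );

    std::printf( "result: %d\n", f.get() );
}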

The note also says that instead of deferring invocation, the selection of the policy may be deferred. That is, the function may still run asynchronously, but that decision may be delayed, say, until one of the earlier tasks completes, freeing up a core for a new task. But again, this is not required, the note is non-normative, and as far as I know Microsoft's implementation is the only one that behaves this way. When I looked at another implementation, libc++, it simply ignores this note altogether, so using either the std::launch::async or std::launch::any policy results in asynchronous execution on a new thread.

(I believe in Microsoft's case their ConcRT library is responsible for this?)

Microsoft's implementation does indeed behave as you describe; however, this is not required, and a portable program cannot rely on that behavior.

One way to portably limit how many threads are actually running is to use something like a semaphore:

#include <future>
#include <mutex>
#include <condition_variable>
#include <vector>
#include <cstdio>

// a semaphore class
//
// All threads can wait on this object. When a waiting thread
// is woken up, it does its work and then notifies another waiting thread.
// In this way only n threads will be doing work at any time.
// 
class Semaphore {
private:
    std::mutex m;
    std::condition_variable cv;
    unsigned int count;

public:
    Semaphore(int n) : count(n) {}
    void notify() {
        std::unique_lock<std::mutex> l(m);
        ++count;
        cv.notify_one();
    }
    void wait() {
        std::unique_lock<std::mutex> l(m);
        cv.wait(l, [this]{ return count!=0; });
        --count;
    }
};

// an RAII class to handle waiting and notifying the next thread
// Work is done between when the object is created and destroyed
class Semaphore_waiter_notifier {
    Semaphore &s;
public:
    Semaphore_waiter_notifier(Semaphore &s) : s{s} { s.wait(); }
    ~Semaphore_waiter_notifier() { s.notify(); }
};

// some inefficient work for our threads to do
int fib(int n) {
    if (n<2) return n;
    return fib(n-1) + fib(n-2);
}

// for_each algorithm for iterating over a container but also
// making an integer index available.
//
// f is called like f(index, element)
template<typename Container, typename F>
F for_each(Container &c, F f) {
    typename Container::size_type i = 0;
    for (auto &e : c)
        f(i++, e);
    return f;
}

// global semaphore so that lambdas don't have to capture it
Semaphore thread_limiter(4);

int main() {
    std::vector<int> input(100);
    for_each(input, [](int i, int &e) { e = (i%10) + 35; });

    std::vector<std::future<int>> output;
    for_each(input, [&output](int i, int e) {
        output.push_back(std::async(std::launch::async, [] (int task, int n) -> int {
            Semaphore_waiter_notifier w(thread_limiter);
            std::printf("Starting task %d\n", task);
            int res = fib(n);
            std::printf("\t\t\t\t\t\tTask %d finished\n", task);
            return res;
        }, i, e));
    });

    for_each(output, [](int i, std::future<int> &e) {
        std::printf("\t\t\tWaiting on task %d\n", i);
        int res = e.get();
        std::printf("\t\t\t\t\t\t\t\t\tTask %d result: %d\n", i, res);
    });
}
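
As a side note, if C++20 is available, std::counting_semaphore can take the place of the hand-rolled Semaphore above; a rough sketch (the names gate and work are just placeholders):

#include <cstdio>
#include <future>
#include <semaphore>
#include <vector>

// Allow at most 4 tasks to do work at any time, mirroring thread_limiter above.
std::counting_semaphore<> gate( 4 );

int work( int n ) {
    gate.acquire();          // wait for a free slot
    int result = n * n;      // placeholder for the real work
    gate.release();          // let the next waiting task proceed
    return result;
}

int main() {
    std::vector<std::future<int>> results;
    for( int i = 0; i < 16; ++i )
        results.push_back( std::async( std::launch::async, work, i ) );

    for( auto& f : results )
        std::printf( "%d\n", f.get() );
}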
