Eigen library with C++11 multithreading


Problem description

I have a code to compute a Gaussian Mixture Model with Expectation Maximization in order to identify the clusters from a given input data sample.

A piece of the code repeats the computation of such a model for a number of trials Ntrials (each independent of the others but using the same input data) in order to finally pick the best solution (the one maximizing the total likelihood of the model). This concept can be generalized to many other clustering algorithms (e.g. k-means).

I want to parallelize the part of the code that has to be repeated Ntrials times through multi-threading with C++11 such that each thread will execute one trial.

A code example, assuming an input Eigen::ArrayXXd sample of (Ndimensions x Npoints) can be of the type:

    double bestTotalModelProbability = 0;
    double totalModelProbability = 0;
    Eigen::ArrayXd clusterIndicesFromSample(Npoints);
    clusterIndicesFromSample.setZero();

    for (int i=0; i < Ntrials; i++)
    {
         totalModelProbability = computeGaussianMixtureModel(sample);


         // Check if this trial is better than the previous one.
         // If so, update the results (cluster index for each point
         // in the sample) and keep them.

         if (totalModelProbability > bestTotalModelProbability)
         {
             bestTotalModelProbability = totalModelProbability;
             ...
             clusterIndicesFromSample = obtainClusterMembership(sample);
         }
    }

where I pass sample by reference (Eigen::Ref), and not sample itself, to both functions computeGaussianMixtureModel() and obtainClusterMembership().
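
For illustration, the signatures are roughly of this form (simplified; the actual implementations are not shown here):

    // simplified sketch of the signatures only - the real functions do more work
    double computeGaussianMixtureModel(const Eigen::Ref<const Eigen::ArrayXXd> &sample);
    Eigen::ArrayXd obtainClusterMembership(const Eigen::Ref<const Eigen::ArrayXXd> &sample);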

My code is heavily based on Eigen arrays, and the N-dimensional problems that I deal with can have on the order of 10-100 dimensions and 500-1000 different sample points. I am looking for some examples to create a multi-threaded version of this code using Eigen arrays and std::thread of C++11, but could not find anything around, and I am struggling with making even some simple examples for manipulation of Eigen arrays.

I am not even sure Eigen can be used within std::thread in C++11. Can someone help me even with some simple example to understand the syntax? I am using clang++ as the compiler in Mac OS X on a CPU with 6 cores (12 threads).

Answer

OP's question attracted my attention because number-crunching with speed-up earned by multi-threading is one of the top todo's on my personal list.

I must admit that my experience with the Eigen library is very limited. (I once used the decomposition of 3×3 rotation matrices into Euler angles, which is solved very cleverly in a general way in the Eigen library.)

Hence, I defined another sample task consisting of a stupid counting of values in a sample data set.

This is done multiple times (using the same evaluation function):

  1. single-threaded (to get values for comparison)
  2. starting each sub-task in a separate thread (an admittedly stupid approach)
  3. starting threads with interleaved access to the sample data
  4. starting threads with partitioned access to the sample data.

test-multi-threading.cc:

#include <cstdint>
#include <cstdlib>
#include <chrono>
#include <iomanip>
#include <iostream>
#include <limits>
#include <thread>
#include <vector>

// a sample function to process a certain amount of data
template <typename T>
size_t countFrequency(
  size_t n, const T data[], const T &begin, const T &end)
{
  size_t result = 0;
  for (size_t i = 0; i < n; ++i) result += data[i] >= begin && data[i] < end;
  return result;
}

typedef std::uint16_t Value;
typedef std::chrono::high_resolution_clock Clock;
typedef std::chrono::microseconds MuSecs;
typedef decltype(std::chrono::duration_cast<MuSecs>(Clock::now() - Clock::now())) Time;

Time duration(const Clock::time_point &t0)
{
  return std::chrono::duration_cast<MuSecs>(Clock::now() - t0);
}

std::vector<Time> makeTest()
{
  const Value SizeGroup = 4, NGroups = 10000, N = SizeGroup * NGroups;
  const size_t NThreads = std::thread::hardware_concurrency();
  // make a test sample
  std::vector<Value> sample(N);
  for (Value &value : sample) value = (Value)rand();
  // prepare result vectors
  std::vector<size_t> results4[4] = {
    std::vector<size_t>(NGroups, 0),
    std::vector<size_t>(NGroups, 0),
    std::vector<size_t>(NGroups, 0),
    std::vector<size_t>(NGroups, 0)
  };
  // make test
  std::vector<Time> times{
    [&]() { // single threading
      // make a copy of test sample
      std::vector<Value> data(sample);
      std::vector<size_t> &results = results4[0];
      // remember start time
      const Clock::time_point t0 = Clock::now();
      // do experiment single-threaded
      for (size_t i = 0; i < NGroups; ++i) {
        results[i] = countFrequency(data.size(), data.data(),
          (Value)(i * SizeGroup), (Value)((i + 1) * SizeGroup));
      }
      // done
      return duration(t0);
    }(),
    [&]() { // multi-threading - stupid approach
      // make a copy of test sample
      std::vector<Value> data(sample);
      std::vector<size_t> &results = results4[1];
      // remember start time
      const Clock::time_point t0 = Clock::now();
      // do experiment multi-threaded
      std::vector<std::thread> threads(NThreads);
      for (Value i = 0; i < NGroups;) {
        size_t nT = 0;
        for (; nT < NThreads && i < NGroups; ++nT, ++i) {
          threads[nT] = std::move(std::thread(
            [i, &results, &data, SizeGroup]() {
              size_t result = countFrequency(data.size(), data.data(),
                (Value)(i * SizeGroup), (Value)((i + 1) * SizeGroup));
              results[i] = result;
            }));
        }
        for (size_t iT = 0; iT < nT; ++iT) threads[iT].join();
      }
      // done
      return duration(t0);
    }(),
    [&]() { // multi-threading - interleaved
      // make a copy of test sample
      std::vector<Value> data(sample);
      std::vector<size_t> &results = results4[2];
      // remember start time
      const Clock::time_point t0 = Clock::now();
      // do experiment multi-threaded
      std::vector<std::thread> threads(NThreads);
      for (Value iT = 0; iT < NThreads; ++iT) {
        threads[iT] = std::move(std::thread(
          [iT, &results, &data, NGroups, SizeGroup, NThreads]() {
            for (Value i = iT; i < NGroups; i += NThreads) {
              size_t result = countFrequency(data.size(), data.data(),
                (Value)(i * SizeGroup), (Value)((i + 1) * SizeGroup));
              results[i] = result;
            }
          }));
      }
      for (std::thread &threadI : threads) threadI.join();
      // done
      return duration(t0);
    }(),
    [&]() { // multi-threading - grouped
      std::vector<Value> data(sample);
      std::vector<size_t> &results = results4[3];
      // remember start time
      const Clock::time_point t0 = Clock::now();
      // do experiment multi-threaded
      std::vector<std::thread> threads(NThreads);
      for (Value iT = 0; iT < NThreads; ++iT) {
        threads[iT] = std::move(std::thread(
          [iT, &results, &data, NGroups, SizeGroup, NThreads]() {
            for (Value i = iT * NGroups / NThreads,
              iN = (iT + 1) * NGroups / NThreads; i < iN; ++i) {
              size_t result = countFrequency(data.size(), data.data(),
                (Value)(i * SizeGroup), (Value)((i + 1) * SizeGroup));
              results[i] = result;
            }
          }));
      }
      for (std::thread &threadI : threads) threadI.join();
      // done
      return duration(t0);
    }()
  };
  // check results (must be equal for any kind of computation)
  const unsigned nResults = sizeof results4 / sizeof *results4;
  for (unsigned i = 1; i < nResults; ++i) {
    size_t nErrors = 0;
    for (Value j = 0; j < NGroups; ++j) {
      if (results4[0][j] != results4[i][j]) {
        ++nErrors;
#ifdef _DEBUG
        std::cerr
          << "results4[0][" << j << "]: " << results4[0][j]
          << " != results4[" << i << "][" << j << "]: " << results4[i][j]
          << "!\n";
#endif // _DEBUG
      }
    }
    if (nErrors) std::cerr << nErrors << " errors in results4[" << i << "]!\n";
  }
  // done
  return times;
}

int main()
{
  std::cout << "std::thread::hardware_concurrency(): "
    << std::thread::hardware_concurrency() << '\n';
  // heat up
  std::cout << "Heat up...\n";
  for (unsigned i = 0; i < 3; ++i) makeTest();
  // repeat NTrials times
  const unsigned NTrials = 10;
  std::cout << "Measuring " << NTrials << " runs...\n"
    << "   "
    << " | " << std::setw(10) << "Single"
    << " | " << std::setw(10) << "Multi 1"
    << " | " << std::setw(10) << "Multi 2"
    << " | " << std::setw(10) << "Multi 3"
    << '\n';
  std::vector<double> sumTimes;
  for (unsigned i = 0; i < NTrials; ++i) {
    std::vector<Time> times = makeTest();
    std::cout << std::setw(2) << (i + 1) << ".";
    for (const Time &time : times) {
      std::cout << " | " << std::setw(10) << time.count();
    }
    std::cout << '\n';
    sumTimes.resize(times.size(), 0.0);
    for (size_t j = 0; j < times.size(); ++j) sumTimes[j] += times[j].count();
  }
  std::cout << "Average Values:\n   ";
  for (const double &sumTime : sumTimes) {
    std::cout << " | "
      << std::setw(10) << std::fixed << std::setprecision(1)
      << sumTime / NTrials;
  }
  std::cout << '\n';
  std::cout << "Ratio:\n   ";
  for (const double &sumTime : sumTimes) {
    std::cout << " | "
      << std::setw(10) << std::fixed << std::setprecision(3)
      << sumTime / sumTimes.front();
  }
  std::cout << '\n';
  // done
  return 0;
}

Compiled and tested on cygwin64 on Windows 10:

$ g++ --version
g++ (GCC) 7.3.0

$ g++ -std=c++11 -O2 -o test-multi-threading test-multi-threading.cc

$ ./test-multi-threading
std::thread::hardware_concurrency(): 8
Heat up...
Measuring 10 runs...
    |     Single |    Multi 1 |    Multi 2 |    Multi 3
 1. |     384008 |    1052937 |     130662 |     138411
 2. |     386500 |    1103281 |     133030 |     132576
 3. |     382968 |    1078988 |     137123 |     137780
 4. |     395158 |    1120752 |     138731 |     138650
 5. |     385870 |    1105885 |     144825 |     129405
 6. |     366724 |    1071788 |     137684 |     130289
 7. |     352204 |    1104191 |     133675 |     130505
 8. |     331679 |    1072299 |     135476 |     138257
 9. |     373416 |    1053881 |     138467 |     137613
10. |     370872 |    1096424 |     136810 |     147960
Average Values:
    |   372939.9 |  1086042.6 |   136648.3 |   136144.6
Ratio:
    |      1.000 |      2.912 |      0.366 |      0.365

I did the same on coliru.com. (I had to reduce the heat-up cycles and the sample size as I exceeded the time limit with the original values.):

g++ (GCC) 8.1.0
Copyright (C) 2018 Free Software Foundation, Inc.
This is free software; see the source for copying conditions.  There is NO
warranty; not even for MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.

std::thread::hardware_concurrency(): 4
Heat up...
Measuring 10 runs...
    |     Single |    Multi 1 |    Multi 2 |    Multi 3
 1. |     224684 |     297729 |      48334 |      39016
 2. |     146232 |     337222 |      66308 |      59994
 3. |     195750 |     344056 |      61383 |      63172
 4. |     198629 |     317719 |      62695 |      50413
 5. |     149125 |     356471 |      61447 |      57487
 6. |     155355 |     322185 |      50254 |      35214
 7. |     140269 |     316224 |      61482 |      53889
 8. |     154454 |     334814 |      58382 |      53796
 9. |     177426 |     340723 |      62195 |      54352
10. |     151951 |     331772 |      61802 |      46727
Average Values:
    |   169387.5 |   329891.5 |    59428.2 |    51406.0
Ratio:
    |      1.000 |      1.948 |      0.351 |      0.303

Live Demo on coliru

I was a little surprised that the ratios on coliru (with only 4 threads) are even better than on my PC (with 8 threads). Actually, I don't know how to explain this. However, there are a lot of other differences between the two setups which may or may not be responsible. At least, both measurements show a rough speed-up of 3 for the 3rd and 4th approaches, while the 2nd consumes every potential speed-up (probably by starting and joining all these threads).

Looking at the sample code, you will recognize that there is no mutex or any other explicit locking. This is intentional. As I learned (many, many years ago), every attempt at parallelization may cause a certain extra amount of communication overhead (for concurrent tasks which have to exchange data). If the communication overhead becomes too big, it simply consumes the speed advantage of concurrency. So, the best speed-up can be achieved by:

  • minimal communication overhead, i.e. the concurrent tasks operate on independent data
  • minimal effort for merging the computed results afterwards.

In my sample code, I

  1. prepared all data and storage before starting the threads,
  2. ensured that shared data which is read is never changed while the threads are running,
  3. made sure that written data is thread-local (no two threads write to the same data address),
  4. evaluated the computed results after all threads have been joined.

Concerning 3., I struggled a bit with whether this is legal or not, i.e. whether it is guaranteed that data written in threads appears correctly in the main thread after joining. (The fact that something seems to work fine is deceptive in general, but especially deceptive concerning multi-threading.)

cppreference.com provides the following explanations

  • for the constructor std::thread::thread()

    The completion of the invocation of the constructor synchronizes-with (as defined in std::memory_order) the beginning of the invocation of the copy of f on the new thread of execution.

  • for std::thread::join()

    The completion of the thread identified by *this synchronizes with the corresponding successful return from join().

  • On Stack Overflow, I found the following related Q/A's:

    • Does relaxed memory order effect can be extended to after performing-thread's life?
    • Are memory fences required here?
    • Is there an implicit memory barrier with synchronized-with relationship on thread::join?

    which convinced me that it is OK.

However, the drawback is:

  • The creation and joining of threads causes additional effort (and it isn't that cheap).

An alternative approach could be the use of a thread pool to overcome this. I googled a bit and found e.g. Jakob Progsch's ThreadPool on GitHub. However, I guess that with a thread pool the locking issue would be back "in the game".
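
Just as a sketch of an alternative direction (not measured here, and note that whether std::async re-uses threads behind the scenes is implementation-specific): the grouped variant from makeTest() above could also be written with std::async/std::future instead of managing std::thread objects by hand, re-using data, results, countFrequency(), and the constants from the listing:

// sketch only: the "grouped" approach expressed with std::async
// (requires #include <future>; data, results, NGroups, SizeGroup, NThreads
// and countFrequency() are the ones from test-multi-threading.cc)
std::vector<std::future<void>> tasks(NThreads);
for (Value iT = 0; iT < NThreads; ++iT) {
  tasks[iT] = std::async(std::launch::async,
    [iT, &results, &data, NGroups, SizeGroup, NThreads]() {
      for (Value i = iT * NGroups / NThreads,
        iN = (iT + 1) * NGroups / NThreads; i < iN; ++i) {
        results[i] = countFrequency(data.size(), data.data(),
          (Value)(i * SizeGroup), (Value)((i + 1) * SizeGroup));
      }
    });
}
for (std::future<void> &task : tasks) task.get(); // wait for all tasks to finish

The partitioning and the evaluation after joining stay exactly the same; only the thread management changes.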

Whether this will work for Eigen functions as well depends on how the respective Eigen functions are implemented. If they access global variables (which become shared when the same function is called concurrently), this will cause a data race.
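
For illustration, a contrived (non-Eigen) example of such hidden shared state versus a function that is safe to call concurrently:

// NOT safe to call concurrently: every call increments the same hidden
// counter, so two threads calling it at once race on `calls`
int countCallsUnsafe()
{
  static int calls = 0;
  return ++calls;
}

// safe to call concurrently: only the caller's argument and locals are touched
// (needs #include <vector>)
double sumOfSquares(const std::vector<double> &v)
{
  double sum = 0.0;
  for (double x : v) sum += x * x;
  return sum;
}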

Googling a bit, I found the following doc:

    Eigen and multi-threading – Using Eigen in a multi-threaded application:

    In the case your own application is multithreaded, and multiple threads make calls to Eigen, then you have to initialize Eigen by calling the following routine before creating the threads:

    #include <Eigen/Core>
    int main(int argc, char** argv)
    {
      Eigen::initParallel();
      ...
    }

    Note

    With Eigen 3.3, and a fully C++11 compliant compiler (i.e., thread-safe static local variable initialization), then calling initParallel() is optional.

    Warning

    note that all functions generating random matrices are not re-entrant nor thread-safe. Those include DenseBase::Random(), and DenseBase::setRandom() despite a call to Eigen::initParallel(). This is because these functions are based on std::rand which is not re-entrant. For thread-safe random generator, we recommend the use of boost::random or c++11 random feature.
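
Coming back to the original question, this is how I would put the pieces together. The following is a sketch only (untested, with dummy stand-ins for the OP's computeGaussianMixtureModel() and obtainClusterMembership(), which I assume only read sample and keep no shared mutable state): every trial writes into its own result slot, and the best trial is picked in the main thread after joining.

#include <algorithm>
#include <thread>
#include <vector>

#include <Eigen/Core>

// dummy stand-ins for the OP's functions - for illustration only
double computeGaussianMixtureModel(const Eigen::Ref<const Eigen::ArrayXXd> &sample)
{ return sample.abs().sum(); }

Eigen::ArrayXd obtainClusterMembership(const Eigen::Ref<const Eigen::ArrayXXd> &sample)
{ return Eigen::ArrayXd::Zero(sample.cols()); }

void runTrials(const Eigen::ArrayXXd &sample, int Ntrials,
  double &bestTotalModelProbability, Eigen::ArrayXd &clusterIndicesFromSample)
{
  const int NThreads = std::max(1u, std::thread::hardware_concurrency());
  // one result slot per trial - no two threads write to the same address,
  // hence no mutex is needed
  std::vector<double> probabilities(Ntrials, 0.0);
  std::vector<Eigen::ArrayXd> memberships(Ntrials);
  std::vector<std::thread> threads(NThreads);
  for (int iT = 0; iT < NThreads; ++iT) {
    threads[iT] = std::thread(
      [iT, NThreads, Ntrials, &sample, &probabilities, &memberships]() {
        // if a trial needs random numbers, create a per-thread std::mt19937
        // here (std::rand and DenseBase::Random() are not thread-safe)
        for (int i = iT; i < Ntrials; i += NThreads) { // interleaved trials
          probabilities[i] = computeGaussianMixtureModel(sample);
          memberships[i] = obtainClusterMembership(sample);
        }
      });
  }
  for (std::thread &t : threads) t.join();
  // merge after joining: pick the best trial in the main thread
  int best = 0;
  for (int i = 1; i < Ntrials; ++i)
    if (probabilities[i] > probabilities[best]) best = i;
  bestTotalModelProbability = probabilities[best];
  clusterIndicesFromSample = memberships[best];
}

Storing every trial's membership trades some memory (Ntrials × Npoints values) for the lock-free "write thread-local, merge after join" scheme; alternatively, each thread could keep only its own best trial, and the main thread could compare the NThreads candidates after joining.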
