启动多个线程并重新启动它们 [英] Launching multiple threads and restarting them

查看:120
本文介绍了启动多个线程并重新启动它们的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我试图编写一个系统,其中我创建了x个工作线程。这些线程将在不同的时间完成他们的工作。当任何一个完成他们的工作,我将检查他们的输出,并重新启动它们(保持线程周围的x)。我将这样做一些有趣的迭代。因此,基本上一个控制器线程将启动x个线程,并且将在它们完成其工作时重新启动它们,直到达到一定数量的迭代。

I'm trying to program a system where I create x amount of worker threads. These threads will finish their work in different times. When any of them complete their work, I will examine their output and restart them again (Keeping the running number of threads around x). I will do this for a number of aribitary iterations. So, basically a controller thread will launch x amount of threads, and will restart them when they finish their work, until some number of iterations are reached.

附加说明#1:当我说重新启动时,等待直到当前一个退出/中止并被销毁,并创建一个新的。它不必重新启动同一个线程。我最感兴趣的是以干净的异步方式做这件事。

Additional Note #1 : When I said restart, it's perfectly fine to wait until the current one exits/aborts and gets destroyed, and create a new one. It doesn't have to "restart" the same thread. I'm mostly interested in doing this in a clean async way.

注意:我不是在寻找任何特定的代码,但一些可能的伪代码和设计模式使用插槽和信号。

Note: I'm not looking for any specific code, but some possible pseudo code and a design pattern making use of slots and signals.

我知道qt线程和工作他们。我熟悉的例子,你启动x线程数量,并等待,直到他们完成使用yield,并等待。我正在寻找一个干净的方式来实现我在第一段使用信号和插槽描述。

I'm aware of qt threads and have worked them. I'm familiar with examples where you launch an x amount of threads and wait until all of them finish using yield, and wait. I'm looking for a clean way to achieve what I described in the first paragraph using signals and slots.

推荐答案

code> QtConcurrent :: run()或 QThreadPool :: start() Concurrent框架在内部使用线程池,因此它们是完全等价的:前者是后者的方便包装器。默认线程池最好留给短时间运行的任务;运行长任务,使用自己的线程池。您将其传递给 QtConcurrent :: run()作为第一个参数。

That's what QtConcurrent::run() or QThreadPool::start() are for. The Concurrent framework uses a thread pool internally, so they're quite equivalent: the former is a convenience wrapper for the latter. The default thread pool is best left for short-running tasks; to run long tasks, use your own thread pool. You'd pass it to QtConcurrent::run() as the first argument.

QThreadPool 维护工作项的队列,将它们分派到线程,并动态创建和销毁工作线程。这是一个很棒的课,你不必重新实现自己。

The QThreadPool maintains a queue of work items, dispatches them to threads, and dynamically creates and destroys worker threads. It's a wonderful class that you don't have to reimplement yourself.

如果你没有太多的工作单元,可以提供所有的前期,只需使用 QtConcurrent :: run() QThreadPool :: start()提前排队。

If you don't have too many units of work and can furnish them all upfront, simply use QtConcurrent::run() or QThreadPool::start() to queue them all ahead of time. They can emit a signal from a helper object to notify you as each of them finishes.

如果工作单元过于昂贵而无法一次性创建,那么您可以从帮助对象发出一个信号来通知您。必须在线程池之上实现通知工作队列。

If the units of work are too expensive to create all at once, you'll have to implement a notifying work queue on top of a thread pool.

工作单元需要通知队列及其用户已完成。这可以例如。通过重新实现 QRunnable 作为 WorkUnit 的基础,将工作转发到抽象方法,抽象方法已经完成。同样的方法适用于 QtConcurrent :: run ,除了不是重新实现 QRunnable :: run code>运算符()()。

The unit of work needs to notify the queue and its users that it has finished. This can be done e.g. by reimplementing the QRunnable as a base for a WorkUnit, forwarding the work to an abstract method, and notifying the queue when the abstract method has finished. The same approach works for QtConcurrent::run, except that instead of reimplementing QRunnable::run you implement a functor's operator()().

队列将发出 workUnitDone 信号。

为了方便起见,队列可以请求一个通过发出 workUnitDone(nullptr),初始工作项的数量。如果您在每次完成前一个项目时都准确填充一个项目,则队列将保留工作项目的初始数量。

For convenience, the queue can request a number of initial work items, by emitting workUnitDone(nullptr). If you replenish exactly one item every time a previous one has finished, the queue will maintain the initial number of work items.

如果项目需要很短的时间要处理,你应该有更多的可用比线程数,所以没有线程将空闲没有工作。对于通常需要很长时间(几十毫秒或更长)的项目,足以拥有 QThread :: idealThreadCount 的1.5-2倍。

If the items take a very short amount of time to process, you should have many more available than the number of threads, so that no threads will be idling without work. For items that mostly take long time (tens of milliseconds or more), it's sufficient to have 1.5-2 times the QThread::idealThreadCount.

添加到队列中的工作单元可以是 WorkUnit 的实例,或函数。

The work units added to the queue can be instances of WorkUnit, or functors.

// https://github.com/KubaO/stackoverflown/tree/master/questions/notified-workqueue-38000605
#include <QtCore>

class WorkUnit;
class WorkQueue : public QObject {
   Q_OBJECT
   friend class WorkUnit;
   QThreadPool m_pool{this};
   union alignas(64) { // keep it in its own cache line
      QAtomicInt queuedUnits{0};
      char filler[64];
   } d;
   void isDone(WorkUnit * unit) {
      auto queued = d.queuedUnits.deref();
      emit workUnitDone(unit);
      if (!queued) emit finished();
   }
public:
   explicit WorkQueue(int initialUnits = 0) {
      if (initialUnits)
         QTimer::singleShot(0, [=]{
            for (int i = 0; i < initialUnits; ++i)
               emit workUnitDone(nullptr);
         });
   }
   Q_SLOT void addWork(WorkUnit * unit);
   template <typename F> void addFunctor(F && functor);
   Q_SIGNAL void workUnitDone(WorkUnit *);
   Q_SIGNAL void finished();
};

class WorkUnit : public QRunnable {
   friend class WorkQueue;
   WorkQueue * m_queue { nullptr };
   void run() override {
      work();
      m_queue->isDone(this);
   }
protected:
   virtual void work() = 0;
};

template <typename F>
class FunctorUnit : public WorkUnit, private F {
   void work() override { (*this)(); }
public:
   FunctorUnit(F && f) : F(std::move(f)) {}
};

void WorkQueue::addWork(WorkUnit *unit) {
   d.queuedUnits.ref();
   unit->m_queue = this;
   m_pool.start(unit);
}

template <typename F> void WorkQueue::addFunctor(F && functor) {
   addWork(new FunctorUnit<F>{std::forward<F>(functor)});
}

为了演示一下,让我们做50个单位的工作随机时间在1us和1s之间。我们将传递一半的单位为 SleepyWork 实例,另一半作为lambdas。

To demonstrate things, let's do 50 units of "work" of sleeping for a random time between 1us and 1s. We're passing half of the units as SleepyWork instances, and another half as lambdas.

#include <random>

struct SleepyWork : WorkUnit {
   int usecs;
   SleepyWork(int usecs) : usecs(usecs) {}
   void work() override {
      QThread::usleep(usecs);
      qDebug() << "slept" << usecs;
   }
};

int main(int argc, char ** argv) {
   QCoreApplication app{argc, argv};
   std::random_device dev;
   std::default_random_engine eng{dev()};
   std::uniform_int_distribution<int> dist{1, 1000000};
   auto rand_usecs = [&]{ return dist(eng); };

   int workUnits = 50;
   WorkQueue queue{2*QThread::idealThreadCount()};
   QObject::connect(&queue, &WorkQueue::workUnitDone, [&]{
      if (workUnits) {
         if (workUnits % 2) {
            auto us = dist(eng);
            queue.addFunctor([us]{
               QThread::usleep(us);
               qDebug() << "slept" << us;
            });
         } else
            queue.addWork(new SleepyWork{rand_usecs()});
         --workUnits;
      }
   });
   QObject::connect(&queue, &WorkQueue::finished, [&]{
      if (workUnits == 0) app.quit();
   });

   return app.exec();
}

#include "main.moc"

结束示例。

这篇关于启动多个线程并重新启动它们的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆