When one worker thread fails, how to abort remaining workers?

Problem description

I have a program which spawns multiple threads, each of which executes a long-running task. The main thread then waits for all worker threads to join, collects results, and exits.

If an error occurs in one of the workers, I want the remaining workers to stop gracefully, so that the main thread can exit shortly afterwards.

My question is how best to do this, when the implementation of the long-running task is provided by a library whose code I cannot modify.

Here is a simple sketch of the system, with no error handling:

#include <thread>
#include <vector>

void threadFunc()
{
    // Do long-running stuff
}

void mainFunc()
{
    std::vector<std::thread> threads;

    for (int i = 0; i < 3; ++i) {
        threads.push_back(std::thread(&threadFunc));
    }

    for (auto &t : threads) {
        t.join();
    }
}

If the long-running function executes a loop and I have access to the code, then execution can be aborted simply by checking a shared "keep on running" flag at the top of each iteration.

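#include <exception>
#include <mutex>

std::mutex mutex;
bool error = false;  // set by whichever worker fails first

void threadFunc()
{
    try {
        for (...) {  // the long-running loop; any iteration may throw
            {
                // Check the shared "keep on running" flag at the top of each iteration
                std::unique_lock<std::mutex> lock(mutex);
                if (error) {
                    break;
                }
            }
            // ... one iteration of work ...
        }
    } catch (std::exception &) {
        std::unique_lock<std::mutex> lock(mutex);
        error = true;
    }
}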
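When the loop body is under your control, the same shared flag can also be a std::atomic<bool> instead of a mutex-protected bool. The sketch below swaps in that technique; doOneIteration, numIterations and failAt are hypothetical stand-ins for the real work, chosen only so the behaviour is easy to check.

```cpp
#include <atomic>
#include <exception>
#include <stdexcept>

// Shared "keep on running" flag. std::atomic makes concurrent reads and
// writes well-defined without a mutex.
std::atomic<bool> g_error{false};

// Hypothetical unit of work: throws on iteration failAt (never, if failAt < 0).
void doOneIteration(int iter, int failAt)
{
    if (iter == failAt) {
        throw std::runtime_error("iteration failed");
    }
}

// Runs up to numIterations iterations and returns how many completed.
int threadFunc(int numIterations, int failAt)
{
    int completed = 0;
    try {
        for (int iter = 0; iter < numIterations; ++iter) {
            if (g_error.load()) {   // another worker has failed: stop early
                break;
            }
            doOneIteration(iter, failAt);
            ++completed;
        }
    } catch (const std::exception &) {
        g_error.store(true);        // ask the remaining workers to stop
    }
    return completed;
}
```

Called in sequence, threadFunc(5, -1) completes all five iterations, threadFunc(5, 2) completes two and then sets g_error, and any later call stops at its first check.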

Now consider the case when the long-running operation is provided by a library:

#include <exception>
#include <mutex>

std::mutex mutex;
bool error = false;  // set by a worker that caught an exception

class Task
{
public:
    // Blocks until completion, error, or stop() is called
    void run();

    void stop();
};

void threadFunc(Task &task)
{
    try {
        task.run();
    } catch (std::exception &) {
        std::unique_lock<std::mutex> lock(mutex);
        error = true;
    }
}

In this case, the main thread has to handle the error, and call stop() on the still-running tasks. As such, it cannot simply wait for each worker to join() as in the original implementation.

The approach I have used so far is to share the following structure between the main thread and each worker:

struct SharedData
{
    std::mutex mutex;
    std::condition_variable condVar;
    bool error;    // set by a worker that caught an exception
    int running;   // number of workers that have not finished yet
};

When a worker completes successfully, it decrements the running count. If an exception is caught, the worker sets the error flag. In both cases, it then calls condVar.notify_one().

The main thread then waits on the condition variable, waking up if either error is set or running reaches zero. On waking up, the main thread calls stop() on all tasks if error has been set.

This approach works, but I feel there should be a cleaner solution using some of the higher-level primitives in the standard concurrency library. Can anyone suggest an improved implementation?
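One higher-level primitive that could replace the hand-rolled flag and counter is std::future. The sketch below is an editorial illustration under stated assumptions, not the asker's code: Task is a hypothetical stand-in for the library class, and each task is launched with std::async. A worker's exception is stored in its future and rethrown by get(), so no shared error flag is needed; the trade-off is that the main thread polls the futures with wait_for instead of sleeping on a single condition variable.

```cpp
#include <atomic>
#include <chrono>
#include <cstddef>
#include <exception>
#include <future>
#include <memory>
#include <stdexcept>
#include <thread>
#include <vector>

// Hypothetical stand-in for the library Task: run() blocks until it has done
// its iterations, throws (if fail is set), or stop() is called.
class Task
{
public:
    Task(int iterations, bool fail) : m_iterations(iterations), m_fail(fail) {}

    void run()
    {
        for (int i = 0; i < m_iterations && !m_stopped; ++i) {
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
            if (m_fail) {
                throw std::runtime_error("task failed");
            }
        }
    }

    void stop() { m_stopped = true; }
    bool stopped() const { return m_stopped; }

private:
    const int m_iterations;
    const bool m_fail;
    std::atomic<bool> m_stopped{false};
};

// Runs each task on its own thread via std::async. On the first failure,
// calls stop() on every task; returns true only if no task threw.
bool runAll(std::vector<std::unique_ptr<Task>> &tasks)
{
    std::vector<std::future<void>> futures;
    for (auto &t : tasks) {
        Task *task = t.get();
        futures.push_back(std::async(std::launch::async, [task] { task->run(); }));
    }

    bool ok = true;
    std::vector<bool> done(futures.size(), false);
    std::size_t remaining = futures.size();
    while (remaining > 0) {
        for (std::size_t i = 0; i < futures.size(); ++i) {
            if (done[i] || futures[i].wait_for(std::chrono::milliseconds(5))
                               != std::future_status::ready) {
                continue;
            }
            done[i] = true;
            --remaining;
            try {
                futures[i].get();   // rethrows the exception thrown by run(), if any
            } catch (const std::exception &) {
                if (ok) {
                    ok = false;
                    for (auto &t : tasks) {
                        t->stop();  // abort the workers that are still running
                    }
                }
            }
        }
    }
    return ok;
}
```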

Here is the complete code for my current solution:

// main.cpp

#include <chrono>
#include <condition_variable>
#include <exception>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>

#include "utils.h"

// Class which encapsulates long-running task, and provides a mechanism for aborting it
class Task
{
public:
    Task(int tidx, bool fail)
    :   tidx(tidx)
    ,   fail(fail)
    ,   m_run(true)
    {

    }

    void run()
    {
        static const int NUM_ITERATIONS = 10;

        for (int iter = 0; iter < NUM_ITERATIONS; ++iter) {
            {
                std::unique_lock<std::mutex> lock(m_mutex);
                if (!m_run) {
                    out() << "thread " << tidx << " aborting";
                    break;
                }
            }

            out() << "thread " << tidx << " iter " << iter;
            std::this_thread::sleep_for(std::chrono::milliseconds(100));

            if (fail) {
                throw std::exception();
            }
        }
    }

    void stop()
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_run = false;
    }

    const int tidx;
    const bool fail;

private:
    std::mutex m_mutex;
    bool m_run;
};

// Data shared between all threads
struct SharedData
{
    std::mutex mutex;
    std::condition_variable condVar;
    bool error;
    int running;

    SharedData(int count)
    :   error(false)
    ,   running(count)
    {

    }
};

void threadFunc(Task &task, SharedData &shared)
{
    try {
        out() << "thread " << task.tidx << " starting";

        task.run(); // Blocks until task completes or is aborted by main thread

        out() << "thread " << task.tidx << " ended";
    } catch (std::exception &) {
        out() << "thread " << task.tidx << " failed";

        std::unique_lock<std::mutex> lock(shared.mutex);
        shared.error = true;
    }

    {
        std::unique_lock<std::mutex> lock(shared.mutex);
        --shared.running;
    }

    shared.condVar.notify_one();
}

int main(int argc, char **argv)
{
    static const int NUM_THREADS = 3;

    std::vector<std::unique_ptr<Task>> tasks(NUM_THREADS);
    std::vector<std::thread> threads(NUM_THREADS);

    SharedData shared(NUM_THREADS);

    for (int tidx = 0; tidx < NUM_THREADS; ++tidx) {
        const bool fail = (tidx == 1);
        tasks[tidx] = std::make_unique<Task>(tidx, fail);
        threads[tidx] = std::thread(&threadFunc, std::ref(*tasks[tidx]), std::ref(shared));
    }

    {
        std::unique_lock<std::mutex> lock(shared.mutex);

        // Wake up when either all tasks have completed, or any one has failed
        shared.condVar.wait(lock, [&shared](){
            return shared.error || !shared.running;
        });

        if (shared.error) {
            out() << "error occurred - terminating remaining tasks";
            for (auto &t : tasks) {
                t->stop();
            }
        }
    }

    for (int tidx = 0; tidx < NUM_THREADS; ++tidx) {
        out() << "waiting for thread " << tidx << " to join";
        threads[tidx].join();
        out() << "thread " << tidx << " joined";
    }

    out() << "program complete";

    return 0;
}

Some utility functions are defined here:

// utils.h

#ifndef UTILS_H
#define UTILS_H

#include <iostream>
#include <mutex>
#include <thread>

#if __cplusplus <= 201103L
// Backport std::make_unique from C++14
#include <memory>
namespace std {

template<typename T, typename ...Args>
std::unique_ptr<T> make_unique(
            Args&& ...args)
{
    return std::unique_ptr<T>(new T(std::forward<Args>(args)...));
}

} // namespace std
#endif // __cplusplus <= 201103L

// Thread-safe wrapper around std::cout
class ThreadSafeStdOut
{
public:
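    ThreadSafeStdOut()
    :   m_lock(m_mutex)
    {

    }

    // Move constructor: out() returns by value, which requires a copy or move
    // constructor before C++17. The moved-from object gives up the lock.
    ThreadSafeStdOut(ThreadSafeStdOut &&other)
    {
        m_lock.swap(other.m_lock);
    }

    ~ThreadSafeStdOut()
    {
        // Only the object that still owns the lock terminates the line
        if (m_lock.owns_lock()) {
            std::cout << std::endl;
        }
    }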

    template <typename T>
    ThreadSafeStdOut &operator<<(const T &obj)
    {
        std::cout << obj;
        return *this;
    }

private:
    static std::mutex m_mutex;
    std::unique_lock<std::mutex> m_lock;
};

std::mutex ThreadSafeStdOut::m_mutex;

// Convenience function for performing thread-safe output
ThreadSafeStdOut out()
{
    return ThreadSafeStdOut();
}

#endif // UTILS_H


Recommended answer

I've been thinking about your situation for some time, and this may be of some help to you. There are a couple of different approaches you could try to achieve your goal: two or three options that may be useful on their own or in combination. I will show at least the first option, since I'm still learning and trying to master the concepts of template specialization and lambdas.


  • Use a manager class

  • Use template specialization for encapsulation

  • Use lambdas.

Pseudocode for a manager class would look something like this:

#include <list>
#include <map>
#include <memory>
#include <queue>

class MainThread;    // user-defined thread types, left abstract in this sketch
class WorkerThread;

class ThreadManager {
private:
    std::unique_ptr<MainThread> mainThread_;
    std::list<std::shared_ptr<WorkerThread>> lWorkers_;             // List to hold finished workers
    std::queue<std::shared_ptr<WorkerThread>> qWorkers_;            // Queue to hold inactive and waiting threads
    std::map<unsigned, std::shared_ptr<WorkerThread>> mThreadIds_;  // Map to associate a WorkerThread with an ID value
    std::map<unsigned, bool> mFinishedThreads_;                     // Map to keep track of finished and unfinished threads

    bool threadError_; // Not needed if using exception handling
public:
    explicit ThreadManager( const MainThread& main_thread );

    void shutdownThread( const unsigned& threadId );
    void shutdownAllThreads();

    void addWorker( const WorkerThread& worker_thread );
    bool isThreadDone( const unsigned& threadId );

    void spawnMainThread() const; // Method to start the main thread's work

    void spawnWorkerThread( unsigned threadId, bool& error );

    bool getThreadError( unsigned& threadId ); // Returns true if a thread encountered an error, and passes back that thread's ID
};

I used a bool to determine whether a thread failed only to keep the structure simple for demonstration purposes; you can of course substitute exceptions, invalid unsigned values, etc., if you prefer.
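If you do switch from bools to exceptions, std::exception_ptr lets a worker hand its exception over to the main thread. A minimal sketch, assuming hypothetical names (FirstError, worker, runWorkers): each worker catches everything, only the first captured exception is kept, and the main thread rethrows it after joining. This only reports the failure; stopping the other workers still needs a call such as the library's stop().

```cpp
#include <cstddef>
#include <exception>
#include <mutex>
#include <stdexcept>
#include <string>
#include <thread>
#include <vector>

// Keeps the first exception reported by any worker thread.
class FirstError
{
public:
    // Called from a worker's catch block; later calls are ignored.
    void capture(std::exception_ptr e)
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        if (!m_error) {
            m_error = e;
        }
    }

    // Called by the main thread after joining: rethrows the stored exception, if any.
    void rethrowIfSet()
    {
        std::exception_ptr e;
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            e = m_error;
        }
        if (e) {
            std::rethrow_exception(e);
        }
    }

private:
    std::mutex m_mutex;
    std::exception_ptr m_error;
};

// Hypothetical worker body: throws when fail is true.
void worker(int id, bool fail, FirstError &errors)
{
    try {
        if (fail) {
            throw std::runtime_error("worker " + std::to_string(id) + " failed");
        }
        // ... long-running work would go here ...
    } catch (...) {
        errors.capture(std::current_exception());
    }
}

// Starts one thread per entry in failFlags, joins them all, then rethrows
// the first failure in the calling thread.
void runWorkers(const std::vector<bool> &failFlags)
{
    FirstError errors;
    std::vector<std::thread> threads;
    for (std::size_t i = 0; i < failFlags.size(); ++i) {
        const bool fail = failFlags[i];
        threads.emplace_back([i, fail, &errors] {
            worker(static_cast<int>(i), fail, errors);
        });
    }
    for (auto &t : threads) {
        t.join();
    }
    errors.rethrowIfSet();
}
```

For example, runWorkers({false, true, false}) joins all three threads and then throws the std::runtime_error raised by worker 1.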

A class like this might be used roughly as follows. Note also that a class of this type would be better implemented as a Singleton, since you would not want more than one manager when you are working with shared pointers.

SomeClass::SomeClass( ... ) {
    // This class could contain a private static smart pointer to the manager class.
    // Initialize it with a new manager, passing it the main thread object.

   threadManager_ = new ThreadManager( main_thread ); // Wouldn't actually use raw pointers here unless you had a need to; shown for simplicity
}

void SomeClass::addThreads( ... ) {
    for ( unsigned u = 1; u <= threadCount; u++ ) {
         threadManager_->addWorker( some_worker_thread );
    }
}

void SomeClass::someFunctionThatSpawnsThreads( ... ) {
    threadManager_->spawnMainThread();

    bool error = false;
    for ( unsigned u = 1; u <= threadCount; u++ ) {
        threadManager_->spawnWorkerThread( u, error );

        if ( error ) { // This thread failed to start: shut down all threads and stop spawning
            threadManager_->shutdownAllThreads();
            return;
        }
    }

    // All threads spawned successfully; now check whether one has failed.
    // As written this checks only once; in practice it would be repeated
    // (or block, e.g. on a condition variable) until all workers finish or one fails.
    unsigned threadId;
    if ( threadManager_->getThreadError( threadId ) ) {
        // getThreadError() returned true and passed back the ID of the failed thread.
        // Stop every other worker, not only those with a higher ID.
        for ( unsigned u = 1; u <= threadCount; u++ ) {
            if ( u != threadId ) {
                threadManager_->shutdownThread( u );
            }
        }
    }
}
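A compilable sketch of the manager idea, using lambdas for the workers, might look like this. It is an illustration under assumptions rather than a drop-in class: the name ThreadManager is reused from the pseudocode, but each worker is a hypothetical pair of std::function callables (run, which blocks and may throw, and stop, which asks run to return early) standing in for the library's Task::run() and Task::stop(). Internally it uses the same condition-variable protocol as the question, and on the first failure it stops every worker.

```cpp
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Manager that owns a group of workers. Each worker is a pair of callables:
// run() blocks until done (and may throw); stop() asks run() to return.
class ThreadManager
{
public:
    void addWorker(std::function<void()> run, std::function<void()> stop)
    {
        m_workers.push_back(Worker{std::move(run), std::move(stop)});
    }

    // Starts every worker, waits until all have finished or one has failed,
    // stops the others on failure, then joins all threads.
    // Returns true if no worker threw.
    bool runAll()
    {
        m_running = m_workers.size();
        m_error = false;

        std::vector<std::thread> threads;
        for (auto &w : m_workers) {
            threads.emplace_back([this, &w] {
                bool failed = false;
                try {
                    w.run();
                } catch (...) {
                    failed = true;
                }
                {
                    std::lock_guard<std::mutex> lock(m_mutex);
                    m_error = m_error || failed;
                    --m_running;
                }
                m_cv.notify_one();
            });
        }

        bool error = false;
        {
            std::unique_lock<std::mutex> lock(m_mutex);
            m_cv.wait(lock, [this] { return m_error || m_running == 0; });
            error = m_error;
        }
        if (error) {
            for (auto &w : m_workers) {
                w.stop();   // shut down every worker, whatever its ID
            }
        }
        for (auto &t : threads) {
            t.join();
        }
        return !error;
    }

private:
    struct Worker {
        std::function<void()> run;
        std::function<void()> stop;
    };

    std::vector<Worker> m_workers;
    std::mutex m_mutex;
    std::condition_variable m_cv;
    bool m_error = false;
    std::size_t m_running = 0;
};
```

With a library Task, the pair would be something like [&task] { task.run(); } and [&task] { task.stop(); }. Calling stop() on a worker that has already finished is assumed to be harmless.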



I like the manager-class design because I have used it in other projects, and it comes in handy quite often, especially in a code base with many kinds of resources, such as a game engine with assets like sprites, textures, audio files, maps and game items. A manager class helps keep track of and maintain all of those assets. The same concept can be applied to managing active, inactive and waiting threads, so that the manager knows how to handle and shut down all of them properly. If your code base and libraries support exceptions, I would recommend a thread-safe ExceptionHandler rather than passing bools around to signal errors. A Logger class is also useful: it can write to a log file and/or a console window, giving an explicit message about which function threw the exception and what caused it. A log message might look like this:

Exception Thrown: someFunctionNamedThis in ThisFile on Line# (x)
    threadID 021342 failed to execute.

This way you can look at the log file and quickly find out which thread is causing the exception, instead of relying on bool variables passed around between threads.
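A minimal sketch of such a logger, assuming a hypothetical Logger class and LOG_EXCEPTION macro (neither comes from a library): each message is formatted in full before being written under a mutex, so lines from different threads do not interleave, and __func__, __FILE__ and __LINE__ supply the location in the format shown in the sample message.

```cpp
#include <exception>
#include <mutex>
#include <ostream>
#include <sstream>
#include <stdexcept>
#include <string>
#include <thread>

// Minimal thread-safe logger: the whole message is built first, then written
// under a mutex so that lines from different threads do not interleave.
class Logger
{
public:
    explicit Logger(std::ostream &out) : m_out(out) {}

    void logException(const char *function, const char *file, int line,
                      const std::exception &e)
    {
        std::ostringstream msg;
        msg << "Exception Thrown: " << function << " in " << file
            << " on Line# (" << line << ")\n"
            << "    threadID " << std::this_thread::get_id()
            << " failed to execute: " << e.what() << '\n';

        std::lock_guard<std::mutex> lock(m_mutex);
        m_out << msg.str();
    }

private:
    std::ostream &m_out;
    std::mutex m_mutex;
};

// Records the calling function, file and line automatically.
#define LOG_EXCEPTION(logger, e) \
    (logger).logException(__func__, __FILE__, __LINE__, (e))
```

The sink can be std::cerr, a std::ofstream for a log file, or a std::ostringstream in tests; a worker would call LOG_EXCEPTION(logger, e) inside its catch (const std::exception &e) block.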
