How to synchronize manager/worker pthreads without a join?


Question

I'm familiar with multithreading and have successfully developed many multithreaded programs in Java and Objective-C. But I couldn't achieve the following in C using pthreads without using a join from the main thread:

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

#define NUM_OF_THREADS 2

struct thread_data {
    int start;
    int end;
    int *arr;
};

void print(int *ints, int n);
void *processArray(void *args);

int main(int argc, const char * argv[])
{
    int numOfInts = 10;
    int *ints = malloc(numOfInts * sizeof(int));
    for (int i = 0; i < numOfInts; i++) {
        ints[i] = i;
    }
    print(ints, numOfInts); // prints [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

    pthread_t threads[NUM_OF_THREADS];
    struct thread_data thread_data[NUM_OF_THREADS];

    // these vars are used to calculate the index ranges for each thread
    int remainingWork = numOfInts, amountOfWork;
    int startRange, endRange = -1;

    for (int i = 0; i < NUM_OF_THREADS; i++) {

        amountOfWork = remainingWork / (NUM_OF_THREADS - i);
        startRange = endRange + 1;
        endRange   = startRange + amountOfWork - 1;

        thread_data[i].arr   = ints;
        thread_data[i].start = startRange;
        thread_data[i].end   = endRange;

        pthread_create(&threads[i], NULL, processArray, (void *)&thread_data[i]);

        remainingWork -= amountOfWork;      
    }

    // 1. Signal to the threads to start working


    // 2. Wait for them to finish


    print(ints, numOfInts); // should print [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

    free(ints);
    return 0;
}

void *processArray(void *args)
{
    struct thread_data *data = (struct thread_data *)args;
    int *arr  = data->arr;
    int start = data->start;
    int end   = data->end;

    // 1. Wait for a signal to start from the main thread


    for (int i = start; i <= end; i++) {
        arr[i] = arr[i] + 1;
    }

    // 2. Signal to the main thread that you're done

    pthread_exit(NULL);
}

void print(int *ints, int n)
{
    printf("[");
    for (int i = 0; i < n; i++) {
        printf("%d", ints[i]);
        if (i+1 != n)
            printf(", ");
    }
    printf("]\n");
}

I would like to achieve the following in the above code:

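In main():

  1. Signal the threads to start working.

  2. Wait for the background threads to finish.

In processArray():

  1. Wait for a signal from the main thread to start.

  2. Signal to the main thread that the work is done.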

I don't want to use a join in the main thread because, in the real application, the main thread will create the threads once and then signal the background threads to work many times, and I can't let the main thread proceed until all the background threads have finished working. In the processArray function, I will put an infinite loop as follows:

void *processArray(void *args)
{
    struct thread_data *data = (struct thread_data *)args;

    while (1)
    {
      // 1. Wait for a signal to start from the main thread

      int *arr  = data->arr;
      int start = data->start;
      int end   = data->end;          

      // Process
      for (int i = start; i <= end; i++) {
          arr[i] = arr[i] + 1;
      }

      // 2. Signal to the main thread that you're done

    }

    pthread_exit(NULL);
}

Note that I'm new to C and the POSIX API, so excuse me if I'm missing something obvious. I really did try many things, starting with a mutex, then an array of semaphores, then a mixture of both, but without success. I think a condition variable may help, but I couldn't understand how to use it.

Thank you for your time.

Problem solved:

Thank you guys so much! I was finally able to get this to work safely and without using a join by following your tips. Although the solution is somewhat ugly, it gets the job done and the performance gain is worth it (as you'll see below). For anyone interested, this is a simulation of the real application I'm working on, in which the main thread keeps giving work continuously to the background threads:

 #include <stdio.h>
 #include <stdlib.h>
 #include <pthread.h>

 #define NUM_OF_THREADS 5

 struct thread_data {
     int id;
     int start;
     int end;
     int *arr;
 };

 pthread_mutex_t currentlyIdleMutex = PTHREAD_MUTEX_INITIALIZER;
 pthread_cond_t  currentlyIdleCond  = PTHREAD_COND_INITIALIZER;
 int currentlyIdle;

 pthread_mutex_t workReadyMutex = PTHREAD_MUTEX_INITIALIZER;
 pthread_cond_t  workReadyCond  = PTHREAD_COND_INITIALIZER;
 int workReady;

 pthread_cond_t  currentlyWorkingCond = PTHREAD_COND_INITIALIZER;
 pthread_mutex_t currentlyWorkingMutex= PTHREAD_MUTEX_INITIALIZER;
 int currentlyWorking;

 pthread_mutex_t canFinishMutex = PTHREAD_MUTEX_INITIALIZER;
 pthread_cond_t  canFinishCond  = PTHREAD_COND_INITIALIZER;
 int canFinish;

 void print(int *ints, int n);
 void *processArray(void *args);
 int validateResult(int *ints, int num, int start);

 int main(int argc, const char * argv[])
 {
     int numOfInts = 10;
     int *ints = malloc(numOfInts * sizeof(int));
     for (int i = 0; i < numOfInts; i++) {
         ints[i] = i;
     }
 //   print(ints, numOfInts);

     pthread_t threads[NUM_OF_THREADS];
     struct thread_data thread_data[NUM_OF_THREADS];
     workReady = 0;
     canFinish = 0;
     currentlyIdle = 0;
     currentlyWorking = 0;

     // these vars are used to calculate the index ranges for each thread
     int remainingWork = numOfInts, amountOfWork;
     int startRange, endRange = -1;
     // Create the threads and give each one its data struct.
     for (int i = 0; i < NUM_OF_THREADS; i++) {

         amountOfWork = remainingWork / (NUM_OF_THREADS - i);
         startRange = endRange + 1;
         endRange   = startRange + amountOfWork - 1;

         thread_data[i].id    = i;
         thread_data[i].arr   = ints;
         thread_data[i].start = startRange;
         thread_data[i].end   = endRange;

         pthread_create(&threads[i], NULL, processArray, (void *)&thread_data[i]);
         remainingWork -= amountOfWork;
     }

     int loops = 1111111;
     int expectedStartingValue = ints[0] + loops; // used to validate the results
     // The elements in ints[] should be incremented by 1 in each loop
     while (loops-- != 0) {

         // Make sure all of them are ready
         pthread_mutex_lock(&currentlyIdleMutex);
         while (currentlyIdle != NUM_OF_THREADS) {
             pthread_cond_wait(&currentlyIdleCond, &currentlyIdleMutex);
         }
         pthread_mutex_unlock(&currentlyIdleMutex);

         // All threads are now blocked; it's safe to not lock the mutex.
         // Prevent them from finishing before authorized.
         canFinish = 0;
         // Reset the number of currentlyWorking threads
         currentlyWorking = NUM_OF_THREADS;

         // Signal to the threads to start
         pthread_mutex_lock(&workReadyMutex);
         workReady = 1;
         pthread_cond_broadcast(&workReadyCond );
         pthread_mutex_unlock(&workReadyMutex);      

         // Wait for them to finish
         pthread_mutex_lock(&currentlyWorkingMutex);
         while (currentlyWorking != 0) {
             pthread_cond_wait(&currentlyWorkingCond, &currentlyWorkingMutex);
         }
         pthread_mutex_unlock(&currentlyWorkingMutex);

         // The threads are now waiting for permission to finish
         // Prevent them from starting again
         workReady = 0;
         currentlyIdle = 0;

         // Allow them to finish
         pthread_mutex_lock(&canFinishMutex);
         canFinish = 1;
         pthread_cond_broadcast(&canFinishCond);
         pthread_mutex_unlock(&canFinishMutex);
     }

 //   print(ints, numOfInts);

     if (validateResult(ints, numOfInts, expectedStartingValue)) {
         printf("Result correct.\n");
     }
     else {
         printf("Result invalid.\n");      
     }

     // clean up
     for (int i = 0; i < NUM_OF_THREADS; i++) {
         pthread_cancel(threads[i]);
     }
     free(ints);

     return 0;
 }

 void *processArray(void *args)
 {
     struct thread_data *data = (struct thread_data *)args;
     int *arr  = data->arr;
     int start = data->start;
     int end   = data->end;

     while (1) {

         // Set yourself as idle and signal to the main thread, when all threads are idle main will start
         pthread_mutex_lock(&currentlyIdleMutex);
         currentlyIdle++;
         pthread_cond_signal(&currentlyIdleCond);
         pthread_mutex_unlock(&currentlyIdleMutex);

         // wait for work from main
         pthread_mutex_lock(&workReadyMutex);
         while (!workReady) {
             pthread_cond_wait(&workReadyCond , &workReadyMutex);
         }
         pthread_mutex_unlock(&workReadyMutex);

         // Do the work
         for (int i = start; i <= end; i++) {
             arr[i] = arr[i] + 1;
         }

         // mark yourself as finished and signal to main
         pthread_mutex_lock(&currentlyWorkingMutex);
         currentlyWorking--;
         pthread_cond_signal(&currentlyWorkingCond);
         pthread_mutex_unlock(&currentlyWorkingMutex);

         // Wait for permission to finish
         pthread_mutex_lock(&canFinishMutex);
         while (!canFinish) {
             pthread_cond_wait(&canFinishCond , &canFinishMutex);
         }
         pthread_mutex_unlock(&canFinishMutex);
     }

     pthread_exit(NULL);
 }

 int validateResult(int *ints, int n, int start)
 {
     int tmp = start;
     for (int i = 0; i < n; i++, tmp++) {
         if (ints[i] != tmp) {
             return 0;
         }
     }
     return 1;
 }

 void print(int *ints, int n)
 {
     printf("[");
     for (int i = 0; i < n; i++) {
         printf("%d", ints[i]);
         if (i+1 != n)
             printf(", ");
     }
     printf("]\n");
 }

I'm not sure, though, whether pthread_cancel is enough for cleanup! As for the barrier, it would have been a great help if it weren't limited to some operating systems, as mentioned by @Jeremy.
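On the pthread_cancel question, one thing worth checking is what happens to a thread that is cancelled while blocked in pthread_cond_wait. As I read POSIX, the mutex is re-acquired before the thread terminates, so a worker cancelled there without a cleanup handler would exit still holding the mutex, and any other worker waiting on the same mutex could then block forever. The following is a minimal sketch of one way to handle that, using a cleanup handler plus a join after the cancel. The names work_mutex, work_cond, unlock_on_cancel, cancellable_worker and cancel_demo are made up for illustration and are not part of the code above.

```c
#include <assert.h>
#include <pthread.h>

static pthread_mutex_t work_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  work_cond  = PTHREAD_COND_INITIALIZER;
static int work_ready;   /* never set in this sketch, so the worker keeps waiting */

/* Cleanup handler: runs if the thread is cancelled between push and pop. */
static void unlock_on_cancel(void *mutex)
{
    pthread_mutex_unlock(mutex);
}

static void *cancellable_worker(void *arg)
{
    (void)arg;
    pthread_mutex_lock(&work_mutex);
    pthread_cleanup_push(unlock_on_cancel, &work_mutex);
    while (!work_ready)
        pthread_cond_wait(&work_cond, &work_mutex);  /* cancellation point */
    pthread_cleanup_pop(1);  /* normal path: run the handler to unlock */
    return NULL;
}

/* Returns 1 if the worker was cancelled and left the mutex unlocked. */
int cancel_demo(void)
{
    pthread_t thread;
    void *result = NULL;
    int rc = pthread_create(&thread, NULL, cancellable_worker, NULL);
    assert(rc == 0);

    pthread_cancel(thread);          /* request cancellation */
    pthread_join(thread, &result);   /* wait until the thread has really gone */

    rc = pthread_mutex_trylock(&work_mutex);  /* 0 means nobody holds it */
    if (rc == 0)
        pthread_mutex_unlock(&work_mutex);
    return result == PTHREAD_CANCELED && rc == 0;
}
```

An alternative that avoids cancellation entirely is a shutdown flag that the workers check after waking up, followed by a single join per thread at exit.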

Testing:

I wanted to make sure that this many condition variables isn't actually slowing down the algorithm, so I set up this benchmark to compare the two solutions:

 #include <stdio.h>
 #include <stdlib.h>
 #include <pthread.h>
 #include <unistd.h>
 #include <sys/time.h>
 #include <sys/resource.h>

 #define NUM_OF_THREADS 5
 struct thread_data {
     int start;
     int end;
     int *arr;
 };
 pthread_mutex_t currentlyIdleMutex = PTHREAD_MUTEX_INITIALIZER;
 pthread_cond_t  currentlyIdleCond  = PTHREAD_COND_INITIALIZER;
 int currentlyIdle;
 pthread_mutex_t workReadyMutex = PTHREAD_MUTEX_INITIALIZER;
 pthread_cond_t  workReadyCond  = PTHREAD_COND_INITIALIZER;
 int workReady;
 pthread_cond_t  currentlyWorkingCond = PTHREAD_COND_INITIALIZER;
 pthread_mutex_t currentlyWorkingMutex= PTHREAD_MUTEX_INITIALIZER;
 int currentlyWorking;
 pthread_mutex_t canFinishMutex = PTHREAD_MUTEX_INITIALIZER;
 pthread_cond_t  canFinishCond  = PTHREAD_COND_INITIALIZER;
 int canFinish;

 void *processArrayMutex(void *args);
 void *processArrayJoin(void *args);
 double doItWithMutex(pthread_t *threads, struct thread_data *data, int loops);
 double doItWithJoin(pthread_t *threads, struct thread_data *data, int loops);

 int main(int argc, const char * argv[])
 {
     int numOfInts = 10;
     int *join_ints = malloc(numOfInts * sizeof(int));
     int *mutex_ints = malloc(numOfInts * sizeof(int));
     for (int i = 0; i < numOfInts; i++) {
         join_ints[i] = i;
         mutex_ints[i] = i;
     }

     pthread_t join_threads[NUM_OF_THREADS];
     pthread_t mutex_threads[NUM_OF_THREADS];
     struct thread_data join_thread_data[NUM_OF_THREADS];
     struct thread_data mutex_thread_data[NUM_OF_THREADS];
     workReady = 0;
     canFinish = 0;
     currentlyIdle = 0;
     currentlyWorking = 0;

     int remainingWork = numOfInts, amountOfWork;
     int startRange, endRange = -1;
     for (int i = 0; i < NUM_OF_THREADS; i++) {
         amountOfWork = remainingWork / (NUM_OF_THREADS - i);
         startRange = endRange + 1;
         endRange   = startRange + amountOfWork - 1;

         join_thread_data[i].arr   = join_ints;
         join_thread_data[i].start = startRange;
         join_thread_data[i].end   = endRange;
         mutex_thread_data[i].arr   = mutex_ints;
         mutex_thread_data[i].start = startRange;
         mutex_thread_data[i].end   = endRange;

         pthread_create(&mutex_threads[i], NULL, processArrayMutex, (void *)&mutex_thread_data[i]);
         remainingWork -= amountOfWork;
     }

     int numOfBenchmarkTests = 100;
     int numberOfLoopsPerTest= 1000;

     double join_sum = 0.0, mutex_sum = 0.0;
     for (int i = 0; i < numOfBenchmarkTests; i++)
     {
         double joinTime = doItWithJoin(join_threads, join_thread_data, numberOfLoopsPerTest);
         double mutexTime= doItWithMutex(mutex_threads, mutex_thread_data, numberOfLoopsPerTest);

         join_sum += joinTime;
         mutex_sum+= mutexTime;      
     }

     double join_avg = join_sum / numOfBenchmarkTests;
     double mutex_avg= mutex_sum / numOfBenchmarkTests;

     printf("Join average : %f\n", join_avg);
     printf("Mutex average: %f\n", mutex_avg);

     double diff = join_avg - mutex_avg;
     if (diff > 0.0)
         printf("Mutex is %.0f%% faster.\n", 100 * diff / join_avg);
     else if (diff < 0.0)
         printf("Join  is %.0f%% faster.\n", 100 * -diff / mutex_avg);
     else
         printf("Both have the same performance.\n");

     free(join_ints);
     free(mutex_ints);

     return 0;
 }

 // From http://stackoverflow.com/a/2349941/408286
 double get_time()
 {
     struct timeval t;
     struct timezone tzp;
     gettimeofday(&t, &tzp);
     return t.tv_sec + t.tv_usec*1e-6;
 }

 double doItWithMutex(pthread_t *threads, struct thread_data *data, int num_loops)
 {
     double start = get_time();

     int loops = num_loops;
     while (loops-- != 0) {
         // Make sure all of them are ready
         pthread_mutex_lock(&currentlyIdleMutex);
         while (currentlyIdle != NUM_OF_THREADS) {
             pthread_cond_wait(&currentlyIdleCond, &currentlyIdleMutex);
         }
         pthread_mutex_unlock(&currentlyIdleMutex);

         // All threads are now blocked; it's safe to not lock the mutex.
         // Prevent them from finishing before authorized.
         canFinish = 0;
         // Reset the number of currentlyWorking threads
         currentlyWorking = NUM_OF_THREADS;

         // Signal to the threads to start
         pthread_mutex_lock(&workReadyMutex);
         workReady = 1;
         pthread_cond_broadcast(&workReadyCond );
         pthread_mutex_unlock(&workReadyMutex);

         // Wait for them to finish
         pthread_mutex_lock(&currentlyWorkingMutex);
         while (currentlyWorking != 0) {
             pthread_cond_wait(&currentlyWorkingCond, &currentlyWorkingMutex);
         }
         pthread_mutex_unlock(&currentlyWorkingMutex);

         // The threads are now waiting for permission to finish
         // Prevent them from starting again
         workReady = 0;
         currentlyIdle = 0;

         // Allow them to finish
         pthread_mutex_lock(&canFinishMutex);
         canFinish = 1;
         pthread_cond_broadcast(&canFinishCond);
         pthread_mutex_unlock(&canFinishMutex);
     }

     return get_time() - start;
 }

 double doItWithJoin(pthread_t *threads, struct thread_data *data, int num_loops)
 {
     double start = get_time();

     int loops = num_loops;
     while (loops-- != 0) {
         // create them
         for (int i = 0; i < NUM_OF_THREADS; i++) {
             pthread_create(&threads[i], NULL, processArrayJoin, (void *)&data[i]);
         }
         // wait
         for (int i = 0; i < NUM_OF_THREADS; i++) {
             pthread_join(threads[i], NULL);
         }
     }

     return get_time() - start;
 }

 void *processArrayMutex(void *args)
 {
     struct thread_data *data = (struct thread_data *)args;
     int *arr  = data->arr;
     int start = data->start;
     int end   = data->end;

     while (1) {

         // Set yourself as idle and signal to the main thread, when all threads are idle main will start
         pthread_mutex_lock(&currentlyIdleMutex);
         currentlyIdle++;
         pthread_cond_signal(&currentlyIdleCond);
         pthread_mutex_unlock(&currentlyIdleMutex);

         // wait for work from main
         pthread_mutex_lock(&workReadyMutex);
         while (!workReady) {
             pthread_cond_wait(&workReadyCond , &workReadyMutex);
         }
         pthread_mutex_unlock(&workReadyMutex);

         // Do the work
         for (int i = start; i <= end; i++) {
             arr[i] = arr[i] + 1;
         }

         // mark yourself as finished and signal to main
         pthread_mutex_lock(&currentlyWorkingMutex);
         currentlyWorking--;
         pthread_cond_signal(&currentlyWorkingCond);
         pthread_mutex_unlock(&currentlyWorkingMutex);

         // Wait for permission to finish
         pthread_mutex_lock(&canFinishMutex);
         while (!canFinish) {
             pthread_cond_wait(&canFinishCond , &canFinishMutex);
         }
         pthread_mutex_unlock(&canFinishMutex);
     }

     pthread_exit(NULL);
 }

 void *processArrayJoin(void *args)
 {
     struct thread_data *data = (struct thread_data *)args;
     int *arr  = data->arr;
     int start = data->start;
     int end   = data->end;

     // Do the work
     for (int i = start; i <= end; i++) {
         arr[i] = arr[i] + 1;
     }

     pthread_exit(NULL);
 }

And the output is:

Join average : 0.153074
Mutex average: 0.071588
Mutex is 53% faster.

Thank you again. I really appreciate your help!

Recommended answer

You need to use a different synchronization technique than join, that's clear.

Unfortunately you have a lot of options. One is a "synchronization barrier", which is basically an object at which each thread that reaches it blocks until all of them have reached it (you specify the number of threads in advance). Look at pthread_barrier_init and pthread_barrier_wait.
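As a rough illustration of the barrier option, here is a minimal sketch in which the main thread and the workers meet at the same barrier twice per round: once to start the round and once to finish it. It assumes a platform that provides POSIX barriers (they are an optional feature and, for example, are not available on macOS). The names round_barrier, barrier_worker, barrier_demo, values and stop are invented for this sketch.

```c
#define _POSIX_C_SOURCE 200809L  /* ask for pthread_barrier_* declarations */
#include <assert.h>
#include <pthread.h>

#define NUM_WORKERS 4    /* hypothetical worker count */
#define NUM_INTS    10   /* hypothetical array size */

struct range { int start; int end; };

static pthread_barrier_t round_barrier;  /* NUM_WORKERS workers + main */
static int values[NUM_INTS];
static int stop;  /* main writes it only between rounds; workers read it only
                     after passing the start barrier */

static void *barrier_worker(void *arg)
{
    struct range *r = arg;
    for (;;) {
        pthread_barrier_wait(&round_barrier);   /* wait for main to start a round */
        if (stop)
            break;
        for (int i = r->start; i <= r->end; i++)
            values[i]++;
        pthread_barrier_wait(&round_barrier);   /* report that this round is done */
    }
    return NULL;
}

/* Runs `rounds` rounds; returns 1 if every element was incremented `rounds` times. */
int barrier_demo(int rounds)
{
    pthread_t threads[NUM_WORKERS];
    struct range ranges[NUM_WORKERS];
    int remaining = NUM_INTS, end = -1, rc;

    for (int i = 0; i < NUM_INTS; i++)
        values[i] = i;
    stop = 0;
    rc = pthread_barrier_init(&round_barrier, NULL, NUM_WORKERS + 1);
    assert(rc == 0);

    /* Split the index range the same way the question does. */
    for (int i = 0; i < NUM_WORKERS; i++) {
        int amount = remaining / (NUM_WORKERS - i);
        ranges[i].start = end + 1;
        ranges[i].end   = ranges[i].start + amount - 1;
        end = ranges[i].end;
        remaining -= amount;
        rc = pthread_create(&threads[i], NULL, barrier_worker, &ranges[i]);
        assert(rc == 0);
    }

    for (int r = 0; r < rounds; r++) {
        pthread_barrier_wait(&round_barrier);   /* release the workers */
        pthread_barrier_wait(&round_barrier);   /* wait until all have finished */
    }

    stop = 1;                                   /* no worker reads this before the next barrier */
    pthread_barrier_wait(&round_barrier);
    for (int i = 0; i < NUM_WORKERS; i++)
        pthread_join(threads[i], NULL);         /* one-time cleanup, not per round */
    pthread_barrier_destroy(&round_barrier);
    (void)rc;

    for (int i = 0; i < NUM_INTS; i++)
        if (values[i] != i + rounds)
            return 0;
    return 1;
}
```

The join here happens only once at shutdown, so the per-round cost is two barrier waits rather than a thread create and join.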

Another is to use a condition-variable/mutex pair (pthread_cond_*). When each thread finishes, it takes the mutex, increments a count, and signals the condvar. The main thread waits on the condvar until the count reaches the value it expects. The code looks like this:

// thread has finished
pthread_mutex_lock(&mutex);
++global_count;
// optional optimization: only execute the next line when global_count >= N
pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);

// main is waiting for N threads to finish
pthread_mutex_lock(&mutex);
while (global_count < N) {
    pthread_cond_wait(&cond, &mutex);
}
pthread_mutex_unlock(&mutex);
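The pattern above can be fleshed out into a small compilable sketch. This one covers only the "wait for N threads to finish" half, for a single batch of work, and includes the optional optimization of signalling only when the last worker is done. The names done_mutex, done_cond, results, count_worker and wait_for_workers_demo are placeholders chosen for this example.

```c
#include <assert.h>
#include <pthread.h>

#define N 4   /* hypothetical number of workers */

static pthread_mutex_t done_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  done_cond  = PTHREAD_COND_INITIALIZER;
static int global_count;   /* workers that have finished; guarded by done_mutex */
static int results[N];

static void *count_worker(void *arg)
{
    int id = *(int *)arg;
    results[id] = id * id;   /* stand-in for the real work */

    /* thread has finished */
    pthread_mutex_lock(&done_mutex);
    ++global_count;
    if (global_count >= N)   /* optional: wake main only when everyone is done */
        pthread_cond_signal(&done_cond);
    pthread_mutex_unlock(&done_mutex);
    return NULL;
}

/* Starts N workers, waits on the condvar for all of them, returns the sum of results. */
int wait_for_workers_demo(void)
{
    pthread_t threads[N];
    int ids[N];
    int sum = 0, rc = 0;

    global_count = 0;
    for (int i = 0; i < N; i++) {
        ids[i] = i;
        rc = pthread_create(&threads[i], NULL, count_worker, &ids[i]);
        assert(rc == 0);
    }

    /* main is waiting for N threads to finish */
    pthread_mutex_lock(&done_mutex);
    while (global_count < N)
        pthread_cond_wait(&done_cond, &done_mutex);
    pthread_mutex_unlock(&done_mutex);

    /* The workers' writes to results[] happened before their final unlock. */
    for (int i = 0; i < N; i++)
        sum += results[i];

    /* Joined here only to reclaim the threads, not to synchronize. */
    for (int i = 0; i < N; i++)
        pthread_join(threads[i], NULL);
    (void)rc;
    return sum;
}
```

The while loop around pthread_cond_wait matters: condition variables can wake spuriously, so the count is re-checked every time the wait returns.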

Another is to use a semaphore per thread: when the thread finishes it posts its own semaphore, and the main thread waits on each semaphore in turn instead of joining each thread in turn.
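A minimal sketch of the semaphore-per-thread idea follows, extended with a second semaphore per worker so the main thread can also start each round. It assumes unnamed POSIX semaphores (sem_init) work on the target platform, which is the case on Linux but not on macOS. The struct sem_worker and the functions wait_sem, sem_worker_main and semaphore_demo are hypothetical names for this example.

```c
#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <semaphore.h>

#define NUM_WORKERS 3   /* hypothetical worker count */

struct sem_worker {
    sem_t start;    /* posted by main: "go" */
    sem_t done;     /* posted by the worker: "finished" */
    int   quit;     /* set by main before the final post to start */
    int   counter;  /* stand-in for the real work */
};

/* sem_wait can fail with EINTR if a signal arrives; retry in that case. */
static void wait_sem(sem_t *s)
{
    while (sem_wait(s) == -1 && errno == EINTR)
        continue;
}

static void *sem_worker_main(void *arg)
{
    struct sem_worker *w = arg;
    for (;;) {
        wait_sem(&w->start);      /* wait for main's signal */
        if (w->quit)
            break;
        w->counter++;             /* do the work */
        sem_post(&w->done);       /* post our own semaphore */
    }
    return NULL;
}

/* Runs `rounds` rounds and returns the total number of work items performed. */
int semaphore_demo(int rounds)
{
    struct sem_worker workers[NUM_WORKERS];
    pthread_t threads[NUM_WORKERS];
    int total = 0, rc = 0;

    for (int i = 0; i < NUM_WORKERS; i++) {
        workers[i].quit = 0;
        workers[i].counter = 0;
        rc = sem_init(&workers[i].start, 0, 0);
        assert(rc == 0);
        rc = sem_init(&workers[i].done, 0, 0);
        assert(rc == 0);
        rc = pthread_create(&threads[i], NULL, sem_worker_main, &workers[i]);
        assert(rc == 0);
    }

    for (int r = 0; r < rounds; r++) {
        for (int i = 0; i < NUM_WORKERS; i++)
            sem_post(&workers[i].start);   /* start everyone */
        for (int i = 0; i < NUM_WORKERS; i++)
            wait_sem(&workers[i].done);    /* wait on each semaphore in turn */
    }

    /* Shut down: set the flag, wake each worker once more, then clean up. */
    for (int i = 0; i < NUM_WORKERS; i++) {
        workers[i].quit = 1;
        sem_post(&workers[i].start);
    }
    for (int i = 0; i < NUM_WORKERS; i++) {
        pthread_join(threads[i], NULL);
        total += workers[i].counter;
        sem_destroy(&workers[i].start);
        sem_destroy(&workers[i].done);
    }
    (void)rc;
    return total;
}
```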

You also need synchronization to restart the threads for the next job. This could be a second synchronization object of the same type as the first, with the details changed to reflect that you have 1 poster and N waiters rather than the other way around. Or you could (with care) reuse the same object for both purposes.
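One possible shape for the restart side, sketched under my own assumptions rather than taken from the answer: a single mutex protects a generation counter that the main thread bumps for each job (1 poster, N waiters, so it broadcasts), plus a count of unfinished workers that the workers decrement (N posters, 1 waiter). Each worker remembers the last generation it handled, so it cannot run the same job twice. All names here (job_mutex, job_posted, job_done, generation, unfinished, shutting_down, restart_demo) are illustrative.

```c
#include <assert.h>
#include <pthread.h>

#define NUM_WORKERS 4   /* hypothetical worker count */

static pthread_mutex_t job_mutex  = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  job_posted = PTHREAD_COND_INITIALIZER;  /* 1 poster, N waiters */
static pthread_cond_t  job_done   = PTHREAD_COND_INITIALIZER;  /* N posters, 1 waiter */
static int generation;      /* bumped by main once per job */
static int unfinished;      /* workers still busy with the current job */
static int shutting_down;
static int cells[NUM_WORKERS];

static void *job_worker(void *arg)
{
    int id = *(int *)arg;
    int seen = 0;   /* last generation this worker handled */

    for (;;) {
        pthread_mutex_lock(&job_mutex);
        while (generation == seen && !shutting_down)
            pthread_cond_wait(&job_posted, &job_mutex);
        if (shutting_down) {
            pthread_mutex_unlock(&job_mutex);
            break;
        }
        seen = generation;
        pthread_mutex_unlock(&job_mutex);

        cells[id]++;   /* stand-in for the real work */

        pthread_mutex_lock(&job_mutex);
        if (--unfinished == 0)
            pthread_cond_signal(&job_done);   /* last one out wakes main */
        pthread_mutex_unlock(&job_mutex);
    }
    return NULL;
}

/* Posts `jobs` jobs to persistent workers; returns 1 if each worker ran every job once. */
int restart_demo(int jobs)
{
    pthread_t threads[NUM_WORKERS];
    int ids[NUM_WORKERS];
    int rc = 0;

    generation = 0;
    unfinished = 0;
    shutting_down = 0;
    for (int i = 0; i < NUM_WORKERS; i++) {
        cells[i] = 0;
        ids[i] = i;
        rc = pthread_create(&threads[i], NULL, job_worker, &ids[i]);
        assert(rc == 0);
    }

    for (int j = 0; j < jobs; j++) {
        pthread_mutex_lock(&job_mutex);
        unfinished = NUM_WORKERS;
        generation++;                           /* post the next job */
        pthread_cond_broadcast(&job_posted);
        while (unfinished != 0)                 /* wait for all workers */
            pthread_cond_wait(&job_done, &job_mutex);
        pthread_mutex_unlock(&job_mutex);
    }

    pthread_mutex_lock(&job_mutex);
    shutting_down = 1;
    pthread_cond_broadcast(&job_posted);
    pthread_mutex_unlock(&job_mutex);
    for (int i = 0; i < NUM_WORKERS; i++)
        pthread_join(threads[i], NULL);         /* one-time cleanup at exit */
    (void)rc;

    for (int i = 0; i < NUM_WORKERS; i++)
        if (cells[i] != jobs)
            return 0;
    return 1;
}
```

Compared with separate ready/working/finish flags, the generation counter removes the need for an extra "permission to finish" handshake, because a worker that loops around early simply sees an unchanged generation and goes back to waiting.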

If you've tried these things and your code didn't work, maybe ask a new, specific question about the code you tried. All of them are adequate to the task.
