C Pthreads-线程安全队列实现问题 [英] C Pthreads - issues with thread-safe queue implementation

查看:108
本文介绍了C Pthreads-线程安全队列实现问题的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我是多线程的新手,我试图实现一个简单的线程安全任务队列,其中每个线程都可以从中拉出工作,直到没有更多的任务了.任何线程都不会进行任务排队.

I'm new to multithreading and im trying to implement a simple thread safe queue of tasks where each thread can pull work from until there's no more tasks left. No queuing of tasks will be made by any of the threads.

出于测试目的,每个任务仅包含一个数字.

For testing purposeses every Task holds just a number.

    static pthread_mutex_t task_mutex = PTHREAD_MUTEX_INITIALIZER;

    typedef struct Task{
       int number;
    }Task;


    typedef struct Cell{
        Task t;
        struct Cell* next;
    }Cell;


    typedef struct TQueue{
        struct Cell* head;
        struct Cell* tail;
    }TQueue;



   int empty(TQueue *Queue) 
      return queue->head == queue->tail;


   void startQueue(TQueue *queue){

        queue->head = malloc(sizeof(Cell));
        queue->tail = queue->head;
   }

   void enqueue(TQueue *queue, Task C){

       queue->tail->next = malloc(sizeof(Cell));
       queue->tail = queue->tail->next;
       queue->tail->t = C;
       queue->tail->next = NULL; 
   }


    Task * dequeue(TQueue* queue){

       pthread_mutex_lock( &task_mutex);
       Task * t;

       if(empty(queue)) t = NULL;

       else{

           struct Cell* p = queue->head;
           queue->head = queue->head->next;
           t = &queue->head->t;
           free(p);
       }

       pthread_mutex_unlock( &task_mutex);
       return t;
    }

    void * work( void* arg){

       TQueue* queue = (TQueue *)arg;
       Task* t = malloc(sizeof(Task));

       for(t = dequeue(queue); t != NULL; t = dequeue(queue))
           printf("%d ", t->number);

       free(t);
       pthread_exit(NULL);
       return 0;
    }

为了简单测试,我在main上运行了此

For a simple test i runned this on main:

int main(){

    TQueue* queue = malloc(sizeof(TQueue));
    startQueue(queue);

    pthread_t threads[3];
    Task t[3];


    for(int i = 0; i < 3; i++){
        t[i].number = i + 1;
        enqueue(queue, t[i]);
    }

    for(int i = 0; i < 3; i++) pthread_create(&threads[i], NULL, work, (void*)queue);

    for(int i = 0; i < 3; i++) pthread_join(threads[i], NULL);

    return 0;
}

期望的输出是任何顺序的1 2 3,但是有时它会打印一个序列,其中包含一个奇怪的数字,例如1823219 2 3.我无法检测到任何比赛条件或相关问题,因此,我感谢您的帮助.

The expected output was 1 2 3 in any order, but sometimes it prints a sequence with a strange number in it like 1823219 2 3. I have not been able to detect any race conditions or related problems, so i appreciate any help.

推荐答案

我发现了另外一些错误.

I found a few more bugs.

我已经注释了您的代码.我从您的第一篇文章和第二篇文章中抽了一点点.我已经修复了代码,在[请原谅免费的样式清理]之前和之后显示:

I've annotated your code. I took a bit from your first posting and your second. I've fixed the code, showing before and after [please pardon the gratuitous style cleanup]:

#include <stdio.h>
#include <pthread.h>
#include <malloc.h>

static pthread_mutex_t task_mutex = PTHREAD_MUTEX_INITIALIZER;

typedef struct Task {
    int number;
} Task;

typedef struct Cell {
// NOTE/BUG: this should be a pointer to the task. otherwise, dequeue gets
// messy
#if 0
    Task t;
#else
    Task *t;
#endif
    struct Cell *next;
} Cell;

typedef struct TQueue {
    struct Cell *head;
    struct Cell *tail;
} TQueue;

void
startQueue(TQueue *queue)
{

#if 0
    queue->head = malloc(sizeof(Cell));
#else
    queue->head = NULL;
#endif
    queue->tail = NULL;
}

int
empty(TQueue *queue)
{

    // NOTE/BUG: dequeue never touches tail, so this test is incorrect
#if 0
    return (queue->head == queue->tail);
#else
    return (queue->head == NULL);
#endif
}

void
enqueue(TQueue *queue, Task *t)
{
    Cell *p;

    pthread_mutex_lock(&task_mutex);

    p = malloc(sizeof(Cell));
    p->next = NULL;
    p->t = t;

    if (queue->tail == NULL) {
        queue->tail = p;
        queue->head = p;
    }
    else {
        queue->tail->next = p;
        queue->tail = p;
    }

    pthread_mutex_unlock(&task_mutex);
}

Task *
dequeue(TQueue *queue)
{
    Task *t;

    pthread_mutex_lock(&task_mutex);

    if (empty(queue))
        t = NULL;

    else {
        Cell *p = queue->head;

        if (p == queue->tail)
            queue->tail = NULL;

        queue->head = p->next;

        // NOTE/BUG: this is setting t to the second element in the list,
        // not the first
        // NOTE/BUG: this is also undefined behavior, in original code (with
        // original struct definition), because what t points to _does_ get
        // freed before return
#if 0
        t = &queue->head->t;
#else
        t = p->t;
#endif

        free(p);
    }

    pthread_mutex_unlock(&task_mutex);

    return t;
}

void *
work(void *arg)
{

    TQueue *queue = (TQueue *) arg;

    // NOTE/BUG: this gets orphaned on the first call to dequeue
#if 0
    Task *t = malloc(sizeof(Task));
#else
    Task *t;
#endif

    for (t = dequeue(queue); t != NULL; t = dequeue(queue))
        printf("%d ", t->number);

    // NOTE/BUG: this frees some cell allocated in main -- not what we want
#if 0
    free(t);
#endif

    pthread_exit(NULL);
    return 0;
}

// For a simple test i runned this on main:

int
main()
{

    TQueue *queue = malloc(sizeof(TQueue));

    startQueue(queue);

    pthread_t threads[3];
    Task t[3];

    for (int i = 0; i < 3; i++) {
        t[i].number = i + 1;
#if 0
        enqueue(queue, t);
#else
        enqueue(queue, &t[i]);
#endif
    }

    for (int i = 0; i < 3; i++)
        pthread_create(&threads[i], NULL, work, (void *) queue);

    for (int i = 0; i < 3; i++)
        pthread_join(threads[i], NULL);

    return 0;
}


更新:

线程是否同时执行任务?我一直在用htop测试cpu的使用情况,但我最多只能将单个核心的使用率提高到四个.

Are the threads executing the tasks concurrently ? I've been testing the cpu usage with htop and i can only max the usage of a single core out of four.

请记住一些注意事项. htop在运行时间如此短的程序上可能不会显示太多.即使有10,000个队列条目,该程序也会在20毫秒内执行.

A few things to keep in mind. htop probably won't show much on programs that have such a short running time. Even with 10,000 queue entries this program executes in 20ms.

最好让程序自己打印信息[见下文].请注意,printf确实对stdin进行了线程锁定,因此它可能有助于程序的串行"性质. 也会对程序的执行时间造成很大的影响(即printfdequeue慢得多)

It's better to have the program itself print the information [see below]. Note that printf does thread locking on stdin so it may contribute to the "serial" nature of the program. It also contributes a significant amount to the execution time of the program (i.e. the printf is much slower than the dequeue)

此外,一个线程(即第一个线程)可以独占队列,并在其他线程有机会运行之前先清空所有条目.

Also, one thread (i.e. the first one) could monopolize the queue and drain all entries before the others have a chance to run.

OS可以[随意]在单个内核上调度所有线程.然后,它可能会在以后(例如在一秒钟左右)迁移"它们.

The OS may [is at liberty to] schedule all threads on a single core. It may then "migrate" them later (e.g. within a second or so).

我已经对该程序进行了增强,使其在输出打印中包含一些时序信息,这些信息可能有助于显示更多您想看到的内容.另外,我添加了命令行选项来控制线程数和排队的项目数.这类似于我对自己的某些程序所做的工作.将程序输出转移到日志文件并进行检查.多次运行选项

I've enhanced the program to include some timing information in the output print that may help show more of what you'd like to see. Also, I've added command line options to control the number of threads and number of items queued. This is similar to what I do for some of my own programs. Divert the program output to a log file and examine it. Play around with the options on multiple runs

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <malloc.h>
#include <time.h>

int opt_n;                              // suppress thread output
int opt_T;                              // number of threads
int opt_Q;                              // number of queue items

static pthread_mutex_t task_mutex = PTHREAD_MUTEX_INITIALIZER;

double tvzero;

typedef struct Task {
    int number;
} Task;

typedef struct Cell {
    Task *t;
    struct Cell *next;
} Cell;

typedef struct TQueue {
    struct Cell *head;
    struct Cell *tail;
} TQueue;

typedef struct Thread {
    pthread_t tid;
    int xid;
    TQueue *queue;
} Thread;

double
tvgetf(void)
{
    struct timespec ts;
    double sec;

    clock_gettime(CLOCK_REALTIME,&ts);
    sec = ts.tv_nsec;
    sec /= 1e9;
    sec += ts.tv_sec;

    sec -= tvzero;

    return sec;
}

void
startQueue(TQueue *queue)
{

    queue->head = NULL;
    queue->tail = NULL;
}

int
empty(TQueue *queue)
{

    return (queue->head == NULL);
}

void
enqueue(TQueue *queue, Task *t)
{
    Cell *p;

    pthread_mutex_lock(&task_mutex);

    p = malloc(sizeof(Cell));
    p->next = NULL;
    p->t = t;

    if (queue->tail == NULL) {
        queue->tail = p;
        queue->head = p;
    }
    else {
        queue->tail->next = p;
        queue->tail = p;
    }

    pthread_mutex_unlock(&task_mutex);
}

Task *
dequeue(TQueue *queue)
{
    Task *t;

    pthread_mutex_lock(&task_mutex);

    if (empty(queue))
        t = NULL;

    else {
        Cell *p = queue->head;

        if (p == queue->tail)
            queue->tail = NULL;

        queue->head = p->next;

        t = p->t;

        free(p);
    }

    pthread_mutex_unlock(&task_mutex);

    return t;
}

void *
work(void *arg)
{
    Thread *tskcur = arg;
    TQueue *queue = tskcur->queue;
    Task *t;
    double tvbef;
    double tvaft;

    while (1) {
        tvbef = tvgetf();
        t = dequeue(queue);
        tvaft = tvgetf();

        if (t == NULL)
            break;

        if (! opt_n)
            printf("[%.9f/%.9f %5.5d] %d\n",
                tvbef,tvaft - tvbef,tskcur->xid,t->number);
    }

    return (void *) 0;
}

// For a simple test i runned this on main:

int
main(int argc,char **argv)
{
    char *cp;
    TQueue *queue;
    Task *t;
    Thread *tsk;

    --argc;
    ++argv;

    for (;  argc > 0;  --argc, ++argv) {
        cp = *argv;
        if (*cp != '-')
            break;

        switch (cp[1]) {
        case 'n':  // suppress thread output
            opt_n = 1;
            break;

        case 'Q':  // number of queue items
            opt_Q = atoi(cp + 2);
            break;

        case 'T':  // number of threads
            opt_T = atoi(cp + 2);
            break;

        default:
            break;
        }
    }

    tvzero = tvgetf();

    queue = malloc(sizeof(TQueue));
    startQueue(queue);

    if (opt_T == 0)
        opt_T = 16;
    Thread threads[opt_T];

    if (opt_Q == 0)
        opt_Q = 10000;
    t = malloc(sizeof(Task) * opt_Q);

    for (int i = 0; i < opt_Q; i++) {
        t[i].number = i + 1;
        enqueue(queue, &t[i]);
    }

    for (int i = 0; i < opt_T; i++) {
        tsk = &threads[i];
        tsk->xid = i + 1;
        tsk->queue = queue;
        pthread_create(&tsk->tid, NULL, work, tsk);
    }

    for (int i = 0; i < opt_T; i++) {
        tsk = &threads[i];
        pthread_join(tsk->tid, NULL);
    }

    printf("TOTAL: %.9f\n",tvgetf());

    free(t);

    return 0;
}


更新#2:

另外,一个线程(即第一个线程)可以垄断队列,并在其他线程有机会运行之前清空所有条目."在这种情况下可以做什么?

Also, one thread (i.e. the first one) could monopolize the queue and drain all entries before the others have a chance to run." What can be done in that case ?

几件事.

pthread_create需要一些时间,允许线程1继续运行,而其他线程仍在创建.一种改善方法是创建所有线程,每个线程都在其线程控制块中设置一个我正在运行"标志.主线程等待所有线程设置此标志.然后,主线程设置一个全局易失性"you_may_now_all_run"标志,每个线程在进入其主线程循环之前都会旋转.以我的经验,它们都在彼此之间[或更好]的微秒内开始运行.

pthread_create takes a bit of time, allowing thread 1 to go while the others are still being created. A way to ameliorate this is to create all threads, each thread sets an "I am running" flag (in its thread control block). The main thread waits for all threads to set this flag. Then, the main thread sets a global volatile "you_may_now_all_run" flag that each thread spins on before entering its primary thread loop. In my experience, they all start running within microseconds of each other [or better].

我没有在下面的更新代码中实现此功能,因此您可以[连同nanosleep]自己进行试验.

I didn't implement this in the updated code below, so you can experiment with it yourself [along with the nanosleep].

互斥体总体上相当合理(至少在linux下),因为阻塞的线程将排队等待互斥体.正如我在评论中提到的,也可以使用nanosleep,但是由于线程速度变慢,此[某种程度上]破坏了目的.

mutexes are pretty fair overall [under linux, at least] because a blocked thread will get queued, waiting on the mutex. As I mentioned in the comments, a nanosleep can also be used, but this [somewhat] defeats the purpose as the threads will slow down.

解决线程饥饿问题的方法是公平".正如我提到的,有一种精心设计的公平算法,无需等待.它是Kogan/Petrank算法: http://www.cs.technion.ac.il/~erez/Papers/wf-methodology-ppopp12.pdf 这确实有点涉及/高级,所以请警告...

The antidote to thread starvation is "fairness". As I mentioned, there is an elaborate algorithm for fairness without waiting. It is the Kogan/Petrank algorithm: http://www.cs.technion.ac.il/~erez/Papers/wf-methodology-ppopp12.pdf This is really a bit involved/advanced, so caveat emptor ...

但是,折衷方案可能是锁定票证: https://en.wikipedia.org/wiki /Ticket_lock

However, a compromise may be a ticket lock: https://en.wikipedia.org/wiki/Ticket_lock

我再次修改了程序.它具有池分配,票证与互斥锁以及延迟打印日志条目的选项.它还会交叉检查线程之间的结果,以确保它们中没有一个得到重复的条目.

I've reworked the program again. It has options for pooled allocation, ticket vs. mutex lock, and deferred printing of log entries. It also cross-checks the results between threads to ensure none of them got duplicate entries.

当然,这一切的关键是准确,高精度的日志记录(即,如果无法测量,就无法对其进行调整).

Of course, the key to all this is accurate, high precision logging (i.e. if you can't measure it, you can't tune it).

例如,人们会认为在dequeue内部执行free会比简单地将Cell释放到可重用池(类似于slab分配器)要慢,但是,性能提升并没有预期的那么好.这可能是因为glibc的malloc/free只是在快速发展[这就是他们的声明].

For example, one would think that doing free inside dequeue would be slower than simply releasing the Cell to a resuable pool (similar to a slab allocator), but, the performance boost wasn't as great as expected. This could be that glibc's malloc/free is just blazing fast [which is what they claim].

这些不同的版本应该为您提供一些有关如何构建自己的性能评估套件的想法.

These various versions should give you some ideas of how to build up your own performance measurement suite.

无论如何,这是代码:

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <stdatomic.h>
#include <malloc.h>
#include <errno.h>
#include <string.h>
#include <time.h>

int opt_p;                              // print thread output immediately
int opt_T;                              // number of threads
int opt_Q;                              // number of queue items
int opt_L;                              // use ticket lock
int opt_M;                              // use fast cell alloc/free

typedef unsigned char byte;
typedef unsigned int u32;

#define sysfault(_fmt...) \
    do { \
        fprintf(stderr,_fmt); \
        exit(1); \
    } while (0)

// lock control
typedef struct AnyLock {
    pthread_mutex_t mutex;              // standard mutex
    volatile u32 seqreq;                // ticket lock request
    volatile u32 seqacq;                // ticket lock grant
} AnyLock;

// work value
typedef struct Task {
    union {
        struct Task *next;
        int number;
    };
} Task;

// queue item
typedef struct Cell {
    struct Cell *next;
    Task *t;
} Cell;

// queue control
typedef struct TQueue {
    struct Cell *head;
    struct Cell *tail;
} TQueue;

// thread log entry
typedef struct Log {
    double tvbef;
    double tvaft;
    int number;
} Log;

#define BTVOFF(_off) \
    ((_off) >> 3)
#define BTVMSK(_off) \
    (1u << ((_off) & 0x07))

#define BTVLEN(_len) \
    ((_len) + 7) >> 3

// thread control
typedef struct Thread {
    pthread_t tid;
    int xid;
    TQueue *queue;
    Log *log;
    byte *bitv;
} Thread;

static inline byte
btvset(byte *bitv,long off)
{
    u32 msk;
    byte oval;

    bitv += BTVOFF(off);
    msk = BTVMSK(off);

    oval = *bitv & msk;

    *bitv |= msk;

    return oval;
}

AnyLock task_mutex;
AnyLock print_mutex;
double tvzero;
Cell *cellpool;                         // free pool of cells
long bitvlen;

#define BARRIER \
    __asm__ __volatile__("" ::: "memory")

// virtual function pointers
Cell *(*cellnew)(void);
void (*cellfree)(Cell *);
void (*lock_acquire)(AnyLock *lock);
void (*lock_release)(AnyLock *lock);

double
tvgetf(void)
{
    struct timespec ts;
    double sec;

    clock_gettime(CLOCK_REALTIME,&ts);
    sec = ts.tv_nsec;
    sec /= 1e9;
    sec += ts.tv_sec;

    sec -= tvzero;

    return sec;
}

void *
xalloc(size_t cnt,size_t siz)
{
    void *ptr;

    ptr = calloc(cnt,siz);
    if (ptr == NULL)
        sysfault("xalloc: calloc failure -- %s\n",strerror(errno));

    return ptr;
}

void
lock_wait_ticket(AnyLock *lock,u32 newval)
{
    u32 oldval;

    // wait for our ticket to come up
    // NOTE: atomic_load is [probably] overkill here
    while (1) {
#if 0
        oldval = atomic_load(&lock->seqacq);
#else
        oldval = lock->seqacq;
#endif
        if (oldval == newval)
            break;
    }
}

void
lock_acquire_ticket(AnyLock *lock)
{
    u32 oldval;
    u32 newval;
    int ok;

    // acquire our ticket value
    // NOTE: just use a garbage value for oldval -- the exchange will
    // update it with the correct/latest value -- this saves a separate
    // refetch within the loop
    oldval = 0;
    while (1) {
#if 0
        BARRIER;
        oldval = lock->seqreq;
#endif
        newval = oldval + 1;
        ok = atomic_compare_exchange_strong(&lock->seqreq,&oldval,newval);
        if (ok)
            break;
    }

    lock_wait_ticket(lock,newval);
}

void
lock_release_ticket(AnyLock *lock)
{

    // NOTE: atomic_fetch_add is [probably] overkill, but leave it for now
#if 1
    atomic_fetch_add(&lock->seqacq,1);
#else
    lock->seqacq += 1;
#endif
}

void
lock_acquire_mutex(AnyLock *lock)
{

    pthread_mutex_lock(&lock->mutex);
}

void
lock_release_mutex(AnyLock *lock)
{

    pthread_mutex_unlock(&lock->mutex);
}

void
lock_init(AnyLock *lock)
{

    switch (opt_L) {
    case 1:
        lock->seqreq = 0;
        lock->seqacq = 1;
        lock_acquire = lock_acquire_ticket;
        lock_release = lock_release_ticket;
        break;

    default:
        pthread_mutex_init(&lock->mutex,NULL);
        lock_acquire = lock_acquire_mutex;
        lock_release = lock_release_mutex;
        break;
    }
}

void
startQueue(TQueue *queue)
{

    queue->head = NULL;
    queue->tail = NULL;
}

int
empty(TQueue *queue)
{

    return (queue->head == NULL);
}

// cellnew_pool -- allocate a queue entry
Cell *
cellnew_pool(void)
{
    int cnt;
    Cell *p;
    Cell *pool;

    while (1) {
        // try for quick allocation
        p = cellpool;

        // bug out if we got it
        if (p != NULL) {
            cellpool = p->next;
            break;
        }

        // go to the heap to replenish the pool
        cnt = 1000;
        p = xalloc(cnt,sizeof(Cell));

        // link up the entries
        pool = NULL;
        for (;  cnt > 0;  --cnt, ++p) {
            p->next = pool;
            pool = p;
        }

        // put this "online"
        cellpool = pool;
    }

    return p;
}

// cellfree_pool -- release a queue entry
void
cellfree_pool(Cell *p)
{

    p->next = cellpool;
    cellpool = p;
}

// cellnew_std -- allocate a queue entry
Cell *
cellnew_std(void)
{
    Cell *p;

    p = xalloc(1,sizeof(Cell));

    return p;
}

// cellfree_std -- release a queue entry
void
cellfree_std(Cell *p)
{

    free(p);
}

void
enqueue(TQueue *queue, Task *t)
{
    Cell *p;

    lock_acquire(&task_mutex);

    p = cellnew();
    p->next = NULL;
    p->t = t;

    if (queue->tail == NULL) {
        queue->tail = p;
        queue->head = p;
    }
    else {
        queue->tail->next = p;
        queue->tail = p;
    }

    lock_release(&task_mutex);
}

Task *
dequeue(TQueue *queue)
{
    Task *t;

    lock_acquire(&task_mutex);

    if (empty(queue))
        t = NULL;

    else {
        Cell *p = queue->head;

        if (p == queue->tail)
            queue->tail = NULL;

        queue->head = p->next;

        t = p->t;

        cellfree(p);
    }

    lock_release(&task_mutex);

    return t;
}

void *
work(void *arg)
{
    Thread *tskcur = arg;
    TQueue *queue = tskcur->queue;
    Task *t;
    Log *log;
    long cnt;
    int tprev;
    byte *bitv;
    double tvbeg;
    double tvbef;
    double tvaft;

    log = tskcur->log;
    bitv = tskcur->bitv;
    tvbeg = tvgetf();

    tprev = 0;
    while (1) {
        tvbef = tvgetf();
        t = dequeue(queue);
        tvaft = tvgetf();

        if (t == NULL)
            break;

        // abort if we get a double entry
        if (btvset(bitv,t->number))
            sysfault("work: duplicate\n");

        if (opt_p) {
            printf("[%.9f/%.9f %5.5d] %d [%d]\n",
                tvbef,tvaft - tvbef,tskcur->xid,t->number,t->number - tprev);
            tprev = t->number;
            continue;
        }

        log->tvbef = tvbef;
        log->tvaft = tvaft;
        log->number = t->number;
        ++log;
    }

    if (! opt_p) {
        tvaft = tvgetf();

        cnt = log - tskcur->log;
        log = tskcur->log;

        lock_acquire(&print_mutex);

        printf("\n");
        printf("THREAD=%5.5d START=%.9f STOP=%.9f ELAP=%.9f TOTAL=%ld\n",
            tskcur->xid,tvbeg,tvaft,tvaft - tvbeg,cnt);

        tprev = 0;
        for (;  cnt > 0;  --cnt, ++log) {
            printf("[%.9f/%.9f %5.5d] %d [%d]\n",
                log->tvbef,log->tvaft - log->tvbef,tskcur->xid,
                log->number,log->number - tprev);
            tprev = log->number;
        }

        lock_release(&print_mutex);
    }

    return (void *) 0;
}

void
btvchk(Thread *tska,Thread *tskb)
{
    byte *btva;
    byte *btvb;
    byte aval;
    byte bval;
    int idx;

    printf("btvchk: %d ??? %d\n",tska->xid,tskb->xid);

    btva = tska->bitv;
    btvb = tskb->bitv;

    // abort if we get overlapping entries between two threads
    for (idx = 0;  idx < bitvlen;  ++idx) {
        aval = btva[idx];
        bval = btvb[idx];
        if (aval & bval)
            sysfault("btvchk: duplicate\n");
    }
}

// For a simple test i runned this on main:

int
main(int argc,char **argv)
{
    char *cp;
    TQueue *queue;
    Task *t;
    Thread *tsk;

    --argc;
    ++argv;

    for (;  argc > 0;  --argc, ++argv) {
        cp = *argv;
        if (*cp != '-')
            break;

        switch (cp[1]) {
        case 'p':  // print immediately
            opt_p = 1;
            break;

        case 'Q':  // number of queue items
            opt_Q = atoi(cp + 2);
            break;

        case 'T':  // number of threads
            opt_T = atoi(cp + 2);
            break;

        case 'L':
            opt_L = 1;
            break;

        case 'M':
            opt_M = 1;
            break;

        default:
            break;
        }
    }

    printf("p=%d -- thread log is %s\n",opt_p,opt_p ? "immediate" : "deferred");

    if (opt_T == 0)
        opt_T = 16;
    printf("T=%d (number of threads)\n",opt_T);

    if (opt_Q == 0)
        opt_Q = 1000000;
    printf("Q=%d (number of items to enqueue)\n",opt_Q);

    printf("L=%d -- lock is %s\n",opt_L,opt_L ? "ticket" : "mutex");
    printf("M=%d -- queue item allocation is %s\n",
        opt_M,opt_M ? "pooled" : "malloc/free");

    tvzero = tvgetf();

    lock_init(&task_mutex);
    lock_init(&print_mutex);

    // select queue item allocation strategy
    switch (opt_M) {
    case 1:
        cellnew = cellnew_pool;
        cellfree = cellfree_pool;
        break;

    default:
        cellnew = cellnew_std;
        cellfree = cellfree_std;
        break;
    }

    queue = xalloc(1,sizeof(TQueue));
    startQueue(queue);

    Thread threads[opt_T];

    // get byte length of bit vectors
    bitvlen = BTVLEN(opt_Q + 1);

    // allocate per-thread log buffers
    for (int i = 0; i < opt_T; i++) {
        tsk = &threads[i];
        if (! opt_p)
            tsk->log = xalloc(opt_Q,sizeof(Log));
        tsk->bitv = xalloc(bitvlen,sizeof(byte));
    }

    // allocate "work to do"
    t = xalloc(opt_Q,sizeof(Task));

    // add to master queue
    for (int i = 0; i < opt_Q; i++) {
        t[i].number = i + 1;
        enqueue(queue, &t[i]);
    }

    // fire up the threads
    for (int i = 0; i < opt_T; i++) {
        tsk = &threads[i];
        tsk->xid = i + 1;
        tsk->queue = queue;
        pthread_create(&tsk->tid, NULL, work, tsk);
    }

    // wait for threads to complete
    for (int i = 0; i < opt_T; i++) {
        tsk = &threads[i];
        pthread_join(tsk->tid, NULL);
    }

    // wait for threads to complete
    for (int i = 0; i < opt_T; i++) {
        for (int j = i + 1; j < opt_T; j++)
            btvchk(&threads[i],&threads[j]);
    }

    printf("TOTAL: %.9f\n",tvgetf());

    free(t);

    return 0;
}

这篇关于C Pthreads-线程安全队列实现问题的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆