我定义的 Mutex 类中的逻辑错误以及我在生产者消费者程序中使用它的方式 - pthreads [英] Logic error in my defined Mutex class and the way I use it in producer consumer program - pthreads

查看:48
本文介绍了我定义的 Mutex 类中的逻辑错误以及我在生产者消费者程序中使用它的方式 - pthreads的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我添加了一个 Mutex 类来遵守 RAII.我不确定我使用它的方式是否正确.

I have added a Mutex class to adhere to RAII. I am not sure if the way I am using it is correct.

队列被生产者锁定后,程序意外结束.

After the queue is locked by a producer, the program unexpectedly finishes.

MutexClass.h

#ifndef MUTEXCLASS
#define MUTEXCLASS

#include <pthread.h>

class MutexClass
{
private:
    pthread_mutex_t & _mutexVariable;
public:
    MutexClass (pthread_mutex_t &);
    ~MutexClass ();
};

#endif // MUTEXCLASS

MutexClass.cpp

#include "mutexClass.h"
#include <stdexcept>

MutexClass::MutexClass (pthread_mutex_t & arg) : _mutexVariable (arg)
{
    _mutexVariable  = PTHREAD_MUTEX_INITIALIZER;
    int returnValue = pthread_mutex_lock (&_mutexVariable);
    if (returnValue > 0)
    {
        throw std::logic_error ("Mutex couldn't be locked!");
    }
}

MutexClass::~MutexClass()
{
    pthread_mutex_unlock (&_mutexVariable);
}

这是ma​​in.cpp,我在其中使用了上面定义的互斥类的对象.

This is the main.cpp where I am using the object of the above defined mutex class.

Qt 的类在这里同名,因为我是 Qt Creator.请忽略它们.

#include "mainwindow.h"
#include <QApplication>
#include <stdexcept>

#include <stdio.h>
#include <unistd.h>
#include <sys/syscall.h>
#include <iostream>
#include <QDebug>

#include "mutexClass.h"

pthread_mutex_t mutexVariable;
pthread_cond_t  conditionVariable;

int numberOfActiveProducers;
int numberOfActiveConsumers;

QList <int> sharedQueueA;
QList <int> sharedQueueB;

/*
 * Shared queues are supposed to be shared among four threads. Two producer, and two consumer threads.
 * Producer threads will put the 1's in it, and Consumer threads will remove the 1's.
 * Assumption: `sharedQueue` can contain only 10 elements at a time.
 */

int sizeOfSharedQueue;

void checkForSpaceAndPush (QList <int> & argList, int listId, pthread_t argTId)
{
    std::cerr << "\nQueue " << listId << ", First check by Producer: " << argTId;
    if (argList.length () < sizeOfSharedQueue + 1)
    {
        {
            MutexClass mutex1 (mutexVariable);

            std::cerr << "\n\nQueue " << listId << ", Locked by Producer: " << argTId;

            if (argList.length () < sizeOfSharedQueue + 1)
            {
                argList.push_back (1); std::cerr << "\nPushed by Producer " << argTId << ": " << "Length of queue " << listId << " is: " << argList.length ();
            }
            else
            {
                std::cerr << "\nProducer " << argTId << ". Queue " << listId << " is full. Length of queue is: " << argList.length ();
                pthread_cond_wait (&conditionVariable, &mutexVariable);
            }
        }
        std::cerr << "\n\nQueue " << listId << ", UnLocked by Producer: " << argTId;
    }
}

void checkForSpaceAndPop (QList <int> & argList, int listId, pthread_t argTId)
{
    std::cerr << "\nQueue " << listId << ", First check by Consumer: " << argTId;
    if (argList.length () > 0)
    {
        {
            MutexClass mutex1 (mutexVariable);
            std::cerr << "\n\nQueue " << listId << ", Locked by Consumer: " << argTId;

            if (argList.length () > 0)
            {
                argList.pop_front (); std::cerr << "\nRemoved by Consumer: " << argTId << ", Length of queue " << listId << " is: " << argList.length ();
            }
            else
            {
                pthread_cond_signal (&conditionVariable); std::cerr << "\nSignal issued by Consumer: " << argTId << ", Length of queue " << listId << " is: " << argList.length ();
            }
        }

        std::cerr << "\n\nQueue " << listId << ", UnLocked by Consumer: " << argTId;
    }
}

//  This function is run by the `Producer` threads.
void *producerThreadFunction (void *arg)
{
    Q_UNUSED (arg);

    while (1)
    {
        pthread_t tId = pthread_self(); std::cerr << "\nProducers: " << tId; std::cerr.flush();
        checkForSpaceAndPush (sharedQueueA, 1, tId);
        checkForSpaceAndPush (sharedQueueB, 2, tId);
    }

    return NULL;
}

//  This function is run by the `Consumer` threads.
void *consumerThreadFunction (void *arg)
{
    Q_UNUSED (arg);

    while (1)
    {
        pthread_t tId = pthread_self (); std::cerr << "\nConsumer: " << tId; std::cerr.flush();
        checkForSpaceAndPop (sharedQueueA, 1, tId);
        checkForSpaceAndPop (sharedQueueB, 2, tId);
    }
    return NULL;
}

int main (int argc, char *argv[])
{
    numberOfActiveProducers = 2;
    numberOfActiveConsumers = 2;
    sizeOfSharedQueue       = 10;

    // Producer threads creation
    pthread_t producerA;
    pthread_t producerB;

    if (pthread_create (&producerA, NULL, producerThreadFunction, NULL)) {
        fprintf (stderr, "Error creating thread Producer A\n");
        return 1;
    }

    if (pthread_create (&producerB, NULL, producerThreadFunction, NULL)) {
        fprintf (stderr, "Error creating thread Producer B\n");
        return 1;
    }

    // Consumer threads creation
    pthread_t consumerA;
    pthread_t consumerB;

    if (pthread_create (&consumerA, NULL, consumerThreadFunction, NULL)) {
        fprintf (stderr, "Error creating thread Consumer A\n");
        return 1;
    }

    if (pthread_create (&consumerB, NULL, consumerThreadFunction, NULL)) {
        fprintf (stderr, "Error creating thread Consumer B\n");
        return 1;
    }

    // Joining every thread
    if (pthread_join (producerA, NULL)) {
        fprintf (stderr, "Error joining thread Producer A\n");
        return 2;
    }

    if (pthread_join (producerB, NULL)) {
        fprintf (stderr, "Error joining thread Producer B\n");
        return 2;
    }

    if (pthread_join (consumerB, NULL)) {
        fprintf (stderr, "Error joining thread Consumer B\n");
        return 2;
    }

    if (pthread_join (consumerA, NULL)) {
        fprintf (stderr, "Error joining thread Consumer A\n");
        return 2;
    }

    QApplication a (argc, argv);
    MainWindow w;
    w.show ();

    return a.exec ();
}

输出在这里:

...  
...  

Queue 140388157085440
Removed by Consumer: 1403881570854401403881654781442
Queue , Locked by Consumer: 1140388148692736
Removed by Consumer: , Length of queue 1 is: , First check by Producer: 140388148692736, Length of queue 2 is: 1403881654781449

Queue 

Queue 2, UnLocked by Consumer: 140388148692736
Consumer: 9

Queue 1, UnLocked by Consumer: 140388157085440
Queue 2, First check by Consumer: 1403881570854401140388148692736
Queue 1, First check by Consumer: 140388148692736

Queue , Locked by Producer: 

Queue 2, Locked by Consumer: 140388157085440
Removed by Consumer: 1403881654781441, Locked by Consumer: 140388148692736
Pushed by Producer 140388165478144: Length of queue 1 is: 10

Queue 1, UnLocked by Producer: 140388165478144
Queue 2, First check by Producer: 140388165478144

Queue 2, Locked by Producer: 140388165478144The program has unexpectedly finished.

另外,请注意输出的以下部分:

Also, notice the following part of output:

队列 2,被消费者锁定:140388157085440被消费者移除:1403881654781441,被消费者锁定:140388148692736

Queue 2, Locked by Consumer: 140388157085440 Removed by Consumer: 1403881654781441, Locked by Consumer: 140388148692736

我只创建了 2 个消费者,但这里显示的 pid 是 3.为什么会这样?

I have created only 2 consumers but the pids shown here are 3. Why is that so?

推荐答案

目前,您每次创建新的构造一个新的 时,都会将 mutexVariable 初始化为 PTHREAD_MUTEX_INITIALIZER互斥类.考虑如果线程 A 持有 mutexVariable 并且线程 B 想要获取互斥锁会发生什么:

Currently, you initialize mutexVariable to PTHREAD_MUTEX_INITIALIZER every time you create a new construct a new MutexClass. Consider what happens if thread A holds the mutexVariable and thread B wants to acquire the mutex:

thread A tries to lock mutexVariable and succeeds
    mutexVariable = PTHRAED_MUTEX_INITIALIZER
    pthread_mutex_lock(mutexVariable)

thread B tries to lock mutexVariable and succeeds
    // this assignment overwrites the locked state thread A has stored
    mutexVariable = PTHRAED_MUTEX_INITIALIZER
    // mutex is default-initialized (not locked) - so lock it
    pthread_mutex_lock(mutexVariable)
    // both threads now believe they have the mutex
    // and all syncronization is lost

thread B unlocks mutexVariable
    // succeeds

thread A unlocks mutexVariable
    // uh.. it is not even locked any more?!

你应该只在 main.cpp:13 中初始化 mutexVariable 一次:

You should initialize mutexVariable only once, in main.cpp:13:

pthread_mutex_t mutexVariable = PTHRAED_MUTEX_INITIALIZER;

并从 MutexClass 中删除初始化.

and remove the initialization from MutexClass.

不确定这是否能解决您的所有问题,但这是我首先要做的.

Not sure if this will fix all of your issues, but that's the thing I'd do first.

这篇关于我定义的 Mutex 类中的逻辑错误以及我在生产者消费者程序中使用它的方式 - pthreads的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆