A logic error in my specific Mutex class and how I use it in a producer consumer program - pthreads

I added a Mutex class to bind to RAII. I am not sure if I am using it correctly.

After the queue is blocked by the manufacturer, the program unexpectedly quits.

Mutexclass.h

#ifndef MUTEXCLASS #define MUTEXCLASS #include <pthread.h> class MutexClass { private: pthread_mutex_t & _mutexVariable; public: MutexClass (pthread_mutex_t &); ~MutexClass (); }; #endif // MUTEXCLASS 

Mutexclass.cpp

 #include "mutexClass.h" #include <stdexcept> MutexClass::MutexClass (pthread_mutex_t & arg) : _mutexVariable (arg) { _mutexVariable = PTHREAD_MUTEX_INITIALIZER; int returnValue = pthread_mutex_lock (&_mutexVariable); if (returnValue > 0) { throw std::logic_error ("Mutex couldn't be locked!"); } } MutexClass::~MutexClass() { pthread_mutex_unlock (&_mutexVariable); } 

This is main.cpp where I use an object of the above mutex class.

The Qt classes here are for the namesake, because I am a Qt Creator. Please ignore them.

 #include "mainwindow.h" #include <QApplication> #include <stdexcept> #include <stdio.h> #include <unistd.h> #include <sys/syscall.h> #include <iostream> #include <QDebug> #include "mutexClass.h" pthread_mutex_t mutexVariable; pthread_cond_t conditionVariable; int numberOfActiveProducers; int numberOfActiveConsumers; QList <int> sharedQueueA; QList <int> sharedQueueB; /* * Shared queues are supposed to be shared among four threads. Two producer, and two consumer threads. * Producer threads will put the 1 in it, and Consumer threads will remove the 1's. * Assumption: `sharedQueue` can contain only 10 elements at a time. */ int sizeOfSharedQueue; void checkForSpaceAndPush (QList <int> & argList, int listId, pthread_t argTId) { std::cerr << "\nQueue " << listId << ", First check by Producer: " << argTId; if (argList.length () < sizeOfSharedQueue + 1) { { MutexClass mutex1 (mutexVariable); std::cerr << "\n\nQueue " << listId << ", Locked by Producer: " << argTId; if (argList.length () < sizeOfSharedQueue + 1) { argList.push_back (1); std::cerr << "\nPushed by Producer " << argTId << ": " << "Length of queue " << listId << " is: " << argList.length (); } else { std::cerr << "\nProducer " << argTId << ". Queue " << listId << " is full. Length of queue is: " << argList.length (); pthread_cond_wait (&conditionVariable, &mutexVariable); } } std::cerr << "\n\nQueue " << listId << ", UnLocked by Producer: " << argTId; } } void checkForSpaceAndPop (QList <int> & argList, int listId, pthread_t argTId) { std::cerr << "\nQueue " << listId << ", First check by Consumer: " << argTId; if (argList.length () > 0) { { MutexClass mutex1 (mutexVariable); std::cerr << "\n\nQueue " << listId << ", Locked by Consumer: " << argTId; if (argList.length () > 0) { argList.pop_front (); std::cerr << "\nRemoved by Consumer: " << argTId << ", Length of queue " << listId << " is: " << argList.length (); } else { pthread_cond_signal (&conditionVariable); std::cerr << "\nSignal issued by Consumer: " << argTId << ", Length of queue " << listId << " is: " << argList.length (); } } std::cerr << "\n\nQueue " << listId << ", UnLocked by Consumer: " << argTId; } } // This function is run by the `Producer` threads. void *producerThreadFunction (void *arg) { Q_UNUSED (arg); while (1) { pthread_t tId = pthread_self(); std::cerr << "\nProducers: " << tId; std::cerr.flush(); checkForSpaceAndPush (sharedQueueA, 1, tId); checkForSpaceAndPush (sharedQueueB, 2, tId); } return NULL; } // This function is run by the `Consumer` threads. void *consumerThreadFunction (void *arg) { Q_UNUSED (arg); while (1) { pthread_t tId = pthread_self (); std::cerr << "\nConsumer: " << tId; std::cerr.flush(); checkForSpaceAndPop (sharedQueueA, 1, tId); checkForSpaceAndPop (sharedQueueB, 2, tId); } return NULL; } int main (int argc, char *argv[]) { numberOfActiveProducers = 2; numberOfActiveConsumers = 2; sizeOfSharedQueue = 10; // Producer threads creation pthread_t producerA; pthread_t producerB; if (pthread_create (&producerA, NULL, producerThreadFunction, NULL)) { fprintf (stderr, "Error creating thread Producer A\n"); return 1; } if (pthread_create (&producerB, NULL, producerThreadFunction, NULL)) { fprintf (stderr, "Error creating thread Producer B\n"); return 1; } // Consumer threads creation pthread_t consumerA; pthread_t consumerB; if (pthread_create (&consumerA, NULL, consumerThreadFunction, NULL)) { fprintf (stderr, "Error creating thread Consumer A\n"); return 1; } if (pthread_create (&consumerB, NULL, consumerThreadFunction, NULL)) { fprintf (stderr, "Error creating thread Consumer B\n"); return 1; } // Joining every thread if (pthread_join (producerA, NULL)) { fprintf (stderr, "Error joining thread Producer A\n"); return 2; } if (pthread_join (producerB, NULL)) { fprintf (stderr, "Error joining thread Producer B\n"); return 2; } if (pthread_join (consumerB, NULL)) { fprintf (stderr, "Error joining thread Consumer B\n"); return 2; } if (pthread_join (consumerA, NULL)) { fprintf (stderr, "Error joining thread Consumer A\n"); return 2; } QApplication a (argc, argv); MainWindow w; w.show (); return a.exec (); } 

The conclusion is here:

 ... ... Queue 140388157085440 Removed by Consumer: 1403881570854401403881654781442 Queue , Locked by Consumer: 1140388148692736 Removed by Consumer: , Length of queue 1 is: , First check by Producer: 140388148692736, Length of queue 2 is: 1403881654781449 Queue Queue 2, UnLocked by Consumer: 140388148692736 Consumer: 9 Queue 1, UnLocked by Consumer: 140388157085440 Queue 2, First check by Consumer: 1403881570854401140388148692736 Queue 1, First check by Consumer: 140388148692736 Queue , Locked by Producer: Queue 2, Locked by Consumer: 140388157085440 Removed by Consumer: 1403881654781441, Locked by Consumer: 140388148692736 Pushed by Producer 140388165478144: Length of queue 1 is: 10 Queue 1, UnLocked by Producer: 140388165478144 Queue 2, First check by Producer: 140388165478144 Queue 2, Locked by Producer: 140388165478144The program has unexpectedly finished. 

Also pay attention to the following part of the output:

Queue 2 blocked by user: 140388157085440 Deleted by user: 1403881654781441, Blocked by User: 140388148692736

I created only 2 users, but the numbers shown here are 3. Why is this so?

0
source share
1 answer

You are currently initializing mutexVariable to PTHREAD_MUTEX_INITIALIZER each time you create a new construct for the new MutexClass . See what happens if thread A contains mutexVariable and thread B wants to get a mutex:

 thread A tries to lock mutexVariable and succeeds mutexVariable = PTHRAED_MUTEX_INITIALIZER pthread_mutex_lock(mutexVariable) thread B tries to lock mutexVariable and succeeds // this assignment overwrites the locked state thread A has stored mutexVariable = PTHRAED_MUTEX_INITIALIZER // mutex is default-initialized (not locked) - so lock it pthread_mutex_lock(mutexVariable) // both threads now believe they have the mutex // and all syncronization is lost thread B unlocks mutexVariable // succeeds thread A unlocks mutexVariable // uh.. it is not even locked any more?! 

You should initialize mutexVariable only once, in main.cpp:13 :

 pthread_mutex_t mutexVariable = PTHRAED_MUTEX_INITIALIZER; 

and remove initialization from MutexClass .

Not sure if this will fix all your problems, but this is what I would do first.

+2
source

Source: https://habr.com/ru/post/1239612/


All Articles