Multi-Threaded Programming III
- C/C++ Class Thread for Pthreads - 2020
- POSIX Threads, or Pthreads, is a POSIX standard for threads. The standard, POSIX.1c, Threads extensions (IEEE Std 1003.1c-1995), defines an API for creating and manipulating threads.
- Implementations of the API are available on many Unix-like POSIX systems such as FreeBSD, NetBSD, GNU/Linux, Mac OS X and Solaris, but Microsoft Windows implementations also exist. For example, the pthreads-w32 is available and supports a subset of the Pthread API for the Windows 32-bit platform.
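- The POSIX standard has continued to evolve and undergo revisions, including the Pthreads specification. The threads extensions are now part of the base standard, IEEE Std 1003.1; the 2004 Edition referred to here has since been followed by newer editions such as POSIX.1-2008 and POSIX.1-2017.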
- Pthreads are defined as a set of C language programming types and procedure calls, declared in the pthread.h header file. On GNU/Linux, the pthread functions have traditionally not been part of the standard C library. They live in libpthread, so we should add -lpthread when linking our program.
The Pthreads API can be grouped into four categories:
- Thread management:
Routines that work directly on threads: creating, detaching, joining, etc. They also include functions to set/query thread attributes such as joinable, scheduling, etc.
- Mutexes:
Routines that deal with synchronization through a "mutex", which is an abbreviation for "mutual exclusion". Mutex functions provide for creating, destroying, locking and unlocking mutexes. These are supplemented by mutex attribute functions that set or modify attributes associated with mutexes.
- Condition variables:
Routines that address communication between threads that share a mutex, based upon programmer-specified conditions. This group includes functions to create, destroy, wait and signal based upon specified variable values. Functions to set/query condition variable attributes are also included.
- Synchronization:
Routines that manage read/write locks and barriers.
- Our main() program is a single, default thread. All other threads must be explicitly created by the programmer.
- pthread_create creates a new thread and makes it executable. This routine can be called any number of times from anywhere within our code.
- int pthread_create (pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine)(void *), void *arg) arguments:
- thread:
A pointer to a pthread_t variable. When a thread is created, an identifier is written to the memory location to which this pointer points. This identifier enables us to refer to the thread.
- attr:
An attribute object that may be used to set thread attributes. We can specify a thread attributes object, or NULL for the default values.
- start_routine:
The routine that the thread will execute once it is created: void *(*start_routine)(void *)
We should pass the address of a function that takes a pointer to void as its parameter and returns a pointer to void. So, we can pass a single argument of any type and return a pointer to any type.
While using fork() causes execution to continue in the same location with a different return code, using a new thread explicitly provides a pointer to a function where the new thread should start executing.
- arg:
A single argument that may be passed to start_routine. It must be passed as a void pointer. NULL may be used if no argument is to be passed.
- The maximum number of threads that may be created by a process is implementation dependent.
- Once created, threads are peers, and may create other threads. There is no implied hierarchy or dependency between threads.
- Here is a sample of creating a child thread:

    // thread0.c
    #include <pthread.h>
    #include <stdio.h>
    #include <stdlib.h>

    void *worker_thread(void *arg)
    {
        printf("This is worker_thread()\n");
        pthread_exit(NULL);
    }

    int main()
    {
        pthread_t my_thread;
        int ret;

        printf("In main: creating thread\n");
        ret = pthread_create(&my_thread, NULL, &worker_thread, NULL);
        if(ret != 0) {
            printf("Error: pthread_create() failed\n");
            exit(EXIT_FAILURE);
        }

        pthread_exit(NULL);
    }

In the code, the main thread creates a second thread to execute worker_thread(), which prints its message while the main thread prints another. The call to create the thread has a NULL value for the attributes, which gives the thread default attributes. The call also passes the address of the my_thread variable, into which pthread_create() stores a handle to the new thread. The return value from pthread_create() is zero on success; otherwise it is an error number.
To run it:

    $ gcc -o thread0 thread0.c -lpthread
    $ ./thread0
    In main: creating thread
    This is worker_thread()

We can create several child threads:

    // thread01.c
    #include <pthread.h>
    #include <stdio.h>
    #include <stdlib.h>

    #define N 5

    void *worker_thread(void *arg)
    {
        printf("This is worker_thread #%ld\n", (long)arg);
        pthread_exit(NULL);
    }

    int main()
    {
        pthread_t my_thread[N];
        long id;

        for(id = 1; id <= N; id++) {
            /* ids run 1..N, array slots run 0..N-1 */
            int ret = pthread_create(&my_thread[id - 1], NULL, &worker_thread, (void*)id);
            if(ret != 0) {
                printf("Error: pthread_create() failed\n");
                exit(EXIT_FAILURE);
            }
        }

        pthread_exit(NULL);
    }

Output is:

    $ ./thread01
    This is worker_thread #5
    This is worker_thread #4
    This is worker_thread #3
    This is worker_thread #2
    This is worker_thread #1

Note that, in the code, we pass the parameter (thread id) to the child thread.
Passing (void*)&id instead would be wrong. It passes the address of the variable id, which lives in memory shared by and visible to all threads. As the loop iterates, the value at this memory location changes, possibly before the created threads have read it.
- By default, a thread is created with certain attributes. Some of these attributes can be changed by the programmer via the thread attribute object.
- pthread_attr_init() and pthread_attr_destroy() are used to initialize/destroy the thread attribute object.
- Other routines are then used to query/set specific attributes in the thread attribute object.
- There are several ways in which a Pthread may be terminated:
- The thread returns from its starting routine (the main routine for the initial thread).
- The thread makes a call to the pthread_exit subroutine.
- The thread is canceled by another thread via the pthread_cancel routine
- The entire process is terminated due to a call to either the exec or exit subroutines.
- pthread_exit is used to explicitly exit a thread. Typically, the pthread_exit() routine is called after a thread has completed its work and is no longer required to exist.
If main() finishes before the threads it has created, and exits with pthread_exit(), the other threads will continue to execute. Otherwise, they will be automatically terminated when main() finishes.
So, if we comment out the line pthread_exit() in main() in the thread01.c of the previous example code, the threads created may not have a chance to execute their work before being terminated.
- The programmer may optionally specify a termination status, which is stored as a void pointer for any thread that may join the calling thread.
- Cleanup: the pthread_exit() routine does not close files; any files opened inside the thread will remain open after the thread is terminated.
- int pthread_join (pthread_t th, void **thread_return)
The first parameter is the thread for which to wait, the identifier that pthread_create filled in for us. The second argument is a pointer to a pointer that receives the return value from the thread. This function returns zero for success and an error code on failure.
- When a thread is created, one of its attributes defines whether the thread is joinable or detached. Only threads that are created as joinable can be joined. If a thread is created as detached, it can never be joined.
- The final draft of the POSIX standard specifies that threads should be created as joinable.
- To explicitly create a thread as joinable or detached, the attr argument in the pthread_create() routine is used. The typical 4 step process is:
- Declare a pthread attribute variable of the pthread_attr_t data type.
- Initialize the attribute variable with pthread_attr_init().
- Set the attribute detached status with pthread_attr_setdetachstate()
- When done, free library resources used by the attribute with pthread_attr_destroy()
- Here is the summary for the join related functions:
- pthread_join (threadid,status)
- pthread_detach (threadid)
- pthread_attr_setdetachstate (attr,detachstate)
- pthread_attr_getdetachstate (attr,detachstate)
Picture from https://computing.llnl.gov/tutorials/pthreads/
A thread can execute a thread join to wait until the other thread terminates. In our case, you - the main thread - should execute a thread join waiting for your colleague - a child thread - to terminate. In general, thread join is for a parent (P) to join with one of its child threads (C). Thread join has the following activities, assuming that a parent thread P wants to join with one of its child threads C:
- When P executes a thread join in order to join with C, which is still running, P is suspended until C terminates. Once C terminates, P resumes.
- When P executes a thread join and C has already terminated, P continues as if no such thread join has ever executed (i.e., join has no effect).
A parent thread may join with many child threads created by the parent. Or, a parent may join with only some of its child threads and ignore the others. In that case, the ignored child threads are terminated when the whole process terminates, for example when main() returns or calls exit().
- The pthread_join() subroutine blocks the calling thread until the specified thread terminates.
- The programmer is able to obtain the target thread's termination return status if it was specified in the target thread's call to pthread_exit(), as shown here:

    #include <pthread.h>
    #include <stdint.h>
    #include <stdio.h>

    void *worker_thread(void *arg)
    {
        pthread_exit((void*)911);
    }

    int main()
    {
        void *status;   /* pthread_join stores a void*, so receive it as one */
        pthread_t thread;

        pthread_create(&thread, NULL, worker_thread, NULL);
        pthread_join(thread, &status);
        printf("%d\n", (int)(intptr_t)status);   // will print out 911
    }

- A joinable thread can be matched by exactly one pthread_join() call. It is a logical error to attempt multiple joins on the same thread.
Here is a sample code using pthread_join(): Merge Sort using Multithread.
There are cases we have to resynchronize our threads using pthread_join() before allowing the program to exit. We need to do this if we want to allow one thread to return data to the thread that created it. However, sometimes we neither need the second thread to return information to the main thread nor want the main thread to wait for it.
Suppose we create a second thread to spool a backup copy of a data file that is being edited while the main thread continues to service the user. When the backup has finished, the second thread can just terminate, and there is no need for it to join the main thread.
We can create threads that have this behavior. They are called detached threads, and we can create them by modifying the thread attributes or by calling pthread_detach().
- The pthread_detach() routine can be used to explicitly detach a thread even though it was created as joinable.
- There is no converse routine.
The example below creates a thread and uses pthread_join() to wait for it to finish. The newly created thread shares a global variable with the original thread and modifies it.

    // thread1.c
    #include <stdio.h>
    #include <unistd.h>
    #include <stdlib.h>
    #include <string.h>
    #include <pthread.h>

    void *thread_fnc(void * arg);

    char thread_msg[] = "Hello Thread!";  // global

    int main()
    {
        int ret;
        pthread_t my_thread;
        void *ret_join;

        ret = pthread_create(&my_thread, NULL, thread_fnc, (void*) thread_msg);
        if(ret != 0) {
            /* pthread functions return the error number instead of setting errno */
            fprintf(stderr, "pthread_create failed: %s\n", strerror(ret));
            exit(EXIT_FAILURE);
        }
        printf("Waiting for thread to finish...\n");
        ret = pthread_join(my_thread, &ret_join);
        if(ret != 0) {
            fprintf(stderr, "pthread_join failed: %s\n", strerror(ret));
            exit(EXIT_FAILURE);
        }
        printf("Thread joined, it returned %s\n", (char *) ret_join);
        printf("New thread message: %s\n", thread_msg);
        exit(EXIT_SUCCESS);
    }

    void *thread_fnc(void *arg)
    {
        printf("This is thread_fnc(), arg is %s\n", (char*) arg);
        strcpy(thread_msg, "Bye!");
        pthread_exit("'Exit from thread'");
    }

And the make file:

    thread1: thread1.o
    	gcc -D_REENTRANT -o thread1 thread1.o -lpthread
    thread1.o: thread1.c
    	gcc -c thread1.c
    clean:
    	rm -f *.o thread1

Output from the run:

    $ ./thread1
    Waiting for thread to finish...
    This is thread_fnc(), arg is Hello Thread!
    Thread joined, it returned 'Exit from thread'
    New thread message: Bye!

We declare a prototype for the function that the thread calls when we create it:
void *thread_fnc(void * arg);
It takes a pointer to void as its argument and returns a pointer to void, which is required by pthread_create().
In main(), we call pthread_create() to start running our new thread:
ret = pthread_create(&my_thread, NULL, thread_fnc, (void*) thread_msg);
We are passing the address of a pthread_t object that we can use to refer to the thread later. For the thread attribute, we pass NULL since we do not want to modify the default values.
If the call succeeds, two threads will be running. The original thread (main) continues and executes the code after pthread_create(), and a new thread starts executing in thread_fnc().
The original thread checks that pthread_create() succeeded, and then calls pthread_join():
ret = pthread_join(my_thread, &ret_join);
We pass the identifier of the thread that we are waiting to join and a pointer to a result. This function will wait until the other thread terminates before it returns. Then, it prints the return value from the thread.
The new thread starts executing at the start of thread_fnc(), which updates the global variable and returns a string to the main thread.
The mutual exclusion lock is the simplest and most primitive synchronization variable. It provides a single, absolute owner for the section of code (aka a critical section) that it brackets between the calls to pthread_mutex_lock() and pthread_mutex_unlock(). The first thread that locks the mutex gets ownership, and any subsequent attempt to lock it will block, putting the calling thread to sleep. When the owner unlocks it, one of the sleepers will be awakened, made runnable, and given the chance to obtain ownership.
A mutex lock is a mechanism that can be acquired by only one thread at a time. For other threads to get the same mutex, they must wait until it is released by the current owner of the mutex.
The key advantage of multithreaded code is that all threads see the same memory, so data is already shared between threads. But failing to coordinate access to that data can lead to incorrect results, for example through data races. The mutex lock is one way of synchronizing access to shared data.

    #include <stdio.h>
    #include <stdint.h>
    #include <pthread.h>

    volatile int counter = 0;
    pthread_mutex_t myMutex;

    void *mutex_testing(void *param)
    {
        int i;
        for(i = 0; i < 5; i++) {
            pthread_mutex_lock(&myMutex);
            counter++;
            printf("thread %d counter = %d\n", (int)(intptr_t)param, counter);
            pthread_mutex_unlock(&myMutex);
        }
        return NULL;
    }

    int main()
    {
        int one = 1, two = 2, three = 3;
        pthread_t thread1, thread2, thread3;

        pthread_mutex_init(&myMutex, 0);
        pthread_create(&thread1, 0, mutex_testing, (void*)(intptr_t)one);
        pthread_create(&thread2, 0, mutex_testing, (void*)(intptr_t)two);
        pthread_create(&thread3, 0, mutex_testing, (void*)(intptr_t)three);
        pthread_join(thread1, 0);
        pthread_join(thread2, 0);
        pthread_join(thread3, 0);
        pthread_mutex_destroy(&myMutex);
        return 0;
    }

The code shows a mutex lock protecting the variable counter against simultaneous access by multiple threads. Note that counter is declared volatile so that it is read from memory at each access and written back to memory after each access. volatile alone does not make the increment atomic, however: without the mutex lock there would be a data race between the threads.
Output:

    thread 3 counter = 1
    thread 3 counter = 2
    thread 3 counter = 3
    thread 3 counter = 4
    thread 3 counter = 5
    thread 2 counter = 6
    thread 2 counter = 7
    thread 2 counter = 8
    thread 2 counter = 9
    thread 2 counter = 10
    thread 1 counter = 11
    thread 1 counter = 12
    thread 1 counter = 13
    thread 1 counter = 14
    thread 1 counter = 15

If we do not use the lock and there are more steps after incrementing counter, we may get different results. In the code below, usleep(1), a sleep of one microsecond, stands in for those other steps.

    #include <stdio.h>
    #include <stdint.h>
    #include <pthread.h>
    #include <unistd.h>

    volatile int counter = 0;
    pthread_mutex_t myMutex;

    void *mutex_testing(void *param)
    {
        int i;
        for(i = 0; i < 5; i++) {
            //pthread_mutex_lock(&myMutex);
            counter++;
            usleep(1);
            printf("thread %d counter = %d\n", (int)(intptr_t)param, counter);
            //pthread_mutex_unlock(&myMutex);
        }
        return NULL;
    }

    int main()
    {
        int one = 1, two = 2, three = 3;
        pthread_t thread1, thread2, thread3;

        pthread_mutex_init(&myMutex, 0);
        pthread_create(&thread1, 0, mutex_testing, (void*)(intptr_t)one);
        pthread_create(&thread2, 0, mutex_testing, (void*)(intptr_t)two);
        pthread_create(&thread3, 0, mutex_testing, (void*)(intptr_t)three);
        pthread_join(thread1, 0);
        pthread_join(thread2, 0);
        pthread_join(thread3, 0);
        pthread_mutex_destroy(&myMutex);
        return 0;
    }

Output:

    thread 3 counter = 3
    thread 2 counter = 4
    thread 1 counter = 5
    thread 3 counter = 5
    thread 2 counter = 5
    thread 3 counter = 8
    thread 2 counter = 9
    thread 1 counter = 10
    thread 3 counter = 11
    thread 2 counter = 12
    thread 1 counter = 13
    thread 3 counter = 14
    thread 2 counter = 14
    thread 1 counter = 14
    thread 1 counter = 15

As we can see, without the lock any thread can increment counter at any time. The printed values also show that counter may have been incremented by another thread between a thread's own increment and its call to printf().
Though mutexes, by default, are private to a process, they can be shared between multiple processes. To create a mutex that can be shared between processes, we need to set up the attributes for pthread_mutex_init():

    #include <pthread.h>

    int main()
    {
        pthread_mutex_t myMutex;
        pthread_mutexattr_t myMutexAttr;

        pthread_mutexattr_init(&myMutexAttr);
        pthread_mutexattr_setpshared(&myMutexAttr, PTHREAD_PROCESS_SHARED);
        pthread_mutex_init(&myMutex, &myMutexAttr);
        //...
        pthread_mutexattr_destroy(&myMutexAttr);
        pthread_mutex_destroy(&myMutex);
        return 0;
    }

pthread_mutexattr_setpshared() with a pointer to the attribute structure and the value PTHREAD_PROCESS_SHARED sets the attributes to cause a shared mutex to be created.
Mutexes are not shared between processes by default. Calling pthread_mutexattr_setpshared() with the value PTHREAD_PROCESS_PRIVATE restores the attribute to the default.
These attributes are passed into the call to pthread_mutex_init() to set the attributes of the initialized mutex. Once the attributes have been used, they can be disposed of by a call to pthread_mutexattr_destroy().
Spin locks are essentially mutex locks.
A spin lock polls its lock condition repeatedly until that condition becomes true. Spin locks are most often used on multiprocessor systems where the expected wait time for a lock is small. In these situations, it is often more efficient to poll than to block the thread, which involves a context switch and the updating of thread data structures.
The difference between a mutex lock and a spin lock is that a thread waiting to acquire a spin lock keeps trying to acquire it without sleeping, consuming processor resources until it finally acquires the lock. In contrast, a thread that cannot acquire a mutex lock may sleep. With a plain mutex implementation, the thread is put to sleep immediately when it cannot get the mutex.
The advantage of using spin locks is that they will acquire the lock as soon as it is released, while a thread waiting on a mutex lock needs to be woken by the OS before it can get the lock. The disadvantage is that a spin lock will spin on a virtual CPU, monopolizing that resource, whereas a mutex lock will sleep and free the CPU for another thread to use. So, in practice, mutex locks are often implemented as a hybrid of spin locks and more traditional mutex locks. This kind of mutex is called an adaptive mutex lock.
The call pthread_spin_init() initializes a spin lock. A spin lock can be shared between processes or private to the process that created it.
By passing the value PTHREAD_PROCESS_PRIVATE to pthread_spin_init(), we get a spin lock that is not shareable.
To make it shareable, we need to pass the value PTHREAD_PROCESS_SHARED. The argument is mandatory, so one of the two values must always be given.
The call to the pthread_spin_lock() will spin until the lock is acquired, and the call to the pthread_spin_unlock() will release the lock. The call pthread_spin_destroy() releases any resources used by the lock.

    #include <pthread.h>

    pthread_spinlock_t slock;

    void splock()
    {
        int i = 100;
        while(i > 0) {
            pthread_spin_lock(&slock);
            i--;
            pthread_spin_unlock(&slock);
        }
    }

    int main()
    {
        pthread_spin_init(&slock, PTHREAD_PROCESS_PRIVATE);
        splock();
        pthread_spin_destroy(&slock);
        return 0;
    }

To use processor resources more efficiently, we can use pthread_spin_trylock(). This call attempts to acquire the lock but returns immediately whether or not the lock was acquired, rather than spinning. It returns 0 on success, so the lock must be released only in that case:

    void splock()
    {
        int i = 0;
        while(i == 0) {
            if(pthread_spin_trylock(&slock) == 0) {
                i++;
                pthread_spin_unlock(&slock);
            }
            /* else: the lock is busy; do other useful work and retry */
        }
    }

As one of the synchronization methods, a barrier tells a group of threads or processes to stop at the barrier; none may proceed until all the other threads/processes have reached it.
Here we'll address the classic barrier, whose construction defines the set of participating processes/threads statically. This is usually done either at program startup or when a barrier like the Pthreads barrier is instantiated.
Unlike the static barriers, to support more dynamic programming paradigms like fork/join parallelism, the sets of participants have to be dynamic. Thus, the set of processes/threads participating in a barrier operation needs to be able to change over time. But in this section, we will discuss only the static barriers.
We can create a barrier by calling pthread_barrier_init(). It initializes the barrier and it takes three parameters:
- A pointer to the barrier to be initialized.
- An optional attributes structure that determines whether the barrier is private to the process or can be shared between processes.
- The number of threads that need to reach the barrier before any threads are released.
Each thread calls pthread_barrier_wait() when it reaches the barrier, and the call returns once the required number of threads has reached the barrier. The code below shows how the barrier forces the threads to wait until all the threads have been created and have reached it:

    #include <pthread.h>
    #include <stdint.h>
    #include <stdio.h>

    pthread_barrier_t b;

    /* a thread start routine must return void* */
    void *task(void *param)
    {
        int id = (int)(intptr_t)param;
        printf("before the barrier %d\n", id);
        pthread_barrier_wait(&b);
        printf("after the barrier %d\n", id);
        return NULL;
    }

    int main()
    {
        int nThread = 5;
        int i;
        pthread_t thread[nThread];

        pthread_barrier_init(&b, 0, nThread);
        for(i = 0; i < nThread; i++)
            pthread_create(&thread[i], 0, task, (void*)(intptr_t)i);
        for(i = 0; i < nThread; i++)
            pthread_join(thread[i], 0);
        pthread_barrier_destroy(&b);
        return 0;
    }

The output below shows that all the threads arrive before any of them leaves.

    before the barrier 4
    before the barrier 3
    before the barrier 2
    before the barrier 1
    before the barrier 0
    after the barrier 0
    after the barrier 3
    after the barrier 4
    after the barrier 2
    after the barrier 1

Without the barrier, each thread's arrival and departure are mixed with those of the other threads:

    before the barrier 4
    after the barrier 4
    before the barrier 3
    after the barrier 3
    before the barrier 2
    after the barrier 2
    before the barrier 1
    after the barrier 1
    before the barrier 0
    after the barrier 0

This mixed output was produced by running the code with the following line commented out:

    /* pthread_barrier_wait(&b); */

The code below shows another example of synchronizing access with mutexes. To control access, we lock a mutex before entering the section of the code, and then unlock it when we have finished.

    int pthread_mutex_init(pthread_mutex_t *m_mutex, const pthread_mutexattr_t *mutexattr);
    int pthread_mutex_lock(pthread_mutex_t *m_mutex);
    int pthread_mutex_unlock(pthread_mutex_t *m_mutex);
    int pthread_mutex_destroy(pthread_mutex_t *m_mutex);

All of the functions take a pointer to a previously declared object, in this case, pthread_mutex_t. The extra attribute parameter of pthread_mutex_init() allows us to provide attributes for the mutex, which control its behavior.

    #include <stdio.h>
    #include <unistd.h>
    #include <stdlib.h>
    #include <string.h>
    #include <pthread.h>

    void *thread_fnc(void * arg);

    pthread_mutex_t my_mutex;
    char my_string[100];   /* shared variable */
    int time_to_exit = 0;

    int main()
    {
        int ret;
        pthread_t my_thread;
        void *ret_join;

        ret = pthread_mutex_init(&my_mutex, NULL);
        if(ret != 0) {
            fprintf(stderr, "mutex init failed: %s\n", strerror(ret));
            exit(EXIT_FAILURE);
        }
        ret = pthread_create(&my_thread, NULL, thread_fnc, NULL);
        if(ret != 0) {
            fprintf(stderr, "pthread_create failed: %s\n", strerror(ret));
            exit(EXIT_FAILURE);
        }

        pthread_mutex_lock(&my_mutex);
        printf("Type in some characters. Enter 'quit' to finish\n");
        while(!time_to_exit) {
            fgets(my_string, 100, stdin);
            pthread_mutex_unlock(&my_mutex);
            while(1) {
                /* re-acquire the mutex before inspecting the shared buffer */
                pthread_mutex_lock(&my_mutex);
                if(my_string[0] != '\0') {
                    /* the worker has not consumed the input yet */
                    pthread_mutex_unlock(&my_mutex);
                    sleep(1);
                }
                else
                    break;   /* leave the loop still holding the mutex */
            }
        }
        pthread_mutex_unlock(&my_mutex);

        printf("Waiting for thread to finish...\n");
        ret = pthread_join(my_thread, &ret_join);
        if(ret != 0) {
            fprintf(stderr, "pthread_join failed: %s\n", strerror(ret));
            exit(EXIT_FAILURE);
        }
        printf("Thread joined\n");
        pthread_mutex_destroy(&my_mutex);
        exit(EXIT_SUCCESS);
    }

    void *thread_fnc(void *arg)
    {
        sleep(1);
        pthread_mutex_lock(&my_mutex);
        while(strncmp("quit", my_string, 4) != 0) {
            printf("You typed in %zu characters\n", strlen(my_string) - 1);
            my_string[0] = '\0';
            pthread_mutex_unlock(&my_mutex);
            sleep(1);
            pthread_mutex_lock(&my_mutex);
            /* wait, releasing the mutex in between, until there is new input */
            while(my_string[0] == '\0') {
                pthread_mutex_unlock(&my_mutex);
                sleep(1);
                pthread_mutex_lock(&my_mutex);
            }
        }
        time_to_exit = 1;
        my_string[0] = '\0';
        pthread_mutex_unlock(&my_mutex);
        pthread_exit(NULL);
    }

Let's look at the thread function. The new thread tries to lock the mutex. If it's already locked, the call will block until it is released. Once we have access, we check to see if we are being requested to exit. If we are requested to exit, we simply set time_to_exit, zap the first character of my_string, and exit.
If we do not want to exit, we count the characters and then zap the first character to a null. We use the first character being null as a way of telling the main thread that we have finished the counting. We then unlock the mutex and wait for the main thread to run. We attempt to lock the mutex and, when we succeed, check if the main thread has given us any more work to do. If that's not the case, we unlock the mutex and wait more. If we have work to do, we count the characters and loop through again.
Here is the output:

    $ ./mutex
    Type in some characters. Enter 'quit' to finish
    12345
    You typed in 5 characters
    999
    You typed in 3 characters
    quit
    Waiting for thread to finish...
    Thread joined

The following code calculates the sum of the integers from 1 to 100. Each of 5 threads takes care of 20 integers. The argument passed into the threadWork() function is an offset that selects which set of 20 integers the thread works on.

    #include <stdio.h>
    #include <stdlib.h>
    #include <pthread.h>

    typedef struct
    {
        int *a;
        int length;
        int sum;
    } MyData;

    #define N 5
    #define L 20

    MyData mData;
    pthread_t myThread[N];
    pthread_mutex_t mutex;

    void *threadWork(void *arg)
    {
        /* Define and use local variables for convenience */
        long offset = (long)arg;
        int sum = 0;
        int start = offset * mData.length;
        int end = start + mData.length;

        /* each thread calculates its sum */
        for (int i = start; i < end ; i++)
            sum += mData.a[i];

        /* mutex lock/unlock */
        pthread_mutex_lock(&mutex);
        mData.sum += sum;
        pthread_mutex_unlock(&mutex);

        pthread_exit((void*) 0);
    }

    int main ()
    {
        void *status;

        /* fill the structure */
        int *a = (int*) malloc (N*L*sizeof(int));
        for (int i = 0; i < N*L; i++)
            a[i] = i + 1;
        mData.length = L;
        mData.a = a;
        mData.sum = 0;

        pthread_mutex_init(&mutex, NULL);

        /* Each thread has its own set of data to work on. */
        for(long i = 0; i < N; i++)
            pthread_create(&myThread[i], NULL, threadWork, (void *)i);

        /* Wait on child threads */
        for(int i = 0; i < N; i++)
            pthread_join(myThread[i], &status);

        /* Results and cleanup */
        printf ("Sum = %d \n", mData.sum);
        free (a);
        pthread_mutex_destroy(&mutex);
        pthread_exit(NULL);
    }

A condition variable enables threads to communicate state changes. In other words, a condition variable allows one thread to inform other threads about changes in the state of a shared resource and allows the other threads to wait for such a notification. It lets a thread sleep (wait) until another thread signals that some condition has arisen to which it must respond. Without condition variables, the waiting thread would have to poll to check whether the condition has become true.
A condition variable is always used in conjunction with a mutex. While the mutex is there to ensure that only one thread at a time can access the resource, the condition variable is used to signal changes in the state variable.
The essence of the condition variable is "signal and wait". The signal operation is a notification of state changes in shared variable to the waiting threads. The wait operation is the way of blocking until such a notification is received.

    #include <pthread.h>
    #include <stdio.h>

    pthread_mutex_t mutex;
    pthread_cond_t cond;

    int buffer[100];
    int loops = 5;
    int length = 0;

    void *producer(void *arg)
    {
        int i;
        for (i = 0; i < loops; i++) {
            pthread_mutex_lock(&mutex);
            buffer[length++] = i;
            printf("producer length %d\n", length);
            pthread_cond_signal(&cond);
            pthread_mutex_unlock(&mutex);
        }
        return NULL;
    }

    void *consumer(void *arg)
    {
        int i;
        for (i = 0; i < loops; i++) {
            pthread_mutex_lock(&mutex);
            while(length == 0) {
                printf(" consumer waiting...\n");
                pthread_cond_wait(&cond, &mutex);
            }
            int item = buffer[--length];
            printf("Consumer %d\n", item);
            pthread_mutex_unlock(&mutex);
        }
        return NULL;
    }

    int main(int argc, char *argv[])
    {
        pthread_mutex_init(&mutex, 0);
        pthread_cond_init(&cond, 0);

        pthread_t pThread, cThread;
        pthread_create(&pThread, 0, producer, 0);
        pthread_create(&cThread, 0, consumer, 0);
        pthread_join(pThread, NULL);
        pthread_join(cThread, NULL);

        pthread_mutex_destroy(&mutex);
        pthread_cond_destroy(&cond);
        return 0;
    }

The call pthread_cond_wait(&cond, &mutex) does the following. The first two steps happen atomically, so a signal sent in between cannot be missed:
- unlock the mutex
- block the calling thread until another thread signals the condition variable cond
- relock the mutex
A semaphore is useful for working with objects where what we care about is whether there are either zero objects or more than zero. Buffers and lists that fill and empty are good examples.
POSIX | Win32 |
---|---|
sem_wait(&s); | WaitForSingleObject(s,...); |
sem_post(&s); | ReleaseSemaphore(s,...); |
Semaphores are also useful when we want a thread to wait for something. We can accomplish this by having the thread call sem_wait() on a semaphore with value zero, then have another thread increment the semaphore when we're ready for the thread to continue.
A semaphore is a counting and signaling mechanism. We use it to allow threads access to a specified number of items. If there is a single item, then a semaphore is virtually the same as a mutex.
However, it is more commonly used in a situation where there are multiple items to be managed. Semaphores can also be used to signal between threads or processes. For example, to tell another thread that there is data present in a queue. There are two types of semaphores: named and unnamed semaphores.
An unnamed semaphore is initialized with a call to sem_init().
There are 4 basic semaphore functions, but unlike most of the functions which start with pthread_, semaphore functions start with sem_.
A semaphore is created with the sem_init function, and it is declared in semaphore.h:
-
int sem_init(sem_t *sem, int pshared, unsigned int val);
It initializes a semaphore object pointed to by sem, sets its sharing option, and gives it an initial integer value. The pshared parameter controls the type of semaphore. If the value of pshared is 0, the semaphore is local to the current process, i.e. private to a single process. Otherwise, the semaphore may be shared between processes. The third parameter is the value with which to initialize the semaphore. A semaphore created by a call to sem_init() is destroyed with a call to sem_destroy().

    #include <semaphore.h>

    int main()
    {
        sem_t mySemaphore;
        sem_init(&mySemaphore, 0, 5);
        //...
        sem_destroy(&mySemaphore);
        return 0;
    }
In the code, we initialized a semaphore with a count of 5. The second parameter of the call to sem_init() is 0, which makes the semaphore private to the process: it is shared only among the threads of that process. Passing a nonzero value would allow the semaphore to be shared between multiple processes, provided the semaphore is placed in memory that those processes share.
The code below is using a named semaphore. A named semaphore is opened rather than initialized. The sem_open() returns a pointer to a semaphore.
    #include <semaphore.h>
    #include <fcntl.h>

    int main()
    {
        sem_t *mySemaphore;
        // a portable semaphore name starts with a single slash
        mySemaphore = sem_open("/my_sem", O_CREAT, 0777, 5);
        //...
        sem_close(mySemaphore);
        sem_unlink("/my_sem");
        return 0;
    }
-
int sem_post(sem_t *sem);
This function atomically increases the value of the semaphore by 1.
-
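int sem_trywait(sem_t *sem);
This function returns immediately in either case: if the count is greater than zero, it decrements the semaphore and returns 0; if the semaphore is already zero, it returns -1 with errno set to EAGAIN instead of blocking.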
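The non-blocking behavior can be sketched as follows. The helper name count_successful_trywaits is hypothetical, and the example assumes unnamed semaphores are available (as on Linux).

```c
#include <assert.h>
#include <errno.h>
#include <semaphore.h>

/* Hypothetical helper: make `attempts` non-blocking tries on a semaphore
 * that starts at `initial`, and return how many of them succeeded. */
int count_successful_trywaits(unsigned int initial, int attempts)
{
    sem_t sem;
    int rc = sem_init(&sem, 0, initial);
    assert(rc == 0);

    int taken = 0;
    for (int i = 0; i < attempts; i++) {
        if (sem_trywait(&sem) == 0) {
            taken++;                  /* decremented without blocking */
        } else {
            assert(errno == EAGAIN);  /* count was already zero */
        }
    }

    sem_destroy(&sem);
    return taken;
}
```

Starting from a count of 2, five attempts succeed exactly twice; the remaining three return at once rather than waiting.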
-
int sem_wait(sem_t *sem);
This function atomically decreases the value of the semaphore by 1, but always waits until the semaphore has a nonzero count first. So, if we call sem_wait on a semaphore with a value of 2, the thread will continue executing but the semaphore will be decreased to 1. If we call it on a semaphore with a value of 0, the function will wait until some other thread has incremented the value so that it is no longer 0. If two threads are both waiting in sem_wait for the same semaphore to be nonzero and it is incremented once by a third thread, only one of the two waiting threads will get to decrement the semaphore and continue while the other will remain waiting.

    #include <semaphore.h>

    int main()
    {
        sem_t mySemaphore;
        int count = 0;
        sem_init(&mySemaphore, 0, 1);
        sem_wait(&mySemaphore);
        count++;
        sem_post(&mySemaphore);
        sem_destroy(&mySemaphore);
        return 0;
    }
-
int sem_destroy(sem_t *sem);
This function takes a pointer to a semaphore and tidies up any resources it may hold once we have finished with it. Destroying a semaphore on which some thread is still waiting results in undefined behavior, so make sure no thread is blocked on it first.

    #include <pthread.h>
    #include <stdio.h>

    void *f1(void *arg)
    {
        printf("thread 1\n");
        return NULL;
    }

    void *f2(void *arg)
    {
        printf("thread 2\n");
        return NULL;
    }

    int main()
    {
        pthread_t myThread[2];
        pthread_create(&myThread[0], 0, f1, 0);
        pthread_create(&myThread[1], 0, f2, 0);
        pthread_join(myThread[0], 0);
        pthread_join(myThread[1], 0);
        return 0;
    }

Output may look like this. In this output, thread 2 gets to the printf() first.

    thread 2
    thread 1

The order is not deterministic. However, we can enforce the order of execution by using a semaphore, as shown in the next example:
    #include <pthread.h>
    #include <stdio.h>
    #include <semaphore.h>

    sem_t mySemaphore;

    void *f1(void *param)
    {
        printf("thread %ld\n", (long)param);

        // signal semaphore
        // increment the value of mySemaphore by 1
        // if there are one or more threads waiting, wake one
        sem_post(&mySemaphore);
        return NULL;
    }

    void *f2(void *param)
    {
        // wait until value of semaphore is greater than 0
        // decrement the value of semaphore by 1
        sem_wait(&mySemaphore);

        printf("thread %ld\n", (long)param);
        return NULL;
    }

    int main()
    {
        pthread_t myThread[2];

        // The 3rd value which is semaphore count set to 0
        sem_init(&mySemaphore, 0, 0);

        long one = 1, two = 2;

        // f2 has sem_wait(), so it should wait
        // until f1 increases the semaphore value to 1
        // As a result, f1 will print first
        pthread_create(&myThread[0], 0, f1, (void *)one);
        pthread_create(&myThread[1], 0, f2, (void *)two);

        pthread_join(myThread[0], 0);
        pthread_join(myThread[1], 0);
        sem_destroy(&mySemaphore);
        return 0;
    }

The function f2() calls sem_wait(), so it waits until function f1() increases the semaphore value to 1 by calling sem_post(&mySemaphore).
So, the output should now look like this:
    thread 1
    thread 2
A condition variable enables threads to communicate state changes. In other words, a condition variable allows one thread to inform other threads about changes in the state of a shared resource and allows the other threads to wait for such notification. It lets a thread sleep (wait) until another thread signals that some condition it must respond to has arisen.
Though we can implement synchronization by controlling thread access to data, condition variables allow threads to synchronize based upon the actual value of data.
A condition variable is always used in conjunction with a mutex. While the mutex provides mutual exclusion for accessing the shared variable, the condition variable is used to signal changes in the state of that variable.
In this section, we will see how to use a semaphore to achieve this effect. Actually, the example in the previous section showed how. But here, we will start from a very simple one. By using a semaphore, we can achieve an effect similar to join(). In the code, the main thread waits for the worker thread to finish its task.
    #include <pthread.h>
    #include <stdio.h>
    #include <semaphore.h>

    sem_t s;

    void * task(void *param)
    {
        printf("%s done its task \n", (char *)param);

        // signaling worker's task (printing) is done
        sem_post(&s);
        return NULL;
    }

    int main()
    {
        // set current semaphore value 0 (the 3rd arg)
        sem_init(&s, 0, 0);
        printf("main thread started\n");

        pthread_t worker;
        pthread_create(&worker, 0, task, (void*)"worker thread");

        // wait here for worker thread to finish
        sem_wait(&s);

        printf("main thread ends here\n");
        return 0;
    }
The output is:
    main thread started
    worker thread done its task
    main thread ends here
In this section, we will address the producer/consumer (aka Bounded Buffer) problem.
Suppose there are one or more producer threads and one or more consumer threads. Producers produce data items and wish to place them in a buffer. Consumers then grab data items out of the buffer and consume the data in some way.
This arrangement occurs in many places within real systems. For example, in a multithreaded web server, a producer puts HTTP requests into a work queue (i.e., the bounded buffer); a thread pool of consumers each take a request out of the work queue and process the request.
Similarly, when you use a piped command in a UNIX shell, as follows:
$ cat notes.txt | wc -l
This example runs two processes concurrently; cat notes.txt writes the body of the file notes.txt to what it thinks is standard output; instead, however, the UNIX shell has redirected the output to what is called a UNIX pipe (created by the pipe() system call). The other end of this pipe is connected to the standard input of the process wc, which simply counts the number of lines in the input stream and prints out the result. Thus, the cat process is the producer, and the wc process is the consumer. Between them is a bounded buffer.
Because the bounded buffer is a shared resource, we must of course require synchronized access to it to avoid any race condition. To understand this problem better, let us examine some actual code:
In this example, we assume that the shared buffer buffer is just an array of integers, and that the fill and use integers are used as indices into the array, and are used to track where to both put data (fill) and get data (use).
Suppose, we have just two threads, a producer and a consumer, and that the producer just writes some number of integers into the buffer which the consumer removes from the buffer and prints:
    #include <pthread.h>
    #include <stdio.h>
    #include <stdlib.h>

    #define MAX 100

    int buffer[MAX];
    int fill = 0;
    int use = 0;

    void put(int value)
    {
        buffer[fill] = value;
        fill = (fill + 1) % MAX;
    }

    int get()
    {
        int tmp = buffer[use];
        use = (use + 1) % MAX;
        return tmp;
    }

    int loops = 0;

    void *producer(void *arg)
    {
        int i;
        for (i = 0; i < loops; i++) {
            put(i);
        }
        return NULL;
    }

    void *consumer(void *arg)
    {
        int i;
        for (i = 0; i < loops; i++) {
            int b = get();
            printf("%d\n", b);
        }
        return NULL;
    }

    int main(int argc, char *argv[])
    {
        if(argc < 2 ){
            printf("Needs 2nd arg for loop count variable.\n");
            return 1;
        }

        loops = atoi(argv[1]);

        pthread_t pThread, cThread;
        pthread_create(&pThread, 0, producer, 0);
        pthread_create(&cThread, 0, consumer, 0);
        pthread_join(pThread, NULL);
        pthread_join(cThread, NULL);
        return 0;
    }

In main(), we simply create the two threads and wait for them to finish.
If we run the code with loops = 5, what we'd like to get is the producer producing 0, 1, 2, 3, and 4, and the consumer printing them in that order. However, without synchronization, we may not get that.
For example, imagine if the consumer thread runs first; it will call get() to get data that hasn't even been produced yet, and thus not function as desired. Things get worse when we add multiple producers or consumers, as there could be race conditions in the update of the use or fill indices.
Actually, I got the following output:
    $ ./test 5
    0
    0
    0
    0
    0
Clearly, something should be done to correct this kind of potential malfunction.
As a first try, we introduce two semaphores, empty and full, which the threads will use to indicate when a buffer entry has been emptied or filled, respectively.
In the code below, the producer first waits for a buffer to become empty in order to put data into it, and the consumer similarly waits for a buffer to become filled before using it.
    #include <pthread.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include <semaphore.h>

    #define MAX 1

    int buffer[MAX];
    int fill = 0;
    int use = 0;

    void put(int value)
    {
        buffer[fill] = value;
        fill = (fill + 1) % MAX;
    }

    int get()
    {
        int b = buffer[use];
        use = (use + 1) % MAX;
        return b;
    }

    int loops = 0;

    sem_t empty;
    sem_t full;

    void *producer(void *arg)
    {
        int i;
        for (i = 0; i < loops; i++) {
            sem_wait(&empty);
            put(i);
            sem_post(&full);
        }
        return NULL;
    }

    void *consumer(void *arg)
    {
        int i;
        for (i = 0; i < loops; i++) {
            sem_wait(&full);
            int b = get();
            sem_post(&empty);
            printf("%d\n", b);
        }
        return NULL;
    }

    int main(int argc, char *argv[])
    {
        if(argc < 2 ){
            printf("Needs 2nd arg for loop count variable.\n");
            return 1;
        }

        loops = atoi(argv[1]);

        sem_init(&empty, 0, MAX); // MAX buffers are empty to begin with...
        sem_init(&full, 0, 0);    // ... and 0 are full

        pthread_t pThread, cThread;
        pthread_create(&pThread, 0, producer, 0);
        pthread_create(&cThread, 0, consumer, 0);
        pthread_join(pThread, NULL);
        pthread_join(cThread, NULL);
        return 0;
    }
Suppose there are two threads, a producer and a consumer, and assume the consumer gets to run first. The consumer will call sem_wait(&full). Because the full semaphore was initialized to the value 0, the call will block the consumer until another thread calls sem_post() on the semaphore, as desired.
Then, the producer runs. It will call sem_wait(&empty). Unlike the consumer, the producer will continue through this line, because empty was initialized to the value MAX ( = 1). Thus, empty will be decremented to 0 and the producer will put a data value into the first entry of buffer. The producer will then continue on and call sem_post(&full), changing the value of the full semaphore from 0 to 1 and waking up the consumer (e.g., moving it from blocked to ready status).
In this case, one of two things could happen. If the producer continues to run, it will loop around and hit sem_wait(&empty) again. This time, however, it would block, as the empty semaphore's value is 0. If the producer instead was interrupted and the consumer began to run, it would return from sem_wait(&full), find that the buffer was indeed full, and thus consume it. In either case, we achieve the desired behavior:

    $ ./test 5
    0
    1
    2
    3
    4
Beware of the race condition!
Suppose, there are multiple producers and multiple consumers. We now have a problem: a race condition.
Imagine two producers both calling into put() at the same time. Assume producer 1 gets to run first, and just starts to fill the first buffer entry (fill = 0 @ buffer[fill] = value;). Before the producer gets a chance to increment the fill counter to 1, it is interrupted. Producer 2 starts to run, and at the same line of code, it also puts its data into the 0th element of buffer, which means that the old data there is overwritten!
As we can see, what we've forgotten here is mutual exclusion. The filling of a buffer and incrementing of the index into the buffer is a critical section, and thus must be guarded carefully. So let's use a binary semaphore and add some locks.
Here is the new code:
    #include <pthread.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include <semaphore.h>

    #define MAX 1

    int buffer[MAX];
    int fill = 0;
    int use = 0;

    void put(int value)
    {
        buffer[fill] = value;
        fill = (fill + 1) % MAX;
    }

    int get()
    {
        int b = buffer[use];
        use = (use + 1) % MAX;
        return b;
    }

    int loops = 0;

    sem_t empty;
    sem_t full;
    sem_t mutex;

    void *producer(void *arg)
    {
        int i;
        for (i = 0; i < loops; i++) {
            sem_wait(&mutex);
            sem_wait(&empty);
            put(i);
            sem_post(&full);
            sem_post(&mutex);
        }
        return NULL;
    }

    void *consumer(void *arg)
    {
        int i;
        for (i = 0; i < loops; i++) {
            sem_wait(&mutex);
            sem_wait(&full);
            int b = get();
            sem_post(&empty);
            sem_post(&mutex);
            printf("%d\n", b);
        }
        return NULL;
    }

    int main(int argc, char *argv[])
    {
        if(argc < 2 ){
            printf("Needs 2nd arg for loop count variable.\n");
            return 1;
        }

        loops = atoi(argv[1]);

        sem_init(&empty, 0, MAX); // MAX buffers are empty to begin with...
        sem_init(&full, 0, 0);    // ... and 0 are full
        sem_init(&mutex, 0, 1);   // mutex = 1 since it is a lock

        pthread_t pThread, cThread;
        pthread_create(&pThread, 0, producer, 0);
        pthread_create(&cThread, 0, consumer, 0);
        pthread_join(pThread, NULL);
        pthread_join(cThread, NULL);
        return 0;
    }
DEAD LOCK!
Now we've added some locks around the entire put()/get() parts of the code. However, it still doesn't work. Why? Deadlock!
Why does deadlock occur? Take a moment to consider it; try to find a case where deadlock arises; what sequence of steps must happen for the program to deadlock?
Suppose there are two threads, one producer and one consumer. The consumer gets to run first. It acquires the mutex, and then calls sem_wait() on the full semaphore. Because there is no data yet, this call causes the consumer to block and thus yield the CPU; importantly, though, the consumer still holds the lock!
A producer then runs. It has data to produce and if it were able to run, it would be able to wake the consumer thread and all would be good. Unfortunately, the first thing it does is call sem_wait(&mutex) on the binary mutex semaphore. The lock is already held. Hence, the producer is now stuck waiting too.
There is a simple cycle here. The consumer holds the mutex and is waiting for someone to signal full. The producer could signal full but is waiting for the mutex.
Thus, the producer and consumer are each stuck waiting for each other: a classic deadlock!
SOLUTION!
We simply must reduce the scope of the lock. Here is the final working solution:

    // binary_semaphore.c
    #include <pthread.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include <semaphore.h>

    #define MAX 1

    int buffer[MAX];
    int fill = 0;
    int use = 0;

    void put(int value)
    {
        buffer[fill] = value;
        fill = (fill + 1) % MAX;
    }

    int get()
    {
        int b = buffer[use];
        use = (use + 1) % MAX;
        return b;
    }

    int loops = 0;

    sem_t empty;
    sem_t full;
    sem_t mutex;

    void *producer(void *arg)
    {
        int i;
        for (i = 0; i < loops; i++) {
            sem_wait(&empty);

            // scope of lock reduced
            sem_wait(&mutex);
            put(i);
            sem_post(&mutex);

            sem_post(&full);
        }
        return NULL;
    }

    void *consumer(void *arg)
    {
        int i;
        for (i = 0; i < loops; i++) {
            sem_wait(&full);

            // scope of lock reduced
            sem_wait(&mutex);
            int b = get();
            sem_post(&mutex);

            sem_post(&empty);
            printf("%d\n", b);
        }
        return NULL;
    }

    int main(int argc, char *argv[])
    {
        if(argc < 2 ){
            printf("Needs 2nd arg for loop count variable.\n");
            return 1;
        }

        loops = atoi(argv[1]);

        sem_init(&empty, 0, MAX); // MAX buffers are empty to begin with...
        sem_init(&full, 0, 0);    // ... and 0 are full
        sem_init(&mutex, 0, 1);   // mutex = 1 since it is a lock

        pthread_t pThread, cThread;
        pthread_create(&pThread, 0, producer, 0);
        pthread_create(&cThread, 0, consumer, 0);
        pthread_join(pThread, NULL);
        pthread_join(cThread, NULL);
        return 0;
    }
Here is the file: binary_semaphore.c.
Here is the Makefile.
    binary_semaphore: binary_semaphore.o
    	gcc -D_REENTRANT -o binary_semaphore binary_semaphore.o -lrt -lpthread

    binary_semaphore.o: binary_semaphore.c
    	gcc -c binary_semaphore.c

    clean:
    	rm -f *.o binary_semaphore
Output looks fine now!
    $ ./binary_semaphore 5
    0
    1
    2
    3
    4

As we see, we simply moved the mutex acquire and release so that they sit just around the critical section; the full and empty wait and signal code is left outside.
The result is a simple and working bounded buffer, a commonly-used pattern in multithreaded programs.
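The same pattern carries over to several producers and consumers and to a buffer with more than one slot. The following is a minimal sketch under assumptions, not the article's program: the names ring, produce, consume and run_bounded_buffer and the sizes are illustrative, and the consumers add up what they receive instead of printing it so the result can be checked.

```c
#include <assert.h>
#include <pthread.h>
#include <semaphore.h>

#define SLOTS      4     /* buffer capacity (illustrative) */
#define PRODUCERS  2
#define CONSUMERS  2
#define PER_THREAD 1000  /* items each producer puts and each consumer gets */

static int ring[SLOTS];
static int fill_idx = 0, use_idx = 0;
static sem_t empty_slots, full_slots, ring_lock;
static long consumed_sum = 0;  /* protected by ring_lock */

static void *produce(void *arg)
{
    (void)arg;
    for (int i = 1; i <= PER_THREAD; i++) {
        sem_wait(&empty_slots);   /* wait for a free slot */
        sem_wait(&ring_lock);     /* lock only the critical section */
        ring[fill_idx] = i;
        fill_idx = (fill_idx + 1) % SLOTS;
        sem_post(&ring_lock);
        sem_post(&full_slots);    /* announce one more item */
    }
    return NULL;
}

static void *consume(void *arg)
{
    (void)arg;
    for (int i = 0; i < PER_THREAD; i++) {
        sem_wait(&full_slots);    /* wait for an item */
        sem_wait(&ring_lock);
        consumed_sum += ring[use_idx];
        use_idx = (use_idx + 1) % SLOTS;
        sem_post(&ring_lock);
        sem_post(&empty_slots);   /* announce one more free slot */
    }
    return NULL;
}

/* Runs all threads and returns the sum of every consumed item. */
long run_bounded_buffer(void)
{
    pthread_t p[PRODUCERS], c[CONSUMERS];
    consumed_sum = 0;
    sem_init(&empty_slots, 0, SLOTS);
    sem_init(&full_slots, 0, 0);
    sem_init(&ring_lock, 0, 1);
    for (int i = 0; i < PRODUCERS; i++) pthread_create(&p[i], NULL, produce, NULL);
    for (int i = 0; i < CONSUMERS; i++) pthread_create(&c[i], NULL, consume, NULL);
    for (int i = 0; i < PRODUCERS; i++) pthread_join(p[i], NULL);
    for (int i = 0; i < CONSUMERS; i++) pthread_join(c[i], NULL);
    sem_destroy(&empty_slots);
    sem_destroy(&full_slots);
    sem_destroy(&ring_lock);
    return consumed_sum;
}
```

Each producer puts the values 1 to 1000, so if no item is lost or overwritten the consumers' total is 2 × 500500 = 1001000.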
This section was written based on Semaphores.
In the example below, we will use a binary semaphore which takes 0 or 1. There is a more general type of semaphore, a counting semaphore which takes a wider range of values. Semaphores are used to protect a section of code so that only one thread can run it at the given time. To do this kind of task, a binary semaphore is needed. However, if we want to permit a limited number of threads to execute a piece of code, we may need a counting semaphore.
    // sem.c
    #include <stdio.h>
    #include <unistd.h>
    #include <stdlib.h>
    #include <string.h>
    #include <pthread.h>
    #include <semaphore.h>

    void *thread_fnc(void * arg);

    /* semaphores are declared global so they can be accessed
       in main() and in thread routine */
    sem_t my_semaphore;
    char my_string[100]; /* shared variable */

    int main()
    {
        int ret;
        int value;
        pthread_t my_thread;
        void *ret_join;

        /* initialize the semaphore to 0 */
        /* second param = 0 - semaphore is local */
        ret = sem_init(&my_semaphore, 0, 0);
        sem_getvalue(&my_semaphore, &value);
        printf("The initial value of the semaphore is %d\n", value);
        if(ret != 0) {
            perror("semaphore init failed\n");
            exit(EXIT_FAILURE);
        }

        ret = pthread_create(&my_thread, NULL, thread_fnc, NULL);
        if(ret != 0) {
            perror("pthread_create failed\n");
            exit(EXIT_FAILURE);
        }

        printf("Type in some characters. Enter 'quit' to finish\n");
        while(strncmp("quit", my_string, 4) != 0) {
            fgets(my_string, 100, stdin);
            sem_post(&my_semaphore);
            sem_getvalue(&my_semaphore, &value);
            printf("The value of the semaphore after sem_post() is %d\n", value);
        }

        printf("Waiting for thread to finish...\n");
        ret = pthread_join(my_thread, &ret_join);
        if(ret != 0) {
            perror("pthread_join failed");
            exit(EXIT_FAILURE);
        }

        printf("Thread joined, it returned %s\n", (char *) ret_join);
        sem_destroy(&my_semaphore);
        exit(EXIT_SUCCESS);
    }

    void *thread_fnc(void *arg)
    {
        int val;

        printf("This is thread_fnc(), waiting for nonzero count...\n");
        sem_getvalue(&my_semaphore, &val);
        printf("The value of the semaphore in thread_fnc() is %d\n", val);
        sem_wait(&my_semaphore);
        sem_getvalue(&my_semaphore, &val);
        printf("The value of the semaphore after sem_wait() in thread_fnc() is %d\n", val);
        while(strncmp("quit", my_string, 4) != 0) {
            /* strlen() returns size_t, so print it with %zu */
            printf("You typed in %zu characters\n", strlen(my_string)-1);
            sem_getvalue(&my_semaphore, &val);
            printf("The value of the semaphore before sem_wait() in thread_fnc() is %d\n", val);
            sem_wait(&my_semaphore);
            sem_getvalue(&my_semaphore, &val);
            printf("The value of the semaphore after sem_wait() in thread_fnc() is %d\n", val);
        }
        pthread_exit(NULL);
    }
Output is:
    $ ./sem
    The initial value of the semaphore is 0
    Type in some characters. Enter 'quit' to finish
    This is thread_fnc(), waiting for nonzero count...
    The value of the semaphore in thread_fnc() is 0
    1234
    The value of the semaphore after sem_post() is 1
    The value of the semaphore after sem_wait() in thread_fnc() is 0
    You typed in 4 characters
    The value of the semaphore before sem_wait() in thread_fnc() is 0
    98
    The value of the semaphore after sem_post() is 1
    The value of the semaphore after sem_wait() in thread_fnc() is 0
    You typed in 2 characters
    The value of the semaphore before sem_wait() in thread_fnc() is 0
    quit
    The value of the semaphore after sem_post() is 1
    Waiting for thread to finish...
    The value of the semaphore after sem_wait() in thread_fnc() is 0
    Thread joined, it returned (null)
In main(), after creating a new thread, we read in text, put it into my_string, which is global, and then increment the semaphore with sem_post():

    while(strncmp("quit", my_string, 4) != 0) {
        fgets(my_string, 100, stdin);
        sem_post(&my_semaphore);
    }

In the new thread, we wait for the semaphore and then count the characters from the input:

    sem_wait(&my_semaphore);
    while(strncmp("quit", my_string, 4) != 0) {
        printf("You typed in %zu characters\n", strlen(my_string)-1);
        sem_wait(&my_semaphore);
    }
While the semaphore is set, we are waiting for keyboard input. When we have input, we release the semaphore allowing the second thread to count the characters before the first thread reads the keyboard input again.
Note that both threads share the same my_string array.
The main advantage of multiprocess programming is that a failure of one process does not cause all the processes to die. As a result, it might be possible to recover from such failures.
The fork() system call spawns a new child process that is identical to the parent except that it has a new process ID. The process is copied in memory from the parent and a new process structure is assigned by the kernel. The return value of fork() is what distinguishes the two processes: zero is returned in the child, the child's process ID is returned in the parent, and -1 is returned if the fork fails.

    #include <unistd.h>
    #include <stdlib.h>
    #include <sys/wait.h>
    #include <stdio.h>

    int main()
    {
        int status;
        pid_t pid = fork();

        // Child process will sleep for 10 seconds
        if(pid == 0) {
            // every argument is a string, and the list ends with a null pointer
            execl("/usr/bin/sleep", "/usr/bin/sleep", "10", (char *)NULL);
            // execl() returns only if it failed
            perror("execl failed");
            _exit(127);
        }
        // Parent process will wait for child process to terminate
        // Then, it will report the wait status of the child process
        else {
            waitpid(pid, &status, 0);
            // raw wait status; WEXITSTATUS(status) extracts the exit code
            printf("status = %d\n", status);
        }
        return 0;
    }
The execl() is used to execute the sleep command.
The waitpid() call is used to wait for state changes in a child of the calling process, and obtain information about the child whose state has changed. A state change is considered to be:
- the child terminated
- the child was stopped by a signal
- the child was resumed by a signal
In the case of a terminated child, performing a wait allows the system to release the resources associated with the child.
If a wait is not performed, then the terminated child remains in a zombie state.
If a child has already changed state, then waitpid() returns immediately. Otherwise it blocks until either a child changes state or a signal handler interrupts the call.
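The value stored by waitpid() is an encoded wait status rather than the exit code itself, so it is usually decoded with the WIFEXITED and WEXITSTATUS macros. A minimal sketch, in which child_exit_code is a hypothetical helper name:

```c
#include <assert.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Hypothetical helper: fork a child that exits with `code` and return
 * the exit code the parent recovers through waitpid(). */
int child_exit_code(int code)
{
    pid_t pid = fork();
    assert(pid >= 0);
    if (pid == 0)
        _exit(code);                        /* child: terminate at once */

    int status = 0;
    pid_t done = waitpid(pid, &status, 0);  /* reap the child, no zombie left */
    assert(done == pid);

    if (WIFEXITED(status))                  /* child ended via exit/_exit */
        return WEXITSTATUS(status);         /* low 8 bits of its exit code */
    return -1;                              /* killed by a signal, etc. */
}
```

On Linux the exit code sits in the second byte of the status word, which is why a raw status of 65280 (255 × 256) corresponds to an exit code of 255.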
We'll make C++ classes Runnable and Thread for Pthreads. The interfaces are almost identical to the Win32 version of the previous chapter. The only difference is that the Thread class constructor has a parameter indicating whether or not the thread is to be created in a detached state. The default is undetached (joinable).
In this code, we added communication between the threads, and we selected shared memory to demonstrate it. Threads in the same program can reference global variables or call methods on a shared object, whereas threads in different processes have to access the same kernel objects by calling kernel routines.
    #include <iostream>
    #include <memory>
    #include <cassert>
    #include <cstdlib>
    #include <cstring>
    #include <pthread.h>

    using namespace std;

    // Note: std::auto_ptr was deprecated in C++11 and removed in C++17,
    // so an older -std setting may be needed (or switch to std::unique_ptr).

    class Runnable {
    public:
        virtual void* run() = 0;
        virtual ~Runnable() = 0;
    };

    // Pure virtual destructor: function body required
    Runnable::~Runnable() {}

    class Thread {
    public:
        Thread(auto_ptr<Runnable> run, bool isDetached = false);
        Thread(bool isDetached = false);
        virtual ~Thread();
        void start();
        void* join();
    private:
        // thread ID
        pthread_t PthreadThreadID;
        // true if thread created in detached state
        bool detached;
        pthread_attr_t threadAttribute;
        // runnable object will be deleted automatically
        auto_ptr<Runnable> runnable;
        // copying is not allowed
        Thread(const Thread&);
        const Thread& operator=(const Thread&);
        // called when run() completes
        void setCompleted();
        // stores return value from run()
        void* result;
        virtual void* run() { return NULL; }
        static void* startThreadRunnable(void* pVoid);
        static void* startThread(void* pVoid);
        void printError(const char* msg, int status, const char* fileName, int lineNumber);
    };

    Thread::Thread(auto_ptr<Runnable> r, bool isDetached) :
        detached(isDetached), runnable(r), result(NULL) {
        if(!runnable.get()){
            cout << "Thread::Thread(auto_ptr<Runnable> r, bool isDetached)"
                    " failed at " << __FILE__ << ":" << __LINE__
                 << " - runnable is NULL" << endl;
            exit(-1);
        }
    }

    Thread::Thread(bool isDetached) :
        detached(isDetached), runnable(NULL), result(NULL) {}

    void* Thread::startThreadRunnable(void* pVoid) {
        // thread start function when a Runnable is involved
        Thread* runnableThread = static_cast<Thread*>(pVoid);
        assert(runnableThread);
        runnableThread->result = runnableThread->runnable->run();
        runnableThread->setCompleted();
        return runnableThread->result;
    }

    void* Thread::startThread(void* pVoid) {
        // thread start function when no Runnable is involved
        Thread* aThread = static_cast<Thread*>(pVoid);
        assert(aThread);
        aThread->result = aThread->run();
        aThread->setCompleted();
        return aThread->result;
    }

    Thread::~Thread() {}

    void Thread::start() {
        // initialize attribute object
        int status = pthread_attr_init(&threadAttribute);
        if(status) {
            printError("pthread_attr_init failed at", status, __FILE__, __LINE__);
            exit(status);
        }

        // set the scheduling scope attribute
        status = pthread_attr_setscope(&threadAttribute, PTHREAD_SCOPE_SYSTEM);
        if(status) {
            printError("pthread_attr_setscope failed at", status, __FILE__, __LINE__);
            exit(status);
        }

        if(detached) {
            // set the detachstate attribute to detached
            status = pthread_attr_setdetachstate(&threadAttribute, PTHREAD_CREATE_DETACHED);
            if(status) {
                printError("pthread_attr_setdetachstate failed at", status, __FILE__, __LINE__);
                exit(status);
            }
        }

        // pick the start function depending on whether a Runnable is involved
        void* (*startFunction)(void*) =
            runnable.get() ? &Thread::startThreadRunnable : &Thread::startThread;
        status = pthread_create(&PthreadThreadID, &threadAttribute, startFunction, (void*)this);
        if(status) {
            printError("pthread_create failed at", status, __FILE__, __LINE__);
            exit(status);
        }

        status = pthread_attr_destroy(&threadAttribute);
        if(status) {
            printError("pthread_attr_destroy failed at", status, __FILE__, __LINE__);
            exit(status);
        }
    }

    void* Thread::join() {
        // A thread calling T.join() waits until thread T completes.
        int status = pthread_join(PthreadThreadID, NULL);
        // result was already saved by thread start function
        if(status) {
            printError("pthread_join failed at", status, __FILE__, __LINE__);
            exit(status);
        }
        return result;
    }

    void Thread::setCompleted() {
        // completion handled by pthread_join()
    }

    void Thread::printError(const char* msg, int status, const char* fileName, int lineNumber) {
        cout << msg << " " << fileName << ":" << lineNumber
             << "-" << strerror(status) << endl;
    }

    // shared variable
    int s = 0;

    class communicatingThread: public Thread {
    public:
        communicatingThread(int ID) : myID(ID) {}
        virtual void* run();
    private:
        int myID;
    };

    void* communicatingThread::run() {
        cout << "Thread " << myID << " is running!" << endl;
        // increment s one million times (not synchronized)
        for (int i = 0; i < 1000000; i++) s+=1;
        return 0;
    }

    int main() {
        auto_ptr<communicatingThread> thread1(new communicatingThread(1));
        auto_ptr<communicatingThread> thread2(new communicatingThread(2));
        thread1->start();
        thread2->start();
        thread1->join();
        thread2->join();

        cout << "s = " << s << endl;
        return 0;
    }
- In main(), we created two communicatingThreads.

    auto_ptr<communicatingThread> thread1(new communicatingThread(1));
    auto_ptr<communicatingThread> thread2(new communicatingThread(2));

- Each communicatingThread increments the global shared variable s one million times.

    for (int i = 0; i < 1000000; i++) s+=1;

- The main thread uses join() to wait for each communicatingThread to complete.

    thread1->join();
    thread2->join();

We would like the result to be 2,000,000, and many runs do print that. However, s+=1 is not atomic and the two threads are not synchronized, so updates can be lost and a run may print a smaller value.
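Because the two threads update s without any locking, one way to make the total reliable is to guard the increment with a mutex. The following is a minimal sketch in plain C rather than with the Thread class above; counter, add_many and run_locked_counter are hypothetical names.

```c
#include <assert.h>
#include <pthread.h>

#define INCREMENTS 1000000

static long counter = 0;
static pthread_mutex_t counter_lock = PTHREAD_MUTEX_INITIALIZER;

/* Each thread adds 1 to the shared counter INCREMENTS times. */
static void *add_many(void *arg)
{
    (void)arg;
    for (int i = 0; i < INCREMENTS; i++) {
        pthread_mutex_lock(&counter_lock);    /* one thread at a time */
        counter += 1;
        pthread_mutex_unlock(&counter_lock);
    }
    return NULL;
}

/* Runs two incrementing threads and returns the final count. */
long run_locked_counter(void)
{
    pthread_t a, b;
    counter = 0;

    int rc = pthread_create(&a, NULL, add_many, NULL);
    assert(rc == 0);
    rc = pthread_create(&b, NULL, add_many, NULL);
    assert(rc == 0);

    pthread_join(a, NULL);
    pthread_join(b, NULL);
    return counter;
}
```

With the lock in place no increment is lost, so the function returns exactly 2,000,000. The price is one lock and unlock per increment; a thread could instead count locally and add its subtotal once under the lock.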
Note
We may get a message saying "undefined reference to pthread_join", when we compile/link with g++ code.cpp.
Then, try g++ code.cpp -lpthread.
Yes I did find your web site for very helpful.
http://www.bogotobogo.com/cplusplus/multithreading_pthread.php Multi-Threaded Programming III - C/C++ Class Thread for Pthreads - 2015
Exactly what I was looking for and you did it well! I did wish you hadn't sneaked up quite as slowly as you did on the final solution for the consumer/producer problem. Late at night and I lost concentration and didn't realize there was one more step to the final solution so I ended up having to debug the 2nd to the last version - wasted 5 minutes.
*** The MAX should be 5 or greater for your example of using a parm of 5. [say: #define MAX 15]
*** "Post full" and "post empty" need to be protected by the mutex.
I added more validation of the parm so loops fits in buffer. I added a sleep/printf/fflush in "producer" just for fun of demonstrating what was happening. I suggest including for training purposes other minor functions - partial sample, see the completed program:
    int emptyValue, fullValue, mutexValue;
    sem_getvalue(&empty, &emptyValue);
    if(emptyValue == MAX)
        sem_destroy(&empty);
    else
        std::cout << "Error destroying empty. Value is " << emptyValue << std::endl;
Pasted below is the completed program I used. And again your example was well done and just what I needed to learn quickly about these functions. Thanks
    // multiple producers & consumers
    #include <pthread.h>
    #include <stdio.h>
    #include <semaphore.h>
    #include <stdlib.h>
    #include <iostream>
    #include <unistd.h>

    #define MAX 15

    int buffer[MAX];
    int fill = 0;
    int use = 0;

    void put(int value)
    {
        buffer[fill] = value;
        fill = (fill + 1) % MAX;
    }

    int get()
    {
        int b = buffer[use];
        use = (use + 1) % MAX;
        return b;
    }

    int loops = 0;

    sem_t empty;
    sem_t full;
    sem_t mutex;

    void *producer(void *arg)
    {
        int i;
        for (i = 0; i < loops; i++) {
            sem_wait(&empty);
            sem_wait(&mutex);
            put(i);
            sem_post(&full);
            printf("put %d\n", i);
            fflush(NULL);
            sem_post(&mutex);
            sleep(1);
        }
        pthread_exit(NULL);
    }

    void *consumer(void *arg)
    {
        int i;
        for (i = 0; i < loops; i++) {
            sem_wait(&full);
            sem_wait(&mutex);
            int b = get();
            sem_post(&empty);
            printf("get %d\n", b);
            fflush(NULL);
            sem_post(&mutex);
        }
        pthread_exit(NULL);
    }

    int main(int argc, char *argv[])
    {
        if(argc < 2 ){
            printf("Needs 2nd arg for loop count variable.\n");
            return 1;
        }

        loops = atoi(argv[1]);
        if(loops > MAX){
            printf("Max allowed arg is %d\n", MAX);
            return 1;
        }

        sem_init(&empty, 0, MAX); // MAX buffers are empty to begin with...
        sem_init(&full, 0, 0);    // ... and 0 are full
        sem_init(&mutex, 0, 1);   // mutex = 1 since it is a lock

        pthread_t pThread, cThread;
        pthread_create(&pThread, 0, producer, 0);
        pthread_create(&cThread, 0, consumer, 0);
        pthread_join(pThread, NULL);
        pthread_join(cThread, NULL);

        int emptyValue, fullValue, mutexValue;
        sem_getvalue(&empty, &emptyValue);
        sem_getvalue(&full, &fullValue);
        sem_getvalue(&mutex, &mutexValue);

        if(emptyValue == MAX)
            sem_destroy(&empty);
        else
            std::cout << "Error destroying empty. Value is " << emptyValue << std::endl;

        if(fullValue == 0)
            sem_destroy(&full);
        else
            std::cout << "Error destroying full. Value is " << fullValue << std::endl;

        if(mutexValue == 1)
            sem_destroy(&mutex);
        else
            std::cout << "Error destroying mutex. Value is " << mutexValue << std::endl;

        return 0;
    }