Multi-Threaded Programming
with C++11 - Part A
#include <iostream>
#include <thread>

void foo() { std::cout << "foo()\n"; }
void bar() { std::cout << "bar()\n"; }

int main()
{
    std::thread t([] {
        foo();
        bar();
    });
    // Wait for the thread: destroying a joinable std::thread calls std::terminate().
    t.join();
    return 0;
}
Once our thread has started, we must decide whether to wait for it to finish (by joining it) or to leave it running on its own (by detaching it). If we do neither before the std::thread object is destroyed, the std::thread destructor calls std::terminate() and the program is aborted.
So, we need to ensure that the thread is joined or detached. If we choose not to wait for the thread to finish, then we need to ensure that the data accessed by the thread remains valid until the thread has finished with it. Otherwise, as shown in the example below, the thread function may hold pointers or references to local variables while the thread is still running after the function has exited.
#include <climits>   // INT_MAX
#include <iostream>
#include <thread>
using namespace std;

void foo(int i)
{
    cout << "foo() i = " << i << endl;
}

struct task_struct
{
    int& i;   // reference to a variable owned by the creating function
    task_struct(int& ii) : i(ii)
    {
        cout << "task_struct constructor i = " << i << endl;
    }
    void operator()()
    {
        for(unsigned j = 0; j < INT_MAX; ++j) {
            cout << j << ": foo()\n";
            foo(i);   // dangling reference once the creating function has returned
        }
    }
};

void A_function_creating_a_thread_within()
{
    int state = 99;
    task_struct task(state);
    cout << "Launching a thread\n";
    std::thread t(task);
    cout << "detaching the thread\n";
    // do not wait for the thread to finish
    t.detach();
}

int main()
{
    A_function_creating_a_thread_within();
    cout << "END OF PROGRAM\n";
    return 0;
}
with the possible output:
task_struct constructor i = 99
Launching a thread
0: foo()
foo() i = 99
detaching the thread
1: foo()
foo() i = 3996880
2: foo()
foo() i = 3996880
END OF PROGRAM
In the example, the new thread associated with t will probably still be running when A_function_creating_a_thread_within() returns (and even when main() exits), because we explicitly decided not to wait for it by calling detach().
The following observations were made in the Visual Studio 2012 debugger.
We can check a thread's variables by right-clicking the thread and choosing "Switch To Thread".
Then, we can see the locals of that thread.
If the thread is still running, then the next call to foo(i) will access an already destroyed variable. In general, it's not a good idea to create a thread within a function accessing the local variables in that function, unless the thread is guaranteed to finish before the function exits, which may not be the case for the example above.
We can avoid this problem by calling join() on the associated std::thread instance so that we can wait for a thread to complete.
In the code below, we replaced the detach() with join() to ensure that the thread was finished before the function was exited and thus before the local variables were destroyed. For more info on join, please visit multithreaded-Join().
#include <climits>   // INT_MAX
#include <iostream>
#include <thread>
using namespace std;

void foo(int i)
{
    cout << "foo() i = " << i << endl;
}

struct task_struct
{
    int& i;
    task_struct(int& ii) : i(ii)
    {
        cout << "task_struct constructor i = " << i << endl;
    }
    void operator()()
    {
        for(unsigned j = 0; j < INT_MAX; ++j) {
            cout << j << ": foo()\n";
            foo(i);
        }
    }
};

void A_function_creating_a_thread_within()
{
    int state = 99;
    task_struct task(state);
    cout << "Launching a thread\n";
    std::thread t(task);
    // cout << "detaching the thread\n";
    // t.detach();
    cout << "joining the thread\n";
    t.join();   // state stays alive until the thread has finished
}

int main()
{
    A_function_creating_a_thread_within();
    cout << "END OF PROGRAM\n";
    return 0;
}
However, we can call join() only once for a given thread. Let's look inside the Visual Studio implementation of std::thread::join():
inline void thread::join()
    {   // join thread
    if (!joinable())
        _Throw_Cpp_error(_INVALID_ARGUMENT);
    if (_Thr_is_null(_Thr))
        _Throw_Cpp_error(_INVALID_ARGUMENT);
    if (get_id() == _STD this_thread::get_id())
        _Throw_Cpp_error(_RESOURCE_DEADLOCK_WOULD_OCCUR);
    if (_Thrd_join(_Thr, 0) != _Thrd_success)
        _Throw_Cpp_error(_NO_SUCH_PROCESS);
    _Thr_set_null(_Thr);
    }
Once join() is called, the std::thread object is no longer joinable, and joinable() will return false.
Calling join() gives us no choice but to wait for the thread to finish. If we need more control over the wait, such as checking whether a thread has finished or waiting only for a certain period of time, then we have to use other mechanisms such as condition variables or futures.
Calling join() also cleans up any storage associated with the thread, so the std::thread object is no longer associated with the thread that has just finished.
When we call join() or detach(), it must be done before the std::thread object is destroyed. If we're detaching a thread, we can usually call detach() immediately after the thread has been started, so this is rarely a problem. However, if we want to wait for the thread, we need to choose carefully where in the code we place the call to join(): the call will be skipped if an exception is thrown after the thread has been started but before join() is reached.
To avoid our application being terminated when an exception is thrown, we need to call join() on the exceptional path as well as on the normal path; otherwise we risk accidental lifetime problems. As an example, let's look at the following code:
#include <climits>   // INT_MAX
#include <iostream>
#include <thread>
using namespace std;

void foo(int i)
{
    cout << "foo() i = " << i << endl;
}

struct task_struct
{
    int& i;
    task_struct(int& ii) : i(ii)
    {
        cout << "task_struct constructor i = " << i << endl;
    }
    void operator()()
    {
        for(unsigned j = 0; j < INT_MAX; ++j) {
            cout << j << ": foo()\n";
            foo(i);
        }
    }
};

void current_thread_task()
{
    cout << "try: current_thread_task()\n";
}

void A_function_creating_a_thread_within()
{
    int state = 99;
    task_struct task(state);
    std::thread t(task);
    try {
        current_thread_task();
    }
    catch(...) {
        cout << "catch(...)\n";
        t.join();   // exceptional path
        throw;
    }
    t.join();       // normal path
}

int main()
{
    A_function_creating_a_thread_within();
    cout << "END OF PROGRAM\n";
    return 0;
}
Output should look something like this:
task_struct constructor i = 99
0: foo()
try: current_thread_task()
foo() i = 99
1: foo()
foo() i = 99
2: foo()
foo() i = 99
3: foo()
foo() i = 99
4: foo()
foo() i = 99
...
END OF PROGRAM
The code uses a try/catch block to ensure that a thread with access to local state has finished before the function exits, whether the function exits normally or by an exception. However, try/catch blocks are verbose, and it is easy to get the scope slightly wrong. If it's important that the thread completes before the function exits on every possible exit path, it's desirable to provide a simple, concise mechanism for doing so.
One way is the Resource Acquisition Is Initialization (RAII) idiom: we provide a class that calls join() in its destructor, as shown in the following code.
#include <iostream>
#include <thread>
using namespace std;

void foo(int i)
{
    cout << "foo(" << i << ")\n";
}

struct task_struct
{
    int& i;
    task_struct(int& ii) : i(ii) {}
    void operator()()
    {
        cout << "task_struct::operator()\n";
        for(unsigned j = 0; j < 10; ++j) {
            foo(i);
        }
    }
};

class thread_RAII
{
    thread& t;
public:
    thread_RAII(thread& th) : t(th)
    {
        cout << "thread_RAII constructor\n";
    }
    ~thread_RAII()
    {
        if(t.joinable()) {
            cout << "if joinable(), then t.join()\n";
            t.join();
        }
        cout << "thread_RAII destructor\n";
    }
private:
    // not copyable: declared private and left undefined
    thread_RAII(const thread_RAII& thr);
    thread_RAII& operator=(const thread_RAII& thr);
};

void current_thread_task()
{
    cout << "do something in current_thread_task()\n";
}

void A_function_creating_a_thread_within()
{
    int state = 99;
    task_struct task(state);
    cout << "launching a thread\n";
    thread t(task);
    cout << "make an instance of thead_RAII\n";
    thread_RAII raii(t);
    cout << "call current_thread_task()\n";
    current_thread_task();
}

int main()
{
    A_function_creating_a_thread_within();
    return 0;
}
The two threads write to cout concurrently, so the output is interleaved differently from run to run. One possible output:
launching a thread
task_struct::operator()
foo(make an instance of thead_RAII
thread_RAII constructor
call current_thread_task()
do something in current_thread_task()
if joinable(), then t.join()
99)
foo(99)
foo(99)
foo(99)
foo(99)
foo(99)
foo(99)
foo(99)
foo(99)
foo(99)
thread_RAII destructor
When the execution of the current thread reaches the end of A_function_creating_a_thread_within(), the local objects are destroyed in reverse order of construction. In other words, the thread_RAII object raii is destroyed first, and its destructor joins the thread. The next one to be destroyed is t.
The destructor of thread_RAII first tests to see if the std::thread object is joinable() before calling join(). This is important, because join() can be called only once for a given thread of execution, so it would therefore be a mistake to do so if the thread had already been joined.
Calling detach() on a std::thread object leaves the thread to run in the background, with no direct means of communicating with it. It is no longer possible to wait for that thread to complete: once a thread is detached, we cannot obtain a std::thread object that references it, so it can no longer be joined. Ownership and control are passed over to the C++ Runtime Library, which ensures that the resources associated with the thread are correctly reclaimed when the thread exits.
Detached threads are also called daemon threads. The daemon threads may well run for almost the entire lifetime of the application, performing a background task such as monitoring the filesystem, clearing unused entries out of object caches, or optimizing data structures, etc.
std::thread t(background_task);
t.detach();
assert(!t.joinable());
In order to detach the thread from a std::thread object, there must be a thread to detach: we can't just call detach() on a std::thread object with no associated thread of execution. This is exactly the same situation as for join(), and we can check it in exactly the same way: we can only call t.detach() for a std::thread object t when t.joinable() returns true.
Suppose we have an application that can edit several documents. Whenever we open a new document to edit, we can create a thread and edit the document on that thread. Each document is running on an independent thread, and thus there is no need for waiting. Here is the example:
void editDocument(std::string const& fileName)
{
    openDocumentAndDisplayGUI(fileName);
    while(!doneEditing()) {
        UserCommand cmd = getUserInput();
        if(cmd.type == openNewDocument) {
            std::string const newName = getFilename();
            std::thread t(editDocument, newName);
            t.detach();
        }
        else {
            processUserInput(cmd);
        }
    }
}
If the user tries to open a new document, we prompt them for the document to open, start a new thread to open that document, and then detach it. Note that because the new thread is doing the same task as the current thread but on a different file, we can reuse the same function (editDocument) with the newly chosen fileName as the supplied argument.
We can pass arguments to the callable object or function. It is as simple as passing additional arguments to the std::thread constructor. However, it's important to note that the arguments are copied into internal storage, where they can be accessed by the newly created thread.
The following example creates a new thread t which calls CallableObj(3.14, "pi").
void CallableObj(double d, std::string const& s);
std::thread t(CallableObj, 3.14, "pi");
Note that even though CallableObj takes a string as the second parameter, the string literal is passed as a char const* and converted to a string only in the context of the new thread. This is important when the argument we supply is a pointer to an automatic variable:
void CallableObj(double d, std::string const& s);

void fnc(float param)
{
    char buf[1024];
    sprintf(buf, "%f", param);
    thread t(CallableObj, 3.14, buf);   // only the pointer to buf is copied
    t.detach();
}
In this case, it's the pointer to the local variable buf that is passed through to the new thread, and there's a significant chance that the function fnc will exit before the buffer has been converted to a string on the new thread, which leads to undefined behavior.
To avoid the undefined behavior, we can convert buf to a std::string explicitly before passing it to the std::thread constructor:
void CallableObj(double d, std::string const& s);

void fnc2(float param)
{
    char buf[1024];
    sprintf(buf, "%f", param);
    thread t(CallableObj, 3.14, string(buf));   // the string is built before the thread starts
    t.detach();
}
In the earlier version (fnc), the problem was that we relied on the implicit conversion of the pointer to buf into the std::string expected as the function parameter. That conversion happens too late, because the std::thread constructor copies the supplied values as they are, without converting them to the expected argument types.
It's also possible to get the reverse scenario. That is, the object is copied, and what we wanted is a reference. This might happen if the thread is updating a data structure that's passed in by reference. Here is the example of that case:
void updateWidgetData(Widget w, WidgetData& wData);

void fnc3(Widget w)
{
    WidgetData data;
    std::thread t(updateWidgetData, w, data);
    displayStatus();
    t.join();
    processWidgetData(data);
}
Although updateWidgetData expects the second parameter to be passed by reference, the std::thread constructor isn't aware of that. It's oblivious to the types of the arguments expected by the function and blindly copies the supplied values. On some early implementations, the call to updateWidgetData then receives a reference to the internal copy of data and not a reference to data itself. Consequently, when the thread finishes, these updates are discarded as the internal copies of the supplied arguments are destroyed, and processWidgetData() is passed an unchanged data rather than a correctly updated version. A standard-conforming library hands the internal copy to the function as an rvalue, which cannot bind to a non-const reference, so there the code does not compile at all.
The solution is to wrap the arguments that really need to be references in std::ref:
void updateWidgetData(Widget w, WidgetData& wData);

void fnc4(Widget w)
{
    WidgetData data;
    std::thread t(updateWidgetData, w, std::ref(data));
    displayStatus();
    t.join();
    processWidgetData(data);
}
Then, updateWidgetData will be correctly passed a reference to data rather than a reference to a copy of data.
The parameter-passing semantics of the std::thread constructor are similar to those of std::bind; both are defined in terms of the same mechanism. This means that we can pass a member function pointer as the function, provided we supply a suitable object pointer as the first argument:
class X
{
public:
    void task();
};

X x;
std::thread t(&X::task, &x);
This code will invoke x.task() on the new thread, because the address of x is supplied as the object pointer. We can also supply arguments to such a member function call: the third argument to the std::thread constructor will be the first argument to the member function and so forth.
Another interesting scenario for supplying arguments is where the arguments can't be copied but can only be moved: the data held within one object is transferred over to another, leaving the original object empty. An example of such a type is std::unique_ptr, which provides automatic memory management for dynamically allocated objects. Only one std::unique_ptr instance can point to a given object at a time, and when that instance is destroyed, the pointed-to object is deleted.
The move constructor and move assignment operator allow the ownership of an object to be transferred between std::unique_ptr instances. Such a transfer leaves the source object holding a null pointer. This moving of values allows objects of this type to be accepted as function parameters or returned from functions. If the source object is a temporary, the move is automatic. If the source is a named value, however, the transfer must be requested explicitly by invoking std::move().
The example below shows the use of std::move to transfer ownership of a dynamic object into a thread:
void processObject(std::unique_ptr<object>);

std::unique_ptr<object> p(new object);
p->prepareData(99);
std::thread t(processObject, std::move(p));
By specifying std::move(p) in the std::thread constructor, the ownership of the object is transferred first into internal storage for the newly created thread and then into processObject.
The ownership semantics of std::unique_ptr are the same as those of std::thread. Though a std::thread instance doesn't own a dynamic object in the same way as std::unique_ptr does, it owns a resource. In other words, each instance is responsible for managing a thread of execution. This ownership can be transferred between instances, because instances of std::thread are movable, even though they aren't copyable. Therefore, only one object is associated with a particular thread of execution at any one time, while programmers still have the option of transferring the ownership between objects.
There are a couple of situations in which we want to transfer ownership of a thread. Suppose we want to write a function that creates a thread to run in the background, but we want the ownership of the new thread to pass back to the calling function rather than waiting for it to complete. In another case, we create a thread and pass the ownership to some function that should wait for it to complete. In both cases, we need to transfer ownership from one place to another.
This is why std::thread needs to support the move. Many resource-owning types in the C++ Standard Library such as std::ifstream and std::unique_ptr are movable but not copyable, and std::thread is one of them. This means that the ownership of a particular thread of execution can be moved between std::thread instances, as shown in the following code.
It creates two threads of execution and transfers ownership of those threads among three std::thread instances, t1, t2, and t3:
#include <iostream>
#include <thread>
using namespace std;

void f1() { cout << "f1()\n"; }
void f2() { cout << "f2()\n"; }

int main()
{
    // A new thread is started and associated with t1.
    thread t1(f1);       // (A)

    // Ownership is then transferred over to t2 when t2 is constructed,
    // by invoking move() explicitly.
    thread t2 = move(t1);   // (B)

    // Now, t1 no longer has any associated thread of execution;
    // the thread running f1 is now associated with t2.

    // A new thread is started and associated with a temporary thread object.
    // The transfer of ownership into t1 doesn't require a call to move(),
    // because the owner is a temporary object; moving from temporaries
    // is automatic and implicit.
    t1 = thread(f2);     // (C)

    // t3 is default constructed.
    // This means that it's created without any associated thread of execution.
    thread t3;           // (D)

    // Ownership of the thread currently associated with t2 is transferred into t3.
    // This time, with an explicit call to std::move(), because t2 is a named object.
    // After all these moves, t1 is associated with the thread running f2,
    // t2 has no associated thread, and t3 is associated with the thread running f1.
    t3 = move(t2);       // (E)

    // The final move tries to transfer ownership of the thread running f1
    // back to t1 where it started.
    // This assignment will terminate the program!
    t1 = move(t3);       // (F)

    // (G)
    return 0;
}
The following pictures show the creation and the movement of the threads from (A) to (G). Each represents the line of execution, for example, (A) means that the debugging session is at the line and about to execute that line of code:
(A) The line of code: thread t1(f1); is about to be executed;
(B) After execution of the line: thread t1(f1);
(C) After execution of the line: thread t2 = move(t1);
(D) After execution of the line: t1 = thread(f2);
(E) After execution of the line: thread t3;
(F) After execution of the line: t3 = move(t2);
(G) After execution of the line: t1 = move(t3);
One thing to note about the last move, the line marked (F) in the code:
t1 = move(t3);
We must explicitly wait for a thread to complete or detach it before its std::thread object is destroyed, and the same applies to assignment. In this case t1 already had an associated thread (which was running f2). Therefore, std::terminate() is called to terminate the program. This is done for consistency with the std::thread destructor. We can't just drop a thread by assigning a new value to the std::thread object that manages it.
The support for move in std::thread means that ownership can readily be transferred out of a function:
#include <iostream>
#include <thread>
using namespace std;

void fnc1()
{
    cout << "fnc1()\n";
}

void fnc2(int i)
{
    cout << "fnc2(int " << i << ")\n";
}

thread f()
{
    return thread(fnc1);
}

thread g()
{
    thread t(fnc2, 999);
    return t;
}

int main()
{
    // two threads are started within functions f() and g()
    f().join();
    g().join();
    return 0;
}
Output from the run is:
fnc1()
fnc2(int 999)
If ownership should be transferred into a function, it can just accept an instance of thread by value as one of the parameters:
#include <iostream>
#include <thread>
using namespace std;

void fnc()
{
    cout << "thread task fnc()\n";
}

void f(thread t)
{
    cout << "f(thread t)\n";
    t.join();
}

void g()
{
    f(thread(fnc));   // temporary: moved implicitly
    thread t(fnc);
    f(move(t));       // named object: moved explicitly
}

int main()
{
    g();
    return 0;
}
Output should look like this:
f(thread t)
thread task fnc()
f(thread t)
thread task fnc()
One of the benefits of the move support of std::thread is that we can build on the thread_RAII class of the previous example and have it actually take ownership of the thread. This avoids any unpleasant consequences should the thread_RAII object outlive the thread it was referencing, and it also means that no one else can join or detach the thread once ownership has been transferred into the object:
#include <iostream>
#include <stdexcept>   // std::logic_error
#include <thread>
#include <utility>     // std::move
using namespace std;

void foo(int i)
{
    cout << "foo(" << i << ")\n";
}

struct task_struct
{
    int& i;
    task_struct(int& ii) : i(ii)
    {
        cout << "constructor for task_struct\n";
    }
    void operator()()
    {
        cout << "task_struct::operator()\n";
        for(unsigned j = 0; j < 10; ++j) {
            foo(i);
        }
    }
};

class MyThread
{
    thread t;   // owned, not referenced
public:
    explicit MyThread(thread& th) : t(move(th))
    {
        // Check that the thread is still joinable in the constructor
        // and throw an exception if it's not.
        if(!t.joinable())
            throw std::logic_error("Not joinable");
        cout << "MyThread constructor\n";
    }
    ~MyThread()
    {
        // Joins with the thread supplied to the constructor.
        // No joinable() check is needed: the constructor verified it,
        // and nobody else can join or detach t. join() is called exactly once.
        t.join();
        cout << "MyThread destructor\n";
    }
    // not copyable
    MyThread(MyThread const&) = delete;
    MyThread& operator=(MyThread const&) = delete;
};

void current_thread_task()
{
    cout << "do something in current_thread_task()\n";
}

void f()
{
    int state = 99;   // local state variable
    task_struct task(state);
    thread t(task);
    MyThread mt(t);   // t is moved from and no longer owns the thread
    current_thread_task();
    // When the initial thread reaches this point,
    // the object of MyThread is destroyed in ~MyThread()
}

int main()
{
    f();
    return 0;
}
Possible output from the run is:
constructor for task_struct
task_struct::operator()
MyThread constructor
do something in current_thread_task()
foo(99)
foo(99)
foo(99)
foo(99)
foo(99)
foo(99)
foo(99)
foo(99)
foo(99)
foo(99)
MyThread destructor
The move in std::thread also allows containers to have std::thread objects, if those containers are move aware like the updated std::vector<>. This means that we can write code which spawns a number of threads and then waits for them to finish using container classes.
#include <iostream>
#include <thread>
#include <algorithm>
#include <functional>   // std::mem_fn
#include <vector>
using namespace std;

void doTask(unsigned id)
{
    cout << "doTask()\n";
}

void f()
{
    vector<thread> threads;
    for(unsigned i = 0; i < 10; ++i) {
        // spawn threads
        threads.push_back(thread(doTask, i));
    }
    // call join() on each thread in turn
    for_each(threads.begin(), threads.end(), std::mem_fn(&thread::join));
}

int main()
{
    f();
    return 0;
}
Output:
doTask()
doTask()
doTask()
doTask()
doTask()
doTask()
doTask()
doTask()
doTask()
doTask()
Putting std::thread objects in a std::vector<> is a step toward automating the management of those threads. Instead of creating separate variables for those threads and joining with them directly, they can be treated as a group.
We can choose a dynamic number of threads at runtime rather than creating a fixed number as in the previous example.
The std::thread::hardware_concurrency() function returns an indication of the number of threads that can truly run concurrently for a given execution of a program. On a multicore system it may be the number of CPU cores, and it returns 0 if the information is not available. It can be used for distributing tasks among threads.
The code below is a parallel version of std::accumulate. It divides the work among the threads, with a minimum number of elements per thread.
#include <iostream>
#include <thread>
#include <algorithm>
#include <functional>   // std::mem_fn, std::ref
#include <iterator>     // std::distance, std::advance
#include <numeric>      // std::accumulate
#include <vector>
using namespace std;

template<typename Iterator, typename T>
struct accumulate_block
{
    void operator()(Iterator first, Iterator last, T& result)
    {
        result = std::accumulate(first, last, result);
    }
};

template<typename Iterator, typename T>
T parallel_accumulate(Iterator first, Iterator last, T init)
{
    unsigned long const length = std::distance(first, last);

    // If the input range is empty, just return the initial value init.
    if(!length)
        return init;

    unsigned long const min_per_thread = 50;

    // Otherwise, there is at least one element in the range:
    // divide the number of elements to process by the minimum block size
    // in order to give the maximum number of threads.
    unsigned long const max_threads =
        (length + min_per_thread - 1) / min_per_thread;

    // std::thread::hardware_concurrency() returns the number of threads
    // that can run concurrently for a given execution.
    unsigned long const hardware_threads = thread::hardware_concurrency();

    // The number of threads to run is the minimum of our calculated maximum
    // and the number of hardware threads.
    // If thread::hardware_concurrency() returned 0, we simply substitute 2.
    unsigned long const num_threads =
        std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);

    // The number of entries for each thread to process is
    // the length of the range divided by the number of threads.
    unsigned long const block_size = length / num_threads;

    // Now that we know how many threads we have, we can create
    // a std::vector<T> for the intermediate results.
    vector<T> results(num_threads);

    // Launch one fewer thread than num_threads, because we already have one.
    vector<thread> threads(num_threads - 1);

    Iterator block_start = first;

    // Launch the threads with a loop.
    for(unsigned long i = 0; i < num_threads - 1; ++i) {
        Iterator block_end = block_start;

        // advance the block_end iterator to the end of the current block
        std::advance(block_end, block_size);

        // launch a new thread to accumulate the results for the block
        threads[i] = thread(accumulate_block<Iterator, T>(),
                            block_start, block_end, std::ref(results[i]));

        // The start of the next block is the end of this block.
        block_start = block_end;
    }

    // After the launch of all the threads,
    // this thread can then process the final block.
    accumulate_block<Iterator, T>()(block_start, last, results[num_threads - 1]);

    // Once the results for the last block have been accumulated,
    // we can wait for all the threads.
    std::for_each(threads.begin(), threads.end(),
                  std::mem_fn(&std::thread::join));

    // add up the results with a final call to std::accumulate
    return std::accumulate(results.begin(), results.end(), init);
}
The number of threads to run is the minimum of our calculated maximum and the number of hardware threads. We do not want to run more threads than the hardware can support (oversubscription), because the context switching will mean that more threads will decrease the performance.
Note: this chapter is largely based on "C++ Concurrency in Action: Practical Multithreading" by Anthony Williams.