Implementing Portable Threads and Mutexes
Implementing a portable framework for multi-threading doesn’t have to be difficult or error-prone. With the right framework, writing multi-threaded programs can be quite simple, and all the platform-dependent functions can be hidden away. You can also implement your threading system so that you don’t pay the cost of multi-threaded support until threads are actually spawned.
Let’s get started.
The Thread Interface
First we have the thread interface:
typedef u32 (*KxThreadFunc)( void* );

class KxThread
{
public:
    KxThread();
    KxThread( const KxThread& thread )            { mThreadId = thread.mThreadId; }
    KxThread& operator=( const KxThread& thread ) { mThreadId = thread.mThreadId; return *this; }

    bool spawn( KxThreadFunc func, void* arg );
    u32  join();
    u32  threadId() const                         { return mThreadId; }
    bool isValid() const;
    bool detach();

private:
    ptrv mThreadId;     // the thread id
    ptrv mOldThreadId;  // the thread id the way it was before the thread ended.
};

class KxThreadShim
{
public:
    KxThreadShim( KxThreadFunc func, void* data ) : mFunc( func ), mData( data ) {}

    KxThreadFunc mFunc;
    void*        mData;
};

u32 kxThreadShimFunc( void* data );
We start by defining the thread function pointer. All your threads will start with a function that looks like this:
u32 myThread( void* data );
This thread function signature should work for any sensible threading system. It takes a pointer to your data as a void* pointer. Normally you take all the data you want to use to initialize your thread, pack it into a structure or class, and pass its address as the function argument. We’ll talk more about that later.
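For now, here is a minimal sketch of that pattern; the structure, field, and function names are purely illustrative:

#include "kx_thread.h"

// Pack the thread's startup data into a structure...
struct RenderJob
{
    const char* name;
    u32         itemCount;
};

u32 renderThread( void* data )
{
    RenderJob* job = (RenderJob*)data;   // ...and unpack it inside the thread.
    // ... do the work using job->name and job->itemCount ...
    return 0;
}

void startRender( KxThread& thread, RenderJob& job )
{
    // The caller keeps 'job' alive until the thread is done with it.
    thread.spawn( renderThread, &job );
}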
There are three important methods here: spawn(), join(), and detach().
To create a new thread you call spawn() with your thread function and the data it is going to use. Spawn will create a new thread and call your thread function with the given data inside that thread. The thread will already be running by the time the call to spawn() returns.
join() will cause the calling thread to wait for the thread to complete, and returns the thread’s return value. You use it like this:
KxThread thread1;
KxThread thread2;

// start the threads
thread1.spawn( myThreadFunction, data1 );
thread2.spawn( myThreadFunction, data2 );

// now they are both running simultaneously.

// wait until both threads complete
thread1.join();
thread2.join();
It doesn’t matter which thread actually completes first. If thread1 finishes before thread2, the KxThread object will still get the return value. What you can be sure of is that after you have called join() on both threads, they have both completed processing. Of course, if your thread function is designed never to return, then a call to join() on that thread will never return either.
The other way to use threads is to call detach() on them. Once detached, a thread will continue to run even after the calling thread terminates. This is best for threads that are designed never to terminate, or when the main thread never needs to be notified when the child thread completes.
You use it something like this:
KxThread thread;
thread.spawn( myThreadFunction, data );
thread.detach();
There is one final thing. The mutex system is only activated when threads are active – for efficiency. To do this we will wrap each thread function in a short pass-through – a “shim” – that deactivates the mutex system when the thread function returns. We implement a simple class to do this, KxThreadShim. More on that after we discuss the mutex system.
The Mutex Interface
Before we dive into the implementation of threads, let’s talk about mutexes.
A mutex guarantees that, in a multi-threaded system, certain sections of code are only being run by one thread at a time. This way, if you have shared data structures that might be accessed by multiple threads at once, one thread will not disturb the work of another when it updates the shared data structure.
The mutex interface looks something like this:
// os independent sleep in microseconds
KX_IFACE void kxSleep( u32 usec );
KX_IFACE void kxSleepSeconds( u32 sec );

#ifdef LINUX
    #ifdef __LP64__
        #define MUTEX_SIZE 40  // on linux 64 sizeof pthread_mutex_t structure.
    #else
        #define MUTEX_SIZE 24  // on linux 32 sizeof pthread_mutex_t structure.
    #endif
#endif // !LINUX

#ifdef DARWIN
    #define MUTEX_SIZE 44      // on Os X sizeof pthread_mutex_t structure.
#endif // !DARWIN

#ifdef IPHONE_I386
    #define MUTEX_SIZE 44      // on iphone simulator sizeof pthread_mutex_t structure.
#endif // !IPHONE_I386

#ifdef IPHONE_ARM
    #define MUTEX_SIZE 44      // on iphone sizeof pthread_mutex_t structure.
#endif // !IPHONE_ARM

#ifdef WIN32
    #define MUTEX_SIZE 24      // on windows sizeof CRITICAL_SECTION structure.
#endif // !WIN32

class KxMutex
{
public:
    KxMutex();
    ~KxMutex();

    void lock()   { if ( mMutexesActive ) lock_i(); }
    void unlock() { if ( mMutexesActive ) unlock_i(); }

    static void activateMutexes();
    static void deactivateMutexes();
    static u32  getMutexSystemRefCount() { return mMutexesActive; }

    // wait until the mutex system is no longer active
    static void wait()                   { while ( mMutexesActive ) kxSleep( 100 ); }

private:
    void lock_i();
    void unlock_i();

    char mData[MUTEX_SIZE];

    // protect the mutex system itself with a mutex
    static KxMutex mSelfMutex;
    static u32     mMutexesActive;
};
The important functions here are lock() and unlock(). To protect a piece of code, you first lock() it before starting processing on a shared data structure, and then unlock() it when you are done. Like this:
GlobalObject object;
KxMutex      objectMutex;

u32 myThreadFunction( void* data )
{
    objectMutex.lock();
    object.doStuff();
    objectMutex.unlock();
    return 0;
}
In most cases you can make the mutex a data member of any class that might be used simultaneously by multiple threads, and just lock it as part of any operation on it:
class SharedObject
{
public:
    void doStuff()
    {
        mMutex.lock();
        // ... modify the shared state ...
        mMutex.unlock();
    }

    KxMutex mMutex;
};

SharedObject object;

u32 myThreadFunction( void* data )
{
    // this operation is thread safe.
    object.doStuff();
    return 0;
}
You must use the same mutex to protect any given data structure. This, for example, is wrong:
class SharedObject
{
public:
    void doStuff()
    {
        mMutexOne.lock();
        // ... modify the shared state ...
        mMutexOne.unlock();
    }

    void doMoreStuff()
    {
        mMutexTwo.lock();
        // ... modify the same shared state ...
        mMutexTwo.unlock();
    }

    KxMutex mMutexOne;
    KxMutex mMutexTwo;
};

SharedObject object;

u32 myThreadFunction( void* data )
{
    // Wrong: expect to crash.
    object.doStuff();
    object.doMoreStuff();
    return 0;
}
This would be safe if threads only ever called doStuff() or doMoreStuff(), but never both. If threads called both methods, one thread could be in one function and another thread in the other, and both would be updating the SharedObject simultaneously, because each function is protected by a different mutex.
Failure to properly protect global data structures with mutexes is the most common source of crashes in multi-threaded systems. A good pattern to follow is that mutexes protect data, not functions. So for any data structure that might be accessed by multiple threads there is one mutex to protect it, and it gets locked every time that structure is written or read.
Structures don’t always have to be protected on read, if you can guarantee that the structure is always in a valid state while it is being modified – or that it is never modified during threaded operation. In that case it is fine to protect the structure only when it is written.
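As a concrete sketch of the “mutexes protect data” pattern (the class and method names here are just illustrative), a shared structure owns exactly one mutex and locks it around every read and write:

class SharedScoreTable
{
public:
    void setScore( u32 player, u32 score )
    {
        mMutex.lock();              // the same mutex for every access to mScores
        mScores[player] = score;
        mMutex.unlock();
    }

    u32 score( u32 player )
    {
        mMutex.lock();              // reads are protected with the same mutex
        u32 value = mScores[player];
        mMutex.unlock();
        return value;
    }

private:
    KxMutex mMutex;                 // one mutex, one data structure
    u32     mScores[16];
};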
The mutex itself is just a data structure. We store it in an opaque buffer (mData). To do this we need to know at compile time the size of the operating system’s mutex structure. That is why we define the size of the mutex object per platform at the top of the file.
Alternatively, we could allocate a buffer of the correct size at run time inside the mutex constructor. In our case we would rather not allocate memory at run time, and just set the size of the buffer at compile time. This both keeps the mutex object lightweight and allows us to use mutexes in our heap manager. It does introduce a point of failure: if the size of the mutex implementation on a given platform changes, our mutexes will crash.
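One way to catch that failure at build time rather than at run time is to check the size in each platform implementation file. This is a sketch assuming a C++11 compiler (an older code base would use its own compile-time assert macro):

// In the pthread implementation file, after including kx_mutex.h and <pthread.h>:
static_assert( sizeof( pthread_mutex_t ) <= MUTEX_SIZE,
               "MUTEX_SIZE is too small for this platform's pthread_mutex_t" );

// And in the Windows implementation file, after including <windows.h>:
static_assert( sizeof( CRITICAL_SECTION ) <= MUTEX_SIZE,
               "MUTEX_SIZE is too small for this platform's CRITICAL_SECTION" );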
The mutex system starts off turned off. The real locking functions are lock_i() and unlock_i(). Until the system is activated, the lock() and unlock() methods do nothing. As we will see in the thread implementation later, the mutex system is automatically activated the first time a thread is spawned. This way we don’t pay any cost for mutex locking until the first thread spawns, and generally the programmer doesn’t have to know anything about it. In certain special cases it is necessary for the programmer to activate the mutex system before the first thread is spawned (e.g. in a dynamic library that is called by a multi-threaded program).
Here is the implementation of activateMutexes() and deactivateMutexes(). This code is platform independent:
#include "kx_mutex.h" void KxMutex::activateMutexes() { mSelfMutex.lock_i(); mMutexesActive++; mSelfMutex.unlock_i(); } void KxMutex::deactivateMutexes() { mSelfMutex.lock_i(); assert( mMutexesActive ); if ( mMutexesActive ) mMutexesActive--; mSelfMutex.unlock_i(); }
The key here is that when the mutex count is modified, it needs to be protected by a mutex. Note that we lock mSelfMutex with lock_i() directly, because the public lock() would do nothing while the count is still zero.
Finally we have the wait() method. It just waits until all the threads that activated the mutex system have deactivated it. In a threaded program that is shutting down, you want to wait until all the threads terminate before you start destructing shared data structures like the memory manager. This should only be an issue when you have threads that were detach()’d:
int main()
{
    KxThread thread;
    thread.spawn( myThreadFunc, data );
    thread.detach();

    // this will return when all threads terminate.
    KxMutex::wait();
    return 0;
}
Mutex Implementation on Windows
#include "kx_mutex.h" #include <windows.h>; KxMutex::KxMutex() { CRITICAL_SECTION* psec = (CRITICAL_SECTION*)this; InitializeCriticalSection( psec ); } KxMutex::~KxMutex() { DeleteCriticalSection( (CRITICAL_SECTION*)this ); } void KxMutex::lock_i() { EnterCriticalSection( (CRITICAL_SECTION*)this); } void KxMutex::unlock_i() { LeaveCriticalSection( (CRITICAL_SECTION*)this); } // os independent sleep in microseconds void kxSleep( u32 usec ) { if ( usec < 1000 ) { Sleep( 0 ); } else { Sleep( usec / 1000 ); } } // os independent sleep in microseconds void kxSleepSeconds( u32 sec ) { // windows sleep is in msec Sleep( sec * 1000 ); } u32 KxMutex::mMutexesActive = 0; KxMutex KxMutex::mSelfMutex;
This is pretty simple. We just call down to the Win32 CriticalSection functions.
We also implement a platform-independent sleep function. The Windows Sleep() function takes milliseconds, while usleep() on Linux/OsX takes microseconds. We just want a function with consistent units to use in our KxMutex::wait() method.
Mutex Implementation at the OS Level on Windows
On Windows there are two kinds of mutexes: true mutexes created with CreateMutex(), and “critical sections”.
A true mutex is implemented at the operating system level, and guarantees not only that no other thread in the current process is inside the protected code, but that no thread in any other process is either. This level of protection is not normally necessary. You might need it if you have a DLL or shared library with a single instance being used simultaneously by two processes. A mutex like this requires a round trip into the kernel every time you lock it, and it is orders of magnitude slower than a critical section.
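For completeness, here is a minimal sketch of what a cross-process mutex looks like with the raw Win32 API; the mutex name is made up for the example:

#include <windows.h>

void updateSharedResource()
{
    // A named mutex is visible to every process that opens the same name.
    HANDLE hMutex = CreateMutexA( NULL, FALSE, "Global\\MySharedResourceMutex" );
    if ( hMutex == NULL )
        return;

    WaitForSingleObject( hMutex, INFINITE );    // lock
    // ... touch the resource shared between processes ...
    ReleaseMutex( hMutex );                     // unlock

    CloseHandle( hMutex );
}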
Critical sections protect code from all threads in a single process, and are very lightweight objects. They are generally implemented using special processor instructions that “test” and “set” a value in memory as a single atomic operation. For example, the critical section tests a flag that tells it whether the mutex is already locked, and if the test succeeds it sets the flag with its own thread id – all in a single operation that cannot be interrupted by the scheduler. Without this, there would be a small chance that after reading the flag and finding it clear, some other thread could set the flag first. This is why you cannot implement your own mutex object unless you are willing to write non-portable assembly code (or use newer C++ features like atomic operations).
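To make the test-and-set idea concrete, here is a rough sketch of a spinlock built on C++11’s std::atomic_flag. This is not part of the KxMutex system, just an illustration of the atomic operation the hardware provides:

#include <atomic>

static std::atomic_flag gLock = ATOMIC_FLAG_INIT;

void lockSpin()
{
    // test_and_set atomically reads the previous value and sets the flag.
    // If it was already set, another thread holds the lock, so we spin.
    while ( gLock.test_and_set( std::memory_order_acquire ) )
    {
        // busy wait; a real implementation would yield or back off here.
    }
}

void unlockSpin()
{
    gLock.clear( std::memory_order_release );
}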
For our purposes, we only ever need to use critical sections.
Debugging Critical Sections
You might find this code useful for debugging your windows critical sections:
// special critical section initialization ideas from
// Russ Osterlund & Matt Pietrek, MSDN Magazine, 2003
// The point is to initialize each critical section with the address of the
// caller, and a special signature. We later run a tool that gives us information
// on each properly initialized critical section.

#define MYCRITSECT_SIGNATURE 0xACDCACDC

#pragma optimize( "y", on )   // Ensure that we get standard stack frames

#define GetPrevListEntry( SEC ) ((PRTL_CRITICAL_SECTION_DEBUG) \
    ((DWORD)(SEC).ProcessLocksList.Blink - \
    ( offsetof( _RTL_CRITICAL_SECTION_DEBUG, ProcessLocksList ))))

#define GetNextListEntry( SEC ) ((PRTL_CRITICAL_SECTION_DEBUG) \
    ((DWORD)(SEC).ProcessLocksList.Flink - \
    ( offsetof( _RTL_CRITICAL_SECTION_DEBUG, ProcessLocksList ))))

KxMutex::KxMutex()
{
    CRITICAL_SECTION* psec = (CRITICAL_SECTION*)this;
    InitializeCriticalSection( psec );

    // get the return address of this constructor.
    DWORD pReturnAddress;
    __asm mov eax, dword ptr [ebp+4]
    __asm mov [pReturnAddress], eax

    // Set one of the "spare" fields in the critical section to the
    // return address. We subtract 6, since the return address
    // is after the "CALL" instruction, which is typically 6 bytes.
    // Being off by 6 bytes is usually enough for the line # lookup
    // to pick the wrong line of code.
    psec->DebugInfo->Spare[0] = pReturnAddress - 6;
    psec->DebugInfo->Spare[1] = MYCRITSECT_SIGNATURE;
}

bool CriticalSectionReport()
{
    DWORD  PEBLoaderLock = 0x7ffdf0a0;
    DWORD* pLoaderLockAddress;
    HANDLE pid = /* get my pid */;

    // Locate the address of the loader lock critical section in the process.
    if ( !ReadProcessMemory( pid, (void*)PEBLoaderLock, &pLoaderLockAddress,
                             sizeof( DWORD* ), 0 ))
        return false;

    // Load the loader lock critical section into our own buffer.
    _RTL_CRITICAL_SECTION csLoaderLock;
    if ( !ReadProcessMemory( pid, pLoaderLockAddress, &csLoaderLock,
                             sizeof( _RTL_CRITICAL_SECTION ), 0 ))
        return false;

    // Load the debug structure for the loader lock into our own buffer.
    // This is the 3rd debug area in the doubly-linked list.
    _RTL_CRITICAL_SECTION_DEBUG csdLoaderLock;
    if ( !ReadProcessMemory( pid, csLoaderLock.DebugInfo, &csdLoaderLock,
                             sizeof( _RTL_CRITICAL_SECTION_DEBUG ), 0 ))
        return false;

    // We need to walk the list backwards to the 1st entry.
    PRTL_CRITICAL_SECTION_DEBUG pListEntry = GetPrevListEntry( csdLoaderLock );

    // Load the 2nd debug area, i.e., the defered (sic) critical
    // section debug structure.
    _RTL_CRITICAL_SECTION_DEBUG csdDeferedCriticalSectionDebug;
    if ( !ReadProcessMemory( pid, pListEntry, &csdDeferedCriticalSectionDebug,
                             sizeof( _RTL_CRITICAL_SECTION_DEBUG ), 0 ))
        return false;

    pListEntry = GetPrevListEntry( csdDeferedCriticalSectionDebug );

    // Now load the 1st debug area, i.e., the critical section lock
    // debug structure. Its previous or backward link - Blink - will mark
    // the end of the list.
    _RTL_CRITICAL_SECTION_DEBUG csdCriticalSectionLockDebug;
    if ( !ReadProcessMemory( pid, pListEntry, &csdCriticalSectionLockDebug,
                             sizeof( _RTL_CRITICAL_SECTION_DEBUG ), 0 ))
        return false;

    // Now, walk the chain from the beginning to the end.
    while ( TRUE )
    {
        // Read the critical section debug structure first...
        _RTL_CRITICAL_SECTION_DEBUG cs_debug;
        if ( !ReadProcessMemory( pid, pListEntry, &cs_debug,
                                 sizeof( _RTL_CRITICAL_SECTION_DEBUG ), 0 ))
            break;

        // ...since this will give us the address of the critical section.
        RTL_CRITICAL_SECTION cs;
        if ( !ReadProcessMemory( pid, cs_debug.CriticalSection, &cs,
                                 sizeof( _RTL_CRITICAL_SECTION ), 0 ))
            break;

        // report the contents of this critical section:
        PCRITICAL_SECTION_ENTRY pcse = new CRITICAL_SECTION_ENTRY;

        // Items from the critical section structure.
        pcse->LockCount      = cs.LockCount;
        pcse->RecursionCount = cs.RecursionCount;
        pcse->OwningThread   = cs.OwningThread;
        pcse->LockSemaphore  = cs.LockSemaphore;
        pcse->SpinCount      = cs.SpinCount;

        // Items from the critical section debug structure.
        pcse->CriticalSection = cs_debug.CriticalSection;
        pcse->EntryCount      = cs_debug.EntryCount;
        pcse->ContentionCount = cs_debug.ContentionCount;
        pcse->Spare[0]        = cs_debug.Spare[0];
        pcse->Spare[1]        = cs_debug.Spare[1];

        // Save all of the important fields in our own collection.
        m_Collection->push_back( pcse );

        // When the next or forward link is the same as the critical section
        // lock's backward link we have reached the end of the list.
        if ( cs_debug.ProcessLocksList.Flink ==
             csdCriticalSectionLockDebug.ProcessLocksList.Blink )
            break;

        // The forward link (Flink) actually points into the middle of the
        // next critical section debug structure, so we take its address and
        // back up the correct number of DWORDs to obtain the next entry in
        // the chain. Use the offsetof macro to let the structures and the
        // compiler do the actual work.
        pListEntry = (PRTL_CRITICAL_SECTION_DEBUG)
            ((DWORD)cs_debug.ProcessLocksList.Flink -
             ( offsetof( _RTL_CRITICAL_SECTION_DEBUG, ProcessLocksList )));
    }

    CloseHandle( pid );
    return true;
}
Implementing Mutexes on Linux and OsX
Both Linux and OsX use the pthread library to implement mutexes, so the implementation is the same for both and fairly self-explanatory:
#include "kx_mutex.h" #include <pthread.h> #include <errno.h> #include <unistd.h> KxMutex::KxMutex() { // this means this mutex may be locked multiple times by the same thread. pthread_mutexattr_t attr; pthread_mutexattr_init( &attr ); // pthread_mutexattr_settype( &attr, PTHREAD_MUTEX_ERRORCHECK ); pthread_mutexattr_settype( &attr, PTHREAD_MUTEX_RECURSIVE ); u32 err; if (( err = pthread_mutex_init( (pthread_mutex_t*)this, &attr ) )) assert( 0 ); } KxMutex::~KxMutex() { pthread_mutex_destroy( (pthread_mutex_t*)this ); } void KxMutex::lock_i() { // we can lock a mutex that is already locked. u32 err; if (( err = pthread_mutex_lock( (pthread_mutex_t*)this ))) assert( 0 ); } void KxMutex::unlock_i() { u32 err; if (( err = pthread_mutex_unlock( (pthread_mutex_t*)this ))) assert( 0 ); } // os independent sleep in microseconds void kxSleep( u32 usec ) { usleep( usec ); } void kxSleepSeconds( u32 sec ) { sleep( sec ); } u32 KxMutex::mMutexesActive = 0; KxMutex KxMutex::mSelfMutex; u32 kxThreadShimFunc( void* data ) { // however a thread terminates, it decrements the mutex ref count KxThreadShim* shim = (KxThreadShim*)data; u32 ret = shim->mFunc( shim->mData ); delete shim; KxMutex::deactivateMutexes(); return ret; }
The only wrinkle here is whether you want the same thread to be able to be protected from itself or not. PTHREAD_MUTEX_RECURSIVE will allow you to do things like this:
mutex.lock();
doStuff();
mutex.lock();
doMoreStuff();
mutex.unlock();
mutex.unlock();
PTHREAD_MUTEX_ERRORCHECK won’t allow this. You don’t really want to do this in any case; it is bad practice. But the Windows version behaves more like PTHREAD_MUTEX_RECURSIVE, so we set it this way so both platforms work the same way.
Implementing Threads on Linux and OsX
Both Linux and OsX use the pthread library, so coding threads for them is also identical:
#include "kx_thread.h"
#include "kx_mutex.h"
#include <pthread.h>
#include <errno.h>
#include <assert.h>
#include <stdio.h>

typedef void* (*PThreadFunc)( void* );

#ifndef THREAD_ID_TYPE
#define THREAD_ID_TYPE pthread_t
#endif

// an "invalid" thread id is the id of the calling thread (see below).
#define INVALID_THREAD_ID ((ptrv)(pthread_self()))

// the pthread_t structure is a pointer to the actual
// pthread structure managed by the pthread library.

KxThread::KxThread()
{
    mThreadId = mOldThreadId = INVALID_THREAD_ID;
}

bool KxThread::spawn( KxThreadFunc func, void* arg )
{
    assert( !isValid());

    // spawning a thread turns the mutex system on.
    KxMutex::activateMutexes();

    KxThreadShim* shim = new KxThreadShim( func, arg );
    if ( pthread_create( (pthread_t*)(&mThreadId), NULL,
                         (PThreadFunc)kxThreadShimFunc, shim ))
    {
        mThreadId = INVALID_THREAD_ID;
        delete shim;
        KxMutex::deactivateMutexes();
        return false;
    }
    return true;
}

u32 KxThread::join()
{
    assert( isValid());
    assert( mThreadId );

    u32   retCode;
    void* exitValue = 0;   // pthread_join writes a void*, not a u32

    mOldThreadId = mThreadId;
    mThreadId    = INVALID_THREAD_ID;

    if (( retCode = pthread_join( (THREAD_ID_TYPE)mOldThreadId, &exitValue )))
    {
        // join failed
        KxMutex::deactivateMutexes();
        switch( retCode )
        {
        case EDEADLK:
            printf( "join failed. Error: %d, EDEADLK: probably two "
                    "threads trying to join each other.\n", retCode );
            break;
        case EINVAL:
            printf( "join failed. Error: %d, EINVAL: thread exists, "
                    "but not joinable. Was it already joined?\n", retCode );
            break;
        case ESRCH:
            printf( "join failed. Error: %d, ESRCH: no such thread id.\n",
                    retCode );
            break;
        default:
            printf( "join failed. Error: %d, act: %d\n",
                    retCode, KxMutex::getMutexSystemRefCount());
        }
        return ( retCode + 0x8000 );
    }
    return (u32)(ptrv)exitValue;
}

bool KxThread::detach()
{
    assert( isValid());

    mOldThreadId = mThreadId;
    mThreadId    = INVALID_THREAD_ID;

    u32 err;
    if (( err = pthread_detach( (THREAD_ID_TYPE)mOldThreadId )))
    {
        // thread detach fail. should never happen
        assert(0);
        return false;
    }
    return true;
}
A few things to notice. The pthread functions return a thread id that identifies the thread. Practically speaking, the pthreads implementation returns a pointer to an operating-system thread structure as the thread id, but in principle you aren’t supposed to rely on that. Implementations of pthreads are allowed to use whatever type they want as the thread id.
To detect an invalid thread, or to initialize the thread id, you could just use a NULL pointer. But this is bad practice. A better way is to initialize the thread id to the id of the calling thread. A function that spawns a thread can return any thread id, but one thing it should never return is the id of the calling thread itself. So using pthread_self() as the value of an invalid or uninitialized thread id is a good way to do this.
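The isValid() method is not shown for the pthread version; under the convention above, one way it might look is the sketch below. This assumes isValid() is called from the same thread that constructed or spawned the KxThread, and it uses pthread_equal() to compare ids:

bool KxThread::isValid() const
{
    // A thread id equal to the calling thread's own id means "no thread has
    // been spawned" (or it has already been joined or detached).
    return !pthread_equal( (THREAD_ID_TYPE)mThreadId, pthread_self() );
}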
When a thread is spawned we increment the mutex system’s reference count; when a thread fails to spawn, or terminates, we decrement it. The increment happens in the spawn function, and the decrement happens there too if the thread fails to spawn. If the thread does spawn, it runs inside a wrapper function, kxThreadShimFunc, that decrements the mutex system reference count whenever the thread terminates. We need a wrapper function to do this because threads that are detach()’d never return back to the thread system (via join()) to get decremented.
In normal operation this works fine, and users of these classes don’t have to worry about activation and deactivation of the mutex system.
There are a few cases where the mutex system needs to be activated manually before threads spawn:
1. When the code is built into a shared library that is called from an already multi-threaded program. In this case, the shared library should activate the mutex system when it initializes (see the sketch after this list).
2. When you lock a mutex before spawning a thread, and then unlock it inside that thread. For the first thread spawned, that initial lock won’t do anything, and the unlock will cause a crash. This is also bad practice: mutexes should be locked and unlocked in small sections of code where the lock and unlock calls are near each other. Locking in one thread and unlocking in another is a recipe for disaster. Just don’t do it.
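For the shared library case, a minimal sketch might look like this; the init and shutdown entry-point names are made up for the example:

// Called once by the host application when the library is loaded.
void myLibraryInit()
{
    // The host may already be running multiple threads, so turn the
    // mutex system on before any of our shared structures are touched.
    KxMutex::activateMutexes();
}

// Called once when the library is unloaded.
void myLibraryShutdown()
{
    KxMutex::deactivateMutexes();
}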
Implementing Threads on Windows
#include "kx_thread.h" #include "kx_mutex.h" #include <windows.h> KxThread::KxThread() { mThreadId = mOldThreadId = (u32)INVALID_HANDLE_VALUE; } bool KxThread::spawn( KxThreadFunc func, void* arg ) { KxMutex::activateMutexes(); KxThreadShim* shim = new KxThreadShim( func, arg ); mThreadId = (u32)CreateThread( 0, 0, (LPTHREAD_START_ROUTINE)kxThreadShimFunc, shim, 0, 0 ); if ( mThreadId == (u32)INVALID_HANDLE_VALUE ) { delete shim; KxMutex::deactivateMutexes(); return false; } else { return true; } } u32 KxThread::join() { mOldThreadId = mThreadId; mThreadId = (u32)INVALID_HANDLE_VALUE; DWORD exitCode; WaitForSingleObject( (HANDLE)mOldThreadId, INFINITE ); GetExitCodeThread( (HANDLE)mOldThreadId, &exitCode ); CloseHandle( (HANDLE)mOldThreadId ); return (u32)exitCode; } bool KxThread::detach() { mOldThreadId = mThreadId; mThreadId = (u32)INVALID_HANDLE_VALUE; CloseHandle( (HANDLE)mOldThreadId ); return true; } bool KxThread::isValid() const { return (( mThreadId == (u32)INVALID_HANDLE_VALUE ) ? false : true ); }
One thing to note is that on Windows threads are identified by an OS handle, which has a defined invalid value, so we don’t need a trick like pthread_self() the way we do on Linux/OsX (though CreateThread() itself signals failure by returning NULL).
Implementing Thread Priority and Affinity
It is possible to set threads to different priorities, so that some threads get scheduled more often. On Windows you do it like this:
void KxThread::setPriority( KxThreadPriority pri )
{
    assert( isValid());

    u32 wpri;
    switch( pri )
    {
    case KX_THREAD_PRIORITY_LOW:    wpri = THREAD_PRIORITY_BELOW_NORMAL; break;
    case KX_THREAD_PRIORITY_NORMAL: wpri = THREAD_PRIORITY_NORMAL;       break;
    case KX_THREAD_PRIORITY_HIGH:   wpri = THREAD_PRIORITY_ABOVE_NORMAL; break;
    default:                        assert(0);
    }

    if ( !SetThreadPriority( (HANDLE)mThreadId, wpri ))
        assert(0);
}
With pthreads this is more complex, since different versions of Linux handle thread prioritization differently and there aren’t any clear priority levels, so we leave a full treatment of thread priority on Linux as an exercise for the reader.
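As a starting point, here is one possible pthread sketch; it uses the real-time SCHED_RR policy, which typically requires elevated privileges, and it is only one of several approaches (the function name is made up for the example):

#include <pthread.h>
#include <sched.h>

// Hypothetical pthread counterpart to the Windows version above.
bool kxSetThreadPriority( pthread_t tid, bool high )
{
    sched_param param;

    // Pick a priority inside the valid range for the chosen policy.
    int minPri = sched_get_priority_min( SCHED_RR );
    int maxPri = sched_get_priority_max( SCHED_RR );
    param.sched_priority = high ? maxPri : minPri;

    // Real-time policies usually require root / CAP_SYS_NICE on Linux.
    return pthread_setschedparam( tid, SCHED_RR, &param ) == 0;
}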
You can also set thread affinity, that is, assign particular threads to particular processors. In the function below, mask is a bit mask where each bit represents a processor. Finding the number of processors on your system and building a good mask is outside the scope of this article, but once you have such a mask, you set affinity like this on Windows:
bool KxThread::setAffinity( u64 mask )
{
    if ( !SetThreadAffinityMask( (HANDLE)mThreadId, (DWORD_PTR)mask ))
    {
        assert(0);
        return false;
    }
    return true;
}
And like this with pthreads on Linux (OsX does not provide pthread_setaffinity_np(), so under DARWIN the body is compiled out and the call simply reports success):
bool KxThread::setAffinity( u64 mask )
{
#ifndef DARWIN
    cpu_set_t cpuset;
    CPU_ZERO( &cpuset );
    for ( u32 i = 0; i < 64; i++ )
    {
        if ( mask & 0x1 )
            CPU_SET( i, &cpuset );
        mask >>= 1;
    }

    u32 err = pthread_setaffinity_np( (THREAD_ID_TYPE)mThreadId,
                                      sizeof(cpu_set_t), &cpuset );
    if ( err != 0 )
        return false;
#endif // !DARWIN
    return true;
}
Linux again has more than one way of doing this; the version above uses pthread_setaffinity_np().
Designing for Multi-Threading
In a single-threaded program, the most important things to optimize are the inner loops. In a multi-threaded program, how well your program runs is largely determined by how you use your mutexes.
For efficiency you want to make sure that you lock data structures only when you need to, and that you minimize the number of shared data structures that must be locked. Any time you have a mutex that every thread needs to lock, you open up the possibility of all the threads blocking on the same mutex, waiting for the same shared resource. You typically see this when all threads need to lock a shared heap, all access the same database, or all wait on the same files. This is called lock contention.
The answer is to give every thread a separate local cache and only synchronize the results of its work back to the shared resource on a more limited basis. For memory management, for example, give each thread its own heap, do most allocations in the thread’s own heap, and have that heap request memory from a shared heap only when it runs out. Or in the case of a shared database, make queries to the database in large batches, process them, and sync back to the shared database when the batch is complete.
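Here is a rough sketch of that batching idea using the KxMutex class from this article; the shared log and the batch size are invented for the example:

#include <vector>

// One shared structure, protected by one mutex.
static KxMutex          gLogMutex;
static std::vector<u32> gSharedLog;

u32 workerThread( void* data )
{
    std::vector<u32> localBatch;      // per-thread cache: no locking needed
    localBatch.reserve( 256 );

    for ( u32 i = 0; i < 100000; i++ )
    {
        localBatch.push_back( i );    // do most of the work locally

        if ( localBatch.size() == 256 )
        {
            // touch the shared structure only once per batch
            gLogMutex.lock();
            gSharedLog.insert( gSharedLog.end(), localBatch.begin(), localBatch.end() );
            gLogMutex.unlock();
            localBatch.clear();
        }
    }
    return 0;
}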
If you have a multi-threaded program that seems to perform well with a certain number of threads, but slows down when more threads are added, you probably have lock contention.
You also want to watch out for race conditions and deadlocks.
In a deadlock there are two or more mutexes. One thread locks the first and a second thread locks the second; all is well, until each thread needs to lock the other mutex before releasing the one it already holds:
u32 threadOne( void* data )
{
    mutexOne.lock();
    doStuff();
    mutexTwo.lock();      // blocks if threadTwo holds mutexTwo
    doMoreStuff();
    mutexOne.unlock();
    mutexTwo.unlock();
    return 0;
}

u32 threadTwo( void* data )
{
    mutexTwo.lock();
    doMoreStuff();
    mutexOne.lock();      // blocks if threadOne holds mutexOne: deadlock
    doStuff();
    mutexTwo.unlock();
    mutexOne.unlock();
    return 0;
}
For this reason, when you have two mutexes that are sometimes locked at the same time, you always want to make one mutex “superior” to the other and always lock it first; the “superior” mutex protects not just its own data, but the inferior mutex as well. A good rule of thumb is to always lock mutexes in the same order, and unlock them in the reverse order. For example:
u32 threadOne( void* data )
{
    mutexOne.lock();
    doStuff();
    mutexTwo.lock();
    doMoreStuff();
    mutexTwo.unlock();
    mutexOne.unlock();
    return 0;
}

u32 threadTwo( void* data )
{
    mutexOne.lock();
    mutexTwo.lock();
    doMoreStuff();
    doStuff();
    mutexTwo.unlock();
    mutexOne.unlock();
    return 0;
}
Resources:
POSIX Threads Programming – a tutorial on programming pthreads
POSIX thread API concepts – a pthreads API reference
pthread(3), Mac OS X Developer Tools – pthreads reference for Mac OsX
Processes and Threads – a reference on Windows processes and threads
Using Mutex Objects – about Windows mutex objects