diff --git a/include/pthreadpool.h b/include/pthreadpool.h index 9ec49d6..23e81eb 100644 --- a/include/pthreadpool.h +++ b/include/pthreadpool.h @@ -160,6 +160,25 @@ pthreadpool_t pthreadpool_create(size_t threads_count); */ size_t pthreadpool_get_threads_count(pthreadpool_t threadpool); +/* + * API to enable doing work with fewer threads than available in + * threadpool. + * Purpose of this is to ameliorate some perf degradation observed + * due to OS mapping a given set of threads to fewer cores. + * + * @param num_threads num threads to use for the subsequent tasks + * submitted. + */ +void pthreadpool_set_num_threads_to_use(size_t num_threads); + +/* + * Query current setting of the number of threads to use + * + * @returns The number of threads to be used for the subsequent tasks + * submitted. + */ +size_t pthreadpool_get_num_threads_to_use(void); + /** * Process items on a 1D grid. * diff --git a/src/fastpath.c b/src/fastpath.c index 7c4196c..98d5db6 100644 --- a/src/fastpath.c +++ b/src/fastpath.c @@ -39,7 +39,8 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_1d_fastpath( (pthreadpool_task_1d_t)pthreadpool_load_relaxed_void_p(&threadpool->task); void* const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_threshold = -threads_count; /* Process thread's own range of items */ @@ -77,7 +78,8 @@ pthreadpool_thread_parallelize_1d_with_thread_fastpath( &threadpool->task); void* const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_threshold = -threads_count; /* Process thread's own range of items */ @@ -126,7 +128,8 @@ PTHREADPOOL_INTERNAL void 
pthreadpool_thread_parallelize_1d_with_uarch_fastpath( } #endif - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_threshold = -threads_count; /* Process thread's own range of items */ @@ -163,7 +166,8 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_1d_tile_1d_fastpath( &threadpool->task); void* const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_threshold = -threads_count; /* Process thread's own range of items */ @@ -206,7 +210,8 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_2d_fastpath( (pthreadpool_task_2d_t)pthreadpool_load_relaxed_void_p(&threadpool->task); void* const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_threshold = -threads_count; /* Process thread's own range of items */ @@ -258,7 +263,8 @@ pthreadpool_thread_parallelize_2d_with_thread_fastpath( &threadpool->task); void* const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_threshold = -threads_count; /* Process thread's own range of items */ @@ -309,7 +315,8 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_2d_tile_1d_fastpath( &threadpool->task); void* const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + 
pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_threshold = -threads_count; /* Process thread's own range of items */ @@ -378,7 +385,8 @@ pthreadpool_thread_parallelize_2d_tile_1d_with_uarch_fastpath( } #endif - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_threshold = -threads_count; /* Process thread's own range of items */ @@ -449,7 +457,8 @@ pthreadpool_thread_parallelize_2d_tile_1d_with_uarch_with_thread_fastpath( } #endif - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_threshold = -threads_count; /* Process thread's own range of items */ @@ -508,7 +517,8 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_2d_tile_2d_fastpath( &threadpool->task); void* const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_threshold = -threads_count; /* Process thread's own range of items */ @@ -581,7 +591,8 @@ pthreadpool_thread_parallelize_2d_tile_2d_with_uarch_fastpath( } #endif - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_threshold = -threads_count; /* Process thread's own range of items */ @@ -644,7 +655,8 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_3d_fastpath( (pthreadpool_task_3d_t)pthreadpool_load_relaxed_void_p(&threadpool->task); void* const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + 
pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_threshold = -threads_count; /* Process thread's own range of items */ @@ -706,7 +718,8 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_3d_tile_1d_fastpath( &threadpool->task); void* const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_threshold = -threads_count; /* Process thread's own range of items */ @@ -773,7 +786,8 @@ pthreadpool_thread_parallelize_3d_tile_1d_with_thread_fastpath( pthreadpool_load_relaxed_void_p(&threadpool->task); void* const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_threshold = -threads_count; /* Process thread's own range of items */ @@ -853,7 +867,8 @@ pthreadpool_thread_parallelize_3d_tile_1d_with_uarch_fastpath( } #endif - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_threshold = -threads_count; /* Process thread's own range of items */ @@ -934,7 +949,8 @@ pthreadpool_thread_parallelize_3d_tile_1d_with_uarch_with_thread_fastpath( } #endif - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_threshold = -threads_count; /* Process thread's own range of items */ @@ -1003,7 +1019,8 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_3d_tile_2d_fastpath( &threadpool->task); void* const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); - const size_t threads_count 
= threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_threshold = -threads_count; /* Process thread's own range of items */ @@ -1087,7 +1104,8 @@ pthreadpool_thread_parallelize_3d_tile_2d_with_uarch_fastpath( } #endif - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_threshold = -threads_count; /* Process thread's own range of items */ @@ -1161,7 +1179,8 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_4d_fastpath( (pthreadpool_task_4d_t)pthreadpool_load_relaxed_void_p(&threadpool->task); void* const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_threshold = -threads_count; /* Process thread's own range of items */ @@ -1234,7 +1253,8 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_4d_tile_1d_fastpath( &threadpool->task); void* const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_threshold = -threads_count; /* Process thread's own range of items */ @@ -1311,7 +1331,8 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_4d_tile_2d_fastpath( &threadpool->task); void* const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_threshold = -threads_count; /* Process thread's own range of items */ @@ -1405,7 +1426,8 @@ 
pthreadpool_thread_parallelize_4d_tile_2d_with_uarch_fastpath( } #endif - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_threshold = -threads_count; /* Process thread's own range of items */ @@ -1490,7 +1512,8 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_5d_fastpath( (pthreadpool_task_5d_t)pthreadpool_load_relaxed_void_p(&threadpool->task); void* const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_threshold = -threads_count; /* Process thread's own range of items */ @@ -1573,7 +1596,8 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_5d_tile_1d_fastpath( &threadpool->task); void* const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_threshold = -threads_count; /* Process thread's own range of items */ @@ -1661,7 +1685,8 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_5d_tile_2d_fastpath( &threadpool->task); void* const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_threshold = -threads_count; /* Process thread's own range of items */ @@ -1752,7 +1777,8 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_6d_fastpath( (pthreadpool_task_6d_t)pthreadpool_load_relaxed_void_p(&threadpool->task); void* const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); - const size_t threads_count 
= threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_threshold = -threads_count; /* Process thread's own range of items */ @@ -1846,7 +1872,8 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_6d_tile_1d_fastpath( &threadpool->task); void* const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_threshold = -threads_count; /* Process thread's own range of items */ @@ -1944,7 +1971,8 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_6d_tile_2d_fastpath( &threadpool->task); void* const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_threshold = -threads_count; /* Process thread's own range of items */ diff --git a/src/gcd.c b/src/gcd.c index fddf499..a252a01 100644 --- a/src/gcd.c +++ b/src/gcd.c @@ -30,6 +30,8 @@ #include "threadpool-object.h" #include "threadpool-utils.h" +thread_local size_t max_num_threads = SIZE_MAX; + static void thread_main(void* arg, size_t thread_index) { struct pthreadpool* threadpool = (struct pthreadpool*)arg; struct thread_info* thread = &threadpool->threads[thread_index]; @@ -73,6 +75,8 @@ struct pthreadpool* pthreadpool_create(size_t threads_count) { return NULL; } threadpool->threads_count = fxdiv_init_size_t(threads_count); + pthreadpool_store_relaxed_size_t(&threadpool->num_threads_to_use, + threads_count); for (size_t tid = 0; tid < threads_count; tid++) { threadpool->threads[tid].thread_number = tid; } @@ -85,6 +89,12 @@ struct pthreadpool* pthreadpool_create(size_t threads_count) { return threadpool; } +void 
pthreadpool_set_num_threads_to_use(size_t num_threads) { + max_num_threads = num_threads; +} + +size_t pthreadpool_get_num_threads_to_use(void) { return max_num_threads; } + PTHREADPOOL_INTERNAL void pthreadpool_parallelize( struct pthreadpool* threadpool, thread_function_t thread_function, const void* params, size_t params_size, void* task, void* context, @@ -107,7 +117,8 @@ PTHREADPOOL_INTERNAL void pthreadpool_parallelize( /* Locking of completion_mutex not needed: readers are sleeping on * command_condvar */ - const struct fxdiv_divisor_size_t threads_count = threadpool->threads_count; + const struct fxdiv_divisor_size_t threads_count = fxdiv_init_size_t(min( + threadpool->threads_count.value, pthreadpool_get_num_threads_to_use())); if (params_size != 0) { memcpy(&threadpool->params, params, params_size); diff --git a/src/portable-api.c b/src/portable-api.c index d173103..2864d06 100644 --- a/src/portable-api.c +++ b/src/portable-api.c @@ -93,7 +93,8 @@ static void thread_parallelize_1d(struct pthreadpool* threadpool, /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); for (size_t tid = modulo_decrement(thread_number, threads_count); tid != thread_number; tid = modulo_decrement(tid, threads_count)) { struct thread_info* other_thread = &threadpool->threads[tid]; @@ -127,7 +128,8 @@ static void thread_parallelize_1d_with_thread(struct pthreadpool* threadpool, } /* There still may be other threads with work */ - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); for (size_t tid = modulo_decrement(thread_number, threads_count); tid != thread_number; tid = modulo_decrement(tid, threads_count)) { struct thread_info* other_thread = 
&threadpool->threads[tid]; @@ -173,7 +175,8 @@ static void thread_parallelize_1d_with_uarch(struct pthreadpool* threadpool, /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); for (size_t tid = modulo_decrement(thread_number, threads_count); tid != thread_number; tid = modulo_decrement(tid, threads_count)) { struct thread_info* other_thread = &threadpool->threads[tid]; @@ -213,7 +216,8 @@ static void thread_parallelize_1d_tile_1d(struct pthreadpool* threadpool, /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); for (size_t tid = modulo_decrement(thread_number, threads_count); tid != thread_number; tid = modulo_decrement(tid, threads_count)) { struct thread_info* other_thread = &threadpool->threads[tid]; @@ -238,7 +242,8 @@ static void thread_parallelize_1d_tile_1d_dynamic( // Get a handle on the params. struct pthreadpool_1d_tile_1d_dynamic_params* params = &threadpool->params.parallelize_1d_tile_1d_dynamic; - const size_t num_threads = threadpool->threads_count.value; + const size_t num_threads = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_i = params->range; const size_t tile_i = params->tile; const pthreadpool_task_1d_tile_1d_dynamic_t task = @@ -294,7 +299,8 @@ static void thread_parallelize_1d_tile_1d_dynamic_with_thread( // Get a handle on the params. 
struct pthreadpool_1d_tile_1d_dynamic_params* params = &threadpool->params.parallelize_1d_tile_1d_dynamic; - const size_t num_threads = threadpool->threads_count.value; + const size_t num_threads = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_i = params->range; const size_t tile_i = params->tile; const pthreadpool_task_1d_tile_1d_dynamic_with_id_t task = @@ -350,7 +356,8 @@ static void thread_parallelize_1d_tile_1d_dynamic_with_uarch_with_thread( // Get a handle on the params. struct pthreadpool_1d_tile_1d_dynamic_with_uarch_params* params = &threadpool->params.parallelize_1d_tile_1d_dynamic_with_uarch; - const size_t num_threads = threadpool->threads_count.value; + const size_t num_threads = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_i = params->range; const size_t tile_i = params->tile; const pthreadpool_task_1d_tile_1d_dynamic_with_id_with_thread_t task = @@ -437,7 +444,8 @@ static void thread_parallelize_2d(struct pthreadpool* threadpool, /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); for (size_t tid = modulo_decrement(thread_number, threads_count); tid != thread_number; tid = modulo_decrement(tid, threads_count)) { struct thread_info* other_thread = &threadpool->threads[tid]; @@ -485,7 +493,8 @@ static void thread_parallelize_2d_with_thread(struct pthreadpool* threadpool, } /* There still may be other threads with work */ - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); for (size_t tid = modulo_decrement(thread_number, threads_count); tid != thread_number; tid = modulo_decrement(tid, threads_count)) { struct thread_info* other_thread = 
&threadpool->threads[tid]; @@ -536,7 +545,8 @@ static void thread_parallelize_2d_tile_1d(struct pthreadpool* threadpool, /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); for (size_t tid = modulo_decrement(thread_number, threads_count); tid != thread_number; tid = modulo_decrement(tid, threads_count)) { struct thread_info* other_thread = &threadpool->threads[tid]; @@ -603,7 +613,8 @@ static void thread_parallelize_2d_tile_1d_with_uarch( /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); for (size_t tid = modulo_decrement(thread_number, threads_count); tid != thread_number; tid = modulo_decrement(tid, threads_count)) { struct thread_info* other_thread = &threadpool->threads[tid]; @@ -671,7 +682,8 @@ static void thread_parallelize_2d_tile_1d_with_uarch_with_thread( } /* There still may be other threads with work */ - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); for (size_t tid = modulo_decrement(thread_number, threads_count); tid != thread_number; tid = modulo_decrement(tid, threads_count)) { struct thread_info* other_thread = &threadpool->threads[tid]; @@ -699,7 +711,8 @@ static void thread_parallelize_2d_tile_1d_dynamic( // Get a handle on the params. 
struct pthreadpool_2d_tile_1d_dynamic_params* params = &threadpool->params.parallelize_2d_tile_1d_dynamic; - const size_t num_threads = threadpool->threads_count.value; + const size_t num_threads = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_j = params->range_j; const size_t tile_j = params->tile_j; const size_t tile_range_j = divide_round_up(range_j, tile_j); @@ -769,7 +782,8 @@ static void thread_parallelize_2d_tile_1d_dynamic_with_thread( // Get a handle on the params. struct pthreadpool_2d_tile_1d_dynamic_params* params = &threadpool->params.parallelize_2d_tile_1d_dynamic; - const size_t num_threads = threadpool->threads_count.value; + const size_t num_threads = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_j = params->range_j; const size_t tile_j = params->tile_j; const size_t tile_range_j = divide_round_up(range_j, tile_j); @@ -839,7 +853,8 @@ static void thread_parallelize_2d_tile_1d_dynamic_with_uarch_with_thread( // Get a handle on the params. 
struct pthreadpool_2d_tile_1d_dynamic_with_uarch_params* params = &threadpool->params.parallelize_2d_tile_1d_dynamic_with_uarch; - const size_t num_threads = threadpool->threads_count.value; + const size_t num_threads = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_j = params->range_j; const size_t tile_j = params->tile_j; const size_t tile_range_j = divide_round_up(range_j, tile_j); @@ -947,7 +962,8 @@ static void thread_parallelize_2d_tile_2d(struct pthreadpool* threadpool, /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); for (size_t tid = modulo_decrement(thread_number, threads_count); tid != thread_number; tid = modulo_decrement(tid, threads_count)) { struct thread_info* other_thread = &threadpool->threads[tid]; @@ -1020,7 +1036,8 @@ static void thread_parallelize_2d_tile_2d_with_uarch( /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); for (size_t tid = modulo_decrement(thread_number, threads_count); tid != thread_number; tid = modulo_decrement(tid, threads_count)) { struct thread_info* other_thread = &threadpool->threads[tid]; @@ -1049,7 +1066,8 @@ static void thread_parallelize_2d_tile_2d_dynamic( // Get a handle on the params. 
struct pthreadpool_2d_tile_2d_dynamic_params* params = &threadpool->params.parallelize_2d_tile_2d_dynamic; - const size_t num_threads = threadpool->threads_count.value; + const size_t num_threads = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_i = params->range_i; const size_t range_j = params->range_j; const size_t tile_i = params->tile_i; @@ -1151,7 +1169,8 @@ static void thread_parallelize_2d_tile_2d_dynamic_with_uarch( // Get a handle on the params. struct pthreadpool_2d_tile_2d_dynamic_with_uarch_params* params = &threadpool->params.parallelize_2d_tile_2d_dynamic_with_uarch; - const size_t num_threads = threadpool->threads_count.value; + const size_t num_threads = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_i = params->range_i; const size_t range_j = params->range_j; const size_t tile_i = params->tile_i; @@ -1239,7 +1258,8 @@ static void thread_parallelize_2d_tile_2d_dynamic_with_thread( // Get a handle on the params. 
struct pthreadpool_2d_tile_2d_dynamic_params* params = &threadpool->params.parallelize_2d_tile_2d_dynamic; - const size_t num_threads = threadpool->threads_count.value; + const size_t num_threads = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_i = params->range_i; const size_t range_j = params->range_j; const size_t tile_i = params->tile_i; @@ -1356,7 +1376,8 @@ static void thread_parallelize_3d(struct pthreadpool* threadpool, /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); for (size_t tid = modulo_decrement(thread_number, threads_count); tid != thread_number; tid = modulo_decrement(tid, threads_count)) { struct thread_info* other_thread = &threadpool->threads[tid]; @@ -1418,7 +1439,8 @@ static void thread_parallelize_3d_tile_1d(struct pthreadpool* threadpool, /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); for (size_t tid = modulo_decrement(thread_number, threads_count); tid != thread_number; tid = modulo_decrement(tid, threads_count)) { struct thread_info* other_thread = &threadpool->threads[tid]; @@ -1482,7 +1504,8 @@ static void thread_parallelize_3d_tile_1d_with_thread( } /* There still may be other threads with work */ - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); for (size_t tid = modulo_decrement(thread_number, threads_count); tid != thread_number; tid = modulo_decrement(tid, threads_count)) { struct thread_info* other_thread = &threadpool->threads[tid]; @@ -1559,7 +1582,8 @@ 
static void thread_parallelize_3d_tile_1d_with_uarch( /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); for (size_t tid = modulo_decrement(thread_number, threads_count); tid != thread_number; tid = modulo_decrement(tid, threads_count)) { struct thread_info* other_thread = &threadpool->threads[tid]; @@ -1637,7 +1661,8 @@ static void thread_parallelize_3d_tile_1d_with_uarch_with_thread( } /* There still may be other threads with work */ - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); for (size_t tid = modulo_decrement(thread_number, threads_count); tid != thread_number; tid = modulo_decrement(tid, threads_count)) { struct thread_info* other_thread = &threadpool->threads[tid]; @@ -1667,7 +1692,8 @@ static void thread_parallelize_3d_tile_1d_dynamic_with_thread( // Get a handle on the params. 
struct pthreadpool_3d_tile_1d_dynamic_params* params = &threadpool->params.parallelize_3d_tile_1d_dynamic; - const size_t num_threads = threadpool->threads_count.value; + const size_t num_threads = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_j = params->range_j; const size_t range_k = params->range_k; const size_t tile_k = params->tile_k; @@ -1779,7 +1805,8 @@ static void thread_parallelize_3d_tile_2d(struct pthreadpool* threadpool, /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); for (size_t tid = modulo_decrement(thread_number, threads_count); tid != thread_number; tid = modulo_decrement(tid, threads_count)) { struct thread_info* other_thread = &threadpool->threads[tid]; @@ -1863,7 +1890,8 @@ static void thread_parallelize_3d_tile_2d_with_uarch( /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); for (size_t tid = modulo_decrement(thread_number, threads_count); tid != thread_number; tid = modulo_decrement(tid, threads_count)) { struct thread_info* other_thread = &threadpool->threads[tid]; @@ -1894,7 +1922,8 @@ static void thread_parallelize_3d_tile_2d_dynamic( // Get a handle on the params. 
struct pthreadpool_3d_tile_2d_dynamic_params* params = &threadpool->params.parallelize_3d_tile_2d_dynamic; - const size_t num_threads = threadpool->threads_count.value; + const size_t num_threads = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_j = params->range_j; const size_t range_k = params->range_k; const size_t tile_j = params->tile_j; @@ -2009,7 +2038,8 @@ static void thread_parallelize_3d_tile_2d_dynamic_with_uarch( // Get a handle on the params. struct pthreadpool_3d_tile_2d_dynamic_with_uarch_params* params = &threadpool->params.parallelize_3d_tile_2d_dynamic_with_uarch; - const size_t num_threads = threadpool->threads_count.value; + const size_t num_threads = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_j = params->range_j; const size_t range_k = params->range_k; const size_t tile_j = params->tile_j; @@ -2112,7 +2142,8 @@ static void thread_parallelize_3d_tile_2d_dynamic_with_thread( // Get a handle on the params. 
struct pthreadpool_3d_tile_2d_dynamic_params* params = &threadpool->params.parallelize_3d_tile_2d_dynamic; - const size_t num_threads = threadpool->threads_count.value; + const size_t num_threads = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_j = params->range_j; const size_t range_k = params->range_k; const size_t tile_j = params->tile_j; @@ -2253,7 +2284,8 @@ static void thread_parallelize_4d(struct pthreadpool* threadpool, /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); for (size_t tid = modulo_decrement(thread_number, threads_count); tid != thread_number; tid = modulo_decrement(tid, threads_count)) { struct thread_info* other_thread = &threadpool->threads[tid]; @@ -2326,7 +2358,8 @@ static void thread_parallelize_4d_tile_1d(struct pthreadpool* threadpool, /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); for (size_t tid = modulo_decrement(thread_number, threads_count); tid != thread_number; tid = modulo_decrement(tid, threads_count)) { struct thread_info* other_thread = &threadpool->threads[tid]; @@ -2403,7 +2436,8 @@ static void thread_parallelize_4d_tile_2d(struct pthreadpool* threadpool, /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); for (size_t tid = modulo_decrement(thread_number, threads_count); tid != thread_number; tid = modulo_decrement(tid, threads_count)) { struct 
thread_info* other_thread = &threadpool->threads[tid]; @@ -2497,7 +2531,8 @@ static void thread_parallelize_4d_tile_2d_with_uarch( /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); for (size_t tid = modulo_decrement(thread_number, threads_count); tid != thread_number; tid = modulo_decrement(tid, threads_count)) { struct thread_info* other_thread = &threadpool->threads[tid]; @@ -2531,7 +2566,8 @@ static void thread_parallelize_4d_tile_2d_dynamic( // Get a handle on the params. struct pthreadpool_4d_tile_2d_dynamic_params* params = &threadpool->params.parallelize_4d_tile_2d_dynamic; - const size_t num_threads = threadpool->threads_count.value; + const size_t num_threads = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_j = params->range_j; const size_t range_k = params->range_k; const size_t range_l = params->range_l; @@ -2657,7 +2693,8 @@ static void thread_parallelize_4d_tile_2d_dynamic_with_uarch( // Get a handle on the params. 
struct pthreadpool_4d_tile_2d_dynamic_with_uarch_params* params = &threadpool->params.parallelize_4d_tile_2d_dynamic_with_uarch; - const size_t num_threads = threadpool->threads_count.value; + const size_t num_threads = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_j = params->range_j; const size_t range_k = params->range_k; const size_t range_l = params->range_l; @@ -2816,7 +2853,8 @@ static void thread_parallelize_5d(struct pthreadpool* threadpool, /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); for (size_t tid = modulo_decrement(thread_number, threads_count); tid != thread_number; tid = modulo_decrement(tid, threads_count)) { struct thread_info* other_thread = &threadpool->threads[tid]; @@ -2899,7 +2937,8 @@ static void thread_parallelize_5d_tile_1d(struct pthreadpool* threadpool, /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); for (size_t tid = modulo_decrement(thread_number, threads_count); tid != thread_number; tid = modulo_decrement(tid, threads_count)) { struct thread_info* other_thread = &threadpool->threads[tid]; @@ -2987,7 +3026,8 @@ static void thread_parallelize_5d_tile_2d(struct pthreadpool* threadpool, /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); for (size_t tid = modulo_decrement(thread_number, threads_count); tid != thread_number; tid = modulo_decrement(tid, 
threads_count)) { struct thread_info* other_thread = &threadpool->threads[tid]; @@ -3077,7 +3117,8 @@ static void thread_parallelize_6d(struct pthreadpool* threadpool, /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); for (size_t tid = modulo_decrement(thread_number, threads_count); tid != thread_number; tid = modulo_decrement(tid, threads_count)) { struct thread_info* other_thread = &threadpool->threads[tid]; @@ -3171,7 +3212,8 @@ static void thread_parallelize_6d_tile_1d(struct pthreadpool* threadpool, /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); for (size_t tid = modulo_decrement(thread_number, threads_count); tid != thread_number; tid = modulo_decrement(tid, threads_count)) { struct thread_info* other_thread = &threadpool->threads[tid]; @@ -3270,7 +3312,8 @@ static void thread_parallelize_6d_tile_2d(struct pthreadpool* threadpool, /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); for (size_t tid = modulo_decrement(thread_number, threads_count); tid != thread_number; tid = modulo_decrement(tid, threads_count)) { struct thread_info* other_thread = &threadpool->threads[tid]; @@ -3300,13 +3343,22 @@ static void thread_parallelize_6d_tile_2d(struct pthreadpool* threadpool, pthreadpool_fence_release(); } +static inline size_t pthreadpool_num_threads_to_use( + struct pthreadpool* threadpool) { + size_t threads_count = 1; + if 
(threadpool != NULL) { + const size_t num_threads_to_use = pthreadpool_get_num_threads_to_use(); + threads_count = min(threadpool->threads_count.value, num_threads_to_use); + } + return threads_count; +} + PTHREADPOOL_WEAK void pthreadpool_parallelize_1d(struct pthreadpool* threadpool, pthreadpool_task_1d_t function, void* context, size_t range, uint32_t flags) { - size_t threads_count; - if (threadpool == NULL || - (threads_count = threadpool->threads_count.value) <= 1 || range <= 1) { + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || range <= 1) { /* No thread pool used: execute task sequentially on the calling thread */ struct fpu_state saved_fpu_state = {0}; if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) { @@ -3337,9 +3389,8 @@ PTHREADPOOL_PRIVATE_IMPL(pthreadpool_parallelize_1d) PTHREADPOOL_WEAK void pthreadpool_parallelize_1d_with_thread( struct pthreadpool* threadpool, pthreadpool_task_1d_with_thread_t function, void* context, size_t range, uint32_t flags) { - size_t threads_count; - if (threadpool == NULL || - (threads_count = threadpool->threads_count.value) <= 1 || range <= 1) { + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || range <= 1) { /* No thread pool used: execute task sequentially on the calling thread */ struct fpu_state saved_fpu_state = {0}; if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) { @@ -3373,9 +3424,8 @@ PTHREADPOOL_WEAK void pthreadpool_parallelize_1d_with_uarch( pthreadpool_t threadpool, pthreadpool_task_1d_with_id_t function, void* context, uint32_t default_uarch_index, uint32_t max_uarch_index, size_t range, uint32_t flags) { - size_t threads_count; - if (threadpool == NULL || - (threads_count = threadpool->threads_count.value) <= 1 || range <= 1) { + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || range <= 
1) { /* No thread pool used: execute task sequentially on the calling thread */ uint32_t uarch_index = default_uarch_index; @@ -3422,9 +3472,8 @@ PTHREADPOOL_PRIVATE_IMPL(pthreadpool_parallelize_1d_with_uarch) PTHREADPOOL_WEAK void pthreadpool_parallelize_1d_tile_1d( pthreadpool_t threadpool, pthreadpool_task_1d_tile_1d_t function, void* context, size_t range, size_t tile, uint32_t flags) { - size_t threads_count; - if (threadpool == NULL || - (threads_count = threadpool->threads_count.value) <= 1 || range <= tile) { + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || range <= tile) { /* No thread pool used: execute task sequentially on the calling thread */ struct fpu_state saved_fpu_state = {0}; if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) { @@ -3462,9 +3511,8 @@ PTHREADPOOL_PRIVATE_IMPL(pthreadpool_parallelize_1d_tile_1d) PTHREADPOOL_WEAK void pthreadpool_parallelize_1d_tile_1d_dynamic( pthreadpool_t threadpool, pthreadpool_task_1d_tile_1d_dynamic_t function, void* context, size_t range, size_t tile, uint32_t flags) { - size_t threads_count; - if (threadpool == NULL || - (threads_count = threadpool->threads_count.value) <= 1 || range <= tile) { + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || range <= tile) { /* No thread pool used: execute task sequentially on the calling thread */ struct fpu_state saved_fpu_state = {0}; if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) { @@ -3493,9 +3541,8 @@ PTHREADPOOL_WEAK void pthreadpool_parallelize_1d_tile_1d_dynamic_with_thread( pthreadpool_t threadpool, pthreadpool_task_1d_tile_1d_dynamic_with_id_t function, void* context, size_t range, size_t tile, uint32_t flags) { - size_t threads_count; - if (threadpool == NULL || - (threads_count = threadpool->threads_count.value) <= 1 || range <= tile) { + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if 
(threadpool == NULL || threads_count <= 1 || range <= tile) { /* No thread pool used: execute task sequentially on the calling thread */ struct fpu_state saved_fpu_state = {0}; if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) { @@ -3526,9 +3573,8 @@ pthreadpool_parallelize_1d_tile_1d_dynamic_with_uarch_with_thread( pthreadpool_task_1d_tile_1d_dynamic_with_id_with_thread_t function, void* context, uint32_t default_uarch_index, uint32_t max_uarch_index, size_t range, size_t tile, uint32_t flags) { - size_t threads_count; - if (threadpool == NULL || - (threads_count = threadpool->threads_count.value) <= 1 || range <= tile) { + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || range <= tile) { uint32_t uarch_index = default_uarch_index; #if PTHREADPOOL_USE_CPUINFO uarch_index = @@ -3571,10 +3617,8 @@ PTHREADPOOL_WEAK void pthreadpool_parallelize_2d(pthreadpool_t threadpool, void* context, size_t range_i, size_t range_j, uint32_t flags) { - size_t threads_count; - if (threadpool == NULL || - (threads_count = threadpool->threads_count.value) <= 1 || - (range_i | range_j) <= 1) { + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || (range_i | range_j) <= 1) { /* No thread pool used: execute task sequentially on the calling thread */ struct fpu_state saved_fpu_state = {0}; if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) { @@ -3611,10 +3655,8 @@ PTHREADPOOL_PRIVATE_IMPL(pthreadpool_parallelize_2d) PTHREADPOOL_WEAK void pthreadpool_parallelize_2d_with_thread( pthreadpool_t threadpool, pthreadpool_task_2d_with_thread_t function, void* context, size_t range_i, size_t range_j, uint32_t flags) { - size_t threads_count; - if (threadpool == NULL || - (threads_count = threadpool->threads_count.value) <= 1 || - (range_i | range_j) <= 1) { + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || 
threads_count <= 1 || (range_i | range_j) <= 1) { /* No thread pool used: execute task sequentially on the calling thread */ struct fpu_state saved_fpu_state = {0}; if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) { @@ -3654,9 +3696,8 @@ PTHREADPOOL_WEAK void pthreadpool_parallelize_2d_tile_1d( pthreadpool_t threadpool, pthreadpool_task_2d_tile_1d_t function, void* context, size_t range_i, size_t range_j, size_t tile_j, uint32_t flags) { - size_t threads_count; - if (threadpool == NULL || - (threads_count = threadpool->threads_count.value) <= 1 || + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || (range_i <= 1 && range_j <= tile_j)) { /* No thread pool used: execute task sequentially on the calling thread */ struct fpu_state saved_fpu_state = {0}; @@ -3700,9 +3741,8 @@ PTHREADPOOL_WEAK void pthreadpool_parallelize_2d_tile_1d_with_uarch( pthreadpool_t threadpool, pthreadpool_task_2d_tile_1d_with_id_t function, void* context, uint32_t default_uarch_index, uint32_t max_uarch_index, size_t range_i, size_t range_j, size_t tile_j, uint32_t flags) { - size_t threads_count; - if (threadpool == NULL || - (threads_count = threadpool->threads_count.value) <= 1 || + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || (range_i <= 1 && range_j <= tile_j)) { /* No thread pool used: execute task sequentially on the calling thread */ @@ -3760,9 +3800,8 @@ PTHREADPOOL_WEAK void pthreadpool_parallelize_2d_tile_1d_with_uarch_with_thread( pthreadpool_task_2d_tile_1d_with_id_with_thread_t function, void* context, uint32_t default_uarch_index, uint32_t max_uarch_index, size_t range_i, size_t range_j, size_t tile_j, uint32_t flags) { - size_t threads_count; - if (threadpool == NULL || - (threads_count = threadpool->threads_count.value) <= 1 || + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || 
threads_count <= 1 || (range_i <= 1 && range_j <= tile_j)) { /* No thread pool used: execute task sequentially on the calling thread */ @@ -3820,7 +3859,8 @@ PTHREADPOOL_WEAK void pthreadpool_parallelize_2d_tile_1d_dynamic( pthreadpool_t threadpool, pthreadpool_task_2d_tile_1d_dynamic_t function, void* context, size_t range_i, size_t range_j, size_t tile_j, uint32_t flags) { - if (threadpool == NULL || threadpool->threads_count.value <= 1 || + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || (range_i <= 1 && range_j <= tile_j)) { /* No thread pool used: execute task sequentially on the calling thread */ struct fpu_state saved_fpu_state = {0}; @@ -3854,7 +3894,8 @@ PTHREADPOOL_WEAK void pthreadpool_parallelize_2d_tile_1d_dynamic_with_thread( pthreadpool_t threadpool, pthreadpool_task_2d_tile_1d_dynamic_with_id_t function, void* context, size_t range_i, size_t range_j, size_t tile_j, uint32_t flags) { - if (threadpool == NULL || threadpool->threads_count.value <= 1 || + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || (range_i <= 1 && range_j <= tile_j)) { /* No thread pool used: execute task sequentially on the calling thread */ struct fpu_state saved_fpu_state = {0}; @@ -3890,7 +3931,8 @@ pthreadpool_parallelize_2d_tile_1d_dynamic_with_uarch_with_thread( pthreadpool_task_2d_tile_1d_dynamic_with_id_with_thread_t function, void* context, uint32_t default_uarch_index, uint32_t max_uarch_index, size_t range_i, size_t range_j, size_t tile_j, uint32_t flags) { - if (threadpool == NULL || threadpool->threads_count.value <= 1 || + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || (range_i <= 1 && range_j <= tile_j)) { uint32_t uarch_index = default_uarch_index; #if PTHREADPOOL_USE_CPUINFO @@ -3938,9 +3980,8 @@ PTHREADPOOL_WEAK void 
pthreadpool_parallelize_2d_tile_2d( pthreadpool_t threadpool, pthreadpool_task_2d_tile_2d_t function, void* context, size_t range_i, size_t range_j, size_t tile_i, size_t tile_j, uint32_t flags) { - size_t threads_count; - if (threadpool == NULL || - (threads_count = threadpool->threads_count.value) <= 1 || + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || (range_i <= tile_i && range_j <= tile_j)) { /* No thread pool used: execute task sequentially on the calling thread */ struct fpu_state saved_fpu_state = {0}; @@ -3988,7 +4029,8 @@ PTHREADPOOL_WEAK void pthreadpool_parallelize_2d_tile_2d_dynamic( pthreadpool_t threadpool, pthreadpool_task_2d_tile_2d_dynamic_t function, void* context, size_t range_i, size_t range_j, size_t tile_i, size_t tile_j, uint32_t flags) { - if (threadpool == NULL || threadpool->threads_count.value <= 1 || + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || (range_i <= tile_i && range_j <= tile_j)) { /* No thread pool used: execute task sequentially on the calling thread */ struct fpu_state saved_fpu_state = {0}; @@ -4030,7 +4072,8 @@ PTHREADPOOL_WEAK void pthreadpool_parallelize_2d_tile_2d_dynamic_with_uarch( pthreadpool_task_2d_tile_2d_dynamic_with_id_t function, void* context, uint32_t default_uarch_index, uint32_t max_uarch_index, size_t range_i, size_t range_j, size_t tile_i, size_t tile_j, uint32_t flags) { - if (threadpool == NULL || threadpool->threads_count.value <= 1 || + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || (range_i <= tile_i && range_j <= tile_j)) { /* No thread pool used: execute task sequentially on the calling thread */ uint32_t uarch_index = default_uarch_index; @@ -4084,7 +4127,8 @@ PTHREADPOOL_WEAK void pthreadpool_parallelize_2d_tile_2d_dynamic_with_thread( 
pthreadpool_task_2d_tile_2d_dynamic_with_id_t function, void* context, size_t range_i, size_t range_j, size_t tile_i, size_t tile_j, uint32_t flags) { - if (threadpool == NULL || threadpool->threads_count.value <= 1 || + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || (range_i <= tile_i && range_j <= tile_j)) { /* No thread pool used: execute task sequentially on the calling thread */ struct fpu_state saved_fpu_state = {0}; @@ -4127,9 +4171,8 @@ PTHREADPOOL_WEAK void pthreadpool_parallelize_2d_tile_2d_with_uarch( void* context, uint32_t default_uarch_index, uint32_t max_uarch_index, size_t range_i, size_t range_j, size_t tile_i, size_t tile_j, uint32_t flags) { - size_t threads_count; - if (threadpool == NULL || - (threads_count = threadpool->threads_count.value) <= 1 || + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || (range_i <= tile_i && range_j <= tile_j)) { /* No thread pool used: execute task sequentially on the calling thread */ @@ -4191,9 +4234,8 @@ PTHREADPOOL_WEAK void pthreadpool_parallelize_3d(pthreadpool_t threadpool, void* context, size_t range_i, size_t range_j, size_t range_k, uint32_t flags) { - size_t threads_count; - if (threadpool == NULL || - (threads_count = threadpool->threads_count.value) <= 1 || + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || (range_i | range_j | range_k) <= 1) { /* No thread pool used: execute task sequentially on the calling thread */ struct fpu_state saved_fpu_state = {0}; @@ -4235,9 +4277,8 @@ PTHREADPOOL_WEAK void pthreadpool_parallelize_3d_tile_1d( pthreadpool_t threadpool, pthreadpool_task_3d_tile_1d_t function, void* context, size_t range_i, size_t range_j, size_t range_k, size_t tile_k, uint32_t flags) { - size_t threads_count; - if (threadpool == NULL || - (threads_count = 
threadpool->threads_count.value) <= 1 || + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || ((range_i | range_j) <= 1 && range_k <= tile_k)) { /* No thread pool used: execute task sequentially on the calling thread */ struct fpu_state saved_fpu_state = {0}; @@ -4285,9 +4326,8 @@ PTHREADPOOL_WEAK void pthreadpool_parallelize_3d_tile_1d_with_thread( pthreadpool_task_3d_tile_1d_with_thread_t function, void* context, size_t range_i, size_t range_j, size_t range_k, size_t tile_k, uint32_t flags) { - size_t threads_count; - if (threadpool == NULL || - (threads_count = threadpool->threads_count.value) <= 1 || + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || ((range_i | range_j) <= 1 && range_k <= tile_k)) { /* No thread pool used: execute task sequentially on the calling thread */ struct fpu_state saved_fpu_state = {0}; @@ -4336,9 +4376,8 @@ PTHREADPOOL_WEAK void pthreadpool_parallelize_3d_tile_1d_with_uarch( void* context, uint32_t default_uarch_index, uint32_t max_uarch_index, size_t range_i, size_t range_j, size_t range_k, size_t tile_k, uint32_t flags) { - size_t threads_count; - if (threadpool == NULL || - (threads_count = threadpool->threads_count.value) <= 1 || + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || ((range_i | range_j) <= 1 && range_k <= tile_k)) { /* No thread pool used: execute task sequentially on the calling thread */ @@ -4399,9 +4438,8 @@ PTHREADPOOL_WEAK void pthreadpool_parallelize_3d_tile_1d_with_uarch_with_thread( pthreadpool_task_3d_tile_1d_with_id_with_thread_t function, void* context, uint32_t default_uarch_index, uint32_t max_uarch_index, size_t range_i, size_t range_j, size_t range_k, size_t tile_k, uint32_t flags) { - size_t threads_count; - if (threadpool == NULL || - (threads_count = threadpool->threads_count.value) 
<= 1 || + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || ((range_i | range_j) <= 1 && range_k <= tile_k)) { /* No thread pool used: execute task sequentially on the calling thread */ @@ -4455,14 +4493,16 @@ PTHREADPOOL_WEAK void pthreadpool_parallelize_3d_tile_1d_with_uarch_with_thread( } } -PTHREADPOOL_PRIVATE_IMPL(pthreadpool_parallelize_3d_tile_1d_with_uarch_with_thread) +PTHREADPOOL_PRIVATE_IMPL( + pthreadpool_parallelize_3d_tile_1d_with_uarch_with_thread) PTHREADPOOL_WEAK void pthreadpool_parallelize_3d_tile_1d_dynamic_with_thread( pthreadpool_t threadpool, pthreadpool_task_3d_tile_1d_dynamic_with_id_t function, void* context, size_t range_i, size_t range_j, size_t range_k, size_t tile_k, uint32_t flags) { - if (threadpool == NULL || threadpool->threads_count.value <= 1 || + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || (range_i <= 1 && range_j <= 1 && range_k <= tile_k)) { /* No thread pool used: execute task sequentially on the calling thread */ struct fpu_state saved_fpu_state = {0}; @@ -4500,9 +4540,8 @@ PTHREADPOOL_WEAK void pthreadpool_parallelize_3d_tile_2d( pthreadpool_t threadpool, pthreadpool_task_3d_tile_2d_t function, void* context, size_t range_i, size_t range_j, size_t range_k, size_t tile_j, size_t tile_k, uint32_t flags) { - size_t threads_count; - if (threadpool == NULL || - (threads_count = threadpool->threads_count.value) <= 1 || + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || (range_i <= 1 && range_j <= tile_j && range_k <= tile_k)) { /* No thread pool used: execute task sequentially on the calling thread */ struct fpu_state saved_fpu_state = {0}; @@ -4553,7 +4592,8 @@ PTHREADPOOL_WEAK void pthreadpool_parallelize_3d_tile_2d_dynamic( pthreadpool_t threadpool, pthreadpool_task_3d_tile_2d_dynamic_t function, void* 
context, size_t range_i, size_t range_j, size_t range_k, size_t tile_j, size_t tile_k, uint32_t flags) { - if (threadpool == NULL || threadpool->threads_count.value <= 1 || + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || (range_i <= 1 && range_j <= tile_j && range_k <= tile_k)) { /* No thread pool used: execute task sequentially on the calling thread */ struct fpu_state saved_fpu_state = {0}; @@ -4602,7 +4642,8 @@ PTHREADPOOL_WEAK void pthreadpool_parallelize_3d_tile_2d_dynamic_with_uarch( uint32_t default_uarch_index, uint32_t max_uarch_index, size_t range_i, size_t range_j, size_t range_k, size_t tile_j, size_t tile_k, uint32_t flags) { - if (threadpool == NULL || threadpool->threads_count.value <= 1 || + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || (range_i <= 1 && range_j <= tile_j && range_k <= tile_k)) { /* No thread pool used: execute task sequentially on the calling thread */ uint32_t uarch_index = default_uarch_index; @@ -4661,7 +4702,8 @@ PTHREADPOOL_WEAK void pthreadpool_parallelize_3d_tile_2d_dynamic_with_thread( pthreadpool_task_3d_tile_2d_dynamic_with_id_t function, void* context, size_t range_i, size_t range_j, size_t range_k, size_t tile_j, size_t tile_k, uint32_t flags) { - if (threadpool == NULL || threadpool->threads_count.value <= 1 || + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || (range_i <= 1 && range_j <= tile_j && range_k <= tile_k)) { /* No thread pool used: execute task sequentially on the calling thread */ struct fpu_state saved_fpu_state = {0}; @@ -4709,9 +4751,8 @@ PTHREADPOOL_WEAK void pthreadpool_parallelize_3d_tile_2d_with_uarch( void* context, uint32_t default_uarch_index, uint32_t max_uarch_index, size_t range_i, size_t range_j, size_t range_k, size_t tile_j, size_t tile_k, uint32_t flags) { - size_t 
threads_count; - if (threadpool == NULL || - (threads_count = threadpool->threads_count.value) <= 1 || + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || (range_i <= 1 && range_j <= tile_j && range_k <= tile_k)) { /* No thread pool used: execute task sequentially on the calling thread */ @@ -4777,9 +4818,8 @@ PTHREADPOOL_WEAK void pthreadpool_parallelize_4d(pthreadpool_t threadpool, size_t range_j, size_t range_k, size_t range_l, uint32_t flags) { - size_t threads_count; - if (threadpool == NULL || - (threads_count = threadpool->threads_count.value) <= 1 || + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || (range_i | range_j | range_k | range_l) <= 1) { /* No thread pool used: execute task sequentially on the calling thread */ struct fpu_state saved_fpu_state = {0}; @@ -4826,9 +4866,8 @@ PTHREADPOOL_WEAK void pthreadpool_parallelize_4d_tile_1d( pthreadpool_t threadpool, pthreadpool_task_4d_tile_1d_t function, void* context, size_t range_i, size_t range_j, size_t range_k, size_t range_l, size_t tile_l, uint32_t flags) { - size_t threads_count; - if (threadpool == NULL || - (threads_count = threadpool->threads_count.value) <= 1 || + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || ((range_i | range_j | range_k) <= 1 && range_l <= tile_l)) { /* No thread pool used: execute task sequentially on the calling thread */ struct fpu_state saved_fpu_state = {0}; @@ -4880,9 +4919,8 @@ PTHREADPOOL_WEAK void pthreadpool_parallelize_4d_tile_2d( pthreadpool_t threadpool, pthreadpool_task_4d_tile_2d_t function, void* context, size_t range_i, size_t range_j, size_t range_k, size_t range_l, size_t tile_k, size_t tile_l, uint32_t flags) { - size_t threads_count; - if (threadpool == NULL || - (threads_count = threadpool->threads_count.value) <= 1 || + const size_t 
threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || ((range_i | range_j) <= 1 && range_k <= tile_k && range_l <= tile_l)) { /* No thread pool used: execute task sequentially on the calling thread */ struct fpu_state saved_fpu_state = {0}; @@ -4938,9 +4976,8 @@ PTHREADPOOL_WEAK void pthreadpool_parallelize_4d_tile_2d_with_uarch( void* context, uint32_t default_uarch_index, uint32_t max_uarch_index, size_t range_i, size_t range_j, size_t range_k, size_t range_l, size_t tile_k, size_t tile_l, uint32_t flags) { - size_t threads_count; - if (threadpool == NULL || - (threads_count = threadpool->threads_count.value) <= 1 || + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || ((range_i | range_j) <= 1 && range_k <= tile_k && range_l <= tile_l)) { /* No thread pool used: execute task sequentially on the calling thread */ @@ -5008,7 +5045,8 @@ PTHREADPOOL_WEAK void pthreadpool_parallelize_4d_tile_2d_dynamic( pthreadpool_t threadpool, pthreadpool_task_4d_tile_2d_dynamic_t function, void* context, size_t range_i, size_t range_j, size_t range_k, size_t range_l, size_t tile_k, size_t tile_l, uint32_t flags) { - if (threadpool == NULL || threadpool->threads_count.value <= 1 || + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || (range_i <= 1 && range_j <= 1 && range_k <= tile_k && range_l <= tile_l)) { /* No thread pool used: execute task sequentially on the calling thread */ @@ -5063,7 +5101,8 @@ PTHREADPOOL_WEAK void pthreadpool_parallelize_4d_tile_2d_dynamic_with_uarch( uint32_t default_uarch_index, uint32_t max_uarch_index, size_t range_i, size_t range_j, size_t range_k, size_t range_l, size_t tile_k, size_t tile_l, uint32_t flags) { - if (threadpool == NULL || threadpool->threads_count.value <= 1 || + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + 
if (threadpool == NULL || threads_count <= 1 || (range_i <= 1 && range_j <= 1 && range_k <= tile_k && range_l <= tile_l)) { /* No thread pool used: execute task sequentially on the calling thread */ @@ -5129,9 +5168,8 @@ PTHREADPOOL_WEAK void pthreadpool_parallelize_5d(pthreadpool_t threadpool, size_t range_j, size_t range_k, size_t range_l, size_t range_m, uint32_t flags) { - size_t threads_count; - if (threadpool == NULL || - (threads_count = threadpool->threads_count.value) <= 1 || + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || (range_i | range_j | range_k | range_l | range_m) <= 1) { /* No thread pool used: execute task sequentially on the calling thread */ struct fpu_state saved_fpu_state = {0}; @@ -5181,9 +5219,8 @@ PTHREADPOOL_WEAK void pthreadpool_parallelize_5d_tile_1d( pthreadpool_t threadpool, pthreadpool_task_5d_tile_1d_t function, void* context, size_t range_i, size_t range_j, size_t range_k, size_t range_l, size_t range_m, size_t tile_m, uint32_t flags) { - size_t threads_count; - if (threadpool == NULL || - (threads_count = threadpool->threads_count.value) <= 1 || + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || ((range_i | range_j | range_k | range_l) <= 1 && range_m <= tile_m)) { /* No thread pool used: execute task sequentially on the calling thread */ struct fpu_state saved_fpu_state = {0}; @@ -5239,9 +5276,8 @@ PTHREADPOOL_WEAK void pthreadpool_parallelize_5d_tile_2d( void* context, size_t range_i, size_t range_j, size_t range_k, size_t range_l, size_t range_m, size_t tile_l, size_t tile_m, uint32_t flags) { - size_t threads_count; - if (threadpool == NULL || - (threads_count = threadpool->threads_count.value) <= 1 || + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || ((range_i | range_j | range_k) <= 1 && range_l <= 
tile_l && range_m <= tile_m)) { /* No thread pool used: execute task sequentially on the calling thread */ @@ -5300,9 +5336,8 @@ PTHREADPOOL_WEAK void pthreadpool_parallelize_6d( pthreadpool_t threadpool, pthreadpool_task_6d_t function, void* context, size_t range_i, size_t range_j, size_t range_k, size_t range_l, size_t range_m, size_t range_n, uint32_t flags) { - size_t threads_count; - if (threadpool == NULL || - (threads_count = threadpool->threads_count.value) <= 1 || + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || (range_i | range_j | range_k | range_l | range_m | range_n) <= 1) { /* No thread pool used: execute task sequentially on the calling thread */ struct fpu_state saved_fpu_state = {0}; @@ -5356,9 +5391,8 @@ PTHREADPOOL_WEAK void pthreadpool_parallelize_6d_tile_1d( void* context, size_t range_i, size_t range_j, size_t range_k, size_t range_l, size_t range_m, size_t range_n, size_t tile_n, uint32_t flags) { - size_t threads_count; - if (threadpool == NULL || - (threads_count = threadpool->threads_count.value) <= 1 || + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || ((range_i | range_j | range_k | range_l | range_m) <= 1 && range_n <= tile_n)) { /* No thread pool used: execute task sequentially on the calling thread */ @@ -5418,9 +5452,8 @@ PTHREADPOOL_WEAK void pthreadpool_parallelize_6d_tile_2d( void* context, size_t range_i, size_t range_j, size_t range_k, size_t range_l, size_t range_m, size_t range_n, size_t tile_m, size_t tile_n, uint32_t flags) { - size_t threads_count; - if (threadpool == NULL || - (threads_count = threadpool->threads_count.value) <= 1 || + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || ((range_i | range_j | range_k | range_l) <= 1 && range_m <= tile_m && range_n <= tile_n)) { /* No thread pool used: 
execute task sequentially on the calling thread */ diff --git a/src/pthreads.c b/src/pthreads.c index 70291c6..ea87ab1 100644 --- a/src/pthreads.c +++ b/src/pthreads.c @@ -22,6 +22,7 @@ /* Configuration header */ #include + #include "threadpool-common.h" /* POSIX headers */ @@ -72,6 +73,8 @@ #include "threadpool-object.h" #include "threadpool-utils.h" +thread_local size_t max_num_threads = UINT_MAX; + #if PTHREADPOOL_USE_FUTEX #if defined(__linux__) static int futex_wait(pthreadpool_atomic_uint32_t* address, uint32_t value) { @@ -164,10 +167,10 @@ static void wait_worker_threads(struct pthreadpool* threadpool) { #endif } -static uint32_t wait_for_new_command(struct pthreadpool* threadpool, +static uint32_t wait_for_new_command(struct thread_info* thread, uint32_t last_command, uint32_t last_flags) { - uint32_t command = pthreadpool_load_acquire_uint32_t(&threadpool->command); + uint32_t command = pthreadpool_load_acquire_uint32_t(&thread->command); if (command != last_command) { return command; } @@ -177,7 +180,7 @@ static uint32_t wait_for_new_command(struct pthreadpool* threadpool, for (uint32_t i = 0; i < PTHREADPOOL_SPIN_WAIT_ITERATIONS; i++) { pthreadpool_yield(i); - command = pthreadpool_load_acquire_uint32_t(&threadpool->command); + command = pthreadpool_load_acquire_uint32_t(&thread->command); if (command != last_command) { return command; } @@ -187,20 +190,20 @@ static uint32_t wait_for_new_command(struct pthreadpool* threadpool, /* Spin-wait disabled or timed out, fall back to mutex/futex wait */ #if PTHREADPOOL_USE_FUTEX do { - futex_wait(&threadpool->command, last_command); - command = pthreadpool_load_acquire_uint32_t(&threadpool->command); + futex_wait(&thread->command, last_command); + command = pthreadpool_load_acquire_uint32_t(&thread->command); } while (command == last_command); #else /* Lock the command mutex */ - pthread_mutex_lock(&threadpool->command_mutex); + pthread_mutex_lock(&thread->command_mutex); /* Read the command */ - while ((command 
= pthreadpool_load_acquire_uint32_t(&threadpool->command)) == + while ((command = pthreadpool_load_acquire_uint32_t(&thread->command)) == last_command) { /* Wait for new command */ - pthread_cond_wait(&threadpool->command_condvar, &threadpool->command_mutex); + pthread_cond_wait(&thread->command_condvar, &thread->command_mutex); } /* Read a new command */ - pthread_mutex_unlock(&threadpool->command_mutex); + pthread_mutex_unlock(&thread->command_mutex); #endif return command; } @@ -217,7 +220,7 @@ static void* thread_main(void* arg) { /* Monitor new commands and act accordingly */ for (;;) { - uint32_t command = wait_for_new_command(threadpool, last_command, flags); + uint32_t command = wait_for_new_command(thread, last_command, flags); pthreadpool_fence_acquire(); flags = pthreadpool_load_relaxed_uint32_t(&threadpool->flags); @@ -288,9 +291,19 @@ PTHREADPOOL_WEAK struct pthreadpool* pthreadpool_create(size_t threads_count) { return NULL; } threadpool->threads_count = fxdiv_init_size_t(threads_count); + pthreadpool_store_relaxed_size_t(&threadpool->num_threads_to_use, + threads_count); for (size_t tid = 0; tid < threads_count; tid++) { threadpool->threads[tid].thread_number = tid; threadpool->threads[tid].threadpool = threadpool; + // Since command is per thread we are creating conditional variables per + // thread as well. However, only child threads participate in wait/wakeup + // signalling. + // Note: instead of using !use_futex this is just relying on use_condvar +#if PTHREADPOOL_USE_CONDVAR + pthread_mutex_init(&(threadpool->threads[tid].command_mutex), NULL); + pthread_cond_init(&(threadpool->threads[tid].command_condvar), NULL); +#endif } /* Thread pool with a single thread computes everything on the caller thread. 
@@ -300,8 +313,6 @@ PTHREADPOOL_WEAK struct pthreadpool* pthreadpool_create(size_t threads_count) { #if !PTHREADPOOL_USE_FUTEX pthread_mutex_init(&threadpool->completion_mutex, NULL); pthread_cond_init(&threadpool->completion_condvar, NULL); - pthread_mutex_init(&threadpool->command_mutex, NULL); - pthread_cond_init(&threadpool->command_condvar, NULL); #endif #if PTHREADPOOL_USE_FUTEX @@ -325,6 +336,12 @@ PTHREADPOOL_WEAK struct pthreadpool* pthreadpool_create(size_t threads_count) { PTHREADPOOL_PRIVATE_IMPL(pthreadpool_create) +void pthreadpool_set_num_threads_to_use(size_t num_threads) { + max_num_threads = num_threads; +} + +size_t pthreadpool_get_num_threads_to_use() { return max_num_threads; } + PTHREADPOOL_INTERNAL void pthreadpool_parallelize( struct pthreadpool* threadpool, thread_function_t thread_function, const void* params, size_t params_size, void* task, void* context, @@ -337,10 +354,16 @@ PTHREADPOOL_INTERNAL void pthreadpool_parallelize( /* Protect the global threadpool structures */ pthread_mutex_lock(&threadpool->execution_mutex); -#if !PTHREADPOOL_USE_FUTEX + const struct fxdiv_divisor_size_t threads_count = threadpool->threads_count; + size_t max_threads_to_use = pthreadpool_get_num_threads_to_use(); + const struct fxdiv_divisor_size_t num_threads_to_use = + fxdiv_init_size_t(min(threads_count.value, max_threads_to_use)); /* Lock the command variables to ensure that threads don't start processing * before they observe complete command with all arguments */ - pthread_mutex_lock(&threadpool->command_mutex); +#if !PTHREADPOOL_USE_FUTEX + for (size_t tid = 1; tid < num_threads_to_use.value; tid++) { + pthread_mutex_lock(&(threadpool->threads[tid].command_mutex)); + } #endif /* Setup global arguments */ @@ -352,9 +375,11 @@ PTHREADPOOL_INTERNAL void pthreadpool_parallelize( /* Locking of completion_mutex not needed: readers are sleeping on * command_condvar */ - const struct fxdiv_divisor_size_t threads_count = threadpool->threads_count; - 
pthreadpool_store_relaxed_size_t(&threadpool->active_threads, - threads_count.value - 1 /* caller thread */); + pthreadpool_store_relaxed_size_t( + &threadpool->active_threads, + num_threads_to_use.value - 1 /* caller thread */); + pthreadpool_store_relaxed_size_t(&threadpool->num_threads_to_use, + num_threads_to_use.value); #if PTHREADPOOL_USE_FUTEX pthreadpool_store_relaxed_uint32_t(&threadpool->has_active_threads, 1); #endif @@ -366,9 +391,9 @@ PTHREADPOOL_INTERNAL void pthreadpool_parallelize( /* Spread the work between threads */ const struct fxdiv_result_size_t range_params = - fxdiv_divide_size_t(linear_range, threads_count); + fxdiv_divide_size_t(linear_range, num_threads_to_use); size_t range_start = 0; - for (size_t tid = 0; tid < threads_count.value; tid++) { + for (size_t tid = 0; tid < num_threads_to_use.value; tid++) { struct thread_info* thread = &threadpool->threads[tid]; const size_t range_length = range_params.quotient + (size_t)(tid < range_params.remainder); @@ -381,40 +406,44 @@ PTHREADPOOL_INTERNAL void pthreadpool_parallelize( range_start = range_end; } - /* - * Update the threadpool command. - * Imporantly, do it after initializing command parameters (range, task, - * argument, flags) - * ~(threadpool->command | THREADPOOL_COMMAND_MASK) flips the bits not in - * command mask to ensure the unmasked command is different then the last - * command, because worker threads monitor for change in the unmasked command. - */ - const uint32_t old_command = - pthreadpool_load_relaxed_uint32_t(&threadpool->command); - const uint32_t new_command = - ~(old_command | THREADPOOL_COMMAND_MASK) | threadpool_command_parallelize; - - /* - * Store the command with release semantics to guarantee that if a worker - * thread observes the new command value, it also observes the updated command - * parameters. 
- * - * Note: release semantics is necessary even with a conditional variable, - * because the workers might be waiting in a spin-loop rather than the - * conditional variable. - */ - pthreadpool_store_release_uint32_t(&threadpool->command, new_command); + for (size_t tid = 1; tid < num_threads_to_use.value; tid++) { + /* + * Update the threadpool command. + * Importantly, do it after initializing command parameters (range, task, + * argument, flags) + * ~(threadpool->command | THREADPOOL_COMMAND_MASK) flips the bits not in + * command mask to ensure the unmasked command is different than the last + * command, because worker threads monitor for change in the unmasked + * command. + */ + const uint32_t old_command = + pthreadpool_load_relaxed_uint32_t(&(threadpool->threads[tid].command)); + const uint32_t new_command = ~(old_command | THREADPOOL_COMMAND_MASK) | + threadpool_command_parallelize; + + /* + * Store the command with release semantics to guarantee that if a worker + * thread observes the new command value, it also observes the updated + * command parameters. + * + * Note: release semantics is necessary even with a conditional variable, + * because the workers might be waiting in a spin-loop rather than the + * conditional variable. 
+ */ + pthreadpool_store_release_uint32_t(&(threadpool->threads[tid].command), + new_command); #if PTHREADPOOL_USE_FUTEX - /* Wake up the threads */ - futex_wake_all(&threadpool->command); + /* Wake up the threads */ + futex_wake_all(&(threadpool->threads[tid].command)); #else - /* Unlock the command variables before waking up the threads for better - * performance */ - pthread_mutex_unlock(&threadpool->command_mutex); + /* Unlock the command variables before waking up the threads for better + * performance */ + pthread_mutex_unlock(&(threadpool->threads[tid].command_mutex)); - /* Wake up the threads */ - pthread_cond_broadcast(&threadpool->command_condvar); + /* Wake up the threads */ + pthread_cond_broadcast(&(threadpool->threads[tid].command_condvar)); #endif + } /* Save and modify FPU denormals control, if needed */ struct fpu_state saved_fpu_state = {0}; @@ -450,41 +479,48 @@ PTHREADPOOL_WEAK void pthreadpool_destroy(struct pthreadpool* threadpool) { threads_count - 1 /* caller thread */); pthreadpool_store_relaxed_uint32_t(&threadpool->has_active_threads, 1); - /* - * Store the command with release semantics to guarantee that if a worker - * thread observes the new command value, it also observes the updated - * active_threads/has_active_threads values. - */ - pthreadpool_store_release_uint32_t(&threadpool->command, - threadpool_command_shutdown); - /* Wake up worker threads */ - futex_wake_all(&threadpool->command); + for (size_t tid = 1; tid < threads_count; tid++) { + /* + * Store the command with release semantics to guarantee that if a + * worker thread observes the new command value, it also observes the + * updated active_threads/has_active_threads values. 
+ */ + pthreadpool_store_release_uint32_t(&threadpool->threads[tid].command, + threadpool_command_shutdown); + + futex_wake_all(&threadpool->threads[tid].command); + } #else - /* Lock the command variable to ensure that threads don't shutdown until - * both command and active_threads are updated */ - pthread_mutex_lock(&threadpool->command_mutex); - + for (size_t tid = 1; tid < threads_count; tid++) { + /* Lock the command variable to ensure that threads don't shutdown until + * both command and active_threads are updated */ + pthread_mutex_lock(&threadpool->threads[tid].command_mutex); + } pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */); - /* - * Store the command with release semantics to guarantee that if a worker - * thread observes the new command value, it also observes the updated - * active_threads value. - * - * Note: the release fence inside pthread_mutex_unlock is insufficient, - * because the workers might be waiting in a spin-loop rather than the - * conditional variable. - */ - pthreadpool_store_release_uint32_t(&threadpool->command, - threadpool_command_shutdown); - - /* Wake up worker threads */ - pthread_cond_broadcast(&threadpool->command_condvar); + for (size_t tid = 1; tid < threads_count; tid++) { + /* + * Store the command with release semantics to guarantee that if a + * worker thread observes the new command value, it also observes the + * updated active_threads value. + * + * Note: the release fence inside pthread_mutex_unlock is insufficient, + * because the workers might be waiting in a spin-loop rather than the + * conditional variable. 
+ */ + pthreadpool_store_release_uint32_t(&threadpool->threads[tid].command, + threadpool_command_shutdown); + + /* Wake up worker threads */ + pthread_cond_broadcast(&threadpool->threads[tid].command_condvar); + } - /* Commit the state changes and let workers start processing */ - pthread_mutex_unlock(&threadpool->command_mutex); + for (size_t tid = 1; tid < threads_count; tid++) { + /* Commit the state changes and let workers start processing */ + pthread_mutex_unlock(&threadpool->threads[tid].command_mutex); + } #endif /* Wait until all threads return */ @@ -497,8 +533,10 @@ PTHREADPOOL_WEAK void pthreadpool_destroy(struct pthreadpool* threadpool) { #if !PTHREADPOOL_USE_FUTEX pthread_mutex_destroy(&threadpool->completion_mutex); pthread_cond_destroy(&threadpool->completion_condvar); - pthread_mutex_destroy(&threadpool->command_mutex); - pthread_cond_destroy(&threadpool->command_condvar); + for (size_t tid = 0; tid < threads_count.value; tid++) { + pthread_mutex_destroy(&threadpool->threads[tid].command_mutex); + pthread_cond_destroy(&threadpool->threads[tid].command_condvar); + } #endif } #if PTHREADPOOL_USE_CPUINFO diff --git a/src/threadpool-common.h b/src/threadpool-common.h index 087cda1..0fab42e 100644 --- a/src/threadpool-common.h +++ b/src/threadpool-common.h @@ -116,4 +116,21 @@ #endif #endif +// ported from: +// https://stackoverflow.com/questions/18298280/how-to-declare-a-variable-as-thread-local-portably +/* gcc doesn't know _Thread_local from C11 yet */ +#ifdef __GNUC__ +#define thread_local __thread +/* +// c11 standard already has thread_local specified +// https://en.cppreference.com/w/c/thread/thread_local +#elif __STDC_VERSION__ >= 201112L +# define thread_local _Thread_local +*/ +#elif defined(_MSC_VER) +#define thread_local __declspec(thread) +#else +#error Cannot define thread_local +#endif + #endif // __PTHREADPOOL_SRC_THREADPOOL_COMMON_H_ diff --git a/src/threadpool-object.h b/src/threadpool-object.h index cde54d3..bab3e58 100644 --- 
a/src/threadpool-object.h +++ b/src/threadpool-object.h @@ -91,6 +91,32 @@ struct PTHREADPOOL_CACHELINE_ALIGNED thread_info { */ HANDLE thread_handle; #endif + +#if !PTHREADPOOL_USE_GCD + /** + * The last command submitted to the thread pool. + */ + pthreadpool_atomic_uint32_t command; +#endif +#if PTHREADPOOL_USE_CONDVAR + /** + * Guards access to the @a command variable. + */ + pthread_mutex_t command_mutex; + /** + * Condition variable to wait for change of the @a command variable. + */ + pthread_cond_t command_condvar; +#endif +#if PTHREADPOOL_USE_EVENT + /** + * Events to wait on for change of the @a command variable. + * To avoid race conditions due to spin-lock synchronization, we use two + * events and switch event in use after every submitted command according to + * the high bit of the command word. + */ + HANDLE command_event[2]; +#endif }; PTHREADPOOL_STATIC_ASSERT(sizeof(struct thread_info) % @@ -1056,6 +1082,13 @@ struct PTHREADPOOL_CACHELINE_ALIGNED pthreadpool { * The number of threads that are processing an operation. */ pthreadpool_atomic_size_t active_threads; + /* + * Enable restricting task parallelization among a subset of + * pthreadpool threads. + * As per this change, this feature is not available in GCD based + * pthreadpool + */ + pthreadpool_atomic_size_t num_threads_to_use; #endif #if PTHREADPOOL_USE_FUTEX /** @@ -1065,12 +1098,6 @@ struct PTHREADPOOL_CACHELINE_ALIGNED pthreadpool { * - has_active_threads == 1 if active_threads != 0 */ pthreadpool_atomic_uint32_t has_active_threads; -#endif -#if !PTHREADPOOL_USE_GCD - /** - * The last command submitted to the thread pool. - */ - pthreadpool_atomic_uint32_t command; #endif /** * The entry point function to call for each thread in the thread pool for @@ -1169,14 +1196,6 @@ struct PTHREADPOOL_CACHELINE_ALIGNED pthreadpool { * @a active_threads is zero). */ pthread_cond_t completion_condvar; - /** - * Guards access to the @a command variable. 
- */ - pthread_mutex_t command_mutex; - /** - * Condition variable to wait for change of the @a command variable. - */ - pthread_cond_t command_condvar; #endif #if PTHREADPOOL_USE_EVENT /** @@ -1185,14 +1204,8 @@ struct PTHREADPOOL_CACHELINE_ALIGNED pthreadpool { * synchronization, we use two events and switch event in use after every * submitted command according to the high bit of the command word. */ + uint32_t completion_event_index; HANDLE completion_event[2]; - /** - * Events to wait on for change of the @a command variable. - * To avoid race conditions due to spin-lock synchronization, we use two - * events and switch event in use after every submitted command according to - * the high bit of the command word. - */ - HANDLE command_event[2]; #endif /** * FXdiv divisor for the number of threads in the thread pool. diff --git a/src/windows.c b/src/windows.c index d3e9575..df79d83 100644 --- a/src/windows.c +++ b/src/windows.c @@ -16,6 +16,7 @@ /* Configuration header */ #include + #include "threadpool-common.h" /* Windows headers */ @@ -32,6 +33,8 @@ #include "threadpool-object.h" #include "threadpool-utils.h" +thread_local size_t max_num_threads = UINT_MAX; + static void checkin_worker_thread(struct pthreadpool* threadpool, uint32_t event_index) { if (pthreadpool_decrement_fetch_acquire_release_size_t( @@ -67,10 +70,10 @@ static void wait_worker_threads(struct pthreadpool* threadpool, assert(pthreadpool_load_relaxed_size_t(&threadpool->active_threads) == 0); } -static uint32_t wait_for_new_command(struct pthreadpool* threadpool, +static uint32_t wait_for_new_command(struct thread_info* thread, uint32_t last_command, uint32_t last_flags) { - uint32_t command = pthreadpool_load_acquire_uint32_t(&threadpool->command); + uint32_t command = pthreadpool_load_acquire_uint32_t(&thread->command); if (command != last_command) { return command; } @@ -80,7 +83,7 @@ static uint32_t wait_for_new_command(struct pthreadpool* threadpool, for (uint32_t i = 0; i < 
PTHREADPOOL_SPIN_WAIT_ITERATIONS; i++) { pthreadpool_yield(i); - command = pthreadpool_load_acquire_uint32_t(&threadpool->command); + command = pthreadpool_load_acquire_uint32_t(&thread->command); if (command != last_command) { return command; } @@ -90,10 +93,10 @@ static uint32_t wait_for_new_command(struct pthreadpool* threadpool, /* Spin-wait disabled or timed out, fall back to event wait */ const uint32_t event_index = (last_command >> 31); const DWORD wait_status = - WaitForSingleObject(threadpool->command_event[event_index], INFINITE); + WaitForSingleObject(thread->command_event[event_index], INFINITE); assert(wait_status == WAIT_OBJECT_0); - command = pthreadpool_load_relaxed_uint32_t(&threadpool->command); + command = pthreadpool_load_relaxed_uint32_t(&thread->command); assert(command != last_command); return command; } @@ -110,7 +113,7 @@ static DWORD WINAPI thread_main(LPVOID arg) { /* Monitor new commands and act accordingly */ for (;;) { - uint32_t command = wait_for_new_command(threadpool, last_command, flags); + uint32_t command = wait_for_new_command(thread, last_command, flags); pthreadpool_fence_acquire(); flags = pthreadpool_load_relaxed_uint32_t(&threadpool->flags); @@ -142,6 +145,9 @@ static DWORD WINAPI thread_main(LPVOID arg) { /* Notify the master thread that we finished processing */ const uint32_t event_index = command >> 31; checkin_worker_thread(threadpool, event_index); + const uint32_t completion_event_index = + pthreadpool_load_relaxed_uint32_t(&threadpool->completion_event_index); + checkin_worker_thread(threadpool, completion_event_index); /* Update last command */ last_command = command; }; @@ -161,6 +167,8 @@ struct pthreadpool* pthreadpool_create(size_t threads_count) { return NULL; } threadpool->threads_count = fxdiv_init_size_t(threads_count); + pthreadpool_store_relaxed_size_t(&threadpool->num_threads_to_use, + threads_count); for (size_t tid = 0; tid < threads_count; tid++) { threadpool->threads[tid].thread_number = tid; 
threadpool->threads[tid].threadpool = threadpool; @@ -176,13 +184,11 @@ struct pthreadpool* pthreadpool_create(size_t threads_count) { threadpool->completion_event[i] = CreateEventW( NULL /* event attributes */, TRUE /* manual-reset event: yes */, FALSE /* initial state: non-signaled */, NULL /* name */); - threadpool->command_event[i] = CreateEventW( - NULL /* event attributes */, TRUE /* manual-reset event: yes */, - FALSE /* initial state: non-signaled */, NULL /* name */); } pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */); + pthreadpool_store_relaxed_size_t(&threadpool->completion_event_index, 0); /* Caller thread serves as worker #0. Thus, we create system threads * starting with worker #1. */ @@ -191,6 +197,11 @@ struct pthreadpool* pthreadpool_create(size_t threads_count) { NULL /* thread attributes */, 0 /* stack size: default */, &thread_main, &threadpool->threads[tid], 0 /* creation flags */, NULL /* thread id */); + for (size_t i = 0; i < 2; i++) { + threadpool->threads[tid].command_event[i] = CreateEventW( + NULL /* event attributes */, TRUE /* manual-reset event: yes */, + FALSE /* initial state: nonsignaled */, NULL /* name */); + } } /* Wait until all threads initialize */ @@ -199,6 +210,12 @@ struct pthreadpool* pthreadpool_create(size_t threads_count) { return threadpool; } +void pthreadpool_set_num_threads_to_use(size_t num_threads) { + max_num_threads = num_threads; +} + +size_t pthreadpool_get_num_threads_to_use() { return max_num_threads; } + PTHREADPOOL_INTERNAL void pthreadpool_parallelize( struct pthreadpool* threadpool, thread_function_t thread_function, const void* params, size_t params_size, void* task, void* context, @@ -221,8 +238,14 @@ PTHREADPOOL_INTERNAL void pthreadpool_parallelize( pthreadpool_store_relaxed_uint32_t(&threadpool->flags, flags); const struct fxdiv_divisor_size_t threads_count = threadpool->threads_count; - 
pthreadpool_store_relaxed_size_t(&threadpool->active_threads, - threads_count.value - 1 /* caller thread */); + size_t max_threads_to_use = pthreadpool_get_num_threads_to_use(); + const struct fxdiv_divisor_size_t num_threads_to_use = + fxdiv_init_size_t(min(threads_count.value, max_threads_to_use)); + pthreadpool_store_relaxed_size_t( + &threadpool->active_threads, + num_threads_to_use.value - 1 /* caller thread */); + pthreadpool_store_relaxed_size_t(&threadpool->num_threads_to_use, + num_threads_to_use.value); if (params_size != 0) { CopyMemory(&threadpool->params, params, params_size); @@ -231,9 +254,9 @@ PTHREADPOOL_INTERNAL void pthreadpool_parallelize( /* Spread the work between threads */ const struct fxdiv_result_size_t range_params = - fxdiv_divide_size_t(linear_range, threads_count); + fxdiv_divide_size_t(linear_range, num_threads_to_use); size_t range_start = 0; - for (size_t tid = 0; tid < threads_count.value; tid++) { + for (size_t tid = 0; tid < num_threads_to_use.value; tid++) { struct thread_info* thread = &threadpool->threads[tid]; const size_t range_length = range_params.quotient + (size_t)(tid < range_params.remainder); @@ -246,52 +269,62 @@ PTHREADPOOL_INTERNAL void pthreadpool_parallelize( range_start = range_end; } - /* - * Update the threadpool command. - * Imporantly, do it after initializing command parameters (range, task, - * argument, flags) - * ~(threadpool->command | THREADPOOL_COMMAND_MASK) flips the bits not in - * command mask to ensure the unmasked command is different then the last - * command, because worker threads monitor for change in the unmasked command. - */ - const uint32_t old_command = - pthreadpool_load_relaxed_uint32_t(&threadpool->command); - const uint32_t new_command = - ~(old_command | THREADPOOL_COMMAND_MASK) | threadpool_command_parallelize; - - /* - * Reset the command event for the next command. 
- * It is important to reset the event before writing out the new command, - * because as soon as the worker threads observe the new command, they may - * process it and switch to waiting on the next command event. - * - * Note: the event is different from the command event signalled in this - * update. - */ - const uint32_t event_index = (old_command >> 31); - BOOL reset_event_status = - ResetEvent(threadpool->command_event[event_index ^ 1]); - assert(reset_event_status != FALSE); - - /* - * Store the command with release semantics to guarantee that if a worker - * thread observes the new command value, it also observes the updated command - * parameters. - * - * Note: release semantics is necessary, because the workers might be waiting - * in a spin-loop rather than on the event object. - */ - pthreadpool_store_release_uint32_t(&threadpool->command, new_command); - - /* - * Signal the event to wake up the threads. - * Event in use must be switched after every submitted command to avoid race - * conditions. Choose the event based on the high bit of the command, which is - * flipped on every update. - */ - const BOOL set_event_status = - SetEvent(threadpool->command_event[event_index]); - assert(set_event_status != FALSE); + uint32_t completion_event_index = + pthreadpool_load_relaxed_uint32_t(&threadpool->completion_event_index); + completion_event_index = completion_event_index ^ 1; + pthreadpool_store_relaxed_size_t(&threadpool->completion_event_index, + completion_event_index); + + for (size_t tid = 1; tid < num_threads_to_use.value; tid++) { + /* + * Update the threadpool command. + * Importantly, do it after initializing command parameters (range, task, + * argument, flags) + * ~(threadpool->command | THREADPOOL_COMMAND_MASK) flips the bits not in + * command mask to ensure the unmasked command is different then the last + * command, because worker threads monitor for change in the unmasked + * command. 
+ */ + const uint32_t old_command = + pthreadpool_load_relaxed_uint32_t(&threadpool->threads[tid].command); + const uint32_t new_command = ~(old_command | THREADPOOL_COMMAND_MASK) | + threadpool_command_parallelize; + + /* + * Reset the command event for the next command. + * It is important to reset the event before writing out the new command, + * because as soon as the worker threads observe the new command, they may + * process it and switch to waiting on the next command event. + * + * Note: the event is different from the command event signalled in this + * update. + */ + const uint32_t event_index = (old_command >> 31); + BOOL reset_event_status = + ResetEvent(threadpool->threads[tid].command_event[event_index ^ 1]); + assert(reset_event_status != FALSE); + + /* + * Store the command with release semantics to guarantee that if a worker + * thread observes the new command value, it also observes the updated + * command parameters. + * + * Note: release semantics is necessary, because the workers might be + * waiting in a spin-loop rather than on the event object. + */ + pthreadpool_store_release_uint32_t(&threadpool->threads[tid].command, + new_command); + + /* + * Signal the event to wake up the threads. + * Event in use must be switched after every submitted command to avoid race + * conditions. Choose the event based on the high bit of the command, which + * is flipped on every update. + */ + const BOOL set_event_status = + SetEvent(threadpool->threads[tid].command_event[event_index]); + assert(set_event_status != FALSE); + } /* Save and modify FPU denormals control, if needed */ struct fpu_state saved_fpu_state = {0}; @@ -312,13 +345,14 @@ PTHREADPOOL_INTERNAL void pthreadpool_parallelize( * Wait until the threads finish computation * Use the complementary event because it corresponds to the new command. 
*/ - wait_worker_threads(threadpool, event_index ^ 1); + wait_worker_threads(threadpool, completion_event_index); /* * Reset the completion event for the next command. * Note: the event is different from the one used for waiting in this update. */ - reset_event_status = ResetEvent(threadpool->completion_event[event_index]); + BOOL reset_event_status = + ResetEvent(threadpool->completion_event[completion_event_index ^ 1]); assert(reset_event_status != FALSE); /* Make changes by other threads visible to this thread */ @@ -336,26 +370,28 @@ void pthreadpool_destroy(struct pthreadpool* threadpool) { pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */); - /* - * Store the command with release semantics to guarantee that if a worker - * thread observes the new command value, it also observes the updated - * active_threads values. - */ - const uint32_t old_command = - pthreadpool_load_relaxed_uint32_t(&threadpool->command); - pthreadpool_store_release_uint32_t(&threadpool->command, - threadpool_command_shutdown); - - /* - * Signal the event to wake up the threads. - * Event in use must be switched after every submitted command to avoid - * race conditions. Choose the event based on the high bit of the command, - * which is flipped on every update. - */ - const uint32_t event_index = (old_command >> 31); - const BOOL set_event_status = - SetEvent(threadpool->command_event[event_index]); - assert(set_event_status != FALSE); + for (size_t tid = 1; tid < threads_count; tid++) { + /* + * Store the command with release semantics to guarantee that if a + * worker thread observes the new command value, it also observes the + * updated active_threads values. + */ + const uint32_t old_command = pthreadpool_load_relaxed_uint32_t( + &threadpool->threads[tid].command); + pthreadpool_store_release_uint32_t(&threadpool->threads[tid].command, + threadpool_command_shutdown); + + /* + * Signal the event to wake up the threads. 
+ * Event in use must be switched after every submitted command to avoid + * race conditions. Choose the event based on the high bit of the + * command, which is flipped on every update. + */ + const uint32_t event_index = (old_command >> 31); + const BOOL set_event_status = + SetEvent(threadpool->threads[tid].command_event[event_index]); + assert(set_event_status != FALSE); + } /* Wait until all threads return */ for (size_t tid = 1; tid < threads_count; tid++) { @@ -368,6 +404,13 @@ void pthreadpool_destroy(struct pthreadpool* threadpool) { const BOOL close_status = CloseHandle(thread_handle); assert(close_status != FALSE); } + for (size_t i = 0; i < 2; i++) { + if (threadpool->threads[tid].command_event[i] != NULL) { + const BOOL close_status = + CloseHandle(threadpool->threads[tid].command_event[i]); + assert(close_status != FALSE); + } + } } /* Release resources */ @@ -376,10 +419,6 @@ void pthreadpool_destroy(struct pthreadpool* threadpool) { assert(close_status != FALSE); } for (size_t i = 0; i < 2; i++) { - if (threadpool->command_event[i] != NULL) { - const BOOL close_status = CloseHandle(threadpool->command_event[i]); - assert(close_status != FALSE); - } if (threadpool->completion_event[i] != NULL) { const BOOL close_status = CloseHandle(threadpool->completion_event[i]); diff --git a/test/pthreadpool.cc b/test/pthreadpool.cc index 687c9c9..52be0cc 100644 --- a/test/pthreadpool.cc +++ b/test/pthreadpool.cc @@ -15,6 +15,7 @@ #include #include #include +#include #include // NOLINT #include @@ -13976,3 +13977,335 @@ TEST(Parallelize6DTile2D, MultiThreadPoolWorkStealing) { kParallelize6DTile2DRangeK * kParallelize6DTile2DRangeL * kParallelize6DTile2DRangeM * kParallelize6DTile2DRangeN); } + +struct array_addition_context { + double* augend; + double* addend; + double* sum; + std::mutex m; + int num_threads; + int* thread_ids; +}; + +static void add_arrays(struct array_addition_context* context, size_t start_i, + size_t tile_i) { +#if defined(__linux__) || 
defined(__EMSCRIPTEN__) || defined(_WIN32) || \
+    defined(__CYGWIN__)
+  {
+    int thread_id;
+#if defined(_WIN32) || defined(__CYGWIN__)
+    thread_id = (int)GetCurrentThreadId();
+#else
+    thread_id = (int)pthread_self();
+#endif
+    std::lock_guard<std::mutex> l(context->m);
+    for (int i = 0; i < context->num_threads; i++) {
+      if (context->thread_ids[i] == thread_id) {
+        break;
+      } else if (context->thread_ids[i] == 0) {
+        context->thread_ids[i] = thread_id;
+        break;
+      }
+    }
+  }
+#endif
+  for (size_t i = start_i; i < start_i + tile_i; ++i) {
+    context->sum[i] = context->augend[i] + context->addend[i];
+  }
+}
+
+void init_context(double* augend, double* addend, double* sum, int* thread_ids,
+                  int num_threads, struct array_addition_context* context,
+                  double* ref_sum, double init_val = 1.5) {
+  double val = init_val;
+  for (size_t i = 0; i < kParallelize1DTile1DRange; ++i) {
+    augend[i] = val;
+    addend[i] = val;
+    ref_sum[i] = 2 * val;
+  }
+  for (int i = 0; i < num_threads; ++i) {
+    thread_ids[i] = 0;
+  }
+
+  context->augend = augend;
+  context->addend = addend;
+  context->sum = sum;
+  context->num_threads = num_threads;
+  context->thread_ids = thread_ids;
+}
+
+void check_num_threads_used(int* thread_ids, int num_threads,
+                            int num_threads_used) {
+#if defined(__linux__) || defined(__EMSCRIPTEN__) || defined(_WIN32) || \
+    defined(__CYGWIN__)
+  int total_threads = 0;
+  for (int i = 0; i < num_threads; ++i) {
+    if (thread_ids[i] != 0) {
+      total_threads += 1;
+    }
+  }
+  EXPECT_EQ(total_threads, num_threads_used);
+#endif
+}
+
+/*
+ * Check num threads used is little flaky since work stealing can result in
+ * one thread stealing the work from the other thread as a result of which the
+ * other thread will never pick up the work. If it does not pick up the work, it
+ * will not set the thread_ids appropriately. One way to fix this is to make
+ * workstealing compile time option and disable it for testing. Not in favor of
+ * it though, so maybe need a better way of testing.
+ */
+TEST(CapNumThreadsTest, RunUnderCapacity) {
+  double augend[kParallelize1DTile1DRange];
+  double addend[kParallelize1DTile1DRange];
+  double sum[kParallelize1DTile1DRange];
+  double ref_sum[kParallelize1DTile1DRange];
+  int num_threads = 4;
+  int thread_ids[num_threads];
+
+  auto_pthreadpool_t threadpool(pthreadpool_create(num_threads),
+                                pthreadpool_destroy);
+  ASSERT_TRUE(threadpool.get());
+  pthreadpool_set_num_threads_to_use(2);
+
+  struct array_addition_context context;
+  init_context(augend, addend, sum, thread_ids, num_threads, &context, ref_sum);
+
+  if (pthreadpool_get_threads_count(threadpool.get()) <= 1) {
+    GTEST_SKIP();
+  }
+
+  pthreadpool_parallelize_1d_tile_1d(threadpool.get(),
+                                     (pthreadpool_task_1d_tile_1d_t)add_arrays,
+                                     (void*)&context, kParallelize1DTile1DRange,
+                                     kParallelize1DTile1DTile, 0 /* flags */);
+
+  for (size_t i = 0; i < kParallelize1DTile1DRange; ++i) {
+    EXPECT_LT(std::abs(context.sum[i] - ref_sum[i]), 1e-5);
+  }
+  check_num_threads_used(thread_ids, num_threads, 2);
+}
+
+TEST(CapNumThreadsTest, RunUnderCapacitySetMultipleTimes1) {
+  double augend[kParallelize1DTile1DRange];
+  double addend[kParallelize1DTile1DRange];
+  double sum[kParallelize1DTile1DRange];
+  double ref_sum[kParallelize1DTile1DRange];
+  int num_threads = 4;
+  int thread_ids[num_threads];
+
+  struct array_addition_context context;
+  init_context(augend, addend, sum, thread_ids, num_threads, &context, ref_sum);
+
+  auto_pthreadpool_t threadpool(pthreadpool_create(num_threads),
+                                pthreadpool_destroy);
+  ASSERT_TRUE(threadpool.get());
+  pthreadpool_set_num_threads_to_use(2);
+  pthreadpool_set_num_threads_to_use(3);
+
+  if (pthreadpool_get_threads_count(threadpool.get()) <= 1) {
+    GTEST_SKIP();
+  }
+
+  pthreadpool_parallelize_1d_tile_1d(threadpool.get(),
+                                     (pthreadpool_task_1d_tile_1d_t)add_arrays,
+                                     (void*)&context, kParallelize1DTile1DRange,
+                                     kParallelize1DTile1DTile, 0 /* flags */);
+
+  for (size_t i = 0; i < kParallelize1DTile1DRange; ++i) {
+    
EXPECT_LT(std::abs(context.sum[i] - ref_sum[i]), 1e-5);
+  }
+  check_num_threads_used(thread_ids, num_threads, 3);
+}
+
+TEST(CapNumThreadsTest, RunUnderCapacitySetMultipleTimes2) {
+  double augend[kParallelize1DTile1DRange];
+  double addend[kParallelize1DTile1DRange];
+  double sum[kParallelize1DTile1DRange];
+  double ref_sum[kParallelize1DTile1DRange];
+  int num_threads = 4;
+  int thread_ids[num_threads];
+
+  struct array_addition_context context;
+  init_context(augend, addend, sum, thread_ids, num_threads, &context, ref_sum);
+
+  auto_pthreadpool_t threadpool(pthreadpool_create(num_threads),
+                                pthreadpool_destroy);
+  ASSERT_TRUE(threadpool.get());
+  pthreadpool_set_num_threads_to_use(2);
+
+  if (pthreadpool_get_threads_count(threadpool.get()) <= 1) {
+    GTEST_SKIP();
+  }
+
+  pthreadpool_parallelize_1d_tile_1d(threadpool.get(),
+                                     (pthreadpool_task_1d_tile_1d_t)add_arrays,
+                                     (void*)&context, kParallelize1DTile1DRange,
+                                     kParallelize1DTile1DTile, 0 /* flags */);
+
+  for (size_t i = 0; i < kParallelize1DTile1DRange; ++i) {
+    EXPECT_LT(std::abs(context.sum[i] - ref_sum[i]), 1e-5);
+  }
+  check_num_threads_used(thread_ids, num_threads, 2);
+
+  init_context(augend, addend, sum, thread_ids, num_threads, &context, ref_sum,
+               2.3);
+  pthreadpool_set_num_threads_to_use(3);
+
+  if (pthreadpool_get_threads_count(threadpool.get()) <= 1) {
+    GTEST_SKIP();
+  }
+
+  pthreadpool_parallelize_1d_tile_1d(threadpool.get(),
+                                     (pthreadpool_task_1d_tile_1d_t)add_arrays,
+                                     (void*)&context, kParallelize1DTile1DRange,
+                                     kParallelize1DTile1DTile, 0 /* flags */);
+
+  for (size_t i = 0; i < kParallelize1DTile1DRange; ++i) {
+    EXPECT_LT(std::abs(context.sum[i] - ref_sum[i]), 1e-5);
+  }
+  check_num_threads_used(thread_ids, num_threads, 3);
+}
+
+TEST(CapNumThreadsTest, RunUnderCapacitySetMultipleTimes3) {
+  double augend[kParallelize1DTile1DRange];
+  double addend[kParallelize1DTile1DRange];
+  double sum[kParallelize1DTile1DRange];
+  double ref_sum[kParallelize1DTile1DRange];
+  int num_threads = 4;
+  int 
thread_ids[num_threads];
+
+  struct array_addition_context context;
+  init_context(augend, addend, sum, thread_ids, num_threads, &context, ref_sum);
+
+  auto_pthreadpool_t threadpool(pthreadpool_create(num_threads),
+                                pthreadpool_destroy);
+  ASSERT_TRUE(threadpool.get());
+  pthreadpool_set_num_threads_to_use(1);
+
+  if (pthreadpool_get_threads_count(threadpool.get()) <= 1) {
+    GTEST_SKIP();
+  }
+
+  pthreadpool_parallelize_1d_tile_1d(threadpool.get(),
+                                     (pthreadpool_task_1d_tile_1d_t)add_arrays,
+                                     (void*)&context, kParallelize1DTile1DRange,
+                                     kParallelize1DTile1DTile, 0 /* flags */);
+
+  for (size_t i = 0; i < kParallelize1DTile1DRange; ++i) {
+    EXPECT_LT(std::abs(context.sum[i] - ref_sum[i]), 1e-5);
+  }
+  check_num_threads_used(thread_ids, num_threads, 1);
+
+  init_context(augend, addend, sum, thread_ids, num_threads, &context, ref_sum,
+               2.3);
+  pthreadpool_set_num_threads_to_use(4);
+
+  if (pthreadpool_get_threads_count(threadpool.get()) <= 1) {
+    GTEST_SKIP();
+  }
+
+  pthreadpool_parallelize_1d_tile_1d(threadpool.get(),
+                                     (pthreadpool_task_1d_tile_1d_t)add_arrays,
+                                     (void*)&context, kParallelize1DTile1DRange,
+                                     kParallelize1DTile1DTile, 0 /* flags */);
+
+  for (size_t i = 0; i < kParallelize1DTile1DRange; ++i) {
+    EXPECT_LT(std::abs(context.sum[i] - ref_sum[i]), 1e-5);
+  }
+  check_num_threads_used(thread_ids, num_threads, 4);
+}
+
+TEST(CapNumThreadsTest, RunAtCapacity) {
+  double augend[kParallelize1DTile1DRange];
+  double addend[kParallelize1DTile1DRange];
+  double sum[kParallelize1DTile1DRange];
+  double ref_sum[kParallelize1DTile1DRange];
+  int num_threads = 4;
+  int thread_ids[num_threads];
+
+  struct array_addition_context context;
+  init_context(augend, addend, sum, thread_ids, num_threads, &context, ref_sum);
+
+  auto_pthreadpool_t threadpool(pthreadpool_create(num_threads),
+                                pthreadpool_destroy);
+  ASSERT_TRUE(threadpool.get());
+  pthreadpool_set_num_threads_to_use(num_threads);
+
+  if (pthreadpool_get_threads_count(threadpool.get()) <= 1) {
+    GTEST_SKIP();
+  }
+
+  pthreadpool_parallelize_1d_tile_1d(threadpool.get(),
+                                     (pthreadpool_task_1d_tile_1d_t)add_arrays,
+                                     (void*)&context, kParallelize1DTile1DRange,
+                                     kParallelize1DTile1DTile, 0 /* flags */);
+
+  for (size_t i = 0; i < kParallelize1DTile1DRange; ++i) {
+    EXPECT_LT(std::abs(context.sum[i] - ref_sum[i]), 1e-5);
+  }
+  check_num_threads_used(thread_ids, num_threads, num_threads);
+}
+
+TEST(CapNumThreadsTest, RunOverCapacity) {
+  double augend[kParallelize1DTile1DRange];
+  double addend[kParallelize1DTile1DRange];
+  double sum[kParallelize1DTile1DRange];
+  double ref_sum[kParallelize1DTile1DRange];
+  int num_threads = 4;
+  int thread_ids[num_threads];
+
+  struct array_addition_context context;
+  init_context(augend, addend, sum, thread_ids, num_threads, &context, ref_sum);
+
+  auto_pthreadpool_t threadpool(pthreadpool_create(num_threads),
+                                pthreadpool_destroy);
+  ASSERT_TRUE(threadpool.get());
+  pthreadpool_set_num_threads_to_use(16);
+
+  if (pthreadpool_get_threads_count(threadpool.get()) <= 1) {
+    GTEST_SKIP();
+  }
+
+  pthreadpool_parallelize_1d_tile_1d(threadpool.get(),
+                                     (pthreadpool_task_1d_tile_1d_t)add_arrays,
+                                     (void*)&context, kParallelize1DTile1DRange,
+                                     kParallelize1DTile1DTile, 0 /* flags */);
+
+  for (size_t i = 0; i < kParallelize1DTile1DRange; ++i) {
+    EXPECT_LT(std::abs(context.sum[i] - ref_sum[i]), 1e-5);
+  }
+  check_num_threads_used(thread_ids, num_threads, num_threads);
+}
+
+TEST(CapNumThreadsTest, RunSingleThreaded) {
+  double augend[kParallelize1DTile1DRange];
+  double addend[kParallelize1DTile1DRange];
+  double sum[kParallelize1DTile1DRange];
+  double ref_sum[kParallelize1DTile1DRange];
+  int num_threads = 4;
+  int thread_ids[num_threads];
+
+  struct array_addition_context context;
+  init_context(augend, addend, sum, thread_ids, num_threads, &context, ref_sum);
+
+  auto_pthreadpool_t threadpool(pthreadpool_create(num_threads),
+                                pthreadpool_destroy);
+  ASSERT_TRUE(threadpool.get());
+  pthreadpool_set_num_threads_to_use(1);
+
+  if 
(pthreadpool_get_threads_count(threadpool.get()) <= 1) {
+    GTEST_SKIP();
+  }
+
+  pthreadpool_parallelize_1d_tile_1d(threadpool.get(),
+                                     (pthreadpool_task_1d_tile_1d_t)add_arrays,
+                                     (void*)&context, kParallelize1DTile1DRange,
+                                     kParallelize1DTile1DTile, 0 /* flags */);
+
+  for (size_t i = 0; i < kParallelize1DTile1DRange; ++i) {
+    EXPECT_LT(std::abs(context.sum[i] - ref_sum[i]), 1e-5);
+  }
+  check_num_threads_used(thread_ids, num_threads, 1);
+}