From e1c0034c9974ce1745bffd6bbe9cc85a1c787ad3 Mon Sep 17 00:00:00 2001 From: Kimish Patel Date: Mon, 2 Jun 2025 13:00:25 -0700 Subject: [PATCH] API to specify number of threads, from threadpool, to use for the task Summary: This PR adds a way use fewer threads than configured with in pthreadpool. Occassionaly it has been seen that using the # of thredas = logical core is not efficient. This can be due to system load and varying other factors that lead threads either being mapped to slower cores or being mapped to fewer than logical core. Thus this PR attempt to fix this. Approach: Add api to set thread local var for specifying the #of threads to use. pthreadpool_parallelize will then distributed the work only among specified threads. Threads that are not picked continue to wait, likely via mutex/condvar, for next chunk of work and thus give up their cpu slot. Both pthreads.c windows.c are modified to add this feature. In the original PR https://github.com/Maratyszcza/pthreadpool/pull/17, there were quite a few suggestions made. Some fundamental ones, such as - using thread local num_threads_to_use vs make this a property of threadpool object itself, remained unresolved. - Waking up all the threads even when they dont participate in the work (this one definitely makes sense) It would make sense to resolve those in this iteration. Test Plan: 4 tests are added to check this. Reviewers: Subscribers: Tasks: Tags: --- include/pthreadpool.h | 19 +++ src/fastpath.c | 84 ++++++---- src/gcd.c | 13 +- src/portable-api.c | 335 ++++++++++++++++++++++------------------ src/pthreads.c | 198 ++++++++++++++---------- src/threadpool-common.h | 17 ++ src/threadpool-object.h | 55 ++++--- src/windows.c | 209 +++++++++++++++---------- test/pthreadpool.cc | 333 +++++++++++++++++++++++++++++++++++++++ 9 files changed, 897 insertions(+), 366 deletions(-) diff --git a/include/pthreadpool.h b/include/pthreadpool.h index 9ec49d6..23e81eb 100644 --- a/include/pthreadpool.h +++ b/include/pthreadpool.h @@ -160,6 +160,25 @@ pthreadpool_t pthreadpool_create(size_t threads_count); */ size_t pthreadpool_get_threads_count(pthreadpool_t threadpool); +/* + * API to enable doing work with fewer threads than available in + * threadpool. + * Purpose of this is to ameliorate some perf degradation observed + * due to OS mapping a given set of threads to fewer cores. + * + * @param num_threads num threads to use for the subsequent tasks + * submitted. + */ +void pthreadpool_set_num_threads_to_use(size_t num_threads); + +/* + * Query current setting of the number of threads to use + * + * @returns The number of threads to be used for the subsequent tasks + * submitted. + */ +size_t pthreadpool_get_num_threads_to_use(void); + /** * Process items on a 1D grid. * diff --git a/src/fastpath.c b/src/fastpath.c index 7c4196c..98d5db6 100644 --- a/src/fastpath.c +++ b/src/fastpath.c @@ -39,7 +39,8 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_1d_fastpath( (pthreadpool_task_1d_t)pthreadpool_load_relaxed_void_p(&threadpool->task); void* const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_threshold = -threads_count; /* Process thread's own range of items */ @@ -77,7 +78,8 @@ pthreadpool_thread_parallelize_1d_with_thread_fastpath( &threadpool->task); void* const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_threshold = -threads_count; /* Process thread's own range of items */ @@ -126,7 +128,8 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_1d_with_uarch_fastpath( } #endif - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_threshold = -threads_count; /* Process thread's own range of items */ @@ -163,7 +166,8 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_1d_tile_1d_fastpath( &threadpool->task); void* const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_threshold = -threads_count; /* Process thread's own range of items */ @@ -206,7 +210,8 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_2d_fastpath( (pthreadpool_task_2d_t)pthreadpool_load_relaxed_void_p(&threadpool->task); void* const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_threshold = -threads_count; /* Process thread's own range of items */ @@ -258,7 +263,8 @@ pthreadpool_thread_parallelize_2d_with_thread_fastpath( &threadpool->task); void* const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_threshold = -threads_count; /* Process thread's own range of items */ @@ -309,7 +315,8 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_2d_tile_1d_fastpath( &threadpool->task); void* const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_threshold = -threads_count; /* Process thread's own range of items */ @@ -378,7 +385,8 @@ pthreadpool_thread_parallelize_2d_tile_1d_with_uarch_fastpath( } #endif - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_threshold = -threads_count; /* Process thread's own range of items */ @@ -449,7 +457,8 @@ pthreadpool_thread_parallelize_2d_tile_1d_with_uarch_with_thread_fastpath( } #endif - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_threshold = -threads_count; /* Process thread's own range of items */ @@ -508,7 +517,8 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_2d_tile_2d_fastpath( &threadpool->task); void* const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_threshold = -threads_count; /* Process thread's own range of items */ @@ -581,7 +591,8 @@ pthreadpool_thread_parallelize_2d_tile_2d_with_uarch_fastpath( } #endif - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_threshold = -threads_count; /* Process thread's own range of items */ @@ -644,7 +655,8 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_3d_fastpath( (pthreadpool_task_3d_t)pthreadpool_load_relaxed_void_p(&threadpool->task); void* const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_threshold = -threads_count; /* Process thread's own range of items */ @@ -706,7 +718,8 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_3d_tile_1d_fastpath( &threadpool->task); void* const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_threshold = -threads_count; /* Process thread's own range of items */ @@ -773,7 +786,8 @@ pthreadpool_thread_parallelize_3d_tile_1d_with_thread_fastpath( pthreadpool_load_relaxed_void_p(&threadpool->task); void* const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_threshold = -threads_count; /* Process thread's own range of items */ @@ -853,7 +867,8 @@ pthreadpool_thread_parallelize_3d_tile_1d_with_uarch_fastpath( } #endif - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_threshold = -threads_count; /* Process thread's own range of items */ @@ -934,7 +949,8 @@ pthreadpool_thread_parallelize_3d_tile_1d_with_uarch_with_thread_fastpath( } #endif - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_threshold = -threads_count; /* Process thread's own range of items */ @@ -1003,7 +1019,8 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_3d_tile_2d_fastpath( &threadpool->task); void* const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_threshold = -threads_count; /* Process thread's own range of items */ @@ -1087,7 +1104,8 @@ pthreadpool_thread_parallelize_3d_tile_2d_with_uarch_fastpath( } #endif - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_threshold = -threads_count; /* Process thread's own range of items */ @@ -1161,7 +1179,8 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_4d_fastpath( (pthreadpool_task_4d_t)pthreadpool_load_relaxed_void_p(&threadpool->task); void* const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_threshold = -threads_count; /* Process thread's own range of items */ @@ -1234,7 +1253,8 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_4d_tile_1d_fastpath( &threadpool->task); void* const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_threshold = -threads_count; /* Process thread's own range of items */ @@ -1311,7 +1331,8 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_4d_tile_2d_fastpath( &threadpool->task); void* const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_threshold = -threads_count; /* Process thread's own range of items */ @@ -1405,7 +1426,8 @@ pthreadpool_thread_parallelize_4d_tile_2d_with_uarch_fastpath( } #endif - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_threshold = -threads_count; /* Process thread's own range of items */ @@ -1490,7 +1512,8 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_5d_fastpath( (pthreadpool_task_5d_t)pthreadpool_load_relaxed_void_p(&threadpool->task); void* const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_threshold = -threads_count; /* Process thread's own range of items */ @@ -1573,7 +1596,8 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_5d_tile_1d_fastpath( &threadpool->task); void* const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_threshold = -threads_count; /* Process thread's own range of items */ @@ -1661,7 +1685,8 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_5d_tile_2d_fastpath( &threadpool->task); void* const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_threshold = -threads_count; /* Process thread's own range of items */ @@ -1752,7 +1777,8 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_6d_fastpath( (pthreadpool_task_6d_t)pthreadpool_load_relaxed_void_p(&threadpool->task); void* const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_threshold = -threads_count; /* Process thread's own range of items */ @@ -1846,7 +1872,8 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_6d_tile_1d_fastpath( &threadpool->task); void* const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_threshold = -threads_count; /* Process thread's own range of items */ @@ -1944,7 +1971,8 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_6d_tile_2d_fastpath( &threadpool->task); void* const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_threshold = -threads_count; /* Process thread's own range of items */ diff --git a/src/gcd.c b/src/gcd.c index fddf499..a252a01 100644 --- a/src/gcd.c +++ b/src/gcd.c @@ -30,6 +30,8 @@ #include "threadpool-object.h" #include "threadpool-utils.h" +thread_local size_t max_num_threads = UINT_MAX; + static void thread_main(void* arg, size_t thread_index) { struct pthreadpool* threadpool = (struct pthreadpool*)arg; struct thread_info* thread = &threadpool->threads[thread_index]; @@ -73,6 +75,8 @@ struct pthreadpool* pthreadpool_create(size_t threads_count) { return NULL; } threadpool->threads_count = fxdiv_init_size_t(threads_count); + pthreadpool_store_relaxed_size_t(&threadpool->num_threads_to_use, + threads_count); for (size_t tid = 0; tid < threads_count; tid++) { threadpool->threads[tid].thread_number = tid; } @@ -85,6 +89,12 @@ struct pthreadpool* pthreadpool_create(size_t threads_count) { return threadpool; } +void pthreadpool_set_num_threads_to_use(size_t num_threads) { + max_num_threads = num_threads; +} + +size_t pthreadpool_get_num_threads_to_use() { return max_num_threads; } + PTHREADPOOL_INTERNAL void pthreadpool_parallelize( struct pthreadpool* threadpool, thread_function_t thread_function, const void* params, size_t params_size, void* task, void* context, @@ -107,7 +117,8 @@ PTHREADPOOL_INTERNAL void pthreadpool_parallelize( /* Locking of completion_mutex not needed: readers are sleeping on * command_condvar */ - const struct fxdiv_divisor_size_t threads_count = threadpool->threads_count; + const struct fxdiv_divisor_size_t threads_count = fxdiv_init_size_t(min( + threadpool->threads_count.value, pthreadpool_get_num_threads_to_use())); if (params_size != 0) { memcpy(&threadpool->params, params, params_size); diff --git a/src/portable-api.c b/src/portable-api.c index d173103..2864d06 100644 --- a/src/portable-api.c +++ b/src/portable-api.c @@ -93,7 +93,8 @@ static void thread_parallelize_1d(struct pthreadpool* threadpool, /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); for (size_t tid = modulo_decrement(thread_number, threads_count); tid != thread_number; tid = modulo_decrement(tid, threads_count)) { struct thread_info* other_thread = &threadpool->threads[tid]; @@ -127,7 +128,8 @@ static void thread_parallelize_1d_with_thread(struct pthreadpool* threadpool, } /* There still may be other threads with work */ - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); for (size_t tid = modulo_decrement(thread_number, threads_count); tid != thread_number; tid = modulo_decrement(tid, threads_count)) { struct thread_info* other_thread = &threadpool->threads[tid]; @@ -173,7 +175,8 @@ static void thread_parallelize_1d_with_uarch(struct pthreadpool* threadpool, /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); for (size_t tid = modulo_decrement(thread_number, threads_count); tid != thread_number; tid = modulo_decrement(tid, threads_count)) { struct thread_info* other_thread = &threadpool->threads[tid]; @@ -213,7 +216,8 @@ static void thread_parallelize_1d_tile_1d(struct pthreadpool* threadpool, /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); for (size_t tid = modulo_decrement(thread_number, threads_count); tid != thread_number; tid = modulo_decrement(tid, threads_count)) { struct thread_info* other_thread = &threadpool->threads[tid]; @@ -238,7 +242,8 @@ static void thread_parallelize_1d_tile_1d_dynamic( // Get a handle on the params. struct pthreadpool_1d_tile_1d_dynamic_params* params = &threadpool->params.parallelize_1d_tile_1d_dynamic; - const size_t num_threads = threadpool->threads_count.value; + const size_t num_threads = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_i = params->range; const size_t tile_i = params->tile; const pthreadpool_task_1d_tile_1d_dynamic_t task = @@ -294,7 +299,8 @@ static void thread_parallelize_1d_tile_1d_dynamic_with_thread( // Get a handle on the params. struct pthreadpool_1d_tile_1d_dynamic_params* params = &threadpool->params.parallelize_1d_tile_1d_dynamic; - const size_t num_threads = threadpool->threads_count.value; + const size_t num_threads = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_i = params->range; const size_t tile_i = params->tile; const pthreadpool_task_1d_tile_1d_dynamic_with_id_t task = @@ -350,7 +356,8 @@ static void thread_parallelize_1d_tile_1d_dynamic_with_uarch_with_thread( // Get a handle on the params. struct pthreadpool_1d_tile_1d_dynamic_with_uarch_params* params = &threadpool->params.parallelize_1d_tile_1d_dynamic_with_uarch; - const size_t num_threads = threadpool->threads_count.value; + const size_t num_threads = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_i = params->range; const size_t tile_i = params->tile; const pthreadpool_task_1d_tile_1d_dynamic_with_id_with_thread_t task = @@ -437,7 +444,8 @@ static void thread_parallelize_2d(struct pthreadpool* threadpool, /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); for (size_t tid = modulo_decrement(thread_number, threads_count); tid != thread_number; tid = modulo_decrement(tid, threads_count)) { struct thread_info* other_thread = &threadpool->threads[tid]; @@ -485,7 +493,8 @@ static void thread_parallelize_2d_with_thread(struct pthreadpool* threadpool, } /* There still may be other threads with work */ - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); for (size_t tid = modulo_decrement(thread_number, threads_count); tid != thread_number; tid = modulo_decrement(tid, threads_count)) { struct thread_info* other_thread = &threadpool->threads[tid]; @@ -536,7 +545,8 @@ static void thread_parallelize_2d_tile_1d(struct pthreadpool* threadpool, /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); for (size_t tid = modulo_decrement(thread_number, threads_count); tid != thread_number; tid = modulo_decrement(tid, threads_count)) { struct thread_info* other_thread = &threadpool->threads[tid]; @@ -603,7 +613,8 @@ static void thread_parallelize_2d_tile_1d_with_uarch( /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); for (size_t tid = modulo_decrement(thread_number, threads_count); tid != thread_number; tid = modulo_decrement(tid, threads_count)) { struct thread_info* other_thread = &threadpool->threads[tid]; @@ -671,7 +682,8 @@ static void thread_parallelize_2d_tile_1d_with_uarch_with_thread( } /* There still may be other threads with work */ - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); for (size_t tid = modulo_decrement(thread_number, threads_count); tid != thread_number; tid = modulo_decrement(tid, threads_count)) { struct thread_info* other_thread = &threadpool->threads[tid]; @@ -699,7 +711,8 @@ static void thread_parallelize_2d_tile_1d_dynamic( // Get a handle on the params. struct pthreadpool_2d_tile_1d_dynamic_params* params = &threadpool->params.parallelize_2d_tile_1d_dynamic; - const size_t num_threads = threadpool->threads_count.value; + const size_t num_threads = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_j = params->range_j; const size_t tile_j = params->tile_j; const size_t tile_range_j = divide_round_up(range_j, tile_j); @@ -769,7 +782,8 @@ static void thread_parallelize_2d_tile_1d_dynamic_with_thread( // Get a handle on the params. struct pthreadpool_2d_tile_1d_dynamic_params* params = &threadpool->params.parallelize_2d_tile_1d_dynamic; - const size_t num_threads = threadpool->threads_count.value; + const size_t num_threads = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_j = params->range_j; const size_t tile_j = params->tile_j; const size_t tile_range_j = divide_round_up(range_j, tile_j); @@ -839,7 +853,8 @@ static void thread_parallelize_2d_tile_1d_dynamic_with_uarch_with_thread( // Get a handle on the params. struct pthreadpool_2d_tile_1d_dynamic_with_uarch_params* params = &threadpool->params.parallelize_2d_tile_1d_dynamic_with_uarch; - const size_t num_threads = threadpool->threads_count.value; + const size_t num_threads = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_j = params->range_j; const size_t tile_j = params->tile_j; const size_t tile_range_j = divide_round_up(range_j, tile_j); @@ -947,7 +962,8 @@ static void thread_parallelize_2d_tile_2d(struct pthreadpool* threadpool, /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); for (size_t tid = modulo_decrement(thread_number, threads_count); tid != thread_number; tid = modulo_decrement(tid, threads_count)) { struct thread_info* other_thread = &threadpool->threads[tid]; @@ -1020,7 +1036,8 @@ static void thread_parallelize_2d_tile_2d_with_uarch( /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); for (size_t tid = modulo_decrement(thread_number, threads_count); tid != thread_number; tid = modulo_decrement(tid, threads_count)) { struct thread_info* other_thread = &threadpool->threads[tid]; @@ -1049,7 +1066,8 @@ static void thread_parallelize_2d_tile_2d_dynamic( // Get a handle on the params. struct pthreadpool_2d_tile_2d_dynamic_params* params = &threadpool->params.parallelize_2d_tile_2d_dynamic; - const size_t num_threads = threadpool->threads_count.value; + const size_t num_threads = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_i = params->range_i; const size_t range_j = params->range_j; const size_t tile_i = params->tile_i; @@ -1151,7 +1169,8 @@ static void thread_parallelize_2d_tile_2d_dynamic_with_uarch( // Get a handle on the params. struct pthreadpool_2d_tile_2d_dynamic_with_uarch_params* params = &threadpool->params.parallelize_2d_tile_2d_dynamic_with_uarch; - const size_t num_threads = threadpool->threads_count.value; + const size_t num_threads = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_i = params->range_i; const size_t range_j = params->range_j; const size_t tile_i = params->tile_i; @@ -1239,7 +1258,8 @@ static void thread_parallelize_2d_tile_2d_dynamic_with_thread( // Get a handle on the params. struct pthreadpool_2d_tile_2d_dynamic_params* params = &threadpool->params.parallelize_2d_tile_2d_dynamic; - const size_t num_threads = threadpool->threads_count.value; + const size_t num_threads = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_i = params->range_i; const size_t range_j = params->range_j; const size_t tile_i = params->tile_i; @@ -1356,7 +1376,8 @@ static void thread_parallelize_3d(struct pthreadpool* threadpool, /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); for (size_t tid = modulo_decrement(thread_number, threads_count); tid != thread_number; tid = modulo_decrement(tid, threads_count)) { struct thread_info* other_thread = &threadpool->threads[tid]; @@ -1418,7 +1439,8 @@ static void thread_parallelize_3d_tile_1d(struct pthreadpool* threadpool, /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); for (size_t tid = modulo_decrement(thread_number, threads_count); tid != thread_number; tid = modulo_decrement(tid, threads_count)) { struct thread_info* other_thread = &threadpool->threads[tid]; @@ -1482,7 +1504,8 @@ static void thread_parallelize_3d_tile_1d_with_thread( } /* There still may be other threads with work */ - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); for (size_t tid = modulo_decrement(thread_number, threads_count); tid != thread_number; tid = modulo_decrement(tid, threads_count)) { struct thread_info* other_thread = &threadpool->threads[tid]; @@ -1559,7 +1582,8 @@ static void thread_parallelize_3d_tile_1d_with_uarch( /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); for (size_t tid = modulo_decrement(thread_number, threads_count); tid != thread_number; tid = modulo_decrement(tid, threads_count)) { struct thread_info* other_thread = &threadpool->threads[tid]; @@ -1637,7 +1661,8 @@ static void thread_parallelize_3d_tile_1d_with_uarch_with_thread( } /* There still may be other threads with work */ - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); for (size_t tid = modulo_decrement(thread_number, threads_count); tid != thread_number; tid = modulo_decrement(tid, threads_count)) { struct thread_info* other_thread = &threadpool->threads[tid]; @@ -1667,7 +1692,8 @@ static void thread_parallelize_3d_tile_1d_dynamic_with_thread( // Get a handle on the params. struct pthreadpool_3d_tile_1d_dynamic_params* params = &threadpool->params.parallelize_3d_tile_1d_dynamic; - const size_t num_threads = threadpool->threads_count.value; + const size_t num_threads = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_j = params->range_j; const size_t range_k = params->range_k; const size_t tile_k = params->tile_k; @@ -1779,7 +1805,8 @@ static void thread_parallelize_3d_tile_2d(struct pthreadpool* threadpool, /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); for (size_t tid = modulo_decrement(thread_number, threads_count); tid != thread_number; tid = modulo_decrement(tid, threads_count)) { struct thread_info* other_thread = &threadpool->threads[tid]; @@ -1863,7 +1890,8 @@ static void thread_parallelize_3d_tile_2d_with_uarch( /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); for (size_t tid = modulo_decrement(thread_number, threads_count); tid != thread_number; tid = modulo_decrement(tid, threads_count)) { struct thread_info* other_thread = &threadpool->threads[tid]; @@ -1894,7 +1922,8 @@ static void thread_parallelize_3d_tile_2d_dynamic( // Get a handle on the params. struct pthreadpool_3d_tile_2d_dynamic_params* params = &threadpool->params.parallelize_3d_tile_2d_dynamic; - const size_t num_threads = threadpool->threads_count.value; + const size_t num_threads = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_j = params->range_j; const size_t range_k = params->range_k; const size_t tile_j = params->tile_j; @@ -2009,7 +2038,8 @@ static void thread_parallelize_3d_tile_2d_dynamic_with_uarch( // Get a handle on the params. struct pthreadpool_3d_tile_2d_dynamic_with_uarch_params* params = &threadpool->params.parallelize_3d_tile_2d_dynamic_with_uarch; - const size_t num_threads = threadpool->threads_count.value; + const size_t num_threads = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_j = params->range_j; const size_t range_k = params->range_k; const size_t tile_j = params->tile_j; @@ -2112,7 +2142,8 @@ static void thread_parallelize_3d_tile_2d_dynamic_with_thread( // Get a handle on the params. struct pthreadpool_3d_tile_2d_dynamic_params* params = &threadpool->params.parallelize_3d_tile_2d_dynamic; - const size_t num_threads = threadpool->threads_count.value; + const size_t num_threads = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_j = params->range_j; const size_t range_k = params->range_k; const size_t tile_j = params->tile_j; @@ -2253,7 +2284,8 @@ static void thread_parallelize_4d(struct pthreadpool* threadpool, /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); for (size_t tid = modulo_decrement(thread_number, threads_count); tid != thread_number; tid = modulo_decrement(tid, threads_count)) { struct thread_info* other_thread = &threadpool->threads[tid]; @@ -2326,7 +2358,8 @@ static void thread_parallelize_4d_tile_1d(struct pthreadpool* threadpool, /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); for (size_t tid = modulo_decrement(thread_number, threads_count); tid != thread_number; tid = modulo_decrement(tid, threads_count)) { struct thread_info* other_thread = &threadpool->threads[tid]; @@ -2403,7 +2436,8 @@ static void thread_parallelize_4d_tile_2d(struct pthreadpool* threadpool, /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); for (size_t tid = modulo_decrement(thread_number, threads_count); tid != thread_number; tid = modulo_decrement(tid, threads_count)) { struct thread_info* other_thread = &threadpool->threads[tid]; @@ -2497,7 +2531,8 @@ static void thread_parallelize_4d_tile_2d_with_uarch( /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); for (size_t tid = modulo_decrement(thread_number, threads_count); tid != thread_number; tid = modulo_decrement(tid, threads_count)) { struct thread_info* other_thread = &threadpool->threads[tid]; @@ -2531,7 +2566,8 @@ static void thread_parallelize_4d_tile_2d_dynamic( // Get a handle on the params. struct pthreadpool_4d_tile_2d_dynamic_params* params = &threadpool->params.parallelize_4d_tile_2d_dynamic; - const size_t num_threads = threadpool->threads_count.value; + const size_t num_threads = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_j = params->range_j; const size_t range_k = params->range_k; const size_t range_l = params->range_l; @@ -2657,7 +2693,8 @@ static void thread_parallelize_4d_tile_2d_dynamic_with_uarch( // Get a handle on the params. struct pthreadpool_4d_tile_2d_dynamic_with_uarch_params* params = &threadpool->params.parallelize_4d_tile_2d_dynamic_with_uarch; - const size_t num_threads = threadpool->threads_count.value; + const size_t num_threads = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); const size_t range_j = params->range_j; const size_t range_k = params->range_k; const size_t range_l = params->range_l; @@ -2816,7 +2853,8 @@ static void thread_parallelize_5d(struct pthreadpool* threadpool, /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); for (size_t tid = modulo_decrement(thread_number, threads_count); tid != thread_number; tid = modulo_decrement(tid, threads_count)) { struct thread_info* other_thread = &threadpool->threads[tid]; @@ -2899,7 +2937,8 @@ static void thread_parallelize_5d_tile_1d(struct pthreadpool* threadpool, /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); for (size_t tid = modulo_decrement(thread_number, threads_count); tid != thread_number; tid = modulo_decrement(tid, threads_count)) { struct thread_info* other_thread = &threadpool->threads[tid]; @@ -2987,7 +3026,8 @@ static void thread_parallelize_5d_tile_2d(struct pthreadpool* threadpool, /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); for (size_t tid = modulo_decrement(thread_number, threads_count); tid != thread_number; tid = modulo_decrement(tid, threads_count)) { struct thread_info* other_thread = &threadpool->threads[tid]; @@ -3077,7 +3117,8 @@ static void thread_parallelize_6d(struct pthreadpool* threadpool, /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); for (size_t tid = modulo_decrement(thread_number, threads_count); tid != thread_number; tid = modulo_decrement(tid, threads_count)) { struct thread_info* other_thread = &threadpool->threads[tid]; @@ -3171,7 +3212,8 @@ static void thread_parallelize_6d_tile_1d(struct pthreadpool* threadpool, /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); for (size_t tid = modulo_decrement(thread_number, threads_count); tid != thread_number; tid = modulo_decrement(tid, threads_count)) { struct thread_info* other_thread = &threadpool->threads[tid]; @@ -3270,7 +3312,8 @@ static void thread_parallelize_6d_tile_2d(struct pthreadpool* threadpool, /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - const size_t threads_count = threadpool->threads_count.value; + const size_t threads_count = + pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); for (size_t tid = modulo_decrement(thread_number, threads_count); tid != thread_number; tid = modulo_decrement(tid, threads_count)) { struct thread_info* other_thread = &threadpool->threads[tid]; @@ -3300,13 +3343,22 @@ static void thread_parallelize_6d_tile_2d(struct pthreadpool* threadpool, pthreadpool_fence_release(); } +static inline size_t pthreadpool_num_threads_to_use( + struct pthreadpool* threadpool) { + size_t threads_count = 1; + if (threadpool != NULL) { + const size_t num_threads_to_use = pthreadpool_get_num_threads_to_use(); + threads_count = min(threadpool->threads_count.value, num_threads_to_use); + } + return threads_count; +} + PTHREADPOOL_WEAK void pthreadpool_parallelize_1d(struct pthreadpool* threadpool, pthreadpool_task_1d_t function, void* context, size_t range, uint32_t flags) { - size_t threads_count; - if (threadpool == NULL || - (threads_count = threadpool->threads_count.value) <= 1 || range <= 1) { + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || range <= 1) { /* No thread pool used: execute task sequentially on the calling thread */ struct fpu_state saved_fpu_state = {0}; if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) { @@ -3337,9 +3389,8 @@ PTHREADPOOL_PRIVATE_IMPL(pthreadpool_parallelize_1d) PTHREADPOOL_WEAK void pthreadpool_parallelize_1d_with_thread( struct pthreadpool* threadpool, pthreadpool_task_1d_with_thread_t function, void* context, size_t range, uint32_t flags) { - size_t threads_count; - if (threadpool == NULL || - (threads_count = threadpool->threads_count.value) <= 1 || range <= 1) { + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || range <= 1) { /* No thread pool used: execute task sequentially on the calling thread */ struct fpu_state saved_fpu_state = {0}; if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) { @@ -3373,9 +3424,8 @@ PTHREADPOOL_WEAK void pthreadpool_parallelize_1d_with_uarch( pthreadpool_t threadpool, pthreadpool_task_1d_with_id_t function, void* context, uint32_t default_uarch_index, uint32_t max_uarch_index, size_t range, uint32_t flags) { - size_t threads_count; - if (threadpool == NULL || - (threads_count = threadpool->threads_count.value) <= 1 || range <= 1) { + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || range <= 1) { /* No thread pool used: execute task sequentially on the calling thread */ uint32_t uarch_index = default_uarch_index; @@ -3422,9 +3472,8 @@ PTHREADPOOL_PRIVATE_IMPL(pthreadpool_parallelize_1d_with_uarch) PTHREADPOOL_WEAK void pthreadpool_parallelize_1d_tile_1d( pthreadpool_t threadpool, pthreadpool_task_1d_tile_1d_t function, void* context, size_t range, size_t tile, uint32_t flags) { - size_t threads_count; - if (threadpool == NULL || - (threads_count = threadpool->threads_count.value) <= 1 || range <= tile) { + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || range <= tile) { /* No thread pool used: execute task sequentially on the calling thread */ struct fpu_state saved_fpu_state = {0}; if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) { @@ -3462,9 +3511,8 @@ PTHREADPOOL_PRIVATE_IMPL(pthreadpool_parallelize_1d_tile_1d) PTHREADPOOL_WEAK void pthreadpool_parallelize_1d_tile_1d_dynamic( pthreadpool_t threadpool, pthreadpool_task_1d_tile_1d_dynamic_t function, void* context, size_t range, size_t tile, uint32_t flags) { - size_t threads_count; - if (threadpool == NULL || - (threads_count = threadpool->threads_count.value) <= 1 || range <= tile) { + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || range <= tile) { /* No thread pool used: execute task sequentially on the calling thread */ struct fpu_state saved_fpu_state = {0}; if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) { @@ -3493,9 +3541,8 @@ PTHREADPOOL_WEAK void pthreadpool_parallelize_1d_tile_1d_dynamic_with_thread( pthreadpool_t threadpool, pthreadpool_task_1d_tile_1d_dynamic_with_id_t function, void* context, size_t range, size_t tile, uint32_t flags) { - size_t threads_count; - if (threadpool == NULL || - (threads_count = threadpool->threads_count.value) <= 1 || range <= tile) { + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || range <= tile) { /* No thread pool used: execute task sequentially on the calling thread */ struct fpu_state saved_fpu_state = {0}; if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) { @@ -3526,9 +3573,8 @@ pthreadpool_parallelize_1d_tile_1d_dynamic_with_uarch_with_thread( pthreadpool_task_1d_tile_1d_dynamic_with_id_with_thread_t function, void* context, uint32_t default_uarch_index, uint32_t max_uarch_index, size_t range, size_t tile, uint32_t flags) { - size_t threads_count; - if (threadpool == NULL || - (threads_count = threadpool->threads_count.value) <= 1 || range <= tile) { + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || range <= tile) { uint32_t uarch_index = default_uarch_index; #if PTHREADPOOL_USE_CPUINFO uarch_index = @@ -3571,10 +3617,8 @@ PTHREADPOOL_WEAK void pthreadpool_parallelize_2d(pthreadpool_t threadpool, void* context, size_t range_i, size_t range_j, uint32_t flags) { - size_t threads_count; - if (threadpool == NULL || - (threads_count = threadpool->threads_count.value) <= 1 || - (range_i | range_j) <= 1) { + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || (range_i | range_j) <= 1) { /* No thread pool used: execute task sequentially on the calling thread */ struct fpu_state saved_fpu_state = {0}; if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) { @@ -3611,10 +3655,8 @@ PTHREADPOOL_PRIVATE_IMPL(pthreadpool_parallelize_2d) PTHREADPOOL_WEAK void pthreadpool_parallelize_2d_with_thread( pthreadpool_t threadpool, pthreadpool_task_2d_with_thread_t function, void* context, size_t range_i, size_t range_j, uint32_t flags) { - size_t threads_count; - if (threadpool == NULL || - (threads_count = threadpool->threads_count.value) <= 1 || - (range_i | range_j) <= 1) { + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || (range_i | range_j) <= 1) { /* No thread pool used: execute task sequentially on the calling thread */ struct fpu_state saved_fpu_state = {0}; if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) { @@ -3654,9 +3696,8 @@ PTHREADPOOL_WEAK void pthreadpool_parallelize_2d_tile_1d( pthreadpool_t threadpool, pthreadpool_task_2d_tile_1d_t function, void* context, size_t range_i, size_t range_j, size_t tile_j, uint32_t flags) { - size_t threads_count; - if (threadpool == NULL || - (threads_count = threadpool->threads_count.value) <= 1 || + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || (range_i <= 1 && range_j <= tile_j)) { /* No thread pool used: execute task sequentially on the calling thread */ struct fpu_state saved_fpu_state = {0}; @@ -3700,9 +3741,8 @@ PTHREADPOOL_WEAK void pthreadpool_parallelize_2d_tile_1d_with_uarch( pthreadpool_t threadpool, pthreadpool_task_2d_tile_1d_with_id_t function, void* context, uint32_t default_uarch_index, uint32_t max_uarch_index, size_t range_i, size_t range_j, size_t tile_j, uint32_t flags) { - size_t threads_count; - if (threadpool == NULL || - (threads_count = threadpool->threads_count.value) <= 1 || + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || (range_i <= 1 && range_j <= tile_j)) { /* No thread pool used: execute task sequentially on the calling thread */ @@ -3760,9 +3800,8 @@ PTHREADPOOL_WEAK void pthreadpool_parallelize_2d_tile_1d_with_uarch_with_thread( pthreadpool_task_2d_tile_1d_with_id_with_thread_t function, void* context, uint32_t default_uarch_index, uint32_t max_uarch_index, size_t range_i, size_t range_j, size_t tile_j, uint32_t flags) { - size_t threads_count; - if (threadpool == NULL || - (threads_count = threadpool->threads_count.value) <= 1 || + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || (range_i <= 1 && range_j <= tile_j)) { /* No thread pool used: execute task sequentially on the calling thread */ @@ -3820,7 +3859,8 @@ PTHREADPOOL_WEAK void pthreadpool_parallelize_2d_tile_1d_dynamic( pthreadpool_t threadpool, pthreadpool_task_2d_tile_1d_dynamic_t function, void* context, size_t range_i, size_t range_j, size_t tile_j, uint32_t flags) { - if (threadpool == NULL || threadpool->threads_count.value <= 1 || + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || (range_i <= 1 && range_j <= tile_j)) { /* No thread pool used: execute task sequentially on the calling thread */ struct fpu_state saved_fpu_state = {0}; @@ -3854,7 +3894,8 @@ PTHREADPOOL_WEAK void pthreadpool_parallelize_2d_tile_1d_dynamic_with_thread( pthreadpool_t threadpool, pthreadpool_task_2d_tile_1d_dynamic_with_id_t function, void* context, size_t range_i, size_t range_j, size_t tile_j, uint32_t flags) { - if (threadpool == NULL || threadpool->threads_count.value <= 1 || + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || (range_i <= 1 && range_j <= tile_j)) { /* No thread pool used: execute task sequentially on the calling thread */ struct fpu_state saved_fpu_state = {0}; @@ -3890,7 +3931,8 @@ pthreadpool_parallelize_2d_tile_1d_dynamic_with_uarch_with_thread( pthreadpool_task_2d_tile_1d_dynamic_with_id_with_thread_t function, void* context, uint32_t default_uarch_index, uint32_t max_uarch_index, size_t range_i, size_t range_j, size_t tile_j, uint32_t flags) { - if (threadpool == NULL || threadpool->threads_count.value <= 1 || + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || (range_i <= 1 && range_j <= tile_j)) { uint32_t uarch_index = default_uarch_index; #if PTHREADPOOL_USE_CPUINFO @@ -3938,9 +3980,8 @@ PTHREADPOOL_WEAK void pthreadpool_parallelize_2d_tile_2d( pthreadpool_t threadpool, pthreadpool_task_2d_tile_2d_t function, void* context, size_t range_i, size_t range_j, size_t tile_i, size_t tile_j, uint32_t flags) { - size_t threads_count; - if (threadpool == NULL || - (threads_count = threadpool->threads_count.value) <= 1 || + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || (range_i <= tile_i && range_j <= tile_j)) { /* No thread pool used: execute task sequentially on the calling thread */ struct fpu_state saved_fpu_state = {0}; @@ -3988,7 +4029,8 @@ PTHREADPOOL_WEAK void pthreadpool_parallelize_2d_tile_2d_dynamic( pthreadpool_t threadpool, pthreadpool_task_2d_tile_2d_dynamic_t function, void* context, size_t range_i, size_t range_j, size_t tile_i, size_t tile_j, uint32_t flags) { - if (threadpool == NULL || threadpool->threads_count.value <= 1 || + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || (range_i <= tile_i && range_j <= tile_j)) { /* No thread pool used: execute task sequentially on the calling thread */ struct fpu_state saved_fpu_state = {0}; @@ -4030,7 +4072,8 @@ PTHREADPOOL_WEAK void pthreadpool_parallelize_2d_tile_2d_dynamic_with_uarch( pthreadpool_task_2d_tile_2d_dynamic_with_id_t function, void* context, uint32_t default_uarch_index, uint32_t max_uarch_index, size_t range_i, size_t range_j, size_t tile_i, size_t tile_j, uint32_t flags) { - if (threadpool == NULL || threadpool->threads_count.value <= 1 || + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || (range_i <= tile_i && range_j <= tile_j)) { /* No thread pool used: execute task sequentially on the calling thread */ uint32_t uarch_index = default_uarch_index; @@ -4084,7 +4127,8 @@ PTHREADPOOL_WEAK void pthreadpool_parallelize_2d_tile_2d_dynamic_with_thread( pthreadpool_task_2d_tile_2d_dynamic_with_id_t function, void* context, size_t range_i, size_t range_j, size_t tile_i, size_t tile_j, uint32_t flags) { - if (threadpool == NULL || threadpool->threads_count.value <= 1 || + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || (range_i <= tile_i && range_j <= tile_j)) { /* No thread pool used: execute task sequentially on the calling thread */ struct fpu_state saved_fpu_state = {0}; @@ -4127,9 +4171,8 @@ PTHREADPOOL_WEAK void pthreadpool_parallelize_2d_tile_2d_with_uarch( void* context, uint32_t default_uarch_index, uint32_t max_uarch_index, size_t range_i, size_t range_j, size_t tile_i, size_t tile_j, uint32_t flags) { - size_t threads_count; - if (threadpool == NULL || - (threads_count = threadpool->threads_count.value) <= 1 || + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || (range_i <= tile_i && range_j <= tile_j)) { /* No thread pool used: execute task sequentially on the calling thread */ @@ -4191,9 +4234,8 @@ PTHREADPOOL_WEAK void pthreadpool_parallelize_3d(pthreadpool_t threadpool, void* context, size_t range_i, size_t range_j, size_t range_k, uint32_t flags) { - size_t threads_count; - if (threadpool == NULL || - (threads_count = threadpool->threads_count.value) <= 1 || + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || (range_i | range_j | range_k) <= 1) { /* No thread pool used: execute task sequentially on the calling thread */ struct fpu_state saved_fpu_state = {0}; @@ -4235,9 +4277,8 @@ PTHREADPOOL_WEAK void pthreadpool_parallelize_3d_tile_1d( pthreadpool_t threadpool, pthreadpool_task_3d_tile_1d_t function, void* context, size_t range_i, size_t range_j, size_t range_k, size_t tile_k, uint32_t flags) { - size_t threads_count; - if (threadpool == NULL || - (threads_count = threadpool->threads_count.value) <= 1 || + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || ((range_i | range_j) <= 1 && range_k <= tile_k)) { /* No thread pool used: execute task sequentially on the calling thread */ struct fpu_state saved_fpu_state = {0}; @@ -4285,9 +4326,8 @@ PTHREADPOOL_WEAK void pthreadpool_parallelize_3d_tile_1d_with_thread( pthreadpool_task_3d_tile_1d_with_thread_t function, void* context, size_t range_i, size_t range_j, size_t range_k, size_t tile_k, uint32_t flags) { - size_t threads_count; - if (threadpool == NULL || - (threads_count = threadpool->threads_count.value) <= 1 || + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || ((range_i | range_j) <= 1 && range_k <= tile_k)) { /* No thread pool used: execute task sequentially on the calling thread */ struct fpu_state saved_fpu_state = {0}; @@ -4336,9 +4376,8 @@ PTHREADPOOL_WEAK void pthreadpool_parallelize_3d_tile_1d_with_uarch( void* context, uint32_t default_uarch_index, uint32_t max_uarch_index, size_t range_i, size_t range_j, size_t range_k, size_t tile_k, uint32_t flags) { - size_t threads_count; - if (threadpool == NULL || - (threads_count = threadpool->threads_count.value) <= 1 || + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || ((range_i | range_j) <= 1 && range_k <= tile_k)) { /* No thread pool used: execute task sequentially on the calling thread */ @@ -4399,9 +4438,8 @@ PTHREADPOOL_WEAK void pthreadpool_parallelize_3d_tile_1d_with_uarch_with_thread( pthreadpool_task_3d_tile_1d_with_id_with_thread_t function, void* context, uint32_t default_uarch_index, uint32_t max_uarch_index, size_t range_i, size_t range_j, size_t range_k, size_t tile_k, uint32_t flags) { - size_t threads_count; - if (threadpool == NULL || - (threads_count = threadpool->threads_count.value) <= 1 || + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || ((range_i | range_j) <= 1 && range_k <= tile_k)) { /* No thread pool used: execute task sequentially on the calling thread */ @@ -4455,14 +4493,16 @@ PTHREADPOOL_WEAK void pthreadpool_parallelize_3d_tile_1d_with_uarch_with_thread( } } -PTHREADPOOL_PRIVATE_IMPL(pthreadpool_parallelize_3d_tile_1d_with_uarch_with_thread) +PTHREADPOOL_PRIVATE_IMPL( + pthreadpool_parallelize_3d_tile_1d_with_uarch_with_thread) PTHREADPOOL_WEAK void pthreadpool_parallelize_3d_tile_1d_dynamic_with_thread( pthreadpool_t threadpool, pthreadpool_task_3d_tile_1d_dynamic_with_id_t function, void* context, size_t range_i, size_t range_j, size_t range_k, size_t tile_k, uint32_t flags) { - if (threadpool == NULL || threadpool->threads_count.value <= 1 || + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || (range_i <= 1 && range_j <= 1 && range_k <= tile_k)) { /* No thread pool used: execute task sequentially on the calling thread */ struct fpu_state saved_fpu_state = {0}; @@ -4500,9 +4540,8 @@ PTHREADPOOL_WEAK void pthreadpool_parallelize_3d_tile_2d( pthreadpool_t threadpool, pthreadpool_task_3d_tile_2d_t function, void* context, size_t range_i, size_t range_j, size_t range_k, size_t tile_j, size_t tile_k, uint32_t flags) { - size_t threads_count; - if (threadpool == NULL || - (threads_count = threadpool->threads_count.value) <= 1 || + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || (range_i <= 1 && range_j <= tile_j && range_k <= tile_k)) { /* No thread pool used: execute task sequentially on the calling thread */ struct fpu_state saved_fpu_state = {0}; @@ -4553,7 +4592,8 @@ PTHREADPOOL_WEAK void pthreadpool_parallelize_3d_tile_2d_dynamic( pthreadpool_t threadpool, pthreadpool_task_3d_tile_2d_dynamic_t function, void* context, size_t range_i, size_t range_j, size_t range_k, size_t tile_j, size_t tile_k, uint32_t flags) { - if (threadpool == NULL || threadpool->threads_count.value <= 1 || + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || (range_i <= 1 && range_j <= tile_j && range_k <= tile_k)) { /* No thread pool used: execute task sequentially on the calling thread */ struct fpu_state saved_fpu_state = {0}; @@ -4602,7 +4642,8 @@ PTHREADPOOL_WEAK void pthreadpool_parallelize_3d_tile_2d_dynamic_with_uarch( uint32_t default_uarch_index, uint32_t max_uarch_index, size_t range_i, size_t range_j, size_t range_k, size_t tile_j, size_t tile_k, uint32_t flags) { - if (threadpool == NULL || threadpool->threads_count.value <= 1 || + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || (range_i <= 1 && range_j <= tile_j && range_k <= tile_k)) { /* No thread pool used: execute task sequentially on the calling thread */ uint32_t uarch_index = default_uarch_index; @@ -4661,7 +4702,8 @@ PTHREADPOOL_WEAK void pthreadpool_parallelize_3d_tile_2d_dynamic_with_thread( pthreadpool_task_3d_tile_2d_dynamic_with_id_t function, void* context, size_t range_i, size_t range_j, size_t range_k, size_t tile_j, size_t tile_k, uint32_t flags) { - if (threadpool == NULL || threadpool->threads_count.value <= 1 || + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || (range_i <= 1 && range_j <= tile_j && range_k <= tile_k)) { /* No thread pool used: execute task sequentially on the calling thread */ struct fpu_state saved_fpu_state = {0}; @@ -4709,9 +4751,8 @@ PTHREADPOOL_WEAK void pthreadpool_parallelize_3d_tile_2d_with_uarch( void* context, uint32_t default_uarch_index, uint32_t max_uarch_index, size_t range_i, size_t range_j, size_t range_k, size_t tile_j, size_t tile_k, uint32_t flags) { - size_t threads_count; - if (threadpool == NULL || - (threads_count = threadpool->threads_count.value) <= 1 || + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || (range_i <= 1 && range_j <= tile_j && range_k <= tile_k)) { /* No thread pool used: execute task sequentially on the calling thread */ @@ -4777,9 +4818,8 @@ PTHREADPOOL_WEAK void pthreadpool_parallelize_4d(pthreadpool_t threadpool, size_t range_j, size_t range_k, size_t range_l, uint32_t flags) { - size_t threads_count; - if (threadpool == NULL || - (threads_count = threadpool->threads_count.value) <= 1 || + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || (range_i | range_j | range_k | range_l) <= 1) { /* No thread pool used: execute task sequentially on the calling thread */ struct fpu_state saved_fpu_state = {0}; @@ -4826,9 +4866,8 @@ PTHREADPOOL_WEAK void pthreadpool_parallelize_4d_tile_1d( pthreadpool_t threadpool, pthreadpool_task_4d_tile_1d_t function, void* context, size_t range_i, size_t range_j, size_t range_k, size_t range_l, size_t tile_l, uint32_t flags) { - size_t threads_count; - if (threadpool == NULL || - (threads_count = threadpool->threads_count.value) <= 1 || + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || ((range_i | range_j | range_k) <= 1 && range_l <= tile_l)) { /* No thread pool used: execute task sequentially on the calling thread */ struct fpu_state saved_fpu_state = {0}; @@ -4880,9 +4919,8 @@ PTHREADPOOL_WEAK void pthreadpool_parallelize_4d_tile_2d( pthreadpool_t threadpool, pthreadpool_task_4d_tile_2d_t function, void* context, size_t range_i, size_t range_j, size_t range_k, size_t range_l, size_t tile_k, size_t tile_l, uint32_t flags) { - size_t threads_count; - if (threadpool == NULL || - (threads_count = threadpool->threads_count.value) <= 1 || + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || ((range_i | range_j) <= 1 && range_k <= tile_k && range_l <= tile_l)) { /* No thread pool used: execute task sequentially on the calling thread */ struct fpu_state saved_fpu_state = {0}; @@ -4938,9 +4976,8 @@ PTHREADPOOL_WEAK void pthreadpool_parallelize_4d_tile_2d_with_uarch( void* context, uint32_t default_uarch_index, uint32_t max_uarch_index, size_t range_i, size_t range_j, size_t range_k, size_t range_l, size_t tile_k, size_t tile_l, uint32_t flags) { - size_t threads_count; - if (threadpool == NULL || - (threads_count = threadpool->threads_count.value) <= 1 || + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || ((range_i | range_j) <= 1 && range_k <= tile_k && range_l <= tile_l)) { /* No thread pool used: execute task sequentially on the calling thread */ @@ -5008,7 +5045,8 @@ PTHREADPOOL_WEAK void pthreadpool_parallelize_4d_tile_2d_dynamic( pthreadpool_t threadpool, pthreadpool_task_4d_tile_2d_dynamic_t function, void* context, size_t range_i, size_t range_j, size_t range_k, size_t range_l, size_t tile_k, size_t tile_l, uint32_t flags) { - if (threadpool == NULL || threadpool->threads_count.value <= 1 || + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || (range_i <= 1 && range_j <= 1 && range_k <= tile_k && range_l <= tile_l)) { /* No thread pool used: execute task sequentially on the calling thread */ @@ -5063,7 +5101,8 @@ PTHREADPOOL_WEAK void pthreadpool_parallelize_4d_tile_2d_dynamic_with_uarch( uint32_t default_uarch_index, uint32_t max_uarch_index, size_t range_i, size_t range_j, size_t range_k, size_t range_l, size_t tile_k, size_t tile_l, uint32_t flags) { - if (threadpool == NULL || threadpool->threads_count.value <= 1 || + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || (range_i <= 1 && range_j <= 1 && range_k <= tile_k && range_l <= tile_l)) { /* No thread pool used: execute task sequentially on the calling thread */ @@ -5129,9 +5168,8 @@ PTHREADPOOL_WEAK void pthreadpool_parallelize_5d(pthreadpool_t threadpool, size_t range_j, size_t range_k, size_t range_l, size_t range_m, uint32_t flags) { - size_t threads_count; - if (threadpool == NULL || - (threads_count = threadpool->threads_count.value) <= 1 || + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || (range_i | range_j | range_k | range_l | range_m) <= 1) { /* No thread pool used: execute task sequentially on the calling thread */ struct fpu_state saved_fpu_state = {0}; @@ -5181,9 +5219,8 @@ PTHREADPOOL_WEAK void pthreadpool_parallelize_5d_tile_1d( pthreadpool_t threadpool, pthreadpool_task_5d_tile_1d_t function, void* context, size_t range_i, size_t range_j, size_t range_k, size_t range_l, size_t range_m, size_t tile_m, uint32_t flags) { - size_t threads_count; - if (threadpool == NULL || - (threads_count = threadpool->threads_count.value) <= 1 || + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || ((range_i | range_j | range_k | range_l) <= 1 && range_m <= tile_m)) { /* No thread pool used: execute task sequentially on the calling thread */ struct fpu_state saved_fpu_state = {0}; @@ -5239,9 +5276,8 @@ PTHREADPOOL_WEAK void pthreadpool_parallelize_5d_tile_2d( void* context, size_t range_i, size_t range_j, size_t range_k, size_t range_l, size_t range_m, size_t tile_l, size_t tile_m, uint32_t flags) { - size_t threads_count; - if (threadpool == NULL || - (threads_count = threadpool->threads_count.value) <= 1 || + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || ((range_i | range_j | range_k) <= 1 && range_l <= tile_l && range_m <= tile_m)) { /* No thread pool used: execute task sequentially on the calling thread */ @@ -5300,9 +5336,8 @@ PTHREADPOOL_WEAK void pthreadpool_parallelize_6d( pthreadpool_t threadpool, pthreadpool_task_6d_t function, void* context, size_t range_i, size_t range_j, size_t range_k, size_t range_l, size_t range_m, size_t range_n, uint32_t flags) { - size_t threads_count; - if (threadpool == NULL || - (threads_count = threadpool->threads_count.value) <= 1 || + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || (range_i | range_j | range_k | range_l | range_m | range_n) <= 1) { /* No thread pool used: execute task sequentially on the calling thread */ struct fpu_state saved_fpu_state = {0}; @@ -5356,9 +5391,8 @@ PTHREADPOOL_WEAK void pthreadpool_parallelize_6d_tile_1d( void* context, size_t range_i, size_t range_j, size_t range_k, size_t range_l, size_t range_m, size_t range_n, size_t tile_n, uint32_t flags) { - size_t threads_count; - if (threadpool == NULL || - (threads_count = threadpool->threads_count.value) <= 1 || + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || ((range_i | range_j | range_k | range_l | range_m) <= 1 && range_n <= tile_n)) { /* No thread pool used: execute task sequentially on the calling thread */ @@ -5418,9 +5452,8 @@ PTHREADPOOL_WEAK void pthreadpool_parallelize_6d_tile_2d( void* context, size_t range_i, size_t range_j, size_t range_k, size_t range_l, size_t range_m, size_t range_n, size_t tile_m, size_t tile_n, uint32_t flags) { - size_t threads_count; - if (threadpool == NULL || - (threads_count = threadpool->threads_count.value) <= 1 || + const size_t threads_count = pthreadpool_num_threads_to_use(threadpool); + if (threadpool == NULL || threads_count <= 1 || ((range_i | range_j | range_k | range_l) <= 1 && range_m <= tile_m && range_n <= tile_n)) { /* No thread pool used: execute task sequentially on the calling thread */ diff --git a/src/pthreads.c b/src/pthreads.c index 70291c6..ea87ab1 100644 --- a/src/pthreads.c +++ b/src/pthreads.c @@ -22,6 +22,7 @@ /* Configuration header */ #include + #include "threadpool-common.h" /* POSIX headers */ @@ -72,6 +73,8 @@ #include "threadpool-object.h" #include "threadpool-utils.h" +thread_local size_t max_num_threads = UINT_MAX; + #if PTHREADPOOL_USE_FUTEX #if defined(__linux__) static int futex_wait(pthreadpool_atomic_uint32_t* address, uint32_t value) { @@ -164,10 +167,10 @@ static void wait_worker_threads(struct pthreadpool* threadpool) { #endif } -static uint32_t wait_for_new_command(struct pthreadpool* threadpool, +static uint32_t wait_for_new_command(struct thread_info* thread, uint32_t last_command, uint32_t last_flags) { - uint32_t command = pthreadpool_load_acquire_uint32_t(&threadpool->command); + uint32_t command = pthreadpool_load_acquire_uint32_t(&thread->command); if (command != last_command) { return command; } @@ -177,7 +180,7 @@ static uint32_t wait_for_new_command(struct pthreadpool* threadpool, for (uint32_t i = 0; i < PTHREADPOOL_SPIN_WAIT_ITERATIONS; i++) { pthreadpool_yield(i); - command = pthreadpool_load_acquire_uint32_t(&threadpool->command); + command = pthreadpool_load_acquire_uint32_t(&thread->command); if (command != last_command) { return command; } @@ -187,20 +190,20 @@ static uint32_t wait_for_new_command(struct pthreadpool* threadpool, /* Spin-wait disabled or timed out, fall back to mutex/futex wait */ #if PTHREADPOOL_USE_FUTEX do { - futex_wait(&threadpool->command, last_command); - command = pthreadpool_load_acquire_uint32_t(&threadpool->command); + futex_wait(&thread->command, last_command); + command = pthreadpool_load_acquire_uint32_t(&thread->command); } while (command == last_command); #else /* Lock the command mutex */ - pthread_mutex_lock(&threadpool->command_mutex); + pthread_mutex_lock(&thread->command_mutex); /* Read the command */ - while ((command = pthreadpool_load_acquire_uint32_t(&threadpool->command)) == + while ((command = pthreadpool_load_acquire_uint32_t(&thread->command)) == last_command) { /* Wait for new command */ - pthread_cond_wait(&threadpool->command_condvar, &threadpool->command_mutex); + pthread_cond_wait(&thread->command_condvar, &thread->command_mutex); } /* Read a new command */ - pthread_mutex_unlock(&threadpool->command_mutex); + pthread_mutex_unlock(&thread->command_mutex); #endif return command; } @@ -217,7 +220,7 @@ static void* thread_main(void* arg) { /* Monitor new commands and act accordingly */ for (;;) { - uint32_t command = wait_for_new_command(threadpool, last_command, flags); + uint32_t command = wait_for_new_command(thread, last_command, flags); pthreadpool_fence_acquire(); flags = pthreadpool_load_relaxed_uint32_t(&threadpool->flags); @@ -288,9 +291,19 @@ PTHREADPOOL_WEAK struct pthreadpool* pthreadpool_create(size_t threads_count) { return NULL; } threadpool->threads_count = fxdiv_init_size_t(threads_count); + pthreadpool_store_relaxed_size_t(&threadpool->num_threads_to_use, + threads_count); for (size_t tid = 0; tid < threads_count; tid++) { threadpool->threads[tid].thread_number = tid; threadpool->threads[tid].threadpool = threadpool; + // Since command is per thread we are creating conditional variables per + // thread as well. However, only children thread participate in wait/wakeup + // signalling. + // Note instead of using !use_futext this is just realying on use_condvar +#if PTHREADPOOL_USE_CONDVAR + pthread_mutex_init(&(threadpool->threads[tid].command_mutex), NULL); + pthread_cond_init(&(threadpool->threads[tid].command_condvar), NULL); +#endif } /* Thread pool with a single thread computes everything on the caller thread. @@ -300,8 +313,6 @@ PTHREADPOOL_WEAK struct pthreadpool* pthreadpool_create(size_t threads_count) { #if !PTHREADPOOL_USE_FUTEX pthread_mutex_init(&threadpool->completion_mutex, NULL); pthread_cond_init(&threadpool->completion_condvar, NULL); - pthread_mutex_init(&threadpool->command_mutex, NULL); - pthread_cond_init(&threadpool->command_condvar, NULL); #endif #if PTHREADPOOL_USE_FUTEX @@ -325,6 +336,12 @@ PTHREADPOOL_WEAK struct pthreadpool* pthreadpool_create(size_t threads_count) { PTHREADPOOL_PRIVATE_IMPL(pthreadpool_create) +void pthreadpool_set_num_threads_to_use(size_t num_threads) { + max_num_threads = num_threads; +} + +size_t pthreadpool_get_num_threads_to_use() { return max_num_threads; } + PTHREADPOOL_INTERNAL void pthreadpool_parallelize( struct pthreadpool* threadpool, thread_function_t thread_function, const void* params, size_t params_size, void* task, void* context, @@ -337,10 +354,16 @@ PTHREADPOOL_INTERNAL void pthreadpool_parallelize( /* Protect the global threadpool structures */ pthread_mutex_lock(&threadpool->execution_mutex); -#if !PTHREADPOOL_USE_FUTEX + const struct fxdiv_divisor_size_t threads_count = threadpool->threads_count; + size_t max_threads_to_use = pthreadpool_get_num_threads_to_use(); + const struct fxdiv_divisor_size_t num_threads_to_use = + fxdiv_init_size_t(min(threads_count.value, max_threads_to_use)); /* Lock the command variables to ensure that threads don't start processing * before they observe complete command with all arguments */ - pthread_mutex_lock(&threadpool->command_mutex); +#if !PTHREADPOOL_USE_FUTEX + for (size_t tid = 1; tid < num_threads_to_use.value; tid++) { + pthread_mutex_lock(&(threadpool->threads[tid].command_mutex)); + } #endif /* Setup global arguments */ @@ -352,9 +375,11 @@ PTHREADPOOL_INTERNAL void pthreadpool_parallelize( /* Locking of completion_mutex not needed: readers are sleeping on * command_condvar */ - const struct fxdiv_divisor_size_t threads_count = threadpool->threads_count; - pthreadpool_store_relaxed_size_t(&threadpool->active_threads, - threads_count.value - 1 /* caller thread */); + pthreadpool_store_relaxed_size_t( + &threadpool->active_threads, + num_threads_to_use.value - 1 /* caller thread */); + pthreadpool_store_relaxed_size_t(&threadpool->num_threads_to_use, + num_threads_to_use.value); #if PTHREADPOOL_USE_FUTEX pthreadpool_store_relaxed_uint32_t(&threadpool->has_active_threads, 1); #endif @@ -366,9 +391,9 @@ PTHREADPOOL_INTERNAL void pthreadpool_parallelize( /* Spread the work between threads */ const struct fxdiv_result_size_t range_params = - fxdiv_divide_size_t(linear_range, threads_count); + fxdiv_divide_size_t(linear_range, num_threads_to_use); size_t range_start = 0; - for (size_t tid = 0; tid < threads_count.value; tid++) { + for (size_t tid = 0; tid < num_threads_to_use.value; tid++) { struct thread_info* thread = &threadpool->threads[tid]; const size_t range_length = range_params.quotient + (size_t)(tid < range_params.remainder); @@ -381,40 +406,44 @@ PTHREADPOOL_INTERNAL void pthreadpool_parallelize( range_start = range_end; } - /* - * Update the threadpool command. - * Imporantly, do it after initializing command parameters (range, task, - * argument, flags) - * ~(threadpool->command | THREADPOOL_COMMAND_MASK) flips the bits not in - * command mask to ensure the unmasked command is different then the last - * command, because worker threads monitor for change in the unmasked command. - */ - const uint32_t old_command = - pthreadpool_load_relaxed_uint32_t(&threadpool->command); - const uint32_t new_command = - ~(old_command | THREADPOOL_COMMAND_MASK) | threadpool_command_parallelize; - - /* - * Store the command with release semantics to guarantee that if a worker - * thread observes the new command value, it also observes the updated command - * parameters. - * - * Note: release semantics is necessary even with a conditional variable, - * because the workers might be waiting in a spin-loop rather than the - * conditional variable. - */ - pthreadpool_store_release_uint32_t(&threadpool->command, new_command); + for (size_t tid = 1; tid < num_threads_to_use.value; tid++) { + /* + * Update the threadpool command. + * Imporantly, do it after initializing command parameters (range, task, + * argument, flags) + * ~(threadpool->command | THREADPOOL_COMMAND_MASK) flips the bits not in + * command mask to ensure the unmasked command is different then the last + * command, because worker threads monitor for change in the unmasked + * command. + */ + const uint32_t old_command = + pthreadpool_load_relaxed_uint32_t(&(threadpool->threads[tid].command)); + const uint32_t new_command = ~(old_command | THREADPOOL_COMMAND_MASK) | + threadpool_command_parallelize; + + /* + * Store the command with release semantics to guarantee that if a worker + * thread observes the new command value, it also observes the updated + * command parameters. + * + * Note: release semantics is necessary even with a conditional variable, + * because the workers might be waiting in a spin-loop rather than the + * conditional variable. + */ + pthreadpool_store_release_uint32_t(&(threadpool->threads[tid].command), + new_command); #if PTHREADPOOL_USE_FUTEX - /* Wake up the threads */ - futex_wake_all(&threadpool->command); + /* Wake up the threads */ + futex_wake_all(&(threadpool->threads[tid].command)); #else - /* Unlock the command variables before waking up the threads for better - * performance */ - pthread_mutex_unlock(&threadpool->command_mutex); + /* Unlock the command variables before waking up the threads for better + * performance */ + pthread_mutex_unlock(&(threadpool->threads[tid].command_mutex)); - /* Wake up the threads */ - pthread_cond_broadcast(&threadpool->command_condvar); + /* Wake up the threads */ + pthread_cond_broadcast(&(threadpool->threads[tid].command_condvar)); #endif + } /* Save and modify FPU denormals control, if needed */ struct fpu_state saved_fpu_state = {0}; @@ -450,41 +479,48 @@ PTHREADPOOL_WEAK void pthreadpool_destroy(struct pthreadpool* threadpool) { threads_count - 1 /* caller thread */); pthreadpool_store_relaxed_uint32_t(&threadpool->has_active_threads, 1); - /* - * Store the command with release semantics to guarantee that if a worker - * thread observes the new command value, it also observes the updated - * active_threads/has_active_threads values. - */ - pthreadpool_store_release_uint32_t(&threadpool->command, - threadpool_command_shutdown); - /* Wake up worker threads */ - futex_wake_all(&threadpool->command); + for (size_t tid = 1; tid < threads_count; tid++) { + /* + * Store the command with release semantics to guarantee that if a + * worker thread observes the new command value, it also observes the + * updated active_threads/has_active_threads values. + */ + pthreadpool_store_release_uint32_t(&threadpool->threads[tid].command, + threadpool_command_shutdown); + + futex_wake_all(&threadpool->threads[tid].command); + } #else - /* Lock the command variable to ensure that threads don't shutdown until - * both command and active_threads are updated */ - pthread_mutex_lock(&threadpool->command_mutex); - + for (size_t tid = 1; tid < threads_count; tid++) { + /* Lock the command variable to ensure that threads don't shutdown until + * both command and active_threads are updated */ + pthread_mutex_lock(&threadpool->threads[tid].command_mutex); + } pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */); - /* - * Store the command with release semantics to guarantee that if a worker - * thread observes the new command value, it also observes the updated - * active_threads value. - * - * Note: the release fence inside pthread_mutex_unlock is insufficient, - * because the workers might be waiting in a spin-loop rather than the - * conditional variable. - */ - pthreadpool_store_release_uint32_t(&threadpool->command, - threadpool_command_shutdown); - - /* Wake up worker threads */ - pthread_cond_broadcast(&threadpool->command_condvar); + for (size_t tid = 1; tid < threads_count; tid++) { + /* + * Store the command with release semantics to guarantee that if a + * worker thread observes the new command value, it also observes the + * updated active_threads value. + * + * Note: the release fence inside pthread_mutex_unlock is insufficient, + * because the workers might be waiting in a spin-loop rather than the + * conditional variable. + */ + pthreadpool_store_release_uint32_t(&threadpool->threads[tid].command, + threadpool_command_shutdown); + + /* Wake up worker threads */ + pthread_cond_broadcast(&threadpool->threads[tid].command_condvar); + } - /* Commit the state changes and let workers start processing */ - pthread_mutex_unlock(&threadpool->command_mutex); + for (size_t tid = 1; tid < threads_count; tid++) { + /* Commit the state changes and let workers start processing */ + pthread_mutex_unlock(&threadpool->threads[tid].command_mutex); + } #endif /* Wait until all threads return */ @@ -497,8 +533,10 @@ PTHREADPOOL_WEAK void pthreadpool_destroy(struct pthreadpool* threadpool) { #if !PTHREADPOOL_USE_FUTEX pthread_mutex_destroy(&threadpool->completion_mutex); pthread_cond_destroy(&threadpool->completion_condvar); - pthread_mutex_destroy(&threadpool->command_mutex); - pthread_cond_destroy(&threadpool->command_condvar); + for (size_t tid = 0; tid < threads_count.value; tid++) { + pthread_mutex_destroy(&threadpool->threads[tid].command_mutex); + pthread_cond_destroy(&threadpool->threads[tid].command_condvar); + } #endif } #if PTHREADPOOL_USE_CPUINFO diff --git a/src/threadpool-common.h b/src/threadpool-common.h index 087cda1..0fab42e 100644 --- a/src/threadpool-common.h +++ b/src/threadpool-common.h @@ -116,4 +116,21 @@ #endif #endif +// ported from: +// https://stackoverflow.com/questions/18298280/how-to-declare-a-variable-as-thread-local-portably +/* gcc doesn't know _Thread_local from C11 yet */ +#ifdef __GNUC__ +#define thread_local __thread +/* +// c11 standard already has thread_local specified +// https://en.cppreference.com/w/c/thread/thread_local +#elif __STDC_VERSION__ >= 201112L +# define thread_local _Thread_local +*/ +#elif defined(_MSC_VER) +#define thread_local __declspec(thread) +#else +#error Cannot define thread_local +#endif + #endif // __PTHREADPOOL_SRC_THREADPOOL_COMMON_H_ diff --git a/src/threadpool-object.h b/src/threadpool-object.h index cde54d3..bab3e58 100644 --- a/src/threadpool-object.h +++ b/src/threadpool-object.h @@ -91,6 +91,32 @@ struct PTHREADPOOL_CACHELINE_ALIGNED thread_info { */ HANDLE thread_handle; #endif + +#if !PTHREADPOOL_USE_GCD + /** + * The last command submitted to the thread pool. + */ + pthreadpool_atomic_uint32_t command; +#endif +#if PTHREADPOOL_USE_CONDVAR + /** + * Guards access to the @a command variable. + */ + pthread_mutex_t command_mutex; + /** + * Condition variable to wait for change of the @a command variable. + */ + pthread_cond_t command_condvar; +#endif +#if PTHREADPOOL_USE_EVENT + /** + * Events to wait on for change of the @a command variable. + * To avoid race conditions due to spin-lock synchronization, we use two + * events and switch event in use after every submitted command according to + * the high bit of the command word. + */ + HANDLE command_event[2]; +#endif }; PTHREADPOOL_STATIC_ASSERT(sizeof(struct thread_info) % @@ -1056,6 +1082,13 @@ struct PTHREADPOOL_CACHELINE_ALIGNED pthreadpool { * The number of threads that are processing an operation. */ pthreadpool_atomic_size_t active_threads; + /* + * Enable restricting task parallelization among a subset of + * pthreadpool threads. + * As per this change, this feature is not available in GCD based + * pthreadpool + */ + pthreadpool_atomic_size_t num_threads_to_use; #endif #if PTHREADPOOL_USE_FUTEX /** @@ -1065,12 +1098,6 @@ struct PTHREADPOOL_CACHELINE_ALIGNED pthreadpool { * - has_active_threads == 1 if active_threads != 0 */ pthreadpool_atomic_uint32_t has_active_threads; -#endif -#if !PTHREADPOOL_USE_GCD - /** - * The last command submitted to the thread pool. - */ - pthreadpool_atomic_uint32_t command; #endif /** * The entry point function to call for each thread in the thread pool for @@ -1169,14 +1196,6 @@ struct PTHREADPOOL_CACHELINE_ALIGNED pthreadpool { * @a active_threads is zero). */ pthread_cond_t completion_condvar; - /** - * Guards access to the @a command variable. - */ - pthread_mutex_t command_mutex; - /** - * Condition variable to wait for change of the @a command variable. - */ - pthread_cond_t command_condvar; #endif #if PTHREADPOOL_USE_EVENT /** @@ -1185,14 +1204,8 @@ struct PTHREADPOOL_CACHELINE_ALIGNED pthreadpool { * synchronization, we use two events and switch event in use after every * submitted command according to the high bit of the command word. */ + uint32_t completion_event_index; HANDLE completion_event[2]; - /** - * Events to wait on for change of the @a command variable. - * To avoid race conditions due to spin-lock synchronization, we use two - * events and switch event in use after every submitted command according to - * the high bit of the command word. - */ - HANDLE command_event[2]; #endif /** * FXdiv divisor for the number of threads in the thread pool. diff --git a/src/windows.c b/src/windows.c index d3e9575..df79d83 100644 --- a/src/windows.c +++ b/src/windows.c @@ -16,6 +16,7 @@ /* Configuration header */ #include + #include "threadpool-common.h" /* Windows headers */ @@ -32,6 +33,8 @@ #include "threadpool-object.h" #include "threadpool-utils.h" +thread_local size_t max_num_threads = UINT_MAX; + static void checkin_worker_thread(struct pthreadpool* threadpool, uint32_t event_index) { if (pthreadpool_decrement_fetch_acquire_release_size_t( @@ -67,10 +70,10 @@ static void wait_worker_threads(struct pthreadpool* threadpool, assert(pthreadpool_load_relaxed_size_t(&threadpool->active_threads) == 0); } -static uint32_t wait_for_new_command(struct pthreadpool* threadpool, +static uint32_t wait_for_new_command(struct thread_info* thread, uint32_t last_command, uint32_t last_flags) { - uint32_t command = pthreadpool_load_acquire_uint32_t(&threadpool->command); + uint32_t command = pthreadpool_load_acquire_uint32_t(&thread->command); if (command != last_command) { return command; } @@ -80,7 +83,7 @@ static uint32_t wait_for_new_command(struct pthreadpool* threadpool, for (uint32_t i = 0; i < PTHREADPOOL_SPIN_WAIT_ITERATIONS; i++) { pthreadpool_yield(i); - command = pthreadpool_load_acquire_uint32_t(&threadpool->command); + command = pthreadpool_load_acquire_uint32_t(&thread->command); if (command != last_command) { return command; } @@ -90,10 +93,10 @@ static uint32_t wait_for_new_command(struct pthreadpool* threadpool, /* Spin-wait disabled or timed out, fall back to event wait */ const uint32_t event_index = (last_command >> 31); const DWORD wait_status = - WaitForSingleObject(threadpool->command_event[event_index], INFINITE); + WaitForSingleObject(thread->command_event[event_index], INFINITE); assert(wait_status == WAIT_OBJECT_0); - command = pthreadpool_load_relaxed_uint32_t(&threadpool->command); + command = pthreadpool_load_relaxed_uint32_t(&thread->command); assert(command != last_command); return command; } @@ -110,7 +113,7 @@ static DWORD WINAPI thread_main(LPVOID arg) { /* Monitor new commands and act accordingly */ for (;;) { - uint32_t command = wait_for_new_command(threadpool, last_command, flags); + uint32_t command = wait_for_new_command(thread, last_command, flags); pthreadpool_fence_acquire(); flags = pthreadpool_load_relaxed_uint32_t(&threadpool->flags); @@ -142,6 +145,9 @@ static DWORD WINAPI thread_main(LPVOID arg) { /* Notify the master thread that we finished processing */ const uint32_t event_index = command >> 31; checkin_worker_thread(threadpool, event_index); + const uint32_t completion_event_index = + pthreadpool_load_relaxed_uint32_t(&threadpool->completion_event_index); + checkin_worker_thread(threadpool, completion_event_index); /* Update last command */ last_command = command; }; @@ -161,6 +167,8 @@ struct pthreadpool* pthreadpool_create(size_t threads_count) { return NULL; } threadpool->threads_count = fxdiv_init_size_t(threads_count); + pthreadpool_store_relaxed_size_t(&threadpool->num_threads_to_use, + threads_count); for (size_t tid = 0; tid < threads_count; tid++) { threadpool->threads[tid].thread_number = tid; threadpool->threads[tid].threadpool = threadpool; @@ -176,13 +184,11 @@ struct pthreadpool* pthreadpool_create(size_t threads_count) { threadpool->completion_event[i] = CreateEventW( NULL /* event attributes */, TRUE /* manual-reset event: yes */, FALSE /* initial state: non-signaled */, NULL /* name */); - threadpool->command_event[i] = CreateEventW( - NULL /* event attributes */, TRUE /* manual-reset event: yes */, - FALSE /* initial state: non-signaled */, NULL /* name */); } pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */); + pthreadpool_store_relaxed_size_t(&threadpool->completion_event_index, 0); /* Caller thread serves as worker #0. Thus, we create system threads * starting with worker #1. */ @@ -191,6 +197,11 @@ struct pthreadpool* pthreadpool_create(size_t threads_count) { NULL /* thread attributes */, 0 /* stack size: default */, &thread_main, &threadpool->threads[tid], 0 /* creation flags */, NULL /* thread id */); + for (size_t i = 0; i < 2; i++) { + threadpool->threads[tid].command_event[i] = CreateEventW( + NULL /* event attributes */, TRUE /* manual-reset event: yes */, + FALSE /* initial state: nonsignaled */, NULL /* name */); + } } /* Wait until all threads initialize */ @@ -199,6 +210,12 @@ struct pthreadpool* pthreadpool_create(size_t threads_count) { return threadpool; } +void pthreadpool_set_num_threads_to_use(size_t num_threads) { + max_num_threads = num_threads; +} + +size_t pthreadpool_get_num_threads_to_use() { return max_num_threads; } + PTHREADPOOL_INTERNAL void pthreadpool_parallelize( struct pthreadpool* threadpool, thread_function_t thread_function, const void* params, size_t params_size, void* task, void* context, @@ -221,8 +238,14 @@ PTHREADPOOL_INTERNAL void pthreadpool_parallelize( pthreadpool_store_relaxed_uint32_t(&threadpool->flags, flags); const struct fxdiv_divisor_size_t threads_count = threadpool->threads_count; - pthreadpool_store_relaxed_size_t(&threadpool->active_threads, - threads_count.value - 1 /* caller thread */); + size_t max_threads_to_use = pthreadpool_get_num_threads_to_use(); + const struct fxdiv_divisor_size_t num_threads_to_use = + fxdiv_init_size_t(min(threads_count.value, max_threads_to_use)); + pthreadpool_store_relaxed_size_t( + &threadpool->active_threads, + num_threads_to_use.value - 1 /* caller thread */); + pthreadpool_store_relaxed_size_t(&threadpool->num_threads_to_use, + num_threads_to_use.value); if (params_size != 0) { CopyMemory(&threadpool->params, params, params_size); @@ -231,9 +254,9 @@ PTHREADPOOL_INTERNAL void pthreadpool_parallelize( /* Spread the work between threads */ const struct fxdiv_result_size_t range_params = - fxdiv_divide_size_t(linear_range, threads_count); + fxdiv_divide_size_t(linear_range, num_threads_to_use); size_t range_start = 0; - for (size_t tid = 0; tid < threads_count.value; tid++) { + for (size_t tid = 0; tid < num_threads_to_use.value; tid++) { struct thread_info* thread = &threadpool->threads[tid]; const size_t range_length = range_params.quotient + (size_t)(tid < range_params.remainder); @@ -246,52 +269,62 @@ PTHREADPOOL_INTERNAL void pthreadpool_parallelize( range_start = range_end; } - /* - * Update the threadpool command. - * Imporantly, do it after initializing command parameters (range, task, - * argument, flags) - * ~(threadpool->command | THREADPOOL_COMMAND_MASK) flips the bits not in - * command mask to ensure the unmasked command is different then the last - * command, because worker threads monitor for change in the unmasked command. - */ - const uint32_t old_command = - pthreadpool_load_relaxed_uint32_t(&threadpool->command); - const uint32_t new_command = - ~(old_command | THREADPOOL_COMMAND_MASK) | threadpool_command_parallelize; - - /* - * Reset the command event for the next command. - * It is important to reset the event before writing out the new command, - * because as soon as the worker threads observe the new command, they may - * process it and switch to waiting on the next command event. - * - * Note: the event is different from the command event signalled in this - * update. - */ - const uint32_t event_index = (old_command >> 31); - BOOL reset_event_status = - ResetEvent(threadpool->command_event[event_index ^ 1]); - assert(reset_event_status != FALSE); - - /* - * Store the command with release semantics to guarantee that if a worker - * thread observes the new command value, it also observes the updated command - * parameters. - * - * Note: release semantics is necessary, because the workers might be waiting - * in a spin-loop rather than on the event object. - */ - pthreadpool_store_release_uint32_t(&threadpool->command, new_command); - - /* - * Signal the event to wake up the threads. - * Event in use must be switched after every submitted command to avoid race - * conditions. Choose the event based on the high bit of the command, which is - * flipped on every update. - */ - const BOOL set_event_status = - SetEvent(threadpool->command_event[event_index]); - assert(set_event_status != FALSE); + uint32_t completion_event_index = + pthreadpool_load_relaxed_uint32_t(&threadpool->completion_event_index); + completion_event_index = completion_event_index ^ 1; + pthreadpool_store_relaxed_size_t(&threadpool->completion_event_index, + completion_event_index); + + for (size_t tid = 1; tid < num_threads_to_use.value; tid++) { + /* + * Update the threadpool command. + * Importantly, do it after initializing command parameters (range, task, + * argument, flags) + * ~(threadpool->command | THREADPOOL_COMMAND_MASK) flips the bits not in + * command mask to ensure the unmasked command is different then the last + * command, because worker threads monitor for change in the unmasked + * command. + */ + const uint32_t old_command = + pthreadpool_load_relaxed_uint32_t(&threadpool->threads[tid].command); + const uint32_t new_command = ~(old_command | THREADPOOL_COMMAND_MASK) | + threadpool_command_parallelize; + + /* + * Reset the command event for the next command. + * It is important to reset the event before writing out the new command, + * because as soon as the worker threads observe the new command, they may + * process it and switch to waiting on the next command event. + * + * Note: the event is different from the command event signalled in this + * update. + */ + const uint32_t event_index = (old_command >> 31); + BOOL reset_event_status = + ResetEvent(threadpool->threads[tid].command_event[event_index ^ 1]); + assert(reset_event_status != FALSE); + + /* + * Store the command with release semantics to guarantee that if a worker + * thread observes the new command value, it also observes the updated + * command parameters. + * + * Note: release semantics is necessary, because the workers might be + * waiting in a spin-loop rather than on the event object. + */ + pthreadpool_store_release_uint32_t(&threadpool->threads[tid].command, + new_command); + + /* + * Signal the event to wake up the threads. + * Event in use must be switched after every submitted command to avoid race + * conditions. Choose the event based on the high bit of the command, which + * is flipped on every update. + */ + const BOOL set_event_status = + SetEvent(threadpool->threads[tid].command_event[event_index]); + assert(set_event_status != FALSE); + } /* Save and modify FPU denormals control, if needed */ struct fpu_state saved_fpu_state = {0}; @@ -312,13 +345,14 @@ PTHREADPOOL_INTERNAL void pthreadpool_parallelize( * Wait until the threads finish computation * Use the complementary event because it corresponds to the new command. */ - wait_worker_threads(threadpool, event_index ^ 1); + wait_worker_threads(threadpool, completion_event_index); /* * Reset the completion event for the next command. * Note: the event is different from the one used for waiting in this update. */ - reset_event_status = ResetEvent(threadpool->completion_event[event_index]); + BOOL reset_event_status = + ResetEvent(threadpool->completion_event[completion_event_index ^ 1]); assert(reset_event_status != FALSE); /* Make changes by other threads visible to this thread */ @@ -336,26 +370,28 @@ void pthreadpool_destroy(struct pthreadpool* threadpool) { pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */); - /* - * Store the command with release semantics to guarantee that if a worker - * thread observes the new command value, it also observes the updated - * active_threads values. - */ - const uint32_t old_command = - pthreadpool_load_relaxed_uint32_t(&threadpool->command); - pthreadpool_store_release_uint32_t(&threadpool->command, - threadpool_command_shutdown); - - /* - * Signal the event to wake up the threads. - * Event in use must be switched after every submitted command to avoid - * race conditions. Choose the event based on the high bit of the command, - * which is flipped on every update. - */ - const uint32_t event_index = (old_command >> 31); - const BOOL set_event_status = - SetEvent(threadpool->command_event[event_index]); - assert(set_event_status != FALSE); + for (size_t tid = 1; tid < threads_count; tid++) { + /* + * Store the command with release semantics to guarantee that if a + * worker thread observes the new command value, it also observes the + * updated active_threads values. + */ + const uint32_t old_command = pthreadpool_load_relaxed_uint32_t( + &threadpool->threads[tid].command); + pthreadpool_store_release_uint32_t(&threadpool->threads[tid].command, + threadpool_command_shutdown); + + /* + * Signal the event to wake up the threads. + * Event in use must be switched after every submitted command to avoid + * race conditions. Choose the event based on the high bit of the + * command, which is flipped on every update. + */ + const uint32_t event_index = (old_command >> 31); + const BOOL set_event_status = + SetEvent(threadpool->threads[tid].command_event[event_index]); + assert(set_event_status != FALSE); + } /* Wait until all threads return */ for (size_t tid = 1; tid < threads_count; tid++) { @@ -368,6 +404,13 @@ void pthreadpool_destroy(struct pthreadpool* threadpool) { const BOOL close_status = CloseHandle(thread_handle); assert(close_status != FALSE); } + for (size_t i = 0; i < 2; i++) { + if (threadpool->threads[tid].command_event[i] != NULL) { + const BOOL close_status = + CloseHandle(threadpool->threads[tid].command_event[i]); + assert(close_status != FALSE); + } + } } /* Release resources */ @@ -376,10 +419,6 @@ void pthreadpool_destroy(struct pthreadpool* threadpool) { assert(close_status != FALSE); } for (size_t i = 0; i < 2; i++) { - if (threadpool->command_event[i] != NULL) { - const BOOL close_status = CloseHandle(threadpool->command_event[i]); - assert(close_status != FALSE); - } if (threadpool->completion_event[i] != NULL) { const BOOL close_status = CloseHandle(threadpool->completion_event[i]); diff --git a/test/pthreadpool.cc b/test/pthreadpool.cc index 687c9c9..52be0cc 100644 --- a/test/pthreadpool.cc +++ b/test/pthreadpool.cc @@ -15,6 +15,7 @@ #include #include #include +#include #include // NOLINT #include @@ -13976,3 +13977,335 @@ TEST(Parallelize6DTile2D, MultiThreadPoolWorkStealing) { kParallelize6DTile2DRangeK * kParallelize6DTile2DRangeL * kParallelize6DTile2DRangeM * kParallelize6DTile2DRangeN); } + +struct array_addition_context { + double* augend; + double* addend; + double* sum; + std::mutex m; + int num_threads; + int* thread_ids; +}; + +static void add_arrays(struct array_addition_context* context, size_t start_i, + size_t tile_i) { +#if defined(__linux__) || defined(__EMSCRIPTEN__) || defined(_WIN32) || \ + defined(__CYGWIN__) + { + int thread_id; +#if defined(_WIN32) || defined(__CYGWIN__) + thread_id = GetCurrentThreadId(); +#else + thread_id = pthread_self(); +#endif + std::lock_guard l(context->m); + for (int i = 0; i < context->num_threads; i++) { + if (context->thread_ids[i] == thread_id) { + break; + } else if (context->thread_ids[i] == 0) { + context->thread_ids[i] = thread_id; + break; + } + } + } +#endif + for (size_t i = start_i; i < start_i + tile_i; ++i) { + context->sum[i] = context->augend[i] + context->addend[i]; + } +} + +void init_context(double* augend, double* addend, double* sum, int* thread_ids, + int num_threads, struct array_addition_context* context, + double* ref_sum, double init_val = 1.5) { + double val = init_val; + for (size_t i = 0; i < kParallelize1DTile1DRange; ++i) { + augend[i] = val; + addend[i] = val; + ref_sum[i] = 2 * val; + } + for (size_t i = 0; i < num_threads; ++i) { + thread_ids[i] = 0; + } + + context->augend = augend; + context->addend = addend; + context->sum = sum; + context->num_threads = num_threads; + context->thread_ids = thread_ids; +} + +void check_num_threads_used(int* thread_ids, int num_threads, + int num_threads_used) { +#if defined(__linux__) || defined(__EMSCRIPTEN__) || defined(_WIN32) || \ + defined(__CYGWIN__) + int total_threads = 0; + for (size_t i = 0; i < num_threads; ++i) { + if (thread_ids[i] != 0) { + total_threads += 1; + } + } + EXPECT_EQ(total_threads, num_threads_used); +#endif +} + +/* + * Check num threads used is little flaky since work stealing can result in + * one thread stealing the work from the other thread as a result of which the + * other thread will never pick up the work. If it does not pick up the work, it + * will not set the thread_ids appropriately. One way to fix this is to make + * workstealing compile time option and disable it for testing. Not in favor of + * it though, so maybe need a better way of testing. + */ +TEST(CapNumThreadsTest, RunUnderCapacity) { + double augend[kParallelize1DTile1DRange]; + double addend[kParallelize1DTile1DRange]; + double sum[kParallelize1DTile1DRange]; + double ref_sum[kParallelize1DTile1DRange]; + int num_threads = 4; + int thread_ids[num_threads]; + + auto_pthreadpool_t threadpool(pthreadpool_create(num_threads), + pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + pthreadpool_set_num_threads_to_use(2); + + struct array_addition_context context; + init_context(augend, addend, sum, thread_ids, num_threads, &context, ref_sum); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + pthreadpool_parallelize_1d_tile_1d(threadpool.get(), + (pthreadpool_task_1d_tile_1d_t)add_arrays, + (void*)&context, kParallelize1DTile1DRange, + kParallelize1DTile1DTile, 0 /* flags */); + + for (size_t i = 0; i < kParallelize1DTile1DRange; ++i) { + EXPECT_LT(abs(context.sum[i] - ref_sum[i]), 1e-5); + } + check_num_threads_used(thread_ids, num_threads, 2); +} + +TEST(CapNumThreadsTest, RunUnderCapacitySetMultipleTimes1) { + double augend[kParallelize1DTile1DRange]; + double addend[kParallelize1DTile1DRange]; + double sum[kParallelize1DTile1DRange]; + double ref_sum[kParallelize1DTile1DRange]; + int num_threads = 4; + int thread_ids[num_threads]; + + struct array_addition_context context; + init_context(augend, addend, sum, thread_ids, num_threads, &context, ref_sum); + + auto_pthreadpool_t threadpool(pthreadpool_create(num_threads), + pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + pthreadpool_set_num_threads_to_use(2); + pthreadpool_set_num_threads_to_use(3); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + pthreadpool_parallelize_1d_tile_1d(threadpool.get(), + (pthreadpool_task_1d_tile_1d_t)add_arrays, + (void*)&context, kParallelize1DTile1DRange, + kParallelize1DTile1DTile, 0 /* flags */); + + for (size_t i = 0; i < kParallelize1DTile1DRange; ++i) { + EXPECT_LT(abs(context.sum[i] - ref_sum[i]), 1e-5); + } + check_num_threads_used(thread_ids, num_threads, 3); +} + +TEST(CapNumThreadsTest, RunUnderCapacitySetMultipleTimes2) { + double augend[kParallelize1DTile1DRange]; + double addend[kParallelize1DTile1DRange]; + double sum[kParallelize1DTile1DRange]; + double ref_sum[kParallelize1DTile1DRange]; + int num_threads = 4; + int thread_ids[num_threads]; + + struct array_addition_context context; + init_context(augend, addend, sum, thread_ids, num_threads, &context, ref_sum); + + auto_pthreadpool_t threadpool(pthreadpool_create(num_threads), + pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + pthreadpool_set_num_threads_to_use(2); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + pthreadpool_parallelize_1d_tile_1d(threadpool.get(), + (pthreadpool_task_1d_tile_1d_t)add_arrays, + (void*)&context, kParallelize1DTile1DRange, + kParallelize1DTile1DTile, 0 /* flags */); + + for (size_t i = 0; i < kParallelize1DTile1DRange; ++i) { + EXPECT_LT(abs(context.sum[i] - ref_sum[i]), 1e-5); + } + check_num_threads_used(thread_ids, num_threads, 2); + + init_context(augend, addend, sum, thread_ids, num_threads, &context, ref_sum, + 2.3); + pthreadpool_set_num_threads_to_use(3); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + pthreadpool_parallelize_1d_tile_1d(threadpool.get(), + (pthreadpool_task_1d_tile_1d_t)add_arrays, + (void*)&context, kParallelize1DTile1DRange, + kParallelize1DTile1DTile, 0 /* flags */); + + for (size_t i = 0; i < kParallelize1DTile1DRange; ++i) { + EXPECT_LT(abs(context.sum[i] - ref_sum[i]), 1e-5); + } + check_num_threads_used(thread_ids, num_threads, 3); +} + +TEST(CapNumThreadsTest, RunUnderCapacitySetMultipleTimes3) { + double augend[kParallelize1DTile1DRange]; + double addend[kParallelize1DTile1DRange]; + double sum[kParallelize1DTile1DRange]; + double ref_sum[kParallelize1DTile1DRange]; + int num_threads = 4; + int thread_ids[num_threads]; + + struct array_addition_context context; + init_context(augend, addend, sum, thread_ids, num_threads, &context, ref_sum); + + auto_pthreadpool_t threadpool(pthreadpool_create(num_threads), + pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + pthreadpool_set_num_threads_to_use(1); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + pthreadpool_parallelize_1d_tile_1d(threadpool.get(), + (pthreadpool_task_1d_tile_1d_t)add_arrays, + (void*)&context, kParallelize1DTile1DRange, + kParallelize1DTile1DTile, 0 /* flags */); + + for (size_t i = 0; i < kParallelize1DTile1DRange; ++i) { + EXPECT_LT(abs(context.sum[i] - ref_sum[i]), 1e-5); + } + check_num_threads_used(thread_ids, num_threads, 1); + + init_context(augend, addend, sum, thread_ids, num_threads, &context, ref_sum, + 2.3); + pthreadpool_set_num_threads_to_use(4); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + pthreadpool_parallelize_1d_tile_1d(threadpool.get(), + (pthreadpool_task_1d_tile_1d_t)add_arrays, + (void*)&context, kParallelize1DTile1DRange, + kParallelize1DTile1DTile, 0 /* flags */); + + for (size_t i = 0; i < kParallelize1DTile1DRange; ++i) { + EXPECT_LT(abs(context.sum[i] - ref_sum[i]), 1e-5); + } + check_num_threads_used(thread_ids, num_threads, 4); +} + +TEST(CapNumThreadsTest, RunAtCapacity) { + double augend[kParallelize1DTile1DRange]; + double addend[kParallelize1DTile1DRange]; + double sum[kParallelize1DTile1DRange]; + double ref_sum[kParallelize1DTile1DRange]; + int num_threads = 4; + int thread_ids[num_threads]; + + struct array_addition_context context; + init_context(augend, addend, sum, thread_ids, num_threads, &context, ref_sum); + + auto_pthreadpool_t threadpool(pthreadpool_create(num_threads), + pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + pthreadpool_set_num_threads_to_use(num_threads); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + pthreadpool_parallelize_1d_tile_1d(threadpool.get(), + (pthreadpool_task_1d_tile_1d_t)add_arrays, + (void*)&context, kParallelize1DTile1DRange, + kParallelize1DTile1DTile, 0 /* flags */); + + for (size_t i = 0; i < kParallelize1DTile1DRange; ++i) { + EXPECT_LT(abs(context.sum[i] - ref_sum[i]), 1e-5); + } + check_num_threads_used(thread_ids, num_threads, num_threads); +} + +TEST(CapNumThreadsTest, RunOverCapacity) { + double augend[kParallelize1DTile1DRange]; + double addend[kParallelize1DTile1DRange]; + double sum[kParallelize1DTile1DRange]; + double ref_sum[kParallelize1DTile1DRange]; + int num_threads = 4; + int thread_ids[num_threads]; + + struct array_addition_context context; + init_context(augend, addend, sum, thread_ids, num_threads, &context, ref_sum); + + auto_pthreadpool_t threadpool(pthreadpool_create(num_threads), + pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + pthreadpool_set_num_threads_to_use(16); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + pthreadpool_parallelize_1d_tile_1d(threadpool.get(), + (pthreadpool_task_1d_tile_1d_t)add_arrays, + (void*)&context, kParallelize1DTile1DRange, + kParallelize1DTile1DTile, 0 /* flags */); + + for (size_t i = 0; i < kParallelize1DTile1DRange; ++i) { + EXPECT_LT(abs(context.sum[i] - ref_sum[i]), 1e-5); + } + check_num_threads_used(thread_ids, num_threads, num_threads); +} + +TEST(CapNumThreadsTest, RunSingleThreaded) { + double augend[kParallelize1DTile1DRange]; + double addend[kParallelize1DTile1DRange]; + double sum[kParallelize1DTile1DRange]; + double ref_sum[kParallelize1DTile1DRange]; + int num_threads = 4; + int thread_ids[num_threads]; + + struct array_addition_context context; + init_context(augend, addend, sum, thread_ids, num_threads, &context, ref_sum); + + auto_pthreadpool_t threadpool(pthreadpool_create(num_threads), + pthreadpool_destroy); + ASSERT_TRUE(threadpool.get()); + pthreadpool_set_num_threads_to_use(1); + + if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { + GTEST_SKIP(); + } + + pthreadpool_parallelize_1d_tile_1d(threadpool.get(), + (pthreadpool_task_1d_tile_1d_t)add_arrays, + (void*)&context, kParallelize1DTile1DRange, + kParallelize1DTile1DTile, 0 /* flags */); + + for (size_t i = 0; i < kParallelize1DTile1DRange; ++i) { + EXPECT_LT(abs(context.sum[i] - ref_sum[i]), 1e-5); + } + check_num_threads_used(thread_ids, num_threads, 1); +}