From 0e6ca13779b57d397a5ba6bfdcaa8a275bc8ea2e Mon Sep 17 00:00:00 2001 From: Pedro Gonnet Date: Mon, 20 Oct 2025 09:52:56 -0700 Subject: [PATCH] Use sentinel `thread_info`s with zero tiles/elements so that threads `[0, num_threads)` are always assigned to the same `task_info`, and threads with `thread_id >= num_threads` always steal. PiperOrigin-RevId: 821689473 --- src/fastpath.c | 253 +++++++++++++++++++++++------------------ src/portable-api.c | 196 +++++++++++++++++-------------- src/pthreads.c | 24 ++-- test/pthreadpool.cc | 36 ++++-- test/pthreadpool_v2.cc | 36 ++++-- 5 files changed, 319 insertions(+), 226 deletions(-) diff --git a/src/fastpath.c b/src/fastpath.c index 0e803b9..693f74c 100644 --- a/src/fastpath.c +++ b/src/fastpath.c @@ -9,6 +9,7 @@ /* Standard C headers */ #include +#include #include #include #include @@ -40,7 +41,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_1d_fastpath( void* const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); const size_t threads_count = threadpool->threads_count; - const size_t range_threshold = -threads_count; + const size_t range_threshold = -threadpool->max_num_threads; /* Process thread's own range of items */ size_t range_start = pthreadpool_load_relaxed_size_t(&thread->range_start); @@ -51,9 +52,10 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_1d_fastpath( /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - for (size_t tid = modulo_decrement(thread_number, threads_count); - tid != thread_number; tid = modulo_decrement(tid, threads_count)) { - struct thread_info* other_thread = &threadpool->threads[tid]; + for (size_t tid = (thread_number < threads_count) ? 1 : 0; + tid < threads_count; tid++) { + struct thread_info* other_thread = + &threadpool->threads[(thread_number + tid) % threads_count]; while (pthreadpool_decrement_fetch_relaxed_size_t( &other_thread->range_length) < range_threshold) { const size_t index = @@ -78,7 +80,7 @@ pthreadpool_thread_parallelize_1d_with_thread_fastpath( void* const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); const size_t threads_count = threadpool->threads_count; - const size_t range_threshold = -threads_count; + const size_t range_threshold = -threadpool->max_num_threads; /* Process thread's own range of items */ const size_t thread_number = thread->thread_number; @@ -89,9 +91,10 @@ pthreadpool_thread_parallelize_1d_with_thread_fastpath( } /* There still may be other threads with work */ - for (size_t tid = modulo_decrement(thread_number, threads_count); - tid != thread_number; tid = modulo_decrement(tid, threads_count)) { - struct thread_info* other_thread = &threadpool->threads[tid]; + for (size_t tid = (thread_number < threads_count) ? 1 : 0; + tid < threads_count; tid++) { + struct thread_info* other_thread = + &threadpool->threads[(thread_number + tid) % threads_count]; while (pthreadpool_decrement_fetch_relaxed_size_t( &other_thread->range_length) < range_threshold) { const size_t index = @@ -127,7 +130,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_1d_with_uarch_fastpath( #endif const size_t threads_count = threadpool->threads_count; - const size_t range_threshold = -threads_count; + const size_t range_threshold = -threadpool->max_num_threads; /* Process thread's own range of items */ size_t range_start = pthreadpool_load_relaxed_size_t(&thread->range_start); @@ -138,9 +141,10 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_1d_with_uarch_fastpath( /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - for (size_t tid = modulo_decrement(thread_number, threads_count); - tid != thread_number; tid = modulo_decrement(tid, threads_count)) { - struct thread_info* other_thread = &threadpool->threads[tid]; + for (size_t tid = (thread_number < threads_count) ? 1 : 0; + tid < threads_count; tid++) { + struct thread_info* other_thread = + &threadpool->threads[(thread_number + tid) % threads_count]; while (pthreadpool_decrement_fetch_relaxed_size_t( &other_thread->range_length) < range_threshold) { const size_t index = @@ -164,7 +168,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_1d_tile_1d_fastpath( void* const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); const size_t threads_count = threadpool->threads_count; - const size_t range_threshold = -threads_count; + const size_t range_threshold = -threadpool->max_num_threads; /* Process thread's own range of items */ const size_t range_start = @@ -181,9 +185,10 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_1d_tile_1d_fastpath( /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - for (size_t tid = modulo_decrement(thread_number, threads_count); - tid != thread_number; tid = modulo_decrement(tid, threads_count)) { - struct thread_info* other_thread = &threadpool->threads[tid]; + for (size_t tid = (thread_number < threads_count) ? 1 : 0; + tid < threads_count; tid++) { + struct thread_info* other_thread = + &threadpool->threads[(thread_number + tid) % threads_count]; while (pthreadpool_decrement_fetch_relaxed_size_t( &other_thread->range_length) < range_threshold) { const size_t tile_index = @@ -207,7 +212,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_2d_fastpath( void* const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); const size_t threads_count = threadpool->threads_count; - const size_t range_threshold = -threads_count; + const size_t range_threshold = -threadpool->max_num_threads; /* Process thread's own range of items */ const size_t range_start = @@ -230,9 +235,10 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_2d_fastpath( /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - for (size_t tid = modulo_decrement(thread_number, threads_count); - tid != thread_number; tid = modulo_decrement(tid, threads_count)) { - struct thread_info* other_thread = &threadpool->threads[tid]; + for (size_t tid = (thread_number < threads_count) ? 1 : 0; + tid < threads_count; tid++) { + struct thread_info* other_thread = + &threadpool->threads[(thread_number + tid) % threads_count]; while (pthreadpool_decrement_fetch_relaxed_size_t( &other_thread->range_length) < range_threshold) { const size_t linear_index = @@ -259,7 +265,7 @@ pthreadpool_thread_parallelize_2d_with_thread_fastpath( void* const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); const size_t threads_count = threadpool->threads_count; - const size_t range_threshold = -threads_count; + const size_t range_threshold = -threadpool->max_num_threads; /* Process thread's own range of items */ const size_t range_start = @@ -282,9 +288,10 @@ pthreadpool_thread_parallelize_2d_with_thread_fastpath( } /* There still may be other threads with work */ - for (size_t tid = modulo_decrement(thread_number, threads_count); - tid != thread_number; tid = modulo_decrement(tid, threads_count)) { - struct thread_info* other_thread = &threadpool->threads[tid]; + for (size_t tid = (thread_number < threads_count) ? 1 : 0; + tid < threads_count; tid++) { + struct thread_info* other_thread = + &threadpool->threads[(thread_number + tid) % threads_count]; while (pthreadpool_decrement_fetch_relaxed_size_t( &other_thread->range_length) < range_threshold) { const size_t linear_index = @@ -310,7 +317,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_2d_tile_1d_fastpath( void* const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); const size_t threads_count = threadpool->threads_count; - const size_t range_threshold = -threads_count; + const size_t range_threshold = -threadpool->max_num_threads; /* Process thread's own range of items */ const size_t range_start = @@ -336,9 +343,10 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_2d_tile_1d_fastpath( /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - for (size_t tid = modulo_decrement(thread_number, threads_count); - tid != thread_number; tid = modulo_decrement(tid, threads_count)) { - struct thread_info* other_thread = &threadpool->threads[tid]; + for (size_t tid = (thread_number < threads_count) ? 1 : 0; + tid < threads_count; tid++) { + struct thread_info* other_thread = + &threadpool->threads[(thread_number + tid) % threads_count]; while (pthreadpool_decrement_fetch_relaxed_size_t( &other_thread->range_length) < range_threshold) { const size_t linear_index = @@ -379,7 +387,7 @@ pthreadpool_thread_parallelize_2d_tile_1d_with_uarch_fastpath( #endif const size_t threads_count = threadpool->threads_count; - const size_t range_threshold = -threads_count; + const size_t range_threshold = -threadpool->max_num_threads; /* Process thread's own range of items */ const size_t range_start = @@ -407,9 +415,10 @@ pthreadpool_thread_parallelize_2d_tile_1d_with_uarch_fastpath( /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - for (size_t tid = modulo_decrement(thread_number, threads_count); - tid != thread_number; tid = modulo_decrement(tid, threads_count)) { - struct thread_info* other_thread = &threadpool->threads[tid]; + for (size_t tid = (thread_number < threads_count) ? 1 : 0; + tid < threads_count; tid++) { + struct thread_info* other_thread = + &threadpool->threads[(thread_number + tid) % threads_count]; while (pthreadpool_decrement_fetch_relaxed_size_t( &other_thread->range_length) < range_threshold) { const size_t linear_index = @@ -450,7 +459,7 @@ pthreadpool_thread_parallelize_2d_tile_1d_with_uarch_with_thread_fastpath( #endif const size_t threads_count = threadpool->threads_count; - const size_t range_threshold = -threads_count; + const size_t range_threshold = -threadpool->max_num_threads; /* Process thread's own range of items */ const size_t range_start = @@ -479,9 +488,10 @@ pthreadpool_thread_parallelize_2d_tile_1d_with_uarch_with_thread_fastpath( } /* There still may be other threads with work */ - for (size_t tid = modulo_decrement(thread_number, threads_count); - tid != thread_number; tid = modulo_decrement(tid, threads_count)) { - struct thread_info* other_thread = &threadpool->threads[tid]; + for (size_t tid = (thread_number < threads_count) ? 1 : 0; + tid < threads_count; tid++) { + struct thread_info* other_thread = + &threadpool->threads[(thread_number + tid) % threads_count]; while (pthreadpool_decrement_fetch_relaxed_size_t( &other_thread->range_length) < range_threshold) { const size_t linear_index = @@ -509,7 +519,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_2d_tile_2d_fastpath( void* const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); const size_t threads_count = threadpool->threads_count; - const size_t range_threshold = -threads_count; + const size_t range_threshold = -threadpool->max_num_threads; /* Process thread's own range of items */ const size_t range_start = @@ -538,9 +548,10 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_2d_tile_2d_fastpath( /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - for (size_t tid = modulo_decrement(thread_number, threads_count); - tid != thread_number; tid = modulo_decrement(tid, threads_count)) { - struct thread_info* other_thread = &threadpool->threads[tid]; + for (size_t tid = (thread_number < threads_count) ? 1 : 0; + tid < threads_count; tid++) { + struct thread_info* other_thread = + &threadpool->threads[(thread_number + tid) % threads_count]; while (pthreadpool_decrement_fetch_relaxed_size_t( &other_thread->range_length) < range_threshold) { const size_t linear_index = @@ -582,7 +593,7 @@ pthreadpool_thread_parallelize_2d_tile_2d_with_uarch_fastpath( #endif const size_t threads_count = threadpool->threads_count; - const size_t range_threshold = -threads_count; + const size_t range_threshold = -threadpool->max_num_threads; /* Process thread's own range of items */ const struct fxdiv_divisor_size_t tile_range_j = @@ -615,9 +626,10 @@ pthreadpool_thread_parallelize_2d_tile_2d_with_uarch_fastpath( /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - for (size_t tid = modulo_decrement(thread_number, threads_count); - tid != thread_number; tid = modulo_decrement(tid, threads_count)) { - struct thread_info* other_thread = &threadpool->threads[tid]; + for (size_t tid = (thread_number < threads_count) ? 1 : 0; + tid < threads_count; tid++) { + struct thread_info* other_thread = + &threadpool->threads[(thread_number + tid) % threads_count]; while (pthreadpool_decrement_fetch_relaxed_size_t( &other_thread->range_length) < range_threshold) { const size_t linear_index = @@ -645,7 +657,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_3d_fastpath( void* const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); const size_t threads_count = threadpool->threads_count; - const size_t range_threshold = -threads_count; + const size_t range_threshold = -threadpool->max_num_threads; /* Process thread's own range of items */ const size_t range_start = @@ -676,9 +688,10 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_3d_fastpath( /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - for (size_t tid = modulo_decrement(thread_number, threads_count); - tid != thread_number; tid = modulo_decrement(tid, threads_count)) { - struct thread_info* other_thread = &threadpool->threads[tid]; + for (size_t tid = (thread_number < threads_count) ? 1 : 0; + tid < threads_count; tid++) { + struct thread_info* other_thread = + &threadpool->threads[(thread_number + tid) % threads_count]; while (pthreadpool_decrement_fetch_relaxed_size_t( &other_thread->range_length) < range_threshold) { const size_t linear_index = @@ -707,7 +720,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_3d_tile_1d_fastpath( void* const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); const size_t threads_count = threadpool->threads_count; - const size_t range_threshold = -threads_count; + const size_t range_threshold = -threadpool->max_num_threads; /* Process thread's own range of items */ const size_t range_start = @@ -741,9 +754,10 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_3d_tile_1d_fastpath( /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - for (size_t tid = modulo_decrement(thread_number, threads_count); - tid != thread_number; tid = modulo_decrement(tid, threads_count)) { - struct thread_info* other_thread = &threadpool->threads[tid]; + for (size_t tid = (thread_number < threads_count) ? 1 : 0; + tid < threads_count; tid++) { + struct thread_info* other_thread = + &threadpool->threads[(thread_number + tid) % threads_count]; while (pthreadpool_decrement_fetch_relaxed_size_t( &other_thread->range_length) < range_threshold) { const size_t linear_index = @@ -774,7 +788,7 @@ pthreadpool_thread_parallelize_3d_tile_1d_with_thread_fastpath( void* const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); const size_t threads_count = threadpool->threads_count; - const size_t range_threshold = -threads_count; + const size_t range_threshold = -threadpool->max_num_threads; /* Process thread's own range of items */ const size_t range_start = @@ -809,9 +823,10 @@ pthreadpool_thread_parallelize_3d_tile_1d_with_thread_fastpath( } /* There still may be other threads with work */ - for (size_t tid = modulo_decrement(thread_number, threads_count); - tid != thread_number; tid = modulo_decrement(tid, threads_count)) { - struct thread_info* other_thread = &threadpool->threads[tid]; + for (size_t tid = (thread_number < threads_count) ? 1 : 0; + tid < threads_count; tid++) { + struct thread_info* other_thread = + &threadpool->threads[(thread_number + tid) % threads_count]; while (pthreadpool_decrement_fetch_relaxed_size_t( &other_thread->range_length) < range_threshold) { const size_t linear_index = @@ -854,7 +869,7 @@ pthreadpool_thread_parallelize_3d_tile_1d_with_uarch_fastpath( #endif const size_t threads_count = threadpool->threads_count; - const size_t range_threshold = -threads_count; + const size_t range_threshold = -threadpool->max_num_threads; /* Process thread's own range of items */ const size_t range_start = @@ -890,9 +905,10 @@ pthreadpool_thread_parallelize_3d_tile_1d_with_uarch_fastpath( /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - for (size_t tid = modulo_decrement(thread_number, threads_count); - tid != thread_number; tid = modulo_decrement(tid, threads_count)) { - struct thread_info* other_thread = &threadpool->threads[tid]; + for (size_t tid = (thread_number < threads_count) ? 1 : 0; + tid < threads_count; tid++) { + struct thread_info* other_thread = + &threadpool->threads[(thread_number + tid) % threads_count]; while (pthreadpool_decrement_fetch_relaxed_size_t( &other_thread->range_length) < range_threshold) { const size_t linear_index = @@ -935,7 +951,7 @@ pthreadpool_thread_parallelize_3d_tile_1d_with_uarch_with_thread_fastpath( #endif const size_t threads_count = threadpool->threads_count; - const size_t range_threshold = -threads_count; + const size_t range_threshold = -threadpool->max_num_threads; /* Process thread's own range of items */ const size_t range_start = @@ -972,9 +988,10 @@ pthreadpool_thread_parallelize_3d_tile_1d_with_uarch_with_thread_fastpath( } /* There still may be other threads with work */ - for (size_t tid = modulo_decrement(thread_number, threads_count); - tid != thread_number; tid = modulo_decrement(tid, threads_count)) { - struct thread_info* other_thread = &threadpool->threads[tid]; + for (size_t tid = (thread_number < threads_count) ? 1 : 0; + tid < threads_count; tid++) { + struct thread_info* other_thread = + &threadpool->threads[(thread_number + tid) % threads_count]; while (pthreadpool_decrement_fetch_relaxed_size_t( &other_thread->range_length) < range_threshold) { const size_t linear_index = @@ -1004,7 +1021,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_3d_tile_2d_fastpath( void* const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); const size_t threads_count = threadpool->threads_count; - const size_t range_threshold = -threads_count; + const size_t range_threshold = -threadpool->max_num_threads; /* Process thread's own range of items */ const size_t range_start = @@ -1042,9 +1059,10 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_3d_tile_2d_fastpath( /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - for (size_t tid = modulo_decrement(thread_number, threads_count); - tid != thread_number; tid = modulo_decrement(tid, threads_count)) { - struct thread_info* other_thread = &threadpool->threads[tid]; + for (size_t tid = (thread_number < threads_count) ? 1 : 0; + tid < threads_count; tid++) { + struct thread_info* other_thread = + &threadpool->threads[(thread_number + tid) % threads_count]; while (pthreadpool_decrement_fetch_relaxed_size_t( &other_thread->range_length) < range_threshold) { const size_t linear_index = @@ -1088,7 +1106,7 @@ pthreadpool_thread_parallelize_3d_tile_2d_with_uarch_fastpath( #endif const size_t threads_count = threadpool->threads_count; - const size_t range_threshold = -threads_count; + const size_t range_threshold = -threadpool->max_num_threads; /* Process thread's own range of items */ const size_t range_start = @@ -1130,9 +1148,10 @@ pthreadpool_thread_parallelize_3d_tile_2d_with_uarch_fastpath( /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - for (size_t tid = modulo_decrement(thread_number, threads_count); - tid != thread_number; tid = modulo_decrement(tid, threads_count)) { - struct thread_info* other_thread = &threadpool->threads[tid]; + for (size_t tid = (thread_number < threads_count) ? 1 : 0; + tid < threads_count; tid++) { + struct thread_info* other_thread = + &threadpool->threads[(thread_number + tid) % threads_count]; while (pthreadpool_decrement_fetch_relaxed_size_t( &other_thread->range_length) < range_threshold) { const size_t linear_index = @@ -1162,7 +1181,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_4d_fastpath( void* const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); const size_t threads_count = threadpool->threads_count; - const size_t range_threshold = -threads_count; + const size_t range_threshold = -threadpool->max_num_threads; /* Process thread's own range of items */ const size_t range_start = @@ -1202,9 +1221,10 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_4d_fastpath( /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - for (size_t tid = modulo_decrement(thread_number, threads_count); - tid != thread_number; tid = modulo_decrement(tid, threads_count)) { - struct thread_info* other_thread = &threadpool->threads[tid]; + for (size_t tid = (thread_number < threads_count) ? 1 : 0; + tid < threads_count; tid++) { + struct thread_info* other_thread = + &threadpool->threads[(thread_number + tid) % threads_count]; while (pthreadpool_decrement_fetch_relaxed_size_t( &other_thread->range_length) < range_threshold) { const size_t linear_index = @@ -1235,7 +1255,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_4d_tile_1d_fastpath( void* const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); const size_t threads_count = threadpool->threads_count; - const size_t range_threshold = -threads_count; + const size_t range_threshold = -threadpool->max_num_threads; /* Process thread's own range of items */ const size_t range_start = @@ -1278,9 +1298,10 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_4d_tile_1d_fastpath( /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - for (size_t tid = modulo_decrement(thread_number, threads_count); - tid != thread_number; tid = modulo_decrement(tid, threads_count)) { - struct thread_info* other_thread = &threadpool->threads[tid]; + for (size_t tid = (thread_number < threads_count) ? 1 : 0; + tid < threads_count; tid++) { + struct thread_info* other_thread = + &threadpool->threads[(thread_number + tid) % threads_count]; while (pthreadpool_decrement_fetch_relaxed_size_t( &other_thread->range_length) < range_threshold) { const size_t linear_index = @@ -1312,7 +1333,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_4d_tile_2d_fastpath( void* const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); const size_t threads_count = threadpool->threads_count; - const size_t range_threshold = -threads_count; + const size_t range_threshold = -threadpool->max_num_threads; /* Process thread's own range of items */ const size_t range_start = @@ -1358,9 +1379,10 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_4d_tile_2d_fastpath( /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - for (size_t tid = modulo_decrement(thread_number, threads_count); - tid != thread_number; tid = modulo_decrement(tid, threads_count)) { - struct thread_info* other_thread = &threadpool->threads[tid]; + for (size_t tid = (thread_number < threads_count) ? 1 : 0; + tid < threads_count; tid++) { + struct thread_info* other_thread = + &threadpool->threads[(thread_number + tid) % threads_count]; while (pthreadpool_decrement_fetch_relaxed_size_t( &other_thread->range_length) < range_threshold) { const size_t linear_index = @@ -1406,7 +1428,7 @@ pthreadpool_thread_parallelize_4d_tile_2d_with_uarch_fastpath( #endif const size_t threads_count = threadpool->threads_count; - const size_t range_threshold = -threads_count; + const size_t range_threshold = -threadpool->max_num_threads; /* Process thread's own range of items */ const size_t range_start = @@ -1456,9 +1478,10 @@ pthreadpool_thread_parallelize_4d_tile_2d_with_uarch_fastpath( /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - for (size_t tid = modulo_decrement(thread_number, threads_count); - tid != thread_number; tid = modulo_decrement(tid, threads_count)) { - struct thread_info* other_thread = &threadpool->threads[tid]; + for (size_t tid = (thread_number < threads_count) ? 1 : 0; + tid < threads_count; tid++) { + struct thread_info* other_thread = + &threadpool->threads[(thread_number + tid) % threads_count]; while (pthreadpool_decrement_fetch_relaxed_size_t( &other_thread->range_length) < range_threshold) { const size_t linear_index = @@ -1491,7 +1514,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_5d_fastpath( void* const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); const size_t threads_count = threadpool->threads_count; - const size_t range_threshold = -threads_count; + const size_t range_threshold = -threadpool->max_num_threads; /* Process thread's own range of items */ const size_t range_start = @@ -1539,9 +1562,10 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_5d_fastpath( /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - for (size_t tid = modulo_decrement(thread_number, threads_count); - tid != thread_number; tid = modulo_decrement(tid, threads_count)) { - struct thread_info* other_thread = &threadpool->threads[tid]; + for (size_t tid = (thread_number < threads_count) ? 1 : 0; + tid < threads_count; tid++) { + struct thread_info* other_thread = + &threadpool->threads[(thread_number + tid) % threads_count]; while (pthreadpool_decrement_fetch_relaxed_size_t( &other_thread->range_length) < range_threshold) { const size_t linear_index = @@ -1574,7 +1598,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_5d_tile_1d_fastpath( void* const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); const size_t threads_count = threadpool->threads_count; - const size_t range_threshold = -threads_count; + const size_t range_threshold = -threadpool->max_num_threads; /* Process thread's own range of items */ const size_t range_start = @@ -1625,9 +1649,10 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_5d_tile_1d_fastpath( /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - for (size_t tid = modulo_decrement(thread_number, threads_count); - tid != thread_number; tid = modulo_decrement(tid, threads_count)) { - struct thread_info* other_thread = &threadpool->threads[tid]; + for (size_t tid = (thread_number < threads_count) ? 1 : 0; + tid < threads_count; tid++) { + struct thread_info* other_thread = + &threadpool->threads[(thread_number + tid) % threads_count]; while (pthreadpool_decrement_fetch_relaxed_size_t( &other_thread->range_length) < range_threshold) { const size_t linear_index = @@ -1662,7 +1687,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_5d_tile_2d_fastpath( void* const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); const size_t threads_count = threadpool->threads_count; - const size_t range_threshold = -threads_count; + const size_t range_threshold = -threadpool->max_num_threads; /* Process thread's own range of items */ const size_t range_start = @@ -1716,9 +1741,10 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_5d_tile_2d_fastpath( /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - for (size_t tid = modulo_decrement(thread_number, threads_count); - tid != thread_number; tid = modulo_decrement(tid, threads_count)) { - struct thread_info* other_thread = &threadpool->threads[tid]; + for (size_t tid = (thread_number < threads_count) ? 1 : 0; + tid < threads_count; tid++) { + struct thread_info* other_thread = + &threadpool->threads[(thread_number + tid) % threads_count]; while (pthreadpool_decrement_fetch_relaxed_size_t( &other_thread->range_length) < range_threshold) { const size_t linear_index = @@ -1753,7 +1779,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_6d_fastpath( void* const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); const size_t threads_count = threadpool->threads_count; - const size_t range_threshold = -threads_count; + const size_t range_threshold = -threadpool->max_num_threads; /* Process thread's own range of items */ const size_t range_start = @@ -1809,9 +1835,10 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_6d_fastpath( /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - for (size_t tid = modulo_decrement(thread_number, threads_count); - tid != thread_number; tid = modulo_decrement(tid, threads_count)) { - struct thread_info* other_thread = &threadpool->threads[tid]; + for (size_t tid = (thread_number < threads_count) ? 1 : 0; + tid < threads_count; tid++) { + struct thread_info* other_thread = + &threadpool->threads[(thread_number + tid) % threads_count]; while (pthreadpool_decrement_fetch_relaxed_size_t( &other_thread->range_length) < range_threshold) { const size_t linear_index = @@ -1847,7 +1874,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_6d_tile_1d_fastpath( void* const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); const size_t threads_count = threadpool->threads_count; - const size_t range_threshold = -threads_count; + const size_t range_threshold = -threadpool->max_num_threads; /* Process thread's own range of items */ const size_t range_start = @@ -1906,9 +1933,10 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_6d_tile_1d_fastpath( /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - for (size_t tid = modulo_decrement(thread_number, threads_count); - tid != thread_number; tid = modulo_decrement(tid, threads_count)) { - struct thread_info* other_thread = &threadpool->threads[tid]; + for (size_t tid = (thread_number < threads_count) ? 1 : 0; + tid < threads_count; tid++) { + struct thread_info* other_thread = + &threadpool->threads[(thread_number + tid) % threads_count]; while (pthreadpool_decrement_fetch_relaxed_size_t( &other_thread->range_length) < range_threshold) { const size_t linear_index = @@ -1945,7 +1973,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_6d_tile_2d_fastpath( void* const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); const size_t threads_count = threadpool->threads_count; - const size_t range_threshold = -threads_count; + const size_t range_threshold = -threadpool->max_num_threads; /* Process thread's own range of items */ const size_t range_start = @@ -2008,9 +2036,10 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_6d_tile_2d_fastpath( /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - for (size_t tid = modulo_decrement(thread_number, threads_count); - tid != thread_number; tid = modulo_decrement(tid, threads_count)) { - struct thread_info* other_thread = &threadpool->threads[tid]; + for (size_t tid = (thread_number < threads_count) ? 1 : 0; + tid < threads_count; tid++) { + struct thread_info* other_thread = + &threadpool->threads[(thread_number + tid) % threads_count]; while (pthreadpool_decrement_fetch_relaxed_size_t( &other_thread->range_length) < range_threshold) { const size_t linear_index = diff --git a/src/portable-api.c b/src/portable-api.c index 68e2f2f..d0475e1 100644 --- a/src/portable-api.c +++ b/src/portable-api.c @@ -89,9 +89,10 @@ static void thread_parallelize_1d(struct pthreadpool* threadpool, /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; const size_t threads_count = threadpool->threads_count; - for (size_t tid = modulo_decrement(thread_number, threads_count); - tid != thread_number; tid = modulo_decrement(tid, threads_count)) { - struct thread_info* other_thread = &threadpool->threads[tid]; + for (size_t tid = (thread_number < threads_count) ? 1 : 0; + tid < threads_count; tid++) { + struct thread_info* other_thread = + &threadpool->threads[(thread_number + tid) % threads_count]; while ( pthreadpool_try_decrement_relaxed_size_t(&other_thread->range_length)) { const size_t index = @@ -123,9 +124,10 @@ static void thread_parallelize_1d_with_thread(struct pthreadpool* threadpool, /* There still may be other threads with work */ const size_t threads_count = threadpool->threads_count; - for (size_t tid = modulo_decrement(thread_number, threads_count); - tid != thread_number; tid = modulo_decrement(tid, threads_count)) { - struct thread_info* other_thread = &threadpool->threads[tid]; + for (size_t tid = (thread_number < threads_count) ? 1 : 0; + tid < threads_count; tid++) { + struct thread_info* other_thread = + &threadpool->threads[(thread_number + tid) % threads_count]; while ( pthreadpool_try_decrement_relaxed_size_t(&other_thread->range_length)) { const size_t index = @@ -169,9 +171,10 @@ static void thread_parallelize_1d_with_uarch(struct pthreadpool* threadpool, /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; const size_t threads_count = threadpool->threads_count; - for (size_t tid = modulo_decrement(thread_number, threads_count); - tid != thread_number; tid = modulo_decrement(tid, threads_count)) { - struct thread_info* other_thread = &threadpool->threads[tid]; + for (size_t tid = (thread_number < threads_count) ? 1 : 0; + tid < threads_count; tid++) { + struct thread_info* other_thread = + &threadpool->threads[(thread_number + tid) % threads_count]; while ( pthreadpool_try_decrement_relaxed_size_t(&other_thread->range_length)) { const size_t index = @@ -209,9 +212,10 @@ static void thread_parallelize_1d_tile_1d(struct pthreadpool* threadpool, /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; const size_t threads_count = threadpool->threads_count; - for (size_t tid = modulo_decrement(thread_number, threads_count); - tid != thread_number; tid = modulo_decrement(tid, threads_count)) { - struct thread_info* other_thread = &threadpool->threads[tid]; + for (size_t tid = (thread_number < threads_count) ? 1 : 0; + tid < threads_count; tid++) { + struct thread_info* other_thread = + &threadpool->threads[(thread_number + tid) % threads_count]; while ( pthreadpool_try_decrement_relaxed_size_t(&other_thread->range_length)) { const size_t tile_index = @@ -433,9 +437,10 @@ static void thread_parallelize_2d(struct pthreadpool* threadpool, /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; const size_t threads_count = threadpool->threads_count; - for (size_t tid = modulo_decrement(thread_number, threads_count); - tid != thread_number; tid = modulo_decrement(tid, threads_count)) { - struct thread_info* other_thread = &threadpool->threads[tid]; + for (size_t tid = (thread_number < threads_count) ? 1 : 0; + tid < threads_count; tid++) { + struct thread_info* other_thread = + &threadpool->threads[(thread_number + tid) % threads_count]; while ( pthreadpool_try_decrement_relaxed_size_t(&other_thread->range_length)) { const size_t linear_index = @@ -481,9 +486,10 @@ static void thread_parallelize_2d_with_thread(struct pthreadpool* threadpool, /* There still may be other threads with work */ const size_t threads_count = threadpool->threads_count; - for (size_t tid = modulo_decrement(thread_number, threads_count); - tid != thread_number; tid = modulo_decrement(tid, threads_count)) { - struct thread_info* other_thread = &threadpool->threads[tid]; + for (size_t tid = (thread_number < threads_count) ? 1 : 0; + tid < threads_count; tid++) { + struct thread_info* other_thread = + &threadpool->threads[(thread_number + tid) % threads_count]; while ( pthreadpool_try_decrement_relaxed_size_t(&other_thread->range_length)) { const size_t linear_index = @@ -532,9 +538,10 @@ static void thread_parallelize_2d_tile_1d(struct pthreadpool* threadpool, /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; const size_t threads_count = threadpool->threads_count; - for (size_t tid = modulo_decrement(thread_number, threads_count); - tid != thread_number; tid = modulo_decrement(tid, threads_count)) { - struct thread_info* other_thread = &threadpool->threads[tid]; + for (size_t tid = (thread_number < threads_count) ? 1 : 0; + tid < threads_count; tid++) { + struct thread_info* other_thread = + &threadpool->threads[(thread_number + tid) % threads_count]; while ( pthreadpool_try_decrement_relaxed_size_t(&other_thread->range_length)) { const size_t linear_index = @@ -599,9 +606,10 @@ static void thread_parallelize_2d_tile_1d_with_uarch( /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; const size_t threads_count = threadpool->threads_count; - for (size_t tid = modulo_decrement(thread_number, threads_count); - tid != thread_number; tid = modulo_decrement(tid, threads_count)) { - struct thread_info* other_thread = &threadpool->threads[tid]; + for (size_t tid = (thread_number < threads_count) ? 1 : 0; + tid < threads_count; tid++) { + struct thread_info* other_thread = + &threadpool->threads[(thread_number + tid) % threads_count]; while ( pthreadpool_try_decrement_relaxed_size_t(&other_thread->range_length)) { const size_t linear_index = @@ -667,9 +675,10 @@ static void thread_parallelize_2d_tile_1d_with_uarch_with_thread( /* There still may be other threads with work */ const size_t threads_count = threadpool->threads_count; - for (size_t tid = modulo_decrement(thread_number, threads_count); - tid != thread_number; tid = modulo_decrement(tid, threads_count)) { - struct thread_info* other_thread = &threadpool->threads[tid]; + for (size_t tid = (thread_number < threads_count) ? 1 : 0; + tid < threads_count; tid++) { + struct thread_info* other_thread = + &threadpool->threads[(thread_number + tid) % threads_count]; while ( pthreadpool_try_decrement_relaxed_size_t(&other_thread->range_length)) { const size_t linear_index = @@ -943,9 +952,10 @@ static void thread_parallelize_2d_tile_2d(struct pthreadpool* threadpool, /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; const size_t threads_count = threadpool->threads_count; - for (size_t tid = modulo_decrement(thread_number, threads_count); - tid != thread_number; tid = modulo_decrement(tid, threads_count)) { - struct thread_info* other_thread = &threadpool->threads[tid]; + for (size_t tid = (thread_number < threads_count) ? 1 : 0; + tid < threads_count; tid++) { + struct thread_info* other_thread = + &threadpool->threads[(thread_number + tid) % threads_count]; while ( pthreadpool_try_decrement_relaxed_size_t(&other_thread->range_length)) { const size_t linear_index = @@ -1016,9 +1026,10 @@ static void thread_parallelize_2d_tile_2d_with_uarch( /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; const size_t threads_count = threadpool->threads_count; - for (size_t tid = modulo_decrement(thread_number, threads_count); - tid != thread_number; tid = modulo_decrement(tid, threads_count)) { - struct thread_info* other_thread = &threadpool->threads[tid]; + for (size_t tid = (thread_number < threads_count) ? 1 : 0; + tid < threads_count; tid++) { + struct thread_info* other_thread = + &threadpool->threads[(thread_number + tid) % threads_count]; while ( pthreadpool_try_decrement_relaxed_size_t(&other_thread->range_length)) { const size_t linear_index = @@ -1352,9 +1363,10 @@ static void thread_parallelize_3d(struct pthreadpool* threadpool, /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; const size_t threads_count = threadpool->threads_count; - for (size_t tid = modulo_decrement(thread_number, threads_count); - tid != thread_number; tid = modulo_decrement(tid, threads_count)) { - struct thread_info* other_thread = &threadpool->threads[tid]; + for (size_t tid = (thread_number < threads_count) ? 1 : 0; + tid < threads_count; tid++) { + struct thread_info* other_thread = + &threadpool->threads[(thread_number + tid) % threads_count]; while ( pthreadpool_try_decrement_relaxed_size_t(&other_thread->range_length)) { const size_t linear_index = @@ -1414,9 +1426,10 @@ static void thread_parallelize_3d_tile_1d(struct pthreadpool* threadpool, /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; const size_t threads_count = threadpool->threads_count; - for (size_t tid = modulo_decrement(thread_number, threads_count); - tid != thread_number; tid = modulo_decrement(tid, threads_count)) { - struct thread_info* other_thread = &threadpool->threads[tid]; + for (size_t tid = (thread_number < threads_count) ? 1 : 0; + tid < threads_count; tid++) { + struct thread_info* other_thread = + &threadpool->threads[(thread_number + tid) % threads_count]; while ( pthreadpool_try_decrement_relaxed_size_t(&other_thread->range_length)) { const size_t linear_index = @@ -1478,9 +1491,10 @@ static void thread_parallelize_3d_tile_1d_with_thread( /* There still may be other threads with work */ const size_t threads_count = threadpool->threads_count; - for (size_t tid = modulo_decrement(thread_number, threads_count); - tid != thread_number; tid = modulo_decrement(tid, threads_count)) { - struct thread_info* other_thread = &threadpool->threads[tid]; + for (size_t tid = (thread_number < threads_count) ? 1 : 0; + tid < threads_count; tid++) { + struct thread_info* other_thread = + &threadpool->threads[(thread_number + tid) % threads_count]; while ( pthreadpool_try_decrement_relaxed_size_t(&other_thread->range_length)) { const size_t linear_index = @@ -1555,9 +1569,10 @@ static void thread_parallelize_3d_tile_1d_with_uarch( /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; const size_t threads_count = threadpool->threads_count; - for (size_t tid = modulo_decrement(thread_number, threads_count); - tid != thread_number; tid = modulo_decrement(tid, threads_count)) { - struct thread_info* other_thread = &threadpool->threads[tid]; + for (size_t tid = (thread_number < threads_count) ? 1 : 0; + tid < threads_count; tid++) { + struct thread_info* other_thread = + &threadpool->threads[(thread_number + tid) % threads_count]; while ( pthreadpool_try_decrement_relaxed_size_t(&other_thread->range_length)) { const size_t linear_index = @@ -1633,9 +1648,10 @@ static void thread_parallelize_3d_tile_1d_with_uarch_with_thread( /* There still may be other threads with work */ const size_t threads_count = threadpool->threads_count; - for (size_t tid = modulo_decrement(thread_number, threads_count); - tid != thread_number; tid = modulo_decrement(tid, threads_count)) { - struct thread_info* other_thread = &threadpool->threads[tid]; + for (size_t tid = (thread_number < threads_count) ? 1 : 0; + tid < threads_count; tid++) { + struct thread_info* other_thread = + &threadpool->threads[(thread_number + tid) % threads_count]; while ( pthreadpool_try_decrement_relaxed_size_t(&other_thread->range_length)) { const size_t linear_index = @@ -1939,9 +1955,10 @@ static void thread_parallelize_3d_tile_2d(struct pthreadpool* threadpool, /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; const size_t threads_count = threadpool->threads_count; - for (size_t tid = modulo_decrement(thread_number, threads_count); - tid != thread_number; tid = modulo_decrement(tid, threads_count)) { - struct thread_info* other_thread = &threadpool->threads[tid]; + for (size_t tid = (thread_number < threads_count) ? 1 : 0; + tid < threads_count; tid++) { + struct thread_info* other_thread = + &threadpool->threads[(thread_number + tid) % threads_count]; while ( pthreadpool_try_decrement_relaxed_size_t(&other_thread->range_length)) { const size_t linear_index = @@ -2023,9 +2040,10 @@ static void thread_parallelize_3d_tile_2d_with_uarch( /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; const size_t threads_count = threadpool->threads_count; - for (size_t tid = modulo_decrement(thread_number, threads_count); - tid != thread_number; tid = modulo_decrement(tid, threads_count)) { - struct thread_info* other_thread = &threadpool->threads[tid]; + for (size_t tid = (thread_number < threads_count) ? 1 : 0; + tid < threads_count; tid++) { + struct thread_info* other_thread = + &threadpool->threads[(thread_number + tid) % threads_count]; while ( pthreadpool_try_decrement_relaxed_size_t(&other_thread->range_length)) { const size_t linear_index = @@ -2413,9 +2431,10 @@ static void thread_parallelize_4d(struct pthreadpool* threadpool, /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; const size_t threads_count = threadpool->threads_count; - for (size_t tid = modulo_decrement(thread_number, threads_count); - tid != thread_number; tid = modulo_decrement(tid, threads_count)) { - struct thread_info* other_thread = &threadpool->threads[tid]; + for (size_t tid = (thread_number < threads_count) ? 1 : 0; + tid < threads_count; tid++) { + struct thread_info* other_thread = + &threadpool->threads[(thread_number + tid) % threads_count]; while ( pthreadpool_try_decrement_relaxed_size_t(&other_thread->range_length)) { const size_t linear_index = @@ -2486,9 +2505,10 @@ static void thread_parallelize_4d_tile_1d(struct pthreadpool* threadpool, /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; const size_t threads_count = threadpool->threads_count; - for (size_t tid = modulo_decrement(thread_number, threads_count); - tid != thread_number; tid = modulo_decrement(tid, threads_count)) { - struct thread_info* other_thread = &threadpool->threads[tid]; + for (size_t tid = (thread_number < threads_count) ? 1 : 0; + tid < threads_count; tid++) { + struct thread_info* other_thread = + &threadpool->threads[(thread_number + tid) % threads_count]; while ( pthreadpool_try_decrement_relaxed_size_t(&other_thread->range_length)) { const size_t linear_index = @@ -2563,9 +2583,10 @@ static void thread_parallelize_4d_tile_2d(struct pthreadpool* threadpool, /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; const size_t threads_count = threadpool->threads_count; - for (size_t tid = modulo_decrement(thread_number, threads_count); - tid != thread_number; tid = modulo_decrement(tid, threads_count)) { - struct thread_info* other_thread = &threadpool->threads[tid]; + for (size_t tid = (thread_number < threads_count) ? 1 : 0; + tid < threads_count; tid++) { + struct thread_info* other_thread = + &threadpool->threads[(thread_number + tid) % threads_count]; while ( pthreadpool_try_decrement_relaxed_size_t(&other_thread->range_length)) { const size_t linear_index = @@ -2657,9 +2678,10 @@ static void thread_parallelize_4d_tile_2d_with_uarch( /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; const size_t threads_count = threadpool->threads_count; - for (size_t tid = modulo_decrement(thread_number, threads_count); - tid != thread_number; tid = modulo_decrement(tid, threads_count)) { - struct thread_info* other_thread = &threadpool->threads[tid]; + for (size_t tid = (thread_number < threads_count) ? 1 : 0; + tid < threads_count; tid++) { + struct thread_info* other_thread = + &threadpool->threads[(thread_number + tid) % threads_count]; while ( pthreadpool_try_decrement_relaxed_size_t(&other_thread->range_length)) { const size_t linear_index = @@ -2976,9 +2998,10 @@ static void thread_parallelize_5d(struct pthreadpool* threadpool, /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; const size_t threads_count = threadpool->threads_count; - for (size_t tid = modulo_decrement(thread_number, threads_count); - tid != thread_number; tid = modulo_decrement(tid, threads_count)) { - struct thread_info* other_thread = &threadpool->threads[tid]; + for (size_t tid = (thread_number < threads_count) ? 1 : 0; + tid < threads_count; tid++) { + struct thread_info* other_thread = + &threadpool->threads[(thread_number + tid) % threads_count]; while ( pthreadpool_try_decrement_relaxed_size_t(&other_thread->range_length)) { const size_t linear_index = @@ -3059,9 +3082,10 @@ static void thread_parallelize_5d_tile_1d(struct pthreadpool* threadpool, /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; const size_t threads_count = threadpool->threads_count; - for (size_t tid = modulo_decrement(thread_number, threads_count); - tid != thread_number; tid = modulo_decrement(tid, threads_count)) { - struct thread_info* other_thread = &threadpool->threads[tid]; + for (size_t tid = (thread_number < threads_count) ? 1 : 0; + tid < threads_count; tid++) { + struct thread_info* other_thread = + &threadpool->threads[(thread_number + tid) % threads_count]; while ( pthreadpool_try_decrement_relaxed_size_t(&other_thread->range_length)) { const size_t linear_index = @@ -3147,9 +3171,10 @@ static void thread_parallelize_5d_tile_2d(struct pthreadpool* threadpool, /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; const size_t threads_count = threadpool->threads_count; - for (size_t tid = modulo_decrement(thread_number, threads_count); - tid != thread_number; tid = modulo_decrement(tid, threads_count)) { - struct thread_info* other_thread = &threadpool->threads[tid]; + for (size_t tid = (thread_number < threads_count) ? 1 : 0; + tid < threads_count; tid++) { + struct thread_info* other_thread = + &threadpool->threads[(thread_number + tid) % threads_count]; while ( pthreadpool_try_decrement_relaxed_size_t(&other_thread->range_length)) { const size_t linear_index = @@ -3237,9 +3262,10 @@ static void thread_parallelize_6d(struct pthreadpool* threadpool, /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; const size_t threads_count = threadpool->threads_count; - for (size_t tid = modulo_decrement(thread_number, threads_count); - tid != thread_number; tid = modulo_decrement(tid, threads_count)) { - struct thread_info* other_thread = &threadpool->threads[tid]; + for (size_t tid = (thread_number < threads_count) ? 1 : 0; + tid < threads_count; tid++) { + struct thread_info* other_thread = + &threadpool->threads[(thread_number + tid) % threads_count]; while ( pthreadpool_try_decrement_relaxed_size_t(&other_thread->range_length)) { const size_t linear_index = @@ -3331,9 +3357,10 @@ static void thread_parallelize_6d_tile_1d(struct pthreadpool* threadpool, /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; const size_t threads_count = threadpool->threads_count; - for (size_t tid = modulo_decrement(thread_number, threads_count); - tid != thread_number; tid = modulo_decrement(tid, threads_count)) { - struct thread_info* other_thread = &threadpool->threads[tid]; + for (size_t tid = (thread_number < threads_count) ? 1 : 0; + tid < threads_count; tid++) { + struct thread_info* other_thread = + &threadpool->threads[(thread_number + tid) % threads_count]; while ( pthreadpool_try_decrement_relaxed_size_t(&other_thread->range_length)) { const size_t linear_index = @@ -3430,9 +3457,10 @@ static void thread_parallelize_6d_tile_2d(struct pthreadpool* threadpool, /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; const size_t threads_count = threadpool->threads_count; - for (size_t tid = modulo_decrement(thread_number, threads_count); - tid != thread_number; tid = modulo_decrement(tid, threads_count)) { - struct thread_info* other_thread = &threadpool->threads[tid]; + for (size_t tid = (thread_number < threads_count) ? 1 : 0; + tid < threads_count; tid++) { + struct thread_info* other_thread = + &threadpool->threads[(thread_number + tid) % threads_count]; while ( pthreadpool_try_decrement_relaxed_size_t(&other_thread->range_length)) { const size_t linear_index = diff --git a/src/pthreads.c b/src/pthreads.c index 03bbddf..3db063b 100644 --- a/src/pthreads.c +++ b/src/pthreads.c @@ -111,7 +111,7 @@ static size_t pthreadpool_get_ticks(size_t since) { return __rdtsc() - since; } #define pthreadpool_log_debug(format, ...) \ fprintf(stderr, "[%zu] %s (%s:%i): " format "\n", pthreadpool_get_ticks(0), \ - __FUNCTION__, __FILE__, __LINE__ - 1, ##__VA_ARGS__); + __FUNCTION__, __FILE__, __LINE__, ##__VA_ARGS__); #else #define pthreadpool_log_debug(format, ...) #endif // PTHREADPOOL_DEBUG_LOGGING @@ -502,6 +502,8 @@ static pthreadpool_thread_return_t thread_main(void* arg) { pthreadpool_load_consume_int32_t(&threadpool->num_active_threads); } + pthreadpool_log_debug("thread %u entering main loop.", thread_id); + // Main loop. while (true) { if (curr_active_threads == PTHREADPOOL_NUM_ACTIVE_THREADS_DONE) { @@ -545,16 +547,10 @@ static pthreadpool_thread_return_t thread_main(void* arg) { const uint32_t max_active_threads = pthreadpool_load_acquire_size_t(&threadpool->threads_count); if (curr_active_threads < max_active_threads) { - const uint32_t assumed_thread_id = - (max_active_threads < threadpool->max_num_threads) - ? curr_active_threads - : thread_id; - // Do the needful. - pthreadpool_log_debug("thread %u working on job %u as thread %u.", - thread_id, threadpool->job_id, - assumed_thread_id); - run_thread_function(threadpool, assumed_thread_id); + pthreadpool_log_debug("thread %u working on job %u.", thread_id, + threadpool->job_id); + run_thread_function(threadpool, thread_id); } // Ring the bell on the way out. @@ -715,6 +711,14 @@ PTHREADPOOL_INTERNAL void pthreadpool_parallelize( range_start = range_end; } + // Populate sentinels. + for (size_t tid = num_threads; tid < threadpool->max_num_threads; tid++) { + struct thread_info* thread = &threadpool->threads[tid]; + pthreadpool_store_relaxed_size_t(&thread->range_start, range_start); + pthreadpool_store_relaxed_size_t(&thread->range_end, range_start); + pthreadpool_store_relaxed_size_t(&thread->range_length, 0); + } + /* Make changes by this thread visible to other threads. */ pthreadpool_fence_release(); diff --git a/test/pthreadpool.cc b/test/pthreadpool.cc index 7d5b97d..e42879c 100644 --- a/test/pthreadpool.cc +++ b/test/pthreadpool.cc @@ -18,6 +18,7 @@ #include #include // NOLINT #include +#include #include // NOLINT #include @@ -120,14 +121,12 @@ static void WorkImbalance(std::atomic_int* num_processed_items, static std::mutex mutex; // NOLINT(build/c++11) static std::condition_variable cond_var; std::unique_lock lock(mutex); // NOLINT(build/c++11) - if (num_processed_items->fetch_add(increment, std::memory_order_acquire) + - increment == - total) { + if (num_processed_items->fetch_add(increment) + increment == total) { cond_var.notify_all(); } if (wait) { /* Wait until all items are computed */ - while (num_processed_items->load(std::memory_order_relaxed) != total) { + while (num_processed_items->load() != total) { cond_var.wait(lock); } } @@ -14165,8 +14164,17 @@ TEST(Parallelize6DTile2D, MultiThreadPoolWorkStealing) { kParallelize6DTile2DRangeM * kParallelize6DTile2DRangeN); } -static void CheckThreadID(size_t num_threads, size_t thread_id, size_t) { - EXPECT_LT(thread_id, num_threads); +struct CheckThreadIDData { + CheckThreadIDData(size_t num_threads) : num_threads(num_threads) {} + size_t num_threads; + std::set thread_ids; +}; + +static void CheckThreadID(CheckThreadIDData* data, size_t thread_id, size_t) { + static std::mutex mutex; // NOLINT(build/c++11) + std::lock_guard lock(mutex); // NOLINT(build/c++11) + data->thread_ids.insert(thread_id); + ASSERT_LE(data->thread_ids.size(), data->num_threads); } TEST(SetNumThreads, ValidRange) { @@ -14190,10 +14198,12 @@ TEST(SetNumThreads, ValidRange) { num_threads); ASSERT_EQ(pthreadpool_get_threads_count(threadpool.get()), num_threads); + CheckThreadIDData data(num_threads); + pthreadpool_parallelize_1d_with_thread( threadpool.get(), reinterpret_cast(CheckThreadID), - (void*)num_threads, kParallelize1DRange, /*flags=*/0); + (void*)&data, kParallelize1DRange, /*flags=*/0); } } @@ -14218,10 +14228,12 @@ TEST(SetNumThreads, Maximum) { num_threads); ASSERT_EQ(pthreadpool_get_threads_count(threadpool.get()), num_threads); + CheckThreadIDData data1(num_threads); + pthreadpool_parallelize_1d_with_thread( threadpool.get(), reinterpret_cast(CheckThreadID), - (void*)num_threads, kParallelize1DRange, /*flags=*/0); + (void*)&data1, kParallelize1DRange, /*flags=*/0); // Set the maximum of threads (kNumMultiThreads). ASSERT_EQ(pthreadpool_set_threads_count(threadpool.get(), 0), @@ -14229,10 +14241,12 @@ TEST(SetNumThreads, Maximum) { ASSERT_EQ(pthreadpool_get_threads_count(threadpool.get()), kNumMultiThreads); + CheckThreadIDData data2(kNumMultiThreads); + pthreadpool_parallelize_1d_with_thread( threadpool.get(), reinterpret_cast(CheckThreadID), - (void*)kNumMultiThreads, kParallelize1DRange, /*flags=*/0); + (void*)&data2, kParallelize1DRange, /*flags=*/0); } } @@ -14258,10 +14272,12 @@ TEST(SetNumThreads, TooHigh) { ASSERT_EQ(pthreadpool_get_threads_count(threadpool.get()), kNumMultiThreads); + CheckThreadIDData data(kNumMultiThreads); + pthreadpool_parallelize_1d_with_thread( threadpool.get(), reinterpret_cast(CheckThreadID), - (void*)num_threads, kParallelize1DRange, /*flags=*/0); + (void*)&data, kParallelize1DRange, /*flags=*/0); } } diff --git a/test/pthreadpool_v2.cc b/test/pthreadpool_v2.cc index 3d03230..227f3d0 100644 --- a/test/pthreadpool_v2.cc +++ b/test/pthreadpool_v2.cc @@ -17,6 +17,7 @@ #include #include // NOLINT #include +#include #include // NOLINT #include #include @@ -180,14 +181,12 @@ static void WorkImbalance(std::atomic_int* num_processed_items, static std::mutex mutex; // NOLINT(build/c++11) static std::condition_variable cond_var; std::unique_lock lock(mutex); // NOLINT(build/c++11) - if (num_processed_items->fetch_add(increment, std::memory_order_acquire) + - increment == - total) { + if (num_processed_items->fetch_add(increment) + increment == total) { cond_var.notify_all(); } if (wait) { /* Wait until all items are computed */ - while (num_processed_items->load(std::memory_order_relaxed) != total) { + while (num_processed_items->load() != total) { cond_var.wait(lock); } } @@ -10828,8 +10827,17 @@ TEST(Parallelize6DTile2D, MultiThreadPoolWorkStealing) { kParallelize6DTile2DRangeM * kParallelize6DTile2DRangeN); } -static void CheckThreadID(size_t num_threads, size_t thread_id, size_t) { - EXPECT_LT(thread_id, num_threads); +struct CheckThreadIDData { + CheckThreadIDData(size_t num_threads) : num_threads(num_threads) {} + size_t num_threads; + std::set thread_ids; +}; + +static void CheckThreadID(CheckThreadIDData* data, size_t thread_id, size_t) { + static std::mutex mutex; // NOLINT(build/c++11) + std::lock_guard lock(mutex); // NOLINT(build/c++11) + data->thread_ids.insert(thread_id); + ASSERT_LE(data->thread_ids.size(), data->num_threads); } TEST(SetNumThreads, ValidRange) { @@ -10857,10 +10865,12 @@ TEST(SetNumThreads, ValidRange) { num_threads); ASSERT_EQ(pthreadpool_get_threads_count(threadpool.get()), num_threads); + CheckThreadIDData data(num_threads); + pthreadpool_parallelize_1d_with_thread( threadpool.get(), reinterpret_cast(CheckThreadID), - (void*)num_threads, kParallelize1DRange, /*flags=*/0); + (void*)&data, kParallelize1DRange, /*flags=*/0); } } @@ -10891,20 +10901,24 @@ TEST(SetNumThreads, Maximum) { num_threads); ASSERT_EQ(pthreadpool_get_threads_count(threadpool.get()), num_threads); + CheckThreadIDData data1(num_threads); + pthreadpool_parallelize_1d_with_thread( threadpool.get(), reinterpret_cast(CheckThreadID), - (void*)num_threads, kParallelize1DRange, /*flags=*/0); + (void*)&data1, kParallelize1DRange, /*flags=*/0); // Set the maximum of threads ((kNumThreadpoolThreads + 1)). ASSERT_EQ(pthreadpool_set_threads_count(threadpool.get(), 0), max_num_threads); ASSERT_EQ(pthreadpool_get_threads_count(threadpool.get()), max_num_threads); + CheckThreadIDData data2(max_num_threads); + pthreadpool_parallelize_1d_with_thread( threadpool.get(), reinterpret_cast(CheckThreadID), - (void*)max_num_threads, kParallelize1DRange, /*flags=*/0); + (void*)&data2, kParallelize1DRange, /*flags=*/0); } } @@ -10930,10 +10944,12 @@ TEST(SetNumThreads, TooHigh) { max_num_threads); ASSERT_EQ(pthreadpool_get_threads_count(threadpool.get()), max_num_threads); + CheckThreadIDData data(max_num_threads); + pthreadpool_parallelize_1d_with_thread( threadpool.get(), reinterpret_cast(CheckThreadID), - (void*)num_threads, kParallelize1DRange, /*flags=*/0); + (void*)&data, kParallelize1DRange, /*flags=*/0); } }