diff --git a/include/pthreadpool.h b/include/pthreadpool.h index 6638942..07817c7 100644 --- a/include/pthreadpool.h +++ b/include/pthreadpool.h @@ -10,6 +10,7 @@ #ifndef __PTHREADPOOL_INCLUDE_PTHREADPOOL_H_ #define __PTHREADPOOL_INCLUDE_PTHREADPOOL_H_ +#include #include #include @@ -248,6 +249,26 @@ size_t pthreadpool_set_threads_count(pthreadpool_t threadpool, */ void pthreadpool_release_executor_threads(struct pthreadpool* threadpool); +/** + * Updates a thread pool with a given @a pthreadpool_executor. + * + * @param threadpool The thread pool in which to replace the executor. + * @param executor A pointer to a @a pthreadpool_executor object that + * will be used to determine the number of extra + * threads (plus the calling thread), and provide the + * threads itself, for each call to a + * `pthreadpool_parallelize_*` function. + * @param executor_context A pointer to the context that will be passed to the + * functions in the @a executor object. + * + * @return @c true if the @a executor was successfully swapped, and @c false if + * it was not, e.g. because the current and nex @a executor and @a + * executor_context are identical. + */ +bool pthreadpool_update_executor(pthreadpool_t threadpool, + struct pthreadpool_executor* executor, + void* executor_context); + /** * Process items on a 1D grid. * diff --git a/src/pthreads.c b/src/pthreads.c index 9454adf..210d0d1 100644 --- a/src/pthreads.c +++ b/src/pthreads.c @@ -73,6 +73,12 @@ #include #endif +/* Alloca */ +#if defined(_MSC_VER) +#include +#define alloca _alloca +#endif // defined(_MSC_VER) + /* Public library header */ #include @@ -593,6 +599,23 @@ struct pthreadpool* pthreadpool_create_v2(struct pthreadpool_executor* executor, return threadpool; } +static void wake_up_threads(void** contexts) { + struct pthreadpool* threadpool = (struct pthreadpool*)contexts[0]; + struct pthreadpool_executor* executor = &threadpool->executor; + for (uint32_t k = 1; contexts[k] != NULL; k++) { + if (contexts[k + 1] != NULL) { + /* Fly, my pretties! Fly, fly, fly! */ + executor->schedule(threadpool->executor_context, contexts[k], + (void (*)(void*))thread_main); + } else { + void* context = contexts[k]; + free(contexts); + thread_main(context); + return; + } + } +} + static void ensure_num_threads(struct pthreadpool* threadpool, uint32_t num_threads) { assert(num_threads >= 1); @@ -604,6 +627,9 @@ static void ensure_num_threads(struct pthreadpool* threadpool, return; } + void** thread_contexts = alloca(sizeof(void*) * num_threads); + int32_t num_threads_to_wake = 0; + /* Create any missing threads for this threadpool. */ for (uint32_t tid = 1; tid < num_threads && @@ -611,14 +637,27 @@ static void ensure_num_threads(struct pthreadpool* threadpool, tid++) { struct thread_info* thread = &threadpool->threads[tid]; - // Check whether this thread was active, and if not, start it up. + // Check whether this thread was active, and if not, add it to the list of + // threads that need starting. if (!pthreadpool_exchange_sequentially_consistent_uint32_t( &thread->is_active, 1)) { pthreadpool_register_threads(threadpool, 1); + thread_contexts[num_threads_to_wake++] = thread; + } + } + if (num_threads_to_wake > 1) { + void** contexts = malloc(sizeof(void*) * (num_threads_to_wake + 2)); + contexts[0] = threadpool; + memcpy(contexts + 1, thread_contexts, sizeof(void*) * num_threads_to_wake); + contexts[num_threads_to_wake + 1] = NULL; + /* Fly, my pretties! Fly, fly, fly! */ + executor->schedule(threadpool->executor_context, contexts, + (void (*)(void*))wake_up_threads); + } else if (num_threads_to_wake == 1) { + for (int k = 0; k < num_threads_to_wake; k++) { /* Fly, my pretties! Fly, fly, fly! */ - pthreadpool_log_debug("starting thread %u (arg=%p).", tid, thread); - executor->schedule(threadpool->executor_context, thread, + executor->schedule(threadpool->executor_context, thread_contexts[k], (void (*)(void*))thread_main); } } @@ -639,9 +678,11 @@ PTHREADPOOL_INTERNAL void pthreadpool_parallelize( /* Make changes by other threads visible to this thread. */ pthreadpool_fence_acquire(); - /* Make sure the threadpool is idle. */ - assert(pthreadpool_load_consume_int32_t(&threadpool->num_active_threads) == - 0); + /* Make sure the threadpool is idle or done. */ + const int32_t num_active_threads = + pthreadpool_load_consume_int32_t(&threadpool->num_active_threads); + assert(num_active_threads == 0 || + num_active_threads == PTHREADPOOL_NUM_ACTIVE_THREADS_DONE); /* Setup global arguments */ pthreadpool_store_relaxed_void_p(&threadpool->thread_function, @@ -715,7 +756,9 @@ PTHREADPOOL_INTERNAL void pthreadpool_parallelize( static void pthreadpool_release_all_threads(struct pthreadpool* threadpool) { if (threadpool != NULL) { - assert(threadpool->num_active_threads == 0); + assert(threadpool->num_active_threads == 0 || + threadpool->num_active_threads == + PTHREADPOOL_NUM_ACTIVE_THREADS_DONE); // Set the state to "done". pthreadpool_store_sequentially_consistent_int32_t( @@ -726,13 +769,6 @@ static void pthreadpool_release_all_threads(struct pthreadpool* threadpool) { /* Wake up any thread waiting on a change of state. */ signal_num_active_threads(threadpool, 0); - - // Wait for any pending jobs to complete. - wait_on_num_recruited_threads(threadpool, 0); - - // Set the state back to "idle". - pthreadpool_store_sequentially_consistent_int32_t( - &threadpool->num_active_threads, 0); } } @@ -742,11 +778,35 @@ void pthreadpool_release_executor_threads(struct pthreadpool* threadpool) { } } +bool pthreadpool_update_executor(pthreadpool_t threadpool, + struct pthreadpool_executor* executor, + void* executor_context) { + /* Protect the global threadpool structures */ + pthreadpool_mutex_lock(&threadpool->execution_mutex); + + const bool res = threadpool->executor.num_threads != executor->num_threads || + threadpool->executor.schedule != executor->schedule || + threadpool->executor_context != executor_context; + if (res) { + pthreadpool_release_executor_threads(threadpool); + threadpool->executor = *executor; + threadpool->executor_context = executor_context; + } + + /* Unprotect the global threadpool structures now that we're done. */ + pthreadpool_mutex_unlock(&threadpool->execution_mutex); + + return res; +} + void pthreadpool_destroy(struct pthreadpool* threadpool) { if (threadpool != NULL) { /* Tell all threads to stop. */ pthreadpool_release_all_threads(threadpool); + // Wait for any recruited threads to leave. + wait_on_num_recruited_threads(threadpool, 0); + if (!threadpool->executor.num_threads) { /* Wait until all threads return */ for (size_t thread = 1; thread < threadpool->max_num_threads; thread++) {