Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions include/pthreadpool.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#ifndef __PTHREADPOOL_INCLUDE_PTHREADPOOL_H_
#define __PTHREADPOOL_INCLUDE_PTHREADPOOL_H_

#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

Expand Down Expand Up @@ -248,6 +249,26 @@ size_t pthreadpool_set_threads_count(pthreadpool_t threadpool,
*/
void pthreadpool_release_executor_threads(struct pthreadpool* threadpool);

/**
* Updates a thread pool with a given @a pthreadpool_executor.
*
* @param threadpool The thread pool in which to replace the executor.
* @param executor A pointer to a @a pthreadpool_executor object that
* will be used to determine the number of extra
* threads (plus the calling thread), and provide the
* threads itself, for each call to a
* `pthreadpool_parallelize_*` function.
* @param executor_context A pointer to the context that will be passed to the
* functions in the @a executor object.
*
* @return @c true if the @a executor was successfully swapped, and @c false if
* it was not, e.g. because the current and nex @a executor and @a
* executor_context are identical.
*/
bool pthreadpool_update_executor(pthreadpool_t threadpool,
struct pthreadpool_executor* executor,
void* executor_context);

/**
* Process items on a 1D grid.
*
Expand Down
88 changes: 74 additions & 14 deletions src/pthreads.c
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,12 @@
#include <cpuinfo.h>
#endif

/* Alloca */
#if defined(_MSC_VER)
#include <malloc.h>
#define alloca _alloca
#endif // defined(_MSC_VER)

/* Public library header */
#include <pthreadpool.h>

Expand Down Expand Up @@ -593,6 +599,23 @@ struct pthreadpool* pthreadpool_create_v2(struct pthreadpool_executor* executor,
return threadpool;
}

static void wake_up_threads(void** contexts) {
struct pthreadpool* threadpool = (struct pthreadpool*)contexts[0];
struct pthreadpool_executor* executor = &threadpool->executor;
for (uint32_t k = 1; contexts[k] != NULL; k++) {
if (contexts[k + 1] != NULL) {
/* Fly, my pretties! Fly, fly, fly! */
executor->schedule(threadpool->executor_context, contexts[k],
(void (*)(void*))thread_main);
} else {
void* context = contexts[k];
free(contexts);
thread_main(context);
return;
}
}
}

static void ensure_num_threads(struct pthreadpool* threadpool,
uint32_t num_threads) {
assert(num_threads >= 1);
Expand All @@ -604,21 +627,37 @@ static void ensure_num_threads(struct pthreadpool* threadpool,
return;
}

void** thread_contexts = alloca(sizeof(void*) * num_threads);
int32_t num_threads_to_wake = 0;

/* Create any missing threads for this threadpool. */
for (uint32_t tid = 1;
tid < num_threads &&
pthreadpool_load_consume_int32_t(&threadpool->num_active_threads) > 0;
tid++) {
struct thread_info* thread = &threadpool->threads[tid];

// Check whether this thread was active, and if not, start it up.
// Check whether this thread was active, and if not, add it to the list of
// threads that need starting.
if (!pthreadpool_exchange_sequentially_consistent_uint32_t(
&thread->is_active, 1)) {
pthreadpool_register_threads(threadpool, 1);
thread_contexts[num_threads_to_wake++] = thread;
}
}

if (num_threads_to_wake > 1) {
void** contexts = malloc(sizeof(void*) * (num_threads_to_wake + 2));
contexts[0] = threadpool;
memcpy(contexts + 1, thread_contexts, sizeof(void*) * num_threads_to_wake);
contexts[num_threads_to_wake + 1] = NULL;
/* Fly, my pretties! Fly, fly, fly! */
executor->schedule(threadpool->executor_context, contexts,
(void (*)(void*))wake_up_threads);
} else if (num_threads_to_wake == 1) {
for (int k = 0; k < num_threads_to_wake; k++) {
/* Fly, my pretties! Fly, fly, fly! */
pthreadpool_log_debug("starting thread %u (arg=%p).", tid, thread);
executor->schedule(threadpool->executor_context, thread,
executor->schedule(threadpool->executor_context, thread_contexts[k],
(void (*)(void*))thread_main);
}
}
Expand All @@ -639,9 +678,11 @@ PTHREADPOOL_INTERNAL void pthreadpool_parallelize(
/* Make changes by other threads visible to this thread. */
pthreadpool_fence_acquire();

/* Make sure the threadpool is idle. */
assert(pthreadpool_load_consume_int32_t(&threadpool->num_active_threads) ==
0);
/* Make sure the threadpool is idle or done. */
const int32_t num_active_threads =
pthreadpool_load_consume_int32_t(&threadpool->num_active_threads);
assert(num_active_threads == 0 ||
num_active_threads == PTHREADPOOL_NUM_ACTIVE_THREADS_DONE);

/* Setup global arguments */
pthreadpool_store_relaxed_void_p(&threadpool->thread_function,
Expand Down Expand Up @@ -715,7 +756,9 @@ PTHREADPOOL_INTERNAL void pthreadpool_parallelize(

static void pthreadpool_release_all_threads(struct pthreadpool* threadpool) {
if (threadpool != NULL) {
assert(threadpool->num_active_threads == 0);
assert(threadpool->num_active_threads == 0 ||
threadpool->num_active_threads ==
PTHREADPOOL_NUM_ACTIVE_THREADS_DONE);

// Set the state to "done".
pthreadpool_store_sequentially_consistent_int32_t(
Expand All @@ -726,13 +769,6 @@ static void pthreadpool_release_all_threads(struct pthreadpool* threadpool) {

/* Wake up any thread waiting on a change of state. */
signal_num_active_threads(threadpool, 0);

// Wait for any pending jobs to complete.
wait_on_num_recruited_threads(threadpool, 0);

// Set the state back to "idle".
pthreadpool_store_sequentially_consistent_int32_t(
&threadpool->num_active_threads, 0);
}
}

Expand All @@ -742,11 +778,35 @@ void pthreadpool_release_executor_threads(struct pthreadpool* threadpool) {
}
}

bool pthreadpool_update_executor(pthreadpool_t threadpool,
struct pthreadpool_executor* executor,
void* executor_context) {
/* Protect the global threadpool structures */
pthreadpool_mutex_lock(&threadpool->execution_mutex);

const bool res = threadpool->executor.num_threads != executor->num_threads ||
threadpool->executor.schedule != executor->schedule ||
threadpool->executor_context != executor_context;
if (res) {
pthreadpool_release_executor_threads(threadpool);
threadpool->executor = *executor;
threadpool->executor_context = executor_context;
}

/* Unprotect the global threadpool structures now that we're done. */
pthreadpool_mutex_unlock(&threadpool->execution_mutex);

return res;
}

void pthreadpool_destroy(struct pthreadpool* threadpool) {
if (threadpool != NULL) {
/* Tell all threads to stop. */
pthreadpool_release_all_threads(threadpool);

// Wait for any recruited threads to leave.
wait_on_num_recruited_threads(threadpool, 0);

if (!threadpool->executor.num_threads) {
/* Wait until all threads return */
for (size_t thread = 1; thread < threadpool->max_num_threads; thread++) {
Expand Down
Loading