Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions build/meson/tests/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,6 @@ test('test-zstream-1',
test('test-zstream-3',
zstreamtest,
args: ['--newapi', '-t1', ZSTREAM_TESTTIME] + FUZZER_FLAGS,
# --newapi dies on Windows with "exit status 3221225477 or signal 3221225349 SIGinvalid"
should_fail: host_machine_os == os_windows,
timeout: 120)
test('test-longmatch', longmatch, timeout: 36)
test('test-invalidDictionaries', invalidDictionaries) # should be fast
Expand Down
2 changes: 1 addition & 1 deletion lib/common/pool.c
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ static void POOL_join(POOL_ctx* ctx) {
/* Join all of the threads */
{ size_t i;
for (i = 0; i < ctx->threadCapacity; ++i) {
ZSTD_pthread_join(ctx->threads[i], NULL); /* note : could fail */
ZSTD_pthread_join(ctx->threads[i]); /* note : could fail */
} }
}

Expand Down
79 changes: 66 additions & 13 deletions lib/common/threading.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,39 +34,92 @@ int g_ZSTD_threading_useless_symbol;

/* === Implementation === */

typedef struct {
void* (*start_routine)(void*);
void* arg;
int initialized;
ZSTD_pthread_cond_t initialized_cond;
ZSTD_pthread_mutex_t initialized_mutex;
} ZSTD_thread_params_t;

static unsigned __stdcall worker(void *arg)
{
ZSTD_pthread_t* const thread = (ZSTD_pthread_t*) arg;
thread->arg = thread->start_routine(thread->arg);
void* (*start_routine)(void*);
void* thread_arg;

/* Inialized thread_arg and start_routine and signal main thread that we don't need it
* to wait any longer.
*/
{
ZSTD_thread_params_t* thread_param = (ZSTD_thread_params_t*)arg;
thread_arg = thread_param->arg;
start_routine = thread_param->start_routine;

/* Signal main thread that we are running and do not depend on its memory anymore */
ZSTD_pthread_mutex_lock(&thread_param->initialized_mutex);
thread_param->initialized = 1;
ZSTD_pthread_cond_signal(&thread_param->initialized_cond);
ZSTD_pthread_mutex_unlock(&thread_param->initialized_mutex);
}

start_routine(thread_arg);

return 0;
}

int ZSTD_pthread_create(ZSTD_pthread_t* thread, const void* unused,
void* (*start_routine) (void*), void* arg)
{
ZSTD_thread_params_t thread_param;
(void)unused;
thread->arg = arg;
thread->start_routine = start_routine;
thread->handle = (HANDLE) _beginthreadex(NULL, 0, worker, thread, 0, NULL);

if (!thread->handle)
thread_param.start_routine = start_routine;
thread_param.arg = arg;
thread_param.initialized = 0;
*thread = NULL;

/* Setup thread initialization synchronization */
if(ZSTD_pthread_cond_init(&thread_param.initialized_cond, NULL)) {
/* Should never happen on Windows */
return -1;
}
if(ZSTD_pthread_mutex_init(&thread_param.initialized_mutex, NULL)) {
/* Should never happen on Windows */
ZSTD_pthread_cond_destroy(&thread_param.initialized_cond);
return -1;
}

/* Spawn thread */
*thread = (HANDLE)_beginthreadex(NULL, 0, worker, &thread_param, 0, NULL);
if (!thread) {
ZSTD_pthread_mutex_destroy(&thread_param.initialized_mutex);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also need to destroy the condition variable.

ZSTD_pthread_cond_destroy(&thread_param.initialized_cond);
return errno;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to unlock (until it is moved below the thread creation), and destroy the mutex/cond in this case.

else
return 0;
}

/* Wait for thread to be initialized */
ZSTD_pthread_mutex_lock(&thread_param.initialized_mutex);
while(!thread_param.initialized) {
ZSTD_pthread_cond_wait(&thread_param.initialized_cond, &thread_param.initialized_mutex);
}
ZSTD_pthread_mutex_unlock(&thread_param.initialized_mutex);
ZSTD_pthread_mutex_destroy(&thread_param.initialized_mutex);
ZSTD_pthread_cond_destroy(&thread_param.initialized_cond);

return 0;
}

int ZSTD_pthread_join(ZSTD_pthread_t thread, void **value_ptr)
int ZSTD_pthread_join(ZSTD_pthread_t thread)
{
DWORD result;

if (!thread.handle) return 0;
if (!thread) return 0;

result = WaitForSingleObject(thread.handle, INFINITE);
CloseHandle(thread.handle);
result = WaitForSingleObject(thread, INFINITE);
CloseHandle(thread);

switch (result) {
case WAIT_OBJECT_0:
if (value_ptr) *value_ptr = thread.arg;
return 0;
case WAIT_ABANDONED:
return EINVAL;
Expand Down
12 changes: 4 additions & 8 deletions lib/common/threading.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,12 @@ extern "C" {
#define ZSTD_pthread_cond_broadcast(a) WakeAllConditionVariable((a))

/* ZSTD_pthread_create() and ZSTD_pthread_join() */
typedef struct {
HANDLE handle;
void* (*start_routine)(void*);
void* arg;
} ZSTD_pthread_t;
typedef HANDLE ZSTD_pthread_t;

int ZSTD_pthread_create(ZSTD_pthread_t* thread, const void* unused,
void* (*start_routine) (void*), void* arg);

int ZSTD_pthread_join(ZSTD_pthread_t thread, void** value_ptr);
int ZSTD_pthread_join(ZSTD_pthread_t thread);

/**
* add here more wrappers as required
Expand Down Expand Up @@ -98,7 +94,7 @@ int ZSTD_pthread_join(ZSTD_pthread_t thread, void** value_ptr);

#define ZSTD_pthread_t pthread_t
#define ZSTD_pthread_create(a, b, c, d) pthread_create((a), (b), (c), (d))
#define ZSTD_pthread_join(a, b) pthread_join((a),(b))
#define ZSTD_pthread_join(a) pthread_join((a),NULL)

#else /* DEBUGLEVEL >= 1 */

Expand All @@ -123,7 +119,7 @@ int ZSTD_pthread_cond_destroy(ZSTD_pthread_cond_t* cond);

#define ZSTD_pthread_t pthread_t
#define ZSTD_pthread_create(a, b, c, d) pthread_create((a), (b), (c), (d))
#define ZSTD_pthread_join(a, b) pthread_join((a),(b))
#define ZSTD_pthread_join(a) pthread_join((a),NULL)

#endif

Expand Down
4 changes: 2 additions & 2 deletions tests/fuzzer.c
Original file line number Diff line number Diff line change
Expand Up @@ -431,8 +431,8 @@ static int threadPoolTests(void) {

ZSTD_pthread_create(&t1, NULL, threadPoolTests_compressionJob, &p1);
ZSTD_pthread_create(&t2, NULL, threadPoolTests_compressionJob, &p2);
ZSTD_pthread_join(t1, NULL);
ZSTD_pthread_join(t2, NULL);
ZSTD_pthread_join(t1);
ZSTD_pthread_join(t2);

assert(!memcmp(decodedBuffer, decodedBuffer2, CNBuffSize));
free(decodedBuffer2);
Expand Down