diff --git a/build/meson/tests/meson.build b/build/meson/tests/meson.build index 30df7d93124..f7ba5310188 100644 --- a/build/meson/tests/meson.build +++ b/build/meson/tests/meson.build @@ -192,8 +192,6 @@ test('test-zstream-1', test('test-zstream-3', zstreamtest, args: ['--newapi', '-t1', ZSTREAM_TESTTIME] + FUZZER_FLAGS, - # --newapi dies on Windows with "exit status 3221225477 or signal 3221225349 SIGinvalid" - should_fail: host_machine_os == os_windows, timeout: 120) test('test-longmatch', longmatch, timeout: 36) test('test-invalidDictionaries', invalidDictionaries) # should be fast diff --git a/lib/common/pool.c b/lib/common/pool.c index bf21c57ed66..63dc9d37789 100644 --- a/lib/common/pool.c +++ b/lib/common/pool.c @@ -173,7 +173,7 @@ static void POOL_join(POOL_ctx* ctx) { /* Join all of the threads */ { size_t i; for (i = 0; i < ctx->threadCapacity; ++i) { - ZSTD_pthread_join(ctx->threads[i], NULL); /* note : could fail */ + ZSTD_pthread_join(ctx->threads[i]); /* note : could fail */ } } } diff --git a/lib/common/threading.c b/lib/common/threading.c index c62c65d57e9..f2341105a1c 100644 --- a/lib/common/threading.c +++ b/lib/common/threading.c @@ -34,39 +34,92 @@ int g_ZSTD_threading_useless_symbol; /* === Implementation === */ +typedef struct { + void* (*start_routine)(void*); + void* arg; + int initialized; + ZSTD_pthread_cond_t initialized_cond; + ZSTD_pthread_mutex_t initialized_mutex; +} ZSTD_thread_params_t; + static unsigned __stdcall worker(void *arg) { - ZSTD_pthread_t* const thread = (ZSTD_pthread_t*) arg; - thread->arg = thread->start_routine(thread->arg); + void* (*start_routine)(void*); + void* thread_arg; + + /* Inialized thread_arg and start_routine and signal main thread that we don't need it + * to wait any longer. + */ + { + ZSTD_thread_params_t* thread_param = (ZSTD_thread_params_t*)arg; + thread_arg = thread_param->arg; + start_routine = thread_param->start_routine; + + /* Signal main thread that we are running and do not depend on its memory anymore */ + ZSTD_pthread_mutex_lock(&thread_param->initialized_mutex); + thread_param->initialized = 1; + ZSTD_pthread_cond_signal(&thread_param->initialized_cond); + ZSTD_pthread_mutex_unlock(&thread_param->initialized_mutex); + } + + start_routine(thread_arg); + return 0; } int ZSTD_pthread_create(ZSTD_pthread_t* thread, const void* unused, void* (*start_routine) (void*), void* arg) { + ZSTD_thread_params_t thread_param; (void)unused; - thread->arg = arg; - thread->start_routine = start_routine; - thread->handle = (HANDLE) _beginthreadex(NULL, 0, worker, thread, 0, NULL); - if (!thread->handle) + thread_param.start_routine = start_routine; + thread_param.arg = arg; + thread_param.initialized = 0; + *thread = NULL; + + /* Setup thread initialization synchronization */ + if(ZSTD_pthread_cond_init(&thread_param.initialized_cond, NULL)) { + /* Should never happen on Windows */ + return -1; + } + if(ZSTD_pthread_mutex_init(&thread_param.initialized_mutex, NULL)) { + /* Should never happen on Windows */ + ZSTD_pthread_cond_destroy(&thread_param.initialized_cond); + return -1; + } + + /* Spawn thread */ + *thread = (HANDLE)_beginthreadex(NULL, 0, worker, &thread_param, 0, NULL); + if (!thread) { + ZSTD_pthread_mutex_destroy(&thread_param.initialized_mutex); + ZSTD_pthread_cond_destroy(&thread_param.initialized_cond); return errno; - else - return 0; + } + + /* Wait for thread to be initialized */ + ZSTD_pthread_mutex_lock(&thread_param.initialized_mutex); + while(!thread_param.initialized) { + ZSTD_pthread_cond_wait(&thread_param.initialized_cond, &thread_param.initialized_mutex); + } + ZSTD_pthread_mutex_unlock(&thread_param.initialized_mutex); + ZSTD_pthread_mutex_destroy(&thread_param.initialized_mutex); + ZSTD_pthread_cond_destroy(&thread_param.initialized_cond); + + return 0; } -int ZSTD_pthread_join(ZSTD_pthread_t thread, void **value_ptr) +int ZSTD_pthread_join(ZSTD_pthread_t thread) { DWORD result; - if (!thread.handle) return 0; + if (!thread) return 0; - result = WaitForSingleObject(thread.handle, INFINITE); - CloseHandle(thread.handle); + result = WaitForSingleObject(thread, INFINITE); + CloseHandle(thread); switch (result) { case WAIT_OBJECT_0: - if (value_ptr) *value_ptr = thread.arg; return 0; case WAIT_ABANDONED: return EINVAL; diff --git a/lib/common/threading.h b/lib/common/threading.h index b1458054eee..fb5c1c87873 100644 --- a/lib/common/threading.h +++ b/lib/common/threading.h @@ -61,16 +61,12 @@ extern "C" { #define ZSTD_pthread_cond_broadcast(a) WakeAllConditionVariable((a)) /* ZSTD_pthread_create() and ZSTD_pthread_join() */ -typedef struct { - HANDLE handle; - void* (*start_routine)(void*); - void* arg; -} ZSTD_pthread_t; +typedef HANDLE ZSTD_pthread_t; int ZSTD_pthread_create(ZSTD_pthread_t* thread, const void* unused, void* (*start_routine) (void*), void* arg); -int ZSTD_pthread_join(ZSTD_pthread_t thread, void** value_ptr); +int ZSTD_pthread_join(ZSTD_pthread_t thread); /** * add here more wrappers as required @@ -98,7 +94,7 @@ int ZSTD_pthread_join(ZSTD_pthread_t thread, void** value_ptr); #define ZSTD_pthread_t pthread_t #define ZSTD_pthread_create(a, b, c, d) pthread_create((a), (b), (c), (d)) -#define ZSTD_pthread_join(a, b) pthread_join((a),(b)) +#define ZSTD_pthread_join(a) pthread_join((a),NULL) #else /* DEBUGLEVEL >= 1 */ @@ -123,7 +119,7 @@ int ZSTD_pthread_cond_destroy(ZSTD_pthread_cond_t* cond); #define ZSTD_pthread_t pthread_t #define ZSTD_pthread_create(a, b, c, d) pthread_create((a), (b), (c), (d)) -#define ZSTD_pthread_join(a, b) pthread_join((a),(b)) +#define ZSTD_pthread_join(a) pthread_join((a),NULL) #endif diff --git a/tests/fuzzer.c b/tests/fuzzer.c index 9d1862077a0..d58751a5cd8 100644 --- a/tests/fuzzer.c +++ b/tests/fuzzer.c @@ -431,8 +431,8 @@ static int threadPoolTests(void) { ZSTD_pthread_create(&t1, NULL, threadPoolTests_compressionJob, &p1); ZSTD_pthread_create(&t2, NULL, threadPoolTests_compressionJob, &p2); - ZSTD_pthread_join(t1, NULL); - ZSTD_pthread_join(t2, NULL); + ZSTD_pthread_join(t1); + ZSTD_pthread_join(t2); assert(!memcmp(decodedBuffer, decodedBuffer2, CNBuffSize)); free(decodedBuffer2);