From 46987a8b5a905a71f9ef409e02a62f32cdeeb703 Mon Sep 17 00:00:00 2001 From: Rainer Schuetze Date: Wed, 20 Mar 2019 08:27:40 +0100 Subject: [PATCH 1/3] add core.sync.event to be used as Windows events --- changelog/core_sync_event.dd | 33 ++++ mak/COPY | 2 + mak/DOCS | 1 + mak/SRCS | 1 + mak/WINDOWS | 3 + src/core/sync/config.d | 6 +- src/core/sync/event.d | 339 +++++++++++++++++++++++++++++++++++ 7 files changed, 382 insertions(+), 3 deletions(-) create mode 100644 changelog/core_sync_event.dd create mode 100644 src/core/sync/event.d diff --git a/changelog/core_sync_event.dd b/changelog/core_sync_event.dd new file mode 100644 index 0000000000..38b56fe5fe --- /dev/null +++ b/changelog/core_sync_event.dd @@ -0,0 +1,33 @@ +New module core.sync.event + +This module provides a cross-platform interface for lightweight signaling of other threads. +It can be used to start execution of multiple waiting threads simultaneously. +--- +import core.sync.event, core.thread, std.file; + +struct ProcessFile +{ + ThreadGroup group; + Event event; + void[] buffer; + + void doProcess() + { + event.wait(); + // process buffer + } + + void process(string filename) + { + event.initialize(true, false); + group = new ThreadGroup; + for (int i = 0; i < 10; ++i) + group.create(&doProcess); + + buffer = std.file.read(filename); + event.set(); + group.joinAll(); + event.terminate(); + } +} +--- diff --git a/mak/COPY b/mak/COPY index 5683188da9..9e0664a4c6 100644 --- a/mak/COPY +++ b/mak/COPY @@ -65,6 +65,8 @@ COPY=\ $(IMPDIR)\core\stdcpp\type_traits.d \ $(IMPDIR)\core\stdcpp\xutility.d \ \ + $(IMPDIR)\core\sync\event.d \ + \ $(IMPDIR)\core\sys\darwin\crt_externs.d \ $(IMPDIR)\core\sys\darwin\dlfcn.d \ $(IMPDIR)\core\sys\darwin\execinfo.d \ diff --git a/mak/DOCS b/mak/DOCS index 89f7546d70..96121956cd 100644 --- a/mak/DOCS +++ b/mak/DOCS @@ -55,6 +55,7 @@ DOCS=\ $(DOCDIR)\core_sync_condition.html \ $(DOCDIR)\core_sync_config.html \ $(DOCDIR)\core_sync_exception.html \ + $(DOCDIR)\core_sync_event.html \ $(DOCDIR)\core_sync_mutex.html \ $(DOCDIR)\core_sync_rwmutex.html \ $(DOCDIR)\core_sync_semaphore.html \ diff --git a/mak/SRCS b/mak/SRCS index bc93928b19..0904b9029e 100644 --- a/mak/SRCS +++ b/mak/SRCS @@ -68,6 +68,7 @@ SRCS=\ src\core\sync\condition.d \ src\core\sync\config.d \ src\core\sync\exception.d \ + src\core\sync\event.d \ src\core\sync\mutex.d \ src\core\sync\rwmutex.d \ src\core\sync\semaphore.d \ diff --git a/mak/WINDOWS b/mak/WINDOWS index 6a10a40b92..60e8173038 100644 --- a/mak/WINDOWS +++ b/mak/WINDOWS @@ -204,6 +204,9 @@ $(IMPDIR)\core\stdc\wchar_.d : src\core\stdc\wchar_.d $(IMPDIR)\core\stdc\wctype.d : src\core\stdc\wctype.d copy $** $@ +$(IMPDIR)\core\sync\event.d : src\core\sync\event.d + copy $** $@ + $(IMPDIR)\core\stdcpp\allocator.d : src\core\stdcpp\allocator.d copy $** $@ diff --git a/src/core/sync/config.d b/src/core/sync/config.d index 4540e9954c..6b27d35853 100644 --- a/src/core/sync/config.d +++ b/src/core/sync/config.d @@ -23,7 +23,7 @@ version (Posix) private import core.time; - void mktspec( ref timespec t ) nothrow + void mktspec( ref timespec t ) nothrow @nogc { static if ( false && is( typeof( clock_gettime ) ) ) { @@ -41,14 +41,14 @@ version (Posix) } - void mktspec( ref timespec t, Duration delta ) nothrow + void mktspec( ref timespec t, Duration delta ) nothrow @nogc { mktspec( t ); mvtspec( t, delta ); } - void mvtspec( ref timespec t, Duration delta ) nothrow + void mvtspec( ref timespec t, Duration delta ) nothrow @nogc { auto val = delta; val += dur!"seconds"( t.tv_sec ); diff --git a/src/core/sync/event.d b/src/core/sync/event.d new file mode 100644 index 0000000000..aabdb64e7c --- /dev/null +++ b/src/core/sync/event.d @@ -0,0 +1,339 @@ +/** + * The event module provides a primitive for lightweight signaling of other threads + * (emulating Windows events on Posix) + * + * Copyright: Copyright (c) 2019 D Language Foundation + * License: Distributed under the + * $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0). + * (See accompanying file LICENSE) + * Authors: Rainer Schuetze + * Source: $(DRUNTIMESRC core/sync/event.d) + */ +module core.sync.event; + +version (Windows) +{ + import core.sys.windows.basetsd /+: HANDLE +/; + import core.sys.windows.winerror /+: WAIT_TIMEOUT +/; + import core.sys.windows.winbase /+: CreateEvent, CloseHandle, SetEvent, ResetEvent, + WaitForSingleObject, INFINITE, WAIT_OBJECT_0+/; +} +else version (Posix) +{ + import core.sys.posix.pthread; + import core.sys.posix.sys.types; +} +else +{ + static assert(false, "Platform not supported"); +} + +import core.time; +import core.internal.abort : abort; + +/** + * represents an event. Clients of an event are suspended while waiting + * for the event to be "signaled". + * + * Implemented using `pthread_mutex` and `pthread_condition` on Posix and + * `CreateEvent` and `SetEvent` on Windows. + */ +struct Event +{ + // Posix version inspired by http://www.it.uu.se/katalog/larme597/win32eoposix +nothrow @nogc: + /** + * Creates an event object. + * + * Params: + * manualReset = the state of the event is not reset automatically after resuming waiting clients + * initialState = initial state of the signal + */ + this(bool manualReset, bool initialState) + { + initialize(manualReset, initialState); + } + + /** + * Initializes an event object. Does nothing if the event is already initialized. + * + * Params: + * manualReset = the state of the event is not reset automatically after resuming waiting clients + * initialState = initial state of the signal + */ + void initialize(bool manualReset, bool initialState) + { + version (Windows) + { + if (m_event) + return; + m_event = CreateEvent(null, manualReset, initialState, null); + m_event || abort("Error: CreateEvent failed."); + } + else version (Posix) + { + if (m_initalized) + return; + !pthread_mutex_init(cast(pthread_mutex_t*) &m_mutex, null) || + abort("Error: pthread_mutex_init failed."); + m_state = initialState; + m_manualReset = manualReset; + m_initalized = true; + } + } + + // copying not allowed, can produce resource leaks + @disable this(this); + @disable void opAssign(Event); + + ~this() + { + terminate(); + } + + /** + * deinitialize event. Does nothing if the event is not initialized. There must not be + * threads currently waiting for the event to be signaled. + */ + void terminate() + { + version (Windows) + { + if (m_event) + CloseHandle(m_event); + m_event = null; + } + else version (Posix) + { + if (m_initalized) + { + assert(!m_waiter); + pthread_mutex_destroy(&m_mutex) == 0 || + abort("Error: pthread_mutex_destroy failed."); + m_initalized = false; + } + } + } + + + /// Set the event to "signaled", so that waiting clients are resumed + void set() + { + version (Windows) + { + if (m_event) + SetEvent(m_event); + } + else version (Posix) + { + if (m_initalized) + { + pthread_mutex_lock(&m_mutex); + m_state = true; + for (auto waiter = m_waiter; waiter != null; waiter = waiter.next) + { + pthread_mutex_lock(&waiter.mutex); + pthread_cond_signal(&waiter.cond); + pthread_mutex_unlock(&waiter.mutex); + } + pthread_mutex_unlock(&m_mutex); + } + } + } + + /// Reset the event manually + void reset() + { + version (Windows) + { + if (m_event) + ResetEvent(m_event); + } + else version (Posix) + { + if (m_initalized) + { + pthread_mutex_lock(&m_mutex); + m_state = false; + pthread_mutex_unlock(&m_mutex); + } + } + } + + /** + * Wait for the event to be signaled without timeout. + * + * Returns: + * `true` if the event is in signaled state, `false` if the event is uninitialized or another error occured + */ + bool wait() + { + version (Windows) + { + return m_event && WaitForSingleObject(m_event, INFINITE) == WAIT_OBJECT_0; + } + else version (Posix) + { + return wait(Duration.max); + } + } + + /** + * Wait for the event to be signaled with timeout. + * + * Params: + * tmout = the maximum time to wait + * Returns: + * `true` if the event is in signaled state, `false` if the event was nonsignaled for the given time or + * the event is uninitialized or another error occured + */ + bool wait(Duration tmout) + { + version (Windows) + { + if (!m_event) + return false; + + auto maxWaitMillis = dur!("msecs")(uint.max - 1); + + while (tmout > maxWaitMillis) + { + auto res = WaitForSingleObject(m_event, uint.max - 1); + if (res != WAIT_TIMEOUT) + return res == WAIT_OBJECT_0; + tmout -= maxWaitMillis; + } + auto ms = cast(uint)(tmout.total!"msecs"); + return WaitForSingleObject(m_event, ms) == WAIT_OBJECT_0; + } + else version (Posix) + { + if (!m_initalized) + return false; + + pthread_mutex_lock(&m_mutex); + + int result = 0; + if (!m_state) + { + list_element le; + pthread_mutex_init(&le.mutex, null) == 0 || + abort("Error: pthread_mutex_init failed."); + pthread_cond_init(&le.cond, null) == 0 || + abort("Error: pthread_cond_init failed."); + + addWaiter(&le); + pthread_mutex_unlock(&m_mutex); + + if (tmout == Duration.max) + { + result = pthread_cond_wait(&le.cond, &le.mutex); + } + else + { + import core.sync.config; + + timespec t = void; + mktspec(t, tmout); + + result = pthread_cond_timedwait(&le.cond, &le.mutex, &t); + } + + pthread_mutex_lock(&m_mutex); + removeWaiter(&le); + + pthread_mutex_destroy(&le.mutex); + pthread_cond_destroy(&le.cond); + } + if (result == 0 && !m_manualReset) + m_state = false; + + pthread_mutex_unlock(&m_mutex); + + return result == 0; + } + } + +private: + version (Windows) + { + HANDLE m_event; + } + else version (Posix) + { + pthread_mutex_t m_mutex; + list_element* m_waiter; + bool m_initalized; + bool m_state; + bool m_manualReset; + + static struct list_element + { + pthread_mutex_t mutex; // mutex for the conditional wait + pthread_cond_t cond; + list_element *next; + } + + void addWaiter(list_element* le) + { + le.next = m_waiter; + m_waiter = le; + } + + void removeWaiter(list_element* le) + { + for (auto pwaiter = &m_waiter; *pwaiter; pwaiter = &(*pwaiter).next) + if (*pwaiter == le) + { + *pwaiter = le.next; + return; + } + assert(false); + } + } +} + +// Test single-thread (non-shared) use. +@nogc nothrow unittest +{ + // auto-reset, initial state false + Event ev1 = Event(false, false); + assert(!ev1.wait(1.dur!"msecs")); + ev1.set(); + assert(ev1.wait()); + assert(!ev1.wait(1.dur!"msecs")); + + // manual-reset, initial state true + Event ev2 = Event(true, true); + assert(ev2.wait()); + assert(ev2.wait()); + ev2.reset(); + assert(!ev2.wait(1.dur!"msecs")); +} + +unittest +{ + import core.thread; + + auto event = new Event(true, false); + int numThreads = 10; + + void testFn() + { + event.wait(8.dur!"seconds"); // timeout below limit for druntime test_runner + } + + auto group = new ThreadGroup; + + for (int i = 0; i < numThreads; ++i) + group.create(&testFn); + + auto start = MonoTime.currTime; + + event.set(); + group.joinAll(); + + assert(MonoTime.currTime - start < 5.dur!"seconds"); + + delete event; +} From ba5bbeb2dc29ff1bcb7a337934b2482006cdfe42 Mon Sep 17 00:00:00 2001 From: Rainer Schuetze Date: Sun, 24 Mar 2019 10:43:01 +0100 Subject: [PATCH 2/3] use pthread_cond_broadcast instead of manual implementation --- src/core/sync/event.d | 67 ++++++++++--------------------------------- 1 file changed, 15 insertions(+), 52 deletions(-) diff --git a/src/core/sync/event.d b/src/core/sync/event.d index aabdb64e7c..c68b5293cc 100644 --- a/src/core/sync/event.d +++ b/src/core/sync/event.d @@ -40,7 +40,6 @@ import core.internal.abort : abort; */ struct Event { - // Posix version inspired by http://www.it.uu.se/katalog/larme597/win32eoposix nothrow @nogc: /** * Creates an event object. @@ -74,8 +73,10 @@ nothrow @nogc: { if (m_initalized) return; - !pthread_mutex_init(cast(pthread_mutex_t*) &m_mutex, null) || + pthread_mutex_init(cast(pthread_mutex_t*) &m_mutex, null) == 0 || abort("Error: pthread_mutex_init failed."); + pthread_cond_init(&m_cond, null) == 0 || + abort("Error: pthread_cond_init failed."); m_state = initialState; m_manualReset = manualReset; m_initalized = true; @@ -107,9 +108,10 @@ nothrow @nogc: { if (m_initalized) { - assert(!m_waiter); pthread_mutex_destroy(&m_mutex) == 0 || abort("Error: pthread_mutex_destroy failed."); + pthread_cond_destroy(&m_cond) == 0 || + abort("Error: pthread_cond_destroy failed."); m_initalized = false; } } @@ -130,12 +132,7 @@ nothrow @nogc: { pthread_mutex_lock(&m_mutex); m_state = true; - for (auto waiter = m_waiter; waiter != null; waiter = waiter.next) - { - pthread_mutex_lock(&waiter.mutex); - pthread_cond_signal(&waiter.cond); - pthread_mutex_unlock(&waiter.mutex); - } + pthread_cond_broadcast(&m_cond); pthread_mutex_unlock(&m_mutex); } } @@ -216,18 +213,9 @@ nothrow @nogc: int result = 0; if (!m_state) { - list_element le; - pthread_mutex_init(&le.mutex, null) == 0 || - abort("Error: pthread_mutex_init failed."); - pthread_cond_init(&le.cond, null) == 0 || - abort("Error: pthread_cond_init failed."); - - addWaiter(&le); - pthread_mutex_unlock(&m_mutex); - if (tmout == Duration.max) { - result = pthread_cond_wait(&le.cond, &le.mutex); + result = pthread_cond_wait(&m_cond, &m_mutex); } else { @@ -236,14 +224,8 @@ nothrow @nogc: timespec t = void; mktspec(t, tmout); - result = pthread_cond_timedwait(&le.cond, &le.mutex, &t); + result = pthread_cond_timedwait(&m_cond, &m_mutex, &t); } - - pthread_mutex_lock(&m_mutex); - removeWaiter(&le); - - pthread_mutex_destroy(&le.mutex); - pthread_cond_destroy(&le.cond); } if (result == 0 && !m_manualReset) m_state = false; @@ -262,34 +244,10 @@ private: else version (Posix) { pthread_mutex_t m_mutex; - list_element* m_waiter; + pthread_cond_t m_cond; bool m_initalized; bool m_state; bool m_manualReset; - - static struct list_element - { - pthread_mutex_t mutex; // mutex for the conditional wait - pthread_cond_t cond; - list_element *next; - } - - void addWaiter(list_element* le) - { - le.next = m_waiter; - m_waiter = le; - } - - void removeWaiter(list_element* le) - { - for (auto pwaiter = &m_waiter; *pwaiter; pwaiter = &(*pwaiter).next) - if (*pwaiter == le) - { - *pwaiter = le.next; - return; - } - assert(false); - } } } @@ -313,14 +271,16 @@ private: unittest { - import core.thread; + import core.thread, core.atomic; auto event = new Event(true, false); int numThreads = 10; + shared int numRunning = 0; void testFn() { event.wait(8.dur!"seconds"); // timeout below limit for druntime test_runner + numRunning.atomicOp!"+="(1); } auto group = new ThreadGroup; @@ -329,10 +289,13 @@ unittest group.create(&testFn); auto start = MonoTime.currTime; + assert(numRunning == 0); event.set(); group.joinAll(); + assert(numRunning == numThreads); + assert(MonoTime.currTime - start < 5.dur!"seconds"); delete event; From 89f4930d86a93a5db884dc9448c1780c2872af8f Mon Sep 17 00:00:00 2001 From: Rainer Schuetze Date: Sun, 24 Mar 2019 14:57:05 +0100 Subject: [PATCH 3/3] move exampple from changelog to doc --- changelog/core_sync_event.dd | 29 ----------------------------- src/core/sync/event.d | 29 +++++++++++++++++++++++++++++ 2 files changed, 29 insertions(+), 29 deletions(-) diff --git a/changelog/core_sync_event.dd b/changelog/core_sync_event.dd index 38b56fe5fe..6fa89605df 100644 --- a/changelog/core_sync_event.dd +++ b/changelog/core_sync_event.dd @@ -2,32 +2,3 @@ New module core.sync.event This module provides a cross-platform interface for lightweight signaling of other threads. It can be used to start execution of multiple waiting threads simultaneously. ---- -import core.sync.event, core.thread, std.file; - -struct ProcessFile -{ - ThreadGroup group; - Event event; - void[] buffer; - - void doProcess() - { - event.wait(); - // process buffer - } - - void process(string filename) - { - event.initialize(true, false); - group = new ThreadGroup; - for (int i = 0; i < 10; ++i) - group.create(&doProcess); - - buffer = std.file.read(filename); - event.set(); - group.joinAll(); - event.terminate(); - } -} ---- diff --git a/src/core/sync/event.d b/src/core/sync/event.d index c68b5293cc..2fe5ea8a55 100644 --- a/src/core/sync/event.d +++ b/src/core/sync/event.d @@ -37,6 +37,35 @@ import core.internal.abort : abort; * * Implemented using `pthread_mutex` and `pthread_condition` on Posix and * `CreateEvent` and `SetEvent` on Windows. +--- +import core.sync.event, core.thread, std.file; + +struct ProcessFile +{ + ThreadGroup group; + Event event; + void[] buffer; + + void doProcess() + { + event.wait(); + // process buffer + } + + void process(string filename) + { + event.initialize(true, false); + group = new ThreadGroup; + for (int i = 0; i < 10; ++i) + group.create(&doProcess); + + buffer = std.file.read(filename); + event.set(); + group.joinAll(); + event.terminate(); + } +} +--- */ struct Event {