diff --git a/changelog/core_sync_event.dd b/changelog/core_sync_event.dd new file mode 100644 index 0000000000..6fa89605df --- /dev/null +++ b/changelog/core_sync_event.dd @@ -0,0 +1,4 @@ +New module core.sync.event + +This module provides a cross-platform interface for lightweight signaling of other threads. +It can be used to start execution of multiple waiting threads simultaneously. diff --git a/mak/COPY b/mak/COPY index 5683188da9..9e0664a4c6 100644 --- a/mak/COPY +++ b/mak/COPY @@ -65,6 +65,8 @@ COPY=\ $(IMPDIR)\core\stdcpp\type_traits.d \ $(IMPDIR)\core\stdcpp\xutility.d \ \ + $(IMPDIR)\core\sync\event.d \ + \ $(IMPDIR)\core\sys\darwin\crt_externs.d \ $(IMPDIR)\core\sys\darwin\dlfcn.d \ $(IMPDIR)\core\sys\darwin\execinfo.d \ diff --git a/mak/DOCS b/mak/DOCS index 89f7546d70..96121956cd 100644 --- a/mak/DOCS +++ b/mak/DOCS @@ -55,6 +55,7 @@ DOCS=\ $(DOCDIR)\core_sync_condition.html \ $(DOCDIR)\core_sync_config.html \ $(DOCDIR)\core_sync_exception.html \ + $(DOCDIR)\core_sync_event.html \ $(DOCDIR)\core_sync_mutex.html \ $(DOCDIR)\core_sync_rwmutex.html \ $(DOCDIR)\core_sync_semaphore.html \ diff --git a/mak/SRCS b/mak/SRCS index bc93928b19..0904b9029e 100644 --- a/mak/SRCS +++ b/mak/SRCS @@ -68,6 +68,7 @@ SRCS=\ src\core\sync\condition.d \ src\core\sync\config.d \ src\core\sync\exception.d \ + src\core\sync\event.d \ src\core\sync\mutex.d \ src\core\sync\rwmutex.d \ src\core\sync\semaphore.d \ diff --git a/mak/WINDOWS b/mak/WINDOWS index 6a10a40b92..60e8173038 100644 --- a/mak/WINDOWS +++ b/mak/WINDOWS @@ -204,6 +204,9 @@ $(IMPDIR)\core\stdc\wchar_.d : src\core\stdc\wchar_.d $(IMPDIR)\core\stdc\wctype.d : src\core\stdc\wctype.d copy $** $@ +$(IMPDIR)\core\sync\event.d : src\core\sync\event.d + copy $** $@ + $(IMPDIR)\core\stdcpp\allocator.d : src\core\stdcpp\allocator.d copy $** $@ diff --git a/src/core/sync/config.d b/src/core/sync/config.d index 4540e9954c..6b27d35853 100644 --- a/src/core/sync/config.d +++ b/src/core/sync/config.d @@ -23,7 +23,7 @@ version (Posix) private import core.time; - void mktspec( ref timespec t ) nothrow + void mktspec( ref timespec t ) nothrow @nogc { static if ( false && is( typeof( clock_gettime ) ) ) { @@ -41,14 +41,14 @@ version (Posix) } - void mktspec( ref timespec t, Duration delta ) nothrow + void mktspec( ref timespec t, Duration delta ) nothrow @nogc { mktspec( t ); mvtspec( t, delta ); } - void mvtspec( ref timespec t, Duration delta ) nothrow + void mvtspec( ref timespec t, Duration delta ) nothrow @nogc { auto val = delta; val += dur!"seconds"( t.tv_sec ); diff --git a/src/core/sync/event.d b/src/core/sync/event.d new file mode 100644 index 0000000000..2fe5ea8a55 --- /dev/null +++ b/src/core/sync/event.d @@ -0,0 +1,331 @@ +/** + * The event module provides a primitive for lightweight signaling of other threads + * (emulating Windows events on Posix) + * + * Copyright: Copyright (c) 2019 D Language Foundation + * License: Distributed under the + * $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0). + * (See accompanying file LICENSE) + * Authors: Rainer Schuetze + * Source: $(DRUNTIMESRC core/sync/event.d) + */ +module core.sync.event; + +version (Windows) +{ + import core.sys.windows.basetsd /+: HANDLE +/; + import core.sys.windows.winerror /+: WAIT_TIMEOUT +/; + import core.sys.windows.winbase /+: CreateEvent, CloseHandle, SetEvent, ResetEvent, + WaitForSingleObject, INFINITE, WAIT_OBJECT_0+/; +} +else version (Posix) +{ + import core.sys.posix.pthread; + import core.sys.posix.sys.types; +} +else +{ + static assert(false, "Platform not supported"); +} + +import core.time; +import core.internal.abort : abort; + +/** + * represents an event. Clients of an event are suspended while waiting + * for the event to be "signaled". + * + * Implemented using `pthread_mutex` and `pthread_condition` on Posix and + * `CreateEvent` and `SetEvent` on Windows. +--- +import core.sync.event, core.thread, std.file; + +struct ProcessFile +{ + ThreadGroup group; + Event event; + void[] buffer; + + void doProcess() + { + event.wait(); + // process buffer + } + + void process(string filename) + { + event.initialize(true, false); + group = new ThreadGroup; + for (int i = 0; i < 10; ++i) + group.create(&doProcess); + + buffer = std.file.read(filename); + event.set(); + group.joinAll(); + event.terminate(); + } +} +--- + */ +struct Event +{ +nothrow @nogc: + /** + * Creates an event object. + * + * Params: + * manualReset = the state of the event is not reset automatically after resuming waiting clients + * initialState = initial state of the signal + */ + this(bool manualReset, bool initialState) + { + initialize(manualReset, initialState); + } + + /** + * Initializes an event object. Does nothing if the event is already initialized. + * + * Params: + * manualReset = the state of the event is not reset automatically after resuming waiting clients + * initialState = initial state of the signal + */ + void initialize(bool manualReset, bool initialState) + { + version (Windows) + { + if (m_event) + return; + m_event = CreateEvent(null, manualReset, initialState, null); + m_event || abort("Error: CreateEvent failed."); + } + else version (Posix) + { + if (m_initalized) + return; + pthread_mutex_init(cast(pthread_mutex_t*) &m_mutex, null) == 0 || + abort("Error: pthread_mutex_init failed."); + pthread_cond_init(&m_cond, null) == 0 || + abort("Error: pthread_cond_init failed."); + m_state = initialState; + m_manualReset = manualReset; + m_initalized = true; + } + } + + // copying not allowed, can produce resource leaks + @disable this(this); + @disable void opAssign(Event); + + ~this() + { + terminate(); + } + + /** + * deinitialize event. Does nothing if the event is not initialized. There must not be + * threads currently waiting for the event to be signaled. + */ + void terminate() + { + version (Windows) + { + if (m_event) + CloseHandle(m_event); + m_event = null; + } + else version (Posix) + { + if (m_initalized) + { + pthread_mutex_destroy(&m_mutex) == 0 || + abort("Error: pthread_mutex_destroy failed."); + pthread_cond_destroy(&m_cond) == 0 || + abort("Error: pthread_cond_destroy failed."); + m_initalized = false; + } + } + } + + + /// Set the event to "signaled", so that waiting clients are resumed + void set() + { + version (Windows) + { + if (m_event) + SetEvent(m_event); + } + else version (Posix) + { + if (m_initalized) + { + pthread_mutex_lock(&m_mutex); + m_state = true; + pthread_cond_broadcast(&m_cond); + pthread_mutex_unlock(&m_mutex); + } + } + } + + /// Reset the event manually + void reset() + { + version (Windows) + { + if (m_event) + ResetEvent(m_event); + } + else version (Posix) + { + if (m_initalized) + { + pthread_mutex_lock(&m_mutex); + m_state = false; + pthread_mutex_unlock(&m_mutex); + } + } + } + + /** + * Wait for the event to be signaled without timeout. + * + * Returns: + * `true` if the event is in signaled state, `false` if the event is uninitialized or another error occured + */ + bool wait() + { + version (Windows) + { + return m_event && WaitForSingleObject(m_event, INFINITE) == WAIT_OBJECT_0; + } + else version (Posix) + { + return wait(Duration.max); + } + } + + /** + * Wait for the event to be signaled with timeout. + * + * Params: + * tmout = the maximum time to wait + * Returns: + * `true` if the event is in signaled state, `false` if the event was nonsignaled for the given time or + * the event is uninitialized or another error occured + */ + bool wait(Duration tmout) + { + version (Windows) + { + if (!m_event) + return false; + + auto maxWaitMillis = dur!("msecs")(uint.max - 1); + + while (tmout > maxWaitMillis) + { + auto res = WaitForSingleObject(m_event, uint.max - 1); + if (res != WAIT_TIMEOUT) + return res == WAIT_OBJECT_0; + tmout -= maxWaitMillis; + } + auto ms = cast(uint)(tmout.total!"msecs"); + return WaitForSingleObject(m_event, ms) == WAIT_OBJECT_0; + } + else version (Posix) + { + if (!m_initalized) + return false; + + pthread_mutex_lock(&m_mutex); + + int result = 0; + if (!m_state) + { + if (tmout == Duration.max) + { + result = pthread_cond_wait(&m_cond, &m_mutex); + } + else + { + import core.sync.config; + + timespec t = void; + mktspec(t, tmout); + + result = pthread_cond_timedwait(&m_cond, &m_mutex, &t); + } + } + if (result == 0 && !m_manualReset) + m_state = false; + + pthread_mutex_unlock(&m_mutex); + + return result == 0; + } + } + +private: + version (Windows) + { + HANDLE m_event; + } + else version (Posix) + { + pthread_mutex_t m_mutex; + pthread_cond_t m_cond; + bool m_initalized; + bool m_state; + bool m_manualReset; + } +} + +// Test single-thread (non-shared) use. +@nogc nothrow unittest +{ + // auto-reset, initial state false + Event ev1 = Event(false, false); + assert(!ev1.wait(1.dur!"msecs")); + ev1.set(); + assert(ev1.wait()); + assert(!ev1.wait(1.dur!"msecs")); + + // manual-reset, initial state true + Event ev2 = Event(true, true); + assert(ev2.wait()); + assert(ev2.wait()); + ev2.reset(); + assert(!ev2.wait(1.dur!"msecs")); +} + +unittest +{ + import core.thread, core.atomic; + + auto event = new Event(true, false); + int numThreads = 10; + shared int numRunning = 0; + + void testFn() + { + event.wait(8.dur!"seconds"); // timeout below limit for druntime test_runner + numRunning.atomicOp!"+="(1); + } + + auto group = new ThreadGroup; + + for (int i = 0; i < numThreads; ++i) + group.create(&testFn); + + auto start = MonoTime.currTime; + assert(numRunning == 0); + + event.set(); + group.joinAll(); + + assert(numRunning == numThreads); + + assert(MonoTime.currTime - start < 5.dur!"seconds"); + + delete event; +}