Skip to content
This repository was archived by the owner on Oct 12, 2022. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions changelog/core_sync_event.dd
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
New module core.sync.event

This module provides a cross-platform interface for lightweight signaling of other threads.
It can be used to start execution of multiple waiting threads simultaneously.
2 changes: 2 additions & 0 deletions mak/COPY
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ COPY=\
$(IMPDIR)\core\stdcpp\type_traits.d \
$(IMPDIR)\core\stdcpp\xutility.d \
\
$(IMPDIR)\core\sync\event.d \
\
$(IMPDIR)\core\sys\darwin\crt_externs.d \
$(IMPDIR)\core\sys\darwin\dlfcn.d \
$(IMPDIR)\core\sys\darwin\execinfo.d \
Expand Down
1 change: 1 addition & 0 deletions mak/DOCS
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ DOCS=\
$(DOCDIR)\core_sync_condition.html \
$(DOCDIR)\core_sync_config.html \
$(DOCDIR)\core_sync_exception.html \
$(DOCDIR)\core_sync_event.html \
$(DOCDIR)\core_sync_mutex.html \
$(DOCDIR)\core_sync_rwmutex.html \
$(DOCDIR)\core_sync_semaphore.html \
Expand Down
1 change: 1 addition & 0 deletions mak/SRCS
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ SRCS=\
src\core\sync\condition.d \
src\core\sync\config.d \
src\core\sync\exception.d \
src\core\sync\event.d \
src\core\sync\mutex.d \
src\core\sync\rwmutex.d \
src\core\sync\semaphore.d \
Expand Down
3 changes: 3 additions & 0 deletions mak/WINDOWS
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,9 @@ $(IMPDIR)\core\stdc\wchar_.d : src\core\stdc\wchar_.d
$(IMPDIR)\core\stdc\wctype.d : src\core\stdc\wctype.d
copy $** $@

$(IMPDIR)\core\sync\event.d : src\core\sync\event.d
copy $** $@

$(IMPDIR)\core\stdcpp\allocator.d : src\core\stdcpp\allocator.d
copy $** $@

Expand Down
6 changes: 3 additions & 3 deletions src/core/sync/config.d
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ version (Posix)
private import core.time;


void mktspec( ref timespec t ) nothrow
void mktspec( ref timespec t ) nothrow @nogc
{
static if ( false && is( typeof( clock_gettime ) ) )
{
Expand All @@ -41,14 +41,14 @@ version (Posix)
}


void mktspec( ref timespec t, Duration delta ) nothrow
void mktspec( ref timespec t, Duration delta ) nothrow @nogc
{
mktspec( t );
mvtspec( t, delta );
}


void mvtspec( ref timespec t, Duration delta ) nothrow
void mvtspec( ref timespec t, Duration delta ) nothrow @nogc
{
auto val = delta;
val += dur!"seconds"( t.tv_sec );
Expand Down
331 changes: 331 additions & 0 deletions src/core/sync/event.d
Original file line number Diff line number Diff line change
@@ -0,0 +1,331 @@
/**
* The event module provides a primitive for lightweight signaling of other threads
* (emulating Windows events on Posix)
*
* Copyright: Copyright (c) 2019 D Language Foundation
* License: Distributed under the
* $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0).
* (See accompanying file LICENSE)
* Authors: Rainer Schuetze
* Source: $(DRUNTIMESRC core/sync/event.d)
*/
module core.sync.event;

version (Windows)
{
import core.sys.windows.basetsd /+: HANDLE +/;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why these comments?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This follows the example of https://github.com/dlang/druntime/pull/2400/files (selective imports create a public alias).

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hasn't this been fixed a year ago or so?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh well, looks like it's really still in deprecation, but there's a PR to remove them for good:

dlang/dmd#9393

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That will go in very soon.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rainers this has been merged now.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not good enough:

test.d(3): Deprecation: core.sync.event.HANDLE is not visible from module test

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😞

import core.sys.windows.winerror /+: WAIT_TIMEOUT +/;
import core.sys.windows.winbase /+: CreateEvent, CloseHandle, SetEvent, ResetEvent,
WaitForSingleObject, INFINITE, WAIT_OBJECT_0+/;
}
else version (Posix)
{
import core.sys.posix.pthread;
import core.sys.posix.sys.types;
}
else
{
static assert(false, "Platform not supported");
}

import core.time;
import core.internal.abort : abort;

/**
* represents an event. Clients of an event are suspended while waiting
* for the event to be "signaled".
*
* Implemented using `pthread_mutex` and `pthread_condition` on Posix and
* `CreateEvent` and `SetEvent` on Windows.
---
import core.sync.event, core.thread, std.file;

struct ProcessFile
{
ThreadGroup group;
Event event;
void[] buffer;

void doProcess()
{
event.wait();
// process buffer
}

void process(string filename)
{
event.initialize(true, false);
group = new ThreadGroup;
for (int i = 0; i < 10; ++i)
group.create(&doProcess);

buffer = std.file.read(filename);
event.set();
group.joinAll();
event.terminate();
}
}
---
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why isn't it a documented unittest instead ?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think its already a bit too long for a really good example, and making it actually runnable would make it even longer.

*/
struct Event
{
nothrow @nogc:
/**
* Creates an event object.
*
* Params:
* manualReset = the state of the event is not reset automatically after resuming waiting clients
* initialState = initial state of the signal
*/
this(bool manualReset, bool initialState)
{
initialize(manualReset, initialState);
}

/**
* Initializes an event object. Does nothing if the event is already initialized.
*
* Params:
* manualReset = the state of the event is not reset automatically after resuming waiting clients
* initialState = initial state of the signal
*/
void initialize(bool manualReset, bool initialState)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this a separate method?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You cannot pass arguments to the constructor if it is part of another struct, e.g. https://github.com/dlang/druntime/pull/2514/files#diff-6615611ba51118215a271e3500c61122R2605. You have to use an explicit initialize-call then. __ctor could be an alternative, but I think it should not be part of the API.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a downside to do evStart = Event(...) in Gcx.initialize?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've explicitly added @disable this(this) as this could duplicate the handle reference.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does not seem to prevent assignment: https://run.dlang.io/is/iefkBG.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, seems like opAssign should be disabled, too.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If that's required then I'm fine with using initialize instead of a constructor.

{
version (Windows)
{
if (m_event)
return;
m_event = CreateEvent(null, manualReset, initialState, null);
m_event || abort("Error: CreateEvent failed.");
}
else version (Posix)
{
if (m_initalized)
return;
pthread_mutex_init(cast(pthread_mutex_t*) &m_mutex, null) == 0 ||
abort("Error: pthread_mutex_init failed.");
pthread_cond_init(&m_cond, null) == 0 ||
abort("Error: pthread_cond_init failed.");
m_state = initialState;
m_manualReset = manualReset;
m_initalized = true;
}
}

// copying not allowed, can produce resource leaks
@disable this(this);
@disable void opAssign(Event);

~this()
{
terminate();
}

/**
* deinitialize event. Does nothing if the event is not initialized. There must not be
* threads currently waiting for the event to be signaled.
*/
void terminate()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No documentation.

{
version (Windows)
{
if (m_event)
CloseHandle(m_event);
m_event = null;
}
else version (Posix)
{
if (m_initalized)
{
pthread_mutex_destroy(&m_mutex) == 0 ||
abort("Error: pthread_mutex_destroy failed.");
pthread_cond_destroy(&m_cond) == 0 ||
abort("Error: pthread_cond_destroy failed.");
m_initalized = false;
}
}
}


/// Set the event to "signaled", so that waiting clients are resumed
void set()
{
version (Windows)
{
if (m_event)
SetEvent(m_event);
}
else version (Posix)
{
if (m_initalized)
{
pthread_mutex_lock(&m_mutex);
m_state = true;
pthread_cond_broadcast(&m_cond);
pthread_mutex_unlock(&m_mutex);
}
}
}

/// Reset the event manually
void reset()
{
version (Windows)
{
if (m_event)
ResetEvent(m_event);
}
else version (Posix)
{
if (m_initalized)
{
pthread_mutex_lock(&m_mutex);
m_state = false;
pthread_mutex_unlock(&m_mutex);
}
}
}

/**
* Wait for the event to be signaled without timeout.
*
* Returns:
* `true` if the event is in signaled state, `false` if the event is uninitialized or another error occured
*/
bool wait()
{
version (Windows)
{
return m_event && WaitForSingleObject(m_event, INFINITE) == WAIT_OBJECT_0;
}
else version (Posix)
{
return wait(Duration.max);
}
}

/**
* Wait for the event to be signaled with timeout.
*
* Params:
* tmout = the maximum time to wait
* Returns:
* `true` if the event is in signaled state, `false` if the event was nonsignaled for the given time or
* the event is uninitialized or another error occured
*/
bool wait(Duration tmout)
{
version (Windows)
{
if (!m_event)
return false;

auto maxWaitMillis = dur!("msecs")(uint.max - 1);

while (tmout > maxWaitMillis)
{
auto res = WaitForSingleObject(m_event, uint.max - 1);
if (res != WAIT_TIMEOUT)
return res == WAIT_OBJECT_0;
tmout -= maxWaitMillis;
}
auto ms = cast(uint)(tmout.total!"msecs");
return WaitForSingleObject(m_event, ms) == WAIT_OBJECT_0;
}
else version (Posix)
{
if (!m_initalized)
return false;

pthread_mutex_lock(&m_mutex);

int result = 0;
if (!m_state)
{
if (tmout == Duration.max)
{
result = pthread_cond_wait(&m_cond, &m_mutex);
}
else
{
import core.sync.config;

timespec t = void;
mktspec(t, tmout);

result = pthread_cond_timedwait(&m_cond, &m_mutex, &t);
}
}
if (result == 0 && !m_manualReset)
m_state = false;

pthread_mutex_unlock(&m_mutex);

return result == 0;
}
}

private:
version (Windows)
{
HANDLE m_event;
}
else version (Posix)
{
pthread_mutex_t m_mutex;
pthread_cond_t m_cond;
bool m_initalized;
bool m_state;
bool m_manualReset;
}
}

// Test single-thread (non-shared) use.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe make this example public, s.t. there's at least one user-facing example.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this example is a pretty bad one as it doesn't involve multiple threads. Maybe better to move the example from the changelog into the documentation?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. I just saw that the module is currently without any public example.

@nogc nothrow unittest
{
// auto-reset, initial state false
Event ev1 = Event(false, false);
assert(!ev1.wait(1.dur!"msecs"));
ev1.set();
assert(ev1.wait());
assert(!ev1.wait(1.dur!"msecs"));

// manual-reset, initial state true
Event ev2 = Event(true, true);
assert(ev2.wait());
assert(ev2.wait());
ev2.reset();
assert(!ev2.wait(1.dur!"msecs"));
}

unittest
{
import core.thread, core.atomic;

auto event = new Event(true, false);
int numThreads = 10;
shared int numRunning = 0;

void testFn()
{
event.wait(8.dur!"seconds"); // timeout below limit for druntime test_runner
numRunning.atomicOp!"+="(1);
}

auto group = new ThreadGroup;

for (int i = 0; i < numThreads; ++i)
group.create(&testFn);

auto start = MonoTime.currTime;
assert(numRunning == 0);

event.set();
group.joinAll();

assert(numRunning == numThreads);

assert(MonoTime.currTime - start < 5.dur!"seconds");

delete event;
}