Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ set(src_plugin
plugin/InternalSdk.cpp
plugin/SdkFactory.cpp ../include/ECFMP/SdkEvents.h plugin/InternalSdkEvents.h)

set(src_thread
thread/ThreadPool.cpp
thread/ThreadPool.h)

set(src_time time/Clock.cpp time/Clock.h)

set(ALL_FILES
Expand All @@ -49,6 +53,7 @@ set(ALL_FILES
${src_log}
${src_pch}
${src_plugin}
${src_thread}
${src_time})

add_library(${PROJECT_NAME} STATIC ${ALL_FILES})
Expand All @@ -59,17 +64,17 @@ set_property(TARGET ${PROJECT_NAME} PROPERTY CXX_STANDARD 20)
target_include_directories(${PROJECT_NAME} PRIVATE
"${CMAKE_CURRENT_SOURCE_DIR}/../third_party/nlohmann;"
"${CMAKE_CURRENT_SOURCE_DIR}/../third_party/euroscope;"
)
)

target_include_directories(${PROJECT_NAME} PUBLIC
"${CMAKE_CURRENT_SOURCE_DIR};"
"${CMAKE_CURRENT_SOURCE_DIR}/../include;"
)
)

# Treat Euroscope as a system include directory to suppress warning, because they have lots
target_include_directories(${PROJECT_NAME} SYSTEM PRIVATE
"${CMAKE_CURRENT_SOURCE_DIR}/../third_party/euroscope;"
)
)

target_compile_options(${PROJECT_NAME} PRIVATE
$<$<CONFIG:Debug>:
Expand All @@ -89,4 +94,4 @@ target_compile_options(${PROJECT_NAME} PRIVATE
/WX;
-Wno-unused-parameter; # Lots of interfaces don't use everything
-Wno-missing-field-initializers; # Windows has loads of this sadly
)
)
23 changes: 15 additions & 8 deletions src/eventbus/AsynchronousEventDispatcher.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once
#include "ECFMP/eventbus/EventListener.h"
#include "EventDispatcher.h"
#include "thread/ThreadPool.h"
#include <cassert>
#include <future>

Expand All @@ -10,9 +11,13 @@ namespace ECFMP::EventBus {
class AsynchronousEventDispatcher : public EventDispatcher<EventType>
{
public:
explicit AsynchronousEventDispatcher(std::shared_ptr<EventListener<EventType>> listener) : listener(listener)
explicit AsynchronousEventDispatcher(
std::shared_ptr<EventListener<EventType>> listener, std::shared_ptr<Thread::ThreadPool> threadPool
)
: listener(listener), threadPool(threadPool)
{
assert(listener != nullptr && "Listener cannot be null");
assert(threadPool != nullptr && "Thread pool cannot be null");
}

/**
Expand All @@ -22,17 +27,19 @@ namespace ECFMP::EventBus {
{
// Copy the listener to ensure it is not destroyed before the event is dispatched
auto listenerCopy = listener;
static_cast<void>(std::async(
std::launch::async,
[listenerCopy](const auto& event) {
listenerCopy->OnEvent(event);
},
event
));
auto eventCopy = event;

// Dispatch the event asynchronously using the thread pool
threadPool->Schedule([listenerCopy, eventCopy]() {
listenerCopy->OnEvent(eventCopy);
});
};

private:
// The event listener for this dispatcher
std::shared_ptr<EventListener<EventType>> listener;

// The thread pool to use for dispatching events
std::shared_ptr<Thread::ThreadPool> threadPool;
};
}// namespace ECFMP::EventBus
15 changes: 11 additions & 4 deletions src/eventbus/EventDispatcherFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,20 @@
#include "PendingEuroscopeEvents.h"
#include "SubscriptionFlags.h"
#include "SynchronousEventDispatcher.h"
#include <assert.h>
#include "thread/ThreadPool.h"

namespace ECFMP::EventBus {
class EventDispatcherFactory
{
public:
explicit EventDispatcherFactory(const std::shared_ptr<PendingEuroscopeEvents>& pendingEuroscopeEvents)
: pendingEuroscopeEvents(pendingEuroscopeEvents)
EventDispatcherFactory(
const std::shared_ptr<PendingEuroscopeEvents>& pendingEuroscopeEvents,
const std::shared_ptr<Thread::ThreadPool>& threadPool
)
: pendingEuroscopeEvents(pendingEuroscopeEvents), threadPool(threadPool)
{
assert(pendingEuroscopeEvents != nullptr && "pendingEuroscopeEvents cannot be null");
assert(threadPool != nullptr && "threadPool cannot be null");
}
virtual ~EventDispatcherFactory() = default;

Expand All @@ -26,7 +30,7 @@ namespace ECFMP::EventBus {
case EventDispatchMode::Sync:
return std::make_shared<SynchronousEventDispatcher<EventType>>(listener);
case EventDispatchMode::Async:
return std::make_shared<AsynchronousEventDispatcher<EventType>>(listener);
return std::make_shared<AsynchronousEventDispatcher<EventType>>(listener, threadPool);
case EventDispatchMode::Euroscope:
return std::make_shared<EuroscopeEventDispatcher<EventType>>(listener, pendingEuroscopeEvents);
default:
Expand All @@ -37,5 +41,8 @@ namespace ECFMP::EventBus {
private:
// A place that stores events that should be dispatched on the Euroscope thread.
std::shared_ptr<PendingEuroscopeEvents> pendingEuroscopeEvents;

// The thread pool to use for dispatching events
std::shared_ptr<Thread::ThreadPool> threadPool;
};
}// namespace ECFMP::EventBus
5 changes: 4 additions & 1 deletion src/eventbus/InternalEventBusFactory.cpp
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
#include "eventbus/InternalEventBusFactory.h"
#include "eventbus/PendingEuroscopeEvents.h"
#include "plugin/InternalSdkEvents.h"
#include "thread/ThreadPool.h"

namespace ECFMP::EventBus {
[[nodiscard]] auto MakeEventBus() -> std::shared_ptr<InternalEventBus>
{
// Thread pool will get stopped when the eventbus is destroyed
auto threadPool = std::make_shared<Thread::ThreadPool>();
auto pendingEuroscopeEvents = std::make_shared<PendingEuroscopeEvents>();
auto eventDispatcherFactory = std::make_shared<EventDispatcherFactory>(pendingEuroscopeEvents);
auto eventDispatcherFactory = std::make_shared<EventDispatcherFactory>(pendingEuroscopeEvents, threadPool);

auto eventBus = std::make_shared<InternalEventBus>(eventDispatcherFactory);
eventBus->SubscribeSync<Plugin::EuroscopeTimerTickEvent>(pendingEuroscopeEvents);
Expand Down
1 change: 1 addition & 0 deletions src/pch.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once
#include <Windows.h>
#include <algorithm>
#include <cassert>
#include <chrono>
#include <functional>
Expand Down
1 change: 1 addition & 0 deletions src/plugin/InternalSdk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ namespace ECFMP::Plugin {

void InternalSdk::Destroy()
{
// Resetting the event bus naturally means we wait for threads to finish.
eventBus.reset();
}

Expand Down
84 changes: 84 additions & 0 deletions src/thread/ThreadPool.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
#include "ThreadPool.h"
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>

namespace ECFMP::Thread {
struct ThreadPool::Impl {
std::vector<std::thread> threads;

std::mutex mutex;

std::condition_variable condition;

std::queue<std::function<void()>> tasks;

bool running = true;
};

ThreadPool::ThreadPool() : impl(std::make_unique<ThreadPool::Impl>())
{
// Create 2 threads in the pool
for (int i = 0; i < 2; i++) {
impl->threads.emplace_back([this]() {
while (true) {
std::function<void()> task;

// Run this block in a lock
{
std::unique_lock<std::mutex> lock(impl->mutex);

// Wait for a task to be available, or for the pool to be stopped
impl->condition.wait(lock, [this]() {
return !impl->running || !impl->tasks.empty();
});

// If the pool is stopped, exit
if (!impl->running) {
return;
}

// If the pool doesn't have a task, continue
if (impl->tasks.empty()) {
continue;
}

// Grab the next task
task = std::move(impl->tasks.front());
impl->tasks.pop();
}

// Run the task
task();
}
});
}
}

ThreadPool::~ThreadPool()
{
// Join all threads
{
std::unique_lock<std::mutex> lock(impl->mutex);
impl->running = false;
}

impl->condition.notify_all();
for (auto& thread: impl->threads) {
thread.join();
}
}

void ThreadPool::Schedule(const std::function<void()>& function)
{
// Run this block in a lock, add the task to the queue
{
std::unique_lock<std::mutex> lock(impl->mutex);
impl->tasks.emplace(function);
}

// Notify a thread that a task is available
impl->condition.notify_one();
}
}// namespace ECFMP::Thread
16 changes: 16 additions & 0 deletions src/thread/ThreadPool.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#pragma once

namespace ECFMP::Thread {
class ThreadPool
{
public:
ThreadPool();
~ThreadPool();

void Schedule(const std::function<void()>& function);

private:
struct Impl;
std::unique_ptr<Impl> impl;
};
}// namespace ECFMP::Thread
26 changes: 15 additions & 11 deletions test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ set(test__eventbus

set(test__flightinformationregion
flightinformationregion/ConcreteFlightInformationRegionTest.cpp
)
)

set(test__flowmeasure
flowmeasure/ConcreteAirportFilterTest.cpp
Expand All @@ -46,7 +46,7 @@ set(test__flowmeasure

set(test__log
log/LogDecoratorTest.cpp
)
)

set(test__mock
mock/MockEuroscopeAircraft.h
Expand All @@ -57,12 +57,15 @@ set(test__mock
set(test__pch
pch/pch.cpp
pch/pch.h
)
)

set(test__plugin
plugin/SdkFactoryTest.cpp
plugin/InternalSdkTest.cpp)

set(test__thread
thread/ThreadPoolTest.cpp)

set(test__other
main.cpp
pch/pch.cpp pch/pch.h
Expand All @@ -80,8 +83,9 @@ add_executable(${PROJECT_NAME}
${test__log}
${test__mock}
${test__plugin}
${test__thread}
${test__other}
)
)
add_test(NAME ${PROJECT_NAME} COMMAND ${PROJECT_NAME})
target_precompile_headers(${PROJECT_NAME} PRIVATE "pch/pch.h")

Expand All @@ -90,7 +94,7 @@ add_dependencies(${PROJECT_NAME}
ecfmp_sdk
gtest
gmock
)
)

#### INCLUDES
target_include_directories(${PROJECT_NAME} PUBLIC
Expand All @@ -99,12 +103,12 @@ target_include_directories(${PROJECT_NAME} PUBLIC
"${CMAKE_CURRENT_SOURCE_DIR}/../include;"
"${CMAKE_CURRENT_SOURCE_DIR}/../third_party/googletest/googlemock/include;"
"${CMAKE_CURRENT_SOURCE_DIR}/../third_party/nlohmann;"
)
)

# Treat Euroscope as a system include directory to suppress warning, because they have lots
target_include_directories(${PROJECT_NAME} SYSTEM PRIVATE
"${CMAKE_CURRENT_SOURCE_DIR}/../third_party/euroscope;"
)
)

#### LINKS
target_link_directories(
Expand All @@ -117,7 +121,7 @@ target_link_libraries(${PROJECT_NAME} PRIVATE
gmock
ecfmp_sdk
"EuroScopePlugInDll;"
)
)

set_target_properties(${PROJECT_NAME} PROPERTIES COMPILE_FLAGS " -m32" LINK_FLAGS "-m32" JSON_MultipleHeaders "ON ")

Expand All @@ -142,7 +146,7 @@ target_compile_options(${PROJECT_NAME} PRIVATE
-Wno-unused-parameter; # Lots of interfaces don't use everything
-Wno-missing-field-initializers; # Windows has loads of this sadly
/EHa;
)
)

#### LINK OPTIONS
target_link_options(${PROJECT_NAME} PRIVATE
Expand All @@ -159,10 +163,10 @@ target_link_options(${PROJECT_NAME} PRIVATE
>
/NODEFAULTLIB:LIBCMT;
/SUBSYSTEM:CONSOLE;
)
)

# Post-build copy the EuroScope binary
add_custom_command(TARGET ${PROJECT_NAME} POST_BUILD
COMMAND ${CMAKE_COMMAND} -E copy "${CMAKE_CURRENT_SOURCE_DIR}/../lib/EuroScopePlugInDll.dll" "${PROJECT_BINARY_DIR}/EuroScopePlugInDll.dll"
COMMENT "Copied EuroScope shared library to ${PROJECT_BINARY_DIR}/EuroScopePlugInDll.dll"
)
)
Loading