From 46bdbc24008cb23e0f53ad4df7bf1e8484e364d5 Mon Sep 17 00:00:00 2001 From: Andy Ford Date: Sat, 14 Oct 2023 15:12:15 +0100 Subject: [PATCH] fix: euroscope freezes when api is unavailable std::future (returned by std::async) blocks when destructing, which causes lags, even if the API is up. This change fixes that by moving to a thread pool model for asynchronous behaviour. Fixes #17 --- src/CMakeLists.txt | 13 ++- src/eventbus/AsynchronousEventDispatcher.h | 23 ++++-- src/eventbus/EventDispatcherFactory.h | 15 +++- src/eventbus/InternalEventBusFactory.cpp | 5 +- src/pch.h | 1 + src/plugin/InternalSdk.cpp | 1 + src/thread/ThreadPool.cpp | 84 ++++++++++++++++++++ src/thread/ThreadPool.h | 16 ++++ test/CMakeLists.txt | 26 +++--- test/eventbus/EventDispatcherFactoryTest.cpp | 9 ++- test/thread/ThreadPoolTest.cpp | 31 ++++++++ 11 files changed, 194 insertions(+), 30 deletions(-) create mode 100644 src/thread/ThreadPool.cpp create mode 100644 src/thread/ThreadPool.h create mode 100644 test/thread/ThreadPoolTest.cpp diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index eec9871..c5e1455 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -36,6 +36,10 @@ set(src_plugin plugin/InternalSdk.cpp plugin/SdkFactory.cpp ../include/ECFMP/SdkEvents.h plugin/InternalSdkEvents.h) +set(src_thread + thread/ThreadPool.cpp + thread/ThreadPool.h) + set(src_time time/Clock.cpp time/Clock.h) set(ALL_FILES @@ -49,6 +53,7 @@ set(ALL_FILES ${src_log} ${src_pch} ${src_plugin} + ${src_thread} ${src_time}) add_library(${PROJECT_NAME} STATIC ${ALL_FILES}) @@ -59,17 +64,17 @@ set_property(TARGET ${PROJECT_NAME} PROPERTY CXX_STANDARD 20) target_include_directories(${PROJECT_NAME} PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/../third_party/nlohmann;" "${CMAKE_CURRENT_SOURCE_DIR}/../third_party/euroscope;" - ) +) target_include_directories(${PROJECT_NAME} PUBLIC "${CMAKE_CURRENT_SOURCE_DIR};" "${CMAKE_CURRENT_SOURCE_DIR}/../include;" - ) +) # Treat Euroscope as a system include directory to suppress warning, because they have lots target_include_directories(${PROJECT_NAME} SYSTEM PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/../third_party/euroscope;" - ) +) target_compile_options(${PROJECT_NAME} PRIVATE $<$: @@ -89,4 +94,4 @@ target_compile_options(${PROJECT_NAME} PRIVATE /WX; -Wno-unused-parameter; # Lots of interfaces don't use everything -Wno-missing-field-initializers; # Windows has loads of this sadly - ) +) diff --git a/src/eventbus/AsynchronousEventDispatcher.h b/src/eventbus/AsynchronousEventDispatcher.h index 522e4bb..d0b1cf3 100644 --- a/src/eventbus/AsynchronousEventDispatcher.h +++ b/src/eventbus/AsynchronousEventDispatcher.h @@ -1,6 +1,7 @@ #pragma once #include "ECFMP/eventbus/EventListener.h" #include "EventDispatcher.h" +#include "thread/ThreadPool.h" #include #include @@ -10,9 +11,13 @@ namespace ECFMP::EventBus { class AsynchronousEventDispatcher : public EventDispatcher { public: - explicit AsynchronousEventDispatcher(std::shared_ptr> listener) : listener(listener) + explicit AsynchronousEventDispatcher( + std::shared_ptr> listener, std::shared_ptr threadPool + ) + : listener(listener), threadPool(threadPool) { assert(listener != nullptr && "Listener cannot be null"); + assert(threadPool != nullptr && "Thread pool cannot be null"); } /** @@ -22,17 +27,19 @@ namespace ECFMP::EventBus { { // Copy the listener to ensure it is not destroyed before the event is dispatched auto listenerCopy = listener; - static_cast(std::async( - std::launch::async, - [listenerCopy](const auto& event) { - listenerCopy->OnEvent(event); - }, - event - )); + auto eventCopy = event; + + // Dispatch the event asynchronously using the thread pool + threadPool->Schedule([listenerCopy, eventCopy]() { + listenerCopy->OnEvent(eventCopy); + }); }; private: // The event listener for this dispatcher std::shared_ptr> listener; + + // The thread pool to use for dispatching events + std::shared_ptr threadPool; }; }// namespace ECFMP::EventBus diff --git a/src/eventbus/EventDispatcherFactory.h b/src/eventbus/EventDispatcherFactory.h index cf01b90..4a171e4 100644 --- a/src/eventbus/EventDispatcherFactory.h +++ b/src/eventbus/EventDispatcherFactory.h @@ -5,16 +5,20 @@ #include "PendingEuroscopeEvents.h" #include "SubscriptionFlags.h" #include "SynchronousEventDispatcher.h" -#include +#include "thread/ThreadPool.h" namespace ECFMP::EventBus { class EventDispatcherFactory { public: - explicit EventDispatcherFactory(const std::shared_ptr& pendingEuroscopeEvents) - : pendingEuroscopeEvents(pendingEuroscopeEvents) + EventDispatcherFactory( + const std::shared_ptr& pendingEuroscopeEvents, + const std::shared_ptr& threadPool + ) + : pendingEuroscopeEvents(pendingEuroscopeEvents), threadPool(threadPool) { assert(pendingEuroscopeEvents != nullptr && "pendingEuroscopeEvents cannot be null"); + assert(threadPool != nullptr && "threadPool cannot be null"); } virtual ~EventDispatcherFactory() = default; @@ -26,7 +30,7 @@ namespace ECFMP::EventBus { case EventDispatchMode::Sync: return std::make_shared>(listener); case EventDispatchMode::Async: - return std::make_shared>(listener); + return std::make_shared>(listener, threadPool); case EventDispatchMode::Euroscope: return std::make_shared>(listener, pendingEuroscopeEvents); default: @@ -37,5 +41,8 @@ namespace ECFMP::EventBus { private: // A place that stores events that should be dispatched on the Euroscope thread. std::shared_ptr pendingEuroscopeEvents; + + // The thread pool to use for dispatching events + std::shared_ptr threadPool; }; }// namespace ECFMP::EventBus diff --git a/src/eventbus/InternalEventBusFactory.cpp b/src/eventbus/InternalEventBusFactory.cpp index 35b7607..4bc45ed 100644 --- a/src/eventbus/InternalEventBusFactory.cpp +++ b/src/eventbus/InternalEventBusFactory.cpp @@ -1,12 +1,15 @@ #include "eventbus/InternalEventBusFactory.h" #include "eventbus/PendingEuroscopeEvents.h" #include "plugin/InternalSdkEvents.h" +#include "thread/ThreadPool.h" namespace ECFMP::EventBus { [[nodiscard]] auto MakeEventBus() -> std::shared_ptr { + // Thread pool will get stopped when the eventbus is destroyed + auto threadPool = std::make_shared(); auto pendingEuroscopeEvents = std::make_shared(); - auto eventDispatcherFactory = std::make_shared(pendingEuroscopeEvents); + auto eventDispatcherFactory = std::make_shared(pendingEuroscopeEvents, threadPool); auto eventBus = std::make_shared(eventDispatcherFactory); eventBus->SubscribeSync(pendingEuroscopeEvents); diff --git a/src/pch.h b/src/pch.h index a8bc024..e521bd5 100644 --- a/src/pch.h +++ b/src/pch.h @@ -1,5 +1,6 @@ #pragma once #include +#include #include #include #include diff --git a/src/plugin/InternalSdk.cpp b/src/plugin/InternalSdk.cpp index 222bbb8..6ba9bfc 100644 --- a/src/plugin/InternalSdk.cpp +++ b/src/plugin/InternalSdk.cpp @@ -19,6 +19,7 @@ namespace ECFMP::Plugin { void InternalSdk::Destroy() { + // Resetting the event bus naturally means we wait for threads to finish. eventBus.reset(); } diff --git a/src/thread/ThreadPool.cpp b/src/thread/ThreadPool.cpp new file mode 100644 index 0000000..99f62d6 --- /dev/null +++ b/src/thread/ThreadPool.cpp @@ -0,0 +1,84 @@ +#include "ThreadPool.h" +#include +#include +#include +#include + +namespace ECFMP::Thread { + struct ThreadPool::Impl { + std::vector threads; + + std::mutex mutex; + + std::condition_variable condition; + + std::queue> tasks; + + bool running = true; + }; + + ThreadPool::ThreadPool() : impl(std::make_unique()) + { + // Create 2 threads in the pool + for (int i = 0; i < 2; i++) { + impl->threads.emplace_back([this]() { + while (true) { + std::function task; + + // Run this block in a lock + { + std::unique_lock lock(impl->mutex); + + // Wait for a task to be available, or for the pool to be stopped + impl->condition.wait(lock, [this]() { + return !impl->running || !impl->tasks.empty(); + }); + + // If the pool is stopped, exit + if (!impl->running) { + return; + } + + // If the pool doesn't have a task, continue + if (impl->tasks.empty()) { + continue; + } + + // Grab the next task + task = std::move(impl->tasks.front()); + impl->tasks.pop(); + } + + // Run the task + task(); + } + }); + } + } + + ThreadPool::~ThreadPool() + { + // Join all threads + { + std::unique_lock lock(impl->mutex); + impl->running = false; + } + + impl->condition.notify_all(); + for (auto& thread: impl->threads) { + thread.join(); + } + } + + void ThreadPool::Schedule(const std::function& function) + { + // Run this block in a lock, add the task to the queue + { + std::unique_lock lock(impl->mutex); + impl->tasks.emplace(function); + } + + // Notify a thread that a task is available + impl->condition.notify_one(); + } +}// namespace ECFMP::Thread diff --git a/src/thread/ThreadPool.h b/src/thread/ThreadPool.h new file mode 100644 index 0000000..8fef869 --- /dev/null +++ b/src/thread/ThreadPool.h @@ -0,0 +1,16 @@ +#pragma once + +namespace ECFMP::Thread { + class ThreadPool + { + public: + ThreadPool(); + ~ThreadPool(); + + void Schedule(const std::function& function); + + private: + struct Impl; + std::unique_ptr impl; + }; +}// namespace ECFMP::Thread diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 6e81b6f..2eb67d7 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -32,7 +32,7 @@ set(test__eventbus set(test__flightinformationregion flightinformationregion/ConcreteFlightInformationRegionTest.cpp - ) +) set(test__flowmeasure flowmeasure/ConcreteAirportFilterTest.cpp @@ -46,7 +46,7 @@ set(test__flowmeasure set(test__log log/LogDecoratorTest.cpp - ) +) set(test__mock mock/MockEuroscopeAircraft.h @@ -57,12 +57,15 @@ set(test__mock set(test__pch pch/pch.cpp pch/pch.h - ) +) set(test__plugin plugin/SdkFactoryTest.cpp plugin/InternalSdkTest.cpp) +set(test__thread + thread/ThreadPoolTest.cpp) + set(test__other main.cpp pch/pch.cpp pch/pch.h @@ -80,8 +83,9 @@ add_executable(${PROJECT_NAME} ${test__log} ${test__mock} ${test__plugin} + ${test__thread} ${test__other} - ) +) add_test(NAME ${PROJECT_NAME} COMMAND ${PROJECT_NAME}) target_precompile_headers(${PROJECT_NAME} PRIVATE "pch/pch.h") @@ -90,7 +94,7 @@ add_dependencies(${PROJECT_NAME} ecfmp_sdk gtest gmock - ) +) #### INCLUDES target_include_directories(${PROJECT_NAME} PUBLIC @@ -99,12 +103,12 @@ target_include_directories(${PROJECT_NAME} PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/../include;" "${CMAKE_CURRENT_SOURCE_DIR}/../third_party/googletest/googlemock/include;" "${CMAKE_CURRENT_SOURCE_DIR}/../third_party/nlohmann;" - ) +) # Treat Euroscope as a system include directory to suppress warning, because they have lots target_include_directories(${PROJECT_NAME} SYSTEM PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/../third_party/euroscope;" - ) +) #### LINKS target_link_directories( @@ -117,7 +121,7 @@ target_link_libraries(${PROJECT_NAME} PRIVATE gmock ecfmp_sdk "EuroScopePlugInDll;" - ) +) set_target_properties(${PROJECT_NAME} PROPERTIES COMPILE_FLAGS " -m32" LINK_FLAGS "-m32" JSON_MultipleHeaders "ON ") @@ -142,7 +146,7 @@ target_compile_options(${PROJECT_NAME} PRIVATE -Wno-unused-parameter; # Lots of interfaces don't use everything -Wno-missing-field-initializers; # Windows has loads of this sadly /EHa; - ) +) #### LINK OPTIONS target_link_options(${PROJECT_NAME} PRIVATE @@ -159,10 +163,10 @@ target_link_options(${PROJECT_NAME} PRIVATE > /NODEFAULTLIB:LIBCMT; /SUBSYSTEM:CONSOLE; - ) +) # Post-build copy the EuroScope binary add_custom_command(TARGET ${PROJECT_NAME} POST_BUILD COMMAND ${CMAKE_COMMAND} -E copy "${CMAKE_CURRENT_SOURCE_DIR}/../lib/EuroScopePlugInDll.dll" "${PROJECT_BINARY_DIR}/EuroScopePlugInDll.dll" COMMENT "Copied EuroScope shared library to ${PROJECT_BINARY_DIR}/EuroScopePlugInDll.dll" - ) +) diff --git a/test/eventbus/EventDispatcherFactoryTest.cpp b/test/eventbus/EventDispatcherFactoryTest.cpp index e8646cc..3439b8d 100644 --- a/test/eventbus/EventDispatcherFactoryTest.cpp +++ b/test/eventbus/EventDispatcherFactoryTest.cpp @@ -1,6 +1,7 @@ #include "eventbus/EventDispatcherFactory.h" #include "eventbus/PendingEuroscopeEvents.h" #include "eventbus/SubscriptionFlags.h" +#include "thread/ThreadPool.h" namespace ECFMPTest::EventBus { @@ -24,8 +25,11 @@ namespace ECFMPTest::EventBus { { public: EventDispatcherFactoryTest() - : pendingEvents(std::make_shared()), - eventDispatcherFactory(std::make_shared(pendingEvents)) + : threadPool(std::make_shared()), + pendingEvents(std::make_shared()), + eventDispatcherFactory( + std::make_shared(pendingEvents, threadPool) + ) {} [[nodiscard]] static auto CreateListener() -> std::shared_ptr @@ -41,6 +45,7 @@ namespace ECFMPTest::EventBus { } } + std::shared_ptr threadPool; std::shared_ptr pendingEvents; std::shared_ptr eventDispatcherFactory; }; diff --git a/test/thread/ThreadPoolTest.cpp b/test/thread/ThreadPoolTest.cpp new file mode 100644 index 0000000..ee13d2f --- /dev/null +++ b/test/thread/ThreadPoolTest.cpp @@ -0,0 +1,31 @@ +#include "thread/ThreadPool.h" + +namespace ECFMPTest::Thread { + class ThreadPoolTest : public ::testing::Test + { + public: + ECFMP::Thread::ThreadPool threadPool; + }; + + TEST_F(ThreadPoolTest, ItRunsTask) + { + std::atomic counter = 0; + threadPool.Schedule([&counter]() { + counter++; + }); + threadPool.Schedule([&counter]() { + counter++; + }); + threadPool.Schedule([&counter]() { + counter++; + }); + threadPool.Schedule([&counter]() { + counter++; + }); + + // Sleep for a bit to allow the tasks to run + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + + EXPECT_EQ(4, counter); + } +}// namespace ECFMPTest::Thread