diff --git a/src/common/include/display_device/retry_scheduler.h b/src/common/include/display_device/retry_scheduler.h index a15bdd5..082837a 100644 --- a/src/common/include/display_device/retry_scheduler.h +++ b/src/common/include/display_device/retry_scheduler.h @@ -5,6 +5,7 @@ #pragma once // system includes +#include #include #include #include @@ -15,11 +16,11 @@ namespace display_device { /** - * @brief A convenience class for stoping the RetryScheduler. + * @brief A convenience class for stopping the RetryScheduler. * - * It is conceptualy similar to `std::stop_token` except that it also used - * RAII to perform a cleanup. This allows to allows to return void types - * in the RetryScheduler without a hastle. + * It is conceptually similar to `std::stop_token` except that it also uses + * RAII to perform a cleanup. This allows to return void types + * in the RetryScheduler without a hassle. */ class SchedulerStopToken final { public: @@ -87,6 +88,23 @@ namespace display_device { concept ExecuteCallbackLike = ExecuteWithoutStopToken || ExecuteWithStopToken; } // namespace detail + /** + * @brief Scheduler options to be used when scheduling executor function. + */ + struct SchedulerOptions { + /** + * @brief Defines the executor's execution logic when it is scheduled. + */ + enum class Execution { + Immediate, ///< Executor is executed in the calling thread immediately and scheduled afterward. + ImmediateWithSleep, ///< The first sleep duration is TAKEN from `m_sleep_durations` and the calling thread is put to sleep. Once awoken, follows by same logic as `Immediate`. + ScheduledOnly ///< Executor is executed in the thread only. + }; + + std::vector m_sleep_durations; ///< Specifies for long the scheduled thread sleeps before invoking executor. Last duration is reused indefinitely. + Execution m_execution { Execution::Immediate }; ///< Executor's execution logic. + }; + /** * @brief A wrapper class around an interface that provides a thread-safe access to the * interface and allows to schedule arbitrary logic for it to retry until it succeeds. @@ -106,9 +124,9 @@ namespace display_device { std::unique_lock lock { m_mutex }; while (m_keep_alive) { m_syncing_thread = false; - if (m_sleep_duration > std::chrono::milliseconds::zero()) { + if (auto duration { takeNextDuration(m_sleep_durations) }; duration > std::chrono::milliseconds::zero()) { // We're going to sleep until manually woken up or the time elapses. - m_sleep_cv.wait_for(lock, m_sleep_duration, [this]() { return m_syncing_thread; }); + m_sleep_cv.wait_for(lock, duration, [this]() { return m_syncing_thread; }); } else { // We're going to sleep until manually woken up. @@ -153,9 +171,7 @@ namespace display_device { * @param exec_fn Provides thread-safe access to the interface for executing arbitrary logic. * It accepts a `stop_token` as a second parameter which can be used to stop * the scheduler. - * @param interval Specify the interval for the scheduler. - * @note Before the executor function is scheduled, it is first executed in the calling thread - * immediately and the callback is invoked before returning! + * @param options Options for the scheduler. * @note Previously scheduled executor is replaced by a new one! * @examples * std::unique_ptr iface = getIface(...); @@ -165,17 +181,21 @@ namespace display_device { * if (iface.revertSettings()) { * stop_token.requestStop(); * } - * }, 5ms); + * }, { .m_sleep_durations = { 50ms, 10ms }); * @examples_end */ void - schedule(std::function exec_fn, std::chrono::milliseconds interval) { + schedule(std::function exec_fn, const SchedulerOptions &options) { if (!exec_fn) { throw std::logic_error { "Empty callback function provided in RetryScheduler::schedule!" }; } - if (interval == std::chrono::milliseconds::zero()) { - throw std::logic_error { "Interval cannot be zero in RetryScheduler::schedule!" }; + if (options.m_sleep_durations.empty()) { + throw std::logic_error { "At least 1 sleep duration must be specified in RetryScheduler::schedule!" }; + } + + if (std::ranges::any_of(options.m_sleep_durations, [&](const auto &duration) { return duration == std::chrono::milliseconds::zero(); })) { + throw std::logic_error { "All of the durations specified in RetryScheduler::schedule must be larger than a 0!" }; } std::lock_guard lock { m_mutex }; @@ -184,10 +204,18 @@ namespace display_device { // We are catching the exception here instead of propagating to have // similar try...catch login as in the scheduler thread. try { - exec_fn(*m_iface, stop_token); + auto sleep_durations = options.m_sleep_durations; + if (options.m_execution != SchedulerOptions::Execution::ScheduledOnly) { + if (options.m_execution == SchedulerOptions::Execution::ImmediateWithSleep) { + std::this_thread::sleep_for(takeNextDuration(sleep_durations)); + } + + exec_fn(*m_iface, stop_token); + } + if (!stop_token.stopRequested()) { m_retry_function = std::move(exec_fn); - m_sleep_duration = interval; + m_sleep_durations = std::move(sleep_durations); syncThreadUnlocked(); } } @@ -199,7 +227,7 @@ namespace display_device { } /** - * @brief Execute a arbitrary logic using the provided interface in a thread-safe manner. + * @brief Execute arbitrary logic using the provided interface in a thread-safe manner. * @param exec_fn Provides thread-safe access to the interface for executing arbitrary logic. * Acceptable function signatures are: * - AnyReturnType(T &); @@ -271,12 +299,24 @@ namespace display_device { } private: + static std::chrono::milliseconds + takeNextDuration(std::vector &durations) { + if (durations.size() > 1) { + const auto front_it { std::begin(durations) }; + const auto front_value { *front_it }; + durations.erase(front_it); + return front_value; + } + + return durations.empty() ? std::chrono::milliseconds::zero() : durations.back(); + } + /** * @brief Clear the necessary data so that the thread will go into a deep sleep. */ void clearThreadLoopUnlocked() { - m_sleep_duration = std::chrono::milliseconds::zero(); + m_sleep_durations = {}; m_retry_function = nullptr; } @@ -301,12 +341,12 @@ namespace display_device { } std::unique_ptr m_iface; /**< Interface to be passed around to the executor functions. */ - std::chrono::milliseconds m_sleep_duration { 0 }; /**< A retry time for the timer. */ + std::vector m_sleep_durations; /**< Sleep times for the timer. */ std::function m_retry_function { nullptr }; /**< Function to be executed until it succeeds. */ - std::mutex m_mutex {}; /**< A mutext for synchronizing thread and "external" access. */ + std::mutex m_mutex {}; /**< A mutex for synchronizing thread and "external" access. */ std::condition_variable m_sleep_cv {}; /**< Condition variable for waking up thread. */ - bool m_syncing_thread { false }; /**< Safeguard for the condition variable to prevent sporadic thread wake ups. */ + bool m_syncing_thread { false }; /**< Safeguard for the condition variable to prevent sporadic thread wake-ups. */ bool m_keep_alive { true }; /**< When set to false, scheduler thread will exit. */ // Always the last in the list so that all the members are already initialized! diff --git a/tests/unit/general/test_retry_scheduler.cpp b/tests/unit/general/test_retry_scheduler.cpp index 296c855..2908011 100644 --- a/tests/unit/general/test_retry_scheduler.cpp +++ b/tests/unit/general/test_retry_scheduler.cpp @@ -32,35 +32,40 @@ TEST_F_S(NullptrInterfaceProvided) { } TEST_F_S(Schedule, NullptrCallbackProvided) { - EXPECT_THAT([&]() { m_impl.schedule(nullptr, 0ms); }, + EXPECT_THAT([&]() { m_impl.schedule(nullptr, { .m_sleep_durations = { 0ms } }); }, ThrowsMessage(HasSubstr("Empty callback function provided in RetryScheduler::schedule!"))); } -TEST_F_S(Schedule, ZeroInterval) { - EXPECT_THAT([&]() { m_impl.schedule([](auto, auto) {}, 0ms); }, - ThrowsMessage(HasSubstr("Interval cannot be zero in RetryScheduler::schedule!"))); +TEST_F_S(Schedule, NoDurations) { + EXPECT_THAT([&]() { m_impl.schedule([](auto, auto) {}, { .m_sleep_durations = {} }); }, + ThrowsMessage(HasSubstr("At least 1 sleep duration must be specified in RetryScheduler::schedule!"))); } -TEST_F_S(Schedule, SchedulingAtIntervals) { +TEST_F_S(Schedule, ZeroDuration) { + EXPECT_THAT([&]() { m_impl.schedule([](auto, auto) {}, { .m_sleep_durations = { 0ms } }); }, + ThrowsMessage(HasSubstr("All of the durations specified in RetryScheduler::schedule must be larger than a 0!"))); +} + +TEST_F_S(Schedule, SchedulingDurations) { // Note: in this test we care that the delay is not less than the requested one, but we // do not really have an upper ceiling... - const auto schedule_and_get_average_delays { [&](const std::chrono::milliseconds delay) { + const auto schedule_and_get_average_delays { [&](const std::vector &durations) { m_impl.execute([](TestIface &iface) { iface.m_durations.clear(); }); std::optional prev; - m_impl.schedule([&prev](TestIface &iface, auto &stop_token) { + m_impl.schedule([&durations, &prev](TestIface &iface, auto &stop_token) { auto now = std::chrono::high_resolution_clock::now(); if (prev) { iface.m_durations.push_back(static_cast(std::chrono::duration_cast(now - *prev).count())); - if (iface.m_durations.size() == 10) { + if (iface.m_durations.size() == durations.size()) { stop_token.requestStop(); } } prev = now; }, - delay); + { .m_sleep_durations = durations }); while (m_impl.isScheduled()) { std::this_thread::sleep_for(1ms); @@ -75,13 +80,14 @@ TEST_F_S(Schedule, SchedulingAtIntervals) { }); } }; - EXPECT_GE(schedule_and_get_average_delays(10ms), 10); - EXPECT_GE(schedule_and_get_average_delays(50ms), 50); + EXPECT_GE(schedule_and_get_average_delays({ 10ms, 10ms, 10ms, 10ms, 10ms, 10ms, 10ms, 10ms, 10ms, 10ms }), 10); + EXPECT_GE(schedule_and_get_average_delays({ 50ms, 50ms, 50ms, 50ms, 50ms, 50ms, 50ms, 50ms, 50ms, 50ms }), 50); + EXPECT_GE(schedule_and_get_average_delays({ 10ms, 20ms, 30ms, 40ms, 50ms, 10ms, 50ms, 10ms, 50ms, 10ms }), 28); } TEST_F_S(Schedule, SchedulerInteruptAndReplacement) { int counter_a { 0 }; - m_impl.schedule([&counter_a](auto, auto) { counter_a++; }, 5ms); + m_impl.schedule([&counter_a](auto, auto) { counter_a++; }, { .m_sleep_durations = { 5ms } }); while (counter_a < 3) { std::this_thread::sleep_for(1ms); @@ -94,7 +100,7 @@ TEST_F_S(Schedule, SchedulerInteruptAndReplacement) { counter_a_last_value = counter_a; counter_b++; }, - 1ms); + { .m_sleep_durations = { 1ms } }); while (counter_b < 3) { std::this_thread::sleep_for(1ms); @@ -111,31 +117,88 @@ TEST_F_S(Schedule, StoppedImmediately) { m_impl.schedule([&](auto, auto &stop_token) { stop_token.requestStop(); }, - 1000ms); + { .m_sleep_durations = { 1000ms } }); EXPECT_FALSE(m_impl.isScheduled()); } -TEST_F_S(Schedule, ImmediateExecution) { +TEST_F_S(Schedule, Execution, Immediate) { + const auto default_duration { 500ms }; + const auto calling_thread_id { std::this_thread::get_id() }; + std::optional first_call_scheduler_thread_id; + std::optional second_call_scheduler_thread_id; + + int first_call_delay { -1 }; + int second_call_delay { -1 }; + auto prev = std::chrono::high_resolution_clock::now(); + m_impl.schedule([&](auto, auto &stop_token) { + const auto now = std::chrono::high_resolution_clock::now(); + const auto duration = static_cast(std::chrono::duration_cast(now - prev).count()); + prev = now; + + if (!first_call_scheduler_thread_id) { + first_call_delay = duration; + first_call_scheduler_thread_id = std::this_thread::get_id(); + return; + } + + second_call_delay = duration; + second_call_scheduler_thread_id = std::this_thread::get_id(); + stop_token.requestStop(); + }, + { .m_sleep_durations = { default_duration * 2, default_duration }, .m_execution = display_device::SchedulerOptions::Execution::Immediate }); + + while (m_impl.isScheduled()) { + std::this_thread::sleep_for(1ms); + } + + EXPECT_GE(first_call_delay, 0); + EXPECT_LT(first_call_delay, default_duration.count()); + EXPECT_GE(second_call_delay, default_duration.count() * 2); + EXPECT_LT(second_call_delay, default_duration.count() * 3); + + EXPECT_TRUE(first_call_scheduler_thread_id); + EXPECT_TRUE(second_call_scheduler_thread_id); + + EXPECT_EQ(*first_call_scheduler_thread_id, calling_thread_id); + EXPECT_NE(*first_call_scheduler_thread_id, *second_call_scheduler_thread_id); +} + +TEST_F_S(Schedule, Execution, ImmediateWithSleep) { + const auto default_duration { 500ms }; const auto calling_thread_id { std::this_thread::get_id() }; std::optional first_call_scheduler_thread_id; std::optional second_call_scheduler_thread_id; + int first_call_delay { -1 }; + int second_call_delay { -1 }; + auto prev = std::chrono::high_resolution_clock::now(); m_impl.schedule([&](auto, auto &stop_token) { + const auto now = std::chrono::high_resolution_clock::now(); + const auto duration = static_cast(std::chrono::duration_cast(now - prev).count()); + prev = now; + if (!first_call_scheduler_thread_id) { + first_call_delay = duration; first_call_scheduler_thread_id = std::this_thread::get_id(); return; } + second_call_delay = duration; second_call_scheduler_thread_id = std::this_thread::get_id(); stop_token.requestStop(); }, - 1ms); + { .m_sleep_durations = { default_duration * 2, default_duration }, .m_execution = display_device::SchedulerOptions::Execution::ImmediateWithSleep }); while (m_impl.isScheduled()) { std::this_thread::sleep_for(1ms); } + EXPECT_GE(first_call_delay, default_duration.count() * 2); + EXPECT_LT(first_call_delay, default_duration.count() * 3); + EXPECT_GE(second_call_delay, default_duration.count()); + EXPECT_LT(second_call_delay, default_duration.count() * 2); + EXPECT_TRUE(first_call_scheduler_thread_id); EXPECT_TRUE(second_call_scheduler_thread_id); @@ -143,11 +206,53 @@ TEST_F_S(Schedule, ImmediateExecution) { EXPECT_NE(*first_call_scheduler_thread_id, *second_call_scheduler_thread_id); } +TEST_F_S(Schedule, Execution, ScheduledOnly) { + const auto default_duration { 500ms }; + const auto calling_thread_id { std::this_thread::get_id() }; + std::optional first_call_scheduler_thread_id; + std::optional second_call_scheduler_thread_id; + + int first_call_delay { -1 }; + int second_call_delay { -1 }; + auto prev = std::chrono::high_resolution_clock::now(); + m_impl.schedule([&](auto, auto &stop_token) { + const auto now = std::chrono::high_resolution_clock::now(); + const auto duration = static_cast(std::chrono::duration_cast(now - prev).count()); + prev = now; + + if (!first_call_scheduler_thread_id) { + first_call_delay = duration; + first_call_scheduler_thread_id = std::this_thread::get_id(); + return; + } + + second_call_delay = duration; + second_call_scheduler_thread_id = std::this_thread::get_id(); + stop_token.requestStop(); + }, + { .m_sleep_durations = { default_duration * 2, default_duration }, .m_execution = display_device::SchedulerOptions::Execution::ScheduledOnly }); + + while (m_impl.isScheduled()) { + std::this_thread::sleep_for(1ms); + } + + EXPECT_GE(first_call_delay, default_duration.count() * 2); + EXPECT_LT(first_call_delay, default_duration.count() * 3); + EXPECT_GE(second_call_delay, default_duration.count()); + EXPECT_LT(second_call_delay, default_duration.count() * 2); + + EXPECT_TRUE(first_call_scheduler_thread_id); + EXPECT_TRUE(second_call_scheduler_thread_id); + + EXPECT_NE(*first_call_scheduler_thread_id, calling_thread_id); + EXPECT_EQ(*first_call_scheduler_thread_id, *second_call_scheduler_thread_id); +} + TEST_F_S(Schedule, ExceptionThrown, DuringImmediateCall) { auto &logger { display_device::Logger::get() }; int counter_a { 0 }; - m_impl.schedule([&](auto, auto) { counter_a++; }, 1ms); + m_impl.schedule([&](auto, auto) { counter_a++; }, { .m_sleep_durations = { 1ms } }); while (counter_a < 3) { std::this_thread::sleep_for(1ms); } @@ -161,13 +266,13 @@ TEST_F_S(Schedule, ExceptionThrown, DuringImmediateCall) { m_impl.schedule([&](auto, auto) { throw std::runtime_error("Get rekt!"); }, - 1ms); + { .m_sleep_durations = { 1ms } }); EXPECT_FALSE(m_impl.isScheduled()); EXPECT_EQ(output, "Exception thrown in the RetryScheduler::schedule. Stopping scheduler. Error:\nGet rekt!"); // Verify that scheduler still works int counter_b { 0 }; - m_impl.schedule([&](auto, auto) { counter_b++; }, 1ms); + m_impl.schedule([&](auto, auto) { counter_b++; }, { .m_sleep_durations = { 1ms } }); while (counter_b < 3) { std::this_thread::sleep_for(1ms); } @@ -192,7 +297,7 @@ TEST_F_S(Schedule, ExceptionThrown, DuringScheduledCall) { } first_call = false; }, - 1ms); + { .m_sleep_durations = { 1ms } }); while (m_impl.isScheduled()) { std::this_thread::sleep_for(1ms); @@ -201,7 +306,7 @@ TEST_F_S(Schedule, ExceptionThrown, DuringScheduledCall) { // Verify that scheduler still works int counter { 0 }; - m_impl.schedule([&](auto, auto) { counter++; }, 1ms); + m_impl.schedule([&](auto, auto) { counter++; }, { .m_sleep_durations = { 1ms } }); while (counter < 3) { std::this_thread::sleep_for(1ms); } @@ -217,7 +322,7 @@ TEST_F_S(Execute, NullptrCallbackProvided) { TEST_F_S(Execute, SchedulerNotStopped) { int counter { 0 }; - m_impl.schedule([&](auto, auto) { counter++; }, 1ms); + m_impl.schedule([&](auto, auto) { counter++; }, { .m_sleep_durations = { 1ms } }); while (counter < 3) { std::this_thread::sleep_for(1ms); } @@ -243,7 +348,7 @@ TEST_F_S(Execute, SchedulerNotStopped) { TEST_F_S(Execute, SchedulerStopped) { int counter { 0 }; - m_impl.schedule([&](auto, auto) { counter++; }, 1ms); + m_impl.schedule([&](auto, auto) { counter++; }, { .m_sleep_durations = { 1ms } }); while (counter < 3) { std::this_thread::sleep_for(1ms); } @@ -268,7 +373,7 @@ TEST_F_S(Execute, SchedulerStopped, ExceptThatItWasNotRunning) { EXPECT_FALSE(m_impl.isScheduled()); int counter { 0 }; - m_impl.schedule([&](auto, auto) { counter++; }, 1ms); + m_impl.schedule([&](auto, auto) { counter++; }, { .m_sleep_durations = { 1ms } }); while (counter < 3) { std::this_thread::sleep_for(1ms); } @@ -279,7 +384,7 @@ TEST_F_S(Execute, SchedulerStopped, ExceptThatItWasNotRunning) { TEST_F_S(Execute, ExceptionThrown) { int counter { 0 }; - m_impl.schedule([&](auto, auto) { counter++; }, 1ms); + m_impl.schedule([&](auto, auto) { counter++; }, { .m_sleep_durations = { 1ms } }); while (counter < 3) { std::this_thread::sleep_for(1ms); } @@ -297,7 +402,7 @@ TEST_F_S(Execute, ExceptionThrown) { TEST_F_S(Execute, ExceptionThrown, BeforeStopToken) { int counter { 0 }; - m_impl.schedule([&](auto, auto) { counter++; }, 1ms); + m_impl.schedule([&](auto, auto) { counter++; }, { .m_sleep_durations = { 1ms } }); while (counter < 3) { std::this_thread::sleep_for(1ms); } @@ -316,7 +421,7 @@ TEST_F_S(Execute, ExceptionThrown, BeforeStopToken) { TEST_F_S(Execute, ExceptionThrown, AfterStopToken) { int counter { 0 }; - m_impl.schedule([&](auto, auto) { counter++; }, 1ms); + m_impl.schedule([&](auto, auto) { counter++; }, { .m_sleep_durations = { 1ms } }); while (counter < 3) { std::this_thread::sleep_for(1ms); } @@ -336,7 +441,7 @@ TEST_F_S(Stop) { EXPECT_FALSE(m_impl.isScheduled()); int counter { 0 }; - m_impl.schedule([&](auto, auto) { counter++; }, 1ms); + m_impl.schedule([&](auto, auto) { counter++; }, { .m_sleep_durations = { 1ms } }); while (counter < 3) { std::this_thread::sleep_for(1ms); } @@ -351,7 +456,7 @@ TEST_F_S(ThreadCleanupInDestructor) { { display_device::RetryScheduler scheduler { std::make_unique() }; - scheduler.schedule([&](auto, auto) { counter++; }, 1ms); + scheduler.schedule([&](auto, auto) { counter++; }, { .m_sleep_durations = { 1ms } }); while (counter < 3) { std::this_thread::sleep_for(1ms); }