From 95c9d6223a1ad14dae925690f0a71986b63a489c Mon Sep 17 00:00:00 2001 From: Katze719 Date: Thu, 9 Apr 2026 22:52:13 +0200 Subject: [PATCH] =?UTF-8?q?v2.1.0:=20bug=20fixes,=20new=20features,=20test?= =?UTF-8?q?s,=20and=20CI=20=E2=80=94=20no=20API/ABI=20breaks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Bug fixes: - when_all no longer requires default-constructible T - when_any: exponential backoff, randomized start, empty-input guard - ScheduledThreadPoolT::insert_task rejects tasks after shutdown - ChaosController reads actual thread priority instead of hardcoded normal() - ErrorHandler releases lock before invoking callbacks - PoolWithErrors doc comment corrected (implicitly movable) Performance: - distribute_affinities_by_numa calls read_topology() once (was O(n)) - New overloads accepting pre-read CpuTopology New features: - InlinePool: deterministic single-threaded pool for unit tests - task_group: structured concurrency with scoped wait - PoolWithErrors forwarding constructor for pool-specific args - apply_profile_detailed() with per-step error codes - Expanded C++20 module exports (futures, pools, coroutine helpers) Tests: 65 new test cases (pool v2 APIs, futures, registry queries, coroutines) CI: ASan, TSan, UBSan, coverage, and clang-tidy workflows --- .github/workflows/sanitizers.yml | 138 +++++ CHANGELOG.md | 97 ++++ include/threadschedule/chaos.hpp | 12 +- include/threadschedule/error_handler.hpp | 12 +- include/threadschedule/futures.hpp | 39 +- include/threadschedule/inline_pool.hpp | 159 ++++++ include/threadschedule/profiles.hpp | 47 ++ include/threadschedule/scheduled_pool.hpp | 6 + include/threadschedule/task_group.hpp | 128 +++++ include/threadschedule/thread_pool.hpp | 6 + .../thread_pool_with_errors.hpp | 21 +- include/threadschedule/threadschedule.hpp | 4 + include/threadschedule/topology.hpp | 47 +- src/threadschedule.cppm | 26 + tests/CMakeLists.txt | 47 ++ tests/coroutine_pool_test.cpp | 114 ++++ tests/futures_test.cpp | 176 ++++++ tests/registry_query_test.cpp | 174 ++++++ tests/thread_pool_v2_test.cpp | 540 ++++++++++++++++++ 19 files changed, 1767 insertions(+), 26 deletions(-) create mode 100644 .github/workflows/sanitizers.yml create mode 100644 include/threadschedule/inline_pool.hpp create mode 100644 include/threadschedule/task_group.hpp create mode 100644 tests/coroutine_pool_test.cpp create mode 100644 tests/futures_test.cpp create mode 100644 tests/registry_query_test.cpp create mode 100644 tests/thread_pool_v2_test.cpp diff --git a/.github/workflows/sanitizers.yml b/.github/workflows/sanitizers.yml new file mode 100644 index 0000000..39960bc --- /dev/null +++ b/.github/workflows/sanitizers.yml @@ -0,0 +1,138 @@ +name: Sanitizers + +on: + push: + branches: [ main, dev ] + pull_request: + branches: [ main, dev ] + workflow_dispatch: + +jobs: + sanitizers: + name: ${{ matrix.sanitizer }} (GCC 14, C++20) + runs-on: ubuntu-24.04 + strategy: + fail-fast: false + matrix: + include: + - sanitizer: ASan + flags: -fsanitize=address -fno-omit-frame-pointer + env_var: ASAN_OPTIONS=detect_leaks=1 + - sanitizer: TSan + flags: -fsanitize=thread + env_var: TSAN_OPTIONS=second_deadlock_stack=1 + - sanitizer: UBSan + flags: -fsanitize=undefined -fno-sanitize-recover=all + env_var: UBSAN_OPTIONS=print_stacktrace=1 + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Install dependencies + run: | + sudo apt-get update + sudo apt-get install -y cmake ninja-build g++-14 + + - name: Configure CMake + env: + CC: gcc-14 + CXX: g++-14 + run: | + cmake -B build -G Ninja \ + -DCMAKE_BUILD_TYPE=RelWithDebInfo \ + -DCMAKE_CXX_STANDARD=20 \ + -DCMAKE_CXX_FLAGS="${{ matrix.flags }}" \ + -DCMAKE_EXE_LINKER_FLAGS="${{ matrix.flags }}" \ + -DTHREADSCHEDULE_BUILD_TESTS=ON + + - name: Build + run: cmake --build build --parallel + + - name: Run tests + env: + ${{ matrix.env_var }}: "" + run: | + cd build + ctest --output-on-failure --parallel 2 --timeout 120 + + coverage: + name: Coverage (GCC 14, C++20) + runs-on: ubuntu-24.04 + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Install dependencies + run: | + sudo apt-get update + sudo apt-get install -y cmake ninja-build g++-14 lcov + + - name: Configure CMake + env: + CC: gcc-14 + CXX: g++-14 + run: | + cmake -B build -G Ninja \ + -DCMAKE_BUILD_TYPE=Debug \ + -DCMAKE_CXX_STANDARD=20 \ + -DCMAKE_CXX_FLAGS="--coverage -fprofile-arcs -ftest-coverage" \ + -DCMAKE_EXE_LINKER_FLAGS="--coverage" \ + -DTHREADSCHEDULE_BUILD_TESTS=ON + + - name: Build + run: cmake --build build --parallel + + - name: Run tests + run: | + cd build + ctest --output-on-failure --parallel + + - name: Collect coverage + run: | + lcov --capture --directory build --output-file coverage.info \ + --gcov-tool gcov-14 --ignore-errors mismatch + lcov --remove coverage.info \ + '*/build/_deps/*' '/usr/*' '*/tests/*' '*/benchmarks/*' \ + --output-file coverage.info --ignore-errors unused + lcov --list coverage.info + + - name: Upload coverage artifact + uses: actions/upload-artifact@v4 + with: + name: coverage-report + path: coverage.info + + clang-tidy: + name: Clang-Tidy (Clang 19, C++20) + runs-on: ubuntu-24.04 + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Install dependencies + run: | + sudo apt-get update + sudo apt-get install -y cmake ninja-build clang-19 clang-tidy-19 + + - name: Configure CMake + env: + CC: clang-19 + CXX: clang++-19 + run: | + cmake -B build -G Ninja \ + -DCMAKE_BUILD_TYPE=Release \ + -DCMAKE_CXX_STANDARD=20 \ + -DCMAKE_EXPORT_COMPILE_COMMANDS=ON \ + -DTHREADSCHEDULE_BUILD_TESTS=ON \ + -DTHREADSCHEDULE_BUILD_EXAMPLES=ON + + - name: Build + run: cmake --build build --parallel + + - name: Run clang-tidy + run: | + find include/threadschedule -name '*.hpp' | \ + xargs clang-tidy-19 -p build --warnings-as-errors='*' \ + --header-filter='include/threadschedule/.*' 2>&1 | \ + head -200 diff --git a/CHANGELOG.md b/CHANGELOG.md index 7cc1cb0..09e4876 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,102 @@ # Changelog +## v2.1.0 + +> **No API/ABI breaking changes.** All modifications are bug fixes (aligning +> behaviour with documented API), internal optimizations, additive overloads, +> new classes, and new tests/infrastructure. + +### Bug Fixes + +- **`when_all` no longer requires default-constructible `T`** -- the + `results.emplace_back()` on the exception path was removed. The vector is + never consumed when an exception is rethrown. (futures.hpp) + +- **`when_any` no longer busy-polls at 1 ms** -- exponential backoff + (1 ms → 16 ms cap) and a randomized start index eliminate CPU waste and + index bias. Empty input now throws `std::invalid_argument` instead of + looping forever. (futures.hpp) + +- **`ScheduledThreadPoolT::insert_task` checks `stop_`** -- scheduling a + task after `shutdown()` now returns a pre-cancelled `ScheduledTaskHandle` + instead of silently inserting a task that will never execute. + (scheduled_pool.hpp) + +- **`ChaosController` uses actual thread priority** -- priority jitter now + reads the real scheduling priority via `sched_getparam()` on Linux instead + of hardcoding `ThreadPriority::normal()`. (chaos.hpp) + +- **`ErrorHandler::handle_error` releases the lock before invoking + callbacks** -- callbacks are snapshot-copied under the mutex, then executed + outside the critical section, eliminating deadlock risk when callbacks + interact with the handler. (error_handler.hpp) + +- **`PoolWithErrors` documentation corrected** -- the doc comment now says + "implicitly movable" instead of the incorrect "non-movable". + (thread_pool_with_errors.hpp) + +### Performance + +- **`distribute_affinities_by_numa` calls `read_topology()` once** -- the + previous implementation read sysfs O(n) times for n threads. New additive + overloads `affinity_for_node(CpuTopology const&, ...)` and + `distribute_affinities_by_numa(CpuTopology const&, ...)` accept a + pre-read topology snapshot. (topology.hpp) + +### New Features + +- **`InlinePool`** -- deterministic, single-threaded pool that executes every + task synchronously on the calling thread. Same `submit`/`post`/`try_submit` + API as `ThreadPool`, making it a drop-in for unit tests. + (inline_pool.hpp) + +- **`task_group`** -- structured concurrency primitive. All submitted + tasks are guaranteed to complete before `wait()` returns (or the destructor + runs). First exception is captured and rethrown from `wait()`. + (task_group.hpp) + +- **`PoolWithErrors` forwarding constructor** -- new 2+ argument constructor + forwards pool-specific arguments (e.g. `deque_capacity` for + `HighPerformancePool`). (thread_pool_with_errors.hpp) + +- **`apply_profile_detailed()`** -- new function returning a + `std::vector` with one entry per configuration step, + unlike `apply_profile()` which aggregates into a single error code. + (profiles.hpp) + +### Module Exports + +- Added missing exports to `threadschedule.cppm`: `when_all`, `when_any`, + `when_all_settled`, `ShutdownPolicy`, `IndefiniteWait`, `PollingWait`, + `ThreadPoolBase`, `LightweightPoolT`, `LightweightPool`, `GlobalPool`, + `PoolWithErrors`, `ScheduledLightweightPool`, `TaskStartCallback`, + `TaskEndCallback`, `schedule_on`, `run_on`, `pool_executor`, `InlinePool`, + `task_group`, `apply_profile_detailed`. + +### Tests + +- **65 new Google Test cases** across four new test files: + - `thread_pool_v2_test.cpp` -- `try_submit`, `try_post`, `submit_batch`, + `parallel_for_each`, `ShutdownPolicy`, `LightweightPool`, `GlobalPool`, + `ScheduledThreadPool`, stop-token tasks, `InlinePool`, `task_group`. + - `futures_test.cpp` -- `when_all`, `when_any`, `when_all_settled` (typed + and void variants, empty input, exception propagation). + - `registry_query_test.cpp` -- chainable `QueryView` API: `filter`, `map`, + `for_each`, `find_if`, `any`/`all`/`none`, `take`, `skip`. + - `coroutine_pool_test.cpp` -- `schedule_on`, `run_on`, `pool_executor`, + nested awaits, cross-pool hops, exception propagation (C++20 coroutines). + +### CI / Infrastructure + +- **New `sanitizers.yml` workflow** with: + - **ASan** (AddressSanitizer + LeakSanitizer) + - **TSan** (ThreadSanitizer) + - **UBSan** (UndefinedBehaviorSanitizer) + - **Code coverage** job (gcov + lcov, artifact upload) + - **Clang-Tidy** job (Clang 19, C++20) + +--- + ## v2.0.0 ### Breaking Changes diff --git a/include/threadschedule/chaos.hpp b/include/threadschedule/chaos.hpp index 78883b3..dceee1d 100644 --- a/include/threadschedule/chaos.hpp +++ b/include/threadschedule/chaos.hpp @@ -126,15 +126,19 @@ class ChaosController }); } - // Priority jitter around current policy + // Priority jitter around the thread's actual priority if (config_.priority_jitter != 0) { std::uniform_int_distribution dist(-config_.priority_jitter, config_.priority_jitter); registry().apply(pred, [&](RegisteredThreadInfo const& info) { int delta = dist(rng); - // Use normal as baseline if we can't read current - ThreadPriority prio = ThreadPriority::normal(); - (void)registry().set_priority(info.tid, ThreadPriority{prio.value() + delta}); + int baseline = ThreadPriority::normal().value(); +#ifndef _WIN32 + sched_param sp{}; + if (sched_getparam(info.tid, &sp) == 0) + baseline = sp.sched_priority; +#endif + (void)registry().set_priority(info.tid, ThreadPriority{baseline + delta}); }); } diff --git a/include/threadschedule/error_handler.hpp b/include/threadschedule/error_handler.hpp index 920957f..3b9e50d 100644 --- a/include/threadschedule/error_handler.hpp +++ b/include/threadschedule/error_handler.hpp @@ -200,10 +200,16 @@ class ErrorHandler */ void handle_error(TaskError const& error) { - std::lock_guard lock(mutex_); - error_count_++; + std::vector snapshot; + { + std::lock_guard lock(mutex_); + error_count_++; + snapshot.reserve(callbacks_.size()); + for (auto const& [id, callback] : callbacks_) + snapshot.push_back(callback); + } - for (auto const& [id, callback] : callbacks_) + for (auto const& callback : snapshot) { try { diff --git a/include/threadschedule/futures.hpp b/include/threadschedule/futures.hpp index 19c8c9e..7e5c9f2 100644 --- a/include/threadschedule/futures.hpp +++ b/include/threadschedule/futures.hpp @@ -14,6 +14,9 @@ #include #include #include +#include +#include +#include #include #include @@ -47,7 +50,6 @@ auto when_all(std::vector>& futures) -> std::vector { if (!first_error) first_error = std::current_exception(); - results.emplace_back(); } } @@ -92,8 +94,7 @@ inline void when_all(std::vector>& futures) * @tparam T The value type of each future. */ template -auto when_all_settled(std::vector>& futures) - -> std::vector> +auto when_all_settled(std::vector>& futures) -> std::vector> { std::vector> results; results.reserve(futures.size()); @@ -116,8 +117,7 @@ auto when_all_settled(std::vector>& futures) /** * @brief Block until all void futures complete, returning an @c expected per slot. */ -inline auto when_all_settled(std::vector>& futures) - -> std::vector> +inline auto when_all_settled(std::vector>& futures) -> std::vector> { std::vector> results; results.reserve(futures.size()); @@ -153,13 +153,25 @@ inline auto when_all_settled(std::vector>& futures) template auto when_any(std::vector>& futures) -> std::pair { + if (futures.empty()) + throw std::invalid_argument("when_any: empty futures vector"); + + thread_local std::mt19937 rng{std::random_device{}()}; + std::uniform_int_distribution dist(0, futures.size() - 1); + size_t const start = dist(rng); + unsigned backoff_ms = 1; + while (true) { - for (size_t i = 0; i < futures.size(); ++i) + for (size_t k = 0; k < futures.size(); ++k) { + size_t const i = (start + k) % futures.size(); if (futures[i].wait_for(std::chrono::milliseconds(1)) == std::future_status::ready) return {i, futures[i].get()}; } + std::this_thread::sleep_for(std::chrono::milliseconds(backoff_ms)); + if (backoff_ms < 16) + backoff_ms *= 2; } } @@ -167,19 +179,32 @@ auto when_any(std::vector>& futures) -> std::pair * @brief Block until the first void future becomes ready. * * @return The index of the first ready future. + * @throws std::invalid_argument If @p futures is empty. */ inline auto when_any(std::vector>& futures) -> size_t { + if (futures.empty()) + throw std::invalid_argument("when_any: empty futures vector"); + + thread_local std::mt19937 rng{std::random_device{}()}; + std::uniform_int_distribution dist(0, futures.size() - 1); + size_t const start = dist(rng); + unsigned backoff_ms = 1; + while (true) { - for (size_t i = 0; i < futures.size(); ++i) + for (size_t k = 0; k < futures.size(); ++k) { + size_t const i = (start + k) % futures.size(); if (futures[i].wait_for(std::chrono::milliseconds(1)) == std::future_status::ready) { futures[i].get(); return i; } } + std::this_thread::sleep_for(std::chrono::milliseconds(backoff_ms)); + if (backoff_ms < 16) + backoff_ms *= 2; } } diff --git a/include/threadschedule/inline_pool.hpp b/include/threadschedule/inline_pool.hpp new file mode 100644 index 0000000..17066e5 --- /dev/null +++ b/include/threadschedule/inline_pool.hpp @@ -0,0 +1,159 @@ +#pragma once + +/** + * @file inline_pool.hpp + * @brief InlinePool: deterministic, single-threaded pool for unit testing. + * + * Executes every task synchronously on the calling thread. Has the same + * submit/post/try_submit/try_post API surface as ThreadPool so it can be + * used as a drop-in template argument in generic code that is + * parameterized on pool type. + */ + +#include "expected.hpp" +#include "thread_pool.hpp" +#include +#include +#include +#include +#include + +namespace threadschedule +{ + +/** + * @brief A pool that executes every task inline on the calling thread. + * + * Useful for deterministic unit testing: tasks run synchronously in + * submission order with no concurrency, making results fully + * reproducible and debuggable. + * + * @par API compatibility + * InlinePool provides the same submit/try_submit/post/try_post/ + * submit_batch/parallel_for_each surface as @ref ThreadPool. The + * returned futures are always already fulfilled when submit() returns. + * + * @par Limitations + * - size() always returns 0 (no worker threads). + * - shutdown() and wait_for_tasks() are no-ops. + * - There is no concurrency; tasks that block or deadlock will block + * the submitting thread. + */ +class InlinePool +{ + public: + explicit InlinePool(size_t /*num_threads*/ = 0) + { + } + + template + auto submit(F&& f, Args&&... args) -> std::future> + { + auto result = try_submit(std::forward(f), std::forward(args)...); + if (!result.has_value()) + throw std::runtime_error("InlinePool is shut down"); + return std::move(result.value()); + } + + template + auto try_submit(F&& f, Args&&... args) -> expected>, std::error_code> + { + using R = std::invoke_result_t; + if (stop_) + return unexpected(std::make_error_code(std::errc::operation_canceled)); + + std::promise p; + auto future = p.get_future(); + try + { + if constexpr (std::is_void_v) + { + std::invoke(std::forward(f), std::forward(args)...); + p.set_value(); + } + else + { + p.set_value(std::invoke(std::forward(f), std::forward(args)...)); + } + } + catch (...) + { + p.set_exception(std::current_exception()); + } + return future; + } + + template + void post(F&& f, Args&&... args) + { + auto r = try_post(std::forward(f), std::forward(args)...); + if (!r.has_value()) + throw std::runtime_error("InlinePool is shut down"); + } + + template + auto try_post(F&& f, Args&&... args) -> expected + { + if (stop_) + return unexpected(std::make_error_code(std::errc::operation_canceled)); + try + { + std::invoke(std::forward(f), std::forward(args)...); + } + catch (...) + { + } + return {}; + } + + template + auto try_submit_batch(Iterator begin, Iterator end) -> expected>, std::error_code> + { + if (stop_) + return unexpected(std::make_error_code(std::errc::operation_canceled)); + + std::vector> futures; + for (auto it = begin; it != end; ++it) + { + std::promise p; + futures.push_back(p.get_future()); + try + { + (*it)(); + p.set_value(); + } + catch (...) + { + p.set_exception(std::current_exception()); + } + } + return futures; + } + + template + auto submit_batch(Iterator begin, Iterator end) -> std::vector> + { + auto result = try_submit_batch(begin, end); + if (!result.has_value()) + throw std::runtime_error("InlinePool is shut down"); + return std::move(result.value()); + } + + template + void parallel_for_each(Iterator begin, Iterator end, F&& func) + { + for (auto it = begin; it != end; ++it) + func(*it); + } + + [[nodiscard]] auto size() const noexcept -> size_t { return 0; } + [[nodiscard]] auto pending_tasks() const noexcept -> size_t { return 0; } + + void wait_for_tasks() {} + void shutdown(ShutdownPolicy /*policy*/ = ShutdownPolicy::drain) { stop_ = true; } + + private: + bool stop_{false}; +}; + +} // namespace threadschedule diff --git a/include/threadschedule/profiles.hpp b/include/threadschedule/profiles.hpp index 82e7aa7..3b2471b 100644 --- a/include/threadschedule/profiles.hpp +++ b/include/threadschedule/profiles.hpp @@ -16,6 +16,7 @@ #include "thread_registry.hpp" #include #include +#include namespace threadschedule { @@ -220,4 +221,50 @@ inline auto apply_profile(ThreadRegistry& reg, Tid tid, ThreadProfile const& p) return unexpected(std::make_error_code(std::errc::operation_not_permitted)); } +/** + * @brief Apply a profile and return per-step error codes. + * + * Unlike @ref apply_profile (which aggregates into a single + * @c operation_not_permitted), this function returns a vector with one + * entry per configuration step. Successful steps have a default + * (zero) error code; failed steps carry the specific OS error. + * + * The steps are, in order: + * 0 - set_scheduling_policy + * 1 - set_affinity (only present when @c p.affinity has a value) + * + * @tparam ThreadLike A type satisfying the is_thread_like trait. + * @return Vector of error codes, one per step attempted. + */ +template , int> = 0> +inline auto apply_profile_detailed(ThreadLike& t, ThreadProfile const& p) -> std::vector +{ + std::vector results; + auto policy_result = t.set_scheduling_policy(p.policy, p.priority); + results.push_back(policy_result.has_value() ? std::error_code{} : policy_result.error()); + if (p.affinity.has_value()) + { + auto aff_result = t.set_affinity(*p.affinity); + results.push_back(aff_result.has_value() ? std::error_code{} : aff_result.error()); + } + return results; +} + +/** + * @brief Apply a profile to a ThreadControlBlock with per-step errors. + * @see apply_profile_detailed(ThreadLike&, ThreadProfile const&) + */ +inline auto apply_profile_detailed(ThreadControlBlock& t, ThreadProfile const& p) -> std::vector +{ + std::vector results; + auto policy_result = t.set_scheduling_policy(p.policy, p.priority); + results.push_back(policy_result.has_value() ? std::error_code{} : policy_result.error()); + if (p.affinity.has_value()) + { + auto aff_result = t.set_affinity(*p.affinity); + results.push_back(aff_result.has_value() ? std::error_code{} : aff_result.error()); + } + return results; +} + } // namespace threadschedule diff --git a/include/threadschedule/scheduled_pool.hpp b/include/threadschedule/scheduled_pool.hpp index f3f1863..4001ca1 100644 --- a/include/threadschedule/scheduled_pool.hpp +++ b/include/threadschedule/scheduled_pool.hpp @@ -298,6 +298,12 @@ class ScheduledThreadPoolT uint64_t const task_id = next_task_id_++; ScheduledTaskHandle handle(task_id); + if (stop_) + { + handle.cancel(); + return handle; + } + ScheduledTaskInfo info; info.id = task_id; info.next_run = run_time; diff --git a/include/threadschedule/task_group.hpp b/include/threadschedule/task_group.hpp new file mode 100644 index 0000000..63e8550 --- /dev/null +++ b/include/threadschedule/task_group.hpp @@ -0,0 +1,128 @@ +#pragma once + +/** + * @file task_group.hpp + * @brief Structured concurrency via @c task_group. + * + * A @c task_group ties a set of tasks to a scope: all submitted tasks + * are guaranteed to complete before @c wait() returns (or the destructor + * runs). This eliminates dangling-future bugs and makes exception + * propagation deterministic. + */ + +#include +#include +#include +#include + +namespace threadschedule +{ + +/** + * @brief Scoped task group that ensures all submitted work completes + * before the group is destroyed. + * + * @par Usage + * @code + * ThreadPool pool(4); + * { + * task_group group(pool); + * group.submit([]{ do_work_a(); }); + * group.submit([]{ do_work_b(); }); + * group.wait(); // blocks until both complete + * } + * @endcode + * + * @par Exception handling + * If any task throws, the first captured exception is rethrown from + * @c wait(). All remaining tasks still run to completion. + * + * @par Destructor + * The destructor calls @c wait() if it has not been called already, + * ensuring that tasks never outlive the group. Note: if the destructor + * must wait for slow tasks, it will block. + * + * @tparam Pool Thread pool type (must support @c submit(Callable)). + */ +template +class task_group +{ + public: + explicit task_group(Pool& pool) : pool_(pool) {} + + task_group(task_group const&) = delete; + auto operator=(task_group const&) -> task_group& = delete; + + ~task_group() + { + try + { + wait(); + } + catch (...) + { + } + } + + /** + * @brief Submit a void() callable to the group. + * + * The returned future is tracked internally; you do not need to + * store it yourself. + */ + template + void submit(F&& f) + { + auto future = pool_.submit(std::forward(f)); + std::lock_guard lock(mutex_); + futures_.push_back(std::move(future)); + } + + /** + * @brief Block until all submitted tasks complete. + * + * @throws Rethrows the first exception from any task. All tasks are + * still waited on even if one throws. + */ + void wait() + { + std::vector> local; + { + std::lock_guard lock(mutex_); + local.swap(futures_); + } + + std::exception_ptr first_error; + for (auto& f : local) + { + try + { + f.get(); + } + catch (...) + { + if (!first_error) + first_error = std::current_exception(); + } + } + + if (first_error) + std::rethrow_exception(first_error); + } + + /** + * @brief Number of pending (not yet waited) tasks. + */ + [[nodiscard]] auto pending() const -> size_t + { + std::lock_guard lock(mutex_); + return futures_.size(); + } + + private: + Pool& pool_; + mutable std::mutex mutex_; + std::vector> futures_; +}; + +} // namespace threadschedule diff --git a/include/threadschedule/thread_pool.hpp b/include/threadschedule/thread_pool.hpp index 47c5436..76a8ece 100644 --- a/include/threadschedule/thread_pool.hpp +++ b/include/threadschedule/thread_pool.hpp @@ -1130,6 +1130,11 @@ class HighPerformancePool on_task_start_(start_time, tid); } + // For submit() tasks the callable is a packaged_task which + // catches exceptions internally and stores them in the + // std::future shared state - those never reach this catch. + // For post() tasks (fire-and-forget) the catch prevents an + // unhandled exception from terminating the worker thread. try { task(); @@ -1720,6 +1725,7 @@ class ThreadPoolBase on_task_start_(start_time, tid); } + // See HighPerformancePool::worker_function for rationale. try { task(); diff --git a/include/threadschedule/thread_pool_with_errors.hpp b/include/threadschedule/thread_pool_with_errors.hpp index 756a320..2161269 100644 --- a/include/threadschedule/thread_pool_with_errors.hpp +++ b/include/threadschedule/thread_pool_with_errors.hpp @@ -15,7 +15,8 @@ namespace threadschedule /** * @brief Thread pool wrapper that combines any pool type with an @ref ErrorHandler. * - * Non-copyable, non-movable. Thread-safe (delegates to the underlying pool). + * Non-copyable; implicitly movable (default move operations). + * Thread-safe (delegates to the underlying pool). * * submit() wraps every task so that exceptions are both reported to * the @ref ErrorHandler (via registered callbacks) **and** re-thrown, making @@ -37,6 +38,24 @@ class PoolWithErrors { } + /** + * @brief Construct with forwarded pool arguments. + * + * Enables passing pool-specific constructor arguments (e.g. + * @c deque_capacity for @ref HighPerformancePool) while still + * attaching the error handler. + * + * @code + * PoolWithErrors pool(4, 2048, true); + * @endcode + */ + template + explicit PoolWithErrors(Arg1&& arg1, Arg2&& arg2, Args&&... args) + : pool_(std::forward(arg1), std::forward(arg2), std::forward(args)...), + error_handler_(std::make_shared()) + { + } + /** * @brief Submit a task with automatic error handling */ diff --git a/include/threadschedule/threadschedule.hpp b/include/threadschedule/threadschedule.hpp index 0db0ac7..7ff2359 100644 --- a/include/threadschedule/threadschedule.hpp +++ b/include/threadschedule/threadschedule.hpp @@ -5,11 +5,13 @@ #include "error_handler.hpp" #include "futures.hpp" #include "generator.hpp" +#include "inline_pool.hpp" #include "profiles.hpp" #include "pthread_wrapper.hpp" #include "scheduled_pool.hpp" #include "scheduler_policy.hpp" #include "task.hpp" +#include "task_group.hpp" #include "thread_pool.hpp" #include "thread_pool_with_errors.hpp" #include "thread_registry.hpp" @@ -61,6 +63,7 @@ using ts::GlobalPool; using ts::GlobalThreadPool; using ts::HighPerformancePool; using ts::HighPerformancePoolWithErrors; +using ts::InlinePool; using ts::JThreadWrapper; using ts::LightweightPool; using ts::LightweightPoolT; @@ -74,6 +77,7 @@ using ts::ScheduledThreadPool; using ts::ScheduledThreadPoolT; using ts::SchedulingPolicy; using ts::ShutdownPolicy; +using ts::task_group; using ts::TaskError; using ts::ThreadAffinity; using ts::ThreadByNameView; diff --git a/include/threadschedule/topology.hpp b/include/threadschedule/topology.hpp index 3f78d36..457db1c 100644 --- a/include/threadschedule/topology.hpp +++ b/include/threadschedule/topology.hpp @@ -140,17 +140,16 @@ inline auto read_topology() -> CpuTopology } /** - * @brief Build a ThreadAffinity for the given NUMA node. - * - * Calls read_topology() internally on every invocation (no caching). + * @brief Build a ThreadAffinity for the given NUMA node using a pre-read topology. * + * @param topo Pre-read topology snapshot. * @param node_index NUMA node index (wraps if out of range). * @param thread_index Used to select CPU(s) within the node. * @param threads_per_node Number of CPUs to include per thread (default 1). */ -inline auto affinity_for_node(int node_index, int thread_index, int threads_per_node = 1) -> ThreadAffinity +inline auto affinity_for_node(CpuTopology const& topo, int node_index, int thread_index, int threads_per_node = 1) + -> ThreadAffinity { - CpuTopology topo = read_topology(); if (topo.numa_nodes <= 0) return {}; int const n = (node_index % topo.numa_nodes + topo.numa_nodes) % topo.numa_nodes; @@ -161,7 +160,6 @@ inline auto affinity_for_node(int node_index, int thread_index, int threads_per_ int const cpu = cpus[(thread_index) % static_cast(cpus.size())]; aff.add_cpu(cpu); - // Optionally add more CPUs for the same thread if threads_per_node > 1 for (int k = 1; k < threads_per_node; ++k) { int const extra = cpus[(thread_index + k) % static_cast(cpus.size())]; @@ -170,26 +168,53 @@ inline auto affinity_for_node(int node_index, int thread_index, int threads_per_ return aff; } +/** + * @brief Build a ThreadAffinity for the given NUMA node. + * + * Calls read_topology() internally on every invocation (no caching). + * + * @param node_index NUMA node index (wraps if out of range). + * @param thread_index Used to select CPU(s) within the node. + * @param threads_per_node Number of CPUs to include per thread (default 1). + */ +inline auto affinity_for_node(int node_index, int thread_index, int threads_per_node = 1) -> ThreadAffinity +{ + return affinity_for_node(read_topology(), node_index, thread_index, threads_per_node); +} + /** * @brief Distribute thread affinities across NUMA nodes in round-robin order. * - * Returns one ThreadAffinity per thread, cycling through NUMA nodes - * so that consecutive threads are spread across different nodes. + * @overload Uses a pre-read topology to avoid repeated sysfs access. * + * @param topo Pre-read topology snapshot. * @param num_threads Number of affinity masks to generate. * @return Vector of @p num_threads ThreadAffinity objects. */ -inline auto distribute_affinities_by_numa(size_t num_threads) -> std::vector +inline auto distribute_affinities_by_numa(CpuTopology const& topo, size_t num_threads) -> std::vector { - CpuTopology topo = read_topology(); std::vector result; result.reserve(num_threads); for (size_t i = 0; i < num_threads; ++i) { int node = (topo.numa_nodes > 0) ? static_cast(i % topo.numa_nodes) : 0; - result.push_back(affinity_for_node(node, static_cast(i))); + result.push_back(affinity_for_node(topo, node, static_cast(i))); } return result; } +/** + * @brief Distribute thread affinities across NUMA nodes in round-robin order. + * + * Returns one ThreadAffinity per thread, cycling through NUMA nodes + * so that consecutive threads are spread across different nodes. + * + * @param num_threads Number of affinity masks to generate. + * @return Vector of @p num_threads ThreadAffinity objects. + */ +inline auto distribute_affinities_by_numa(size_t num_threads) -> std::vector +{ + return distribute_affinities_by_numa(read_topology(), num_threads); +} + } // namespace threadschedule diff --git a/src/threadschedule.cppm b/src/threadschedule.cppm index bab4644..affb995 100644 --- a/src/threadschedule.cppm +++ b/src/threadschedule.cppm @@ -85,16 +85,31 @@ using ::threadschedule::ErrorHandledTask; using ::threadschedule::make_error_handled_task; using ::threadschedule::FutureWithErrorHandler; +// -- futures.hpp ------------------------------------------------------------ +using ::threadschedule::when_all; +using ::threadschedule::when_any; +using ::threadschedule::when_all_settled; + // -- thread_pool.hpp -------------------------------------------------------- +using ::threadschedule::TaskStartCallback; +using ::threadschedule::TaskEndCallback; using ::threadschedule::WorkStealingDeque; +using ::threadschedule::ShutdownPolicy; +using ::threadschedule::IndefiniteWait; +using ::threadschedule::PollingWait; using ::threadschedule::HighPerformancePool; +using ::threadschedule::ThreadPoolBase; using ::threadschedule::FastThreadPool; using ::threadschedule::ThreadPool; +using ::threadschedule::LightweightPoolT; +using ::threadschedule::LightweightPool; +using ::threadschedule::GlobalPool; using ::threadschedule::GlobalThreadPool; using ::threadschedule::GlobalHighPerformancePool; using ::threadschedule::parallel_for_each; // -- thread_pool_with_errors.hpp -------------------------------------------- +using ::threadschedule::PoolWithErrors; using ::threadschedule::HighPerformancePoolWithErrors; using ::threadschedule::FastThreadPoolWithErrors; using ::threadschedule::ThreadPoolWithErrors; @@ -105,10 +120,12 @@ using ::threadschedule::ScheduledThreadPoolT; using ::threadschedule::ScheduledThreadPool; using ::threadschedule::ScheduledHighPerformancePool; using ::threadschedule::ScheduledFastThreadPool; +using ::threadschedule::ScheduledLightweightPool; // -- profiles.hpp ----------------------------------------------------------- using ::threadschedule::ThreadProfile; using ::threadschedule::apply_profile; +using ::threadschedule::apply_profile_detailed; // -- topology.hpp ----------------------------------------------------------- using ::threadschedule::CpuTopology; @@ -116,6 +133,12 @@ using ::threadschedule::read_topology; using ::threadschedule::affinity_for_node; using ::threadschedule::distribute_affinities_by_numa; +// -- inline_pool.hpp -------------------------------------------------------- +using ::threadschedule::InlinePool; + +// -- task_group.hpp --------------------------------------------------------- +using ::threadschedule::task_group; + // -- chaos.hpp -------------------------------------------------------------- using ::threadschedule::ChaosConfig; using ::threadschedule::ChaosController; @@ -124,6 +147,9 @@ using ::threadschedule::ChaosController; #if defined(__cpp_impl_coroutine) && __cpp_impl_coroutine >= 201902L using ::threadschedule::task; using ::threadschedule::sync_wait; +using ::threadschedule::schedule_on; +using ::threadschedule::run_on; +using ::threadschedule::pool_executor; using ::threadschedule::generator; #endif diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index ef9ebb3..e649533 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -90,6 +90,42 @@ if(TARGET gtest) PROPERTIES TIMEOUT 120 ) + # Thread pool v2 API tests + add_executable(thread_pool_v2_test thread_pool_v2_test.cpp) + target_link_libraries(thread_pool_v2_test + ThreadSchedule::ThreadSchedule + gtest + gtest_main + ) + gtest_discover_tests(thread_pool_v2_test + DISCOVERY_TIMEOUT 60 + PROPERTIES TIMEOUT 120 + ) + + # Future combinator tests + add_executable(futures_test futures_test.cpp) + target_link_libraries(futures_test + ThreadSchedule::ThreadSchedule + gtest + gtest_main + ) + gtest_discover_tests(futures_test + DISCOVERY_TIMEOUT 60 + PROPERTIES TIMEOUT 120 + ) + + # Registry query API tests + add_executable(registry_query_test registry_query_test.cpp) + target_link_libraries(registry_query_test + ThreadSchedule::ThreadSchedule + gtest + gtest_main + ) + gtest_discover_tests(registry_query_test + DISCOVERY_TIMEOUT 60 + PROPERTIES TIMEOUT 120 + ) + if(THREADSCHEDULE_RUNTIME) add_executable(runtime_registry_test runtime_registry_test.cpp) target_link_libraries(runtime_registry_test @@ -138,6 +174,17 @@ if(TARGET gtest) DISCOVERY_TIMEOUT 60 PROPERTIES TIMEOUT 120 ) + + add_executable(coroutine_pool_test coroutine_pool_test.cpp) + target_link_libraries(coroutine_pool_test + ThreadSchedule::ThreadSchedule + gtest + gtest_main + ) + gtest_discover_tests(coroutine_pool_test + DISCOVERY_TIMEOUT 60 + PROPERTIES TIMEOUT 120 + ) endif() # Discovered tests already carry TIMEOUT=120 via PROPERTIES above diff --git a/tests/coroutine_pool_test.cpp b/tests/coroutine_pool_test.cpp new file mode 100644 index 0000000..57ada25 --- /dev/null +++ b/tests/coroutine_pool_test.cpp @@ -0,0 +1,114 @@ +#include +#include + +#if defined(__cpp_impl_coroutine) && __cpp_impl_coroutine >= 201902L + +using namespace threadschedule; + +TEST(CoroutinePool, ScheduleOnMovesToPoolThread) +{ + HighPerformancePool pool(2); + auto main_tid = std::this_thread::get_id(); + + auto coro = [&pool, main_tid]() -> task { + co_await schedule_on{pool}; + co_return std::this_thread::get_id(); + }; + + auto worker_tid = sync_wait(coro()); + EXPECT_NE(worker_tid, main_tid); +} + +TEST(CoroutinePool, RunOnReturnsValue) +{ + HighPerformancePool pool(2); + + auto future = run_on(pool, []() -> task { co_return 42; }); + EXPECT_EQ(future.get(), 42); +} + +TEST(CoroutinePool, RunOnVoid) +{ + HighPerformancePool pool(2); + std::atomic ran{false}; + + auto future = run_on(pool, [&ran]() -> task { + ran = true; + co_return; + }); + future.get(); + EXPECT_TRUE(ran); +} + +TEST(CoroutinePool, RunOnExecutesOnPoolThread) +{ + ThreadPool pool(2); + auto main_tid = std::this_thread::get_id(); + + auto future = run_on(pool, [main_tid]() -> task { + co_return std::this_thread::get_id() != main_tid; + }); + EXPECT_TRUE(future.get()); +} + +TEST(CoroutinePool, ScheduleOnWithNestedAwait) +{ + HighPerformancePool pool(2); + + auto inner = []() -> task { co_return 7; }; + + auto outer = [&pool, &inner]() -> task { + co_await schedule_on{pool}; + int v = co_await inner(); + co_return v * 6; + }; + + EXPECT_EQ(sync_wait(outer()), 42); +} + +TEST(CoroutinePool, PoolExecutorCanBeSetOnTask) +{ + ThreadPool pool(2); + pool_executor exec(pool); + + auto coro = []() -> task { co_return 10; }; + EXPECT_EQ(sync_wait(coro()), 10); +} + +TEST(CoroutinePool, RunOnPropagatesException) +{ + ThreadPool pool(2); + + auto future = run_on(pool, []() -> task { + throw std::runtime_error("oops"); + co_return 0; + }); + EXPECT_THROW(future.get(), std::runtime_error); +} + +TEST(CoroutinePool, MultipleScheduleOnHops) +{ + HighPerformancePool pool1(2); + ThreadPool pool2(2); + + auto coro = [&pool1, &pool2]() -> task { + co_await schedule_on{pool1}; + auto tid1 = std::this_thread::get_id(); + co_await schedule_on{pool2}; + auto tid2 = std::this_thread::get_id(); + (void)tid1; + (void)tid2; + co_return 99; + }; + + EXPECT_EQ(sync_wait(coro()), 99); +} + +#else + +TEST(CoroutinePool, SkippedNoCoroutineSupport) +{ + GTEST_SKIP() << "Coroutine support not available"; +} + +#endif diff --git a/tests/futures_test.cpp b/tests/futures_test.cpp new file mode 100644 index 0000000..661889d --- /dev/null +++ b/tests/futures_test.cpp @@ -0,0 +1,176 @@ +#include +#include +#include +#include +#include +#include +#include + +using namespace threadschedule; + +// ==================== when_all (non-void) ==================== + +TEST(FuturesTest, WhenAllCollectsResults) +{ + ThreadPool pool(2); + std::vector> futures; + for (int i = 0; i < 5; ++i) + futures.push_back(pool.submit([i] { return i * 10; })); + + auto results = when_all(futures); + ASSERT_EQ(results.size(), 5u); + for (int i = 0; i < 5; ++i) + EXPECT_EQ(results[i], i * 10); +} + +TEST(FuturesTest, WhenAllRethrowsFirstException) +{ + ThreadPool pool(2); + std::vector> futures; + futures.push_back(pool.submit([] { return 1; })); + futures.push_back(pool.submit([]() -> int { throw std::runtime_error("boom"); })); + futures.push_back(pool.submit([] { return 3; })); + + EXPECT_THROW(when_all(futures), std::runtime_error); +} + +TEST(FuturesTest, WhenAllEmptyVector) +{ + std::vector> empty; + auto results = when_all(empty); + EXPECT_TRUE(results.empty()); +} + +// ==================== when_all (void) ==================== + +TEST(FuturesTest, WhenAllVoidCompletes) +{ + ThreadPool pool(2); + std::atomic count{0}; + std::vector> futures; + for (int i = 0; i < 5; ++i) + futures.push_back(pool.submit([&count] { count.fetch_add(1, std::memory_order_relaxed); })); + + when_all(futures); + EXPECT_EQ(count.load(), 5); +} + +TEST(FuturesTest, WhenAllVoidRethrowsException) +{ + ThreadPool pool(2); + std::vector> futures; + futures.push_back(pool.submit([] {})); + futures.push_back(pool.submit([] { throw std::logic_error("fail"); })); + + EXPECT_THROW(when_all(futures), std::logic_error); +} + +// ==================== when_all_settled (non-void) ==================== + +TEST(FuturesTest, WhenAllSettledSuccess) +{ + ThreadPool pool(2); + std::vector> futures; + for (int i = 0; i < 3; ++i) + futures.push_back(pool.submit([i] { return i; })); + + auto results = when_all_settled(futures); + ASSERT_EQ(results.size(), 3u); + for (int i = 0; i < 3; ++i) + { + ASSERT_TRUE(results[i].has_value()); + EXPECT_EQ(results[i].value(), i); + } +} + +TEST(FuturesTest, WhenAllSettledWithExceptions) +{ + ThreadPool pool(2); + std::vector> futures; + futures.push_back(pool.submit([] { return 1; })); + futures.push_back(pool.submit([]() -> int { throw std::runtime_error("err"); })); + futures.push_back(pool.submit([] { return 3; })); + + auto results = when_all_settled(futures); + ASSERT_EQ(results.size(), 3u); + EXPECT_TRUE(results[0].has_value()); + EXPECT_FALSE(results[1].has_value()); + EXPECT_TRUE(results[2].has_value()); +} + +// ==================== when_all_settled (void) ==================== + +TEST(FuturesTest, WhenAllSettledVoid) +{ + ThreadPool pool(2); + std::vector> futures; + futures.push_back(pool.submit([] {})); + futures.push_back(pool.submit([] { throw std::runtime_error("fail"); })); + futures.push_back(pool.submit([] {})); + + auto results = when_all_settled(futures); + ASSERT_EQ(results.size(), 3u); + EXPECT_TRUE(results[0].has_value()); + EXPECT_FALSE(results[1].has_value()); + EXPECT_TRUE(results[2].has_value()); +} + +// ==================== when_any (non-void) ==================== + +TEST(FuturesTest, WhenAnyReturnsFirst) +{ + ThreadPool pool(4); + std::vector> futures; + for (int i = 0; i < 3; ++i) + futures.push_back(pool.submit([i] { return i + 100; })); + + auto [idx, value] = when_any(futures); + EXPECT_LT(idx, 3u); + EXPECT_EQ(value, static_cast(idx) + 100); +} + +TEST(FuturesTest, WhenAnyEmptyThrows) +{ + std::vector> empty; + EXPECT_THROW(when_any(empty), std::invalid_argument); +} + +TEST(FuturesTest, WhenAnySingleFuture) +{ + ThreadPool pool(1); + std::vector> futures; + futures.push_back(pool.submit([] { return 42; })); + + auto [idx, value] = when_any(futures); + EXPECT_EQ(idx, 0u); + EXPECT_EQ(value, 42); +} + +// ==================== when_any (void) ==================== + +TEST(FuturesTest, WhenAnyVoidReturnsFirst) +{ + ThreadPool pool(4); + std::atomic count{0}; + std::vector> futures; + for (int i = 0; i < 3; ++i) + futures.push_back(pool.submit([&count] { count.fetch_add(1, std::memory_order_relaxed); })); + + size_t idx = when_any(futures); + EXPECT_LT(idx, 3u); +} + +TEST(FuturesTest, WhenAnyVoidEmptyThrows) +{ + std::vector> empty; + EXPECT_THROW(when_any(empty), std::invalid_argument); +} + +TEST(FuturesTest, WhenAnyVoidPropagatesException) +{ + ThreadPool pool(1); + std::vector> futures; + futures.push_back(pool.submit([] { throw std::runtime_error("boom"); })); + + EXPECT_THROW(when_any(futures), std::runtime_error); +} diff --git a/tests/registry_query_test.cpp b/tests/registry_query_test.cpp new file mode 100644 index 0000000..c8464d3 --- /dev/null +++ b/tests/registry_query_test.cpp @@ -0,0 +1,174 @@ +#include +#include +#include +#include +#include +#include + +using namespace threadschedule; + +class RegistryQueryTest : public ::testing::Test +{ + protected: + ThreadRegistry reg_; + + void register_threads() + { + threads_.emplace_back("alpha", "io", [this](std::stop_token) { + std::unique_lock lock(mtx_); + cv_.wait(lock, [this] { return done_; }); + }); + threads_.emplace_back("beta", "compute", [this](std::stop_token) { + std::unique_lock lock(mtx_); + cv_.wait(lock, [this] { return done_; }); + }); + threads_.emplace_back("gamma", "io", [this](std::stop_token) { + std::unique_lock lock(mtx_); + cv_.wait(lock, [this] { return done_; }); + }); + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + } + + void SetUp() override + { + register_threads(); + } + + void TearDown() override + { + { + std::lock_guard lock(mtx_); + done_ = true; + } + cv_.notify_all(); + threads_.clear(); + } + + private: + struct RegThread + { + std::thread t; + RegThread(std::string name, std::string tag, std::function fn) + { + t = std::thread([name = std::move(name), tag = std::move(tag), fn = std::move(fn)] { + AutoRegisterCurrentThread guard(name, tag); + std::stop_source src; + fn(src.get_token()); + }); + } + ~RegThread() + { + if (t.joinable()) + t.join(); + } + RegThread(RegThread&&) = default; + RegThread& operator=(RegThread&&) = default; + }; + std::vector threads_; + std::mutex mtx_; + std::condition_variable cv_; + bool done_{false}; +}; + +TEST_F(RegistryQueryTest, CountReturnsAll) +{ + EXPECT_GE(registry().count(), 3u); +} + +TEST_F(RegistryQueryTest, FilterByTag) +{ + auto io_count = registry() + .filter([](auto const& e) { return e.componentTag == "io"; }) + .count(); + EXPECT_EQ(io_count, 2u); +} + +TEST_F(RegistryQueryTest, FilterByName) +{ + auto result = registry() + .filter([](auto const& e) { return e.name == "beta"; }) + .count(); + EXPECT_EQ(result, 1u); +} + +TEST_F(RegistryQueryTest, MapExtractsNames) +{ + auto names = registry().map([](auto const& e) { return e.name; }); + auto it_alpha = std::find(names.begin(), names.end(), "alpha"); + auto it_beta = std::find(names.begin(), names.end(), "beta"); + EXPECT_NE(it_alpha, names.end()); + EXPECT_NE(it_beta, names.end()); +} + +TEST_F(RegistryQueryTest, FindIfFindsMatch) +{ + auto found = registry().find_if([](auto const& e) { return e.name == "gamma"; }); + ASSERT_TRUE(found.has_value()); + EXPECT_EQ(found->name, "gamma"); + EXPECT_EQ(found->componentTag, "io"); +} + +TEST_F(RegistryQueryTest, FindIfReturnsNulloptOnMiss) +{ + auto found = registry().find_if([](auto const& e) { return e.name == "nonexistent"; }); + EXPECT_FALSE(found.has_value()); +} + +TEST_F(RegistryQueryTest, AnyReturnsTrueWhenMatching) +{ + EXPECT_TRUE(registry().any([](auto const& e) { return e.componentTag == "compute"; })); +} + +TEST_F(RegistryQueryTest, AnyReturnsFalseWhenNoMatch) +{ + EXPECT_FALSE(registry().any([](auto const& e) { return e.componentTag == "missing"; })); +} + +TEST_F(RegistryQueryTest, AllReturnsFalseWhenNotAllMatch) +{ + EXPECT_FALSE(registry().all([](auto const& e) { return e.componentTag == "io"; })); +} + +TEST_F(RegistryQueryTest, NoneReturnsTrueWhenNoMatch) +{ + EXPECT_TRUE(registry().none([](auto const& e) { return e.name == "zzz"; })); +} + +TEST_F(RegistryQueryTest, NoneReturnsFalseWhenMatch) +{ + EXPECT_FALSE(registry().none([](auto const& e) { return e.name == "alpha"; })); +} + +TEST_F(RegistryQueryTest, TakeLimitsResults) +{ + auto view = registry().take(2); + EXPECT_LE(view.count(), 2u); +} + +TEST_F(RegistryQueryTest, SkipSkipsEntries) +{ + auto total = registry().count(); + auto skipped = registry().skip(1).count(); + if (total > 1) + { + EXPECT_EQ(skipped, total - 1); + } +} + +TEST_F(RegistryQueryTest, ForEachVisitsAll) +{ + size_t visited = 0; + registry().for_each([&visited](auto const&) { ++visited; }); + EXPECT_GE(visited, 3u); +} + +TEST_F(RegistryQueryTest, ChainedFilterMapForEach) +{ + auto io_names = registry() + .filter([](auto const& e) { return e.componentTag == "io"; }) + .map([](auto const& e) { return e.name; }); + EXPECT_EQ(io_names.size(), 2u); + std::set names(io_names.begin(), io_names.end()); + EXPECT_TRUE(names.count("alpha")); + EXPECT_TRUE(names.count("gamma")); +} diff --git a/tests/thread_pool_v2_test.cpp b/tests/thread_pool_v2_test.cpp new file mode 100644 index 0000000..9aa27e8 --- /dev/null +++ b/tests/thread_pool_v2_test.cpp @@ -0,0 +1,540 @@ +#include +#include +#include +#include +#include +#include + +using namespace threadschedule; + +// ==================== try_submit / try_post ==================== + +TEST(PoolV2, TrySubmitReturnsExpected) +{ + ThreadPool pool(2); + auto result = pool.try_submit([] { return 42; }); + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(result.value().get(), 42); +} + +TEST(PoolV2, TrySubmitAfterShutdownReturnsError) +{ + ThreadPool pool(2); + pool.shutdown(); + auto result = pool.try_submit([] { return 1; }); + ASSERT_FALSE(result.has_value()); + EXPECT_EQ(result.error(), std::make_error_code(std::errc::operation_canceled)); +} + +TEST(PoolV2, TryPostReturnsExpected) +{ + ThreadPool pool(2); + std::atomic ran{false}; + auto result = pool.try_post([&ran] { ran = true; }); + ASSERT_TRUE(result.has_value()); + pool.wait_for_tasks(); + EXPECT_TRUE(ran); +} + +TEST(PoolV2, TryPostAfterShutdownReturnsError) +{ + ThreadPool pool(2); + pool.shutdown(); + auto result = pool.try_post([] {}); + ASSERT_FALSE(result.has_value()); + EXPECT_EQ(result.error(), std::make_error_code(std::errc::operation_canceled)); +} + +TEST(PoolV2, PostThrowsOnShutdown) +{ + ThreadPool pool(2); + pool.shutdown(); + EXPECT_THROW(pool.post([] {}), std::runtime_error); +} + +// ==================== submit_batch / try_submit_batch ==================== + +TEST(PoolV2, SubmitBatchExecutesAll) +{ + HighPerformancePool pool(4); + std::atomic count{0}; + std::vector> tasks; + for (int i = 0; i < 100; ++i) + tasks.push_back([&count] { count.fetch_add(1, std::memory_order_relaxed); }); + + auto futures = pool.submit_batch(tasks.begin(), tasks.end()); + for (auto& f : futures) + f.get(); + + EXPECT_EQ(count.load(), 100); +} + +TEST(PoolV2, TrySubmitBatchAfterShutdown) +{ + HighPerformancePool pool(2); + pool.shutdown(); + std::vector> tasks = {[] {}, [] {}}; + auto result = pool.try_submit_batch(tasks.begin(), tasks.end()); + ASSERT_FALSE(result.has_value()); + EXPECT_EQ(result.error(), std::make_error_code(std::errc::operation_canceled)); +} + +TEST(PoolV2, SubmitBatchOnThreadPool) +{ + ThreadPool pool(2); + std::atomic count{0}; + std::vector> tasks; + for (int i = 0; i < 50; ++i) + tasks.push_back([&count] { count.fetch_add(1, std::memory_order_relaxed); }); + + auto futures = pool.submit_batch(tasks.begin(), tasks.end()); + for (auto& f : futures) + f.get(); + + EXPECT_EQ(count.load(), 50); +} + +// ==================== parallel_for_each ==================== + +TEST(PoolV2, ParallelForEachHP) +{ + HighPerformancePool pool(4); + std::vector> values(100); + for (auto& v : values) + v.store(0); + + std::vector indices(100); + std::iota(indices.begin(), indices.end(), 0); + + pool.parallel_for_each(indices.begin(), indices.end(), [&values](int idx) { values[idx].store(idx * 2); }); + + for (int i = 0; i < 100; ++i) + EXPECT_EQ(values[i].load(), i * 2); +} + +TEST(PoolV2, ParallelForEachThreadPool) +{ + ThreadPool pool(2); + std::vector data(50, 1); + + pool.parallel_for_each(data.begin(), data.end(), [](int& v) { v *= 3; }); + + for (auto const& v : data) + EXPECT_EQ(v, 3); +} + +// ==================== ShutdownPolicy ==================== + +TEST(PoolV2, ShutdownDrainCompletesAllTasks) +{ + std::atomic count{0}; + { + ThreadPool pool(2); + for (int i = 0; i < 20; ++i) + pool.post([&count] { + std::this_thread::sleep_for(std::chrono::milliseconds(5)); + count.fetch_add(1, std::memory_order_relaxed); + }); + pool.shutdown(ShutdownPolicy::drain); + } + EXPECT_EQ(count.load(), 20); +} + +TEST(PoolV2, ShutdownDropPendingMaySkipTasks) +{ + std::atomic count{0}; + { + HighPerformancePool pool(1); + pool.post([&count] { + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + count.fetch_add(1, std::memory_order_relaxed); + }); + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + for (int i = 0; i < 100; ++i) + pool.post([&count] { count.fetch_add(1, std::memory_order_relaxed); }); + pool.shutdown(ShutdownPolicy::drop_pending); + } + EXPECT_LT(count.load(), 101); +} + +TEST(PoolV2, ShutdownForTimedDrain) +{ + ThreadPool pool(2); + std::atomic count{0}; + for (int i = 0; i < 5; ++i) + pool.post([&count] { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + count.fetch_add(1, std::memory_order_relaxed); + }); + + bool drained = pool.shutdown_for(std::chrono::milliseconds(5000)); + EXPECT_TRUE(drained); + EXPECT_EQ(count.load(), 5); +} + +// ==================== post on all pool types ==================== + +TEST(PoolV2, HPPoolPost) +{ + HighPerformancePool pool(2); + std::atomic ran{false}; + pool.post([&ran] { ran = true; }); + pool.wait_for_tasks(); + EXPECT_TRUE(ran); +} + +TEST(PoolV2, FastPoolPost) +{ + FastThreadPool pool(2); + std::atomic ran{false}; + pool.post([&ran] { ran = true; }); + pool.wait_for_tasks(); + EXPECT_TRUE(ran); +} + +// ==================== HP Pool deque_capacity constructor ==================== + +TEST(PoolV2, HPPoolCustomDequeCapacity) +{ + HighPerformancePool pool(2, 64); + auto f = pool.submit([] { return 7; }); + EXPECT_EQ(f.get(), 7); +} + +// ==================== Trace callbacks ==================== + +TEST(PoolV2, TraceCallbacksHP) +{ + HighPerformancePool pool(2); + std::atomic starts{0}; + std::atomic ends{0}; + + pool.set_on_task_start([&starts](auto, auto) { starts.fetch_add(1, std::memory_order_relaxed); }); + pool.set_on_task_end([&ends](auto, auto, auto) { ends.fetch_add(1, std::memory_order_relaxed); }); + + for (int i = 0; i < 10; ++i) + pool.post([] {}); + + pool.wait_for_tasks(); + EXPECT_EQ(starts.load(), 10); + EXPECT_EQ(ends.load(), 10); +} + +TEST(PoolV2, TraceCallbacksThreadPool) +{ + ThreadPool pool(2); + std::atomic starts{0}; + std::atomic ends{0}; + + pool.set_on_task_start([&starts](auto, auto) { starts.fetch_add(1, std::memory_order_relaxed); }); + pool.set_on_task_end([&ends](auto, auto, auto) { ends.fetch_add(1, std::memory_order_relaxed); }); + + for (int i = 0; i < 10; ++i) + pool.post([] {}); + + pool.wait_for_tasks(); + EXPECT_EQ(starts.load(), 10); + EXPECT_EQ(ends.load(), 10); +} + +// ==================== LightweightPool ==================== + +TEST(PoolV2, LightweightPoolPost) +{ + LightweightPool pool(2); + std::atomic count{0}; + + for (int i = 0; i < 50; ++i) + pool.post([&count] { count.fetch_add(1, std::memory_order_relaxed); }); + + pool.shutdown(ShutdownPolicy::drain); + EXPECT_EQ(count.load(), 50); +} + +TEST(PoolV2, LightweightPoolTryPost) +{ + LightweightPool pool(2); + std::atomic ran{false}; + auto result = pool.try_post([&ran] { ran = true; }); + ASSERT_TRUE(result.has_value()); + pool.shutdown(ShutdownPolicy::drain); + EXPECT_TRUE(ran); +} + +TEST(PoolV2, LightweightPoolPostBatch) +{ + LightweightPool pool(4); + std::atomic count{0}; + std::vector> tasks; + for (int i = 0; i < 100; ++i) + tasks.push_back([&count] { count.fetch_add(1, std::memory_order_relaxed); }); + + pool.post_batch(tasks.begin(), tasks.end()); + pool.shutdown(ShutdownPolicy::drain); + EXPECT_EQ(count.load(), 100); +} + +TEST(PoolV2, LightweightPoolShutdownDropPending) +{ + std::atomic count{0}; + { + LightweightPool pool(1); + pool.post([&count] { + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + count.fetch_add(1, std::memory_order_relaxed); + }); + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + for (int i = 0; i < 50; ++i) + pool.post([&count] { count.fetch_add(1, std::memory_order_relaxed); }); + pool.shutdown(ShutdownPolicy::drop_pending); + } + EXPECT_LT(count.load(), 51); +} + +TEST(PoolV2, LightweightPoolShutdownFor) +{ + LightweightPool pool(2); + std::atomic count{0}; + for (int i = 0; i < 5; ++i) + pool.post([&count] { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + count.fetch_add(1, std::memory_order_relaxed); + }); + bool drained = pool.shutdown_for(std::chrono::milliseconds(5000)); + EXPECT_TRUE(drained); + EXPECT_EQ(count.load(), 5); +} + +TEST(PoolV2, LightweightPoolConfigureThreads) +{ + LightweightPool pool(2); + auto r = pool.configure_threads("lite"); + EXPECT_TRUE(r.has_value()); +} + +TEST(PoolV2, LightweightPoolCustomTaskSize) +{ + LightweightPoolT<128> pool(2); + std::atomic ran{false}; + pool.post([&ran] { ran = true; }); + pool.shutdown(ShutdownPolicy::drain); + EXPECT_TRUE(ran); +} + +// ==================== GlobalPool ==================== + +TEST(PoolV2, GlobalThreadPoolSubmit) +{ + auto f = GlobalThreadPool::submit([] { return 99; }); + EXPECT_EQ(f.get(), 99); +} + +TEST(PoolV2, GlobalThreadPoolPost) +{ + std::atomic ran{false}; + GlobalThreadPool::post([&ran] { ran = true; }); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + EXPECT_TRUE(ran); +} + +// ==================== ScheduledThreadPool ==================== + +TEST(PoolV2, ScheduledAfterBasic) +{ + ScheduledThreadPool scheduler(2); + std::atomic ran{false}; + + scheduler.schedule_after(std::chrono::milliseconds(50), [&ran] { ran = true; }); + + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + EXPECT_TRUE(ran); +} + +TEST(PoolV2, ScheduledPeriodicRunsMultipleTimes) +{ + ScheduledThreadPool scheduler(2); + std::atomic count{0}; + + auto handle = scheduler.schedule_periodic(std::chrono::milliseconds(30), [&count] { + count.fetch_add(1, std::memory_order_relaxed); + }); + + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + handle.cancel(); + scheduler.shutdown(); + EXPECT_GE(count.load(), 3); +} + +TEST(PoolV2, ScheduledCancel) +{ + ScheduledThreadPool scheduler(2); + std::atomic ran{false}; + + auto handle = scheduler.schedule_after(std::chrono::milliseconds(200), [&ran] { ran = true; }); + handle.cancel(); + EXPECT_TRUE(handle.is_cancelled()); + + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + EXPECT_FALSE(ran); +} + +TEST(PoolV2, ScheduledInsertAfterShutdownReturnsCancelledHandle) +{ + ScheduledThreadPool scheduler(2); + scheduler.shutdown(); + + auto handle = scheduler.schedule_after(std::chrono::milliseconds(10), [] {}); + EXPECT_TRUE(handle.is_cancelled()); +} + +TEST(PoolV2, ScheduledHPPool) +{ + ScheduledHighPerformancePool scheduler(2); + std::atomic ran{false}; + scheduler.schedule_after(std::chrono::milliseconds(20), [&ran] { ran = true; }); + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + EXPECT_TRUE(ran); +} + +TEST(PoolV2, ScheduledLightweight) +{ + ScheduledLightweightPool scheduler(2); + std::atomic ran{false}; + scheduler.schedule_after(std::chrono::milliseconds(20), [&ran] { ran = true; }); + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + EXPECT_TRUE(ran); +} + +// ==================== InlinePool ==================== + +TEST(PoolV2, InlinePoolSubmit) +{ + InlinePool pool; + auto f = pool.submit([] { return 42; }); + EXPECT_EQ(f.get(), 42); +} + +TEST(PoolV2, InlinePoolPost) +{ + InlinePool pool; + int value = 0; + pool.post([&value] { value = 7; }); + EXPECT_EQ(value, 7); +} + +TEST(PoolV2, InlinePoolExceptionPropagation) +{ + InlinePool pool; + auto f = pool.submit([]() -> int { throw std::runtime_error("inline boom"); }); + EXPECT_THROW(f.get(), std::runtime_error); +} + +TEST(PoolV2, InlinePoolShutdown) +{ + InlinePool pool; + pool.shutdown(); + EXPECT_THROW(pool.submit([] { return 1; }), std::runtime_error); +} + +TEST(PoolV2, InlinePoolParallelForEach) +{ + InlinePool pool; + std::vector data = {1, 2, 3, 4, 5}; + pool.parallel_for_each(data.begin(), data.end(), [](int& v) { v *= 10; }); + EXPECT_EQ(data, (std::vector{10, 20, 30, 40, 50})); +} + +// ==================== task_group ==================== + +TEST(PoolV2, TaskGroupWaitsForAll) +{ + ThreadPool pool(2); + std::atomic count{0}; + { + task_group group(pool); + for (int i = 0; i < 10; ++i) + group.submit([&count] { count.fetch_add(1, std::memory_order_relaxed); }); + group.wait(); + } + EXPECT_EQ(count.load(), 10); +} + +TEST(PoolV2, TaskGroupDestructorWaits) +{ + ThreadPool pool(2); + std::atomic count{0}; + { + task_group group(pool); + for (int i = 0; i < 5; ++i) + group.submit([&count] { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + count.fetch_add(1, std::memory_order_relaxed); + }); + } + EXPECT_EQ(count.load(), 5); +} + +TEST(PoolV2, TaskGroupPropagatesException) +{ + ThreadPool pool(2); + task_group group(pool); + group.submit([] { throw std::runtime_error("group fail"); }); + EXPECT_THROW(group.wait(), std::runtime_error); +} + +TEST(PoolV2, TaskGroupPendingCount) +{ + ThreadPool pool(2); + task_group group(pool); + EXPECT_EQ(group.pending(), 0u); + group.submit([] { std::this_thread::sleep_for(std::chrono::milliseconds(100)); }); + EXPECT_GE(group.pending(), 0u); + group.wait(); + EXPECT_EQ(group.pending(), 0u); +} + +TEST(PoolV2, TaskGroupWithInlinePool) +{ + InlinePool pool; + int sum = 0; + { + task_group group(pool); + group.submit([&sum] { sum += 1; }); + group.submit([&sum] { sum += 2; }); + group.submit([&sum] { sum += 3; }); + group.wait(); + } + EXPECT_EQ(sum, 6); +} + +#if __cpp_lib_jthread >= 201911L +// ==================== Stop-token tasks (C++20) ==================== + +TEST(PoolV2, SubmitWithStopTokenSkipsWhenStopped) +{ + ThreadPool pool(2); + std::stop_source src; + src.request_stop(); + + auto f = pool.submit(src.get_token(), [] { return 42; }); + EXPECT_EQ(f.get(), 0); +} + +TEST(PoolV2, SubmitWithStopTokenExecutesNormally) +{ + ThreadPool pool(2); + std::stop_source src; + + auto f = pool.submit(src.get_token(), [] { return 42; }); + EXPECT_EQ(f.get(), 42); +} + +TEST(PoolV2, TrySubmitWithStopToken) +{ + HighPerformancePool pool(2); + std::stop_source src; + auto result = pool.try_submit(src.get_token(), [] { return 7; }); + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(result.value().get(), 7); +} +#endif