Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 138 additions & 0 deletions .github/workflows/sanitizers.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
name: Sanitizers

on:
push:
branches: [ main, dev ]
pull_request:
branches: [ main, dev ]
workflow_dispatch:

jobs:
sanitizers:
name: ${{ matrix.sanitizer }} (GCC 14, C++20)
runs-on: ubuntu-24.04
strategy:
fail-fast: false
matrix:
include:
- sanitizer: ASan
flags: -fsanitize=address -fno-omit-frame-pointer
env_var: ASAN_OPTIONS=detect_leaks=1
- sanitizer: TSan
flags: -fsanitize=thread
env_var: TSAN_OPTIONS=second_deadlock_stack=1
- sanitizer: UBSan
flags: -fsanitize=undefined -fno-sanitize-recover=all
env_var: UBSAN_OPTIONS=print_stacktrace=1

steps:
- name: Checkout code
uses: actions/checkout@v4

- name: Install dependencies
run: |
sudo apt-get update
sudo apt-get install -y cmake ninja-build g++-14

- name: Configure CMake
env:
CC: gcc-14
CXX: g++-14
run: |
cmake -B build -G Ninja \
-DCMAKE_BUILD_TYPE=RelWithDebInfo \
-DCMAKE_CXX_STANDARD=20 \
-DCMAKE_CXX_FLAGS="${{ matrix.flags }}" \
-DCMAKE_EXE_LINKER_FLAGS="${{ matrix.flags }}" \
-DTHREADSCHEDULE_BUILD_TESTS=ON

- name: Build
run: cmake --build build --parallel

- name: Run tests
env:
${{ matrix.env_var }}: ""
run: |
cd build
ctest --output-on-failure --parallel 2 --timeout 120

coverage:
name: Coverage (GCC 14, C++20)
runs-on: ubuntu-24.04
steps:
- name: Checkout code
uses: actions/checkout@v4

- name: Install dependencies
run: |
sudo apt-get update
sudo apt-get install -y cmake ninja-build g++-14 lcov

- name: Configure CMake
env:
CC: gcc-14
CXX: g++-14
run: |
cmake -B build -G Ninja \
-DCMAKE_BUILD_TYPE=Debug \
-DCMAKE_CXX_STANDARD=20 \
-DCMAKE_CXX_FLAGS="--coverage -fprofile-arcs -ftest-coverage" \
-DCMAKE_EXE_LINKER_FLAGS="--coverage" \
-DTHREADSCHEDULE_BUILD_TESTS=ON

- name: Build
run: cmake --build build --parallel

- name: Run tests
run: |
cd build
ctest --output-on-failure --parallel

- name: Collect coverage
run: |
lcov --capture --directory build --output-file coverage.info \
--gcov-tool gcov-14 --ignore-errors mismatch
lcov --remove coverage.info \
'*/build/_deps/*' '/usr/*' '*/tests/*' '*/benchmarks/*' \
--output-file coverage.info --ignore-errors unused
lcov --list coverage.info

- name: Upload coverage artifact
uses: actions/upload-artifact@v4
with:
name: coverage-report
path: coverage.info

clang-tidy:
name: Clang-Tidy (Clang 19, C++20)
runs-on: ubuntu-24.04
steps:
- name: Checkout code
uses: actions/checkout@v4

- name: Install dependencies
run: |
sudo apt-get update
sudo apt-get install -y cmake ninja-build clang-19 clang-tidy-19

- name: Configure CMake
env:
CC: clang-19
CXX: clang++-19
run: |
cmake -B build -G Ninja \
-DCMAKE_BUILD_TYPE=Release \
-DCMAKE_CXX_STANDARD=20 \
-DCMAKE_EXPORT_COMPILE_COMMANDS=ON \
-DTHREADSCHEDULE_BUILD_TESTS=ON \
-DTHREADSCHEDULE_BUILD_EXAMPLES=ON

- name: Build
run: cmake --build build --parallel

- name: Run clang-tidy
run: |
find include/threadschedule -name '*.hpp' | \
xargs clang-tidy-19 -p build --warnings-as-errors='*' \
--header-filter='include/threadschedule/.*' 2>&1 | \
head -200
97 changes: 97 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,102 @@
# Changelog

## v2.1.0

> **No API/ABI breaking changes.** All modifications are bug fixes (aligning
> behaviour with documented API), internal optimizations, additive overloads,
> new classes, and new tests/infrastructure.

### Bug Fixes

- **`when_all<T>` no longer requires default-constructible `T`** -- the
`results.emplace_back()` on the exception path was removed. The vector is
never consumed when an exception is rethrown. (futures.hpp)

- **`when_any` no longer busy-polls at 1 ms** -- exponential backoff
(1 ms → 16 ms cap) and a randomized start index eliminate CPU waste and
index bias. Empty input now throws `std::invalid_argument` instead of
looping forever. (futures.hpp)

- **`ScheduledThreadPoolT::insert_task` checks `stop_`** -- scheduling a
task after `shutdown()` now returns a pre-cancelled `ScheduledTaskHandle`
instead of silently inserting a task that will never execute.
(scheduled_pool.hpp)

- **`ChaosController` uses actual thread priority** -- priority jitter now
reads the real scheduling priority via `sched_getparam()` on Linux instead
of hardcoding `ThreadPriority::normal()`. (chaos.hpp)

- **`ErrorHandler::handle_error` releases the lock before invoking
callbacks** -- callbacks are snapshot-copied under the mutex, then executed
outside the critical section, eliminating deadlock risk when callbacks
interact with the handler. (error_handler.hpp)

- **`PoolWithErrors` documentation corrected** -- the doc comment now says
"implicitly movable" instead of the incorrect "non-movable".
(thread_pool_with_errors.hpp)

### Performance

- **`distribute_affinities_by_numa` calls `read_topology()` once** -- the
previous implementation read sysfs O(n) times for n threads. New additive
overloads `affinity_for_node(CpuTopology const&, ...)` and
`distribute_affinities_by_numa(CpuTopology const&, ...)` accept a
pre-read topology snapshot. (topology.hpp)

### New Features

- **`InlinePool`** -- deterministic, single-threaded pool that executes every
task synchronously on the calling thread. Same `submit`/`post`/`try_submit`
API as `ThreadPool`, making it a drop-in for unit tests.
(inline_pool.hpp)

- **`task_group<Pool>`** -- structured concurrency primitive. All submitted
tasks are guaranteed to complete before `wait()` returns (or the destructor
runs). First exception is captured and rethrown from `wait()`.
(task_group.hpp)

- **`PoolWithErrors` forwarding constructor** -- new 2+ argument constructor
forwards pool-specific arguments (e.g. `deque_capacity` for
`HighPerformancePool`). (thread_pool_with_errors.hpp)

- **`apply_profile_detailed()`** -- new function returning a
`std::vector<std::error_code>` with one entry per configuration step,
unlike `apply_profile()` which aggregates into a single error code.
(profiles.hpp)

### Module Exports

- Added missing exports to `threadschedule.cppm`: `when_all`, `when_any`,
`when_all_settled`, `ShutdownPolicy`, `IndefiniteWait`, `PollingWait`,
`ThreadPoolBase`, `LightweightPoolT`, `LightweightPool`, `GlobalPool`,
`PoolWithErrors`, `ScheduledLightweightPool`, `TaskStartCallback`,
`TaskEndCallback`, `schedule_on`, `run_on`, `pool_executor`, `InlinePool`,
`task_group`, `apply_profile_detailed`.

### Tests

- **65 new Google Test cases** across four new test files:
- `thread_pool_v2_test.cpp` -- `try_submit`, `try_post`, `submit_batch`,
`parallel_for_each`, `ShutdownPolicy`, `LightweightPool`, `GlobalPool`,
`ScheduledThreadPool`, stop-token tasks, `InlinePool`, `task_group`.
- `futures_test.cpp` -- `when_all`, `when_any`, `when_all_settled` (typed
and void variants, empty input, exception propagation).
- `registry_query_test.cpp` -- chainable `QueryView` API: `filter`, `map`,
`for_each`, `find_if`, `any`/`all`/`none`, `take`, `skip`.
- `coroutine_pool_test.cpp` -- `schedule_on`, `run_on`, `pool_executor`,
nested awaits, cross-pool hops, exception propagation (C++20 coroutines).

### CI / Infrastructure

- **New `sanitizers.yml` workflow** with:
- **ASan** (AddressSanitizer + LeakSanitizer)
- **TSan** (ThreadSanitizer)
- **UBSan** (UndefinedBehaviorSanitizer)
- **Code coverage** job (gcov + lcov, artifact upload)
- **Clang-Tidy** job (Clang 19, C++20)

---

## v2.0.0

### Breaking Changes
Expand Down
12 changes: 8 additions & 4 deletions include/threadschedule/chaos.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,15 +126,19 @@ class ChaosController
});
}

// Priority jitter around current policy
// Priority jitter around the thread's actual priority
if (config_.priority_jitter != 0)
{
std::uniform_int_distribution<int> dist(-config_.priority_jitter, config_.priority_jitter);
registry().apply(pred, [&](RegisteredThreadInfo const& info) {
int delta = dist(rng);
// Use normal as baseline if we can't read current
ThreadPriority prio = ThreadPriority::normal();
(void)registry().set_priority(info.tid, ThreadPriority{prio.value() + delta});
int baseline = ThreadPriority::normal().value();
#ifndef _WIN32
sched_param sp{};
if (sched_getparam(info.tid, &sp) == 0)
baseline = sp.sched_priority;
#endif
(void)registry().set_priority(info.tid, ThreadPriority{baseline + delta});
});
}

Expand Down
12 changes: 9 additions & 3 deletions include/threadschedule/error_handler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,10 +200,16 @@ class ErrorHandler
*/
void handle_error(TaskError const& error)
{
std::lock_guard<std::mutex> lock(mutex_);
error_count_++;
std::vector<ErrorCallback> snapshot;
{
std::lock_guard<std::mutex> lock(mutex_);
error_count_++;
snapshot.reserve(callbacks_.size());
for (auto const& [id, callback] : callbacks_)
snapshot.push_back(callback);
}

for (auto const& [id, callback] : callbacks_)
for (auto const& callback : snapshot)
{
try
{
Expand Down
39 changes: 32 additions & 7 deletions include/threadschedule/futures.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
#include <chrono>
#include <exception>
#include <future>
#include <random>
#include <stdexcept>
#include <thread>
#include <utility>
#include <vector>

Expand Down Expand Up @@ -47,7 +50,6 @@ auto when_all(std::vector<std::future<T>>& futures) -> std::vector<T>
{
if (!first_error)
first_error = std::current_exception();
results.emplace_back();
}
}

Expand Down Expand Up @@ -92,8 +94,7 @@ inline void when_all(std::vector<std::future<void>>& futures)
* @tparam T The value type of each future.
*/
template <typename T>
auto when_all_settled(std::vector<std::future<T>>& futures)
-> std::vector<expected<T, std::exception_ptr>>
auto when_all_settled(std::vector<std::future<T>>& futures) -> std::vector<expected<T, std::exception_ptr>>
{
std::vector<expected<T, std::exception_ptr>> results;
results.reserve(futures.size());
Expand All @@ -116,8 +117,7 @@ auto when_all_settled(std::vector<std::future<T>>& futures)
/**
* @brief Block until all void futures complete, returning an @c expected per slot.
*/
inline auto when_all_settled(std::vector<std::future<void>>& futures)
-> std::vector<expected<void, std::exception_ptr>>
inline auto when_all_settled(std::vector<std::future<void>>& futures) -> std::vector<expected<void, std::exception_ptr>>
{
std::vector<expected<void, std::exception_ptr>> results;
results.reserve(futures.size());
Expand Down Expand Up @@ -153,33 +153,58 @@ inline auto when_all_settled(std::vector<std::future<void>>& futures)
template <typename T>
auto when_any(std::vector<std::future<T>>& futures) -> std::pair<size_t, T>
{
if (futures.empty())
throw std::invalid_argument("when_any: empty futures vector");

thread_local std::mt19937 rng{std::random_device{}()};
std::uniform_int_distribution<size_t> dist(0, futures.size() - 1);
size_t const start = dist(rng);
unsigned backoff_ms = 1;

while (true)
{
for (size_t i = 0; i < futures.size(); ++i)
for (size_t k = 0; k < futures.size(); ++k)
{
size_t const i = (start + k) % futures.size();
if (futures[i].wait_for(std::chrono::milliseconds(1)) == std::future_status::ready)
return {i, futures[i].get()};
}
std::this_thread::sleep_for(std::chrono::milliseconds(backoff_ms));
if (backoff_ms < 16)
backoff_ms *= 2;
}
}

/**
* @brief Block until the first void future becomes ready.
*
* @return The index of the first ready future.
* @throws std::invalid_argument If @p futures is empty.
*/
inline auto when_any(std::vector<std::future<void>>& futures) -> size_t
{
if (futures.empty())
throw std::invalid_argument("when_any: empty futures vector");

thread_local std::mt19937 rng{std::random_device{}()};
std::uniform_int_distribution<size_t> dist(0, futures.size() - 1);
size_t const start = dist(rng);
unsigned backoff_ms = 1;

while (true)
{
for (size_t i = 0; i < futures.size(); ++i)
for (size_t k = 0; k < futures.size(); ++k)
{
size_t const i = (start + k) % futures.size();
if (futures[i].wait_for(std::chrono::milliseconds(1)) == std::future_status::ready)
{
futures[i].get();
return i;
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(backoff_ms));
if (backoff_ms < 16)
backoff_ms *= 2;
}
}

Expand Down
Loading
Loading