From b9eb28e3749bcd7f56141cb91608201329f6c128 Mon Sep 17 00:00:00 2001 From: SerhiiHrabarskiy Date: Tue, 16 Dec 2025 10:49:04 +0200 Subject: [PATCH 1/2] Implemented runtime thread count resizing for execution pool --- include/execq/internal/ExecutionPool.h | 20 ++++- include/execq/internal/ThreadWorker.h | 4 +- src/ExecutionPool.cpp | 116 ++++++++++++++++++++++++- src/ThreadWorker.cpp | 26 ++++-- 4 files changed, 153 insertions(+), 13 deletions(-) diff --git a/include/execq/internal/ExecutionPool.h b/include/execq/internal/ExecutionPool.h index 3a12d3c..f0297aa 100644 --- a/include/execq/internal/ExecutionPool.h +++ b/include/execq/internal/ExecutionPool.h @@ -42,10 +42,14 @@ namespace execq virtual bool notifyOneWorker() = 0; virtual void notifyAllWorkers() = 0; + + virtual void setThreadCount(uint32_t threadCount) = 0; }; namespace impl { + using ThreadWorkers = std::vector>; + class ExecutionPool: public IExecutionPool { public: @@ -56,18 +60,26 @@ namespace execq virtual bool notifyOneWorker() final; virtual void notifyAllWorkers() final; - + + virtual void setThreadCount(uint32_t threadCount) override final; + private: - std::atomic_bool m_valid { true }; + bool shouldWorkerExit(); + void extractFinishedWorkers(ThreadWorkers& workers); + + std::atomic_uint64_t m_threadCount {0}; + TaskProviderList m_providerGroup; - std::vector> m_workers; + mutable std::mutex m_workersMutex; + ThreadWorkers m_workers; + const IThreadWorkerFactory& m_workerFactory; }; namespace details { - bool NotifyWorkers(const std::vector>& workers, const bool single); + bool NotifyWorkers(const ThreadWorkers& workers, const bool single); } } } diff --git a/include/execq/internal/ThreadWorker.h b/include/execq/internal/ThreadWorker.h index 702f298..38a8453 100644 --- a/include/execq/internal/ThreadWorker.h +++ b/include/execq/internal/ThreadWorker.h @@ -35,6 +35,7 @@ namespace execq namespace impl { using Task = std::packaged_task; + using ThreadStopCb = std::function; class ITaskProvider { public: @@ -50,6 +51,7 @@ namespace execq virtual ~IThreadWorker() = default; virtual bool notifyWorker() = 0; + virtual bool finished() { return false; } }; @@ -60,7 +62,7 @@ namespace execq virtual ~IThreadWorkerFactory() = default; - virtual std::unique_ptr createWorker(impl::ITaskProvider& provider) const = 0; + virtual std::unique_ptr createWorker(impl::ITaskProvider& provider, ThreadStopCb cb = nullptr) const = 0; }; } } diff --git a/src/ExecutionPool.cpp b/src/ExecutionPool.cpp index 40bbeae..2ccd392 100644 --- a/src/ExecutionPool.cpp +++ b/src/ExecutionPool.cpp @@ -24,11 +24,34 @@ #include "ExecutionPool.h" +namespace +{ +static constexpr uint64_t packThreadsCount(uint32_t target, uint32_t current) +{ + return (uint64_t(target) << 32) | uint64_t(current); +} +static constexpr uint32_t unpackTarget(uint64_t threadCount) +{ + return uint32_t(threadCount >> 32); +} +static constexpr uint32_t unpackCurrent(uint64_t threadCount) +{ + return uint32_t(threadCount & 0xFFFFFFFFu); +} +} + execq::impl::ExecutionPool::ExecutionPool(const uint32_t threadCount, const IThreadWorkerFactory& workerFactory) + : m_workerFactory { workerFactory } { + m_threadCount.store(packThreadsCount(threadCount, threadCount)); + + std::scoped_lock lock(m_workersMutex); + m_workers.reserve(threadCount); + for (uint32_t i = 0; i < threadCount; i++) { - m_workers.emplace_back(workerFactory.createWorker(m_providerGroup)); + ThreadStopCb cb = [this]() { return this->shouldWorkerExit(); }; + m_workers.emplace_back(workerFactory.createWorker(m_providerGroup, cb)); } } @@ -44,17 +67,106 @@ void execq::impl::ExecutionPool::removeProvider(ITaskProvider& provider) bool execq::impl::ExecutionPool::notifyOneWorker() { + std::lock_guard lock(m_workersMutex); return details::NotifyWorkers(m_workers, true); } void execq::impl::ExecutionPool::notifyAllWorkers() { + std::lock_guard lock(m_workersMutex); details::NotifyWorkers(m_workers, false); } +void execq::impl::ExecutionPool::setThreadCount(uint32_t threadCount) +{ + if (threadCount < 2) + return; + + uint32_t toAdd = 0; + bool shrinking = false; + + for (;;) { + uint64_t tc = m_threadCount.load(); + uint32_t cur = unpackCurrent(tc); + + if (threadCount > cur) { + uint32_t localToAdd = threadCount - cur; + uint64_t desired = packThreadsCount(threadCount, threadCount); + + if (m_threadCount.compare_exchange_weak(tc, desired)) { + toAdd = localToAdd; + shrinking = false; + break; + } + } else { + uint64_t desired = packThreadsCount(threadCount, cur); + + if (m_threadCount.compare_exchange_weak(tc, desired)) { + toAdd = 0; + shrinking = (threadCount < cur); + break; + } + } + } + + ThreadWorkers toDestroy; + + { + std::scoped_lock lock(m_workersMutex); + + extractFinishedWorkers(toDestroy); + + if (toAdd > 0) { + m_workers.reserve(m_workers.size() + toAdd); + + for (uint32_t i = 0; i < toAdd; ++i) { + ThreadStopCb cb = [this]() { return this->shouldWorkerExit(); }; + m_workers.emplace_back(m_workerFactory.createWorker(m_providerGroup, cb)); + } + } + + if (shrinking || toAdd > 0) { + details::NotifyWorkers(m_workers, false); + } + } + + toDestroy.clear(); +} + +void execq::impl::ExecutionPool::extractFinishedWorkers(ThreadWorkers& workers) +{ + auto it = m_workers.begin(); + while (it != m_workers.end()) { + if ((*it)->finished()) { + workers.emplace_back(std::move(*it)); + it = m_workers.erase(it); + } else { + ++it; + } + } +} + +bool execq::impl::ExecutionPool::shouldWorkerExit() +{ + for (;;) + { + uint64_t threadsCount = m_threadCount.load(); + uint32_t target = unpackTarget(threadsCount); + uint32_t current = unpackCurrent(threadsCount); + + if (current <= target) + return false; + + uint64_t desired = packThreadsCount(target, current - 1); + + if (m_threadCount.compare_exchange_weak(threadsCount, desired)) + return true; + } +} + // Details -bool execq::impl::details::NotifyWorkers(const std::vector>& workers, const bool single) +bool execq::impl::details::NotifyWorkers(const ThreadWorkers& workers, const bool single) { bool notified = false; for (const auto& worker : workers) diff --git a/src/ThreadWorker.cpp b/src/ThreadWorker.cpp index 2d23d30..7561b98 100644 --- a/src/ThreadWorker.cpp +++ b/src/ThreadWorker.cpp @@ -31,10 +31,11 @@ namespace execq class ThreadWorker: public IThreadWorker { public: - explicit ThreadWorker(ITaskProvider& provider); + explicit ThreadWorker(ITaskProvider& provider, ThreadStopCb cb = nullptr); virtual ~ThreadWorker(); - virtual bool notifyWorker() final; + virtual bool notifyWorker() override final; + virtual bool finished() override final; private: void threadMain(); @@ -43,9 +44,11 @@ namespace execq private: std::atomic_bool m_shouldQuit { false }; std::atomic_bool m_checkNextTask { false }; + std::atomic_bool m_finished { false }; std::condition_variable m_condition; std::mutex m_mutex; std::unique_ptr m_thread; + ThreadStopCb m_stopCb = nullptr; ITaskProvider& m_provider; }; @@ -57,9 +60,9 @@ std::shared_ptr execq::impl::IThreadWor class ThreadWorkerFactory: public IThreadWorkerFactory { public: - virtual std::unique_ptr createWorker(ITaskProvider& provider) const final + virtual std::unique_ptr createWorker(ITaskProvider& provider, ThreadStopCb cb = nullptr) const final { - return std::unique_ptr(new ThreadWorker(provider)); + return std::unique_ptr(new ThreadWorker(provider, cb)); } }; @@ -67,8 +70,9 @@ std::shared_ptr execq::impl::IThreadWor return s_factory; } -execq::impl::ThreadWorker::ThreadWorker(ITaskProvider& provider) +execq::impl::ThreadWorker::ThreadWorker(ITaskProvider& provider, ThreadStopCb cb) : m_provider(provider) +, m_stopCb(cb) {} execq::impl::ThreadWorker::~ThreadWorker() @@ -83,7 +87,7 @@ execq::impl::ThreadWorker::~ThreadWorker() bool execq::impl::ThreadWorker::notifyWorker() { std::lock_guard lock(m_mutex); - if (m_checkNextTask) + if (m_finished || m_checkNextTask) { return false; } @@ -99,6 +103,11 @@ bool execq::impl::ThreadWorker::notifyWorker() return true; } +bool execq::impl::ThreadWorker::finished() +{ + return m_finished.load(); +} + void execq::impl::ThreadWorker::shutdown() { std::lock_guard lock(m_mutex); @@ -114,6 +123,9 @@ void execq::impl::ThreadWorker::threadMain() { break; } + + if (m_stopCb && m_stopCb()) + break; m_checkNextTask = false; Task task = m_provider.nextTask(); @@ -136,4 +148,6 @@ void execq::impl::ThreadWorker::threadMain() m_condition.wait(lock); } + + m_finished.store(true); } From 3d3b6afd9b9d6f66e6a13fb74345b31eca337916 Mon Sep 17 00:00:00 2001 From: SerhiiHrabarskiy Date: Fri, 19 Dec 2025 00:26:49 +0200 Subject: [PATCH 2/2] Fixed tests on Windows and added tests for resizable execution pool --- CMakeLists.txt | 6 +- include/execq/internal/ExecutionPool.h | 2 +- src/ExecutionPool.cpp | 16 +-- src/ThreadWorker.cpp | 10 +- tests/ExecqTestUtil.h | 4 +- tests/ExecutionPoolTest.cpp | 137 +++++++++++++++++++++++++ tests/ExecutionQueueTest.cpp | 11 +- tests/ExecutionStreamTest.cpp | 2 +- 8 files changed, 167 insertions(+), 21 deletions(-) create mode 100644 tests/ExecutionPoolTest.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index d08165b..897e71e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -38,13 +38,17 @@ include_directories(execq PRIVATE ### execq unit-tests ### - if (EXECQ_TESTING_ENABLE) + if (MSVC) + set(gtest_force_shared_crt ON CACHE BOOL "" FORCE) + endif() + set(TEST_SOURCES tests/ExecqTestUtil.h tests/CancelTokenProviderTest.cpp tests/ExecutionStreamTest.cpp tests/ExecutionQueueTest.cpp + tests/ExecutionPoolTest.cpp tests/TaskExecutionQueueTest.cpp tests/TaskProviderListTest.cpp ) diff --git a/include/execq/internal/ExecutionPool.h b/include/execq/internal/ExecutionPool.h index f0297aa..a980bfb 100644 --- a/include/execq/internal/ExecutionPool.h +++ b/include/execq/internal/ExecutionPool.h @@ -61,7 +61,7 @@ namespace execq virtual bool notifyOneWorker() final; virtual void notifyAllWorkers() final; - virtual void setThreadCount(uint32_t threadCount) override final; + virtual void setThreadCount(uint32_t threadCount) final; private: bool shouldWorkerExit(); diff --git a/src/ExecutionPool.cpp b/src/ExecutionPool.cpp index 2ccd392..51751f9 100644 --- a/src/ExecutionPool.cpp +++ b/src/ExecutionPool.cpp @@ -45,7 +45,7 @@ execq::impl::ExecutionPool::ExecutionPool(const uint32_t threadCount, const IThr { m_threadCount.store(packThreadsCount(threadCount, threadCount)); - std::scoped_lock lock(m_workersMutex); + std::lock_guard lock(m_workersMutex); m_workers.reserve(threadCount); for (uint32_t i = 0; i < threadCount; i++) @@ -86,11 +86,11 @@ void execq::impl::ExecutionPool::setThreadCount(uint32_t threadCount) for (;;) { uint64_t tc = m_threadCount.load(); - uint32_t cur = unpackCurrent(tc); + auto cur = unpackCurrent(tc); if (threadCount > cur) { uint32_t localToAdd = threadCount - cur; - uint64_t desired = packThreadsCount(threadCount, threadCount); + auto desired = packThreadsCount(threadCount, threadCount); if (m_threadCount.compare_exchange_weak(tc, desired)) { toAdd = localToAdd; @@ -98,7 +98,7 @@ void execq::impl::ExecutionPool::setThreadCount(uint32_t threadCount) break; } } else { - uint64_t desired = packThreadsCount(threadCount, cur); + auto desired = packThreadsCount(threadCount, cur); if (m_threadCount.compare_exchange_weak(tc, desired)) { toAdd = 0; @@ -111,7 +111,7 @@ void execq::impl::ExecutionPool::setThreadCount(uint32_t threadCount) ThreadWorkers toDestroy; { - std::scoped_lock lock(m_workersMutex); + std::lock_guard lock(m_workersMutex); extractFinishedWorkers(toDestroy); @@ -150,13 +150,13 @@ bool execq::impl::ExecutionPool::shouldWorkerExit() for (;;) { uint64_t threadsCount = m_threadCount.load(); - uint32_t target = unpackTarget(threadsCount); - uint32_t current = unpackCurrent(threadsCount); + auto target = unpackTarget(threadsCount); + auto current = unpackCurrent(threadsCount); if (current <= target) return false; - uint64_t desired = packThreadsCount(target, current - 1); + auto desired = packThreadsCount(target, current - 1); if (m_threadCount.compare_exchange_weak(threadsCount, desired)) return true; diff --git a/src/ThreadWorker.cpp b/src/ThreadWorker.cpp index 7561b98..2714fb5 100644 --- a/src/ThreadWorker.cpp +++ b/src/ThreadWorker.cpp @@ -34,8 +34,8 @@ namespace execq explicit ThreadWorker(ITaskProvider& provider, ThreadStopCb cb = nullptr); virtual ~ThreadWorker(); - virtual bool notifyWorker() override final; - virtual bool finished() override final; + virtual bool notifyWorker() final; + virtual bool finished() final; private: void threadMain(); @@ -48,9 +48,9 @@ namespace execq std::condition_variable m_condition; std::mutex m_mutex; std::unique_ptr m_thread; - ThreadStopCb m_stopCb = nullptr; ITaskProvider& m_provider; + ThreadStopCb m_stopCb = nullptr; }; } } @@ -71,8 +71,8 @@ std::shared_ptr execq::impl::IThreadWor } execq::impl::ThreadWorker::ThreadWorker(ITaskProvider& provider, ThreadStopCb cb) -: m_provider(provider) -, m_stopCb(cb) + : m_provider(provider) + , m_stopCb(std::move(cb)) {} execq::impl::ThreadWorker::~ThreadWorker() diff --git a/tests/ExecqTestUtil.h b/tests/ExecqTestUtil.h index 6284fd2..f5aa697 100644 --- a/tests/ExecqTestUtil.h +++ b/tests/ExecqTestUtil.h @@ -40,12 +40,14 @@ namespace execq MOCK_METHOD0(notifyOneWorker, bool()); MOCK_METHOD0(notifyAllWorkers, void()); + + MOCK_METHOD1(setThreadCount, void(uint32_t threadCount)); }; class MockThreadWorkerFactory: public execq::impl::IThreadWorkerFactory { public: - MOCK_CONST_METHOD1(createWorker, std::unique_ptr(execq::impl::ITaskProvider& provider)); + MOCK_CONST_METHOD2(createWorker, std::unique_ptr(execq::impl::ITaskProvider& provider, execq::impl::ThreadStopCb cb)); }; class MockThreadWorker: public execq::impl::IThreadWorker diff --git a/tests/ExecutionPoolTest.cpp b/tests/ExecutionPoolTest.cpp new file mode 100644 index 0000000..738f26f --- /dev/null +++ b/tests/ExecutionPoolTest.cpp @@ -0,0 +1,137 @@ +/* + * MIT License + * + * Copyright (c) 2018 Alkenso (Vladimir Vashurkin) + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +#include "execq.h" +#include + +using namespace execq; + +struct ActiveTasksCounter +{ + std::atomic_int active{0}; + std::atomic_int maxActive{0}; + + void operator()(const std::atomic_bool& isCanceled, int) + { + if (isCanceled) + return; + + int now = active.fetch_add(1) + 1; + maxActive.store(std::max(maxActive.load(), now)); + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + active.fetch_sub(1); + } +}; + +TEST(ExecutionPool, ScaleUp) +{ + auto pool = CreateExecutionPool(2); + + ActiveTasksCounter counter; + + auto queue = CreateConcurrentExecutionQueue(pool, std::ref(counter)); + + for (int i = 0; i < 20; ++i) + queue->push(i); + + pool->setThreadCount(4); + + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + + EXPECT_EQ(counter.maxActive.load(), 5); +} + +TEST(ExecutionPool, ScaleDown) +{ + auto pool = CreateExecutionPool(4); + + ActiveTasksCounter counter; + + auto queue = CreateConcurrentExecutionQueue(pool, std::ref(counter)); + + for (int i = 0; i < 50; ++i) + queue->push(i); + + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + pool->setThreadCount(2); + + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + counter.maxActive.store(0); + + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + EXPECT_EQ(counter.maxActive.load(), 3); +} + +TEST(ExecutionPool, ScaleDown_InvalidValue) +{ + auto pool = CreateExecutionPool(4); + + ActiveTasksCounter counter; + + auto queue = CreateConcurrentExecutionQueue(pool, std::ref(counter)); + + for (int i = 0; i < 50; ++i) + queue->push(i); + + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + pool->setThreadCount(1); // Invalid thread count + + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + counter.maxActive.store(0); + + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + EXPECT_EQ(counter.maxActive.load(), 5); +} + +TEST(ExecutionPool, ReconfigureWhileBusy_NoTaskLost) +{ + auto pool = CreateExecutionPool(2); + + constexpr int tasksCount = 500; + std::atomic_int done{0}; + + auto queue = CreateConcurrentExecutionQueue(pool, + [&](const std::atomic_bool&, int){ + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + done.fetch_add(1); + }); + + constexpr auto timeout = std::chrono::seconds(15); + auto deadline = std::chrono::steady_clock::now() + timeout; + + for (int i = 0; i < tasksCount; ++i) + { + queue->push(i); + if (i % 50 == 0) + pool->setThreadCount((i % 3) + 2); + + std::chrono::microseconds(100); + } + + while (done.load() < tasksCount && std::chrono::steady_clock::now() < deadline) + std::this_thread::yield(); + + EXPECT_EQ(done.load(), tasksCount); +} + diff --git a/tests/ExecutionQueueTest.cpp b/tests/ExecutionQueueTest.cpp index f349095..8fd8d49 100644 --- a/tests/ExecutionQueueTest.cpp +++ b/tests/ExecutionQueueTest.cpp @@ -22,6 +22,8 @@ * SOFTWARE. */ +#include + #include "execq.h" #include "ExecqTestUtil.h" @@ -72,7 +74,8 @@ TEST(ExecutionPool, ExecutionQueue_MultipleTasks) for (size_t i = 0; i < count; i++) { - queue->push(arc4random()); + static std::mt19937 rng{std::random_device{}()}; + queue->push(static_cast(rng())); } } @@ -114,7 +117,7 @@ TEST(ExecutionPool, ExecutionQueue_ExecutionPool_Concurrent) // Queue also creates additional single thread worker for its own needs std::unique_ptr additionalWorkerPtr(new MockThreadWorker{}); MockThreadWorker& additionalWorker = *additionalWorkerPtr; - EXPECT_CALL(workerFactory, createWorker(::testing::_)) + EXPECT_CALL(workerFactory, createWorker(::testing::_, ::testing::_)) .WillOnce(::testing::Return(::testing::ByMove(std::move(additionalWorkerPtr)))); @@ -174,7 +177,7 @@ TEST(ExecutionPool, ExecutionQueue_ExecutionPool_Serial) // Queue also creates additional single thread worker for its own needs std::unique_ptr additionalWorkerPtr(new MockThreadWorker{}); MockThreadWorker& additionalWorker = *additionalWorkerPtr; - EXPECT_CALL(workerFactory, createWorker(::testing::_)) + EXPECT_CALL(workerFactory, createWorker(::testing::_, ::testing::_)) .WillOnce(::testing::Return(::testing::ByMove(std::move(additionalWorkerPtr)))); @@ -244,7 +247,7 @@ TEST(ExecutionPool, ExecutionQueue_Cancelability) // Queue also creates additional single thread worker for its own needs std::unique_ptr additionalWorkerPtr(new MockThreadWorker{}); - EXPECT_CALL(workerFactory, createWorker(::testing::_)) + EXPECT_CALL(workerFactory, createWorker(::testing::_, ::testing::_)) .WillOnce(::testing::Return(::testing::ByMove(std::move(additionalWorkerPtr)))); diff --git a/tests/ExecutionStreamTest.cpp b/tests/ExecutionStreamTest.cpp index 4d317ea..0f3d4fd 100644 --- a/tests/ExecutionStreamTest.cpp +++ b/tests/ExecutionStreamTest.cpp @@ -76,7 +76,7 @@ TEST(ExecutionPool, ExecutionStream_WorkerPool) // Strean also creates additional single thread worker for its own needs std::unique_ptr additionalWorkerPtr(new MockThreadWorker{}); MockThreadWorker& additionalWorker = *additionalWorkerPtr; - EXPECT_CALL(workerFactory, createWorker(::testing::_)) + EXPECT_CALL(workerFactory, createWorker(::testing::_, ::testing::_)) .WillOnce(::testing::Return(::testing::ByMove(std::move(additionalWorkerPtr))));