Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,17 @@ include_directories(execq PRIVATE


### execq unit-tests ###

if (EXECQ_TESTING_ENABLE)
if (MSVC)
set(gtest_force_shared_crt ON CACHE BOOL "" FORCE)
endif()

set(TEST_SOURCES
tests/ExecqTestUtil.h
tests/CancelTokenProviderTest.cpp
tests/ExecutionStreamTest.cpp
tests/ExecutionQueueTest.cpp
tests/ExecutionPoolTest.cpp
tests/TaskExecutionQueueTest.cpp
tests/TaskProviderListTest.cpp
)
Expand Down
20 changes: 16 additions & 4 deletions include/execq/internal/ExecutionPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,14 @@ namespace execq

virtual bool notifyOneWorker() = 0;
virtual void notifyAllWorkers() = 0;

virtual void setThreadCount(uint32_t threadCount) = 0;
};

namespace impl
{
using ThreadWorkers = std::vector<std::unique_ptr<IThreadWorker>>;

class ExecutionPool: public IExecutionPool
{
public:
Expand All @@ -56,18 +60,26 @@ namespace execq

virtual bool notifyOneWorker() final;
virtual void notifyAllWorkers() final;


virtual void setThreadCount(uint32_t threadCount) final;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

'virtual' is redundant since the function is already declared 'final'


private:
std::atomic_bool m_valid { true };
bool shouldWorkerExit();
void extractFinishedWorkers(ThreadWorkers& workers);

std::atomic_uint64_t m_threadCount {0};
Comment thread
WarGloom marked this conversation as resolved.

TaskProviderList m_providerGroup;

std::vector<std::unique_ptr<IThreadWorker>> m_workers;
mutable std::mutex m_workersMutex;
ThreadWorkers m_workers;
const IThreadWorkerFactory& m_workerFactory;
};


namespace details
{
bool NotifyWorkers(const std::vector<std::unique_ptr<IThreadWorker>>& workers, const bool single);
bool NotifyWorkers(const ThreadWorkers& workers, const bool single);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need const bool ?

}
}
}
Expand Down
4 changes: 3 additions & 1 deletion include/execq/internal/ThreadWorker.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ namespace execq
namespace impl
{
using Task = std::packaged_task<void()>;
using ThreadStopCb = std::function<bool()>;
class ITaskProvider
{
public:
Expand All @@ -50,6 +51,7 @@ namespace execq
virtual ~IThreadWorker() = default;

virtual bool notifyWorker() = 0;
virtual bool finished() { return false; }
};


Expand All @@ -60,7 +62,7 @@ namespace execq

virtual ~IThreadWorkerFactory() = default;

virtual std::unique_ptr<impl::IThreadWorker> createWorker(impl::ITaskProvider& provider) const = 0;
virtual std::unique_ptr<impl::IThreadWorker> createWorker(impl::ITaskProvider& provider, ThreadStopCb cb = nullptr) const = 0;
};
}
}
Expand Down
116 changes: 114 additions & 2 deletions src/ExecutionPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,34 @@

#include "ExecutionPool.h"

namespace
{
static constexpr uint64_t packThreadsCount(uint32_t target, uint32_t current)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

'packThreadsCount' is a static definition in anonymous namespace; static is redundant here

{
return (uint64_t(target) << 32) | uint64_t(current);
Comment thread
SerhiiHrabarskiy marked this conversation as resolved.
}
static constexpr uint32_t unpackTarget(uint64_t threadCount)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is a static definition in anonymous namespace; static is redundant here

{
return uint32_t(threadCount >> 32);
}
static constexpr uint32_t unpackCurrent(uint64_t threadCount)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is a static definition in anonymous namespace; static is redundant here

{
return uint32_t(threadCount & 0xFFFFFFFFu);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}
}

execq::impl::ExecutionPool::ExecutionPool(const uint32_t threadCount, const IThreadWorkerFactory& workerFactory)
: m_workerFactory { workerFactory }
{
m_threadCount.store(packThreadsCount(threadCount, threadCount));

std::lock_guard<std::mutex> lock(m_workersMutex);
m_workers.reserve(threadCount);

for (uint32_t i = 0; i < threadCount; i++)
{
m_workers.emplace_back(workerFactory.createWorker(m_providerGroup));
ThreadStopCb cb = [this]() { return this->shouldWorkerExit(); };
m_workers.emplace_back(workerFactory.createWorker(m_providerGroup, cb));
}
}

Expand All @@ -44,17 +67,106 @@ void execq::impl::ExecutionPool::removeProvider(ITaskProvider& provider)

bool execq::impl::ExecutionPool::notifyOneWorker()
{
std::lock_guard<std::mutex> lock(m_workersMutex);
return details::NotifyWorkers(m_workers, true);
}

void execq::impl::ExecutionPool::notifyAllWorkers()
{
std::lock_guard<std::mutex> lock(m_workersMutex);
details::NotifyWorkers(m_workers, false);
}
void execq::impl::ExecutionPool::setThreadCount(uint32_t threadCount)
{
if (threadCount < 2)
return;

uint32_t toAdd = 0;
bool shrinking = false;

for (;;) {
uint64_t tc = m_threadCount.load();
auto cur = unpackCurrent(tc);

if (threadCount > cur) {
uint32_t localToAdd = threadCount - cur;
auto desired = packThreadsCount(threadCount, threadCount);

if (m_threadCount.compare_exchange_weak(tc, desired)) {
toAdd = localToAdd;
shrinking = false;
break;
}
} else {
auto desired = packThreadsCount(threadCount, cur);

if (m_threadCount.compare_exchange_weak(tc, desired)) {
toAdd = 0;
shrinking = (threadCount < cur);
break;
}
}
}

ThreadWorkers toDestroy;

{
std::lock_guard<std::mutex> lock(m_workersMutex);

extractFinishedWorkers(toDestroy);

if (toAdd > 0) {
m_workers.reserve(m_workers.size() + toAdd);

for (uint32_t i = 0; i < toAdd; ++i) {
ThreadStopCb cb = [this]() { return this->shouldWorkerExit(); };
m_workers.emplace_back(m_workerFactory.createWorker(m_providerGroup, cb));
}
}

if (shrinking || toAdd > 0) {
details::NotifyWorkers(m_workers, false);
}
}

toDestroy.clear();
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

redundant

}

void execq::impl::ExecutionPool::extractFinishedWorkers(ThreadWorkers& workers)
{
auto it = m_workers.begin();
while (it != m_workers.end()) {
if ((*it)->finished()) {
workers.emplace_back(std::move(*it));
it = m_workers.erase(it);
} else {
++it;
}
}
}

bool execq::impl::ExecutionPool::shouldWorkerExit()
{
for (;;)
{
uint64_t threadsCount = m_threadCount.load();
auto target = unpackTarget(threadsCount);
auto current = unpackCurrent(threadsCount);

if (current <= target)
return false;

auto desired = packThreadsCount(target, current - 1);

if (m_threadCount.compare_exchange_weak(threadsCount, desired))
return true;
}
}


// Details

bool execq::impl::details::NotifyWorkers(const std::vector<std::unique_ptr<IThreadWorker>>& workers, const bool single)
bool execq::impl::details::NotifyWorkers(const ThreadWorkers& workers, const bool single)
{
bool notified = false;
for (const auto& worker : workers)
Expand Down
26 changes: 20 additions & 6 deletions src/ThreadWorker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@ namespace execq
class ThreadWorker: public IThreadWorker
{
public:
explicit ThreadWorker(ITaskProvider& provider);
explicit ThreadWorker(ITaskProvider& provider, ThreadStopCb cb = nullptr);
virtual ~ThreadWorker();

virtual bool notifyWorker() final;
virtual bool finished() final;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

'virtual' is redundant since the function is already declared 'final'


private:
void threadMain();
Expand All @@ -43,11 +44,13 @@ namespace execq
private:
std::atomic_bool m_shouldQuit { false };
std::atomic_bool m_checkNextTask { false };
std::atomic_bool m_finished { false };
std::condition_variable m_condition;
std::mutex m_mutex;
std::unique_ptr<std::thread> m_thread;

ITaskProvider& m_provider;
ThreadStopCb m_stopCb = nullptr;
};
}
}
Expand All @@ -57,18 +60,19 @@ std::shared_ptr<const execq::impl::IThreadWorkerFactory> execq::impl::IThreadWor
class ThreadWorkerFactory: public IThreadWorkerFactory
{
public:
virtual std::unique_ptr<IThreadWorker> createWorker(ITaskProvider& provider) const final
virtual std::unique_ptr<IThreadWorker> createWorker(ITaskProvider& provider, ThreadStopCb cb = nullptr) const final
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

'virtual' is redundant since the function is already declared 'final'

{
return std::unique_ptr<IThreadWorker>(new ThreadWorker(provider));
return std::unique_ptr<IThreadWorker>(new ThreadWorker(provider, cb));
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

std::make_uniquestd::thread

}
};

static std::shared_ptr<IThreadWorkerFactory> s_factory = std::make_shared<ThreadWorkerFactory>();
return s_factory;
}

execq::impl::ThreadWorker::ThreadWorker(ITaskProvider& provider)
: m_provider(provider)
execq::impl::ThreadWorker::ThreadWorker(ITaskProvider& provider, ThreadStopCb cb)
: m_provider(provider)
, m_stopCb(std::move(cb))
{}

execq::impl::ThreadWorker::~ThreadWorker()
Expand All @@ -83,7 +87,7 @@ execq::impl::ThreadWorker::~ThreadWorker()
bool execq::impl::ThreadWorker::notifyWorker()
{
std::lock_guard<std::mutex> lock(m_mutex);
if (m_checkNextTask)
if (m_finished || m_checkNextTask)
{
return false;
}
Expand All @@ -99,6 +103,11 @@ bool execq::impl::ThreadWorker::notifyWorker()
return true;
}

bool execq::impl::ThreadWorker::finished()
{
return m_finished.load();
}

void execq::impl::ThreadWorker::shutdown()
{
std::lock_guard<std::mutex> lock(m_mutex);
Expand All @@ -114,6 +123,9 @@ void execq::impl::ThreadWorker::threadMain()
{
break;
}

if (m_stopCb && m_stopCb())
break;

m_checkNextTask = false;
Task task = m_provider.nextTask();
Expand All @@ -136,4 +148,6 @@ void execq::impl::ThreadWorker::threadMain()

m_condition.wait(lock);
}

m_finished.store(true);
}
4 changes: 3 additions & 1 deletion tests/ExecqTestUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,14 @@ namespace execq

MOCK_METHOD0(notifyOneWorker, bool());
MOCK_METHOD0(notifyAllWorkers, void());

MOCK_METHOD1(setThreadCount, void(uint32_t threadCount));
};

class MockThreadWorkerFactory: public execq::impl::IThreadWorkerFactory
{
public:
MOCK_CONST_METHOD1(createWorker, std::unique_ptr<execq::impl::IThreadWorker>(execq::impl::ITaskProvider& provider));
MOCK_CONST_METHOD2(createWorker, std::unique_ptr<execq::impl::IThreadWorker>(execq::impl::ITaskProvider& provider, execq::impl::ThreadStopCb cb));
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use modern mock style ?
like MOCK_METHOD(bool, name, (), (const, override));

};

class MockThreadWorker: public execq::impl::IThreadWorker
Expand Down
Loading