Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 26 additions & 17 deletions cpp/src/arrow/util/thread_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
#include <deque>
#include <list>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <vector>
Expand All @@ -46,44 +45,54 @@ struct Task {
} // namespace

struct SerialExecutor::State {
std::queue<Task> task_queue;
std::deque<Task> task_queue;
std::mutex mutex;
std::condition_variable wait_for_tasks;
bool finished;
bool finished{false};
};

SerialExecutor::SerialExecutor() : state_(new State()) {}
SerialExecutor::~SerialExecutor() {}
SerialExecutor::SerialExecutor() : state_(std::make_shared<State>()) {}

SerialExecutor::~SerialExecutor() = default;

Status SerialExecutor::SpawnReal(TaskHints hints, FnOnce<void()> task,
StopToken stop_token, StopCallback&& stop_callback) {
// The serial task queue is truly serial (no mutex needed) but SpawnReal may be called
// from external threads (e.g. when transferring back from blocking I/O threads) so a
// mutex is needed
// While the SerialExecutor runs tasks synchronously on its main thread,
// SpawnReal may be called from external threads (e.g. when transferring back
// from blocking I/O threads), so we need to keep the state alive *and* to
// lock its contents.
//
// Note that holding the lock while notifying the condition variable may
// not be sufficient, as some exit paths in the main thread are unlocked.
auto state = state_;
{
std::lock_guard<std::mutex> lg(state_->mutex);
state_->task_queue.push(
std::lock_guard<std::mutex> lk(state->mutex);
state->task_queue.push_back(
Task{std::move(task), std::move(stop_token), std::move(stop_callback)});
}
state_->wait_for_tasks.notify_one();
state->wait_for_tasks.notify_one();
return Status::OK();
}

void SerialExecutor::MarkFinished() {
std::lock_guard<std::mutex> lk(state_->mutex);
state_->finished = true;
// Keep the lock when notifying to avoid situations where the SerialExecutor
// would start being destroyed while the notify_one() call is still ongoing.
state_->wait_for_tasks.notify_one();
// Same comment as SpawnReal above
auto state = state_;
{
std::lock_guard<std::mutex> lk(state->mutex);
state->finished = true;
}
state->wait_for_tasks.notify_one();
}

void SerialExecutor::RunLoop() {
// This is called from the SerialExecutor's main thread, so the
// state is guaranteed to be kept alive.
std::unique_lock<std::mutex> lk(state_->mutex);

while (!state_->finished) {
while (!state_->task_queue.empty()) {
Task task = std::move(state_->task_queue.front());
state_->task_queue.pop();
state_->task_queue.pop_front();
lk.unlock();
if (!task.stop_token.IsStopRequested()) {
std::move(task.callable)();
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/util/thread_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ class ARROW_EXPORT SerialExecutor : public Executor {

// State uses mutex
struct State;
std::unique_ptr<State> state_;
std::shared_ptr<State> state_;

template <typename T>
Result<T> Run(TopLevelTask<T> initial_task) {
Expand Down
53 changes: 34 additions & 19 deletions cpp/src/arrow/util/thread_pool_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,30 @@ class TestRunSynchronously : public testing::TestWithParam<bool> {
Status RunVoid(FnOnce<Future<>(Executor*)> top_level_task) {
return RunSynchronouslyVoid(std::move(top_level_task), UseThreads());
}

void TestContinueAfterExternal(bool transfer_to_main_thread) {
bool continuation_ran = false;
EXPECT_OK_AND_ASSIGN(auto external_pool, ThreadPool::Make(1));
auto top_level_task = [&](Executor* executor) {
struct Callback {
Status operator()(...) {
*continuation_ran = true;
return Status::OK();
}
bool* continuation_ran;
};
auto fut = DeferNotOk(external_pool->Submit([&] {
SleepABit();
return Status::OK();
}));
if (transfer_to_main_thread) {
fut = executor->Transfer(fut);
}
return fut.Then(Callback{&continuation_ran});
};
ASSERT_OK(RunVoid(std::move(top_level_task)));
EXPECT_TRUE(continuation_ran);
}
};

TEST_P(TestRunSynchronously, SimpleRun) {
Expand Down Expand Up @@ -209,25 +233,16 @@ TEST_P(TestRunSynchronously, StopTokenSubmit) {
}

TEST_P(TestRunSynchronously, ContinueAfterExternal) {
bool continuation_ran = false;
EXPECT_OK_AND_ASSIGN(auto mock_io_pool, ThreadPool::Make(1));
auto top_level_task = [&](Executor* executor) {
struct Callback {
Status operator()(...) {
continuation_ran = true;
return Status::OK();
}
bool& continuation_ran;
};
return executor
->Transfer(DeferNotOk(mock_io_pool->Submit([&] {
SleepABit();
return Status::OK();
})))
.Then(Callback{continuation_ran});
};
ASSERT_OK(RunVoid(std::move(top_level_task)));
EXPECT_TRUE(continuation_ran);
// The future returned by the top-level task completes on another thread.
// This can trigger delicate race conditions in the SerialExecutor code,
// especially destruction.
this->TestContinueAfterExternal(/*transfer_to_main_thread=*/false);
}

TEST_P(TestRunSynchronously, ContinueAfterExternalTransferred) {
// Like above, but the future is transferred back to the serial executor
// after completion on an external thread.
this->TestContinueAfterExternal(/*transfer_to_main_thread=*/true);
}

TEST_P(TestRunSynchronously, SchedulerAbort) {
Expand Down