diff --git a/cpp/src/arrow/util/thread_pool.cc b/cpp/src/arrow/util/thread_pool.cc index cd523609d27..6465ebbc6fc 100644 --- a/cpp/src/arrow/util/thread_pool.cc +++ b/cpp/src/arrow/util/thread_pool.cc @@ -22,7 +22,6 @@ #include #include #include -#include #include #include #include @@ -46,44 +45,54 @@ struct Task { } // namespace struct SerialExecutor::State { - std::queue task_queue; + std::deque task_queue; std::mutex mutex; std::condition_variable wait_for_tasks; - bool finished; + bool finished{false}; }; -SerialExecutor::SerialExecutor() : state_(new State()) {} -SerialExecutor::~SerialExecutor() {} +SerialExecutor::SerialExecutor() : state_(std::make_shared()) {} + +SerialExecutor::~SerialExecutor() = default; Status SerialExecutor::SpawnReal(TaskHints hints, FnOnce task, StopToken stop_token, StopCallback&& stop_callback) { - // The serial task queue is truly serial (no mutex needed) but SpawnReal may be called - // from external threads (e.g. when transferring back from blocking I/O threads) so a - // mutex is needed + // While the SerialExecutor runs tasks synchronously on its main thread, + // SpawnReal may be called from external threads (e.g. when transferring back + // from blocking I/O threads), so we need to keep the state alive *and* to + // lock its contents. + // + // Note that holding the lock while notifying the condition variable may + // not be sufficient, as some exit paths in the main thread are unlocked. + auto state = state_; { - std::lock_guard lg(state_->mutex); - state_->task_queue.push( + std::lock_guard lk(state->mutex); + state->task_queue.push_back( Task{std::move(task), std::move(stop_token), std::move(stop_callback)}); } - state_->wait_for_tasks.notify_one(); + state->wait_for_tasks.notify_one(); return Status::OK(); } void SerialExecutor::MarkFinished() { - std::lock_guard lk(state_->mutex); - state_->finished = true; - // Keep the lock when notifying to avoid situations where the SerialExecutor - // would start being destroyed while the notify_one() call is still ongoing. - state_->wait_for_tasks.notify_one(); + // Same comment as SpawnReal above + auto state = state_; + { + std::lock_guard lk(state->mutex); + state->finished = true; + } + state->wait_for_tasks.notify_one(); } void SerialExecutor::RunLoop() { + // This is called from the SerialExecutor's main thread, so the + // state is guaranteed to be kept alive. std::unique_lock lk(state_->mutex); while (!state_->finished) { while (!state_->task_queue.empty()) { Task task = std::move(state_->task_queue.front()); - state_->task_queue.pop(); + state_->task_queue.pop_front(); lk.unlock(); if (!task.stop_token.IsStopRequested()) { std::move(task.callable)(); diff --git a/cpp/src/arrow/util/thread_pool.h b/cpp/src/arrow/util/thread_pool.h index cd964385c6e..c388680befc 100644 --- a/cpp/src/arrow/util/thread_pool.h +++ b/cpp/src/arrow/util/thread_pool.h @@ -225,7 +225,7 @@ class ARROW_EXPORT SerialExecutor : public Executor { // State uses mutex struct State; - std::unique_ptr state_; + std::shared_ptr state_; template Result Run(TopLevelTask initial_task) { diff --git a/cpp/src/arrow/util/thread_pool_test.cc b/cpp/src/arrow/util/thread_pool_test.cc index 2390f8c1a41..9926ac1a7a4 100644 --- a/cpp/src/arrow/util/thread_pool_test.cc +++ b/cpp/src/arrow/util/thread_pool_test.cc @@ -135,6 +135,30 @@ class TestRunSynchronously : public testing::TestWithParam { Status RunVoid(FnOnce(Executor*)> top_level_task) { return RunSynchronouslyVoid(std::move(top_level_task), UseThreads()); } + + void TestContinueAfterExternal(bool transfer_to_main_thread) { + bool continuation_ran = false; + EXPECT_OK_AND_ASSIGN(auto external_pool, ThreadPool::Make(1)); + auto top_level_task = [&](Executor* executor) { + struct Callback { + Status operator()(...) { + *continuation_ran = true; + return Status::OK(); + } + bool* continuation_ran; + }; + auto fut = DeferNotOk(external_pool->Submit([&] { + SleepABit(); + return Status::OK(); + })); + if (transfer_to_main_thread) { + fut = executor->Transfer(fut); + } + return fut.Then(Callback{&continuation_ran}); + }; + ASSERT_OK(RunVoid(std::move(top_level_task))); + EXPECT_TRUE(continuation_ran); + } }; TEST_P(TestRunSynchronously, SimpleRun) { @@ -209,25 +233,16 @@ TEST_P(TestRunSynchronously, StopTokenSubmit) { } TEST_P(TestRunSynchronously, ContinueAfterExternal) { - bool continuation_ran = false; - EXPECT_OK_AND_ASSIGN(auto mock_io_pool, ThreadPool::Make(1)); - auto top_level_task = [&](Executor* executor) { - struct Callback { - Status operator()(...) { - continuation_ran = true; - return Status::OK(); - } - bool& continuation_ran; - }; - return executor - ->Transfer(DeferNotOk(mock_io_pool->Submit([&] { - SleepABit(); - return Status::OK(); - }))) - .Then(Callback{continuation_ran}); - }; - ASSERT_OK(RunVoid(std::move(top_level_task))); - EXPECT_TRUE(continuation_ran); + // The future returned by the top-level task completes on another thread. + // This can trigger delicate race conditions in the SerialExecutor code, + // especially destruction. + this->TestContinueAfterExternal(/*transfer_to_main_thread=*/false); +} + +TEST_P(TestRunSynchronously, ContinueAfterExternalTransferred) { + // Like above, but the future is transferred back to the serial executor + // after completion on an external thread. + this->TestContinueAfterExternal(/*transfer_to_main_thread=*/true); } TEST_P(TestRunSynchronously, SchedulerAbort) {