From 503bc4600f69378cdd455aa212cfb0ecef8890c6 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Mon, 7 Feb 2022 16:51:50 +0100 Subject: [PATCH] ARROW-15593: [C++] Make after-fork ThreadPool reinitialization thread-safe Since after-fork reinitialization is triggered when one of the ThreadPool methods is called, it can be very well be called from multiple threads at once. Make it thread-safe. --- cpp/src/arrow/util/mutex.cc | 31 +++++++ cpp/src/arrow/util/mutex.h | 21 +++++ cpp/src/arrow/util/thread_pool.cc | 41 +++++---- cpp/src/arrow/util/thread_pool.h | 6 +- cpp/src/arrow/util/thread_pool_test.cc | 117 +++++++++++++++++++++---- 5 files changed, 181 insertions(+), 35 deletions(-) diff --git a/cpp/src/arrow/util/mutex.cc b/cpp/src/arrow/util/mutex.cc index 7456d7889d8e..9f82ad45b074 100644 --- a/cpp/src/arrow/util/mutex.cc +++ b/cpp/src/arrow/util/mutex.cc @@ -19,6 +19,11 @@ #include +#ifndef _WIN32 +#include +#include +#endif + #include "arrow/util/logging.h" namespace arrow { @@ -50,5 +55,31 @@ Mutex::Guard Mutex::Lock() { Mutex::Mutex() : impl_(new Impl, [](Impl* impl) { delete impl; }) {} +#ifndef _WIN32 +namespace { + +struct AfterForkState { + // A global instance that will also register the atfork handler when + // constructed. + static AfterForkState instance; + + // The mutex may be used at shutdown, so make it eternal. + // The leak (only in child processes) is a small price to pay for robustness. + Mutex* mutex = nullptr; + + private: + AfterForkState() { + pthread_atfork(/*prepare=*/nullptr, /*parent=*/nullptr, /*child=*/&AfterFork); + } + + static void AfterFork() { instance.mutex = new Mutex; } +}; + +AfterForkState AfterForkState::instance; +} // namespace + +Mutex* GlobalForkSafeMutex() { return AfterForkState::instance.mutex; } +#endif // _WIN32 + } // namespace util } // namespace arrow diff --git a/cpp/src/arrow/util/mutex.h b/cpp/src/arrow/util/mutex.h index f4fc64181fb1..ac63cf70cd9a 100644 --- a/cpp/src/arrow/util/mutex.h +++ b/cpp/src/arrow/util/mutex.h @@ -60,5 +60,26 @@ class ARROW_EXPORT Mutex { std::unique_ptr impl_; }; +#ifndef _WIN32 +/// Return a pointer to a process-wide, process-specific Mutex that can be used +/// at any point in a child process. NULL is returned when called in the parent. +/// +/// The rule is to first check that getpid() corresponds to the parent process pid +/// and, if not, call this function to lock any after-fork reinitialization code. +/// Like this: +/// +/// std::atomic pid{getpid()}; +/// ... +/// if (pid.load() != getpid()) { +/// // In child process +/// auto lock = GlobalForkSafeMutex()->Lock(); +/// if (pid.load() != getpid()) { +/// // Reinitialize internal structures after fork +/// ... +/// pid.store(getpid()); +ARROW_EXPORT +Mutex* GlobalForkSafeMutex(); +#endif + } // namespace util } // namespace arrow diff --git a/cpp/src/arrow/util/thread_pool.cc b/cpp/src/arrow/util/thread_pool.cc index 37132fe1a9ce..a1387947e3a2 100644 --- a/cpp/src/arrow/util/thread_pool.cc +++ b/cpp/src/arrow/util/thread_pool.cc @@ -28,6 +28,7 @@ #include "arrow/util/io_util.h" #include "arrow/util/logging.h" +#include "arrow/util/mutex.h" namespace arrow { namespace internal { @@ -235,24 +236,28 @@ ThreadPool::~ThreadPool() { void ThreadPool::ProtectAgainstFork() { #ifndef _WIN32 pid_t current_pid = getpid(); - if (pid_ != current_pid) { - // Reinitialize internal state in child process after fork() - // Ideally we would use pthread_at_fork(), but that doesn't allow - // storing an argument, hence we'd need to maintain a list of all - // existing ThreadPools. - int capacity = state_->desired_capacity_; - - auto new_state = std::make_shared(); - new_state->please_shutdown_ = state_->please_shutdown_; - new_state->quick_shutdown_ = state_->quick_shutdown_; - - pid_ = current_pid; - sp_state_ = new_state; - state_ = sp_state_.get(); - - // Launch worker threads anew - if (!state_->please_shutdown_) { - ARROW_UNUSED(SetCapacity(capacity)); + if (pid_.load() != current_pid) { + // Reinitialize internal state in child process after fork(). + { + // Since after-fork reinitialization is triggered when one of the ThreadPool + // methods is called, it can be very well be called from multiple threads + // at once. Therefore, it needs to be guarded with a lock. + auto lock = util::GlobalForkSafeMutex()->Lock(); + + if (pid_.load() != current_pid) { + int capacity = state_->desired_capacity_; + + auto new_state = std::make_shared(); + new_state->please_shutdown_ = state_->please_shutdown_; + new_state->quick_shutdown_ = state_->quick_shutdown_; + + sp_state_ = new_state; + state_ = sp_state_.get(); + pid_ = current_pid; + + // Launch worker threads anew + ARROW_UNUSED(SetCapacity(capacity)); + } } } #endif diff --git a/cpp/src/arrow/util/thread_pool.h b/cpp/src/arrow/util/thread_pool.h index 4ed908d6f297..a104e0e35900 100644 --- a/cpp/src/arrow/util/thread_pool.h +++ b/cpp/src/arrow/util/thread_pool.h @@ -21,6 +21,10 @@ #include #endif +#ifndef _WIN32 +#include +#endif + #include #include #include @@ -373,7 +377,7 @@ class ARROW_EXPORT ThreadPool : public Executor { State* state_; bool shutdown_on_destroy_; #ifndef _WIN32 - pid_t pid_; + std::atomic pid_; #endif }; diff --git a/cpp/src/arrow/util/thread_pool_test.cc b/cpp/src/arrow/util/thread_pool_test.cc index 399c755a8f95..56efedca74ad 100644 --- a/cpp/src/arrow/util/thread_pool_test.cc +++ b/cpp/src/arrow/util/thread_pool_test.cc @@ -36,6 +36,7 @@ #include "arrow/testing/future_util.h" #include "arrow/testing/gtest_util.h" #include "arrow/util/io_util.h" +#include "arrow/util/logging.h" #include "arrow/util/macros.h" #include "arrow/util/test_common.h" #include "arrow/util/thread_pool.h" @@ -610,32 +611,42 @@ TEST_F(TestThreadPool, SubmitWithStopTokenCancelled) { #if !(defined(_WIN32) || defined(ARROW_VALGRIND) || defined(ADDRESS_SANITIZER) || \ defined(THREAD_SANITIZER)) -TEST_F(TestThreadPool, ForkSafety) { - pid_t child_pid; - int child_status; +class TestThreadPoolForkSafety : public TestThreadPool { + public: + void CheckChildExit(int child_pid) { + ASSERT_GT(child_pid, 0); + int child_status; + int got_pid = waitpid(child_pid, &child_status, 0); + ASSERT_EQ(got_pid, child_pid); + if (WIFSIGNALED(child_status)) { + FAIL() << "Child terminated by signal " << WTERMSIG(child_status); + } + if (!WIFEXITED(child_status)) { + FAIL() << "Child didn't terminate normally?? Child status = " << child_status; + } + ASSERT_EQ(WEXITSTATUS(child_status), 0); + } +}; + +TEST_F(TestThreadPoolForkSafety, Basics) { { // Fork after task submission auto pool = this->MakeThreadPool(3); ASSERT_OK_AND_ASSIGN(auto fut, pool->Submit(add, 4, 5)); ASSERT_OK_AND_EQ(9, fut.result()); - child_pid = fork(); + auto child_pid = fork(); if (child_pid == 0) { // Child: thread pool should be usable ASSERT_OK_AND_ASSIGN(fut, pool->Submit(add, 3, 4)); - if (*fut.result() != 7) { - std::exit(1); - } + ASSERT_FINISHES_OK_AND_EQ(7, fut); // Shutting down shouldn't hang or fail Status st = pool->Shutdown(); std::exit(st.ok() ? 0 : 2); } else { // Parent - ASSERT_GT(child_pid, 0); - ASSERT_GT(waitpid(child_pid, &child_status, 0), 0); - ASSERT_TRUE(WIFEXITED(child_status)); - ASSERT_EQ(WEXITSTATUS(child_status), 0); + CheckChildExit(child_pid); ASSERT_OK(pool->Shutdown()); } } @@ -644,7 +655,7 @@ TEST_F(TestThreadPool, ForkSafety) { auto pool = this->MakeThreadPool(3); ASSERT_OK(pool->Shutdown()); - child_pid = fork(); + auto child_pid = fork(); if (child_pid == 0) { // Child // Spawning a task should return with error (pool was shutdown) @@ -657,13 +668,87 @@ TEST_F(TestThreadPool, ForkSafety) { std::exit(0); } else { // Parent - ASSERT_GT(child_pid, 0); - ASSERT_GT(waitpid(child_pid, &child_status, 0), 0); - ASSERT_TRUE(WIFEXITED(child_status)); - ASSERT_EQ(WEXITSTATUS(child_status), 0); + CheckChildExit(child_pid); } } } + +TEST_F(TestThreadPoolForkSafety, MultipleChildThreads) { + // ARROW-15593: race condition in after-fork ThreadPool reinitialization + // when SpawnReal() was called from multiple threads in a forked child. + auto run_in_child = [](ThreadPool* pool) { + const int n_threads = 5; + std::vector> futures; + std::vector threads; + futures.reserve(n_threads); + threads.reserve(n_threads); + + auto run_in_thread = [&]() { + auto maybe_fut = pool->Submit(add, 3, 4); + futures.push_back(DeferNotOk(std::move(maybe_fut))); + }; + + for (int i = 0; i < n_threads; ++i) { + threads.emplace_back(run_in_thread); + } + for (auto& thread : threads) { + thread.join(); + } + for (const auto& fut : futures) { + ASSERT_FINISHES_OK_AND_EQ(7, fut); + } + }; + + { + auto pool = this->MakeThreadPool(3); + ASSERT_OK_AND_ASSIGN(auto fut, pool->Submit(add, 4, 5)); + ASSERT_OK_AND_EQ(9, fut.result()); + + auto child_pid = fork(); + if (child_pid == 0) { + // Child: spawn tasks from multiple threads at once + run_in_child(pool.get()); + std::exit(0); + } else { + // Parent + CheckChildExit(child_pid); + ASSERT_OK(pool->Shutdown()); + } + } +} + +TEST_F(TestThreadPoolForkSafety, NestedChild) { + { + auto pool = this->MakeThreadPool(3); + ASSERT_OK_AND_ASSIGN(auto fut, pool->Submit(add, 4, 5)); + ASSERT_OK_AND_EQ(9, fut.result()); + + auto child_pid = fork(); + if (child_pid == 0) { + // Child + ASSERT_OK_AND_ASSIGN(fut, pool->Submit(add, 3, 4)); + // Fork while the task is running + auto grandchild_pid = fork(); + if (grandchild_pid == 0) { + // Grandchild + ASSERT_OK_AND_ASSIGN(fut, pool->Submit(add, 1, 2)); + ASSERT_FINISHES_OK_AND_EQ(3, fut); + ASSERT_OK(pool->Shutdown()); + } else { + // Child + CheckChildExit(grandchild_pid); + ASSERT_FINISHES_OK_AND_EQ(7, fut); + ASSERT_OK(pool->Shutdown()); + } + std::exit(0); + } else { + // Parent + CheckChildExit(child_pid); + ASSERT_OK(pool->Shutdown()); + } + } +} + #endif TEST(TestGlobalThreadPool, Capacity) {