From be52dbc3dc78b9abeb6c310d15f8a0ba71f5bcc3 Mon Sep 17 00:00:00 2001
From: Antoine Pitrou
Date: Tue, 22 Nov 2022 20:21:58 +0100
Subject: [PATCH] ARROW-18383: [C++] Avoid global variables for thread pools and at-fork handlers

Initialization order of module globals is undefined. In a particular case,
the IO thread pool would first be instantiated at library load, registering
an at-fork handler. Then, only after, the at-fork handlers would be initialized,
losing the handler registered just before.
---
 cpp/src/arrow/io/interfaces.cc        |  10 +-
 cpp/src/arrow/util/atfork_internal.cc | 164 ++++++++++++++------------
 cpp/src/arrow/util/thread_pool.cc     |   1 +
 3 files changed, 97 insertions(+), 78 deletions(-)

diff --git a/cpp/src/arrow/io/interfaces.cc b/cpp/src/arrow/io/interfaces.cc
index 238e297a7f4..a78bc1b55c2 100644
--- a/cpp/src/arrow/io/interfaces.cc
+++ b/cpp/src/arrow/io/interfaces.cc
@@ -50,12 +50,14 @@ using internal::ThreadPool;
 
 namespace io {
 
-static IOContext g_default_io_context{};
-
 IOContext::IOContext(MemoryPool* pool, StopToken stop_token)
     : IOContext(pool, internal::GetIOThreadPool(), std::move(stop_token)) {}
 
-const IOContext& default_io_context() { return g_default_io_context; }
+const IOContext& default_io_context() {
+  // Avoid using a global variable because of initialization order issues (ARROW-18383)
+  static IOContext g_default_io_context{};
+  return g_default_io_context;
+}
 
 int GetIOThreadPoolCapacity() { return internal::GetIOThreadPool()->GetCapacity(); }
 
@@ -103,7 +105,7 @@ class InputStreamBlockIterator {
 
 }  // namespace
 
-const IOContext& Readable::io_context() const { return g_default_io_context; }
+const IOContext& Readable::io_context() const { return default_io_context(); }
 
 Status InputStream::Advance(int64_t nbytes) { return Read(nbytes).status(); }
 
diff --git a/cpp/src/arrow/util/atfork_internal.cc b/cpp/src/arrow/util/atfork_internal.cc
index 99c2cdeb6c3..eb26304fba3 100644
--- a/cpp/src/arrow/util/atfork_internal.cc
+++ b/cpp/src/arrow/util/atfork_internal.cc
@@ -34,104 +34,120 @@ namespace internal {
 
 namespace {
 
-struct RunningHandler {
-  // A temporary owning copy of a handler, to make sure that a handler
-  // that runs before fork can still run after fork.
-  std::shared_ptr<AtForkHandler> handler;
-  // The token returned by the before-fork handler, to pass to after-fork handlers.
-  std::any token;
-
-  explicit RunningHandler(std::shared_ptr<AtForkHandler> handler)
-      : handler(std::move(handler)) {}
-};
+// Singleton state for at-fork management.
+// We do not use global variables because of initialization order issues (ARROW-18383).
+// Instead, a function-local static ensures the state is initialized
+// opportunistically (see GetAtForkState()).
+struct AtForkState {
+  struct RunningHandler {
+    // A temporary owning copy of a handler, to make sure that a handler
+    // that runs before fork can still run after fork.
+    std::shared_ptr<AtForkHandler> handler;
+    // The token returned by the before-fork handler, to pass to after-fork handlers.
+    std::any token;
+
+    explicit RunningHandler(std::shared_ptr<AtForkHandler> handler)
+        : handler(std::move(handler)) {}
+  };
+
+  void MaintainHandlersUnlocked() {
+    auto it = std::remove_if(
+        handlers_.begin(), handlers_.end(),
+        [](const std::weak_ptr<AtForkHandler>& ptr) { return ptr.expired(); });
+    handlers_.erase(it, handlers_.end());
+  }
 
-std::mutex g_mutex;
-std::vector<std::weak_ptr<AtForkHandler>> g_handlers;
-std::vector<RunningHandler> g_handlers_while_forking;
+  void BeforeFork() {
+    // Lock the mutex and keep it locked until the end of AfterForkParent(),
+    // to avoid multiple concurrent forks and atforks.
+    mutex_.lock();
 
-void MaintainHandlersUnlocked() {
-  auto it = std::remove_if(
-      g_handlers.begin(), g_handlers.end(),
-      [](const std::weak_ptr<AtForkHandler>& ptr) { return ptr.expired(); });
-  g_handlers.erase(it, g_handlers.end());
-}
+    DCHECK(handlers_while_forking_.empty());  // AfterForkParent clears it
 
-void BeforeFork() {
-  // Lock the mutex and keep it locked until the end of AfterForkParent(),
-  // to avoid multiple concurrent forks and atforks.
-  g_mutex.lock();
-
-  DCHECK(g_handlers_while_forking.empty());  // AfterForkParent clears it
+    for (const auto& weak_handler : handlers_) {
+      if (auto handler = weak_handler.lock()) {
+        handlers_while_forking_.emplace_back(std::move(handler));
+      }
+    }
 
-  for (const auto& weak_handler : g_handlers) {
-    if (auto handler = weak_handler.lock()) {
-      g_handlers_while_forking.emplace_back(std::move(handler));
+    // XXX can the handler call RegisterAtFork()?
+    for (auto&& handler : handlers_while_forking_) {
+      if (handler.handler->before) {
+        handler.token = handler.handler->before();
+      }
     }
   }
 
-  // XXX can the handler call RegisterAtFork()?
-  for (auto&& handler : g_handlers_while_forking) {
-    if (handler.handler->before) {
-      handler.token = handler.handler->before();
+  void AfterForkParent() {
+    // The mutex was locked by BeforeFork()
+    auto handlers = std::move(handlers_while_forking_);
+    handlers_while_forking_.clear();
+
+    // Execute handlers in reverse order
+    for (auto it = handlers.rbegin(); it != handlers.rend(); ++it) {
+      auto&& handler = *it;
+      if (handler.handler->parent_after) {
+        handler.handler->parent_after(std::move(handler.token));
+      }
     }
-  }
-}
 
-void AfterForkParent() {
-  // The mutex was locked by BeforeFork()
+    mutex_.unlock();
+    // handlers will be destroyed here without the mutex locked, so that
+    // any action taken by destructors might call RegisterAtFork
+  }
 
-  auto handlers = std::move(g_handlers_while_forking);
-  g_handlers_while_forking.clear();
-  // Execute handlers in reverse order
-  for (auto it = handlers.rbegin(); it != handlers.rend(); ++it) {
-    auto&& handler = *it;
-    if (handler.handler->parent_after) {
-      handler.handler->parent_after(std::move(handler.token));
+  void AfterForkChild() {
+    // Need to reinitialize the mutex as it is probably invalid. Also, the
+    // old mutex destructor may fail.
+    // Fortunately, we are a single thread in the child process by now, so no
+    // additional synchronization is needed.
+    new (&mutex_) std::mutex;
+
+    auto handlers = std::move(handlers_while_forking_);
+    handlers_while_forking_.clear();
+
+    // Execute handlers in reverse order
+    for (auto it = handlers.rbegin(); it != handlers.rend(); ++it) {
+      auto&& handler = *it;
+      if (handler.handler->child_after) {
+        handler.handler->child_after(std::move(handler.token));
+      }
     }
   }
 
-  g_mutex.unlock();
-  // handlers will be destroyed here without the mutex locked, so that
-  // any action taken by destructors might call RegisterAtFork
-}
-
-void AfterForkChild() {
-  // Need to reinitialize the mutex as it is probably invalid. Also, the
-  // old mutex destructor may fail.
-  // Fortunately, we are a single thread in the child process by now, so no
-  // additional synchronization is needed.
-  new (&g_mutex) std::mutex;
-
-  auto handlers = std::move(g_handlers_while_forking);
-  g_handlers_while_forking.clear();
-  // Execute handlers in reverse order
-  for (auto it = handlers.rbegin(); it != handlers.rend(); ++it) {
-    auto&& handler = *it;
-    if (handler.handler->child_after) {
-      handler.handler->child_after(std::move(handler.token));
-    }
+  void RegisterAtFork(std::weak_ptr<AtForkHandler> weak_handler) {
+    std::lock_guard<std::mutex> lock(mutex_);
+    // This is O(n) for each at-fork registration. We assume that n remains
+    // typically low and calls to this function are not performance-critical.
+    MaintainHandlersUnlocked();
+    handlers_.push_back(std::move(weak_handler));
   }
-}
 
-struct AtForkInitializer {
-  AtForkInitializer() {
+  std::mutex mutex_;
+  std::vector<std::weak_ptr<AtForkHandler>> handlers_;
+  std::vector<RunningHandler> handlers_while_forking_;
+};
+
+AtForkState* GetAtForkState() {
+  static std::unique_ptr<AtForkState> state = []() {
+    auto state = std::make_unique<AtForkState>();
 #ifndef _WIN32
-    int r = pthread_atfork(&BeforeFork, &AfterForkParent, &AfterForkChild);
+    int r = pthread_atfork(/*prepare=*/[] { GetAtForkState()->BeforeFork(); },
+                           /*parent=*/[] { GetAtForkState()->AfterForkParent(); },
+                           /*child=*/[] { GetAtForkState()->AfterForkChild(); });
     if (r != 0) {
       IOErrorFromErrno(r, "Error when calling pthread_atfork: ").Abort();
     }
 #endif
-  }
-};
+    return state;
+  }();
+  return state.get();
+}
 
 };  // namespace
 
 void RegisterAtFork(std::weak_ptr<AtForkHandler> weak_handler) {
-  static AtForkInitializer initializer;
-
-  std::lock_guard<std::mutex> lock(g_mutex);
-  MaintainHandlersUnlocked();
-  g_handlers.push_back(std::move(weak_handler));
+  GetAtForkState()->RegisterAtFork(std::move(weak_handler));
 }
 
 }  // namespace internal
diff --git a/cpp/src/arrow/util/thread_pool.cc b/cpp/src/arrow/util/thread_pool.cc
index ca868248de4..9f233a869b3 100644
--- a/cpp/src/arrow/util/thread_pool.cc
+++ b/cpp/src/arrow/util/thread_pool.cc
@@ -535,6 +535,7 @@ std::shared_ptr<ThreadPool> ThreadPool::MakeCpuThreadPool() {
 }
 
 ThreadPool* GetCpuThreadPool() {
+  // Avoid using a global variable because of initialization order issues (ARROW-18383)
   static std::shared_ptr<ThreadPool> singleton = ThreadPool::MakeCpuThreadPool();
   return singleton.get();
 }