Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions cpp/src/arrow/io/interfaces.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,14 @@ using internal::ThreadPool;

namespace io {

static IOContext g_default_io_context{};

IOContext::IOContext(MemoryPool* pool, StopToken stop_token)
: IOContext(pool, internal::GetIOThreadPool(), std::move(stop_token)) {}

const IOContext& default_io_context() { return g_default_io_context; }
const IOContext& default_io_context() {
// Avoid using a global variable because of initialization order issues (ARROW-18383)
static IOContext g_default_io_context{};
return g_default_io_context;
}

int GetIOThreadPoolCapacity() { return internal::GetIOThreadPool()->GetCapacity(); }

Expand Down Expand Up @@ -103,7 +105,7 @@ class InputStreamBlockIterator {

} // namespace

const IOContext& Readable::io_context() const { return g_default_io_context; }
const IOContext& Readable::io_context() const { return default_io_context(); }

Status InputStream::Advance(int64_t nbytes) { return Read(nbytes).status(); }

Expand Down
164 changes: 90 additions & 74 deletions cpp/src/arrow/util/atfork_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,104 +34,120 @@ namespace internal {

namespace {

struct RunningHandler {
// A temporary owning copy of a handler, to make sure that a handler
// that runs before fork can still run after fork.
std::shared_ptr<AtForkHandler> handler;
// The token returned by the before-fork handler, to pass to after-fork handlers.
std::any token;

explicit RunningHandler(std::shared_ptr<AtForkHandler> handler)
: handler(std::move(handler)) {}
};
// Singleton state for at-fork management.
// We do not use global variables because of initialization order issues (ARROW-18383).
// Instead, a function-local static ensures the state is initialized
// opportunistically (see GetAtForkState()).
struct AtForkState {
struct RunningHandler {
// A temporary owning copy of a handler, to make sure that a handler
// that runs before fork can still run after fork.
std::shared_ptr<AtForkHandler> handler;
// The token returned by the before-fork handler, to pass to after-fork handlers.
std::any token;

explicit RunningHandler(std::shared_ptr<AtForkHandler> handler)
: handler(std::move(handler)) {}
};

void MaintainHandlersUnlocked() {
auto it = std::remove_if(
handlers_.begin(), handlers_.end(),
[](const std::weak_ptr<AtForkHandler>& ptr) { return ptr.expired(); });
handlers_.erase(it, handlers_.end());
}

std::mutex g_mutex;
std::vector<std::weak_ptr<AtForkHandler>> g_handlers;
std::vector<RunningHandler> g_handlers_while_forking;
void BeforeFork() {
// Lock the mutex and keep it locked until the end of AfterForkParent(),
// to avoid multiple concurrent forks and atforks.
mutex_.lock();

void MaintainHandlersUnlocked() {
auto it = std::remove_if(
g_handlers.begin(), g_handlers.end(),
[](const std::weak_ptr<AtForkHandler>& ptr) { return ptr.expired(); });
g_handlers.erase(it, g_handlers.end());
}
DCHECK(handlers_while_forking_.empty()); // AfterForkParent clears it

void BeforeFork() {
// Lock the mutex and keep it locked until the end of AfterForkParent(),
// to avoid multiple concurrent forks and atforks.
g_mutex.lock();

DCHECK(g_handlers_while_forking.empty()); // AfterForkParent clears it
for (const auto& weak_handler : handlers_) {
if (auto handler = weak_handler.lock()) {
handlers_while_forking_.emplace_back(std::move(handler));
}
}

for (const auto& weak_handler : g_handlers) {
if (auto handler = weak_handler.lock()) {
g_handlers_while_forking.emplace_back(std::move(handler));
// XXX can the handler call RegisterAtFork()?
for (auto&& handler : handlers_while_forking_) {
if (handler.handler->before) {
handler.token = handler.handler->before();
}
}
}

// XXX can the handler call RegisterAtFork()?
for (auto&& handler : g_handlers_while_forking) {
if (handler.handler->before) {
handler.token = handler.handler->before();
void AfterForkParent() {
// The mutex was locked by BeforeFork()
auto handlers = std::move(handlers_while_forking_);
handlers_while_forking_.clear();

// Execute handlers in reverse order
for (auto it = handlers.rbegin(); it != handlers.rend(); ++it) {
auto&& handler = *it;
if (handler.handler->parent_after) {
handler.handler->parent_after(std::move(handler.token));
}
}
}
}

void AfterForkParent() {
// The mutex was locked by BeforeFork()
mutex_.unlock();
// handlers will be destroyed here without the mutex locked, so that
// any action taken by destructors might call RegisterAtFork
}

auto handlers = std::move(g_handlers_while_forking);
g_handlers_while_forking.clear();
// Execute handlers in reverse order
for (auto it = handlers.rbegin(); it != handlers.rend(); ++it) {
auto&& handler = *it;
if (handler.handler->parent_after) {
handler.handler->parent_after(std::move(handler.token));
void AfterForkChild() {
// Need to reinitialize the mutex as it is probably invalid. Also, the
// old mutex destructor may fail.
// Fortunately, we are a single thread in the child process by now, so no
// additional synchronization is needed.
new (&mutex_) std::mutex;

auto handlers = std::move(handlers_while_forking_);
handlers_while_forking_.clear();

// Execute handlers in reverse order
for (auto it = handlers.rbegin(); it != handlers.rend(); ++it) {
auto&& handler = *it;
if (handler.handler->child_after) {
handler.handler->child_after(std::move(handler.token));
}
}
}

g_mutex.unlock();
// handlers will be destroyed here without the mutex locked, so that
// any action taken by destructors might call RegisterAtFork
}

void AfterForkChild() {
// Need to reinitialize the mutex as it is probably invalid. Also, the
// old mutex destructor may fail.
// Fortunately, we are a single thread in the child process by now, so no
// additional synchronization is needed.
new (&g_mutex) std::mutex;

auto handlers = std::move(g_handlers_while_forking);
g_handlers_while_forking.clear();
// Execute handlers in reverse order
for (auto it = handlers.rbegin(); it != handlers.rend(); ++it) {
auto&& handler = *it;
if (handler.handler->child_after) {
handler.handler->child_after(std::move(handler.token));
}
void RegisterAtFork(std::weak_ptr<AtForkHandler> weak_handler) {
std::lock_guard<std::mutex> lock(mutex_);
// This is O(n) for each at-fork registration. We assume that n remains
// typically low and calls to this function are not performance-critical.
MaintainHandlersUnlocked();
handlers_.push_back(std::move(weak_handler));
}
}

struct AtForkInitializer {
AtForkInitializer() {
std::mutex mutex_;
std::vector<std::weak_ptr<AtForkHandler>> handlers_;
std::vector<RunningHandler> handlers_while_forking_;
};

AtForkState* GetAtForkState() {
static std::unique_ptr<AtForkState> state = []() {
auto state = std::make_unique<AtForkState>();
#ifndef _WIN32
int r = pthread_atfork(&BeforeFork, &AfterForkParent, &AfterForkChild);
int r = pthread_atfork(/*prepare=*/[] { GetAtForkState()->BeforeFork(); },
/*parent=*/[] { GetAtForkState()->AfterForkParent(); },
/*child=*/[] { GetAtForkState()->AfterForkChild(); });
if (r != 0) {
IOErrorFromErrno(r, "Error when calling pthread_atfork: ").Abort();
}
#endif
}
};
return state;
}();
return state.get();
}

}; // namespace

void RegisterAtFork(std::weak_ptr<AtForkHandler> weak_handler) {
static AtForkInitializer initializer;

std::lock_guard<std::mutex> lock(g_mutex);
MaintainHandlersUnlocked();
g_handlers.push_back(std::move(weak_handler));
GetAtForkState()->RegisterAtFork(std::move(weak_handler));
}

} // namespace internal
Expand Down
1 change: 1 addition & 0 deletions cpp/src/arrow/util/thread_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,7 @@ std::shared_ptr<ThreadPool> ThreadPool::MakeCpuThreadPool() {
}

ThreadPool* GetCpuThreadPool() {
// Avoid using a global variable because of initialization order issues (ARROW-18383)
static std::shared_ptr<ThreadPool> singleton = ThreadPool::MakeCpuThreadPool();
return singleton.get();
}
Expand Down