31 changes: 31 additions & 0 deletions cpp/src/arrow/util/mutex.cc
@@ -19,6 +19,11 @@

#include <mutex>

#ifndef _WIN32
#include <pthread.h>
#include <atomic>
#endif

#include "arrow/util/logging.h"

namespace arrow {
@@ -50,5 +55,31 @@ Mutex::Guard Mutex::Lock() {

Mutex::Mutex() : impl_(new Impl, [](Impl* impl) { delete impl; }) {}

#ifndef _WIN32
namespace {

struct AfterForkState {
// A global instance that will also register the atfork handler when
// constructed.
static AfterForkState instance;

// The mutex may be used at shutdown, so make it eternal.
// The leak (only in child processes) is a small price to pay for robustness.
Mutex* mutex = nullptr;

nit: Maybe it's not trivial when working with multiprocessing in Python. Why not use a unique_ptr here?

Member

There isn't a lot of control over shutdown ordering at the moment. So it's possible the main thread starts tearing down and deleting any unique_ptr here before the child threads have fully shut down. Then the child threads might still try to access this mutex, which would lead to accessing freed memory.

Member

> Maybe it's not trivial when working with multiprocessing in Python

Do you have a particular use case in mind? For example, the most common multiprocessing pattern is "fork-join", where you fork off a number of child processes for some task and then join them all back together when the task is done. In that case the leak would be limited because the memory would be freed when the child process ended.

The only way to really grow this leak is to recursively and continuously fork. Even then you would run out of process IDs far before any significant memory was lost. I suppose you could be continuously killing the parent off after the fork and leaving an orphan but I don't know of any real world use case for this.


> There isn't a lot of control over shutdown ordering at the moment. So it's possible the main thread starts tearing down and deleting any unique_ptr here before the child threads have fully shut down. Then the child threads might still try to access this mutex, which would lead to accessing freed memory.

I see, thanks.

> Do you have a particular use case in mind?

My intuition is that one may create and destroy the multiprocessing.Pool continuously, which may result in too many leaks here.

Member @westonpace Feb 8, 2022

> My intuition is that one may create and destroy the multiprocessing.Pool continuously, which may result in too many leaks here.

That should be ok. For example, imagine your process is on pid 100. You create multiprocessing.Pool with size 10. It will create processes 101, 102, ..., 110. Each of those child processes will be just a tiny bit too large (~40 bytes). Process 100 will still be the correct size. When you destroy the pool the processes 101-110 go away. Process 100 remains and is still the correct size. When you create a new pool the whole thing repeats with 10 new process IDs.

Member Author

Just for the record, no process will be larger because of this leak. Perhaps I should have put "leak" in quotes, because regardless of using unique_ptr or not, the mutex will be destroyed at process exit. The difference is mainly in code hygiene, and also some sanitizers may report it as an actual leak (memory that wasn't deallocated before process exit).


> That should be ok. For example, imagine your process is on pid 100. You create multiprocessing.Pool with size 10. It will create processes 101, 102, ..., 110. Each of those child processes will be just a tiny bit too large (~40 bytes). Process 100 will still be the correct size. When you destroy the pool the processes 101-110 go away. Process 100 remains and is still the correct size. When you create a new pool the whole thing repeats with 10 new process IDs.

Yep, you are right. Thanks.


private:
AfterForkState() {
pthread_atfork(/*prepare=*/nullptr, /*parent=*/nullptr, /*child=*/&AfterFork);
}

static void AfterFork() { instance.mutex = new Mutex; }
};

AfterForkState AfterForkState::instance;
} // namespace

Mutex* GlobalForkSafeMutex() { return AfterForkState::instance.mutex; }
#endif // _WIN32

} // namespace util
} // namespace arrow
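For reference, the atfork-plus-deliberate-leak idiom used above can be sketched in isolation. `ForkState` and `ChildMutexExists` are illustrative names for this sketch, not Arrow APIs; it mirrors the technique rather than Arrow's actual implementation:

```cpp
#include <pthread.h>
#include <sys/wait.h>
#include <unistd.h>

#include <mutex>

// Hypothetical stand-in for the AfterForkState pattern above.
struct ForkState {
  static ForkState instance;
  // Allocated only in child processes, and intentionally never freed:
  // it must stay usable during child shutdown, whatever the teardown order.
  std::mutex* mutex = nullptr;

 private:
  // Constructing the global instance registers the child-side handler.
  ForkState() {
    pthread_atfork(/*prepare=*/nullptr, /*parent=*/nullptr,
                   /*child=*/&AfterFork);
  }
  static void AfterFork() { instance.mutex = new std::mutex; }
};
ForkState ForkState::instance;

// Forks once and reports whether the child saw a freshly created mutex.
bool ChildMutexExists() {
  pid_t pid = fork();
  if (pid == 0) {
    // Child: the atfork handler has already run by this point.
    _exit(ForkState::instance.mutex != nullptr ? 0 : 1);
  }
  int status = 0;
  waitpid(pid, &status, 0);
  return WIFEXITED(status) && WEXITSTATUS(status) == 0;
}
```

In the parent, `ForkState::instance.mutex` stays null, mirroring `GlobalForkSafeMutex()` returning NULL before any fork; each child allocates a fresh mutex that is never deleted.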
21 changes: 21 additions & 0 deletions cpp/src/arrow/util/mutex.h
@@ -60,5 +60,26 @@ class ARROW_EXPORT Mutex {
std::unique_ptr<Impl, void (*)(Impl*)> impl_;
};

#ifndef _WIN32
/// Return a pointer to a process-wide, process-specific Mutex that can be used
/// at any point in a child process. NULL is returned when called in the parent.
///
/// The rule is to first check that getpid() corresponds to the parent process pid
/// and, if not, call this function to lock any after-fork reinitialization code.
/// Like this:
///
/// std::atomic<pid_t> pid{getpid()};
/// ...
/// if (pid.load() != getpid()) {
///   // In child process
///   auto lock = GlobalForkSafeMutex()->Lock();
///   if (pid.load() != getpid()) {
///     // Reinitialize internal structures after fork
///     ...
///     pid.store(getpid());
///   }
/// }
ARROW_EXPORT
Mutex* GlobalForkSafeMutex();
#endif

} // namespace util
} // namespace arrow
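A runnable version of the double-check pattern documented above might look like the following. `GlobalForkSafeMutexStub`, `EnsureInitialized`, and `ChildReinitializedOnce` are hypothetical names invented for this sketch; unlike the real `GlobalForkSafeMutex()`, the stub returns a valid mutex in every process:

```cpp
#include <sys/wait.h>
#include <unistd.h>

#include <atomic>
#include <mutex>

// Hypothetical stub: always returns a mutex, whereas Arrow's real
// GlobalForkSafeMutex() is non-null only in child processes.
static std::mutex* GlobalForkSafeMutexStub() {
  static std::mutex m;
  return &m;
}

std::atomic<pid_t> g_pid{getpid()};
int g_reinit_count = 0;  // written only under the lock

// The pattern from the header comment: cheap pid check first,
// then re-check under the global lock before reinitializing.
void EnsureInitialized() {
  if (g_pid.load() != getpid()) {
    std::lock_guard<std::mutex> lock(*GlobalForkSafeMutexStub());
    if (g_pid.load() != getpid()) {
      ++g_reinit_count;  // reinitialize internal structures after fork
      g_pid.store(getpid());
    }
  }
}

// Returns true when a forked child reinitialized exactly once.
bool ChildReinitializedOnce() {
  pid_t child = fork();
  if (child == 0) {
    EnsureInitialized();
    EnsureInitialized();  // second call must be a no-op
    _exit(g_reinit_count == 1 ? 0 : 1);
  }
  int status = 0;
  waitpid(child, &status, 0);
  return WIFEXITED(status) && WEXITSTATUS(status) == 0;
}
```

In the parent the pid matches, so `EnsureInitialized()` never takes the lock at all; only the first post-fork call in a child pays for reinitialization.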
41 changes: 23 additions & 18 deletions cpp/src/arrow/util/thread_pool.cc
@@ -28,6 +28,7 @@

#include "arrow/util/io_util.h"
#include "arrow/util/logging.h"
#include "arrow/util/mutex.h"

namespace arrow {
namespace internal {
@@ -235,24 +236,28 @@ ThreadPool::~ThreadPool() {
void ThreadPool::ProtectAgainstFork() {
#ifndef _WIN32
pid_t current_pid = getpid();
if (pid_ != current_pid) {
// Reinitialize internal state in child process after fork()
// Ideally we would use pthread_atfork(), but that doesn't allow
// storing an argument, hence we'd need to maintain a list of all
// existing ThreadPools.
int capacity = state_->desired_capacity_;

auto new_state = std::make_shared<ThreadPool::State>();
new_state->please_shutdown_ = state_->please_shutdown_;
new_state->quick_shutdown_ = state_->quick_shutdown_;

pid_ = current_pid;
sp_state_ = new_state;
state_ = sp_state_.get();

// Launch worker threads anew
if (!state_->please_shutdown_) {
ARROW_UNUSED(SetCapacity(capacity));
if (pid_.load() != current_pid) {
// Reinitialize internal state in child process after fork().
{
// Since after-fork reinitialization is triggered when one of the ThreadPool
// methods is called, it can very well be called from multiple threads
// at once. Therefore, it needs to be guarded with a lock.
auto lock = util::GlobalForkSafeMutex()->Lock();
Contributor

Just some random thoughts for comment.
What about adding a GlobalForkSafeOnceFlag()? It looks to me like std::call_once would be more straightforward here.

Member Author

You need a std::call_once per use case, though, while this lock is global.
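To illustrate the reply: a `std::once_flag` fires at most once for the lifetime of the process and cannot be re-armed, so every reinitialization site (and every fork generation) would need its own fresh flag, whereas a single global mutex serves them all. A minimal demonstration of the one-shot behavior (illustrative names only):

```cpp
#include <mutex>

int init_count = 0;
std::once_flag flag;

// std::call_once runs the callable at most once per flag; after that,
// every later call through the same flag is a no-op, even if the
// condition that motivated initialization becomes true again.
void Reinit() { std::call_once(flag, [] { ++init_count; }); }
```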


if (pid_.load() != current_pid) {
int capacity = state_->desired_capacity_;

auto new_state = std::make_shared<ThreadPool::State>();
new_state->please_shutdown_ = state_->please_shutdown_;
new_state->quick_shutdown_ = state_->quick_shutdown_;

sp_state_ = new_state;
state_ = sp_state_.get();
pid_ = current_pid;

// Launch worker threads anew
ARROW_UNUSED(SetCapacity(capacity));
}
}
}
#endif
6 changes: 5 additions & 1 deletion cpp/src/arrow/util/thread_pool.h
@@ -21,6 +21,10 @@
#include <unistd.h>
#endif

#ifndef _WIN32
#include <atomic>
#endif

#include <cstdint>
#include <memory>
#include <queue>
@@ -373,7 +377,7 @@ class ARROW_EXPORT ThreadPool : public Executor {
State* state_;
bool shutdown_on_destroy_;
#ifndef _WIN32
pid_t pid_;
std::atomic<pid_t> pid_;
Member

Does pid_ need to be atomic since you're double checking and the second check is inside a mutex?

Contributor

But the first load (line 239) may have a data race with the store (line 256)?

Member

That should be ok because we have the double check on line 247 which happens inside the mutex.

Contributor

Agreed the code should be safe in practice.
But at least in theory, I think it still violates the data-race-free rule.
E.g., TSAN will complain about the code below.

#include <mutex>
#include <thread>

std::mutex g_mtx;
int g_i = 0;

void f() {
  if (g_i == 0) {
    std::lock_guard<std::mutex> lk(g_mtx);
    if (g_i == 0) {
      g_i = 1;
    }
  }
}

int main(void) {
  std::thread t1(f), t2(f), t3(f), t4(f);
  t1.join(); t2.join(); t3.join(); t4.join();
  return 0;
}

Member Author

Yes, an atomic is required at least for theoretical correctness and to avoid (potentially) tripping up TSAN.
The compiler and/or the CPU is otherwise theoretically allowed to issue the integer read as multiple distinct reads (say, issue a 32-bit read as two 16-bit reads; I suppose that never happens in practice :-)

Contributor

Just to make this discussion more complete.

If we only care about atomicity, to satisfy TSAN without introducing unnecessary memory barriers, we can use relaxed memory ordering when accessing the atomic variable, not the default sequential consistent memory ordering.

The assembly code will probably be the same as a normal variable on most architectures.

if (g_i.load(std::memory_order_relaxed) == 0) {
  std::lock_guard<std::mutex> lk(g_mtx);
  if (g_i.load(std::memory_order_relaxed) == 0) {
    g_i.store(1, std::memory_order_relaxed);
  }
}

This might help performance on cpus with more relaxed memory models than x86 (e.g., arm, ppc) in some cases, but the improvement is often negligible in a real world application.
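Putting the thread's suggestion together, the full double-checked pattern with relaxed atomics might look like this sketch (not Arrow's code; names are made up). Note the caveat in the comments: relaxed ordering is only sufficient when the flag itself is the whole story:

```cpp
#include <atomic>
#include <mutex>
#include <thread>
#include <vector>

std::mutex g_mtx;
std::atomic<int> g_i{0};
int g_inits = 0;  // written only under g_mtx

// Double-checked pattern: the fast-path load only needs atomicity
// (to satisfy TSAN and the data-race-free rule); the mutex supplies
// all ordering on the slow path. Caveat: relaxed is only enough when
// the flag is the only shared state -- publishing other data to
// fast-path readers would require acquire/release ordering.
void InitOnce() {
  if (g_i.load(std::memory_order_relaxed) == 0) {
    std::lock_guard<std::mutex> lk(g_mtx);
    if (g_i.load(std::memory_order_relaxed) == 0) {
      ++g_inits;
      g_i.store(1, std::memory_order_relaxed);
    }
  }
}

// Races InitOnce() from several threads; returns how many times it ran.
int RunRace() {
  std::vector<std::thread> threads;
  for (int i = 0; i < 4; ++i) threads.emplace_back(InitOnce);
  for (auto& t : threads) t.join();
  return g_inits;
}
```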

Member

Thank you for the details @cyb70289 . I appreciate you both taking the time to help explain this. This code shouldn't be on the data path so I agree the extra performance is not worth false positives in TSAN or the extra complexity of relaxed memory order.

#endif
};

117 changes: 101 additions & 16 deletions cpp/src/arrow/util/thread_pool_test.cc
@@ -36,6 +36,7 @@
#include "arrow/testing/future_util.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/util/io_util.h"
#include "arrow/util/logging.h"
#include "arrow/util/macros.h"
#include "arrow/util/test_common.h"
#include "arrow/util/thread_pool.h"
@@ -610,32 +611,42 @@ TEST_F(TestThreadPool, SubmitWithStopTokenCancelled) {

#if !(defined(_WIN32) || defined(ARROW_VALGRIND) || defined(ADDRESS_SANITIZER) || \
defined(THREAD_SANITIZER))
TEST_F(TestThreadPool, ForkSafety) {
pid_t child_pid;
int child_status;

class TestThreadPoolForkSafety : public TestThreadPool {
public:
void CheckChildExit(int child_pid) {
ASSERT_GT(child_pid, 0);
int child_status;
int got_pid = waitpid(child_pid, &child_status, 0);
ASSERT_EQ(got_pid, child_pid);
if (WIFSIGNALED(child_status)) {
FAIL() << "Child terminated by signal " << WTERMSIG(child_status);
}
if (!WIFEXITED(child_status)) {
FAIL() << "Child didn't terminate normally?? Child status = " << child_status;
}
ASSERT_EQ(WEXITSTATUS(child_status), 0);
}
};

TEST_F(TestThreadPoolForkSafety, Basics) {
{
// Fork after task submission
auto pool = this->MakeThreadPool(3);
ASSERT_OK_AND_ASSIGN(auto fut, pool->Submit(add<int>, 4, 5));
ASSERT_OK_AND_EQ(9, fut.result());

child_pid = fork();
auto child_pid = fork();
if (child_pid == 0) {
// Child: thread pool should be usable
ASSERT_OK_AND_ASSIGN(fut, pool->Submit(add<int>, 3, 4));
if (*fut.result() != 7) {
std::exit(1);
}
ASSERT_FINISHES_OK_AND_EQ(7, fut);
// Shutting down shouldn't hang or fail
Status st = pool->Shutdown();
std::exit(st.ok() ? 0 : 2);
} else {
// Parent
ASSERT_GT(child_pid, 0);
ASSERT_GT(waitpid(child_pid, &child_status, 0), 0);
ASSERT_TRUE(WIFEXITED(child_status));
ASSERT_EQ(WEXITSTATUS(child_status), 0);
CheckChildExit(child_pid);
ASSERT_OK(pool->Shutdown());
}
}
@@ -644,7 +655,7 @@ TEST_F(TestThreadPool, ForkSafety) {
auto pool = this->MakeThreadPool(3);
ASSERT_OK(pool->Shutdown());

child_pid = fork();
auto child_pid = fork();
if (child_pid == 0) {
// Child
// Spawning a task should return with error (pool was shutdown)
@@ -657,13 +668,87 @@ TEST_F(TestThreadPool, ForkSafety) {
std::exit(0);
} else {
// Parent
ASSERT_GT(child_pid, 0);
ASSERT_GT(waitpid(child_pid, &child_status, 0), 0);
ASSERT_TRUE(WIFEXITED(child_status));
ASSERT_EQ(WEXITSTATUS(child_status), 0);
CheckChildExit(child_pid);
}
}
}

TEST_F(TestThreadPoolForkSafety, MultipleChildThreads) {
// ARROW-15593: race condition in after-fork ThreadPool reinitialization
// when SpawnReal() was called from multiple threads in a forked child.
auto run_in_child = [](ThreadPool* pool) {
const int n_threads = 5;
std::vector<Future<int>> futures;
std::vector<std::thread> threads;
futures.reserve(n_threads);
threads.reserve(n_threads);

auto run_in_thread = [&]() {
auto maybe_fut = pool->Submit(add<int>, 3, 4);
futures.push_back(DeferNotOk(std::move(maybe_fut)));
};

for (int i = 0; i < n_threads; ++i) {
threads.emplace_back(run_in_thread);
}
for (auto& thread : threads) {
thread.join();
}
for (const auto& fut : futures) {
ASSERT_FINISHES_OK_AND_EQ(7, fut);
}
};

{
auto pool = this->MakeThreadPool(3);
ASSERT_OK_AND_ASSIGN(auto fut, pool->Submit(add<int>, 4, 5));
ASSERT_OK_AND_EQ(9, fut.result());

auto child_pid = fork();
if (child_pid == 0) {
// Child: spawn tasks from multiple threads at once
run_in_child(pool.get());
std::exit(0);
} else {
// Parent
CheckChildExit(child_pid);
ASSERT_OK(pool->Shutdown());
}
}
}

TEST_F(TestThreadPoolForkSafety, NestedChild) {
{
auto pool = this->MakeThreadPool(3);
ASSERT_OK_AND_ASSIGN(auto fut, pool->Submit(add<int>, 4, 5));
ASSERT_OK_AND_EQ(9, fut.result());

auto child_pid = fork();
if (child_pid == 0) {
// Child
ASSERT_OK_AND_ASSIGN(fut, pool->Submit(add<int>, 3, 4));
// Fork while the task is running
auto grandchild_pid = fork();
if (grandchild_pid == 0) {
// Grandchild
ASSERT_OK_AND_ASSIGN(fut, pool->Submit(add<int>, 1, 2));
ASSERT_FINISHES_OK_AND_EQ(3, fut);
ASSERT_OK(pool->Shutdown());
} else {
// Child
CheckChildExit(grandchild_pid);
ASSERT_FINISHES_OK_AND_EQ(7, fut);
ASSERT_OK(pool->Shutdown());
}
std::exit(0);
} else {
// Parent
CheckChildExit(child_pid);
ASSERT_OK(pool->Shutdown());
}
}
}

#endif

TEST(TestGlobalThreadPool, Capacity) {