From d90b17e614d26266b766203950ccb51da464a85d Mon Sep 17 00:00:00 2001
From: Zihan Qi
Date: Fri, 4 Apr 2025 11:55:48 -0700
Subject: [PATCH] ARROW-45860: Make cpu_count respect Linux CPU affinity

---
 cpp/src/arrow/util/CMakeLists.txt      |  1 +
 cpp/src/arrow/util/affinity.h          | 50 +++++++++++++++++++++++++++++++++++
 cpp/src/arrow/util/cpu_info.cc         |  3 ++-
 cpp/src/arrow/util/cpu_info_test.cc    | 21 +++++++++++++++
 cpp/src/arrow/util/thread_pool.cc      |  3 ++-
 cpp/src/arrow/util/thread_pool_test.cc | 37 +++++++++++++++++++++++++++++++++++---
 6 files changed, 109 insertions(+), 6 deletions(-)
 create mode 100644 cpp/src/arrow/util/affinity.h
 create mode 100644 cpp/src/arrow/util/cpu_info_test.cc

diff --git a/cpp/src/arrow/util/CMakeLists.txt b/cpp/src/arrow/util/CMakeLists.txt
index 17eea5532cc..8f976ae9a4a 100644
--- a/cpp/src/arrow/util/CMakeLists.txt
+++ b/cpp/src/arrow/util/CMakeLists.txt
@@ -54,6 +54,7 @@ add_arrow_test(utility-test
                cache_test.cc
                checked_cast_test.cc
                compression_test.cc
+               cpu_info_test.cc
                decimal_test.cc
                float16_test.cc
                fixed_width_test.cc
diff --git a/cpp/src/arrow/util/affinity.h b/cpp/src/arrow/util/affinity.h
new file mode 100644
index 00000000000..808fd46c849
--- /dev/null
+++ b/cpp/src/arrow/util/affinity.h
@@ -0,0 +1,50 @@
+#pragma once
+
+#include <cerrno>
+#include <thread>
+
+#ifdef __linux__
+#include <sched.h>
+#include <unistd.h>
+#endif
+
+namespace arrow {
+namespace internal {
+
+// Returns the number of CPUs the current process is allowed to use.
+// Falls back to std::thread::hardware_concurrency() if affinity is not available.
+//
+// On Linux the affinity mask is sized dynamically: a fixed-size cpu_set_t only
+// covers CPU_SETSIZE CPUs, and sched_getaffinity() fails with EINVAL when the
+// kernel's mask does not fit in the buffer it is given.
+inline int GetAffinityCpuCount() {
+#ifdef __linux__
+  // Cap the probed mask size so that the loop always terminates.
+  constexpr int kMaxProbedCpus = 1 << 18;
+  for (int num_cpus = CPU_SETSIZE; num_cpus <= kMaxProbedCpus; num_cpus *= 2) {
+    cpu_set_t* mask = CPU_ALLOC(num_cpus);
+    if (mask == nullptr) {
+      break;
+    }
+    const auto mask_size = CPU_ALLOC_SIZE(num_cpus);
+    CPU_ZERO_S(mask_size, mask);
+    const bool ok = sched_getaffinity(0, mask_size, mask) == 0;
+    // Save errno before CPU_FREE() has a chance to modify it.
+    const int saved_errno = errno;
+    const int count = ok ? CPU_COUNT_S(mask_size, mask) : 0;
+    CPU_FREE(mask);
+    if (ok) {
+      return count;
+    }
+    if (saved_errno != EINVAL) {
+      // Not a "mask too small" failure, so a larger mask will not help.
+      break;
+    }
+  }
+#endif
+  // hardware_concurrency() is unsigned and may be 0 when the count is unknown.
+  return static_cast<int>(std::thread::hardware_concurrency());
+}
+
+}  // namespace internal
+}  // namespace arrow
diff --git a/cpp/src/arrow/util/cpu_info.cc b/cpp/src/arrow/util/cpu_info.cc
index 6dbf2c35c1e..76283ae6f54 100644
--- a/cpp/src/arrow/util/cpu_info.cc
+++ b/cpp/src/arrow/util/cpu_info.cc
@@ -46,6 +46,7 @@
 #include
 
 #include "arrow/result.h"
+#include "arrow/util/affinity.h"
 #include "arrow/util/io_util.h"
 #include "arrow/util/logging_internal.h"
 #include "arrow/util/string.h"
@@ -513,7 +514,7 @@ struct CpuInfo::Impl {
     OsRetrieveCacheSize(&cache_sizes);
     OsRetrieveCpuInfo(&hardware_flags, &vendor, &model_name);
     original_hardware_flags = hardware_flags;
-    num_cores = std::max(static_cast<int>(std::thread::hardware_concurrency()), 1);
+    num_cores = std::max(GetAffinityCpuCount(), 1);
 
     // parse user simd level
     auto maybe_env_var = GetEnvVar("ARROW_USER_SIMD_LEVEL");
diff --git a/cpp/src/arrow/util/cpu_info_test.cc b/cpp/src/arrow/util/cpu_info_test.cc
new file mode 100644
index 00000000000..74f2d5193fc
--- /dev/null
+++ b/cpp/src/arrow/util/cpu_info_test.cc
@@ -0,0 +1,21 @@
+#include <gtest/gtest.h>
+#include "arrow/util/cpu_info.h"
+
+#ifdef __linux__
+#include <sched.h>
+#endif
+
+TEST(CpuInfoTest, CpuAffinity) {
+#ifdef __linux__
+  auto cpu_info = arrow::internal::CpuInfo::GetInstance();
+  int affinity_cores = cpu_info->num_cores();
+
+  cpu_set_t mask;
+  ASSERT_EQ(sched_getaffinity(0, sizeof(mask), &mask), 0);
+  int expected = CPU_COUNT(&mask);
+
+  ASSERT_EQ(affinity_cores, expected);
+#else
+  GTEST_SKIP() << "CpuInfo affinity check only applies on Linux.";
+#endif
+}
diff --git a/cpp/src/arrow/util/thread_pool.cc b/cpp/src/arrow/util/thread_pool.cc
index 33531b384d0..cae64802b8a 100644
--- a/cpp/src/arrow/util/thread_pool.cc
+++ b/cpp/src/arrow/util/thread_pool.cc
@@ -26,6 +26,7 @@
 #include
 #include
 
+#include
"arrow/util/affinity.h" #include "arrow/util/atfork_internal.h" #include "arrow/util/config.h" #include "arrow/util/io_util.h" @@ -733,7 +734,7 @@ int ThreadPool::DefaultCapacity() { int capacity, limit; capacity = ParseOMPEnvVar("OMP_NUM_THREADS"); if (capacity == 0) { - capacity = std::thread::hardware_concurrency(); + capacity = GetAffinityCpuCount(); } limit = ParseOMPEnvVar("OMP_THREAD_LIMIT"); if (limit > 0) { diff --git a/cpp/src/arrow/util/thread_pool_test.cc b/cpp/src/arrow/util/thread_pool_test.cc index 2c831460302..ef3614207b7 100644 --- a/cpp/src/arrow/util/thread_pool_test.cc +++ b/cpp/src/arrow/util/thread_pool_test.cc @@ -38,6 +38,7 @@ #include "arrow/testing/executor_util.h" #include "arrow/testing/future_util.h" #include "arrow/testing/gtest_util.h" +#include "arrow/util/affinity.h" #include "arrow/util/config.h" #include "arrow/util/io_util.h" #include "arrow/util/logging.h" @@ -1039,35 +1040,63 @@ TEST(TestGlobalThreadPool, Capacity) { // Exercise default capacity heuristic ASSERT_OK(DelEnvVar("OMP_NUM_THREADS")); ASSERT_OK(DelEnvVar("OMP_THREAD_LIMIT")); + +#ifdef __linux__ + int expected_capacity = arrow::internal::GetAffinityCpuCount(); +#else int hw_capacity = std::thread::hardware_concurrency(); - ASSERT_EQ(ThreadPool::DefaultCapacity(), hw_capacity); + int expected_capacity = hw_capacity; +#endif + + ASSERT_EQ(ThreadPool::DefaultCapacity(), expected_capacity); + ASSERT_OK(SetEnvVar("OMP_NUM_THREADS", "13")); ASSERT_EQ(ThreadPool::DefaultCapacity(), 13); + ASSERT_OK(SetEnvVar("OMP_NUM_THREADS", "7,5,13")); ASSERT_EQ(ThreadPool::DefaultCapacity(), 7); ASSERT_OK(DelEnvVar("OMP_NUM_THREADS")); ASSERT_OK(SetEnvVar("OMP_THREAD_LIMIT", "1")); ASSERT_EQ(ThreadPool::DefaultCapacity(), 1); + ASSERT_OK(SetEnvVar("OMP_THREAD_LIMIT", "999")); - if (hw_capacity <= 999) { - ASSERT_EQ(ThreadPool::DefaultCapacity(), hw_capacity); - } +#ifdef __linux__ + ASSERT_EQ(ThreadPool::DefaultCapacity(), std::min(999, arrow::internal::GetAffinityCpuCount())); +#else + 
ASSERT_EQ(ThreadPool::DefaultCapacity(), std::min(999, hw_capacity)); +#endif + ASSERT_OK(SetEnvVar("OMP_NUM_THREADS", "6,5,13")); ASSERT_EQ(ThreadPool::DefaultCapacity(), 6); + ASSERT_OK(SetEnvVar("OMP_THREAD_LIMIT", "2")); ASSERT_EQ(ThreadPool::DefaultCapacity(), 2); // Invalid env values ASSERT_OK(SetEnvVar("OMP_NUM_THREADS", "0")); ASSERT_OK(SetEnvVar("OMP_THREAD_LIMIT", "0")); +#ifdef __linux__ + ASSERT_EQ(ThreadPool::DefaultCapacity(), arrow::internal::GetAffinityCpuCount()); +#else ASSERT_EQ(ThreadPool::DefaultCapacity(), hw_capacity); +#endif + ASSERT_OK(SetEnvVar("OMP_NUM_THREADS", "zzz")); ASSERT_OK(SetEnvVar("OMP_THREAD_LIMIT", "x")); +#ifdef __linux__ + ASSERT_EQ(ThreadPool::DefaultCapacity(), arrow::internal::GetAffinityCpuCount()); +#else ASSERT_EQ(ThreadPool::DefaultCapacity(), hw_capacity); +#endif + ASSERT_OK(SetEnvVar("OMP_THREAD_LIMIT", "-1")); ASSERT_OK(SetEnvVar("OMP_NUM_THREADS", "99999999999999999999999999")); +#ifdef __linux__ + ASSERT_EQ(ThreadPool::DefaultCapacity(), arrow::internal::GetAffinityCpuCount()); +#else ASSERT_EQ(ThreadPool::DefaultCapacity(), hw_capacity); +#endif ASSERT_OK(DelEnvVar("OMP_NUM_THREADS")); ASSERT_OK(DelEnvVar("OMP_THREAD_LIMIT"));