From 939621a2237b6ef3f533f1b3eb9fdcc13ca20b27 Mon Sep 17 00:00:00 2001 From: Zihan Qi Date: Fri, 4 Apr 2025 11:55:48 -0700 Subject: [PATCH 1/6] ARROW-45860: Make cpu_count respect Linux CPU affinity --- cpp/src/arrow/util/CMakeLists.txt | 1 + cpp/src/arrow/util/affinity.h | 26 ++++++++++++++++++ cpp/src/arrow/util/cpu_info.cc | 3 ++- cpp/src/arrow/util/cpu_info_test.cc | 21 +++++++++++++++ cpp/src/arrow/util/thread_pool.cc | 3 ++- cpp/src/arrow/util/thread_pool_test.cc | 37 +++++++++++++++++++++++--- 6 files changed, 85 insertions(+), 6 deletions(-) create mode 100644 cpp/src/arrow/util/affinity.h create mode 100644 cpp/src/arrow/util/cpu_info_test.cc diff --git a/cpp/src/arrow/util/CMakeLists.txt b/cpp/src/arrow/util/CMakeLists.txt index df47389240e..fa9a5d58ffa 100644 --- a/cpp/src/arrow/util/CMakeLists.txt +++ b/cpp/src/arrow/util/CMakeLists.txt @@ -54,6 +54,7 @@ add_arrow_test(utility-test cache_test.cc checked_cast_test.cc compression_test.cc + cpu_info_test.cc decimal_test.cc float16_test.cc fixed_width_test.cc diff --git a/cpp/src/arrow/util/affinity.h b/cpp/src/arrow/util/affinity.h new file mode 100644 index 00000000000..808fd46c849 --- /dev/null +++ b/cpp/src/arrow/util/affinity.h @@ -0,0 +1,26 @@ +#pragma once + +#include + +#ifdef __linux__ +#include +#include +#endif + +namespace arrow { +namespace internal { + +// Returns the number of CPUs the current process is allowed to use. +// Falls back to std::thread::hardware_concurrency() if affinity is not available. +inline int GetAffinityCpuCount() { +#ifdef __linux__ + cpu_set_t mask; + if (sched_getaffinity(0, sizeof(mask), &mask) == 0) { + return CPU_COUNT(&mask); + } +#endif + return std::thread::hardware_concurrency(); +} + +} // namespace internal +} // namespace arrow diff --git a/cpp/src/arrow/util/cpu_info.cc b/cpp/src/arrow/util/cpu_info.cc index 6dbf2c35c1e..76283ae6f54 100644 --- a/cpp/src/arrow/util/cpu_info.cc +++ b/cpp/src/arrow/util/cpu_info.cc @@ -46,6 +46,7 @@ #include #include "arrow/result.h" +#include "arrow/util/affinity.h" #include "arrow/util/io_util.h" #include "arrow/util/logging_internal.h" #include "arrow/util/string.h" @@ -513,7 +514,7 @@ struct CpuInfo::Impl { OsRetrieveCacheSize(&cache_sizes); OsRetrieveCpuInfo(&hardware_flags, &vendor, &model_name); original_hardware_flags = hardware_flags; - num_cores = std::max(static_cast(std::thread::hardware_concurrency()), 1); + num_cores = std::max(GetAffinityCpuCount(), 1); // parse user simd level auto maybe_env_var = GetEnvVar("ARROW_USER_SIMD_LEVEL"); diff --git a/cpp/src/arrow/util/cpu_info_test.cc b/cpp/src/arrow/util/cpu_info_test.cc new file mode 100644 index 00000000000..74f2d5193fc --- /dev/null +++ b/cpp/src/arrow/util/cpu_info_test.cc @@ -0,0 +1,21 @@ +#include +#include "arrow/util/cpu_info.h" + +#ifdef __linux__ +#include +#endif + +TEST(CpuInfoTest, CpuAffinity) { +#ifdef __linux__ + auto cpu_info = arrow::internal::CpuInfo::GetInstance(); + int affinity_cores = cpu_info->num_cores(); + + cpu_set_t mask; + ASSERT_EQ(sched_getaffinity(0, sizeof(mask), &mask), 0); + int expected = CPU_COUNT(&mask); + + ASSERT_EQ(affinity_cores, expected); +#else + GTEST_SKIP() << "CpuInfo affinity check only applies on Linux."; +#endif +} \ No newline at end of file diff --git a/cpp/src/arrow/util/thread_pool.cc b/cpp/src/arrow/util/thread_pool.cc index 89834e5a11f..e278003391e 100644 --- a/cpp/src/arrow/util/thread_pool.cc +++ b/cpp/src/arrow/util/thread_pool.cc @@ -26,6 +26,7 @@ #include #include +#include "arrow/util/affinity.h" #include "arrow/util/atfork_internal.h" #include "arrow/util/config.h" #include "arrow/util/io_util.h" @@ -735,7 +736,7 @@ int ThreadPool::DefaultCapacity() { int capacity, limit; capacity = ParseOMPEnvVar("OMP_NUM_THREADS"); if (capacity == 0) { - capacity = std::thread::hardware_concurrency(); + capacity = GetAffinityCpuCount(); } limit = ParseOMPEnvVar("OMP_THREAD_LIMIT"); if (limit > 0) { diff --git a/cpp/src/arrow/util/thread_pool_test.cc b/cpp/src/arrow/util/thread_pool_test.cc index 2c831460302..ef3614207b7 100644 --- a/cpp/src/arrow/util/thread_pool_test.cc +++ b/cpp/src/arrow/util/thread_pool_test.cc @@ -38,6 +38,7 @@ #include "arrow/testing/executor_util.h" #include "arrow/testing/future_util.h" #include "arrow/testing/gtest_util.h" +#include "arrow/util/affinity.h" #include "arrow/util/config.h" #include "arrow/util/io_util.h" #include "arrow/util/logging.h" @@ -1039,35 +1040,63 @@ TEST(TestGlobalThreadPool, Capacity) { // Exercise default capacity heuristic ASSERT_OK(DelEnvVar("OMP_NUM_THREADS")); ASSERT_OK(DelEnvVar("OMP_THREAD_LIMIT")); + +#ifdef __linux__ + int expected_capacity = arrow::internal::GetAffinityCpuCount(); +#else int hw_capacity = std::thread::hardware_concurrency(); - ASSERT_EQ(ThreadPool::DefaultCapacity(), hw_capacity); + int expected_capacity = hw_capacity; +#endif + + ASSERT_EQ(ThreadPool::DefaultCapacity(), expected_capacity); + ASSERT_OK(SetEnvVar("OMP_NUM_THREADS", "13")); ASSERT_EQ(ThreadPool::DefaultCapacity(), 13); + ASSERT_OK(SetEnvVar("OMP_NUM_THREADS", "7,5,13")); ASSERT_EQ(ThreadPool::DefaultCapacity(), 7); ASSERT_OK(DelEnvVar("OMP_NUM_THREADS")); ASSERT_OK(SetEnvVar("OMP_THREAD_LIMIT", "1")); ASSERT_EQ(ThreadPool::DefaultCapacity(), 1); + ASSERT_OK(SetEnvVar("OMP_THREAD_LIMIT", "999")); - if (hw_capacity <= 999) { - ASSERT_EQ(ThreadPool::DefaultCapacity(), hw_capacity); - } +#ifdef __linux__ + ASSERT_EQ(ThreadPool::DefaultCapacity(), std::min(999, arrow::internal::GetAffinityCpuCount())); +#else + ASSERT_EQ(ThreadPool::DefaultCapacity(), std::min(999, hw_capacity)); +#endif + ASSERT_OK(SetEnvVar("OMP_NUM_THREADS", "6,5,13")); ASSERT_EQ(ThreadPool::DefaultCapacity(), 6); + ASSERT_OK(SetEnvVar("OMP_THREAD_LIMIT", "2")); ASSERT_EQ(ThreadPool::DefaultCapacity(), 2); // Invalid env values ASSERT_OK(SetEnvVar("OMP_NUM_THREADS", "0")); ASSERT_OK(SetEnvVar("OMP_THREAD_LIMIT", "0")); +#ifdef __linux__ + ASSERT_EQ(ThreadPool::DefaultCapacity(), arrow::internal::GetAffinityCpuCount()); +#else ASSERT_EQ(ThreadPool::DefaultCapacity(), hw_capacity); +#endif + ASSERT_OK(SetEnvVar("OMP_NUM_THREADS", "zzz")); ASSERT_OK(SetEnvVar("OMP_THREAD_LIMIT", "x")); +#ifdef __linux__ + ASSERT_EQ(ThreadPool::DefaultCapacity(), arrow::internal::GetAffinityCpuCount()); +#else ASSERT_EQ(ThreadPool::DefaultCapacity(), hw_capacity); +#endif + ASSERT_OK(SetEnvVar("OMP_THREAD_LIMIT", "-1")); ASSERT_OK(SetEnvVar("OMP_NUM_THREADS", "99999999999999999999999999")); +#ifdef __linux__ + ASSERT_EQ(ThreadPool::DefaultCapacity(), arrow::internal::GetAffinityCpuCount()); +#else ASSERT_EQ(ThreadPool::DefaultCapacity(), hw_capacity); +#endif ASSERT_OK(DelEnvVar("OMP_NUM_THREADS")); ASSERT_OK(DelEnvVar("OMP_THREAD_LIMIT")); From 4da1383d50ea697d9886ba96b71cb6674adf5358 Mon Sep 17 00:00:00 2001 From: AntoinePrv Date: Mon, 21 Jul 2025 15:28:18 +0200 Subject: [PATCH 2/6] Address reviewers comments --- cpp/src/arrow/util/affinity.h | 26 ------------------ cpp/src/arrow/util/cpu_info.cc | 31 +++++++++++++++++++-- cpp/src/arrow/util/cpu_info.h | 6 ++++ cpp/src/arrow/util/cpu_info_test.cc | 19 ++++--------- cpp/src/arrow/util/thread_pool.cc | 23 ++++++++-------- cpp/src/arrow/util/thread_pool.h | 1 + cpp/src/arrow/util/thread_pool_test.cc | 38 +++++++------------------- docs/source/cpp/threading.rst | 2 +- 8 files changed, 64 insertions(+), 82 deletions(-) delete mode 100644 cpp/src/arrow/util/affinity.h diff --git a/cpp/src/arrow/util/affinity.h b/cpp/src/arrow/util/affinity.h deleted file mode 100644 index 808fd46c849..00000000000 --- a/cpp/src/arrow/util/affinity.h +++ /dev/null @@ -1,26 +0,0 @@ -#pragma once - -#include - -#ifdef __linux__ -#include -#include -#endif - -namespace arrow { -namespace internal { - -// Returns the number of CPUs the current process is allowed to use. -// Falls back to std::thread::hardware_concurrency() if affinity is not available. -inline int GetAffinityCpuCount() { -#ifdef __linux__ - cpu_set_t mask; - if (sched_getaffinity(0, sizeof(mask), &mask) == 0) { - return CPU_COUNT(&mask); - } -#endif - return std::thread::hardware_concurrency(); -} - -} // namespace internal -} // namespace arrow diff --git a/cpp/src/arrow/util/cpu_info.cc b/cpp/src/arrow/util/cpu_info.cc index 76283ae6f54..05304cd51cc 100644 --- a/cpp/src/arrow/util/cpu_info.cc +++ b/cpp/src/arrow/util/cpu_info.cc @@ -27,6 +27,10 @@ # include #endif +#ifdef __linux__ +# include +#endif + #ifdef _WIN32 # include @@ -46,7 +50,6 @@ #include #include "arrow/result.h" -#include "arrow/util/affinity.h" #include "arrow/util/io_util.h" #include "arrow/util/logging_internal.h" #include "arrow/util/string.h" @@ -383,6 +386,14 @@ int64_t LinuxParseCpuFlags(const std::string& values) { # endif } +// Retrieve the affinity into the out pointer if available. +void LinuxRetrieveAffinityCpuCount(int* count) { + cpu_set_t mask; + if (sched_getaffinity(0, sizeof(mask), &mask) == 0) { + *count = CPU_COUNT(&mask); + } +} + void OsRetrieveCacheSize(std::array* cache_sizes) { for (int i = 0; i < kCacheLevels; ++i) { const int64_t cache_size = LinuxGetCacheSize(i); @@ -505,6 +516,7 @@ void ArchVerifyCpuRequirements(const CpuInfo* ci) {} struct CpuInfo::Impl { int64_t hardware_flags = 0; int num_cores = 0; + int num_affinity_cores = -1; int64_t original_hardware_flags = 0; Vendor vendor = Vendor::Unknown; std::string model_name = "Unknown"; @@ -514,7 +526,18 @@ struct CpuInfo::Impl { OsRetrieveCacheSize(&cache_sizes); OsRetrieveCpuInfo(&hardware_flags, &vendor, &model_name); original_hardware_flags = hardware_flags; - num_cores = std::max(GetAffinityCpuCount(), 1); + + num_cores = static_cast(std::thread::hardware_concurrency()); + if (num_cores <= 0) { + num_cores = 4; + ARROW_LOG(WARNING) << "Failed to determine the number of available threads, " + "using a hardcoded arbitrary value " + << num_cores; + } + +#ifdef __linux__ + LinuxRetrieveAffinityCpuCount(&num_affinity_cores); +#endif // parse user simd level auto maybe_env_var = GetEnvVar("ARROW_USER_SIMD_LEVEL"); @@ -551,7 +574,9 @@ const CpuInfo* CpuInfo::GetInstance() { int64_t CpuInfo::hardware_flags() const { return impl_->hardware_flags; } -int CpuInfo::num_cores() const { return impl_->num_cores <= 0 ? 1 : impl_->num_cores; } +int CpuInfo::num_cores() const { return impl_->num_cores; } + +int CpuInfo::num_affinity_cores() const { return impl_->num_affinity_cores; } CpuInfo::Vendor CpuInfo::vendor() const { return impl_->vendor; } diff --git a/cpp/src/arrow/util/cpu_info.h b/cpp/src/arrow/util/cpu_info.h index 949719b97ed..e320257e020 100644 --- a/cpp/src/arrow/util/cpu_info.h +++ b/cpp/src/arrow/util/cpu_info.h @@ -69,8 +69,14 @@ class ARROW_EXPORT CpuInfo { int64_t hardware_flags() const; /// Returns the number of cores (including hyper-threaded) on this machine. + // The number returned is guaranteed to be greater or equal to one. int num_cores() const; + /// \brief Return the number of cores the process is allowed to use. + /// + /// This is currently only implemented on Linux, return -1 on other platforms. + int num_affinity_cores() const; + /// Returns the vendor of the cpu. Vendor vendor() const; diff --git a/cpp/src/arrow/util/cpu_info_test.cc b/cpp/src/arrow/util/cpu_info_test.cc index 74f2d5193fc..49d62150909 100644 --- a/cpp/src/arrow/util/cpu_info_test.cc +++ b/cpp/src/arrow/util/cpu_info_test.cc @@ -1,21 +1,14 @@ -#include #include "arrow/util/cpu_info.h" -#ifdef __linux__ -#include -#endif +#include TEST(CpuInfoTest, CpuAffinity) { -#ifdef __linux__ auto cpu_info = arrow::internal::CpuInfo::GetInstance(); - int affinity_cores = cpu_info->num_cores(); - - cpu_set_t mask; - ASSERT_EQ(sched_getaffinity(0, sizeof(mask), &mask), 0); - int expected = CPU_COUNT(&mask); - ASSERT_EQ(affinity_cores, expected); + ASSERT_LE(cpu_info->num_affinity_cores(), cpu_info->num_cores()); +#ifdef __linux__ + ASSERT_GE(cpu_info->num_affinity_cores(), 1); #else - GTEST_SKIP() << "CpuInfo affinity check only applies on Linux."; + ASSERT_EQ(cpu_info->num_affinity_cores(), -1); #endif -} \ No newline at end of file +} diff --git a/cpp/src/arrow/util/thread_pool.cc b/cpp/src/arrow/util/thread_pool.cc index e278003391e..f2a4d05d8c0 100644 --- a/cpp/src/arrow/util/thread_pool.cc +++ b/cpp/src/arrow/util/thread_pool.cc @@ -26,9 +26,9 @@ #include #include -#include "arrow/util/affinity.h" #include "arrow/util/atfork_internal.h" #include "arrow/util/config.h" +#include "arrow/util/cpu_info.h" #include "arrow/util/io_util.h" #include "arrow/util/logging_internal.h" #include "arrow/util/mutex.h" @@ -733,20 +733,21 @@ static int ParseOMPEnvVar(const char* name) { } int ThreadPool::DefaultCapacity() { - int capacity, limit; - capacity = ParseOMPEnvVar("OMP_NUM_THREADS"); - if (capacity == 0) { - capacity = GetAffinityCpuCount(); + const CpuInfo* ci = CpuInfo::GetInstance(); + + int capacity = ParseOMPEnvVar("OMP_NUM_THREADS"); + if (capacity <= 0) { + capacity = ci->num_affinity_cores(); + } + if (capacity <= 0) { + capacity = ci->num_cores(); } - limit = ParseOMPEnvVar("OMP_THREAD_LIMIT"); + + int limit = ParseOMPEnvVar("OMP_THREAD_LIMIT"); if (limit > 0) { capacity = std::min(limit, capacity); } - if (capacity == 0) { - ARROW_LOG(WARNING) << "Failed to determine the number of available threads, " - "using a hardcoded arbitrary value"; - capacity = 4; - } + return capacity; } diff --git a/cpp/src/arrow/util/thread_pool.h b/cpp/src/arrow/util/thread_pool.h index cd32781aed7..2e80f6e5441 100644 --- a/cpp/src/arrow/util/thread_pool.h +++ b/cpp/src/arrow/util/thread_pool.h @@ -475,6 +475,7 @@ class ARROW_EXPORT ThreadPool : public Executor { // Heuristic for the default capacity of a thread pool for CPU-bound tasks. // This is exposed as a static method to help with testing. + // The number returned is guaranteed to be greater or equal to one. static int DefaultCapacity(); // Shutdown the pool. Once the pool starts shutting down, new tasks diff --git a/cpp/src/arrow/util/thread_pool_test.cc b/cpp/src/arrow/util/thread_pool_test.cc index ef3614207b7..0d627828181 100644 --- a/cpp/src/arrow/util/thread_pool_test.cc +++ b/cpp/src/arrow/util/thread_pool_test.cc @@ -38,7 +38,6 @@ #include "arrow/testing/executor_util.h" #include "arrow/testing/future_util.h" #include "arrow/testing/gtest_util.h" -#include "arrow/util/affinity.h" #include "arrow/util/config.h" #include "arrow/util/io_util.h" #include "arrow/util/logging.h" @@ -1041,14 +1040,9 @@ TEST(TestGlobalThreadPool, Capacity) { ASSERT_OK(DelEnvVar("OMP_NUM_THREADS")); ASSERT_OK(DelEnvVar("OMP_THREAD_LIMIT")); -#ifdef __linux__ - int expected_capacity = arrow::internal::GetAffinityCpuCount(); -#else int hw_capacity = std::thread::hardware_concurrency(); - int expected_capacity = hw_capacity; -#endif - - ASSERT_EQ(ThreadPool::DefaultCapacity(), expected_capacity); + ASSERT_LT(ThreadPool::DefaultCapacity(), hw_capacity); + ASSERT_GE(ThreadPool::DefaultCapacity(), 1); ASSERT_OK(SetEnvVar("OMP_NUM_THREADS", "13")); ASSERT_EQ(ThreadPool::DefaultCapacity(), 13); @@ -1061,11 +1055,8 @@ TEST(TestGlobalThreadPool, Capacity) { ASSERT_EQ(ThreadPool::DefaultCapacity(), 1); ASSERT_OK(SetEnvVar("OMP_THREAD_LIMIT", "999")); -#ifdef __linux__ - ASSERT_EQ(ThreadPool::DefaultCapacity(), std::min(999, arrow::internal::GetAffinityCpuCount())); -#else - ASSERT_EQ(ThreadPool::DefaultCapacity(), std::min(999, hw_capacity)); -#endif + ASSERT_LE(ThreadPool::DefaultCapacity(), std::min(999, hw_capacity)); + ASSERT_GE(ThreadPool::DefaultCapacity(), 1); ASSERT_OK(SetEnvVar("OMP_NUM_THREADS", "6,5,13")); ASSERT_EQ(ThreadPool::DefaultCapacity(), 6); @@ -1076,27 +1067,18 @@ TEST(TestGlobalThreadPool, Capacity) { // Invalid env values ASSERT_OK(SetEnvVar("OMP_NUM_THREADS", "0")); ASSERT_OK(SetEnvVar("OMP_THREAD_LIMIT", "0")); -#ifdef __linux__ - ASSERT_EQ(ThreadPool::DefaultCapacity(), arrow::internal::GetAffinityCpuCount()); -#else - ASSERT_EQ(ThreadPool::DefaultCapacity(), hw_capacity); -#endif + ASSERT_LE(ThreadPool::DefaultCapacity(), hw_capacity); + ASSERT_GE(ThreadPool::DefaultCapacity(), 1); ASSERT_OK(SetEnvVar("OMP_NUM_THREADS", "zzz")); ASSERT_OK(SetEnvVar("OMP_THREAD_LIMIT", "x")); -#ifdef __linux__ - ASSERT_EQ(ThreadPool::DefaultCapacity(), arrow::internal::GetAffinityCpuCount()); -#else - ASSERT_EQ(ThreadPool::DefaultCapacity(), hw_capacity); -#endif + ASSERT_LE(ThreadPool::DefaultCapacity(), hw_capacity); + ASSERT_GE(ThreadPool::DefaultCapacity(), 1); ASSERT_OK(SetEnvVar("OMP_THREAD_LIMIT", "-1")); ASSERT_OK(SetEnvVar("OMP_NUM_THREADS", "99999999999999999999999999")); -#ifdef __linux__ - ASSERT_EQ(ThreadPool::DefaultCapacity(), arrow::internal::GetAffinityCpuCount()); -#else - ASSERT_EQ(ThreadPool::DefaultCapacity(), hw_capacity); -#endif + ASSERT_LE(ThreadPool::DefaultCapacity(), hw_capacity); + ASSERT_GE(ThreadPool::DefaultCapacity(), 1); ASSERT_OK(DelEnvVar("OMP_NUM_THREADS")); ASSERT_OK(DelEnvVar("OMP_THREAD_LIMIT")); diff --git a/docs/source/cpp/threading.rst b/docs/source/cpp/threading.rst index 4a1a65ffe01..d2d0b2d0f13 100644 --- a/docs/source/cpp/threading.rst +++ b/docs/source/cpp/threading.rst @@ -44,7 +44,7 @@ CPU vs. I/O ----------- In order to minimize the overhead of context switches our default thread pool -for CPU-intensive tasks has a fixed size, defaulting to +for CPU-intensive tasks has a fixed size, defaulting to the process CPU affinity (on Linux) or `std::thread::hardware_concurrency `_. This means that CPU tasks should never block for long periods of time because this will result in under-utilization of the CPU. To achieve this we have a separate From aea35ec0e43b7490521f3eb338e64655417480d5 Mon Sep 17 00:00:00 2001 From: AntoinePrv Date: Mon, 21 Jul 2025 16:09:20 +0200 Subject: [PATCH 3/6] Add License in file --- cpp/src/arrow/util/cpu_info_test.cc | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/cpp/src/arrow/util/cpu_info_test.cc b/cpp/src/arrow/util/cpu_info_test.cc index 49d62150909..c775646f8d0 100644 --- a/cpp/src/arrow/util/cpu_info_test.cc +++ b/cpp/src/arrow/util/cpu_info_test.cc @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + #include "arrow/util/cpu_info.h" #include From b20240fdf44e12177e3b12474b759eb064934189 Mon Sep 17 00:00:00 2001 From: AntoinePrv Date: Mon, 21 Jul 2025 16:14:13 +0200 Subject: [PATCH 4/6] Fix test assertion --- cpp/src/arrow/util/thread_pool_test.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/util/thread_pool_test.cc b/cpp/src/arrow/util/thread_pool_test.cc index 0d627828181..45441fa3216 100644 --- a/cpp/src/arrow/util/thread_pool_test.cc +++ b/cpp/src/arrow/util/thread_pool_test.cc @@ -1041,7 +1041,7 @@ TEST(TestGlobalThreadPool, Capacity) { ASSERT_OK(DelEnvVar("OMP_THREAD_LIMIT")); int hw_capacity = std::thread::hardware_concurrency(); - ASSERT_LT(ThreadPool::DefaultCapacity(), hw_capacity); + ASSERT_LE(ThreadPool::DefaultCapacity(), hw_capacity); ASSERT_GE(ThreadPool::DefaultCapacity(), 1); ASSERT_OK(SetEnvVar("OMP_NUM_THREADS", "13")); From 668895f227de622817cfd1af45f70aaf0cc8981c Mon Sep 17 00:00:00 2001 From: AntoinePrv Date: Mon, 21 Jul 2025 17:37:19 +0200 Subject: [PATCH 5/6] Move affinity detection to io_util --- cpp/src/arrow/result.h | 7 +++++++ cpp/src/arrow/util/CMakeLists.txt | 1 - cpp/src/arrow/util/cpu_info.cc | 30 ++-------------------------- cpp/src/arrow/util/cpu_info.h | 6 ------ cpp/src/arrow/util/cpu_info_test.cc | 31 ----------------------------- cpp/src/arrow/util/io_util.cc | 16 +++++++++++++++ cpp/src/arrow/util/io_util.h | 6 ++++++ cpp/src/arrow/util/io_util_test.cc | 11 ++++++++++ cpp/src/arrow/util/thread_pool.cc | 16 ++++++++------- 9 files changed, 51 insertions(+), 73 deletions(-) delete mode 100644 cpp/src/arrow/util/cpu_info_test.cc diff --git a/cpp/src/arrow/result.h b/cpp/src/arrow/result.h index 895f3085c62..594f173c2bf 100644 --- a/cpp/src/arrow/result.h +++ b/cpp/src/arrow/result.h @@ -377,6 +377,13 @@ class [[nodiscard]] Result : public util::EqualityComparable> { return MoveValueUnsafe(); } + T ValueOr(T alternative) const& { + if (!ok()) { + return alternative; + } + return ValueUnsafe(); + } + /// Retrieve the value if ok(), falling back to an alternative generated by the provided /// factory template diff --git a/cpp/src/arrow/util/CMakeLists.txt b/cpp/src/arrow/util/CMakeLists.txt index fa9a5d58ffa..df47389240e 100644 --- a/cpp/src/arrow/util/CMakeLists.txt +++ b/cpp/src/arrow/util/CMakeLists.txt @@ -54,7 +54,6 @@ add_arrow_test(utility-test cache_test.cc checked_cast_test.cc compression_test.cc - cpu_info_test.cc decimal_test.cc float16_test.cc fixed_width_test.cc diff --git a/cpp/src/arrow/util/cpu_info.cc b/cpp/src/arrow/util/cpu_info.cc index 05304cd51cc..6dbf2c35c1e 100644 --- a/cpp/src/arrow/util/cpu_info.cc +++ b/cpp/src/arrow/util/cpu_info.cc @@ -27,10 +27,6 @@ # include #endif -#ifdef __linux__ -# include -#endif - #ifdef _WIN32 # include @@ -386,14 +382,6 @@ int64_t LinuxParseCpuFlags(const std::string& values) { # endif } -// Retrieve the affinity into the out pointer if available. -void LinuxRetrieveAffinityCpuCount(int* count) { - cpu_set_t mask; - if (sched_getaffinity(0, sizeof(mask), &mask) == 0) { - *count = CPU_COUNT(&mask); - } -} - void OsRetrieveCacheSize(std::array* cache_sizes) { for (int i = 0; i < kCacheLevels; ++i) { const int64_t cache_size = LinuxGetCacheSize(i); @@ -516,7 +504,6 @@ void ArchVerifyCpuRequirements(const CpuInfo* ci) {} struct CpuInfo::Impl { int64_t hardware_flags = 0; int num_cores = 0; - int num_affinity_cores = -1; int64_t original_hardware_flags = 0; Vendor vendor = Vendor::Unknown; std::string model_name = "Unknown"; @@ -526,18 +513,7 @@ struct CpuInfo::Impl { OsRetrieveCacheSize(&cache_sizes); OsRetrieveCpuInfo(&hardware_flags, &vendor, &model_name); original_hardware_flags = hardware_flags; - - num_cores = static_cast(std::thread::hardware_concurrency()); - if (num_cores <= 0) { - num_cores = 4; - ARROW_LOG(WARNING) << "Failed to determine the number of available threads, " - "using a hardcoded arbitrary value " - << num_cores; - } - -#ifdef __linux__ - LinuxRetrieveAffinityCpuCount(&num_affinity_cores); -#endif + num_cores = std::max(static_cast(std::thread::hardware_concurrency()), 1); // parse user simd level auto maybe_env_var = GetEnvVar("ARROW_USER_SIMD_LEVEL"); @@ -574,9 +550,7 @@ const CpuInfo* CpuInfo::GetInstance() { int64_t CpuInfo::hardware_flags() const { return impl_->hardware_flags; } -int CpuInfo::num_cores() const { return impl_->num_cores; } - -int CpuInfo::num_affinity_cores() const { return impl_->num_affinity_cores; } +int CpuInfo::num_cores() const { return impl_->num_cores <= 0 ? 1 : impl_->num_cores; } CpuInfo::Vendor CpuInfo::vendor() const { return impl_->vendor; } diff --git a/cpp/src/arrow/util/cpu_info.h b/cpp/src/arrow/util/cpu_info.h index e320257e020..949719b97ed 100644 --- a/cpp/src/arrow/util/cpu_info.h +++ b/cpp/src/arrow/util/cpu_info.h @@ -69,14 +69,8 @@ class ARROW_EXPORT CpuInfo { int64_t hardware_flags() const; /// Returns the number of cores (including hyper-threaded) on this machine. - // The number returned is guaranteed to be greater or equal to one. int num_cores() const; - /// \brief Return the number of cores the process is allowed to use. - /// - /// This is currently only implemented on Linux, return -1 on other platforms. - int num_affinity_cores() const; - /// Returns the vendor of the cpu. Vendor vendor() const; diff --git a/cpp/src/arrow/util/cpu_info_test.cc b/cpp/src/arrow/util/cpu_info_test.cc deleted file mode 100644 index c775646f8d0..00000000000 --- a/cpp/src/arrow/util/cpu_info_test.cc +++ /dev/null @@ -1,31 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "arrow/util/cpu_info.h" - -#include - -TEST(CpuInfoTest, CpuAffinity) { - auto cpu_info = arrow::internal::CpuInfo::GetInstance(); - - ASSERT_LE(cpu_info->num_affinity_cores(), cpu_info->num_cores()); -#ifdef __linux__ - ASSERT_GE(cpu_info->num_affinity_cores(), 1); -#else - ASSERT_EQ(cpu_info->num_affinity_cores(), -1); -#endif -} diff --git a/cpp/src/arrow/util/io_util.cc b/cpp/src/arrow/util/io_util.cc index 661634e2c3b..4e3c8a9176f 100644 --- a/cpp/src/arrow/util/io_util.cc +++ b/cpp/src/arrow/util/io_util.cc @@ -115,6 +115,7 @@ #elif __linux__ # include # include +# include #endif #ifdef _WIN32 @@ -2219,6 +2220,21 @@ int64_t GetTotalMemoryBytes() { #endif } +Result GetNumAffinityCores() { +#if defined(__linux__) + cpu_set_t mask; + if (sched_getaffinity(0, sizeof(mask), &mask) == 0) { + auto count = CPU_COUNT(&mask); + if (count > 0 && + static_cast(count) < std::numeric_limits::max()) { + return {static_cast(count)}; + } + } + return {Status::IOError("Could not read the CPU affinity.")}; +#endif + return {Status::NotImplemented("Only implemented for Linux")}; +} + Result LoadDynamicLibrary(const char* path) { #ifdef _WIN32 ARROW_ASSIGN_OR_RAISE(auto platform_path, PlatformFilename::FromString(path)); diff --git a/cpp/src/arrow/util/io_util.h b/cpp/src/arrow/util/io_util.h index 892641d4bc5..19c7cdcb636 100644 --- a/cpp/src/arrow/util/io_util.h +++ b/cpp/src/arrow/util/io_util.h @@ -419,6 +419,12 @@ int64_t GetCurrentRSS(); ARROW_EXPORT int64_t GetTotalMemoryBytes(); +/// \brief Get the number of affinity core on the system. +/// +/// This is only implemented on Linux. +/// If a value is returned, it is guaranteed to be greater or equal to one. +ARROW_EXPORT Result GetNumAffinityCores(); + /// \brief Load a dynamic library /// /// This wraps dlopen() except on Windows, where LoadLibrary() is called. diff --git a/cpp/src/arrow/util/io_util_test.cc b/cpp/src/arrow/util/io_util_test.cc index 1ff8fcf7adb..1d7740f7388 100644 --- a/cpp/src/arrow/util/io_util_test.cc +++ b/cpp/src/arrow/util/io_util_test.cc @@ -1123,5 +1123,16 @@ TEST(Memory, TotalMemory) { #endif } +TEST(CpuInfoTest, CpuAffinity) { + auto affinity_cores = GetNumAffinityCores(); + +#ifdef __linux__ + ASSERT_TRUE(affinity_cores.ok()); + ASSERT_LE(affinity_cores.ValueOr(1u), std::thread::hardware_concurrency()); +#else + ASSERT_FALSE(affinity_cores.ok()); +#endif +} + } // namespace internal } // namespace arrow diff --git a/cpp/src/arrow/util/thread_pool.cc b/cpp/src/arrow/util/thread_pool.cc index f2a4d05d8c0..bf107006f8b 100644 --- a/cpp/src/arrow/util/thread_pool.cc +++ b/cpp/src/arrow/util/thread_pool.cc @@ -28,7 +28,6 @@ #include "arrow/util/atfork_internal.h" #include "arrow/util/config.h" -#include "arrow/util/cpu_info.h" #include "arrow/util/io_util.h" #include "arrow/util/logging_internal.h" #include "arrow/util/mutex.h" @@ -733,21 +732,24 @@ static int ParseOMPEnvVar(const char* name) { } int ThreadPool::DefaultCapacity() { - const CpuInfo* ci = CpuInfo::GetInstance(); - int capacity = ParseOMPEnvVar("OMP_NUM_THREADS"); if (capacity <= 0) { - capacity = ci->num_affinity_cores(); + capacity = static_cast(GetNumAffinityCores().ValueOr(0)); + } + if (capacity <= 0) { + capacity = static_cast(std::thread::hardware_concurrency()); } if (capacity <= 0) { - capacity = ci->num_cores(); + capacity = 4; + ARROW_LOG(WARNING) << "Failed to determine the number of available threads, " + "using a hardcoded arbitrary value of " + << capacity; } - int limit = ParseOMPEnvVar("OMP_THREAD_LIMIT"); + const int limit = ParseOMPEnvVar("OMP_THREAD_LIMIT"); if (limit > 0) { capacity = std::min(limit, capacity); } - return capacity; } From 651d8f20ee1ec7e4dfbe0aaeebd010a6a1363385 Mon Sep 17 00:00:00 2001 From: AntoinePrv Date: Wed, 20 Aug 2025 10:12:26 +0200 Subject: [PATCH 6/6] Apply reviewer suggestions --- cpp/src/arrow/result.h | 1 + cpp/src/arrow/util/io_util.cc | 9 +++++---- cpp/src/arrow/util/io_util.h | 2 +- cpp/src/arrow/util/io_util_test.cc | 12 ++++++------ 4 files changed, 13 insertions(+), 11 deletions(-) diff --git a/cpp/src/arrow/result.h b/cpp/src/arrow/result.h index 594f173c2bf..2b25de69486 100644 --- a/cpp/src/arrow/result.h +++ b/cpp/src/arrow/result.h @@ -377,6 +377,7 @@ class [[nodiscard]] Result : public util::EqualityComparable> { return MoveValueUnsafe(); } + /// Return a copy of the internally stored value or alternative if an error is stored. T ValueOr(T alternative) const& { if (!ok()) { return alternative; diff --git a/cpp/src/arrow/util/io_util.cc b/cpp/src/arrow/util/io_util.cc index 4e3c8a9176f..818371c4d9e 100644 --- a/cpp/src/arrow/util/io_util.cc +++ b/cpp/src/arrow/util/io_util.cc @@ -2220,19 +2220,20 @@ int64_t GetTotalMemoryBytes() { #endif } -Result GetNumAffinityCores() { +Result GetNumAffinityCores() { #if defined(__linux__) cpu_set_t mask; if (sched_getaffinity(0, sizeof(mask), &mask) == 0) { auto count = CPU_COUNT(&mask); if (count > 0 && static_cast(count) < std::numeric_limits::max()) { - return {static_cast(count)}; + return static_cast(count); } } - return {Status::IOError("Could not read the CPU affinity.")}; + return IOErrorFromErrno(errno, "Could not read the CPU affinity."); +#else + return Status::NotImplemented("Only implemented for Linux"); #endif - return {Status::NotImplemented("Only implemented for Linux")}; } Result LoadDynamicLibrary(const char* path) { diff --git a/cpp/src/arrow/util/io_util.h b/cpp/src/arrow/util/io_util.h index 19c7cdcb636..e9f218b5205 100644 --- a/cpp/src/arrow/util/io_util.h +++ b/cpp/src/arrow/util/io_util.h @@ -423,7 +423,7 @@ int64_t GetTotalMemoryBytes(); /// /// This is only implemented on Linux. /// If a value is returned, it is guaranteed to be greater or equal to one. -ARROW_EXPORT Result GetNumAffinityCores(); +ARROW_EXPORT Result GetNumAffinityCores(); /// \brief Load a dynamic library /// diff --git a/cpp/src/arrow/util/io_util_test.cc b/cpp/src/arrow/util/io_util_test.cc index 1d7740f7388..885f2355f4e 100644 --- a/cpp/src/arrow/util/io_util_test.cc +++ b/cpp/src/arrow/util/io_util_test.cc @@ -1123,14 +1123,14 @@ TEST(Memory, TotalMemory) { #endif } -TEST(CpuInfoTest, CpuAffinity) { - auto affinity_cores = GetNumAffinityCores(); - +TEST(CpuAffinity, NumberOfCores) { + auto maybe_affinity_cores = GetNumAffinityCores(); #ifdef __linux__ - ASSERT_TRUE(affinity_cores.ok()); - ASSERT_LE(affinity_cores.ValueOr(1u), std::thread::hardware_concurrency()); + ASSERT_OK_AND_ASSIGN(auto affinity_cores, maybe_affinity_cores); + ASSERT_GE(affinity_cores, 1); + ASSERT_LE(affinity_cores, std::thread::hardware_concurrency()); #else - ASSERT_FALSE(affinity_cores.ok()); + ASSERT_RAISES(NotImplemented, maybe_affinity_cores); #endif }