diff --git a/cpp/src/arrow/result.h b/cpp/src/arrow/result.h
index 895f3085c62..2b25de69486 100644
--- a/cpp/src/arrow/result.h
+++ b/cpp/src/arrow/result.h
@@ -377,6 +377,14 @@ class [[nodiscard]] Result : public util::EqualityComparable> {
     return MoveValueUnsafe();
   }
 
+  /// Return a copy of the internally stored value or alternative if an error is stored.
+  T ValueOr(T alternative) const& {
+    if (!ok()) {
+      return alternative;
+    }
+    return ValueUnsafe();
+  }
+
   /// Retrieve the value if ok(), falling back to an alternative generated by the provided
   /// factory
   template
diff --git a/cpp/src/arrow/util/io_util.cc b/cpp/src/arrow/util/io_util.cc
index 661634e2c3b..818371c4d9e 100644
--- a/cpp/src/arrow/util/io_util.cc
+++ b/cpp/src/arrow/util/io_util.cc
@@ -115,6 +115,7 @@
 #elif __linux__
 #  include
 #  include
+#  include <sched.h>
 #endif
 
 #ifdef _WIN32
@@ -2219,6 +2220,25 @@ int64_t GetTotalMemoryBytes() {
 #endif
 }
 
+Result<int32_t> GetNumAffinityCores() {
+#if defined(__linux__)
+  cpu_set_t mask;
+  if (sched_getaffinity(0, sizeof(mask), &mask) != 0) {
+    // errno is only meaningful when the system call itself failed.
+    return IOErrorFromErrno(errno, "Could not read the CPU affinity.");
+  }
+  const auto count = CPU_COUNT(&mask);
+  if (count <= 0 ||
+      static_cast<int64_t>(count) >= std::numeric_limits<int32_t>::max()) {
+    // The call succeeded, so errno is stale here: report the bad count directly.
+    return Status::IOError("Unexpected number of CPUs in the affinity mask: ", count);
+  }
+  return static_cast<int32_t>(count);
+#else
+  return Status::NotImplemented("Only implemented for Linux");
+#endif
+}
+
 Result LoadDynamicLibrary(const char* path) {
 #ifdef _WIN32
   ARROW_ASSIGN_OR_RAISE(auto platform_path, PlatformFilename::FromString(path));
diff --git a/cpp/src/arrow/util/io_util.h b/cpp/src/arrow/util/io_util.h
index 892641d4bc5..e9f218b5205 100644
--- a/cpp/src/arrow/util/io_util.h
+++ b/cpp/src/arrow/util/io_util.h
@@ -419,6 +419,12 @@ int64_t GetCurrentRSS();
 ARROW_EXPORT
 int64_t GetTotalMemoryBytes();
 
+/// \brief Get the number of CPU cores in the affinity mask of the calling thread.
+///
+/// This is only implemented on Linux.
+/// If a value is returned, it is guaranteed to be greater than or equal to one.
+ARROW_EXPORT Result<int32_t> GetNumAffinityCores();
+
 /// \brief Load a dynamic library
 ///
 /// This wraps dlopen() except on Windows, where LoadLibrary() is called.
diff --git a/cpp/src/arrow/util/io_util_test.cc b/cpp/src/arrow/util/io_util_test.cc
index 1ff8fcf7adb..885f2355f4e 100644
--- a/cpp/src/arrow/util/io_util_test.cc
+++ b/cpp/src/arrow/util/io_util_test.cc
@@ -1123,5 +1123,16 @@ TEST(Memory, TotalMemory) {
 #endif
 }
 
+TEST(CpuAffinity, NumberOfCores) {
+  auto maybe_affinity_cores = GetNumAffinityCores();
+#ifdef __linux__
+  ASSERT_OK_AND_ASSIGN(auto affinity_cores, maybe_affinity_cores);
+  ASSERT_GE(affinity_cores, 1);
+  ASSERT_LE(affinity_cores, std::thread::hardware_concurrency());
+#else
+  ASSERT_RAISES(NotImplemented, maybe_affinity_cores);
+#endif
+}
+
 }  // namespace internal
 }  // namespace arrow
diff --git a/cpp/src/arrow/util/thread_pool.cc b/cpp/src/arrow/util/thread_pool.cc
index 89834e5a11f..bf107006f8b 100644
--- a/cpp/src/arrow/util/thread_pool.cc
+++ b/cpp/src/arrow/util/thread_pool.cc
@@ -732,19 +732,23 @@ static int ParseOMPEnvVar(const char* name) {
 }
 
 int ThreadPool::DefaultCapacity() {
-  int capacity, limit;
-  capacity = ParseOMPEnvVar("OMP_NUM_THREADS");
-  if (capacity == 0) {
-    capacity = std::thread::hardware_concurrency();
+  int capacity = ParseOMPEnvVar("OMP_NUM_THREADS");
+  if (capacity <= 0) {
+    capacity = static_cast<int>(GetNumAffinityCores().ValueOr(0));
   }
-  limit = ParseOMPEnvVar("OMP_THREAD_LIMIT");
-  if (limit > 0) {
-    capacity = std::min(limit, capacity);
+  if (capacity <= 0) {
+    capacity = static_cast<int>(std::thread::hardware_concurrency());
   }
-  if (capacity == 0) {
-    ARROW_LOG(WARNING) << "Failed to determine the number of available threads, "
-                          "using a hardcoded arbitrary value";
+  if (capacity <= 0) {
     capacity = 4;
+    ARROW_LOG(WARNING) << "Failed to determine the number of available threads, "
+                          "using a hardcoded arbitrary value of "
+                       << capacity;
+  }
+
+  const int limit = ParseOMPEnvVar("OMP_THREAD_LIMIT");
+  if (limit > 0) {
+    capacity = std::min(limit, capacity);
   }
   return capacity;
 }
diff --git a/cpp/src/arrow/util/thread_pool.h b/cpp/src/arrow/util/thread_pool.h
index cd32781aed7..2e80f6e5441 100644
--- a/cpp/src/arrow/util/thread_pool.h
+++ b/cpp/src/arrow/util/thread_pool.h
@@ -475,6 +475,7 @@ class ARROW_EXPORT ThreadPool : public Executor {
 
   // Heuristic for the default capacity of a thread pool for CPU-bound tasks.
   // This is exposed as a static method to help with testing.
+  // The number returned is guaranteed to be greater than or equal to one.
   static int DefaultCapacity();
 
   // Shutdown the pool.  Once the pool starts shutting down, new tasks
diff --git a/cpp/src/arrow/util/thread_pool_test.cc b/cpp/src/arrow/util/thread_pool_test.cc
index 2c831460302..45441fa3216 100644
--- a/cpp/src/arrow/util/thread_pool_test.cc
+++ b/cpp/src/arrow/util/thread_pool_test.cc
@@ -1039,35 +1039,46 @@ TEST(TestGlobalThreadPool, Capacity) {
   // Exercise default capacity heuristic
   ASSERT_OK(DelEnvVar("OMP_NUM_THREADS"));
   ASSERT_OK(DelEnvVar("OMP_THREAD_LIMIT"));
+
   int hw_capacity = std::thread::hardware_concurrency();
-  ASSERT_EQ(ThreadPool::DefaultCapacity(), hw_capacity);
+  ASSERT_LE(ThreadPool::DefaultCapacity(), hw_capacity);
+  ASSERT_GE(ThreadPool::DefaultCapacity(), 1);
+
   ASSERT_OK(SetEnvVar("OMP_NUM_THREADS", "13"));
   ASSERT_EQ(ThreadPool::DefaultCapacity(), 13);
+
   ASSERT_OK(SetEnvVar("OMP_NUM_THREADS", "7,5,13"));
   ASSERT_EQ(ThreadPool::DefaultCapacity(), 7);
   ASSERT_OK(DelEnvVar("OMP_NUM_THREADS"));
 
   ASSERT_OK(SetEnvVar("OMP_THREAD_LIMIT", "1"));
   ASSERT_EQ(ThreadPool::DefaultCapacity(), 1);
+
   ASSERT_OK(SetEnvVar("OMP_THREAD_LIMIT", "999"));
-  if (hw_capacity <= 999) {
-    ASSERT_EQ(ThreadPool::DefaultCapacity(), hw_capacity);
-  }
+  ASSERT_LE(ThreadPool::DefaultCapacity(), std::min(999, hw_capacity));
+  ASSERT_GE(ThreadPool::DefaultCapacity(), 1);
+
   ASSERT_OK(SetEnvVar("OMP_NUM_THREADS", "6,5,13"));
   ASSERT_EQ(ThreadPool::DefaultCapacity(), 6);
+
   ASSERT_OK(SetEnvVar("OMP_THREAD_LIMIT", "2"));
   ASSERT_EQ(ThreadPool::DefaultCapacity(), 2);
 
   // Invalid env values
   ASSERT_OK(SetEnvVar("OMP_NUM_THREADS", "0"));
   ASSERT_OK(SetEnvVar("OMP_THREAD_LIMIT", "0"));
-  ASSERT_EQ(ThreadPool::DefaultCapacity(), hw_capacity);
+  ASSERT_LE(ThreadPool::DefaultCapacity(), hw_capacity);
+  ASSERT_GE(ThreadPool::DefaultCapacity(), 1);
+
   ASSERT_OK(SetEnvVar("OMP_NUM_THREADS", "zzz"));
   ASSERT_OK(SetEnvVar("OMP_THREAD_LIMIT", "x"));
-  ASSERT_EQ(ThreadPool::DefaultCapacity(), hw_capacity);
+  ASSERT_LE(ThreadPool::DefaultCapacity(), hw_capacity);
+  ASSERT_GE(ThreadPool::DefaultCapacity(), 1);
+
   ASSERT_OK(SetEnvVar("OMP_THREAD_LIMIT", "-1"));
   ASSERT_OK(SetEnvVar("OMP_NUM_THREADS", "99999999999999999999999999"));
-  ASSERT_EQ(ThreadPool::DefaultCapacity(), hw_capacity);
+  ASSERT_LE(ThreadPool::DefaultCapacity(), hw_capacity);
+  ASSERT_GE(ThreadPool::DefaultCapacity(), 1);
 
   ASSERT_OK(DelEnvVar("OMP_NUM_THREADS"));
   ASSERT_OK(DelEnvVar("OMP_THREAD_LIMIT"));
diff --git a/docs/source/cpp/threading.rst b/docs/source/cpp/threading.rst
index 4a1a65ffe01..d2d0b2d0f13 100644
--- a/docs/source/cpp/threading.rst
+++ b/docs/source/cpp/threading.rst
@@ -44,7 +44,7 @@ CPU vs. I/O
 -----------
 
 In order to minimize the overhead of context switches our default thread pool
-for CPU-intensive tasks has a fixed size, defaulting to
+for CPU-intensive tasks has a fixed size, defaulting to the process CPU affinity (on Linux) or
 `std::thread::hardware_concurrency `_.
 This means that CPU tasks should never block for long periods of time because this
 will result in under-utilization of the CPU.  To achieve this we have a separate