Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions cpp/src/arrow/result.h
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,14 @@ class [[nodiscard]] Result : public util::EqualityComparable<Result<T>> {
return MoveValueUnsafe();
}

/// Copy out the stored value when ok(); otherwise hand back the supplied alternative.
T ValueOr(T alternative) const& {
  if (ok()) {
    return ValueUnsafe();
  }
  // Error stored: fall back to the caller-provided value.
  return alternative;
}

/// Retrieve the value if ok(), falling back to an alternative generated by the provided
/// factory
template <typename G>
Expand Down
17 changes: 17 additions & 0 deletions cpp/src/arrow/util/io_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@
#elif __linux__
# include <sys/sysinfo.h>
# include <fstream>
# include <limits>
#endif

#ifdef _WIN32
Expand Down Expand Up @@ -2219,6 +2220,22 @@ int64_t GetTotalMemoryBytes() {
#endif
}

/// \brief Count the CPU cores present in the affinity mask.
///
/// On Linux, queries sched_getaffinity() with pid 0 and counts the CPUs set in
/// the returned mask with CPU_COUNT().
///
/// \return the number of cores (always >= 1) on success; an IOError if the
/// affinity mask cannot be read or yields an unusable count; NotImplemented on
/// non-Linux platforms.
Result<int32_t> GetNumAffinityCores() {
#if defined(__linux__)
  cpu_set_t mask;
  if (sched_getaffinity(0, sizeof(mask), &mask) != 0) {
    // errno is only meaningful immediately after the failing call.
    return IOErrorFromErrno(errno, "Could not read the CPU affinity.");
  }
  const auto count = CPU_COUNT(&mask);
  // Validate against the declared return type (int32_t) so that the value can
  // never wrap to a negative number when narrowed.
  if (count <= 0 ||
      static_cast<int64_t>(count) > std::numeric_limits<int32_t>::max()) {
    // The syscall succeeded, so errno does not describe this failure.
    return Status::IOError("Could not read the CPU affinity.");
  }
  return static_cast<int32_t>(count);
#else
  return Status::NotImplemented("Only implemented for Linux");
#endif
}

Result<void*> LoadDynamicLibrary(const char* path) {
#ifdef _WIN32
ARROW_ASSIGN_OR_RAISE(auto platform_path, PlatformFilename::FromString(path));
Expand Down
6 changes: 6 additions & 0 deletions cpp/src/arrow/util/io_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,12 @@ int64_t GetCurrentRSS();
ARROW_EXPORT
int64_t GetTotalMemoryBytes();

/// \brief Get the number of CPU cores in the process CPU affinity.
///
/// This is only implemented on Linux; on other platforms a NotImplemented
/// status is returned. On Linux an error status is returned if the CPU
/// affinity cannot be read.
/// If a value is returned, it is guaranteed to be greater than or equal to one.
ARROW_EXPORT Result<int32_t> GetNumAffinityCores();

/// \brief Load a dynamic library
///
/// This wraps dlopen() except on Windows, where LoadLibrary() is called.
Expand Down
11 changes: 11 additions & 0 deletions cpp/src/arrow/util/io_util_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1123,5 +1123,16 @@ TEST(Memory, TotalMemory) {
#endif
}

// Verifies GetNumAffinityCores(): on Linux it must succeed with a value of at
// least one that does not exceed the hardware concurrency (when known); on
// other platforms it must report NotImplemented.
TEST(CpuAffinity, NumberOfCores) {
  auto maybe_affinity_cores = GetNumAffinityCores();
#ifdef __linux__
  ASSERT_OK_AND_ASSIGN(auto affinity_cores, maybe_affinity_cores);
  ASSERT_GE(affinity_cores, 1);
  // hardware_concurrency() is unsigned and may be 0 when the value cannot be
  // determined: convert once to a signed type to avoid a signed/unsigned
  // comparison, and only check the upper bound when it is known.
  const auto hw_cores = static_cast<int32_t>(std::thread::hardware_concurrency());
  if (hw_cores > 0) {
    ASSERT_LE(affinity_cores, hw_cores);
  }
#else
  ASSERT_RAISES(NotImplemented, maybe_affinity_cores);
#endif
}

} // namespace internal
} // namespace arrow
24 changes: 14 additions & 10 deletions cpp/src/arrow/util/thread_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -732,19 +732,23 @@ static int ParseOMPEnvVar(const char* name) {
}

// Heuristic for the default number of threads of a CPU-bound thread pool.
//
// The capacity is taken from the first source that yields a positive value:
//   1. the OMP_NUM_THREADS environment variable,
//   2. the number of cores in the CPU affinity (GetNumAffinityCores()),
//   3. std::thread::hardware_concurrency(),
//   4. a hardcoded fallback of 4 (a warning is logged).
// The result is then capped by OMP_THREAD_LIMIT when that variable holds a
// positive value. The returned capacity is always >= 1.
int ThreadPool::DefaultCapacity() {
  int capacity = ParseOMPEnvVar("OMP_NUM_THREADS");
  if (capacity <= 0) {
    // An error from GetNumAffinityCores() maps to 0 so that the next fallback
    // is tried.
    capacity = static_cast<int>(GetNumAffinityCores().ValueOr(0));
  }
  if (capacity <= 0) {
    capacity = static_cast<int>(std::thread::hardware_concurrency());
  }
  if (capacity <= 0) {
    capacity = 4;
    ARROW_LOG(WARNING) << "Failed to determine the number of available threads, "
                          "using a hardcoded arbitrary value of "
                       << capacity;
  }

  // Apply the limit last so that it also caps the fallback values; a
  // non-positive limit is ignored.
  const int limit = ParseOMPEnvVar("OMP_THREAD_LIMIT");
  if (limit > 0) {
    capacity = std::min(limit, capacity);
  }
  return capacity;
}
Expand Down
1 change: 1 addition & 0 deletions cpp/src/arrow/util/thread_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,7 @@ class ARROW_EXPORT ThreadPool : public Executor {

// Heuristic for the default capacity of a thread pool for CPU-bound tasks.
// The OMP_NUM_THREADS and OMP_THREAD_LIMIT environment variables are honored
// when they hold positive values.
// This is exposed as a static method to help with testing.
// The number returned is guaranteed to be greater than or equal to one.
static int DefaultCapacity();

// Shutdown the pool. Once the pool starts shutting down, new tasks
Expand Down
25 changes: 18 additions & 7 deletions cpp/src/arrow/util/thread_pool_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1039,35 +1039,46 @@ TEST(TestGlobalThreadPool, Capacity) {
// Exercise default capacity heuristic
ASSERT_OK(DelEnvVar("OMP_NUM_THREADS"));
ASSERT_OK(DelEnvVar("OMP_THREAD_LIMIT"));

int hw_capacity = std::thread::hardware_concurrency();
ASSERT_EQ(ThreadPool::DefaultCapacity(), hw_capacity);
ASSERT_LE(ThreadPool::DefaultCapacity(), hw_capacity);
ASSERT_GE(ThreadPool::DefaultCapacity(), 1);

ASSERT_OK(SetEnvVar("OMP_NUM_THREADS", "13"));
ASSERT_EQ(ThreadPool::DefaultCapacity(), 13);

ASSERT_OK(SetEnvVar("OMP_NUM_THREADS", "7,5,13"));
ASSERT_EQ(ThreadPool::DefaultCapacity(), 7);
ASSERT_OK(DelEnvVar("OMP_NUM_THREADS"));

ASSERT_OK(SetEnvVar("OMP_THREAD_LIMIT", "1"));
ASSERT_EQ(ThreadPool::DefaultCapacity(), 1);

ASSERT_OK(SetEnvVar("OMP_THREAD_LIMIT", "999"));
if (hw_capacity <= 999) {
ASSERT_EQ(ThreadPool::DefaultCapacity(), hw_capacity);
}
ASSERT_LE(ThreadPool::DefaultCapacity(), std::min(999, hw_capacity));
ASSERT_GE(ThreadPool::DefaultCapacity(), 1);

ASSERT_OK(SetEnvVar("OMP_NUM_THREADS", "6,5,13"));
ASSERT_EQ(ThreadPool::DefaultCapacity(), 6);

ASSERT_OK(SetEnvVar("OMP_THREAD_LIMIT", "2"));
ASSERT_EQ(ThreadPool::DefaultCapacity(), 2);

// Invalid env values
ASSERT_OK(SetEnvVar("OMP_NUM_THREADS", "0"));
ASSERT_OK(SetEnvVar("OMP_THREAD_LIMIT", "0"));
ASSERT_EQ(ThreadPool::DefaultCapacity(), hw_capacity);
ASSERT_LE(ThreadPool::DefaultCapacity(), hw_capacity);
ASSERT_GE(ThreadPool::DefaultCapacity(), 1);

ASSERT_OK(SetEnvVar("OMP_NUM_THREADS", "zzz"));
ASSERT_OK(SetEnvVar("OMP_THREAD_LIMIT", "x"));
ASSERT_EQ(ThreadPool::DefaultCapacity(), hw_capacity);
ASSERT_LE(ThreadPool::DefaultCapacity(), hw_capacity);
ASSERT_GE(ThreadPool::DefaultCapacity(), 1);

ASSERT_OK(SetEnvVar("OMP_THREAD_LIMIT", "-1"));
ASSERT_OK(SetEnvVar("OMP_NUM_THREADS", "99999999999999999999999999"));
ASSERT_EQ(ThreadPool::DefaultCapacity(), hw_capacity);
ASSERT_LE(ThreadPool::DefaultCapacity(), hw_capacity);
ASSERT_GE(ThreadPool::DefaultCapacity(), 1);

ASSERT_OK(DelEnvVar("OMP_NUM_THREADS"));
ASSERT_OK(DelEnvVar("OMP_THREAD_LIMIT"));
Expand Down
2 changes: 1 addition & 1 deletion docs/source/cpp/threading.rst
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ CPU vs. I/O
-----------

In order to minimize the overhead of context switches, our default thread pool
for CPU-intensive tasks has a fixed size, defaulting to the process CPU affinity (on Linux) or
`std::thread::hardware_concurrency <https://en.cppreference.com/w/cpp/thread/thread/hardware_concurrency>`_.
This means that CPU tasks should never block for long periods of time because this
will result in under-utilization of the CPU. To achieve this we have a separate
Expand Down
Loading