Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cpp/src/arrow/util/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ add_arrow_test(utility-test
cache_test.cc
checked_cast_test.cc
compression_test.cc
cpu_info_test.cc
decimal_test.cc
float16_test.cc
fixed_width_test.cc
Expand Down
26 changes: 26 additions & 0 deletions cpp/src/arrow/util/affinity.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#pragma once

#include <thread>

#ifdef __linux__
#include <sched.h>
#include <unistd.h>
#endif

namespace arrow {
namespace internal {

// Returns the number of CPUs the current process is allowed to use.
// Falls back to std::thread::hardware_concurrency() if affinity is not available.
inline int GetAffinityCpuCount() {
#ifdef __linux__
cpu_set_t mask;
if (sched_getaffinity(0, sizeof(mask), &mask) == 0) {
return CPU_COUNT(&mask);
}
#endif
return std::thread::hardware_concurrency();
}
Comment on lines +15 to +23
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about moving this to cpp/src/arrow/util/cpu_info.cc?


} // namespace internal
} // namespace arrow
3 changes: 2 additions & 1 deletion cpp/src/arrow/util/cpu_info.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
#include <thread>

#include "arrow/result.h"
#include "arrow/util/affinity.h"
#include "arrow/util/io_util.h"
#include "arrow/util/logging_internal.h"
#include "arrow/util/string.h"
Expand Down Expand Up @@ -513,7 +514,7 @@ struct CpuInfo::Impl {
OsRetrieveCacheSize(&cache_sizes);
OsRetrieveCpuInfo(&hardware_flags, &vendor, &model_name);
original_hardware_flags = hardware_flags;
num_cores = std::max(static_cast<int>(std::thread::hardware_concurrency()), 1);
num_cores = std::max(GetAffinityCpuCount(), 1);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about not changing this and adding a new method (num_affinity_cores()?) instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. Just to double-ceck: do we want ThreadPool::DefaultCapacity() to continue using hardware_concurrency() even on Linux, or would it make sense to prefer num_affinity_cores() there

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My suggestion:

  • CpuInfo::num_cores() uses hardware_concurrency()
  • ThreadPool::DefaultCapacity() uses CpuInfo::num_affinity_cores() not CpuInfo::num_cores()

@pitrou @cyb70289 Do you have any opinion?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me. num_cores() as hardware limit. num_affinity_cores() as actual resource available.


// parse user simd level
auto maybe_env_var = GetEnvVar("ARROW_USER_SIMD_LEVEL");
Expand Down
21 changes: 21 additions & 0 deletions cpp/src/arrow/util/cpu_info_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#include <gtest/gtest.h>
#include "arrow/util/cpu_info.h"

#ifdef __linux__
#include <sched.h>
#endif

TEST(CpuInfoTest, CpuAffinity) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks this test will never fail? But I don't have better suggestion :)

#ifdef __linux__
auto cpu_info = arrow::internal::CpuInfo::GetInstance();
int affinity_cores = cpu_info->num_cores();

cpu_set_t mask;
ASSERT_EQ(sched_getaffinity(0, sizeof(mask), &mask), 0);
int expected = CPU_COUNT(&mask);

ASSERT_EQ(affinity_cores, expected);
#else
GTEST_SKIP() << "CpuInfo affinity check only applies on Linux.";
#endif
}
3 changes: 2 additions & 1 deletion cpp/src/arrow/util/thread_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <thread>
#include <vector>

#include "arrow/util/affinity.h"
#include "arrow/util/atfork_internal.h"
#include "arrow/util/config.h"
#include "arrow/util/io_util.h"
Expand Down Expand Up @@ -733,7 +734,7 @@ int ThreadPool::DefaultCapacity() {
int capacity, limit;
capacity = ParseOMPEnvVar("OMP_NUM_THREADS");
if (capacity == 0) {
capacity = std::thread::hardware_concurrency();
capacity = GetAffinityCpuCount();
}
limit = ParseOMPEnvVar("OMP_THREAD_LIMIT");
if (limit > 0) {
Expand Down
37 changes: 33 additions & 4 deletions cpp/src/arrow/util/thread_pool_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include "arrow/testing/executor_util.h"
#include "arrow/testing/future_util.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/util/affinity.h"
#include "arrow/util/config.h"
#include "arrow/util/io_util.h"
#include "arrow/util/logging.h"
Expand Down Expand Up @@ -1039,35 +1040,63 @@ TEST(TestGlobalThreadPool, Capacity) {
// Exercise default capacity heuristic
ASSERT_OK(DelEnvVar("OMP_NUM_THREADS"));
ASSERT_OK(DelEnvVar("OMP_THREAD_LIMIT"));

#ifdef __linux__
int expected_capacity = arrow::internal::GetAffinityCpuCount();
#else
int hw_capacity = std::thread::hardware_concurrency();
ASSERT_EQ(ThreadPool::DefaultCapacity(), hw_capacity);
int expected_capacity = hw_capacity;
#endif

ASSERT_EQ(ThreadPool::DefaultCapacity(), expected_capacity);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about this?

Suggested change
ASSERT_EQ(ThreadPool::DefaultCapacity(), expected_capacity);
ASSERT_LT(ThreadPool::DefaultCapacity(), hw_capacity);


ASSERT_OK(SetEnvVar("OMP_NUM_THREADS", "13"));
ASSERT_EQ(ThreadPool::DefaultCapacity(), 13);

ASSERT_OK(SetEnvVar("OMP_NUM_THREADS", "7,5,13"));
ASSERT_EQ(ThreadPool::DefaultCapacity(), 7);
ASSERT_OK(DelEnvVar("OMP_NUM_THREADS"));

ASSERT_OK(SetEnvVar("OMP_THREAD_LIMIT", "1"));
ASSERT_EQ(ThreadPool::DefaultCapacity(), 1);

ASSERT_OK(SetEnvVar("OMP_THREAD_LIMIT", "999"));
if (hw_capacity <= 999) {
ASSERT_EQ(ThreadPool::DefaultCapacity(), hw_capacity);
}
#ifdef __linux__
ASSERT_EQ(ThreadPool::DefaultCapacity(), std::min(999, arrow::internal::GetAffinityCpuCount()));
#else
ASSERT_EQ(ThreadPool::DefaultCapacity(), std::min(999, hw_capacity));
#endif
Comment on lines +1064 to +1068
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we simplify it? Leave only line 1065, and remove hw_capacity?


ASSERT_OK(SetEnvVar("OMP_NUM_THREADS", "6,5,13"));
ASSERT_EQ(ThreadPool::DefaultCapacity(), 6);

ASSERT_OK(SetEnvVar("OMP_THREAD_LIMIT", "2"));
ASSERT_EQ(ThreadPool::DefaultCapacity(), 2);

// Invalid env values
ASSERT_OK(SetEnvVar("OMP_NUM_THREADS", "0"));
ASSERT_OK(SetEnvVar("OMP_THREAD_LIMIT", "0"));
#ifdef __linux__
ASSERT_EQ(ThreadPool::DefaultCapacity(), arrow::internal::GetAffinityCpuCount());
#else
ASSERT_EQ(ThreadPool::DefaultCapacity(), hw_capacity);
#endif

ASSERT_OK(SetEnvVar("OMP_NUM_THREADS", "zzz"));
ASSERT_OK(SetEnvVar("OMP_THREAD_LIMIT", "x"));
#ifdef __linux__
ASSERT_EQ(ThreadPool::DefaultCapacity(), arrow::internal::GetAffinityCpuCount());
#else
ASSERT_EQ(ThreadPool::DefaultCapacity(), hw_capacity);
#endif

ASSERT_OK(SetEnvVar("OMP_THREAD_LIMIT", "-1"));
ASSERT_OK(SetEnvVar("OMP_NUM_THREADS", "99999999999999999999999999"));
#ifdef __linux__
ASSERT_EQ(ThreadPool::DefaultCapacity(), arrow::internal::GetAffinityCpuCount());
#else
ASSERT_EQ(ThreadPool::DefaultCapacity(), hw_capacity);
#endif

ASSERT_OK(DelEnvVar("OMP_NUM_THREADS"));
ASSERT_OK(DelEnvVar("OMP_THREAD_LIMIT"));
Expand Down
Loading