diff --git a/cpp/src/arrow/memory_pool.cc b/cpp/src/arrow/memory_pool.cc
index 99cb0682462..638bbb3ab7f 100644
--- a/cpp/src/arrow/memory_pool.cc
+++ b/cpp/src/arrow/memory_pool.cc
@@ -667,8 +667,29 @@ MemoryPool* default_memory_pool() {
 
 #ifndef ARROW_JEMALLOC
 Status jemalloc_set_decay_ms(int ms) {
-  return Status::Invalid("jemalloc support is not built");
+  return Status::NotImplemented("jemalloc support is not built");
 }
+
+Result<int64_t> jemalloc_get_stat(const char* name) {
+  return Status::NotImplemented("jemalloc support is not built");
+}
+
+Status jemalloc_peak_reset() {
+  return Status::NotImplemented("jemalloc support is not built");
+}
+
+Status jemalloc_stats_print(const char* opts) {
+  return Status::NotImplemented("jemalloc support is not built");
+}
+
+Status jemalloc_stats_print(std::function<void(const char*)> write_cb, const char* opts) {
+  return Status::NotImplemented("jemalloc support is not built");
+}
+
+Result<std::string> jemalloc_stats_string(const char* opts) {
+  return Status::NotImplemented("jemalloc support is not built");
+}
+
 #endif
 
 ///////////////////////////////////////////////////////////////////////
diff --git a/cpp/src/arrow/memory_pool.h b/cpp/src/arrow/memory_pool.h
index 58b375af3a9..dba55268d69 100644
--- a/cpp/src/arrow/memory_pool.h
+++ b/cpp/src/arrow/memory_pool.h
@@ -19,9 +19,11 @@
 
 #include <atomic>
 #include <cstdint>
+#include <functional>
 #include <memory>
 #include <string>
 
+#include "arrow/result.h"
 #include "arrow/status.h"
 #include "arrow/type_fwd.h"
 #include "arrow/util/visibility.h"
@@ -175,6 +177,43 @@ ARROW_EXPORT Status jemalloc_memory_pool(MemoryPool** out);
 ARROW_EXPORT
 Status jemalloc_set_decay_ms(int ms);
 
+/// \brief Get basic statistics from jemalloc's mallctl.
+/// See the MALLCTL NAMESPACE section in jemalloc project documentation for
+/// available stats.
+/// May return NotImplemented if jemalloc is not available.
+ARROW_EXPORT
+Result<int64_t> jemalloc_get_stat(const char* name);
+
+/// \brief Reset the counter for peak bytes allocated in the calling thread to zero.
+/// This affects subsequent calls to thread.peak.read, but not the values returned by
+/// thread.allocated or thread.deallocated.
+/// May return NotImplemented if jemalloc is not available.
+ARROW_EXPORT
+Status jemalloc_peak_reset();
+
+/// \brief Print summary statistics in human-readable form to stderr.
+/// See malloc_stats_print documentation in jemalloc project documentation for
+/// available opt flags.
+/// May return NotImplemented if jemalloc is not available.
+ARROW_EXPORT
+Status jemalloc_stats_print(const char* opts = "");
+
+/// \brief Print summary statistics in human-readable form using a callback.
+/// See malloc_stats_print documentation in jemalloc project documentation for
+/// available opt flags.
+/// May return NotImplemented if jemalloc is not available.
+/// \param[in] write_cb callback that receives the text of the report
+ARROW_EXPORT
+Status jemalloc_stats_print(std::function<void(const char*)> write_cb,
+                            const char* opts = "");
+
+/// \brief Get summary statistics in human-readable form.
+/// See malloc_stats_print documentation in jemalloc project documentation for
+/// available opt flags.
+/// May return NotImplemented if jemalloc is not available.
+ARROW_EXPORT
+Result<std::string> jemalloc_stats_string(const char* opts = "");
+
 /// \brief Return a process-wide memory pool based on mimalloc.
 ///
 /// May return NotImplemented if mimalloc is not available.
diff --git a/cpp/src/arrow/memory_pool_jemalloc.cc b/cpp/src/arrow/memory_pool_jemalloc.cc
index 48a5bac137b..03d2b28ee3e 100644
--- a/cpp/src/arrow/memory_pool_jemalloc.cc
+++ b/cpp/src/arrow/memory_pool_jemalloc.cc
@@ -16,6 +16,7 @@
 // under the License.
 
 #include "arrow/memory_pool_internal.h"
+#include "arrow/util/io_util.h"
 #include "arrow/util/logging.h"  // IWYU pragma: keep
 
 // We can't put the jemalloc memory pool implementation into
@@ -153,4 +154,59 @@ Status jemalloc_set_decay_ms(int ms) {
 
 #undef RETURN_IF_JEMALLOC_ERROR
 
+Result<int64_t> jemalloc_get_stat(const char* name) {
+  size_t sz = sizeof(uint64_t);
+  uint64_t value = 0;
+
+  // The "stats.*" values are cached by jemalloc; writing any value to "epoch"
+  // refreshes the cache so that the read below is up to date.
+  if (std::strcmp(name, "stats.allocated") == 0 ||
+      std::strcmp(name, "stats.active") == 0 ||
+      std::strcmp(name, "stats.metadata") == 0 ||
+      std::strcmp(name, "stats.resident") == 0 ||
+      std::strcmp(name, "stats.mapped") == 0 ||
+      std::strcmp(name, "stats.retained") == 0) {
+    uint64_t epoch = 1;
+    mallctl("epoch", &epoch, &sz, &epoch, sz);
+  }
+
+  const int err = mallctl(name, &value, &sz, nullptr, 0);
+  if (err) {
+    // Pass the name itself (not its address) so that it shows in the message.
+    return arrow::internal::IOErrorFromErrno(err, "Failed retrieving ", name);
+  }
+
+  return static_cast<int64_t>(value);
+}
+
+Status jemalloc_peak_reset() {
+  const int err = mallctl("thread.peak.reset", nullptr, nullptr, nullptr, 0);
+  return err ? arrow::internal::IOErrorFromErrno(err, "Failed resetting thread.peak.")
+             : Status::OK();
+}
+
+Result<std::string> jemalloc_stats_string(const char* opts) {
+  std::string stats;
+  auto write_cb = [&stats](const char* str) { stats.append(str); };
+  // Propagate a failure instead of returning a truncated or empty report.
+  RETURN_NOT_OK(jemalloc_stats_print(write_cb, opts));
+  return stats;
+}
+
+Status jemalloc_stats_print(const char* opts) {
+  // Null callback and opaque pointer: jemalloc writes the report to stderr.
+  malloc_stats_print(nullptr, nullptr, opts);
+  return Status::OK();
+}
+
+Status jemalloc_stats_print(std::function<void(const char*)> write_cb, const char* opts) {
+  // jemalloc takes a plain function pointer plus an opaque argument, so pass
+  // the std::function through `opaque` and call it from a captureless lambda.
+  auto cb_wrapper = [](void* opaque, const char* str) {
+    (*static_cast<std::function<void(const char*)>*>(opaque))(str);
+  };
+  malloc_stats_print(cb_wrapper, &write_cb, opts);
+  return Status::OK();
+}
+
 }  // namespace arrow
diff --git a/cpp/src/arrow/memory_pool_test.cc b/cpp/src/arrow/memory_pool_test.cc
index 591d86a23f5..5ac14a44b9a 100644
--- a/cpp/src/arrow/memory_pool_test.cc
+++ b/cpp/src/arrow/memory_pool_test.cc
@@ -25,6 +25,7 @@
 #include "arrow/status.h"
 #include "arrow/testing/gtest_util.h"
 #include "arrow/util/config.h"
+#include "arrow/util/logging.h"
 
 namespace arrow {
 
@@ -168,7 +169,110 @@ TEST(Jemalloc, SetDirtyPageDecayMillis) {
 #ifdef ARROW_JEMALLOC
   ASSERT_OK(jemalloc_set_decay_ms(0));
 #else
-  ASSERT_RAISES(Invalid, jemalloc_set_decay_ms(0));
+  ASSERT_RAISES(NotImplemented, jemalloc_set_decay_ms(0));
+#endif
+}
+
+TEST(Jemalloc, GetAllocationStats) {
+#ifdef ARROW_JEMALLOC
+  uint8_t* data = nullptr;
+  uint8_t* big_data = nullptr;
+  uint8_t* small_data = nullptr;
+  int64_t allocated, active, metadata, resident, mapped, retained, allocated0, active0,
+      metadata0, resident0, mapped0, retained0;
+  int64_t thread_allocated, thread_deallocated, thread_peak_read, thread_allocated0,
+      thread_deallocated0, thread_peak_read0;
+  MemoryPool* pool = nullptr;
+  ABORT_NOT_OK(jemalloc_memory_pool(&pool));
+  ASSERT_EQ("jemalloc", pool->backend_name());
+
+  // Record stats before allocating
+  ASSERT_OK_AND_ASSIGN(allocated0, jemalloc_get_stat("stats.allocated"));
+  ASSERT_OK_AND_ASSIGN(active0, jemalloc_get_stat("stats.active"));
+  ASSERT_OK_AND_ASSIGN(metadata0, jemalloc_get_stat("stats.metadata"));
+  ASSERT_OK_AND_ASSIGN(resident0, jemalloc_get_stat("stats.resident"));
+  ASSERT_OK_AND_ASSIGN(mapped0, jemalloc_get_stat("stats.mapped"));
+  ASSERT_OK_AND_ASSIGN(retained0, jemalloc_get_stat("stats.retained"));
+  ASSERT_OK_AND_ASSIGN(thread_allocated0, jemalloc_get_stat("thread.allocated"));
+  ASSERT_OK_AND_ASSIGN(thread_deallocated0, jemalloc_get_stat("thread.deallocated"));
+  ASSERT_OK_AND_ASSIGN(thread_peak_read0, jemalloc_get_stat("thread.peak.read"));
+
+  // Allocate memory
+  ASSERT_OK(pool->Allocate(1025, &data));
+  ASSERT_EQ(pool->bytes_allocated(), 1025);
+  ASSERT_OK(pool->Reallocate(1025, 1023, &data));
+  ASSERT_EQ(pool->bytes_allocated(), 1023);
+
+  // Record stats after allocating
+  ASSERT_OK_AND_ASSIGN(allocated, jemalloc_get_stat("stats.allocated"));
+  ASSERT_OK_AND_ASSIGN(active, jemalloc_get_stat("stats.active"));
+  ASSERT_OK_AND_ASSIGN(metadata, jemalloc_get_stat("stats.metadata"));
+  ASSERT_OK_AND_ASSIGN(resident, jemalloc_get_stat("stats.resident"));
+  ASSERT_OK_AND_ASSIGN(mapped, jemalloc_get_stat("stats.mapped"));
+  ASSERT_OK_AND_ASSIGN(retained, jemalloc_get_stat("stats.retained"));
+  ASSERT_OK_AND_ASSIGN(thread_allocated, jemalloc_get_stat("thread.allocated"));
+  ASSERT_OK_AND_ASSIGN(thread_deallocated, jemalloc_get_stat("thread.deallocated"));
+  ASSERT_OK_AND_ASSIGN(thread_peak_read, jemalloc_get_stat("thread.peak.read"));
+
+  // Check allocated stats pre-allocation
+  ASSERT_NEAR(allocated0, 120000, 100000);
+  ASSERT_NEAR(active0, 75000, 70000);
+  ASSERT_NEAR(metadata0, 3000000, 1000000);
+  ASSERT_NEAR(resident0, 3000000, 1000000);
+  ASSERT_NEAR(mapped0, 6500000, 1000000);
+  ASSERT_NEAR(retained0, 1500000, 2000000);
+
+  // Check allocated stats change due to allocation
+  ASSERT_NEAR(allocated - allocated0, 70000, 50000);
+  ASSERT_NEAR(active - active0, 100000, 90000);
+  ASSERT_NEAR(metadata - metadata0, 500, 460);
+  ASSERT_NEAR(resident - resident0, 120000, 110000);
+  ASSERT_NEAR(mapped - mapped0, 100000, 90000);
+  ASSERT_NEAR(retained - retained0, 0, 40000);
+
+  ASSERT_NEAR(thread_peak_read - thread_peak_read0, 1024, 700);
+  ASSERT_NEAR(thread_allocated - thread_allocated0, 2500, 500);
+  ASSERT_EQ(thread_deallocated - thread_deallocated0, 1280);
+
+  // Resetting thread peak read metric
+  ASSERT_OK(pool->Allocate(12560, &big_data));
+  ASSERT_OK_AND_ASSIGN(thread_peak_read, jemalloc_get_stat("thread.peak.read"));
+  ASSERT_NEAR(thread_peak_read, 15616, 1000);
+  ASSERT_OK(jemalloc_peak_reset());
+  ASSERT_OK(pool->Allocate(1256, &small_data));
+  ASSERT_OK_AND_ASSIGN(thread_peak_read, jemalloc_get_stat("thread.peak.read"));
+  ASSERT_NEAR(thread_peak_read, 1280, 100);
+
+  // Print statistics to stderr
+  ASSERT_OK(jemalloc_stats_print("J"));
+
+  // Read statistics into std::string
+  ASSERT_OK_AND_ASSIGN(std::string stats, jemalloc_stats_string("Jax"));
+
+  // Read statistics into std::string with a lambda
+  std::string stats2;
+  auto write_cb = [&stats2](const char* str) { stats2.append(str); };
+  ASSERT_OK(jemalloc_stats_print(write_cb, "Jax"));
+
+  ASSERT_EQ(stats.rfind("{\"jemalloc\":{\"version\"", 0), 0);
+  ASSERT_EQ(stats2.rfind("{\"jemalloc\":{\"version\"", 0), 0);
+  ASSERT_EQ(stats.substr(0, 100), stats2.substr(0, 100));
+
+  // Release the buffers only after all stats were checked, so that the
+  // deallocations don't disturb the expected values above.
+  pool->Free(data, 1023);
+  pool->Free(big_data, 12560);
+  pool->Free(small_data, 1256);
+#else
+  std::string stats;
+  auto write_cb = [&stats](const char* str) { stats.append(str); };
+  ASSERT_RAISES(NotImplemented, jemalloc_get_stat("thread.peak.read"));
+  ASSERT_RAISES(NotImplemented, jemalloc_get_stat("stats.allocated"));
+  ASSERT_RAISES(NotImplemented, jemalloc_get_stat("stats.allocatedp"));
+  ASSERT_RAISES(NotImplemented, jemalloc_peak_reset());
+  ASSERT_RAISES(NotImplemented, jemalloc_stats_print(write_cb, "Jax"));
+  ASSERT_RAISES(NotImplemented, jemalloc_stats_print("ax"));
+  ASSERT_RAISES(NotImplemented, jemalloc_stats_string("Jax"));
 #endif
 }
 