Skip to content
23 changes: 22 additions & 1 deletion cpp/src/arrow/memory_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -667,8 +667,29 @@ MemoryPool* default_memory_pool() {

#ifndef ARROW_JEMALLOC
// Fallback used when Arrow is built without jemalloc (ARROW_JEMALLOC undefined).
//
// The decay time cannot be configured because no jemalloc backend exists, so
// this always fails with NotImplemented, the same status and message reported
// by every other jemalloc_* fallback in this section.
Status jemalloc_set_decay_ms(int ms) {
  return Status::NotImplemented("jemalloc support is not built");
}

// Fallback used when Arrow is built without jemalloc: there is no mallctl to
// query, so every statistic name yields NotImplemented.
Result<int64_t> jemalloc_get_stat(const char* name) {
  return Result<int64_t>(Status::NotImplemented("jemalloc support is not built"));
}

// Fallback used when Arrow is built without jemalloc: there is no per-thread
// peak counter to reset, so the call reports NotImplemented.
Status jemalloc_peak_reset() {
  Status not_built = Status::NotImplemented("jemalloc support is not built");
  return not_built;
}

// Fallback used when Arrow is built without jemalloc: nothing is printed and
// the call reports NotImplemented regardless of `opts`.
Status jemalloc_stats_print(const char* opts) {
  Status not_built = Status::NotImplemented("jemalloc support is not built");
  return not_built;
}

// Fallback used when Arrow is built without jemalloc: `write_cb` is never
// invoked and the call reports NotImplemented regardless of `opts`.
Status jemalloc_stats_print(std::function<void(const char*)> write_cb, const char* opts) {
  Status not_built = Status::NotImplemented("jemalloc support is not built");
  return not_built;
}

// Fallback used when Arrow is built without jemalloc: no statistics text can
// be produced, so the call reports NotImplemented regardless of `opts`.
Result<std::string> jemalloc_stats_string(const char* opts) {
  return Result<std::string>(Status::NotImplemented("jemalloc support is not built"));
}

#endif

///////////////////////////////////////////////////////////////////////
Expand Down
33 changes: 33 additions & 0 deletions cpp/src/arrow/memory_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@

#include <atomic>
#include <cstdint>
#include <functional>
#include <memory>
#include <string>

#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/type_fwd.h"
#include "arrow/util/visibility.h"
Expand Down Expand Up @@ -175,6 +177,37 @@ ARROW_EXPORT Status jemalloc_memory_pool(MemoryPool** out);
ARROW_EXPORT
Status jemalloc_set_decay_ms(int ms);

/// \brief Get a single numeric statistic from jemalloc's mallctl interface.
///
/// See the MALLCTL NAMESPACE section in the jemalloc project documentation for
/// the available statistic names.
///
/// \param[in] name the mallctl name of the statistic to read, for example
/// "stats.allocated" or "thread.peak.read"
/// \return the statistic's value on success, an error Status if the mallctl
/// read fails, or NotImplemented if Arrow was built without jemalloc support
ARROW_EXPORT
Result<int64_t> jemalloc_get_stat(const char* name);

/// \brief Reset the counter for peak bytes allocated in the calling thread to zero.
///
/// This affects subsequent reads of thread.peak.read, but not the values returned
/// by thread.allocated or thread.deallocated.
///
/// \return Status::OK() on success, an error Status if the mallctl call fails, or
/// NotImplemented if Arrow was built without jemalloc support
ARROW_EXPORT
Status jemalloc_peak_reset();

/// \brief Print summary statistics in human-readable form to stderr.
///
/// See the malloc_stats_print documentation in the jemalloc project documentation
/// for the available opt flags.
///
/// \param[in] opts option flags forwarded unchanged to malloc_stats_print
/// \return Status::OK(), or NotImplemented if Arrow was built without jemalloc
/// support
ARROW_EXPORT
Status jemalloc_stats_print(const char* opts = "");

/// \brief Print summary statistics in human-readable form using a callback.
///
/// See the malloc_stats_print documentation in the jemalloc project documentation
/// for the available opt flags.
///
/// \param[in] write_cb callable that receives the output text; it may be invoked
/// several times, each time with one null-terminated piece of the output
/// \param[in] opts option flags forwarded unchanged to malloc_stats_print
/// \return Status::OK(), or NotImplemented if Arrow was built without jemalloc
/// support
ARROW_EXPORT
Status jemalloc_stats_print(std::function<void(const char*)> write_cb,
const char* opts = "");

/// \brief Get summary statistics in human-readable form.
///
/// See the malloc_stats_print documentation in the jemalloc project documentation
/// for the available opt flags.
///
/// \param[in] opts option flags forwarded unchanged to malloc_stats_print
/// \return the statistics text as a string, or NotImplemented if Arrow was built
/// without jemalloc support
ARROW_EXPORT
Result<std::string> jemalloc_stats_string(const char* opts = "");

/// \brief Return a process-wide memory pool based on mimalloc.
///
/// May return NotImplemented if mimalloc is not available.
Expand Down
52 changes: 52 additions & 0 deletions cpp/src/arrow/memory_pool_jemalloc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

#include "arrow/memory_pool_internal.h"
#include "arrow/util/io_util.h"
#include "arrow/util/logging.h" // IWYU pragma: keep

// We can't put the jemalloc memory pool implementation into
Expand Down Expand Up @@ -153,4 +154,55 @@ Status jemalloc_set_decay_ms(int ms) {

#undef RETURN_IF_JEMALLOC_ERROR

// Read a single 64-bit statistic from jemalloc through mallctl.
//
// `name` is a mallctl name such as "stats.allocated" or "thread.peak.read".
// Returns the statistic's value on success, Invalid when `name` is null, or an
// IOError built from the error code returned by mallctl (the message includes
// the requested statistic name).
Result<int64_t> jemalloc_get_stat(const char* name) {
  if (name == nullptr) {
    return Status::Invalid("jemalloc statistic name must not be null");
  }

  // Statistics whose values are cached by mallctl and only refreshed when the
  // "epoch" control is written.
  static constexpr const char* kEpochCachedStats[] = {
      "stats.allocated", "stats.active", "stats.metadata",
      "stats.resident",  "stats.mapped", "stats.retained"};

  for (const char* cached_name : kEpochCachedStats) {
    if (std::strcmp(name, cached_name) == 0) {
      // Update the statistics cached by mallctl. The refresh is best-effort:
      // a failure here only means the value read below may be stale.
      uint64_t epoch = 1;
      size_t epoch_size = sizeof(epoch);
      ARROW_UNUSED(mallctl("epoch", &epoch, &epoch_size, &epoch, sizeof(epoch)));
      break;
    }
  }

  uint64_t value = 0;
  size_t value_size = sizeof(value);
  const int err = mallctl(name, &value, &value_size, nullptr, 0);
  if (err) {
    // Pass the string itself (not its address) so the message names the stat.
    return arrow::internal::IOErrorFromErrno(err, "Failed retrieving ", name);
  }

  return static_cast<int64_t>(value);
}

// Reset jemalloc's peak-allocation counter for the calling thread by invoking
// the "thread.peak.reset" mallctl. A non-zero mallctl result is converted into
// an IOError; otherwise OK is returned.
Status jemalloc_peak_reset() {
  const int rc = mallctl("thread.peak.reset", nullptr, nullptr, nullptr, 0);
  if (rc) {
    return arrow::internal::IOErrorFromErrno(rc, "Failed resetting thread.peak.");
  }
  return Status::OK();
}

// Collect the output of jemalloc_stats_print(callback, opts) into a string.
//
// Each piece of text handed to the callback is appended to the result. The
// Status returned by the print call is deliberately discarded.
Result<std::string> jemalloc_stats_string(const char* opts) {
  std::string collected;
  ARROW_UNUSED(jemalloc_stats_print(
      [&collected](const char* chunk) { collected.append(chunk); }, opts));
  return collected;
}

// Print jemalloc's statistics report using malloc_stats_print's default
// output: no write callback and no opaque pointer are supplied, only `opts`.
// Always returns OK.
Status jemalloc_stats_print(const char* opts) {
  malloc_stats_print(/*write_cb=*/nullptr, /*cbopaque=*/nullptr, opts);
  return Status::OK();
}

// Print jemalloc's statistics report through a user-supplied callback.
//
// `write_cb` receives each piece of output text produced by
// malloc_stats_print; `opts` is forwarded unchanged. Returns Invalid when
// `write_cb` is empty, otherwise OK.
Status jemalloc_stats_print(std::function<void(const char*)> write_cb, const char* opts) {
  // An empty std::function would throw std::bad_function_call from inside
  // jemalloc's C code, so reject it before handing control to jemalloc.
  if (!write_cb) {
    return Status::Invalid("jemalloc_stats_print requires a non-empty write callback");
  }

  // malloc_stats_print takes a plain function pointer plus an opaque pointer;
  // this captureless lambda recovers the std::function from the opaque pointer.
  auto trampoline = [](void* opaque, const char* text) {
    (*static_cast<std::function<void(const char*)>*>(opaque))(text);
  };
  malloc_stats_print(trampoline, &write_cb, opts);
  return Status::OK();
}

} // namespace arrow
98 changes: 97 additions & 1 deletion cpp/src/arrow/memory_pool_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "arrow/status.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/util/config.h"
#include "arrow/util/logging.h"

namespace arrow {

Expand Down Expand Up @@ -168,7 +169,102 @@ TEST(Jemalloc, SetDirtyPageDecayMillis) {
#ifdef ARROW_JEMALLOC
  ASSERT_OK(jemalloc_set_decay_ms(0));
#else
  // Without jemalloc the call must report NotImplemented, like the other
  // jemalloc_* fallbacks; the stale Invalid expectation contradicted this.
  ASSERT_RAISES(NotImplemented, jemalloc_set_decay_ms(0));
#endif
}

// Exercises the jemalloc statistics helpers: jemalloc_get_stat,
// jemalloc_peak_reset, and the three jemalloc_stats_* printing variants.
// Without jemalloc, every helper is expected to report NotImplemented.
TEST(Jemalloc, GetAllocationStats) {
#ifdef ARROW_JEMALLOC
  MemoryPool* pool = nullptr;
  ABORT_NOT_OK(jemalloc_memory_pool(&pool));
  ASSERT_EQ("jemalloc", pool->backend_name());

  // Record stats before allocating
  ASSERT_OK_AND_ASSIGN(int64_t allocated0, jemalloc_get_stat("stats.allocated"));
  ASSERT_OK_AND_ASSIGN(int64_t active0, jemalloc_get_stat("stats.active"));
  ASSERT_OK_AND_ASSIGN(int64_t metadata0, jemalloc_get_stat("stats.metadata"));
  ASSERT_OK_AND_ASSIGN(int64_t resident0, jemalloc_get_stat("stats.resident"));
  ASSERT_OK_AND_ASSIGN(int64_t mapped0, jemalloc_get_stat("stats.mapped"));
  ASSERT_OK_AND_ASSIGN(int64_t retained0, jemalloc_get_stat("stats.retained"));
  ASSERT_OK_AND_ASSIGN(int64_t thread_allocated0,
                       jemalloc_get_stat("thread.allocated"));
  ASSERT_OK_AND_ASSIGN(int64_t thread_deallocated0,
                       jemalloc_get_stat("thread.deallocated"));
  ASSERT_OK_AND_ASSIGN(int64_t thread_peak_read0,
                       jemalloc_get_stat("thread.peak.read"));

  // Allocate memory. Each allocation keeps its own pointer so that all of
  // them can be released at the end of the test.
  uint8_t* first_buf = nullptr;
  ASSERT_OK(pool->Allocate(1025, &first_buf));
  ASSERT_EQ(pool->bytes_allocated(), 1025);
  ASSERT_OK(pool->Reallocate(1025, 1023, &first_buf));
  ASSERT_EQ(pool->bytes_allocated(), 1023);

  // Record stats after allocating
  ASSERT_OK_AND_ASSIGN(int64_t allocated, jemalloc_get_stat("stats.allocated"));
  ASSERT_OK_AND_ASSIGN(int64_t active, jemalloc_get_stat("stats.active"));
  ASSERT_OK_AND_ASSIGN(int64_t metadata, jemalloc_get_stat("stats.metadata"));
  ASSERT_OK_AND_ASSIGN(int64_t resident, jemalloc_get_stat("stats.resident"));
  ASSERT_OK_AND_ASSIGN(int64_t mapped, jemalloc_get_stat("stats.mapped"));
  ASSERT_OK_AND_ASSIGN(int64_t retained, jemalloc_get_stat("stats.retained"));
  ASSERT_OK_AND_ASSIGN(int64_t thread_allocated,
                       jemalloc_get_stat("thread.allocated"));
  ASSERT_OK_AND_ASSIGN(int64_t thread_deallocated,
                       jemalloc_get_stat("thread.deallocated"));
  ASSERT_OK_AND_ASSIGN(int64_t thread_peak_read,
                       jemalloc_get_stat("thread.peak.read"));

  // Check allocated stats pre-allocation
  // NOTE(review): these centers and tolerances look empirically chosen and may
  // depend on platform and jemalloc version -- confirm before tightening.
  ASSERT_NEAR(allocated0, 120000, 100000);
  ASSERT_NEAR(active0, 75000, 70000);
  ASSERT_NEAR(metadata0, 3000000, 1000000);
  ASSERT_NEAR(resident0, 3000000, 1000000);
  ASSERT_NEAR(mapped0, 6500000, 1000000);
  ASSERT_NEAR(retained0, 1500000, 2000000);

  // Check allocated stats change due to allocation
  ASSERT_NEAR(allocated - allocated0, 70000, 50000);
  ASSERT_NEAR(active - active0, 100000, 90000);
  ASSERT_NEAR(metadata - metadata0, 500, 460);
  ASSERT_NEAR(resident - resident0, 120000, 110000);
  ASSERT_NEAR(mapped - mapped0, 100000, 90000);
  ASSERT_NEAR(retained - retained0, 0, 40000);

  ASSERT_NEAR(thread_peak_read - thread_peak_read0, 1024, 700);
  ASSERT_NEAR(thread_allocated - thread_allocated0, 2500, 500);
  ASSERT_EQ(thread_deallocated - thread_deallocated0, 1280);

  // Resetting thread peak read metric
  uint8_t* second_buf = nullptr;
  ASSERT_OK(pool->Allocate(12560, &second_buf));
  ASSERT_OK_AND_ASSIGN(thread_peak_read, jemalloc_get_stat("thread.peak.read"));
  ASSERT_NEAR(thread_peak_read, 15616, 1000);
  ASSERT_OK(jemalloc_peak_reset());
  uint8_t* third_buf = nullptr;
  ASSERT_OK(pool->Allocate(1256, &third_buf));
  ASSERT_OK_AND_ASSIGN(thread_peak_read, jemalloc_get_stat("thread.peak.read"));
  ASSERT_NEAR(thread_peak_read, 1280, 100);

  // Print statistics to stderr
  ASSERT_OK(jemalloc_stats_print("J"));

  // Read statistics into std::string
  ASSERT_OK_AND_ASSIGN(std::string stats, jemalloc_stats_string("Jax"));

  // Read statistics into std::string with a lambda
  std::string stats2;
  auto write_cb = [&stats2](const char* str) { stats2.append(str); };
  ASSERT_OK(jemalloc_stats_print(write_cb, "Jax"));

  ASSERT_EQ(stats.rfind("{\"jemalloc\":{\"version\"", 0), 0);
  ASSERT_EQ(stats2.rfind("{\"jemalloc\":{\"version\"", 0), 0);
  ASSERT_EQ(stats.substr(0, 100), stats2.substr(0, 100));

  // Release the buffers only after every statistic has been checked, so the
  // deallocations cannot disturb the counters asserted above.
  pool->Free(first_buf, 1023);
  pool->Free(second_buf, 12560);
  pool->Free(third_buf, 1256);
#else
  std::string stats;
  auto write_cb = [&stats](const char* str) { stats.append(str); };
  ASSERT_RAISES(NotImplemented, jemalloc_get_stat("thread.peak.read"));
  ASSERT_RAISES(NotImplemented, jemalloc_get_stat("stats.allocated"));
  ASSERT_RAISES(NotImplemented, jemalloc_get_stat("stats.allocatedp"));
  ASSERT_RAISES(NotImplemented, jemalloc_peak_reset());
  ASSERT_RAISES(NotImplemented, jemalloc_stats_print(write_cb, "Jax"));
  ASSERT_RAISES(NotImplemented, jemalloc_stats_print("ax"));
  ASSERT_RAISES(NotImplemented, jemalloc_stats_string("Jax"));
#endif
}

Expand Down