Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion be/src/exec/data_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@
#include "gen_cpp/Exprs_types.h"
#include "runtime/descriptors.h"
#include "runtime/query_statistics.h"
#include "util/runtime_profile.h"
#include "util/telemetry/telemetry.h"

namespace doris {

class ObjectPool;
class RuntimeProfile;
class RuntimeState;
class TPlanFragmentExecParams;
class RowDescriptor;
Expand Down
1 change: 1 addition & 0 deletions be/src/http/default_path_handlers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include "http/web_page_handler.h"
#include "runtime/memory/mem_tracker_limiter.h"
#include "util/debug_util.h"
#include "util/mem_info.h"
#include "util/perf_counters.h"
#include "util/pretty_printer.h"
#include "util/thread.h"
Expand Down
1 change: 1 addition & 0 deletions be/src/http/ev_http_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "http/http_headers.h"
#include "http/http_request.h"
#include "runtime/thread_context.h"
#include "service/backend_options.h"
#include "service/brpc.h"
#include "util/debug_util.h"
#include "util/threadpool.h"
Expand Down
1 change: 1 addition & 0 deletions be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
#include "runtime/thread_context.h"
#include "service/backend_options.h"
#include "util/doris_metrics.h"
#include "util/mem_info.h"
#include "util/network_util.h"
#include "util/stopwatch.hpp"
#include "util/telemetry/telemetry.h"
Expand Down
2 changes: 2 additions & 0 deletions be/src/runtime/load_channel_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
#include "runtime/load_channel.h"
#include "runtime/memory/mem_tracker.h"
#include "util/doris_metrics.h"
#include "util/mem_info.h"
#include "util/perf_counters.h"
#include "util/stopwatch.hpp"
#include "util/time.h"

Expand Down
25 changes: 14 additions & 11 deletions be/src/runtime/memory/jemalloc_hook.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,14 @@
#endif

extern "C" {

// Both je_nallocx and je_malloc will use the lock je_malloc_mutex_lock_slow,
// so enabling the jemalloc hook will double the lock usage.
// In extreme cases this will affect performance, consider turning off mem hook
// mem hook should avoid nesting new/malloc.

void* doris_malloc(size_t size) __THROW {
// Both je_nallocx and je_malloc will use the lock je_malloc_mutex_lock_slow,
// so enabling the jemalloc hook will double the lock usage.
// In extreme cases this will affect performance, consider turning off mem hook
TRY_CONSUME_MEM_TRACKER(jenallocx(size, 0), nullptr);
CONSUME_MEM_TRACKER(jenallocx(size, 0));
void* ptr = jemalloc(size);
if (UNLIKELY(ptr == nullptr)) {
RELEASE_MEM_TRACKER(jenallocx(size, 0));
Expand All @@ -53,7 +56,7 @@ void* doris_realloc(void* p, size_t size) __THROW {
int64_t old_size = jemalloc_usable_size(p);
#endif

TRY_CONSUME_MEM_TRACKER(jenallocx(size, 0) - old_size, nullptr);
CONSUME_MEM_TRACKER(jenallocx(size, 0) - old_size);
void* ptr = jerealloc(p, size);
if (UNLIKELY(ptr == nullptr)) {
RELEASE_MEM_TRACKER(jenallocx(size, 0) - old_size);
Expand All @@ -66,7 +69,7 @@ void* doris_calloc(size_t n, size_t size) __THROW {
return nullptr;
}

TRY_CONSUME_MEM_TRACKER(n * size, nullptr);
CONSUME_MEM_TRACKER(n * size);
void* ptr = jecalloc(n, size);
if (UNLIKELY(ptr == nullptr)) {
RELEASE_MEM_TRACKER(n * size);
Expand All @@ -82,7 +85,7 @@ void doris_cfree(void* ptr) __THROW {
}

void* doris_memalign(size_t align, size_t size) __THROW {
TRY_CONSUME_MEM_TRACKER(size, nullptr);
CONSUME_MEM_TRACKER(size);
void* ptr = jealigned_alloc(align, size);
if (UNLIKELY(ptr == nullptr)) {
RELEASE_MEM_TRACKER(size);
Expand All @@ -93,7 +96,7 @@ void* doris_memalign(size_t align, size_t size) __THROW {
}

void* doris_aligned_alloc(size_t align, size_t size) __THROW {
TRY_CONSUME_MEM_TRACKER(size, nullptr);
CONSUME_MEM_TRACKER(size);
void* ptr = jealigned_alloc(align, size);
if (UNLIKELY(ptr == nullptr)) {
RELEASE_MEM_TRACKER(size);
Expand All @@ -104,7 +107,7 @@ void* doris_aligned_alloc(size_t align, size_t size) __THROW {
}

void* doris_valloc(size_t size) __THROW {
TRY_CONSUME_MEM_TRACKER(size, nullptr);
CONSUME_MEM_TRACKER(size);
void* ptr = jevalloc(size);
if (UNLIKELY(ptr == nullptr)) {
RELEASE_MEM_TRACKER(size);
Expand All @@ -115,7 +118,7 @@ void* doris_valloc(size_t size) __THROW {
}

void* doris_pvalloc(size_t size) __THROW {
TRY_CONSUME_MEM_TRACKER(size, nullptr);
CONSUME_MEM_TRACKER(size);
void* ptr = jevalloc(size);
if (UNLIKELY(ptr == nullptr)) {
RELEASE_MEM_TRACKER(size);
Expand All @@ -126,7 +129,7 @@ void* doris_pvalloc(size_t size) __THROW {
}

int doris_posix_memalign(void** r, size_t align, size_t size) __THROW {
TRY_CONSUME_MEM_TRACKER(size, ENOMEM);
CONSUME_MEM_TRACKER(size);
int ret = jeposix_memalign(r, align, size);
if (UNLIKELY(ret != 0)) {
RELEASE_MEM_TRACKER(size);
Expand Down
84 changes: 67 additions & 17 deletions be/src/runtime/memory/mem_tracker_limiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
#include "runtime/fragment_mgr.h"
#include "runtime/load_channel_mgr.h"
#include "runtime/runtime_state.h"
#include "runtime/thread_context.h"
#include "service/backend_options.h"
#include "util/mem_info.h"
#include "util/perf_counters.h"
#include "util/pretty_printer.h"
#include "util/stack_util.h"

Expand Down Expand Up @@ -237,25 +239,73 @@ void MemTrackerLimiter::print_log_process_usage(const std::string& msg, bool wit
}
}

std::string MemTrackerLimiter::mem_limit_exceeded(const std::string& msg,
const std::string& limit_exceeded_errmsg) {
STOP_CHECK_THREAD_MEM_TRACKER_LIMIT();
std::string detail = fmt::format(
"Memory limit exceeded:<consuming tracker:<{}>, {}>, executing msg:<{}>. backend {} "
"process memory used {}, limit {}. If query tracker exceed, `set "
bool MemTrackerLimiter::sys_mem_exceed_limit_check(int64_t bytes) {
if (!_oom_avoidance) {
return false;
}
// Limit process memory usage using the actual physical memory of the process in `/proc/self/status`.
// This is independent of the consumption value of the mem tracker, which counts the virtual memory
// of the process malloc.
// for fast, expect MemInfo::initialized() to be true.
//
// tcmalloc/jemalloc allocator cache does not participate in the mem check as part of the process physical memory.
// because `new/malloc` will trigger mem hook when using tcmalloc/jemalloc allocator cache,
// but it may not actually alloc physical memory, which is not expected in mem hook fail.
if (MemInfo::proc_mem_no_allocator_cache() + bytes >= MemInfo::mem_limit() ||
MemInfo::sys_mem_available() < MemInfo::sys_mem_available_low_water_mark()) {
print_log_process_usage(
fmt::format("System Mem Exceed Limit Check Faild, Try Alloc: {}", bytes));
return true;
}
return false;
}

std::string MemTrackerLimiter::process_mem_log_str() {
return fmt::format(
"OS physical memory {}. Process memory usage {}, limit {}, soft limit {}. Sys "
"available memory {}, low water mark {}, warning water mark {}. Refresh interval "
"memory growth {} B",
PrettyPrinter::print(MemInfo::physical_mem(), TUnit::BYTES),
PerfCounters::get_vm_rss_str(), MemInfo::mem_limit_str(), MemInfo::soft_mem_limit_str(),
MemInfo::sys_mem_available_str(),
PrettyPrinter::print(MemInfo::sys_mem_available_low_water_mark(), TUnit::BYTES),
PrettyPrinter::print(MemInfo::sys_mem_available_warning_water_mark(), TUnit::BYTES),
MemInfo::refresh_interval_memory_growth);
}

std::string MemTrackerLimiter::process_limit_exceeded_errmsg_str(int64_t bytes) {
return fmt::format(
"process memory used {} exceed limit {} or sys mem available {} less than low "
"water mark {}, failed alloc size {}",
PerfCounters::get_vm_rss_str(), MemInfo::mem_limit_str(),
MemInfo::sys_mem_available_str(),
PrettyPrinter::print(MemInfo::sys_mem_available_low_water_mark(), TUnit::BYTES),
print_bytes(bytes));
}

std::string MemTrackerLimiter::query_tracker_limit_exceeded_str(
const std::string& tracker_limit_exceeded, const std::string& last_consumer_tracker,
const std::string& executing_msg) {
return fmt::format(
"Memory limit exceeded:{}, exec node:<{}>, execute msg:{}. backend {} "
"process memory used {}, limit {}. Can `set "
"exec_mem_limit=8G` to change limit, details see be.INFO.",
_label, limit_exceeded_errmsg, msg, BackendOptions::get_localhost(),
PerfCounters::get_vm_rss_str(), MemInfo::mem_limit_str());
return detail;
tracker_limit_exceeded, last_consumer_tracker, executing_msg,
BackendOptions::get_localhost(), PerfCounters::get_vm_rss_str(),
MemInfo::mem_limit_str());
}

std::string MemTrackerLimiter::tracker_limit_exceeded_str() {
return fmt::format(
"exceeded tracker:<{}>, limit {}, peak "
"used {}, current used {}",
label(), print_bytes(limit()), print_bytes(_consumption->peak_value()),
print_bytes(_consumption->current_value()));
}

Status MemTrackerLimiter::fragment_mem_limit_exceeded(RuntimeState* state, const std::string& msg,
int64_t failed_alloc_size) {
auto failed_msg =
mem_limit_exceeded(msg, tracker_limit_exceeded_errmsg_str(failed_alloc_size, this));
print_log_usage(failed_msg);
state->log_error(failed_msg);
return Status::MemoryLimitExceeded(failed_msg);
std::string MemTrackerLimiter::tracker_limit_exceeded_str(int64_t bytes) {
return fmt::format("failed alloc size {}, {}", print_bytes(bytes),
tracker_limit_exceeded_str());
}

int64_t MemTrackerLimiter::free_top_memory_query(int64_t min_free_mem,
Expand Down
112 changes: 10 additions & 102 deletions be/src/runtime/memory/mem_tracker_limiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,7 @@
#include <atomic>

#include "common/config.h"
#include "runtime/exec_env.h"
#include "runtime/memory/mem_tracker.h"
#include "service/backend_options.h"
#include "util/mem_info.h"
#include "util/perf_counters.h"
#include "util/string_util.h"

namespace doris {
Expand Down Expand Up @@ -77,26 +73,7 @@ class MemTrackerLimiter final : public MemTracker {

~MemTrackerLimiter();

static bool sys_mem_exceed_limit_check(int64_t bytes) {
if (!_oom_avoidance) {
return false;
}
// Limit process memory usage using the actual physical memory of the process in `/proc/self/status`.
// This is independent of the consumption value of the mem tracker, which counts the virtual memory
// of the process malloc.
// for fast, expect MemInfo::initialized() to be true.
//
// tcmalloc/jemalloc allocator cache does not participate in the mem check as part of the process physical memory.
// because `new/malloc` will trigger mem hook when using tcmalloc/jemalloc allocator cache,
// but it may not actually alloc physical memory, which is not expected in mem hook fail.
if (MemInfo::proc_mem_no_allocator_cache() + bytes >= MemInfo::mem_limit() ||
MemInfo::sys_mem_available() < MemInfo::sys_mem_available_low_water_mark()) {
print_log_process_usage(
fmt::format("System Mem Exceed Limit Check Faild, Try Alloc: {}", bytes));
return true;
}
return false;
}
static bool sys_mem_exceed_limit_check(int64_t bytes);

void set_consumption() { LOG(FATAL) << "MemTrackerLimiter set_consumption not supported"; }
Type type() const { return _type; }
Expand Down Expand Up @@ -141,12 +118,6 @@ class MemTrackerLimiter final : public MemTracker {
static std::string log_process_usage_str(const std::string& msg, bool with_stacktrace = true);
static void print_log_process_usage(const std::string& msg, bool with_stacktrace = true);

// Log the memory usage when memory limit is exceeded.
std::string mem_limit_exceeded(const std::string& msg,
const std::string& limit_exceeded_errmsg);
Status fragment_mem_limit_exceeded(RuntimeState* state, const std::string& msg,
int64_t failed_allocation_size = 0);

// Start canceling from the query with the largest memory usage until the memory of min_free_mem size is freed.
// vm_rss_str and mem_available_str recorded when gc is triggered, for log printing.
static int64_t free_top_memory_query(int64_t min_free_mem, const std::string& vm_rss_str,
Expand Down Expand Up @@ -176,28 +147,14 @@ class MemTrackerLimiter final : public MemTracker {
return querytid;
}

static std::string process_mem_log_str() {
return fmt::format(
"OS physical memory {}. Process memory usage {}, limit {}, soft limit {}. Sys "
"available memory {}, low water mark {}, warning water mark {}. Refresh interval "
"memory growth {} B",
PrettyPrinter::print(MemInfo::physical_mem(), TUnit::BYTES),
PerfCounters::get_vm_rss_str(), MemInfo::mem_limit_str(),
MemInfo::soft_mem_limit_str(), MemInfo::sys_mem_available_str(),
PrettyPrinter::print(MemInfo::sys_mem_available_low_water_mark(), TUnit::BYTES),
PrettyPrinter::print(MemInfo::sys_mem_available_warning_water_mark(), TUnit::BYTES),
MemInfo::refresh_interval_memory_growth);
}

static std::string process_limit_exceeded_errmsg_str(int64_t bytes) {
return fmt::format(
"process memory used {} exceed limit {} or sys mem available {} less than low "
"water mark {}, failed alloc size {}",
PerfCounters::get_vm_rss_str(), MemInfo::mem_limit_str(),
MemInfo::sys_mem_available_str(),
PrettyPrinter::print(MemInfo::sys_mem_available_low_water_mark(), TUnit::BYTES),
print_bytes(bytes));
}
static std::string process_mem_log_str();
static std::string process_limit_exceeded_errmsg_str(int64_t bytes);
// Log the memory usage when memory limit is exceeded.
std::string query_tracker_limit_exceeded_str(const std::string& tracker_limit_exceeded,
const std::string& last_consumer_tracker,
const std::string& executing_msg);
std::string tracker_limit_exceeded_str();
std::string tracker_limit_exceeded_str(int64_t bytes);

std::string debug_string() {
std::stringstream msg;
Expand All @@ -211,26 +168,11 @@ class MemTrackerLimiter final : public MemTracker {
private:
friend class ThreadMemTrackerMgr;

// Increases consumption of this tracker by 'bytes' only if will not exceeding limit.
// Returns true if the consumption was successfully updated.
WARN_UNUSED_RESULT
bool try_consume(int64_t bytes, std::string& failed_msg, bool& is_process_exceed);

// When the accumulated untracked memory value exceeds the upper limit,
// the current value is returned and set to 0.
// Thread safety.
int64_t add_untracked_mem(int64_t bytes);

static std::string tracker_limit_exceeded_errmsg_str(int64_t bytes,
MemTrackerLimiter* exceed_tracker) {
return fmt::format(
"failed alloc size {}, exceeded tracker:<{}>, limit {}, peak "
"used {}, current used {}",
print_bytes(bytes), exceed_tracker->label(), print_bytes(exceed_tracker->limit()),
print_bytes(exceed_tracker->_consumption->peak_value()),
print_bytes(exceed_tracker->_consumption->current_value()));
}

private:
Type _type;

Expand Down Expand Up @@ -267,46 +209,12 @@ inline void MemTrackerLimiter::cache_consume(int64_t bytes) {
consume(consume_bytes);
}

inline bool MemTrackerLimiter::try_consume(int64_t bytes, std::string& failed_msg,
bool& is_process_exceed) {
if (bytes <= 0) {
release(-bytes);
failed_msg = std::string();
return true;
}

if (config::memory_debug && bytes > 1073741824) { // 1G
print_log_process_usage(fmt::format("Alloc Large Memory, Try Alloc: {}", bytes));
}

if (sys_mem_exceed_limit_check(bytes)) {
failed_msg = process_limit_exceeded_errmsg_str(bytes);
is_process_exceed = true;
return false;
}

if (_limit < 0 || (is_overcommit_tracker() && config::enable_query_memroy_overcommit)) {
_consumption->add(bytes); // No limit at this tracker.
} else {
if (!_consumption->try_add(bytes, _limit)) {
failed_msg = tracker_limit_exceeded_errmsg_str(bytes, this);
is_process_exceed = false;
return false;
}
}
failed_msg = std::string();
return true;
}

inline Status MemTrackerLimiter::check_limit(int64_t bytes) {
if (sys_mem_exceed_limit_check(bytes)) {
return Status::MemoryLimitExceeded(process_limit_exceeded_errmsg_str(bytes));
}
if (bytes <= 0 || (is_overcommit_tracker() && config::enable_query_memroy_overcommit)) {
return Status::OK();
}
if (_limit > 0 && _consumption->current_value() + bytes > _limit) {
return Status::MemoryLimitExceeded(tracker_limit_exceeded_errmsg_str(bytes, this));
return Status::MemoryLimitExceeded(tracker_limit_exceeded_str(bytes));
}
return Status::OK();
}
Expand Down
Loading