Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions be/src/common/daemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ namespace doris {
bool k_doris_exit = false;

void Daemon::tcmalloc_gc_thread() {
// TODO: GC should be supported for all caches.
while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(10))) {
size_t used_size = 0;
size_t free_size = 0;
Expand Down
6 changes: 0 additions & 6 deletions be/src/runtime/disk_io_mgr.cc
Original file line number Diff line number Diff line change
Expand Up @@ -348,12 +348,6 @@ DiskIoMgr::~DiskIoMgr() {

Status DiskIoMgr::init(const int64_t mem_limit) {
_mem_tracker = std::make_unique<MemTrackerLimiter>(mem_limit, "DiskIO");
// If we hit the process limit, see if we can reclaim some memory by removing
// previously allocated (but unused) io buffers.
// TODO(zxy): The impact of clearing the free buffers on subsequent queries
// still needs to be verified.
ExecEnv::GetInstance()->process_mem_tracker()->add_gc_function(
std::bind<void>(&DiskIoMgr::gc_io_buffers, this, std::placeholders::_1));

for (int i = 0; i < _disk_queues.size(); ++i) {
_disk_queues[i] = new DiskQueue(i);
Expand Down
9 changes: 5 additions & 4 deletions be/src/runtime/memory/mem_tracker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,11 @@ void MemTracker::make_group_snapshot(std::vector<MemTracker::Snapshot>* snapshot
}

std::string MemTracker::log_usage(MemTracker::Snapshot snapshot) {
return fmt::format("MemTracker Label={}, Parent Label={}, Used={}, Peak={}", snapshot.label,
snapshot.parent,
PrettyPrinter::print(snapshot.cur_consumption, TUnit::BYTES),
PrettyPrinter::print(snapshot.peak_consumption, TUnit::BYTES));
return fmt::format(
"MemTracker Label={}, Parent Label={}, Used={}({} B), Peak={}({} B)", snapshot.label,
snapshot.parent, PrettyPrinter::print(snapshot.cur_consumption, TUnit::BYTES),
snapshot.cur_consumption, PrettyPrinter::print(snapshot.peak_consumption, TUnit::BYTES),
snapshot.peak_consumption);
}

static std::unordered_map<std::string, std::shared_ptr<MemTracker>> global_mem_trackers;
Expand Down
89 changes: 27 additions & 62 deletions be/src/runtime/memory/mem_tracker_limiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ MemTrackerLimiter::MemTrackerLimiter(int64_t byte_limit, const std::string& labe
MemTrackerLimiter* tracker = this;
while (tracker != nullptr) {
_all_ancestors.push_back(tracker);
// The Process tracker is excluded from limit checking: its consumption is virtual memory, which differs
// from the real physical memory used by the process. The process limit is checked by check_sys_mem_info instead.
if (tracker->has_limit() && tracker->label() != "Process")
_limited_ancestors.push_back(tracker);
tracker = tracker->_parent.get();
Expand Down Expand Up @@ -124,42 +126,6 @@ int64_t MemTrackerLimiter::get_lowest_limit() const {
return min_limit;
}

// If consumption is at or above 'max_consumption', calls the functions in _gc_functions
// (in index order) to try to free memory. Takes _gc_lock before reading consumption.
//
// Returns true if 'max_consumption' is negative (no GC is attempted) or if consumption is
// still above 'max_consumption' after the GC pass. Returns false if consumption was already
// below 'max_consumption' once the lock was taken, or if it ends at or below it.
bool MemTrackerLimiter::gc_memory(int64_t max_consumption) {
// A negative target is reported as still exceeded without attempting any GC.
if (max_consumption < 0) return true;
std::lock_guard<std::mutex> l(_gc_lock);
int64_t pre_gc_consumption = consumption();
// Check if someone gc'd before us
if (pre_gc_consumption < max_consumption) return false;

int64_t curr_consumption = pre_gc_consumption;
// Free some extra memory to avoid frequent GC. The value is empirical and may be tuned later.
// NOTE(review): this comment previously said "4M", but the expression below evaluates to
// 4 GiB (4 * 1024^3 bytes) -- confirm which one is intended.
const int64_t EXTRA_BYTES_TO_FREE = 4L * 1024L * 1024L * 1024L;
// Try to free up some memory
for (int i = 0; i < _gc_functions.size(); ++i) {
// Try to free up the amount we are over plus some extra so that we don't have to
// immediately GC again. Don't free all the memory since that can be unnecessarily
// expensive.
int64_t bytes_to_free = curr_consumption - max_consumption + EXTRA_BYTES_TO_FREE;
_gc_functions[i](bytes_to_free);
// Re-read consumption so the next iteration (and the return value) see the effect of this GC call.
curr_consumption = consumption();
// Stops as soon as consumption is not more than EXTRA_BYTES_TO_FREE below max_consumption,
// which includes the case where consumption is still above max_consumption.
// NOTE(review): this looks inverted relative to the stated intent (stop once enough has
// been freed) -- confirm before relying on later GC functions being called.
if (max_consumption - curr_consumption <= EXTRA_BYTES_TO_FREE) break;
}

return curr_consumption > max_consumption;
}

// Runs gc_memory() with a target of (_limit - bytes).
// Returns MemoryLimitExceeded, carrying the failed allocation size and this tracker's
// label, limit and consumption counters, when gc_memory() reports that the target is
// still exceeded. Otherwise logs the successful GC and returns OK.
Status MemTrackerLimiter::try_gc_memory(int64_t bytes) {
    const int64_t gc_target = _limit - bytes;
    const bool still_exceeded = gc_memory(gc_target);

    if (UNLIKELY(still_exceeded)) {
        std::string detail = fmt::format(
                "failed_alloc_size={} B, exceeded_tracker={}, limit={} B, peak_used={} B, "
                "current_used={} B",
                bytes, label(), _limit, _consumption->value(), _consumption->current_value());
        return Status::MemoryLimitExceeded(detail);
    }

    VLOG_NOTICE << "GC succeeded, TryConsume bytes=" << bytes
                << " consumption=" << _consumption->current_value() << " limit=" << _limit;
    return Status::OK();
}

// Calling this on the query tracker results in output like:
//
// Query(4a4c81fedaed337d:4acadfda00000000) Limit=10.00 GB Total=508.28 MB Peak=508.45 MB
Expand All @@ -186,10 +152,10 @@ std::string MemTrackerLimiter::log_usage(int max_recursive_depth, int64_t* logge
int64_t peak_consumption = _consumption->value();
if (logged_consumption != nullptr) *logged_consumption = curr_consumption;

std::string detail = "MemTrackerLimiter Label={}, Limit={}, Used={}, Peak={}, Exceeded={}";
detail = fmt::format(detail, _label, PrettyPrinter::print(_limit, TUnit::BYTES),
PrettyPrinter::print(curr_consumption, TUnit::BYTES),
PrettyPrinter::print(peak_consumption, TUnit::BYTES),
std::string detail =
"MemTrackerLimiter Label={}, Limit={}({} B), Used={}({} B), Peak={}({} B), Exceeded={}";
detail = fmt::format(detail, _label, print_bytes(_limit), _limit, print_bytes(curr_consumption),
curr_consumption, print_bytes(peak_consumption), peak_consumption,
limit_exceeded() ? "true" : "false");

// This call does not need the children, so return early.
Expand Down Expand Up @@ -223,17 +189,15 @@ std::string MemTrackerLimiter::log_usage(int max_recursive_depth,
if (!usage_string.empty()) usage_strings.push_back(usage_string);
*logged_consumption += tracker_consumption;
}
return join(usage_strings, "\n");
return usage_strings.size() == 0 ? "" : "\n " + join(usage_strings, "\n ");
}

Status MemTrackerLimiter::mem_limit_exceeded_construct(const std::string& msg) {
std::string detail = fmt::format(
"{}, backend {} process memory used {}, process limit {}. If is query, can "
"change the limit "
"by `set exec_mem_limit=xxx`, details mem usage see be.INFO.",
msg, BackendOptions::get_localhost(),
PrettyPrinter::print(PerfCounters::get_vm_rss(), TUnit::BYTES),
PrettyPrinter::print(MemInfo::mem_limit(), TUnit::BYTES));
"{}. backend {} process memory used {}, limit {}. If query tracker exceed, `set "
"exec_mem_limit=8G` to change limit, details mem usage see be.INFO.",
msg, BackendOptions::get_localhost(), print_bytes(PerfCounters::get_vm_rss()),
print_bytes(MemInfo::mem_limit()));
return Status::MemoryLimitExceeded(detail);
}

Expand All @@ -258,7 +222,7 @@ void MemTrackerLimiter::print_log_usage(const std::string& msg) {
Status MemTrackerLimiter::mem_limit_exceeded(const std::string& msg,
int64_t failed_allocation_size) {
STOP_CHECK_THREAD_MEM_TRACKER_LIMIT();
std::string detail = fmt::format("Memory limit exceeded:<consuming_tracker={}, ", _label);
std::string detail = fmt::format("Memory limit exceeded:<consuming tracker:<{}>, ", _label);
MemTrackerLimiter* exceeded_tracker = nullptr;
MemTrackerLimiter* max_consumption_tracker = nullptr;
int64_t free_size = INT64_MAX;
Expand All @@ -281,28 +245,29 @@ Status MemTrackerLimiter::mem_limit_exceeded(const std::string& msg,
MemTrackerLimiter* print_log_usage_tracker = nullptr;
if (exceeded_tracker != nullptr) {
detail += fmt::format(
"failed_alloc_size={} B, exceeded_tracker={}, limit={} B, peak_used={} B, "
"current_used={} B>, executing_msg:<{}>",
PrettyPrinter::print(failed_allocation_size, TUnit::BYTES),
exceeded_tracker->label(), exceeded_tracker->limit(),
exceeded_tracker->peak_consumption(), exceeded_tracker->consumption(), msg);
"failed alloc size {}, exceeded tracker:<{}>, limit {}, peak used {}, "
"current used {}>, executing msg:<{}>",
print_bytes(failed_allocation_size), exceeded_tracker->label(),
print_bytes(exceeded_tracker->limit()),
print_bytes(exceeded_tracker->peak_consumption()),
print_bytes(exceeded_tracker->consumption()), msg);
print_log_usage_tracker = exceeded_tracker;
} else if (!sys_exceed_st) {
detail += fmt::format("{}>, executing_msg:<{}>", sys_exceed_st.get_error_msg(), msg);
detail += fmt::format("{}>, executing msg:<{}>", sys_exceed_st.get_error_msg(), msg);
} else if (max_consumption_tracker != nullptr) {
// must after check_sys_mem_info false
detail += fmt::format(
"failed_alloc_size={} B, max_consumption_tracker={}, limit={} B, peak_used={} B, "
"current_used={} B>, executing_msg:<{}>",
PrettyPrinter::print(failed_allocation_size, TUnit::BYTES),
max_consumption_tracker->label(), max_consumption_tracker->limit(),
max_consumption_tracker->peak_consumption(), max_consumption_tracker->consumption(),
msg);
"failed alloc size {}, max consumption tracker:<{}>, limit {}, peak used {}, "
"current used {}>, executing msg:<{}>",
print_bytes(failed_allocation_size), max_consumption_tracker->label(),
print_bytes(max_consumption_tracker->limit()),
print_bytes(max_consumption_tracker->peak_consumption()),
print_bytes(max_consumption_tracker->consumption()), msg);
print_log_usage_tracker = max_consumption_tracker;
} else {
// The limit of the current tracker and parents is less than 0, the consume will not fail,
// and the current process memory has no excess limit.
detail += fmt::format("unknown exceed reason, executing_msg:<{}>", msg);
detail += fmt::format("unknown exceed reason, executing msg:<{}>", msg);
print_log_usage_tracker = ExecEnv::GetInstance()->process_mem_tracker_raw();
}
auto st = MemTrackerLimiter::mem_limit_exceeded_construct(detail);
Expand All @@ -316,7 +281,7 @@ Status MemTrackerLimiter::mem_limit_exceeded(const std::string& msg,
Status failed_try_consume_st) {
STOP_CHECK_THREAD_MEM_TRACKER_LIMIT();
std::string detail =
fmt::format("Memory limit exceeded:<consuming_tracker={}, {}>, executing_msg:<{}>",
fmt::format("Memory limit exceeded:<consuming tracker:<{}>, {}>, executing msg:<{}>",
_label, failed_try_consume_st.get_error_msg(), msg);
auto st = MemTrackerLimiter::mem_limit_exceeded_construct(detail);
failed_tracker->print_log_usage(st.get_error_msg());
Expand Down
73 changes: 25 additions & 48 deletions be/src/runtime/memory/mem_tracker_limiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,9 @@ class MemTrackerLimiter final : public MemTracker {
// for fast, expect MemInfo::initialized() to be true.
if (PerfCounters::get_vm_rss() + bytes >= MemInfo::mem_limit()) {
auto st = Status::MemoryLimitExceeded(
"process memory used {} B, exceed limit {} B, failed_alloc_size={} B",
PerfCounters::get_vm_rss(), MemInfo::mem_limit(), bytes);
"process memory used {}, exceed limit {}, failed alloc size {}",
print_bytes(PerfCounters::get_vm_rss()), print_bytes(MemInfo::mem_limit()),
print_bytes(bytes));
ExecEnv::GetInstance()->process_mem_tracker_raw()->print_log_usage(st.get_error_msg());
return st;
}
Expand Down Expand Up @@ -108,21 +109,6 @@ class MemTrackerLimiter final : public MemTracker {
// Returns the lowest limit for this tracker limiter and its ancestors. Returns -1 if there is no limit.
int64_t get_lowest_limit() const;

typedef std::function<void(int64_t bytes_to_free)> GcFunction;
// Add a function 'f' to be called if the limit is reached, if none of the other
// previously-added GC functions were successful at freeing up enough memory.
// 'f' does not need to be thread-safe as long as it is added to only one tracker limiter.
// Note that 'f' must be valid for the lifetime of this tracker limiter.
void add_gc_function(GcFunction f) { _gc_functions.push_back(f); }

// TODO Should be managed in a separate process_mem_mgr, not in MemTracker
// If consumption is higher than max_consumption, attempts to free memory by calling
// any added GC functions. Returns true if max_consumption is still exceeded. Takes gc_lock.
// Note: If the cache of segment/chunk is released due to insufficient query memory at a certain moment,
// the performance of subsequent queries may be degraded, so the use of gc function should be careful enough.
bool gc_memory(int64_t max_consumption);
Status try_gc_memory(int64_t bytes);

public:
// up to (but not including) end_tracker.
// This happens when we want to update tracking on a particular mem tracker but the consumption
Expand All @@ -147,6 +133,7 @@ class MemTrackerLimiter final : public MemTracker {
// Limiting the recursive depth reduces the cost of dumping, particularly
// for the process tracker limiter.
std::string log_usage(int max_recursive_depth = INT_MAX, int64_t* logged_consumption = nullptr);
void print_log_usage(const std::string& msg);

// Log the memory usage when memory limit is exceeded and return a status object with
// msg of the allocation which caused the limit to be exceeded.
Expand All @@ -169,6 +156,10 @@ class MemTrackerLimiter final : public MemTracker {
return msg.str();
}

// Renders 'bytes' with PrettyPrinter::print using TUnit::BYTES and returns the
// result as a std::string.
static std::string print_bytes(int64_t bytes) {
    const auto rendered = PrettyPrinter::print(bytes, TUnit::BYTES);
    return fmt::format("{}", rendered);
}

private:
// The following func, for automatic memory tracking and limiting based on system memory allocation.
friend class ThreadMemTrackerMgr;
Expand Down Expand Up @@ -198,7 +189,6 @@ class MemTrackerLimiter final : public MemTracker {
int64_t* logged_consumption);

static Status mem_limit_exceeded_construct(const std::string& msg);
void print_log_usage(const std::string& msg);

private:
// Limit on memory consumption, in bytes. If _limit == -1, there is no consumption limit. Used in log_usage.
Expand Down Expand Up @@ -230,19 +220,6 @@ class MemTrackerLimiter final : public MemTracker {
std::atomic_size_t _had_child_count = 0;

bool _print_log_usage = false;

// Lock to protect gc_memory(). This prevents many GCs from occurring at once.
std::mutex _gc_lock;
// Functions to call after the limit is reached to free memory.
// GcFunctions can be attached to a MemTracker in order to free up memory if the limit is
// reached. If limit_exceeded() is called and the limit is exceeded, it will first call
// the GcFunctions to try to free memory and recheck the limit. For example, the process
// tracker has a GcFunction that releases any unused memory still held by tcmalloc, so
// this will be called before the process limit is reported as exceeded. GcFunctions are
// called in the order they are added, so expensive functions should be added last.
// GcFunctions are called with a global lock held, so should be non-blocking and not
// call back into MemTrackers, except to release memory.
std::vector<GcFunction> _gc_functions;
};

inline void MemTrackerLimiter::consume(int64_t bytes) {
Expand Down Expand Up @@ -286,19 +263,17 @@ inline Status MemTrackerLimiter::try_consume(int64_t bytes) {
if (tracker->limit() < 0 || tracker->label() == "Process") {
tracker->_consumption->add(bytes); // No limit at this tracker.
} else {
// If TryConsume fails, we can try to GC, but we may need to try several times if
// there are concurrent consumers because we don't take a lock before trying to
// update _consumption.
while (true) {
if (LIKELY(tracker->_consumption->try_add(bytes, tracker->limit()))) break;
Status st = tracker->try_gc_memory(bytes);
if (!st) {
// Failed for this mem tracker. Roll back the ones that succeeded.
for (int j = _all_ancestors.size() - 1; j > i; --j) {
_all_ancestors[j]->_consumption->add(-bytes);
}
return st;
if (!tracker->_consumption->try_add(bytes, tracker->limit())) {
// Failed for this mem tracker. Roll back the ones that succeeded.
for (int j = _all_ancestors.size() - 1; j > i; --j) {
_all_ancestors[j]->_consumption->add(-bytes);
}
return Status::MemoryLimitExceeded(fmt::format(
"failed alloc size {}, exceeded tracker:<{}>, limit {}, peak "
"used {}, current used {}",
print_bytes(bytes), tracker->label(), print_bytes(tracker->limit()),
print_bytes(tracker->_consumption->value()),
print_bytes(tracker->_consumption->current_value())));
}
}
}
Expand All @@ -314,11 +289,13 @@ inline Status MemTrackerLimiter::check_limit(int64_t bytes) {
// Walk the tracker tree top-down.
for (i = _limited_ancestors.size() - 1; i >= 0; --i) {
MemTrackerLimiter* tracker = _limited_ancestors[i];
// The Process tracker is excluded from limit checking: its consumption is virtual memory, which differs
// from the real physical memory used by the process. The process limit is checked by check_sys_mem_info instead.
while (true) {
if (LIKELY(tracker->_consumption->current_value() + bytes < tracker->limit())) break;
RETURN_IF_ERROR(tracker->try_gc_memory(bytes));
if (tracker->_consumption->current_value() + bytes > tracker->limit()) {
return Status::MemoryLimitExceeded(
fmt::format("expected alloc size {}, exceeded tracker:<{}>, limit {}, peak "
"used {}, current used {}",
print_bytes(bytes), tracker->label(), print_bytes(tracker->limit()),
print_bytes(tracker->_consumption->value()),
print_bytes(tracker->_consumption->current_value())));
}
}
return Status::OK();
Expand Down
2 changes: 2 additions & 0 deletions be/src/service/doris_main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,8 @@ int main(int argc, char** argv) {
// this will cause coredump for ASAN build when running regression test,
// disable temporarily.
doris::ExecEnv::GetInstance()->task_pool_mem_tracker_registry()->logout_task_mem_tracker();
// Print the process tracker's usage log at most once per second: when the process exceeds the mem
// limit, a large number of tasks may be canceled, which would otherwise produce too many duplicate logs.
doris::ExecEnv::GetInstance()->process_mem_tracker_raw()->enable_print_log_usage();
sleep(1);
}
Expand Down
Loading