Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion be/src/olap/memtable_flush_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,8 @@ Status FlushToken::_try_reserve_memory(const std::shared_ptr<ResourceContext>& r
int32_t max_waiting_time = config::memtable_wait_for_memory_sleep_time_s;
do {
// only try to reserve process memory
st = thread_context->thread_mem_tracker_mgr->try_reserve(size, true);
st = thread_context->thread_mem_tracker_mgr->try_reserve(
size, ThreadMemTrackerMgr::TryReserveChecker::CHECK_PROCESS);
if (st.ok()) {
memtable_flush_executor->inc_flushing_task();
break;
Expand Down
10 changes: 10 additions & 0 deletions be/src/runtime/memory/global_memory_arbitrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,16 @@ void GlobalMemoryArbitrator::refresh_memory_bvar() {
memory_arbitrator_refresh_interval_growth_bytes.get_value();
}

// Unconditionally adds `bytes` to the process-wide reserved-memory counter.
//
// No limit or water-mark check is performed here (contrast with
// try_reserve_process_memory()), so the reservation cannot fail.
//
// @param bytes  number of bytes to add to `_process_reserved_memory`.
// @return always true; the bool return mirrors try_reserve_process_memory().
bool GlobalMemoryArbitrator::reserve_process_memory(int64_t bytes) {
    // A single atomic read-modify-write is equivalent to the previous
    // compare-exchange retry loop for an unconditional add, and avoids doing
    // the addition on plain signed integers outside the atomic.
    _process_reserved_memory.fetch_add(bytes, std::memory_order_relaxed);
    return true;
}

bool GlobalMemoryArbitrator::try_reserve_process_memory(int64_t bytes) {
if (sys_mem_available() - bytes < MemInfo::sys_mem_available_warning_water_mark()) {
doris::ProcessProfile::instance()->memory_profile()->print_log_process_usage();
Expand Down
1 change: 1 addition & 0 deletions be/src/runtime/memory/global_memory_arbitrator.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ class GlobalMemoryArbitrator {
return msg;
}

static bool reserve_process_memory(int64_t bytes);
static bool try_reserve_process_memory(int64_t bytes);
static void shrink_process_reserved(int64_t bytes);

Expand Down
5 changes: 0 additions & 5 deletions be/src/runtime/memory/mem_counter.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,6 @@ class MemCounter {
int64_t current_value() const { return _current_value.load(std::memory_order_relaxed); }
int64_t peak_value() const { return _peak_value.load(std::memory_order_relaxed); }

static std::string print_bytes(int64_t bytes) {
return bytes >= 0 ? PrettyPrinter::print(bytes, TUnit::BYTES)
: "-" + PrettyPrinter::print(std::abs(bytes), TUnit::BYTES);
}

private:
std::atomic<int64_t> _current_value {0};
std::atomic<int64_t> _peak_value {0};
Expand Down
4 changes: 2 additions & 2 deletions be/src/runtime/memory/mem_tracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ class MemTracker final {
const std::string& label() const { return _label; }
std::string log_usage() const {
return fmt::format("MemTracker name={}, Used={}({} B), Peak={}({} B)", _label,
MemCounter::print_bytes(consumption()), consumption(),
MemCounter::print_bytes(peak_consumption()), peak_consumption());
PrettyPrinter::print_bytes(consumption()), consumption(),
PrettyPrinter::print_bytes(peak_consumption()), peak_consumption());
}

private:
Expand Down
7 changes: 4 additions & 3 deletions be/src/runtime/memory/mem_tracker_limiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -360,9 +360,10 @@ std::string MemTrackerLimiter::tracker_limit_exceeded_str() {
std::string err_msg = fmt::format(
"memory tracker limit exceeded, tracker label:{}, type:{}, limit "
"{}, peak used {}, current used {}. backend {}, {}.",
label(), type_string(_type), MemCounter::print_bytes(limit()),
MemCounter::print_bytes(peak_consumption()), MemCounter::print_bytes(consumption()),
BackendOptions::get_localhost(), GlobalMemoryArbitrator::process_memory_used_str());
label(), type_string(_type), PrettyPrinter::print_bytes(limit()),
PrettyPrinter::print_bytes(peak_consumption()),
PrettyPrinter::print_bytes(consumption()), BackendOptions::get_localhost(),
GlobalMemoryArbitrator::process_memory_used_str());
if (_type == Type::QUERY || _type == Type::LOAD) {
err_msg += fmt::format(
" exec node:<{}>, can `set exec_mem_limit` to change limit, details see be.INFO.",
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/memory/mem_tracker_limiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ inline Status MemTrackerLimiter::check_limit(int64_t bytes) {
// it will not take effect.
if (consumption() + bytes > _limit) {
return Status::MemoryLimitExceeded(fmt::format("failed alloc size {}, {}",
MemCounter::print_bytes(bytes),
PrettyPrinter::print_bytes(bytes),
tracker_limit_exceeded_str()));
}
return Status::OK();
Expand Down
6 changes: 3 additions & 3 deletions be/src/runtime/memory/memory_reclamation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,16 +183,16 @@ bool MemoryReclamation::revoke_process_memory(const std::string& revoke_reason)
LOG(INFO) << fmt::format(
"[MemoryGC] start MemoryReclamation::revoke_process_memory, {}, need free size: {}.",
GlobalMemoryArbitrator::process_mem_log_str(),
MemCounter::print_bytes(MemInfo::process_full_gc_size()));
PrettyPrinter::print_bytes(MemInfo::process_full_gc_size()));
Defer defer {[&]() {
std::stringstream ss;
profile->pretty_print(&ss);
LOG(INFO) << fmt::format(
"[MemoryGC] end MemoryReclamation::revoke_process_memory, {}, need free size: {}, "
"free Memory {}. cost(us): {}, details: {}",
GlobalMemoryArbitrator::process_mem_log_str(),
MemCounter::print_bytes(MemInfo::process_full_gc_size()),
MemCounter::print_bytes(freed_mem), watch.elapsed_time() / 1000, ss.str());
PrettyPrinter::print_bytes(MemInfo::process_full_gc_size()),
PrettyPrinter::print_bytes(freed_mem), watch.elapsed_time() / 1000, ss.str());
}};

// step1: start canceling from the query with the largest memory usage until the memory of process_full_gc_size is freed.
Expand Down
77 changes: 51 additions & 26 deletions be/src/runtime/memory/thread_mem_tracker_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,21 @@ class ThreadMemTrackerMgr {
void consume(int64_t size);
void flush_untracked_mem();

// Selects which memory limits try_reserve() checks before reserving.
// The value is a bit mask that try_reserve() tests with `& 1`, `& 2` and `& 4`:
//   bit 0 (1) -> task limit
//   bit 1 (2) -> workload group limit
//   bit 2 (4) -> process limit
// The combined enumerators are the bitwise OR of the single-bit ones.
enum class TryReserveChecker {
    NONE = 0,
    CHECK_TASK = 1 << 0,
    CHECK_WORKLOAD_GROUP = 1 << 1,
    CHECK_TASK_AND_WORKLOAD_GROUP = CHECK_TASK | CHECK_WORKLOAD_GROUP,
    CHECK_PROCESS = 1 << 2,
    CHECK_TASK_AND_PROCESS = CHECK_TASK | CHECK_PROCESS,
    CHECK_WORKLOAD_GROUP_AND_PROCESS = CHECK_WORKLOAD_GROUP | CHECK_PROCESS,
    CHECK_TASK_AND_WORKLOAD_GROUP_AND_PROCESS =
            CHECK_TASK | CHECK_WORKLOAD_GROUP | CHECK_PROCESS,
};

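// `checker` selects which limits (task, workload group, process) are checked before reserving.
// Memory is still reserved at the task, workload group (if any) and process levels; a level whose check is skipped is reserved unconditionally.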
MOCK_FUNCTION doris::Status try_reserve(int64_t size, bool only_check_process_memory = false);
MOCK_FUNCTION doris::Status try_reserve(
int64_t size, TryReserveChecker checker =
TryReserveChecker::CHECK_TASK_AND_WORKLOAD_GROUP_AND_PROCESS);

void shrink_reserved();

Expand Down Expand Up @@ -290,8 +303,7 @@ inline void ThreadMemTrackerMgr::flush_untracked_mem() {
_stop_consume = false;
}

inline doris::Status ThreadMemTrackerMgr::try_reserve(int64_t size,
bool only_check_process_memory) {
inline doris::Status ThreadMemTrackerMgr::try_reserve(int64_t size, TryReserveChecker checker) {
DCHECK(size >= 0);
CHECK(init());
DCHECK(_limiter_tracker);
Expand All @@ -301,48 +313,61 @@ inline doris::Status ThreadMemTrackerMgr::try_reserve(int64_t size,
flush_untracked_mem();
auto wg_ptr = _wg_wptr.lock();

if (only_check_process_memory) {
_limiter_tracker->reserve(size);
if (wg_ptr) {
wg_ptr->add_wg_refresh_interval_memory_growth(size);
}
} else {
bool task_limit_checker = static_cast<int>(checker) & 1;
bool workload_group_limit_checker = static_cast<int>(checker) & 2;
bool process_limit_checker = static_cast<int>(checker) & 4;

if (task_limit_checker) {
if (!_limiter_tracker->try_reserve(size)) {
auto err_msg = fmt::format(
"reserve memory failed, size: {}, because query memory exceeded, memory "
"tracker "
"consumption: {}, limit: {}",
PrettyPrinter::print(size, TUnit::BYTES),
PrettyPrinter::print(_limiter_tracker->consumption(), TUnit::BYTES),
PrettyPrinter::print(_limiter_tracker->limit(), TUnit::BYTES));
"tracker: {}, "
"consumption: {}, limit: {}, peak: {}",
PrettyPrinter::print_bytes(size), _limiter_tracker->label(),
PrettyPrinter::print_bytes(_limiter_tracker->consumption()),
PrettyPrinter::print_bytes(_limiter_tracker->limit()),
PrettyPrinter::print_bytes(_limiter_tracker->peak_consumption()));
return doris::Status::Error<ErrorCode::QUERY_MEMORY_EXCEEDED>(err_msg);
}
if (wg_ptr) {
} else {
_limiter_tracker->reserve(size);
}

if (wg_ptr) {
if (workload_group_limit_checker) {
if (!wg_ptr->try_add_wg_refresh_interval_memory_growth(size)) {
auto err_msg = fmt::format(
"reserve memory failed, size: {}, because workload group memory exceeded, "
"workload group: {}",
PrettyPrinter::print(size, TUnit::BYTES), wg_ptr->memory_debug_string());
PrettyPrinter::print_bytes(size), wg_ptr->memory_debug_string());
_limiter_tracker->release(size); // rollback
_limiter_tracker->shrink_reserved(size); // rollback
return doris::Status::Error<ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEEDED>(err_msg);
}
} else {
wg_ptr->add_wg_refresh_interval_memory_growth(size);
}
}

if (!doris::GlobalMemoryArbitrator::try_reserve_process_memory(size)) {
auto err_msg =
fmt::format("reserve memory failed, size: {}, because proccess memory exceeded, {}",
PrettyPrinter::print(size, TUnit::BYTES),
GlobalMemoryArbitrator::process_mem_log_str());
_limiter_tracker->release(size); // rollback
_limiter_tracker->shrink_reserved(size); // rollback
if (wg_ptr) {
wg_ptr->sub_wg_refresh_interval_memory_growth(size); // rollback
if (process_limit_checker) {
if (!doris::GlobalMemoryArbitrator::try_reserve_process_memory(size)) {
auto err_msg = fmt::format(
"reserve memory failed, size: {}, because proccess memory exceeded, {}",
PrettyPrinter::print_bytes(size),
GlobalMemoryArbitrator::process_mem_log_str());
_limiter_tracker->release(size); // rollback
_limiter_tracker->shrink_reserved(size); // rollback
if (wg_ptr) {
wg_ptr->sub_wg_refresh_interval_memory_growth(size); // rollback
}
return doris::Status::Error<ErrorCode::PROCESS_MEMORY_EXCEEDED>(err_msg);
}
return doris::Status::Error<ErrorCode::PROCESS_MEMORY_EXCEEDED>(err_msg);
} else {
doris::GlobalMemoryArbitrator::reserve_process_memory(size);
}

_reserved_mem += size;
DCHECK(_reserved_mem >= 0);
return doris::Status::OK();
}

Expand Down
6 changes: 3 additions & 3 deletions be/src/runtime/query_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,9 @@ QueryContext::~QueryContext() {
mem_tracker_msg = fmt::format(
"deregister query/load memory tracker, queryId={}, Limit={}, CurrUsed={}, "
"PeakUsed={}",
print_id(_query_id), MemCounter::print_bytes(query_mem_tracker()->limit()),
MemCounter::print_bytes(query_mem_tracker()->consumption()),
MemCounter::print_bytes(query_mem_tracker()->peak_consumption()));
print_id(_query_id), PrettyPrinter::print_bytes(query_mem_tracker()->limit()),
PrettyPrinter::print_bytes(query_mem_tracker()->consumption()),
PrettyPrinter::print_bytes(query_mem_tracker()->peak_consumption()));
}
[[maybe_unused]] uint64_t group_id = 0;
if (workload_group()) {
Expand Down
8 changes: 4 additions & 4 deletions be/src/runtime/workload_group/workload_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -257,18 +257,18 @@ int64_t WorkloadGroup::revoke_memory(int64_t need_free_mem, const std::string& r

auto group_revoke_reason = fmt::format(
"{}, revoke group id:{}, name:{}, used:{}, limit:{}", revoke_reason, _id, _name,
MemCounter::print_bytes(used_memory), MemCounter::print_bytes(_memory_limit));
PrettyPrinter::print_bytes(used_memory), PrettyPrinter::print_bytes(_memory_limit));
LOG(INFO) << fmt::format(
"[MemoryGC] start WorkloadGroup::revoke_memory, {}, need free size: {}.",
group_revoke_reason, MemCounter::print_bytes(need_free_mem));
group_revoke_reason, PrettyPrinter::print_bytes(need_free_mem));
Defer defer {[&]() {
std::stringstream ss;
group_revoke_profile->pretty_print(&ss);
LOG(INFO) << fmt::format(
"[MemoryGC] end WorkloadGroup::revoke_memory, {}, need free size: {}, free Memory "
"{}. cost(us): {}, details: {}",
group_revoke_reason, MemCounter::print_bytes(need_free_mem),
MemCounter::print_bytes(freed_mem), watch.elapsed_time() / 1000, ss.str());
group_revoke_reason, PrettyPrinter::print_bytes(need_free_mem),
PrettyPrinter::print_bytes(freed_mem), watch.elapsed_time() / 1000, ss.str());
}};

// step 1. free top overcommit query
Expand Down
4 changes: 2 additions & 2 deletions be/src/runtime/workload_group/workload_group_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -699,14 +699,14 @@ int64_t WorkloadGroupMgr::revoke_memory_from_other_overcommited_groups_(
"[MemoryGC] start WorkloadGroupMgr::revoke_memory_from_other_overcommited_groups_, {}, "
"number of overcommited groups: {}, total exceeded memory: {}.",
revoke_reason, exceeded_memory_heap.size(),
MemCounter::print_bytes(total_exceeded_memory));
PrettyPrinter::print_bytes(total_exceeded_memory));
Defer defer {[&]() {
std::stringstream ss;
profile->pretty_print(&ss);
LOG(INFO) << fmt::format(
"[MemoryGC] end WorkloadGroupMgr::revoke_memory_from_other_overcommited_groups_, "
"{}, number of overcommited groups: {}, free memory {}. cost(us): {}, details: {}",
revoke_reason, exceeded_memory_heap.size(), MemCounter::print_bytes(freed_mem),
revoke_reason, exceeded_memory_heap.size(), PrettyPrinter::print_bytes(freed_mem),
watch.elapsed_time() / 1000, ss.str());
}};

Expand Down
6 changes: 3 additions & 3 deletions be/src/runtime/workload_management/memory_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ namespace doris {
std::string MemoryContext::debug_string() {
return fmt::format("TaskId={}, Memory(Used={}, Limit={}, Peak={})",
print_id(resource_ctx_->task_controller()->task_id()),
MemCounter::print_bytes(current_memory_bytes()),
MemCounter::print_bytes(mem_limit()),
MemCounter::print_bytes(peak_memory_bytes()));
PrettyPrinter::print_bytes(current_memory_bytes()),
PrettyPrinter::print_bytes(mem_limit()),
PrettyPrinter::print_bytes(peak_memory_bytes()));
}

#include "common/compile_check_end.h"
Expand Down
6 changes: 3 additions & 3 deletions be/src/runtime/workload_management/task_controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ std::string TaskController::debug_string() {
"TaskId={}, Memory(Used={}, Limit={}, Peak={}), Spill(RunningSpillTaskCnt={}, "
"TotalPausedPeriodSecs={}, LatestPausedReason={})",
print_id(task_id_),
MemCounter::print_bytes(resource_ctx_->memory_context()->current_memory_bytes()),
MemCounter::print_bytes(resource_ctx_->memory_context()->mem_limit()),
MemCounter::print_bytes(resource_ctx_->memory_context()->peak_memory_bytes()),
PrettyPrinter::print_bytes(resource_ctx_->memory_context()->current_memory_bytes()),
PrettyPrinter::print_bytes(resource_ctx_->memory_context()->mem_limit()),
PrettyPrinter::print_bytes(resource_ctx_->memory_context()->peak_memory_bytes()),
revoking_tasks_count_, memory_sufficient_time() / NANOS_PER_SEC,
paused_reason_.status().to_string());
}
Expand Down
3 changes: 2 additions & 1 deletion be/src/util/pretty_printer.h
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,8 @@ class PrettyPrinter {

/// Convenience method
static std::string print_bytes(int64_t value) {
return PrettyPrinter::print(value, TUnit::BYTES);
return value >= 0 ? PrettyPrinter::print(value, TUnit::BYTES)
: "-" + PrettyPrinter::print(std::abs(value), TUnit::BYTES);
}

private:
Expand Down
Loading
Loading