Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions be/src/common/daemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ void Daemon::memory_maintenance_thread() {
// TODO replace memory_gc_thread.

// step 6. Refresh weighted memory ratio of workload groups.
doris::ExecEnv::GetInstance()->workload_group_mgr()->do_sweep();
doris::ExecEnv::GetInstance()->workload_group_mgr()->refresh_wg_weighted_memory_limit();

// step 7. Analyze blocking queries.
Expand Down
7 changes: 0 additions & 7 deletions be/src/runtime/load_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ LoadChannel::LoadChannel(const UniqueId& load_id, int64_t timeout_s, bool is_hig
if (workload_group_ptr) {
wg_ptr = workload_group_ptr;
wg_ptr->add_mem_tracker_limiter(mem_tracker);
_need_release_memtracker = true;
}
}
}
Expand All @@ -85,12 +84,6 @@ LoadChannel::~LoadChannel() {
rows_str << ", index id: " << entry.first << ", total_received_rows: " << entry.second.first
<< ", num_rows_filtered: " << entry.second.second;
}
if (_need_release_memtracker) {
WorkloadGroupPtr wg_ptr = _query_thread_context.get_workload_group_ptr();
if (wg_ptr) {
wg_ptr->remove_mem_tracker_limiter(_query_thread_context.get_memory_tracker());
}
}
LOG(INFO) << "load channel removed"
<< " load_id=" << _load_id << ", is high priority=" << _is_high_priority
<< ", sender_ip=" << _sender_ip << rows_str.str();
Expand Down
1 change: 0 additions & 1 deletion be/src/runtime/load_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,6 @@ class LoadChannel {
int64_t _backend_id;

bool _enable_profile;
bool _need_release_memtracker = false;
};

inline std::ostream& operator<<(std::ostream& os, LoadChannel& load_channel) {
Expand Down
3 changes: 0 additions & 3 deletions be/src/runtime/memory/mem_tracker_limiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,6 @@ class MemTrackerLimiter final {
bool is_query_cancelled() { return _is_query_cancelled; }
void set_is_query_cancelled(bool is_cancelled) { _is_query_cancelled.store(is_cancelled); }

// Iterator into mem_tracker_limiter_pool for this object. Stored to have O(1) remove.
std::list<std::weak_ptr<MemTrackerLimiter>>::iterator wg_tracker_limiter_group_it;

/*
* Part 3, Memory tracking method (use carefully!)
*
Expand Down
2 changes: 0 additions & 2 deletions be/src/runtime/query_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,6 @@ QueryContext::~QueryContext() {
uint64_t group_id = 0;
if (_workload_group) {
group_id = _workload_group->id(); // before remove
_workload_group->remove_mem_tracker_limiter(query_mem_tracker);
_workload_group->remove_query(_query_id);
}

_exec_env->runtime_query_statistics_mgr()->set_query_finished(print_id(_query_id));
Expand Down
84 changes: 51 additions & 33 deletions be/src/runtime/workload_group/workload_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,21 +144,32 @@ void WorkloadGroup::check_and_update(const WorkloadGroupInfo& tg_info) {
}
}

// MemtrackerLimiter is not removed during query context release, so that should remove it here.
int64_t WorkloadGroup::make_memory_tracker_snapshots(
std::list<std::shared_ptr<MemTrackerLimiter>>* tracker_snapshots) {
int64_t used_memory = 0;
for (auto& mem_tracker_group : _mem_tracker_limiter_pool) {
std::lock_guard<std::mutex> l(mem_tracker_group.group_lock);
for (const auto& trackerWptr : mem_tracker_group.trackers) {
auto tracker = trackerWptr.lock();
CHECK(tracker != nullptr);
if (tracker_snapshots != nullptr) {
tracker_snapshots->insert(tracker_snapshots->end(), tracker);
for (auto trackerWptr = mem_tracker_group.trackers.begin();
trackerWptr != mem_tracker_group.trackers.end();) {
auto tracker = trackerWptr->lock();
if (tracker == nullptr) {
trackerWptr = mem_tracker_group.trackers.erase(trackerWptr);
} else {
if (tracker_snapshots != nullptr) {
tracker_snapshots->insert(tracker_snapshots->end(), tracker);
}
used_memory += tracker->consumption();
++trackerWptr;
}
used_memory += tracker->consumption();
}
}
refresh_memory(used_memory);
// refresh total memory used.
_total_mem_used = used_memory;
// reserve memory is recorded in the query mem tracker
// and _total_mem_used already contains all the current reserve memory.
// so after refreshing _total_mem_used, reset _wg_refresh_interval_memory_growth.
_wg_refresh_interval_memory_growth.store(0.0);
_mem_used_status->set_value(used_memory);
return used_memory;
}
Expand All @@ -167,35 +178,38 @@ int64_t WorkloadGroup::memory_used() {
return make_memory_tracker_snapshots(nullptr);
}

void WorkloadGroup::refresh_memory(int64_t used_memory) {
// refresh total memory used.
_total_mem_used = used_memory;
// reserve memory is recorded in the query mem tracker
// and _total_mem_used already contains all the current reserve memory.
// so after refreshing _total_mem_used, reset _wg_refresh_interval_memory_growth.
_wg_refresh_interval_memory_growth.store(0.0);
}
void WorkloadGroup::do_sweep() {
// Clear memtracker limiter that is registered during query or load.
for (auto& mem_tracker_group : _mem_tracker_limiter_pool) {
std::lock_guard<std::mutex> l(mem_tracker_group.group_lock);
for (auto trackerWptr = mem_tracker_group.trackers.begin();
trackerWptr != mem_tracker_group.trackers.end();) {
auto tracker = trackerWptr->lock();
if (tracker == nullptr) {
trackerWptr = mem_tracker_group.trackers.erase(trackerWptr);
} else {
++trackerWptr;
}
}
}

void WorkloadGroup::add_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter> mem_tracker_ptr) {
// Clear query context that is registered during query context ctor
std::unique_lock<std::shared_mutex> wlock(_mutex);
auto group_num = mem_tracker_ptr->group_num();
std::lock_guard<std::mutex> l(_mem_tracker_limiter_pool[group_num].group_lock);
mem_tracker_ptr->wg_tracker_limiter_group_it =
_mem_tracker_limiter_pool[group_num].trackers.insert(
_mem_tracker_limiter_pool[group_num].trackers.end(), mem_tracker_ptr);
for (auto iter = _query_ctxs.begin(); iter != _query_ctxs.end();) {
if (iter->second.lock() == nullptr) {
iter = _query_ctxs.erase(iter);
} else {
iter++;
}
}
}

void WorkloadGroup::remove_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter> mem_tracker_ptr) {
void WorkloadGroup::add_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter> mem_tracker_ptr) {
std::unique_lock<std::shared_mutex> wlock(_mutex);
auto group_num = mem_tracker_ptr->group_num();
std::lock_guard<std::mutex> l(_mem_tracker_limiter_pool[group_num].group_lock);
if (mem_tracker_ptr->wg_tracker_limiter_group_it !=
_mem_tracker_limiter_pool[group_num].trackers.end()) {
_mem_tracker_limiter_pool[group_num].trackers.erase(
mem_tracker_ptr->wg_tracker_limiter_group_it);
mem_tracker_ptr->wg_tracker_limiter_group_it =
_mem_tracker_limiter_pool[group_num].trackers.end();
}
_mem_tracker_limiter_pool[group_num].trackers.insert(
_mem_tracker_limiter_pool[group_num].trackers.end(), mem_tracker_ptr);
}

int64_t WorkloadGroup::gc_memory(int64_t need_free_mem, RuntimeProfile* profile, bool is_minor_gc) {
Expand Down Expand Up @@ -230,14 +244,16 @@ int64_t WorkloadGroup::gc_memory(int64_t need_free_mem, RuntimeProfile* profile,
auto cancel_top_overcommit_str = [cancel_str](int64_t mem_consumption,
const std::string& label) {
return fmt::format(
"{} cancel top memory overcommit tracker <{}> consumption {}. details:{}, Execute "
"{} cancel top memory overcommit tracker <{}> consumption {}. details:{}, "
"Execute "
"again after enough memory, details see be.INFO.",
cancel_str, label, MemCounter::print_bytes(mem_consumption),
GlobalMemoryArbitrator::process_limit_exceeded_errmsg_str());
};
auto cancel_top_usage_str = [cancel_str](int64_t mem_consumption, const std::string& label) {
return fmt::format(
"{} cancel top memory used tracker <{}> consumption {}. details:{}, Execute again "
"{} cancel top memory used tracker <{}> consumption {}. details:{}, Execute "
"again "
"after enough memory, details see be.INFO.",
cancel_str, label, MemCounter::print_bytes(mem_consumption),
GlobalMemoryArbitrator::process_soft_limit_exceeded_errmsg_str());
Expand All @@ -249,7 +265,8 @@ int64_t WorkloadGroup::gc_memory(int64_t need_free_mem, RuntimeProfile* profile,
_id, _name, _memory_limit, used_memory, need_free_mem);
Defer defer {[&]() {
LOG(INFO) << fmt::format(
"[MemoryGC] work load group finished gc, id:{} name:{}, memory limit: {}, used: "
"[MemoryGC] work load group finished gc, id:{} name:{}, memory limit: {}, "
"used: "
"{}, need_free_mem: {}, freed memory: {}.",
_id, _name, _memory_limit, used_memory, need_free_mem, freed_mem);
}};
Expand Down Expand Up @@ -542,7 +559,8 @@ void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e
_cgroup_cpu_ctl->update_cpu_soft_limit(
CgroupCpuCtl::cpu_soft_limit_default_value());
} else {
LOG(INFO) << "[upsert wg thread pool] enable cpu hard limit but value is illegal: "
LOG(INFO) << "[upsert wg thread pool] enable cpu hard limit but value is "
"illegal: "
<< cpu_hard_limit << ", gid=" << tg_id;
}
} else {
Expand Down
15 changes: 2 additions & 13 deletions be/src/runtime/workload_group/workload_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {
std::list<std::shared_ptr<MemTrackerLimiter>>* tracker_snapshots);
// call make_memory_tracker_snapshots, so also refresh total memory used.
int64_t memory_used();
void refresh_memory(int64_t used_memory);

void do_sweep();

int spill_threshold_low_water_mark() const {
return _spill_low_watermark.load(std::memory_order_relaxed);
Expand Down Expand Up @@ -132,8 +133,6 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {

void add_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter> mem_tracker_ptr);

void remove_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter> mem_tracker_ptr);

// when mem_limit <=0 , it's an invalid value, then current group not participating in memory GC
// because mem_limit is not a required property
bool is_mem_limit_valid() {
Expand All @@ -154,11 +153,6 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {
return Status::OK();
}

void remove_query(TUniqueId query_id) {
std::unique_lock<std::shared_mutex> wlock(_mutex);
_query_ctxs.erase(query_id);
}

void shutdown() {
std::unique_lock<std::shared_mutex> wlock(_mutex);
_is_shutdown = true;
Expand All @@ -169,11 +163,6 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {
return _is_shutdown && _query_ctxs.empty();
}

int query_num() {
std::shared_lock<std::shared_mutex> r_lock(_mutex);
return _query_ctxs.size();
}

int64_t gc_memory(int64_t need_free_mem, RuntimeProfile* profile, bool is_minor_gc);

void upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* exec_env);
Expand Down
7 changes: 7 additions & 0 deletions be/src/runtime/workload_group/workload_group_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,13 @@ void WorkloadGroupMgr::delete_workload_group_by_ids(std::set<uint64_t> used_wg_i
<< ", before wg size=" << old_wg_size << ", after wg size=" << new_wg_size;
}

// Periodically invoked by the memory-maintenance thread: ask every
// registered workload group to sweep out expired (already-destroyed)
// mem-tracker entries. A shared (read) lock is enough here because the
// group map itself is only read; each group does its own cleanup under
// its internal per-group locks.
void WorkloadGroupMgr::do_sweep() {
    std::shared_lock<std::shared_mutex> read_guard(_group_mutex);
    for (auto& entry : _workload_groups) {
        entry.second->do_sweep();
    }
}

struct WorkloadGroupMemInfo {
int64_t total_mem_used = 0;
std::list<std::shared_ptr<MemTrackerLimiter>> tracker_snapshots =
Expand Down
2 changes: 2 additions & 0 deletions be/src/runtime/workload_group/workload_group_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ class WorkloadGroupMgr {

WorkloadGroupPtr get_task_group_by_id(uint64_t tg_id);

void do_sweep();

void stop();

std::atomic<bool> _enable_cpu_hard_limit = false;
Expand Down