From 206f9c58f03ffacefc94b80c3c8d45a7478dd4b7 Mon Sep 17 00:00:00 2001
From: Chen Zhang
Date: Fri, 11 Nov 2022 23:51:09 +0800
Subject: [PATCH 1/7] update

---
 be/src/runtime/load_channel_mgr.cpp |  6 ++
 be/src/runtime/load_channel_mgr.h   | 96 ++++++++++++++++++++---------
 2 files changed, 73 insertions(+), 29 deletions(-)

diff --git a/be/src/runtime/load_channel_mgr.cpp b/be/src/runtime/load_channel_mgr.cpp
index 6eef349fe1ca3e..26f5596c3bb4e6 100644
--- a/be/src/runtime/load_channel_mgr.cpp
+++ b/be/src/runtime/load_channel_mgr.cpp
@@ -70,6 +70,12 @@ LoadChannelMgr::~LoadChannelMgr() {
 Status LoadChannelMgr::init(int64_t process_mem_limit) {
     _load_hard_mem_limit = calc_process_max_load_memory(process_mem_limit);
     _load_soft_mem_limit = _load_hard_mem_limit * config::load_process_soft_mem_limit_percent / 100;
+    // If a load channel's memory consumption is no more than 10% of the hard limit, it's not
+    // worth reducing memory on it. Since we only reduce 1/3 of the memory for one load channel,
+    // for a channel consuming 10% of the hard limit, we can only release about 3% of memory each time,
+    // which is not very helpful for reducing memory pressure.
+    // In this case we need to pick multiple load channels to reduce memory more effectively.
+    _load_channel_min_mem_to_reduce = _load_hard_mem_limit * 0.05;
     _mem_tracker = std::make_unique<MemTracker>("LoadChannelMgr");
     _mem_tracker_set = std::make_unique<MemTrackerLimiter>(MemTrackerLimiter::Type::LOAD,
                                                            "LoadChannelMgrTrackerSet");
diff --git a/be/src/runtime/load_channel_mgr.h b/be/src/runtime/load_channel_mgr.h
index 3f27eafd0e8df0..736efb3ceacd32 100644
--- a/be/src/runtime/load_channel_mgr.h
+++ b/be/src/runtime/load_channel_mgr.h
@@ -87,7 +87,6 @@ class LoadChannelMgr {
     std::mutex _lock;
     // load id -> load channel
     std::unordered_map<UniqueId, std::shared_ptr<LoadChannel>> _load_channels;
-    std::shared_ptr<LoadChannel> _reduce_memory_channel = nullptr;
     Cache* _last_success_channel = nullptr;

     // check the total load channel mem consumption of this Backend
     std::unique_ptr<MemTracker> _mem_tracker;
     std::unique_ptr<MemTrackerLimiter> _mem_tracker_set;
     int64_t _load_hard_mem_limit = -1;
     int64_t _load_soft_mem_limit = -1;
+    // By default, we try to reduce memory on the load channel with the largest mem consumption,
+    // but if there are lots of small load channels, even the largest one may consume very
+    // little memory; in this case we need to pick multiple load channels to reduce memory
+    // more effectively.
+    // `_load_channel_min_mem_to_reduce` is used to determine whether the largest load channel's
+    // memory consumption is big enough.
+    int64_t _load_channel_min_mem_to_reduce = -1;
+    bool _soft_reduce_mem_in_progress = false;

     // If hard limit reached, one thread will trigger load channel flush,
     // other threads should wait on the condition variable.
@@ -171,10 +178,9 @@ Status LoadChannelMgr::_handle_mem_exceed_limit(TabletWriterAddResult* response)
         MemInfo::proc_mem_no_allocator_cache() < process_mem_limit) {
         return Status::OK();
     }
-    // Pick load channel to reduce memory.
-    std::shared_ptr<LoadChannel> channel;
     // Indicate whether current thread is reducing mem on hard limit.
     bool reducing_mem_on_hard_limit = false;
+    std::vector<LoadChannel*> channels_to_reduce_mem;
     {
         std::unique_lock<std::mutex> l(_lock);
         while (_should_wait_flush) {
@@ -182,41 +188,71 @@ Status LoadChannelMgr::_handle_mem_exceed_limit(TabletWriterAddResult* response)
                       << ", waiting for flush";
             _wait_flush_cond.wait(l);
         }
+        bool hard_limit_reached = _mem_tracker->consumption() >= _load_hard_mem_limit ||
+                                  MemInfo::proc_mem_no_allocator_cache() >= process_mem_limit;
         // Some other thread is flushing data, and not reached hard limit now,
         // we don't need to handle mem limit in current thread.
-        if (_reduce_memory_channel != nullptr &&
-            _mem_tracker->consumption() < _load_hard_mem_limit &&
-            MemInfo::proc_mem_no_allocator_cache() < process_mem_limit) {
+        if (_soft_reduce_mem_in_progress && !hard_limit_reached) {
             return Status::OK();
         }
-        // We need to pick a LoadChannel to reduce memory usage.
-        // If `_reduce_memory_channel` is not null, it means the hard limit is
-        // exceed now, we still need to pick a load channel again. Because
-        // `_reduce_memory_channel` might not be the largest consumer now.
-        int64_t max_consume = 0;
+        // Pick LoadChannels to reduce memory usage, if some other thread is reducing memory
+        // due to soft limit, and we reached hard limit now, current thread may pick some
+        // duplicate channels and trigger duplicate reducing memory process.
+        // But the load channel's reduce memory process is thread safe, only 1 thread can
+        // reduce memory at the same time, other threads will wait on a condition variable,
+        // after the reduce-memory work finished, all threads will return.
+        std::vector<LoadChannel*> candidate_channels;
+        int64_t total_consume = 0;
         for (auto& kv : _load_channels) {
             if (kv.second->is_high_priority()) {
                 // do not select high priority channel to reduce memory
                 // to avoid blocking them.
                 continue;
             }
-            if (kv.second->mem_consumption() > max_consume) {
-                max_consume = kv.second->mem_consumption();
-                channel = kv.second;
-            }
+            candidate_channels.push_back(kv.second.get());
+            total_consume += kv.second->mem_consumption();
         }
-        if (max_consume == 0) {
+
+        if (candidate_channels.empty()) {
             // should not happen, add log to observe
-            LOG(WARNING) << "failed to find suitable load channel when total load mem limit exceed";
+            LOG(WARNING) << "All load channels are high priority, failed to find suitable "
+                         << "channels to reduce memory when total load mem limit exceed";
             return Status::OK();
         }
-        DCHECK(channel.get() != nullptr);
-        _reduce_memory_channel = channel;
+
+        // sort all load channels, try to find the largest one.
+        std::sort(candidate_channels.begin(), candidate_channels.end(),
+                  [](LoadChannel* lhs, LoadChannel* rhs) {
+                      return lhs->mem_consumption() > rhs->mem_consumption();
+                  });
+
+        int64_t mem_consumption_in_picked_channel = 0;
+        auto largest_channel = *candidate_channels.begin();
+        // If some load-channel is big enough, we can reduce it only, try our best to avoid
+        // reducing small load channels.
+        if (_load_channel_min_mem_to_reduce > 0 &&
+            largest_channel->mem_consumption() > _load_channel_min_mem_to_reduce) {
+            // Pick 1 load channel to reduce memory.
+            channels_to_reduce_mem.push_back(largest_channel);
+            mem_consumption_in_picked_channel = largest_channel->mem_consumption();
+        } else {
+            // Pick multiple channels to reduce memory.
+            int64_t mem_to_flushed = total_consume / 3;
+            for (auto ch : candidate_channels) {
+                channels_to_reduce_mem.push_back(ch);
+                mem_consumption_in_picked_channel += ch->mem_consumption();
+                if (mem_consumption_in_picked_channel >= mem_to_flushed) {
+                    break;
+                }
+            }
+        }
         std::ostringstream oss;
         if (MemInfo::proc_mem_no_allocator_cache() < process_mem_limit) {
-            oss << "reducing memory of " << *channel << " because total load mem consumption "
+            oss << "reducing memory of " << channels_to_reduce_mem.size()
+                << " load channels (total mem consumption: " << mem_consumption_in_picked_channel
+                << " bytes), because total load mem consumption "
                 << PrettyPrinter::print(_mem_tracker->consumption(), TUnit::BYTES)
                 << " has exceeded";
             if (_mem_tracker->consumption() > _load_hard_mem_limit) {
@@ -224,24 +260,26 @@ Status LoadChannelMgr::_handle_mem_exceed_limit(TabletWriterAddResult* response)
                 reducing_mem_on_hard_limit = true;
                 oss << " hard limit: " << PrettyPrinter::print(_load_hard_mem_limit, TUnit::BYTES);
             } else {
+                _soft_reduce_mem_in_progress = true;
                 oss << " soft limit: " << PrettyPrinter::print(_load_soft_mem_limit, TUnit::BYTES);
             }
         } else {
             _should_wait_flush = true;
             reducing_mem_on_hard_limit = true;
-            oss << "reducing memory of " << *channel << " because process memory used "
-                << PerfCounters::get_vm_rss_str() << " has exceeded limit "
+            oss << "reducing memory of " << channels_to_reduce_mem.size()
+                << " load channels (total mem consumption: " << mem_consumption_in_picked_channel
+                << " bytes), because " << PerfCounters::get_vm_rss_str() << " has exceeded limit "
                 << PrettyPrinter::print(process_mem_limit, TUnit::BYTES)
                 << " , tc/jemalloc allocator cache " << MemInfo::allocator_cache_mem_str();
         }
         LOG(INFO) << oss.str();
     }
-    // No matter soft limit or hard limit reached, only 1 thread will wait here,
-    // if hard limit reached, other threads will pend at the beginning of this
-    // method.
-    Status st = channel->handle_mem_exceed_limit(response);
-    LOG(INFO) << "reduce memory of " << *channel << " finished";
+    Status st = Status::OK();
+    for (auto ch : channels_to_reduce_mem) {
+        LOG(INFO) << "reduce memory of " << *ch;
+        st = ch->template handle_mem_exceed_limit(response);
+    }

     {
         std::lock_guard<std::mutex> l(_lock);
@@ -251,8 +289,8 @@ Status LoadChannelMgr::_handle_mem_exceed_limit(TabletWriterAddResult* response)
             _should_wait_flush = false;
             _wait_flush_cond.notify_all();
         }
-        if (_reduce_memory_channel == channel) {
-            _reduce_memory_channel = nullptr;
+        if (_soft_reduce_mem_in_progress) {
+            _soft_reduce_mem_in_progress = false;
         }
     }
     return st;

From 7415de03949b5bf60438cc7e6770bae18c1e976b Mon Sep 17 00:00:00 2001
From: Chen Zhang
Date: Sat, 12 Nov 2022 01:31:41 +0800
Subject: [PATCH 2/7] update

---
 be/src/runtime/load_channel_mgr.h | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/be/src/runtime/load_channel_mgr.h b/be/src/runtime/load_channel_mgr.h
index 736efb3ceacd32..250a087d0c6ede 100644
--- a/be/src/runtime/load_channel_mgr.h
+++ b/be/src/runtime/load_channel_mgr.h
@@ -277,8 +277,11 @@ Status LoadChannelMgr::_handle_mem_exceed_limit(TabletWriterAddResult* response)

     Status st = Status::OK();
     for (auto ch : channels_to_reduce_mem) {
-        LOG(INFO) << "reduce memory of " << *ch;
+        uint64_t begin = GetCurrentTimeMicros();
+        int64_t mem_usage = ch->mem_consumption();
         st = ch->template handle_mem_exceed_limit(response);
+        LOG(INFO) << "reduced memory of " << *ch << ", cost " << GetCurrentTimeMicros() - begin
+                  << " ms, released memory: " << mem_usage - ch->mem_consumption() << " bytes";
     }

     {

From 0d227087657b40e22f8185f8287b0205873237d1 Mon Sep 17 00:00:00 2001
From: Chen Zhang
Date: Sat, 12 Nov 2022 02:07:53 +0800
Subject: [PATCH 3/7] update

---
 be/src/runtime/load_channel_mgr.h | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/be/src/runtime/load_channel_mgr.h b/be/src/runtime/load_channel_mgr.h
index 250a087d0c6ede..115a7cad394a55 100644
--- a/be/src/runtime/load_channel_mgr.h
+++ b/be/src/runtime/load_channel_mgr.h
@@ -202,7 +202,7 @@ Status LoadChannelMgr::_handle_mem_exceed_limit(TabletWriterAddResult* response)
         // But the load channel's reduce memory process is thread safe, only 1 thread can
         // reduce memory at the same time, other threads will wait on a condition variable,
         // after the reduce-memory work finished, all threads will return.
-        std::vector<LoadChannel*> candidate_channels;
+        std::vector<std::shared_ptr<LoadChannel>> candidate_channels;
         int64_t total_consume = 0;
         for (auto& kv : _load_channels) {
             if (kv.second->is_high_priority()) {
@@ -210,7 +210,7 @@ Status LoadChannelMgr::_handle_mem_exceed_limit(TabletWriterAddResult* response)
                 // do not select high priority channel to reduce memory
                 // to avoid blocking them.
                 continue;
             }
-            candidate_channels.push_back(kv.second.get());
+            candidate_channels.push_back(kv.second);
             total_consume += kv.second->mem_consumption();
         }
@@ -234,13 +234,13 @@ Status LoadChannelMgr::_handle_mem_exceed_limit(TabletWriterAddResult* response)
         if (_load_channel_min_mem_to_reduce > 0 &&
             largest_channel->mem_consumption() > _load_channel_min_mem_to_reduce) {
             // Pick 1 load channel to reduce memory.
-            channels_to_reduce_mem.push_back(largest_channel);
+            channels_to_reduce_mem.push_back(largest_channel.get());
             mem_consumption_in_picked_channel = largest_channel->mem_consumption();
         } else {
             // Pick multiple channels to reduce memory.
             int64_t mem_to_flushed = total_consume / 3;
             for (auto ch : candidate_channels) {
-                channels_to_reduce_mem.push_back(ch);
+                channels_to_reduce_mem.push_back(ch.get());
                 mem_consumption_in_picked_channel += ch->mem_consumption();
                 if (mem_consumption_in_picked_channel >= mem_to_flushed) {
                     break;
                 }
             }
         }
@@ -279,7 +279,7 @@ Status LoadChannelMgr::_handle_mem_exceed_limit(TabletWriterAddResult* response)
     for (auto ch : channels_to_reduce_mem) {
         uint64_t begin = GetCurrentTimeMicros();
         int64_t mem_usage = ch->mem_consumption();
-        st = ch->template handle_mem_exceed_limit(response);
+        st = ch->handle_mem_exceed_limit(response);
         LOG(INFO) << "reduced memory of " << *ch << ", cost " << GetCurrentTimeMicros() - begin
                   << " ms, released memory: " << mem_usage - ch->mem_consumption() << " bytes";
     }

From fad1566db28dee3fb815767f6b08ce053fb190d5 Mon Sep 17 00:00:00 2001
From: Chen Zhang
Date: Sat, 12 Nov 2022 02:20:47 +0800
Subject: [PATCH 4/7] update

---
 be/src/runtime/load_channel_mgr.h | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/be/src/runtime/load_channel_mgr.h b/be/src/runtime/load_channel_mgr.h
index 115a7cad394a55..e987a7fe45ef8b 100644
--- a/be/src/runtime/load_channel_mgr.h
+++ b/be/src/runtime/load_channel_mgr.h
@@ -180,7 +180,7 @@ Status LoadChannelMgr::_handle_mem_exceed_limit(TabletWriterAddResult* response)
     }
     // Indicate whether current thread is reducing mem on hard limit.
     bool reducing_mem_on_hard_limit = false;
-    std::vector<LoadChannel*> channels_to_reduce_mem;
+    std::vector<std::shared_ptr<LoadChannel>> channels_to_reduce_mem;
     {
         std::unique_lock<std::mutex> l(_lock);
         while (_should_wait_flush) {
@@ -202,7 +202,7 @@ Status LoadChannelMgr::_handle_mem_exceed_limit(TabletWriterAddResult* response)
         // But the load channel's reduce memory process is thread safe, only 1 thread can
         // reduce memory at the same time, other threads will wait on a condition variable,
         // after the reduce-memory work finished, all threads will return.
-        std::vector<std::shared_ptr<LoadChannel>> candidate_channels;
+        std::vector<LoadChannel*> candidate_channels;
         int64_t total_consume = 0;
         for (auto& kv : _load_channels) {
             if (kv.second->is_high_priority()) {
@@ -210,7 +210,7 @@ Status LoadChannelMgr::_handle_mem_exceed_limit(TabletWriterAddResult* response)
                 // do not select high priority channel to reduce memory
                 // to avoid blocking them.
                 continue;
             }
-            candidate_channels.push_back(kv.second);
+            candidate_channels.push_back(kv.second.get());
             total_consume += kv.second->mem_consumption();
         }
@@ -234,13 +234,13 @@ Status LoadChannelMgr::_handle_mem_exceed_limit(TabletWriterAddResult* response)
         if (_load_channel_min_mem_to_reduce > 0 &&
             largest_channel->mem_consumption() > _load_channel_min_mem_to_reduce) {
             // Pick 1 load channel to reduce memory.
-            channels_to_reduce_mem.push_back(largest_channel.get());
+            channels_to_reduce_mem.push_back(_load_channels[largest_channel->load_id()]);
             mem_consumption_in_picked_channel = largest_channel->mem_consumption();
         } else {
             // Pick multiple channels to reduce memory.
             int64_t mem_to_flushed = total_consume / 3;
             for (auto ch : candidate_channels) {
-                channels_to_reduce_mem.push_back(ch.get());
+                channels_to_reduce_mem.push_back(_load_channels[ch->load_id()]);
                 mem_consumption_in_picked_channel += ch->mem_consumption();
                 if (mem_consumption_in_picked_channel >= mem_to_flushed) {
                     break;

From b6629ceb5f5e1c20c844533d33eacf5e47d2195e Mon Sep 17 00:00:00 2001
From: Chen Zhang
Date: Sat, 12 Nov 2022 03:14:45 +0800
Subject: [PATCH 5/7] fix lock race condition

---
 be/src/runtime/load_channel_mgr.h | 23 +++++++++++++----------
 1 file changed, 13 insertions(+), 10 deletions(-)

diff --git a/be/src/runtime/load_channel_mgr.h b/be/src/runtime/load_channel_mgr.h
index e987a7fe45ef8b..b0b0317b3ca72a 100644
--- a/be/src/runtime/load_channel_mgr.h
+++ b/be/src/runtime/load_channel_mgr.h
@@ -202,7 +202,8 @@ Status LoadChannelMgr::_handle_mem_exceed_limit(TabletWriterAddResult* response)
         // But the load channel's reduce memory process is thread safe, only 1 thread can
         // reduce memory at the same time, other threads will wait on a condition variable,
         // after the reduce-memory work finished, all threads will return.
-        std::vector<LoadChannel*> candidate_channels;
+        using ChannelMemPair = std::pair<std::shared_ptr<LoadChannel>, int64_t>;
+        std::vector<ChannelMemPair> candidate_channels;
         int64_t total_consume = 0;
         for (auto& kv : _load_channels) {
             if (kv.second->is_high_priority()) {
@@ -210,7 +211,10 @@ Status LoadChannelMgr::_handle_mem_exceed_limit(TabletWriterAddResult* response)
                 // do not select high priority channel to reduce memory
                 // to avoid blocking them.
                 continue;
             }
-            candidate_channels.push_back(kv.second.get());
-            total_consume += kv.second->mem_consumption();
+            int64_t mem = kv.second->mem_consumption();
+            // save the mem consumption, since the calculation might be expensive.
+            candidate_channels.push_back(std::make_pair(kv.second, mem));
+            total_consume += mem;
         }

         if (candidate_channels.empty()) {
@@ -223,8 +226,8 @@ Status LoadChannelMgr::_handle_mem_exceed_limit(TabletWriterAddResult* response)
         // sort all load channels, try to find the largest one.
         std::sort(candidate_channels.begin(), candidate_channels.end(),
-                  [](LoadChannel* lhs, LoadChannel* rhs) {
-                      return lhs->mem_consumption() > rhs->mem_consumption();
+                  [](const ChannelMemPair& lhs, const ChannelMemPair& rhs) {
+                      return lhs.second > rhs.second;
                   });

         int64_t mem_consumption_in_picked_channel = 0;
         auto largest_channel = *candidate_channels.begin();
         // If some load-channel is big enough, we can reduce it only, try our best to avoid
         // reducing small load channels.
         if (_load_channel_min_mem_to_reduce > 0 &&
-            largest_channel->mem_consumption() > _load_channel_min_mem_to_reduce) {
+            largest_channel.second > _load_channel_min_mem_to_reduce) {
             // Pick 1 load channel to reduce memory.
-            channels_to_reduce_mem.push_back(_load_channels[largest_channel->load_id()]);
-            mem_consumption_in_picked_channel = largest_channel->mem_consumption();
+            channels_to_reduce_mem.push_back(largest_channel.first);
+            mem_consumption_in_picked_channel = largest_channel.second;
         } else {
             // Pick multiple channels to reduce memory.
             int64_t mem_to_flushed = total_consume / 3;
             for (auto ch : candidate_channels) {
-                channels_to_reduce_mem.push_back(_load_channels[ch->load_id()]);
-                mem_consumption_in_picked_channel += ch->mem_consumption();
+                channels_to_reduce_mem.push_back(ch.first);
+                mem_consumption_in_picked_channel += ch.second;
                 if (mem_consumption_in_picked_channel >= mem_to_flushed) {
                     break;
                 }
             }

From 555c8f10e1e81e7546e77953a71f76c4a8047622 Mon Sep 17 00:00:00 2001
From: Chen Zhang
Date: Sat, 12 Nov 2022 03:36:45 +0800
Subject: [PATCH 6/7] update

---
 be/src/runtime/load_channel_mgr.h | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/be/src/runtime/load_channel_mgr.h b/be/src/runtime/load_channel_mgr.h
index b0b0317b3ca72a..c1c86a7890f6b6 100644
--- a/be/src/runtime/load_channel_mgr.h
+++ b/be/src/runtime/load_channel_mgr.h
@@ -283,7 +283,8 @@ Status LoadChannelMgr::_handle_mem_exceed_limit(TabletWriterAddResult* response)
         uint64_t begin = GetCurrentTimeMicros();
         int64_t mem_usage = ch->mem_consumption();
         st = ch->handle_mem_exceed_limit(response);
-        LOG(INFO) << "reduced memory of " << *ch << ", cost " << GetCurrentTimeMicros() - begin
+        LOG(INFO) << "reduced memory of " << *ch << ", cost "
+                  << (GetCurrentTimeMicros() - begin) / 1000
                   << " ms, released memory: " << mem_usage - ch->mem_consumption() << " bytes";
     }

From 504052521650466dca6e76a489efcf38e3b00030 Mon Sep 17 00:00:00 2001
From: Chen Zhang
Date: Sat, 12 Nov 2022 03:50:10 +0800
Subject: [PATCH 7/7] update

---
 be/src/runtime/load_channel_mgr.cpp | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/be/src/runtime/load_channel_mgr.cpp b/be/src/runtime/load_channel_mgr.cpp
index 26f5596c3bb4e6..c81ace2487426a 100644
--- a/be/src/runtime/load_channel_mgr.cpp
+++ b/be/src/runtime/load_channel_mgr.cpp
@@ -75,7 +75,7 @@ Status LoadChannelMgr::init(int64_t process_mem_limit) {
     // for a channel consuming 10% of the hard limit, we can only release about 3% of memory each time,
     // which is not very helpful for reducing memory pressure.
     // In this case we need to pick multiple load channels to reduce memory more effectively.
-    _load_channel_min_mem_to_reduce = _load_hard_mem_limit * 0.05;
+    _load_channel_min_mem_to_reduce = _load_hard_mem_limit * 0.1;
     _mem_tracker = std::make_unique<MemTracker>("LoadChannelMgr");
     _mem_tracker_set = std::make_unique<MemTrackerLimiter>(MemTrackerLimiter::Type::LOAD,
                                                            "LoadChannelMgrTrackerSet");
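
Below is a minimal, self-contained sketch of the channel-selection heuristic this series converges on, shown outside the diff context. It is an illustration, not the Doris code: FakeChannel, pick_channels_to_reduce(), and the hard-coded limits in main() are invented stand-ins; the real logic lives in LoadChannelMgr::_handle_mem_exceed_limit above and additionally handles locking, high-priority channels, and the wait-for-flush condition variable.

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Stand-in for LoadChannel; only the memory consumption matters for the heuristic.
struct FakeChannel {
    std::string name;
    int64_t mem = 0;
};

// Mirrors the selection policy of the series:
//  - if the largest channel alone exceeds min_mem_to_reduce (10% of the hard
//    limit after PATCH 7/7), reduce only that channel;
//  - otherwise pick channels, largest first, until roughly 1/3 of the total
//    load memory is covered.
std::vector<std::shared_ptr<FakeChannel>> pick_channels_to_reduce(
        const std::vector<std::shared_ptr<FakeChannel>>& channels, int64_t min_mem_to_reduce) {
    using ChannelMemPair = std::pair<std::shared_ptr<FakeChannel>, int64_t>;
    std::vector<ChannelMemPair> candidates;
    int64_t total = 0;
    for (const auto& ch : channels) {
        candidates.emplace_back(ch, ch->mem); // snapshot the consumption once
        total += ch->mem;
    }
    std::vector<std::shared_ptr<FakeChannel>> picked;
    if (candidates.empty()) {
        return picked;
    }
    std::sort(candidates.begin(), candidates.end(),
              [](const ChannelMemPair& l, const ChannelMemPair& r) { return l.second > r.second; });
    if (min_mem_to_reduce > 0 && candidates.front().second > min_mem_to_reduce) {
        picked.push_back(candidates.front().first); // one big channel is enough
        return picked;
    }
    const int64_t target = total / 3; // flush roughly one third of the total consumption
    int64_t picked_mem = 0;
    for (const auto& [ch, mem] : candidates) {
        picked.push_back(ch);
        picked_mem += mem;
        if (picked_mem >= target) {
            break;
        }
    }
    return picked;
}

int main() {
    // Hypothetical numbers: hard limit 100 GB, so the threshold is 10 GB (10%).
    const int64_t kGB = int64_t{1} << 30;
    std::vector<std::shared_ptr<FakeChannel>> channels = {
            std::make_shared<FakeChannel>(FakeChannel{"a", 3 * kGB}),
            std::make_shared<FakeChannel>(FakeChannel{"b", 2 * kGB}),
            std::make_shared<FakeChannel>(FakeChannel{"c", 2 * kGB}),
            std::make_shared<FakeChannel>(FakeChannel{"d", 1 * kGB}),
    };
    for (const auto& ch : pick_channels_to_reduce(channels, 10 * kGB)) {
        std::cout << "would reduce memory of channel " << ch->name << "\n";
    }
    return 0;
}

As in PATCH 5/7, the sketch records each channel's consumption once into a pair before sorting, so the comparison and summation do not call the (potentially expensive) consumption calculation repeatedly.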