From 2c97f3399739275acb1c8b8c5cabae5c653a48e1 Mon Sep 17 00:00:00 2001 From: Kaijie Chen Date: Fri, 20 Sep 2024 14:08:19 +0800 Subject: [PATCH 1/3] [improve](load) change memtable memory limiter policy (#41018) ## Proposed changes #40912 changed the meaning of `write_mem` in the memtable memory limiter. This PR is a follow-up that changes the active memtable flush policy accordingly. It also changes: 1. The number of memtable writers selected in one flush. 2. The memtable writers are now selected in order of their size. --- be/src/common/config.cpp | 6 - be/src/common/config.h | 6 - be/src/olap/memtable_memory_limiter.cpp | 169 ++++++++++++------------ be/src/olap/memtable_memory_limiter.h | 8 +- 4 files changed, 88 insertions(+), 101 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 4aea6200ce0387..b077deac04f700 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -620,12 +620,6 @@ DEFINE_mInt32(memory_maintenance_sleep_time_ms, "20"); // After minor gc, no minor gc during sleep, but full gc is possible. DEFINE_mInt32(memory_gc_sleep_time_ms, "500"); -// percent of (active memtables size / all memtables size) when reach hard limit -DEFINE_mInt32(memtable_hard_limit_active_percent, "50"); - -// percent of (active memtables size / all memtables size) when reach soft limit -DEFINE_mInt32(memtable_soft_limit_active_percent, "50"); - // max write buffer size before flush, default 200MB DEFINE_mInt64(write_buffer_size, "209715200"); // max buffer size used in memtable for the aggregated table, default 400MB diff --git a/be/src/common/config.h b/be/src/common/config.h index 13437d96b8eafe..734d73f46d8bb1 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -668,12 +668,6 @@ DECLARE_mInt32(memory_maintenance_sleep_time_ms); // After minor gc, no minor gc during sleep, but full gc is possible.
DECLARE_mInt32(memory_gc_sleep_time_ms); -// percent of (active memtables size / all memtables size) when reach hard limit -DECLARE_mInt32(memtable_hard_limit_active_percent); - -// percent of (active memtables size / all memtables size) when reach soft limit -DECLARE_mInt32(memtable_soft_limit_active_percent); - // max write buffer size before flush, default 200MB DECLARE_mInt64(write_buffer_size); // max buffer size used in memtable for the aggregated table, default 400MB diff --git a/be/src/olap/memtable_memory_limiter.cpp b/be/src/olap/memtable_memory_limiter.cpp index 9b9ce19f895cd1..213d5aaab9a7bc 100644 --- a/be/src/olap/memtable_memory_limiter.cpp +++ b/be/src/olap/memtable_memory_limiter.cpp @@ -75,33 +75,41 @@ void MemTableMemoryLimiter::register_writer(std::weak_ptr writer _writers.push_back(writer); } -bool MemTableMemoryLimiter::_sys_avail_mem_less_than_warning_water_mark() { +int64_t MemTableMemoryLimiter::_sys_avail_mem_less_than_warning_water_mark() { // reserve a small amount of memory so we do not trigger MinorGC - return doris::GlobalMemoryArbitrator::sys_mem_available() < - doris::MemInfo::sys_mem_available_warning_water_mark() + - config::memtable_limiter_reserved_memory_bytes; + return doris::MemInfo::sys_mem_available_warning_water_mark() - + doris::GlobalMemoryArbitrator::sys_mem_available() + + config::memtable_limiter_reserved_memory_bytes; } -bool MemTableMemoryLimiter::_process_used_mem_more_than_soft_mem_limit() { +int64_t MemTableMemoryLimiter::_process_used_mem_more_than_soft_mem_limit() { // reserve a small amount of memory so we do not trigger MinorGC - return GlobalMemoryArbitrator::process_memory_usage() > - MemInfo::soft_mem_limit() - config::memtable_limiter_reserved_memory_bytes; + return GlobalMemoryArbitrator::process_memory_usage() - MemInfo::soft_mem_limit() + + config::memtable_limiter_reserved_memory_bytes; } bool MemTableMemoryLimiter::_soft_limit_reached() { - return _mem_tracker->consumption() >= _load_soft_mem_limit 
|| _hard_limit_reached(); + return _mem_tracker->consumption() > _load_soft_mem_limit || _hard_limit_reached(); } bool MemTableMemoryLimiter::_hard_limit_reached() { - return _mem_tracker->consumption() >= _load_hard_mem_limit || - _sys_avail_mem_less_than_warning_water_mark() || - _process_used_mem_more_than_soft_mem_limit(); + return _mem_tracker->consumption() > _load_hard_mem_limit || + _sys_avail_mem_less_than_warning_water_mark() > 0 || + _process_used_mem_more_than_soft_mem_limit() > 0; } bool MemTableMemoryLimiter::_load_usage_low() { return _mem_tracker->consumption() <= _load_safe_mem_permit; } +int64_t MemTableMemoryLimiter::_need_flush() { + int64_t limit1 = _mem_tracker->consumption() - _load_soft_mem_limit; + int64_t limit2 = _sys_avail_mem_less_than_warning_water_mark(); + int64_t limit3 = _process_used_mem_more_than_soft_mem_limit(); + int64_t need_flush = std::max(limit1, std::max(limit2, limit3)); + return need_flush - _queue_mem_usage; +} + void MemTableMemoryLimiter::handle_memtable_flush() { // Check the soft limit. 
DCHECK(_load_soft_mem_limit > 0); @@ -112,34 +120,29 @@ void MemTableMemoryLimiter::handle_memtable_flush() { timer.start(); std::unique_lock l(_lock); g_memtable_memory_limit_waiting_threads << 1; - while (_hard_limit_reached()) { - LOG(INFO) << "reached memtable memory hard limit" - << " (active: " << PrettyPrinter::print_bytes(_active_mem_usage) - << ", write: " << PrettyPrinter::print_bytes(_write_mem_usage) - << ", flush: " << PrettyPrinter::print_bytes(_flush_mem_usage) << ")"; - if (_active_mem_usage >= - _write_mem_usage * config::memtable_hard_limit_active_percent / 100) { - _flush_active_memtables(_write_mem_usage / 20); - } - if (!_hard_limit_reached()) { - break; + bool first = true; + do { + if (!first) { + auto st = _hard_limit_end_cond.wait_for(l, std::chrono::milliseconds(1000)); + if (st == std::cv_status::timeout) { + LOG(INFO) << "timeout when waiting for memory hard limit end, try again"; + } } - auto st = _hard_limit_end_cond.wait_for(l, std::chrono::milliseconds(1000)); - if (st == std::cv_status::timeout) { - LOG(INFO) << "timeout when waiting for memory hard limit end, try again"; + first = false; + int64_t need_flush = _need_flush(); + if (need_flush > 0) { + auto limit = _hard_limit_reached() ? Limit::HARD : Limit::SOFT; + LOG(INFO) << "reached memtable memory " << (limit == Limit::HARD ? 
"hard" : "soft") + << ", " << GlobalMemoryArbitrator::process_memory_used_details_str() + << ", load mem: " << PrettyPrinter::print_bytes(_mem_tracker->consumption()) + << ", memtable writers num: " << _writers.size() + << ", active: " << PrettyPrinter::print_bytes(_active_mem_usage) + << ", queue: " << PrettyPrinter::print_bytes(_queue_mem_usage) + << ", flush: " << PrettyPrinter::print_bytes(_flush_mem_usage); + _flush_active_memtables(need_flush); } - } + } while (_hard_limit_reached()); g_memtable_memory_limit_waiting_threads << -1; - if (_soft_limit_reached()) { - LOG(INFO) << "reached memtable memory soft limit" - << " (active: " << PrettyPrinter::print_bytes(_active_mem_usage) - << ", write: " << PrettyPrinter::print_bytes(_write_mem_usage) - << ", flush: " << PrettyPrinter::print_bytes(_flush_mem_usage) << ")"; - if (_active_mem_usage >= - _write_mem_usage * config::memtable_soft_limit_active_percent / 100) { - _flush_active_memtables(_write_mem_usage / 20); - } - } timer.stop(); int64_t time_ms = timer.elapsed_time() / 1000 / 1000; g_memtable_memory_limit_latency_ms << time_ms; @@ -155,50 +158,50 @@ void MemTableMemoryLimiter::_flush_active_memtables(int64_t need_flush) { if (_active_writers.size() == 0) { return; } + + using WriterMem = std::pair, int64_t>; + auto cmp = [](WriterMem left, WriterMem right) { return left.second > right.second; }; + std::priority_queue, decltype(cmp)> heap(cmp); + + for (auto writer : _active_writers) { + auto w = writer.lock(); + if (w == nullptr) { + continue; + } + heap.emplace(w, w->active_memtable_mem_consumption()); + } + int64_t mem_flushed = 0; int64_t num_flushed = 0; - int64_t avg_mem = _active_mem_usage / _active_writers.size(); - for (auto writer : _active_writers) { - int64_t mem = _flush_memtable(writer, avg_mem); + + while (mem_flushed < need_flush && !heap.empty()) { + auto [writer, sort_mem] = heap.top(); + heap.pop(); + auto w = writer.lock(); + if (w == nullptr) { + continue; + } + int64_t mem = 
w->active_memtable_mem_consumption(); + if (mem < sort_mem * 0.9) { + // if the memtable writer just got flushed, don't flush it again + continue; + } + Status st = w->flush_async(); + if (!st.ok()) { + auto err_msg = fmt::format( + "tablet writer failed to reduce mem consumption by flushing memtable, " + "tablet_id={}, err={}", + w->tablet_id(), st.to_string()); + LOG(WARNING) << err_msg; + static_cast(w->cancel_with_status(st)); + } mem_flushed += mem; num_flushed += (mem > 0); - if (mem_flushed >= need_flush) { - break; - } } LOG(INFO) << "flushed " << num_flushed << " out of " << _active_writers.size() << " active writers, flushed size: " << PrettyPrinter::print_bytes(mem_flushed); } -int64_t MemTableMemoryLimiter::_flush_memtable(std::weak_ptr writer_to_flush, - int64_t threshold) { - auto writer = writer_to_flush.lock(); - if (!writer) { - return 0; - } - auto mem_usage = writer->active_memtable_mem_consumption(); - // if the memtable writer just got flushed, don't flush it again - if (mem_usage < threshold) { - VLOG_DEBUG << "flushing active memtables, active mem usage " - << PrettyPrinter::print_bytes(mem_usage) << " is less than " - << PrettyPrinter::print_bytes(threshold) << ", skipping"; - return 0; - } - VLOG_DEBUG << "flushing active memtables, active mem usage " - << PrettyPrinter::print_bytes(mem_usage); - Status st = writer->flush_async(); - if (!st.ok()) { - auto err_msg = fmt::format( - "tablet writer failed to reduce mem consumption by flushing memtable, " - "tablet_id={}, err={}", - writer->tablet_id(), st.to_string()); - LOG(WARNING) << err_msg; - static_cast(writer->cancel_with_status(st)); - return 0; - } - return mem_usage; -} - void MemTableMemoryLimiter::refresh_mem_tracker() { std::lock_guard l(_lock); _refresh_mem_tracker(); @@ -219,21 +222,17 @@ void MemTableMemoryLimiter::refresh_mem_tracker() { _last_limit = limit; _log_timer.reset(); - // if not exist load task, this log should not be printed. 
- if (_mem_usage != 0) { - LOG(INFO) << fmt::format( - "{}, {}, load mem: {}, memtable writers num: {} (active: {}, write: {}, flush: {})", - ss.str(), GlobalMemoryArbitrator::process_memory_used_details_str(), - PrettyPrinter::print_bytes(_mem_tracker->consumption()), _writers.size(), - PrettyPrinter::print_bytes(_active_mem_usage), - PrettyPrinter::print_bytes(_write_mem_usage), - PrettyPrinter::print_bytes(_flush_mem_usage)); - } + LOG(INFO) << ss.str() << ", " << GlobalMemoryArbitrator::process_memory_used_details_str() + << ", load mem: " << PrettyPrinter::print_bytes(_mem_tracker->consumption()) + << ", memtable writers num: " << _writers.size() + << ", active: " << PrettyPrinter::print_bytes(_active_mem_usage) + << ", queue: " << PrettyPrinter::print_bytes(_queue_mem_usage) + << ", flush: " << PrettyPrinter::print_bytes(_flush_mem_usage); } void MemTableMemoryLimiter::_refresh_mem_tracker() { _flush_mem_usage = 0; - _write_mem_usage = 0; + _queue_mem_usage = 0; _active_mem_usage = 0; _active_writers.clear(); for (auto it = _writers.begin(); it != _writers.end();) { @@ -245,16 +244,16 @@ void MemTableMemoryLimiter::_refresh_mem_tracker() { _active_writers.push_back(writer); } _flush_mem_usage += writer->mem_consumption(MemType::FLUSH); - _write_mem_usage += writer->mem_consumption(MemType::WRITE_FINISHED); + _queue_mem_usage += writer->mem_consumption(MemType::WRITE_FINISHED); ++it; } else { *it = std::move(_writers.back()); _writers.pop_back(); } } - _mem_usage = _flush_mem_usage + _write_mem_usage; + _mem_usage = _flush_mem_usage + _queue_mem_usage; g_memtable_active_memory.set_value(_active_mem_usage); - g_memtable_write_memory.set_value(_write_mem_usage); + g_memtable_write_memory.set_value(_queue_mem_usage); g_memtable_flush_memory.set_value(_flush_mem_usage); g_memtable_load_memory.set_value(_mem_usage); VLOG_DEBUG << "refreshed mem_tracker, num writers: " << _writers.size(); diff --git a/be/src/olap/memtable_memory_limiter.h 
b/be/src/olap/memtable_memory_limiter.h index 66f5fb2a8d0c20..1e32cb165e4721 100644 --- a/be/src/olap/memtable_memory_limiter.h +++ b/be/src/olap/memtable_memory_limiter.h @@ -50,21 +50,21 @@ class MemTableMemoryLimiter { int64_t mem_usage() const { return _mem_usage; } private: - static inline bool _sys_avail_mem_less_than_warning_water_mark(); - static inline bool _process_used_mem_more_than_soft_mem_limit(); + static inline int64_t _sys_avail_mem_less_than_warning_water_mark(); + static inline int64_t _process_used_mem_more_than_soft_mem_limit(); bool _soft_limit_reached(); bool _hard_limit_reached(); bool _load_usage_low(); + int64_t _need_flush(); void _flush_active_memtables(int64_t need_flush); - int64_t _flush_memtable(std::weak_ptr writer_to_flush, int64_t threshold); void _refresh_mem_tracker(); std::mutex _lock; std::condition_variable _hard_limit_end_cond; int64_t _mem_usage = 0; int64_t _flush_mem_usage = 0; - int64_t _write_mem_usage = 0; + int64_t _queue_mem_usage = 0; int64_t _active_mem_usage = 0; // sum of all mem table memory. From d92fcad4640526252ba41478fcedff8d4ff89e5e Mon Sep 17 00:00:00 2001 From: Kaijie Chen Date: Thu, 26 Sep 2024 10:05:40 +0800 Subject: [PATCH 2/3] [fix](load) fix priority queue order in memtable memory limiter (#41278) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Proposed changes #41018 used a priority queue when selecting memtables to flush, but the compare function was wrong, causing the order to be reversed. > Note that the Compare parameter is defined such that it returns true if its first argument comes before its second argument in a weak ordering. But because the priority queue outputs largest elements first, the elements that "come before" are actually output last. That is, the front of the queue contains the "last" element according to the weak ordering imposed by Compare. This PR fixes the compare function so that larger memtables come first.
--- be/src/olap/memtable_memory_limiter.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/be/src/olap/memtable_memory_limiter.cpp b/be/src/olap/memtable_memory_limiter.cpp index 213d5aaab9a7bc..a76318af65f84f 100644 --- a/be/src/olap/memtable_memory_limiter.cpp +++ b/be/src/olap/memtable_memory_limiter.cpp @@ -160,7 +160,7 @@ void MemTableMemoryLimiter::_flush_active_memtables(int64_t need_flush) { } using WriterMem = std::pair, int64_t>; - auto cmp = [](WriterMem left, WriterMem right) { return left.second > right.second; }; + auto cmp = [](WriterMem left, WriterMem right) { return left.second < right.second; }; std::priority_queue, decltype(cmp)> heap(cmp); for (auto writer : _active_writers) { From 311c9ff316b776443bfa4156153b97df3b9292e8 Mon Sep 17 00:00:00 2001 From: Kaijie Chen Date: Wed, 25 Sep 2024 14:14:35 +0800 Subject: [PATCH 3/3] [fix](load) fix memtable memory limiter total mem usage (#41245) ## Proposed changes Previously, `mem_usage = write_mem + flush_mem`, because `active_mem` was included in `write_mem`. After #40912, `write_mem` became `queue_mem`, which no longer includes `active_mem`. This PR fixes the problem by setting `mem_usage = active_mem + queue_mem + flush_mem`. --- be/src/olap/memtable_memory_limiter.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/be/src/olap/memtable_memory_limiter.cpp b/be/src/olap/memtable_memory_limiter.cpp index a76318af65f84f..1cb6c0c8e2de04 100644 --- a/be/src/olap/memtable_memory_limiter.cpp +++ b/be/src/olap/memtable_memory_limiter.cpp @@ -251,7 +251,7 @@ void MemTableMemoryLimiter::_refresh_mem_tracker() { _writers.pop_back(); } } - _mem_usage = _flush_mem_usage + _queue_mem_usage; + _mem_usage = _active_mem_usage + _queue_mem_usage + _flush_mem_usage; g_memtable_active_memory.set_value(_active_mem_usage); g_memtable_write_memory.set_value(_queue_mem_usage); g_memtable_flush_memory.set_value(_flush_mem_usage);