diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 60766d8c7557eb..b0b0dd7aafc6c2 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -598,12 +598,6 @@ DEFINE_mInt32(memory_maintenance_sleep_time_ms, "20"); // After minor gc, no minor gc during sleep, but full gc is possible. DEFINE_mInt32(memory_gc_sleep_time_ms, "500"); -// percent of (active memtables size / all memtables size) when reach hard limit -DEFINE_mInt32(memtable_hard_limit_active_percent, "50"); - -// percent of (active memtables size / all memtables size) when reach soft limit -DEFINE_mInt32(memtable_soft_limit_active_percent, "50"); - // max write buffer size before flush, default 200MB DEFINE_mInt64(write_buffer_size, "209715200"); // max buffer size used in memtable for the aggregated table, default 400MB diff --git a/be/src/common/config.h b/be/src/common/config.h index 3e52bd6cc8db47..4dabccc7db3651 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -648,12 +648,6 @@ DECLARE_mInt32(memory_maintenance_sleep_time_ms); // After minor gc, no minor gc during sleep, but full gc is possible. DECLARE_mInt32(memory_gc_sleep_time_ms); -// percent of (active memtables size / all memtables size) when reach hard limit -DECLARE_mInt32(memtable_hard_limit_active_percent); - -// percent of (active memtables size / all memtables size) when reach soft limit -DECLARE_mInt32(memtable_soft_limit_active_percent); - // max write buffer size before flush, default 200MB DECLARE_mInt64(write_buffer_size); // max buffer size used in memtable for the aggregated table, default 400MB diff --git a/be/src/olap/memtable_memory_limiter.cpp b/be/src/olap/memtable_memory_limiter.cpp index 9b9ce19f895cd1..213d5aaab9a7bc 100644 --- a/be/src/olap/memtable_memory_limiter.cpp +++ b/be/src/olap/memtable_memory_limiter.cpp @@ -75,33 +75,41 @@ void MemTableMemoryLimiter::register_writer(std::weak_ptr writer _writers.push_back(writer); } -bool MemTableMemoryLimiter::_sys_avail_mem_less_than_warning_water_mark() { +int64_t MemTableMemoryLimiter::_sys_avail_mem_less_than_warning_water_mark() { // reserve a small amount of memory so we do not trigger MinorGC - return doris::GlobalMemoryArbitrator::sys_mem_available() < - doris::MemInfo::sys_mem_available_warning_water_mark() + - config::memtable_limiter_reserved_memory_bytes; + return doris::MemInfo::sys_mem_available_warning_water_mark() - + doris::GlobalMemoryArbitrator::sys_mem_available() + + config::memtable_limiter_reserved_memory_bytes; } -bool MemTableMemoryLimiter::_process_used_mem_more_than_soft_mem_limit() { +int64_t MemTableMemoryLimiter::_process_used_mem_more_than_soft_mem_limit() { // reserve a small amount of memory so we do not trigger MinorGC - return GlobalMemoryArbitrator::process_memory_usage() > - MemInfo::soft_mem_limit() - config::memtable_limiter_reserved_memory_bytes; + return GlobalMemoryArbitrator::process_memory_usage() - MemInfo::soft_mem_limit() + + config::memtable_limiter_reserved_memory_bytes; } bool MemTableMemoryLimiter::_soft_limit_reached() { - return _mem_tracker->consumption() >= _load_soft_mem_limit || _hard_limit_reached(); + return _mem_tracker->consumption() > _load_soft_mem_limit || _hard_limit_reached(); } bool MemTableMemoryLimiter::_hard_limit_reached() { - return _mem_tracker->consumption() >= _load_hard_mem_limit || - _sys_avail_mem_less_than_warning_water_mark() || - _process_used_mem_more_than_soft_mem_limit(); + return _mem_tracker->consumption() > _load_hard_mem_limit || + _sys_avail_mem_less_than_warning_water_mark() > 0 || + _process_used_mem_more_than_soft_mem_limit() > 0; } bool MemTableMemoryLimiter::_load_usage_low() { return _mem_tracker->consumption() <= _load_safe_mem_permit; } +int64_t MemTableMemoryLimiter::_need_flush() { + int64_t limit1 = _mem_tracker->consumption() - _load_soft_mem_limit; + int64_t limit2 = _sys_avail_mem_less_than_warning_water_mark(); + int64_t limit3 = _process_used_mem_more_than_soft_mem_limit(); + int64_t need_flush = std::max(limit1, std::max(limit2, limit3)); + return need_flush - _queue_mem_usage; +} + void MemTableMemoryLimiter::handle_memtable_flush() { // Check the soft limit. DCHECK(_load_soft_mem_limit > 0); @@ -112,34 +120,29 @@ void MemTableMemoryLimiter::handle_memtable_flush() { timer.start(); std::unique_lock l(_lock); g_memtable_memory_limit_waiting_threads << 1; - while (_hard_limit_reached()) { - LOG(INFO) << "reached memtable memory hard limit" - << " (active: " << PrettyPrinter::print_bytes(_active_mem_usage) - << ", write: " << PrettyPrinter::print_bytes(_write_mem_usage) - << ", flush: " << PrettyPrinter::print_bytes(_flush_mem_usage) << ")"; - if (_active_mem_usage >= - _write_mem_usage * config::memtable_hard_limit_active_percent / 100) { - _flush_active_memtables(_write_mem_usage / 20); - } - if (!_hard_limit_reached()) { - break; + bool first = true; + do { + if (!first) { + auto st = _hard_limit_end_cond.wait_for(l, std::chrono::milliseconds(1000)); + if (st == std::cv_status::timeout) { + LOG(INFO) << "timeout when waiting for memory hard limit end, try again"; + } } - auto st = _hard_limit_end_cond.wait_for(l, std::chrono::milliseconds(1000)); - if (st == std::cv_status::timeout) { - LOG(INFO) << "timeout when waiting for memory hard limit end, try again"; + first = false; + int64_t need_flush = _need_flush(); + if (need_flush > 0) { + auto limit = _hard_limit_reached() ? Limit::HARD : Limit::SOFT; + LOG(INFO) << "reached memtable memory " << (limit == Limit::HARD ? "hard" : "soft") + << ", " << GlobalMemoryArbitrator::process_memory_used_details_str() + << ", load mem: " << PrettyPrinter::print_bytes(_mem_tracker->consumption()) + << ", memtable writers num: " << _writers.size() + << ", active: " << PrettyPrinter::print_bytes(_active_mem_usage) + << ", queue: " << PrettyPrinter::print_bytes(_queue_mem_usage) + << ", flush: " << PrettyPrinter::print_bytes(_flush_mem_usage); + _flush_active_memtables(need_flush); } - } + } while (_hard_limit_reached()); g_memtable_memory_limit_waiting_threads << -1; - if (_soft_limit_reached()) { - LOG(INFO) << "reached memtable memory soft limit" - << " (active: " << PrettyPrinter::print_bytes(_active_mem_usage) - << ", write: " << PrettyPrinter::print_bytes(_write_mem_usage) - << ", flush: " << PrettyPrinter::print_bytes(_flush_mem_usage) << ")"; - if (_active_mem_usage >= - _write_mem_usage * config::memtable_soft_limit_active_percent / 100) { - _flush_active_memtables(_write_mem_usage / 20); - } - } timer.stop(); int64_t time_ms = timer.elapsed_time() / 1000 / 1000; g_memtable_memory_limit_latency_ms << time_ms; @@ -155,50 +158,50 @@ void MemTableMemoryLimiter::_flush_active_memtables(int64_t need_flush) { if (_active_writers.size() == 0) { return; } + + using WriterMem = std::pair, int64_t>; + auto cmp = [](WriterMem left, WriterMem right) { return left.second > right.second; }; + std::priority_queue, decltype(cmp)> heap(cmp); + + for (auto writer : _active_writers) { + auto w = writer.lock(); + if (w == nullptr) { + continue; + } + heap.emplace(w, w->active_memtable_mem_consumption()); + } + int64_t mem_flushed = 0; int64_t num_flushed = 0; - int64_t avg_mem = _active_mem_usage / _active_writers.size(); - for (auto writer : _active_writers) { - int64_t mem = _flush_memtable(writer, avg_mem); + + while (mem_flushed < need_flush && !heap.empty()) { + auto [writer, sort_mem] = heap.top(); + heap.pop(); + auto w = writer.lock(); + if (w == nullptr) { + continue; + } + int64_t mem = w->active_memtable_mem_consumption(); + if (mem < sort_mem * 0.9) { + // if the memtable writer just got flushed, don't flush it again + continue; + } + Status st = w->flush_async(); + if (!st.ok()) { + auto err_msg = fmt::format( + "tablet writer failed to reduce mem consumption by flushing memtable, " + "tablet_id={}, err={}", + w->tablet_id(), st.to_string()); + LOG(WARNING) << err_msg; + static_cast(w->cancel_with_status(st)); + } mem_flushed += mem; num_flushed += (mem > 0); - if (mem_flushed >= need_flush) { - break; - } } LOG(INFO) << "flushed " << num_flushed << " out of " << _active_writers.size() << " active writers, flushed size: " << PrettyPrinter::print_bytes(mem_flushed); } -int64_t MemTableMemoryLimiter::_flush_memtable(std::weak_ptr writer_to_flush, - int64_t threshold) { - auto writer = writer_to_flush.lock(); - if (!writer) { - return 0; - } - auto mem_usage = writer->active_memtable_mem_consumption(); - // if the memtable writer just got flushed, don't flush it again - if (mem_usage < threshold) { - VLOG_DEBUG << "flushing active memtables, active mem usage " - << PrettyPrinter::print_bytes(mem_usage) << " is less than " - << PrettyPrinter::print_bytes(threshold) << ", skipping"; - return 0; - } - VLOG_DEBUG << "flushing active memtables, active mem usage " - << PrettyPrinter::print_bytes(mem_usage); - Status st = writer->flush_async(); - if (!st.ok()) { - auto err_msg = fmt::format( - "tablet writer failed to reduce mem consumption by flushing memtable, " - "tablet_id={}, err={}", - writer->tablet_id(), st.to_string()); - LOG(WARNING) << err_msg; - static_cast(writer->cancel_with_status(st)); - return 0; - } - return mem_usage; -} - void MemTableMemoryLimiter::refresh_mem_tracker() { std::lock_guard l(_lock); _refresh_mem_tracker(); @@ -219,21 +222,17 @@ void MemTableMemoryLimiter::refresh_mem_tracker() { _last_limit = limit; _log_timer.reset(); - // if not exist load task, this log should not be printed. - if (_mem_usage != 0) { - LOG(INFO) << fmt::format( - "{}, {}, load mem: {}, memtable writers num: {} (active: {}, write: {}, flush: {})", - ss.str(), GlobalMemoryArbitrator::process_memory_used_details_str(), - PrettyPrinter::print_bytes(_mem_tracker->consumption()), _writers.size(), - PrettyPrinter::print_bytes(_active_mem_usage), - PrettyPrinter::print_bytes(_write_mem_usage), - PrettyPrinter::print_bytes(_flush_mem_usage)); - } + LOG(INFO) << ss.str() << ", " << GlobalMemoryArbitrator::process_memory_used_details_str() + << ", load mem: " << PrettyPrinter::print_bytes(_mem_tracker->consumption()) + << ", memtable writers num: " << _writers.size() + << ", active: " << PrettyPrinter::print_bytes(_active_mem_usage) + << ", queue: " << PrettyPrinter::print_bytes(_queue_mem_usage) + << ", flush: " << PrettyPrinter::print_bytes(_flush_mem_usage); } void MemTableMemoryLimiter::_refresh_mem_tracker() { _flush_mem_usage = 0; - _write_mem_usage = 0; + _queue_mem_usage = 0; _active_mem_usage = 0; _active_writers.clear(); for (auto it = _writers.begin(); it != _writers.end();) { @@ -245,16 +244,16 @@ void MemTableMemoryLimiter::_refresh_mem_tracker() { _active_writers.push_back(writer); } _flush_mem_usage += writer->mem_consumption(MemType::FLUSH); - _write_mem_usage += writer->mem_consumption(MemType::WRITE_FINISHED); + _queue_mem_usage += writer->mem_consumption(MemType::WRITE_FINISHED); ++it; } else { *it = std::move(_writers.back()); _writers.pop_back(); } } - _mem_usage = _flush_mem_usage + _write_mem_usage; + _mem_usage = _flush_mem_usage + _queue_mem_usage; g_memtable_active_memory.set_value(_active_mem_usage); - g_memtable_write_memory.set_value(_write_mem_usage); + g_memtable_write_memory.set_value(_queue_mem_usage); g_memtable_flush_memory.set_value(_flush_mem_usage); g_memtable_load_memory.set_value(_mem_usage); VLOG_DEBUG << "refreshed mem_tracker, num writers: " << _writers.size(); diff --git a/be/src/olap/memtable_memory_limiter.h b/be/src/olap/memtable_memory_limiter.h index 66f5fb2a8d0c20..1e32cb165e4721 100644 --- a/be/src/olap/memtable_memory_limiter.h +++ b/be/src/olap/memtable_memory_limiter.h @@ -50,21 +50,21 @@ class MemTableMemoryLimiter { int64_t mem_usage() const { return _mem_usage; } private: - static inline bool _sys_avail_mem_less_than_warning_water_mark(); - static inline bool _process_used_mem_more_than_soft_mem_limit(); + static inline int64_t _sys_avail_mem_less_than_warning_water_mark(); + static inline int64_t _process_used_mem_more_than_soft_mem_limit(); bool _soft_limit_reached(); bool _hard_limit_reached(); bool _load_usage_low(); + int64_t _need_flush(); void _flush_active_memtables(int64_t need_flush); - int64_t _flush_memtable(std::weak_ptr writer_to_flush, int64_t threshold); void _refresh_mem_tracker(); std::mutex _lock; std::condition_variable _hard_limit_end_cond; int64_t _mem_usage = 0; int64_t _flush_mem_usage = 0; - int64_t _write_mem_usage = 0; + int64_t _queue_mem_usage = 0; int64_t _active_mem_usage = 0; // sum of all mem table memory.