diff --git a/be/src/common/config.h b/be/src/common/config.h index 554735b7c5400b..9a6750067a5ce1 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -48,7 +48,7 @@ CONF_String(priority_networks, ""); // memory mode // performance or compact -CONF_String(memory_mode, "performance"); +CONF_String(memory_mode, "moderate"); // process memory limit specified as number of bytes // ('[bB]?'), megabytes ('[mM]'), gigabytes ('[gG]'), diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp index 8e3cc663da5236..9a828b3d7a6990 100644 --- a/be/src/common/daemon.cpp +++ b/be/src/common/daemon.cpp @@ -72,31 +72,109 @@ void Daemon::tcmalloc_gc_thread() { #if !defined(ADDRESS_SANITIZER) && !defined(LEAK_SANITIZER) && !defined(THREAD_SANITIZER) && \ !defined(USE_JEMALLOC) - size_t tc_use_memory_min = MemInfo::mem_limit(); + // Limit size of tcmalloc cache via release_rate and max_cache_percent. + // We adjust release_rate according to memory_pressure, which is usage percent of memory. + int64_t max_cache_percent = 60; + double release_rates[10] = {1.0, 1.0, 1.0, 5.0, 5.0, 20.0, 50.0, 100.0, 500.0, 2000.0}; + int64_t pressure_limit = 90; + bool is_performance_mode = false; + size_t physical_limit_bytes = std::min(MemInfo::hard_mem_limit(), MemInfo::mem_limit()); + if (config::memory_mode == std::string("performance")) { - tc_use_memory_min = std::max(tc_use_memory_min / 10 * 9, - tc_use_memory_min - (size_t)10 * 1024 * 1024 * 1024); - } else { - tc_use_memory_min >>= 1; + max_cache_percent = 100; + pressure_limit = 90; + is_performance_mode = true; + physical_limit_bytes = std::min(MemInfo::mem_limit(), MemInfo::physical_mem()); + } else if (config::memory_mode == std::string("compact")) { + max_cache_percent = 20; + pressure_limit = 80; } - while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(10))) { - size_t used_size = 0; - size_t free_size = 0; - + int last_ms = 0; + const int kMaxLastMs = 30000; + const int kIntervalMs = 10; + size_t init_aggressive_decommit = 0; + size_t current_aggressive_decommit = 0; + size_t expected_aggressive_decommit = 0; + int64_t last_memory_pressure = 0; + + MallocExtension::instance()->GetNumericProperty("tcmalloc.aggressive_memory_decommit", + &init_aggressive_decommit); + current_aggressive_decommit = init_aggressive_decommit; + + while (!_stop_background_threads_latch.wait_for(std::chrono::milliseconds(kIntervalMs))) { + size_t tc_used_bytes = 0; + size_t tc_alloc_bytes = 0; + size_t rss = PerfCounters::get_vm_rss(); + + MallocExtension::instance()->GetNumericProperty("generic.total_physical_bytes", + &tc_alloc_bytes); MallocExtension::instance()->GetNumericProperty("generic.current_allocated_bytes", - &used_size); - MallocExtension::instance()->GetNumericProperty("tcmalloc.pageheap_free_bytes", &free_size); - size_t alloc_size = used_size + free_size; - LOG(INFO) << "tcmalloc.pageheap_free_bytes " << free_size - << ", generic.current_allocated_bytes " << used_size << ", tc_use_memory_min " - << tc_use_memory_min; - - if (alloc_size > tc_use_memory_min) { - size_t max_free_size = alloc_size * 20 / 100; - if (free_size > max_free_size) { - MallocExtension::instance()->ReleaseToSystem(free_size - max_free_size); + &tc_used_bytes); + int64_t tc_cached_bytes = tc_alloc_bytes - tc_used_bytes; + int64_t to_free_bytes = + (int64_t)tc_cached_bytes - (tc_used_bytes * max_cache_percent / 100); + + int64_t memory_pressure = 0; + int64_t alloc_bytes = std::max(rss, tc_alloc_bytes); + memory_pressure = alloc_bytes * 100 / physical_limit_bytes; + + expected_aggressive_decommit = init_aggressive_decommit; + if (memory_pressure > pressure_limit) { + // We are reaching oom, so release cache aggressively. + // Ideally, we should reuse cache and not allocate from system any more, + // however, it is hard to set limit on cache of tcmalloc and doris + // use mmap in vectorized mode. + if (last_memory_pressure <= pressure_limit) { + int64_t min_free_bytes = alloc_bytes - physical_limit_bytes * 9 / 10; + to_free_bytes = std::max(to_free_bytes, min_free_bytes); + to_free_bytes = std::max(to_free_bytes, tc_cached_bytes * 30 / 100); + to_free_bytes = std::min(to_free_bytes, tc_cached_bytes); + expected_aggressive_decommit = 1; + } else { + // release rate is enough. + to_free_bytes = 0; + } + last_ms = kMaxLastMs; + } else if (memory_pressure > (pressure_limit - 10)) { + if (last_memory_pressure <= (pressure_limit - 10)) { + to_free_bytes = std::max(to_free_bytes, tc_cached_bytes * 10 / 100); + } else { + to_free_bytes = 0; + } + } + + int release_rate_index = memory_pressure / 10; + double release_rate = 1.0; + if (release_rate_index >= sizeof(release_rates)) { + release_rate = 2000.0; + } else { + release_rate = release_rates[release_rate_index]; + } + MallocExtension::instance()->SetMemoryReleaseRate(release_rate); + + if ((current_aggressive_decommit != expected_aggressive_decommit) && !is_performance_mode) { + MallocExtension::instance()->SetNumericProperty("tcmalloc.aggressive_memory_decommit", + expected_aggressive_decommit); + current_aggressive_decommit = expected_aggressive_decommit; + } + + last_memory_pressure = memory_pressure; + if (to_free_bytes > 0) { + last_ms += kIntervalMs; + if (last_ms >= kMaxLastMs) { + LOG(INFO) << "generic.current_allocated_bytes " << tc_used_bytes + << ", generic.total_physical_bytes " << tc_alloc_bytes << ", rss " << rss + << ", max_cache_percent " << max_cache_percent << ", release_rate " + << release_rate << ", memory_pressure " << memory_pressure + << ", physical_limit_bytes " << physical_limit_bytes << ", to_free_bytes " + << to_free_bytes << ", current_aggressive_decommit " + << current_aggressive_decommit; + MallocExtension::instance()->ReleaseToSystem(to_free_bytes); + last_ms = 0; } + } else { + last_ms = 0; } } #endif diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp index 9d40acbdd5df88..c1ef640c7b5df7 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.cpp +++ b/be/src/runtime/memory/mem_tracker_limiter.cpp @@ -41,6 +41,7 @@ struct TrackerLimiterGroup { static std::vector mem_tracker_limiter_pool(1000); std::atomic MemTrackerLimiter::_enable_print_log_process_usage {true}; +bool MemTrackerLimiter::_oom_avoidance {true}; MemTrackerLimiter::MemTrackerLimiter(Type type, const std::string& label, int64_t byte_limit, RuntimeProfile* profile) { diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h index 3c8876f40847db..8685cdf9532ce4 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.h +++ b/be/src/runtime/memory/mem_tracker_limiter.h @@ -79,6 +79,9 @@ class MemTrackerLimiter final : public MemTracker { ~MemTrackerLimiter(); static bool sys_mem_exceed_limit_check(int64_t bytes) { + if (!_oom_avoidance) { + return false; + } // Limit process memory usage using the actual physical memory of the process in `/proc/self/status`. // This is independent of the consumption value of the mem tracker, which counts the virtual memory // of the process malloc. @@ -109,6 +112,8 @@ class MemTrackerLimiter final : public MemTracker { // this tracker limiter. int64_t spare_capacity() const { return _limit - consumption(); } + static void disable_oom_avoidance() { _oom_avoidance = false; } + public: // If need to consume the tracker frequently, use it void cache_consume(int64_t bytes); @@ -208,6 +213,7 @@ class MemTrackerLimiter final : public MemTracker { // Avoid frequent printing. bool _enable_print_log_usage = false; static std::atomic _enable_print_log_process_usage; + static bool _oom_avoidance; // Iterator into mem_tracker_limiter_pool for this object. Stored to have O(1) remove. std::list::iterator _tracker_limiter_group_it; diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp index f43d95eaa9a8dd..793fd4d4c828f6 100644 --- a/be/src/service/doris_main.cpp +++ b/be/src/service/doris_main.cpp @@ -323,21 +323,18 @@ int main(int argc, char** argv) { #if !defined(__SANITIZE_ADDRESS__) && !defined(ADDRESS_SANITIZER) && !defined(LEAK_SANITIZER) && \ !defined(THREAD_SANITIZER) && !defined(USE_JEMALLOC) // Change the total TCMalloc thread cache size if necessary. - size_t total_thread_cache_bytes; - if (!MallocExtension::instance()->GetNumericProperty("tcmalloc.max_total_thread_cache_bytes", - &total_thread_cache_bytes)) { - fprintf(stderr, "Failed to get TCMalloc total thread cache size.\n"); - } const size_t kDefaultTotalThreadCacheBytes = 1024 * 1024 * 1024; - if (total_thread_cache_bytes < kDefaultTotalThreadCacheBytes) { - if (!MallocExtension::instance()->SetNumericProperty( - "tcmalloc.max_total_thread_cache_bytes", kDefaultTotalThreadCacheBytes)) { - fprintf(stderr, "Failed to change TCMalloc total thread cache size.\n"); - return -1; - } + if (!MallocExtension::instance()->SetNumericProperty("tcmalloc.max_total_thread_cache_bytes", + kDefaultTotalThreadCacheBytes)) { + fprintf(stderr, "Failed to change TCMalloc total thread cache size.\n"); + return -1; } #endif + if (doris::config::memory_mode == std::string("performance")) { + doris::MemTrackerLimiter::disable_oom_avoidance(); + } + std::vector paths; auto olap_res = doris::parse_conf_store_paths(doris::config::storage_root_path, &paths); if (!olap_res) {