diff --git a/be/src/exec/data_sink.h b/be/src/exec/data_sink.h index 38466cffc9733e..0e0b9e63fd8780 100644 --- a/be/src/exec/data_sink.h +++ b/be/src/exec/data_sink.h @@ -27,12 +27,12 @@ #include "gen_cpp/Exprs_types.h" #include "runtime/descriptors.h" #include "runtime/query_statistics.h" +#include "util/runtime_profile.h" #include "util/telemetry/telemetry.h" namespace doris { class ObjectPool; -class RuntimeProfile; class RuntimeState; class TPlanFragmentExecParams; class RowDescriptor; diff --git a/be/src/http/default_path_handlers.cpp b/be/src/http/default_path_handlers.cpp index aab581012daeeb..b4c837f3d83bb4 100644 --- a/be/src/http/default_path_handlers.cpp +++ b/be/src/http/default_path_handlers.cpp @@ -35,6 +35,7 @@ #include "http/web_page_handler.h" #include "runtime/memory/mem_tracker_limiter.h" #include "util/debug_util.h" +#include "util/mem_info.h" #include "util/perf_counters.h" #include "util/pretty_printer.h" #include "util/thread.h" diff --git a/be/src/http/ev_http_server.cpp b/be/src/http/ev_http_server.cpp index 8bb48add6813ec..bef87541a3f82a 100644 --- a/be/src/http/ev_http_server.cpp +++ b/be/src/http/ev_http_server.cpp @@ -34,6 +34,7 @@ #include "http/http_headers.h" #include "http/http_request.h" #include "runtime/thread_context.h" +#include "service/backend_options.h" #include "service/brpc.h" #include "util/debug_util.h" #include "util/threadpool.h" diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index c8f863f42e7b98..a5d94bd80d1c5a 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -47,6 +47,7 @@ #include "runtime/thread_context.h" #include "service/backend_options.h" #include "util/doris_metrics.h" +#include "util/mem_info.h" #include "util/network_util.h" #include "util/stopwatch.hpp" #include "util/telemetry/telemetry.h" diff --git a/be/src/runtime/load_channel_mgr.cpp b/be/src/runtime/load_channel_mgr.cpp index fca3e0c0b0b96e..413d7077cf5394 100644 --- 
a/be/src/runtime/load_channel_mgr.cpp +++ b/be/src/runtime/load_channel_mgr.cpp @@ -27,6 +27,8 @@ #include "runtime/load_channel.h" #include "runtime/memory/mem_tracker.h" #include "util/doris_metrics.h" +#include "util/mem_info.h" +#include "util/perf_counters.h" #include "util/stopwatch.hpp" #include "util/time.h" diff --git a/be/src/runtime/memory/jemalloc_hook.cpp b/be/src/runtime/memory/jemalloc_hook.cpp index 82b546b8857a3e..bfc358c827c3cb 100644 --- a/be/src/runtime/memory/jemalloc_hook.cpp +++ b/be/src/runtime/memory/jemalloc_hook.cpp @@ -27,11 +27,14 @@ #endif extern "C" { + +// Both je_nallocx and je_malloc will use the lock je_malloc_mutex_lock_slow, +// so enabling the jemalloc hook will double the lock usage. +// In extreme cases this will affect performance; consider turning off the mem hook. +// The mem hook itself should avoid nested new/malloc calls. + void* doris_malloc(size_t size) __THROW { - // Both je_nallocx and je_malloc will use the lock je_malloc_mutex_lock_slow, - // so enabling the jemalloc hook will double the lock usage. 
- // In extreme cases this will affect performance, consider turning off mem hook - TRY_CONSUME_MEM_TRACKER(jenallocx(size, 0), nullptr); + CONSUME_MEM_TRACKER(jenallocx(size, 0)); void* ptr = jemalloc(size); if (UNLIKELY(ptr == nullptr)) { RELEASE_MEM_TRACKER(jenallocx(size, 0)); @@ -53,7 +56,7 @@ void* doris_realloc(void* p, size_t size) __THROW { int64_t old_size = jemalloc_usable_size(p); #endif - TRY_CONSUME_MEM_TRACKER(jenallocx(size, 0) - old_size, nullptr); + CONSUME_MEM_TRACKER(jenallocx(size, 0) - old_size); void* ptr = jerealloc(p, size); if (UNLIKELY(ptr == nullptr)) { RELEASE_MEM_TRACKER(jenallocx(size, 0) - old_size); @@ -66,7 +69,7 @@ void* doris_calloc(size_t n, size_t size) __THROW { return nullptr; } - TRY_CONSUME_MEM_TRACKER(n * size, nullptr); + CONSUME_MEM_TRACKER(n * size); void* ptr = jecalloc(n, size); if (UNLIKELY(ptr == nullptr)) { RELEASE_MEM_TRACKER(n * size); @@ -82,7 +85,7 @@ void doris_cfree(void* ptr) __THROW { } void* doris_memalign(size_t align, size_t size) __THROW { - TRY_CONSUME_MEM_TRACKER(size, nullptr); + CONSUME_MEM_TRACKER(size); void* ptr = jealigned_alloc(align, size); if (UNLIKELY(ptr == nullptr)) { RELEASE_MEM_TRACKER(size); @@ -93,7 +96,7 @@ void* doris_memalign(size_t align, size_t size) __THROW { } void* doris_aligned_alloc(size_t align, size_t size) __THROW { - TRY_CONSUME_MEM_TRACKER(size, nullptr); + CONSUME_MEM_TRACKER(size); void* ptr = jealigned_alloc(align, size); if (UNLIKELY(ptr == nullptr)) { RELEASE_MEM_TRACKER(size); @@ -104,7 +107,7 @@ void* doris_aligned_alloc(size_t align, size_t size) __THROW { } void* doris_valloc(size_t size) __THROW { - TRY_CONSUME_MEM_TRACKER(size, nullptr); + CONSUME_MEM_TRACKER(size); void* ptr = jevalloc(size); if (UNLIKELY(ptr == nullptr)) { RELEASE_MEM_TRACKER(size); @@ -115,7 +118,7 @@ void* doris_valloc(size_t size) __THROW { } void* doris_pvalloc(size_t size) __THROW { - TRY_CONSUME_MEM_TRACKER(size, nullptr); + CONSUME_MEM_TRACKER(size); void* ptr = jevalloc(size); if 
(UNLIKELY(ptr == nullptr)) { RELEASE_MEM_TRACKER(size); @@ -126,7 +129,7 @@ void* doris_pvalloc(size_t size) __THROW { } int doris_posix_memalign(void** r, size_t align, size_t size) __THROW { - TRY_CONSUME_MEM_TRACKER(size, ENOMEM); + CONSUME_MEM_TRACKER(size); int ret = jeposix_memalign(r, align, size); if (UNLIKELY(ret != 0)) { RELEASE_MEM_TRACKER(size); diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp index f817da9d782df7..509f190358d708 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.cpp +++ b/be/src/runtime/memory/mem_tracker_limiter.cpp @@ -25,7 +25,9 @@ #include "runtime/fragment_mgr.h" #include "runtime/load_channel_mgr.h" #include "runtime/runtime_state.h" -#include "runtime/thread_context.h" +#include "service/backend_options.h" +#include "util/mem_info.h" +#include "util/perf_counters.h" #include "util/pretty_printer.h" #include "util/stack_util.h" @@ -237,25 +239,73 @@ void MemTrackerLimiter::print_log_process_usage(const std::string& msg, bool wit } } -std::string MemTrackerLimiter::mem_limit_exceeded(const std::string& msg, - const std::string& limit_exceeded_errmsg) { - STOP_CHECK_THREAD_MEM_TRACKER_LIMIT(); - std::string detail = fmt::format( - "Memory limit exceeded:, {}>, executing msg:<{}>. backend {} " - "process memory used {}, limit {}. If query tracker exceed, `set " +bool MemTrackerLimiter::sys_mem_exceed_limit_check(int64_t bytes) { + if (!_oom_avoidance) { + return false; + } + // Limit process memory usage using the actual physical memory of the process in `/proc/self/status`. + // This is independent of the consumption value of the mem tracker, which counts the virtual memory + // of the process malloc. + // for fast, expect MemInfo::initialized() to be true. + // + // tcmalloc/jemalloc allocator cache does not participate in the mem check as part of the process physical memory. 
+ // because `new/malloc` will trigger mem hook when using tcmalloc/jemalloc allocator cache, + // but it may not actually alloc physical memory, which is not expected in mem hook fail. + if (MemInfo::proc_mem_no_allocator_cache() + bytes >= MemInfo::mem_limit() || + MemInfo::sys_mem_available() < MemInfo::sys_mem_available_low_water_mark()) { + print_log_process_usage( + fmt::format("System Mem Exceed Limit Check Faild, Try Alloc: {}", bytes)); + return true; + } + return false; +} + +std::string MemTrackerLimiter::process_mem_log_str() { + return fmt::format( + "OS physical memory {}. Process memory usage {}, limit {}, soft limit {}. Sys " + "available memory {}, low water mark {}, warning water mark {}. Refresh interval " + "memory growth {} B", + PrettyPrinter::print(MemInfo::physical_mem(), TUnit::BYTES), + PerfCounters::get_vm_rss_str(), MemInfo::mem_limit_str(), MemInfo::soft_mem_limit_str(), + MemInfo::sys_mem_available_str(), + PrettyPrinter::print(MemInfo::sys_mem_available_low_water_mark(), TUnit::BYTES), + PrettyPrinter::print(MemInfo::sys_mem_available_warning_water_mark(), TUnit::BYTES), + MemInfo::refresh_interval_memory_growth); +} + +std::string MemTrackerLimiter::process_limit_exceeded_errmsg_str(int64_t bytes) { + return fmt::format( + "process memory used {} exceed limit {} or sys mem available {} less than low " + "water mark {}, failed alloc size {}", + PerfCounters::get_vm_rss_str(), MemInfo::mem_limit_str(), + MemInfo::sys_mem_available_str(), + PrettyPrinter::print(MemInfo::sys_mem_available_low_water_mark(), TUnit::BYTES), + print_bytes(bytes)); +} + +std::string MemTrackerLimiter::query_tracker_limit_exceeded_str( + const std::string& tracker_limit_exceeded, const std::string& last_consumer_tracker, + const std::string& executing_msg) { + return fmt::format( + "Memory limit exceeded:{}, exec node:<{}>, execute msg:{}. backend {} " + "process memory used {}, limit {}. 
Can `set " "exec_mem_limit=8G` to change limit, details see be.INFO.", - _label, limit_exceeded_errmsg, msg, BackendOptions::get_localhost(), - PerfCounters::get_vm_rss_str(), MemInfo::mem_limit_str()); - return detail; + tracker_limit_exceeded, last_consumer_tracker, executing_msg, + BackendOptions::get_localhost(), PerfCounters::get_vm_rss_str(), + MemInfo::mem_limit_str()); +} + +std::string MemTrackerLimiter::tracker_limit_exceeded_str() { + return fmt::format( + "exceeded tracker:<{}>, limit {}, peak " + "used {}, current used {}", + label(), print_bytes(limit()), print_bytes(_consumption->peak_value()), + print_bytes(_consumption->current_value())); } -Status MemTrackerLimiter::fragment_mem_limit_exceeded(RuntimeState* state, const std::string& msg, - int64_t failed_alloc_size) { - auto failed_msg = - mem_limit_exceeded(msg, tracker_limit_exceeded_errmsg_str(failed_alloc_size, this)); - print_log_usage(failed_msg); - state->log_error(failed_msg); - return Status::MemoryLimitExceeded(failed_msg); +std::string MemTrackerLimiter::tracker_limit_exceeded_str(int64_t bytes) { + return fmt::format("failed alloc size {}, {}", print_bytes(bytes), + tracker_limit_exceeded_str()); } int64_t MemTrackerLimiter::free_top_memory_query(int64_t min_free_mem, diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h index 32e212155e13fc..e1e2dd0b14fe2f 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.h +++ b/be/src/runtime/memory/mem_tracker_limiter.h @@ -20,11 +20,7 @@ #include #include "common/config.h" -#include "runtime/exec_env.h" #include "runtime/memory/mem_tracker.h" -#include "service/backend_options.h" -#include "util/mem_info.h" -#include "util/perf_counters.h" #include "util/string_util.h" namespace doris { @@ -77,26 +73,7 @@ class MemTrackerLimiter final : public MemTracker { ~MemTrackerLimiter(); - static bool sys_mem_exceed_limit_check(int64_t bytes) { - if (!_oom_avoidance) { - return false; - } - // Limit 
process memory usage using the actual physical memory of the process in `/proc/self/status`. - // This is independent of the consumption value of the mem tracker, which counts the virtual memory - // of the process malloc. - // for fast, expect MemInfo::initialized() to be true. - // - // tcmalloc/jemalloc allocator cache does not participate in the mem check as part of the process physical memory. - // because `new/malloc` will trigger mem hook when using tcmalloc/jemalloc allocator cache, - // but it may not actually alloc physical memory, which is not expected in mem hook fail. - if (MemInfo::proc_mem_no_allocator_cache() + bytes >= MemInfo::mem_limit() || - MemInfo::sys_mem_available() < MemInfo::sys_mem_available_low_water_mark()) { - print_log_process_usage( - fmt::format("System Mem Exceed Limit Check Faild, Try Alloc: {}", bytes)); - return true; - } - return false; - } + static bool sys_mem_exceed_limit_check(int64_t bytes); void set_consumption() { LOG(FATAL) << "MemTrackerLimiter set_consumption not supported"; } Type type() const { return _type; } @@ -141,12 +118,6 @@ class MemTrackerLimiter final : public MemTracker { static std::string log_process_usage_str(const std::string& msg, bool with_stacktrace = true); static void print_log_process_usage(const std::string& msg, bool with_stacktrace = true); - // Log the memory usage when memory limit is exceeded. - std::string mem_limit_exceeded(const std::string& msg, - const std::string& limit_exceeded_errmsg); - Status fragment_mem_limit_exceeded(RuntimeState* state, const std::string& msg, - int64_t failed_allocation_size = 0); - // Start canceling from the query with the largest memory usage until the memory of min_free_mem size is freed. // vm_rss_str and mem_available_str recorded when gc is triggered, for log printing. 
static int64_t free_top_memory_query(int64_t min_free_mem, const std::string& vm_rss_str, @@ -176,28 +147,14 @@ class MemTrackerLimiter final : public MemTracker { return querytid; } - static std::string process_mem_log_str() { - return fmt::format( - "OS physical memory {}. Process memory usage {}, limit {}, soft limit {}. Sys " - "available memory {}, low water mark {}, warning water mark {}. Refresh interval " - "memory growth {} B", - PrettyPrinter::print(MemInfo::physical_mem(), TUnit::BYTES), - PerfCounters::get_vm_rss_str(), MemInfo::mem_limit_str(), - MemInfo::soft_mem_limit_str(), MemInfo::sys_mem_available_str(), - PrettyPrinter::print(MemInfo::sys_mem_available_low_water_mark(), TUnit::BYTES), - PrettyPrinter::print(MemInfo::sys_mem_available_warning_water_mark(), TUnit::BYTES), - MemInfo::refresh_interval_memory_growth); - } - - static std::string process_limit_exceeded_errmsg_str(int64_t bytes) { - return fmt::format( - "process memory used {} exceed limit {} or sys mem available {} less than low " - "water mark {}, failed alloc size {}", - PerfCounters::get_vm_rss_str(), MemInfo::mem_limit_str(), - MemInfo::sys_mem_available_str(), - PrettyPrinter::print(MemInfo::sys_mem_available_low_water_mark(), TUnit::BYTES), - print_bytes(bytes)); - } + static std::string process_mem_log_str(); + static std::string process_limit_exceeded_errmsg_str(int64_t bytes); + // Log the memory usage when memory limit is exceeded. + std::string query_tracker_limit_exceeded_str(const std::string& tracker_limit_exceeded, + const std::string& last_consumer_tracker, + const std::string& executing_msg); + std::string tracker_limit_exceeded_str(); + std::string tracker_limit_exceeded_str(int64_t bytes); std::string debug_string() { std::stringstream msg; @@ -211,26 +168,11 @@ class MemTrackerLimiter final : public MemTracker { private: friend class ThreadMemTrackerMgr; - // Increases consumption of this tracker by 'bytes' only if will not exceeding limit. 
- // Returns true if the consumption was successfully updated. - WARN_UNUSED_RESULT - bool try_consume(int64_t bytes, std::string& failed_msg, bool& is_process_exceed); - // When the accumulated untracked memory value exceeds the upper limit, // the current value is returned and set to 0. // Thread safety. int64_t add_untracked_mem(int64_t bytes); - static std::string tracker_limit_exceeded_errmsg_str(int64_t bytes, - MemTrackerLimiter* exceed_tracker) { - return fmt::format( - "failed alloc size {}, exceeded tracker:<{}>, limit {}, peak " - "used {}, current used {}", - print_bytes(bytes), exceed_tracker->label(), print_bytes(exceed_tracker->limit()), - print_bytes(exceed_tracker->_consumption->peak_value()), - print_bytes(exceed_tracker->_consumption->current_value())); - } - private: Type _type; @@ -267,46 +209,12 @@ inline void MemTrackerLimiter::cache_consume(int64_t bytes) { consume(consume_bytes); } -inline bool MemTrackerLimiter::try_consume(int64_t bytes, std::string& failed_msg, - bool& is_process_exceed) { - if (bytes <= 0) { - release(-bytes); - failed_msg = std::string(); - return true; - } - - if (config::memory_debug && bytes > 1073741824) { // 1G - print_log_process_usage(fmt::format("Alloc Large Memory, Try Alloc: {}", bytes)); - } - - if (sys_mem_exceed_limit_check(bytes)) { - failed_msg = process_limit_exceeded_errmsg_str(bytes); - is_process_exceed = true; - return false; - } - - if (_limit < 0 || (is_overcommit_tracker() && config::enable_query_memroy_overcommit)) { - _consumption->add(bytes); // No limit at this tracker. 
- } else { - if (!_consumption->try_add(bytes, _limit)) { - failed_msg = tracker_limit_exceeded_errmsg_str(bytes, this); - is_process_exceed = false; - return false; - } - } - failed_msg = std::string(); - return true; -} - inline Status MemTrackerLimiter::check_limit(int64_t bytes) { - if (sys_mem_exceed_limit_check(bytes)) { - return Status::MemoryLimitExceeded(process_limit_exceeded_errmsg_str(bytes)); - } if (bytes <= 0 || (is_overcommit_tracker() && config::enable_query_memroy_overcommit)) { return Status::OK(); } if (_limit > 0 && _consumption->current_value() + bytes > _limit) { - return Status::MemoryLimitExceeded(tracker_limit_exceeded_errmsg_str(bytes, this)); + return Status::MemoryLimitExceeded(tracker_limit_exceeded_str(bytes)); } return Status::OK(); } diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp index 6057ce03257506..b166163227fd00 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp @@ -30,38 +30,25 @@ void ThreadMemTrackerMgr::attach_limiter_tracker( const std::shared_ptr& mem_tracker, const TUniqueId& fragment_instance_id) { DCHECK(mem_tracker); - flush_untracked_mem(); + flush_untracked_mem(); _fragment_instance_id = fragment_instance_id; _limiter_tracker = mem_tracker; _limiter_tracker_raw = mem_tracker.get(); - _check_limit = true; + _wait_gc = true; } void ThreadMemTrackerMgr::detach_limiter_tracker( const std::shared_ptr& old_mem_tracker) { - flush_untracked_mem(); + flush_untracked_mem(); _fragment_instance_id = TUniqueId(); _limiter_tracker = old_mem_tracker; _limiter_tracker_raw = old_mem_tracker.get(); + _wait_gc = false; } void ThreadMemTrackerMgr::cancel_fragment(const std::string& exceed_msg) { - if (_check_limit) { - ExecEnv::GetInstance()->fragment_mgr()->cancel( - _fragment_instance_id, PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED, exceed_msg); - } - _check_limit = false; // Make sure it will only be 
canceled once -} - -void ThreadMemTrackerMgr::exceeded(int64_t size) { - if (_cb_func != nullptr) { - _cb_func(); - } - _limiter_tracker_raw->print_log_usage(_exceed_mem_limit_msg); - - if (is_attach_query()) { - cancel_fragment(_exceed_mem_limit_msg); - } + ExecEnv::GetInstance()->fragment_mgr()->cancel( + _fragment_instance_id, PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED, exceed_msg); } } // namespace doris diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h index 6f37da9f547a50..3665ec3d5319a0 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.h +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h @@ -21,13 +21,12 @@ #include #include "gutil/macros.h" +#include "runtime/exec_env.h" #include "runtime/memory/mem_tracker.h" #include "runtime/memory/mem_tracker_limiter.h" namespace doris { -using ExceedCallBack = void (*)(); - // Memory Hook is counted in the memory tracker of the current thread. class ThreadMemTrackerMgr { public: @@ -35,7 +34,7 @@ class ThreadMemTrackerMgr { ~ThreadMemTrackerMgr() { // if _init == false, exec env is not initialized when init(). and never consumed mem tracker once. - if (_init) flush_untracked_mem(); + if (_init) flush_untracked_mem(); } void init(); @@ -59,26 +58,16 @@ class ThreadMemTrackerMgr { } int64_t stop_count_scope_mem() { - flush_untracked_mem(); + flush_untracked_mem(); _count_scope_mem = false; return _scope_mem; } - void set_exceed_call_back(ExceedCallBack cb_func) { _cb_func = cb_func; } - // Note that, If call the memory allocation operation in Memory Hook, // such as calling LOG/iostream/sstream/stringstream/etc. related methods, // must increase the control to avoid entering infinite recursion, otherwise it may cause crash or stuck, // Returns whether the memory exceeds limit, and will consume mem trcker no matter whether the limit is exceeded. 
void consume(int64_t size); - // If the memory exceeds the limit, return false, and will not consume mem tracker. - bool try_consume(int64_t size); - - // Force is equal to false. When the memory exceeds the limit,this alloc will be terminated and false - // will be returned. - // Force is equal to true, even if the memory is found to be overrun, continue to consume mem tracker, - // because this time alloc will still actually allocate memory, and always return true. - template bool flush_untracked_mem(); bool is_attach_query() { return _fragment_instance_id != TUniqueId(); } @@ -92,9 +81,8 @@ class ThreadMemTrackerMgr { return _limiter_tracker_raw; } - bool check_limit() { return _check_limit; } - void set_check_limit(bool check_limit) { _check_limit = check_limit; } std::string exceed_mem_limit_msg() { return _exceed_mem_limit_msg; } + void save_exceed_mem_limit_msg(const std::string& msg) { _exceed_mem_limit_msg = msg; } void clear_exceed_mem_limit_msg() { _exceed_mem_limit_msg = ""; } void disable_wait_gc() { _wait_gc = false; } bool wait_gc() { return _wait_gc; } @@ -112,14 +100,6 @@ class ThreadMemTrackerMgr { fmt::to_string(consumer_tracker_buf)); } -private: - void exceeded(int64_t size); - - void save_exceed_mem_limit_msg() { - _exceed_mem_limit_msg = _limiter_tracker_raw->mem_limit_exceeded( - fmt::format("execute:<{}>", last_consumer_tracker()), _failed_consume_msg); - } - private: // is false: ExecEnv::GetInstance()->initialized() = false when thread local is initialized bool _init = false; @@ -132,19 +112,17 @@ class ThreadMemTrackerMgr { std::string _failed_consume_msg = std::string(); std::string _exceed_mem_limit_msg = std::string(); - bool _is_process_exceed = false; - bool _wait_gc = true; + // If true, the Allocator will wait for the GC to free memory if it finds that the memory exceed limit. + // A thread of query/load will only wait once during execution. 
+ bool _wait_gc = false; std::shared_ptr _limiter_tracker; MemTrackerLimiter* _limiter_tracker_raw = nullptr; std::vector _consumer_tracker_stack; - // If true, call memtracker try_consume, otherwise call consume. - bool _check_limit = false; // If there is a memory new/delete operation in the consume method, it may enter infinite recursion. bool _stop_consume = false; TUniqueId _fragment_instance_id = TUniqueId(); - ExceedCallBack _cb_func = nullptr; }; inline void ThreadMemTrackerMgr::init() { @@ -154,7 +132,7 @@ inline void ThreadMemTrackerMgr::init() { DCHECK(_limiter_tracker == nullptr); _limiter_tracker = ExecEnv::GetInstance()->orphan_mem_tracker(); _limiter_tracker_raw = ExecEnv::GetInstance()->orphan_mem_tracker_raw(); - _check_limit = true; + _wait_gc = true; _init = true; } } @@ -184,48 +162,38 @@ inline void ThreadMemTrackerMgr::consume(int64_t size) { if ((_untracked_mem >= config::mem_tracker_consume_min_size_bytes || _untracked_mem <= -config::mem_tracker_consume_min_size_bytes) && !_stop_consume && ExecEnv::GetInstance()->initialized()) { - if (_check_limit) { - flush_untracked_mem(); - } else { - flush_untracked_mem(); - } + flush_untracked_mem(); } -} - -inline bool ThreadMemTrackerMgr::try_consume(int64_t size) { - _untracked_mem += size; - if ((_untracked_mem >= config::mem_tracker_consume_min_size_bytes || - _untracked_mem <= -config::mem_tracker_consume_min_size_bytes) && - !_stop_consume && ExecEnv::GetInstance()->initialized()) { - if (_check_limit) { - return flush_untracked_mem(); - } else { - return flush_untracked_mem(); + // Large memory alloc should use allocator.h + // Direct malloc or new large memory, unable to catch std::bad_alloc, BE may OOM. 
+ if (size > 4294967296) { // 4G + _stop_consume = true; + LOG(WARNING) << fmt::format("MemHook alloc large memory: {}.", size); + if (config::memory_debug) { + doris::MemTrackerLimiter::print_log_process_usage( + fmt::format("MemHook alloc large memory: {}.", size)); } + _stop_consume = false; } - return true; } -template -bool ThreadMemTrackerMgr::flush_untracked_mem() { +inline bool ThreadMemTrackerMgr::flush_untracked_mem() { // Temporary memory may be allocated during the consumption of the mem tracker, which will lead to entering // the Memory Hook again, so suspend consumption to avoid falling into an infinite loop. _stop_consume = true; init(); DCHECK(_limiter_tracker_raw); + if (doris::MemTrackerLimiter::sys_mem_exceed_limit_check(_untracked_mem)) { + LOG(WARNING) << fmt::format( + "MemHook alloc:{} failed, not enough system memory, consuming tracker:<{}>, exec " + "node:<{}>, {}.", + _untracked_mem, _limiter_tracker_raw->label(), last_consumer_tracker(), + doris::MemTrackerLimiter::process_limit_exceeded_errmsg_str(_untracked_mem)); + } + old_untracked_mem = _untracked_mem; if (_count_scope_mem) _scope_mem += _untracked_mem; - if (CheckLimit) { - if (!_limiter_tracker_raw->try_consume(old_untracked_mem, _failed_consume_msg, - _is_process_exceed)) { - if (Force) _limiter_tracker_raw->consume(old_untracked_mem); - save_exceed_mem_limit_msg(); - exceeded(old_untracked_mem); - if (!Force) return false; - } - } else { - _limiter_tracker_raw->consume(old_untracked_mem); - } + _limiter_tracker_raw->consume(old_untracked_mem); for (auto tracker : _consumer_tracker_stack) { tracker->consume(old_untracked_mem); } diff --git a/be/src/runtime/primitive_type.h b/be/src/runtime/primitive_type.h index 496c7db4ee5c3f..d261aa4582dd1d 100644 --- a/be/src/runtime/primitive_type.h +++ b/be/src/runtime/primitive_type.h @@ -19,6 +19,7 @@ #include +#include "gen_cpp/PaloInternalService_types.h" #include "runtime/define_primitive_type.h" #include "vec/columns/column_decimal.h" 
#include "vec/columns/columns_number.h" diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index 9cad1a1f68ebbe..052daa75154478 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -271,9 +271,17 @@ Status RuntimeState::set_mem_limit_exceeded(const std::string& msg) { Status RuntimeState::check_query_state(const std::string& msg) { // TODO: it would be nice if this also checked for cancellation, but doing so breaks // cases where we use Status::Cancelled("Cancelled") to indicate that the limit was reached. + // + // If the thread MemTrackerLimiter exceeds the limit, an error status is returned. + // Usually used after SCOPED_ATTACH_TASK, during query execution. if (thread_context()->thread_mem_tracker()->limit_exceeded() && !config::enable_query_memroy_overcommit) { - RETURN_LIMIT_EXCEEDED(this, msg); + auto failed_msg = thread_context()->thread_mem_tracker()->query_tracker_limit_exceeded_str( + thread_context()->thread_mem_tracker()->tracker_limit_exceeded_str(), + thread_context()->thread_mem_tracker_mgr->last_consumer_tracker(), msg); + thread_context()->thread_mem_tracker()->print_log_usage(failed_msg); + log_error(failed_msg); + return Status::MemoryLimitExceeded(failed_msg); } return query_status(); } diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h index e5d0c03718f70b..3a29ddd6206294 100644 --- a/be/src/runtime/tablets_channel.h +++ b/be/src/runtime/tablets_channel.h @@ -20,6 +20,7 @@ #include #include #include +#include #include #include #include diff --git a/be/src/runtime/thread_context.cpp b/be/src/runtime/thread_context.cpp index c4ab4800dbe375..9e0fbf50f4e731 100644 --- a/be/src/runtime/thread_context.cpp +++ b/be/src/runtime/thread_context.cpp @@ -19,7 +19,6 @@ #include "common/signal_handler.h" #include "runtime/runtime_state.h" -#include "util/doris_metrics.h" namespace doris { @@ -30,15 +29,6 @@ ThreadContextPtr::ThreadContextPtr() { init = true; } 
-ScopeMemCount::ScopeMemCount(int64_t* scope_mem) { - _scope_mem = scope_mem; - thread_context()->thread_mem_tracker_mgr->start_count_scope_mem(); -} - -ScopeMemCount::~ScopeMemCount() { - *_scope_mem += thread_context()->thread_mem_tracker_mgr->stop_count_scope_mem(); -} - AttachTask::AttachTask(const std::shared_ptr& mem_tracker, const std::string& task_id, const TUniqueId& fragment_instance_id) { thread_context()->attach_task(task_id, fragment_instance_id, mem_tracker); @@ -59,16 +49,6 @@ AttachTask::~AttachTask() { #endif // NDEBUG } -SwitchThreadMemTrackerLimiter::SwitchThreadMemTrackerLimiter( - const std::shared_ptr& mem_tracker) { - _old_mem_tracker = thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker(); - thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker(mem_tracker, TUniqueId()); -} - -SwitchThreadMemTrackerLimiter::~SwitchThreadMemTrackerLimiter() { - thread_context()->thread_mem_tracker_mgr->detach_limiter_tracker(_old_mem_tracker); -} - AddThreadMemTrackerConsumer::AddThreadMemTrackerConsumer(MemTracker* mem_tracker) { if (mem_tracker) _need_pop = thread_context()->thread_mem_tracker_mgr->push_consumer_tracker(mem_tracker); diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h index e4a707d2be426d..c86691a0620d09 100644 --- a/be/src/runtime/thread_context.h +++ b/be/src/runtime/thread_context.h @@ -23,7 +23,7 @@ #include #include "common/logging.h" -#include "gen_cpp/PaloInternalService_types.h" // for TQueryType +// #include "gen_cpp/PaloInternalService_types.h" // for TQueryType #include "gutil/macros.h" #include "runtime/memory/thread_mem_tracker_mgr.h" #include "runtime/threadlocal.h" @@ -35,6 +35,7 @@ // Usage example: int64_t scope_mem = 0; { SCOPED_MEM_COUNT(&scope_mem); xxx; xxx; } #define SCOPED_MEM_COUNT(scope_mem) \ auto VARNAME_LINENUM(scope_mem_count) = doris::ScopeMemCount(scope_mem) + // Count a code segment memory (memory malloc - memory free) to MemTracker. 
// Compared to count `scope_mem`, MemTracker is easier to observe from the outside and is thread-safe. // Usage example: std::unique_ptr tracker = std::make_unique("first_tracker"); @@ -56,29 +57,15 @@ // Usage is similar to SCOPED_CONSUME_MEM_TRACKER. #define SCOPED_ATTACH_TASK(arg1, ...) \ auto VARNAME_LINENUM(attach_task) = AttachTask(arg1, ##__VA_ARGS__) + // Switch MemTrackerLimiter for count memory during thread execution. // Usually used after SCOPED_ATTACH_TASK, in order to count the memory of the specified code segment into another // MemTrackerLimiter instead of the MemTrackerLimiter added by the attach task. #define SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(mem_tracker_limiter) \ auto VARNAME_LINENUM(switch_mem_tracker) = SwitchThreadMemTrackerLimiter(mem_tracker_limiter) -// If you don't want to cancel query after thread MemTrackerLimiter exceed limit in a code segment, then use it. -// Usually used after SCOPED_ATTACH_TASK. -#define STOP_CHECK_THREAD_MEM_TRACKER_LIMIT() \ - auto VARNAME_LINENUM(stop_check_limit) = StopCheckThreadMemTrackerLimit() -// If the thread MemTrackerLimiter exceeds the limit, an error status is returned. -// Usually used after SCOPED_ATTACH_TASK, during query execution. -#define RETURN_LIMIT_EXCEEDED(state, msg, ...) \ - return doris::thread_context()->thread_mem_tracker()->fragment_mem_limit_exceeded( \ - state, \ - fmt::format("exec node:<{}>, {}", \ - doris::thread_context()->thread_mem_tracker_mgr->last_consumer_tracker(), \ - msg), \ - ##__VA_ARGS__); #else #define SCOPED_ATTACH_TASK(arg1, ...) (void)0 #define SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(mem_tracker_limiter) (void)0 -#define STOP_CHECK_THREAD_MEM_TRACKER_LIMIT() (void)0 -#define RETURN_LIMIT_EXCEEDED(state, msg, ...) 
(void)0 #endif namespace doris { @@ -234,9 +221,14 @@ static ThreadContext* thread_context() { class ScopeMemCount { public: - explicit ScopeMemCount(int64_t* scope_mem); + explicit ScopeMemCount(int64_t* scope_mem) { + _scope_mem = scope_mem; + thread_context()->thread_mem_tracker_mgr->start_count_scope_mem(); + } - ~ScopeMemCount(); + ~ScopeMemCount() { + *_scope_mem += thread_context()->thread_mem_tracker_mgr->stop_count_scope_mem(); + } private: int64_t* _scope_mem; @@ -255,9 +247,14 @@ class AttachTask { class SwitchThreadMemTrackerLimiter { public: - explicit SwitchThreadMemTrackerLimiter(const std::shared_ptr& mem_tracker); + explicit SwitchThreadMemTrackerLimiter(const std::shared_ptr& mem_tracker) { + _old_mem_tracker = thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker(); + thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker(mem_tracker, TUniqueId()); + } - ~SwitchThreadMemTrackerLimiter(); + ~SwitchThreadMemTrackerLimiter() { + thread_context()->thread_mem_tracker_mgr->detach_limiter_tracker(_old_mem_tracker); + } private: std::shared_ptr _old_mem_tracker; @@ -279,28 +276,11 @@ class AddThreadMemTrackerConsumer { bool _need_pop = false; }; -class StopCheckThreadMemTrackerLimit { -public: - explicit StopCheckThreadMemTrackerLimit() { - _pre = thread_context()->thread_mem_tracker_mgr->check_limit(); - thread_context()->thread_mem_tracker_mgr->set_check_limit(false); - } - - ~StopCheckThreadMemTrackerLimit() { - thread_context()->thread_mem_tracker_mgr->set_check_limit(_pre); - } - -private: - bool _pre; -}; - // Basic macros for mem tracker, usually do not need to be modified and used. #ifdef USE_MEM_TRACKER // For the memory that cannot be counted by mem hook, manually count it into the mem tracker, such as mmap. 
#define CONSUME_THREAD_MEM_TRACKER(size) \ doris::thread_context()->thread_mem_tracker_mgr->consume(size) -#define TRY_CONSUME_THREAD_MEM_TRACKER(size) \ - doris::thread_context()->thread_mem_tracker_mgr->try_consume(size) #define RELEASE_THREAD_MEM_TRACKER(size) \ doris::thread_context()->thread_mem_tracker_mgr->consume(-size) @@ -354,22 +334,6 @@ class StopCheckThreadMemTrackerLimit { doris::ExecEnv::GetInstance()->orphan_mem_tracker_raw()->consume_no_update_peak(size); \ } \ } while (0) -// NOTE, The LOG cannot be printed in the mem hook. If the LOG statement triggers the mem hook LOG, -// the nested LOG may cause an unknown crash. -#define TRY_CONSUME_MEM_TRACKER(size, fail_ret) \ - do { \ - if (doris::thread_context_ptr.init) { \ - if (doris::enable_thread_catch_bad_alloc) { \ - if (!doris::thread_context()->thread_mem_tracker_mgr->try_consume(size)) { \ - return fail_ret; \ - } \ - } else { \ - doris::thread_context()->thread_mem_tracker_mgr->consume(size); \ - } \ - } else if (doris::ExecEnv::GetInstance()->initialized()) { \ - doris::ExecEnv::GetInstance()->orphan_mem_tracker_raw()->consume_no_update_peak(size); \ - } \ - } while (0) #define RELEASE_MEM_TRACKER(size) \ do { \ if (doris::thread_context_ptr.init) { \ @@ -381,12 +345,10 @@ class StopCheckThreadMemTrackerLimit { } while (0) #else #define CONSUME_THREAD_MEM_TRACKER(size) (void)0 -#define TRY_CONSUME_THREAD_MEM_TRACKER(size) true #define RELEASE_THREAD_MEM_TRACKER(size) (void)0 #define THREAD_MEM_TRACKER_TRANSFER_TO(size, tracker) (void)0 #define THREAD_MEM_TRACKER_TRANSFER_FROM(size, tracker) (void)0 #define CONSUME_MEM_TRACKER(size) (void)0 -#define TRY_CONSUME_MEM_TRACKER(size, fail_ret) (void)0 #define RELEASE_MEM_TRACKER(size) (void)0 #define RETURN_IF_CATCH_BAD_ALLOC(stmt) (stmt) #endif diff --git a/be/src/util/ref_count_closure.h b/be/src/util/ref_count_closure.h index c278dae9a3872b..235dd170f3ba22 100644 --- a/be/src/util/ref_count_closure.h +++ b/be/src/util/ref_count_closure.h @@ -21,6 
+21,8 @@ #include +#include "runtime/exec_env.h" +#include "runtime/thread_context.h" #include "service/brpc.h" namespace doris { diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt index ce61f91af94ef9..cb9f9c2b262b58 100644 --- a/be/src/vec/CMakeLists.txt +++ b/be/src/vec/CMakeLists.txt @@ -73,6 +73,7 @@ set(VEC_FILES common/sort/vsort_exec_exprs.cpp common/string_utils/string_utils.cpp common/hex.cpp + common/allocator.cpp core/block.cpp core/block_spill_reader.cpp core/block_spill_writer.cpp diff --git a/be/src/vec/common/allocator.cpp b/be/src/vec/common/allocator.cpp new file mode 100644 index 00000000000000..edfe657c6d38a2 --- /dev/null +++ b/be/src/vec/common/allocator.cpp @@ -0,0 +1,120 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/common/allocator.h" + +// Allocator is used by too many files. For compilation speed, put dependencies in `.cpp` as much as possible. 
+#include "runtime/thread_context.h" +#include "util/mem_info.h" + +template +void Allocator::sys_memory_check(size_t size) const { + if (doris::MemTrackerLimiter::sys_mem_exceed_limit_check(size)) { + // Only a thread attached to a query, for which wait_gc() still returns true, waits for GC here + // (rechecking every 100 ms); on timeout it cancels the fragment or throws std::bad_alloc. + // Otherwise std::bad_alloc is thrown at once, and only if enable_thread_catch_bad_alloc is set. + if (doris::thread_context()->thread_mem_tracker_mgr->is_attach_query() && + doris::thread_context()->thread_mem_tracker_mgr->wait_gc()) { + int64_t wait_milliseconds = doris::config::thread_wait_gc_max_milliseconds; + while (wait_milliseconds > 0) { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + if (!doris::MemTrackerLimiter::sys_mem_exceed_limit_check(size)) { + doris::MemInfo::refresh_interval_memory_growth += size; + break; + } + wait_milliseconds -= 100; + } + if (wait_milliseconds <= 0) { + // Disable further waits so the full thread_wait_gc_max_milliseconds is spent only once. + doris::thread_context()->thread_mem_tracker_mgr->disable_wait_gc(); + auto err_msg = fmt::format( + "Allocator sys memory check failed: Cannot alloc:{}, consuming " + "tracker:<{}>, exec node:<{}>, {}.", + size, doris::thread_context()->thread_mem_tracker()->label(), + doris::thread_context()->thread_mem_tracker_mgr->last_consumer_tracker(), + doris::MemTrackerLimiter::process_limit_exceeded_errmsg_str(size)); + // If bad_alloc is caught externally, throw std::bad_alloc so the query cancels itself; otherwise cancel the fragment asynchronously.
+ if (!doris::enable_thread_catch_bad_alloc) { + doris::thread_context()->thread_mem_tracker_mgr->cancel_fragment(err_msg); + } else { + doris::thread_context()->thread_mem_tracker_mgr->save_exceed_mem_limit_msg( + err_msg); + throw std::bad_alloc {}; + } + } + } else if (doris::enable_thread_catch_bad_alloc) { + auto err_msg = fmt::format( + "Allocator sys memory check failed: Cannot alloc:{}, consuming tracker:<{}>, " + "exec node:<{}>, {}.", + size, doris::thread_context()->thread_mem_tracker()->label(), + doris::thread_context()->thread_mem_tracker_mgr->last_consumer_tracker(), + doris::MemTrackerLimiter::process_limit_exceeded_errmsg_str(size)); + doris::thread_context()->thread_mem_tracker_mgr->save_exceed_mem_limit_msg(err_msg); + throw std::bad_alloc {}; + } + } +} + +template +void Allocator::memory_tracker_check(size_t size) const { + auto st = doris::thread_context()->thread_mem_tracker()->check_limit(size); + if (!st) { + doris::thread_context()->thread_mem_tracker_mgr->disable_wait_gc(); + auto err_msg = + doris::thread_context()->thread_mem_tracker()->query_tracker_limit_exceeded_str( + st.to_string(), + doris::thread_context()->thread_mem_tracker_mgr->last_consumer_tracker(), + "Allocator mem tracker check failed"); + // If bad_alloc is caught externally, throw std::bad_alloc so the query cancels itself; otherwise cancel the fragment asynchronously.
+ if (!doris::enable_thread_catch_bad_alloc) { + doris::thread_context()->thread_mem_tracker_mgr->cancel_fragment(err_msg); + doris::thread_context()->thread_mem_tracker()->print_log_usage(err_msg); + } else { + doris::thread_context()->thread_mem_tracker_mgr->save_exceed_mem_limit_msg(err_msg); + throw std::bad_alloc {}; + } + } +} + +template +void Allocator::memory_check(size_t size) const { + sys_memory_check(size); + memory_tracker_check(size); +} + +template +void Allocator::consume_memory(size_t size) const { + CONSUME_THREAD_MEM_TRACKER(size); +} + +template +void Allocator::release_memory(size_t size) const { + RELEASE_THREAD_MEM_TRACKER(size); +} + +template +void Allocator::throw_bad_alloc(const std::string& err) const { + LOG(WARNING) << err; + if (!doris::enable_thread_catch_bad_alloc) + doris::MemTrackerLimiter::print_log_process_usage(err); + throw std::bad_alloc {}; +} + +template class Allocator; +template class Allocator; +template class Allocator; +template class Allocator; diff --git a/be/src/vec/common/allocator.h b/be/src/vec/common/allocator.h index 7e72893a371a44..7ae7ff7082e2c6 100644 --- a/be/src/vec/common/allocator.h +++ b/be/src/vec/common/allocator.h @@ -29,7 +29,6 @@ #include "common/status.h" #include "runtime/memory/chunk.h" #include "runtime/memory/chunk_allocator.h" -#include "runtime/thread_context.h" #ifdef NDEBUG #define ALLOCATOR_ASLR 0 @@ -100,24 +99,6 @@ static constexpr size_t CHUNK_THRESHOLD = 1024; static constexpr size_t MMAP_MIN_ALIGNMENT = 4096; static constexpr size_t MALLOC_MIN_ALIGNMENT = 8; -#define RETURN_BAD_ALLOC(err) \ - do { \ - LOG(WARNING) << err; \ - if (!doris::enable_thread_catch_bad_alloc) \ - doris::MemTrackerLimiter::print_log_process_usage(err); \ - throw std::bad_alloc {}; \ - } while (0) - -#define RETURN_BAD_ALLOC_IF_PRE_CATCH(err) \ - do { \ - LOG(WARNING) << err; \ - if (!doris::enable_thread_catch_bad_alloc) { \ - doris::MemTrackerLimiter::print_log_process_usage(err); \ - } else { \ - throw 
std::bad_alloc {}; \ - } \ - } while (0) - /** Responsible for allocating / freeing memory. Used, for example, in PODArray, Arena. * Also used in hash tables. * The interface is different from std::allocator @@ -131,45 +112,19 @@ static constexpr size_t MALLOC_MIN_ALIGNMENT = 8; template class Allocator { public: - void sys_memory_check(size_t size) { - if (doris::MemTrackerLimiter::sys_mem_exceed_limit_check(size)) { - if (doris::thread_context()->thread_mem_tracker_mgr->is_attach_query() && - doris::thread_context()->thread_mem_tracker_mgr->wait_gc()) { - int64_t wait_milliseconds = doris::config::thread_wait_gc_max_milliseconds; - while (wait_milliseconds > 0) { - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - if (!doris::MemTrackerLimiter::sys_mem_exceed_limit_check(size)) { - doris::MemInfo::refresh_interval_memory_growth += size; - break; - } - wait_milliseconds -= 100; - } - if (wait_milliseconds <= 0) { - auto err_msg = fmt::format( - "Allocator Sys Memory Check Failed In Query/Load: Cannot alloc {}, " - "{}.", - size, - doris::MemTrackerLimiter::process_limit_exceeded_errmsg_str(size)); - doris::thread_context()->thread_mem_tracker_mgr->disable_wait_gc(); - if (!doris::enable_thread_catch_bad_alloc) { - doris::thread_context()->thread_mem_tracker_mgr->cancel_fragment(err_msg); - } else { - LOG(WARNING) << err_msg; - throw std::bad_alloc {}; - } - } - } else if (doris::enable_thread_catch_bad_alloc) { - LOG(WARNING) << fmt::format( - "Allocator Sys Memory Check Failed: Cannot alloc {}, {}.", size, - doris::MemTrackerLimiter::process_limit_exceeded_errmsg_str(size)); - throw std::bad_alloc {}; - } - } - } + void sys_memory_check(size_t size) const; + void memory_tracker_check(size_t size) const; + // If sys memory or tracker exceeds the limit, but there is no external catch bad_alloc, + // alloc will continue to execute, so the consume memtracker is forced. 
+ void memory_check(size_t size) const; + // Increases consumption of the current thread's mem tracker by 'size' bytes. + void consume_memory(size_t size) const; + void release_memory(size_t size) const; + void throw_bad_alloc(const std::string& err) const; /// Allocate memory range. void* alloc(size_t size, size_t alignment = 0) { - sys_memory_check(size); + memory_check(size); void* buf; if (size >= MMAP_THRESHOLD) { @@ -179,24 +134,18 @@ class Allocator { "Too large alignment {}: more than page size when allocating {}.", alignment, size); - if (!TRY_CONSUME_THREAD_MEM_TRACKER(size)) { - RETURN_BAD_ALLOC_IF_PRE_CATCH( - fmt::format("Allocator Pre Catch: Cannot mmap {}.", size)); - // memory exceeds the limit, consume mem tracker fails, but there is no external catch bad_alloc, - // alloc will continue to execute, so the consume memtracker is forced. - CONSUME_THREAD_MEM_TRACKER(size); - } + consume_memory(size); buf = mmap(get_mmap_hint(), size, PROT_READ | PROT_WRITE, mmap_flags, -1, 0); if (MAP_FAILED == buf) { - RELEASE_THREAD_MEM_TRACKER(size); - RETURN_BAD_ALLOC(fmt::format("Allocator: Cannot mmap {}.", size)); + release_memory(size); + throw_bad_alloc(fmt::format("Allocator: Cannot mmap {}.", size)); } /// No need for zero-fill, because mmap guarantees it.
} else if (!doris::config::disable_chunk_allocator_in_vec && size >= CHUNK_THRESHOLD) { doris::Chunk chunk; if (!doris::ChunkAllocator::instance()->allocate_align(size, &chunk)) { - RETURN_BAD_ALLOC(fmt::format("Allocator: Cannot allocate chunk {}.", size)); + throw_bad_alloc(fmt::format("Allocator: Cannot allocate chunk {}.", size)); } buf = chunk.data; if constexpr (clear_memory) memset(buf, 0, chunk.size); @@ -208,14 +157,14 @@ class Allocator { buf = ::malloc(size); if (nullptr == buf) { - RETURN_BAD_ALLOC(fmt::format("Allocator: Cannot malloc {}.", size)); + throw_bad_alloc(fmt::format("Allocator: Cannot malloc {}.", size)); } } else { buf = nullptr; int res = posix_memalign(&buf, alignment, size); if (0 != res) { - RETURN_BAD_ALLOC( + throw_bad_alloc( fmt::format("Cannot allocate memory (posix_memalign) {}.", size)); } @@ -229,13 +178,9 @@ class Allocator { void free(void* buf, size_t size) { if (size >= MMAP_THRESHOLD) { if (0 != munmap(buf, size)) { - auto err = fmt::format("Allocator: Cannot munmap {}.", size); - LOG(ERROR) << err; - if (!doris::enable_thread_catch_bad_alloc) - doris::MemTrackerLimiter::print_log_process_usage(err); - throw std::bad_alloc {}; + throw_bad_alloc(fmt::format("Allocator: Cannot munmap {}.", size)); } else { - RELEASE_THREAD_MEM_TRACKER(size); + release_memory(size); } } else if (!doris::config::disable_chunk_allocator_in_vec && size >= CHUNK_THRESHOLD && ((size & (size - 1)) == 0)) { @@ -256,12 +201,12 @@ class Allocator { /// BTW, it's not possible to change alignment while doing realloc. } else if (old_size < CHUNK_THRESHOLD && new_size < CHUNK_THRESHOLD && alignment <= MALLOC_MIN_ALIGNMENT) { - sys_memory_check(new_size); + memory_check(new_size); /// Resize malloc'd memory region with no special alignment requirement. 
void* new_buf = ::realloc(buf, new_size); if (nullptr == new_buf) { - RETURN_BAD_ALLOC(fmt::format("Allocator: Cannot realloc from {} to {}.", old_size, - new_size)); + throw_bad_alloc(fmt::format("Allocator: Cannot realloc from {} to {}.", old_size, + new_size)); } buf = new_buf; @@ -269,22 +214,16 @@ class Allocator { if (new_size > old_size) memset(reinterpret_cast(buf) + old_size, 0, new_size - old_size); } else if (old_size >= MMAP_THRESHOLD && new_size >= MMAP_THRESHOLD) { - sys_memory_check(new_size); + memory_check(new_size); /// Resize mmap'd memory region. - if (!TRY_CONSUME_THREAD_MEM_TRACKER(new_size - old_size)) { - RETURN_BAD_ALLOC_IF_PRE_CATCH(fmt::format( - "Allocator Pre Catch: Cannot mremap memory chunk from {} to {}.", old_size, - new_size)); - CONSUME_THREAD_MEM_TRACKER(new_size - old_size); - } - + consume_memory(new_size - old_size); // On apple and freebsd self-implemented mremap used (common/mremap.h) buf = clickhouse_mremap(buf, old_size, new_size, MREMAP_MAYMOVE, PROT_READ | PROT_WRITE, mmap_flags, -1, 0); if (MAP_FAILED == buf) { - RELEASE_THREAD_MEM_TRACKER(new_size - old_size); - RETURN_BAD_ALLOC(fmt::format("Allocator: Cannot mremap memory chunk from {} to {}.", - old_size, new_size)); + release_memory(new_size - old_size); + throw_bad_alloc(fmt::format("Allocator: Cannot mremap memory chunk from {} to {}.", + old_size, new_size)); } /// No need for zero-fill, because mmap guarantees it. @@ -296,7 +235,7 @@ class Allocator { memset(reinterpret_cast(buf) + old_size, 0, new_size - old_size); } } else { - sys_memory_check(new_size); + memory_check(new_size); // CHUNK_THRESHOLD <= old_size <= MMAP_THRESHOLD use system realloc is slow, use ChunkAllocator. // Big allocs that requires a copy. 
void* new_buf = alloc(new_size, alignment); diff --git a/be/src/vec/core/block_spill_reader.h b/be/src/vec/core/block_spill_reader.h index 48e9434c8ff9a1..5b46541f84533e 100644 --- a/be/src/vec/core/block_spill_reader.h +++ b/be/src/vec/core/block_spill_reader.h @@ -18,6 +18,7 @@ #pragma once #include "io/fs/file_reader.h" +#include "util/runtime_profile.h" #include "vec/core/block.h" namespace doris { diff --git a/be/src/vec/core/block_spill_writer.h b/be/src/vec/core/block_spill_writer.h index 817fb04b93fadd..dad8e836a7278f 100644 --- a/be/src/vec/core/block_spill_writer.h +++ b/be/src/vec/core/block_spill_writer.h @@ -18,6 +18,7 @@ #pragma once #include "io/fs/local_file_writer.h" +#include "util/runtime_profile.h" #include "vec/core/block.h" namespace doris { namespace vectorized { diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h index 1d937c5fb8290f..95b6593e891180 100644 --- a/be/src/vec/exec/scan/scanner_context.h +++ b/be/src/vec/exec/scan/scanner_context.h @@ -24,6 +24,7 @@ #include "common/status.h" #include "runtime/descriptors.h" #include "util/lock.h" +#include "util/runtime_profile.h" #include "util/uid_util.h" #include "vec/core/block.h" @@ -37,6 +38,7 @@ namespace vectorized { class VScanner; class VScanNode; +class ScannerScheduler; // ScannerContext is responsible for recording the execution status // of a group of Scanners corresponding to a ScanNode. 
diff --git a/be/src/vec/exec/scan/scanner_scheduler.h b/be/src/vec/exec/scan/scanner_scheduler.h index 6b78d64b6ae792..1b9049f2131b55 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.h +++ b/be/src/vec/exec/scan/scanner_scheduler.h @@ -18,12 +18,15 @@ #pragma once #include "common/status.h" +#include "runtime/exec_env.h" #include "util/blocking_queue.hpp" #include "util/threadpool.h" #include "vec/exec/scan/scanner_context.h" namespace doris::vectorized { +class ScannerContext; + // Responsible for the scheduling and execution of all Scanners of a BE node. // ScannerScheduler has two types of thread pools: // 1. Scheduling thread pool diff --git a/be/src/vec/functions/function_rpc.cpp b/be/src/vec/functions/function_rpc.cpp index 5f591991ffd454..a093ac64cfb337 100644 --- a/be/src/vec/functions/function_rpc.cpp +++ b/be/src/vec/functions/function_rpc.cpp @@ -24,6 +24,7 @@ #include "gen_cpp/Exprs_types.h" #include "json2pb/json_to_pb.h" #include "json2pb/pb_to_json.h" +#include "runtime/exec_env.h" namespace doris::vectorized { diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp index c3bb910d705d7d..39d70ee4f1f824 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.cpp +++ b/be/src/vec/runtime/vdata_stream_recvr.cpp @@ -103,9 +103,6 @@ Status VDataStreamRecvr::SenderQueue::_inner_get_batch(Block* block, bool* eos) void VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_number, int64_t packet_seq, ::google::protobuf::Closure** done) { - // Avoid deadlock when calling SenderQueue::cancel() in tcmalloc hook, - // limit memory via DataStreamRecvr::exceeds_limit. 
- STOP_CHECK_THREAD_MEM_TRACKER_LIMIT(); { std::lock_guard l(_lock); if (_is_cancelled) { @@ -170,9 +167,6 @@ void VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_numbe } void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) { - // Avoid deadlock when calling SenderQueue::cancel() in tcmalloc hook, - // limit memory via DataStreamRecvr::exceeds_limit. - STOP_CHECK_THREAD_MEM_TRACKER_LIMIT(); { std::unique_lock l(_lock); if (_is_cancelled || !block->rows()) { diff --git a/be/src/vec/runtime/vdata_stream_recvr.h b/be/src/vec/runtime/vdata_stream_recvr.h index 66941dae48ba21..dd179ca29e00a9 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.h +++ b/be/src/vec/runtime/vdata_stream_recvr.h @@ -221,10 +221,6 @@ class VDataStreamRecvr::PipSenderQueue : public SenderQueue { } void add_block(Block* block, bool use_move) override { - // Avoid deadlock when calling SenderQueue::cancel() in tcmalloc hook, - // limit memory via DataStreamRecvr::exceeds_limit. - STOP_CHECK_THREAD_MEM_TRACKER_LIMIT(); - if (_is_cancelled || !block->rows()) { return; } diff --git a/be/src/vec/sink/vresult_sink.cpp b/be/src/vec/sink/vresult_sink.cpp index b5840b22fd70da..bd49319b8fc6d8 100644 --- a/be/src/vec/sink/vresult_sink.cpp +++ b/be/src/vec/sink/vresult_sink.cpp @@ -86,9 +86,6 @@ Status VResultSink::open(RuntimeState* state) { Status VResultSink::send(RuntimeState* state, Block* block, bool eos) { INIT_AND_SCOPE_SEND_SPAN(state->get_tracer(), _send_span, "VResultSink::send"); - // The memory consumption in the process of sending the results is not check query memory limit. - // Avoid the query being cancelled when the memory limit is reached after the query result comes out. - STOP_CHECK_THREAD_MEM_TRACKER_LIMIT(); return _writer->append_block(*block); }