From 0a154d46ba5961893de00673f099c911f8c4f489 Mon Sep 17 00:00:00 2001 From: yangzhg Date: Mon, 1 Nov 2021 14:35:37 +0800 Subject: [PATCH] dd a method to get doris current memory usage Add all memory usage check when TryConsume memory --- be/src/exec/hash_table.cpp | 4 +- be/src/exec/partitioned_hash_table.cc | 4 +- be/src/runtime/buffered_block_mgr2.cc | 23 ++++-- .../runtime/bufferpool/reservation_tracker.cc | 4 +- be/src/runtime/exec_env_init.cpp | 76 ++++++++++--------- be/src/runtime/mem_pool.cpp | 4 +- be/src/runtime/mem_tracker.h | 27 ++++--- be/src/service/doris_main.cpp | 4 + be/src/util/mem_info.cpp | 13 +++- be/src/util/mem_info.h | 18 ++++- be/src/util/parse_util.cpp | 6 +- be/src/util/parse_util.h | 4 +- be/src/util/system_metrics.cpp | 11 +-- be/test/exec/hash_table_test.cpp | 8 +- be/test/olap/column_reader_test.cpp | 1 + be/test/olap/schema_change_test.cpp | 1 + be/test/runtime/mem_limit_test.cpp | 11 ++- be/test/util/parse_util_test.cpp | 13 ++-- 18 files changed, 147 insertions(+), 85 deletions(-) diff --git a/be/src/exec/hash_table.cpp b/be/src/exec/hash_table.cpp index 9275b18b5b7985..4aabbc58ec286d 100644 --- a/be/src/exec/hash_table.cpp +++ b/be/src/exec/hash_table.cpp @@ -181,7 +181,9 @@ void HashTable::resize_buckets(int64_t num_buckets) { int64_t old_num_buckets = _num_buckets; int64_t delta_bytes = (num_buckets - old_num_buckets) * sizeof(Bucket); - if (!_mem_tracker->TryConsume(delta_bytes)) { + Status st = _mem_tracker->TryConsume(delta_bytes); + WARN_IF_ERROR(st, "resize bucket failed"); + if (!st) { mem_limit_exceeded(delta_bytes); return; } diff --git a/be/src/exec/partitioned_hash_table.cc b/be/src/exec/partitioned_hash_table.cc index 0f6092e05b5370..b8cbdaab631b3c 100644 --- a/be/src/exec/partitioned_hash_table.cc +++ b/be/src/exec/partitioned_hash_table.cc @@ -310,7 +310,9 @@ Status PartitionedHashTableCtx::ExprValuesCache::Init(RuntimeState* state, MAX_EXPR_VALUES_ARRAY_SIZE / expr_values_bytes_per_row_)); int mem_usage = MemUsage(capacity_, expr_values_bytes_per_row_, num_exprs_); - if (UNLIKELY(!tracker->TryConsume(mem_usage))) { + Status st = tracker->TryConsume(mem_usage); + WARN_IF_ERROR(st, "PartitionedHashTableCtx::ExprValuesCache failed"); + if (UNLIKELY(!st)) { capacity_ = 0; string details = Substitute( "PartitionedHashTableCtx::ExprValuesCache failed to allocate $0 bytes.", mem_usage); diff --git a/be/src/runtime/buffered_block_mgr2.cc b/be/src/runtime/buffered_block_mgr2.cc index 64317c053bafb3..92edcdcabe5c3e 100644 --- a/be/src/runtime/buffered_block_mgr2.cc +++ b/be/src/runtime/buffered_block_mgr2.cc @@ -324,8 +324,9 @@ bool BufferedBlockMgr2::consume_memory(Client* client, int64_t size) { } int buffers_needed = BitUtil::ceil(size, max_block_size()); unique_lock lock(_lock); - - if (size < max_block_size() && _mem_tracker->TryConsume(size)) { + Status st = _mem_tracker->TryConsume(size); + WARN_IF_ERROR(st, "consume failed"); + if (size < max_block_size() && st) { // For small allocations (less than a block size), just let the allocation through. client->_tracker->ConsumeLocal(size, client->_query_tracker.get()); // client->_tracker->Consume(size); @@ -335,8 +336,9 @@ bool BufferedBlockMgr2::consume_memory(Client* client, int64_t size) { if (available_buffers(client) + client->_num_tmp_reserved_buffers < buffers_needed) { return false; } - - if (_mem_tracker->TryConsume(size)) { + st = _mem_tracker->TryConsume(size); + WARN_IF_ERROR(st, "consume failed"); + if (st) { // There was still unallocated memory, don't need to recycle allocated blocks. client->_tracker->ConsumeLocal(size, client->_query_tracker.get()); // client->_tracker->Consume(size); @@ -393,7 +395,9 @@ bool BufferedBlockMgr2::consume_memory(Client* client, int64_t size) { DCHECK_GE(buffers_acquired * max_block_size(), size); _mem_tracker->Release(buffers_acquired * max_block_size()); - if (!_mem_tracker->TryConsume(size)) { + st = _mem_tracker->TryConsume(size); + WARN_IF_ERROR(st, "consume failed"); + if (!st) { return false; } client->_tracker->ConsumeLocal(size, client->_query_tracker.get()); @@ -465,7 +469,9 @@ Status BufferedBlockMgr2::get_new_block(Client* client, Block* unpin_block, Bloc if (len > 0 && len < _max_block_size) { DCHECK(unpin_block == nullptr); - if (client->_tracker->TryConsume(len)) { + Status st = client->_tracker->TryConsume(len); + WARN_IF_ERROR(st, "get_new_block failed"); + if (st) { // TODO: Have a cache of unused blocks of size 'len' (0, _max_block_size) uint8_t* buffer = new uint8_t[len]; // Descriptors for non-I/O sized buffers are deleted when the block is deleted. @@ -1088,9 +1094,10 @@ Status BufferedBlockMgr2::find_buffer_for_block(Block* block, bool* in_mem) { Status BufferedBlockMgr2::find_buffer(unique_lock& lock, BufferDescriptor** buffer_desc) { *buffer_desc = nullptr; + Status st = _mem_tracker->TryConsume(_max_block_size); + WARN_IF_ERROR(st, "try to allocate a new buffer failed"); // First, try to allocate a new buffer. - if (_free_io_buffers.size() < _block_write_threshold && - _mem_tracker->TryConsume(_max_block_size)) { + if (_free_io_buffers.size() < _block_write_threshold && st) { uint8_t* new_buffer = new uint8_t[_max_block_size]; *buffer_desc = _obj_pool.add(new BufferDescriptor(new_buffer, _max_block_size)); (*buffer_desc)->all_buffers_it = diff --git a/be/src/runtime/bufferpool/reservation_tracker.cc b/be/src/runtime/bufferpool/reservation_tracker.cc index 9f79a68b16dee2..4fa41d85e1b751 100644 --- a/be/src/runtime/bufferpool/reservation_tracker.cc +++ b/be/src/runtime/bufferpool/reservation_tracker.cc @@ -187,7 +187,9 @@ bool ReservationTracker::TryConsumeFromMemTracker(int64_t reservation_increase) if (GetParentMemTracker() == nullptr) { // At the topmost link, which may be a MemTracker with a limit, we need to use // TryConsume() to check the limit. - return mem_tracker_->TryConsume(reservation_increase); + Status st = mem_tracker_->TryConsume(reservation_increase); + WARN_IF_ERROR(st, "TryConsumeFromMemTracker failed"); + return st.ok(); } else { // For lower links, there shouldn't be a limit to enforce, so we just need to // update the consumption of the linked MemTracker since the reservation is diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index e7947fa701a03e..a79112a92e449e 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -70,8 +70,8 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(scanner_thread_pool_queue_size, MetricUnit::N DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(etl_thread_pool_queue_size, MetricUnit::NOUNIT); DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(send_batch_thread_pool_thread_num, MetricUnit::NOUNIT); DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(send_batch_thread_pool_queue_size, MetricUnit::NOUNIT); -DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(query_mem_consumption, MetricUnit::BYTES, "", - mem_consumption, Labels({{"type", "query"}})); +DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(query_mem_consumption, MetricUnit::BYTES, "", mem_consumption, + Labels({{"type", "query"}})); Status ExecEnv::init(ExecEnv* env, const std::vector& store_paths) { return env->_init(store_paths); @@ -95,20 +95,20 @@ Status ExecEnv::_init(const std::vector& store_paths) { _pool_mem_trackers = new PoolMemTrackerRegistry(); _thread_mgr = new ThreadResourceMgr(); _scan_thread_pool = new PriorityThreadPool(config::doris_scanner_thread_pool_thread_num, - config::doris_scanner_thread_pool_queue_size); + config::doris_scanner_thread_pool_queue_size); ThreadPoolBuilder("LimitedScanThreadPool") - .set_min_threads(1) + .set_min_threads(1) .set_max_threads(config::doris_scanner_thread_pool_thread_num) .set_max_queue_size(config::doris_scanner_thread_pool_queue_size) - .build(&_limited_scan_thread_pool); + .build(&_limited_scan_thread_pool); ThreadPoolBuilder("SendBatchThreadPool") .set_min_threads(1) .set_max_threads(config::send_batch_thread_pool_thread_num) .set_max_queue_size(config::send_batch_thread_pool_queue_size) .build(&_send_batch_thread_pool); - + _etl_thread_pool = new PriorityThreadPool(config::etl_thread_pool_size, config::etl_thread_pool_queue_size); _cgroups_mgr = new CgroupsMgr(this, config::doris_cgroups); @@ -158,26 +158,28 @@ Status ExecEnv::_init_mem_tracker() { int64_t global_memory_limit_bytes = 0; bool is_percent = false; std::stringstream ss; - global_memory_limit_bytes = ParseUtil::parse_mem_spec(config::mem_limit, -1, &is_percent); + global_memory_limit_bytes = + ParseUtil::parse_mem_spec(config::mem_limit, -1, MemInfo::physical_mem(), &is_percent); if (global_memory_limit_bytes <= 0) { ss << "Failed to parse mem limit from '" + config::mem_limit + "'."; return Status::InternalError(ss.str()); } if (global_memory_limit_bytes > MemInfo::physical_mem()) { - LOG(WARNING) << "Memory limit " << PrettyPrinter::print(global_memory_limit_bytes, TUnit::BYTES) + LOG(WARNING) << "Memory limit " + << PrettyPrinter::print(global_memory_limit_bytes, TUnit::BYTES) << " exceeds physical memory of " << PrettyPrinter::print(MemInfo::physical_mem(), TUnit::BYTES) << ". Using physical memory instead"; global_memory_limit_bytes = MemInfo::physical_mem(); } - _mem_tracker = MemTracker::CreateTracker(global_memory_limit_bytes, "Process", MemTracker::GetRootTracker(), - false, false, MemTrackerLevel::OVERVIEW); - REGISTER_HOOK_METRIC(query_mem_consumption, [this]() { - return _mem_tracker->consumption(); - }); - LOG(INFO) << "Using global memory limit: " << PrettyPrinter::print(global_memory_limit_bytes, TUnit::BYTES) - << ", origin config value: " << config::mem_limit; + _mem_tracker = MemTracker::CreateTracker(global_memory_limit_bytes, "Process", + MemTracker::GetRootTracker(), false, false, + MemTrackerLevel::OVERVIEW); + REGISTER_HOOK_METRIC(query_mem_consumption, [this]() { return _mem_tracker->consumption(); }); + LOG(INFO) << "Using global memory limit: " + << PrettyPrinter::print(global_memory_limit_bytes, TUnit::BYTES) + << ", origin config value: " << config::mem_limit; // 2. init buffer pool if (!BitUtil::IsPowerOf2(config::min_buffer_size)) { @@ -185,7 +187,9 @@ Status ExecEnv::_init_mem_tracker() { return Status::InternalError(ss.str()); } - int64_t buffer_pool_limit = ParseUtil::parse_mem_spec(config::buffer_pool_limit, global_memory_limit_bytes, &is_percent); + int64_t buffer_pool_limit = + ParseUtil::parse_mem_spec(config::buffer_pool_limit, global_memory_limit_bytes, + MemInfo::physical_mem(), &is_percent); if (buffer_pool_limit <= 0) { ss << "Invalid config buffer_pool_limit value, must be a percentage or " "positive bytes value or percentage: " @@ -201,7 +205,8 @@ Status ExecEnv::_init_mem_tracker() { } int64_t clean_pages_limit = - ParseUtil::parse_mem_spec(config::buffer_pool_clean_pages_limit, buffer_pool_limit, &is_percent); + ParseUtil::parse_mem_spec(config::buffer_pool_clean_pages_limit, buffer_pool_limit, + MemInfo::physical_mem(), &is_percent); if (clean_pages_limit <= 0) { ss << "Invalid buffer_pool_clean_pages_limit value, must be a percentage or " "positive bytes value or percentage: " @@ -213,22 +218,25 @@ Status ExecEnv::_init_mem_tracker() { clean_pages_limit = clean_pages_limit / 2; } _init_buffer_pool(config::min_buffer_size, buffer_pool_limit, clean_pages_limit); - LOG(INFO) << "Buffer pool memory limit: " << PrettyPrinter::print(buffer_pool_limit, TUnit::BYTES) - << ", origin config value: " << config::buffer_pool_limit - << ". clean pages limit: " << PrettyPrinter::print(clean_pages_limit, TUnit::BYTES) - << ", origin config value: " << config::buffer_pool_clean_pages_limit; + LOG(INFO) << "Buffer pool memory limit: " + << PrettyPrinter::print(buffer_pool_limit, TUnit::BYTES) + << ", origin config value: " << config::buffer_pool_limit + << ". clean pages limit: " << PrettyPrinter::print(clean_pages_limit, TUnit::BYTES) + << ", origin config value: " << config::buffer_pool_clean_pages_limit; // 3. init storage page cache int64_t storage_cache_limit = - ParseUtil::parse_mem_spec(config::storage_page_cache_limit, global_memory_limit_bytes, &is_percent); + ParseUtil::parse_mem_spec(config::storage_page_cache_limit, global_memory_limit_bytes, + MemInfo::physical_mem(), &is_percent); while (!is_percent && storage_cache_limit > global_memory_limit_bytes / 2) { // Reason same as buffer_pool_limit storage_cache_limit = storage_cache_limit / 2; } int32_t index_page_cache_percentage = config::index_page_cache_percentage; StoragePageCache::create_global_cache(storage_cache_limit, index_page_cache_percentage); - LOG(INFO) << "Storage page cache memory limit: " << PrettyPrinter::print(storage_cache_limit, TUnit::BYTES) - << ", origin config value: " << config::storage_page_cache_limit; + LOG(INFO) << "Storage page cache memory limit: " + << PrettyPrinter::print(storage_cache_limit, TUnit::BYTES) + << ", origin config value: " << config::storage_page_cache_limit; SegmentLoader::create_global_instance(config::segment_cache_capacity); @@ -250,21 +258,17 @@ void ExecEnv::_init_buffer_pool(int64_t min_page_size, int64_t capacity, } void ExecEnv::_register_metrics() { - REGISTER_HOOK_METRIC(scanner_thread_pool_queue_size, [this]() { - return _scan_thread_pool->get_queue_size(); - }); + REGISTER_HOOK_METRIC(scanner_thread_pool_queue_size, + [this]() { return _scan_thread_pool->get_queue_size(); }); - REGISTER_HOOK_METRIC(etl_thread_pool_queue_size, [this]() { - return _etl_thread_pool->get_queue_size(); - }); + REGISTER_HOOK_METRIC(etl_thread_pool_queue_size, + [this]() { return _etl_thread_pool->get_queue_size(); }); - REGISTER_HOOK_METRIC(send_batch_thread_pool_thread_num, [this]() { - return _send_batch_thread_pool->num_threads(); - }); + REGISTER_HOOK_METRIC(send_batch_thread_pool_thread_num, + [this]() { return _send_batch_thread_pool->num_threads(); }); - REGISTER_HOOK_METRIC(send_batch_thread_pool_queue_size, [this]() { - return _send_batch_thread_pool->get_queue_size(); - }); + REGISTER_HOOK_METRIC(send_batch_thread_pool_queue_size, + [this]() { return _send_batch_thread_pool->get_queue_size(); }); } void ExecEnv::_deregister_metrics() { diff --git a/be/src/runtime/mem_pool.cpp b/be/src/runtime/mem_pool.cpp index 44829efe3d9aff..bcaaa27c448b25 100644 --- a/be/src/runtime/mem_pool.cpp +++ b/be/src/runtime/mem_pool.cpp @@ -116,7 +116,9 @@ bool MemPool::find_chunk(size_t min_size, bool check_limits) { chunk_size = BitUtil::RoundUpToPowerOfTwo(chunk_size); if (check_limits) { - if (!mem_tracker_->TryConsume(chunk_size)) return false; + Status st = mem_tracker_->TryConsume(chunk_size); + WARN_IF_ERROR(st, "try to allocate a new buffer failed"); + if (!st) return false; } else { mem_tracker_->Consume(chunk_size); } diff --git a/be/src/runtime/mem_tracker.h b/be/src/runtime/mem_tracker.h index 172ff20538875c..df15d617b7f9aa 100644 --- a/be/src/runtime/mem_tracker.h +++ b/be/src/runtime/mem_tracker.h @@ -30,6 +30,7 @@ #include "common/status.h" #include "gen_cpp/Types_types.h" // for TUniqueId +#include "util/mem_info.h" #include "util/metrics.h" #include "util/runtime_profile.h" #include "util/spinlock.h" @@ -166,11 +167,16 @@ class MemTracker : public std::enable_shared_from_this { /// other callers that may not tolerate allocation failures have a better chance /// of success. Returns true if the consumption was successfully updated. WARN_UNUSED_RESULT - bool TryConsume(int64_t bytes, MemLimit mode = MemLimit::HARD) { + Status TryConsume(int64_t bytes, MemLimit mode = MemLimit::HARD) { // DCHECK_GE(bytes, 0); if (bytes <= 0) { Release(-bytes); - return true; + return Status::OK(); + } + if (MemInfo::current_mem() + bytes >= MemInfo::mem_limit()) { + return Status::MemoryLimitExceeded(fmt::format( + "{}: TryConsume failed, bytes={} process whole consumption={} mem limit={}", + label_, bytes, MemInfo::current_mem(), MemInfo::mem_limit())); } // if (UNLIKELY(bytes == 0)) return true; // if (UNLIKELY(bytes < 0)) return false; // needed in RELEASE, hits DCHECK in DEBUG @@ -189,26 +195,27 @@ class MemTracker : public std::enable_shared_from_this { while (true) { if (LIKELY(tracker->consumption_->try_add(bytes, limit))) break; - VLOG_RPC << "TryConsume failed, bytes=" << bytes - << " consumption=" << tracker->consumption_->current_value() - << " limit=" << limit << " attempting to GC"; if (UNLIKELY(tracker->GcMemory(limit - bytes))) { DCHECK_GE(i, 0); // Failed for this mem tracker. Roll back the ones that succeeded. for (int j = all_trackers_.size() - 1; j > i; --j) { all_trackers_[j]->consumption_->add(-bytes); } - return false; + return Status::MemoryLimitExceeded(fmt::format( + "{}: TryConsume failed, bytes={} consumption={} imit={} " + "attempting to GC", + tracker->label(), bytes, tracker->consumption_->current_value(), + limit)); } - VLOG_RPC << "GC succeeded, TryConsume bytes=" << bytes - << " consumption=" << tracker->consumption_->current_value() - << " limit=" << limit; + VLOG_NOTICE << "GC succeeded, TryConsume bytes=" << bytes + << " consumption=" << tracker->consumption_->current_value() + << " limit=" << limit; } } } // Everyone succeeded, return. DCHECK_EQ(i, -1); - return true; + return Status::OK(); } /// Decreases consumption of this tracker and its ancestors by 'bytes'. diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp index 430ac5e8c5a3e8..a5b4fdf39e7f51 100644 --- a/be/src/service/doris_main.cpp +++ b/be/src/service/doris_main.cpp @@ -264,6 +264,10 @@ int main(int argc, char** argv) { #if defined(LEAK_SANITIZER) __lsan_do_leak_check(); #endif + +#if !defined(ADDRESS_SANITIZER) && !defined(LEAK_SANITIZER) && !defined(THREAD_SANITIZER) + doris::MemInfo::refresh_current_mem(); +#endif sleep(10); } http_service.stop(); diff --git a/be/src/util/mem_info.cpp b/be/src/util/mem_info.cpp index bdd32965f6786d..3a8c6f494b498b 100644 --- a/be/src/util/mem_info.cpp +++ b/be/src/util/mem_info.cpp @@ -25,8 +25,10 @@ #include #include +#include "common/config.h" #include "gutil/strings/split.h" #include "util/cgroup_util.h" +#include "util/parse_util.h" #include "util/pretty_printer.h" #include "util/string_parser.hpp" @@ -34,6 +36,8 @@ namespace doris { bool MemInfo::_s_initialized = false; int64_t MemInfo::_s_physical_mem = -1; +int64_t MemInfo::_s_mem_limit = -1; +size_t MemInfo::_s_current_mem = 0; void MemInfo::init() { // Read from /proc/meminfo @@ -79,8 +83,10 @@ void MemInfo::init() { LOG(WARNING) << "Could not determine amount of physical memory on this machine."; } - LOG(INFO) << "Physical Memory: " << PrettyPrinter::print(_s_physical_mem, TUnit::BYTES); + bool is_percent = true; + _s_mem_limit = ParseUtil::parse_mem_spec(config::mem_limit, -1, _s_physical_mem, &is_percent); + LOG(INFO) << "Physical Memory: " << PrettyPrinter::print(_s_physical_mem, TUnit::BYTES); _s_initialized = true; } @@ -88,7 +94,10 @@ std::string MemInfo::debug_string() { DCHECK(_s_initialized); CGroupUtil util; std::stringstream stream; - stream << "Mem Info: " << PrettyPrinter::print(_s_physical_mem, TUnit::BYTES) << std::endl; + stream << "Physical Memory: " << PrettyPrinter::print(_s_physical_mem, TUnit::BYTES) + << std::endl; + stream << "Memory Limt: " << PrettyPrinter::print(_s_mem_limit, TUnit::BYTES) << std::endl; + stream << "Current Usage: " << PrettyPrinter::print(_s_current_mem, TUnit::BYTES) << std::endl; stream << "CGroup Info: " << util.debug_string() << std::endl; return stream.str(); } diff --git a/be/src/util/mem_info.h b/be/src/util/mem_info.h index 04aa17d07e330f..6ae8669f868d50 100644 --- a/be/src/util/mem_info.h +++ b/be/src/util/mem_info.h @@ -18,6 +18,8 @@ #ifndef DORIS_BE_SRC_COMMON_UTIL_MEM_INFO_H #define DORIS_BE_SRC_COMMON_UTIL_MEM_INFO_H +#include + #include #include "common/logging.h" @@ -33,16 +35,30 @@ class MemInfo { static void init(); // Get total physical memory in bytes (if has cgroups memory limits, return the limits). - static int64_t physical_mem() { + static inline int64_t physical_mem() { DCHECK(_s_initialized); return _s_physical_mem; } + static inline size_t current_mem() { return _s_current_mem; } + + static inline void refresh_current_mem() { + MallocExtension::instance()->GetNumericProperty("generic.total_physical_bytes", + &_s_current_mem); + } + + static inline int64_t mem_limit() { + DCHECK(_s_initialized); + return _s_mem_limit; + } + static std::string debug_string(); private: static bool _s_initialized; static int64_t _s_physical_mem; + static int64_t _s_mem_limit; + static size_t _s_current_mem; }; } // namespace doris diff --git a/be/src/util/parse_util.cpp b/be/src/util/parse_util.cpp index 3d0c8101d86a76..81e074c3acb0d7 100644 --- a/be/src/util/parse_util.cpp +++ b/be/src/util/parse_util.cpp @@ -17,12 +17,12 @@ #include "util/parse_util.h" -#include "util/mem_info.h" #include "util/string_parser.hpp" namespace doris { -int64_t ParseUtil::parse_mem_spec(const std::string& mem_spec_str, int64_t parent_limit, bool* is_percent) { +int64_t ParseUtil::parse_mem_spec(const std::string& mem_spec_str, int64_t parent_limit, + int64_t physical_mem, bool* is_percent) { if (mem_spec_str.empty()) { return 0; } @@ -82,7 +82,7 @@ int64_t ParseUtil::parse_mem_spec(const std::string& mem_spec_str, int64_t paren bytes = multiplier * limit_val; } else if (*is_percent) { if (parent_limit == -1) { - bytes = (static_cast(limit_val) / 100.0) * MemInfo::physical_mem(); + bytes = (static_cast(limit_val) / 100.0) * physical_mem; } else { bytes = (static_cast(limit_val) / 100.0) * parent_limit; } diff --git a/be/src/util/parse_util.h b/be/src/util/parse_util.h index 30f4dcd7787b30..f71ecb2ef30875 100644 --- a/be/src/util/parse_util.h +++ b/be/src/util/parse_util.h @@ -36,9 +36,9 @@ class ParseUtil { // Returns 0 if mem_spec_str is empty or '-1'. // Returns -1 if parsing failed. // if is_percent, return the percent of parent_limit. - // if parent_limit is -1, use MemInfo::physical_mem() as parent limit. + // if parent_limit is -1, use physical_mem as parent limit. static int64_t parse_mem_spec(const std::string& mem_spec_str, int64_t parent_limit, - bool* is_percent); + int64_t physical_mem, bool* is_percent); }; } // namespace doris diff --git a/be/src/util/system_metrics.cpp b/be/src/util/system_metrics.cpp index 56c84931be60b4..1cdb50a5638afc 100644 --- a/be/src/util/system_metrics.cpp +++ b/be/src/util/system_metrics.cpp @@ -17,7 +17,6 @@ #include "util/system_metrics.h" -#include #include #include @@ -25,6 +24,7 @@ #include "gutil/strings/split.h" // for string split #include "gutil/strtoint.h" // for atoi64 #include "util/doris_metrics.h" +#include "util/mem_info.h" namespace doris { @@ -313,14 +313,7 @@ void SystemMetrics::_install_memory_metrics(MetricEntity* entity) { } void SystemMetrics::_update_memory_metrics() { -#if defined(ADDRESS_SANITIZER) || defined(LEAK_SANITIZER) || defined(THREAD_SANITIZER) - LOG(INFO) << "Memory tracking is not available with address sanitizer builds."; -#else - size_t allocated_bytes = 0; - MallocExtension::instance()->GetNumericProperty("generic.current_allocated_bytes", - &allocated_bytes); - _memory_metrics->memory_allocated_bytes->set_value(allocated_bytes); -#endif + _memory_metrics->memory_allocated_bytes->set_value(MemInfo::current_mem()); } void SystemMetrics::_install_disk_metrics(const std::set& disk_devices) { diff --git a/be/test/exec/hash_table_test.cpp b/be/test/exec/hash_table_test.cpp index cc0d22c38fc11a..3502d0b9d104bf 100644 --- a/be/test/exec/hash_table_test.cpp +++ b/be/test/exec/hash_table_test.cpp @@ -357,7 +357,7 @@ TEST_F(HashTableTest, GrowTableTest2) { int expected_size = 0; std::shared_ptr mem_tracker = - MemTracker::CreateTracker(1024 * 1024, "hash-table-grow2-tracker", _tracker); + MemTracker::CreateTracker(1024 * 1024 * 1024, "hash-table-grow2-tracker", _tracker); std::vector is_null_safe = {false}; int initial_seed = 1; int64_t num_buckets = 4; @@ -396,6 +396,12 @@ TEST_F(HashTableTest, GrowTableTest2) { int main(int argc, char** argv) { ::testing::InitGoogleTest(&argc, argv); + std::string conffile = std::string(getenv("DORIS_HOME")) + "/conf/be.conf"; + if (!doris::config::init(conffile.c_str(), false)) { + fprintf(stderr, "error read config file. \n"); + return -1; + } doris::CpuInfo::init(); + doris::MemInfo::init(); return RUN_ALL_TESTS(); } diff --git a/be/test/olap/column_reader_test.cpp b/be/test/olap/column_reader_test.cpp index ed0d13a0e5f5ee..bba88ae9674282 100644 --- a/be/test/olap/column_reader_test.cpp +++ b/be/test/olap/column_reader_test.cpp @@ -2905,6 +2905,7 @@ int main(int argc, char** argv) { fprintf(stderr, "error read config file. \n"); return -1; } + doris::MemInfo::init(); int ret = doris::OLAP_SUCCESS; testing::InitGoogleTest(&argc, argv); ret = RUN_ALL_TESTS(); diff --git a/be/test/olap/schema_change_test.cpp b/be/test/olap/schema_change_test.cpp index ee3df975daa997..24cb92cad3c3a4 100644 --- a/be/test/olap/schema_change_test.cpp +++ b/be/test/olap/schema_change_test.cpp @@ -955,6 +955,7 @@ int main(int argc, char** argv) { return -1; } doris::init_glog("be-test"); + doris::MemInfo::init(); int ret = doris::OLAP_SUCCESS; testing::InitGoogleTest(&argc, argv); ret = RUN_ALL_TESTS(); diff --git a/be/test/runtime/mem_limit_test.cpp b/be/test/runtime/mem_limit_test.cpp index 2e2797048d81dd..b2c4017ea1e47f 100644 --- a/be/test/runtime/mem_limit_test.cpp +++ b/be/test/runtime/mem_limit_test.cpp @@ -100,7 +100,7 @@ TEST(MemTestTest, TrackerHierarchyTryConsume) { auto c2 = MemTracker::CreateTracker(50, "c2", p); // everything below limits - bool consumption = c1->TryConsume(60); + bool consumption = c1->TryConsume(60).ok(); EXPECT_EQ(consumption, true); EXPECT_EQ(c1->consumption(), 60); EXPECT_FALSE(c1->LimitExceeded(MemLimit::HARD)); @@ -113,7 +113,7 @@ TEST(MemTestTest, TrackerHierarchyTryConsume) { EXPECT_FALSE(p->AnyLimitExceeded(MemLimit::HARD)); // p goes over limit - consumption = c2->TryConsume(50); + consumption = c2->TryConsume(50).ok(); EXPECT_EQ(consumption, false); EXPECT_EQ(c1->consumption(), 60); EXPECT_FALSE(c1->LimitExceeded(MemLimit::HARD)); @@ -137,7 +137,6 @@ TEST(MemTestTest, TrackerHierarchyTryConsume) { EXPECT_EQ(p->consumption(), 50); EXPECT_FALSE(p->LimitExceeded(MemLimit::HARD)); - c1->Release(40); c2->Release(10); } @@ -145,7 +144,13 @@ TEST(MemTestTest, TrackerHierarchyTryConsume) { } // end namespace doris int main(int argc, char** argv) { + std::string conffile = std::string(getenv("DORIS_HOME")) + "/conf/be.conf"; + if (!doris::config::init(conffile.c_str(), false)) { + fprintf(stderr, "error read config file. \n"); + return -1; + } doris::init_glog("be-test"); ::testing::InitGoogleTest(&argc, argv); + doris::MemInfo::init(); return RUN_ALL_TESTS(); } diff --git a/be/test/util/parse_util_test.cpp b/be/test/util/parse_util_test.cpp index e8a16ec0724c6b..511c6759148485 100644 --- a/be/test/util/parse_util_test.cpp +++ b/be/test/util/parse_util_test.cpp @@ -28,7 +28,8 @@ namespace doris { static void test_parse_mem_spec(const std::string& mem_spec_str, int64_t result) { bool is_percent = true; - int64_t bytes = ParseUtil::parse_mem_spec(mem_spec_str, -1, &is_percent); + int64_t bytes = + ParseUtil::parse_mem_spec(mem_spec_str, -1, MemInfo::_s_physical_mem, &is_percent); ASSERT_EQ(result, bytes); ASSERT_FALSE(is_percent); } @@ -52,24 +53,24 @@ TEST(TestParseMemSpec, Normal) { test_parse_mem_spec("128T", 128L * 1024 * 1024 * 1024 * 1024L); bool is_percent = false; - int64_t bytes = ParseUtil::parse_mem_spec("20%", -1, &is_percent); + int64_t bytes = ParseUtil::parse_mem_spec("20%", -1, MemInfo::_s_physical_mem, &is_percent); ASSERT_GT(bytes, 0); ASSERT_TRUE(is_percent); MemInfo::_s_physical_mem = 1000; is_percent = true; - bytes = ParseUtil::parse_mem_spec("0.1%", -1, &is_percent); + bytes = ParseUtil::parse_mem_spec("0.1%", -1, MemInfo::_s_physical_mem, &is_percent); ASSERT_EQ(bytes, 1); ASSERT_TRUE(is_percent); // test with parent limit is_percent = false; - bytes = ParseUtil::parse_mem_spec("1%", 1000, &is_percent); + bytes = ParseUtil::parse_mem_spec("1%", 1000, MemInfo::_s_physical_mem, &is_percent); ASSERT_TRUE(is_percent); ASSERT_EQ(10, bytes); is_percent = true; - bytes = ParseUtil::parse_mem_spec("1001", 1000, &is_percent); + bytes = ParseUtil::parse_mem_spec("1001", 1000, MemInfo::_s_physical_mem, &is_percent); ASSERT_FALSE(is_percent); ASSERT_EQ(1001, bytes); } @@ -91,7 +92,7 @@ TEST(TestParseMemSpec, Bad) { bad_values.push_back("%"); for (const auto& value : bad_values) { bool is_percent = false; - int64_t bytes = ParseUtil::parse_mem_spec(value, -1, &is_percent); + int64_t bytes = ParseUtil::parse_mem_spec(value, -1, MemInfo::_s_physical_mem, &is_percent); ASSERT_EQ(-1, bytes); } }