Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion be/src/exec/hash_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,9 @@ void HashTable::resize_buckets(int64_t num_buckets) {

int64_t old_num_buckets = _num_buckets;
int64_t delta_bytes = (num_buckets - old_num_buckets) * sizeof(Bucket);
if (!_mem_tracker->TryConsume(delta_bytes)) {
Status st = _mem_tracker->TryConsume(delta_bytes);
WARN_IF_ERROR(st, "resize bucket failed");
if (!st) {
mem_limit_exceeded(delta_bytes);
return;
}
Expand Down
4 changes: 3 additions & 1 deletion be/src/exec/partitioned_hash_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,9 @@ Status PartitionedHashTableCtx::ExprValuesCache::Init(RuntimeState* state,
MAX_EXPR_VALUES_ARRAY_SIZE / expr_values_bytes_per_row_));

int mem_usage = MemUsage(capacity_, expr_values_bytes_per_row_, num_exprs_);
if (UNLIKELY(!tracker->TryConsume(mem_usage))) {
Status st = tracker->TryConsume(mem_usage);
WARN_IF_ERROR(st, "PartitionedHashTableCtx::ExprValuesCache failed");
if (UNLIKELY(!st)) {
capacity_ = 0;
string details = Substitute(
"PartitionedHashTableCtx::ExprValuesCache failed to allocate $0 bytes.", mem_usage);
Expand Down
23 changes: 15 additions & 8 deletions be/src/runtime/buffered_block_mgr2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -324,8 +324,9 @@ bool BufferedBlockMgr2::consume_memory(Client* client, int64_t size) {
}
int buffers_needed = BitUtil::ceil(size, max_block_size());
unique_lock<mutex> lock(_lock);

if (size < max_block_size() && _mem_tracker->TryConsume(size)) {
Status st = _mem_tracker->TryConsume(size);
WARN_IF_ERROR(st, "consume failed");
if (size < max_block_size() && st) {
// For small allocations (less than a block size), just let the allocation through.
client->_tracker->ConsumeLocal(size, client->_query_tracker.get());
// client->_tracker->Consume(size);
Expand All @@ -335,8 +336,9 @@ bool BufferedBlockMgr2::consume_memory(Client* client, int64_t size) {
if (available_buffers(client) + client->_num_tmp_reserved_buffers < buffers_needed) {
return false;
}

if (_mem_tracker->TryConsume(size)) {
st = _mem_tracker->TryConsume(size);
WARN_IF_ERROR(st, "consume failed");
if (st) {
// There was still unallocated memory, don't need to recycle allocated blocks.
client->_tracker->ConsumeLocal(size, client->_query_tracker.get());
// client->_tracker->Consume(size);
Expand Down Expand Up @@ -393,7 +395,9 @@ bool BufferedBlockMgr2::consume_memory(Client* client, int64_t size) {

DCHECK_GE(buffers_acquired * max_block_size(), size);
_mem_tracker->Release(buffers_acquired * max_block_size());
if (!_mem_tracker->TryConsume(size)) {
st = _mem_tracker->TryConsume(size);
WARN_IF_ERROR(st, "consume failed");
if (!st) {
return false;
}
client->_tracker->ConsumeLocal(size, client->_query_tracker.get());
Expand Down Expand Up @@ -465,7 +469,9 @@ Status BufferedBlockMgr2::get_new_block(Client* client, Block* unpin_block, Bloc

if (len > 0 && len < _max_block_size) {
DCHECK(unpin_block == nullptr);
if (client->_tracker->TryConsume(len)) {
Status st = client->_tracker->TryConsume(len);
WARN_IF_ERROR(st, "get_new_block failed");
if (st) {
// TODO: Have a cache of unused blocks of size 'len' (0, _max_block_size)
uint8_t* buffer = new uint8_t[len];
// Descriptors for non-I/O sized buffers are deleted when the block is deleted.
Expand Down Expand Up @@ -1088,9 +1094,10 @@ Status BufferedBlockMgr2::find_buffer_for_block(Block* block, bool* in_mem) {
Status BufferedBlockMgr2::find_buffer(unique_lock<mutex>& lock, BufferDescriptor** buffer_desc) {
*buffer_desc = nullptr;

Status st = _mem_tracker->TryConsume(_max_block_size);
WARN_IF_ERROR(st, "try to allocate a new buffer failed");
// First, try to allocate a new buffer.
if (_free_io_buffers.size() < _block_write_threshold &&
_mem_tracker->TryConsume(_max_block_size)) {
if (_free_io_buffers.size() < _block_write_threshold && st) {
uint8_t* new_buffer = new uint8_t[_max_block_size];
*buffer_desc = _obj_pool.add(new BufferDescriptor(new_buffer, _max_block_size));
(*buffer_desc)->all_buffers_it =
Expand Down
4 changes: 3 additions & 1 deletion be/src/runtime/bufferpool/reservation_tracker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,9 @@ bool ReservationTracker::TryConsumeFromMemTracker(int64_t reservation_increase)
if (GetParentMemTracker() == nullptr) {
// At the topmost link, which may be a MemTracker with a limit, we need to use
// TryConsume() to check the limit.
return mem_tracker_->TryConsume(reservation_increase);
Status st = mem_tracker_->TryConsume(reservation_increase);
WARN_IF_ERROR(st, "TryConsumeFromMemTracker failed");
return st.ok();
} else {
// For lower links, there shouldn't be a limit to enforce, so we just need to
// update the consumption of the linked MemTracker since the reservation is
Expand Down
76 changes: 40 additions & 36 deletions be/src/runtime/exec_env_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(scanner_thread_pool_queue_size, MetricUnit::N
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(etl_thread_pool_queue_size, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(send_batch_thread_pool_thread_num, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(send_batch_thread_pool_queue_size, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(query_mem_consumption, MetricUnit::BYTES, "",
mem_consumption, Labels({{"type", "query"}}));
DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(query_mem_consumption, MetricUnit::BYTES, "", mem_consumption,
Labels({{"type", "query"}}));

Status ExecEnv::init(ExecEnv* env, const std::vector<StorePath>& store_paths) {
return env->_init(store_paths);
Expand All @@ -95,20 +95,20 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths) {
_pool_mem_trackers = new PoolMemTrackerRegistry();
_thread_mgr = new ThreadResourceMgr();
_scan_thread_pool = new PriorityThreadPool(config::doris_scanner_thread_pool_thread_num,
config::doris_scanner_thread_pool_queue_size);
config::doris_scanner_thread_pool_queue_size);

ThreadPoolBuilder("LimitedScanThreadPool")
.set_min_threads(1)
.set_min_threads(1)
.set_max_threads(config::doris_scanner_thread_pool_thread_num)
.set_max_queue_size(config::doris_scanner_thread_pool_queue_size)
.build(&_limited_scan_thread_pool);
.build(&_limited_scan_thread_pool);

ThreadPoolBuilder("SendBatchThreadPool")
.set_min_threads(1)
.set_max_threads(config::send_batch_thread_pool_thread_num)
.set_max_queue_size(config::send_batch_thread_pool_queue_size)
.build(&_send_batch_thread_pool);

_etl_thread_pool = new PriorityThreadPool(config::etl_thread_pool_size,
config::etl_thread_pool_queue_size);
_cgroups_mgr = new CgroupsMgr(this, config::doris_cgroups);
Expand Down Expand Up @@ -158,34 +158,38 @@ Status ExecEnv::_init_mem_tracker() {
int64_t global_memory_limit_bytes = 0;
bool is_percent = false;
std::stringstream ss;
global_memory_limit_bytes = ParseUtil::parse_mem_spec(config::mem_limit, -1, &is_percent);
global_memory_limit_bytes =
ParseUtil::parse_mem_spec(config::mem_limit, -1, MemInfo::physical_mem(), &is_percent);
if (global_memory_limit_bytes <= 0) {
ss << "Failed to parse mem limit from '" + config::mem_limit + "'.";
return Status::InternalError(ss.str());
}

if (global_memory_limit_bytes > MemInfo::physical_mem()) {
LOG(WARNING) << "Memory limit " << PrettyPrinter::print(global_memory_limit_bytes, TUnit::BYTES)
LOG(WARNING) << "Memory limit "
<< PrettyPrinter::print(global_memory_limit_bytes, TUnit::BYTES)
<< " exceeds physical memory of "
<< PrettyPrinter::print(MemInfo::physical_mem(), TUnit::BYTES)
<< ". Using physical memory instead";
global_memory_limit_bytes = MemInfo::physical_mem();
}
_mem_tracker = MemTracker::CreateTracker(global_memory_limit_bytes, "Process", MemTracker::GetRootTracker(),
false, false, MemTrackerLevel::OVERVIEW);
REGISTER_HOOK_METRIC(query_mem_consumption, [this]() {
return _mem_tracker->consumption();
});
LOG(INFO) << "Using global memory limit: " << PrettyPrinter::print(global_memory_limit_bytes, TUnit::BYTES)
<< ", origin config value: " << config::mem_limit;
_mem_tracker = MemTracker::CreateTracker(global_memory_limit_bytes, "Process",
MemTracker::GetRootTracker(), false, false,
MemTrackerLevel::OVERVIEW);
REGISTER_HOOK_METRIC(query_mem_consumption, [this]() { return _mem_tracker->consumption(); });
LOG(INFO) << "Using global memory limit: "
<< PrettyPrinter::print(global_memory_limit_bytes, TUnit::BYTES)
<< ", origin config value: " << config::mem_limit;

// 2. init buffer pool
if (!BitUtil::IsPowerOf2(config::min_buffer_size)) {
ss << "Config min_buffer_size must be a power-of-two: " << config::min_buffer_size;
return Status::InternalError(ss.str());
}

int64_t buffer_pool_limit = ParseUtil::parse_mem_spec(config::buffer_pool_limit, global_memory_limit_bytes, &is_percent);
int64_t buffer_pool_limit =
ParseUtil::parse_mem_spec(config::buffer_pool_limit, global_memory_limit_bytes,
MemInfo::physical_mem(), &is_percent);
if (buffer_pool_limit <= 0) {
ss << "Invalid config buffer_pool_limit value, must be a percentage or "
"positive bytes value or percentage: "
Expand All @@ -201,7 +205,8 @@ Status ExecEnv::_init_mem_tracker() {
}

int64_t clean_pages_limit =
ParseUtil::parse_mem_spec(config::buffer_pool_clean_pages_limit, buffer_pool_limit, &is_percent);
ParseUtil::parse_mem_spec(config::buffer_pool_clean_pages_limit, buffer_pool_limit,
MemInfo::physical_mem(), &is_percent);
if (clean_pages_limit <= 0) {
ss << "Invalid buffer_pool_clean_pages_limit value, must be a percentage or "
"positive bytes value or percentage: "
Expand All @@ -213,22 +218,25 @@ Status ExecEnv::_init_mem_tracker() {
clean_pages_limit = clean_pages_limit / 2;
}
_init_buffer_pool(config::min_buffer_size, buffer_pool_limit, clean_pages_limit);
LOG(INFO) << "Buffer pool memory limit: " << PrettyPrinter::print(buffer_pool_limit, TUnit::BYTES)
<< ", origin config value: " << config::buffer_pool_limit
<< ". clean pages limit: " << PrettyPrinter::print(clean_pages_limit, TUnit::BYTES)
<< ", origin config value: " << config::buffer_pool_clean_pages_limit;
LOG(INFO) << "Buffer pool memory limit: "
<< PrettyPrinter::print(buffer_pool_limit, TUnit::BYTES)
<< ", origin config value: " << config::buffer_pool_limit
<< ". clean pages limit: " << PrettyPrinter::print(clean_pages_limit, TUnit::BYTES)
<< ", origin config value: " << config::buffer_pool_clean_pages_limit;

// 3. init storage page cache
int64_t storage_cache_limit =
ParseUtil::parse_mem_spec(config::storage_page_cache_limit, global_memory_limit_bytes, &is_percent);
ParseUtil::parse_mem_spec(config::storage_page_cache_limit, global_memory_limit_bytes,
MemInfo::physical_mem(), &is_percent);
while (!is_percent && storage_cache_limit > global_memory_limit_bytes / 2) {
// Reason same as buffer_pool_limit
storage_cache_limit = storage_cache_limit / 2;
}
int32_t index_page_cache_percentage = config::index_page_cache_percentage;
StoragePageCache::create_global_cache(storage_cache_limit, index_page_cache_percentage);
LOG(INFO) << "Storage page cache memory limit: " << PrettyPrinter::print(storage_cache_limit, TUnit::BYTES)
<< ", origin config value: " << config::storage_page_cache_limit;
LOG(INFO) << "Storage page cache memory limit: "
<< PrettyPrinter::print(storage_cache_limit, TUnit::BYTES)
<< ", origin config value: " << config::storage_page_cache_limit;

SegmentLoader::create_global_instance(config::segment_cache_capacity);

Expand All @@ -250,21 +258,17 @@ void ExecEnv::_init_buffer_pool(int64_t min_page_size, int64_t capacity,
}

void ExecEnv::_register_metrics() {
REGISTER_HOOK_METRIC(scanner_thread_pool_queue_size, [this]() {
return _scan_thread_pool->get_queue_size();
});
REGISTER_HOOK_METRIC(scanner_thread_pool_queue_size,
[this]() { return _scan_thread_pool->get_queue_size(); });

REGISTER_HOOK_METRIC(etl_thread_pool_queue_size, [this]() {
return _etl_thread_pool->get_queue_size();
});
REGISTER_HOOK_METRIC(etl_thread_pool_queue_size,
[this]() { return _etl_thread_pool->get_queue_size(); });

REGISTER_HOOK_METRIC(send_batch_thread_pool_thread_num, [this]() {
return _send_batch_thread_pool->num_threads();
});
REGISTER_HOOK_METRIC(send_batch_thread_pool_thread_num,
[this]() { return _send_batch_thread_pool->num_threads(); });

REGISTER_HOOK_METRIC(send_batch_thread_pool_queue_size, [this]() {
return _send_batch_thread_pool->get_queue_size();
});
REGISTER_HOOK_METRIC(send_batch_thread_pool_queue_size,
[this]() { return _send_batch_thread_pool->get_queue_size(); });
}

void ExecEnv::_deregister_metrics() {
Expand Down
4 changes: 3 additions & 1 deletion be/src/runtime/mem_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,9 @@ bool MemPool::find_chunk(size_t min_size, bool check_limits) {

chunk_size = BitUtil::RoundUpToPowerOfTwo(chunk_size);
if (check_limits) {
if (!mem_tracker_->TryConsume(chunk_size)) return false;
Status st = mem_tracker_->TryConsume(chunk_size);
WARN_IF_ERROR(st, "try to allocate a new buffer failed");
if (!st) return false;
} else {
mem_tracker_->Consume(chunk_size);
}
Expand Down
27 changes: 17 additions & 10 deletions be/src/runtime/mem_tracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

#include "common/status.h"
#include "gen_cpp/Types_types.h" // for TUniqueId
#include "util/mem_info.h"
#include "util/metrics.h"
#include "util/runtime_profile.h"
#include "util/spinlock.h"
Expand Down Expand Up @@ -166,11 +167,16 @@ class MemTracker : public std::enable_shared_from_this<MemTracker> {
/// other callers that may not tolerate allocation failures have a better chance
/// of success. Returns true if the consumption was successfully updated.
WARN_UNUSED_RESULT
bool TryConsume(int64_t bytes, MemLimit mode = MemLimit::HARD) {
Status TryConsume(int64_t bytes, MemLimit mode = MemLimit::HARD) {
// DCHECK_GE(bytes, 0);
if (bytes <= 0) {
Release(-bytes);
return true;
return Status::OK();
}
if (MemInfo::current_mem() + bytes >= MemInfo::mem_limit()) {
return Status::MemoryLimitExceeded(fmt::format(
"{}: TryConsume failed, bytes={} process whole consumption={} mem limit={}",
label_, bytes, MemInfo::current_mem(), MemInfo::mem_limit()));
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Status::MemoryLimitExceeded not MemTracker::MemLimitExceeded

}
// if (UNLIKELY(bytes == 0)) return true;
// if (UNLIKELY(bytes < 0)) return false; // needed in RELEASE, hits DCHECK in DEBUG
Expand All @@ -189,26 +195,27 @@ class MemTracker : public std::enable_shared_from_this<MemTracker> {
while (true) {
if (LIKELY(tracker->consumption_->try_add(bytes, limit))) break;

VLOG_RPC << "TryConsume failed, bytes=" << bytes
<< " consumption=" << tracker->consumption_->current_value()
<< " limit=" << limit << " attempting to GC";
if (UNLIKELY(tracker->GcMemory(limit - bytes))) {
DCHECK_GE(i, 0);
// Failed for this mem tracker. Roll back the ones that succeeded.
for (int j = all_trackers_.size() - 1; j > i; --j) {
all_trackers_[j]->consumption_->add(-bytes);
}
return false;
return Status::MemoryLimitExceeded(fmt::format(
"{}: TryConsume failed, bytes={} consumption={} imit={} "
"attempting to GC",
tracker->label(), bytes, tracker->consumption_->current_value(),
limit));
}
VLOG_RPC << "GC succeeded, TryConsume bytes=" << bytes
<< " consumption=" << tracker->consumption_->current_value()
<< " limit=" << limit;
VLOG_NOTICE << "GC succeeded, TryConsume bytes=" << bytes
<< " consumption=" << tracker->consumption_->current_value()
<< " limit=" << limit;
}
}
}
// Everyone succeeded, return.
DCHECK_EQ(i, -1);
return true;
return Status::OK();
}

/// Decreases consumption of this tracker and its ancestors by 'bytes'.
Expand Down
4 changes: 4 additions & 0 deletions be/src/service/doris_main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,10 @@ int main(int argc, char** argv) {
#if defined(LEAK_SANITIZER)
__lsan_do_leak_check();
#endif

#if !defined(ADDRESS_SANITIZER) && !defined(LEAK_SANITIZER) && !defined(THREAD_SANITIZER)
doris::MemInfo::refresh_current_mem();
#endif
sleep(10);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

10ms

}
http_service.stop();
Expand Down
13 changes: 11 additions & 2 deletions be/src/util/mem_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,19 @@
#include <iostream>
#include <sstream>

#include "common/config.h"
#include "gutil/strings/split.h"
#include "util/cgroup_util.h"
#include "util/parse_util.h"
#include "util/pretty_printer.h"
#include "util/string_parser.hpp"

namespace doris {

bool MemInfo::_s_initialized = false;
int64_t MemInfo::_s_physical_mem = -1;
int64_t MemInfo::_s_mem_limit = -1;
size_t MemInfo::_s_current_mem = 0;

void MemInfo::init() {
// Read from /proc/meminfo
Expand Down Expand Up @@ -79,16 +83,21 @@ void MemInfo::init() {
LOG(WARNING) << "Could not determine amount of physical memory on this machine.";
}

LOG(INFO) << "Physical Memory: " << PrettyPrinter::print(_s_physical_mem, TUnit::BYTES);
bool is_percent = true;
_s_mem_limit = ParseUtil::parse_mem_spec(config::mem_limit, -1, _s_physical_mem, &is_percent);

LOG(INFO) << "Physical Memory: " << PrettyPrinter::print(_s_physical_mem, TUnit::BYTES);
_s_initialized = true;
}

std::string MemInfo::debug_string() {
DCHECK(_s_initialized);
CGroupUtil util;
std::stringstream stream;
stream << "Mem Info: " << PrettyPrinter::print(_s_physical_mem, TUnit::BYTES) << std::endl;
stream << "Physical Memory: " << PrettyPrinter::print(_s_physical_mem, TUnit::BYTES)
<< std::endl;
stream << "Memory Limt: " << PrettyPrinter::print(_s_mem_limit, TUnit::BYTES) << std::endl;
stream << "Current Usage: " << PrettyPrinter::print(_s_current_mem, TUnit::BYTES) << std::endl;
stream << "CGroup Info: " << util.debug_string() << std::endl;
return stream.str();
}
Expand Down
Loading