Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -598,12 +598,6 @@ DEFINE_mInt32(memory_maintenance_sleep_time_ms, "20");
// After minor gc, no minor gc during sleep, but full gc is possible.
DEFINE_mInt32(memory_gc_sleep_time_ms, "500");

// percent of (active memtables size / all memtables size) when reach hard limit
DEFINE_mInt32(memtable_hard_limit_active_percent, "50");

// percent of (active memtables size / all memtables size) when reach soft limit
DEFINE_mInt32(memtable_soft_limit_active_percent, "50");

// max write buffer size before flush, default 200MB
DEFINE_mInt64(write_buffer_size, "209715200");
// max buffer size used in memtable for the aggregated table, default 400MB
Expand Down
6 changes: 0 additions & 6 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -648,12 +648,6 @@ DECLARE_mInt32(memory_maintenance_sleep_time_ms);
// After minor gc, no minor gc during sleep, but full gc is possible.
DECLARE_mInt32(memory_gc_sleep_time_ms);

// percent of (active memtables size / all memtables size) when reach hard limit
DECLARE_mInt32(memtable_hard_limit_active_percent);

// percent of (active memtables size / all memtables size) when reach soft limit
DECLARE_mInt32(memtable_soft_limit_active_percent);

// max write buffer size before flush, default 200MB
DECLARE_mInt64(write_buffer_size);
// max buffer size used in memtable for the aggregated table, default 400MB
Expand Down
169 changes: 84 additions & 85 deletions be/src/olap/memtable_memory_limiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,33 +75,41 @@ void MemTableMemoryLimiter::register_writer(std::weak_ptr<MemTableWriter> writer
_writers.push_back(writer);
}

bool MemTableMemoryLimiter::_sys_avail_mem_less_than_warning_water_mark() {
int64_t MemTableMemoryLimiter::_sys_avail_mem_less_than_warning_water_mark() {
// reserve a small amount of memory so we do not trigger MinorGC
return doris::GlobalMemoryArbitrator::sys_mem_available() <
doris::MemInfo::sys_mem_available_warning_water_mark() +
config::memtable_limiter_reserved_memory_bytes;
return doris::MemInfo::sys_mem_available_warning_water_mark() -
doris::GlobalMemoryArbitrator::sys_mem_available() +
config::memtable_limiter_reserved_memory_bytes;
}

bool MemTableMemoryLimiter::_process_used_mem_more_than_soft_mem_limit() {
int64_t MemTableMemoryLimiter::_process_used_mem_more_than_soft_mem_limit() {
// reserve a small amount of memory so we do not trigger MinorGC
return GlobalMemoryArbitrator::process_memory_usage() >
MemInfo::soft_mem_limit() - config::memtable_limiter_reserved_memory_bytes;
return GlobalMemoryArbitrator::process_memory_usage() - MemInfo::soft_mem_limit() +
config::memtable_limiter_reserved_memory_bytes;
}

bool MemTableMemoryLimiter::_soft_limit_reached() {
return _mem_tracker->consumption() >= _load_soft_mem_limit || _hard_limit_reached();
return _mem_tracker->consumption() > _load_soft_mem_limit || _hard_limit_reached();
}

bool MemTableMemoryLimiter::_hard_limit_reached() {
return _mem_tracker->consumption() >= _load_hard_mem_limit ||
_sys_avail_mem_less_than_warning_water_mark() ||
_process_used_mem_more_than_soft_mem_limit();
return _mem_tracker->consumption() > _load_hard_mem_limit ||
_sys_avail_mem_less_than_warning_water_mark() > 0 ||
_process_used_mem_more_than_soft_mem_limit() > 0;
}

bool MemTableMemoryLimiter::_load_usage_low() {
return _mem_tracker->consumption() <= _load_safe_mem_permit;
}

int64_t MemTableMemoryLimiter::_need_flush() {
int64_t limit1 = _mem_tracker->consumption() - _load_soft_mem_limit;
int64_t limit2 = _sys_avail_mem_less_than_warning_water_mark();
int64_t limit3 = _process_used_mem_more_than_soft_mem_limit();
int64_t need_flush = std::max(limit1, std::max(limit2, limit3));
return need_flush - _queue_mem_usage;
}

void MemTableMemoryLimiter::handle_memtable_flush() {
// Check the soft limit.
DCHECK(_load_soft_mem_limit > 0);
Expand All @@ -112,34 +120,29 @@ void MemTableMemoryLimiter::handle_memtable_flush() {
timer.start();
std::unique_lock<std::mutex> l(_lock);
g_memtable_memory_limit_waiting_threads << 1;
while (_hard_limit_reached()) {
LOG(INFO) << "reached memtable memory hard limit"
<< " (active: " << PrettyPrinter::print_bytes(_active_mem_usage)
<< ", write: " << PrettyPrinter::print_bytes(_write_mem_usage)
<< ", flush: " << PrettyPrinter::print_bytes(_flush_mem_usage) << ")";
if (_active_mem_usage >=
_write_mem_usage * config::memtable_hard_limit_active_percent / 100) {
_flush_active_memtables(_write_mem_usage / 20);
}
if (!_hard_limit_reached()) {
break;
bool first = true;
do {
if (!first) {
auto st = _hard_limit_end_cond.wait_for(l, std::chrono::milliseconds(1000));
if (st == std::cv_status::timeout) {
LOG(INFO) << "timeout when waiting for memory hard limit end, try again";
}
}
auto st = _hard_limit_end_cond.wait_for(l, std::chrono::milliseconds(1000));
if (st == std::cv_status::timeout) {
LOG(INFO) << "timeout when waiting for memory hard limit end, try again";
first = false;
int64_t need_flush = _need_flush();
if (need_flush > 0) {
auto limit = _hard_limit_reached() ? Limit::HARD : Limit::SOFT;
LOG(INFO) << "reached memtable memory " << (limit == Limit::HARD ? "hard" : "soft")
<< ", " << GlobalMemoryArbitrator::process_memory_used_details_str()
<< ", load mem: " << PrettyPrinter::print_bytes(_mem_tracker->consumption())
<< ", memtable writers num: " << _writers.size()
<< ", active: " << PrettyPrinter::print_bytes(_active_mem_usage)
<< ", queue: " << PrettyPrinter::print_bytes(_queue_mem_usage)
<< ", flush: " << PrettyPrinter::print_bytes(_flush_mem_usage);
_flush_active_memtables(need_flush);
}
}
} while (_hard_limit_reached());
g_memtable_memory_limit_waiting_threads << -1;
if (_soft_limit_reached()) {
LOG(INFO) << "reached memtable memory soft limit"
<< " (active: " << PrettyPrinter::print_bytes(_active_mem_usage)
<< ", write: " << PrettyPrinter::print_bytes(_write_mem_usage)
<< ", flush: " << PrettyPrinter::print_bytes(_flush_mem_usage) << ")";
if (_active_mem_usage >=
_write_mem_usage * config::memtable_soft_limit_active_percent / 100) {
_flush_active_memtables(_write_mem_usage / 20);
}
}
timer.stop();
int64_t time_ms = timer.elapsed_time() / 1000 / 1000;
g_memtable_memory_limit_latency_ms << time_ms;
Expand All @@ -155,50 +158,50 @@ void MemTableMemoryLimiter::_flush_active_memtables(int64_t need_flush) {
if (_active_writers.size() == 0) {
return;
}

using WriterMem = std::pair<std::weak_ptr<MemTableWriter>, int64_t>;
auto cmp = [](WriterMem left, WriterMem right) { return left.second > right.second; };
std::priority_queue<WriterMem, std::vector<WriterMem>, decltype(cmp)> heap(cmp);

for (auto writer : _active_writers) {
auto w = writer.lock();
if (w == nullptr) {
continue;
}
heap.emplace(w, w->active_memtable_mem_consumption());
}

int64_t mem_flushed = 0;
int64_t num_flushed = 0;
int64_t avg_mem = _active_mem_usage / _active_writers.size();
for (auto writer : _active_writers) {
int64_t mem = _flush_memtable(writer, avg_mem);

while (mem_flushed < need_flush && !heap.empty()) {
auto [writer, sort_mem] = heap.top();
heap.pop();
auto w = writer.lock();
if (w == nullptr) {
continue;
}
int64_t mem = w->active_memtable_mem_consumption();
if (mem < sort_mem * 0.9) {
// if the memtable writer just got flushed, don't flush it again
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when will reach this code?

continue;
}
Status st = w->flush_async();
if (!st.ok()) {
auto err_msg = fmt::format(
"tablet writer failed to reduce mem consumption by flushing memtable, "
"tablet_id={}, err={}",
w->tablet_id(), st.to_string());
LOG(WARNING) << err_msg;
static_cast<void>(w->cancel_with_status(st));
}
mem_flushed += mem;
num_flushed += (mem > 0);
if (mem_flushed >= need_flush) {
break;
}
}
LOG(INFO) << "flushed " << num_flushed << " out of " << _active_writers.size()
<< " active writers, flushed size: " << PrettyPrinter::print_bytes(mem_flushed);
}

int64_t MemTableMemoryLimiter::_flush_memtable(std::weak_ptr<MemTableWriter> writer_to_flush,
int64_t threshold) {
auto writer = writer_to_flush.lock();
if (!writer) {
return 0;
}
auto mem_usage = writer->active_memtable_mem_consumption();
// if the memtable writer just got flushed, don't flush it again
if (mem_usage < threshold) {
VLOG_DEBUG << "flushing active memtables, active mem usage "
<< PrettyPrinter::print_bytes(mem_usage) << " is less than "
<< PrettyPrinter::print_bytes(threshold) << ", skipping";
return 0;
}
VLOG_DEBUG << "flushing active memtables, active mem usage "
<< PrettyPrinter::print_bytes(mem_usage);
Status st = writer->flush_async();
if (!st.ok()) {
auto err_msg = fmt::format(
"tablet writer failed to reduce mem consumption by flushing memtable, "
"tablet_id={}, err={}",
writer->tablet_id(), st.to_string());
LOG(WARNING) << err_msg;
static_cast<void>(writer->cancel_with_status(st));
return 0;
}
return mem_usage;
}

void MemTableMemoryLimiter::refresh_mem_tracker() {
std::lock_guard<std::mutex> l(_lock);
_refresh_mem_tracker();
Expand All @@ -219,21 +222,17 @@ void MemTableMemoryLimiter::refresh_mem_tracker() {

_last_limit = limit;
_log_timer.reset();
// if not exist load task, this log should not be printed.
if (_mem_usage != 0) {
LOG(INFO) << fmt::format(
"{}, {}, load mem: {}, memtable writers num: {} (active: {}, write: {}, flush: {})",
ss.str(), GlobalMemoryArbitrator::process_memory_used_details_str(),
PrettyPrinter::print_bytes(_mem_tracker->consumption()), _writers.size(),
PrettyPrinter::print_bytes(_active_mem_usage),
PrettyPrinter::print_bytes(_write_mem_usage),
PrettyPrinter::print_bytes(_flush_mem_usage));
}
LOG(INFO) << ss.str() << ", " << GlobalMemoryArbitrator::process_memory_used_details_str()
<< ", load mem: " << PrettyPrinter::print_bytes(_mem_tracker->consumption())
<< ", memtable writers num: " << _writers.size()
<< ", active: " << PrettyPrinter::print_bytes(_active_mem_usage)
<< ", queue: " << PrettyPrinter::print_bytes(_queue_mem_usage)
<< ", flush: " << PrettyPrinter::print_bytes(_flush_mem_usage);
}

void MemTableMemoryLimiter::_refresh_mem_tracker() {
_flush_mem_usage = 0;
_write_mem_usage = 0;
_queue_mem_usage = 0;
_active_mem_usage = 0;
_active_writers.clear();
for (auto it = _writers.begin(); it != _writers.end();) {
Expand All @@ -245,16 +244,16 @@ void MemTableMemoryLimiter::_refresh_mem_tracker() {
_active_writers.push_back(writer);
}
_flush_mem_usage += writer->mem_consumption(MemType::FLUSH);
_write_mem_usage += writer->mem_consumption(MemType::WRITE_FINISHED);
_queue_mem_usage += writer->mem_consumption(MemType::WRITE_FINISHED);
++it;
} else {
*it = std::move(_writers.back());
_writers.pop_back();
}
}
_mem_usage = _flush_mem_usage + _write_mem_usage;
_mem_usage = _flush_mem_usage + _queue_mem_usage;
g_memtable_active_memory.set_value(_active_mem_usage);
g_memtable_write_memory.set_value(_write_mem_usage);
g_memtable_write_memory.set_value(_queue_mem_usage);
g_memtable_flush_memory.set_value(_flush_mem_usage);
g_memtable_load_memory.set_value(_mem_usage);
VLOG_DEBUG << "refreshed mem_tracker, num writers: " << _writers.size();
Expand Down
8 changes: 4 additions & 4 deletions be/src/olap/memtable_memory_limiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,21 +50,21 @@ class MemTableMemoryLimiter {
int64_t mem_usage() const { return _mem_usage; }

private:
static inline bool _sys_avail_mem_less_than_warning_water_mark();
static inline bool _process_used_mem_more_than_soft_mem_limit();
static inline int64_t _sys_avail_mem_less_than_warning_water_mark();
static inline int64_t _process_used_mem_more_than_soft_mem_limit();

bool _soft_limit_reached();
bool _hard_limit_reached();
bool _load_usage_low();
int64_t _need_flush();
void _flush_active_memtables(int64_t need_flush);
int64_t _flush_memtable(std::weak_ptr<MemTableWriter> writer_to_flush, int64_t threshold);
void _refresh_mem_tracker();

std::mutex _lock;
std::condition_variable _hard_limit_end_cond;
int64_t _mem_usage = 0;
int64_t _flush_mem_usage = 0;
int64_t _write_mem_usage = 0;
int64_t _queue_mem_usage = 0;
int64_t _active_mem_usage = 0;

// sum of all mem table memory.
Expand Down