Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -632,7 +632,14 @@ CONF_Bool(track_new_delete, "true");

// If true, switch TLS MemTracker to count more detailed memory,
// including caches such as ExecNode operators and TabletManager.
CONF_Bool(memory_verbose_track, "true");
//
// At present, there is a performance problem in the frequent switch thread mem tracker.
// This is because the mem tracker exists as a shared_ptr in the thread local. Each time it is switched,
// the atomic variable use_count in the shared_ptr of the current tracker will be -1, and the tracker to be
// replaced use_count +1, multi-threading Frequent changes to the same tracker shared_ptr are slow.
// TODO: 1. Reduce unnecessary thread mem tracker switches,
// 2. Consider using raw pointers for mem tracker in thread local
CONF_Bool(memory_verbose_track, "false");

// Default level of MemTracker to show in web page
// now MemTracker support two level:
Expand Down
12 changes: 10 additions & 2 deletions be/src/olap/delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,10 @@ Status DeltaWriter::init() {
return Status::OLAPInternalError(OLAP_ERR_TABLE_NOT_FOUND);
}

_mem_tracker =
MemTracker::create_tracker(-1, "DeltaWriter:" + std::to_string(_tablet->tablet_id()));
// Only consume mem tracker manually in mem table. Using the virtual tracker can avoid
// frequent recursive consumption of the parent tracker, thereby improving performance.
_mem_tracker = MemTracker::create_virtual_tracker(
-1, "DeltaWriter:" + std::to_string(_tablet->tablet_id()));
// check tablet version number
if (_tablet->version_count() > config::max_tablet_version_num) {
LOG(WARNING) << "failed to init delta writer. version count: " << _tablet->version_count()
Expand Down Expand Up @@ -295,6 +297,11 @@ Status DeltaWriter::close_wait() {
// return error if previous flush failed
RETURN_NOT_OK(_flush_token->wait());

_mem_table.reset();
// In allocate/free of mem_pool, the consume_cache of _mem_tracker will be called,
// and _untracked_mem must be flushed first.
MemTracker::memory_leak_check(_mem_tracker.get());

// use rowset meta manager to save meta
_cur_rowset = _rowset_writer->build();
if (_cur_rowset == nullptr) {
Expand Down Expand Up @@ -327,6 +334,7 @@ Status DeltaWriter::cancel() {
// cancel and wait all memtables in flush queue to be finished
_flush_token->cancel();
}
MemTracker::memory_leak_check(_mem_tracker.get());
_is_cancelled = true;
return Status::OK();
}
Expand Down
3 changes: 3 additions & 0 deletions be/src/olap/memtable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ void MemTable::_init_agg_functions(const vectorized::Block* block) {
MemTable::~MemTable() {
std::for_each(_row_in_blocks.begin(), _row_in_blocks.end(), std::default_delete<RowInBlock>());
_mem_tracker->release(_mem_usage);
_buffer_mem_pool->free_all();
_table_mem_pool->free_all();
MemTracker::memory_leak_check(_mem_tracker.get(), true);
}

MemTable::RowCursorComparator::RowCursorComparator(const Schema* schema) : _schema(schema) {}
Expand Down
13 changes: 12 additions & 1 deletion be/src/olap/memtable_flush_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,18 @@ Status FlushToken::wait() {
}

void FlushToken::_flush_memtable(std::shared_ptr<MemTable> memtable, int64_t submit_task_time) {
SCOPED_ATTACH_TASK_THREAD(ThreadContext::TaskType::LOAD, memtable->mem_tracker());
#ifndef BE_TEST
// The memtable mem tracker needs to be completely accurate,
// because DeltaWriter judges whether to flush memtable according to the memtable memory usage.
// The macro of attach thread mem tracker is affected by the destructuring order of local variables,
// so it cannot completely correspond to the number of new/delete bytes recorded in scoped,
// and there is a small range of errors. So direct track load mem tracker.
// TODO(zxy) After rethinking the use of switch thread mem tracker, choose the appropriate way to get
// load mem tracke here.
// DCHECK(memtable->mem_tracker()->parent_task_mem_tracker_no_own());
// SCOPED_ATTACH_TASK_THREAD(ThreadContext::TaskType::LOAD,
// memtable->mem_tracker()->parent_task_mem_tracker_no_own());
#endif
_stats.flush_wait_time_ns += (MonotonicNanos() - submit_task_time);
SCOPED_CLEANUP({ memtable.reset(); });
// If previous flush has failed, return directly
Expand Down
6 changes: 5 additions & 1 deletion be/src/runtime/mem_tracker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,13 @@ MemTracker::MemTracker(int64_t byte_limit, const std::string& label,
void MemTracker::init() {
DCHECK_GE(_limit, -1);
MemTracker* tracker = this;
while (tracker != nullptr && tracker->_virtual == false) {
while (tracker != nullptr) {
_all_trackers.push_back(tracker);
if (tracker->has_limit()) _limit_trackers.push_back(tracker);
// This means that it terminates when recursively consume/release from the current tracker up to the virtual tracker.
if (tracker->_virtual == true) {
break;
}
tracker = tracker->_parent.get();
}
DCHECK_GT(_all_trackers.size(), 0);
Expand Down
12 changes: 10 additions & 2 deletions be/src/runtime/mem_tracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -416,9 +416,17 @@ class MemTracker {

// If an ancestor of this tracker is a Task MemTracker, return that tracker. Otherwise return nullptr.
MemTracker* parent_task_mem_tracker() {
MemTracker* tracker = this;
if (this->_level == MemTrackerLevel::TASK) {
return this;
} else {
return parent_task_mem_tracker_no_own().get();
}
}

std::shared_ptr<MemTracker> parent_task_mem_tracker_no_own() {
std::shared_ptr<MemTracker> tracker = this->_parent;
while (tracker != nullptr && tracker->_level != MemTrackerLevel::TASK) {
tracker = tracker->_parent.get();
tracker = tracker->_parent;
}
return tracker;
}
Expand Down