Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions be/src/exec/cross_join_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "gen_cpp/PlanNodes_types.h"
#include "runtime/row_batch.h"
#include "runtime/runtime_state.h"
#include "runtime/thread_context.h"
#include "util/debug_util.h"
#include "util/runtime_profile.h"

Expand Down Expand Up @@ -52,6 +53,7 @@ Status CrossJoinNode::close(RuntimeState* state) {
Status CrossJoinNode::construct_build_side(RuntimeState* state) {
// Do a full scan of child(1) and store all build row batches.
RETURN_IF_ERROR(child(1)->open(state));
SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_ERR_CB("Cross join, while getting next from child 1");

while (true) {
RowBatch* batch =
Expand All @@ -63,9 +65,6 @@ Status CrossJoinNode::construct_build_side(RuntimeState* state) {
bool eos = false;
RETURN_IF_ERROR(child(1)->get_next(state, batch, &eos));

// to prevent use too many memory
RETURN_IF_INSTANCE_LIMIT_EXCEEDED(state, "Cross join, while getting next from the child 1.");

SCOPED_TIMER(_build_timer);
_build_batches.add_row_batch(batch);
VLOG_ROW << build_list_debug_string();
Expand Down
3 changes: 2 additions & 1 deletion be/src/exec/except_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "exprs/expr.h"
#include "runtime/row_batch.h"
#include "runtime/runtime_state.h"
#include "runtime/thread_context.h"

namespace doris {
ExceptNode::ExceptNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
Expand All @@ -40,6 +41,7 @@ Status ExceptNode::init(const TPlanNode& tnode, RuntimeState* state) {

Status ExceptNode::open(RuntimeState* state) {
RETURN_IF_ERROR(SetOperationNode::open(state));
SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_ERR_CB("Except Node, while probing the hash table.");
// if a table is empty, the result must be empty
if (_hash_tbl->size() == 0) {
_hash_tbl_iterator = _hash_tbl->begin();
Expand All @@ -62,7 +64,6 @@ Status ExceptNode::open(RuntimeState* state) {
while (!eos) {
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(child(i)->get_next(state, _probe_batch.get(), &eos));
RETURN_IF_INSTANCE_LIMIT_EXCEEDED(state, " Except , while probing the hash table.");
for (int j = 0; j < _probe_batch->num_rows(); ++j) {
_hash_tbl_iterator = _hash_tbl->find(_probe_batch->get_row(j));
if (_hash_tbl_iterator != _hash_tbl->end()) {
Expand Down
3 changes: 3 additions & 0 deletions be/src/exec/exec_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
#include "runtime/mem_tracker.h"
#include "runtime/row_batch.h"
#include "runtime/runtime_state.h"
#include "runtime/thread_context.h"
#include "util/debug_util.h"
#include "util/runtime_profile.h"
#include "vec/core/block.h"
Expand Down Expand Up @@ -208,6 +209,7 @@ Status ExecNode::prepare(RuntimeState* state) {
_mem_tracker = MemTracker::create_tracker(-1, "ExecNode:" + _runtime_profile->name(),
state->instance_mem_tracker(),
MemTrackerLevel::VERBOSE, _runtime_profile.get());
SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
_expr_mem_tracker = MemTracker::create_tracker(-1, "ExecNode:Exprs:" + _runtime_profile->name(),
_mem_tracker);

Expand All @@ -226,6 +228,7 @@ Status ExecNode::prepare(RuntimeState* state) {
}

Status ExecNode::open(RuntimeState* state) {
SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN));
if (_vconjunct_ctx_ptr) {
RETURN_IF_ERROR((*_vconjunct_ctx_ptr)->open(state));
Expand Down
5 changes: 2 additions & 3 deletions be/src/exec/hash_join_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ Status HashJoinNode::construct_hash_table(RuntimeState* state) {
// The hash join node needs to keep in memory all build tuples, including the tuple
// row ptrs. The row ptrs are copied into the hash table's internal structure so they
// don't need to be stored in the _build_pool.
SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_ERR_CB("Hash join, while constructing the hash table.");
RowBatch build_batch(child(1)->row_desc(), state->batch_size());
RETURN_IF_ERROR(child(1)->open(state));

Expand Down Expand Up @@ -303,7 +304,7 @@ Status HashJoinNode::get_next(RuntimeState* state, RowBatch* out_batch, bool* eo
// In most cases, no additional memory overhead will be applied for at this stage,
// but if the expression calculation in this node needs to apply for additional memory,
// it may cause the memory to exceed the limit.
RETURN_IF_INSTANCE_LIMIT_EXCEEDED(state, "Hash join, while execute get_next.");
SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_ERR_CB("Hash join, while execute get_next.");
SCOPED_TIMER(_runtime_profile->total_time_counter());

if (reached_limit()) {
Expand Down Expand Up @@ -771,11 +772,9 @@ Status HashJoinNode::process_build_batch(RuntimeState* state, RowBatch* build_ba
_build_pool.get(), false);
}
}
RETURN_IF_INSTANCE_LIMIT_EXCEEDED(state, "Hash join, while constructing the hash table.");
} else {
// take ownership of tuple data of build_batch
_build_pool->acquire_data(build_batch->tuple_data_pool(), false);
RETURN_IF_INSTANCE_LIMIT_EXCEEDED(state, "Hash join, while constructing the hash table.");
RETURN_IF_ERROR(_hash_tbl->resize_buckets_ahead(build_batch->num_rows()));
for (int i = 0; i < build_batch->num_rows(); ++i) {
_hash_tbl->insert_without_check(build_batch->get_row(i));
Expand Down
3 changes: 2 additions & 1 deletion be/src/exec/intersect_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "exprs/expr.h"
#include "runtime/row_batch.h"
#include "runtime/runtime_state.h"
#include "runtime/thread_context.h"

namespace doris {
IntersectNode::IntersectNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
Expand All @@ -44,6 +45,7 @@ Status IntersectNode::init(const TPlanNode& tnode, RuntimeState* state) {
// repeat [2] this for all the rest child
Status IntersectNode::open(RuntimeState* state) {
RETURN_IF_ERROR(SetOperationNode::open(state));
SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_ERR_CB("Intersect Node, while probing the hash table.");
// if a table is empty, the result must be empty
if (_hash_tbl->size() == 0) {
_hash_tbl_iterator = _hash_tbl->begin();
Expand All @@ -66,7 +68,6 @@ Status IntersectNode::open(RuntimeState* state) {
while (!eos) {
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(child(i)->get_next(state, _probe_batch.get(), &eos));
RETURN_IF_INSTANCE_LIMIT_EXCEEDED(state, " Intersect , while probing the hash table.");
for (int j = 0; j < _probe_batch->num_rows(); ++j) {
_hash_tbl_iterator = _hash_tbl->find(_probe_batch->get_row(j));
if (_hash_tbl_iterator != _hash_tbl->end()) {
Expand Down
3 changes: 2 additions & 1 deletion be/src/exec/set_operation_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "runtime/raw_value.h"
#include "runtime/row_batch.h"
#include "runtime/runtime_state.h"
#include "runtime/thread_context.h"

namespace doris {
SetOperationNode::SetOperationNode(ObjectPool* pool, const TPlanNode& tnode,
Expand Down Expand Up @@ -137,6 +138,7 @@ bool SetOperationNode::equals(TupleRow* row, TupleRow* other) {
Status SetOperationNode::open(RuntimeState* state) {
RETURN_IF_ERROR(ExecNode::open(state));
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN));
SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_ERR_CB("SetOperation, while constructing the hash table.");
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_CANCELLED(state);
// open result expr lists.
Expand All @@ -156,7 +158,6 @@ Status SetOperationNode::open(RuntimeState* state) {
RETURN_IF_ERROR(child(0)->get_next(state, &build_batch, &eos));
// take ownership of tuple data of build_batch
_build_pool->acquire_data(build_batch.tuple_data_pool(), false);
RETURN_IF_INSTANCE_LIMIT_EXCEEDED(state, " SetOperation, while constructing the hash table.");
// build hash table and remove duplicate items
RETURN_IF_ERROR(_hash_tbl->resize_buckets_ahead(build_batch.num_rows()));
for (int i = 0; i < build_batch.num_rows(); ++i) {
Expand Down
16 changes: 10 additions & 6 deletions be/src/olap/lru_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -364,13 +364,7 @@ void LRUCache::erase(const CacheKey& key, uint32_t hash, MemTracker* tracker) {
}
// free handle out of mutex, when last_ref is true, e must not be nullptr
if (last_ref) {
size_t charge = e->charge;
e->free();
// The parameter tracker is ShardedLRUCache::_mem_tracker,
// because the memory released by LRUHandle is recorded in the tls mem tracker,
// so this part of the memory is subsidized from ShardedLRUCache::_mem_tracker to the tls mem tracker
tracker->transfer_to(thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker().get(),
charge);
}
}

Expand Down Expand Up @@ -449,11 +443,15 @@ ShardedLRUCache::ShardedLRUCache(const std::string& name, size_t total_capacity,
: _name(name),
_last_id(1),
_mem_tracker(MemTracker::create_tracker(-1, name, nullptr, MemTrackerLevel::OVERVIEW)) {
SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
const size_t per_shard = (total_capacity + (kNumShards - 1)) / kNumShards;
for (int s = 0; s < kNumShards; s++) {
_shards[s] = new LRUCache(type);
_shards[s]->set_capacity(per_shard);
}
// After the lru cache is created in the main thread, the main thread will not switch to the
// lru cache mem tracker again, so manually clear the untracked mem in tls.
thread_local_ctx.get()->_thread_mem_tracker_mgr->clear_untracked_mems();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are the untracked mems cleared here?
Please add a comment explaining it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After the LRU cache is created in the main thread, the main thread will not switch to the LRU cache mem tracker again.

After a non-query thread switches the mem tracker, if the thread will not switch the mem tracker again in the short term, consider calling clear_untracked_mems manually.

The query thread automatically calls clear_untracked_mems on detach_task.


_entity = DorisMetrics::instance()->metric_registry()->register_entity(
std::string("lru_cache:") + name, {{"name", name}});
Expand All @@ -467,6 +465,7 @@ ShardedLRUCache::ShardedLRUCache(const std::string& name, size_t total_capacity,
}

ShardedLRUCache::~ShardedLRUCache() {
SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
for (int s = 0; s < kNumShards; s++) {
delete _shards[s];
}
Expand All @@ -481,6 +480,7 @@ Cache::Handle* ShardedLRUCache::insert(const CacheKey& key, void* value, size_t
// transfer the memory ownership of the value to ShardedLRUCache::_mem_tracker.
thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker()->transfer_to(_mem_tracker.get(),
charge);
SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
const uint32_t hash = _hash_slice(key);
return _shards[_shard(hash)]->insert(key, hash, value, charge, deleter, priority);
}
Expand All @@ -491,11 +491,13 @@ Cache::Handle* ShardedLRUCache::lookup(const CacheKey& key) {
}

void ShardedLRUCache::release(Handle* handle) {
SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a metric counter for this SWITCH operation?
I want to know how many times SWITCH is called.

Copy link
Contributor Author

@xinyiZzz xinyiZzz Mar 23, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I'll try to add one.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fix

LRUHandle* h = reinterpret_cast<LRUHandle*>(handle);
_shards[_shard(h->hash)]->release(handle);
}

// Erases the entry stored under `key` from the shard selected by the key's hash.
//
// SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER is invoked with this cache's `_mem_tracker`
// before any other work is done.
// NOTE(review): presumably this makes memory released by the erased handle count
// against the cache tracker for the rest of this scope -- confirm against the macro.
void ShardedLRUCache::erase(const CacheKey& key) {
    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
    // The same hash value both selects the shard and is handed to the shard's erase().
    const uint32_t key_hash = _hash_slice(key);
    auto* const owning_shard = _shards[_shard(key_hash)];
    owning_shard->erase(key, key_hash, _mem_tracker.get());
}
Expand All @@ -514,6 +516,7 @@ uint64_t ShardedLRUCache::new_id() {
}

int64_t ShardedLRUCache::prune() {
SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
int64_t num_prune = 0;
for (int s = 0; s < kNumShards; s++) {
num_prune += _shards[s]->prune();
Expand All @@ -522,6 +525,7 @@ int64_t ShardedLRUCache::prune() {
}

int64_t ShardedLRUCache::prune_if(CacheValuePredicate pred) {
SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
int64_t num_prune = 0;
for (int s = 0; s < kNumShards; s++) {
num_prune += _shards[s]->prune_if(pred);
Expand Down
15 changes: 5 additions & 10 deletions be/src/olap/tablet_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -192,11 +192,6 @@ OLAPStatus TabletManager::_add_tablet_to_map_unlocked(TTabletId tablet_id,
tablet_map_t& tablet_map = _get_tablet_map(tablet_id);
tablet_map[tablet_id] = tablet;
_add_tablet_to_partition(tablet);
// TODO: remove multiply 2 of tablet meta mem size
// Because table schema will copy in tablet, there will be double mem cost
// so here multiply 2
thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker()->transfer_to(
_mem_tracker.get(), tablet->tablet_meta()->mem_size() * 2);

VLOG_NOTICE << "add tablet to map successfully." << " tablet_id=" << tablet_id ;

Expand All @@ -215,6 +210,7 @@ bool TabletManager::_check_tablet_id_exist_unlocked(TTabletId tablet_id) {

OLAPStatus TabletManager::create_tablet(const TCreateTabletReq& request,
std::vector<DataDir*> stores) {
SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
DorisMetrics::instance()->create_tablet_requests_total->increment(1);

int64_t tablet_id = request.tablet_id;
Expand Down Expand Up @@ -432,6 +428,7 @@ TabletSharedPtr TabletManager::_create_tablet_meta_and_dir_unlocked(
// Drops the tablet identified by `tablet_id` and returns the status reported by
// `_drop_tablet_unlocked()`.
//
// A WriteLock is taken on the tablets shard lock that `tablet_id` maps to, then
// SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER is invoked with `_mem_tracker`, and only
// then is the actual drop performed.
//
// `keep_files` is forwarded unchanged to `_drop_tablet_unlocked()`.
// `schema_hash` is accepted but not read by this function.
OLAPStatus TabletManager::drop_tablet(TTabletId tablet_id, SchemaHash schema_hash,
                                      bool keep_files) {
    // Keep this order: the shard lock is acquired before the tracker switch.
    WriteLock wrlock(_get_tablets_shard_lock(tablet_id));
    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
    const OLAPStatus drop_status = _drop_tablet_unlocked(tablet_id, keep_files);
    return drop_status;
}

Expand Down Expand Up @@ -460,6 +457,7 @@ OLAPStatus TabletManager::_drop_tablet_unlocked(TTabletId tablet_id, bool keep_f

OLAPStatus TabletManager::drop_tablets_on_error_root_path(
const std::vector<TabletInfo>& tablet_info_vec) {
SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
OLAPStatus res = OLAP_SUCCESS;
if (tablet_info_vec.empty()) { // This is a high probability event
return res;
Expand Down Expand Up @@ -670,6 +668,7 @@ OLAPStatus TabletManager::load_tablet_from_meta(DataDir* data_dir, TTabletId tab
TSchemaHash schema_hash, const string& meta_binary,
bool update_meta, bool force, bool restore,
bool check_path) {
SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
TabletMetaSharedPtr tablet_meta(new TabletMeta());
OLAPStatus status = tablet_meta->deserialize(meta_binary);
if (status != OLAP_SUCCESS) {
Expand Down Expand Up @@ -752,6 +751,7 @@ OLAPStatus TabletManager::load_tablet_from_dir(DataDir* store, TTabletId tablet_
SchemaHash schema_hash,
const string& schema_hash_path, bool force,
bool restore) {
SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
LOG(INFO) << "begin to load tablet from dir. "
<< " tablet_id=" << tablet_id << " schema_hash=" << schema_hash
<< " path = " << schema_hash_path << " force = " << force << " restore = " << restore;
Expand Down Expand Up @@ -1219,11 +1219,6 @@ OLAPStatus TabletManager::_drop_tablet_directly_unlocked(TTabletId tablet_id, bo
}

dropped_tablet->deregister_tablet_from_dir();
// The dropped tablet meta is expected to be released in the TabletManager mem tracker,
// but is actually released in the tls mem tracker.
// So from TabletManager mem tracker compensate memory to tls tracker.
_mem_tracker->transfer_to(thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker().get(),
dropped_tablet->tablet_meta()->mem_size() * 2);
return OLAP_SUCCESS;
}

Expand Down
17 changes: 12 additions & 5 deletions be/src/runtime/bufferpool/buffer_allocator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "util/cpu_info.h"
#include "util/pretty_printer.h"
#include "util/runtime_profile.h"
#include "runtime/thread_context.h"

//DECLARE_bool(disable_mem_pools);

Expand All @@ -48,7 +49,7 @@ class BufferPool::FreeBufferArena : public CacheLineAligned {

/// Add a free buffer to the free lists. May free buffers to the system allocator
/// if the list becomes full. Caller should not hold 'lock_'
void AddFreeBuffer(BufferHandle&& buffer);
bool AddFreeBuffer(BufferHandle&& buffer);

/// Try to get a free buffer of 'buffer_len' bytes from this arena. Returns true and
/// sets 'buffer' if found or false if not found. Caller should not hold 'lock_'.
Expand Down Expand Up @@ -193,7 +194,8 @@ BufferPool::BufferAllocator::BufferAllocator(BufferPool* pool, int64_t min_buffe
clean_page_bytes_limit_(clean_page_bytes_limit),
clean_page_bytes_remaining_(clean_page_bytes_limit),
per_core_arenas_(CpuInfo::get_max_num_cores()),
max_scavenge_attempts_(MAX_SCAVENGE_ATTEMPTS) {
max_scavenge_attempts_(MAX_SCAVENGE_ATTEMPTS),
_mem_tracker(MemTracker::create_virtual_tracker(-1, "BufferAllocator", nullptr, MemTrackerLevel::OVERVIEW)) {
DCHECK(BitUtil::IsPowerOf2(min_buffer_len_)) << min_buffer_len_;
DCHECK(BitUtil::IsPowerOf2(max_buffer_len_)) << max_buffer_len_;
DCHECK_LE(0, min_buffer_len_);
Expand Down Expand Up @@ -303,6 +305,7 @@ Status BufferPool::BufferAllocator::AllocateInternal(int64_t len, BufferHandle*
system_bytes_remaining_.add(len);
return status;
}
_mem_tracker->consume_cache(len);
return Status::OK();
}

Expand Down Expand Up @@ -375,7 +378,9 @@ void BufferPool::BufferAllocator::Free(BufferHandle&& handle) {
handle.client_ = nullptr; // Buffer is no longer associated with a client.
FreeBufferArena* arena = per_core_arenas_[handle.home_core_].get();
handle.Poison();
arena->AddFreeBuffer(std::move(handle));
if (!arena->AddFreeBuffer(std::move(handle))) {
_mem_tracker->release_cache(handle.len());
}
}

void BufferPool::BufferAllocator::AddCleanPage(const std::unique_lock<std::mutex>& client_lock,
Expand Down Expand Up @@ -426,6 +431,7 @@ int64_t BufferPool::BufferAllocator::FreeToSystem(std::vector<BufferHandle>&& bu
buffer.Unpoison();
system_allocator_->Free(std::move(buffer));
}
_mem_tracker->release_cache(bytes_freed);
return bytes_freed;
}

Expand Down Expand Up @@ -485,16 +491,17 @@ BufferPool::FreeBufferArena::~FreeBufferArena() {
}
}

void BufferPool::FreeBufferArena::AddFreeBuffer(BufferHandle&& buffer) {
bool BufferPool::FreeBufferArena::AddFreeBuffer(BufferHandle&& buffer) {
std::lock_guard<SpinLock> al(lock_);
if (config::disable_mem_pools) {
int64_t len = buffer.len();
parent_->system_allocator_->Free(std::move(buffer));
parent_->system_bytes_remaining_.add(len);
return;
return false;
}
PerSizeLists* lists = GetListsForSize(buffer.len());
lists->AddFreeBuffer(std::move(buffer));
return true;
}

bool BufferPool::FreeBufferArena::RemoveCleanPage(bool claim_buffer, Page* page) {
Expand Down
3 changes: 3 additions & 0 deletions be/src/runtime/bufferpool/buffer_allocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "runtime/bufferpool/buffer_pool_internal.h"
#include "runtime/bufferpool/free_list.h"
#include "util/aligned_new.h"
#include "runtime/mem_tracker.h"

namespace doris {

Expand Down Expand Up @@ -235,6 +236,8 @@ struct BufferPool::BufferAllocator {
/// all arenas so may fail. The final attempt locks all arenas, which is expensive
/// but is guaranteed to succeed.
int max_scavenge_attempts_;

std::shared_ptr<MemTracker> _mem_tracker;
};
} // namespace doris

Expand Down
Loading