Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,7 @@ void TaskWorkerPool::_alter_tablet(const TAgentTaskRequest& agent_task_req, int6
string process_name;
switch (task_type) {
case TTaskType::ALTER:
process_name = "alter";
process_name = "AlterTablet";
break;
default:
std::string task_name;
Expand Down
19 changes: 16 additions & 3 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -604,10 +604,23 @@ CONF_mInt32(remote_storage_read_buffer_mb, "16");

// Default level of MemTracker to show in web page
// now MemTracker support two level:
// RELEASE: 0
// DEBUG: 1
// OVERVIEW: 0
// TASK: 1
// INSTANCE: 2
// VERBOSE: 3
// the level equal or lower than mem_tracker_level will show in web page
CONF_Int16(mem_tracker_level, "0");
CONF_mInt16(mem_tracker_level, "0");

// The minimum length when TCMalloc Hook consumes/releases MemTracker, consume size
// smaller than this value will continue to accumulate. specified as number of bytes.
// Decreasing this value will increase the frequency of consume/release.
// Increasing this value will cause MemTracker statistics to be inaccurate.
CONF_mInt32(mem_tracker_consume_min_size_bytes, "2097152");

// When MemTracker is a negative value, it is considered that a memory leak has occurred,
// but the actual MemTracker records inaccurately will also cause a negative value,
// so this feature is in the experimental stage.
CONF_mBool(memory_leak_detection, "false");

// The version information of the tablet will be stored in the memory
// in an adjacency graph data structure.
Expand Down
11 changes: 0 additions & 11 deletions be/src/common/daemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,17 +95,6 @@ void Daemon::memory_maintenance_thread() {
if (env != nullptr) {
BufferPool* buffer_pool = env->buffer_pool();
if (buffer_pool != nullptr) buffer_pool->Maintenance();

// The process limit as measured by our trackers may get out of sync with the
// process usage if memory is allocated or freed without updating a MemTracker.
// The metric is refreshed whenever memory is consumed or released via a MemTracker,
// so on a system with queries executing it will be refreshed frequently. However
// if the system is idle, we need to refresh the tracker occasionally since
// untracked memory may be allocated or freed, e.g. by background threads.
if (env->process_mem_tracker() != nullptr &&
!env->process_mem_tracker()->is_consumption_metric_null()) {
env->process_mem_tracker()->RefreshConsumptionFromMetric();
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/analytic_eval_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ Status AnalyticEvalNode::open(RuntimeState* state) {
"Failed to acquire initial read buffer for analytic function "
"evaluation. Reducing query concurrency or increasing the memory limit may "
"help this query to complete successfully.");
return mem_tracker()->MemLimitExceeded(state, msg, -1);
RETURN_LIMIT_EXCEEDED(mem_tracker(), state, msg);
}

DCHECK_EQ(_evaluators.size(), _fn_ctxs.size());
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/base_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ BaseScanner::BaseScanner(RuntimeState* state, RuntimeProfile* profile,
#if BE_TEST
_mem_tracker(new MemTracker()),
#else
_mem_tracker(MemTracker::CreateTracker(
_mem_tracker(MemTracker::create_tracker(
-1, "BaseScanner:" + std::to_string(state->load_job_id()),
state->instance_mem_tracker())),
#endif
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/broker_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ Status BrokerScanNode::scanner_scan(const TBrokerScanRange& scan_range,
// 1. too many batches in queue, or
// 2. at least one batch in queue and memory exceed limit.
(_batch_queue.size() >= _max_buffered_batches ||
(mem_tracker()->AnyLimitExceeded(MemLimit::HARD) && !_batch_queue.empty()))) {
(mem_tracker()->any_limit_exceeded() && !_batch_queue.empty()))) {
_queue_writer_cond.wait_for(l, std::chrono::seconds(1));
}
// Process already set failed, so we just return OK
Expand Down
1 change: 0 additions & 1 deletion be/src/exec/broker_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
#include "exprs/expr.h"
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
#include "runtime/mem_tracker.h"
#include "runtime/raw_value.h"
#include "runtime/stream_load/load_stream_mgr.h"
#include "runtime/stream_load/stream_load_pipe.h"
Expand Down
1 change: 0 additions & 1 deletion be/src/exec/broker_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ class ExprContext;
class TupleDescriptor;
class TupleRow;
class RowDescriptor;
class MemTracker;
class RuntimeProfile;
class StreamLoadPipe;

Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/cross_join_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ Status CrossJoinNode::construct_build_side(RuntimeState* state) {
RETURN_IF_ERROR(child(1)->get_next(state, batch, &eos));

// to prevent use too many memory
RETURN_IF_LIMIT_EXCEEDED(state, "Cross join, while getting next from the child 1.");
RETURN_IF_INSTANCE_LIMIT_EXCEEDED(state, "Cross join, while getting next from the child 1.");

SCOPED_TIMER(_build_timer);
_build_batches.add_row_batch(batch);
Expand Down
4 changes: 2 additions & 2 deletions be/src/exec/data_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,8 @@ Status DataSink::init(const TDataSink& thrift_sink) {

Status DataSink::prepare(RuntimeState* state) {
_expr_mem_tracker =
MemTracker::CreateTracker(-1, _name + ":Expr:" + std::to_string(state->load_job_id()),
state->instance_mem_tracker());
MemTracker::create_tracker(-1, _name + ":Expr:" + std::to_string(state->load_job_id()),
state->instance_mem_tracker());
return Status::OK();
}

Expand Down
10 changes: 6 additions & 4 deletions be/src/exec/es/es_scroll_parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -352,11 +352,12 @@ Status ScrollParser::fill_tuple(const TupleDescriptor* tuple_desc, Tuple* tuple,
// obj[FIELD_ID] must not be nullptr
std::string _id = obj[FIELD_ID].GetString();
size_t len = _id.length();
char* buffer = reinterpret_cast<char*>(tuple_pool->try_allocate_unaligned(len));
Status rst;
char* buffer = reinterpret_cast<char*>(tuple_pool->try_allocate_unaligned(len, &rst));
if (UNLIKELY(buffer == nullptr)) {
std::string details = strings::Substitute(ERROR_MEM_LIMIT_EXCEEDED,
"MaterializeNextRow", len, "string slot");
return tuple_pool->mem_tracker()->MemLimitExceeded(nullptr, details, len);
RETURN_LIMIT_EXCEEDED(tuple_pool->mem_tracker(), nullptr, details, len, rst);
}
memcpy(buffer, _id.data(), len);
reinterpret_cast<StringValue*>(slot)->ptr = buffer;
Expand Down Expand Up @@ -410,11 +411,12 @@ Status ScrollParser::fill_tuple(const TupleDescriptor* tuple_desc, Tuple* tuple,
}
}
size_t val_size = val.length();
char* buffer = reinterpret_cast<char*>(tuple_pool->try_allocate_unaligned(val_size));
Status rst;
char* buffer = reinterpret_cast<char*>(tuple_pool->try_allocate_unaligned(val_size, &rst));
if (UNLIKELY(buffer == nullptr)) {
std::string details = strings::Substitute(
ERROR_MEM_LIMIT_EXCEEDED, "MaterializeNextRow", val_size, "string slot");
return tuple_pool->mem_tracker()->MemLimitExceeded(nullptr, details, val_size);
RETURN_LIMIT_EXCEEDED(tuple_pool->mem_tracker(), nullptr, details, val_size, rst);
}
memcpy(buffer, val.data(), val_size);
reinterpret_cast<StringValue*>(slot)->ptr = buffer;
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/es_http_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ EsHttpScanner::EsHttpScanner(RuntimeState* state, RuntimeProfile* profile, Tuple
_mem_tracker(new MemTracker()),
#else
_mem_tracker(
MemTracker::CreateTracker(-1, "EsHttpScanner:" + std::to_string(state->load_job_id()),
MemTracker::create_tracker(-1, "EsHttpScanner:" + std::to_string(state->load_job_id()),
state->instance_mem_tracker())),
#endif
_mem_pool(_mem_tracker.get()),
Expand Down
5 changes: 3 additions & 2 deletions be/src/exec/es_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -771,11 +771,12 @@ Status EsScanNode::materialize_row(MemPool* tuple_pool, Tuple* tuple,
}
const string& val = col.string_vals[val_idx];
size_t val_size = val.size();
char* buffer = reinterpret_cast<char*>(tuple_pool->try_allocate_unaligned(val_size));
Status rst;
char* buffer = reinterpret_cast<char*>(tuple_pool->try_allocate_unaligned(val_size, &rst));
if (UNLIKELY(buffer == nullptr)) {
std::string details = strings::Substitute(
ERROR_MEM_LIMIT_EXCEEDED, "MaterializeNextRow", val_size, "string slot");
return tuple_pool->mem_tracker()->MemLimitExceeded(nullptr, details, val_size);
RETURN_LIMIT_EXCEEDED(tuple_pool->mem_tracker(), nullptr, details, val_size, rst);
}
memcpy(buffer, val.data(), val_size);
reinterpret_cast<StringValue*>(slot)->ptr = buffer;
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/except_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ Status ExceptNode::open(RuntimeState* state) {
while (!eos) {
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(child(i)->get_next(state, _probe_batch.get(), &eos));
RETURN_IF_LIMIT_EXCEEDED(state, " Except , while probing the hash table.");
RETURN_IF_INSTANCE_LIMIT_EXCEEDED(state, " Except , while probing the hash table.");
for (int j = 0; j < _probe_batch->num_rows(); ++j) {
_hash_tbl_iterator = _hash_tbl->find(_probe_batch->get_row(j));
if (_hash_tbl_iterator != _hash_tbl->end()) {
Expand Down
15 changes: 5 additions & 10 deletions be/src/exec/exec_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -201,12 +201,11 @@ Status ExecNode::prepare(RuntimeState* state) {
std::bind<int64_t>(&RuntimeProfile::units_per_second, _rows_returned_counter,
runtime_profile()->total_time_counter()),
"");
_mem_tracker = MemTracker::CreateTracker(_runtime_profile.get(), -1,
"ExecNode:" + _runtime_profile->name(),
state->instance_mem_tracker());
_expr_mem_tracker = MemTracker::CreateTracker(-1, "ExecNode:Exprs:" + _runtime_profile->name(),
_mem_tracker);
_expr_mem_pool.reset(new MemPool(_expr_mem_tracker.get()));
_mem_tracker = MemTracker::create_tracker(-1, "ExecNode:" + _runtime_profile->name(),
state->instance_mem_tracker(),
MemTrackerLevel::VERBOSE, _runtime_profile.get());
_expr_mem_tracker = MemTracker::create_tracker(-1, "ExecNode:Exprs:" + _runtime_profile->name(),
_mem_tracker);

if (_vconjunct_ctx_ptr) {
RETURN_IF_ERROR((*_vconjunct_ctx_ptr)->prepare(state, row_desc(), expr_mem_tracker()));
Expand Down Expand Up @@ -268,10 +267,6 @@ Status ExecNode::close(RuntimeState* state) {
if (_vconjunct_ctx_ptr) (*_vconjunct_ctx_ptr)->close(state);
Expr::close(_conjunct_ctxs, state);

if (expr_mem_pool() != nullptr) {
_expr_mem_pool->free_all();
}

if (_buffer_pool_client.is_registered()) {
VLOG_FILE << _id << " returning reservation " << _resource_profile.min_reservation;
state->initial_reservations()->Return(&_buffer_pool_client,
Expand Down
28 changes: 1 addition & 27 deletions be/src/exec/exec_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,6 @@ class ExecNode {

std::shared_ptr<MemTracker> expr_mem_tracker() const { return _expr_mem_tracker; }

MemPool* expr_mem_pool() const { return _expr_mem_pool.get(); }

// Extract node id from p->name().
static int get_node_id_from_profile(RuntimeProfile* p);

Expand Down Expand Up @@ -306,14 +304,9 @@ class ExecNode {

/// Account for peak memory used by this node
std::shared_ptr<MemTracker> _mem_tracker;

/// MemTracker used by 'expr_mem_pool_'.
// MemTracker used by all Expr.
std::shared_ptr<MemTracker> _expr_mem_tracker;

/// MemPool for allocating data structures used by expression evaluators in this node.
/// Created in Prepare().
std::unique_ptr<MemPool> _expr_mem_pool;

RuntimeProfile::Counter* _rows_returned_counter;
RuntimeProfile::Counter* _rows_returned_rate;
// Account for peak memory used by this node
Expand Down Expand Up @@ -377,25 +370,6 @@ class ExecNode {
bool _is_closed;
};

#define LIMIT_EXCEEDED(tracker, state, msg) \
do { \
stringstream str; \
str << "Memory exceed limit. " << msg << " "; \
str << "Backend: " << BackendOptions::get_localhost() << ", "; \
str << "fragment: " << print_id(state->fragment_instance_id()) << " "; \
str << "Used: " << tracker->consumption() << ", Limit: " << tracker->limit() << ". "; \
str << "You can change the limit by session variable exec_mem_limit."; \
return Status::MemoryLimitExceeded(str.str()); \
} while (false)

#define RETURN_IF_LIMIT_EXCEEDED(state, msg) \
do { \
/* if (UNLIKELY(MemTracker::limit_exceeded(*(state)->mem_trackers()))) { */ \
MemTracker* tracker = state->instance_mem_tracker()->find_limit_exceeded_tracker(); \
if (tracker != nullptr) { \
LIMIT_EXCEEDED(tracker, state, msg); \
} \
} while (false)
} // namespace doris

#endif
6 changes: 3 additions & 3 deletions be/src/exec/hash_join_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ Status HashJoinNode::get_next(RuntimeState* state, RowBatch* out_batch, bool* eo
// In most cases, no additional memory overhead will be applied for at this stage,
// but if the expression calculation in this node needs to apply for additional memory,
// it may cause the memory to exceed the limit.
RETURN_IF_LIMIT_EXCEEDED(state, "Hash join, while execute get_next.");
RETURN_IF_INSTANCE_LIMIT_EXCEEDED(state, "Hash join, while execute get_next.");
SCOPED_TIMER(_runtime_profile->total_time_counter());

if (reached_limit()) {
Expand Down Expand Up @@ -770,11 +770,11 @@ Status HashJoinNode::process_build_batch(RuntimeState* state, RowBatch* build_ba
_build_pool.get(), false);
}
}
RETURN_IF_LIMIT_EXCEEDED(state, "Hash join, while constructing the hash table.");
RETURN_IF_INSTANCE_LIMIT_EXCEEDED(state, "Hash join, while constructing the hash table.");
} else {
// take ownership of tuple data of build_batch
_build_pool->acquire_data(build_batch->tuple_data_pool(), false);
RETURN_IF_LIMIT_EXCEEDED(state, "Hash join, while constructing the hash table.");
RETURN_IF_INSTANCE_LIMIT_EXCEEDED(state, "Hash join, while constructing the hash table.");
RETURN_IF_ERROR(_hash_tbl->resize_buckets_ahead(build_batch->num_rows()));
for (int i = 0; i < build_batch->num_rows(); ++i) {
_hash_tbl->insert_without_check(build_batch->get_row(i));
Expand Down
12 changes: 6 additions & 6 deletions be/src/exec/hash_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ HashTable::HashTable(const std::vector<ExprContext*>& build_expr_ctxs,
_buckets.resize(num_buckets);
_num_buckets = num_buckets;
_num_buckets_till_resize = MAX_BUCKET_OCCUPANCY_FRACTION * _num_buckets;
_mem_tracker->Consume(_buckets.capacity() * sizeof(Bucket));
_mem_tracker->consume(_buckets.capacity() * sizeof(Bucket));

// Compute the layout and buffer size to store the evaluated expr results
_results_buffer_size = Expr::compute_results_layout(
Expand All @@ -70,7 +70,7 @@ HashTable::HashTable(const std::vector<ExprContext*>& build_expr_ctxs,
_alloc_list.push_back(_current_nodes);
_end_list.push_back(_current_nodes + _current_capacity * _node_byte_size);

_mem_tracker->Consume(_current_capacity * _node_byte_size);
_mem_tracker->consume(_current_capacity * _node_byte_size);
if (_mem_tracker->limit_exceeded()) {
mem_limit_exceeded(_current_capacity * _node_byte_size);
}
Expand All @@ -85,8 +85,8 @@ void HashTable::close() {
for (auto ptr : _alloc_list) {
free(ptr);
}
_mem_tracker->Release(_total_capacity * _node_byte_size);
_mem_tracker->Release(_buckets.size() * sizeof(Bucket));
_mem_tracker->release(_total_capacity * _node_byte_size);
_mem_tracker->release(_buckets.size() * sizeof(Bucket));
}

bool HashTable::eval_row(TupleRow* row, const std::vector<ExprContext*>& ctxs) {
Expand Down Expand Up @@ -180,7 +180,7 @@ Status HashTable::resize_buckets(int64_t num_buckets) {

int64_t old_num_buckets = _num_buckets;
int64_t delta_bytes = (num_buckets - old_num_buckets) * sizeof(Bucket);
Status st = _mem_tracker->TryConsume(delta_bytes);
Status st = _mem_tracker->try_consume(delta_bytes);
if (!st) {
LOG_EVERY_N(WARNING, 100) << "resize bucket failed: " << st.to_string();
mem_limit_exceeded(delta_bytes);
Expand Down Expand Up @@ -244,7 +244,7 @@ void HashTable::grow_node_array() {
_alloc_list.push_back(_current_nodes);
_end_list.push_back(_current_nodes + alloc_size);

_mem_tracker->Consume(alloc_size);
_mem_tracker->consume(alloc_size);
Copy link
Contributor

@yiguolei yiguolei Mar 3, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are many codes call mem_tracker->somsume(size_t), Is there a better method to do this automatically?
For example, we may rewrite the memory allocators and track the memory usage during allocate, like in clickhouse:

/// Implementation of std::allocator interface that tracks memory with MemoryTracker.
/// NOTE We already plug MemoryTracker into new/delete operators. So, everything works even with default allocator.
/// But it is enabled only if jemalloc is used (to obtain the size of the allocation on call to delete).
/// And jemalloc is disabled for builds with sanitizers. In these cases memory was not always tracked.

template
struct AllocatorWithMemoryTracking

Copy link
Contributor Author

@xinyiZzz xinyiZzz Mar 3, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yiguolei I've done something similar with reference to clickhouse to automatically track memory usage via TCMalloc Hook
PR: #7198
But this PR is too big, so I was advised to split the commit =_=, I will push other codes next week.

Different from Jemalloc, overloading the TCMalloc new/delete operator needs to invade the source code of TCMalloc, so it is implemented by Hook;

Copy link
Contributor

@yiguolei yiguolei Mar 4, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are too many places call memory tracker and we may lost to add mem_tracker.consume in some node or thread. If we are following clickhouse's method, I think it do not need to call memory tracker everywhere and call mem tracker.consume so many times.

We could attach the running thread to a conext, the context maybe a query context or a compaction context or olap scanner. And rewrite the new and delete method to update the memory tracker automatically.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are too many places call memory tracker and we may lost to add mem_tracker.consume in some node or thread. If we are following clickhouse's method, I think it do not need to call memory tracker everywhere and call mem tracker.consume so many times.

We could attach the running thread to a conext, the context maybe a query context or a compaction context or olap scanner. And rewrite the new and delete method to update the memory tracker automatically.

This is done by following next PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are too many places call memory tracker and we may lost to add mem_tracker.consume in some node or thread. If we are following clickhouse's method, I think it do not need to call memory tracker everywhere and call mem tracker.consume so many times.

We could attach the running thread to a conext, the context maybe a query context or a compaction context or olap scanner. And rewrite the new and delete method to update the memory tracker automatically.

You are right, I have already implemented it like this. In #7198 mentioned above,
you can see the original design document last year: https://shimo.im/docs/DT6JXDRkdTvdyV3G

if (_mem_tracker->limit_exceeded()) {
mem_limit_exceeded(alloc_size);
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/intersect_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ Status IntersectNode::open(RuntimeState* state) {
while (!eos) {
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(child(i)->get_next(state, _probe_batch.get(), &eos));
RETURN_IF_LIMIT_EXCEEDED(state, " Intersect , while probing the hash table.");
RETURN_IF_INSTANCE_LIMIT_EXCEEDED(state, " Intersect , while probing the hash table.");
for (int j = 0; j < _probe_batch->num_rows(); ++j) {
_hash_tbl_iterator = _hash_tbl->find(_probe_batch->get_row(j));
if (_hash_tbl_iterator != _hash_tbl->end()) {
Expand Down
1 change: 0 additions & 1 deletion be/src/exec/json_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
#include "exprs/json_functions.h"
#include "gutil/strings/split.h"
#include "runtime/exec_env.h"
#include "runtime/mem_tracker.h"
#include "runtime/runtime_state.h"

namespace doris {
Expand Down
1 change: 0 additions & 1 deletion be/src/exec/json_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ class Tuple;
class SlotDescriptor;
class RuntimeState;
class TupleDescriptor;
class MemTracker;
class JsonReader;
class LineReader;
class FileReader;
Expand Down
Loading