Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,7 @@ void TaskWorkerPool::_alter_tablet(const TAgentTaskRequest& agent_task_req, int6
string process_name;
switch (task_type) {
case TTaskType::ALTER:
process_name = "alter";
process_name = "AlterTablet";
break;
default:
std::string task_name;
Expand Down
22 changes: 19 additions & 3 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -595,12 +595,28 @@ CONF_Int32(aws_log_level, "3");
// the buffer size when read data from remote storage like s3
CONF_mInt32(remote_storage_read_buffer_mb, "16");

// Whether to initialize TCmalloc new/delete Hook, MemTracker is currently counted in Hook.
CONF_mBool(use_tc_hook, "true");
Copy link
Member

@yangzhg yangzhg Jan 11, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

track_new_delete maybe better, is this can be mutable?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done, thats sounds good


// Default level of MemTracker to show in web page
// now MemTracker support two level:
// RELEASE: 0
// DEBUG: 1
// OVERVIEW: 0
// TASK: 1
// INSTANCE: 2
// VERBOSE: 3
// the level equal or lower than mem_tracker_level will show in web page
CONF_Int16(mem_tracker_level, "0");
CONF_mInt16(mem_tracker_level, "0");

// The minimum length when TCMalloc Hook consumes/releases MemTracker, consume size
// smaller than this value will continue to accumulate. specified as number of bytes.
// Decreasing this value will increase the frequency of consume/release.
// Increasing this value will cause MemTracker statistics to be inaccurate.
CONF_mInt32(mem_tracker_consume_min_size_bytes, "2097152");

// When MemTracker is a negative value, it is considered that a memory leak has occurred,
// but the actual MemTracker records inaccurately will also cause a negative value,
// so this feature is in the experimental stage.
CONF_mBool(memory_leak_detection, "false");

// The version information of the tablet will be stored in the memory
// in an adjacency graph data structure.
Expand Down
11 changes: 0 additions & 11 deletions be/src/common/daemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,17 +95,6 @@ void Daemon::memory_maintenance_thread() {
if (env != nullptr) {
BufferPool* buffer_pool = env->buffer_pool();
if (buffer_pool != nullptr) buffer_pool->Maintenance();

// The process limit as measured by our trackers may get out of sync with the
// process usage if memory is allocated or freed without updating a MemTracker.
// The metric is refreshed whenever memory is consumed or released via a MemTracker,
// so on a system with queries executing it will be refreshed frequently. However
// if the system is idle, we need to refresh the tracker occasionally since
// untracked memory may be allocated or freed, e.g. by background threads.
if (env->process_mem_tracker() != nullptr &&
!env->process_mem_tracker()->is_consumption_metric_null()) {
env->process_mem_tracker()->RefreshConsumptionFromMetric();
}
}
}
}
Expand Down
9 changes: 7 additions & 2 deletions be/src/exec/aggregation_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "runtime/row_batch.h"
#include "runtime/runtime_state.h"
#include "runtime/string_value.hpp"
#include "runtime/thread_context.h"
#include "runtime/tuple.h"
#include "runtime/tuple_row.h"
#include "util/runtime_profile.h"
Expand Down Expand Up @@ -77,6 +78,7 @@ Status AggregationNode::init(const TPlanNode& tnode, RuntimeState* state) {

Status AggregationNode::prepare(RuntimeState* state) {
RETURN_IF_ERROR(ExecNode::prepare(state));
// SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_1ARG(mem_tracker());
_build_timer = ADD_TIMER(runtime_profile(), "BuildTime");
_get_results_timer = ADD_TIMER(runtime_profile(), "GetResultsTime");
_hash_table_buckets_counter = ADD_COUNTER(runtime_profile(), "BuildBuckets", TUnit::UNIT);
Expand Down Expand Up @@ -106,7 +108,7 @@ Status AggregationNode::prepare(RuntimeState* state) {
RowDescriptor build_row_desc(_intermediate_tuple_desc, false);
RETURN_IF_ERROR(Expr::prepare(_build_expr_ctxs, state, build_row_desc, expr_mem_tracker()));

_tuple_pool.reset(new MemPool(mem_tracker().get()));
_tuple_pool.reset(new MemPool());

_agg_fn_ctxs.resize(_aggregate_evaluators.size());
int j = _probe_expr_ctxs.size();
Expand Down Expand Up @@ -141,6 +143,7 @@ Status AggregationNode::prepare(RuntimeState* state) {
}

Status AggregationNode::open(RuntimeState* state) {
// SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_1ARG(mem_tracker());
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN));
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(ExecNode::open(state));
Expand All @@ -153,7 +156,7 @@ Status AggregationNode::open(RuntimeState* state) {

RETURN_IF_ERROR(_children[0]->open(state));

RowBatch batch(_children[0]->row_desc(), state->batch_size(), mem_tracker().get());
RowBatch batch(_children[0]->row_desc(), state->batch_size());
int64_t num_input_rows = 0;
int64_t num_agg_rows = 0;

Expand Down Expand Up @@ -227,6 +230,7 @@ Status AggregationNode::get_next(RuntimeState* state, RowBatch* row_batch, bool*
// 3. `child(0)->rows_returned() == 0` mean not data from child
// in level two aggregation node should return nullptr result
// level one aggregation node set `eos = true` return directly
// SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_1ARG(mem_tracker());
if (UNLIKELY(!_needs_finalize && _singleton_output_tuple != nullptr &&
child(0)->rows_returned() == 0)) {
*eos = true;
Expand Down Expand Up @@ -288,6 +292,7 @@ Status AggregationNode::close(RuntimeState* state) {
if (is_closed()) {
return Status::OK();
}
// SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_1ARG(mem_tracker());

// Iterate through the remaining rows in the hash table and call Serialize/Finalize on
// them in order to free any memory allocated by UDAs. Finalize() requires a dst tuple
Expand Down
21 changes: 12 additions & 9 deletions be/src/exec/analytic_eval_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "runtime/descriptors.h"
#include "runtime/row_batch.h"
#include "runtime/runtime_state.h"
#include "runtime/thread_context.h"
#include "udf/udf_internal.h"

namespace doris {
Expand Down Expand Up @@ -141,10 +142,11 @@ Status AnalyticEvalNode::prepare(RuntimeState* state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(ExecNode::prepare(state));
DCHECK(child(0)->row_desc().is_prefix_of(row_desc()));
// SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_1ARG(mem_tracker());
_child_tuple_desc = child(0)->row_desc().tuple_descriptors()[0];
_curr_tuple_pool.reset(new MemPool(mem_tracker().get()));
_prev_tuple_pool.reset(new MemPool(mem_tracker().get()));
_mem_pool.reset(new MemPool(mem_tracker().get()));
_curr_tuple_pool.reset(new MemPool());
_prev_tuple_pool.reset(new MemPool());
_mem_pool.reset(new MemPool());

_evaluation_timer = ADD_TIMER(runtime_profile(), "EvaluationTime");
DCHECK_EQ(_result_tuple_desc->slots().size(), _evaluators.size());
Expand Down Expand Up @@ -183,6 +185,7 @@ Status AnalyticEvalNode::prepare(RuntimeState* state) {
}

Status AnalyticEvalNode::open(RuntimeState* state) {
// SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_1ARG(mem_tracker());
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(ExecNode::open(state));
RETURN_IF_CANCELLED(state);
Expand All @@ -201,7 +204,7 @@ Status AnalyticEvalNode::open(RuntimeState* state) {
"Failed to acquire initial read buffer for analytic function "
"evaluation. Reducing query concurrency or increasing the memory limit may "
"help this query to complete successfully.");
return mem_tracker()->MemLimitExceeded(state, msg, -1);
RETURN_LIMIT_EXCEEDED(mem_tracker(), state, msg);
}

DCHECK_EQ(_evaluators.size(), _fn_ctxs.size());
Expand Down Expand Up @@ -236,10 +239,8 @@ Status AnalyticEvalNode::open(RuntimeState* state) {

// Fetch the first input batch so that some _prev_input_row can be set here to avoid
// special casing in GetNext().
_prev_child_batch.reset(
new RowBatch(child(0)->row_desc(), state->batch_size(), mem_tracker().get()));
_curr_child_batch.reset(
new RowBatch(child(0)->row_desc(), state->batch_size(), mem_tracker().get()));
_prev_child_batch.reset(new RowBatch(child(0)->row_desc(), state->batch_size()));
_curr_child_batch.reset(new RowBatch(child(0)->row_desc(), state->batch_size()));

while (!_input_eos && _prev_input_row == nullptr) {
RETURN_IF_ERROR(child(0)->get_next(state, _curr_child_batch.get(), &_input_eos));
Expand Down Expand Up @@ -738,7 +739,7 @@ Status AnalyticEvalNode::get_next_output_batch(RuntimeState* state, RowBatch* ou
ExprContext** ctxs = &_conjunct_ctxs[0];
int num_ctxs = _conjunct_ctxs.size();

RowBatch input_batch(child(0)->row_desc(), output_batch->capacity(), mem_tracker().get());
RowBatch input_batch(child(0)->row_desc(), output_batch->capacity());
int64_t stream_idx = _input_stream->rows_returned();
RETURN_IF_ERROR(_input_stream->get_next(&input_batch, eos));

Expand Down Expand Up @@ -813,6 +814,7 @@ inline int64_t AnalyticEvalNode::num_output_rows_ready() const {
}

Status AnalyticEvalNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) {
// SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_1ARG(mem_tracker());
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
RETURN_IF_CANCELLED(state);
Expand Down Expand Up @@ -857,6 +859,7 @@ Status AnalyticEvalNode::close(RuntimeState* state) {
if (is_closed()) {
return Status::OK();
}
// SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_1ARG(mem_tracker());

if (_input_stream.get() != nullptr) {
_input_stream->close();
Expand Down
3 changes: 3 additions & 0 deletions be/src/exec/assert_num_rows_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "gutil/strings/substitute.h"
#include "runtime/row_batch.h"
#include "runtime/runtime_state.h"
#include "runtime/thread_context.h"
#include "util/runtime_profile.h"

namespace doris {
Expand Down Expand Up @@ -48,6 +49,7 @@ Status AssertNumRowsNode::prepare(RuntimeState* state) {
}

Status AssertNumRowsNode::open(RuntimeState* state) {
// SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_1ARG(mem_tracker());
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(ExecNode::open(state));
// ISSUE-3435
Expand All @@ -56,6 +58,7 @@ Status AssertNumRowsNode::open(RuntimeState* state) {
}

Status AssertNumRowsNode::get_next(RuntimeState* state, RowBatch* output_batch, bool* eos) {
// SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_1ARG(mem_tracker());
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
SCOPED_TIMER(_runtime_profile->total_time_counter());
output_batch->reset();
Expand Down
11 changes: 4 additions & 7 deletions be/src/exec/base_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ namespace doris {

BaseScanner::BaseScanner(RuntimeState* state, RuntimeProfile* profile,
const TBrokerScanRangeParams& params,
const std::vector<TExpr>& pre_filter_texprs,
ScannerCounter* counter)
const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter)
: _state(state),
_params(params),
_counter(counter),
Expand All @@ -43,11 +42,10 @@ BaseScanner::BaseScanner(RuntimeState* state, RuntimeProfile* profile,
#if BE_TEST
_mem_tracker(new MemTracker()),
#else
_mem_tracker(
MemTracker::CreateTracker(-1, "BaseScanner:" + std::to_string(state->load_job_id()),
state->instance_mem_tracker())),
_mem_tracker(MemTracker::create_tracker(
-1, "Scanner:" + std::to_string(state->load_job_id()))),
#endif
_mem_pool(_mem_tracker.get()),
_mem_pool(_mem_tracker),
_dest_tuple_desc(nullptr),
_pre_filter_texprs(pre_filter_texprs),
_strict_mode(false),
Expand Down Expand Up @@ -259,5 +257,4 @@ void BaseScanner::close() {
}
}


} // namespace doris
9 changes: 7 additions & 2 deletions be/src/exec/blocking_join_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "gen_cpp/PlanNodes_types.h"
#include "runtime/row_batch.h"
#include "runtime/runtime_state.h"
#include "runtime/thread_context.h"
#include "util/runtime_profile.h"

namespace doris {
Expand All @@ -45,8 +46,9 @@ BlockingJoinNode::~BlockingJoinNode() {
Status BlockingJoinNode::prepare(RuntimeState* state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(ExecNode::prepare(state));
// SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_1ARG(mem_tracker());

_build_pool.reset(new MemPool(mem_tracker().get()));
_build_pool.reset(new MemPool());
_build_timer = ADD_TIMER(runtime_profile(), "BuildTime");
_left_child_timer = ADD_TIMER(runtime_profile(), "LeftChildTime");
_build_row_counter = ADD_COUNTER(runtime_profile(), "BuildRows", TUnit::UNIT);
Expand All @@ -69,7 +71,7 @@ Status BlockingJoinNode::prepare(RuntimeState* state) {
_probe_tuple_row_size = num_left_tuples * sizeof(Tuple*);
_build_tuple_row_size = num_build_tuples * sizeof(Tuple*);

_left_batch.reset(new RowBatch(child(0)->row_desc(), state->batch_size(), mem_tracker().get()));
_left_batch.reset(new RowBatch(child(0)->row_desc(), state->batch_size()));
return Status::OK();
}

Expand All @@ -82,10 +84,13 @@ Status BlockingJoinNode::close(RuntimeState* state) {
}

void BlockingJoinNode::build_side_thread(RuntimeState* state, std::promise<Status>* status) {
SCOPED_ATTACH_TASK_THREAD_4ARG(state->query_type(), print_id(state->query_id()),
state->fragment_instance_id(), mem_tracker());
status->set_value(construct_build_side(state));
}

Status BlockingJoinNode::open(RuntimeState* state) {
// SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_1ARG(mem_tracker());
RETURN_IF_ERROR(ExecNode::open(state));
SCOPED_TIMER(_runtime_profile->total_time_counter());
// RETURN_IF_ERROR(Expr::open(_conjuncts, state));
Expand Down
9 changes: 7 additions & 2 deletions be/src/exec/broker_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "runtime/dpp_sink_internal.h"
#include "runtime/row_batch.h"
#include "runtime/runtime_state.h"
#include "runtime/thread_context.h"
#include "util/runtime_profile.h"

namespace doris {
Expand Down Expand Up @@ -60,6 +61,7 @@ Status BrokerScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
Status BrokerScanNode::prepare(RuntimeState* state) {
VLOG_QUERY << "BrokerScanNode prepare";
RETURN_IF_ERROR(ScanNode::prepare(state));
// SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_1ARG(mem_tracker());
// get tuple desc
_runtime_state = state;
_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
Expand All @@ -86,6 +88,7 @@ Status BrokerScanNode::prepare(RuntimeState* state) {
}

Status BrokerScanNode::open(RuntimeState* state) {
// SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_1ARG(mem_tracker());
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(ExecNode::open(state));
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN));
Expand All @@ -106,6 +109,7 @@ Status BrokerScanNode::start_scanners() {
}

Status BrokerScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) {
// SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_1ARG(mem_tracker());
SCOPED_TIMER(_runtime_profile->total_time_counter());
// check if CANCELLED.
if (state->is_cancelled()) {
Expand Down Expand Up @@ -190,6 +194,7 @@ Status BrokerScanNode::close(RuntimeState* state) {
if (is_closed()) {
return Status::OK();
}
// SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_1ARG(mem_tracker());
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::CLOSE));
SCOPED_TIMER(_runtime_profile->total_time_counter());
_scan_finished.store(true);
Expand Down Expand Up @@ -254,7 +259,7 @@ Status BrokerScanNode::scanner_scan(const TBrokerScanRange& scan_range,
while (!scanner_eof) {
// Fill one row batch
std::shared_ptr<RowBatch> row_batch(
new RowBatch(row_desc(), _runtime_state->batch_size(), mem_tracker().get()));
new RowBatch(row_desc(), _runtime_state->batch_size()));

// create new tuple buffer for row_batch
MemPool* tuple_pool = row_batch->tuple_data_pool();
Expand Down Expand Up @@ -318,7 +323,7 @@ Status BrokerScanNode::scanner_scan(const TBrokerScanRange& scan_range,
// 1. too many batches in queue, or
// 2. at least one batch in queue and memory exceed limit.
(_batch_queue.size() >= _max_buffered_batches ||
(mem_tracker()->AnyLimitExceeded(MemLimit::HARD) && !_batch_queue.empty()))) {
(mem_tracker()->any_limit_exceeded() && !_batch_queue.empty()))) {
_queue_writer_cond.wait_for(l, std::chrono::seconds(1));
}
// Process already set failed, so we just return OK
Expand Down
1 change: 0 additions & 1 deletion be/src/exec/broker_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
#include "exprs/expr.h"
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
#include "runtime/mem_tracker.h"
#include "runtime/raw_value.h"
#include "runtime/stream_load/load_stream_mgr.h"
#include "runtime/stream_load/stream_load_pipe.h"
Expand Down
1 change: 0 additions & 1 deletion be/src/exec/broker_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ class ExprContext;
class TupleDescriptor;
class TupleRow;
class RowDescriptor;
class MemTracker;
class RuntimeProfile;
class StreamLoadPipe;

Expand Down
Loading