From a1a02edca87146e9c801b0d217574ebf4c36e539 Mon Sep 17 00:00:00 2001 From: Xinyi Zou Date: Thu, 28 Apr 2022 17:54:30 +0800 Subject: [PATCH 1/6] fix some details --- be/src/exec/analytic_eval_node.cpp | 6 +- be/src/exec/base_scanner.cpp | 2 +- be/src/exec/blocking_join_node.cpp | 2 +- be/src/exec/hash_join_node.cpp | 2 +- be/src/exec/olap_scanner.cpp | 14 ++-- be/src/exec/partitioned_aggregation_node.cc | 8 +-- be/src/exec/set_operation_node.cpp | 2 +- be/src/exec/tablet_sink.cpp | 2 +- be/src/exec/tablet_sink.h | 2 +- be/src/exec/topn_node.cpp | 2 +- be/src/exprs/expr_context.cpp | 2 +- be/src/http/action/compaction_action.h | 2 +- be/src/olap/memtable.cpp | 4 +- be/src/olap/reader.cpp | 4 ++ be/src/olap/row_block.cpp | 2 +- be/src/olap/row_block2.cpp | 2 +- .../rowset/segment_v2/binary_dict_page.cpp | 2 +- .../olap/rowset/segment_v2/zone_map_index.cpp | 4 +- be/src/olap/tablet_manager.cpp | 12 +++- be/src/olap/tablet_manager.h | 3 + be/src/olap/tablet_schema.cpp | 22 ++++++- be/src/olap/task/engine_alter_tablet_task.cpp | 2 +- be/src/olap/task/engine_batch_load_task.cpp | 4 +- be/src/olap/task/engine_checksum_task.cpp | 2 +- be/src/olap/task/engine_clone_task.cpp | 2 +- be/src/runtime/fold_constant_executor.cpp | 2 +- be/src/runtime/load_channel.cpp | 12 ++-- be/src/runtime/load_channel.h | 2 +- be/src/runtime/load_channel_mgr.cpp | 4 +- be/src/runtime/load_channel_mgr.h | 2 +- be/src/runtime/mem_pool.cpp | 21 +++--- be/src/runtime/mem_pool.h | 8 +-- be/src/runtime/mem_tracker.cpp | 54 ++++++++------- be/src/runtime/mem_tracker.h | 20 ++++-- be/src/runtime/mem_tracker_task_pool.cpp | 9 +-- be/src/runtime/mem_tracker_task_pool.h | 10 +-- be/src/runtime/memory/chunk_allocator.cpp | 24 +++---- be/src/runtime/result_file_sink.cpp | 4 -- be/src/runtime/tablets_channel.cpp | 5 +- be/src/runtime/tablets_channel.h | 2 +- be/src/runtime/tcmalloc_hook.h | 66 +++++++++++-------- be/src/runtime/thread_context.cpp | 4 ++ be/src/runtime/thread_context.h | 10 +++ be/src/runtime/thread_mem_tracker_mgr.cpp | 8 +-- be/src/runtime/thread_mem_tracker_mgr.h | 20 +++--- be/src/service/doris_main.cpp | 3 +- be/src/service/internal_service.cpp | 2 +- be/src/vec/exec/vanalytic_eval_node.cpp | 2 +- be/src/vec/exec/vblocking_join_node.cpp | 2 +- be/src/vec/exec/ves_http_scan_node.cpp | 2 +- be/src/vec/exprs/vexpr_context.cpp | 2 +- 51 files changed, 240 insertions(+), 171 deletions(-) diff --git a/be/src/exec/analytic_eval_node.cpp b/be/src/exec/analytic_eval_node.cpp index 43e193d0ab44b9..5e76d89aa6dfe1 100644 --- a/be/src/exec/analytic_eval_node.cpp +++ b/be/src/exec/analytic_eval_node.cpp @@ -146,9 +146,9 @@ Status AnalyticEvalNode::prepare(RuntimeState* state) { SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker()); DCHECK(child(0)->row_desc().is_prefix_of(row_desc())); _child_tuple_desc = child(0)->row_desc().tuple_descriptors()[0]; - _curr_tuple_pool.reset(new MemPool(mem_tracker().get())); - _prev_tuple_pool.reset(new MemPool(mem_tracker().get())); - _mem_pool.reset(new MemPool(mem_tracker().get())); + _curr_tuple_pool.reset(new MemPool(mem_tracker())); + _prev_tuple_pool.reset(new MemPool(mem_tracker())); + _mem_pool.reset(new MemPool(mem_tracker())); _evaluation_timer = ADD_TIMER(runtime_profile(), "EvaluationTime"); DCHECK_EQ(_result_tuple_desc->slots().size(), _evaluators.size()); diff --git a/be/src/exec/base_scanner.cpp b/be/src/exec/base_scanner.cpp index ca5b08831fdfb2..8d2892c305a131 100644 --- a/be/src/exec/base_scanner.cpp +++ b/be/src/exec/base_scanner.cpp @@ -47,7 +47,7 @@ BaseScanner::BaseScanner(RuntimeState* state, RuntimeProfile* profile, ? "BaseScanner:" + std::to_string(state->load_job_id()) : "BaseScanner:Select")), #endif - _mem_pool(std::make_unique(_mem_tracker.get())), + _mem_pool(std::make_unique(_mem_tracker)), _dest_tuple_desc(nullptr), _pre_filter_texprs(pre_filter_texprs), _strict_mode(false), diff --git a/be/src/exec/blocking_join_node.cpp b/be/src/exec/blocking_join_node.cpp index d88ce56ecf6914..cf3544d894e243 100644 --- a/be/src/exec/blocking_join_node.cpp +++ b/be/src/exec/blocking_join_node.cpp @@ -52,7 +52,7 @@ Status BlockingJoinNode::prepare(RuntimeState* state) { RETURN_IF_ERROR(ExecNode::prepare(state)); SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker()); - _build_pool.reset(new MemPool(mem_tracker().get())); + _build_pool.reset(new MemPool(mem_tracker())); _build_timer = ADD_TIMER(runtime_profile(), "BuildTime"); _left_child_timer = ADD_TIMER(runtime_profile(), "LeftChildTime"); _build_row_counter = ADD_COUNTER(runtime_profile(), "BuildRows", TUnit::UNIT); diff --git a/be/src/exec/hash_join_node.cpp b/be/src/exec/hash_join_node.cpp index 1ee401fe51133e..d2e15543c4265b 100644 --- a/be/src/exec/hash_join_node.cpp +++ b/be/src/exec/hash_join_node.cpp @@ -100,7 +100,7 @@ Status HashJoinNode::prepare(RuntimeState* state) { RETURN_IF_ERROR(ExecNode::prepare(state)); SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker()); - _build_pool.reset(new MemPool(mem_tracker().get())); + _build_pool.reset(new MemPool(mem_tracker())); _build_timer = ADD_TIMER(runtime_profile(), "BuildTime"); _push_down_timer = ADD_TIMER(runtime_profile(), "PushDownTime"); _push_compute_timer = ADD_TIMER(runtime_profile(), "PushDownComputeTime"); diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp index 40029414b822e3..0d412a723b9d42 100644 --- a/be/src/exec/olap_scanner.cpp +++ b/be/src/exec/olap_scanner.cpp @@ -49,10 +49,14 @@ OlapScanner::OlapScanner(RuntimeState* runtime_state, OlapScanNode* parent, bool _is_open(false), _aggregation(aggregation), _need_agg_finalize(need_agg_finalize), - _version(-1), - _mem_tracker(MemTracker::create_tracker( - tracker->limit(), tracker->label() + ":OlapScanner:" + tls_ctx()->thread_id_str(), - tracker)) {} + _version(-1) { +#ifndef NDEBUG + _mem_tracker = MemTracker::create_tracker(tracker->limit(), + "OlapScanner:" + tls_ctx()->thread_id_str(), tracker); +#else + _mem_tracker = tracker; +#endif +} Status OlapScanner::prepare( const TPaloScanRange& scan_range, const std::vector& key_ranges, @@ -281,7 +285,7 @@ Status OlapScanner::get_batch(RuntimeState* state, RowBatch* batch, bool* eof) { bzero(tuple_buf, state->batch_size() * _tuple_desc->byte_size()); Tuple* tuple = reinterpret_cast(tuple_buf); - std::unique_ptr mem_pool(new MemPool(_mem_tracker.get())); + std::unique_ptr mem_pool(new MemPool(_mem_tracker)); int64_t raw_rows_threshold = raw_rows_read() + config::doris_scanner_row_num; int64_t raw_bytes_threshold = config::doris_scanner_row_bytes; { diff --git a/be/src/exec/partitioned_aggregation_node.cc b/be/src/exec/partitioned_aggregation_node.cc index a99b0da23b5338..805bfa7e9d3483 100644 --- a/be/src/exec/partitioned_aggregation_node.cc +++ b/be/src/exec/partitioned_aggregation_node.cc @@ -189,8 +189,8 @@ Status PartitionedAggregationNode::prepare(RuntimeState* state) { SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker()); state_ = state; - mem_pool_.reset(new MemPool(mem_tracker().get())); - agg_fn_pool_.reset(new MemPool(expr_mem_tracker().get())); + mem_pool_.reset(new MemPool(mem_tracker())); + agg_fn_pool_.reset(new MemPool(expr_mem_tracker())); ht_resize_timer_ = ADD_TIMER(runtime_profile(), "HTResizeTime"); get_results_timer_ = ADD_TIMER(runtime_profile(), "GetResultsTime"); @@ -233,7 +233,7 @@ Status PartitionedAggregationNode::prepare(RuntimeState* state) { RETURN_IF_ERROR(NewAggFnEvaluator::Create(agg_fns_, state, _pool, agg_fn_pool_.get(), &agg_fn_evals_, expr_mem_tracker(), row_desc)); - expr_results_pool_.reset(new MemPool(expr_mem_tracker().get())); + expr_results_pool_.reset(new MemPool(expr_mem_tracker())); if (!grouping_exprs_.empty()) { RowDescriptor build_row_desc(intermediate_tuple_desc_, false); RETURN_IF_ERROR(PartitionedHashTableCtx::Create( @@ -728,7 +728,7 @@ PartitionedAggregationNode::Partition::~Partition() { } Status PartitionedAggregationNode::Partition::InitStreams() { - agg_fn_pool.reset(new MemPool(parent->expr_mem_tracker().get())); + agg_fn_pool.reset(new MemPool(parent->expr_mem_tracker())); DCHECK_EQ(agg_fn_evals.size(), 0); NewAggFnEvaluator::ShallowClone(parent->partition_pool_.get(), agg_fn_pool.get(), parent->agg_fn_evals_, &agg_fn_evals); diff --git a/be/src/exec/set_operation_node.cpp b/be/src/exec/set_operation_node.cpp index 6d0ee3676d1e6a..d7a0c4ed7df280 100644 --- a/be/src/exec/set_operation_node.cpp +++ b/be/src/exec/set_operation_node.cpp @@ -45,7 +45,7 @@ Status SetOperationNode::prepare(RuntimeState* state) { SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker()); _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id); DCHECK(_tuple_desc != nullptr); - _build_pool.reset(new MemPool(mem_tracker().get())); + _build_pool.reset(new MemPool(mem_tracker())); _build_timer = ADD_TIMER(runtime_profile(), "BuildTime"); _probe_timer = ADD_TIMER(runtime_profile(), "ProbeTime"); for (size_t i = 0; i < _child_expr_lists.size(); ++i) { diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp index 0e89069cb5cc28..15f2bf86f109af 100644 --- a/be/src/exec/tablet_sink.cpp +++ b/be/src/exec/tablet_sink.cpp @@ -53,7 +53,7 @@ NodeChannel::NodeChannel(OlapTableSink* parent, IndexChannel* index_channel, int _tuple_data_buffer_ptr = &_tuple_data_buffer; } _node_channel_tracker = - MemTracker::create_tracker(-1, "NodeChannel" + tls_ctx()->thread_id_str()); + MemTracker::create_tracker(-1, "NodeChannel:" + std::to_string(_index_channel->_index_id)); } NodeChannel::~NodeChannel() noexcept { diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h index 53a7ff3b427523..0bd98e2c12c8f5 100644 --- a/be/src/exec/tablet_sink.h +++ b/be/src/exec/tablet_sink.h @@ -325,7 +325,7 @@ class IndexChannel { public: IndexChannel(OlapTableSink* parent, int64_t index_id, bool is_vec) : _parent(parent), _index_id(index_id), _is_vectorized(is_vec) { - _index_channel_tracker = MemTracker::create_tracker(-1, "IndexChannel"); + _index_channel_tracker = MemTracker::create_tracker(-1, "IndexChannel:" + std::to_string(_index_id)); } ~IndexChannel() = default; diff --git a/be/src/exec/topn_node.cpp b/be/src/exec/topn_node.cpp index 73ad4410a1f9dc..545acaf55b53fb 100644 --- a/be/src/exec/topn_node.cpp +++ b/be/src/exec/topn_node.cpp @@ -63,7 +63,7 @@ Status TopNNode::prepare(RuntimeState* state) { SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(ExecNode::prepare(state)); SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker()); - _tuple_pool.reset(new MemPool(mem_tracker().get())); + _tuple_pool.reset(new MemPool(mem_tracker())); RETURN_IF_ERROR(_sort_exec_exprs.prepare(state, child(0)->row_desc(), _row_descriptor, expr_mem_tracker())); // AddExprCtxsToFree(_sort_exec_exprs); diff --git a/be/src/exprs/expr_context.cpp b/be/src/exprs/expr_context.cpp index 518a69d77439be..6259def394c108 100644 --- a/be/src/exprs/expr_context.cpp +++ b/be/src/exprs/expr_context.cpp @@ -55,7 +55,7 @@ Status ExprContext::prepare(RuntimeState* state, const RowDescriptor& row_desc, SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); DCHECK(_pool.get() == nullptr); _prepared = true; - _pool.reset(new MemPool(_mem_tracker.get())); + _pool.reset(new MemPool(_mem_tracker)); return _root->prepare(state, row_desc, this); } diff --git a/be/src/http/action/compaction_action.h b/be/src/http/action/compaction_action.h index 08dfb0ce70d90b..16e6ae19d95cd7 100644 --- a/be/src/http/action/compaction_action.h +++ b/be/src/http/action/compaction_action.h @@ -42,7 +42,7 @@ class CompactionAction : public HttpHandler { CompactionAction(CompactionActionType type) : _type(type) { _compaction_mem_tracker = type == RUN_COMPACTION ? MemTracker::create_tracker(-1, "ManualCompaction", nullptr, - MemTrackerLevel::TASK) + MemTrackerLevel::INSTANCE) : nullptr; } diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp index 7b9521e775ca7a..7a1b1bb2232bb4 100644 --- a/be/src/olap/memtable.cpp +++ b/be/src/olap/memtable.cpp @@ -40,8 +40,8 @@ MemTable::MemTable(int64_t tablet_id, Schema* schema, const TabletSchema* tablet _slot_descs(slot_descs), _keys_type(keys_type), _mem_tracker(MemTracker::create_tracker(-1, "MemTable", parent_tracker)), - _buffer_mem_pool(new MemPool(_mem_tracker.get())), - _table_mem_pool(new MemPool(_mem_tracker.get())), + _buffer_mem_pool(new MemPool(_mem_tracker)), + _table_mem_pool(new MemPool(_mem_tracker)), _schema_size(_schema->schema_size()), _rowset_writer(rowset_writer), _is_first_insertion(true), diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp index d173ca56975fb6..32d6854d0f3d2c 100644 --- a/be/src/olap/reader.cpp +++ b/be/src/olap/reader.cpp @@ -106,7 +106,11 @@ TabletReader::~TabletReader() { } Status TabletReader::init(const ReaderParams& read_params) { +#ifndef NDEBUG _predicate_mem_pool.reset(new MemPool("TabletReader:" + read_params.tablet->full_name())); +#else + _predicate_mem_pool.reset(new MemPool()); +#endif Status res = _init_params(read_params); if (!res.ok()) { diff --git a/be/src/olap/row_block.cpp b/be/src/olap/row_block.cpp index 84b43d3f0e5747..b98bee09906ced 100644 --- a/be/src/olap/row_block.cpp +++ b/be/src/olap/row_block.cpp @@ -38,7 +38,7 @@ using std::vector; namespace doris { RowBlock::RowBlock(const TabletSchema* schema) : _capacity(0), _schema(schema) { - _mem_pool.reset(new MemPool("RowBlock")); + _mem_pool.reset(new MemPool()); } RowBlock::~RowBlock() { diff --git a/be/src/olap/row_block2.cpp b/be/src/olap/row_block2.cpp index 12e72998dd3ec3..fd4b0ef23ec337 100644 --- a/be/src/olap/row_block2.cpp +++ b/be/src/olap/row_block2.cpp @@ -38,7 +38,7 @@ RowBlockV2::RowBlockV2(const Schema& schema, uint16_t capacity) : _schema(schema), _capacity(capacity), _column_vector_batches(_schema.num_columns()), - _pool(new MemPool("RowBlockV2")), + _pool(new MemPool()), _selection_vector(nullptr) { for (auto cid : _schema.column_ids()) { Status status = ColumnVectorBatch::create( diff --git a/be/src/olap/rowset/segment_v2/binary_dict_page.cpp b/be/src/olap/rowset/segment_v2/binary_dict_page.cpp index f8c67e5e210e32..28859732266068 100644 --- a/be/src/olap/rowset/segment_v2/binary_dict_page.cpp +++ b/be/src/olap/rowset/segment_v2/binary_dict_page.cpp @@ -39,7 +39,7 @@ BinaryDictPageBuilder::BinaryDictPageBuilder(const PageBuilderOptions& options) _data_page_builder(nullptr), _dict_builder(nullptr), _encoding_type(DICT_ENCODING), - _pool("BinaryDictPageBuilder") { + _pool() { // initially use DICT_ENCODING // TODO: the data page builder type can be created by Factory according to user config _data_page_builder.reset(new BitshufflePageBuilder(options)); diff --git a/be/src/olap/rowset/segment_v2/zone_map_index.cpp b/be/src/olap/rowset/segment_v2/zone_map_index.cpp index e2f4f9ff6a1d44..0fbe5a1f033590 100644 --- a/be/src/olap/rowset/segment_v2/zone_map_index.cpp +++ b/be/src/olap/rowset/segment_v2/zone_map_index.cpp @@ -30,7 +30,7 @@ namespace doris { namespace segment_v2 { -ZoneMapIndexWriter::ZoneMapIndexWriter(Field* field) : _field(field), _pool("ZoneMapIndexWriter") { +ZoneMapIndexWriter::ZoneMapIndexWriter(Field* field) : _field(field), _pool() { _page_zone_map.min_value = _field->allocate_zone_map_value(&_pool); _page_zone_map.max_value = _field->allocate_zone_map_value(&_pool); _reset_zone_map(&_page_zone_map); @@ -127,7 +127,7 @@ Status ZoneMapIndexReader::load(bool use_page_cache, bool kept_in_memory) { RETURN_IF_ERROR(reader.load(use_page_cache, kept_in_memory)); IndexedColumnIterator iter(&reader); - MemPool pool("ZoneMapIndexReader ColumnBlock"); + MemPool pool; _page_zone_maps.resize(reader.num_values()); // read and cache all page zone maps diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp index 83da1df3dba27f..c89059a8f8396d 100644 --- a/be/src/olap/tablet_manager.cpp +++ b/be/src/olap/tablet_manager.cpp @@ -74,13 +74,14 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(tablet_meta_mem_consumption, MetricUnit::BYTE mem_consumption, Labels({{"type", "tablet_meta"}})); TabletManager::TabletManager(int32_t tablet_map_lock_shard_size) - : _mem_tracker(MemTracker::create_tracker(-1, "TabletManager", nullptr, + : _mem_tracker(MemTracker::create_tracker(-1, "TabletMeta", nullptr, MemTrackerLevel::OVERVIEW)), _tablets_shards_size(tablet_map_lock_shard_size), _tablets_shards_mask(tablet_map_lock_shard_size - 1) { CHECK_GT(_tablets_shards_size, 0); CHECK_EQ(_tablets_shards_size & _tablets_shards_mask, 0); _tablets_shards.resize(_tablets_shards_size); + _mem_tracker_logic = MemTracker::create_virtual_tracker(-1, "TabletMeta[Logic]", _mem_tracker); REGISTER_HOOK_METRIC(tablet_meta_mem_consumption, [this]() { return _mem_tracker->consumption(); }); } @@ -197,6 +198,9 @@ Status TabletManager::_add_tablet_to_map_unlocked(TTabletId tablet_id, tablet_map_t& tablet_map = _get_tablet_map(tablet_id); tablet_map[tablet_id] = tablet; _add_tablet_to_partition(tablet); +#ifndef NDEBUG + _mem_tracker_logic->consume(tablet->tablet_meta()->mem_size()); +#endif VLOG_NOTICE << "add tablet to map successfully." << " tablet_id=" << tablet_id; @@ -900,6 +904,7 @@ Status TabletManager::build_all_report_tablets_info(std::map } Status TabletManager::start_trash_sweep() { + SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); { std::vector all_tablets; // we use this vector to save all tablet ptr for saving lock time. @@ -1021,6 +1026,7 @@ void TabletManager::unregister_clone_tablet(int64_t tablet_id) { void TabletManager::try_delete_unused_tablet_path(DataDir* data_dir, TTabletId tablet_id, SchemaHash schema_hash, const string& schema_hash_path) { + SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); // acquire the read lock, so that there is no creating tablet or load tablet from meta tasks // create tablet and load tablet task should check whether the dir exists tablets_shard& shard = _get_tablets_shard(tablet_id); @@ -1132,6 +1138,7 @@ void TabletManager::get_partition_related_tablets(int64_t partition_id, } void TabletManager::do_tablet_meta_checkpoint(DataDir* data_dir) { + SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); std::vector related_tablets; { for (auto& tablets_shard : _tablets_shards) { @@ -1311,6 +1318,9 @@ Status TabletManager::_drop_tablet_directly_unlocked(TTabletId tablet_id, bool k } dropped_tablet->deregister_tablet_from_dir(); +#ifndef NDEBUG + _mem_tracker_logic->release(dropped_tablet->tablet_meta()->mem_size()); +#endif return Status::OK(); } diff --git a/be/src/olap/tablet_manager.h b/be/src/olap/tablet_manager.h index a2420a5d814493..a5d63b1de61108 100644 --- a/be/src/olap/tablet_manager.h +++ b/be/src/olap/tablet_manager.h @@ -205,6 +205,9 @@ class TabletManager { // trace the memory use by meta of tablet std::shared_ptr _mem_tracker; + // The logical memory given by sizeof is less than the actual memory allocated by tcmalloc, + // Because the minimum memory allocation unit of tcmalloc is page, memory fragmentation will occur. + std::shared_ptr _mem_tracker_logic; const int32_t _tablets_shards_size; const int32_t _tablets_shards_mask; diff --git a/be/src/olap/tablet_schema.cpp b/be/src/olap/tablet_schema.cpp index e13a55e566c963..715afbee69d361 100644 --- a/be/src/olap/tablet_schema.cpp +++ b/be/src/olap/tablet_schema.cpp @@ -309,6 +309,8 @@ void TabletColumn::init_from_pb(const ColumnPB& column) { _has_default_value = column.has_default_value(); if (_has_default_value) { _default_value = column.default_value(); + } else { + _default_value = ""; } if (column.has_precision()) { @@ -316,9 +318,12 @@ void TabletColumn::init_from_pb(const ColumnPB& column) { _precision = column.precision(); } else { _is_decimal = false; + _precision = 0; } if (column.has_frac()) { _frac = column.frac(); + } else { + _frac = 0; } _length = column.length(); _index_length = column.index_length(); @@ -335,18 +340,29 @@ void TabletColumn::init_from_pb(const ColumnPB& column) { _has_referenced_column = column.has_referenced_column_id(); if (_has_referenced_column) { _referenced_column_id = column.referenced_column_id(); + } else { + _referenced_column_id = 0; } + _referenced_column = ""; if (column.has_aggregation()) { _aggregation = get_aggregation_type_by_string(column.aggregation()); + } else { + _aggregation = FieldAggregationMethod::OLAP_FIELD_AGGREGATION_NONE; } if (column.has_visible()) { _visible = column.visible(); + } else { + _visible = true; } if (_type == FieldType::OLAP_FIELD_TYPE_ARRAY) { DCHECK(column.children_columns_size() == 1) << "ARRAY type has more than 1 children types."; TabletColumn child_column; child_column.init_from_pb(column.children_columns(0)); add_sub_column(child_column); + } else { + _parent = nullptr; + _sub_columns.clear(); // no swap space + _sub_column_count = 0; } } @@ -386,9 +402,13 @@ void TabletColumn::to_schema_pb(ColumnPB* column) { uint32_t TabletColumn::mem_size() const { auto size = sizeof(TabletColumn); + size += _col_name.size(); if (_has_default_value) { size += _default_value.size(); } + if (_has_referenced_column) { + size += _referenced_column.size(); + } for (auto& sub_column : _sub_columns) { size += sub_column.mem_size(); } @@ -408,8 +428,8 @@ void TabletSchema::init_from_pb(const TabletSchemaPB& schema) { _num_null_columns = 0; _cols.clear(); _field_name_to_index.clear(); + TabletColumn column; for (auto& column_pb : schema.column()) { - TabletColumn column; column.init_from_pb(column_pb); if (column.is_key()) { _num_key_columns++; diff --git a/be/src/olap/task/engine_alter_tablet_task.cpp b/be/src/olap/task/engine_alter_tablet_task.cpp index f8ffec16a4385d..50ffcb87b0a0a9 100644 --- a/be/src/olap/task/engine_alter_tablet_task.cpp +++ b/be/src/olap/task/engine_alter_tablet_task.cpp @@ -29,7 +29,7 @@ EngineAlterTabletTask::EngineAlterTabletTask(const TAlterTabletReqV2& request) : _alter_tablet_req(request) { _mem_tracker = MemTracker::create_tracker( config::memory_limitation_per_thread_for_schema_change_bytes, - fmt::format("EngineAlterTabletTask: {}-{}", + fmt::format("EngineAlterTabletTask:baseTabletId={}-newTabletId{}", std::to_string(_alter_tablet_req.base_tablet_id), std::to_string(_alter_tablet_req.new_tablet_id)), StorageEngine::instance()->schema_change_mem_tracker(), MemTrackerLevel::TASK); diff --git a/be/src/olap/task/engine_batch_load_task.cpp b/be/src/olap/task/engine_batch_load_task.cpp index 4d16a334c6c168..c24c4c3108dff6 100644 --- a/be/src/olap/task/engine_batch_load_task.cpp +++ b/be/src/olap/task/engine_batch_load_task.cpp @@ -54,7 +54,9 @@ EngineBatchLoadTask::EngineBatchLoadTask(TPushReq& push_req, std::vectorbatch_load_mem_tracker(), MemTrackerLevel::TASK); } diff --git a/be/src/olap/task/engine_checksum_task.cpp b/be/src/olap/task/engine_checksum_task.cpp index 112def6ea3bab9..cb6918e21df489 100644 --- a/be/src/olap/task/engine_checksum_task.cpp +++ b/be/src/olap/task/engine_checksum_task.cpp @@ -26,7 +26,7 @@ namespace doris { EngineChecksumTask::EngineChecksumTask(TTabletId tablet_id, TSchemaHash schema_hash, TVersion version, uint32_t* checksum) : _tablet_id(tablet_id), _schema_hash(schema_hash), _version(version), _checksum(checksum) { - _mem_tracker = MemTracker::create_tracker(-1, "compute checksum: " + std::to_string(tablet_id), + _mem_tracker = MemTracker::create_tracker(-1, "EngineChecksumTask:tabletId=" + std::to_string(tablet_id), StorageEngine::instance()->consistency_mem_tracker(), MemTrackerLevel::TASK); } diff --git a/be/src/olap/task/engine_clone_task.cpp b/be/src/olap/task/engine_clone_task.cpp index 41e1fa1f9cf5b7..e49472f5f68bbb 100644 --- a/be/src/olap/task/engine_clone_task.cpp +++ b/be/src/olap/task/engine_clone_task.cpp @@ -58,7 +58,7 @@ EngineCloneTask::EngineCloneTask(const TCloneReq& clone_req, const TMasterInfo& _signature(signature), _master_info(master_info) { _mem_tracker = MemTracker::create_tracker( - -1, "clone tablet: " + std::to_string(_clone_req.tablet_id), + -1, "EngineCloneTask:tabletId=" + std::to_string(_clone_req.tablet_id), StorageEngine::instance()->clone_mem_tracker(), MemTrackerLevel::TASK); } diff --git a/be/src/runtime/fold_constant_executor.cpp b/be/src/runtime/fold_constant_executor.cpp index 4707622c72e9fa..14d9b85b8ae165 100644 --- a/be/src/runtime/fold_constant_executor.cpp +++ b/be/src/runtime/fold_constant_executor.cpp @@ -181,7 +181,7 @@ Status FoldConstantExecutor::_init(const TQueryGlobals& query_globals) { _runtime_profile->set_name("FoldConstantExpr"); _mem_tracker = MemTracker::create_tracker(-1, "FoldConstantExpr", _runtime_state->instance_mem_tracker()); - _mem_pool.reset(new MemPool(_mem_tracker.get())); + _mem_pool.reset(new MemPool(_mem_tracker)); return Status::OK(); } diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp index ce5448a910ccb2..85f2c5da3f3526 100644 --- a/be/src/runtime/load_channel.cpp +++ b/be/src/runtime/load_channel.cpp @@ -18,6 +18,7 @@ #include "runtime/load_channel.h" #include "olap/lru_cache.h" +#include "runtime/exec_env.h" #include "runtime/mem_tracker.h" #include "runtime/tablets_channel.h" #include "runtime/thread_context.h" @@ -31,8 +32,11 @@ LoadChannel::LoadChannel(const UniqueId& load_id, int64_t mem_limit, int64_t tim _is_high_priority(is_high_priority), _sender_ip(sender_ip), _is_vec(is_vec) { - _mem_tracker = MemTracker::create_tracker(mem_limit, "LoadChannel:" + _load_id.to_string(), - nullptr, MemTrackerLevel::TASK); + _mem_tracker = MemTracker::create_tracker( + mem_limit, "LoadChannel:tabletId=" + _load_id.to_string(), + ExecEnv::GetInstance()->task_pool_mem_tracker_registry()->get_task_mem_tracker( + _load_id.to_string()), + MemTrackerLevel::TASK); // _last_updated_time should be set before being inserted to // _load_channels in load_channel_mgr, or it may be erased // immediately by gc thread. @@ -47,7 +51,7 @@ LoadChannel::~LoadChannel() { } Status LoadChannel::open(const PTabletWriterOpenRequest& params) { - SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); + SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); int64_t index_id = params.index_id(); std::shared_ptr channel; { @@ -133,7 +137,7 @@ bool LoadChannel::is_finished() { } Status LoadChannel::cancel() { - SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); + SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); std::lock_guard l(_lock); for (auto& it : _tablets_channels) { it.second->cancel(); diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h index 1d0c3f04e1becc..644e546524d78c 100644 --- a/be/src/runtime/load_channel.h +++ b/be/src/runtime/load_channel.h @@ -128,7 +128,7 @@ class LoadChannel { template Status LoadChannel::add_batch(const TabletWriterAddRequest& request, TabletWriterAddResult* response) { - SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); + SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); int64_t index_id = request.index_id(); // 1. get tablets channel std::shared_ptr channel; diff --git a/be/src/runtime/load_channel_mgr.cpp b/be/src/runtime/load_channel_mgr.cpp index 7b2ee5bb263fbc..2259eb24034ef5 100644 --- a/be/src/runtime/load_channel_mgr.cpp +++ b/be/src/runtime/load_channel_mgr.cpp @@ -102,7 +102,7 @@ LoadChannel* LoadChannelMgr::_create_load_channel(const UniqueId& load_id, int64 } Status LoadChannelMgr::open(const PTabletWriterOpenRequest& params) { - SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); + SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); UniqueId load_id(params.id()); std::shared_ptr channel; { @@ -179,7 +179,7 @@ void LoadChannelMgr::_handle_mem_exceed_limit() { } Status LoadChannelMgr::cancel(const PTabletWriterCancelRequest& params) { - SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); + SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); UniqueId load_id(params.id()); std::shared_ptr cancelled_channel; { diff --git a/be/src/runtime/load_channel_mgr.h b/be/src/runtime/load_channel_mgr.h index c9e84019b13430..49074fa38039a9 100644 --- a/be/src/runtime/load_channel_mgr.h +++ b/be/src/runtime/load_channel_mgr.h @@ -116,7 +116,7 @@ Status LoadChannelMgr::_get_load_channel(std::shared_ptr& channel, template Status LoadChannelMgr::add_batch(const TabletWriterAddRequest& request, TabletWriterAddResult* response) { - SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); + SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); UniqueId load_id(request.id()); // 1. get load channel std::shared_ptr channel; diff --git a/be/src/runtime/mem_pool.cpp b/be/src/runtime/mem_pool.cpp index adc86f0e67849b..f13fed484e65a1 100644 --- a/be/src/runtime/mem_pool.cpp +++ b/be/src/runtime/mem_pool.cpp @@ -41,7 +41,7 @@ const int MemPool::MAX_CHUNK_SIZE; const int MemPool::DEFAULT_ALIGNMENT; uint32_t MemPool::k_zero_length_region_ alignas(std::max_align_t) = MEM_POOL_POISON; -MemPool::MemPool(MemTracker* mem_tracker) +MemPool::MemPool(const std::shared_ptr& mem_tracker) : current_chunk_idx_(-1), next_chunk_size_(INITIAL_CHUNK_SIZE), total_allocated_bytes_(0), @@ -55,8 +55,7 @@ MemPool::MemPool(const std::string& label) total_allocated_bytes_(0), total_reserved_bytes_(0), peak_allocated_bytes_(0) { - _mem_tracker_own = MemTracker::create_tracker(-1, label + ":MemPool"); - _mem_tracker = _mem_tracker_own.get(); + _mem_tracker = MemTracker::create_tracker(-1, label + ":MemPool"); } MemPool::MemPool() @@ -65,7 +64,7 @@ MemPool::MemPool() total_allocated_bytes_(0), total_reserved_bytes_(0), peak_allocated_bytes_(0), - _mem_tracker(tls_ctx()->_thread_mem_tracker_mgr->mem_tracker().get()) {} + _mem_tracker(tls_ctx()->_thread_mem_tracker_mgr->mem_tracker()) {} MemPool::ChunkInfo::ChunkInfo(const Chunk& chunk_) : chunk(chunk_), allocated_bytes(0) { DorisMetrics::instance()->memory_pool_bytes_total->increment(chunk.size); @@ -75,7 +74,7 @@ MemPool::~MemPool() { int64_t total_bytes_released = 0; for (auto& chunk : chunks_) { total_bytes_released += chunk.chunk.size; - ChunkAllocator::instance()->free(chunk.chunk, _mem_tracker); + ChunkAllocator::instance()->free(chunk.chunk, _mem_tracker.get()); } DorisMetrics::instance()->memory_pool_bytes_total->increment(-total_bytes_released); } @@ -94,7 +93,7 @@ void MemPool::free_all() { int64_t total_bytes_released = 0; for (auto& chunk : chunks_) { total_bytes_released += chunk.chunk.size; - ChunkAllocator::instance()->free(chunk.chunk, _mem_tracker); + ChunkAllocator::instance()->free(chunk.chunk, _mem_tracker.get()); } chunks_.clear(); next_chunk_size_ = INITIAL_CHUNK_SIZE; @@ -146,8 +145,8 @@ Status MemPool::find_chunk(size_t min_size, bool check_limits) { // Allocate a new chunk. Return early if allocate fails. Chunk chunk; - RETURN_IF_ERROR( - ChunkAllocator::instance()->allocate(chunk_size, &chunk, _mem_tracker, check_limits)); + RETURN_IF_ERROR(ChunkAllocator::instance()->allocate(chunk_size, &chunk, _mem_tracker.get(), + check_limits)); ASAN_POISON_MEMORY_REGION(chunk.data, chunk_size); // Put it before the first free chunk. If no free chunks, it goes at the end. if (first_free_idx == static_cast(chunks_.size())) { @@ -192,7 +191,7 @@ void MemPool::acquire_data(MemPool* src, bool keep_current) { // Skip unnecessary atomic ops if the mem_trackers are the same. if (src->_mem_tracker != _mem_tracker) { - src->_mem_tracker->transfer_to(_mem_tracker, total_transferred_bytes); + src->_mem_tracker->transfer_to(_mem_tracker.get(), total_transferred_bytes); } // insert new chunks after current_chunk_idx_ @@ -221,7 +220,9 @@ void MemPool::acquire_data(MemPool* src, bool keep_current) { void MemPool::exchange_data(MemPool* other) { int64_t delta_size = other->total_reserved_bytes_ - total_reserved_bytes_; - other->_mem_tracker->transfer_to(_mem_tracker, delta_size); + if (other->_mem_tracker != _mem_tracker) { + other->_mem_tracker->transfer_to(_mem_tracker.get(), delta_size); + } std::swap(current_chunk_idx_, other->current_chunk_idx_); std::swap(next_chunk_size_, other->next_chunk_size_); diff --git a/be/src/runtime/mem_pool.h b/be/src/runtime/mem_pool.h index fc17854f488cf5..506f01376c2bbc 100644 --- a/be/src/runtime/mem_pool.h +++ b/be/src/runtime/mem_pool.h @@ -93,7 +93,7 @@ class MemTracker; class MemPool { public: // 'tracker' tracks the amount of memory allocated by this pool. Must not be nullptr. - MemPool(MemTracker* mem_tracker); + MemPool(const std::shared_ptr& mem_tracker); MemPool(const std::string& label); MemPool(); @@ -161,7 +161,7 @@ class MemPool { int64_t total_reserved_bytes() const { return total_reserved_bytes_; } int64_t peak_allocated_bytes() const { return peak_allocated_bytes_; } - MemTracker* mem_tracker() { return _mem_tracker; } + std::shared_ptr mem_tracker() { return _mem_tracker; } static constexpr int DEFAULT_ALIGNMENT = 8; @@ -308,9 +308,7 @@ class MemPool { /// The current and peak memory footprint of this pool. This is different from /// total allocated_bytes_ since it includes bytes in chunks that are not used. - MemTracker* _mem_tracker; - // TODO(zxy) temp variable, In the future, mem trackers should all use raw pointers. - std::shared_ptr _mem_tracker_own; + std::shared_ptr _mem_tracker; }; // Stamp out templated implementations here so they're included in IR module diff --git a/be/src/runtime/mem_tracker.cpp b/be/src/runtime/mem_tracker.cpp index f42de2ef59c088..eead60ddc818f9 100644 --- a/be/src/runtime/mem_tracker.cpp +++ b/be/src/runtime/mem_tracker.cpp @@ -60,18 +60,16 @@ MemTracker* MemTracker::get_raw_process_tracker() { return raw_process_tracker; } -// Track memory for all brpc server responses. -static std::shared_ptr brpc_server_tracker; -static GoogleOnceType brpc_server_tracker_once = GOOGLE_ONCE_INIT; - -void MemTracker::create_brpc_server_tracker() { - brpc_server_tracker = MemTracker::create_tracker(-1, "Brpc", get_process_tracker(), - MemTrackerLevel::OVERVIEW); -} - -std::shared_ptr MemTracker::get_brpc_server_tracker() { - GoogleOnceInit(&brpc_server_tracker_once, &MemTracker::create_brpc_server_tracker); - return brpc_server_tracker; +static TrackersMap _temporary_mem_trackers; + +std::shared_ptr MemTracker::get_temporary_mem_tracker(const std::string& label) { + // First time this label registered, make a new object, otherwise do nothing. + // Avoid using locks to resolve erase conflicts. + _temporary_mem_trackers.try_emplace_l( + label, [](std::shared_ptr) {}, + MemTracker::create_tracker(-1, fmt::format("[Temporary]-{}", label), nullptr, + MemTrackerLevel::OVERVIEW)); + return _temporary_mem_trackers[label]; } void MemTracker::list_process_trackers(std::vector>* trackers) { @@ -102,14 +100,8 @@ std::shared_ptr MemTracker::create_tracker(int64_t byte_limit, const const std::shared_ptr& parent, MemTrackerLevel level, RuntimeProfile* profile) { - std::shared_ptr reset_parent = - parent ? parent : tls_ctx()->_thread_mem_tracker_mgr->mem_tracker(); - DCHECK(reset_parent); - - std::shared_ptr tracker( - new MemTracker(byte_limit, label, reset_parent, - level > reset_parent->_level ? level : reset_parent->_level, profile)); - reset_parent->add_child_tracker(tracker); + std::shared_ptr tracker = + MemTracker::create_tracker_impl(byte_limit, label, parent, level, profile); tracker->init(); return tracker; } @@ -117,14 +109,30 @@ std::shared_ptr MemTracker::create_tracker(int64_t byte_limit, const std::shared_ptr MemTracker::create_virtual_tracker( int64_t byte_limit, const std::string& label, const std::shared_ptr& parent, MemTrackerLevel level) { + std::shared_ptr tracker = MemTracker::create_tracker_impl( + byte_limit, "[Virtual]-" + label, parent, level, nullptr); + tracker->init_virtual(); + return tracker; +} + +std::shared_ptr MemTracker::create_tracker_impl( + int64_t byte_limit, const std::string& label, const std::shared_ptr& parent, + MemTrackerLevel level, RuntimeProfile* profile) { std::shared_ptr reset_parent = parent ? parent : tls_ctx()->_thread_mem_tracker_mgr->mem_tracker(); DCHECK(reset_parent); + std::string reset_label; + MemTracker* task_parent_tracker = reset_parent->parent_task_mem_tracker(); + if (task_parent_tracker) { + reset_label = fmt::format("{}:{}", label, split(task_parent_tracker->label(), ":")[1]); + } else { + reset_label = label; + } std::shared_ptr tracker( - new MemTracker(byte_limit, "[Virtual]-" + label, reset_parent, level, nullptr)); + new MemTracker(byte_limit, reset_label, reset_parent, + level > reset_parent->_level ? level : reset_parent->_level, profile)); reset_parent->add_child_tracker(tracker); - tracker->init_virtual(); return tracker; } @@ -318,7 +326,7 @@ bool MemTracker::gc_memory(int64_t max_consumption) { if (pre_gc_consumption < max_consumption) return false; int64_t curr_consumption = pre_gc_consumption; - const int64_t EXTRA_BYTES_TO_FREE = 4L * 1024L * 1024L * 1024L; // TODO(zxy) Consider as config + const int64_t EXTRA_BYTES_TO_FREE = 4L * 1024L * 1024L * 1024L; // Try to free up some memory for (int i = 0; i < _gc_functions.size(); ++i) { // Try to free up the amount we are over plus some extra so that we don't have to diff --git a/be/src/runtime/mem_tracker.h b/be/src/runtime/mem_tracker.h index 74a7b4bef6bb12..902b5e2efb210d 100644 --- a/be/src/runtime/mem_tracker.h +++ b/be/src/runtime/mem_tracker.h @@ -20,6 +20,8 @@ #pragma once +#include + #include #include #include @@ -40,6 +42,11 @@ enum class MemTrackerLevel { OVERVIEW = 0, TASK, INSTANCE, VERBOSE }; class MemTracker; class RuntimeState; +using TrackersMap = phmap::parallel_flat_hash_map< + std::string, std::shared_ptr, phmap::priv::hash_default_hash, + phmap::priv::hash_default_eq, + std::allocator>>, 12, std::mutex>; + /// A MemTracker tracks memory consumption; it contains an optional limit /// and can be arranged into a tree structure such that the consumption tracked /// by a MemTracker is also tracked by its ancestors. @@ -97,8 +104,9 @@ class MemTracker { // Gets a shared_ptr to the "process" tracker, creating it if necessary. static std::shared_ptr get_process_tracker(); static MemTracker* get_raw_process_tracker(); - // Gets a shared_ptr to the "brpc server" tracker, creating it if necessary. - static std::shared_ptr get_brpc_server_tracker(); + // Get a temporary tracker with a specified label, and the tracker will be created when the label is first get. + // Temporary trackers are not automatically destructed, which is usually used for debugging. + static std::shared_ptr get_temporary_mem_tracker(const std::string& label); Status check_sys_mem_info(int64_t bytes) { if (MemInfo::initialized() && MemInfo::current_mem() + bytes >= MemInfo::mem_limit()) { @@ -423,6 +431,10 @@ class MemTracker { static const std::string COUNTER_NAME; private: + static std::shared_ptr create_tracker_impl( + int64_t byte_limit, const std::string& label, const std::shared_ptr& parent, + MemTrackerLevel level, RuntimeProfile* profile); + /// 'byte_limit' < 0 means no limit /// 'label' is the label used in the usage string (log_usage()) MemTracker(int64_t byte_limit, const std::string& label, @@ -466,8 +478,6 @@ class MemTracker { // Creates the process tracker. static void create_process_tracker(); - // Creates the brpc server tracker. - static void create_brpc_server_tracker(); // Limit on memory consumption, in bytes. If limit_ == -1, there is no consumption limit. int64_t _limit; @@ -486,8 +496,6 @@ class MemTracker { // Consume size smaller than mem_tracker_consume_min_size_bytes will continue to accumulate // to avoid frequent calls to consume/release of MemTracker. - // TODO(zxy) It may be more performant to use thread_local static, which is inherently thread-safe. - // Test after introducing TCMalloc hook std::atomic _untracked_mem = 0; std::vector _all_trackers; // this tracker plus all of its ancestors diff --git a/be/src/runtime/mem_tracker_task_pool.cpp b/be/src/runtime/mem_tracker_task_pool.cpp index 2d43b927e80329..132b47f57356ab 100644 --- a/be/src/runtime/mem_tracker_task_pool.cpp +++ b/be/src/runtime/mem_tracker_task_pool.cpp @@ -33,15 +33,15 @@ std::shared_ptr MemTrackerTaskPool::register_task_mem_tracker_impl( _task_mem_trackers.try_emplace_l( task_id, [](std::shared_ptr) {}, MemTracker::create_tracker(mem_limit, label, parent, MemTrackerLevel::TASK)); - std::shared_ptr tracker = get_task_mem_tracker(task_id); - return tracker; + return get_task_mem_tracker(task_id); } std::shared_ptr MemTrackerTaskPool::register_query_mem_tracker( const std::string& query_id, int64_t mem_limit) { VLOG_FILE << "Register Query memory tracker, query id: " << query_id << " limit: " << PrettyPrinter::print(mem_limit, TUnit::BYTES); - return register_task_mem_tracker_impl(query_id, mem_limit, fmt::format("queryId={}", query_id), + return register_task_mem_tracker_impl(query_id, mem_limit, + fmt::format("Query:queryId={}", query_id), ExecEnv::GetInstance()->query_pool_mem_tracker()); } @@ -49,7 +49,8 @@ std::shared_ptr MemTrackerTaskPool::register_load_mem_tracker( const std::string& load_id, int64_t mem_limit) { VLOG_FILE << "Register Load memory tracker, load id: " << load_id << " limit: " << PrettyPrinter::print(mem_limit, TUnit::BYTES); - return register_task_mem_tracker_impl(load_id, mem_limit, fmt::format("loadId={}", load_id), + return register_task_mem_tracker_impl(load_id, mem_limit, + fmt::format("Load:loadId={}", load_id), ExecEnv::GetInstance()->load_pool_mem_tracker()); } diff --git a/be/src/runtime/mem_tracker_task_pool.h b/be/src/runtime/mem_tracker_task_pool.h index 20b0eaf7be1fda..e3baec8d518c51 100644 --- a/be/src/runtime/mem_tracker_task_pool.h +++ b/be/src/runtime/mem_tracker_task_pool.h @@ -17,8 +17,6 @@ #pragma once -#include - #include "runtime/mem_tracker.h" namespace doris { @@ -50,13 +48,7 @@ class MemTrackerTaskPool { // All per-task MemTracker objects. // The life cycle of task memtracker in the process is the same as task runtime state, // MemTrackers will be removed from this map after query finish or cancel. - using TaskTrackersMap = phmap::parallel_flat_hash_map< - std::string, std::shared_ptr, phmap::priv::hash_default_hash, - phmap::priv::hash_default_eq, - std::allocator>>, 12, - std::mutex>; - - TaskTrackersMap _task_mem_trackers; + TrackersMap _task_mem_trackers; }; } // namespace doris \ No newline at end of file diff --git a/be/src/runtime/memory/chunk_allocator.cpp b/be/src/runtime/memory/chunk_allocator.cpp index 7f8259c03496d8..af809560bb35c0 100644 --- a/be/src/runtime/memory/chunk_allocator.cpp +++ b/be/src/runtime/memory/chunk_allocator.cpp @@ -145,6 +145,7 @@ Status ChunkAllocator::allocate(size_t size, Chunk* chunk, MemTracker* tracker, } else { _mem_tracker->transfer_to(reset_tracker, size); } + SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); // fast path: allocate from current core arena int core_id = CpuInfo::get_current_core(); @@ -177,9 +178,6 @@ Status ChunkAllocator::allocate(size_t size, Chunk* chunk, MemTracker* tracker, SCOPED_RAW_TIMER(&cost_ns); // allocate from system allocator chunk->data = SystemAllocator::allocate(size); - // The allocated chunk is consumed in the tls mem tracker, we want to consume in the ChunkAllocator tracker, - // transfer memory ownership. TODO(zxy) replace with switch tls tracker - tls_ctx()->_thread_mem_tracker_mgr->mem_tracker()->transfer_to(_mem_tracker.get(), size); } chunk_pool_system_alloc_count->increment(1); chunk_pool_system_alloc_cost_ns->increment(cost_ns); @@ -193,6 +191,13 @@ Status ChunkAllocator::allocate(size_t size, Chunk* chunk, MemTracker* tracker, } void ChunkAllocator::free(const Chunk& chunk, MemTracker* tracker) { + // The chunk's memory ownership is transferred from tls tracker to ChunkAllocator. + if (tracker) { + tracker->transfer_to(_mem_tracker.get(), chunk.size); + } else { + tls_ctx()->_thread_mem_tracker_mgr->mem_tracker()->transfer_to(_mem_tracker.get(), chunk.size); + } + SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); if (chunk.core_id == -1) { return; } @@ -205,12 +210,6 @@ void ChunkAllocator::free(const Chunk& chunk, MemTracker* tracker) { { SCOPED_RAW_TIMER(&cost_ns); SystemAllocator::free(chunk.data, chunk.size); - // The freed chunk is released in the tls mem tracker. When the chunk was allocated, - // it was consumed in the parameter tracker, so if the tls mem tracker and the parameter - // tracker are different, transfer memory ownership. - if (tracker) - tracker->transfer_to(tls_ctx()->_thread_mem_tracker_mgr->mem_tracker().get(), - chunk.size); } chunk_pool_system_free_count->increment(1); chunk_pool_system_free_cost_ns->increment(cost_ns); @@ -219,13 +218,6 @@ void ChunkAllocator::free(const Chunk& chunk, MemTracker* tracker) { } } while (!_reserved_bytes.compare_exchange_weak(old_reserved_bytes, new_reserved_bytes)); - // The chunk's memory ownership is transferred from MemPool to ChunkAllocator. - if (tracker) { - tracker->transfer_to(_mem_tracker.get(), chunk.size); - } else { - tls_ctx()->_thread_mem_tracker_mgr->mem_tracker()->transfer_to(_mem_tracker.get(), - chunk.size); - } _arenas[chunk.core_id]->push_free_chunk(chunk.data, chunk.size); } diff --git a/be/src/runtime/result_file_sink.cpp b/be/src/runtime/result_file_sink.cpp index ee7fa490db884b..1f29c3057422b1 100644 --- a/be/src/runtime/result_file_sink.cpp +++ b/be/src/runtime/result_file_sink.cpp @@ -114,10 +114,6 @@ Status ResultFileSink::prepare(RuntimeState* state) { _local_bytes_send_counter = ADD_COUNTER(profile(), "LocalBytesSent", TUnit::BYTES); _uncompressed_bytes_counter = ADD_COUNTER(profile(), "UncompressedRowBatchSize", TUnit::BYTES); - // TODO(zxy) used after - _mem_tracker = MemTracker::create_tracker( - -1, "ResultFileSink:" + print_id(state->fragment_instance_id()), - state->instance_mem_tracker(), MemTrackerLevel::VERBOSE, _profile); // create writer _output_batch = new RowBatch(_output_row_descriptor, 1024); _writer.reset(new (std::nothrow) FileResultWriter( diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp index 07dbca425e046f..2ede9538358170 100644 --- a/be/src/runtime/tablets_channel.cpp +++ b/be/src/runtime/tablets_channel.cpp @@ -53,7 +53,7 @@ TabletsChannel::~TabletsChannel() { } Status TabletsChannel::open(const PTabletWriterOpenRequest& request) { - SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); + SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); std::lock_guard l(_lock); if (_state == kOpened) { // Normal case, already open by other sender @@ -138,7 +138,6 @@ Status TabletsChannel::close(int sender_id, int64_t backend_id, bool* finished, } Status TabletsChannel::reduce_mem_usage(int64_t mem_limit) { - SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); std::lock_guard l(_lock); if (_state == kFinished) { // TabletsChannel is closed without LoadChannel's lock, @@ -239,7 +238,7 @@ Status TabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& request } Status TabletsChannel::cancel() { - SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); + SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); std::lock_guard l(_lock); if (_state == kFinished) { return _close_status; diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h index eb39956cba68a1..5934c401083252 100644 --- a/be/src/runtime/tablets_channel.h +++ b/be/src/runtime/tablets_channel.h @@ -168,7 +168,7 @@ Status TabletsChannel::_get_current_seq(int64_t& cur_seq, const Request& request template Status TabletsChannel::add_batch(const TabletWriterAddRequest& request, TabletWriterAddResult* response) { - SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); + SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); int64_t cur_seq = 0; auto status = _get_current_seq(cur_seq, request); diff --git a/be/src/runtime/tcmalloc_hook.h b/be/src/runtime/tcmalloc_hook.h index 548b8862990b8f..6750ce8e746869 100644 --- a/be/src/runtime/tcmalloc_hook.h +++ b/be/src/runtime/tcmalloc_hook.h @@ -23,32 +23,40 @@ #include "runtime/thread_context.h" -// Notice: modify the command in New/Delete Hook should be careful enough!, -// and should be as simple as possible, otherwise it may cause weird errors. E.g: -// 1. The first New Hook call of the process may be before some variables of -// the process are initialized. -// 2. Allocating memory in the Hook command causes the Hook to be entered again, -// infinite recursion. -// 3. TCMalloc hook will be triggered during the process of initializing/Destructor -// memtracker shared_ptr, Using the object pointed to by this memtracker shared_ptr -// in TCMalloc hook may cause crash. -// 4. Modifying additional thread local variables in ThreadContext construction and -// destructor to control the behavior of consume can lead to unexpected behavior, -// like this: if (LIKELY(doris::start_thread_mem_tracker)) { -void new_hook(const void* ptr, size_t size) { - doris::tls_ctx()->consume_mem(tc_nallocx(size, 0)); -} - -void delete_hook(const void* ptr) { - doris::tls_ctx()->release_mem(tc_malloc_size(const_cast(ptr))); -} - -void init_hook() { - MallocHook::AddNewHook(&new_hook); - MallocHook::AddDeleteHook(&delete_hook); -} - -void destroy_hook() { - MallocHook::RemoveNewHook(&new_hook); - MallocHook::RemoveDeleteHook(&delete_hook); -} +namespace doris { + +class TcmallocHook { +public: + // Notice: modify the command in New/Delete Hook should be careful enough!, + // and should be as simple as possible, otherwise it may cause weird errors. E.g: + // 1. The first New Hook call of the process may be before some variables of + // the process are initialized. + // 2. Allocating memory in the Hook command causes the Hook to be entered again, + // infinite recursion. + // 3. TCMalloc hook will be triggered during the process of initializing/Destructor + // memtracker shared_ptr, Using the object pointed to by this memtracker shared_ptr + // in TCMalloc hook may cause crash. + // 4. Modifying additional thread local variables in ThreadContext construction and + // destructor to control the behavior of consume can lead to unexpected behavior, + // like this: if (LIKELY(doris::start_thread_mem_tracker)) { + static void new_hook(const void* ptr, size_t size) { + doris::tls_ctx()->consume_mem(tc_nallocx(size, 0)); + } + + static void delete_hook(const void* ptr) { + doris::tls_ctx()->release_mem(tc_malloc_size(const_cast(ptr))); + } + + static void init_hook() { + MallocHook::AddNewHook(&new_hook); + MallocHook::AddDeleteHook(&delete_hook); + } + + static void destroy_hook() { + MallocHook::RemoveNewHook(&new_hook); + MallocHook::RemoveDeleteHook(&delete_hook); + } + + static void destroy_hook1() { MallocHook::RemoveDeleteHook(&delete_hook); } +}; +} // namespace doris diff --git a/be/src/runtime/thread_context.cpp b/be/src/runtime/thread_context.cpp index d1f206dedc8740..c6989045e9c74c 100644 --- a/be/src/runtime/thread_context.cpp +++ b/be/src/runtime/thread_context.cpp @@ -128,7 +128,9 @@ SwitchThreadMemTrackerErrCallBack::SwitchThreadMemTrackerErrCallBack( SwitchThreadMemTrackerErrCallBack::~SwitchThreadMemTrackerErrCallBack() { tls_ctx()->_thread_mem_tracker_mgr->update_consume_err_cb(_old_tracker_cb); +#ifndef NDEBUG DorisMetrics::instance()->switch_thread_mem_tracker_err_cb_count->increment(1); +#endif } SwitchBthread::SwitchBthread() { @@ -141,6 +143,7 @@ SwitchBthread::SwitchBthread() { // set the data so that next time bthread_getspecific in the thread returns the data. CHECK_EQ(0, bthread_setspecific(btls_key, tls)); } else { + tls->_thread_mem_tracker_mgr->clear_untracked_mems(); tls->_thread_mem_tracker_mgr->init_bthread(); } } @@ -148,6 +151,7 @@ SwitchBthread::SwitchBthread() { SwitchBthread::~SwitchBthread() { DCHECK(tls != nullptr); tls->_thread_mem_tracker_mgr->clear_untracked_mems(); + tls->_thread_mem_tracker_mgr->init(); #ifndef NDEBUG DorisMetrics::instance()->switch_bthread_count->increment(1); #endif diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h index 8ab72be634df0d..804b1a766885ea 100644 --- a/be/src/runtime/thread_context.h +++ b/be/src/runtime/thread_context.h @@ -45,12 +45,22 @@ // The query thread will automatically clear_untracked_mems when detach_task. #define SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(mem_tracker) \ auto VARNAME_LINENUM(switch_tracker) = doris::SwitchThreadMemTracker(mem_tracker, false) +// `detach task/~switch bthread` will clear cached trackers and unconsumed tracks. +// Used after `attach task/switch bthread` to avoid cached trackers not being destroyed in time. #define SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker) \ auto VARNAME_LINENUM(switch_tracker) = doris::SwitchThreadMemTracker(mem_tracker, true); #define SCOPED_SWITCH_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker) \ auto VARNAME_LINENUM(switch_tracker) = doris::SwitchThreadMemTracker(mem_tracker, false) #define SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker) \ auto VARNAME_LINENUM(switch_tracker) = doris::SwitchThreadMemTracker(mem_tracker, true) +// Count the memory in the scope to a temporary tracker with the specified label name. +// This is very useful when debugging. You can find the position where the tracker statistics are +// inaccurate through the temporary tracker layer by layer. As well as finding memory hotspots. +// TODO(zxy) track specifies the memory for each line in the code segment, instead of manually adding +// a switch temporary tracker to each line. Maybe there are open source tools to do this? +#define SCOPED_SWITCH_TEMPORARY_THREAD_LOCAL_MEM_TRACKER(label) \ + auto VARNAME_LINENUM(switch_tracker) = doris::SwitchThreadMemTracker( \ + MemTracker::get_temporary_mem_tracker(label), false) // After the non-query thread switches the mem tracker, if the thread will not switch the mem // tracker again in the short term, can consider manually clear_untracked_mems. // The query thread will automatically clear_untracked_mems when detach_task. diff --git a/be/src/runtime/thread_mem_tracker_mgr.cpp b/be/src/runtime/thread_mem_tracker_mgr.cpp index e55a4620f00e31..e9768bf86bd697 100644 --- a/be/src/runtime/thread_mem_tracker_mgr.cpp +++ b/be/src/runtime/thread_mem_tracker_mgr.cpp @@ -28,6 +28,8 @@ void ThreadMemTrackerMgr::attach_task(const std::string& cancel_msg, const std:: const TUniqueId& fragment_instance_id, const std::shared_ptr& mem_tracker) { DCHECK(switch_count == 0) << print_debug_string(); + clear_untracked_mems(); + init(); _task_id = task_id; _fragment_instance_id = fragment_instance_id; _consume_err_cb.cancel_msg = cancel_msg; @@ -37,10 +39,10 @@ void ThreadMemTrackerMgr::attach_task(const std::string& cancel_msg, const std:: return; } #endif - _temp_task_mem_tracker = + std::shared_ptr tracker = ExecEnv::GetInstance()->task_pool_mem_tracker_registry()->get_task_mem_tracker( task_id); - update_tracker(_temp_task_mem_tracker); + update_tracker(tracker); } else { update_tracker(mem_tracker); } @@ -48,9 +50,7 @@ void ThreadMemTrackerMgr::attach_task(const std::string& cancel_msg, const std:: void ThreadMemTrackerMgr::detach_task() { DCHECK(switch_count == 0) << print_debug_string(); - _task_id = ""; _fragment_instance_id = TUniqueId(); - _consume_err_cb.init(); clear_untracked_mems(); init(); } diff --git a/be/src/runtime/thread_mem_tracker_mgr.h b/be/src/runtime/thread_mem_tracker_mgr.h index 404837a73a0aa8..eb81c2ba78ef52 100644 --- a/be/src/runtime/thread_mem_tracker_mgr.h +++ b/be/src/runtime/thread_mem_tracker_mgr.h @@ -160,14 +160,16 @@ class ThreadMemTrackerMgr { // NOTE: flat_hash_map, int replaces string as key, all to improve the speed of map find, // the expected speed is increased by more than 10 times. phmap::flat_hash_map> _mem_trackers; - int64_t _tracker_id; phmap::flat_hash_map _untracked_mems; + // After the tracker is added to _mem_trackers, if tracker = null is found when using it, + // we can confirm the tracker label that was added through _mem_tracker_labels. + // Because for performance, all map keys are tracker id. phmap::flat_hash_map _mem_tracker_labels; - // Avoid memory allocation in functions and fall into an infinite loop + int64_t _tracker_id; + // Avoid memory allocation in functions. int64_t _temp_tracker_id; ConsumeErrCallBackInfo _temp_consume_err_cb; - std::shared_ptr _temp_task_mem_tracker; std::string _task_id; TUniqueId _fragment_instance_id; @@ -175,6 +177,8 @@ class ThreadMemTrackerMgr { }; inline void ThreadMemTrackerMgr::init() { + _task_id = ""; + _consume_err_cb.init(); _tracker_id = 0; _mem_trackers.clear(); _mem_trackers[0] = MemTracker::get_process_tracker(); @@ -186,10 +190,10 @@ inline void ThreadMemTrackerMgr::init() { inline void ThreadMemTrackerMgr::init_bthread() { init(); - _mem_trackers[1] = MemTracker::get_brpc_server_tracker(); - _untracked_mems[1] = 0; - _mem_tracker_labels[1] = MemTracker::get_brpc_server_tracker()->label(); - _tracker_id = 1; + // `is_attach_task=true` when using `SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER`. + // This means that trackers cached in `_mem_trackers` are expected to be emptied later, + // preventing trackers from being released in time. + _task_id = "brpc"; } inline void ThreadMemTrackerMgr::clear_untracked_mems() { @@ -279,7 +283,7 @@ inline void ThreadMemTrackerMgr::add_tracker(const std::shared_ptr& _mem_trackers[mem_tracker->id()] = mem_tracker; DCHECK(_mem_trackers[mem_tracker->id()]) << print_debug_string(); _untracked_mems[mem_tracker->id()] = 0; - _mem_tracker_labels[_temp_tracker_id] = mem_tracker->label(); + _mem_tracker_labels[mem_tracker->id()] = mem_tracker->label(); } inline std::shared_ptr ThreadMemTrackerMgr::mem_tracker() { diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp index 1bcdfdb9d34983..5a121a177c4bea 100644 --- a/be/src/service/doris_main.cpp +++ b/be/src/service/doris_main.cpp @@ -334,7 +334,7 @@ int main(int argc, char** argv) { return -1; } if (doris::config::track_new_delete) { - init_hook(); + doris::TcmallocHook::init_hook(); } #endif @@ -484,6 +484,7 @@ int main(int argc, char** argv) { doris::MemInfo::refresh_current_mem(); #endif // TODO(zxy) 10s is too long to clear the expired task mem tracker. + // A query mem tracker is about 57 bytes, assuming 10000 qps, which wastes about 55M of memory. // It should be actively triggered at the end of query/load. doris::ExecEnv::GetInstance()->task_pool_mem_tracker_registry()->logout_task_mem_tracker(); sleep(10); diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index d9e575621034d2..ca9a2f9ad87d96 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -152,7 +152,6 @@ void PInternalServiceImpl::tablet_writer_add_batch(google::protobuf::RpcControll const PTabletWriterAddBatchRequest* request, PTabletWriterAddBatchResult* response, google::protobuf::Closure* done) { - SCOPED_SWITCH_BTHREAD(); VLOG_RPC << "tablet writer add batch, id=" << request->id() << ", index_id=" << request->index_id() << ", sender_id=" << request->sender_id() << ", current_queued_size=" << _tablet_worker_pool.get_queue_size(); @@ -161,6 +160,7 @@ void PInternalServiceImpl::tablet_writer_add_batch(google::protobuf::RpcControll // exhausted, so we put this to a local thread pool to process int64_t submit_task_time_ns = MonotonicNanos(); _tablet_worker_pool.offer([cntl_base, request, response, done, submit_task_time_ns, this]() { + SCOPED_SWITCH_BTHREAD(); int64_t wait_execution_time_ns = MonotonicNanos() - submit_task_time_ns; brpc::ClosureGuard closure_guard(done); int64_t execution_time_ns = 0; diff --git a/be/src/vec/exec/vanalytic_eval_node.cpp b/be/src/vec/exec/vanalytic_eval_node.cpp index b4fe98ec869474..1ee0f35ab2c468 100644 --- a/be/src/vec/exec/vanalytic_eval_node.cpp +++ b/be/src/vec/exec/vanalytic_eval_node.cpp @@ -149,7 +149,7 @@ Status VAnalyticEvalNode::prepare(RuntimeState* state) { RETURN_IF_ERROR(ExecNode::prepare(state)); SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker()); DCHECK(child(0)->row_desc().is_prefix_of(row_desc())); - _mem_pool.reset(new MemPool(mem_tracker().get())); + _mem_pool.reset(new MemPool(mem_tracker())); _evaluation_timer = ADD_TIMER(runtime_profile(), "EvaluationTime"); SCOPED_TIMER(_evaluation_timer); diff --git a/be/src/vec/exec/vblocking_join_node.cpp b/be/src/vec/exec/vblocking_join_node.cpp index 05140888aee7d1..3846fb595a5ad5 100644 --- a/be/src/vec/exec/vblocking_join_node.cpp +++ b/be/src/vec/exec/vblocking_join_node.cpp @@ -43,7 +43,7 @@ Status VBlockingJoinNode::prepare(RuntimeState* state) { RETURN_IF_ERROR(ExecNode::prepare(state)); SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker()); - _build_pool.reset(new MemPool(mem_tracker().get())); + _build_pool.reset(new MemPool(mem_tracker())); _build_timer = ADD_TIMER(runtime_profile(), "BuildTime"); _left_child_timer = ADD_TIMER(runtime_profile(), "LeftChildTime"); _build_row_counter = ADD_COUNTER(runtime_profile(), "BuildRows", TUnit::UNIT); diff --git a/be/src/vec/exec/ves_http_scan_node.cpp b/be/src/vec/exec/ves_http_scan_node.cpp index a70acadb89bb2a..d215e7be05f6c8 100644 --- a/be/src/vec/exec/ves_http_scan_node.cpp +++ b/be/src/vec/exec/ves_http_scan_node.cpp @@ -106,7 +106,7 @@ Status VEsHttpScanNode::scanner_scan(std::unique_ptr scanner) { bool scanner_eof = false; const int batch_size = _runtime_state->batch_size(); - std::unique_ptr tuple_pool(new MemPool(mem_tracker().get())); + std::unique_ptr tuple_pool(new MemPool(mem_tracker())); size_t slot_num = _tuple_desc->slots().size(); while (!scanner_eof) { diff --git a/be/src/vec/exprs/vexpr_context.cpp b/be/src/vec/exprs/vexpr_context.cpp index 6615e0ff8c56ae..e1f3b2a4d79a40 100644 --- a/be/src/vec/exprs/vexpr_context.cpp +++ b/be/src/vec/exprs/vexpr_context.cpp @@ -42,7 +42,7 @@ doris::Status VExprContext::prepare(doris::RuntimeState* state, _prepared = true; _mem_tracker = tracker; SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); - _pool.reset(new MemPool(_mem_tracker.get())); + _pool.reset(new MemPool(_mem_tracker)); return _root->prepare(state, row_desc, this); } From 70a552dbb44ac334d1802be0c5e60c597a5474fb Mon Sep 17 00:00:00 2001 From: Xinyi Zou Date: Wed, 4 May 2022 11:42:54 +0800 Subject: [PATCH 2/6] fix comment --- be/src/exec/analytic_eval_node.cpp | 6 +- be/src/exec/base_scanner.cpp | 2 +- be/src/exec/blocking_join_node.cpp | 2 +- be/src/exec/hash_join_node.cpp | 2 +- be/src/exec/olap_scanner.cpp | 2 +- be/src/exec/partitioned_aggregation_node.cc | 8 +-- be/src/exec/set_operation_node.cpp | 2 +- be/src/exec/tablet_sink.cpp | 5 +- be/src/exec/topn_node.cpp | 2 +- be/src/exprs/expr_context.cpp | 2 +- be/src/olap/memtable.cpp | 4 +- be/src/olap/task/engine_alter_tablet_task.cpp | 2 +- be/src/olap/task/engine_batch_load_task.cpp | 2 +- be/src/runtime/fold_constant_executor.cpp | 2 +- be/src/runtime/load_channel_mgr.h | 3 +- be/src/runtime/mem_pool.cpp | 19 +++--- be/src/runtime/mem_pool.h | 8 ++- be/src/runtime/mem_tracker.cpp | 1 + be/src/runtime/tcmalloc_hook.h | 67 +++++++++---------- be/src/service/doris_main.cpp | 2 +- be/src/service/internal_service.cpp | 4 ++ be/src/vec/exec/vanalytic_eval_node.cpp | 2 +- be/src/vec/exec/vblocking_join_node.cpp | 2 +- be/src/vec/exec/ves_http_scan_node.cpp | 2 +- be/src/vec/exprs/vexpr_context.cpp | 2 +- 25 files changed, 79 insertions(+), 76 deletions(-) diff --git a/be/src/exec/analytic_eval_node.cpp b/be/src/exec/analytic_eval_node.cpp index 5e76d89aa6dfe1..43e193d0ab44b9 100644 --- a/be/src/exec/analytic_eval_node.cpp +++ b/be/src/exec/analytic_eval_node.cpp @@ -146,9 +146,9 @@ Status AnalyticEvalNode::prepare(RuntimeState* state) { SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker()); DCHECK(child(0)->row_desc().is_prefix_of(row_desc())); _child_tuple_desc = child(0)->row_desc().tuple_descriptors()[0]; - _curr_tuple_pool.reset(new MemPool(mem_tracker())); - _prev_tuple_pool.reset(new MemPool(mem_tracker())); - _mem_pool.reset(new MemPool(mem_tracker())); + _curr_tuple_pool.reset(new MemPool(mem_tracker().get())); + _prev_tuple_pool.reset(new MemPool(mem_tracker().get())); + _mem_pool.reset(new MemPool(mem_tracker().get())); _evaluation_timer = ADD_TIMER(runtime_profile(), "EvaluationTime"); DCHECK_EQ(_result_tuple_desc->slots().size(), _evaluators.size()); diff --git a/be/src/exec/base_scanner.cpp b/be/src/exec/base_scanner.cpp index 8d2892c305a131..ca5b08831fdfb2 100644 --- a/be/src/exec/base_scanner.cpp +++ b/be/src/exec/base_scanner.cpp @@ -47,7 +47,7 @@ BaseScanner::BaseScanner(RuntimeState* state, RuntimeProfile* profile, ? "BaseScanner:" + std::to_string(state->load_job_id()) : "BaseScanner:Select")), #endif - _mem_pool(std::make_unique(_mem_tracker)), + _mem_pool(std::make_unique(_mem_tracker.get())), _dest_tuple_desc(nullptr), _pre_filter_texprs(pre_filter_texprs), _strict_mode(false), diff --git a/be/src/exec/blocking_join_node.cpp b/be/src/exec/blocking_join_node.cpp index cf3544d894e243..d88ce56ecf6914 100644 --- a/be/src/exec/blocking_join_node.cpp +++ b/be/src/exec/blocking_join_node.cpp @@ -52,7 +52,7 @@ Status BlockingJoinNode::prepare(RuntimeState* state) { RETURN_IF_ERROR(ExecNode::prepare(state)); SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker()); - _build_pool.reset(new MemPool(mem_tracker())); + _build_pool.reset(new MemPool(mem_tracker().get())); _build_timer = ADD_TIMER(runtime_profile(), "BuildTime"); _left_child_timer = ADD_TIMER(runtime_profile(), "LeftChildTime"); _build_row_counter = ADD_COUNTER(runtime_profile(), "BuildRows", TUnit::UNIT); diff --git a/be/src/exec/hash_join_node.cpp b/be/src/exec/hash_join_node.cpp index d2e15543c4265b..1ee401fe51133e 100644 --- a/be/src/exec/hash_join_node.cpp +++ b/be/src/exec/hash_join_node.cpp @@ -100,7 +100,7 @@ Status HashJoinNode::prepare(RuntimeState* state) { RETURN_IF_ERROR(ExecNode::prepare(state)); SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker()); - _build_pool.reset(new MemPool(mem_tracker())); + _build_pool.reset(new MemPool(mem_tracker().get())); _build_timer = ADD_TIMER(runtime_profile(), "BuildTime"); _push_down_timer = ADD_TIMER(runtime_profile(), "PushDownTime"); _push_compute_timer = ADD_TIMER(runtime_profile(), "PushDownComputeTime"); diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp index 0d412a723b9d42..523474a50316ef 100644 --- a/be/src/exec/olap_scanner.cpp +++ b/be/src/exec/olap_scanner.cpp @@ -285,7 +285,7 @@ Status OlapScanner::get_batch(RuntimeState* state, RowBatch* batch, bool* eof) { bzero(tuple_buf, state->batch_size() * _tuple_desc->byte_size()); Tuple* tuple = reinterpret_cast(tuple_buf); - std::unique_ptr mem_pool(new MemPool(_mem_tracker)); + std::unique_ptr mem_pool(new MemPool(_mem_tracker.get())); int64_t raw_rows_threshold = raw_rows_read() + config::doris_scanner_row_num; int64_t raw_bytes_threshold = config::doris_scanner_row_bytes; { diff --git a/be/src/exec/partitioned_aggregation_node.cc b/be/src/exec/partitioned_aggregation_node.cc index 805bfa7e9d3483..a99b0da23b5338 100644 --- a/be/src/exec/partitioned_aggregation_node.cc +++ b/be/src/exec/partitioned_aggregation_node.cc @@ -189,8 +189,8 @@ Status PartitionedAggregationNode::prepare(RuntimeState* state) { SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker()); state_ = state; - mem_pool_.reset(new MemPool(mem_tracker())); - agg_fn_pool_.reset(new MemPool(expr_mem_tracker())); + mem_pool_.reset(new MemPool(mem_tracker().get())); + agg_fn_pool_.reset(new MemPool(expr_mem_tracker().get())); ht_resize_timer_ = ADD_TIMER(runtime_profile(), "HTResizeTime"); get_results_timer_ = ADD_TIMER(runtime_profile(), "GetResultsTime"); @@ -233,7 +233,7 @@ Status PartitionedAggregationNode::prepare(RuntimeState* state) { RETURN_IF_ERROR(NewAggFnEvaluator::Create(agg_fns_, state, _pool, agg_fn_pool_.get(), &agg_fn_evals_, expr_mem_tracker(), row_desc)); - expr_results_pool_.reset(new MemPool(expr_mem_tracker())); + expr_results_pool_.reset(new MemPool(expr_mem_tracker().get())); if (!grouping_exprs_.empty()) { RowDescriptor build_row_desc(intermediate_tuple_desc_, false); RETURN_IF_ERROR(PartitionedHashTableCtx::Create( @@ -728,7 +728,7 @@ PartitionedAggregationNode::Partition::~Partition() { } Status PartitionedAggregationNode::Partition::InitStreams() { - agg_fn_pool.reset(new MemPool(parent->expr_mem_tracker())); + agg_fn_pool.reset(new MemPool(parent->expr_mem_tracker().get())); DCHECK_EQ(agg_fn_evals.size(), 0); NewAggFnEvaluator::ShallowClone(parent->partition_pool_.get(), agg_fn_pool.get(), parent->agg_fn_evals_, &agg_fn_evals); diff --git a/be/src/exec/set_operation_node.cpp b/be/src/exec/set_operation_node.cpp index d7a0c4ed7df280..6d0ee3676d1e6a 100644 --- a/be/src/exec/set_operation_node.cpp +++ b/be/src/exec/set_operation_node.cpp @@ -45,7 +45,7 @@ Status SetOperationNode::prepare(RuntimeState* state) { SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker()); _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id); DCHECK(_tuple_desc != nullptr); - _build_pool.reset(new MemPool(mem_tracker())); + _build_pool.reset(new MemPool(mem_tracker().get())); _build_timer = ADD_TIMER(runtime_profile(), "BuildTime"); _probe_timer = ADD_TIMER(runtime_profile(), "ProbeTime"); for (size_t i = 0; i < _child_expr_lists.size(); ++i) { diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp index 15f2bf86f109af..4f3cfaaef6489e 100644 --- a/be/src/exec/tablet_sink.cpp +++ b/be/src/exec/tablet_sink.cpp @@ -52,8 +52,9 @@ NodeChannel::NodeChannel(OlapTableSink* parent, IndexChannel* index_channel, int if (_parent->_transfer_data_by_brpc_attachment) { _tuple_data_buffer_ptr = &_tuple_data_buffer; } - _node_channel_tracker = - MemTracker::create_tracker(-1, "NodeChannel:" + std::to_string(_index_channel->_index_id)); + _node_channel_tracker = MemTracker::create_tracker( + -1, fmt::format("NodeChannel:indexID={}:threadId={}", + std::to_string(_index_channel->_index_id), tls_ctx()->thread_id_str())); } NodeChannel::~NodeChannel() noexcept { diff --git a/be/src/exec/topn_node.cpp b/be/src/exec/topn_node.cpp index 545acaf55b53fb..73ad4410a1f9dc 100644 --- a/be/src/exec/topn_node.cpp +++ b/be/src/exec/topn_node.cpp @@ -63,7 +63,7 @@ Status TopNNode::prepare(RuntimeState* state) { SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(ExecNode::prepare(state)); SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker()); - _tuple_pool.reset(new MemPool(mem_tracker())); + _tuple_pool.reset(new MemPool(mem_tracker().get())); RETURN_IF_ERROR(_sort_exec_exprs.prepare(state, child(0)->row_desc(), _row_descriptor, expr_mem_tracker())); // AddExprCtxsToFree(_sort_exec_exprs); diff --git a/be/src/exprs/expr_context.cpp b/be/src/exprs/expr_context.cpp index 6259def394c108..518a69d77439be 100644 --- a/be/src/exprs/expr_context.cpp +++ b/be/src/exprs/expr_context.cpp @@ -55,7 +55,7 @@ Status ExprContext::prepare(RuntimeState* state, const RowDescriptor& row_desc, SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); DCHECK(_pool.get() == nullptr); _prepared = true; - _pool.reset(new MemPool(_mem_tracker)); + _pool.reset(new MemPool(_mem_tracker.get())); return _root->prepare(state, row_desc, this); } diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp index 7a1b1bb2232bb4..7b9521e775ca7a 100644 --- a/be/src/olap/memtable.cpp +++ b/be/src/olap/memtable.cpp @@ -40,8 +40,8 @@ MemTable::MemTable(int64_t tablet_id, Schema* schema, const TabletSchema* tablet _slot_descs(slot_descs), _keys_type(keys_type), _mem_tracker(MemTracker::create_tracker(-1, "MemTable", parent_tracker)), - _buffer_mem_pool(new MemPool(_mem_tracker)), - _table_mem_pool(new MemPool(_mem_tracker)), + _buffer_mem_pool(new MemPool(_mem_tracker.get())), + _table_mem_pool(new MemPool(_mem_tracker.get())), _schema_size(_schema->schema_size()), _rowset_writer(rowset_writer), _is_first_insertion(true), diff --git a/be/src/olap/task/engine_alter_tablet_task.cpp b/be/src/olap/task/engine_alter_tablet_task.cpp index 50ffcb87b0a0a9..7686a632ae46ad 100644 --- a/be/src/olap/task/engine_alter_tablet_task.cpp +++ b/be/src/olap/task/engine_alter_tablet_task.cpp @@ -29,7 +29,7 @@ EngineAlterTabletTask::EngineAlterTabletTask(const TAlterTabletReqV2& request) : _alter_tablet_req(request) { _mem_tracker = MemTracker::create_tracker( config::memory_limitation_per_thread_for_schema_change_bytes, - fmt::format("EngineAlterTabletTask:baseTabletId={}-newTabletId{}", + fmt::format("EngineAlterTabletTask:baseTabletId={}:newTabletId={}", std::to_string(_alter_tablet_req.base_tablet_id), std::to_string(_alter_tablet_req.new_tablet_id)), StorageEngine::instance()->schema_change_mem_tracker(), MemTrackerLevel::TASK); diff --git a/be/src/olap/task/engine_batch_load_task.cpp b/be/src/olap/task/engine_batch_load_task.cpp index c24c4c3108dff6..3f6b85b928613c 100644 --- a/be/src/olap/task/engine_batch_load_task.cpp +++ b/be/src/olap/task/engine_batch_load_task.cpp @@ -55,7 +55,7 @@ EngineBatchLoadTask::EngineBatchLoadTask(TPushReq& push_req, std::vectorbatch_load_mem_tracker(), MemTrackerLevel::TASK); } diff --git a/be/src/runtime/fold_constant_executor.cpp b/be/src/runtime/fold_constant_executor.cpp index 14d9b85b8ae165..4707622c72e9fa 100644 --- a/be/src/runtime/fold_constant_executor.cpp +++ b/be/src/runtime/fold_constant_executor.cpp @@ -181,7 +181,7 @@ Status FoldConstantExecutor::_init(const TQueryGlobals& query_globals) { _runtime_profile->set_name("FoldConstantExpr"); _mem_tracker = MemTracker::create_tracker(-1, "FoldConstantExpr", _runtime_state->instance_mem_tracker()); - _mem_pool.reset(new MemPool(_mem_tracker)); + _mem_pool.reset(new MemPool(_mem_tracker.get())); return Status::OK(); } diff --git a/be/src/runtime/load_channel_mgr.h b/be/src/runtime/load_channel_mgr.h index 49074fa38039a9..121a9ae2740883 100644 --- a/be/src/runtime/load_channel_mgr.h +++ b/be/src/runtime/load_channel_mgr.h @@ -58,6 +58,8 @@ class LoadChannelMgr { // cancel all tablet stream for 'load_id' load Status cancel(const PTabletWriterCancelRequest& request); + std::shared_ptr mem_tracker() { return _mem_tracker; } + private: static LoadChannel* _create_load_channel(const UniqueId& load_id, int64_t mem_limit, int64_t timeout_s, bool is_high_priority, @@ -116,7 +118,6 @@ Status LoadChannelMgr::_get_load_channel(std::shared_ptr& channel, template Status LoadChannelMgr::add_batch(const TabletWriterAddRequest& request, TabletWriterAddResult* response) { - SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); UniqueId load_id(request.id()); // 1. get load channel std::shared_ptr channel; diff --git a/be/src/runtime/mem_pool.cpp b/be/src/runtime/mem_pool.cpp index f13fed484e65a1..ae70dc4df68c82 100644 --- a/be/src/runtime/mem_pool.cpp +++ b/be/src/runtime/mem_pool.cpp @@ -41,7 +41,7 @@ const int MemPool::MAX_CHUNK_SIZE; const int MemPool::DEFAULT_ALIGNMENT; uint32_t MemPool::k_zero_length_region_ alignas(std::max_align_t) = MEM_POOL_POISON; -MemPool::MemPool(const std::shared_ptr& mem_tracker) +MemPool::MemPool(MemTracker* mem_tracker) : current_chunk_idx_(-1), next_chunk_size_(INITIAL_CHUNK_SIZE), total_allocated_bytes_(0), @@ -55,7 +55,8 @@ MemPool::MemPool(const std::string& label) total_allocated_bytes_(0), total_reserved_bytes_(0), peak_allocated_bytes_(0) { - _mem_tracker = MemTracker::create_tracker(-1, label + ":MemPool"); + _mem_tracker_own = MemTracker::create_tracker(-1, label + ":MemPool"); + _mem_tracker = _mem_tracker_own.get(); } MemPool::MemPool() @@ -64,7 +65,7 @@ MemPool::MemPool() total_allocated_bytes_(0), total_reserved_bytes_(0), peak_allocated_bytes_(0), - _mem_tracker(tls_ctx()->_thread_mem_tracker_mgr->mem_tracker()) {} + _mem_tracker(tls_ctx()->_thread_mem_tracker_mgr->mem_tracker().get()) {} MemPool::ChunkInfo::ChunkInfo(const Chunk& chunk_) : chunk(chunk_), allocated_bytes(0) { DorisMetrics::instance()->memory_pool_bytes_total->increment(chunk.size); @@ -74,7 +75,7 @@ MemPool::~MemPool() { int64_t total_bytes_released = 0; for (auto& chunk : chunks_) { total_bytes_released += chunk.chunk.size; - ChunkAllocator::instance()->free(chunk.chunk, _mem_tracker.get()); + ChunkAllocator::instance()->free(chunk.chunk, _mem_tracker); } DorisMetrics::instance()->memory_pool_bytes_total->increment(-total_bytes_released); } @@ -93,7 +94,7 @@ void MemPool::free_all() { int64_t total_bytes_released = 0; for (auto& chunk : chunks_) { total_bytes_released += chunk.chunk.size; - ChunkAllocator::instance()->free(chunk.chunk, _mem_tracker.get()); + ChunkAllocator::instance()->free(chunk.chunk, _mem_tracker); } chunks_.clear(); next_chunk_size_ = INITIAL_CHUNK_SIZE; @@ -145,8 +146,8 @@ Status MemPool::find_chunk(size_t min_size, bool check_limits) { // Allocate a new chunk. Return early if allocate fails. Chunk chunk; - RETURN_IF_ERROR(ChunkAllocator::instance()->allocate(chunk_size, &chunk, _mem_tracker.get(), - check_limits)); + RETURN_IF_ERROR( + ChunkAllocator::instance()->allocate(chunk_size, &chunk, _mem_tracker, check_limits)); ASAN_POISON_MEMORY_REGION(chunk.data, chunk_size); // Put it before the first free chunk. If no free chunks, it goes at the end. if (first_free_idx == static_cast(chunks_.size())) { @@ -191,7 +192,7 @@ void MemPool::acquire_data(MemPool* src, bool keep_current) { // Skip unnecessary atomic ops if the mem_trackers are the same. if (src->_mem_tracker != _mem_tracker) { - src->_mem_tracker->transfer_to(_mem_tracker.get(), total_transferred_bytes); + src->_mem_tracker->transfer_to(_mem_tracker, total_transferred_bytes); } // insert new chunks after current_chunk_idx_ @@ -221,7 +222,7 @@ void MemPool::acquire_data(MemPool* src, bool keep_current) { void MemPool::exchange_data(MemPool* other) { int64_t delta_size = other->total_reserved_bytes_ - total_reserved_bytes_; if (other->_mem_tracker != _mem_tracker) { - other->_mem_tracker->transfer_to(_mem_tracker.get(), delta_size); + other->_mem_tracker->transfer_to(_mem_tracker, delta_size); } std::swap(current_chunk_idx_, other->current_chunk_idx_); diff --git a/be/src/runtime/mem_pool.h b/be/src/runtime/mem_pool.h index 506f01376c2bbc..fc17854f488cf5 100644 --- a/be/src/runtime/mem_pool.h +++ b/be/src/runtime/mem_pool.h @@ -93,7 +93,7 @@ class MemTracker; class MemPool { public: // 'tracker' tracks the amount of memory allocated by this pool. Must not be nullptr. - MemPool(const std::shared_ptr& mem_tracker); + MemPool(MemTracker* mem_tracker); MemPool(const std::string& label); MemPool(); @@ -161,7 +161,7 @@ class MemPool { int64_t total_reserved_bytes() const { return total_reserved_bytes_; } int64_t peak_allocated_bytes() const { return peak_allocated_bytes_; } - std::shared_ptr mem_tracker() { return _mem_tracker; } + MemTracker* mem_tracker() { return _mem_tracker; } static constexpr int DEFAULT_ALIGNMENT = 8; @@ -308,7 +308,9 @@ class MemPool { /// The current and peak memory footprint of this pool. This is different from /// total allocated_bytes_ since it includes bytes in chunks that are not used. - std::shared_ptr _mem_tracker; + MemTracker* _mem_tracker; + // TODO(zxy) temp variable, In the future, mem trackers should all use raw pointers. + std::shared_ptr _mem_tracker_own; }; // Stamp out templated implementations here so they're included in IR module diff --git a/be/src/runtime/mem_tracker.cpp b/be/src/runtime/mem_tracker.cpp index eead60ddc818f9..bc2ae3efc1ab13 100644 --- a/be/src/runtime/mem_tracker.cpp +++ b/be/src/runtime/mem_tracker.cpp @@ -326,6 +326,7 @@ bool MemTracker::gc_memory(int64_t max_consumption) { if (pre_gc_consumption < max_consumption) return false; int64_t curr_consumption = pre_gc_consumption; + // Free some extra memory to avoid frequent GC, 4M is an empirical value, maybe it will be tested later. const int64_t EXTRA_BYTES_TO_FREE = 4L * 1024L * 1024L * 1024L; // Try to free up some memory for (int i = 0; i < _gc_functions.size(); ++i) { diff --git a/be/src/runtime/tcmalloc_hook.h b/be/src/runtime/tcmalloc_hook.h index 6750ce8e746869..179b11f13ff09d 100644 --- a/be/src/runtime/tcmalloc_hook.h +++ b/be/src/runtime/tcmalloc_hook.h @@ -23,40 +23,33 @@ #include "runtime/thread_context.h" -namespace doris { - -class TcmallocHook { -public: - // Notice: modify the command in New/Delete Hook should be careful enough!, - // and should be as simple as possible, otherwise it may cause weird errors. E.g: - // 1. The first New Hook call of the process may be before some variables of - // the process are initialized. - // 2. Allocating memory in the Hook command causes the Hook to be entered again, - // infinite recursion. - // 3. TCMalloc hook will be triggered during the process of initializing/Destructor - // memtracker shared_ptr, Using the object pointed to by this memtracker shared_ptr - // in TCMalloc hook may cause crash. - // 4. Modifying additional thread local variables in ThreadContext construction and - // destructor to control the behavior of consume can lead to unexpected behavior, - // like this: if (LIKELY(doris::start_thread_mem_tracker)) { - static void new_hook(const void* ptr, size_t size) { - doris::tls_ctx()->consume_mem(tc_nallocx(size, 0)); - } - - static void delete_hook(const void* ptr) { - doris::tls_ctx()->release_mem(tc_malloc_size(const_cast(ptr))); - } - - static void init_hook() { - MallocHook::AddNewHook(&new_hook); - MallocHook::AddDeleteHook(&delete_hook); - } - - static void destroy_hook() { - MallocHook::RemoveNewHook(&new_hook); - MallocHook::RemoveDeleteHook(&delete_hook); - } - - static void destroy_hook1() { MallocHook::RemoveDeleteHook(&delete_hook); } -}; -} // namespace doris +// Notice: modify the command in New/Delete Hook should be careful enough!, +// and should be as simple as possible, otherwise it may cause weird errors. E.g: +// 1. The first New Hook call of the process may be before some variables of +// the process are initialized. +// 2. Allocating memory in the Hook command causes the Hook to be entered again, +// infinite recursion. +// 3. TCMalloc hook will be triggered during the process of initializing/Destructor +// memtracker shared_ptr, Using the object pointed to by this memtracker shared_ptr +// in TCMalloc hook may cause crash. +// 4. Modifying additional thread local variables in ThreadContext construction and +// destructor to control the behavior of consume can lead to unexpected behavior, +// like this: if (LIKELY(doris::start_thread_mem_tracker)) { +static void new_hook(const void* ptr, size_t size) { + doris::tls_ctx()->consume_mem(tc_nallocx(size, 0)); +} + +static void delete_hook(const void* ptr) { + doris::tls_ctx()->release_mem(tc_malloc_size(const_cast(ptr))); +} + +static void init_hook() { + MallocHook::AddNewHook(&new_hook); + MallocHook::AddDeleteHook(&delete_hook); +} + +// For debug. +// static void destroy_hook() { +// MallocHook::RemoveNewHook(&new_hook); +// MallocHook::RemoveDeleteHook(&delete_hook); +// } diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp index 5a121a177c4bea..259276809cfbbd 100644 --- a/be/src/service/doris_main.cpp +++ b/be/src/service/doris_main.cpp @@ -334,7 +334,7 @@ int main(int argc, char** argv) { return -1; } if (doris::config::track_new_delete) { - doris::TcmallocHook::init_hook(); + init_hook(); } #endif diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index ca9a2f9ad87d96..ab579ec2b004d2 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -132,6 +132,8 @@ void PInternalServiceImpl::tablet_writer_add_block(google::protobuf::RpcControll int64_t execution_time_ns = 0; { SCOPED_RAW_TIMER(&execution_time_ns); + SCOPED_ATTACH_TASK_THREAD(ThreadContext::TaskType::LOAD, + _exec_env->load_channel_mgr()->mem_tracker()); brpc::Controller* cntl = static_cast(cntl_base); attachment_transfer_request_block(request, cntl); auto st = _exec_env->load_channel_mgr()->add_batch(*request, response); @@ -166,6 +168,8 @@ void PInternalServiceImpl::tablet_writer_add_batch(google::protobuf::RpcControll int64_t execution_time_ns = 0; { SCOPED_RAW_TIMER(&execution_time_ns); + SCOPED_ATTACH_TASK_THREAD(ThreadContext::TaskType::LOAD, + _exec_env->load_channel_mgr()->mem_tracker()); brpc::Controller* cntl = static_cast(cntl_base); attachment_transfer_request_row_batch(request, cntl); auto st = _exec_env->load_channel_mgr()->add_batch(*request, response); diff --git a/be/src/vec/exec/vanalytic_eval_node.cpp b/be/src/vec/exec/vanalytic_eval_node.cpp index 1ee0f35ab2c468..b4fe98ec869474 100644 --- a/be/src/vec/exec/vanalytic_eval_node.cpp +++ b/be/src/vec/exec/vanalytic_eval_node.cpp @@ -149,7 +149,7 @@ Status VAnalyticEvalNode::prepare(RuntimeState* state) { RETURN_IF_ERROR(ExecNode::prepare(state)); SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker()); DCHECK(child(0)->row_desc().is_prefix_of(row_desc())); - _mem_pool.reset(new MemPool(mem_tracker())); + _mem_pool.reset(new MemPool(mem_tracker().get())); _evaluation_timer = ADD_TIMER(runtime_profile(), "EvaluationTime"); SCOPED_TIMER(_evaluation_timer); diff --git a/be/src/vec/exec/vblocking_join_node.cpp b/be/src/vec/exec/vblocking_join_node.cpp index 3846fb595a5ad5..05140888aee7d1 100644 --- a/be/src/vec/exec/vblocking_join_node.cpp +++ b/be/src/vec/exec/vblocking_join_node.cpp @@ -43,7 +43,7 @@ Status VBlockingJoinNode::prepare(RuntimeState* state) { RETURN_IF_ERROR(ExecNode::prepare(state)); SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker()); - _build_pool.reset(new MemPool(mem_tracker())); + _build_pool.reset(new MemPool(mem_tracker().get())); _build_timer = ADD_TIMER(runtime_profile(), "BuildTime"); _left_child_timer = ADD_TIMER(runtime_profile(), "LeftChildTime"); _build_row_counter = ADD_COUNTER(runtime_profile(), "BuildRows", TUnit::UNIT); diff --git a/be/src/vec/exec/ves_http_scan_node.cpp b/be/src/vec/exec/ves_http_scan_node.cpp index d215e7be05f6c8..a70acadb89bb2a 100644 --- a/be/src/vec/exec/ves_http_scan_node.cpp +++ b/be/src/vec/exec/ves_http_scan_node.cpp @@ -106,7 +106,7 @@ Status VEsHttpScanNode::scanner_scan(std::unique_ptr scanner) { bool scanner_eof = false; const int batch_size = _runtime_state->batch_size(); - std::unique_ptr tuple_pool(new MemPool(mem_tracker())); + std::unique_ptr tuple_pool(new MemPool(mem_tracker().get())); size_t slot_num = _tuple_desc->slots().size(); while (!scanner_eof) { diff --git a/be/src/vec/exprs/vexpr_context.cpp b/be/src/vec/exprs/vexpr_context.cpp index e1f3b2a4d79a40..6615e0ff8c56ae 100644 --- a/be/src/vec/exprs/vexpr_context.cpp +++ b/be/src/vec/exprs/vexpr_context.cpp @@ -42,7 +42,7 @@ doris::Status VExprContext::prepare(doris::RuntimeState* state, _prepared = true; _mem_tracker = tracker; SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); - _pool.reset(new MemPool(_mem_tracker)); + _pool.reset(new MemPool(_mem_tracker.get())); return _root->prepare(state, row_desc, this); } From 48727ce4cb02666e364f731f88c292f2d776d4e3 Mon Sep 17 00:00:00 2001 From: Xinyi Zou Date: Thu, 5 May 2022 04:52:59 +0800 Subject: [PATCH 3/6] fix local --- be/src/runtime/thread_context.cpp | 9 ++++++--- be/src/runtime/thread_context.h | 12 ++++++++---- be/src/runtime/thread_mem_tracker_mgr.h | 10 ---------- be/src/service/internal_service.cpp | 2 +- 4 files changed, 15 insertions(+), 18 deletions(-) diff --git a/be/src/runtime/thread_context.cpp b/be/src/runtime/thread_context.cpp index c6989045e9c74c..fade4fd51e793e 100644 --- a/be/src/runtime/thread_context.cpp +++ b/be/src/runtime/thread_context.cpp @@ -92,7 +92,8 @@ SwitchThreadMemTracker::SwitchThreadMemTracker( DCHECK(mem_tracker); // The thread tracker must be switched after the attach task, otherwise switching // in the main thread will cause the cached tracker not be cleaned up in time. - DCHECK(in_task == false || tls_ctx()->_thread_mem_tracker_mgr->is_attach_task()); + DCHECK(in_task == false || tls_ctx()->type() != ThreadContext::TaskType::UNKNOWN) + << ",tls ctx type=" << tls_ctx()->type(); if (Existed) { _old_tracker_id = tls_ctx()->_thread_mem_tracker_mgr->update_tracker(mem_tracker); } else { @@ -139,19 +140,21 @@ SwitchBthread::SwitchBthread() { if (tls == nullptr) { // Create thread-local data on demand. tls = new ThreadContext; - tls->_thread_mem_tracker_mgr->init_bthread(); // set the data so that next time bthread_getspecific in the thread returns the data. CHECK_EQ(0, bthread_setspecific(btls_key, tls)); } else { + DCHECK(tls->type() == ThreadContext::TaskType::UNKNOWN); tls->_thread_mem_tracker_mgr->clear_untracked_mems(); - tls->_thread_mem_tracker_mgr->init_bthread(); } + tls->_thread_mem_tracker_mgr->init(); + tls->set_type(ThreadContext::TaskType::BRPC); } SwitchBthread::~SwitchBthread() { DCHECK(tls != nullptr); tls->_thread_mem_tracker_mgr->clear_untracked_mems(); tls->_thread_mem_tracker_mgr->init(); + tls->set_type(ThreadContext::TaskType::UNKNOWN); #ifndef NDEBUG DorisMetrics::instance()->switch_bthread_count->increment(1); #endif diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h index 804b1a766885ea..8dc8f5267e757b 100644 --- a/be/src/runtime/thread_context.h +++ b/be/src/runtime/thread_context.h @@ -105,7 +105,8 @@ class ThreadContext { QUERY = 1, LOAD = 2, COMPACTION = 3, - STORAGE = 4 + STORAGE = 4, + BRPC = 5 // to be added ... }; inline static const std::string TaskTypeStr[] = {"UNKNOWN", "QUERY", "LOAD", "COMPACTION", @@ -122,9 +123,10 @@ class ThreadContext { void attach(const TaskType& type, const std::string& task_id, const TUniqueId& fragment_instance_id, const std::shared_ptr& mem_tracker) { - DCHECK(_type == TaskType::UNKNOWN && _task_id == "") - << ",old tracker label: " << mem_tracker->label() - << ",new tracker label: " << _thread_mem_tracker_mgr->mem_tracker()->label(); + DCHECK((_type == TaskType::UNKNOWN || _type == TaskType::BRPC) && _task_id == "") + << ",new tracker label: " << mem_tracker->label() + << ",old tracker label: " << _thread_mem_tracker_mgr->mem_tracker()->label(); + DCHECK(type != TaskType::UNKNOWN); _type = type; _task_id = task_id; _fragment_instance_id = fragment_instance_id; @@ -139,6 +141,8 @@ class ThreadContext { _thread_mem_tracker_mgr->detach_task(); } + const TaskType& type() const { return _type; } + const void set_type(const TaskType& type) { _type = type; } const std::string& task_id() const { return _task_id; } const std::string& thread_id_str() const { return _thread_id; } const TUniqueId& fragment_instance_id() const { return _fragment_instance_id; } diff --git a/be/src/runtime/thread_mem_tracker_mgr.h b/be/src/runtime/thread_mem_tracker_mgr.h index eb81c2ba78ef52..84033ff4e7ab94 100644 --- a/be/src/runtime/thread_mem_tracker_mgr.h +++ b/be/src/runtime/thread_mem_tracker_mgr.h @@ -70,8 +70,6 @@ class ThreadMemTrackerMgr { // to avoid memory tracking loss. void init(); - void init_bthread(); - void clear_untracked_mems(); // After attach, the current thread TCMalloc Hook starts to consume/release task mem_tracker @@ -188,14 +186,6 @@ inline void ThreadMemTrackerMgr::init() { _mem_tracker_labels[0] = MemTracker::get_process_tracker()->label(); } -inline void ThreadMemTrackerMgr::init_bthread() { - init(); - // `is_attach_task=true` when using `SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER`. - // This means that trackers cached in `_mem_trackers` are expected to be emptied later, - // preventing trackers from being released in time. - _task_id = "brpc"; -} - inline void ThreadMemTrackerMgr::clear_untracked_mems() { for (const auto& untracked_mem : _untracked_mems) { if (untracked_mem.second != 0) { diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index ab579ec2b004d2..b8968ad6f60d0f 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -91,6 +91,7 @@ void PInternalServiceImpl::tablet_writer_open(google::protobuf::RpcController* c PTabletWriterOpenResult* response, google::protobuf::Closure* done) { SCOPED_SWITCH_BTHREAD(); + SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_exec_env->load_channel_mgr()->mem_tracker()); VLOG_RPC << "tablet writer open, id=" << request->id() << ", index_id=" << request->index_id() << ", txn_id=" << request->txn_id(); brpc::ClosureGuard closure_guard(done); @@ -162,7 +163,6 @@ void PInternalServiceImpl::tablet_writer_add_batch(google::protobuf::RpcControll // exhausted, so we put this to a local thread pool to process int64_t submit_task_time_ns = MonotonicNanos(); _tablet_worker_pool.offer([cntl_base, request, response, done, submit_task_time_ns, this]() { - SCOPED_SWITCH_BTHREAD(); int64_t wait_execution_time_ns = MonotonicNanos() - submit_task_time_ns; brpc::ClosureGuard closure_guard(done); int64_t execution_time_ns = 0; From 1530cf5440176f6d395e75b32f07cc3ef13d7f58 Mon Sep 17 00:00:00 2001 From: Xinyi Zou Date: Thu, 5 May 2022 14:34:50 +0800 Subject: [PATCH 4/6] fix comment2 --- be/src/exec/tablet_sink.h | 2 +- be/src/olap/task/engine_batch_load_task.cpp | 2 +- be/src/runtime/tcmalloc_hook.h | 8 ++++---- be/src/service/internal_service.cpp | 1 - 4 files changed, 6 insertions(+), 7 deletions(-) diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h index 0bd98e2c12c8f5..bc9bef4da44a79 100644 --- a/be/src/exec/tablet_sink.h +++ b/be/src/exec/tablet_sink.h @@ -325,7 +325,7 @@ class IndexChannel { public: IndexChannel(OlapTableSink* parent, int64_t index_id, bool is_vec) : _parent(parent), _index_id(index_id), _is_vectorized(is_vec) { - _index_channel_tracker = MemTracker::create_tracker(-1, "IndexChannel:" + std::to_string(_index_id)); + _index_channel_tracker = MemTracker::create_tracker(-1, "IndexChannel:indexID=" + std::to_string(_index_id)); } ~IndexChannel() = default; diff --git a/be/src/olap/task/engine_batch_load_task.cpp b/be/src/olap/task/engine_batch_load_task.cpp index 3f6b85b928613c..d6e9af62eb20c2 100644 --- a/be/src/olap/task/engine_batch_load_task.cpp +++ b/be/src/olap/task/engine_batch_load_task.cpp @@ -55,7 +55,7 @@ EngineBatchLoadTask::EngineBatchLoadTask(TPushReq& push_req, std::vectorbatch_load_mem_tracker(), MemTrackerLevel::TASK); } diff --git a/be/src/runtime/tcmalloc_hook.h b/be/src/runtime/tcmalloc_hook.h index 179b11f13ff09d..600668aea90671 100644 --- a/be/src/runtime/tcmalloc_hook.h +++ b/be/src/runtime/tcmalloc_hook.h @@ -35,20 +35,20 @@ // 4. Modifying additional thread local variables in ThreadContext construction and // destructor to control the behavior of consume can lead to unexpected behavior, // like this: if (LIKELY(doris::start_thread_mem_tracker)) { -static void new_hook(const void* ptr, size_t size) { +void new_hook(const void* ptr, size_t size) { doris::tls_ctx()->consume_mem(tc_nallocx(size, 0)); } -static void delete_hook(const void* ptr) { +void delete_hook(const void* ptr) { doris::tls_ctx()->release_mem(tc_malloc_size(const_cast(ptr))); } -static void init_hook() { +void init_hook() { MallocHook::AddNewHook(&new_hook); MallocHook::AddDeleteHook(&delete_hook); } -// For debug. +// For later debug. // static void destroy_hook() { // MallocHook::RemoveNewHook(&new_hook); // MallocHook::RemoveDeleteHook(&delete_hook); diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index b8968ad6f60d0f..bfdd1f31ba3eb3 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -91,7 +91,6 @@ void PInternalServiceImpl::tablet_writer_open(google::protobuf::RpcController* c PTabletWriterOpenResult* response, google::protobuf::Closure* done) { SCOPED_SWITCH_BTHREAD(); - SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_exec_env->load_channel_mgr()->mem_tracker()); VLOG_RPC << "tablet writer open, id=" << request->id() << ", index_id=" << request->index_id() << ", txn_id=" << request->txn_id(); brpc::ClosureGuard closure_guard(done); From bef533f05769e07ce287f25e558cecc68a57e6c5 Mon Sep 17 00:00:00 2001 From: Xinyi Zou Date: Fri, 6 May 2022 10:49:24 +0800 Subject: [PATCH 5/6] fix comment3 --- be/src/http/action/compaction_action.h | 2 +- be/src/olap/tablet_manager.cpp | 9 +-------- be/src/olap/tablet_manager.h | 3 --- be/src/olap/tablet_schema.cpp | 18 +----------------- be/src/runtime/mem_tracker.h | 3 +++ 5 files changed, 6 insertions(+), 29 deletions(-) diff --git a/be/src/http/action/compaction_action.h b/be/src/http/action/compaction_action.h index 16e6ae19d95cd7..80b11bb4362916 100644 --- a/be/src/http/action/compaction_action.h +++ b/be/src/http/action/compaction_action.h @@ -42,7 +42,7 @@ class CompactionAction : public HttpHandler { CompactionAction(CompactionActionType type) : _type(type) { _compaction_mem_tracker = type == RUN_COMPACTION ? MemTracker::create_tracker(-1, "ManualCompaction", nullptr, - MemTrackerLevel::INSTANCE) + MemTrackerLevel::VERBOSE) : nullptr; } diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp index c89059a8f8396d..5c4da665ea1f1d 100644 --- a/be/src/olap/tablet_manager.cpp +++ b/be/src/olap/tablet_manager.cpp @@ -74,14 +74,13 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(tablet_meta_mem_consumption, MetricUnit::BYTE mem_consumption, Labels({{"type", "tablet_meta"}})); TabletManager::TabletManager(int32_t tablet_map_lock_shard_size) - : _mem_tracker(MemTracker::create_tracker(-1, "TabletMeta", nullptr, + : _mem_tracker(MemTracker::create_tracker(-1, "TabletManager", nullptr, MemTrackerLevel::OVERVIEW)), _tablets_shards_size(tablet_map_lock_shard_size), _tablets_shards_mask(tablet_map_lock_shard_size - 1) { CHECK_GT(_tablets_shards_size, 0); CHECK_EQ(_tablets_shards_size & _tablets_shards_mask, 0); _tablets_shards.resize(_tablets_shards_size); - _mem_tracker_logic = MemTracker::create_virtual_tracker(-1, "TabletMeta[Logic]", _mem_tracker); REGISTER_HOOK_METRIC(tablet_meta_mem_consumption, [this]() { return _mem_tracker->consumption(); }); } @@ -198,9 +197,6 @@ Status TabletManager::_add_tablet_to_map_unlocked(TTabletId tablet_id, tablet_map_t& tablet_map = _get_tablet_map(tablet_id); tablet_map[tablet_id] = tablet; _add_tablet_to_partition(tablet); -#ifndef NDEBUG - _mem_tracker_logic->consume(tablet->tablet_meta()->mem_size()); -#endif VLOG_NOTICE << "add tablet to map successfully." << " tablet_id=" << tablet_id; @@ -1318,9 +1314,6 @@ Status TabletManager::_drop_tablet_directly_unlocked(TTabletId tablet_id, bool k } dropped_tablet->deregister_tablet_from_dir(); -#ifndef NDEBUG - _mem_tracker_logic->release(dropped_tablet->tablet_meta()->mem_size()); -#endif return Status::OK(); } diff --git a/be/src/olap/tablet_manager.h b/be/src/olap/tablet_manager.h index a5d63b1de61108..a2420a5d814493 100644 --- a/be/src/olap/tablet_manager.h +++ b/be/src/olap/tablet_manager.h @@ -205,9 +205,6 @@ class TabletManager { // trace the memory use by meta of tablet std::shared_ptr _mem_tracker; - // The logical memory given by sizeof is less than the actual memory allocated by tcmalloc, - // Because the minimum memory allocation unit of tcmalloc is page, memory fragmentation will occur. - std::shared_ptr _mem_tracker_logic; const int32_t _tablets_shards_size; const int32_t _tablets_shards_mask; diff --git a/be/src/olap/tablet_schema.cpp b/be/src/olap/tablet_schema.cpp index 715afbee69d361..c0377e7434e5ab 100644 --- a/be/src/olap/tablet_schema.cpp +++ b/be/src/olap/tablet_schema.cpp @@ -309,8 +309,6 @@ void TabletColumn::init_from_pb(const ColumnPB& column) { _has_default_value = column.has_default_value(); if (_has_default_value) { _default_value = column.default_value(); - } else { - _default_value = ""; } if (column.has_precision()) { @@ -318,12 +316,9 @@ void TabletColumn::init_from_pb(const ColumnPB& column) { _precision = column.precision(); } else { _is_decimal = false; - _precision = 0; } if (column.has_frac()) { _frac = column.frac(); - } else { - _frac = 0; } _length = column.length(); _index_length = column.index_length(); @@ -340,29 +335,18 @@ void TabletColumn::init_from_pb(const ColumnPB& column) { _has_referenced_column = column.has_referenced_column_id(); if (_has_referenced_column) { _referenced_column_id = column.referenced_column_id(); - } else { - _referenced_column_id = 0; } - _referenced_column = ""; if (column.has_aggregation()) { _aggregation = get_aggregation_type_by_string(column.aggregation()); - } else { - _aggregation = FieldAggregationMethod::OLAP_FIELD_AGGREGATION_NONE; } if (column.has_visible()) { _visible = column.visible(); - } else { - _visible = true; } if (_type == FieldType::OLAP_FIELD_TYPE_ARRAY) { DCHECK(column.children_columns_size() == 1) << "ARRAY type has more than 1 children types."; TabletColumn child_column; child_column.init_from_pb(column.children_columns(0)); add_sub_column(child_column); - } else { - _parent = nullptr; - _sub_columns.clear(); // no swap space - _sub_column_count = 0; } } @@ -428,8 +412,8 @@ void TabletSchema::init_from_pb(const TabletSchemaPB& schema) { _num_null_columns = 0; _cols.clear(); _field_name_to_index.clear(); - TabletColumn column; for (auto& column_pb : schema.column()) { + TabletColumn column; column.init_from_pb(column_pb); if (column.is_key()) { _num_key_columns++; diff --git a/be/src/runtime/mem_tracker.h b/be/src/runtime/mem_tracker.h index 902b5e2efb210d..3d4eb740d23ab8 100644 --- a/be/src/runtime/mem_tracker.h +++ b/be/src/runtime/mem_tracker.h @@ -87,6 +87,9 @@ class MemTracker { // Cosume/release will not sync to parent.Usually used to manually record the specified memory, // It is independent of the recording of TCMalloc Hook in the thread local tracker, so the same // block of memory is recorded independently in these two trackers. + // TODO(zxy) At present, the purpose of most virtual trackers is only to preserve the logic of + // manually recording memory before, which may be used later. After each virtual tracker is + // required case by case, discuss its necessity. static std::shared_ptr create_virtual_tracker( int64_t byte_limit = -1, const std::string& label = std::string(), const std::shared_ptr& parent = std::shared_ptr(), From 5b5669089efd5ff6769069e2bde7a38bfaecf50f Mon Sep 17 00:00:00 2001 From: Xinyi Zou Date: Sat, 7 May 2022 11:07:40 +0800 Subject: [PATCH 6/6] reformatted --- be/src/exec/tablet_sink.h | 3 ++- be/src/olap/reader.cpp | 2 +- be/src/olap/task/engine_checksum_task.cpp | 6 +++--- be/src/runtime/memory/chunk_allocator.cpp | 3 ++- 4 files changed, 8 insertions(+), 6 deletions(-) diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h index bc9bef4da44a79..47b519c41a5e22 100644 --- a/be/src/exec/tablet_sink.h +++ b/be/src/exec/tablet_sink.h @@ -325,7 +325,8 @@ class IndexChannel { public: IndexChannel(OlapTableSink* parent, int64_t index_id, bool is_vec) : _parent(parent), _index_id(index_id), _is_vectorized(is_vec) { - _index_channel_tracker = MemTracker::create_tracker(-1, "IndexChannel:indexID=" + std::to_string(_index_id)); + _index_channel_tracker = + MemTracker::create_tracker(-1, "IndexChannel:indexID=" + std::to_string(_index_id)); } ~IndexChannel() = default; diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp index 32d6854d0f3d2c..06cbfbc7da7216 100644 --- a/be/src/olap/reader.cpp +++ b/be/src/olap/reader.cpp @@ -109,7 +109,7 @@ Status TabletReader::init(const ReaderParams& read_params) { #ifndef NDEBUG _predicate_mem_pool.reset(new MemPool("TabletReader:" + read_params.tablet->full_name())); #else - _predicate_mem_pool.reset(new MemPool()); + _predicate_mem_pool.reset(new MemPool()); #endif Status res = _init_params(read_params); diff --git a/be/src/olap/task/engine_checksum_task.cpp b/be/src/olap/task/engine_checksum_task.cpp index cb6918e21df489..c4c59c86008d2d 100644 --- a/be/src/olap/task/engine_checksum_task.cpp +++ b/be/src/olap/task/engine_checksum_task.cpp @@ -26,9 +26,9 @@ namespace doris { EngineChecksumTask::EngineChecksumTask(TTabletId tablet_id, TSchemaHash schema_hash, TVersion version, uint32_t* checksum) : _tablet_id(tablet_id), _schema_hash(schema_hash), _version(version), _checksum(checksum) { - _mem_tracker = MemTracker::create_tracker(-1, "EngineChecksumTask:tabletId=" + std::to_string(tablet_id), - StorageEngine::instance()->consistency_mem_tracker(), - MemTrackerLevel::TASK); + _mem_tracker = MemTracker::create_tracker( + -1, "EngineChecksumTask:tabletId=" + std::to_string(tablet_id), + StorageEngine::instance()->consistency_mem_tracker(), MemTrackerLevel::TASK); } Status EngineChecksumTask::execute() { diff --git a/be/src/runtime/memory/chunk_allocator.cpp b/be/src/runtime/memory/chunk_allocator.cpp index af809560bb35c0..269dc12fcfd870 100644 --- a/be/src/runtime/memory/chunk_allocator.cpp +++ b/be/src/runtime/memory/chunk_allocator.cpp @@ -195,7 +195,8 @@ void ChunkAllocator::free(const Chunk& chunk, MemTracker* tracker) { if (tracker) { tracker->transfer_to(_mem_tracker.get(), chunk.size); } else { - tls_ctx()->_thread_mem_tracker_mgr->mem_tracker()->transfer_to(_mem_tracker.get(), chunk.size); + tls_ctx()->_thread_mem_tracker_mgr->mem_tracker()->transfer_to(_mem_tracker.get(), + chunk.size); } SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); if (chunk.core_id == -1) {