From e90e0cf00e2ebf7e79263f167b381d14ed3994e0 Mon Sep 17 00:00:00 2001 From: HappenLee Date: Mon, 27 Apr 2020 21:02:30 +0800 Subject: [PATCH 1/3] [Profile] Make running profile clearer and more intuitive to improve usability MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This CL mainly made the following modifications: 1. Delete the invalid MemoryUsed counter and add PeakMemUsage in each exec node and datastreamsender 2. Add indent in child execnode profile, making it easy to see the relationship between exec nodes 3. Delete _is_result_order, which is no longer supported, from olap_scan_node.h and olap_scan_node.cpp 4. Add scan_disk method to olap_scanner to fix the counter _num_disks_accessed_counter 5. Now we do not use buffer pool to read and write disk, so comment out the read IO and write IO counter code #ISSUE 3365 --- be/src/exec/exec_node.cpp | 6 ++---- be/src/exec/olap_scan_node.cpp | 17 +++++++---------- be/src/exec/olap_scan_node.h | 3 --- be/src/exec/olap_scanner.h | 4 ++++ be/src/exec/scan_node.cpp | 3 ++- be/src/exec/scan_node.h | 5 ----- be/src/runtime/bufferpool/buffer_pool.cc | 8 ++++---- .../runtime/bufferpool/buffer_pool_counters.h | 8 ++++---- be/src/runtime/data_stream_sender.cpp | 2 +- 9 files changed, 24 insertions(+), 32 deletions(-) diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index 4bde878ae2e1ef..9a6f208c9b8bb2 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -172,15 +172,13 @@ Status ExecNode::prepare(RuntimeState* state) { DCHECK(_runtime_profile.get() != NULL); _rows_returned_counter = ADD_COUNTER(_runtime_profile, "RowsReturned", TUnit::UNIT); - _memory_used_counter = - ADD_COUNTER(_runtime_profile, "MemoryUsed", TUnit::BYTES); _rows_returned_rate = runtime_profile()->add_derived_counter( ROW_THROUGHPUT_COUNTER, TUnit::UNIT_PER_SECOND, boost::bind(&RuntimeProfile::units_per_second, _rows_returned_counter, runtime_profile()->total_time_counter()), ""); - 
_mem_tracker.reset(new MemTracker(-1, _runtime_profile->name(), state->instance_mem_tracker())); + _mem_tracker.reset(new MemTracker(_runtime_profile.get(), -1, _runtime_profile->name(), state->instance_mem_tracker())); _expr_mem_tracker.reset(new MemTracker(-1, "Exprs", _mem_tracker.get())); _expr_mem_pool.reset(new MemPool(_expr_mem_tracker.get())); // TODO chenhao @@ -339,7 +337,7 @@ Status ExecNode::create_tree_helper( } if (!node->_children.empty()) { - node->runtime_profile()->add_child(node->_children[0]->runtime_profile(), false, NULL); + node->runtime_profile()->add_child(node->_children[0]->runtime_profile(), true, NULL); } return Status::OK(); diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp index 77b798be2df9a6..6dad57e2f4eb1d 100644 --- a/be/src/exec/olap_scan_node.cpp +++ b/be/src/exec/olap_scan_node.cpp @@ -72,16 +72,6 @@ Status OlapScanNode::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(ExecNode::init(tnode, state)); _direct_conjunct_size = _conjunct_ctxs.size(); - if (tnode.olap_scan_node.__isset.sort_column) { - _is_result_order = true; - } else { - _is_result_order = false; - } - - // Before, we support scan data ordered, but is not used in production - // Now, we drop this functional - DCHECK(!_is_result_order) << "ordered result don't support any more"; - return Status::OK(); } @@ -668,6 +658,8 @@ Status OlapScanNode::start_scan_thread(RuntimeState* state) { } int scanners_per_tablet = std::max(1, 64 / (int)_scan_ranges.size()); + + std::unordered_set disk_set; for (auto& scan_range : _scan_ranges) { std::vector>* ranges = &cond_ranges; std::vector> split_ranges; @@ -702,8 +694,13 @@ Status OlapScanNode::start_scan_thread(RuntimeState* state) { state, this, _olap_scan_node.is_preaggregation, _need_agg_finalize, *scan_range, scanner_ranges); _scanner_pool->add(scanner); _olap_scanners.push_back(scanner); + + if (disk_set.find(scanner->scan_disk()) == disk_set.end()) { + 
disk_set.insert(scanner->scan_disk()); + } } } + COUNTER_SET(_num_disks_accessed_counter, static_cast(disk_set.size())); // init progress std::stringstream ss; diff --git a/be/src/exec/olap_scan_node.h b/be/src/exec/olap_scan_node.h index 884ee5d3891d3a..11333a30397631 100644 --- a/be/src/exec/olap_scan_node.h +++ b/be/src/exec/olap_scan_node.h @@ -186,9 +186,6 @@ class OlapScanNode : public ScanNode { std::vector _olap_filter; - // Order Result Flag - bool _is_result_order; - // Pool for storing allocated scanner objects. We don't want to use the // runtime pool to ensure that the scanner objects are deleted before this // object is. diff --git a/be/src/exec/olap_scanner.h b/be/src/exec/olap_scanner.h index f3e9f394032bb7..75ee5422d3e521 100644 --- a/be/src/exec/olap_scanner.h +++ b/be/src/exec/olap_scanner.h @@ -81,6 +81,10 @@ class OlapScanner { int64_t raw_rows_read() const { return _raw_rows_read; } void update_counter(); + + const std::string& scan_disk() const { + return _tablet->data_dir()->path(); + } private: Status _prepare( const TPaloScanRange& scan_range, diff --git a/be/src/exec/scan_node.cpp b/be/src/exec/scan_node.cpp index be49d5d17ccaab..6a9ae67b1a9900 100644 --- a/be/src/exec/scan_node.cpp +++ b/be/src/exec/scan_node.cpp @@ -28,7 +28,7 @@ const string ScanNode::_s_total_throughput_counter = "TotalReadThroughput"; const string ScanNode::_s_materialize_tuple_timer = "MaterializeTupleTime(*)"; const string ScanNode::_s_per_read_thread_throughput_counter = "PerReadThreadRawHdfsThroughput"; -const string ScanNode::_s_num_disks_accessed_counter = "NumThread"; +const string ScanNode::_s_num_disks_accessed_counter = "NumDiskAccess"; const string ScanNode::_s_scan_ranges_complete_counter = "ScanRangesComplete"; const string ScanNode::_s_scanner_thread_counters_prefix = "ScannerThreads"; const string ScanNode::_s_scanner_thread_total_wallclock_time = @@ -43,6 +43,7 @@ Status ScanNode::prepare(RuntimeState* state) { ADD_THREAD_COUNTERS(runtime_profile(), 
_s_scanner_thread_counters_prefix); _bytes_read_counter = ADD_COUNTER(runtime_profile(), _s_bytes_read_counter, TUnit::BYTES); + //TODO: The _rows_read_counter == RowsReturned counter in exec node, there is no need to keep both of them _rows_read_counter = ADD_COUNTER(runtime_profile(), _s_rows_read_counter, TUnit::UNIT); _read_timer = ADD_TIMER(runtime_profile(), _s_total_read_timer); diff --git a/be/src/exec/scan_node.h b/be/src/exec/scan_node.h index 8fb016e6990b95..0ca7bd1ecd253b 100644 --- a/be/src/exec/scan_node.h +++ b/be/src/exec/scan_node.h @@ -52,11 +52,6 @@ class TScanRange; // // AverageScannerThreadConcurrency - the average number of active scanner threads. A // scanner thread is considered active if it is not blocked by IO. This number would -// be low (less than 1) for IO-bound queries. For cpu-bound queries, this number -// would be close to the max scanner threads allowed. -// -// AverageScannerThreadConcurrency - the average number of active scanner threads. A -// scanner thread is considered active if it is not blocked by IO. This number would // be low (less than 1) for IO bounded queries. For cpu bounded queries, this number // would be close to the max scanner threads allowed. 
// diff --git a/be/src/runtime/bufferpool/buffer_pool.cc b/be/src/runtime/bufferpool/buffer_pool.cc index 04b2bba7d1ce51..918eb319ec1a18 100644 --- a/be/src/runtime/bufferpool/buffer_pool.cc +++ b/be/src/runtime/bufferpool/buffer_pool.cc @@ -392,11 +392,11 @@ BufferPool::Client::Client(BufferPool* pool, //TmpFileMgr::FileGroup* file_group ADD_COUNTER(child_profile, "CumulativeAllocations", TUnit::UNIT); counters_.cumulative_bytes_alloced = ADD_COUNTER(child_profile, "CumulativeAllocationBytes", TUnit::BYTES); - counters_.read_wait_time = ADD_TIMER(child_profile, "ReadIoWaitTime"); - counters_.read_io_ops = ADD_COUNTER(child_profile, "ReadIoOps", TUnit::UNIT); +// counters_.read_wait_time = ADD_TIMER(child_profile, "ReadIoWaitTime"); +// counters_.read_io_ops = ADD_COUNTER(child_profile, "ReadIoOps", TUnit::UNIT); counters_.bytes_read = ADD_COUNTER(child_profile, "ReadIoBytes", TUnit::BYTES); - counters_.write_wait_time = ADD_TIMER(child_profile, "WriteIoWaitTime"); - counters_.write_io_ops = ADD_COUNTER(child_profile, "WriteIoOps", TUnit::UNIT); +// counters_.write_wait_time = ADD_TIMER(child_profile, "WriteIoWaitTime"); +// counters_.write_io_ops = ADD_COUNTER(child_profile, "WriteIoOps", TUnit::UNIT); counters_.bytes_written = ADD_COUNTER(child_profile, "WriteIoBytes", TUnit::BYTES); counters_.peak_unpinned_bytes = child_profile->AddHighWaterMarkCounter("PeakUnpinnedBytes", TUnit::BYTES); diff --git a/be/src/runtime/bufferpool/buffer_pool_counters.h b/be/src/runtime/bufferpool/buffer_pool_counters.h index 85b29604a775b8..c447d970936d37 100644 --- a/be/src/runtime/bufferpool/buffer_pool_counters.h +++ b/be/src/runtime/bufferpool/buffer_pool_counters.h @@ -35,19 +35,19 @@ struct BufferPoolClientCounters { RuntimeProfile::Counter* cumulative_bytes_alloced; /// Amount of time spent waiting for reads from disk to complete. - RuntimeProfile::Counter* read_wait_time; + //RuntimeProfile::Counter* read_wait_time; /// Total number of read I/O operations issued. 
- RuntimeProfile::Counter* read_io_ops; + //RuntimeProfile::Counter* read_io_ops; /// Total bytes read from disk. RuntimeProfile::Counter* bytes_read; /// Amount of time spent waiting for writes to disk to complete. - RuntimeProfile::Counter* write_wait_time; + //RuntimeProfile::Counter* write_wait_time; /// Total number of write I/O operations issued. - RuntimeProfile::Counter* write_io_ops; + //RuntimeProfile::Counter* write_io_ops; /// Total bytes written to disk. RuntimeProfile::Counter* bytes_written; diff --git a/be/src/runtime/data_stream_sender.cpp b/be/src/runtime/data_stream_sender.cpp index 82459a855e5668..3c98529b815870 100644 --- a/be/src/runtime/data_stream_sender.cpp +++ b/be/src/runtime/data_stream_sender.cpp @@ -385,7 +385,7 @@ Status DataStreamSender::prepare(RuntimeState* state) { _profile = _pool->add(new RuntimeProfile(_pool, title.str())); SCOPED_TIMER(_profile->total_time_counter()); _mem_tracker.reset( - new MemTracker(-1, "DataStreamSender", state->instance_mem_tracker())); + new MemTracker(_profile, -1, "DataStreamSender", state->instance_mem_tracker())); if (_part_type == TPartitionType::UNPARTITIONED || _part_type == TPartitionType::RANDOM) { From 1c5be321ce04ee39dc182614320bd32247c7cf94 Mon Sep 17 00:00:00 2001 From: HappenLee Date: Tue, 28 Apr 2020 11:32:38 +0800 Subject: [PATCH 2/3] change find and insert to insert operation --- be/src/exec/olap_scan_node.cpp | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp index 6dad57e2f4eb1d..8535bfec781e7a 100644 --- a/be/src/exec/olap_scan_node.cpp +++ b/be/src/exec/olap_scan_node.cpp @@ -694,10 +694,7 @@ Status OlapScanNode::start_scan_thread(RuntimeState* state) { state, this, _olap_scan_node.is_preaggregation, _need_agg_finalize, *scan_range, scanner_ranges); _scanner_pool->add(scanner); _olap_scanners.push_back(scanner); - - if (disk_set.find(scanner->scan_disk()) == disk_set.end()) { - 
disk_set.insert(scanner->scan_disk()); - } + disk_set.insert(scanner->scan_disk()); } } COUNTER_SET(_num_disks_accessed_counter, static_cast(disk_set.size())); From 1da4ffae68f0229fe2187b504a23304bac2e372a Mon Sep 17 00:00:00 2001 From: HappenLee Date: Thu, 30 Apr 2020 01:29:20 +0800 Subject: [PATCH 3/3] 1.delete the idle counter in buffer pool 2.delete the MemUsed counter in exec node --- be/src/exec/aggregation_node.cpp | 6 +----- be/src/exec/csv_scan_node.cpp | 4 ---- be/src/exec/hash_join_node.cpp | 4 ---- be/src/exec/mysql_scan_node.cpp | 3 --- be/src/exec/schema_scan_node.cpp | 4 ---- be/src/exec/set_operation_node.cpp | 4 ---- be/src/exec/topn_node.cpp | 3 --- be/src/runtime/bufferpool/buffer_pool.cc | 6 ------ .../runtime/bufferpool/buffer_pool_counters.h | 18 ------------------ 9 files changed, 1 insertion(+), 51 deletions(-) diff --git a/be/src/exec/aggregation_node.cpp b/be/src/exec/aggregation_node.cpp index 0d1806eaacf128..bea37b48e4e533 100644 --- a/be/src/exec/aggregation_node.cpp +++ b/be/src/exec/aggregation_node.cpp @@ -202,8 +202,6 @@ Status AggregationNode::open(RuntimeState* state) { RETURN_IF_ERROR(state->check_query_state("Aggregation, after hashing the child 0.")); COUNTER_SET(_hash_table_buckets_counter, _hash_tbl->num_buckets()); - COUNTER_SET(memory_used_counter(), - _tuple_pool->peak_allocated_bytes() + _hash_tbl->byte_size()); COUNTER_SET(_hash_table_load_factor_counter, _hash_tbl->load_factor()); num_agg_rows += (_hash_tbl->size() - agg_rows_before); num_input_rows += batch.num_rows(); @@ -276,10 +274,8 @@ Status AggregationNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* *eos = _output_iterator.at_end() || reached_limit(); if (*eos) { - if (memory_used_counter() != NULL && _hash_tbl.get() != NULL && + if (_hash_tbl.get() != NULL && _hash_table_buckets_counter != NULL) { - COUNTER_SET(memory_used_counter(), - _tuple_pool->peak_allocated_bytes() + _hash_tbl->byte_size()); COUNTER_SET(_hash_table_buckets_counter, 
_hash_tbl->num_buckets()); } } diff --git a/be/src/exec/csv_scan_node.cpp b/be/src/exec/csv_scan_node.cpp index 4eda57a15f3460..f8ed70bf876a51 100644 --- a/be/src/exec/csv_scan_node.cpp +++ b/be/src/exec/csv_scan_node.cpp @@ -340,10 +340,6 @@ Status CsvScanNode::close(RuntimeState* state) { SCOPED_TIMER(_runtime_profile->total_time_counter()); - if (memory_used_counter() != nullptr && _tuple_pool.get() != nullptr) { - COUNTER_UPDATE(memory_used_counter(), _tuple_pool->peak_allocated_bytes()); - } - RETURN_IF_ERROR(ExecNode::close(state)); if (state->num_rows_load_success() == 0) { diff --git a/be/src/exec/hash_join_node.cpp b/be/src/exec/hash_join_node.cpp index ba4393db90bc7c..3065cd8d6e4792 100644 --- a/be/src/exec/hash_join_node.cpp +++ b/be/src/exec/hash_join_node.cpp @@ -153,10 +153,6 @@ Status HashJoinNode::close(RuntimeState* state) { // Must reset _probe_batch in close() to release resources _probe_batch.reset(NULL); - if (_memory_used_counter != NULL && _hash_tbl.get() != NULL) { - COUNTER_UPDATE(_memory_used_counter, _build_pool->peak_allocated_bytes()); - COUNTER_UPDATE(_memory_used_counter, _hash_tbl->byte_size()); - } if (_hash_tbl.get() != NULL) { _hash_tbl->close(); } diff --git a/be/src/exec/mysql_scan_node.cpp b/be/src/exec/mysql_scan_node.cpp index b7393e62f325fc..f211b0770725cc 100644 --- a/be/src/exec/mysql_scan_node.cpp +++ b/be/src/exec/mysql_scan_node.cpp @@ -251,9 +251,6 @@ Status MysqlScanNode::close(RuntimeState* state) { RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::CLOSE)); SCOPED_TIMER(_runtime_profile->total_time_counter()); - if (memory_used_counter() != NULL) { - COUNTER_UPDATE(memory_used_counter(), _tuple_pool->peak_allocated_bytes()); - } _tuple_pool.reset(); return ExecNode::close(state); diff --git a/be/src/exec/schema_scan_node.cpp b/be/src/exec/schema_scan_node.cpp index 440838bf7e41e3..a0521af94b8629 100644 --- a/be/src/exec/schema_scan_node.cpp +++ b/be/src/exec/schema_scan_node.cpp @@ -310,10 +310,6 @@ Status 
SchemaScanNode::close(RuntimeState* state) { RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::CLOSE)); SCOPED_TIMER(_runtime_profile->total_time_counter()); - if (memory_used_counter() != NULL) { - COUNTER_UPDATE(memory_used_counter(), _tuple_pool->peak_allocated_bytes()); - } - _tuple_pool.reset(); return ExecNode::close(state); } diff --git a/be/src/exec/set_operation_node.cpp b/be/src/exec/set_operation_node.cpp index 832189921275a8..627e139f3812d4 100644 --- a/be/src/exec/set_operation_node.cpp +++ b/be/src/exec/set_operation_node.cpp @@ -75,10 +75,6 @@ Status SetOperationNode::close(RuntimeState* state) { // Must reset _probe_batch in close() to release resources _probe_batch.reset(NULL); - if (_memory_used_counter != NULL && _hash_tbl.get() != NULL) { - COUNTER_UPDATE(_memory_used_counter, _build_pool->peak_allocated_bytes()); - COUNTER_UPDATE(_memory_used_counter, _hash_tbl->byte_size()); - } if (_hash_tbl.get() != NULL) { _hash_tbl->close(); } diff --git a/be/src/exec/topn_node.cpp b/be/src/exec/topn_node.cpp index e83346904e4e7b..304820f969b390 100644 --- a/be/src/exec/topn_node.cpp +++ b/be/src/exec/topn_node.cpp @@ -163,9 +163,6 @@ Status TopNNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) { // block(s) in the pool are all full or when the pool has reached a certain size. 
if (*eos) { row_batch->tuple_data_pool()->acquire_data(_tuple_pool.get(), false); - if (memory_used_counter() != NULL) { - COUNTER_UPDATE(memory_used_counter(), _tuple_pool->peak_allocated_bytes()); - } } return Status::OK(); } diff --git a/be/src/runtime/bufferpool/buffer_pool.cc b/be/src/runtime/bufferpool/buffer_pool.cc index 918eb319ec1a18..e66bcc232f70c7 100644 --- a/be/src/runtime/bufferpool/buffer_pool.cc +++ b/be/src/runtime/bufferpool/buffer_pool.cc @@ -392,12 +392,6 @@ BufferPool::Client::Client(BufferPool* pool, //TmpFileMgr::FileGroup* file_group ADD_COUNTER(child_profile, "CumulativeAllocations", TUnit::UNIT); counters_.cumulative_bytes_alloced = ADD_COUNTER(child_profile, "CumulativeAllocationBytes", TUnit::BYTES); -// counters_.read_wait_time = ADD_TIMER(child_profile, "ReadIoWaitTime"); -// counters_.read_io_ops = ADD_COUNTER(child_profile, "ReadIoOps", TUnit::UNIT); - counters_.bytes_read = ADD_COUNTER(child_profile, "ReadIoBytes", TUnit::BYTES); -// counters_.write_wait_time = ADD_TIMER(child_profile, "WriteIoWaitTime"); -// counters_.write_io_ops = ADD_COUNTER(child_profile, "WriteIoOps", TUnit::UNIT); - counters_.bytes_written = ADD_COUNTER(child_profile, "WriteIoBytes", TUnit::BYTES); counters_.peak_unpinned_bytes = child_profile->AddHighWaterMarkCounter("PeakUnpinnedBytes", TUnit::BYTES); } diff --git a/be/src/runtime/bufferpool/buffer_pool_counters.h b/be/src/runtime/bufferpool/buffer_pool_counters.h index c447d970936d37..92a06918082788 100644 --- a/be/src/runtime/bufferpool/buffer_pool_counters.h +++ b/be/src/runtime/bufferpool/buffer_pool_counters.h @@ -34,24 +34,6 @@ struct BufferPoolClientCounters { /// Bytes of buffers allocated via BufferAllocator::AllocateBuffer(). RuntimeProfile::Counter* cumulative_bytes_alloced; - /// Amount of time spent waiting for reads from disk to complete. - //RuntimeProfile::Counter* read_wait_time; - - /// Total number of read I/O operations issued. 
- //RuntimeProfile::Counter* read_io_ops; - - /// Total bytes read from disk. - RuntimeProfile::Counter* bytes_read; - - /// Amount of time spent waiting for writes to disk to complete. - //RuntimeProfile::Counter* write_wait_time; - - /// Total number of write I/O operations issued. - //RuntimeProfile::Counter* write_io_ops; - - /// Total bytes written to disk. - RuntimeProfile::Counter* bytes_written; - /// The peak total size of unpinned pages. RuntimeProfile::HighWaterMarkCounter* peak_unpinned_bytes; };