diff --git a/be/src/exec/aggregation_node.cpp b/be/src/exec/aggregation_node.cpp
index 0d1806eaacf128..bea37b48e4e533 100644
--- a/be/src/exec/aggregation_node.cpp
+++ b/be/src/exec/aggregation_node.cpp
@@ -202,8 +202,6 @@ Status AggregationNode::open(RuntimeState* state) {
         RETURN_IF_ERROR(state->check_query_state("Aggregation, after hashing the child 0."));
 
         COUNTER_SET(_hash_table_buckets_counter, _hash_tbl->num_buckets());
-        COUNTER_SET(memory_used_counter(),
-                    _tuple_pool->peak_allocated_bytes() + _hash_tbl->byte_size());
         COUNTER_SET(_hash_table_load_factor_counter, _hash_tbl->load_factor());
         num_agg_rows += (_hash_tbl->size() - agg_rows_before);
         num_input_rows += batch.num_rows();
@@ -276,10 +274,8 @@ Status AggregationNode::get_next(RuntimeState* state, RowBatch* row_batch, bool*
     *eos = _output_iterator.at_end() || reached_limit();
 
     if (*eos) {
-        if (memory_used_counter() != NULL && _hash_tbl.get() != NULL &&
+        if (_hash_tbl.get() != NULL &&
                 _hash_table_buckets_counter != NULL) {
-            COUNTER_SET(memory_used_counter(),
-                        _tuple_pool->peak_allocated_bytes() + _hash_tbl->byte_size());
             COUNTER_SET(_hash_table_buckets_counter, _hash_tbl->num_buckets());
         }
     }
diff --git a/be/src/exec/csv_scan_node.cpp b/be/src/exec/csv_scan_node.cpp
index 4eda57a15f3460..f8ed70bf876a51 100644
--- a/be/src/exec/csv_scan_node.cpp
+++ b/be/src/exec/csv_scan_node.cpp
@@ -340,10 +340,6 @@ Status CsvScanNode::close(RuntimeState* state) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
 
-    if (memory_used_counter() != nullptr && _tuple_pool.get() != nullptr) {
-        COUNTER_UPDATE(memory_used_counter(), _tuple_pool->peak_allocated_bytes());
-    }
-
     RETURN_IF_ERROR(ExecNode::close(state));
 
     if (state->num_rows_load_success() == 0) {
diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index 4bde878ae2e1ef..9a6f208c9b8bb2 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -172,15 +172,13 @@ Status ExecNode::prepare(RuntimeState* state) {
     DCHECK(_runtime_profile.get() != NULL);
     _rows_returned_counter =
         ADD_COUNTER(_runtime_profile, "RowsReturned", TUnit::UNIT);
-    _memory_used_counter =
-        ADD_COUNTER(_runtime_profile, "MemoryUsed", TUnit::BYTES);
     _rows_returned_rate = runtime_profile()->add_derived_counter(
         ROW_THROUGHPUT_COUNTER, TUnit::UNIT_PER_SECOND,
         boost::bind(&RuntimeProfile::units_per_second, _rows_returned_counter,
                     runtime_profile()->total_time_counter()),
         "");
-    _mem_tracker.reset(new MemTracker(-1, _runtime_profile->name(), state->instance_mem_tracker()));
+    _mem_tracker.reset(new MemTracker(_runtime_profile.get(), -1, _runtime_profile->name(), state->instance_mem_tracker()));
     _expr_mem_tracker.reset(new MemTracker(-1, "Exprs", _mem_tracker.get()));
     _expr_mem_pool.reset(new MemPool(_expr_mem_tracker.get()));
     // TODO chenhao
@@ -339,7 +337,7 @@ Status ExecNode::create_tree_helper(
     }
 
     if (!node->_children.empty()) {
-        node->runtime_profile()->add_child(node->_children[0]->runtime_profile(), false, NULL);
+        node->runtime_profile()->add_child(node->_children[0]->runtime_profile(), true, NULL);
     }
 
     return Status::OK();
diff --git a/be/src/exec/hash_join_node.cpp b/be/src/exec/hash_join_node.cpp
index ba4393db90bc7c..3065cd8d6e4792 100644
--- a/be/src/exec/hash_join_node.cpp
+++ b/be/src/exec/hash_join_node.cpp
@@ -153,10 +153,6 @@ Status HashJoinNode::close(RuntimeState* state) {
     // Must reset _probe_batch in close() to release resources
     _probe_batch.reset(NULL);
 
-    if (_memory_used_counter != NULL && _hash_tbl.get() != NULL) {
-        COUNTER_UPDATE(_memory_used_counter, _build_pool->peak_allocated_bytes());
-        COUNTER_UPDATE(_memory_used_counter, _hash_tbl->byte_size());
-    }
     if (_hash_tbl.get() != NULL) {
         _hash_tbl->close();
     }
diff --git a/be/src/exec/mysql_scan_node.cpp b/be/src/exec/mysql_scan_node.cpp
index b7393e62f325fc..f211b0770725cc 100644
--- a/be/src/exec/mysql_scan_node.cpp
+++ b/be/src/exec/mysql_scan_node.cpp
@@ -251,9 +251,6 @@ Status MysqlScanNode::close(RuntimeState* state) {
     RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::CLOSE));
     SCOPED_TIMER(_runtime_profile->total_time_counter());
 
-    if (memory_used_counter() != NULL) {
-        COUNTER_UPDATE(memory_used_counter(), _tuple_pool->peak_allocated_bytes());
-    }
     _tuple_pool.reset();
 
     return ExecNode::close(state);
diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp
index 77b798be2df9a6..8535bfec781e7a 100644
--- a/be/src/exec/olap_scan_node.cpp
+++ b/be/src/exec/olap_scan_node.cpp
@@ -72,16 +72,6 @@ Status OlapScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
     RETURN_IF_ERROR(ExecNode::init(tnode, state));
     _direct_conjunct_size = _conjunct_ctxs.size();
 
-    if (tnode.olap_scan_node.__isset.sort_column) {
-        _is_result_order = true;
-    } else {
-        _is_result_order = false;
-    }
-
-    // Before, we support scan data ordered, but is not used in production
-    // Now, we drop this functional
-    DCHECK(!_is_result_order) << "ordered result don't support any more";
-
     return Status::OK();
 }
@@ -668,6 +658,8 @@ Status OlapScanNode::start_scan_thread(RuntimeState* state) {
     }
 
     int scanners_per_tablet = std::max(1, 64 / (int)_scan_ranges.size());
+
+    std::unordered_set<std::string> disk_set;
     for (auto& scan_range : _scan_ranges) {
         std::vector<std::unique_ptr<OlapScanRange>>* ranges = &cond_ranges;
         std::vector<std::unique_ptr<OlapScanRange>> split_ranges;
@@ -702,8 +694,10 @@ Status OlapScanNode::start_scan_thread(RuntimeState* state) {
                     state, this, _olap_scan_node.is_preaggregation, _need_agg_finalize, *scan_range, scanner_ranges);
             _scanner_pool->add(scanner);
             _olap_scanners.push_back(scanner);
+            disk_set.insert(scanner->scan_disk());
         }
     }
+    COUNTER_SET(_num_disks_accessed_counter, static_cast<int64_t>(disk_set.size()));
 
     // init progress
     std::stringstream ss;
diff --git a/be/src/exec/olap_scan_node.h b/be/src/exec/olap_scan_node.h
index 884ee5d3891d3a..11333a30397631 100644
--- a/be/src/exec/olap_scan_node.h
+++ b/be/src/exec/olap_scan_node.h
@@ -186,9 +186,6 @@ class OlapScanNode : public ScanNode {
     std::vector<TCondition> _olap_filter;
 
-    // Order Result Flag
-    bool _is_result_order;
-
     // Pool for storing allocated scanner objects. We don't want to use the
     // runtime pool to ensure that the scanner objects are deleted before this
    // object is.
diff --git a/be/src/exec/olap_scanner.h b/be/src/exec/olap_scanner.h
index f3e9f394032bb7..75ee5422d3e521 100644
--- a/be/src/exec/olap_scanner.h
+++ b/be/src/exec/olap_scanner.h
@@ -81,6 +81,10 @@ class OlapScanner {
     int64_t raw_rows_read() const { return _raw_rows_read; }
 
     void update_counter();
+
+    const std::string& scan_disk() const {
+        return _tablet->data_dir()->path();
+    }
 private:
     Status _prepare(
         const TPaloScanRange& scan_range,
diff --git a/be/src/exec/scan_node.cpp b/be/src/exec/scan_node.cpp
index be49d5d17ccaab..6a9ae67b1a9900 100644
--- a/be/src/exec/scan_node.cpp
+++ b/be/src/exec/scan_node.cpp
@@ -28,7 +28,7 @@ const string ScanNode::_s_total_throughput_counter = "TotalReadThroughput";
 const string ScanNode::_s_materialize_tuple_timer = "MaterializeTupleTime(*)";
 const string ScanNode::_s_per_read_thread_throughput_counter =
     "PerReadThreadRawHdfsThroughput";
-const string ScanNode::_s_num_disks_accessed_counter = "NumThread";
+const string ScanNode::_s_num_disks_accessed_counter = "NumDiskAccess";
 const string ScanNode::_s_scan_ranges_complete_counter = "ScanRangesComplete";
 const string ScanNode::_s_scanner_thread_counters_prefix = "ScannerThreads";
 const string ScanNode::_s_scanner_thread_total_wallclock_time =
@@ -43,6 +43,7 @@ Status ScanNode::prepare(RuntimeState* state) {
         ADD_THREAD_COUNTERS(runtime_profile(), _s_scanner_thread_counters_prefix);
     _bytes_read_counter =
         ADD_COUNTER(runtime_profile(), _s_bytes_read_counter, TUnit::BYTES);
+    //TODO: The _rows_read_counter == RowsReturned counter in exec node, there is no need to keep both of them
     _rows_read_counter =
         ADD_COUNTER(runtime_profile(), _s_rows_read_counter, TUnit::UNIT);
     _read_timer = ADD_TIMER(runtime_profile(), _s_total_read_timer);
diff --git a/be/src/exec/scan_node.h b/be/src/exec/scan_node.h
index 8fb016e6990b95..0ca7bd1ecd253b 100644
--- a/be/src/exec/scan_node.h
+++ b/be/src/exec/scan_node.h
@@ -52,11 +52,6 @@ class TScanRange;
 //
 // AverageScannerThreadConcurrency - the average number of active scanner threads. A
 // scanner thread is considered active if it is not blocked by IO. This number would
-// be low (less than 1) for IO-bound queries. For cpu-bound queries, this number
-// would be close to the max scanner threads allowed.
-//
-// AverageScannerThreadConcurrency - the average number of active scanner threads. A
-// scanner thread is considered active if it is not blocked by IO. This number would
 // be low (less than 1) for IO bounded queries. For cpu bounded queries, this number
 // would be close to the max scanner threads allowed.
 //
diff --git a/be/src/exec/schema_scan_node.cpp b/be/src/exec/schema_scan_node.cpp
index 440838bf7e41e3..a0521af94b8629 100644
--- a/be/src/exec/schema_scan_node.cpp
+++ b/be/src/exec/schema_scan_node.cpp
@@ -310,10 +310,6 @@ Status SchemaScanNode::close(RuntimeState* state) {
     RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::CLOSE));
     SCOPED_TIMER(_runtime_profile->total_time_counter());
 
-    if (memory_used_counter() != NULL) {
-        COUNTER_UPDATE(memory_used_counter(), _tuple_pool->peak_allocated_bytes());
-    }
-
     _tuple_pool.reset();
     return ExecNode::close(state);
 }
diff --git a/be/src/exec/set_operation_node.cpp b/be/src/exec/set_operation_node.cpp
index 832189921275a8..627e139f3812d4 100644
--- a/be/src/exec/set_operation_node.cpp
+++ b/be/src/exec/set_operation_node.cpp
@@ -75,10 +75,6 @@ Status SetOperationNode::close(RuntimeState* state) {
     // Must reset _probe_batch in close() to release resources
     _probe_batch.reset(NULL);
 
-    if (_memory_used_counter != NULL && _hash_tbl.get() != NULL) {
-        COUNTER_UPDATE(_memory_used_counter, _build_pool->peak_allocated_bytes());
-        COUNTER_UPDATE(_memory_used_counter, _hash_tbl->byte_size());
-    }
     if (_hash_tbl.get() != NULL) {
         _hash_tbl->close();
     }
diff --git a/be/src/exec/topn_node.cpp b/be/src/exec/topn_node.cpp
index e83346904e4e7b..304820f969b390 100644
--- a/be/src/exec/topn_node.cpp
+++ b/be/src/exec/topn_node.cpp
@@ -163,9 +163,6 @@ Status TopNNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) {
     // block(s) in the pool are all full or when the pool has reached a certain size.
     if (*eos) {
         row_batch->tuple_data_pool()->acquire_data(_tuple_pool.get(), false);
-        if (memory_used_counter() != NULL) {
-            COUNTER_UPDATE(memory_used_counter(), _tuple_pool->peak_allocated_bytes());
-        }
     }
     return Status::OK();
 }
diff --git a/be/src/runtime/bufferpool/buffer_pool.cc b/be/src/runtime/bufferpool/buffer_pool.cc
index 04b2bba7d1ce51..e66bcc232f70c7 100644
--- a/be/src/runtime/bufferpool/buffer_pool.cc
+++ b/be/src/runtime/bufferpool/buffer_pool.cc
@@ -392,12 +392,6 @@ BufferPool::Client::Client(BufferPool* pool, //TmpFileMgr::FileGroup* file_group
       ADD_COUNTER(child_profile, "CumulativeAllocations", TUnit::UNIT);
   counters_.cumulative_bytes_alloced =
       ADD_COUNTER(child_profile, "CumulativeAllocationBytes", TUnit::BYTES);
-  counters_.read_wait_time = ADD_TIMER(child_profile, "ReadIoWaitTime");
-  counters_.read_io_ops = ADD_COUNTER(child_profile, "ReadIoOps", TUnit::UNIT);
-  counters_.bytes_read = ADD_COUNTER(child_profile, "ReadIoBytes", TUnit::BYTES);
-  counters_.write_wait_time = ADD_TIMER(child_profile, "WriteIoWaitTime");
-  counters_.write_io_ops = ADD_COUNTER(child_profile, "WriteIoOps", TUnit::UNIT);
-  counters_.bytes_written = ADD_COUNTER(child_profile, "WriteIoBytes", TUnit::BYTES);
   counters_.peak_unpinned_bytes =
       child_profile->AddHighWaterMarkCounter("PeakUnpinnedBytes", TUnit::BYTES);
 }
diff --git a/be/src/runtime/bufferpool/buffer_pool_counters.h b/be/src/runtime/bufferpool/buffer_pool_counters.h
index 85b29604a775b8..92a06918082788 100644
--- a/be/src/runtime/bufferpool/buffer_pool_counters.h
+++ b/be/src/runtime/bufferpool/buffer_pool_counters.h
@@ -34,24 +34,6 @@ struct BufferPoolClientCounters {
   /// Bytes of buffers allocated via BufferAllocator::AllocateBuffer().
   RuntimeProfile::Counter* cumulative_bytes_alloced;
 
-  /// Amount of time spent waiting for reads from disk to complete.
-  RuntimeProfile::Counter* read_wait_time;
-
-  /// Total number of read I/O operations issued.
-  RuntimeProfile::Counter* read_io_ops;
-
-  /// Total bytes read from disk.
-  RuntimeProfile::Counter* bytes_read;
-
-  /// Amount of time spent waiting for writes to disk to complete.
-  RuntimeProfile::Counter* write_wait_time;
-
-  /// Total number of write I/O operations issued.
-  RuntimeProfile::Counter* write_io_ops;
-
-  /// Total bytes written to disk.
-  RuntimeProfile::Counter* bytes_written;
-
   /// The peak total size of unpinned pages.
   RuntimeProfile::HighWaterMarkCounter* peak_unpinned_bytes;
 };
diff --git a/be/src/runtime/data_stream_sender.cpp b/be/src/runtime/data_stream_sender.cpp
index 82459a855e5668..3c98529b815870 100644
--- a/be/src/runtime/data_stream_sender.cpp
+++ b/be/src/runtime/data_stream_sender.cpp
@@ -385,7 +385,7 @@ Status DataStreamSender::prepare(RuntimeState* state) {
     _profile = _pool->add(new RuntimeProfile(_pool, title.str()));
     SCOPED_TIMER(_profile->total_time_counter());
     _mem_tracker.reset(
-            new MemTracker(-1, "DataStreamSender", state->instance_mem_tracker()));
+            new MemTracker(_profile, -1, "DataStreamSender", state->instance_mem_tracker()));
 
     if (_part_type == TPartitionType::UNPARTITIONED || _part_type == TPartitionType::RANDOM) {