Merged
6 changes: 1 addition & 5 deletions be/src/exec/aggregation_node.cpp
@@ -202,8 +202,6 @@ Status AggregationNode::open(RuntimeState* state) {
RETURN_IF_ERROR(state->check_query_state("Aggregation, after hashing the child 0."));

COUNTER_SET(_hash_table_buckets_counter, _hash_tbl->num_buckets());
- COUNTER_SET(memory_used_counter(),
-             _tuple_pool->peak_allocated_bytes() + _hash_tbl->byte_size());
COUNTER_SET(_hash_table_load_factor_counter, _hash_tbl->load_factor());
num_agg_rows += (_hash_tbl->size() - agg_rows_before);
num_input_rows += batch.num_rows();
@@ -276,10 +274,8 @@ Status AggregationNode::get_next(RuntimeState* state, RowBatch* row_batch, bool*

*eos = _output_iterator.at_end() || reached_limit();
if (*eos) {
- if (memory_used_counter() != NULL && _hash_tbl.get() != NULL &&
+ if (_hash_tbl.get() != NULL &&
_hash_table_buckets_counter != NULL) {
- COUNTER_SET(memory_used_counter(),
-             _tuple_pool->peak_allocated_bytes() + _hash_tbl->byte_size());
COUNTER_SET(_hash_table_buckets_counter, _hash_tbl->num_buckets());
}
}
4 changes: 0 additions & 4 deletions be/src/exec/csv_scan_node.cpp
@@ -340,10 +340,6 @@ Status CsvScanNode::close(RuntimeState* state) {

SCOPED_TIMER(_runtime_profile->total_time_counter());

- if (memory_used_counter() != nullptr && _tuple_pool.get() != nullptr) {
-     COUNTER_UPDATE(memory_used_counter(), _tuple_pool->peak_allocated_bytes());
- }

RETURN_IF_ERROR(ExecNode::close(state));

if (state->num_rows_load_success() == 0) {
6 changes: 2 additions & 4 deletions be/src/exec/exec_node.cpp
@@ -172,15 +172,13 @@ Status ExecNode::prepare(RuntimeState* state) {
DCHECK(_runtime_profile.get() != NULL);
_rows_returned_counter =
ADD_COUNTER(_runtime_profile, "RowsReturned", TUnit::UNIT);
- _memory_used_counter =
-     ADD_COUNTER(_runtime_profile, "MemoryUsed", TUnit::BYTES);
Contributor:

You'd better remove this field if it's not used. Btw, _memory_used_counter seems to be used by HashJoinNode::close; is it really ok to remove it?

Contributor Author:

Hi, as we can see, the counter _memory_used_counter is only used to record the memory used by _tuple_pool and _hash_tbl.
But _tuple_pool and _hash_tbl are already tracked by MemTracker, and that memory is reported in the PeakMemoryUsage counter. Do we really need this _memory_used_counter?

(screenshot: runtime profile showing the PeakMemoryUsage counter reported by MemTracker)

So I think keeping _memory_used_counter named MemoryUsed is confusing.

I have two suggestions:

  • Delete all code that uses _memory_used_counter going forward; all memory used is already tracked by MemTracker, and we can show the peak memory usage through the counter in MemTracker.
  • Keep the counter _memory_used_counter, but change the name MemoryUsed to MemoryUsedByHashBTAndTuplePool.

Which suggestion do you think is more suitable? @gaodayue

Contributor:

Delete all code that uses _memory_used_counter going forward; all memory used is already tracked by MemTracker, and we can show the peak memory usage through the counter in MemTracker.

I prefer this.
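To make the reviewer's point concrete, here is a minimal self-contained sketch of the pattern this thread relies on. ToyMemTracker and ToyMemPool below are illustrative stand-ins, not the actual Doris MemTracker/MemPool classes: the pool charges every allocation to its tracker, the tracker records the peak, and that peak is what a profile counter such as PeakMemoryUsage would report, so a separate MemoryUsed counter adds nothing.

#include <algorithm>
#include <cstdint>
#include <iostream>

// Toy stand-in for MemTracker: owners (e.g. memory pools) charge and release
// consumption against it, and the recorded peak is what a profile counter
// like PeakMemoryUsage would expose.
class ToyMemTracker {
public:
    void consume(int64_t bytes) {
        _consumption += bytes;
        _peak = std::max(_peak, _consumption);
    }
    void release(int64_t bytes) { _consumption -= bytes; }
    int64_t peak_consumption() const { return _peak; }

private:
    int64_t _consumption = 0;
    int64_t _peak = 0;
};

// Toy stand-in for MemPool: every allocation is reported to the tracker, so
// the tracker's peak already covers the pool's memory.
class ToyMemPool {
public:
    explicit ToyMemPool(ToyMemTracker* tracker) : _tracker(tracker) {}
    ~ToyMemPool() { _tracker->release(_allocated); }
    void allocate(int64_t bytes) {
        _allocated += bytes;
        _tracker->consume(bytes);
    }

private:
    ToyMemTracker* _tracker;
    int64_t _allocated = 0;
};

int main() {
    ToyMemTracker tracker;
    {
        ToyMemPool tuple_pool(&tracker);
        tuple_pool.allocate(4096);  // e.g. tuple data
        tuple_pool.allocate(8192);  // e.g. hash table buckets
    }
    // The peak is already available from the tracker; no extra MemoryUsed
    // counter is needed to see how much memory the node used.
    std::cout << "PeakMemoryUsage=" << tracker.peak_consumption() << std::endl;
    return 0;
}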

_rows_returned_rate = runtime_profile()->add_derived_counter(
ROW_THROUGHPUT_COUNTER, TUnit::UNIT_PER_SECOND,
boost::bind<int64_t>(&RuntimeProfile::units_per_second,
_rows_returned_counter,
runtime_profile()->total_time_counter()),
"");
- _mem_tracker.reset(new MemTracker(-1, _runtime_profile->name(), state->instance_mem_tracker()));
+ _mem_tracker.reset(new MemTracker(_runtime_profile.get(), -1, _runtime_profile->name(), state->instance_mem_tracker()));
_expr_mem_tracker.reset(new MemTracker(-1, "Exprs", _mem_tracker.get()));
_expr_mem_pool.reset(new MemPool(_expr_mem_tracker.get()));
// TODO chenhao
@@ -339,7 +337,7 @@ Status ExecNode::create_tree_helper(
}

if (!node->_children.empty()) {
- node->runtime_profile()->add_child(node->_children[0]->runtime_profile(), false, NULL);
Contributor:

It seems that the false here is to distinguish child[0] from the other children.
Are you sure this is more readable? You can put the result screenshots here to explain.

Contributor Author:

After changing it to true, we can easily see that olap_scan_node is a child of aggregation_node.
(screenshot: runtime profile tree with olap_scan_node nested under aggregation_node)
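For reference, a small standalone sketch of why the indent flag matters; ToyProfile is a simplified illustration, not the real RuntimeProfile class. When a child profile is added with indent=true, printing the parent nests the child under it, which is what makes olap_scan_node visibly a child of aggregation_node.

#include <iostream>
#include <string>
#include <utility>
#include <vector>

// Simplified illustration of a profile tree (not the real RuntimeProfile):
// children added with indent=true print nested under their parent, while
// indent=false prints them at the same level, hiding the parent/child relation.
struct ToyProfile {
    explicit ToyProfile(std::string n) : name(std::move(n)) {}

    void add_child(ToyProfile* child, bool indent) {
        children.emplace_back(child, indent);
    }

    void pretty_print(std::ostream& os, const std::string& prefix = "") const {
        os << prefix << name << std::endl;
        for (const auto& entry : children) {
            const std::string child_prefix = entry.second ? prefix + "  " : prefix;
            entry.first->pretty_print(os, child_prefix);
        }
    }

    std::string name;
    std::vector<std::pair<ToyProfile*, bool>> children;
};

int main() {
    ToyProfile agg("AGGREGATION_NODE (id=1)");
    ToyProfile scan("OLAP_SCAN_NODE (id=0)");

    agg.add_child(&scan, /*indent=*/true);  // with false, both lines print flush left
    agg.pretty_print(std::cout);            // OLAP_SCAN_NODE appears nested under AGGREGATION_NODE
    return 0;
}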

+ node->runtime_profile()->add_child(node->_children[0]->runtime_profile(), true, NULL);
}

return Status::OK();
4 changes: 0 additions & 4 deletions be/src/exec/hash_join_node.cpp
@@ -153,10 +153,6 @@ Status HashJoinNode::close(RuntimeState* state) {
// Must reset _probe_batch in close() to release resources
_probe_batch.reset(NULL);

- if (_memory_used_counter != NULL && _hash_tbl.get() != NULL) {
-     COUNTER_UPDATE(_memory_used_counter, _build_pool->peak_allocated_bytes());
-     COUNTER_UPDATE(_memory_used_counter, _hash_tbl->byte_size());
- }
if (_hash_tbl.get() != NULL) {
_hash_tbl->close();
}
3 changes: 0 additions & 3 deletions be/src/exec/mysql_scan_node.cpp
@@ -251,9 +251,6 @@ Status MysqlScanNode::close(RuntimeState* state) {
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::CLOSE));
SCOPED_TIMER(_runtime_profile->total_time_counter());

- if (memory_used_counter() != NULL) {
-     COUNTER_UPDATE(memory_used_counter(), _tuple_pool->peak_allocated_bytes());
- }
_tuple_pool.reset();

return ExecNode::close(state);
14 changes: 4 additions & 10 deletions be/src/exec/olap_scan_node.cpp
@@ -72,16 +72,6 @@ Status OlapScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(ExecNode::init(tnode, state));
_direct_conjunct_size = _conjunct_ctxs.size();

- if (tnode.olap_scan_node.__isset.sort_column) {
-     _is_result_order = true;
- } else {
-     _is_result_order = false;
- }
-
- // Before, we support scan data ordered, but is not used in production
- // Now, we drop this functional
- DCHECK(!_is_result_order) << "ordered result don't support any more";

return Status::OK();
}

@@ -668,6 +658,8 @@ Status OlapScanNode::start_scan_thread(RuntimeState* state) {
}

int scanners_per_tablet = std::max(1, 64 / (int)_scan_ranges.size());

+ std::unordered_set<std::string> disk_set;
for (auto& scan_range : _scan_ranges) {
std::vector<std::unique_ptr<OlapScanRange>>* ranges = &cond_ranges;
std::vector<std::unique_ptr<OlapScanRange>> split_ranges;
@@ -702,8 +694,10 @@ Status OlapScanNode::start_scan_thread(RuntimeState* state) {
state, this, _olap_scan_node.is_preaggregation, _need_agg_finalize, *scan_range, scanner_ranges);
_scanner_pool->add(scanner);
_olap_scanners.push_back(scanner);
+ disk_set.insert(scanner->scan_disk());
}
}
+ COUNTER_SET(_num_disks_accessed_counter, static_cast<int64_t>(disk_set.size()));
Contributor:

What is this metric used for? If it is not useful, just remove it.

Contributor Author:

This metric tells us how many disks were accessed while the olap scan node scanned its data.

Contributor:

What can it be used for once you know how many disks were accessed?

Contributor Author:

It lets us see whether a query's tablets are spread across different disks, which tells us whether disk IO is the bottleneck.
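A standalone sketch of the counting logic added above (scanner and path types simplified here): each scanner reports the data dir its tablet lives on, and the number of distinct paths is what gets published as NumDiskAccess.

#include <cstdint>
#include <iostream>
#include <string>
#include <unordered_set>
#include <vector>

// Each scanner knows which data dir (disk) its tablet lives on; the number of
// distinct paths is the number of disks the scan will touch. The real code
// collects scanner->scan_disk() into disk_set inside start_scan_thread().
int64_t count_disks_accessed(const std::vector<std::string>& scanner_disk_paths) {
    std::unordered_set<std::string> disk_set;
    for (const std::string& path : scanner_disk_paths) {
        disk_set.insert(path);  // duplicates collapse, leaving one entry per disk
    }
    return static_cast<int64_t>(disk_set.size());
}

int main() {
    // Three scanners but only two distinct disks -> NumDiskAccess = 2.
    std::vector<std::string> paths = {"/data1/palo", "/data2/palo", "/data1/palo"};
    std::cout << "NumDiskAccess=" << count_disks_accessed(paths) << std::endl;
    return 0;
}

If the count equals the number of available data dirs, the query's tablets are spread out; if it stays at 1 while the scan is slow, a single disk is likely the IO bottleneck.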


// init progress
std::stringstream ss;
3 changes: 0 additions & 3 deletions be/src/exec/olap_scan_node.h
@@ -186,9 +186,6 @@ class OlapScanNode : public ScanNode {

std::vector<TCondition> _olap_filter;

- // Order Result Flag
- bool _is_result_order;

// Pool for storing allocated scanner objects. We don't want to use the
// runtime pool to ensure that the scanner objects are deleted before this
// object is.
4 changes: 4 additions & 0 deletions be/src/exec/olap_scanner.h
@@ -81,6 +81,10 @@ class OlapScanner {
int64_t raw_rows_read() const { return _raw_rows_read; }

void update_counter();

+ const std::string& scan_disk() const {
+     return _tablet->data_dir()->path();
+ }
private:
Status _prepare(
const TPaloScanRange& scan_range,
3 changes: 2 additions & 1 deletion be/src/exec/scan_node.cpp
@@ -28,7 +28,7 @@ const string ScanNode::_s_total_throughput_counter = "TotalReadThroughput";
const string ScanNode::_s_materialize_tuple_timer = "MaterializeTupleTime(*)";
const string ScanNode::_s_per_read_thread_throughput_counter =
"PerReadThreadRawHdfsThroughput";
- const string ScanNode::_s_num_disks_accessed_counter = "NumThread";
+ const string ScanNode::_s_num_disks_accessed_counter = "NumDiskAccess";
const string ScanNode::_s_scan_ranges_complete_counter = "ScanRangesComplete";
const string ScanNode::_s_scanner_thread_counters_prefix = "ScannerThreads";
const string ScanNode::_s_scanner_thread_total_wallclock_time =
@@ -43,6 +43,7 @@ Status ScanNode::prepare(RuntimeState* state) {
ADD_THREAD_COUNTERS(runtime_profile(), _s_scanner_thread_counters_prefix);
_bytes_read_counter =
ADD_COUNTER(runtime_profile(), _s_bytes_read_counter, TUnit::BYTES);
+ //TODO: The _rows_read_counter == RowsReturned counter in exec node, there is no need to keep both of them
_rows_read_counter =
ADD_COUNTER(runtime_profile(), _s_rows_read_counter, TUnit::UNIT);
_read_timer = ADD_TIMER(runtime_profile(), _s_total_read_timer);
5 changes: 0 additions & 5 deletions be/src/exec/scan_node.h
@@ -52,11 +52,6 @@ class TScanRange;
//
// AverageScannerThreadConcurrency - the average number of active scanner threads. A
// scanner thread is considered active if it is not blocked by IO. This number would
// be low (less than 1) for IO-bound queries. For cpu-bound queries, this number
// would be close to the max scanner threads allowed.
//
// AverageScannerThreadConcurrency - the average number of active scanner threads. A
// scanner thread is considered active if it is not blocked by IO. This number would
// be low (less than 1) for IO bounded queries. For cpu bounded queries, this number
// would be close to the max scanner threads allowed.
//
4 changes: 0 additions & 4 deletions be/src/exec/schema_scan_node.cpp
@@ -310,10 +310,6 @@ Status SchemaScanNode::close(RuntimeState* state) {
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::CLOSE));
SCOPED_TIMER(_runtime_profile->total_time_counter());

- if (memory_used_counter() != NULL) {
-     COUNTER_UPDATE(memory_used_counter(), _tuple_pool->peak_allocated_bytes());
- }

_tuple_pool.reset();
return ExecNode::close(state);
}
4 changes: 0 additions & 4 deletions be/src/exec/set_operation_node.cpp
@@ -75,10 +75,6 @@ Status SetOperationNode::close(RuntimeState* state) {
// Must reset _probe_batch in close() to release resources
_probe_batch.reset(NULL);

- if (_memory_used_counter != NULL && _hash_tbl.get() != NULL) {
-     COUNTER_UPDATE(_memory_used_counter, _build_pool->peak_allocated_bytes());
-     COUNTER_UPDATE(_memory_used_counter, _hash_tbl->byte_size());
- }
if (_hash_tbl.get() != NULL) {
_hash_tbl->close();
}
3 changes: 0 additions & 3 deletions be/src/exec/topn_node.cpp
@@ -163,9 +163,6 @@ Status TopNNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) {
// block(s) in the pool are all full or when the pool has reached a certain size.
if (*eos) {
row_batch->tuple_data_pool()->acquire_data(_tuple_pool.get(), false);
- if (memory_used_counter() != NULL) {
-     COUNTER_UPDATE(memory_used_counter(), _tuple_pool->peak_allocated_bytes());
- }
}
return Status::OK();
}
6 changes: 0 additions & 6 deletions be/src/runtime/bufferpool/buffer_pool.cc
@@ -392,12 +392,6 @@ BufferPool::Client::Client(BufferPool* pool, //TmpFileMgr::FileGroup* file_group
ADD_COUNTER(child_profile, "CumulativeAllocations", TUnit::UNIT);
counters_.cumulative_bytes_alloced =
ADD_COUNTER(child_profile, "CumulativeAllocationBytes", TUnit::BYTES);
- counters_.read_wait_time = ADD_TIMER(child_profile, "ReadIoWaitTime");
- counters_.read_io_ops = ADD_COUNTER(child_profile, "ReadIoOps", TUnit::UNIT);
- counters_.bytes_read = ADD_COUNTER(child_profile, "ReadIoBytes", TUnit::BYTES);
- counters_.write_wait_time = ADD_TIMER(child_profile, "WriteIoWaitTime");
- counters_.write_io_ops = ADD_COUNTER(child_profile, "WriteIoOps", TUnit::UNIT);
- counters_.bytes_written = ADD_COUNTER(child_profile, "WriteIoBytes", TUnit::BYTES);
counters_.peak_unpinned_bytes =
child_profile->AddHighWaterMarkCounter("PeakUnpinnedBytes", TUnit::BYTES);
}
18 changes: 0 additions & 18 deletions be/src/runtime/bufferpool/buffer_pool_counters.h
@@ -34,24 +34,6 @@ struct BufferPoolClientCounters {
/// Bytes of buffers allocated via BufferAllocator::AllocateBuffer().
RuntimeProfile::Counter* cumulative_bytes_alloced;

- /// Amount of time spent waiting for reads from disk to complete.
- RuntimeProfile::Counter* read_wait_time;
-
- /// Total number of read I/O operations issued.
- RuntimeProfile::Counter* read_io_ops;
-
- /// Total bytes read from disk.
- RuntimeProfile::Counter* bytes_read;
-
- /// Amount of time spent waiting for writes to disk to complete.
- RuntimeProfile::Counter* write_wait_time;
-
- /// Total number of write I/O operations issued.
- RuntimeProfile::Counter* write_io_ops;
-
- /// Total bytes written to disk.
- RuntimeProfile::Counter* bytes_written;

/// The peak total size of unpinned pages.
RuntimeProfile::HighWaterMarkCounter* peak_unpinned_bytes;
};
2 changes: 1 addition & 1 deletion be/src/runtime/data_stream_sender.cpp
@@ -385,7 +385,7 @@ Status DataStreamSender::prepare(RuntimeState* state) {
_profile = _pool->add(new RuntimeProfile(_pool, title.str()));
SCOPED_TIMER(_profile->total_time_counter());
_mem_tracker.reset(
- new MemTracker(-1, "DataStreamSender", state->instance_mem_tracker()));
+ new MemTracker(_profile, -1, "DataStreamSender", state->instance_mem_tracker()));

if (_part_type == TPartitionType::UNPARTITIONED
|| _part_type == TPartitionType::RANDOM) {