Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 40 additions & 29 deletions be/src/exec/olap_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,54 +87,65 @@ Status OlapScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
return Status::OK();
}


void OlapScanNode::init_scan_profile() {
_scanner_profile.reset(new RuntimeProfile("OlapScanner"));
runtime_profile()->add_child(_scanner_profile.get(), true, NULL);

_segment_profile.reset(new RuntimeProfile("SegmentIterator"));
_scanner_profile->add_child(_segment_profile.get(), true, NULL);
}

void OlapScanNode::_init_counter(RuntimeState* state) {
ADD_TIMER(_runtime_profile, "ShowHintsTime");
ADD_TIMER(_scanner_profile, "ShowHintsTime_V1");

_reader_init_timer = ADD_TIMER(_runtime_profile, "ReaderInitTime");
_read_compressed_counter = ADD_COUNTER(_runtime_profile, "CompressedBytesRead", TUnit::BYTES);
_read_uncompressed_counter = ADD_COUNTER(_runtime_profile, "UncompressedBytesRead", TUnit::BYTES);
_block_load_timer = ADD_TIMER(_runtime_profile, "BlockLoadTime");
_block_load_counter = ADD_COUNTER(_runtime_profile, "BlocksLoad", TUnit::UNIT);
_block_fetch_timer = ADD_TIMER(_runtime_profile, "BlockFetchTime");
_raw_rows_counter = ADD_COUNTER(_runtime_profile, "RawRowsRead", TUnit::UNIT);
_block_convert_timer = ADD_TIMER(_runtime_profile, "BlockConvertTime");
_block_seek_timer = ADD_TIMER(_runtime_profile, "BlockSeekTime");
_block_seek_counter = ADD_COUNTER(_runtime_profile, "BlockSeekCount", TUnit::UNIT);
_reader_init_timer = ADD_TIMER(_scanner_profile, "ReaderInitTime");
_read_compressed_counter = ADD_COUNTER(_segment_profile, "CompressedBytesRead", TUnit::BYTES);
_read_uncompressed_counter = ADD_COUNTER(_segment_profile, "UncompressedBytesRead", TUnit::BYTES);
_block_load_timer = ADD_TIMER(_segment_profile, "BlockLoadTime");
_block_load_counter = ADD_COUNTER(_segment_profile, "BlocksLoad", TUnit::UNIT);
_block_fetch_timer = ADD_TIMER(_scanner_profile, "BlockFetchTime");
_raw_rows_counter = ADD_COUNTER(_segment_profile, "RawRowsRead", TUnit::UNIT);
_block_convert_timer = ADD_TIMER(_scanner_profile, "BlockConvertTime");
_block_seek_timer = ADD_TIMER(_segment_profile, "BlockSeekTime");
_block_seek_counter = ADD_COUNTER(_segment_profile, "BlockSeekCount", TUnit::UNIT);

_rows_vec_cond_counter = ADD_COUNTER(_runtime_profile, "RowsVectorPredFiltered", TUnit::UNIT);
_vec_cond_timer = ADD_TIMER(_runtime_profile, "VectorPredEvalTime");
_rows_vec_cond_counter = ADD_COUNTER(_segment_profile, "RowsVectorPredFiltered", TUnit::UNIT);
_vec_cond_timer = ADD_TIMER(_segment_profile, "VectorPredEvalTime");

_stats_filtered_counter = ADD_COUNTER(_runtime_profile, "RowsStatsFiltered", TUnit::UNIT);
_bf_filtered_counter = ADD_COUNTER(_runtime_profile, "RowsBloomFilterFiltered", TUnit::UNIT);
_del_filtered_counter = ADD_COUNTER(_runtime_profile, "RowsDelFiltered", TUnit::UNIT);
_key_range_filtered_counter = ADD_COUNTER(_runtime_profile, "RowsKeyRangeFiltered", TUnit::UNIT);
_stats_filtered_counter = ADD_COUNTER(_segment_profile, "RowsStatsFiltered", TUnit::UNIT);
_bf_filtered_counter = ADD_COUNTER(_segment_profile, "RowsBloomFilterFiltered", TUnit::UNIT);
_del_filtered_counter = ADD_COUNTER(_scanner_profile, "RowsDelFiltered", TUnit::UNIT);
_conditions_filtered_counter = ADD_COUNTER(_segment_profile, "RowsConditionsFiltered", TUnit::UNIT);
_key_range_filtered_counter = ADD_COUNTER(_segment_profile, "RowsKeyRangeFiltered", TUnit::UNIT);

_io_timer = ADD_TIMER(_runtime_profile, "IOTimer");
_decompressor_timer = ADD_TIMER(_runtime_profile, "DecompressorTimer");
_index_load_timer = ADD_TIMER(_runtime_profile, "IndexLoadTime");
_io_timer = ADD_TIMER(_segment_profile, "IOTimer");
_decompressor_timer = ADD_TIMER(_segment_profile, "DecompressorTimer");
_index_load_timer = ADD_TIMER(_segment_profile, "IndexLoadTime_V1");

_scan_timer = ADD_TIMER(_runtime_profile, "ScanTime");
_scan_timer = ADD_TIMER(_scanner_profile, "ScanTime");

_total_pages_num_counter = ADD_COUNTER(_runtime_profile, "TotalPagesNum", TUnit::UNIT);
_cached_pages_num_counter = ADD_COUNTER(_runtime_profile, "CachedPagesNum", TUnit::UNIT);
_total_pages_num_counter = ADD_COUNTER(_segment_profile, "TotalPagesNum", TUnit::UNIT);
_cached_pages_num_counter = ADD_COUNTER(_segment_profile, "CachedPagesNum", TUnit::UNIT);

_bitmap_index_filter_counter = ADD_COUNTER(_runtime_profile, "RowsBitmapIndexFiltered", TUnit::UNIT);
_bitmap_index_filter_timer = ADD_TIMER(_runtime_profile, "BitmapIndexFilterTimer");
_bitmap_index_filter_counter = ADD_COUNTER(_segment_profile, "RowsBitmapIndexFiltered", TUnit::UNIT);
_bitmap_index_filter_timer = ADD_TIMER(_segment_profile, "BitmapIndexFilterTimer");

_num_scanners = ADD_COUNTER(_runtime_profile, "NumScanners", TUnit::UNIT);

_filtered_segment_counter = ADD_COUNTER(_runtime_profile, "NumSegmentFiltered", TUnit::UNIT);
_total_segment_counter = ADD_COUNTER(_runtime_profile, "NumSegmentTotal", TUnit::UNIT);
_filtered_segment_counter = ADD_COUNTER(_segment_profile, "NumSegmentFiltered", TUnit::UNIT);
_total_segment_counter = ADD_COUNTER(_segment_profile, "NumSegmentTotal", TUnit::UNIT);
}

Status OlapScanNode::prepare(RuntimeState* state) {
init_scan_profile();
RETURN_IF_ERROR(ScanNode::prepare(state));
// create scanner profile
// create timer
_tablet_counter =
ADD_COUNTER(runtime_profile(), "TabletCount ", TUnit::UNIT);
_rows_pushed_cond_filtered_counter =
ADD_COUNTER(_runtime_profile, "RowsPushedCondFiltered", TUnit::UNIT);
ADD_COUNTER(_scanner_profile, "RowsPushedCondFiltered", TUnit::UNIT);
_init_counter(state);
_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
if (_tuple_desc == NULL) {
Expand Down Expand Up @@ -567,7 +578,7 @@ static Status get_hints(
return Status::InternalError(ss.str());
}

RuntimeProfile::Counter* show_hints_timer = profile->get_counter("ShowHintsTime");
RuntimeProfile::Counter* show_hints_timer = profile->get_counter("ShowHintsTime_V1");
std::vector<std::vector<OlapTuple>> ranges;
bool have_valid_range = false;
for (auto& key_range : scan_key_range) {
Expand Down
7 changes: 7 additions & 0 deletions be/src/exec/olap_scan_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,9 @@ class OlapScanNode : public ScanNode {

private:
void _init_counter(RuntimeState* state);
// OLAP_SCAN_NODE profile layering: OLAP_SCAN_NODE, OlapScanner, and SegmentIterator
// according to the calling relationship
void init_scan_profile();

void construct_is_null_pred_in_where_pred(Expr* expr, SlotDescriptor* slot, std::string is_null_str);

Expand Down Expand Up @@ -254,6 +257,9 @@ class OlapScanNode : public ScanNode {
// or be overwritten by value in TQueryOptions
int32_t _max_pushdown_conditions_per_column = 1024;

std::unique_ptr<RuntimeProfile> _scanner_profile;
std::unique_ptr<RuntimeProfile> _segment_profile;

// Counters
RuntimeProfile::Counter* _io_timer = nullptr;
RuntimeProfile::Counter* _read_compressed_counter = nullptr;
Expand All @@ -267,6 +273,7 @@ class OlapScanNode : public ScanNode {
RuntimeProfile::Counter* _stats_filtered_counter = nullptr;
RuntimeProfile::Counter* _bf_filtered_counter = nullptr;
RuntimeProfile::Counter* _del_filtered_counter = nullptr;
RuntimeProfile::Counter* _conditions_filtered_counter = nullptr;
RuntimeProfile::Counter* _key_range_filtered_counter = nullptr;

RuntimeProfile::Counter* _block_seek_timer = nullptr;
Expand Down
7 changes: 2 additions & 5 deletions be/src/exec/olap_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,6 @@

namespace doris {

static const std::string SCANNER_THREAD_TOTAL_WALLCLOCK_TIME =
"ScannerThreadsTotalWallClockTime";
static const std::string MATERIALIZE_TUPLE_TIMER =
"MaterializeTupleTime(*)";

OlapScanner::OlapScanner(
RuntimeState* runtime_state,
OlapScanNode* parent,
Expand Down Expand Up @@ -481,6 +476,8 @@ void OlapScanner::update_counter() {
COUNTER_UPDATE(_parent->_stats_filtered_counter, _reader->stats().rows_stats_filtered);
COUNTER_UPDATE(_parent->_bf_filtered_counter, _reader->stats().rows_bf_filtered);
COUNTER_UPDATE(_parent->_del_filtered_counter, _reader->stats().rows_del_filtered);
COUNTER_UPDATE(_parent->_conditions_filtered_counter, _reader->stats().rows_conditions_filtered);

COUNTER_UPDATE(_parent->_key_range_filtered_counter, _reader->stats().rows_key_range_filtered);

COUNTER_UPDATE(_parent->_index_load_timer, _reader->stats().index_load_ns);
Expand Down
14 changes: 0 additions & 14 deletions be/src/exec/scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,41 +23,27 @@ namespace doris {

const string ScanNode::_s_bytes_read_counter = "BytesRead";
const string ScanNode::_s_rows_read_counter = "RowsRead";
const string ScanNode::_s_total_read_timer = "TotalRawReadTime(*)";
const string ScanNode::_s_total_throughput_counter = "TotalReadThroughput";
const string ScanNode::_s_materialize_tuple_timer = "MaterializeTupleTime(*)";
const string ScanNode::_s_per_read_thread_throughput_counter =
"PerReadThreadRawHdfsThroughput";
const string ScanNode::_s_num_disks_accessed_counter = "NumDiskAccess";
const string ScanNode::_s_scanner_thread_counters_prefix = "ScannerThreads";
const string ScanNode::_s_scanner_thread_total_wallclock_time =
"ScannerThreadsTotalWallClockTime";

const string ScanNode::_s_num_scanner_threads_started ="NumScannerThreadsStarted";

Status ScanNode::prepare(RuntimeState* state) {
RETURN_IF_ERROR(ExecNode::prepare(state));

_scanner_thread_counters =
ADD_THREAD_COUNTERS(runtime_profile(), _s_scanner_thread_counters_prefix);
_bytes_read_counter =
ADD_COUNTER(runtime_profile(), _s_bytes_read_counter, TUnit::BYTES);
//TODO: The _rows_read_counter == RowsReturned counter in exec node, there is no need to keep both of them
_rows_read_counter =
ADD_COUNTER(runtime_profile(), _s_rows_read_counter, TUnit::UNIT);
_read_timer = ADD_TIMER(runtime_profile(), _s_total_read_timer);
#ifndef BE_TEST
_total_throughput_counter = runtime_profile()->add_rate_counter(
_s_total_throughput_counter, _bytes_read_counter);
#endif
_materialize_tuple_timer = ADD_CHILD_TIMER(runtime_profile(), _s_materialize_tuple_timer,
_s_scanner_thread_total_wallclock_time);
_per_read_thread_throughput_counter = runtime_profile()->add_derived_counter(
_s_per_read_thread_throughput_counter, TUnit::BYTES_PER_SECOND,
boost::bind<int64_t>(&RuntimeProfile::units_per_second,
_bytes_read_counter,
_read_timer),
"");
_num_disks_accessed_counter =
ADD_COUNTER(runtime_profile(), _s_num_disks_accessed_counter, TUnit::UNIT);

Expand Down
19 changes: 0 additions & 19 deletions be/src/exec/scan_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,49 +96,30 @@ class ScanNode : public ExecNode {
RuntimeProfile::Counter* rows_read_counter() const {
return _rows_read_counter;
}
RuntimeProfile::Counter* read_timer() const {
return _read_timer;
}
RuntimeProfile::Counter* total_throughput_counter() const {
return _total_throughput_counter;
}
RuntimeProfile::Counter* per_read_thread_throughput_counter() const {
return _per_read_thread_throughput_counter;
}
RuntimeProfile::Counter* materialize_tuple_timer() const {
return _materialize_tuple_timer;
}
RuntimeProfile::ThreadCounters* scanner_thread_counters() const {
return _scanner_thread_counters;
}

// names of ScanNode common counters
static const std::string _s_bytes_read_counter;
static const std::string _s_rows_read_counter;
static const std::string _s_total_read_timer;
static const std::string _s_total_throughput_counter;
static const std::string _s_per_read_thread_throughput_counter;
static const std::string _s_num_disks_accessed_counter;
static const std::string _s_materialize_tuple_timer;
static const std::string _s_scanner_thread_counters_prefix;
static const std::string _s_scanner_thread_total_wallclock_time;
static const std::string _s_average_io_mgr_queue_capacity;
static const std::string _s_num_scanner_threads_started;

protected:
RuntimeProfile::Counter* _bytes_read_counter; // # bytes read from the scanner
// # rows/tuples read from the scanner (including those discarded by eval_conjucts())
RuntimeProfile::Counter* _rows_read_counter;
RuntimeProfile::Counter* _read_timer; // total read time
// Wall based aggregate read throughput [bytes/sec]
RuntimeProfile::Counter* _total_throughput_counter;
// Per thread read throughput [bytes/sec]
RuntimeProfile::Counter* _per_read_thread_throughput_counter;
RuntimeProfile::Counter* _num_disks_accessed_counter;
RuntimeProfile::Counter* _materialize_tuple_timer; // time writing tuple slots
// Aggregated scanner thread counters
RuntimeProfile::ThreadCounters* _scanner_thread_counters;
RuntimeProfile::Counter* _num_scanner_threads_started_counter;
};

}
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/olap_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ struct OlapReaderStatistics {
int64_t rows_stats_filtered = 0;
int64_t rows_bf_filtered = 0;
int64_t rows_del_filtered = 0;
int64_t rows_conditions_filtered = 0;

int64_t index_load_ns = 0;

Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/segment_v2/segment_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ Status SegmentIterator::_get_row_ranges_by_column_conditions() {
RETURN_IF_ERROR(_get_row_ranges_from_conditions(&condition_row_ranges));
size_t pre_size = _row_bitmap.cardinality();
_row_bitmap &= RowRanges::ranges_to_roaring(condition_row_ranges);
_opts.stats->rows_del_filtered += (pre_size - _row_bitmap.cardinality());
_opts.stats->rows_conditions_filtered += (pre_size - _row_bitmap.cardinality());
}

// TODO(hkp): calculate filter rate to decide whether to
Expand Down
Loading