Skip to content
26 changes: 11 additions & 15 deletions be/src/olap/olap_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -305,24 +305,22 @@ struct OlapReaderStatistics {
// block_load_ns
// block_init_ns
// block_init_seek_ns
// block_conditions_filtered_ns
// first_read_ns
// block_first_read_seek_ns
// generate_row_ranges_ns
// predicate_column_read_ns
// predicate_column_read_seek_ns
// lazy_read_ns
// block_lazy_read_seek_ns
int64_t block_init_ns = 0;
int64_t block_init_seek_num = 0;
int64_t block_init_seek_ns = 0;
int64_t first_read_ns = 0;
int64_t second_read_ns = 0;
int64_t block_first_read_seek_num = 0;
int64_t block_first_read_seek_ns = 0;
int64_t predicate_column_read_ns = 0;
int64_t non_predicate_read_ns = 0;
int64_t predicate_column_read_seek_num = 0;
int64_t predicate_column_read_seek_ns = 0;
int64_t lazy_read_ns = 0;
int64_t block_lazy_read_seek_num = 0;
int64_t block_lazy_read_seek_ns = 0;

int64_t block_convert_ns = 0;

int64_t raw_rows_read = 0;

int64_t rows_vec_cond_filtered = 0;
Expand Down Expand Up @@ -351,11 +349,10 @@ struct OlapReaderStatistics {
int64_t rows_del_by_bitmap = 0;
// the number of rows filtered by various column indexes.
int64_t rows_conditions_filtered = 0;
int64_t block_conditions_filtered_ns = 0;
int64_t block_conditions_filtered_bf_ns = 0;
int64_t block_conditions_filtered_zonemap_ns = 0;
int64_t block_conditions_filtered_zonemap_rp_ns = 0;
int64_t block_conditions_filtered_dict_ns = 0;
int64_t generate_row_ranges_ns = 0;
int64_t generate_row_ranges_by_bf_ns = 0;
int64_t generate_row_ranges_by_zonemap_ns = 0;
int64_t generate_row_ranges_by_dict_ns = 0;

int64_t index_load_ns = 0;

Expand All @@ -372,7 +369,6 @@ struct OlapReaderStatistics {
int64_t inverted_index_query_cache_miss = 0;
int64_t inverted_index_query_null_bitmap_timer = 0;
int64_t inverted_index_query_bitmap_copy_timer = 0;
int64_t inverted_index_query_bitmap_op_timer = 0;
int64_t inverted_index_searcher_open_timer = 0;
int64_t inverted_index_searcher_search_timer = 0;
int64_t inverted_index_searcher_cache_hit = 0;
Expand Down
2 changes: 0 additions & 2 deletions be/src/olap/rowset/rowset_reader_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,12 @@ struct RowsetReaderContext {
bool enable_unique_key_merge_on_write = false;
const DeleteBitmap* delete_bitmap = nullptr;
bool record_rowids = false;
bool is_vertical_compaction = false;
bool is_key_column_group = false;
const std::set<int32_t>* output_columns = nullptr;
RowsetId rowset_id;
// slots that cast may be eliminated in storage layer
std::map<std::string, TypeDescriptor> target_cast_type_for_variants;
int64_t ttl_seconds = 0;
size_t topn_limit = 0;
};

} // namespace doris
Expand Down
75 changes: 38 additions & 37 deletions be/src/olap/rowset/segment_v2/segment_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,7 @@ Status SegmentIterator::_prepare_seek(const StorageReadOptions::KeyRange& key_ra
}

Status SegmentIterator::_get_row_ranges_by_column_conditions() {
SCOPED_RAW_TIMER(&_opts.stats->block_conditions_filtered_ns);
SCOPED_RAW_TIMER(&_opts.stats->generate_row_ranges_ns);
if (_row_bitmap.isEmpty()) {
return Status::OK();
}
Expand Down Expand Up @@ -565,7 +565,7 @@ Status SegmentIterator::_get_row_ranges_from_conditions(RowRanges* condition_row
size_t pre_size = 0;

{
SCOPED_RAW_TIMER(&_opts.stats->block_conditions_filtered_bf_ns);
SCOPED_RAW_TIMER(&_opts.stats->generate_row_ranges_by_bf_ns);
// first filter data by bloom filter index
// bloom filter index only use CondColumn
RowRanges bf_row_ranges = RowRanges::create_single(num_rows());
Expand All @@ -588,7 +588,7 @@ Status SegmentIterator::_get_row_ranges_from_conditions(RowRanges* condition_row
}

{
SCOPED_RAW_TIMER(&_opts.stats->block_conditions_filtered_zonemap_ns);
SCOPED_RAW_TIMER(&_opts.stats->generate_row_ranges_by_zonemap_ns);
RowRanges zone_map_row_ranges = RowRanges::create_single(num_rows());
// second filter data by zone map
for (const auto& cid : cids) {
Expand Down Expand Up @@ -652,7 +652,7 @@ Status SegmentIterator::_get_row_ranges_from_conditions(RowRanges* condition_row
}

{
SCOPED_RAW_TIMER(&_opts.stats->block_conditions_filtered_dict_ns);
SCOPED_RAW_TIMER(&_opts.stats->generate_row_ranges_by_dict_ns);
/// Low cardinality optimization is currently not very stable, so to prevent data corruption,
/// we are temporarily disabling its use in data compaction.
if (_opts.io_ctx.reader_type == ReaderType::READER_QUERY) {
Expand Down Expand Up @@ -1400,7 +1400,7 @@ Status SegmentIterator::_vec_init_lazy_materialization() {
if (!_is_common_expr_column[cid]) {
_non_predicate_columns.push_back(cid);
} else {
_second_read_column_ids.push_back(cid);
_non_predicate_column_ids.push_back(cid);
}
}
}
Expand All @@ -1410,13 +1410,13 @@ Status SegmentIterator::_vec_init_lazy_materialization() {
if (_lazy_materialization_read) {
// insert pred cid to first_read_columns
for (auto cid : pred_column_ids) {
_first_read_column_ids.push_back(cid);
_predicate_column_ids.push_back(cid);
}
} else if (!_is_need_vec_eval && !_is_need_short_eval &&
!_is_need_expr_eval) { // no pred exists, just read and output column
for (int i = 0; i < _schema->num_column_ids(); i++) {
auto cid = _schema->column_id(i);
_first_read_column_ids.push_back(cid);
_predicate_column_ids.push_back(cid);
}
} else {
if (_is_need_vec_eval || _is_need_short_eval) {
Expand All @@ -1428,26 +1428,26 @@ Status SegmentIterator::_vec_init_lazy_materialization() {
_short_cir_pred_column_ids.end());
pred_id_set.insert(_vec_pred_column_ids.begin(), _vec_pred_column_ids.end());

DCHECK(_second_read_column_ids.empty());
// _second_read_column_ids must be empty here; otherwise _lazy_materialization_read could not be false.
DCHECK(_non_predicate_column_ids.empty());
// _non_predicate_column_ids must be empty here; otherwise _lazy_materialization_read could not be false.
for (int i = 0; i < _schema->num_column_ids(); i++) {
auto cid = _schema->column_id(i);
if (pred_id_set.find(cid) != pred_id_set.end()) {
_first_read_column_ids.push_back(cid);
_predicate_column_ids.push_back(cid);
}
// In the past, if there were more schema columns than pred columns, _lazy_materialization_read could be false, but
// we now make sure _lazy_materialization_read is true in that case, so this logic may never be reached. The lines
// below are commented out and could be deleted in the future to make the code clearer.
// else if (non_pred_set.find(cid) != non_pred_set.end()) {
// _first_read_column_ids.push_back(cid);
// _predicate_column_ids.push_back(cid);
// // when _lazy_materialization_read = false, non-predicate column should also be filtered by sel idx, so we regard it as pred columns
// _is_pred_column[cid] = true;
// }
}
} else if (_is_need_expr_eval) {
DCHECK(!_is_need_vec_eval && !_is_need_short_eval);
for (auto cid : _common_expr_columns) {
_first_read_column_ids.push_back(cid);
_predicate_column_ids.push_back(cid);
}
}
}
Expand Down Expand Up @@ -1633,7 +1633,7 @@ void SegmentIterator::_output_non_pred_columns(vectorized::Block* block) {
* 1. Reads a batch of rowids (up to the specified limit), and checks if they are continuous.
* Continuous here means that the rowids form an unbroken sequence (e.g., 1, 2, 3, 4...).
*
* 2. For each column that needs to be read (identified by _first_read_column_ids):
* 2. For each column that needs to be read (identified by _predicate_column_ids):
* - If the rowids are continuous, the function uses seek_to_ordinal and next_batch
* for efficient reading.
* - If the rowids are not continuous, the function processes them in smaller batches
Expand All @@ -1646,13 +1646,13 @@ void SegmentIterator::_output_non_pred_columns(vectorized::Block* block) {
*/
Status SegmentIterator::_read_columns_by_index(uint32_t nrows_read_limit, uint32_t& nrows_read,
bool set_block_rowid) {
SCOPED_RAW_TIMER(&_opts.stats->first_read_ns);
SCOPED_RAW_TIMER(&_opts.stats->predicate_column_read_ns);

nrows_read = _range_iter->read_batch_rowids(_block_rowids.data(), nrows_read_limit);
bool is_continuous = (nrows_read > 1) &&
(_block_rowids[nrows_read - 1] - _block_rowids[0] == nrows_read - 1);

for (auto cid : _first_read_column_ids) {
for (auto cid : _predicate_column_ids) {
auto& column = _current_return_columns[cid];
if (_no_need_read_key_data(cid, column, nrows_read)) {
continue;
Expand All @@ -1677,9 +1677,9 @@ Status SegmentIterator::_read_columns_by_index(uint32_t nrows_read_limit, uint32

if (is_continuous) {
size_t rows_read = nrows_read;
_opts.stats->block_first_read_seek_num += 1;
_opts.stats->predicate_column_read_seek_num += 1;
if (_opts.runtime_state && _opts.runtime_state->enable_profile()) {
SCOPED_RAW_TIMER(&_opts.stats->block_first_read_seek_ns);
SCOPED_RAW_TIMER(&_opts.stats->predicate_column_read_seek_ns);
RETURN_IF_ERROR(_column_iterators[cid]->seek_to_ordinal(_block_rowids[0]));
} else {
RETURN_IF_ERROR(_column_iterators[cid]->seek_to_ordinal(_block_rowids[0]));
Expand All @@ -1701,9 +1701,9 @@ Status SegmentIterator::_read_columns_by_index(uint32_t nrows_read_limit, uint32

if (batch_continuous) {
size_t rows_read = current_batch_size;
_opts.stats->block_first_read_seek_num += 1;
_opts.stats->predicate_column_read_seek_num += 1;
if (_opts.runtime_state && _opts.runtime_state->enable_profile()) {
SCOPED_RAW_TIMER(&_opts.stats->block_first_read_seek_ns);
SCOPED_RAW_TIMER(&_opts.stats->predicate_column_read_seek_ns);
RETURN_IF_ERROR(
_column_iterators[cid]->seek_to_ordinal(_block_rowids[processed]));
} else {
Expand Down Expand Up @@ -2066,8 +2066,8 @@ Status SegmentIterator::_next_batch_internal(vectorized::Block* block) {
RETURN_IF_ERROR(_read_columns_by_index(
nrows_read_limit, _current_batch_rows_read,
_lazy_materialization_read || _opts.record_rowids || _is_need_expr_eval));
if (std::find(_first_read_column_ids.begin(), _first_read_column_ids.end(),
_schema->version_col_idx()) != _first_read_column_ids.end()) {
if (std::find(_predicate_column_ids.begin(), _predicate_column_ids.end(),
_schema->version_col_idx()) != _predicate_column_ids.end()) {
_replace_version_col(_current_batch_rows_read);
}

Expand All @@ -2092,7 +2092,7 @@ Status SegmentIterator::_next_batch_internal(vectorized::Block* block) {
if (_non_predicate_columns.empty()) {
return Status::InternalError("_non_predicate_columns is empty");
}
RETURN_IF_ERROR(_convert_to_expected_type(_first_read_column_ids));
RETURN_IF_ERROR(_convert_to_expected_type(_predicate_column_ids));
RETURN_IF_ERROR(_convert_to_expected_type(_non_predicate_columns));
_output_non_pred_columns(block);
} else {
Expand All @@ -2113,27 +2113,28 @@ Status SegmentIterator::_next_batch_internal(vectorized::Block* block) {

if (selected_size > 0) {
// step 3.1: output short circuit and predicate column
// when lazy materialization enables, _first_read_column_ids = distinct(_short_cir_pred_column_ids + _vec_pred_column_ids)
// when lazy materialization enables, _predicate_column_ids = distinct(_short_cir_pred_column_ids + _vec_pred_column_ids)
// see _vec_init_lazy_materialization
// todo(wb) need to tell input columnids from output columnids
RETURN_IF_ERROR(_output_column_by_sel_idx(block, _first_read_column_ids,
RETURN_IF_ERROR(_output_column_by_sel_idx(block, _predicate_column_ids,
_sel_rowid_idx.data(), selected_size));

// step 3.2: read remaining expr column and evaluate it.
if (_is_need_expr_eval) {
// The predicate column contains the remaining expr column, no need second read.
if (!_second_read_column_ids.empty()) {
SCOPED_RAW_TIMER(&_opts.stats->second_read_ns);
if (!_non_predicate_column_ids.empty()) {
SCOPED_RAW_TIMER(&_opts.stats->non_predicate_read_ns);
RETURN_IF_ERROR(_read_columns_by_rowids(
_second_read_column_ids, _block_rowids, _sel_rowid_idx.data(),
_non_predicate_column_ids, _block_rowids, _sel_rowid_idx.data(),
selected_size, &_current_return_columns));
if (std::find(_second_read_column_ids.begin(),
_second_read_column_ids.end(), _schema->version_col_idx()) !=
_second_read_column_ids.end()) {
if (std::find(_non_predicate_column_ids.begin(),
_non_predicate_column_ids.end(),
_schema->version_col_idx()) !=
_non_predicate_column_ids.end()) {
_replace_version_col(selected_size);
}
RETURN_IF_ERROR(_convert_to_expected_type(_second_read_column_ids));
for (auto cid : _second_read_column_ids) {
RETURN_IF_ERROR(_convert_to_expected_type(_non_predicate_column_ids));
for (auto cid : _non_predicate_column_ids) {
auto loc = _schema_block_id_map[cid];
block->replace_by_position(loc,
std::move(_current_return_columns[cid]));
Expand Down Expand Up @@ -2166,17 +2167,17 @@ Status SegmentIterator::_next_batch_internal(vectorized::Block* block) {
}
}
} else if (_is_need_expr_eval) {
RETURN_IF_ERROR(_convert_to_expected_type(_second_read_column_ids));
for (auto cid : _second_read_column_ids) {
RETURN_IF_ERROR(_convert_to_expected_type(_non_predicate_column_ids));
for (auto cid : _non_predicate_column_ids) {
auto loc = _schema_block_id_map[cid];
block->replace_by_position(loc, std::move(_current_return_columns[cid]));
}
}
} else if (_is_need_expr_eval) {
DCHECK(!_first_read_column_ids.empty());
RETURN_IF_ERROR(_convert_to_expected_type(_first_read_column_ids));
DCHECK(!_predicate_column_ids.empty());
RETURN_IF_ERROR(_convert_to_expected_type(_predicate_column_ids));
// All rows from the first read are inserted into the block; initialize sel_rowid_idx to all rows.
for (auto cid : _first_read_column_ids) {
for (auto cid : _predicate_column_ids) {
auto loc = _schema_block_id_map[cid];
block->replace_by_position(loc, std::move(_current_return_columns[cid]));
}
Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/rowset/segment_v2/segment_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -431,8 +431,8 @@ class SegmentIterator : public RowwiseIterator {
// First, predicate columns are read using the various indexes.
// Second, non-predicate columns are read.
// So we need a field that records the columns to read the first time.
std::vector<ColumnId> _first_read_column_ids;
std::vector<ColumnId> _second_read_column_ids;
std::vector<ColumnId> _predicate_column_ids;
std::vector<ColumnId> _non_predicate_column_ids;
std::vector<ColumnId> _columns_to_filter;
std::vector<ColumnId> _converted_column_ids;
std::vector<int> _schema_block_id_map; // map from schema column id to column idx in Block
Expand Down
4 changes: 0 additions & 4 deletions be/src/pipeline/exec/aggregation_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,17 +63,13 @@ Status AggSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
"SerializeKeyArena", TUnit::BYTES, "MemoryUsage", 1);

_build_timer = ADD_TIMER(Base::profile(), "BuildTime");
_serialize_key_timer = ADD_TIMER(Base::profile(), "SerializeKeyTime");
_exec_timer = ADD_TIMER(Base::profile(), "ExecTime");
_merge_timer = ADD_TIMER(Base::profile(), "MergeTime");
_expr_timer = ADD_TIMER(Base::profile(), "ExprTime");
_serialize_data_timer = ADD_TIMER(Base::profile(), "SerializeDataTime");
_deserialize_data_timer = ADD_TIMER(Base::profile(), "DeserializeAndMergeTime");
_hash_table_compute_timer = ADD_TIMER(Base::profile(), "HashTableComputeTime");
_hash_table_limit_compute_timer = ADD_TIMER(Base::profile(), "DoLimitComputeTime");
_hash_table_emplace_timer = ADD_TIMER(Base::profile(), "HashTableEmplaceTime");
_hash_table_input_counter = ADD_COUNTER(Base::profile(), "HashTableInputCount", TUnit::UNIT);
_max_row_size_counter = ADD_COUNTER(Base::profile(), "MaxRowSizeInBytes", TUnit::UNIT);

return Status::OK();
}
Expand Down
3 changes: 0 additions & 3 deletions be/src/pipeline/exec/aggregation_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,8 @@ class AggSinkLocalState : public PipelineXSinkLocalState<AggSharedState> {
RuntimeProfile::Counter* _hash_table_input_counter = nullptr;
RuntimeProfile::Counter* _build_timer = nullptr;
RuntimeProfile::Counter* _expr_timer = nullptr;
RuntimeProfile::Counter* _serialize_key_timer = nullptr;
RuntimeProfile::Counter* _merge_timer = nullptr;
RuntimeProfile::Counter* _serialize_data_timer = nullptr;
RuntimeProfile::Counter* _deserialize_data_timer = nullptr;
RuntimeProfile::Counter* _max_row_size_counter = nullptr;
RuntimeProfile::Counter* _hash_table_memory_usage = nullptr;
RuntimeProfile::Counter* _hash_table_size_counter = nullptr;
RuntimeProfile::HighWaterMarkCounter* _serialize_key_arena_memory_usage = nullptr;
Expand Down
Loading