diff --git a/be/src/exec/rowid_fetcher.cpp b/be/src/exec/rowid_fetcher.cpp index b8f00e4d16cef0..9369ffb43a2a72 100644 --- a/be/src/exec/rowid_fetcher.cpp +++ b/be/src/exec/rowid_fetcher.cpp @@ -45,6 +45,7 @@ #include "common/config.h" #include "common/consts.h" #include "common/exception.h" +#include "common/signal_handler.h" #include "exec/tablet_info.h" // DorisNodesInfo #include "olap/olap_common.h" #include "olap/rowset/beta_rowset.h" @@ -58,6 +59,7 @@ #include "runtime/fragment_mgr.h" // FragmentMgr #include "runtime/runtime_state.h" // RuntimeState #include "runtime/types.h" +#include "runtime/workload_group/workload_group_manager.h" #include "util/brpc_client_cache.h" // BrpcClientCache #include "util/defer_op.h" #include "vec/columns/column.h" @@ -488,8 +490,9 @@ Status RowIdStorageReader::read_by_rowids(const PMultiGetRequestV2& request, int64_t acquire_segments_ms = 0; int64_t lookup_row_data_ms = 0; - int64_t external_init_reader_ms = 0; - int64_t external_get_block_ms = 0; + int64_t external_init_reader_avg_ms = 0; + int64_t external_get_block_avg_ms = 0; + size_t external_scan_range_cnt = 0; // Add counters for different file mapping types std::unordered_map file_type_counts; @@ -507,6 +510,7 @@ Status RowIdStorageReader::read_by_rowids(const PMultiGetRequestV2& request, for (int i = 0; i < request.request_block_descs_size(); ++i) { const auto& request_block_desc = request.request_block_descs(i); + PMultiGetBlockV2* pblock = response->add_blocks(); if (request_block_desc.row_id_size() >= 1) { // Since this block belongs to the same table, we only need to take the first type for judgment. auto first_file_id = request_block_desc.file_id(0); @@ -542,9 +546,10 @@ Status RowIdStorageReader::read_by_rowids(const PMultiGetRequestV2& request, &acquire_segments_ms, &lookup_row_data_ms)); } else { RETURN_IF_ERROR(read_batch_external_row( - request_block_desc, id_file_map, slots, first_file_mapping, - tquery_id, result_blocks[i], &external_init_reader_ms, - &external_get_block_ms)); + request.wg_id(), request_block_desc, id_file_map, slots, + first_file_mapping, tquery_id, result_blocks[i], + pblock->mutable_profile(), &external_init_reader_avg_ms, + &external_get_block_avg_ms, &external_scan_range_cnt)); } } catch (const Exception& e) { return Status::Error(e.code(), "Row id fetch failed because {}", @@ -558,9 +563,9 @@ Status RowIdStorageReader::read_by_rowids(const PMultiGetRequestV2& request, [[maybe_unused]] size_t compressed_size = 0; [[maybe_unused]] size_t uncompressed_size = 0; int be_exec_version = request.has_be_exec_version() ? request.be_exec_version() : 0; - RETURN_IF_ERROR(result_blocks[i].serialize( - be_exec_version, response->add_blocks()->mutable_block(), &uncompressed_size, - &compressed_size, segment_v2::CompressionTypePB::LZ4)); + RETURN_IF_ERROR(result_blocks[i].serialize(be_exec_version, pblock->mutable_block(), + &uncompressed_size, &compressed_size, + segment_v2::CompressionTypePB::LZ4)); } // Build file type statistics string @@ -579,12 +584,14 @@ Status RowIdStorageReader::read_by_rowids(const PMultiGetRequestV2& request, "io_latency:{}ns, uncompressed_bytes_read:{}, bytes_read:{}, " "acquire_tablet_ms:{}, acquire_rowsets_ms:{}, acquire_segments_ms:{}, " "lookup_row_data_ms:{}, file_types:[{}]; " - "External table : init_reader_ms:{}, get_block_ms:{}", + "External table : init_reader_ms:{}, get_block_ms:{}, " + "external_scan_range_cnt:{}", stats.cached_pages_num, stats.total_pages_num, stats.compressed_bytes_read, stats.io_ns, stats.uncompressed_bytes_read, stats.bytes_read, acquire_tablet_ms, acquire_rowsets_ms, acquire_segments_ms, lookup_row_data_ms, - file_type_stats, external_init_reader_ms, external_get_block_ms); + file_type_stats, external_init_reader_avg_ms, + external_get_block_avg_ms, external_scan_range_cnt); } if (request.has_gc_id_map() && request.gc_id_map()) { @@ -635,22 +642,23 @@ Status RowIdStorageReader::read_batch_doris_format_row( return Status::OK(); } -Status RowIdStorageReader::read_batch_external_row(const PRequestBlockDesc& request_block_desc, - std::shared_ptr id_file_map, - std::vector& slots, - std::shared_ptr first_file_mapping, - const TUniqueId& query_id, - vectorized::Block& result_block, - int64_t* init_reader_ms, int64_t* get_block_ms) { +const std::string RowIdStorageReader::ScannersRunningTimeProfile = "ScannersRunningTime"; +const std::string RowIdStorageReader::InitReaderAvgTimeProfile = "InitReaderAvgTime"; +const std::string RowIdStorageReader::GetBlockAvgTimeProfile = "GetBlockAvgTime"; +const std::string RowIdStorageReader::FileReadLinesProfile = "FileReadLines"; + +Status RowIdStorageReader::read_batch_external_row( + const uint64_t workload_group_id, const PRequestBlockDesc& request_block_desc, + std::shared_ptr id_file_map, std::vector& slots, + std::shared_ptr first_file_mapping, const TUniqueId& query_id, + vectorized::Block& result_block, PRuntimeProfileTree* pprofile, int64_t* init_reader_avg_ms, + int64_t* get_block_avg_ms, size_t* scan_range_cnt) { TFileScanRangeParams rpc_scan_params; TupleDescriptor tuple_desc(request_block_desc.desc(), false); std::unordered_map colname_to_slot_id; - std::unique_ptr runtime_state = nullptr; - std::unique_ptr runtime_profile; - runtime_profile = std::make_unique("ExternalRowIDFetcher"); - - std::unique_ptr vfile_scanner_ptr = nullptr; + std::shared_ptr runtime_state = nullptr; + int max_file_scanners = 0; { if (result_block.is_empty_column()) [[likely]] { result_block = vectorized::Block(slots, request_block_desc.row_id_size()); @@ -698,16 +706,38 @@ Status RowIdStorageReader::read_batch_external_row(const PRequestBlockDesc& requ * To ensure the same behavior as the scan stage, I get query_options query_globals from id_file_map, then create runtime_state * and pass it to vfile_scanner so that the runtime_state information is the same as the scan stage and the behavior is also consistent. */ - runtime_state = RuntimeState::create_unique(query_id, -1, query_options, query_globals, - ExecEnv::GetInstance()); - - vfile_scanner_ptr = vectorized::FileScanner::create_unique( - runtime_state.get(), runtime_profile.get(), &rpc_scan_params, &colname_to_slot_id, - &tuple_desc); + runtime_state = RuntimeState::create_shared( + query_id, -1, query_options, query_globals, ExecEnv::GetInstance(), + ExecEnv::GetInstance()->rowid_storage_reader_tracker()); - RETURN_IF_ERROR(vfile_scanner_ptr->prepare_for_read_one_line(first_scan_range_desc)); + max_file_scanners = id_file_map->get_max_file_scanners(); } + // Hash(TFileRangeDesc) => { all the rows that need to be read and their positions in the result block. } + file mapping + std::map, std::shared_ptr>> + scan_rows; + + // Block corresponding to the order of `scan_rows` map. + std::vector scan_blocks; + + // row_id (Indexing of vectors) => < In which block, which line in the block > + std::vector> row_id_block_idx; + + // Count the time/bytes it takes to read each TFileRangeDesc. (for profile) + std::vector fetch_statistics; + + auto hash_file_range = [](const TFileRangeDesc& file_range_desc) { + std::string value; + value.resize(file_range_desc.path.size() + sizeof(file_range_desc.start_offset)); + auto* ptr = value.data(); + + memcpy(ptr, &file_range_desc.start_offset, sizeof(file_range_desc.start_offset)); + ptr += sizeof(file_range_desc.start_offset); + memcpy(ptr, file_range_desc.path.data(), file_range_desc.path.size()); + return value; + }; + for (size_t j = 0; j < request_block_desc.row_id_size(); ++j) { auto file_id = request_block_desc.file_id(j); auto file_mapping = id_file_map->get_file_mapping(file_id); @@ -717,19 +747,212 @@ Status RowIdStorageReader::read_batch_external_row(const PRequestBlockDesc& requ BackendOptions::get_localhost(), print_id(query_id), file_id); } - auto& external_info = file_mapping->get_external_file_info(); - auto& scan_range_desc = external_info.scan_range_desc; + const auto& external_info = file_mapping->get_external_file_info(); + const auto& scan_range_desc = external_info.scan_range_desc; - // Clear to avoid reading iceberg position delete file... - scan_range_desc.table_format_params.iceberg_params = TIcebergFileDesc {}; + auto scan_range_hash = hash_file_range(scan_range_desc); + if (scan_rows.contains(scan_range_hash)) { + scan_rows.at(scan_range_hash).first.emplace(request_block_desc.row_id(j), j); + } else { + std::map tmp {{request_block_desc.row_id(j), j}}; + scan_rows.emplace(scan_range_hash, std::make_pair(tmp, file_mapping)); + } + } - // Clear to avoid reading hive transactional delete delta file... - scan_range_desc.table_format_params.transactional_hive_params = TTransactionalHiveDesc {}; + scan_blocks.resize(scan_rows.size()); + row_id_block_idx.resize(request_block_desc.row_id_size()); + fetch_statistics.resize(scan_rows.size()); + + // Get the workload group for subsequent scan task submission. + std::vector workload_group_ids; + workload_group_ids.emplace_back(workload_group_id); + auto wg = ExecEnv::GetInstance()->workload_group_mgr()->get_group(workload_group_ids); + doris::pipeline::TaskScheduler* exec_sched = nullptr; + vectorized::SimplifiedScanScheduler* scan_sched = nullptr; + vectorized::SimplifiedScanScheduler* remote_scan_sched = nullptr; + wg->get_query_scheduler(&exec_sched, &scan_sched, &remote_scan_sched); + DCHECK(remote_scan_sched); + + int64_t scan_running_time = 0; + RETURN_IF_ERROR(scope_timer_run( + [&]() -> Status { + // Make sure to insert data into result_block only after all scan tasks have been executed. + std::atomic producer_count {0}; + std::condition_variable cv; + std::mutex mtx; + + //semaphore: Limit the number of scan tasks submitted at one time + std::counting_semaphore semaphore {max_file_scanners}; + + size_t idx = 0; + for (const auto& [_, scan_info] : scan_rows) { + semaphore.acquire(); + RETURN_IF_ERROR( + remote_scan_sched->submit_scan_task(vectorized::SimplifiedScanTask( + [&, scan_info, idx]() { + auto& row_ids = scan_info.first; + auto& file_mapping = scan_info.second; + + SCOPED_ATTACH_TASK( + ExecEnv::GetInstance() + ->rowid_storage_reader_tracker()); + signal::set_signal_task_id(query_id); + + scan_blocks[idx] = + vectorized::Block(slots, scan_info.first.size()); + + size_t j = 0; + std::list read_ids; + //Generate an ordered list with the help of the orderliness of the map. + for (const auto& [row_id, result_block_idx] : row_ids) { + read_ids.emplace_back(row_id); + row_id_block_idx[result_block_idx] = + std::make_pair(idx, j); + j++; + } + + auto& external_info = + file_mapping->get_external_file_info(); + auto& scan_range_desc = external_info.scan_range_desc; + + // Clear to avoid reading iceberg position delete file... + scan_range_desc.table_format_params.iceberg_params = + TIcebergFileDesc {}; + + // Clear to avoid reading hive transactional delete delta file... + scan_range_desc.table_format_params + .transactional_hive_params = + TTransactionalHiveDesc {}; + + std::unique_ptr sub_runtime_profile = + std::make_unique( + "ExternalRowIDFetcher"); + { + std::unique_ptr + vfile_scanner_ptr = + vectorized::FileScanner::create_unique( + runtime_state.get(), + sub_runtime_profile.get(), + &rpc_scan_params, + &colname_to_slot_id, + &tuple_desc); + + RETURN_IF_ERROR( + vfile_scanner_ptr->prepare_for_read_lines( + scan_range_desc)); + RETURN_IF_ERROR( + vfile_scanner_ptr->read_lines_from_range( + scan_range_desc, read_ids, + &scan_blocks[idx], external_info, + &fetch_statistics[idx].init_reader_ms, + &fetch_statistics[idx].get_block_ms)); + } + + auto file_read_bytes_counter = + sub_runtime_profile->get_counter( + vectorized::FileScanner:: + FileReadBytesProfile); + + if (file_read_bytes_counter != nullptr) { + fetch_statistics[idx].file_read_bytes = + PrettyPrinter::print( + file_read_bytes_counter->value(), + file_read_bytes_counter->type()); + } + + auto file_read_times_counter = + sub_runtime_profile->get_counter( + vectorized::FileScanner:: + FileReadTimeProfile); + if (file_read_times_counter != nullptr) { + fetch_statistics[idx].file_read_times = + PrettyPrinter::print( + file_read_times_counter->value(), + file_read_times_counter->type()); + } + + semaphore.release(); + if (++producer_count == scan_rows.size()) { + std::lock_guard lock(mtx); + cv.notify_one(); + } + return Status::OK(); + }, + nullptr))); + idx++; + } + + { + std::unique_lock lock(mtx); + cv.wait(lock, [&] { return producer_count == scan_rows.size(); }); + } + return Status::OK(); + }, + &scan_running_time)); + + // Insert the read data into result_block. + for (size_t column_id = 0; column_id < result_block.get_columns().size(); column_id++) { + auto dst_col = + const_cast(result_block.get_columns()[column_id].get()); + + std::vector scan_src_columns; + scan_src_columns.reserve(row_id_block_idx.size()); + std::vector scan_positions; + scan_positions.reserve(row_id_block_idx.size()); + for (const auto& [pos_block, block_idx] : row_id_block_idx) { + DCHECK(scan_blocks.size() > pos_block); + DCHECK(scan_blocks[pos_block].get_columns().size() > column_id); + scan_src_columns.emplace_back(scan_blocks[pos_block].get_columns()[column_id].get()); + scan_positions.emplace_back(block_idx); + } + dst_col->insert_from_multi_column(scan_src_columns, scan_positions); + } - RETURN_IF_ERROR(vfile_scanner_ptr->read_one_line_from_range( - scan_range_desc, request_block_desc.row_id(j), &result_block, external_info, - init_reader_ms, get_block_ms)); + // Statistical runtime profile information. + std::unique_ptr runtime_profile = + std::make_unique("ExternalRowIDFetcher"); + { + runtime_profile->add_info_string(ScannersRunningTimeProfile, + std::to_string(scan_running_time) + "ms"); + fmt::memory_buffer file_read_lines_buffer; + format_to(file_read_lines_buffer, "["); + fmt::memory_buffer file_read_bytes_buffer; + format_to(file_read_bytes_buffer, "["); + fmt::memory_buffer file_read_times_buffer; + format_to(file_read_times_buffer, "["); + + size_t idx = 0; + for (const auto& [_, scan_info] : scan_rows) { + format_to(file_read_lines_buffer, "{}, ", scan_info.first.size()); + *init_reader_avg_ms = fetch_statistics[idx].init_reader_ms; + *get_block_avg_ms += fetch_statistics[idx].get_block_ms; + format_to(file_read_bytes_buffer, "{}, ", fetch_statistics[idx].file_read_bytes); + format_to(file_read_times_buffer, "{}, ", fetch_statistics[idx].file_read_times); + idx++; + } + + format_to(file_read_lines_buffer, "]"); + format_to(file_read_bytes_buffer, "]"); + format_to(file_read_times_buffer, "]"); + + *init_reader_avg_ms /= fetch_statistics.size(); + *get_block_avg_ms /= fetch_statistics.size(); + runtime_profile->add_info_string(InitReaderAvgTimeProfile, + std::to_string(*init_reader_avg_ms) + "ms"); + runtime_profile->add_info_string(GetBlockAvgTimeProfile, + std::to_string(*init_reader_avg_ms) + "ms"); + runtime_profile->add_info_string(FileReadLinesProfile, + fmt::to_string(file_read_lines_buffer)); + runtime_profile->add_info_string(vectorized::FileScanner::FileReadBytesProfile, + fmt::to_string(file_read_bytes_buffer)); + runtime_profile->add_info_string(vectorized::FileScanner::FileReadTimeProfile, + fmt::to_string(file_read_times_buffer)); } + + runtime_profile->to_proto(pprofile, 2); + + *scan_range_cnt = scan_rows.size(); + return Status::OK(); } diff --git a/be/src/exec/rowid_fetcher.h b/be/src/exec/rowid_fetcher.h index c3cc48db6d4103..08d306f7bb625b 100644 --- a/be/src/exec/rowid_fetcher.h +++ b/be/src/exec/rowid_fetcher.h @@ -86,6 +86,12 @@ struct RowStoreReadStruct { class RowIdStorageReader { public: + //external profile info key. + static const std::string ScannersRunningTimeProfile; + static const std::string InitReaderAvgTimeProfile; + static const std::string GetBlockAvgTimeProfile; + static const std::string FileReadLinesProfile; + static Status read_by_rowids(const PMultiGetRequest& request, PMultiGetResponse* response); static Status read_by_rowids(const PMultiGetRequestV2& request, PMultiGetResponseV2* response); @@ -107,13 +113,19 @@ class RowIdStorageReader { int64_t* acquire_tablet_ms, int64_t* acquire_rowsets_ms, int64_t* acquire_segments_ms, int64_t* lookup_row_data_ms); - static Status read_batch_external_row(const PRequestBlockDesc& request_block_desc, - std::shared_ptr id_file_map, - std::vector& slots, - std::shared_ptr first_file_mapping, - const TUniqueId& query_id, - vectorized::Block& result_block, int64_t* init_reader_ms, - int64_t* get_block_ms); + static Status read_batch_external_row( + const uint64_t workload_group_id, const PRequestBlockDesc& request_block_desc, + std::shared_ptr id_file_map, std::vector& slots, + std::shared_ptr first_file_mapping, const TUniqueId& query_id, + vectorized::Block& result_block, PRuntimeProfileTree* pprofile, + int64_t* init_reader_avg_ms, int64_t* get_block_avg_ms, size_t* scan_range_cnt); + + struct ExternalFetchStatistics { + int64_t init_reader_ms = 0; + int64_t get_block_ms = 0; + std::string file_read_bytes; + std::string file_read_times; + }; }; template diff --git a/be/src/olap/id_manager.h b/be/src/olap/id_manager.h index 94765e5eab159f..4b9974e0ee3596 100644 --- a/be/src/olap/id_manager.h +++ b/be/src/olap/id_manager.h @@ -197,12 +197,13 @@ class IdFileMap { int64_t get_delayed_expired_timestamp() { return delayed_expired_timestamp; } - void set_external_scan_params(QueryContext* query_ctx) { + void set_external_scan_params(QueryContext* query_ctx, int max_file_scanners) { std::call_once(once_flag_for_external, [&] { DCHECK(query_ctx != nullptr); _query_global = query_ctx->get_query_globals(); _query_options = query_ctx->get_query_options(); _file_scan_range_params_map = query_ctx->file_scan_range_params_map; + _max_file_scanners = max_file_scanners; }); } @@ -214,6 +215,8 @@ class IdFileMap { return _file_scan_range_params_map; } + int get_max_file_scanners() const { return _max_file_scanners; } + private: std::shared_mutex _mtx; uint32_t _init_id = 0; @@ -225,6 +228,7 @@ class IdFileMap { TQueryOptions _query_options; std::map _file_scan_range_params_map; std::once_flag once_flag_for_external; + int _max_file_scanners = 10; // use in Doris Format to keep temp rowsets, preventing them from being deleted by compaction std::unordered_map, RowsetSharedPtr> _temp_rowset_maps; diff --git a/be/src/pipeline/dependency.cpp b/be/src/pipeline/dependency.cpp index 7f98ad875bbe13..a23e4003d9d5a4 100644 --- a/be/src/pipeline/dependency.cpp +++ b/be/src/pipeline/dependency.cpp @@ -21,6 +21,7 @@ #include #include "common/logging.h" +#include "exec/rowid_fetcher.h" #include "pipeline/exec/multi_cast_data_streamer.h" #include "pipeline/pipeline_fragment_context.h" #include "pipeline/pipeline_task.h" @@ -28,6 +29,7 @@ #include "runtime/memory/mem_tracker.h" #include "runtime_filter/runtime_filter_consumer.h" #include "util/brpc_client_cache.h" +#include "vec/exec/scan/file_scanner.h" #include "vec/exprs/vectorized_agg_fn.h" #include "vec/exprs/vslot_ref.h" #include "vec/spill/spill_stream_manager.h" @@ -481,6 +483,11 @@ Status MaterializationSharedState::merge_multi_response(vectorized::Block* block DCHECK(rpc_struct.callback->response_->blocks_size() > i); RETURN_IF_ERROR( partial_block.deserialize(rpc_struct.callback->response_->blocks(i).block())); + if (rpc_struct.callback->response_->blocks(i).has_profile()) { + auto response_profile = RuntimeProfile::from_proto( + rpc_struct.callback->response_->blocks(i).profile()); + _update_profile_info(backend_id, response_profile.get()); + } if (!partial_block.is_empty_column()) { _block_maps[backend_id] = std::make_pair(std::move(partial_block), 0); @@ -534,6 +541,34 @@ Status MaterializationSharedState::merge_multi_response(vectorized::Block* block return Status::OK(); } +void MaterializationSharedState::_update_profile_info(int64_t backend_id, + RuntimeProfile* response_profile) { + if (!backend_profile_info_string.contains(backend_id)) { + backend_profile_info_string.emplace(backend_id, + std::map {}); + } + auto& info_map = backend_profile_info_string[backend_id]; + + auto update_profile_info_key = [&](const std::string& info_key) { + const auto* info_value = response_profile->get_info_string(info_key); + if (info_value == nullptr) [[unlikely]] { + LOG(WARNING) << "Get row id fetch rpc profile success, but no info key :" << info_key; + return; + } + if (!info_map.contains(info_key)) { + info_map.emplace(info_key, fmt::memory_buffer {}); + } + fmt::format_to(info_map[info_key], "{}, ", *info_value); + }; + + update_profile_info_key(RowIdStorageReader::ScannersRunningTimeProfile); + update_profile_info_key(RowIdStorageReader::InitReaderAvgTimeProfile); + update_profile_info_key(RowIdStorageReader::GetBlockAvgTimeProfile); + update_profile_info_key(RowIdStorageReader::FileReadLinesProfile); + update_profile_info_key(vectorized::FileScanner::FileReadBytesProfile); + update_profile_info_key(vectorized::FileScanner::FileReadTimeProfile); +} + void MaterializationSharedState::create_counter_dependency(int operator_id, int node_id, const std::string& name) { auto dep = diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h index 3272c52ab22102..737656b59d6014 100644 --- a/be/src/pipeline/dependency.h +++ b/be/src/pipeline/dependency.h @@ -828,6 +828,10 @@ struct MaterializationSharedState : public BasicSharedState { void create_counter_dependency(int operator_id, int node_id, const std::string& name); +private: + void _update_profile_info(int64_t backend_id, RuntimeProfile* response_profile); + +public: bool rpc_struct_inited = false; AtomicStatus rpc_status; @@ -842,6 +846,8 @@ struct MaterializationSharedState : public BasicSharedState { // Register each line in which block to ensure the order of the result. // Zero means NULL value. std::vector> block_order_results; + // backend id => . + std::map> backend_profile_info_string; }; #include "common/compile_check_end.h" } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/file_scan_operator.cpp b/be/src/pipeline/exec/file_scan_operator.cpp index cb98d1a9b128d7..658a0c626bfd50 100644 --- a/be/src/pipeline/exec/file_scan_operator.cpp +++ b/be/src/pipeline/exec/file_scan_operator.cpp @@ -39,7 +39,7 @@ Status FileScanLocalState::_init_scanners(std::list* sc auto& id_file_map = state()->get_id_file_map(); if (id_file_map != nullptr) { - id_file_map->set_external_scan_params(state()->get_query_ctx()); + id_file_map->set_external_scan_params(state()->get_query_ctx(), _max_scanners); } auto& p = _parent->cast(); diff --git a/be/src/pipeline/exec/materialization_source_operator.cpp b/be/src/pipeline/exec/materialization_source_operator.cpp index 396ce319fb9054..23d17c0167dff4 100644 --- a/be/src/pipeline/exec/materialization_source_operator.cpp +++ b/be/src/pipeline/exec/materialization_source_operator.cpp @@ -51,6 +51,17 @@ Status MaterializationSourceOperatorX::get_block(RuntimeState* state, vectorized max_rpc_time = std::max(max_rpc_time, rpc_struct.rpc_timer.elapsed_time()); } COUNTER_SET(local_state._max_rpc_timer, (int64_t)max_rpc_time); + + for (const auto& [backend_id, child_info] : + local_state._shared_state->backend_profile_info_string) { + auto child_profile = local_state.operator_profile()->create_child( + "RowIDFetcher: BackendId:" + std::to_string(backend_id)); + for (const auto& [info_key, info_value] : + local_state._shared_state->backend_profile_info_string[backend_id]) { + child_profile->add_info_string(info_key, "{" + fmt::to_string(info_value) + "}"); + } + local_state.operator_profile()->add_child(child_profile, true); + } } return Status::OK(); diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index af1670125beb57..ba459b7323bea7 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -132,7 +132,8 @@ RuntimeState::RuntimeState(const TUniqueId& query_id, int32_t fragment_id, RuntimeState::RuntimeState(const TUniqueId& query_id, int32_t fragment_id, const TQueryOptions& query_options, const TQueryGlobals& query_globals, - ExecEnv* exec_env) + ExecEnv* exec_env, + const std::shared_ptr& query_mem_tracker) : _profile("PipelineX " + std::to_string(fragment_id)), _load_channel_profile(""), _obj_pool(new ObjectPool()), @@ -150,7 +151,8 @@ RuntimeState::RuntimeState(const TUniqueId& query_id, int32_t fragment_id, _error_row_number(0) { Status status = init(TUniqueId(), query_options, query_globals, exec_env); DCHECK(status.ok()); - init_mem_trackers(""); + _query_mem_tracker = query_mem_tracker; + DCHECK(_query_mem_tracker != nullptr); } RuntimeState::RuntimeState(const TQueryGlobals& query_globals) diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index cf9333ff980955..38331732e2c852 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -97,7 +97,8 @@ class RuntimeState { // Only used in the materialization phase of delayed materialization, // when there may be no corresponding QueryContext. RuntimeState(const TUniqueId& query_id, int32_t fragment_id, const TQueryOptions& query_options, - const TQueryGlobals& query_globals, ExecEnv* exec_env); + const TQueryGlobals& query_globals, ExecEnv* exec_env, + const std::shared_ptr& query_mem_tracker); // RuntimeState for executing expr in fe-support. RuntimeState(const TQueryGlobals& query_globals); diff --git a/be/src/util/runtime_profile.cpp b/be/src/util/runtime_profile.cpp index 97920f77eebf63..cc663cd3f50ca8 100644 --- a/be/src/util/runtime_profile.cpp +++ b/be/src/util/runtime_profile.cpp @@ -58,6 +58,17 @@ std::unique_ptr RuntimeProfile::from_thrift(const TRuntimeProfil return res; } +std::unique_ptr RuntimeProfile::from_proto(const PRuntimeProfileTree& tree) { + if (tree.nodes().empty()) { + return std::make_unique(""); + } + + const PRuntimeProfileNode& root_node = tree.nodes(0); + std::unique_ptr res = std::make_unique(root_node.name()); + res->update(tree); + return res; +} + RuntimeProfile::RuntimeProfile(const std::string& name, bool is_averaged_profile) : _pool(new ObjectPool()), _name(name), @@ -151,6 +162,12 @@ void RuntimeProfile::update(const TRuntimeProfileTree& thrift_profile) { DCHECK_EQ(idx, thrift_profile.nodes.size()); } +void RuntimeProfile::update(const PRuntimeProfileTree& proto_profile) { + int idx = 0; + update(proto_profile.nodes(), &idx); + DCHECK_EQ(idx, proto_profile.nodes_size()); +} + void RuntimeProfile::update(const std::vector& nodes, int* idx) { DCHECK_LT(*idx, nodes.size()); const TRuntimeProfileNode& node = nodes[*idx]; @@ -233,6 +250,82 @@ void RuntimeProfile::update(const std::vector& nodes, int* } } +void RuntimeProfile::update(const google::protobuf::RepeatedPtrField& nodes, + int* idx) { + DCHECK_LT(*idx, nodes.size()); + const PRuntimeProfileNode& node = nodes.Get(*idx); + + { + std::lock_guard l(_counter_map_lock); + + for (const auto& pcounter : node.counters()) { + const std::string& name = pcounter.name(); + auto j = _counter_map.find(name); + + if (j == _counter_map.end()) { + _counter_map[name] = + _pool->add(new Counter(unit_to_thrift(pcounter.type()), pcounter.value())); + } else { + if (unit_to_proto(j->second->type()) != pcounter.type()) { + LOG(ERROR) << "Cannot update counters with the same name (" << name + << ") but different types."; + } else { + j->second->set(pcounter.value()); + } + } + } + + for (const auto& kv : node.child_counters_map()) { + std::set* child_counters = + find_or_insert(&_child_counter_map, kv.first, std::set()); + for (const auto& child_name : kv.second.child_counters()) { + child_counters->insert(child_name); + } + } + } + + { + std::lock_guard l(_info_strings_lock); + const auto& info_map = node.info_strings(); + + for (const std::string& key : node.info_strings_display_order()) { + auto it = info_map.find(key); + DCHECK(it != info_map.end()); + + auto existing = _info_strings.find(key); + if (existing == _info_strings.end()) { + _info_strings.insert(std::make_pair(key, it->second)); + _info_strings_display_order.push_back(key); + } else { + _info_strings[key] = it->second; + } + } + } + + ++*idx; + + { + std::lock_guard l(_children_lock); + for (int i = 0; i < node.num_children(); ++i) { + const PRuntimeProfileNode& pchild = nodes.Get(*idx); + RuntimeProfile* child = nullptr; + + auto j = _child_map.find(pchild.name()); + if (j != _child_map.end()) { + child = j->second; + } else { + child = _pool->add(new RuntimeProfile(pchild.name())); + child->_metadata = pchild.metadata(); + child->_timestamp = pchild.timestamp(); + _child_map[pchild.name()] = child; + _children.emplace_back(child, pchild.indent()); + } + + child->update(nodes, idx); + } + } +} + void RuntimeProfile::divide(int n) { DCHECK_GT(n, 0); std::map::iterator iter; @@ -626,6 +719,53 @@ void RuntimeProfile::to_thrift(std::vector* nodes, int64_t } } +void RuntimeProfile::to_proto(PRuntimeProfileTree* tree, int64_t profile_level) { + tree->clear_nodes(); + to_proto(tree->mutable_nodes(), profile_level); +} + +void RuntimeProfile::to_proto(google::protobuf::RepeatedPtrField* nodes, + int64_t profile_level) { + PRuntimeProfileNode* node = nodes->Add(); // allocate new node + node->set_name(_name); + node->set_metadata(_metadata); + node->set_timestamp(_timestamp); + node->set_indent(true); + + { + std::lock_guard l(_counter_map_lock); + RuntimeProfileCounterTreeNode counter_tree = RuntimeProfileCounterTreeNode::from_map( + _counter_map, _child_counter_map, ROOT_COUNTER); + counter_tree = RuntimeProfileCounterTreeNode::prune_the_tree(counter_tree, profile_level); + counter_tree.to_proto(node->mutable_counters(), node->mutable_child_counters_map()); + } + + { + std::lock_guard l(_info_strings_lock); + auto* info_map = node->mutable_info_strings(); + for (const auto& kv : _info_strings) { + (*info_map)[kv.first] = kv.second; + } + for (const auto& key : _info_strings_display_order) { + node->add_info_strings_display_order(key); + } + } + + ChildVector children; + { + std::lock_guard l(_children_lock); + children = _children; + } + + node->set_num_children(children.size()); + + for (const auto& child : children) { + int child_index = nodes->size(); // capture index for indent correction + child.first->to_proto(nodes, profile_level); + (*nodes)[child_index].set_indent(child.second); + } +} + int64_t RuntimeProfile::units_per_second(const RuntimeProfile::Counter* total_counter, const RuntimeProfile::Counter* timer) { DCHECK(total_counter->type() == TUnit::BYTES || total_counter->type() == TUnit::UNIT); diff --git a/be/src/util/runtime_profile.h b/be/src/util/runtime_profile.h index 14d14a1195d3e6..6e574403a090e7 100644 --- a/be/src/util/runtime_profile.h +++ b/be/src/util/runtime_profile.h @@ -22,6 +22,7 @@ #include #include +#include #include #include @@ -99,6 +100,86 @@ class RuntimeProfile { public: static std::unique_ptr from_thrift(const TRuntimeProfileTree& node); + static std::unique_ptr from_proto(const PRuntimeProfileTree& tree); + + static PProfileUnit unit_to_proto(const TUnit::type& type) { + switch (type) { + case TUnit::UNIT: { + return PProfileUnit::UNIT; + } + case TUnit::UNIT_PER_SECOND: { + return PProfileUnit::UNIT_PER_SECOND; + } + case TUnit::CPU_TICKS: { + return PProfileUnit::CPU_TICKS; + } + case TUnit::BYTES: { + return PProfileUnit::BYTES; + } + case TUnit::BYTES_PER_SECOND: { + return PProfileUnit::BYTES_PER_SECOND; + } + case TUnit::TIME_NS: { + return PProfileUnit::TIME_NS; + } + case TUnit::DOUBLE_VALUE: { + return PProfileUnit::DOUBLE_VALUE; + } + case TUnit::NONE: { + return PProfileUnit::NONE; + } + case TUnit::TIME_MS: { + return PProfileUnit::TIME_MS; + } + case TUnit::TIME_S: { + return PProfileUnit::TIME_S; + } + default: { + DCHECK(false); + return PProfileUnit::NONE; + } + } + } + + static TUnit::type unit_to_thrift(const PProfileUnit& unit) { + switch (unit) { + case PProfileUnit::UNIT: { + return TUnit::UNIT; + } + case PProfileUnit::UNIT_PER_SECOND: { + return TUnit::UNIT_PER_SECOND; + } + case PProfileUnit::CPU_TICKS: { + return TUnit::CPU_TICKS; + } + case PProfileUnit::BYTES: { + return TUnit::BYTES; + } + case PProfileUnit::BYTES_PER_SECOND: { + return TUnit::BYTES_PER_SECOND; + } + case PProfileUnit::TIME_NS: { + return TUnit::TIME_NS; + } + case PProfileUnit::DOUBLE_VALUE: { + return TUnit::DOUBLE_VALUE; + } + case PProfileUnit::NONE: { + return TUnit::NONE; + } + case PProfileUnit::TIME_MS: { + return TUnit::TIME_MS; + } + case PProfileUnit::TIME_S: { + return TUnit::TIME_S; + } + default: { + DCHECK(false); + return TUnit::NONE; + } + } + } + // The root counter name for all top level counters. static const std::string ROOT_COUNTER; class Counter { @@ -135,6 +216,15 @@ class RuntimeProfile { return counter; } + virtual PProfileCounter to_proto(const std::string& name) const { + PProfileCounter counter; + counter.set_name(name); + counter.set_value(this->value()); + counter.set_type(unit_to_proto(this->type())); + counter.set_level(this->value()); + return counter; + } + virtual void pretty_print(std::ostream* s, const std::string& prefix, const std::string& name) const { std::ostream& stream = *s; @@ -192,6 +282,15 @@ class RuntimeProfile { return counter; } + PProfileCounter to_proto(const std::string& name) const override { + PProfileCounter counter; + counter.set_name(name); + counter.set_value(current_value()); + counter.set_type(unit_to_proto(this->type())); + counter.set_level(this->value()); + return counter; + } + TCounter to_thrift_peak(std::string name) { TCounter counter; counter.name = std::move(name); @@ -201,6 +300,10 @@ class RuntimeProfile { return counter; } + PProfileCounter to_proto_peak(const std::string& name) const { + return Counter::to_proto(name); + } + virtual void pretty_print(std::ostream* s, const std::string& prefix, const std::string& name) const override { std::ostream& stream = *s; @@ -364,6 +467,14 @@ class RuntimeProfile { return counter; } + PProfileCounter to_proto(const std::string& name) const override { + PProfileCounter counter; + counter.set_name(name); + counter.set_level(2); + counter.set_description(_description); + return counter; + } + private: const std::string _description; const std::string _name; @@ -401,6 +512,9 @@ class RuntimeProfile { // the key has already been registered. void update(const TRuntimeProfileTree& thrift_profile); + //Similar to `void update(const TRuntimeProfileTree& thrift_profile)` + void update(const PRuntimeProfileTree& proto_profile); + // Add a counter with 'name'/'type'. Returns a counter object that the caller can // update. The counter is owned by the RuntimeProfile object. // If parent_counter_name is a non-empty string, the counter is added as a child of @@ -476,6 +590,11 @@ class RuntimeProfile { void to_thrift(TRuntimeProfileTree* tree, int64_t profile_level = 2); void to_thrift(std::vector* nodes, int64_t profile_level = 2); + // Similar to `to_thrift`. + void to_proto(PRuntimeProfileTree* tree, int64_t profile_level = 2); + void to_proto(google::protobuf::RepeatedPtrField* nodes, + int64_t profile_level = 2); + // Divides all counters by n void divide(int n); @@ -594,6 +713,9 @@ class RuntimeProfile { // On return, *idx points to the node immediately following this subtree. void update(const std::vector& nodes, int* idx); + // Similar to `void update(const std::vector& nodes, int* idx)` + void update(const google::protobuf::RepeatedPtrField& nodes, int* idx); + // Helper function to compute compute the fraction of the total time spent in // this profile and its children. // Called recursively. diff --git a/be/src/util/runtime_profile_counter_tree_node.cpp b/be/src/util/runtime_profile_counter_tree_node.cpp index bff458af1390d7..cbbb02bdadfd5b 100644 --- a/be/src/util/runtime_profile_counter_tree_node.cpp +++ b/be/src/util/runtime_profile_counter_tree_node.cpp @@ -98,6 +98,43 @@ void RuntimeProfileCounterTreeNode::to_thrift( } } +void RuntimeProfileCounterTreeNode::to_proto( + google::protobuf::RepeatedPtrField* proto_counters, + google::protobuf::Map* child_counter_map) const { + if (name != RuntimeProfile::ROOT_COUNTER && counter != nullptr) { + if (auto* highWaterMarkCounter = + dynamic_cast(counter)) { + // Convert both current and peak values + *proto_counters->Add() = highWaterMarkCounter->to_proto(name); + *proto_counters->Add() = highWaterMarkCounter->to_proto_peak(name + "Peak"); + + (*(*child_counter_map)[highWaterMarkCounter->parent_name()].mutable_child_counters()) + .Add(name + "Peak"); + + } else if (auto* nonZeroCounter = dynamic_cast(counter)) { + if (nonZeroCounter->value() > 0) { + *proto_counters->Add() = to_proto(); + } else { + // Skip zero-valued counter and remove from parent's child map + auto it = child_counter_map->find(nonZeroCounter->parent_name()); + if (it != child_counter_map->end()) { + auto* set = it->second.mutable_child_counters(); + auto remove_it = std::find(set->begin(), set->end(), name); + if (remove_it != set->end()) set->erase(remove_it); + } + return; + } + } else { + *proto_counters->Add() = to_proto(); + } + } + + for (const auto& child : children) { + (*child_counter_map)[name].add_child_counters(child.name); + child.to_proto(proto_counters, child_counter_map); + } +} + TCounter RuntimeProfileCounterTreeNode::to_thrift() const { TCounter tcounter; if (counter != nullptr) { @@ -107,4 +144,17 @@ TCounter RuntimeProfileCounterTreeNode::to_thrift() const { } return tcounter; } + +PProfileCounter RuntimeProfileCounterTreeNode::to_proto() const { + PProfileCounter pcounter; + + if (counter != nullptr) { + pcounter = counter->to_proto(name); + } else { + pcounter.set_name(name); + } + + return pcounter; +} + } // namespace doris diff --git a/be/src/util/runtime_profile_counter_tree_node.h b/be/src/util/runtime_profile_counter_tree_node.h index 7963febe457c05..85284797cb6452 100644 --- a/be/src/util/runtime_profile_counter_tree_node.h +++ b/be/src/util/runtime_profile_counter_tree_node.h @@ -50,9 +50,15 @@ class RuntimeProfileCounterTreeNode { void to_thrift(std::vector& tcounter, std::map>& child_counter_map) const; + void to_proto( + google::protobuf::RepeatedPtrField* proto_counters, + google::protobuf::Map* child_counter_map) const; + // Conver this node to a TCounter object. TCounter to_thrift() const; + PProfileCounter to_proto() const; + private: std::string name; // counter is not owned by this class diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp index 1dd7e7cd317b49..f760f40b3f5a69 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.cpp +++ b/be/src/vec/exec/format/orc/vorc_reader.cpp @@ -26,6 +26,7 @@ #include #include + // IWYU pragma: no_include #include // IWYU pragma: keep #include @@ -77,6 +78,7 @@ #include "vec/data_types/data_type_struct.h" #include "vec/exec/format/orc/orc_file_reader.h" #include "vec/exec/format/table/transactional_hive_common.h" +#include "vec/exec/scan/file_scanner.h" #include "vec/exprs/vbloom_predicate.h" #include "vec/exprs/vdirect_in_predicate.h" #include "vec/exprs/vectorized_fn_call.h" @@ -246,10 +248,11 @@ void OrcReader::_init_profile() { if (_profile != nullptr) { static const char* orc_profile = "OrcReader"; ADD_TIMER_WITH_LEVEL(_profile, orc_profile, 1); - _orc_profile.read_time = ADD_TIMER_WITH_LEVEL(_profile, "FileReadTime", 1); + _orc_profile.read_time = + ADD_TIMER_WITH_LEVEL(_profile, FileScanner::FileReadTimeProfile, 1); _orc_profile.read_calls = ADD_COUNTER_WITH_LEVEL(_profile, "FileReadCalls", TUnit::UNIT, 1); - _orc_profile.read_bytes = - ADD_COUNTER_WITH_LEVEL(_profile, "FileReadBytes", TUnit::BYTES, 1); + _orc_profile.read_bytes = ADD_COUNTER_WITH_LEVEL( + _profile, FileScanner::FileReadBytesProfile, TUnit::BYTES, 1); _orc_profile.column_read_time = ADD_CHILD_TIMER_WITH_LEVEL(_profile, "ColumnReadTime", orc_profile, 1); _orc_profile.get_batch_time = @@ -1169,44 +1172,47 @@ Status OrcReader::set_fill_columns( _row_reader_options.include(_read_cols); _row_reader_options.setEnableLazyDecoding(true); - uint64_t number_of_stripes = _reader->getNumberOfStripes(); - auto all_stripes_needed = _reader->getNeedReadStripes(_row_reader_options); + //orc reader should not use the tiny stripe optimization when reading by row id. + if (!_read_line_mode_mode) { + uint64_t number_of_stripes = _reader->getNumberOfStripes(); + auto all_stripes_needed = _reader->getNeedReadStripes(_row_reader_options); - int64_t range_end_offset = _range_start_offset + _range_size; + int64_t range_end_offset = _range_start_offset + _range_size; - bool all_tiny_stripes = true; - std::vector tiny_stripe_ranges; + bool all_tiny_stripes = true; + std::vector tiny_stripe_ranges; - for (uint64_t i = 0; i < number_of_stripes; i++) { - std::unique_ptr strip_info = _reader->getStripe(i); - uint64_t strip_start_offset = strip_info->getOffset(); - uint64_t strip_end_offset = strip_start_offset + strip_info->getLength(); + for (uint64_t i = 0; i < number_of_stripes; i++) { + std::unique_ptr strip_info = _reader->getStripe(i); + uint64_t strip_start_offset = strip_info->getOffset(); + uint64_t strip_end_offset = strip_start_offset + strip_info->getLength(); - if (strip_start_offset >= range_end_offset || strip_end_offset < _range_start_offset || - !all_stripes_needed[i]) { - continue; - } - if (strip_info->getLength() > _orc_tiny_stripe_threshold_bytes) { - all_tiny_stripes = false; - break; - } + if (strip_start_offset >= range_end_offset || + strip_end_offset < _range_start_offset || !all_stripes_needed[i]) { + continue; + } + if (strip_info->getLength() > _orc_tiny_stripe_threshold_bytes) { + all_tiny_stripes = false; + break; + } - tiny_stripe_ranges.emplace_back(strip_start_offset, strip_end_offset); - } - if (all_tiny_stripes && number_of_stripes > 0) { - std::vector prefetch_merge_ranges = - io::PrefetchRange::merge_adjacent_seq_ranges(tiny_stripe_ranges, - _orc_max_merge_distance_bytes, - _orc_once_max_read_bytes); - auto range_finder = - std::make_shared(std::move(prefetch_merge_ranges)); + tiny_stripe_ranges.emplace_back(strip_start_offset, strip_end_offset); + } + if (all_tiny_stripes && number_of_stripes > 0) { + std::vector prefetch_merge_ranges = + io::PrefetchRange::merge_adjacent_seq_ranges(tiny_stripe_ranges, + _orc_max_merge_distance_bytes, + _orc_once_max_read_bytes); + auto range_finder = std::make_shared( + std::move(prefetch_merge_ranges)); - auto* orc_input_stream_ptr = static_cast(_reader->getStream()); - orc_input_stream_ptr->set_all_tiny_stripes(); - auto& orc_file_reader = orc_input_stream_ptr->get_file_reader(); - auto orc_inner_reader = orc_input_stream_ptr->get_inner_reader(); - orc_file_reader = std::make_shared(_profile, orc_inner_reader, - range_finder); + auto* orc_input_stream_ptr = static_cast(_reader->getStream()); + orc_input_stream_ptr->set_all_tiny_stripes(); + auto& orc_file_reader = orc_input_stream_ptr->get_file_reader(); + auto orc_inner_reader = orc_input_stream_ptr->get_inner_reader(); + orc_file_reader = std::make_shared( + _profile, orc_inner_reader, range_finder); + } } if (!_lazy_read_ctx.can_lazy_read) { diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp index 1febad1e0f1710..4b7fa620428b7f 100644 --- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp @@ -74,6 +74,7 @@ struct IOContext; namespace doris::vectorized { #include "common/compile_check_begin.h" const std::vector RowGroupReader::NO_DELETE = {}; +static constexpr uint32_t MAX_DICT_CODE_PREDICATE_TO_REWRITE = std::numeric_limits::max(); RowGroupReader::RowGroupReader(io::FileReaderSPtr file_reader, const std::vector& read_columns, diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h b/be/src/vec/exec/format/parquet/vparquet_group_reader.h index 35bf6970d6f631..d54f2e1d233191 100644 --- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h @@ -62,7 +62,6 @@ class RowGroup; namespace doris::vectorized { #include "common/compile_check_begin.h" // TODO: we need to determine it by test. -static constexpr uint32_t MAX_DICT_CODE_PREDICATE_TO_REWRITE = std::numeric_limits::max(); class RowGroupReader : public ProfileCollector { public: diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp index 1ce9761e8234d9..eb06bc8a84c9f5 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp @@ -49,6 +49,7 @@ #include "vec/exec/format/parquet/vparquet_file_metadata.h" #include "vec/exec/format/parquet/vparquet_group_reader.h" #include "vec/exec/format/parquet/vparquet_page_index.h" +#include "vec/exec/scan/file_scanner.h" #include "vec/exprs/vbloom_predicate.h" #include "vec/exprs/vexpr.h" #include "vec/exprs/vexpr_context.h" @@ -158,13 +159,14 @@ void ParquetReader::_init_profile() { _parquet_profile.row_group_filter_time = ADD_CHILD_TIMER_WITH_LEVEL(_profile, "RowGroupFilterTime", parquet_profile, 1); - _parquet_profile.file_read_time = ADD_TIMER_WITH_LEVEL(_profile, "FileReadTime", 1); + _parquet_profile.file_read_time = + ADD_TIMER_WITH_LEVEL(_profile, FileScanner::FileReadTimeProfile, 1); _parquet_profile.file_read_calls = ADD_COUNTER_WITH_LEVEL(_profile, "FileReadCalls", TUnit::UNIT, 1); _parquet_profile.file_meta_read_calls = ADD_COUNTER_WITH_LEVEL(_profile, "FileMetaReadCalls", TUnit::UNIT, 1); - _parquet_profile.file_read_bytes = - ADD_COUNTER_WITH_LEVEL(_profile, "FileReadBytes", TUnit::BYTES, 1); + _parquet_profile.file_read_bytes = ADD_COUNTER_WITH_LEVEL( + _profile, FileScanner::FileReadBytesProfile, TUnit::BYTES, 1); _parquet_profile.decompress_time = ADD_CHILD_TIMER_WITH_LEVEL(_profile, "DecompressTime", parquet_profile, 1); _parquet_profile.decompress_cnt = ADD_CHILD_COUNTER_WITH_LEVEL( diff --git a/be/src/vec/exec/scan/file_scanner.cpp b/be/src/vec/exec/scan/file_scanner.cpp index 0eef649e112178..0fe83406b48cee 100644 --- a/be/src/vec/exec/scan/file_scanner.cpp +++ b/be/src/vec/exec/scan/file_scanner.cpp @@ -94,6 +94,9 @@ class ShardedKVCache; namespace doris::vectorized { using namespace ErrorCode; +const std::string FileScanner::FileReadBytesProfile = "FileReadBytes"; +const std::string FileScanner::FileReadTimeProfile = "FileReadTime"; + FileScanner::FileScanner( RuntimeState* state, pipeline::FileScanLocalState* local_state, int64_t limit, std::shared_ptr split_source, RuntimeProfile* profile, @@ -1258,7 +1261,7 @@ Status FileScanner::_generate_truncate_columns(bool need_to_get_parsed_schema) { return Status::OK(); } -Status FileScanner::prepare_for_read_one_line(const TFileRangeDesc& range) { +Status FileScanner::prepare_for_read_lines(const TFileRangeDesc& range) { _current_range = range; RETURN_IF_ERROR(_init_io_ctx()); @@ -1275,10 +1278,10 @@ Status FileScanner::prepare_for_read_one_line(const TFileRangeDesc& range) { return Status::OK(); } -Status FileScanner::read_one_line_from_range(const TFileRangeDesc& range, - const segment_v2::rowid_t rowid, Block* result_block, - const ExternalFileMappingInfo& external_info, - int64_t* init_reader_ms, int64_t* get_block_ms) { +Status FileScanner::read_lines_from_range(const TFileRangeDesc& range, + const std::list& row_ids, Block* result_block, + const ExternalFileMappingInfo& external_info, + int64_t* init_reader_ms, int64_t* get_block_ms) { _current_range = range; RETURN_IF_ERROR(_generate_parititon_columns()); @@ -1298,7 +1301,8 @@ Status FileScanner::read_one_line_from_range(const TFileRangeDesc& range, ? ExecEnv::GetInstance()->file_meta_cache() : nullptr, false); - RETURN_IF_ERROR(parquet_reader->set_read_lines_mode({rowid})); + + RETURN_IF_ERROR(parquet_reader->set_read_lines_mode(row_ids)); RETURN_IF_ERROR(_init_parquet_reader(std::move(parquet_reader))); break; } @@ -1308,7 +1312,7 @@ Status FileScanner::read_one_line_from_range(const TFileRangeDesc& range, 1, _state->timezone(), _io_ctx.get(), false); - RETURN_IF_ERROR(orc_reader->set_read_lines_mode({rowid})); + RETURN_IF_ERROR(orc_reader->set_read_lines_mode(row_ids)); RETURN_IF_ERROR(_init_orc_reader(std::move(orc_reader))); break; } @@ -1328,11 +1332,15 @@ Status FileScanner::read_one_line_from_range(const TFileRangeDesc& range, RETURN_IF_ERROR(scope_timer_run( [&]() -> Status { - bool eof = false; - return _get_block_impl(_state, result_block, &eof); + while (!_cur_reader_eof) { + bool eof = false; + RETURN_IF_ERROR(_get_block_impl(_state, result_block, &eof)); + } + return Status::OK(); }, get_block_ms)); + _cur_reader->collect_profile_before_close(); RETURN_IF_ERROR(_cur_reader->close()); return Status::OK(); } diff --git a/be/src/vec/exec/scan/file_scanner.h b/be/src/vec/exec/scan/file_scanner.h index caa83e45b727f5..f03aaae2724dc6 100644 --- a/be/src/vec/exec/scan/file_scanner.h +++ b/be/src/vec/exec/scan/file_scanner.h @@ -62,6 +62,10 @@ class FileScanner : public Scanner { public: static constexpr const char* NAME = "FileScanner"; + // sub profile name (for parquet/orc) + static const std::string FileReadBytesProfile; + static const std::string FileReadTimeProfile; + FileScanner(RuntimeState* state, pipeline::FileScanLocalState* parent, int64_t limit, std::shared_ptr split_source, RuntimeProfile* profile, ShardedKVCache* kv_cache, @@ -89,12 +93,11 @@ class FileScanner : public Scanner { _col_name_to_slot_id(colname_to_slot_id), _real_tuple_desc(tuple_desc) {}; - Status read_one_line_from_range(const TFileRangeDesc& range, const segment_v2::rowid_t rowid, - Block* result_block, - const ExternalFileMappingInfo& external_info, - int64_t* init_reader_ms, int64_t* get_block_ms); + Status read_lines_from_range(const TFileRangeDesc& range, const std::list& row_ids, + Block* result_block, const ExternalFileMappingInfo& external_info, + int64_t* init_reader_ms, int64_t* get_block_ms); - Status prepare_for_read_one_line(const TFileRangeDesc& range); + Status prepare_for_read_lines(const TFileRangeDesc& range); protected: Status _get_block_impl(RuntimeState* state, Block* block, bool* eof) override; @@ -119,7 +122,7 @@ class FileScanner : public Scanner { TFileRangeDesc _current_range; std::unique_ptr _cur_reader; - bool _cur_reader_eof; + bool _cur_reader_eof = false; const std::unordered_map* _colname_to_value_range = nullptr; // File source slot descriptors std::vector _file_slot_descs; diff --git a/be/test/util/runtime_profile_test.cpp b/be/test/util/runtime_profile_test.cpp index 8c36c2cc33f316..9dce62a2a233cd 100644 --- a/be/test/util/runtime_profile_test.cpp +++ b/be/test/util/runtime_profile_test.cpp @@ -102,6 +102,82 @@ TEST(RuntimeProfileTest, Basic) { EXPECT_EQ(counter_updated->value(), 1); } +TEST(RuntimeProfileTest, ProtoBasic) { + RuntimeProfile profile_a("ProfileA"); + RuntimeProfile profile_a1("ProfileA1"); + RuntimeProfile profile_a2("ProfileA2"); + + profile_a.add_child(&profile_a1, true); + profile_a.add_child(&profile_a2, true); + + PRuntimeProfileTree proto_profile; + + // Test Empty serialization + profile_a.to_proto(&proto_profile); + EXPECT_EQ(proto_profile.nodes_size(), 3); + proto_profile.clear_nodes(); + + RuntimeProfile::Counter* counter_a; + RuntimeProfile::Counter* counter_b; + RuntimeProfile::Counter* counter_merged; + + // Updating/setting counter + counter_a = profile_a.add_counter("A", TUnit::UNIT); + EXPECT_TRUE(counter_a != nullptr); + counter_a->update(10); + counter_a->update(-5); + EXPECT_EQ(counter_a->value(), 5); + counter_a->set(1L); + EXPECT_EQ(counter_a->value(), 1); + + counter_b = profile_a2.add_counter("B", TUnit::BYTES); + EXPECT_TRUE(counter_b != nullptr); + + std::stringstream ss; + // Serialize to proto + profile_a.to_proto(&proto_profile, 2); + profile_a.pretty_print(&ss, " ", 4); + std::cout << "Profile A:\n" << ss.str() << std::endl; + ss.str(""); + ss.clear(); + + ASSERT_EQ(proto_profile.nodes_size(), 3); + + // Deserialize from proto + std::unique_ptr from_proto = RuntimeProfile::from_proto(proto_profile); + from_proto->pretty_print(&ss, "", 4); + std::cout << "From proto profile:\n" << ss.str() << std::endl; + ss.str(""); + ss.clear(); + + counter_merged = from_proto->get_counter("A"); + ASSERT_NE(counter_merged, nullptr); + EXPECT_EQ(counter_merged->value(), 1); + EXPECT_TRUE(from_proto->get_counter("Not there") == nullptr); + + // merge + RuntimeProfile merged_profile("Merged"); + merged_profile.merge(from_proto.get()); + counter_merged = merged_profile.get_counter("A"); + EXPECT_EQ(counter_merged->value(), 1); + + // merge 2 more times, counters should get aggregated + merged_profile.merge(from_proto.get()); + merged_profile.merge(from_proto.get()); + EXPECT_EQ(counter_merged->value(), 3); + + // update + RuntimeProfile updated_profile("updated"); + updated_profile.update(proto_profile); + RuntimeProfile::Counter* counter_updated = updated_profile.get_counter("A"); + EXPECT_EQ(counter_updated->value(), 1); + + // update 2 more times, counters should stay the same + updated_profile.update(proto_profile); + updated_profile.update(proto_profile); + EXPECT_EQ(counter_updated->value(), 1); +} + void ValidateCounter(RuntimeProfile* profile, const string& name, int64_t value) { RuntimeProfile::Counter* counter = profile->get_counter(name); ASSERT_TRUE(counter != nullptr); @@ -225,6 +301,118 @@ TEST(RuntimeProfileTest, MergeAndupdate) { profile2.pretty_print(&dummy); } +TEST(RuntimeProfileTest, ProtoMergeAndUpdate) { + ObjectPool pool; + RuntimeProfile profile1("Parent1"); + RuntimeProfile p1_child1("Child1"); + RuntimeProfile p1_child2("Child2"); + profile1.add_child(&p1_child1, true); + profile1.add_child(&p1_child2, true); + + RuntimeProfile profile2("Parent2"); + RuntimeProfile p2_child1("Child1"); + RuntimeProfile p2_child3("Child3"); + profile2.add_child(&p2_child1, true); + profile2.add_child(&p2_child3, true); + + // Create parent-level counters + RuntimeProfile::Counter* parent1_shared = profile1.add_counter("Parent Shared", TUnit::UNIT); + RuntimeProfile::Counter* parent2_shared = profile2.add_counter("Parent Shared", TUnit::UNIT); + RuntimeProfile::Counter* parent1_only = profile1.add_counter("Parent 1 Only", TUnit::UNIT); + RuntimeProfile::Counter* parent2_only = profile2.add_counter("Parent 2 Only", TUnit::UNIT); + parent1_shared->update(1); + parent2_shared->update(3); + parent1_only->update(2); + parent2_only->update(5); + + // Create child-level counters + RuntimeProfile::Counter* p1_c1_shared = p1_child1.add_counter("Child1 Shared", TUnit::UNIT); + RuntimeProfile::Counter* p1_c1_only = + p1_child1.add_counter("Child1 Parent 1 Only", TUnit::UNIT); + RuntimeProfile::Counter* p1_c2 = p1_child2.add_counter("Child2", TUnit::UNIT); + + RuntimeProfile::Counter* p2_c1_shared = p2_child1.add_counter("Child1 Shared", TUnit::UNIT); + RuntimeProfile::Counter* p2_c1_only = + p1_child1.add_counter("Child1 Parent 2 Only", TUnit::UNIT); + RuntimeProfile::Counter* p2_c3 = p2_child3.add_counter("Child3", TUnit::UNIT); + + p1_c1_shared->update(10); + p1_c1_only->update(50); + p2_c1_shared->update(20); + p2_c1_only->update(100); + p2_c3->update(30); + p1_c2->update(40); + + // Serialize profile1 to proto + PRuntimeProfileTree proto_profile1; + profile1.to_proto(&proto_profile1, 2); + + // Deserialize from proto and merge with profile2 + std::unique_ptr merged_profile = RuntimeProfile::from_proto(proto_profile1); + merged_profile->merge(&profile2); + + std::stringstream ss; + merged_profile->pretty_print(&ss); + std::cout << "Merged profile:\n" << ss.str() << std::endl; + + EXPECT_EQ(4, merged_profile->num_counters()); + ValidateCounter(merged_profile.get(), "Parent Shared", 4); + ValidateCounter(merged_profile.get(), "Parent 1 Only", 2); + ValidateCounter(merged_profile.get(), "Parent 2 Only", 5); + + std::vector children; + merged_profile->get_children(&children); + EXPECT_EQ(children.size(), 3); + + for (RuntimeProfile* profile : children) { + if (profile->name() == "Child1") { + EXPECT_EQ(4, profile->num_counters()); + ValidateCounter(profile, "Child1 Shared", 30); + ValidateCounter(profile, "Child1 Parent 1 Only", 50); + ValidateCounter(profile, "Child1 Parent 2 Only", 100); + } else if (profile->name() == "Child2") { + EXPECT_EQ(2, profile->num_counters()); + ValidateCounter(profile, "Child2", 40); + } else if (profile->name() == "Child3") { + EXPECT_EQ(2, profile->num_counters()); + ValidateCounter(profile, "Child3", 30); + } else { + FAIL() << "Unexpected child profile name: " << profile->name(); + } + } + + // Test update: update profile2 with profile1's proto tree + profile2.update(proto_profile1); + EXPECT_EQ(4, profile2.num_counters()); + ValidateCounter(&profile2, "Parent Shared", 1); + ValidateCounter(&profile2, "Parent 1 Only", 2); + ValidateCounter(&profile2, "Parent 2 Only", 5); + + profile2.get_children(&children); + EXPECT_EQ(children.size(), 3); + + for (RuntimeProfile* profile : children) { + if (profile->name() == "Child1") { + EXPECT_EQ(4, profile->num_counters()); + ValidateCounter(profile, "Child1 Shared", 10); + ValidateCounter(profile, "Child1 Parent 1 Only", 50); + ValidateCounter(profile, "Child1 Parent 2 Only", 100); + } else if (profile->name() == "Child2") { + EXPECT_EQ(2, profile->num_counters()); + ValidateCounter(profile, "Child2", 40); + } else if (profile->name() == "Child3") { + EXPECT_EQ(2, profile->num_counters()); + ValidateCounter(profile, "Child3", 30); + } else { + FAIL() << "Unexpected child profile name: " << profile->name(); + } + } + + // Ensure pretty_print doesn't crash + std::stringstream dummy; + profile2.pretty_print(&dummy); +} + TEST(RuntimeProfileTest, DerivedCounters) { ObjectPool pool; RuntimeProfile profile("Profile"); @@ -288,6 +476,47 @@ TEST(RuntimeProfileTest, InfoStringTest) { EXPECT_EQ(*update_dst_profile.get_info_string("Foo"), "Bar"); } +TEST(RuntimeProfileTest, ProtoInfoStringTest) { + ObjectPool pool; + RuntimeProfile profile("Profile"); + + EXPECT_TRUE(profile.get_info_string("Key") == nullptr); + + profile.add_info_string("Key", "Value"); + const std::string* value = profile.get_info_string("Key"); + EXPECT_TRUE(value != nullptr); + EXPECT_EQ(*value, "Value"); + + // Convert it to proto + PRuntimeProfileTree pprofile; + profile.to_proto(&pprofile); + + // Convert it back from proto + std::unique_ptr from_proto = RuntimeProfile::from_proto(pprofile); + value = from_proto->get_info_string("Key"); + EXPECT_TRUE(value != nullptr); + EXPECT_EQ(*value, "Value"); + + // Test update + RuntimeProfile update_dst_profile("Profile2"); + update_dst_profile.update(pprofile); + value = update_dst_profile.get_info_string("Key"); + EXPECT_TRUE(value != nullptr); + EXPECT_EQ(*value, "Value"); + + // Modify original profile, convert again, and update dst + profile.add_info_string("Key", "NewValue"); + profile.add_info_string("Foo", "Bar"); + EXPECT_EQ(*profile.get_info_string("Key"), "NewValue"); + EXPECT_EQ(*profile.get_info_string("Foo"), "Bar"); + + profile.to_proto(&pprofile); + update_dst_profile.update(pprofile); + + EXPECT_EQ(*update_dst_profile.get_info_string("Key"), "NewValue"); + EXPECT_EQ(*update_dst_profile.get_info_string("Foo"), "Bar"); +} + // This case could be added back when we fixed the issue. // TEST(RuntimeProfileTest, AddSameCounter) { // RuntimeProfile profile("Profile"); diff --git a/be/test/vec/exec/format/parquet/parquet_read_lines.cpp b/be/test/vec/exec/format/parquet/parquet_read_lines.cpp index 8a90415b78bb47..2008bf257a05b7 100644 --- a/be/test/vec/exec/format/parquet/parquet_read_lines.cpp +++ b/be/test/vec/exec/format/parquet/parquet_read_lines.cpp @@ -212,15 +212,15 @@ static void read_parquet_lines(std::vector numeric_types, auto vf = FileScanner::create_unique(&runtime_state, runtime_profile.get(), &scan_params, &colname_to_slot_id, tuple_desc); - EXPECT_TRUE(vf->prepare_for_read_one_line(scan_range).ok()); + EXPECT_TRUE(vf->prepare_for_read_lines(scan_range).ok()); ExternalFileMappingInfo external_info(0, scan_range, false); int64_t init_reader_ms = 0; int64_t get_block_ms = 0; auto read_lines_tmp2 = read_lines; while (!read_lines_tmp2.empty()) { - auto st = vf->read_one_line_from_range(scan_range, read_lines_tmp2.front(), block.get(), - external_info, &init_reader_ms, &get_block_ms); + auto st = vf->read_lines_from_range(scan_range, {read_lines_tmp2.front()}, block.get(), + external_info, &init_reader_ms, &get_block_ms); std::cout << st.to_string() << "\n"; EXPECT_TRUE(st.ok()); diff --git a/be/test/vec/exec/orc/orc_read_lines.cpp b/be/test/vec/exec/orc/orc_read_lines.cpp index 9dac42b4d9bcd4..bb149528d13d60 100644 --- a/be/test/vec/exec/orc/orc_read_lines.cpp +++ b/be/test/vec/exec/orc/orc_read_lines.cpp @@ -184,12 +184,12 @@ static void read_orc_line(int64_t line, std::string block_dump) { auto vf = FileScanner::create_unique(runtime_state.get(), runtime_profile.get(), ¶ms, &colname_to_slot_id, tuple_desc); - EXPECT_TRUE(vf->prepare_for_read_one_line(range).ok()); + EXPECT_TRUE(vf->prepare_for_read_lines(range).ok()); ExternalFileMappingInfo external_info(0, range, false); int64_t init_reader_ms = 0; int64_t get_block_ms = 0; - auto st = vf->read_one_line_from_range(range, line, block.get(), external_info, &init_reader_ms, - &get_block_ms); + auto st = vf->read_lines_from_range(range, {line}, block.get(), external_info, &init_reader_ms, + &get_block_ms); EXPECT_TRUE(st.ok()); EXPECT_EQ(block->dump_data(1), block_dump); } diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index 2b5d935d7f3272..ec408b682020ba 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -26,6 +26,7 @@ import "descriptors.proto"; import "types.proto"; import "olap_common.proto"; import "olap_file.proto"; +import "runtime_profile.proto"; option cc_generic_services = true; @@ -795,6 +796,7 @@ message PMultiGetBlockV2 { }; optional RowFormat format = 2; repeated bytes binary_row_data = 3; + optional PRuntimeProfileTree profile = 4; } message PMultiGetResponseV2 { diff --git a/gensrc/proto/runtime_profile.proto b/gensrc/proto/runtime_profile.proto new file mode 100644 index 00000000000000..3d8503bec6aa04 --- /dev/null +++ b/gensrc/proto/runtime_profile.proto @@ -0,0 +1,91 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +syntax = "proto2"; + +package doris; +option java_package = "org.apache.doris.proto"; +option cc_generic_services = true; +// Similar to gensrc/thrift/RuntimeProfile.thrift, this is changed to protobuf implementation + +// Counter unit types (equivalent to Metrics.TUnit) +enum PProfileUnit { + // A dimensionless numerical quantity + UNIT = 0; + // Rate of a dimensionless numerical quantity + UNIT_PER_SECOND = 1; + CPU_TICKS = 2; + BYTES = 3; + BYTES_PER_SECOND = 4; + TIME_NS = 5; + DOUBLE_VALUE = 6; + // No units at all, may not be a numerical quantity + // It is used as a label now, so do not treat it as + // a real counter. + NONE = 7; + TIME_MS = 8; + TIME_S = 9; +} + +// A single performance counter +message PProfileCounter { + required string name = 1; + required PProfileUnit type = 2; + required int64 value = 3; + optional int64 level = 4; + optional string description = 5; +} + +// A set of child counters (used in map>) +message PProfileChildCounterSet { + repeated string child_counters = 1; +} + +// A single runtime profile node +message PRuntimeProfileNode { + required string name = 1; + required int32 num_children = 2; + + // Flattened counters for this node and all its children + repeated PProfileCounter counters = 3; + + // Node metadata (e.g., node id) + required int64 metadata = 4; + + // Whether the child is indented + required bool indent = 5; + + // Key-value info strings describing additional metadata + map info_strings = 6; + + // Order to display info strings + repeated string info_strings_display_order = 7; + + // Map from parent counter name to a set of child counter names + map child_counters_map = 8; + + // Timestamp for this node + required int64 timestamp = 9; + + // Deprecated field + optional bool deprecated_is_sink = 10; +} + +// A flattened runtime profile tree in in-order traversal +message PRuntimeProfileTree { + repeated PRuntimeProfileNode nodes = 1; +}