diff --git a/be/src/exec/rowid_fetcher.cpp b/be/src/exec/rowid_fetcher.cpp index a678ca9dab5a8a..9dc047f8976fb3 100644 --- a/be/src/exec/rowid_fetcher.cpp +++ b/be/src/exec/rowid_fetcher.cpp @@ -646,12 +646,16 @@ Status RowIdStorageReader::read_batch_external_row(const PRequestBlockDesc& requ std::unique_ptr vfile_scanner_ptr = nullptr; { + if (result_block.is_empty_column()) [[likely]] { + result_block = vectorized::Block(slots, request_block_desc.row_id_size()); + } + auto& external_info = first_file_mapping->get_external_file_info(); int plan_node_id = external_info.plan_node_id; const auto& first_scan_range_desc = external_info.scan_range_desc; - auto query_ctx = ExecEnv::GetInstance()->fragment_mgr()->get_query_ctx(query_id); - const auto* old_scan_params = &(query_ctx->file_scan_range_params_map[plan_node_id]); + DCHECK(id_file_map->get_external_scan_params().contains(plan_node_id)); + const auto* old_scan_params = &(id_file_map->get_external_scan_params().at(plan_node_id)); rpc_scan_params = *old_scan_params; rpc_scan_params.required_slots.clear(); @@ -679,22 +683,17 @@ Status RowIdStorageReader::read_batch_external_row(const PRequestBlockDesc& requ rpc_scan_params.slot_name_to_schema_pos.emplace(slot.col_name(), column_idx); } - if (result_block.is_empty_column()) [[likely]] { - result_block = vectorized::Block(slots, request_block_desc.row_id_size()); - } - - const auto& query_options = query_ctx->get_query_options(); - const auto& query_globals = query_ctx->get_query_globals(); - + const auto& query_options = id_file_map->get_query_options(); + const auto& query_globals = id_file_map->get_query_globals(); /* * The scan stage needs the information in query_options to generate different behaviors according to the specific variables: * query_options.hive_parquet_use_column_names, query_options.truncate_char_or_varchar_columns,query_globals.time_zone ... 
* - * To ensure the same behavior as the scan stage, I get query_options query_globals from query_ctx, then create runtime_state + * To ensure the same behavior as the scan stage, we take query_options and query_globals from id_file_map, then create runtime_state * and pass it to vfile_scanner so that the runtime_state information is the same as the scan stage and the behavior is also consistent. */ runtime_state = RuntimeState::create_unique(query_id, -1, query_options, query_globals, - ExecEnv::GetInstance(), query_ctx.get()); + ExecEnv::GetInstance()); vfile_scanner_ptr = vectorized::FileScanner::create_unique( runtime_state.get(), runtime_profile.get(), &rpc_scan_params, &colname_to_slot_id, diff --git a/be/src/olap/id_manager.h b/be/src/olap/id_manager.h index a311d76ac34a59..5a01a6c9551c84 100644 --- a/be/src/olap/id_manager.h +++ b/be/src/olap/id_manager.h @@ -40,6 +40,7 @@ #include "olap/olap_common.h" #include "olap/tablet.h" #include "olap/tablet_meta.h" +#include "runtime/query_context.h" namespace doris { @@ -196,12 +197,35 @@ class IdFileMap { int64_t get_delayed_expired_timestamp() { return delayed_expired_timestamp; } + void set_external_scan_params(QueryContext* query_ctx) { + std::call_once(once_flag_for_external, [&] { + DCHECK(query_ctx != nullptr); + _query_global = query_ctx->get_query_globals(); + _query_options = query_ctx->get_query_options(); + _file_scan_range_params_map = query_ctx->file_scan_range_params_map; + }); + } + + const TQueryGlobals& get_query_globals() const { return _query_global; } + + const TQueryOptions& get_query_options() const { return _query_options; } + + const std::map& get_external_scan_params() const { + return _file_scan_range_params_map; + } + private: std::shared_mutex _mtx; uint32_t _init_id = 0; std::unordered_map _mapping_to_id; std::unordered_map> _id_map; + // Used for external table scans; set once by set_external_scan_params() + TQueryGlobals _query_global; + TQueryOptions _query_options; + std::map 
_file_scan_range_params_map; + std::once_flag 
once_flag_for_external; + // use in Doris Format to keep temp rowsets, preventing them from being deleted by compaction std::unordered_map, RowsetSharedPtr> _temp_rowset_maps; uint64_t delayed_expired_timestamp = 0; diff --git a/be/src/pipeline/exec/file_scan_operator.cpp b/be/src/pipeline/exec/file_scan_operator.cpp index ef94e3f1c807a5..f864a04ba253db 100644 --- a/be/src/pipeline/exec/file_scan_operator.cpp +++ b/be/src/pipeline/exec/file_scan_operator.cpp @@ -37,6 +37,11 @@ Status FileScanLocalState::_init_scanners(std::list* sc return Status::OK(); } + auto& id_file_map = state()->get_id_file_map(); + if (id_file_map != nullptr) { + id_file_map->set_external_scan_params(state()->get_query_ctx()); + } + auto& p = _parent->cast(); // There's only one scan range for each backend in batch split mode. Each backend only starts up one ScanNode instance. uint32_t shard_num = diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index 86536a4bce64d8..080b166c9d2650 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -130,6 +130,29 @@ RuntimeState::RuntimeState(const TUniqueId& query_id, int32_t fragment_id, _query_mem_tracker = ctx->query_mem_tracker(); } +RuntimeState::RuntimeState(const TUniqueId& query_id, int32 fragment_id, + const TQueryOptions& query_options, const TQueryGlobals& query_globals, + ExecEnv* exec_env) + : _profile("PipelineX " + std::to_string(fragment_id)), + _load_channel_profile(""), + _obj_pool(new ObjectPool()), + _unreported_error_idx(0), + _query_id(query_id), + _fragment_id(fragment_id), + _per_fragment_instance_idx(0), + _num_rows_load_total(0), + _num_rows_load_filtered(0), + _num_rows_load_unselected(0), + _num_rows_filtered_in_strict_mode_partial_update(0), + _num_print_error_rows(0), + _num_bytes_load_total(0), + _num_finished_scan_range(0), + _error_row_number(0) { + Status status = init(TUniqueId(), query_options, query_globals, exec_env); + DCHECK(status.ok()); + 
init_mem_trackers(""); +} + RuntimeState::RuntimeState(const TQueryGlobals& query_globals) : _profile(""), _load_channel_profile(""), diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index a7096e72122c5a..f9e932d846af2b 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -95,6 +95,11 @@ class RuntimeState { RuntimeState(const TUniqueId& query_id, int32 fragment_id, const TQueryOptions& query_options, const TQueryGlobals& query_globals, ExecEnv* exec_env, QueryContext* ctx); + // Only used in the materialization phase of delayed materialization, + // when there may be no corresponding QueryContext. + RuntimeState(const TUniqueId& query_id, int32 fragment_id, const TQueryOptions& query_options, + const TQueryGlobals& query_globals, ExecEnv* exec_env); + // RuntimeState for executing expr in fe-support. RuntimeState(const TQueryGlobals& query_globals); diff --git a/be/src/vec/exec/scan/file_scanner.cpp b/be/src/vec/exec/scan/file_scanner.cpp index 0b13a62b88639f..d43faf4c0e3b74 100644 --- a/be/src/vec/exec/scan/file_scanner.cpp +++ b/be/src/vec/exec/scan/file_scanner.cpp @@ -863,15 +863,13 @@ void FileScanner::_truncate_char_or_varchar_column(Block* block, int idx, int le Block::erase_useless_column(block, num_columns_without_result); } -Status FileScanner::_create_row_id_column_iterator(const int column_id) { +Status FileScanner::_create_row_id_column_iterator() { auto& id_file_map = _state->get_id_file_map(); auto file_id = id_file_map->get_file_mapping_id(std::make_shared( ((pipeline::FileScanLocalState*)_local_state)->parent_id(), _current_range, _should_enable_file_meta_cache())); - _row_id_column_iterator_pair = std::make_pair( - std::make_shared(IdManager::ID_VERSION, - BackendOptions::get_backend_id(), file_id), - column_id); + _row_id_column_iterator_pair.first = std::make_shared( + IdManager::ID_VERSION, BackendOptions::get_backend_id(), file_id); return Status::OK(); } @@ -993,7 +991,11 @@ Status 
FileScanner::_get_next_reader() { _should_enable_file_meta_cache() ? ExecEnv::GetInstance()->file_meta_cache() : nullptr, _state->query_options().enable_parquet_lazy_mat); - parquet_reader->set_row_id_column_iterator(_row_id_column_iterator_pair); + + if (_row_id_column_iterator_pair.second != -1) { + RETURN_IF_ERROR(_create_row_id_column_iterator()); + parquet_reader->set_row_id_column_iterator(_row_id_column_iterator_pair); + } // ATTN: the push down agg type may be set back to NONE, // see IcebergTableReader::init_row_filters for example. @@ -1013,7 +1015,11 @@ Status FileScanner::_get_next_reader() { std::unique_ptr orc_reader = OrcReader::create_unique( _profile, _state, *_params, range, _state->query_options().batch_size, _state->timezone(), _io_ctx.get(), _state->query_options().enable_orc_lazy_mat); - orc_reader->set_row_id_column_iterator(_row_id_column_iterator_pair); + if (_row_id_column_iterator_pair.second != -1) { + RETURN_IF_ERROR(_create_row_id_column_iterator()); + orc_reader->set_row_id_column_iterator(_row_id_column_iterator_pair); + } + orc_reader->set_push_down_agg_type(_get_push_down_agg_type()); if (push_down_predicates) { RETURN_IF_ERROR(_process_late_arrival_conjuncts()); @@ -1411,8 +1417,7 @@ Status FileScanner::_init_expr_ctxes() { fmt::format("Unknown source slot descriptor, slot_id={}", slot_id)); } if (it->second->col_name().starts_with(BeConsts::GLOBAL_ROWID_COL)) { - RETURN_IF_ERROR( - _create_row_id_column_iterator(_default_val_row_desc->get_column_id(slot_id))); + _row_id_column_iterator_pair.second = _default_val_row_desc->get_column_id(slot_id); continue; } if (slot_info.is_file_slot) { diff --git a/be/src/vec/exec/scan/file_scanner.h b/be/src/vec/exec/scan/file_scanner.h index 37555884dcd77c..bdffb280c9844b 100644 --- a/be/src/vec/exec/scan/file_scanner.h +++ b/be/src/vec/exec/scan/file_scanner.h @@ -250,7 +250,7 @@ class FileScanner : public Scanner { Status _set_fill_or_truncate_columns(bool need_to_get_parsed_schema); Status 
_init_orc_reader(std::unique_ptr&& orc_reader); Status _init_parquet_reader(std::unique_ptr&& parquet_reader); - Status _create_row_id_column_iterator(const int slot_id); + Status _create_row_id_column_iterator(); TFileFormatType::type _get_current_format_type() { // for compatibility, if format_type is not set in range, use the format type of params