Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 10 additions & 11 deletions be/src/exec/rowid_fetcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -646,12 +646,16 @@ Status RowIdStorageReader::read_batch_external_row(const PRequestBlockDesc& requ
std::unique_ptr<vectorized::FileScanner> vfile_scanner_ptr = nullptr;

{
if (result_block.is_empty_column()) [[likely]] {
result_block = vectorized::Block(slots, request_block_desc.row_id_size());
}

auto& external_info = first_file_mapping->get_external_file_info();
int plan_node_id = external_info.plan_node_id;
const auto& first_scan_range_desc = external_info.scan_range_desc;

auto query_ctx = ExecEnv::GetInstance()->fragment_mgr()->get_query_ctx(query_id);
const auto* old_scan_params = &(query_ctx->file_scan_range_params_map[plan_node_id]);
DCHECK(id_file_map->get_external_scan_params().contains(plan_node_id));
const auto* old_scan_params = &(id_file_map->get_external_scan_params().at(plan_node_id));
rpc_scan_params = *old_scan_params;

rpc_scan_params.required_slots.clear();
Expand Down Expand Up @@ -679,22 +683,17 @@ Status RowIdStorageReader::read_batch_external_row(const PRequestBlockDesc& requ
rpc_scan_params.slot_name_to_schema_pos.emplace(slot.col_name(), column_idx);
}

if (result_block.is_empty_column()) [[likely]] {
result_block = vectorized::Block(slots, request_block_desc.row_id_size());
}

const auto& query_options = query_ctx->get_query_options();
const auto& query_globals = query_ctx->get_query_globals();

const auto& query_options = id_file_map->get_query_options();
const auto& query_globals = id_file_map->get_query_globals();
/*
* The scan stage needs the information in query_options to generate different behaviors according to the specific variables:
* query_options.hive_parquet_use_column_names, query_options.truncate_char_or_varchar_columns,query_globals.time_zone ...
*
* To ensure the same behavior as the scan stage, I get query_options query_globals from query_ctx, then create runtime_state
* To ensure the same behavior as the scan stage, query_options and query_globals are taken from id_file_map and used to create runtime_state,
* which is then passed to vfile_scanner so that the runtime_state information is the same as in the scan stage and the behavior is also consistent.
*/
runtime_state = RuntimeState::create_unique(query_id, -1, query_options, query_globals,
ExecEnv::GetInstance(), query_ctx.get());
ExecEnv::GetInstance());

vfile_scanner_ptr = vectorized::FileScanner::create_unique(
runtime_state.get(), runtime_profile.get(), &rpc_scan_params, &colname_to_slot_id,
Expand Down
24 changes: 24 additions & 0 deletions be/src/olap/id_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#include "olap/olap_common.h"
#include "olap/tablet.h"
#include "olap/tablet_meta.h"
#include "runtime/query_context.h"

namespace doris {

Expand Down Expand Up @@ -196,12 +197,35 @@ class IdFileMap {

int64_t get_delayed_expired_timestamp() { return delayed_expired_timestamp; }

// Snapshots the query-level settings that external-table reads need (query
// globals, query options and the per-node file scan range params) from
// `query_ctx` into this IdFileMap.
//
// The copy is guarded by `once_flag_for_external`, so only the first call
// performs it; later calls return without touching the stored values.
// `query_ctx` is expected to be non-null (checked via DCHECK inside the guard).
void set_external_scan_params(QueryContext* query_ctx) {
    // The lambda is consumed synchronously by std::call_once, so capturing the
    // raw pointer and `this` is safe for its whole lifetime.
    const auto snapshot_from_ctx = [this, query_ctx]() {
        DCHECK(query_ctx != nullptr);
        _query_global = query_ctx->get_query_globals();
        _query_options = query_ctx->get_query_options();
        _file_scan_range_params_map = query_ctx->file_scan_range_params_map;
    };
    std::call_once(once_flag_for_external, snapshot_from_ctx);
}

// Returns a reference to the query globals stored in this IdFileMap; the
// member is assigned by set_external_scan_params().
const TQueryGlobals& get_query_globals() const { return _query_global; }

// Returns a reference to the query options stored in this IdFileMap; the
// member is assigned by set_external_scan_params().
const TQueryOptions& get_query_options() const { return _query_options; }

// Returns a reference to the file scan range params map stored in this
// IdFileMap; the map is copied from QueryContext::file_scan_range_params_map
// by set_external_scan_params().
// NOTE(review): the int key appears to be the plan node id (the row-id fetcher
// looks entries up by plan_node_id) — confirm.
const std::map<int, TFileScanRangeParams>& get_external_scan_params() const {
return _file_scan_range_params_map;
}

private:
std::shared_mutex _mtx;
uint32_t _init_id = 0;
std::unordered_map<std::string, uint32_t> _mapping_to_id;
std::unordered_map<uint32_t, std::shared_ptr<FileMapping>> _id_map;

// used when scanning external tables
TQueryGlobals _query_global;
TQueryOptions _query_options;
std::map<int, TFileScanRangeParams> _file_scan_range_params_map;
std::once_flag once_flag_for_external;

// use in Doris Format to keep temp rowsets, preventing them from being deleted by compaction
std::unordered_map<std::pair<int64_t, RowsetId>, RowsetSharedPtr> _temp_rowset_maps;
uint64_t delayed_expired_timestamp = 0;
Expand Down
5 changes: 5 additions & 0 deletions be/src/pipeline/exec/file_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ Status FileScanLocalState::_init_scanners(std::list<vectorized::ScannerSPtr>* sc
return Status::OK();
}

auto& id_file_map = state()->get_id_file_map();
if (id_file_map != nullptr) {
id_file_map->set_external_scan_params(state()->get_query_ctx());
}

auto& p = _parent->cast<FileScanOperatorX>();
// There's only one scan range for each backend in batch split mode. Each backend only starts up one ScanNode instance.
uint32_t shard_num =
Expand Down
23 changes: 23 additions & 0 deletions be/src/runtime/runtime_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,29 @@ RuntimeState::RuntimeState(const TUniqueId& query_id, int32_t fragment_id,
_query_mem_tracker = ctx->query_mem_tracker();
}

// Builds a RuntimeState from query options and query globals alone, without a
// QueryContext. Per the declaration in runtime_state.h this overload is meant
// for the materialization phase of delayed materialization, where the
// corresponding QueryContext may no longer exist.
//
// @param query_id       stored in _query_id
// @param fragment_id    stored in _fragment_id and embedded in the profile name
// @param query_options  forwarded to init()
// @param query_globals  forwarded to init()
// @param exec_env       forwarded to init()
RuntimeState::RuntimeState(const TUniqueId& query_id, int32 fragment_id,
const TQueryOptions& query_options, const TQueryGlobals& query_globals,
ExecEnv* exec_env)
// The profile name is "PipelineX " followed by fragment_id (the row-id fetcher
// passes -1 here, which yields "PipelineX -1").
: _profile("PipelineX " + std::to_string(fragment_id)),
_load_channel_profile("<unnamed>"),
_obj_pool(new ObjectPool()),
_unreported_error_idx(0),
_query_id(query_id),
_fragment_id(fragment_id),
_per_fragment_instance_idx(0),
// All load/scan counters start from zero.
_num_rows_load_total(0),
_num_rows_load_filtered(0),
_num_rows_load_unselected(0),
_num_rows_filtered_in_strict_mode_partial_update(0),
_num_print_error_rows(0),
_num_bytes_load_total(0),
_num_finished_scan_range(0),
_error_row_number(0) {
// A default-constructed TUniqueId is passed as init()'s first argument rather
// than query_id.
// NOTE(review): presumably this is the fragment instance id, which does not
// exist in this phase — confirm against init()'s signature.
Status status = init(TUniqueId(), query_options, query_globals, exec_env);
// The init() result is only checked through DCHECK.
// NOTE(review): DCHECK is normally a debug-only check, so a failed init()
// would go unnoticed in release builds — confirm this is acceptable.
DCHECK(status.ok());
// No QueryContext is available to supply a query memory tracker, so the
// trackers are set up here via init_mem_trackers() with a placeholder name.
init_mem_trackers("<unnamed>");
}

RuntimeState::RuntimeState(const TQueryGlobals& query_globals)
: _profile("<unnamed>"),
_load_channel_profile("<unnamed>"),
Expand Down
5 changes: 5 additions & 0 deletions be/src/runtime/runtime_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ class RuntimeState {
RuntimeState(const TUniqueId& query_id, int32 fragment_id, const TQueryOptions& query_options,
const TQueryGlobals& query_globals, ExecEnv* exec_env, QueryContext* ctx);

// Only used in the materialization phase of delayed materialization,
// when there may be no corresponding QueryContext.
RuntimeState(const TUniqueId& query_id, int32 fragment_id, const TQueryOptions& query_options,
const TQueryGlobals& query_globals, ExecEnv* exec_env);

// RuntimeState for executing expr in fe-support.
RuntimeState(const TQueryGlobals& query_globals);

Expand Down
23 changes: 14 additions & 9 deletions be/src/vec/exec/scan/file_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -863,15 +863,13 @@ void FileScanner::_truncate_char_or_varchar_column(Block* block, int idx, int le
Block::erase_useless_column(block, num_columns_without_result);
}

Status FileScanner::_create_row_id_column_iterator(const int column_id) {
Status FileScanner::_create_row_id_column_iterator() {
auto& id_file_map = _state->get_id_file_map();
auto file_id = id_file_map->get_file_mapping_id(std::make_shared<FileMapping>(
((pipeline::FileScanLocalState*)_local_state)->parent_id(), _current_range,
_should_enable_file_meta_cache()));
_row_id_column_iterator_pair = std::make_pair(
std::make_shared<RowIdColumnIteratorV2>(IdManager::ID_VERSION,
BackendOptions::get_backend_id(), file_id),
column_id);
_row_id_column_iterator_pair.first = std::make_shared<RowIdColumnIteratorV2>(
IdManager::ID_VERSION, BackendOptions::get_backend_id(), file_id);
return Status::OK();
}

Expand Down Expand Up @@ -993,7 +991,11 @@ Status FileScanner::_get_next_reader() {
_should_enable_file_meta_cache() ? ExecEnv::GetInstance()->file_meta_cache()
: nullptr,
_state->query_options().enable_parquet_lazy_mat);
parquet_reader->set_row_id_column_iterator(_row_id_column_iterator_pair);

if (_row_id_column_iterator_pair.second != -1) {
RETURN_IF_ERROR(_create_row_id_column_iterator());
parquet_reader->set_row_id_column_iterator(_row_id_column_iterator_pair);
}

// ATTN: the push down agg type may be set back to NONE,
// see IcebergTableReader::init_row_filters for example.
Expand All @@ -1013,7 +1015,11 @@ Status FileScanner::_get_next_reader() {
std::unique_ptr<OrcReader> orc_reader = OrcReader::create_unique(
_profile, _state, *_params, range, _state->query_options().batch_size,
_state->timezone(), _io_ctx.get(), _state->query_options().enable_orc_lazy_mat);
orc_reader->set_row_id_column_iterator(_row_id_column_iterator_pair);
if (_row_id_column_iterator_pair.second != -1) {
RETURN_IF_ERROR(_create_row_id_column_iterator());
orc_reader->set_row_id_column_iterator(_row_id_column_iterator_pair);
}

orc_reader->set_push_down_agg_type(_get_push_down_agg_type());
if (push_down_predicates) {
RETURN_IF_ERROR(_process_late_arrival_conjuncts());
Expand Down Expand Up @@ -1411,8 +1417,7 @@ Status FileScanner::_init_expr_ctxes() {
fmt::format("Unknown source slot descriptor, slot_id={}", slot_id));
}
if (it->second->col_name().starts_with(BeConsts::GLOBAL_ROWID_COL)) {
RETURN_IF_ERROR(
_create_row_id_column_iterator(_default_val_row_desc->get_column_id(slot_id)));
_row_id_column_iterator_pair.second = _default_val_row_desc->get_column_id(slot_id);
continue;
}
if (slot_info.is_file_slot) {
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/exec/scan/file_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ class FileScanner : public Scanner {
Status _set_fill_or_truncate_columns(bool need_to_get_parsed_schema);
Status _init_orc_reader(std::unique_ptr<OrcReader>&& orc_reader);
Status _init_parquet_reader(std::unique_ptr<ParquetReader>&& parquet_reader);
Status _create_row_id_column_iterator(const int slot_id);
Status _create_row_id_column_iterator();

TFileFormatType::type _get_current_format_type() {
// for compatibility, if format_type is not set in range, use the format type of params
Expand Down
Loading