Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions be/src/vec/exec/format/table/iceberg_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,7 @@ void IcebergTableReader::_gen_position_delete_file_range(Block& block, DeleteFil

Status IcebergParquetReader::init_reader(
const std::vector<std::string>& file_col_names,
const std::unordered_map<int, std::string>& col_id_name_map,
const std::unordered_map<uint64_t, std::string>& col_id_name_map,
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor,
const RowDescriptor* row_descriptor,
Expand Down Expand Up @@ -575,7 +575,7 @@ Status IcebergParquetReader ::_read_position_delete_file(const TFileRangeDesc* d

Status IcebergOrcReader::init_reader(
const std::vector<std::string>& file_col_names,
const std::unordered_map<int, std::string>& col_id_name_map,
const std::unordered_map<uint64_t, std::string>& col_id_name_map,
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor,
const RowDescriptor* row_descriptor,
Expand Down
6 changes: 3 additions & 3 deletions be/src/vec/exec/format/table/iceberg_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ class IcebergTableReader : public TableFormatReader {
// copy from _colname_to_value_range with new column name that is in parquet/orc file, to support schema evolution.
std::unordered_map<std::string, ColumnValueRangeType> _new_colname_to_value_range;
// column id to name map. Collect from FE slot descriptor.
std::unordered_map<int, std::string> _col_id_name_map;
std::unordered_map<uint64_t, std::string> _col_id_name_map;
// col names in the parquet,orc file
std::vector<std::string> _all_required_col_names;
// col names in table but not in parquet,orc file
Expand Down Expand Up @@ -194,7 +194,7 @@ class IcebergParquetReader final : public IcebergTableReader {
kv_cache, io_ctx) {}
Status init_reader(
const std::vector<std::string>& file_col_names,
const std::unordered_map<int, std::string>& col_id_name_map,
const std::unordered_map<uint64_t, std::string>& col_id_name_map,
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor,
const RowDescriptor* row_descriptor,
Expand Down Expand Up @@ -240,7 +240,7 @@ class IcebergOrcReader final : public IcebergTableReader {

Status init_reader(
const std::vector<std::string>& file_col_names,
const std::unordered_map<int, std::string>& col_id_name_map,
const std::unordered_map<uint64_t, std::string>& col_id_name_map,
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor,
const RowDescriptor* row_descriptor,
Expand Down
87 changes: 86 additions & 1 deletion be/src/vec/exec/format/table/paimon_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,68 @@ PaimonReader::PaimonReader(std::unique_ptr<GenericReader> file_format_reader,
ADD_CHILD_TIMER(_profile, "DeleteFileReadTime", paimon_profile);
}

// Builds the table-column <-> file-column name mappings for the data file of the
// current range, and derives from them the inputs handed to the file format reader.
//
// @param read_table_col_names          table-side names of the columns to read.
// @param table_col_id_table_name_map   column id -> table-side column name.
// @param table_col_name_to_value_range value ranges keyed by table-side column name;
//                                      may be null, in which case no ranges are produced.
// @return Status::OK() on success; an error status when the scan params carry no
//         Paimon schema info, or none for the schema id of the current range.
//
// On success the following members are (re)populated:
//   - _table_col_to_file_col / _file_col_to_table_col : the two name mappings.
//   - _all_required_col_names     : file-side name for each requested column, in request order.
//   - _new_colname_to_value_range : the value ranges re-keyed by file-side column name.
//   - _has_schema_change          : true if any table name differs from its file name.
// _not_in_file_col_names is only cleared here; nothing in this function fills it.
//
// TODO: this closely mirrors the Iceberg reader's name mapping; consider sharing the
// logic once Hudi schema change is implemented as well.
Status PaimonReader::gen_file_col_name(
        const std::vector<std::string>& read_table_col_names,
        const std::unordered_map<uint64_t, std::string>& table_col_id_table_name_map,
        const std::unordered_map<std::string, ColumnValueRangeType>*
                table_col_name_to_value_range) {
    // Reset every piece of derived state, so a repeated call cannot observe stale
    // mappings, stale value ranges (emplace never overwrites) or a stale flag.
    _table_col_to_file_col.clear();
    _file_col_to_table_col.clear();
    _all_required_col_names.clear();
    _not_in_file_col_names.clear();
    _new_colname_to_value_range.clear();
    _has_schema_change = false;

    if (!_params.__isset.paimon_schema_info) [[unlikely]] {
        return Status::RuntimeError("miss paimon schema info.");
    }

    // Single lookup instead of contains() followed by at().
    const auto schema_iter = _params.paimon_schema_info.find(
            _range.table_format_params.paimon_params.schema_id);
    if (schema_iter == _params.paimon_schema_info.end()) [[unlikely]] {
        return Status::InternalError("miss paimon schema info.");
    }

    // Column id -> column name as recorded for the schema this file was written with.
    const auto& table_id_to_file_name = schema_iter->second;
    for (const auto& [table_col_id, file_col_name] : table_id_to_file_name) {
        const auto name_iter = table_col_id_table_name_map.find(table_col_id);
        if (name_iter == table_col_id_table_name_map.end()) {
            // The file has this column, but the caller's id map does not: ignore it.
            continue;
        }
        const std::string& table_col_name = name_iter->second;

        _table_col_to_file_col.emplace(table_col_name, file_col_name);
        _file_col_to_table_col.emplace(file_col_name, table_col_name);
        if (table_col_name != file_col_name) {
            _has_schema_change = true;
        }
    }

    _all_required_col_names.reserve(read_table_col_names.size());
    for (const auto& name : read_table_col_names) {
        const auto iter = _table_col_to_file_col.find(name);
        if (iter != _table_col_to_file_col.end()) {
            _all_required_col_names.emplace_back(iter->second);
            continue;
        }

        // No id-based mapping for this column: fall back to its lower-cased name
        // and register that pair in both mappings.
        auto name_low = to_lower(name);
        _all_required_col_names.emplace_back(name_low);

        _table_col_to_file_col.emplace(name, name_low);
        _file_col_to_table_col.emplace(name_low, name);
        if (name != name_low) {
            _has_schema_change = true;
        }
    }

    // Re-key the value ranges by file-side column name. Ranges whose column has no
    // mapping keep their original key.
    if (table_col_name_to_value_range != nullptr) {
        for (const auto& [table_col_name, value_range] : *table_col_name_to_value_range) {
            const auto iter = _table_col_to_file_col.find(table_col_name);
            const std::string& range_key =
                    iter == _table_col_to_file_col.end() ? table_col_name : iter->second;
            _new_colname_to_value_range.emplace(range_key, value_range);
        }
    }
    return Status::OK();
}

Status PaimonReader::init_row_filters() {
const auto& table_desc = _range.table_format_params.paimon_params;
if (!table_desc.__isset.deletion_file) {
Expand Down Expand Up @@ -106,6 +168,29 @@ Status PaimonReader::init_row_filters() {
}

// Reads the next block from the wrapped file format reader.
//
// When the table-side and file-side column names differ (_has_schema_change), the
// block's columns are renamed to their file-side names before the read and renamed
// back to the table-side names afterwards, so the file reader only ever sees
// file-side names and the caller only ever sees table-side names.
//
// @param block     block to fill; its column names are temporarily rewritten.
// @param read_rows forwarded to the wrapped reader.
// @param eof       forwarded to the wrapped reader.
// @return the status reported by the wrapped reader.
Status PaimonReader::get_next_block_inner(Block* block, size_t* read_rows, bool* eof) {
    // Renames every column that has an entry in `name_mapping`, then rebuilds the
    // block's by-name index so it matches the new names.
    auto rename_columns = [block](const std::unordered_map<std::string, std::string>& name_mapping) {
        const size_t column_count = block->columns();
        for (size_t i = 0; i < column_count; ++i) {
            ColumnWithTypeAndName& col = block->get_by_position(i);
            if (auto iter = name_mapping.find(col.name); iter != name_mapping.end()) {
                col.name = iter->second;
            }
        }
        block->initialize_index_by_name();
    };

    if (_has_schema_change) {
        rename_columns(_table_col_to_file_col);
    }

    Status read_status = _file_format_reader->get_next_block(block, read_rows, eof);

    // Restore the table-side names even if the read failed, so the caller never
    // gets back a block that still carries file-side column names.
    if (_has_schema_change) {
        rename_columns(_file_col_to_table_col);
    }
    return read_status;
}
} // namespace doris::vectorized
57 changes: 57 additions & 0 deletions be/src/vec/exec/format/table/paimon_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,19 @@ class PaimonReader : public TableFormatReader {
PaimonReader(std::unique_ptr<GenericReader> file_format_reader, RuntimeProfile* profile,
RuntimeState* state, const TFileScanRangeParams& params,
const TFileRangeDesc& range, io::IOContext* io_ctx);

~PaimonReader() override = default;

Status init_row_filters() final;

Status get_next_block_inner(Block* block, size_t* read_rows, bool* eof) final;

// Builds the table-column <-> file-column name mappings for the current range's
// schema id and fills _all_required_col_names and _new_colname_to_value_range
// with their file-side counterparts.
//   read_table_col_names          - table-side names of the columns to read.
//   table_col_id_table_name_map   - column id -> table-side column name.
//   table_col_name_to_value_range - value ranges keyed by table-side column name.
// Returns an error status when the scan params carry no Paimon schema info, or
// none for the schema id of the current range.
Status gen_file_col_name(
const std::vector<std::string>& read_table_col_names,
const std::unordered_map<uint64_t, std::string>& table_col_id_table_name_map,
const std::unordered_map<std::string, ColumnValueRangeType>*
table_col_name_to_value_range);

protected:
struct PaimonProfile {
RuntimeProfile::Counter* num_delete_rows;
Expand All @@ -44,6 +51,16 @@ class PaimonReader : public TableFormatReader {
std::vector<int64_t> _delete_rows;
PaimonProfile _paimon_profile;

// Copy of the caller's value ranges, re-keyed by file-side column name (the
// original key is kept when a column has no mapping). Built by gen_file_col_name().
std::unordered_map<std::string, ColumnValueRangeType> _new_colname_to_value_range;

// Name mappings in both directions between the column names used by the table
// and the column names used inside the data file. Built by gen_file_col_name().
std::unordered_map<std::string, std::string> _file_col_to_table_col;
std::unordered_map<std::string, std::string> _table_col_to_file_col;

// File-side name of every column requested for reading, in request order.
std::vector<std::string> _all_required_col_names;
// Cleared by gen_file_col_name() and passed to the parquet reader's init_reader();
// NOTE(review): nothing visible here ever adds to it - confirm that is intended.
std::vector<std::string> _not_in_file_col_names;

// True once any table-side column name differs from its file-side name; enables
// the column renaming done around each read in get_next_block_inner().
bool _has_schema_change = false;

virtual void set_delete_rows() = 0;
};

Expand All @@ -60,6 +77,25 @@ class PaimonOrcReader final : public PaimonReader {
(reinterpret_cast<OrcReader*>(_file_format_reader.get()))
->set_position_delete_rowids(&_delete_rows);
}

// Initializes the wrapped ORC reader for a Paimon data file.
//
// The table-side inputs (column names, id -> name map, value ranges) are first
// translated to file-side names via gen_file_col_name(); the ORC reader is then
// given the table -> file name mapping and initialized with the translated
// column list and value ranges. All remaining arguments are forwarded unchanged.
//
// Returns the first error from gen_file_col_name(), otherwise the status of the
// ORC reader's own init_reader().
Status init_reader(
        const std::vector<std::string>& read_table_col_names,
        const std::unordered_map<uint64_t, std::string>& table_col_id_table_name_map,
        const std::unordered_map<std::string, ColumnValueRangeType>*
                table_col_name_to_value_range,
        const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor,
        const RowDescriptor* row_descriptor,
        const VExprContextSPtrs* not_single_slot_filter_conjuncts,
        const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts) {
    // Step 1: derive the file-side names; bail out if the schema info is unusable.
    RETURN_IF_ERROR(gen_file_col_name(read_table_col_names, table_col_id_table_name_map,
                                      table_col_name_to_value_range));

    // Step 2: hand the name mapping to the underlying ORC reader.
    OrcReader& file_reader = *static_cast<OrcReader*>(_file_format_reader.get());
    file_reader.set_table_col_to_file_col(_table_col_to_file_col);

    // Step 3: initialize it with the translated columns and value ranges.
    auto* required_cols = &_all_required_col_names;
    auto* file_value_ranges = &_new_colname_to_value_range;
    return file_reader.init_reader(required_cols, file_value_ranges, conjuncts, false,
                                   tuple_descriptor, row_descriptor,
                                   not_single_slot_filter_conjuncts,
                                   slot_id_to_filter_conjuncts);
}
};

class PaimonParquetReader final : public PaimonReader {
Expand All @@ -75,5 +111,26 @@ class PaimonParquetReader final : public PaimonReader {
(reinterpret_cast<ParquetReader*>(_file_format_reader.get()))
->set_delete_rows(&_delete_rows);
}

// Initializes the wrapped Parquet reader for a Paimon data file.
//
// The table-side inputs (column names, id -> name map, value ranges) are first
// translated to file-side names via gen_file_col_name(); the Parquet reader is
// then given the table -> file name mapping and initialized with the translated
// column list, _not_in_file_col_names and the translated value ranges. All
// remaining arguments are forwarded unchanged.
//
// Returns the first error from gen_file_col_name(), otherwise the status of the
// Parquet reader's own init_reader().
Status init_reader(
        const std::vector<std::string>& read_table_col_names,
        const std::unordered_map<uint64_t, std::string>& table_col_id_table_name_map,
        const std::unordered_map<std::string, ColumnValueRangeType>*
                table_col_name_to_value_range,
        const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor,
        const RowDescriptor* row_descriptor,
        const std::unordered_map<std::string, int>* colname_to_slot_id,
        const VExprContextSPtrs* not_single_slot_filter_conjuncts,
        const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts) {
    // Step 1: derive the file-side names; bail out if the schema info is unusable.
    RETURN_IF_ERROR(gen_file_col_name(read_table_col_names, table_col_id_table_name_map,
                                      table_col_name_to_value_range));

    // Step 2: hand the name mapping to the underlying Parquet reader.
    ParquetReader& file_reader = *static_cast<ParquetReader*>(_file_format_reader.get());
    file_reader.set_table_to_file_col_map(_table_col_to_file_col);

    // Step 3: initialize it with the translated columns and value ranges.
    auto* file_value_ranges = &_new_colname_to_value_range;
    return file_reader.init_reader(_all_required_col_names, _not_in_file_col_names,
                                   file_value_ranges, conjuncts, tuple_descriptor,
                                   row_descriptor, colname_to_slot_id,
                                   not_single_slot_filter_conjuncts,
                                   slot_id_to_filter_conjuncts);
}
};
} // namespace doris::vectorized
24 changes: 13 additions & 11 deletions be/src/vec/exec/scan/vfile_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -961,15 +961,14 @@ Status VFileScanner::_get_next_reader() {
_cur_reader = std::move(iceberg_reader);
} else if (range.__isset.table_format_params &&
range.table_format_params.table_format_type == "paimon") {
std::vector<std::string> place_holder;
init_status = parquet_reader->init_reader(
_file_col_names, place_holder, _colname_to_value_range,
_push_down_conjuncts, _real_tuple_desc, _default_val_row_desc.get(),
_col_name_to_slot_id, &_not_single_slot_filter_conjuncts,
&_slot_id_to_filter_conjuncts);
std::unique_ptr<PaimonParquetReader> paimon_reader =
PaimonParquetReader::create_unique(std::move(parquet_reader), _profile,
_state, *_params, range, _io_ctx.get());
init_status = paimon_reader->init_reader(
_file_col_names, _col_id_name_map, _colname_to_value_range,
_push_down_conjuncts, _real_tuple_desc, _default_val_row_desc.get(),
_col_name_to_slot_id, &_not_single_slot_filter_conjuncts,
&_slot_id_to_filter_conjuncts);
RETURN_IF_ERROR(paimon_reader->init_row_filters());
_cur_reader = std::move(paimon_reader);
} else {
Expand Down Expand Up @@ -1027,12 +1026,13 @@ Status VFileScanner::_get_next_reader() {
_cur_reader = std::move(iceberg_reader);
} else if (range.__isset.table_format_params &&
range.table_format_params.table_format_type == "paimon") {
init_status = orc_reader->init_reader(
&_file_col_names, _colname_to_value_range, _push_down_conjuncts, false,
_real_tuple_desc, _default_val_row_desc.get(),
&_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts);
std::unique_ptr<PaimonOrcReader> paimon_reader = PaimonOrcReader::create_unique(
std::move(orc_reader), _profile, _state, *_params, range, _io_ctx.get());

init_status = paimon_reader->init_reader(
_file_col_names, _col_id_name_map, _colname_to_value_range,
_push_down_conjuncts, _real_tuple_desc, _default_val_row_desc.get(),
&_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts);
RETURN_IF_ERROR(paimon_reader->init_row_filters());
_cur_reader = std::move(paimon_reader);
} else {
Expand Down Expand Up @@ -1237,7 +1237,9 @@ Status VFileScanner::_init_expr_ctxes() {
if (slot_info.is_file_slot) {
_file_slot_descs.emplace_back(it->second);
_file_col_names.push_back(it->second->col_name());
if (it->second->col_unique_id() > 0) {
if (it->second->col_unique_id() >= 0) {
// Iceberg field unique ID starts from 1, Paimon/Hudi field unique ID starts from 0.
// For other data sources, all columns are set to -1.
_col_id_name_map.emplace(it->second->col_unique_id(), it->second->col_name());
}
} else {
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/exec/scan/vfile_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ class VFileScanner : public VScanner {
// col names from _file_slot_descs
std::vector<std::string> _file_col_names;
// column id to name map. Collect from FE slot descriptor.
std::unordered_map<int, std::string> _col_id_name_map;
std::unordered_map<uint64_t, std::string> _col_id_name_map;

// Partition source slot descriptors
std::vector<SlotDescriptor*> _partition_slot_descs;
Expand Down
Loading
Loading