From b02ba3b280f3d880f4d3c09fc7efbb7e8ab4e7f5 Mon Sep 17 00:00:00 2001
From: daidai
Date: Tue, 12 Aug 2025 11:09:02 +0800
Subject: [PATCH] branch-3.0: [fix](orc) fix core dump when the iceberg-orc
 reader reads an orc file that is missing the iceberg.id attribute (#49051)
 (#54167)

Picks #49051, but only the fix for:

```
terminate called after throwing an instance of 'std::range_error'
  what():  Key not found: iceberg.id
*** Query id: 6a93d7cdc9f44370-a40b07934a14c81b ***
*** is nereids: 1 ***
*** tablet id: 0 ***
*** Aborted at 1753842428 (unix time) try "date -d @1753842428" if you are using GNU date ***
*** Current BE git commitID: 910c4249c5 ***
*** SIGABRT unknown detail explain (@0x5a46f) received by PID 369775 (TID 371694 OR 0x7fad067ef640) from PID 369775; stack trace: ***
terminate called recursively
terminate called recursively
terminate called recursively
terminate called recursively
 0# doris::signal::(anonymous namespace)::FailureSignalHandler(int, siginfo_t*, void*) at /home/zcp/repo_center/doris_release/doris/be/src/common/signal_handler.h:421
 1# 0x00007FB12263EBF0 in /lib64/libc.so.6
 2# __pthread_kill_implementation in /lib64/libc.so.6
 3# gsignal in /lib64/libc.so.6
 4# abort in /lib64/libc.so.6
 5# __gnu_cxx::__verbose_terminate_handler() [clone .cold] at ../../../../libstdc++-v3/libsupc++/vterminate.cc:75
 6# __cxxabiv1::__terminate(void (*)()) at ../../../../libstdc++-v3/libsupc++/eh_terminate.cc:48
 7# 0x000055C047B28EC1 in /opt/apache-doris-3.0.6.2-bin-x64/be/lib/doris_be
 8# 0x000055C047B29014 in /opt/apache-doris-3.0.6.2-bin-x64/be/lib/doris_be
 9# orc::TypeImpl::getAttributeValue(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&) const in /opt/apache-doris-3.0.6.2-bin-x64/be/lib/doris_be
10# doris::vectorized::OrcReader::get_schema_col_name_attribute(std::vector<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, std::allocator<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > >*, std::vector<int, std::allocator<int> >*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >) at /home/zcp/repo_center/doris_release/doris/be/src/vec/exec/format/orc/vorc_reader.cpp:332
11# doris::vectorized::IcebergOrcReader::_gen_col_name_maps(doris::vectorized::OrcReader*) at
```
---
 be/src/vec/exec/format/orc/vorc_reader.cpp    |  30 ++-
 be/src/vec/exec/format/orc/vorc_reader.h      |  15 +-
 be/src/vec/exec/format/parquet/schema_desc.h  |   4 +-
 .../vec/exec/format/table/iceberg_reader.cpp  | 190 ++++--------------
 be/src/vec/exec/format/table/iceberg_reader.h |  29 +--
 .../exec/format/table/table_format_reader.cpp | 103 ++++++++++
 .../exec/format/table/table_format_reader.h   |  41 ++++
 .../table/transactional_hive_reader.cpp       |  10 +-
 be/src/vec/exec/scan/vfile_scanner.cpp        |   4 +-
 .../iceberg/run08.sql                         |  48 +++++
 .../iceberg/iceberg_schema_change2.out        |  64 ++++++
 .../iceberg/iceberg_schema_change2.groovy     |  66 ++++++
 12 files changed, 414 insertions(+), 190 deletions(-)
 create mode 100644 docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run08.sql
 create mode 100644 regression-test/data/external_table_p0/iceberg/iceberg_schema_change2.out
 create mode 100644 regression-test/suites/external_table_p0/iceberg/iceberg_schema_change2.groovy
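The crash happens because `orc::Type::getAttributeValue()` throws `std::range_error` when the requested attribute key is absent, and the old `_gen_col_name_maps()` path called it unconditionally, even for orc files written without the `iceberg.id` attribute. The fix probes with `hasAttributeKey()` first and reports "no schema" instead of throwing. A minimal standalone sketch of that defensive pattern against the Apache ORC C++ API (`orc/Type.hh`); the helper name and signature here are illustrative, not the Doris code:

```
// Sketch only: probe the attribute before reading it, so files written
// without iceberg.id report "no schema" instead of aborting the process.
#include <cstdint>
#include <string>
#include <vector>

#include <orc/OrcFile.hh>

// Returns false if any top-level column lacks `key`; on success, `values`
// holds the attribute value of every top-level column.
bool collect_column_attribute(const orc::Type& root, const std::string& key,
                              std::vector<std::string>* values) {
    for (uint64_t i = 0; i < root.getSubtypeCount(); ++i) {
        const orc::Type* subtype = root.getSubtype(i);
        if (!subtype->hasAttributeKey(key)) {
            return false; // old writer: attribute absent, do not throw
        }
        values->emplace_back(subtype->getAttributeValue(key));
    }
    return true;
}
```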
diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp
index a488bef7455071..828761c5f83d6f 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.cpp
+++ b/be/src/vec/exec/format/orc/vorc_reader.cpp
@@ -243,6 +243,10 @@ void OrcReader::_init_profile() {
 }
 
 Status OrcReader::_create_file_reader() {
+    if (_reader != nullptr) {
+        return Status::OK();
+    }
+
     if (_file_input_stream == nullptr) {
         _file_description.mtime =
                 _scan_range.__isset.modification_time ? _scan_range.modification_time : 0;
@@ -283,13 +287,15 @@ Status OrcReader::_create_file_reader() {
 
 Status OrcReader::init_reader(
         const std::vector<std::string>* column_names,
-        std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
+        const std::vector<std::string>& missing_column_names,
+        const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
         const VExprContextSPtrs& conjuncts, bool is_acid, const TupleDescriptor* tuple_descriptor,
         const RowDescriptor* row_descriptor,
         const VExprContextSPtrs* not_single_slot_filter_conjuncts,
         const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts,
         const bool hive_use_column_names) {
     _column_names = column_names;
+    _missing_column_names_set.insert(missing_column_names.begin(), missing_column_names.end());
     _colname_to_value_range = colname_to_value_range;
     _lazy_read_ctx.conjuncts = conjuncts;
     _is_acid = is_acid;
@@ -326,14 +332,21 @@ Status OrcReader::get_parsed_schema(std::vector<std::string>* col_names,
 }
 
 Status OrcReader::get_schema_col_name_attribute(std::vector<std::string>* col_names,
-                                                std::vector<int32_t>* col_attributes,
-                                                std::string attribute) {
+                                                std::vector<int32_t>* col_attributes,
+                                                const std::string& attribute,
+                                                bool* exist_attribute) {
     RETURN_IF_ERROR(_create_file_reader());
-    auto& root_type = _is_acid ? _remove_acid(_reader->getType()) : _reader->getType();
+    *exist_attribute = true;
+    const auto& root_type = _reader->getType();
     for (int i = 0; i < root_type.getSubtypeCount(); ++i) {
         col_names->emplace_back(get_field_name_lower_case(&root_type, i));
+
+        if (!root_type.getSubtype(i)->hasAttributeKey(attribute)) {
+            *exist_attribute = false;
+            return Status::OK();
+        }
         col_attributes->emplace_back(
-                std::stol(root_type.getSubtype(i)->getAttributeValue(attribute)));
+                std::stoi(root_type.getSubtype(i)->getAttributeValue(attribute)));
     }
     return Status::OK();
 }
@@ -349,8 +362,15 @@ Status OrcReader::_init_read_columns() {
     // TODO, should be removed in 2.2 or later
     _is_hive1_orc_or_use_idx = (is_hive1_orc || _is_hive1_orc_or_use_idx) &&
                                _scan_params.__isset.slot_name_to_schema_pos;
+
     for (size_t i = 0; i < _column_names->size(); ++i) {
         auto& col_name = (*_column_names)[i];
+
+        if (_missing_column_names_set.contains(col_name)) {
+            _missing_cols.emplace_back(col_name);
+            continue;
+        }
+
         if (_is_hive1_orc_or_use_idx) {
             auto iter = _scan_params.slot_name_to_schema_pos.find(col_name);
             if (iter != _scan_params.slot_name_to_schema_pos.end()) {
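For the drop-then-re-add case, the table layer now passes the re-added column names to the ORC reader as `missing_column_names`, and `_init_read_columns()` diverts them into `_missing_cols` (later filled like absent columns) instead of matching them by name against stale file data. A standalone sketch of that routing, with simplified names; this is not the Doris implementation:

```
// Sketch: split the requested columns into those resolved against the ORC
// schema and those treated as missing (filled with NULL/default later).
#include <set>
#include <string>
#include <vector>

struct ReadPlan {
    std::vector<std::string> read_cols;    // resolve against the ORC schema
    std::vector<std::string> missing_cols; // fill with NULL/default later
};

ReadPlan plan_columns(const std::vector<std::string>& requested,
                      const std::set<std::string>& missing_set) {
    ReadPlan plan;
    for (const auto& col : requested) {
        if (missing_set.contains(col)) { // C++20 contains(), as in the patch
            plan.missing_cols.push_back(col);
        } else {
            plan.read_cols.push_back(col);
        }
    }
    return plan;
}
```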
diff --git a/be/src/vec/exec/format/orc/vorc_reader.h b/be/src/vec/exec/format/orc/vorc_reader.h
index 6bbf3bead1efce..ffebf15933287f 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.h
+++ b/be/src/vec/exec/format/orc/vorc_reader.h
@@ -138,7 +138,8 @@ class OrcReader : public GenericReader {
     //If you want to read the file by index instead of column name, set hive_use_column_names to false.
     Status init_reader(
             const std::vector<std::string>* column_names,
-            std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
+            const std::vector<std::string>& missing_column_names,
+            const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
             const VExprContextSPtrs& conjuncts, bool is_acid,
             const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor,
             const VExprContextSPtrs* not_single_slot_filter_conjuncts,
@@ -178,8 +179,8 @@ class OrcReader : public GenericReader {
                             std::vector<DataTypePtr>* col_types) override;
 
     Status get_schema_col_name_attribute(std::vector<std::string>* col_names,
-                                         std::vector<int32_t>* col_attributes,
-                                         std::string attribute);
+                                         std::vector<int32_t>* col_attributes,
+                                         const std::string& attribute, bool* exist_attribute);
     void set_table_col_to_file_col(
             std::unordered_map<std::string, std::string> table_col_to_file_col) {
         _table_col_to_file_col = table_col_to_file_col;
@@ -577,6 +578,10 @@ class OrcReader : public GenericReader {
     int64_t _range_size;
     const std::string& _ctz;
    const std::vector<std::string>* _column_names;
+    // _missing_column_names_set: used in iceberg/hudi/paimon for columns that were dropped
+    // and then added back (drop column a, then add column a); such column data must not be read.
+    std::set<std::string> _missing_column_names_set;
+
     int32_t _offset_days = 0;
     cctz::time_zone _time_zone;
 
@@ -604,7 +609,7 @@ class OrcReader : public GenericReader {
     orc::ReaderMetrics _reader_metrics;
     std::unique_ptr<orc::ColumnVectorBatch> _batch;
-    std::unique_ptr<orc::Reader> _reader;
+    std::unique_ptr<orc::Reader> _reader = nullptr;
     std::unique_ptr<orc::RowReader> _row_reader;
     std::unique_ptr<ORCFilterImpl> _orc_filter;
     orc::RowReaderOptions _row_reader_options;
 
@@ -618,7 +623,7 @@ class OrcReader : public GenericReader {
     std::vector<DecimalScaleParams> _decimal_scale_params;
     size_t _decimal_scale_params_index;
 
-    std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range;
+    const std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range = nullptr;
     bool _is_acid = false;
     std::unique_ptr<IColumn::Filter> _filter;
     LazyReadContext _lazy_read_ctx;
diff --git a/be/src/vec/exec/format/parquet/schema_desc.h b/be/src/vec/exec/format/parquet/schema_desc.h
index 2593da837c3da6..aed0da070008e9 100644
--- a/be/src/vec/exec/format/parquet/schema_desc.h
+++ b/be/src/vec/exec/format/parquet/schema_desc.h
@@ -71,7 +71,7 @@ class FieldDescriptor {
     std::unordered_map<std::string, const FieldSchema*> _name_to_field;
     // Used in from_thrift, marking the next schema position that should be parsed
     size_t _next_schema_pos;
-    std::unordered_map<int32_t, std::string> _field_id_name_mapping;
+    std::map<int32_t, std::string> _field_id_name_mapping;
 
     void parse_physical_field(const tparquet::SchemaElement& physical_schema, bool is_nullable,
                               FieldSchema* physical_field);
@@ -135,6 +135,8 @@ class FieldDescriptor {
 
     bool has_parquet_field_id() const { return _field_id_name_mapping.size() > 0; }
 
+    std::map<int32_t, std::string> get_field_id_name_map() { return _field_id_name_mapping; }
+
     const doris::Slice get_column_name_from_field_id(int32_t id) const;
 };
diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp b/be/src/vec/exec/format/table/iceberg_reader.cpp
index 21a98f79acb171..c9e92bdc9d49dc 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.cpp
+++ b/be/src/vec/exec/format/table/iceberg_reader.cpp
@@ -120,32 +120,9 @@ Status IcebergTableReader::get_next_block(Block* block, size_t* read_rows, bool*
     }
     RETURN_IF_ERROR(_expand_block_if_need(block));
 
-    // To support iceberg schema evolution. We change the column name in block to
-    // make it match with the column name in parquet file before reading data. and
-    // Set the name back to table column name before return this block.
-    if (_has_schema_change) {
-        for (int i = 0; i < block->columns(); i++) {
-            ColumnWithTypeAndName& col = block->get_by_position(i);
-            auto iter = _table_col_to_file_col.find(col.name);
-            if (iter != _table_col_to_file_col.end()) {
-                col.name = iter->second;
-            }
-        }
-        block->initialize_index_by_name();
-    }
-
+    RETURN_IF_ERROR(TableSchemaChangeHelper::get_next_block_before(block));
     RETURN_IF_ERROR(_file_format_reader->get_next_block(block, read_rows, eof));
-    // Set the name back to table column name before return this block.
-    if (_has_schema_change) {
-        for (int i = 0; i < block->columns(); i++) {
-            ColumnWithTypeAndName& col = block->get_by_position(i);
-            auto iter = _file_col_to_table_col.find(col.name);
-            if (iter != _file_col_to_table_col.end()) {
-                col.name = iter->second;
-            }
-        }
-        block->initialize_index_by_name();
-    }
+    RETURN_IF_ERROR(TableSchemaChangeHelper::get_next_block_after(block));
 
     if (_equality_delete_impl != nullptr) {
         RETURN_IF_ERROR(_equality_delete_impl->filter_data_block(block));
@@ -228,8 +205,9 @@ Status IcebergTableReader::_equality_delete_base(
                 not_in_file_col_names, nullptr, {}, nullptr, nullptr, nullptr, nullptr, nullptr,
                 false));
     } else if (auto* orc_reader = typeid_cast<OrcReader*>(delete_reader.get())) {
-        RETURN_IF_ERROR(orc_reader->init_reader(&equality_delete_col_names, nullptr, {}, false,
-                                                {}, {}, nullptr, nullptr));
+        RETURN_IF_ERROR(orc_reader->init_reader(&equality_delete_col_names,
+                                                not_in_file_col_names, nullptr, {}, false, {},
+                                                {}, nullptr, nullptr));
     } else {
         return Status::InternalError("Unsupported format of delete file");
     }
@@ -445,60 +423,6 @@ void IcebergTableReader::_sort_delete_rows(std::vector<std::vector<int64_t>*>& d
     }
 }
 
-/*
- * Generate _all_required_col_names and _not_in_file_col_names.
- *
- * _all_required_col_names is all the columns required by user sql.
- * If the column name has been modified after the data file was written,
- * put the old name in data file to _all_required_col_names.
- *
- * _not_in_file_col_names is all the columns required by user sql but not in the data file.
- * e.g. New columns added after this data file was written.
- * The columns added with names used by old dropped columns should consider as a missing column,
- * which should be in _not_in_file_col_names.
- */
-void IcebergTableReader::_gen_file_col_names() {
-    _all_required_col_names.clear();
-    _not_in_file_col_names.clear();
-    for (int i = 0; i < _file_col_names.size(); ++i) {
-        auto name = _file_col_names[i];
-        auto iter = _table_col_to_file_col.find(name);
-        if (iter == _table_col_to_file_col.end()) {
-            // If the user creates the iceberg table, directly append the parquet file that already exists,
-            // there is no 'iceberg.schema' field in the footer of parquet, the '_table_col_to_file_col' may be empty.
-            // Because we are ignoring case, so, it is converted to lowercase here
-            auto name_low = to_lower(name);
-            _all_required_col_names.emplace_back(name_low);
-            if (_has_iceberg_schema) {
-                _not_in_file_col_names.emplace_back(name);
-            } else {
-                _table_col_to_file_col.emplace(name, name_low);
-                _file_col_to_table_col.emplace(name_low, name);
-                if (name != name_low) {
-                    _has_schema_change = true;
-                }
-            }
-        } else {
-            _all_required_col_names.emplace_back(iter->second);
-        }
-    }
-}
-
-/*
- * Generate _new_colname_to_value_range, by replacing the column name in
- * _colname_to_value_range with column name in data file.
- */
-void IcebergTableReader::_gen_new_colname_to_value_range() {
-    for (auto it = _colname_to_value_range->begin(); it != _colname_to_value_range->end(); it++) {
-        auto iter = _table_col_to_file_col.find(it->first);
-        if (iter == _table_col_to_file_col.end()) {
-            _new_colname_to_value_range.emplace(it->first, it->second);
-        } else {
-            _new_colname_to_value_range.emplace(iter->second, it->second);
-        }
-    }
-}
-
 void IcebergTableReader::_gen_position_delete_file_range(Block& block, DeleteFile* position_delete,
                                                          size_t read_rows,
                                                          bool file_path_column_dictionary_coded) {
@@ -544,13 +468,9 @@ Status IcebergParquetReader::init_reader(
         const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts) {
     _file_format = Fileformat::PARQUET;
     ParquetReader* parquet_reader = static_cast<ParquetReader*>(_file_format_reader.get());
-    _col_id_name_map = col_id_name_map;
-    _file_col_names = file_col_names;
-    _colname_to_value_range = colname_to_value_range;
-    FieldDescriptor field_desc = parquet_reader->get_file_metadata_schema();
-    RETURN_IF_ERROR(_gen_col_name_maps(field_desc));
-    _gen_file_col_names();
-    _gen_new_colname_to_value_range();
+    RETURN_IF_ERROR(TableSchemaChangeHelper::init_schema_info(file_col_names, col_id_name_map,
+                                                              colname_to_value_range));
+
     parquet_reader->set_table_to_file_col_map(_table_col_to_file_col);
     parquet_reader->iceberg_sanitize(_all_required_col_names);
     RETURN_IF_ERROR(init_row_filters(_range, _io_ctx));
@@ -617,18 +537,14 @@ Status IcebergOrcReader::init_reader(
         const VExprContextSPtrs* not_single_slot_filter_conjuncts,
         const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts) {
     _file_format = Fileformat::ORC;
     auto* orc_reader = static_cast<OrcReader*>(_file_format_reader.get());
-    _col_id_name_map = col_id_name_map;
-    _file_col_names = file_col_names;
-    _colname_to_value_range = colname_to_value_range;
-
-    RETURN_IF_ERROR(_gen_col_name_maps(orc_reader));
-    _gen_file_col_names();
-    _gen_new_colname_to_value_range();
+    RETURN_IF_ERROR(TableSchemaChangeHelper::init_schema_info(file_col_names, col_id_name_map,
+                                                              colname_to_value_range));
     orc_reader->set_table_col_to_file_col(_table_col_to_file_col);
     RETURN_IF_ERROR(init_row_filters(_range, _io_ctx));
-    return orc_reader->init_reader(&_all_required_col_names, &_new_colname_to_value_range,
-                                   conjuncts, false, tuple_descriptor, row_descriptor,
-                                   not_single_slot_filter_conjuncts, slot_id_to_filter_conjuncts);
+    return orc_reader->init_reader(&_all_required_col_names, _not_in_file_col_names,
+                                   &_new_colname_to_value_range, conjuncts, false, tuple_descriptor,
+                                   row_descriptor, not_single_slot_filter_conjuncts,
+                                   slot_id_to_filter_conjuncts);
 }
 
@@ -636,8 +552,9 @@ Status IcebergOrcReader::_read_position_delete_file(const TFileRangeDesc* delete
     OrcReader orc_delete_reader(_profile, _state, _params, *delete_range,
                                 READ_DELETE_FILE_BATCH_SIZE, _state->timezone(), _io_ctx);
     std::unordered_map<std::string, ColumnValueRangeType> colname_to_value_range;
-    RETURN_IF_ERROR(orc_delete_reader.init_reader(&delete_file_col_names, &colname_to_value_range,
-                                                  {}, false, {}, {}, nullptr, nullptr));
+    RETURN_IF_ERROR(orc_delete_reader.init_reader(&delete_file_col_names, {},
+                                                  &colname_to_value_range, {}, false, {}, {},
+                                                  nullptr, nullptr));
 
     std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
             partition_columns;
@@ -658,61 +575,36 @@ Status IcebergOrcReader::_read_position_delete_file(const TFileRangeDesc* delete
     return Status::OK();
 }
 
-/*
- * To support schema evolution, Iceberg write the column id to column name map to
- * parquet file key_value_metadata.
- * This function is to compare the table schema from FE (_col_id_name_map) with
- * the schema in key_value_metadata for the current parquet file and generate two maps
- * for future use:
- * 1. table column name to parquet column name.
- * 2. parquet column name to table column name.
- * For example, parquet file has a column 'col1',
- * after this file was written, iceberg changed the column name to 'col1_new'.
- * The two maps would contain:
- * 1. col1_new -> col1
- * 2. col1 -> col1_new
- */
-Status IcebergParquetReader::_gen_col_name_maps(const FieldDescriptor& field_desc) {
+// To support schema evolution, Iceberg writes the column id to column name map to the parquet file key_value_metadata.
+Status IcebergParquetReader::get_file_col_id_to_name(
+        bool& exist_schema, std::map<int32_t, std::string>& file_col_id_to_name) {
+    auto* parquet_reader = static_cast<ParquetReader*>(_file_format_reader.get());
+    FieldDescriptor field_desc = parquet_reader->get_file_metadata_schema();
+
     if (field_desc.has_parquet_field_id()) {
-        for (const auto& pair : _col_id_name_map) {
-            auto name_slice = field_desc.get_column_name_from_field_id(pair.first);
-            if (name_slice.get_size() == 0) {
-                _has_schema_change = true;
-            } else {
-                auto name_string = name_slice.to_string();
-                _table_col_to_file_col.emplace(pair.second, name_string);
-                _file_col_to_table_col.emplace(name_string, pair.second);
-                if (name_string != pair.second) {
-                    _has_schema_change = true;
-                }
-            }
-        }
+        file_col_id_to_name = field_desc.get_field_id_name_map();
+    } else {
+        // Early iceberg versions didn't write any schema information to the parquet file.
+        exist_schema = false;
     }
+
     return Status::OK();
 }
 
-Status IcebergOrcReader::_gen_col_name_maps(OrcReader* orc_reader) {
+// To support schema evolution, Iceberg writes the column id to the orc file attributes.
+Status IcebergOrcReader::get_file_col_id_to_name(
+        bool& exist_schema, std::map<int32_t, std::string>& file_col_id_to_name) {
+    auto* orc_reader = static_cast<OrcReader*>(_file_format_reader.get());
+
     std::vector<std::string> col_names;
-    std::vector<int32_t> col_ids;
-    RETURN_IF_ERROR(
-            orc_reader->get_schema_col_name_attribute(&col_names, &col_ids, ICEBERG_ORC_ATTRIBUTE));
-    _has_iceberg_schema = true;
-    _table_col_to_file_col.clear();
-    _file_col_to_table_col.clear();
-    for (size_t i = 0; i < col_ids.size(); i++) {
-        auto col_id = col_ids[i];
-        auto& file_col_name = col_names[i];
-
-        if (_col_id_name_map.find(col_id) == _col_id_name_map.end()) {
-            _has_schema_change = true;
-            continue;
-        }
-        auto& table_col_name = _col_id_name_map[col_id];
-        _table_col_to_file_col.emplace(table_col_name, file_col_name);
-        _file_col_to_table_col.emplace(file_col_name, table_col_name);
-        if (table_col_name != file_col_name) {
-            _has_schema_change = true;
-        }
+    std::vector<int32_t> col_ids;
+    RETURN_IF_ERROR(orc_reader->get_schema_col_name_attribute(
+            &col_names, &col_ids, ICEBERG_ORC_ATTRIBUTE, &exist_schema));
+    if (!exist_schema) {
+        return Status::OK();
+    }
+    for (auto i = 0; i < col_names.size(); i++) {
+        file_col_id_to_name.emplace(col_ids[i], std::move(col_names[i]));
     }
     return Status::OK();
 }
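With the two overrides above, the per-format work shrinks to producing a field-id-to-name map; everything else lives in `TableSchemaChangeHelper`. A hypothetical reader for a format with no field ids at all would simply report `exist_schema = false` and let `init_schema_info()` fall back to the table schema from FE. A compilable sketch with stand-in types (the class and names below are illustrative, not part of the patch):

```
// Hypothetical subclass, shown only to illustrate the contract.
#include <cstdint>
#include <map>
#include <string>

struct Status { // stand-in for doris::Status, to keep the sketch self-contained
    static Status OK() { return {}; }
};

class SchemaChangeHelperSketch {
public:
    virtual ~SchemaChangeHelperSketch() = default;
    virtual Status get_file_col_id_to_name(bool& exist_schema,
                                           std::map<int32_t, std::string>& file_col_id_to_name) = 0;
};

class NoFieldIdReader final : public SchemaChangeHelperSketch {
public:
    Status get_file_col_id_to_name(bool& exist_schema,
                                   std::map<int32_t, std::string>& /*file_col_id_to_name*/) final {
        exist_schema = false; // e.g. a parquet file appended outside iceberg
        return Status::OK();  // helper then maps columns 1:1 from the FE schema
    }
};
```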
diff --git a/be/src/vec/exec/format/table/iceberg_reader.h b/be/src/vec/exec/format/table/iceberg_reader.h
index ee7dcdd68d24fa..c0546eeff05387 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.h
+++ b/be/src/vec/exec/format/table/iceberg_reader.h
@@ -67,7 +67,7 @@ class GenericReader;
 class ShardedKVCache;
 class VExprContext;
 
-class IcebergTableReader : public TableFormatReader {
+class IcebergTableReader : public TableFormatReader, public TableSchemaChangeHelper {
 public:
     struct PositionDeleteRange {
         std::vector<std::string> data_file_path;
@@ -118,9 +118,6 @@ class IcebergTableReader : public TableFormatReader {
 
     PositionDeleteRange _get_range(const ColumnString& file_path_column);
 
-    void _gen_file_col_names();
-
-    void _gen_new_colname_to_value_range();
     static std::string _delet_file_cache_key(const std::string& path) { return "delete_" + path; }
 
     Status _position_delete_base(const std::string data_file_path,
@@ -144,28 +141,12 @@ class IcebergTableReader : public TableFormatReader {
     ShardedKVCache* _kv_cache;
     IcebergProfile _iceberg_profile;
     std::vector<int64_t> _iceberg_delete_rows;
-    // col names from _file_slot_descs
-    std::vector<std::string> _file_col_names;
-    // file column name to table column name map. For iceberg schema evolution.
-    std::unordered_map<std::string, std::string> _file_col_to_table_col;
-    // table column name to file column name map. For iceberg schema evolution.
-    std::unordered_map<std::string, std::string> _table_col_to_file_col;
-    std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range;
-    // copy from _colname_to_value_range with new column name that is in parquet/orc file, to support schema evolution.
-    std::unordered_map<std::string, ColumnValueRangeType> _new_colname_to_value_range;
-    // column id to name map. Collect from FE slot descriptor.
-    std::unordered_map<int, std::string> _col_id_name_map;
-    // col names in the parquet,orc file
-    std::vector<std::string> _all_required_col_names;
-    // col names in table but not in parquet,orc file
-    std::vector<std::string> _not_in_file_col_names;
+
     // equality delete should read the primary columns
     std::vector<std::string> _expand_col_names;
     std::vector<ColumnWithTypeAndName> _expand_columns;
 
     io::IOContext* _io_ctx;
-    bool _has_schema_change = false;
-    bool _has_iceberg_schema = false;
 
     // the table level row count for optimizing query like:
     // select count(*) from table;
@@ -220,7 +201,8 @@ class IcebergParquetReader final : public IcebergTableReader {
         parquet_reader->set_delete_rows(&_iceberg_delete_rows);
     }
 
-    Status _gen_col_name_maps(const FieldDescriptor& field_desc);
+    Status get_file_col_id_to_name(bool& exist_schema,
+                                   std::map<int32_t, std::string>& file_col_id_to_name) final;
 
 protected:
     std::unique_ptr<GenericReader> _create_equality_reader(
@@ -258,7 +240,8 @@ class IcebergOrcReader final : public IcebergTableReader {
             const VExprContextSPtrs* not_single_slot_filter_conjuncts,
             const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts);
 
-    Status _gen_col_name_maps(OrcReader* orc_reader);
+    Status get_file_col_id_to_name(bool& exist_schema,
+                                   std::map<int32_t, std::string>& file_col_id_to_name) final;
 
 protected:
     std::unique_ptr<GenericReader> _create_equality_reader(
diff --git a/be/src/vec/exec/format/table/table_format_reader.cpp b/be/src/vec/exec/format/table/table_format_reader.cpp
index ea8111d81b3d04..8676287ffe6fde 100644
--- a/be/src/vec/exec/format/table/table_format_reader.cpp
+++ b/be/src/vec/exec/format/table/table_format_reader.cpp
@@ -22,4 +22,107 @@ namespace doris::vectorized {
 TableFormatReader::TableFormatReader(std::unique_ptr<GenericReader> file_format_reader)
         : _file_format_reader(std::move(file_format_reader)) {}
 
+Status TableSchemaChangeHelper::init_schema_info(
+        const std::vector<std::string>& read_table_col_names,
+        const std::unordered_map<int32_t, std::string>& table_id_to_name,
+        const std::unordered_map<std::string, ColumnValueRangeType>*
+                table_col_name_to_value_range) {
+    bool exist_schema = true;
+    std::map<int32_t, std::string> file_id_to_name;
+    RETURN_IF_ERROR(get_file_col_id_to_name(exist_schema, file_id_to_name));
+    if (!exist_schema) {
+        file_id_to_name.clear();
+        for (const auto& [table_col_id, table_col_name] : table_id_to_name) {
+            file_id_to_name.emplace(table_col_id, table_col_name);
+        }
+    }
+
+    /** This compares the table schema from FE (table_id_to_name) with
+     * the current file schema (file_id_to_name), and generates two maps for future use:
+     * 1. table column name to file column name.
+     * 2. file column name to table column name.
+     * For example, the file has a column 'col1';
+     * after this file was written, iceberg changed the column name to 'col1_new'.
+     * The two maps would contain:
+     * 1. col1_new -> col1
+     * 2. col1 -> col1_new
+     */
+    for (const auto& [file_col_id, file_col_name] : file_id_to_name) {
+        if (table_id_to_name.find(file_col_id) == table_id_to_name.end()) {
+            continue;
+        }
+
+        auto& table_col_name = table_id_to_name.at(file_col_id);
+        _table_col_to_file_col.emplace(table_col_name, file_col_name);
+        _file_col_to_table_col.emplace(file_col_name, table_col_name);
+        if (table_col_name != file_col_name) {
+            _has_schema_change = true;
+        }
+    }
+
+    /** Generate _all_required_col_names and _not_in_file_col_names.
+     *
+     * _all_required_col_names is all the columns required by the user sql.
+     * If the column name has been modified after the data file was written,
+     * put the old name in the data file into _all_required_col_names.
+     *
+     * _not_in_file_col_names is all the columns required by the user sql but not in the data file,
+     * e.g. new columns added after this data file was written.
+     * Columns whose names reuse the name of an old dropped column should be considered
+     * missing columns, and belong in _not_in_file_col_names.
+     */
+    _all_required_col_names.clear();
+    _not_in_file_col_names.clear();
+    for (auto table_col_name : read_table_col_names) {
+        auto iter = _table_col_to_file_col.find(table_col_name);
+        if (iter == _table_col_to_file_col.end()) {
+            _all_required_col_names.emplace_back(table_col_name);
+            _not_in_file_col_names.emplace_back(table_col_name);
+        } else {
+            _all_required_col_names.emplace_back(iter->second);
+        }
+    }
+
+    /** Generate _new_colname_to_value_range by replacing the column names in
+     * _colname_to_value_range with the column names in the data file.
+     */
+    for (auto& it : *table_col_name_to_value_range) {
+        auto iter = _table_col_to_file_col.find(it.first);
+        if (iter == _table_col_to_file_col.end()) {
+            _new_colname_to_value_range.emplace(it.first, it.second);
+        } else {
+            _new_colname_to_value_range.emplace(iter->second, it.second);
+        }
+    }
+    return Status::OK();
+}
+
+Status TableSchemaChangeHelper::get_next_block_before(Block* block) const {
+    if (_has_schema_change) {
+        for (int i = 0; i < block->columns(); i++) {
+            ColumnWithTypeAndName& col = block->get_by_position(i);
+            auto iter = _table_col_to_file_col.find(col.name);
+            if (iter != _table_col_to_file_col.end()) {
+                col.name = iter->second;
+            }
+        }
+        block->initialize_index_by_name();
+    }
+    return Status::OK();
+}
+
+Status TableSchemaChangeHelper::get_next_block_after(Block* block) const {
+    if (_has_schema_change) {
+        for (int i = 0; i < block->columns(); i++) {
+            ColumnWithTypeAndName& col = block->get_by_position(i);
+            auto iter = _file_col_to_table_col.find(col.name);
+            if (iter != _file_col_to_table_col.end()) {
+                col.name = iter->second;
+            }
+        }
+        block->initialize_index_by_name();
+    }
+    return Status::OK();
+}
+
 } // namespace doris::vectorized
\ No newline at end of file
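To see what `init_schema_info()` does for the drop-then-re-add case exercised by the new tests: the re-added column gets a fresh field id in the table schema, so the id stored in old files maps to nothing and the column is treated as missing (read as NULL) for those files. A self-contained sketch of just that mapping step, with illustrative ids not taken from any real file:

```
// Worked example: `age` was written with field id 3, then dropped and
// re-added, so the current table schema knows `age` only as id 4.
#include <cassert>
#include <cstdint>
#include <map>
#include <string>
#include <unordered_map>

int main() {
    // schema stored in the old data file: field id -> file column name
    std::map<int32_t, std::string> file_id_to_name = {{1, "id"}, {2, "name"}, {3, "age"}};
    // current table schema from FE: field id -> table column name
    std::unordered_map<int32_t, std::string> table_id_to_name = {{1, "id"}, {2, "name"}, {4, "age"}};

    std::unordered_map<std::string, std::string> table_col_to_file_col;
    for (const auto& [file_id, file_name] : file_id_to_name) {
        auto it = table_id_to_name.find(file_id);
        if (it == table_id_to_name.end()) {
            continue; // id 3 matches nothing: the old `age` data stays unreachable
        }
        table_col_to_file_col.emplace(it->second, file_name);
    }

    // `age` ends up unmapped, so it becomes a missing column and reads as NULL
    assert(table_col_to_file_col.count("age") == 0);
    assert(table_col_to_file_col.at("id") == "id");
    return 0;
}
```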
+ */ + Status get_next_block_before(Block* block) const; + + /** Set the name back to table column name before return this block.*/ + Status get_next_block_after(Block* block) const; + + // copy from _colname_to_value_range with new column name that is in parquet/orc file + std::unordered_map _new_colname_to_value_range; + // all the columns required by user sql. + std::vector _all_required_col_names; + // col names in table but not in parquet,orc file + std::vector _not_in_file_col_names; + bool _has_schema_change = false; + // file column name to table column name map + std::unordered_map _file_col_to_table_col; + // table column name to file column name map. + std::unordered_map _table_col_to_file_col; +}; + } // namespace doris::vectorized diff --git a/be/src/vec/exec/format/table/transactional_hive_reader.cpp b/be/src/vec/exec/format/table/transactional_hive_reader.cpp index 18642ab1218b4d..4fd2fe5948ff17 100644 --- a/be/src/vec/exec/format/table/transactional_hive_reader.cpp +++ b/be/src/vec/exec/format/table/transactional_hive_reader.cpp @@ -68,8 +68,8 @@ Status TransactionalHiveReader::init_reader( _col_names.insert(_col_names.end(), TransactionalHive::READ_ROW_COLUMN_NAMES_LOWER_CASE.begin(), TransactionalHive::READ_ROW_COLUMN_NAMES_LOWER_CASE.end()); Status status = orc_reader->init_reader( - &_col_names, colname_to_value_range, conjuncts, true, tuple_descriptor, row_descriptor, - not_single_slot_filter_conjuncts, slot_id_to_filter_conjuncts); + &_col_names, {}, colname_to_value_range, conjuncts, true, tuple_descriptor, + row_descriptor, not_single_slot_filter_conjuncts, slot_id_to_filter_conjuncts); return status; } @@ -154,9 +154,9 @@ Status TransactionalHiveReader::init_row_filters(const TFileRangeDesc& range, OrcReader delete_reader(_profile, _state, _params, delete_range, _MIN_BATCH_SIZE, _state->timezone(), _io_ctx, false); - RETURN_IF_ERROR( - delete_reader.init_reader(&TransactionalHive::DELETE_ROW_COLUMN_NAMES_LOWER_CASE, - nullptr, {}, false, nullptr, nullptr, nullptr, nullptr)); + RETURN_IF_ERROR(delete_reader.init_reader( + &TransactionalHive::DELETE_ROW_COLUMN_NAMES_LOWER_CASE, {}, nullptr, {}, false, + nullptr, nullptr, nullptr, nullptr)); std::unordered_map> partition_columns; diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index 5b96b5561fba1c..ac0e065fb133ca 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -917,7 +917,7 @@ Status VFileScanner::_get_next_reader() { } else if (range.__isset.table_format_params && range.table_format_params.table_format_type == "paimon") { init_status = orc_reader->init_reader( - &_file_col_names, _colname_to_value_range, _push_down_conjuncts, false, + &_file_col_names, {}, _colname_to_value_range, _push_down_conjuncts, false, _real_tuple_desc, _default_val_row_desc.get(), &_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts); std::unique_ptr paimon_reader = @@ -933,7 +933,7 @@ Status VFileScanner::_get_next_reader() { hive_orc_use_column_names = _state->query_options().hive_orc_use_column_names; } init_status = orc_reader->init_reader( - &_file_col_names, _colname_to_value_range, _push_down_conjuncts, false, + &_file_col_names, {}, _colname_to_value_range, _push_down_conjuncts, false, _real_tuple_desc, _default_val_row_desc.get(), &_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts, hive_orc_use_column_names); diff --git 
diff --git a/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run08.sql b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run08.sql
new file mode 100644
index 00000000000000..1a3d844ef6027e
--- /dev/null
+++ b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run08.sql
@@ -0,0 +1,48 @@
+
+use demo.test_db;
+
+CREATE TABLE sc_drop_add_orc (
+    id BIGINT,
+    name STRING,
+    age INT
+)
+USING iceberg
+PARTITIONED BY (id)
+TBLPROPERTIES ('format'='orc');
+
+INSERT INTO sc_drop_add_orc VALUES (1, 'Alice', 25);
+INSERT INTO sc_drop_add_orc VALUES (2, 'Bob', 30);
+
+ALTER TABLE sc_drop_add_orc DROP COLUMN age;
+
+INSERT INTO sc_drop_add_orc (id, name) VALUES (3, 'Charlie');
+INSERT INTO sc_drop_add_orc (id, name) VALUES (4, 'David');
+
+ALTER TABLE sc_drop_add_orc ADD COLUMN age INT;
+
+INSERT INTO sc_drop_add_orc VALUES (5, 'Eve', 28);
+INSERT INTO sc_drop_add_orc VALUES (6, 'Frank', 35);
+
+
+
+CREATE TABLE sc_drop_add_parquet (
+    id BIGINT,
+    name STRING,
+    age INT
+)
+USING iceberg
+PARTITIONED BY (id)
+TBLPROPERTIES ('format'='parquet');
+
+INSERT INTO sc_drop_add_parquet VALUES (1, 'Alice', 25);
+INSERT INTO sc_drop_add_parquet VALUES (2, 'Bob', 30);
+
+ALTER TABLE sc_drop_add_parquet DROP COLUMN age;
+
+INSERT INTO sc_drop_add_parquet (id, name) VALUES (3, 'Charlie');
+INSERT INTO sc_drop_add_parquet (id, name) VALUES (4, 'David');
+
+ALTER TABLE sc_drop_add_parquet ADD COLUMN age INT;
+
+INSERT INTO sc_drop_add_parquet VALUES (5, 'Eve', 28);
+INSERT INTO sc_drop_add_parquet VALUES (6, 'Frank', 35);
\ No newline at end of file
diff --git a/regression-test/data/external_table_p0/iceberg/iceberg_schema_change2.out b/regression-test/data/external_table_p0/iceberg/iceberg_schema_change2.out
new file mode 100644
index 00000000000000..d68cde9a50eb02
--- /dev/null
+++ b/regression-test/data/external_table_p0/iceberg/iceberg_schema_change2.out
@@ -0,0 +1,64 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !parquet_1 --
+1	Alice	\N
+2	Bob	\N
+3	Charlie	\N
+4	David	\N
+5	Eve	28
+6	Frank	35
+
+-- !parquet_2 --
+1	Alice	\N
+2	Bob	\N
+3	Charlie	\N
+4	David	\N
+
+-- !parquet_3 --
+5	Eve	28
+6	Frank	35
+
+-- !parquet_4 --
+6	Frank	35
+
+-- !parquet_5 --
+5	Eve	28
+6	Frank	35
+
+-- !parquet_6 --
+5	Eve
+6	Frank
+
+-- !parquet_7 --
+5	28
+
+-- !orc_1 --
+1	Alice	\N
+2	Bob	\N
+3	Charlie	\N
+4	David	\N
+5	Eve	28
+6	Frank	35
+
+-- !orc_2 --
+1	Alice	\N
+2	Bob	\N
+3	Charlie	\N
+4	David	\N
+
+-- !orc_3 --
+5	Eve	28
+6	Frank	35
+
+-- !orc_4 --
+6	Frank	35
+
+-- !orc_5 --
+5	Eve	28
+6	Frank	35
+
+-- !orc_6 --
+5	Eve
+6	Frank
+
+-- !orc_7 --
+5	28
\ No newline at end of file
diff --git a/regression-test/suites/external_table_p0/iceberg/iceberg_schema_change2.groovy b/regression-test/suites/external_table_p0/iceberg/iceberg_schema_change2.groovy
new file mode 100644
index 00000000000000..295d14b246e7e5
--- /dev/null
+++ b/regression-test/suites/external_table_p0/iceberg/iceberg_schema_change2.groovy
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("iceberg_schema_change2", "p0,external,doris,external_docker,external_docker_doris") {
+
+    String enabled = context.config.otherConfigs.get("enableIcebergTest")
+    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+        logger.info("disable iceberg test.")
+        return
+    }
+
+    String catalog_name = "iceberg_schema_change2"
+    String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port")
+    String minio_port = context.config.otherConfigs.get("iceberg_minio_port")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+
+    sql """drop catalog if exists ${catalog_name}"""
+    sql """
+    CREATE CATALOG ${catalog_name} PROPERTIES (
+        'type'='iceberg',
+        'iceberg.catalog.type'='rest',
+        'uri' = 'http://${externalEnvIp}:${rest_port}',
+        "s3.access_key" = "admin",
+        "s3.secret_key" = "password",
+        "s3.endpoint" = "http://${externalEnvIp}:${minio_port}",
+        "s3.region" = "us-east-1"
+    );"""
+
+    logger.info("catalog " + catalog_name + " created")
+    sql """switch ${catalog_name};"""
+    logger.info("switched to catalog " + catalog_name)
+    sql """ use test_db;"""
+
+    qt_parquet_1 """ select * from sc_drop_add_parquet order by id; """
+    qt_parquet_2 """ select * from sc_drop_add_parquet where age is NULL order by id; """
+    qt_parquet_3 """ select * from sc_drop_add_parquet where age is not NULL order by id; """
+    qt_parquet_4 """ select * from sc_drop_add_parquet where age > 28 order by id; """
+    qt_parquet_5 """ select * from sc_drop_add_parquet where age >= 28 order by id; """
+    qt_parquet_6 """ select id, name from sc_drop_add_parquet where age >= 28 order by id; """
+    qt_parquet_7 """ select id, age from sc_drop_add_parquet where name="Eve" order by id; """
+
+
+
+    qt_orc_1 """ select * from sc_drop_add_orc order by id; """
+    qt_orc_2 """ select * from sc_drop_add_orc where age is NULL order by id; """
+    qt_orc_3 """ select * from sc_drop_add_orc where age is not NULL order by id; """
+    qt_orc_4 """ select * from sc_drop_add_orc where age > 28 order by id; """
+    qt_orc_5 """ select * from sc_drop_add_orc where age >= 28 order by id; """
+    qt_orc_6 """ select id, name from sc_drop_add_orc where age >= 28 order by id; """
+    qt_orc_7 """ select id, age from sc_drop_add_orc where name="Eve" order by id; """
+
+}
\ No newline at end of file