diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp b/be/src/vec/exec/format/table/iceberg_reader.cpp index f9de386e595467..8e4cdbd311a2ca 100644 --- a/be/src/vec/exec/format/table/iceberg_reader.cpp +++ b/be/src/vec/exec/format/table/iceberg_reader.cpp @@ -115,6 +115,7 @@ Status IcebergTableReader::get_next_block(Block* block, size_t* read_rows, bool* return Status::OK(); } + RETURN_IF_ERROR(_expand_block_if_need(block)); // To support iceberg schema evolution. We change the column name in block to // make it match with the column name in parquet file before reading data. and @@ -130,7 +131,7 @@ Status IcebergTableReader::get_next_block(Block* block, size_t* read_rows, bool* block->initialize_index_by_name(); } - auto res = _file_format_reader->get_next_block(block, read_rows, eof); + RETURN_IF_ERROR(_file_format_reader->get_next_block(block, read_rows, eof)); // Set the name back to table column name before return this block. if (_has_schema_change) { for (int i = 0; i < block->columns(); i++) { @@ -147,7 +148,7 @@ Status IcebergTableReader::get_next_block(Block* block, size_t* read_rows, bool* RETURN_IF_ERROR(_equality_delete_impl->filter_data_block(block)); *read_rows = block->rows(); } - return res; + return _shrink_block_if_need(block); } Status IcebergTableReader::set_fill_columns( @@ -253,6 +254,21 @@ Status IcebergTableReader::_equality_delete_base( } } } + for (int i = 0; i < equality_delete_col_names.size(); ++i) { + const std::string& delete_col = equality_delete_col_names[i]; + if (std::find(_all_required_col_names.begin(), _all_required_col_names.end(), delete_col) == + _all_required_col_names.end()) { + _expand_col_names.emplace_back(delete_col); + DataTypePtr data_type = DataTypeFactory::instance().create_data_type( + equality_delete_col_types[i], true); + MutableColumnPtr data_column = data_type->create_column(); + _expand_columns.emplace_back( + ColumnWithTypeAndName(std::move(data_column), data_type, delete_col)); + } + } + for (const 
std::string& delete_col : _expand_col_names) { + _all_required_col_names.emplace_back(delete_col); + } _equality_delete_impl = EqualityDeleteBase::get_delete_impl(&_equality_delete_block); return _equality_delete_impl->init(_profile); } @@ -269,6 +285,24 @@ void IcebergTableReader::_generate_equality_delete_block( } } +Status IcebergTableReader::_expand_block_if_need(Block* block) { /* Inserts every entry of _expand_columns into `block`. Returns InternalError as soon as `block` already holds a column with the same name; otherwise returns OK. */ + for (auto& col : _expand_columns) { + col.column->assume_mutable()->clear(); /* Empty the column before it is inserted. NOTE(review): presumably this drops rows left over from an earlier call that reused the same column object - confirm. */ + if (block->try_get_by_name(col.name)) { + return Status::InternalError("Wrong expand column '{}'", col.name); + } + block->insert(col); + } + return Status::OK(); +} + +Status IcebergTableReader::_shrink_block_if_need(Block* block) { /* Erases from `block` every column named in _expand_col_names. This function has no failing return path of its own; it always returns Status::OK(). */ + for (const std::string& expand_col : _expand_col_names) { + block->erase(expand_col); + } + return Status::OK(); +} + Status IcebergTableReader::_position_delete_base( const std::vector& delete_files) { std::string data_file_path = _range.path; @@ -534,6 +568,7 @@ Status IcebergParquetReader::init_reader( _gen_new_colname_to_value_range(); parquet_reader->set_table_to_file_col_map(_table_col_to_file_col); parquet_reader->iceberg_sanitize(_all_required_col_names); + RETURN_IF_ERROR(init_row_filters(_range)); return parquet_reader->init_reader( _all_required_col_names, _not_in_file_col_names, &_new_colname_to_value_range, conjuncts, tuple_descriptor, row_descriptor, colname_to_slot_id, @@ -606,6 +641,7 @@ Status IcebergOrcReader::init_reader( _gen_file_col_names(); _gen_new_colname_to_value_range(); orc_reader->set_table_col_to_file_col(_table_col_to_file_col); + RETURN_IF_ERROR(init_row_filters(_range)); return orc_reader->init_reader(&_all_required_col_names, &_new_colname_to_value_range, conjuncts, false, tuple_descriptor, row_descriptor, not_single_slot_filter_conjuncts, slot_id_to_filter_conjuncts); diff --git a/be/src/vec/exec/format/table/iceberg_reader.h b/be/src/vec/exec/format/table/iceberg_reader.h index 81c5613d6817ea..cda50015911eca 100644 --- 
a/be/src/vec/exec/format/table/iceberg_reader.h +++ b/be/src/vec/exec/format/table/iceberg_reader.h @@ -137,6 +137,10 @@ class IcebergTableReader : public TableFormatReader { void _generate_equality_delete_block( Block* block, const std::vector& equality_delete_col_names, const std::vector& equality_delete_col_types); + // Equality delete should read the primary columns. Add the missing columns + Status _expand_block_if_need(Block* block); + // Remove the added delete columns + Status _shrink_block_if_need(Block* block); RuntimeProfile* _profile; RuntimeState* _state; @@ -161,6 +165,9 @@ class IcebergTableReader : public TableFormatReader { std::vector _all_required_col_names; // col names in table but not in parquet,orc file std::vector _not_in_file_col_names; + // equality delete should read the primary columns + std::vector _expand_col_names; + std::vector _expand_columns; io::IOContext* _io_ctx; bool _has_schema_change = false; diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index 32828eaa796c30..03a79c0a18b359 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -838,8 +838,6 @@ Status VFileScanner::_get_next_reader() { _push_down_conjuncts, _real_tuple_desc, _default_val_row_desc.get(), _col_name_to_slot_id, &_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts); - - RETURN_IF_ERROR(iceberg_reader->init_row_filters(range)); _cur_reader = std::move(iceberg_reader); } else { std::vector place_holder; @@ -891,8 +889,6 @@ Status VFileScanner::_get_next_reader() { _push_down_conjuncts, _real_tuple_desc, _default_val_row_desc.get(), _col_name_to_slot_id, &_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts); - - RETURN_IF_ERROR(iceberg_reader->init_row_filters(range)); _cur_reader = std::move(iceberg_reader); } else { init_status = orc_reader->init_reader( diff --git a/regression-test/data/external_table_p2/iceberg/iceberg_equality_delete.out 
b/regression-test/data/external_table_p2/iceberg/iceberg_equality_delete.out index 2f7f599929b34d..44cf1b7b2bf946 100644 --- a/regression-test/data/external_table_p2/iceberg/iceberg_equality_delete.out +++ b/regression-test/data/external_table_p2/iceberg/iceberg_equality_delete.out @@ -41,6 +41,18 @@ 19 Customer#000000019 uc,3bHIx84H,wdrmLOjVsiqXCq2tr 18 28-396-526-5053 8914.71 HOUSEHOLD nag. furiously careful packages are slyly at the accounts. furiously regular in 20 Customer#000000020 JrPk8Pqplj4Ne 22 32-957-234-8742 7603.40 FURNITURE g alongside of the special excuses-- fluffily enticing packages wake +-- !count1 -- +19 + +-- !count1_orc -- +19 + +-- !max1 -- +update-comment-2 + +-- !max1_orc -- +update-comment-2 + -- !one_delete_column -- 1 Customer#000000001 IVhzIApeRb ot,c,E 151 update-phone-1 711.56 BUILDING update-comment-1 2 Customer#000000002 XSTf4,NCwDVaWNe6tEgvwfmRchLXak 13 23-768-687-3665 121.65 AUTOMOBILE l accounts. blithely ironic theodolites integrate boldly: caref @@ -83,3 +95,15 @@ 19 Customer#000000019 uc,3bHIx84H,wdrmLOjVsiqXCq2tr 18 28-396-526-5053 8914.71 HOUSEHOLD nag. furiously careful packages are slyly at the accounts. 
furiously regular in 20 Customer#000000020 JrPk8Pqplj4Ne 22 32-957-234-8742 7603.40 FURNITURE g alongside of the special excuses-- fluffily enticing packages wake +-- !count3 -- +19 + +-- !count3_orc -- +19 + +-- !max3 -- +update-comment-2 + +-- !max3_orc -- +update-comment-2 + diff --git a/regression-test/suites/external_table_p2/iceberg/iceberg_equality_delete.groovy b/regression-test/suites/external_table_p2/iceberg/iceberg_equality_delete.groovy index a5a21b3da6a2a1..984181ac2f35fc 100644 --- a/regression-test/suites/external_table_p2/iceberg/iceberg_equality_delete.groovy +++ b/regression-test/suites/external_table_p2/iceberg/iceberg_equality_delete.groovy @@ -40,9 +40,17 @@ suite("iceberg_equality_delete", "p2,external,iceberg,external_remote,external_r // one delete column qt_one_delete_column """select * from customer_flink_one order by c_custkey""" qt_one_delete_column_orc """select * from customer_flink_one_orc order by c_custkey""" + qt_count1 """select count(*) from customer_flink_one""" + qt_count1_orc """select count(*) from customer_flink_one_orc""" + qt_max1 """select max(c_comment) from customer_flink_one""" + qt_max1_orc """select max(c_comment) from customer_flink_one_orc""" // three delete columns qt_one_delete_column """select * from customer_flink_three order by c_custkey""" qt_one_delete_column_orc """select * from customer_flink_three_orc order by c_custkey""" + qt_count3 """select count(*) from customer_flink_three""" + qt_count3_orc """select count(*) from customer_flink_three_orc""" + qt_max3 """select max(c_comment) from customer_flink_three""" + qt_max3_orc """select max(c_comment) from customer_flink_three_orc""" sql """drop catalog ${catalog_name}""" }