Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 38 additions & 2 deletions be/src/vec/exec/format/table/iceberg_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ Status IcebergTableReader::get_next_block(Block* block, size_t* read_rows, bool*

return Status::OK();
}
RETURN_IF_ERROR(_expand_block_if_need(block));

// To support iceberg schema evolution. We change the column name in block to
// make it match with the column name in parquet file before reading data. and
Expand All @@ -130,7 +131,7 @@ Status IcebergTableReader::get_next_block(Block* block, size_t* read_rows, bool*
block->initialize_index_by_name();
}

auto res = _file_format_reader->get_next_block(block, read_rows, eof);
RETURN_IF_ERROR(_file_format_reader->get_next_block(block, read_rows, eof));
// Set the name back to table column name before return this block.
if (_has_schema_change) {
for (int i = 0; i < block->columns(); i++) {
Expand All @@ -147,7 +148,7 @@ Status IcebergTableReader::get_next_block(Block* block, size_t* read_rows, bool*
RETURN_IF_ERROR(_equality_delete_impl->filter_data_block(block));
*read_rows = block->rows();
}
return res;
return _shrink_block_if_need(block);
}

Status IcebergTableReader::set_fill_columns(
Expand Down Expand Up @@ -253,6 +254,21 @@ Status IcebergTableReader::_equality_delete_base(
}
}
}
for (int i = 0; i < equality_delete_col_names.size(); ++i) {
const std::string& delete_col = equality_delete_col_names[i];
if (std::find(_all_required_col_names.begin(), _all_required_col_names.end(), delete_col) ==
_all_required_col_names.end()) {
_expand_col_names.emplace_back(delete_col);
DataTypePtr data_type = DataTypeFactory::instance().create_data_type(
equality_delete_col_types[i], true);
MutableColumnPtr data_column = data_type->create_column();
_expand_columns.emplace_back(
ColumnWithTypeAndName(std::move(data_column), data_type, delete_col));
}
}
for (const std::string& delete_col : _expand_col_names) {
_all_required_col_names.emplace_back(delete_col);
}
_equality_delete_impl = EqualityDeleteBase::get_delete_impl(&_equality_delete_block);
return _equality_delete_impl->init(_profile);
}
Expand All @@ -269,6 +285,24 @@ void IcebergTableReader::_generate_equality_delete_block(
}
}

// Appends every column in _expand_columns to `block`.
//
// Each extra column is cleared first and then inserted into the block under
// its own name. If the block already contains a column with that name, an
// InternalError is returned and the remaining extra columns are not inserted.
// Returns Status::OK() once all extra columns have been inserted.
Status IcebergTableReader::_expand_block_if_need(Block* block) {
    for (auto& extra : _expand_columns) {
        // Clear the column before it is handed to the block; this happens even
        // when the name check below fails.
        extra.column->assume_mutable()->clear();

        const bool name_taken = static_cast<bool>(block->try_get_by_name(extra.name));
        if (name_taken) {
            return Status::InternalError("Wrong expand column '{}'", extra.name);
        }
        block->insert(extra);
    }
    return Status::OK();
}

// Erases from `block` every column whose name is listed in _expand_col_names.
// Always returns Status::OK().
Status IcebergTableReader::_shrink_block_if_need(Block* block) {
    for (const auto& added_name : _expand_col_names) {
        block->erase(added_name);
    }
    return Status::OK();
}

Status IcebergTableReader::_position_delete_base(
const std::vector<TIcebergDeleteFileDesc>& delete_files) {
std::string data_file_path = _range.path;
Expand Down Expand Up @@ -534,6 +568,7 @@ Status IcebergParquetReader::init_reader(
_gen_new_colname_to_value_range();
parquet_reader->set_table_to_file_col_map(_table_col_to_file_col);
parquet_reader->iceberg_sanitize(_all_required_col_names);
RETURN_IF_ERROR(init_row_filters(_range));
return parquet_reader->init_reader(
_all_required_col_names, _not_in_file_col_names, &_new_colname_to_value_range,
conjuncts, tuple_descriptor, row_descriptor, colname_to_slot_id,
Expand Down Expand Up @@ -606,6 +641,7 @@ Status IcebergOrcReader::init_reader(
_gen_file_col_names();
_gen_new_colname_to_value_range();
orc_reader->set_table_col_to_file_col(_table_col_to_file_col);
RETURN_IF_ERROR(init_row_filters(_range));
return orc_reader->init_reader(&_all_required_col_names, &_new_colname_to_value_range,
conjuncts, false, tuple_descriptor, row_descriptor,
not_single_slot_filter_conjuncts, slot_id_to_filter_conjuncts);
Expand Down
7 changes: 7 additions & 0 deletions be/src/vec/exec/format/table/iceberg_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,10 @@ class IcebergTableReader : public TableFormatReader {
void _generate_equality_delete_block(
Block* block, const std::vector<std::string>& equality_delete_col_names,
const std::vector<TypeDescriptor>& equality_delete_col_types);
// Equality delete must read the equality-delete columns. Insert those that are missing from the block.
Status _expand_block_if_need(Block* block);
// Remove the added delete columns
Status _shrink_block_if_need(Block* block);

RuntimeProfile* _profile;
RuntimeState* _state;
Expand All @@ -161,6 +165,9 @@ class IcebergTableReader : public TableFormatReader {
std::vector<std::string> _all_required_col_names;
// col names in table but not in parquet,orc file
std::vector<std::string> _not_in_file_col_names;
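// Equality-delete columns that were not among the required columns and must be read additionally:
// their names, and the columns that are inserted into the block for them.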
std::vector<std::string> _expand_col_names;
std::vector<ColumnWithTypeAndName> _expand_columns;

io::IOContext* _io_ctx;
bool _has_schema_change = false;
Expand Down
4 changes: 0 additions & 4 deletions be/src/vec/exec/scan/vfile_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -838,8 +838,6 @@ Status VFileScanner::_get_next_reader() {
_push_down_conjuncts, _real_tuple_desc, _default_val_row_desc.get(),
_col_name_to_slot_id, &_not_single_slot_filter_conjuncts,
&_slot_id_to_filter_conjuncts);

RETURN_IF_ERROR(iceberg_reader->init_row_filters(range));
_cur_reader = std::move(iceberg_reader);
} else {
std::vector<std::string> place_holder;
Expand Down Expand Up @@ -891,8 +889,6 @@ Status VFileScanner::_get_next_reader() {
_push_down_conjuncts, _real_tuple_desc, _default_val_row_desc.get(),
_col_name_to_slot_id, &_not_single_slot_filter_conjuncts,
&_slot_id_to_filter_conjuncts);

RETURN_IF_ERROR(iceberg_reader->init_row_filters(range));
_cur_reader = std::move(iceberg_reader);
} else {
init_status = orc_reader->init_reader(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,18 @@
19 Customer#000000019 uc,3bHIx84H,wdrmLOjVsiqXCq2tr 18 28-396-526-5053 8914.71 HOUSEHOLD nag. furiously careful packages are slyly at the accounts. furiously regular in
20 Customer#000000020 JrPk8Pqplj4Ne 22 32-957-234-8742 7603.40 FURNITURE g alongside of the special excuses-- fluffily enticing packages wake

-- !count1 --
19

-- !count1_orc --
19

-- !max1 --
update-comment-2

-- !max1_orc --
update-comment-2

-- !one_delete_column --
1 Customer#000000001 IVhzIApeRb ot,c,E 151 update-phone-1 711.56 BUILDING update-comment-1
2 Customer#000000002 XSTf4,NCwDVaWNe6tEgvwfmRchLXak 13 23-768-687-3665 121.65 AUTOMOBILE l accounts. blithely ironic theodolites integrate boldly: caref
Expand Down Expand Up @@ -83,3 +95,15 @@
19 Customer#000000019 uc,3bHIx84H,wdrmLOjVsiqXCq2tr 18 28-396-526-5053 8914.71 HOUSEHOLD nag. furiously careful packages are slyly at the accounts. furiously regular in
20 Customer#000000020 JrPk8Pqplj4Ne 22 32-957-234-8742 7603.40 FURNITURE g alongside of the special excuses-- fluffily enticing packages wake

-- !count3 --
19

-- !count3_orc --
19

-- !max3 --
update-comment-2

-- !max3_orc --
update-comment-2

Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,17 @@ suite("iceberg_equality_delete", "p2,external,iceberg,external_remote,external_r
// one delete column
qt_one_delete_column """select * from customer_flink_one order by c_custkey"""
qt_one_delete_column_orc """select * from customer_flink_one_orc order by c_custkey"""
qt_count1 """select count(*) from customer_flink_one"""
qt_count1_orc """select count(*) from customer_flink_one_orc"""
qt_max1 """select max(c_comment) from customer_flink_one"""
qt_max1_orc """select max(c_comment) from customer_flink_one_orc"""
// three delete columns
qt_one_delete_column """select * from customer_flink_three order by c_custkey"""
qt_one_delete_column_orc """select * from customer_flink_three_orc order by c_custkey"""
qt_count3 """select count(*) from customer_flink_three"""
qt_count3_orc """select count(*) from customer_flink_three_orc"""
qt_max3 """select max(c_comment) from customer_flink_three"""
qt_max3_orc """select max(c_comment) from customer_flink_three_orc"""

sql """drop catalog ${catalog_name}"""
}
Expand Down