From bf65aab4d912ce7377cf10f28976a2e5477fe899 Mon Sep 17 00:00:00 2001 From: gaoxin Date: Tue, 20 Feb 2024 10:19:40 +0800 Subject: [PATCH] [fix](parquet) return error if schema changed in complex types --- .../format/parquet/vparquet_column_reader.cpp | 22 ++++++++++++++----- 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp index 059375ef006245..cf9753f46eb6f0 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp @@ -414,7 +414,7 @@ Status ScalarColumnReader::_read_nested_column(ColumnPtr& doris_column, DataType } RETURN_IF_ERROR(_chunk_reader->decode_values(data_column, type, select_vector, is_dict_filter)); if (ancestor_nulls != 0) { - _chunk_reader->skip_values(ancestor_nulls, false); + RETURN_IF_ERROR(_chunk_reader->skip_values(ancestor_nulls, false)); } if (!align_rows) { @@ -582,6 +582,9 @@ Status ArrayColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr& } data_column = doris_column->assume_mutable(); } + if (remove_nullable(type)->get_type_id() != TypeIndex::Array) { + return Status::Corruption("Wrong data type for column '{}'", _field_schema->name); + } ColumnPtr& element_column = static_cast(*data_column).get_data_ptr(); DataTypePtr& element_type = const_cast( @@ -628,6 +631,9 @@ Status MapColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr& t } data_column = doris_column->assume_mutable(); } + if (remove_nullable(type)->get_type_id() != TypeIndex::Map) { + return Status::Corruption("Wrong data type for column '{}'", _field_schema->name); + } auto& map = static_cast(*data_column); DataTypePtr& key_type = const_cast( @@ -691,6 +697,9 @@ Status StructColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr } data_column = doris_column->assume_mutable(); } + if (remove_nullable(type)->get_type_id() != TypeIndex::Struct) { + return Status::Corruption("Wrong data type for column '{}'", _field_schema->name); + } auto& doris_struct = static_cast(*data_column); if (_child_readers.size() != doris_struct.tuple_size()) { @@ -705,17 +714,18 @@ Status StructColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr size_t field_rows = 0; bool field_eof = false; if (i == 0) { - _child_readers[i]->read_column_data(doris_field, doris_type, select_vector, batch_size, - &field_rows, &field_eof, is_dict_filter); + RETURN_IF_ERROR(_child_readers[i]->read_column_data( + doris_field, doris_type, select_vector, batch_size, &field_rows, &field_eof, + is_dict_filter)); *read_rows = field_rows; *eof = field_eof; } else { while (field_rows < *read_rows && !field_eof) { size_t loop_rows = 0; select_vector.reset(); - _child_readers[i]->read_column_data(doris_field, doris_type, select_vector, - *read_rows - field_rows, &loop_rows, &field_eof, - is_dict_filter); + RETURN_IF_ERROR(_child_readers[i]->read_column_data( + doris_field, doris_type, select_vector, *read_rows - field_rows, &loop_rows, + &field_eof, is_dict_filter)); field_rows += loop_rows; } DCHECK_EQ(*read_rows, field_rows);