diff --git a/be/src/vec/data_types/data_type_struct.h b/be/src/vec/data_types/data_type_struct.h index ad1a42a011da12..3638b0d110a76f 100644 --- a/be/src/vec/data_types/data_type_struct.h +++ b/be/src/vec/data_types/data_type_struct.h @@ -113,6 +113,7 @@ class DataTypeStruct final : public IDataType { const DataTypePtr& get_element(size_t i) const { return elems[i]; } const DataTypes& get_elements() const { return elems; } + const String& get_element_name(size_t i) const { return names[i]; } const Strings& get_element_names() const { return names; } size_t get_position_by_name(const String& name) const; diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp index f813d14b63bc55..27b377048fb7f2 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp @@ -142,14 +142,14 @@ Status ParquetColumnReader::create(io::FileReaderSPtr file, FieldSchema* field, RETURN_IF_ERROR(map_reader->init(std::move(key_reader), std::move(value_reader), field)); reader.reset(map_reader.release()); } else if (field->type.type == TYPE_STRUCT) { - std::vector> child_readers; + std::unordered_map> child_readers; child_readers.reserve(field->children.size()); for (int i = 0; i < field->children.size(); ++i) { std::unique_ptr child_reader; RETURN_IF_ERROR(create(file, &field->children[i], row_group, row_ranges, ctz, io_ctx, child_reader, max_buf_size)); child_reader->set_nested_column(); - child_readers.emplace_back(std::move(child_reader)); + child_readers[field->children[i].name] = std::move(child_reader); } auto struct_reader = StructColumnReader::create_unique(row_ranges, ctz, io_ctx); RETURN_IF_ERROR(struct_reader->init(std::move(child_readers), field)); @@ -701,8 +701,9 @@ Status MapColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr& t return Status::OK(); } -Status StructColumnReader::init(std::vector>&& child_readers, - FieldSchema* field) { +Status StructColumnReader::init( + std::unordered_map>&& child_readers, + FieldSchema* field) { _field_schema = field; _child_readers = std::move(child_readers); return Status::OK(); @@ -728,19 +729,33 @@ Status StructColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr } auto& doris_struct = static_cast(*data_column); - if (_child_readers.size() != doris_struct.tuple_size()) { - return Status::InternalError("Wrong number of struct fields"); - } const DataTypeStruct* doris_struct_type = reinterpret_cast(remove_nullable(type).get()); - for (int i = 0; i < doris_struct.tuple_size(); ++i) { + + bool least_one_reader = false; + std::vector missing_column_idxs {}; + + _read_column_names.clear(); + + for (size_t i = 0; i < doris_struct.tuple_size(); ++i) { ColumnPtr& doris_field = doris_struct.get_column_ptr(i); - DataTypePtr& doris_type = const_cast(doris_struct_type->get_element(i)); + auto& doris_type = const_cast(doris_struct_type->get_element(i)); + auto& doris_name = const_cast(doris_struct_type->get_element_name(i)); + + // remember the missing column index + if (_child_readers.find(doris_name) == _child_readers.end()) { + missing_column_idxs.push_back(i); + continue; + } + + _read_column_names.insert(doris_name); + select_vector.reset(); size_t field_rows = 0; bool field_eof = false; - if (i == 0) { - RETURN_IF_ERROR(_child_readers[i]->read_column_data( + if (!least_one_reader) { + least_one_reader = true; + RETURN_IF_ERROR(_child_readers[doris_name]->read_column_data( doris_field, doris_type, select_vector, batch_size, &field_rows, &field_eof, is_dict_filter)); *read_rows = field_rows; @@ -749,7 +764,7 @@ Status StructColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr while (field_rows < *read_rows && !field_eof) { size_t loop_rows = 0; select_vector.reset(); - RETURN_IF_ERROR(_child_readers[i]->read_column_data( + RETURN_IF_ERROR(_child_readers[doris_name]->read_column_data( doris_field, doris_type, select_vector, *read_rows - field_rows, &loop_rows, &field_eof, is_dict_filter)); field_rows += loop_rows; @@ -759,9 +774,25 @@ Status StructColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr } } + if (!least_one_reader) { + // TODO: support read struct which columns are all missing + return Status::Corruption("Not support read struct '{}' which columns are all missing", + _field_schema->name); + } + + // fill missing column with null or default value + for (auto idx : missing_column_idxs) { + auto& doris_field = doris_struct.get_column_ptr(idx); + auto& doris_type = const_cast(doris_struct_type->get_element(idx)); + DCHECK(doris_type->is_nullable()); + auto* nullable_column = reinterpret_cast( + (*std::move(doris_field)).mutate().get()); + nullable_column->insert_null_elements(*read_rows); + } + if (null_map_ptr != nullptr) { - fill_struct_null_map(_field_schema, *null_map_ptr, _child_readers[0]->get_rep_level(), - _child_readers[0]->get_def_level()); + fill_struct_null_map(_field_schema, *null_map_ptr, this->get_rep_level(), + this->get_def_level()); } return Status::OK(); diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_reader.h index d15d6d5efa1509..249e2d94878e36 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.h @@ -25,6 +25,7 @@ #include #include #include +#include #include #include "io/fs/buffered_reader.h" @@ -262,24 +263,37 @@ class StructColumnReader : public ParquetColumnReader { : ParquetColumnReader(row_ranges, ctz, io_ctx) {} ~StructColumnReader() override { close(); } - Status init(std::vector>&& child_readers, - FieldSchema* field); + Status init( + std::unordered_map>&& child_readers, + FieldSchema* field); Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type, ColumnSelectVector& select_vector, size_t batch_size, size_t* read_rows, bool* eof, bool is_dict_filter) override; const std::vector& get_rep_level() const override { - return _child_readers[0]->get_rep_level(); + if (!_read_column_names.empty()) { + // can't use _child_readers[*_read_column_names.begin()] + // because the operator[] of std::unordered_map is not const :( + return _child_readers.find(*_read_column_names.begin())->second->get_rep_level(); + } + return _child_readers.begin()->second->get_rep_level(); } + const std::vector& get_def_level() const override { - return _child_readers[0]->get_def_level(); + if (!_read_column_names.empty()) { + return _child_readers.find(*_read_column_names.begin())->second->get_def_level(); + } + return _child_readers.begin()->second->get_def_level(); } Statistics statistics() override { Statistics st; for (const auto& reader : _child_readers) { - Statistics cst = reader->statistics(); - st.merge(cst); + // make sure the field is read + if (_read_column_names.find(reader.first) != _read_column_names.end()) { + Statistics cst = reader.second->statistics(); + st.merge(cst); + } } return st; } @@ -287,7 +301,8 @@ class StructColumnReader : public ParquetColumnReader { void close() override {} private: - std::vector> _child_readers; + std::unordered_map> _child_readers; + std::set _read_column_names; }; }; // namespace doris::vectorized