Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions be/src/vec/data_types/data_type_struct.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ class DataTypeStruct final : public IDataType {

// Accessors for the struct's field types and names. All indexed accessors are
// unchecked: the caller guarantees i < elems.size() / names.size().
const DataTypePtr& get_element(size_t i) const { return elems[i]; }
const DataTypes& get_elements() const { return elems; }
// Name of field `i` (parallel to get_element(i)).
const String& get_element_name(size_t i) const { return names[i]; }
const Strings& get_element_names() const { return names; }

size_t get_position_by_name(const String& name) const;
Expand Down
59 changes: 45 additions & 14 deletions be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,14 +142,14 @@ Status ParquetColumnReader::create(io::FileReaderSPtr file, FieldSchema* field,
RETURN_IF_ERROR(map_reader->init(std::move(key_reader), std::move(value_reader), field));
reader.reset(map_reader.release());
} else if (field->type.type == TYPE_STRUCT) {
std::vector<std::unique_ptr<ParquetColumnReader>> child_readers;
std::unordered_map<std::string, std::unique_ptr<ParquetColumnReader>> child_readers;
child_readers.reserve(field->children.size());
for (int i = 0; i < field->children.size(); ++i) {
std::unique_ptr<ParquetColumnReader> child_reader;
RETURN_IF_ERROR(create(file, &field->children[i], row_group, row_ranges, ctz, io_ctx,
child_reader, max_buf_size));
child_reader->set_nested_column();
child_readers.emplace_back(std::move(child_reader));
child_readers[field->children[i].name] = std::move(child_reader);
}
auto struct_reader = StructColumnReader::create_unique(row_ranges, ctz, io_ctx);
RETURN_IF_ERROR(struct_reader->init(std::move(child_readers), field));
Expand Down Expand Up @@ -701,8 +701,9 @@ Status MapColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr& t
return Status::OK();
}

Status StructColumnReader::init(std::vector<std::unique_ptr<ParquetColumnReader>>&& child_readers,
FieldSchema* field) {
Status StructColumnReader::init(
std::unordered_map<std::string, std::unique_ptr<ParquetColumnReader>>&& child_readers,
FieldSchema* field) {
_field_schema = field;
_child_readers = std::move(child_readers);
return Status::OK();
Expand All @@ -728,19 +729,33 @@ Status StructColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr
}

auto& doris_struct = static_cast<ColumnStruct&>(*data_column);
if (_child_readers.size() != doris_struct.tuple_size()) {
return Status::InternalError("Wrong number of struct fields");
}
const DataTypeStruct* doris_struct_type =
reinterpret_cast<const DataTypeStruct*>(remove_nullable(type).get());
for (int i = 0; i < doris_struct.tuple_size(); ++i) {

bool least_one_reader = false;
std::vector<size_t> missing_column_idxs {};

_read_column_names.clear();

for (size_t i = 0; i < doris_struct.tuple_size(); ++i) {
ColumnPtr& doris_field = doris_struct.get_column_ptr(i);
DataTypePtr& doris_type = const_cast<DataTypePtr&>(doris_struct_type->get_element(i));
auto& doris_type = const_cast<DataTypePtr&>(doris_struct_type->get_element(i));
auto& doris_name = const_cast<String&>(doris_struct_type->get_element_name(i));

// remember the missing column index
if (_child_readers.find(doris_name) == _child_readers.end()) {
missing_column_idxs.push_back(i);
continue;
}

_read_column_names.insert(doris_name);

select_vector.reset();
size_t field_rows = 0;
bool field_eof = false;
if (i == 0) {
RETURN_IF_ERROR(_child_readers[i]->read_column_data(
if (!least_one_reader) {
least_one_reader = true;
RETURN_IF_ERROR(_child_readers[doris_name]->read_column_data(
doris_field, doris_type, select_vector, batch_size, &field_rows, &field_eof,
is_dict_filter));
*read_rows = field_rows;
Expand All @@ -749,7 +764,7 @@ Status StructColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr
while (field_rows < *read_rows && !field_eof) {
size_t loop_rows = 0;
select_vector.reset();
RETURN_IF_ERROR(_child_readers[i]->read_column_data(
RETURN_IF_ERROR(_child_readers[doris_name]->read_column_data(
doris_field, doris_type, select_vector, *read_rows - field_rows, &loop_rows,
&field_eof, is_dict_filter));
field_rows += loop_rows;
Expand All @@ -759,9 +774,25 @@ Status StructColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr
}
}

if (!least_one_reader) {
// TODO: support read struct which columns are all missing
return Status::Corruption("Not support read struct '{}' which columns are all missing",
_field_schema->name);
}

// fill missing column with null or default value
for (auto idx : missing_column_idxs) {
auto& doris_field = doris_struct.get_column_ptr(idx);
auto& doris_type = const_cast<DataTypePtr&>(doris_struct_type->get_element(idx));
DCHECK(doris_type->is_nullable());
auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
(*std::move(doris_field)).mutate().get());
nullable_column->insert_null_elements(*read_rows);
}

if (null_map_ptr != nullptr) {
fill_struct_null_map(_field_schema, *null_map_ptr, _child_readers[0]->get_rep_level(),
_child_readers[0]->get_def_level());
fill_struct_null_map(_field_schema, *null_map_ptr, this->get_rep_level(),
this->get_def_level());
}

return Status::OK();
Expand Down
29 changes: 22 additions & 7 deletions be/src/vec/exec/format/parquet/vparquet_column_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <list>
#include <memory>
#include <ostream>
#include <unordered_map>
#include <vector>

#include "io/fs/buffered_reader.h"
Expand Down Expand Up @@ -262,32 +263,46 @@ class StructColumnReader : public ParquetColumnReader {
: ParquetColumnReader(row_ranges, ctz, io_ctx) {}
~StructColumnReader() override { close(); }

Status init(std::vector<std::unique_ptr<ParquetColumnReader>>&& child_readers,
FieldSchema* field);
Status init(
std::unordered_map<std::string, std::unique_ptr<ParquetColumnReader>>&& child_readers,
FieldSchema* field);
Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type,
ColumnSelectVector& select_vector, size_t batch_size, size_t* read_rows,
bool* eof, bool is_dict_filter) override;

/// Returns the repetition levels of one child reader.
/// Prefers a child that was actually read by the last read_column_data() call
/// (tracked in _read_column_names); falls back to an arbitrary child when
/// nothing has been read yet.
const std::vector<level_t>& get_rep_level() const override {
    if (!_read_column_names.empty()) {
        // at() is const-qualified (unlike operator[]) and throws on a logic
        // error instead of dereferencing end(); the key is known to exist
        // because _read_column_names only holds names present in _child_readers.
        return _child_readers.at(*_read_column_names.begin())->get_rep_level();
    }
    return _child_readers.begin()->second->get_rep_level();
}

/// Returns the definition levels of one child reader.
/// Mirrors get_rep_level(): prefer a child that was actually read
/// (in _read_column_names), otherwise fall back to an arbitrary child.
const std::vector<level_t>& get_def_level() const override {
    if (!_read_column_names.empty()) {
        // at() keeps const-correctness; the key is guaranteed present since
        // _read_column_names is populated only from _child_readers keys.
        return _child_readers.at(*_read_column_names.begin())->get_def_level();
    }
    return _child_readers.begin()->second->get_def_level();
}

/// Aggregates read statistics over the struct's children.
/// Only children that were actually read in the last read_column_data() call
/// (i.e. present in _read_column_names) contribute; missing/skipped fields
/// have no parquet data and are excluded from the merge.
Statistics statistics() override {
    Statistics st;
    for (const auto& [name, reader] : _child_readers) {
        // Make sure the field was read before merging its stats.
        if (_read_column_names.find(name) != _read_column_names.end()) {
            Statistics cst = reader->statistics();
            st.merge(cst);
        }
    }
    return st;
}

void close() override {}

private:
    // Child readers keyed by parquet field name (lookup by the doris struct
    // element name during read_column_data).
    std::unordered_map<std::string, std::unique_ptr<ParquetColumnReader>> _child_readers;
    // Names of the struct fields actually read by the last read_column_data()
    // call; used by get_rep_level/get_def_level/statistics to pick live children.
    std::set<std::string> _read_column_names;
};

}; // namespace doris::vectorized