Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 9 additions & 25 deletions be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,25 +123,13 @@ Status ParquetColumnReader::create(io::FileReaderSPtr file, FieldSchema* field,
size_t max_buf_size) {
if (field->type.type == TYPE_ARRAY) {
std::unique_ptr<ParquetColumnReader> element_reader;
if (field->children[0].type.type == TYPE_MAP ||
field->children[0].type.type == TYPE_STRUCT) {
return Status::InternalError(
"Array does not support nested map/struct type in column {}", field->name);
}
RETURN_IF_ERROR(create(file, &field->children[0], row_group, row_ranges, ctz, io_ctx,
element_reader, max_buf_size));
element_reader->set_nested_column();
ArrayColumnReader* array_reader = new ArrayColumnReader(row_ranges, ctz, io_ctx);
auto array_reader = ArrayColumnReader::create_unique(row_ranges, ctz, io_ctx);
RETURN_IF_ERROR(array_reader->init(std::move(element_reader), field));
reader.reset(array_reader);
reader.reset(array_reader.release());
} else if (field->type.type == TYPE_MAP) {
auto key_type = field->children[0].children[0].type.type;
auto value_type = field->children[0].children[1].type.type;
if (key_type == TYPE_ARRAY || key_type == TYPE_MAP || key_type == TYPE_STRUCT ||
value_type == TYPE_ARRAY || value_type == TYPE_MAP || value_type == TYPE_STRUCT) {
return Status::InternalError("Map does not support nested complex type in column {}",
field->name);
}
std::unique_ptr<ParquetColumnReader> key_reader;
std::unique_ptr<ParquetColumnReader> value_reader;
RETURN_IF_ERROR(create(file, &field->children[0].children[0], row_group, row_ranges, ctz,
Expand All @@ -150,31 +138,27 @@ Status ParquetColumnReader::create(io::FileReaderSPtr file, FieldSchema* field,
io_ctx, value_reader, max_buf_size));
key_reader->set_nested_column();
value_reader->set_nested_column();
MapColumnReader* map_reader = new MapColumnReader(row_ranges, ctz, io_ctx);
auto map_reader = MapColumnReader::create_unique(row_ranges, ctz, io_ctx);
RETURN_IF_ERROR(map_reader->init(std::move(key_reader), std::move(value_reader), field));
reader.reset(map_reader);
reader.reset(map_reader.release());
} else if (field->type.type == TYPE_STRUCT) {
std::vector<std::unique_ptr<ParquetColumnReader>> child_readers;
child_readers.reserve(field->children.size());
for (int i = 0; i < field->children.size(); ++i) {
auto child_type = field->children[i].type.type;
if (child_type == TYPE_ARRAY || child_type == TYPE_MAP || child_type == TYPE_STRUCT) {
return Status::InternalError(
"Struct does not support nested complex type in column {}", field->name);
}
std::unique_ptr<ParquetColumnReader> child_reader;
RETURN_IF_ERROR(create(file, &field->children[i], row_group, row_ranges, ctz, io_ctx,
child_reader, max_buf_size));
child_reader->set_nested_column();
child_readers.emplace_back(std::move(child_reader));
}
StructColumnReader* struct_reader = new StructColumnReader(row_ranges, ctz, io_ctx);
auto struct_reader = StructColumnReader::create_unique(row_ranges, ctz, io_ctx);
RETURN_IF_ERROR(struct_reader->init(std::move(child_readers), field));
reader.reset(struct_reader);
reader.reset(struct_reader.release());
} else {
const tparquet::ColumnChunk& chunk = row_group.columns[field->physical_column_index];
ScalarColumnReader* scalar_reader = new ScalarColumnReader(row_ranges, chunk, ctz, io_ctx);
auto scalar_reader = ScalarColumnReader::create_unique(row_ranges, chunk, ctz, io_ctx);
RETURN_IF_ERROR(scalar_reader->init(file, field, max_buf_size));
reader.reset(scalar_reader);
reader.reset(scalar_reader.release());
}
return Status::OK();
}
Expand Down
4 changes: 4 additions & 0 deletions be/src/vec/exec/format/parquet/vparquet_column_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ class ParquetColumnReader {
};

class ScalarColumnReader : public ParquetColumnReader {
ENABLE_FACTORY_CREATOR(ScalarColumnReader)
public:
ScalarColumnReader(const std::vector<RowRange>& row_ranges,
const tparquet::ColumnChunk& chunk_meta, cctz::time_zone* ctz,
Expand Down Expand Up @@ -195,6 +196,7 @@ class ScalarColumnReader : public ParquetColumnReader {
};

class ArrayColumnReader : public ParquetColumnReader {
ENABLE_FACTORY_CREATOR(ArrayColumnReader)
public:
ArrayColumnReader(const std::vector<RowRange>& row_ranges, cctz::time_zone* ctz,
io::IOContext* io_ctx)
Expand All @@ -218,6 +220,7 @@ class ArrayColumnReader : public ParquetColumnReader {
};

class MapColumnReader : public ParquetColumnReader {
ENABLE_FACTORY_CREATOR(MapColumnReader)
public:
MapColumnReader(const std::vector<RowRange>& row_ranges, cctz::time_zone* ctz,
io::IOContext* io_ctx)
Expand Down Expand Up @@ -252,6 +255,7 @@ class MapColumnReader : public ParquetColumnReader {
};

class StructColumnReader : public ParquetColumnReader {
ENABLE_FACTORY_CREATOR(StructColumnReader)
public:
StructColumnReader(const std::vector<RowRange>& row_ranges, cctz::time_zone* ctz,
io::IOContext* io_ctx)
Expand Down