82 changes: 46 additions & 36 deletions be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
@@ -47,12 +47,14 @@ namespace doris::vectorized {

ColumnChunkReader::ColumnChunkReader(io::BufferedStreamReader* reader,
tparquet::ColumnChunk* column_chunk, FieldSchema* field_schema,
const tparquet::OffsetIndex* offset_index,
cctz::time_zone* ctz, io::IOContext* io_ctx)
: _field_schema(field_schema),
_max_rep_level(field_schema->repetition_level),
_max_def_level(field_schema->definition_level),
_stream_reader(reader),
_metadata(column_chunk->meta_data),
_offset_index(offset_index),
// _ctz(ctz),
_io_ctx(io_ctx) {}

@@ -61,7 +63,9 @@ Status ColumnChunkReader::init() {
? _metadata.dictionary_page_offset
: _metadata.data_page_offset;
size_t chunk_size = _metadata.total_compressed_size;
_page_reader = std::make_unique<PageReader>(_stream_reader, _io_ctx, start_offset, chunk_size);
// create page reader
_page_reader = create_page_reader(_stream_reader, _io_ctx, start_offset, chunk_size,
_metadata.num_values, _offset_index);
// get the block compression codec
RETURN_IF_ERROR(get_block_compression_codec(_metadata.codec, &_block_compress_codec));
if (_metadata.__isset.dictionary_page_offset) {
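The create_page_reader factory itself is not shown in this diff. A minimal sketch of the dispatch it presumably performs, with PageReaderWithOffsetIndex as an assumed subclass name (the real one lives in the page reader sources):

```cpp
// Hedged sketch, not the verbatim factory. Assumption: a PageReader subclass
// that walks pages via the offset index instead of parsing each Thrift
// page header.
std::unique_ptr<PageReader> create_page_reader(io::BufferedStreamReader* reader,
                                               io::IOContext* io_ctx, uint64_t offset,
                                               uint64_t length, int64_t num_values,
                                               const tparquet::OffsetIndex* offset_index) {
    if (offset_index == nullptr) {
        // No page index in the file: keep the old header-parsing behavior.
        return std::make_unique<PageReader>(reader, io_ctx, offset, length);
    }
    // Page offsets and compressed sizes are known up front, so headers can
    // mostly be skipped; num_values helps size the last page.
    return std::make_unique<PageReaderWithOffsetIndex>(reader, io_ctx, offset, length,
                                                       num_values, offset_index);
}
```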
@@ -88,24 +92,27 @@ Status ColumnChunkReader::next_page() {
if (UNLIKELY(_remaining_num_values != 0)) {
return Status::Corruption("Should skip current page");
}

RETURN_IF_ERROR(_page_reader->next_page_header());
if (_page_reader->get_page_header()->type == tparquet::PageType::DICTIONARY_PAGE) {
// the first page may be a dictionary page even if _metadata.__isset.dictionary_page_offset == false,
// so we should parse the dictionary page in next_page()
RETURN_IF_ERROR(_decode_dict_page());
// parse the real first data page
return next_page();
} else if (_page_reader->get_page_header()->type == tparquet::PageType::DATA_PAGE_V2) {
_remaining_num_values = _page_reader->get_page_header()->data_page_header_v2.num_values;
_chunk_parsed_values += _remaining_num_values;
_state = HEADER_PARSED;
return Status::OK();
} else {
_remaining_num_values = _page_reader->get_page_header()->data_page_header.num_values;
_chunk_parsed_values += _remaining_num_values;
_state = HEADER_PARSED;
return Status::OK();

if (!_dict_checked) {
_dict_checked = true;
const tparquet::PageHeader* header;
RETURN_IF_ERROR(_page_reader->get_page_header(header));
if (header->type == tparquet::PageType::DICTIONARY_PAGE) {
// the first page may be a dictionary page even if _metadata.__isset.dictionary_page_offset == false,
// so we should parse the dictionary page in next_page()
RETURN_IF_ERROR(_decode_dict_page());
// parse the real first data page
return next_page();
}
}

RETURN_IF_ERROR(_page_reader->get_num_values(_remaining_num_values));
_chunk_parsed_values += _remaining_num_values;
_state = HEADER_PARSED;

return Status::OK();
}
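next_page() now fetches the value count through get_num_values() instead of reading a parsed header, which is what lets the offset-index path avoid header deserialization entirely. A sketch of that path, under the same assumed subclass name as above:

```cpp
// Assumed implementation: a page's value count is the first_row_index delta
// between neighbouring entries of the offset index; the last page takes the
// remainder of the chunk total (passed in as _metadata.num_values). This
// equates rows with values, which only holds for flat columns; a nested
// column would still need the parsed header.
Status PageReaderWithOffsetIndex::get_num_values(uint32_t& num_values) {
    const auto& locations = _offset_index->page_locations;
    int64_t first = locations[_page_idx].first_row_index;
    int64_t next = _page_idx + 1 < locations.size()
                           ? locations[_page_idx + 1].first_row_index
                           : _total_num_values;
    num_values = static_cast<uint32_t>(next - first);
    return Status::OK();
}
```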

void ColumnChunkReader::_get_uncompressed_levels(const tparquet::DataPageHeaderV2& page_v2,
Expand All @@ -119,26 +126,28 @@ void ColumnChunkReader::_get_uncompressed_levels(const tparquet::DataPageHeaderV
}
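The body of _get_uncompressed_levels is collapsed in this view. A plausible reconstruction from its call sites, matching the `uncompressed_size = rl + dl + uncompressed_data_size` accounting below:

```cpp
// Reconstructed for illustration, not copied from the source: in a
// DATA_PAGE_V2 the repetition and definition levels sit uncompressed at the
// front of the page buffer, so they are sliced off before the values are
// decompressed.
void ColumnChunkReader::_get_uncompressed_levels(const tparquet::DataPageHeaderV2& page_v2,
                                                 Slice& page_data) {
    int32_t rl = page_v2.repetition_levels_byte_length;
    int32_t dl = page_v2.definition_levels_byte_length;
    _v2_rep_levels = Slice(page_data.data, rl);
    _v2_def_levels = Slice(page_data.data + rl, dl);
    page_data.data += rl + dl;
    page_data.size -= rl + dl;
}
```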

Status ColumnChunkReader::load_page_data() {
// TODO: remove checking HEADER_PARSED or change name
if (UNLIKELY(_state != HEADER_PARSED)) {
return Status::Corruption("Should parse page header");
}
const auto& header = *_page_reader->get_page_header();
int32_t uncompressed_size = header.uncompressed_page_size;
const tparquet::PageHeader* header;
RETURN_IF_ERROR(_page_reader->get_page_header(header));
int32_t uncompressed_size = header->uncompressed_page_size;

if (_block_compress_codec != nullptr) {
Slice compressed_data;
RETURN_IF_ERROR(_page_reader->get_page_data(compressed_data));
if (header.__isset.data_page_header_v2) {
const tparquet::DataPageHeaderV2& header_v2 = header.data_page_header_v2;
if (header->__isset.data_page_header_v2) {
const tparquet::DataPageHeaderV2& header_v2 = header->data_page_header_v2;
// uncompressed_size = rl + dl + uncompressed_data_size
// compressed_size = rl + dl + compressed_data_size
uncompressed_size -= header_v2.repetition_levels_byte_length +
header_v2.definition_levels_byte_length;
_get_uncompressed_levels(header_v2, compressed_data);
}
bool is_v2_compressed =
header.__isset.data_page_header_v2 && header.data_page_header_v2.is_compressed;
if (header.__isset.data_page_header || is_v2_compressed) {
header->__isset.data_page_header_v2 && header->data_page_header_v2.is_compressed;
if (header->__isset.data_page_header || is_v2_compressed) {
// check decompressed buffer size
_reserve_decompress_buf(uncompressed_size);
_page_data = Slice(_decompress_buf.get(), uncompressed_size);
Expand All @@ -151,36 +160,36 @@ Status ColumnChunkReader::load_page_data() {
}
} else {
RETURN_IF_ERROR(_page_reader->get_page_data(_page_data));
if (header.__isset.data_page_header_v2) {
_get_uncompressed_levels(header.data_page_header_v2, _page_data);
if (header->__isset.data_page_header_v2) {
_get_uncompressed_levels(header->data_page_header_v2, _page_data);
}
}

// Initialize repetition level and definition level. Skip when level = 0, which means required field.
if (_max_rep_level > 0) {
SCOPED_RAW_TIMER(&_statistics.decode_level_time);
if (header.__isset.data_page_header_v2) {
if (header->__isset.data_page_header_v2) {
RETURN_IF_ERROR(_rep_level_decoder.init_v2(_v2_rep_levels, _max_rep_level,
_remaining_num_values));
} else {
RETURN_IF_ERROR(_rep_level_decoder.init(
&_page_data, header.data_page_header.repetition_level_encoding, _max_rep_level,
&_page_data, header->data_page_header.repetition_level_encoding, _max_rep_level,
_remaining_num_values));
}
}
if (_max_def_level > 0) {
SCOPED_RAW_TIMER(&_statistics.decode_level_time);
if (header.__isset.data_page_header_v2) {
if (header->__isset.data_page_header_v2) {
RETURN_IF_ERROR(_def_level_decoder.init_v2(_v2_def_levels, _max_def_level,
_remaining_num_values));
} else {
RETURN_IF_ERROR(_def_level_decoder.init(
&_page_data, header.data_page_header.definition_level_encoding, _max_def_level,
&_page_data, header->data_page_header.definition_level_encoding, _max_def_level,
_remaining_num_values));
}
}
auto encoding = header.__isset.data_page_header_v2 ? header.data_page_header_v2.encoding
: header.data_page_header.encoding;
auto encoding = header->__isset.data_page_header_v2 ? header->data_page_header_v2.encoding
: header->data_page_header.encoding;
// change the deprecated encoding to RLE_DICTIONARY
if (encoding == tparquet::Encoding::PLAIN_DICTIONARY) {
encoding = tparquet::Encoding::RLE_DICTIONARY;
Expand All @@ -207,22 +216,23 @@ Status ColumnChunkReader::load_page_data() {
}

Status ColumnChunkReader::_decode_dict_page() {
const tparquet::PageHeader& header = *_page_reader->get_page_header();
DCHECK_EQ(tparquet::PageType::DICTIONARY_PAGE, header.type);
const tparquet::PageHeader* header;
RETURN_IF_ERROR(_page_reader->get_page_header(header));
DCHECK_EQ(tparquet::PageType::DICTIONARY_PAGE, header->type);
SCOPED_RAW_TIMER(&_statistics.decode_dict_time);

// Using the PLAIN_DICTIONARY enum value is deprecated in the Parquet 2.0 specification.
// Prefer using RLE_DICTIONARY in a data page and PLAIN in a dictionary page for Parquet 2.0+ files.
// refer: https://github.com/apache/parquet-format/blob/master/Encodings.md
tparquet::Encoding::type dict_encoding = header.dictionary_page_header.encoding;
tparquet::Encoding::type dict_encoding = header->dictionary_page_header.encoding;
if (dict_encoding != tparquet::Encoding::PLAIN_DICTIONARY &&
dict_encoding != tparquet::Encoding::PLAIN) {
return Status::InternalError("Unsupported dictionary encoding {}",
tparquet::to_string(dict_encoding));
}

// Prepare dictionary data
int32_t uncompressed_size = header.uncompressed_page_size;
int32_t uncompressed_size = header->uncompressed_page_size;
std::unique_ptr<uint8_t[]> dict_data(new uint8_t[uncompressed_size]);
if (_block_compress_codec != nullptr) {
Slice compressed_data;
Expand All @@ -246,7 +256,7 @@ Status ColumnChunkReader::_decode_dict_page() {
// page_decoder->init(_field_schema, _ctz);
// Set the dictionary data
RETURN_IF_ERROR(page_decoder->set_dict(dict_data, uncompressed_size,
header.dictionary_page_header.num_values));
header->dictionary_page_header.num_values));
_decoders[static_cast<int>(tparquet::Encoding::RLE_DICTIONARY)] = std::move(page_decoder);

_has_dict = true;
11 changes: 9 additions & 2 deletions be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
@@ -71,7 +71,7 @@ using ColumnString = ColumnStr<UInt32>;
* // Or, we can call the chunk_reader.skip_page() to skip current page.
* chunk_reader.load_page_data();
* // Decode values into column or slice.
* // Or, we can call chunk_reader.slip_values(num_values) to skip some values.
* // Or, we can call chunk_reader.skip_values(num_values) to skip some values.
* chunk_reader.decode_values(slice, num_values);
* }
*/
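Spelled out as a driver, the documented sequence looks roughly like this (sketch only: has_next_page(), the filter predicate, and the slice/num_values variables are assumed, not taken from this header):

```cpp
// Minimal consumption loop following the usage comment above.
while (chunk_reader.has_next_page()) {
    RETURN_IF_ERROR(chunk_reader.next_page());       // parse or skip the page header
    if (page_filtered_out) {                         // e.g. pruned by row ranges
        RETURN_IF_ERROR(chunk_reader.skip_page());   // page data never decompressed
        continue;
    }
    RETURN_IF_ERROR(chunk_reader.load_page_data());  // decompress, init level decoders
    RETURN_IF_ERROR(chunk_reader.decode_values(slice, num_values));
}
```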
Expand All @@ -84,10 +84,13 @@ class ColumnChunkReader {
int64_t decode_value_time = 0;
int64_t decode_dict_time = 0;
int64_t decode_level_time = 0;
int64_t skip_page_header_num = 0;
int64_t parse_page_header_num = 0;
};

ColumnChunkReader(io::BufferedStreamReader* reader, tparquet::ColumnChunk* column_chunk,
FieldSchema* field_schema, cctz::time_zone* ctz, io::IOContext* io_ctx);
FieldSchema* field_schema, const tparquet::OffsetIndex* offset_index,
cctz::time_zone* ctz, io::IOContext* io_ctx);
~ColumnChunkReader() = default;

// Initialize chunk reader, will generate the decoder and codec.
Expand Down Expand Up @@ -170,6 +173,8 @@ class ColumnChunkReader {

Statistics& statistics() {
_statistics.decode_header_time = _page_reader->statistics().decode_header_time;
_statistics.skip_page_header_num = _page_reader->statistics().skip_page_header_num;
_statistics.parse_page_header_num = _page_reader->statistics().parse_page_header_num;
return _statistics;
}
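The two counters forwarded here make the page-index win visible in the profile. Where they are presumably incremented (illustrative only; both helpers below are invented names):

```cpp
// Assumed accounting: the offset index only describes data pages, so a
// leading dictionary page header still has to be deserialized the old way,
// while data page headers can be skipped using the indexed offset and size.
Status PageReaderWithOffsetIndex::next_page_header() {
    if (_in_dictionary_page()) {                 // invented predicate
        _statistics.parse_page_header_num++;
        return _parse_thrift_page_header();      // invented fallback helper
    }
    _statistics.skip_page_header_num++;          // data page: no Thrift parse
    return Status::OK();
}
```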

Expand Down Expand Up @@ -204,6 +209,7 @@ class ColumnChunkReader {

io::BufferedStreamReader* _stream_reader = nullptr;
tparquet::ColumnMetaData _metadata;
const tparquet::OffsetIndex* _offset_index;
// cctz::time_zone* _ctz;
io::IOContext* _io_ctx = nullptr;

Expand All @@ -219,6 +225,7 @@ class ColumnChunkReader {
size_t _decompress_buf_size = 0;
Slice _v2_rep_levels;
Slice _v2_def_levels;
bool _dict_checked = false;
bool _has_dict = false;
Decoder* _page_decoder = nullptr;
// Map: encoding -> Decoder
7 changes: 4 additions & 3 deletions be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
@@ -108,7 +108,7 @@ Status ParquetColumnReader::create(io::FileReaderSPtr file, FieldSchema* field,
const std::vector<RowRange>& row_ranges, cctz::time_zone* ctz,
io::IOContext* io_ctx,
std::unique_ptr<ParquetColumnReader>& reader,
size_t max_buf_size) {
size_t max_buf_size, const tparquet::OffsetIndex* offset_index) {
if (field->type.type == TYPE_ARRAY) {
std::unique_ptr<ParquetColumnReader> element_reader;
RETURN_IF_ERROR(create(file, &field->children[0], row_group, row_ranges, ctz, io_ctx,
Expand Down Expand Up @@ -144,7 +144,8 @@ Status ParquetColumnReader::create(io::FileReaderSPtr file, FieldSchema* field,
reader.reset(struct_reader.release());
} else {
const tparquet::ColumnChunk& chunk = row_group.columns[field->physical_column_index];
auto scalar_reader = ScalarColumnReader::create_unique(row_ranges, chunk, ctz, io_ctx);
auto scalar_reader =
ScalarColumnReader::create_unique(row_ranges, chunk, offset_index, ctz, io_ctx);
RETURN_IF_ERROR(scalar_reader->init(file, field, max_buf_size));
reader.reset(scalar_reader.release());
}
Expand Down Expand Up @@ -190,7 +191,7 @@ Status ScalarColumnReader::init(io::FileReaderSPtr file, FieldSchema* field, siz
_stream_reader = std::make_unique<io::BufferedFileStreamReader>(file, chunk_start, chunk_len,
prefetch_buffer_size);
_chunk_reader = std::make_unique<ColumnChunkReader>(_stream_reader.get(), &_chunk_meta, field,
_ctz, _io_ctx);
_offset_index, _ctz, _io_ctx);
RETURN_IF_ERROR(_chunk_reader->init());
return Status::OK();
}
22 changes: 17 additions & 5 deletions be/src/vec/exec/format/parquet/vparquet_column_reader.h
@@ -65,7 +65,9 @@ class ParquetColumnReader {
decode_value_time(0),
decode_dict_time(0),
decode_level_time(0),
decode_null_map_time(0) {}
decode_null_map_time(0),
skip_page_header_num(0),
parse_page_header_num(0) {}

Statistics(io::BufferedStreamReader::Statistics& fs, ColumnChunkReader::Statistics& cs,
int64_t null_map_time)
Expand All @@ -79,7 +81,9 @@ class ParquetColumnReader {
decode_value_time(cs.decode_value_time),
decode_dict_time(cs.decode_dict_time),
decode_level_time(cs.decode_level_time),
decode_null_map_time(null_map_time) {}
decode_null_map_time(null_map_time),
skip_page_header_num(cs.skip_page_header_num),
parse_page_header_num(cs.parse_page_header_num) {}

int64_t read_time;
int64_t read_calls;
Expand All @@ -92,6 +96,8 @@ class ParquetColumnReader {
int64_t decode_dict_time;
int64_t decode_level_time;
int64_t decode_null_map_time;
int64_t skip_page_header_num;
int64_t parse_page_header_num;

void merge(Statistics& statistics) {
read_time += statistics.read_time;
Expand All @@ -105,6 +111,8 @@ class ParquetColumnReader {
decode_dict_time += statistics.decode_dict_time;
decode_level_time += statistics.decode_level_time;
decode_null_map_time += statistics.decode_null_map_time;
skip_page_header_num += statistics.skip_page_header_num;
parse_page_header_num += statistics.parse_page_header_num;
}
};

Expand Down Expand Up @@ -134,7 +142,7 @@ class ParquetColumnReader {
const tparquet::RowGroup& row_group,
const std::vector<RowRange>& row_ranges, cctz::time_zone* ctz,
io::IOContext* io_ctx, std::unique_ptr<ParquetColumnReader>& reader,
size_t max_buf_size);
size_t max_buf_size, const tparquet::OffsetIndex* offset_index = nullptr);
void set_nested_column() { _nested_column = true; }
virtual const std::vector<level_t>& get_rep_level() const = 0;
virtual const std::vector<level_t>& get_def_level() const = 0;
Expand All @@ -160,9 +168,12 @@ class ScalarColumnReader : public ParquetColumnReader {
ENABLE_FACTORY_CREATOR(ScalarColumnReader)
public:
ScalarColumnReader(const std::vector<RowRange>& row_ranges,
const tparquet::ColumnChunk& chunk_meta, cctz::time_zone* ctz,
const tparquet::ColumnChunk& chunk_meta,
const tparquet::OffsetIndex* offset_index, cctz::time_zone* ctz,
io::IOContext* io_ctx)
: ParquetColumnReader(row_ranges, ctz, io_ctx), _chunk_meta(chunk_meta) {}
: ParquetColumnReader(row_ranges, ctz, io_ctx),
_chunk_meta(chunk_meta),
_offset_index(offset_index) {}
~ScalarColumnReader() override { close(); }
Status init(io::FileReaderSPtr file, FieldSchema* field, size_t max_buf_size);
Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type,
Expand All @@ -182,6 +193,7 @@ class ScalarColumnReader : public ParquetColumnReader {

private:
tparquet::ColumnChunk _chunk_meta;
const tparquet::OffsetIndex* _offset_index;
std::unique_ptr<io::BufferedFileStreamReader> _stream_reader;
std::unique_ptr<ColumnChunkReader> _chunk_reader;
std::vector<level_t> _rep_levels;
12 changes: 8 additions & 4 deletions be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
@@ -39,7 +39,6 @@
#include "runtime/thread_context.h"
#include "runtime/types.h"
#include "schema_desc.h"
#include "util/simd/bits.h"
#include "vec/columns/column_const.h"
#include "vec/columns/column_nullable.h"
#include "vec/columns/column_string.h"
Expand Down Expand Up @@ -124,12 +123,17 @@ Status RowGroupReader::init(
const size_t MAX_GROUP_BUF_SIZE = config::parquet_rowgroup_max_buffer_mb << 20;
const size_t MAX_COLUMN_BUF_SIZE = config::parquet_column_max_buffer_mb << 20;
size_t max_buf_size = std::min(MAX_COLUMN_BUF_SIZE, MAX_GROUP_BUF_SIZE / _read_columns.size());
for (auto& read_col : _read_columns) {
auto field = const_cast<FieldSchema*>(schema.get_column(read_col));
for (const auto& read_col : _read_columns) {
auto* field = const_cast<FieldSchema*>(schema.get_column(read_col));
auto physical_index = field->physical_column_index;
std::unique_ptr<ParquetColumnReader> reader;
// TODO: support nested column types
const tparquet::OffsetIndex* offset_index =
col_offsets.find(physical_index) != col_offsets.end() ? &col_offsets[physical_index]
: nullptr;
RETURN_IF_ERROR(ParquetColumnReader::create(_file_reader, field, _row_group_meta,
_read_ranges, _ctz, _io_ctx, reader,
max_buf_size));
max_buf_size, offset_index));
if (reader == nullptr) {
VLOG_DEBUG << "Init row group(" << _row_group_id << ") reader failed";
return Status::Corruption("Init row group reader failed");
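How col_offsets is populated is outside this diff. One hedged sketch, assuming the offset indexes are deserialized from the file's page-index region before RowGroupReader::init runs (read_offset_index is a stand-in for whatever helper the codebase actually uses; the thrift fields are standard parquet.thrift):

```cpp
// Illustrative only: build the physical-column-index -> OffsetIndex map that
// RowGroupReader::init consumes.
std::unordered_map<int, tparquet::OffsetIndex> col_offsets;
for (int i = 0; i < (int)row_group.columns.size(); ++i) {
    const tparquet::ColumnChunk& chunk = row_group.columns[i];
    // The page index is optional; files from older writers simply lack it.
    if (chunk.__isset.offset_index_offset && chunk.__isset.offset_index_length) {
        tparquet::OffsetIndex oi;
        RETURN_IF_ERROR(read_offset_index(file_reader, chunk.offset_index_offset,
                                          chunk.offset_index_length, &oi));
        col_offsets.emplace(i, std::move(oi));
    }
}
```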
1 change: 0 additions & 1 deletion be/src/vec/exec/format/parquet/vparquet_group_reader.h
@@ -29,7 +29,6 @@

#include "io/fs/file_reader_writer_fwd.h"
#include "vec/columns/column.h"
#include "vec/common/allocator.h"
#include "vec/exec/format/parquet/parquet_common.h"
#include "vec/exprs/vexpr_fwd.h"
#include "vparquet_column_reader.h"