From 6cc74e94796fa805c08e217ab2921e56c428b031 Mon Sep 17 00:00:00 2001 From: daidai Date: Fri, 16 Jan 2026 14:49:13 +0800 Subject: [PATCH] [fix](parqeut)Fixed a performance fallback caused by filling RL DL when reading Parquet scalar columns. (#59833) Related PR: #58785 Problem Summary: The performance rollback was introduced in #58785. The reason is that, prior to #58785, reading ordinary columns did not require filling and saving the RL DL. #58785 combined the logic for reading ordinary columns from a struct with the logic for reading ordinary columns, and filled and saved the RL DL to populate the null map information of the struct. This PR re-separates the reading logic. --- .../parquet/vparquet_column_chunk_reader.cpp | 14 +++++----- .../parquet/vparquet_column_chunk_reader.h | 13 +++++++++ .../format/parquet/vparquet_column_reader.cpp | 27 +++++-------------- .../format/parquet/vparquet_column_reader.h | 5 ++++ 4 files changed, 31 insertions(+), 28 deletions(-) diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp index 621d8a2c505021..e7f5f94d32ffae 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp @@ -362,7 +362,7 @@ Status ColumnChunkReader::seek_to_nested_row(size_t } else { while (true) { RETURN_IF_ERROR(parse_page_header()); - if (_page_reader->is_header_v2()) { + if (_page_reader->is_header_v2() || !IN_COLLECTION) { if (_page_reader->start_row() <= left_row && left_row < _page_reader->end_row()) { RETURN_IF_ERROR(load_page_data()); // this page contain this row. @@ -448,11 +448,11 @@ Status ColumnChunkReader::load_page_nested_rows( *result_rows = 0; rep_levels.reserve(rep_levels.size() + _remaining_rep_nums); while (_remaining_rep_nums) { - level_t rep_level = _rep_level_decoder.get_next(); + level_t rep_level = _rep_level_get_next(); if (rep_level == 0) { // rep_level 0 indicates start of new row if (*result_rows == max_rows) { // this page contain max_rows, page no end. _current_row += max_rows; - _rep_level_decoder.rewind_one(); + _rep_level_rewind_one(); return Status::OK(); } (*result_rows)++; @@ -463,8 +463,8 @@ Status ColumnChunkReader::load_page_nested_rows( _current_row += *result_rows; auto need_check_cross_page = [&]() -> bool { - return !OFFSET_INDEX && _remaining_rep_nums == 0 && !_page_reader->is_header_v2() && - has_next_page(); + return !OFFSET_INDEX && IN_COLLECTION && _remaining_rep_nums == 0 && + !_page_reader->is_header_v2() && has_next_page(); }; *cross_page = need_check_cross_page(); return Status::OK(); @@ -479,10 +479,10 @@ Status ColumnChunkReader::load_cross_page_nested_ro *cross_page = has_next_page(); while (_remaining_rep_nums) { - level_t rep_level = _rep_level_decoder.get_next(); + level_t rep_level = _rep_level_get_next(); if (rep_level == 0) { // rep_level 0 indicates start of new row *cross_page = false; - _rep_level_decoder.rewind_one(); + _rep_level_rewind_one(); break; } _remaining_rep_nums--; diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h index 1270e5e37fcd1e..9e77a3139f60fb 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h @@ -206,6 +206,19 @@ class ColumnChunkReader { void _get_uncompressed_levels(const tparquet::DataPageHeaderV2& page_v2, Slice& page_data); Status _skip_nested_rows_in_page(size_t num_rows); + level_t _rep_level_get_next() { + if constexpr (IN_COLLECTION) { + return _rep_level_decoder.get_next(); + } + return 0; + } + + void _rep_level_rewind_one() { + if constexpr (IN_COLLECTION) { + _rep_level_decoder.rewind_one(); + } + } + ColumnChunkReaderState _state = NOT_INIT; FieldSchema* _field_schema = nullptr; const level_t _max_rep_level; diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp index 00bdc51844e1e6..0364fe496b035a 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp @@ -119,6 +119,7 @@ Status ParquetColumnReader::create(io::FileReaderSPtr file, FieldSchema* field, element_reader, max_buf_size, col_offsets, true, column_ids, filter_column_ids)); auto array_reader = ArrayColumnReader::create_unique(row_ranges, total_rows, ctz, io_ctx); + element_reader->set_column_in_nested(); RETURN_IF_ERROR(array_reader->init(std::move(element_reader), field)); array_reader->_filter_column_ids = filter_column_ids; reader.reset(array_reader.release()); @@ -150,6 +151,8 @@ Status ParquetColumnReader::create(io::FileReaderSPtr file, FieldSchema* field, } auto map_reader = MapColumnReader::create_unique(row_ranges, total_rows, ctz, io_ctx); + key_reader->set_column_in_nested(); + value_reader->set_column_in_nested(); RETURN_IF_ERROR(map_reader->init(std::move(key_reader), std::move(value_reader), field)); map_reader->_filter_column_ids = filter_column_ids; reader.reset(map_reader.release()); @@ -175,6 +178,7 @@ Status ParquetColumnReader::create(io::FileReaderSPtr file, FieldSchema* field, skip_reader->_filter_column_ids = filter_column_ids; child_readers[child.name] = std::move(skip_reader); } + child_readers[child.name]->set_column_in_nested(); } // If all children are SkipReadingReader, force the first child to call create if (non_skip_reader_idx == -1) { @@ -182,6 +186,7 @@ Status ParquetColumnReader::create(io::FileReaderSPtr file, FieldSchema* field, RETURN_IF_ERROR(create(file, &field->children[0], row_group, row_ranges, ctz, io_ctx, child_reader, max_buf_size, col_offsets, in_collection, column_ids, filter_column_ids)); + child_reader->set_column_in_nested(); child_readers[field->children[0].name] = std::move(child_reader); } auto struct_reader = StructColumnReader::create_unique(row_ranges, total_rows, ctz, io_ctx); @@ -195,7 +200,6 @@ Status ParquetColumnReader::create(io::FileReaderSPtr file, FieldSchema* field, : nullptr; const tparquet::ColumnChunk& chunk = row_group.columns[physical_index]; - if (in_collection) { if (offset_index == nullptr) { auto scalar_reader = ScalarColumnReader::create_unique( @@ -345,9 +349,6 @@ Status ScalarColumnReader::_read_values(size_t num_ return Status::InternalError("Failed to decode definition level."); } - for (int i = 0; i < loop_read; i++) { - _def_levels.emplace_back(def_level); - } bool is_null = def_level < _field_schema->definition_level; if (!(prev_is_null ^ is_null)) { null_map.emplace_back(0); @@ -362,14 +363,11 @@ Status ScalarColumnReader::_read_values(size_t num_ prev_is_null = is_null; has_read += loop_read; } - } else { - _def_levels.resize(_def_levels.size() + num_values, 0); } } else { if (_chunk_reader->max_def_level() > 0) { return Status::Corruption("Not nullable column has null values in parquet file"); } - _def_levels.resize(_def_levels.size() + num_values, 0); data_column = doris_column->assume_mutable(); } if (null_map.size() == 0) { @@ -548,7 +546,7 @@ Status ScalarColumnReader::read_column_data( _rep_levels.clear(); *read_rows = 0; - if constexpr (IN_COLLECTION) { + if (_in_nested) { RETURN_IF_ERROR(_read_nested_column(resolved_column, resolved_type, filter_map, batch_size, read_rows, eof, is_dict_filter)); return _converter->convert(resolved_column, _field_schema->data_type, type, doris_column, @@ -562,7 +560,6 @@ Status ScalarColumnReader::read_column_data( } else { right_row = _chunk_reader->page_end_row(); } - auto before_filter_map_index = _filter_map_index; do { // generate the row ranges that should be read @@ -629,18 +626,6 @@ Status ScalarColumnReader::read_column_data( } } - if (filter_map.has_filter()) { - size_t new_rep_sz = 0; - for (size_t idx = before_filter_map_index; idx < _filter_map_index; idx++) { - if (filter_map.filter_map_data()[idx]) { - _def_levels[new_rep_sz] = _def_levels[idx - before_filter_map_index]; - new_rep_sz++; - } - } - _def_levels.resize(new_rep_sz); - } - _rep_levels.resize(_def_levels.size(), 0); - return _converter->convert(resolved_column, _field_schema->data_type, type, doris_column, is_dict_filter); } diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_reader.h index 62ae4eb5fbeb96..2a21ddd84cb0b5 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.h @@ -142,6 +142,7 @@ class ParquetColumnReader { virtual void reset_filter_map_index() = 0; FieldSchema* get_field_schema() const { return _field_schema; } + void set_column_in_nested() { _in_nested = true; } protected: void _generate_read_ranges(RowRange page_row_range, RowRanges* result_ranges) const; @@ -156,6 +157,10 @@ class ParquetColumnReader { size_t _filter_map_index = 0; std::set _filter_column_ids; + + // _in_nested: column in struct/map/array + // IN_COLLECTION : column in map/array + bool _in_nested = false; }; template