From 852cc9d0ccb34cae0e13ffea1ef4354e25d940c2 Mon Sep 17 00:00:00 2001 From: Ashin Gau Date: Tue, 14 Nov 2023 08:30:42 +0800 Subject: [PATCH] [fix](parquet) compressed_page_size has the same meaning in page v1 and v2 (#26783) 1. Parquet files with page v2 fail to parse when a codec other than snappy is used, because `compressed_page_size` has the same meaning in page v1 and v2: it always contains the bytes of the definition levels, the repetition levels and the compressed data. 2. Add regression tests for decimal types stored as `fix_length_byte_array`, and for dictionary-encoded date/datetime types. --- .../parquet/vparquet_column_chunk_reader.cpp | 8 ++++---- .../format/parquet/vparquet_page_reader.cpp | 17 ++--------------- .../data/external_table_p2/tvf/test_tvf_p2.out | 8 ++++++++ .../external_table_p2/tvf/test_tvf_p2.groovy | 7 +++++++ 4 files changed, 21 insertions(+), 19 deletions(-) diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp index 1b8d428140ec47..aca5ae96e81513 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp @@ -122,14 +122,15 @@ Status ColumnChunkReader::load_page_data() { return Status::Corruption("Should parse page header"); } const auto& header = *_page_reader->get_page_header(); - // int32_t compressed_size = header.compressed_page_size; int32_t uncompressed_size = header.uncompressed_page_size; if (_block_compress_codec != nullptr) { Slice compressed_data; RETURN_IF_ERROR(_page_reader->get_page_data(compressed_data)); if (header.__isset.data_page_header_v2) { - tparquet::DataPageHeaderV2 header_v2 = header.data_page_header_v2; + const tparquet::DataPageHeaderV2& header_v2 = header.data_page_header_v2; + // uncompressed_size = rl + dl + uncompressed_data_size + // compressed_size = rl + dl + compressed_data_size uncompressed_size -= header_v2.repetition_levels_byte_length +
header_v2.definition_levels_byte_length; _get_uncompressed_levels(header_v2, compressed_data); @@ -150,8 +151,7 @@ Status ColumnChunkReader::load_page_data() { } else { RETURN_IF_ERROR(_page_reader->get_page_data(_page_data)); if (header.__isset.data_page_header_v2) { - tparquet::DataPageHeaderV2 header_v2 = header.data_page_header_v2; - _get_uncompressed_levels(header_v2, _page_data); + _get_uncompressed_levels(header.data_page_header_v2, _page_data); } } diff --git a/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp index c698a07b42ade9..47be7d8bcc9f73 100644 --- a/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp @@ -81,14 +81,7 @@ Status PageReader::next_page_header() { } _offset += real_header_size; - if (_cur_page_header.__isset.data_page_header_v2) { - auto& page_v2 = _cur_page_header.data_page_header_v2; - _next_header_offset = _offset + _cur_page_header.compressed_page_size + - page_v2.repetition_levels_byte_length + - page_v2.definition_levels_byte_length; - } else { - _next_header_offset = _offset + _cur_page_header.compressed_page_size; - } + _next_header_offset = _offset + _cur_page_header.compressed_page_size; _state = HEADER_PARSED; return Status::OK(); } @@ -106,13 +99,7 @@ Status PageReader::get_page_data(Slice& slice) { if (UNLIKELY(_state != HEADER_PARSED)) { return Status::IOError("Should generate page header first to load current page data"); } - if (_cur_page_header.__isset.data_page_header_v2) { - auto& page_v2 = _cur_page_header.data_page_header_v2; - slice.size = _cur_page_header.compressed_page_size + page_v2.repetition_levels_byte_length + - page_v2.definition_levels_byte_length; - } else { - slice.size = _cur_page_header.compressed_page_size; - } + slice.size = _cur_page_header.compressed_page_size; RETURN_IF_ERROR(_reader->read_bytes(slice, _offset, _io_ctx)); _offset += slice.size; _state = INITIALIZED; 
diff --git a/regression-test/data/external_table_p2/tvf/test_tvf_p2.out b/regression-test/data/external_table_p2/tvf/test_tvf_p2.out index 86f3f43f2d0235..6a44b7322dcd07 100644 --- a/regression-test/data/external_table_p2/tvf/test_tvf_p2.out +++ b/regression-test/data/external_table_p2/tvf/test_tvf_p2.out @@ -42,6 +42,14 @@ -- !row_cross_pages -- 25001 25001 25001 +-- !fix_byte_array -- +\N 64.1234 128.123456 \N 64.1234 128.123456 2023-01-01 2023-01-01 2023-01-01T20:00:00.123456 2023-01-01 2023-01-01 2023-01-01T20:00:00.123456 +32.123 \N 128.789012 32.123 \N 128.789012 2023-02-15 2023-02-15 2023-02-15T23:30:45.123456 2023-02-15 2023-02-15 2023-02-15T23:30:45.123456 +32.456 64.5678 128.345678 32.456 64.5678 128.345678 2023-03-30 2023-03-30 \N 2023-03-30 2023-03-30 \N +32.789 64.9012 \N 32.789 64.9012 \N \N \N 2023-03-31T02:45:30.123456 \N \N 2023-03-31T02:45:30.123456 +32.024 64.0000 128.901468 32.024 64.0000 128.901468 2023-07-07 2023-07-07 2021-07-07T19:15:31.123456 2023-07-07 2023-07-07 2021-07-07T19:15:31.123456 +32.689 64.2580 128.745382 32.689 64.2580 128.745382 2023-11-11 2023-11-11 2022-11-11T16:35:37.123456 2023-11-11 2023-11-11 2022-11-11T16:35:37.123456 + -- !viewfs -- 25001 25001 25001 diff --git a/regression-test/suites/external_table_p2/tvf/test_tvf_p2.groovy b/regression-test/suites/external_table_p2/tvf/test_tvf_p2.groovy index 08776efd8ecf60..6564d074097f74 100644 --- a/regression-test/suites/external_table_p2/tvf/test_tvf_p2.groovy +++ b/regression-test/suites/external_table_p2/tvf/test_tvf_p2.groovy @@ -53,6 +53,13 @@ suite("test_tvf_p2", "p2") { "format" = "parquet"); """ + // test for page v2 & fix_length_byte_array stored decimal + qt_fix_byte_array """select * + from hdfs( + "uri" = "hdfs://${nameNodeHost}:${hdfsPort}/catalog/tvf/parquet/fix_byte_array.snappy.parquet", + "format" = "parquet"); + """ + // viewfs qt_viewfs """select count(id), count(m1), count(m2) from hdfs(