From 59c896d3d4e5c9930980095994eac4f571518ee3 Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Thu, 17 Oct 2024 13:30:56 -0300 Subject: [PATCH 1/7] readPageV2 --- .../Impl/Parquet/ParquetLeafColReader.cpp | 140 ++++++++++++------ .../Impl/Parquet/ParquetLeafColReader.h | 4 + 2 files changed, 101 insertions(+), 43 deletions(-) diff --git a/src/Processors/Formats/Impl/Parquet/ParquetLeafColReader.cpp b/src/Processors/Formats/Impl/Parquet/ParquetLeafColReader.cpp index 9e1cae9bb657..4d2ac7fb2b28 100644 --- a/src/Processors/Formats/Impl/Parquet/ParquetLeafColReader.cpp +++ b/src/Processors/Formats/Impl/Parquet/ParquetLeafColReader.cpp @@ -409,46 +409,13 @@ void ParquetLeafColReader::readPage() } template -void ParquetLeafColReader::readPageV1(const parquet::DataPageV1 & page) +void ParquetLeafColReader::initDataReader( + parquet::Encoding::type enconding_type, + const uint8_t * buffer, + std::size_t max_size, + std::unique_ptr && def_level_reader) { - static parquet::LevelDecoder repetition_level_decoder; - - cur_page_values = page.num_values(); - - // refer to: VectorizedColumnReader::readPageV1 in Spark and LevelDecoder::SetData in column_reader.cc - if (page.definition_level_encoding() != parquet::Encoding::RLE && col_descriptor.max_definition_level() != 0) - { - throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Unsupported encoding: {}", page.definition_level_encoding()); - } - const auto * buffer = page.data(); - auto max_size = page.size(); - - if (col_descriptor.max_repetition_level() > 0) - { - auto rep_levels_bytes = repetition_level_decoder.SetData( - page.repetition_level_encoding(), col_descriptor.max_repetition_level(), 0, buffer, max_size); - buffer += rep_levels_bytes; - max_size -= rep_levels_bytes; - } - - assert(col_descriptor.max_definition_level() >= 0); - std::unique_ptr def_level_reader; - if (col_descriptor.max_definition_level() > 0) - { - auto bit_width = arrow::bit_util::Log2(col_descriptor.max_definition_level() + 1); - auto num_bytes = ::arrow::util::SafeLoadAs(buffer); - auto bit_reader = std::make_unique(buffer + 4, num_bytes); - num_bytes += 4; - buffer += num_bytes; - max_size -= num_bytes; - def_level_reader = std::make_unique(std::move(bit_reader), bit_width); - } - else - { - def_level_reader = std::make_unique(page.num_values()); - } - - switch (page.encoding()) + switch (enconding_type) { case parquet::Encoding::PLAIN: { @@ -489,17 +456,104 @@ void ParquetLeafColReader::readPageV1(const parquet::DataPageV1 & page) case parquet::Encoding::DELTA_BINARY_PACKED: case parquet::Encoding::DELTA_LENGTH_BYTE_ARRAY: case parquet::Encoding::DELTA_BYTE_ARRAY: - throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Unsupported encoding: {}", page.encoding()); + throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Unsupported encoding: {}", enconding_type); default: - throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Unknown encoding type: {}", page.encoding()); + throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Unknown encoding type: {}", enconding_type); } } template -void ParquetLeafColReader::readPageV2(const parquet::DataPageV2 & /*page*/) +void ParquetLeafColReader::readPageV1(const parquet::DataPageV1 & page) { - throw Exception(ErrorCodes::NOT_IMPLEMENTED, "read page V2 is not implemented yet"); + static parquet::LevelDecoder repetition_level_decoder; + + cur_page_values = page.num_values(); + + // refer to: VectorizedColumnReader::readPageV1 in Spark and LevelDecoder::SetData in column_reader.cc + if (page.definition_level_encoding() != parquet::Encoding::RLE && col_descriptor.max_definition_level() != 0) + { + throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Unsupported encoding: {}", page.definition_level_encoding()); + } + const auto * buffer = page.data(); + auto max_size = page.size(); + + if (col_descriptor.max_repetition_level() > 0) + { + auto rep_levels_bytes = repetition_level_decoder.SetData( + page.repetition_level_encoding(), col_descriptor.max_repetition_level(), 0, buffer, max_size); + buffer += rep_levels_bytes; + max_size -= rep_levels_bytes; + } + + assert(col_descriptor.max_definition_level() >= 0); + std::unique_ptr def_level_reader; + if (col_descriptor.max_definition_level() > 0) + { + auto bit_width = arrow::bit_util::Log2(col_descriptor.max_definition_level() + 1); + auto num_bytes = ::arrow::util::SafeLoadAs(buffer); + auto bit_reader = std::make_unique(buffer + 4, num_bytes); + num_bytes += 4; + buffer += num_bytes; + max_size -= num_bytes; + def_level_reader = std::make_unique(std::move(bit_reader), bit_width); + } + else + { + def_level_reader = std::make_unique(page.num_values()); + } + + initDataReader(page.encoding(), buffer, max_size, std::move(def_level_reader)); +} + +template +void ParquetLeafColReader::readPageV2(const parquet::DataPageV2 & page) +{ + static parquet::LevelDecoder repetition_level_decoder; + + cur_page_values = page.num_values(); + + const auto * buffer = page.data(); + + const int64_t total_levels_length = + static_cast(page.repetition_levels_byte_length()) + + page.definition_levels_byte_length(); + + if (total_levels_length > page.size()) + { + throw Exception( + ErrorCodes::BAD_ARGUMENTS, "Data page too small for levels (corrupt header?)"); + } + + if (col_descriptor.max_repetition_level() > 0) + { + repetition_level_decoder.SetDataV2( + page.repetition_levels_byte_length(), + col_descriptor.max_repetition_level(), + cur_page_values, + buffer); + } + + // ARROW-17453: Even if max_rep_level_ is 0, there may still be + // repetition level bytes written and/or reported in the header by + // some writers (e.g. Athena) + buffer += page.repetition_levels_byte_length(); + + assert(col_descriptor.max_definition_level() >= 0); + std::unique_ptr def_level_reader; + if (col_descriptor.max_definition_level() > 0) + { + auto bit_width = arrow::bit_util::Log2(col_descriptor.max_definition_level() + 1); + auto num_bytes = page.definition_levels_byte_length(); + auto bit_reader = std::make_unique(buffer, num_bytes); + def_level_reader = std::make_unique(std::move(bit_reader), bit_width); + } + else + { + def_level_reader = std::make_unique(page.num_values()); + } + + initDataReader(page.encoding(), &buffer[total_levels_length], page.size(), std::move(def_level_reader)); } template diff --git a/src/Processors/Formats/Impl/Parquet/ParquetLeafColReader.h b/src/Processors/Formats/Impl/Parquet/ParquetLeafColReader.h index c5b14132f176..e1eb7702def1 100644 --- a/src/Processors/Formats/Impl/Parquet/ParquetLeafColReader.h +++ b/src/Processors/Formats/Impl/Parquet/ParquetLeafColReader.h @@ -54,6 +54,10 @@ class ParquetLeafColReader : public ParquetColumnReader void readPage(); void readPageV1(const parquet::DataPageV1 & page); void readPageV2(const parquet::DataPageV2 & page); + void initDataReader(parquet::Encoding::type enconding_type, + const uint8_t * buffer, + std::size_t max_size, + std::unique_ptr && def_level_reader); std::unique_ptr createDictReader( std::unique_ptr def_level_reader, std::unique_ptr rle_data_reader); From b04a4d8ff07f8e415019023151547edb8a76f8cf Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Thu, 17 Oct 2024 14:12:39 -0300 Subject: [PATCH 2/7] add silly tests --- ...51_parquet_page_v2_native_reader.reference | 10 ++++++++ .../03251_parquet_page_v2_native_reader.sh | 23 +++++++++++++++++++ 2 files changed, 33 insertions(+) create mode 100644 tests/queries/0_stateless/03251_parquet_page_v2_native_reader.reference create mode 100755 tests/queries/0_stateless/03251_parquet_page_v2_native_reader.sh diff --git a/tests/queries/0_stateless/03251_parquet_page_v2_native_reader.reference b/tests/queries/0_stateless/03251_parquet_page_v2_native_reader.reference new file mode 100644 index 000000000000..d6305cacea08 --- /dev/null +++ b/tests/queries/0_stateless/03251_parquet_page_v2_native_reader.reference @@ -0,0 +1,10 @@ +abc 2 +abc 3 +abc 4 +\N 5 +abc 2 +abc 2 +abc 3 +abc 4 +\N 5 +abc 2 diff --git a/tests/queries/0_stateless/03251_parquet_page_v2_native_reader.sh b/tests/queries/0_stateless/03251_parquet_page_v2_native_reader.sh new file mode 100755 index 000000000000..f24a6689e904 --- /dev/null +++ b/tests/queries/0_stateless/03251_parquet_page_v2_native_reader.sh @@ -0,0 +1,23 @@ +#!/usr/bin/env bash +# Tags: no-ubsan, no-fasttest + +CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CUR_DIR"/../shell_config.sh + +USER_FILES_PATH=$($CLICKHOUSE_CLIENT_BINARY --query "select _path,_file from file('nonexist.txt', 'CSV', 'val1 char')" 2>&1 | grep Exception | awk '{gsub("/nonexist.txt","",$9); print $9}') + +WORKING_DIR="${USER_FILES_PATH}/${CLICKHOUSE_TEST_UNIQUE_NAME}" + +mkdir -p "${WORKING_DIR}" + +DATA_FILE="${CUR_DIR}/data_parquet/datapage_v2.snappy.parquet" + +DATA_FILE_USER_PATH="${WORKING_DIR}/datapage_v2.snappy.parquet" + +cp ${DATA_FILE} ${DATA_FILE_USER_PATH} + +# Not reading all columns because some data types and encodings are not supported by native reader yet +# TODO read all columns once implemented +${CLICKHOUSE_CLIENT} --query="select a, c from file('${DATA_FILE_USER_PATH}', Parquet) SETTINGS input_format_parquet_use_native_reader=false;" +${CLICKHOUSE_CLIENT} --query="select a, c from file('${DATA_FILE_USER_PATH}', Parquet) SETTINGS input_format_parquet_use_native_reader=true;" From 850aa41aaac16ccdaf963361b3da9894a9818a09 Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Thu, 17 Oct 2024 15:21:56 -0300 Subject: [PATCH 3/7] update test --- .../0_stateless/03251_parquet_page_v2_native_reader.reference | 2 +- .../0_stateless/03251_parquet_page_v2_native_reader.sh | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/queries/0_stateless/03251_parquet_page_v2_native_reader.reference b/tests/queries/0_stateless/03251_parquet_page_v2_native_reader.reference index d6305cacea08..3ae545099d6a 100644 --- a/tests/queries/0_stateless/03251_parquet_page_v2_native_reader.reference +++ b/tests/queries/0_stateless/03251_parquet_page_v2_native_reader.reference @@ -1,4 +1,5 @@ abc 2 +abc 2 abc 3 abc 4 \N 5 @@ -7,4 +8,3 @@ abc 2 abc 3 abc 4 \N 5 -abc 2 diff --git a/tests/queries/0_stateless/03251_parquet_page_v2_native_reader.sh b/tests/queries/0_stateless/03251_parquet_page_v2_native_reader.sh index f24a6689e904..217491534e99 100755 --- a/tests/queries/0_stateless/03251_parquet_page_v2_native_reader.sh +++ b/tests/queries/0_stateless/03251_parquet_page_v2_native_reader.sh @@ -19,5 +19,5 @@ cp ${DATA_FILE} ${DATA_FILE_USER_PATH} # Not reading all columns because some data types and encodings are not supported by native reader yet # TODO read all columns once implemented -${CLICKHOUSE_CLIENT} --query="select a, c from file('${DATA_FILE_USER_PATH}', Parquet) SETTINGS input_format_parquet_use_native_reader=false;" -${CLICKHOUSE_CLIENT} --query="select a, c from file('${DATA_FILE_USER_PATH}', Parquet) SETTINGS input_format_parquet_use_native_reader=true;" +${CLICKHOUSE_CLIENT} --query="select a, c from file('${DATA_FILE_USER_PATH}', Parquet) order by c SETTINGS input_format_parquet_use_native_reader=false;" +${CLICKHOUSE_CLIENT} --query="select a, c from file('${DATA_FILE_USER_PATH}', Parquet) order by c SETTINGS input_format_parquet_use_native_reader=true;" From d37c9b49eaa9f70288a07954baf1b04958fba3a0 Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Thu, 17 Oct 2024 15:59:01 -0300 Subject: [PATCH 4/7] add some comments --- .../Formats/Impl/Parquet/ParquetLeafColReader.cpp | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/Processors/Formats/Impl/Parquet/ParquetLeafColReader.cpp b/src/Processors/Formats/Impl/Parquet/ParquetLeafColReader.cpp index 4d2ac7fb2b28..cf24f5f2df7b 100644 --- a/src/Processors/Formats/Impl/Parquet/ParquetLeafColReader.cpp +++ b/src/Processors/Formats/Impl/Parquet/ParquetLeafColReader.cpp @@ -506,6 +506,16 @@ void ParquetLeafColReader::readPageV1(const parquet::DataPageV1 & page) initDataReader(page.encoding(), buffer, max_size, std::move(def_level_reader)); } +/* + * As far as I understand, the difference between page v1 and page v2 lies primarily on the below: + * 1. repetition and definition levels are not compressed; + * 2. size of repetition and definition levels is present in the header; + * 3. the encoding is always RLE + * + * Therefore, this method leverages the existing `parquet::LevelDecoder::SetDataV2` method to build the repetition level decoder. + * The data buffer is "offset-ed" by rl bytes length and then dl decoder is built using RLE decoder. Since dl bytes length was present in the header, + * there is no need to read it and apply an offset like in page v1. + * */ template void ParquetLeafColReader::readPageV2(const parquet::DataPageV2 & page) { From ecf4e0f961175f07b02da9a0342464ee8f324f35 Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Fri, 18 Oct 2024 09:31:15 -0300 Subject: [PATCH 5/7] address a few comments --- .../Impl/Parquet/ParquetLeafColReader.cpp | 32 +++++++++++++++++-- 1 file changed, 29 insertions(+), 3 deletions(-) diff --git a/src/Processors/Formats/Impl/Parquet/ParquetLeafColReader.cpp b/src/Processors/Formats/Impl/Parquet/ParquetLeafColReader.cpp index cf24f5f2df7b..b720820717a3 100644 --- a/src/Processors/Formats/Impl/Parquet/ParquetLeafColReader.cpp +++ b/src/Processors/Formats/Impl/Parquet/ParquetLeafColReader.cpp @@ -370,6 +370,7 @@ template void ParquetLeafColReader::readPage() { // refer to: ColumnReaderImplBase::ReadNewPage in column_reader.cc + // this is where decompression happens auto cur_page = parquet_page_reader->NextPage(); switch (cur_page->type()) { @@ -476,12 +477,12 @@ void ParquetLeafColReader::readPageV1(const parquet::DataPageV1 & page) throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Unsupported encoding: {}", page.definition_level_encoding()); } const auto * buffer = page.data(); - auto max_size = page.size(); + auto max_size = static_cast(page.size()); if (col_descriptor.max_repetition_level() > 0) { auto rep_levels_bytes = repetition_level_decoder.SetData( - page.repetition_level_encoding(), col_descriptor.max_repetition_level(), 0, buffer, max_size); + page.repetition_level_encoding(), col_descriptor.max_repetition_level(), 0, buffer, static_cast(max_size)); buffer += rep_levels_bytes; max_size -= rep_levels_bytes; } @@ -491,7 +492,24 @@ void ParquetLeafColReader::readPageV1(const parquet::DataPageV1 & page) if (col_descriptor.max_definition_level() > 0) { auto bit_width = arrow::bit_util::Log2(col_descriptor.max_definition_level() + 1); + + if (max_size < sizeof(int32_t)) + { + throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Not enough bytes in parquet page buffer, corrupt?"); + } + auto num_bytes = ::arrow::util::SafeLoadAs(buffer); + + if (num_bytes < 0) + { + throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Number of bytes for dl is negative, corrupt?"); + } + + if (num_bytes + 4u > max_size) + { + throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Not enough bytes in parquet page buffer, corrupt?"); + } + auto bit_reader = std::make_unique(buffer + 4, num_bytes); num_bytes += 4; buffer += num_bytes; @@ -525,6 +543,12 @@ void ParquetLeafColReader::readPageV2(const parquet::DataPageV2 & page) const auto * buffer = page.data(); + if (page.repetition_levels_byte_length() < 0 || page.definition_levels_byte_length() < 0) + { + throw Exception( + ErrorCodes::PARQUET_EXCEPTION, "Either RL or DL is negative, this should not happen. Most likely corrupt file or parsing issue"); + } + const int64_t total_levels_length = static_cast(page.repetition_levels_byte_length()) + page.definition_levels_byte_length(); @@ -563,7 +587,9 @@ void ParquetLeafColReader::readPageV2(const parquet::DataPageV2 & page) def_level_reader = std::make_unique(page.num_values()); } - initDataReader(page.encoding(), &buffer[total_levels_length], page.size(), std::move(def_level_reader)); + buffer += page.definition_levels_byte_length(); + + initDataReader(page.encoding(), buffer, page.size() - total_levels_length, std::move(def_level_reader)); } template From bb82ae116d1f7f7674ad9a398f8da1dc632d64be Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Fri, 18 Oct 2024 12:05:30 -0300 Subject: [PATCH 6/7] address some more comments --- .../Impl/Parquet/ParquetLeafColReader.cpp | 35 ++++++++++++------- 1 file changed, 22 insertions(+), 13 deletions(-) diff --git a/src/Processors/Formats/Impl/Parquet/ParquetLeafColReader.cpp b/src/Processors/Formats/Impl/Parquet/ParquetLeafColReader.cpp index b720820717a3..dfeceac4c3a6 100644 --- a/src/Processors/Formats/Impl/Parquet/ParquetLeafColReader.cpp +++ b/src/Processors/Formats/Impl/Parquet/ParquetLeafColReader.cpp @@ -476,15 +476,33 @@ void ParquetLeafColReader::readPageV1(const parquet::DataPageV1 & page) { throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Unsupported encoding: {}", page.definition_level_encoding()); } + const auto * buffer = page.data(); auto max_size = static_cast(page.size()); if (col_descriptor.max_repetition_level() > 0) { - auto rep_levels_bytes = repetition_level_decoder.SetData( - page.repetition_level_encoding(), col_descriptor.max_repetition_level(), 0, buffer, static_cast(max_size)); - buffer += rep_levels_bytes; - max_size -= rep_levels_bytes; + if (max_size < sizeof(int32_t)) + { + throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Not enough bytes in parquet page buffer, corrupt?"); + } + + auto num_bytes = ::arrow::util::SafeLoadAs(buffer); + + if (num_bytes < 0) + { + throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Number of bytes for dl is negative, corrupt?"); + } + + if (num_bytes + 4u > max_size) + { + throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Not enough bytes in parquet page buffer, corrupt?"); + } + + // not constructing level reader because we are not using it atm + num_bytes += 4; + buffer += num_bytes; + max_size -= num_bytes; } assert(col_descriptor.max_definition_level() >= 0); @@ -559,15 +577,6 @@ void ParquetLeafColReader::readPageV2(const parquet::DataPageV2 & page) ErrorCodes::BAD_ARGUMENTS, "Data page too small for levels (corrupt header?)"); } - if (col_descriptor.max_repetition_level() > 0) - { - repetition_level_decoder.SetDataV2( - page.repetition_levels_byte_length(), - col_descriptor.max_repetition_level(), - cur_page_values, - buffer); - } - // ARROW-17453: Even if max_rep_level_ is 0, there may still be // repetition level bytes written and/or reported in the header by // some writers (e.g. Athena) From 01b508bd8abb6fd9a9b1a7ab3d7db1334607b3da Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Fri, 18 Oct 2024 19:48:11 -0300 Subject: [PATCH 7/7] remove unused rep level decoder --- src/Processors/Formats/Impl/Parquet/ParquetLeafColReader.cpp | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/Processors/Formats/Impl/Parquet/ParquetLeafColReader.cpp b/src/Processors/Formats/Impl/Parquet/ParquetLeafColReader.cpp index dfeceac4c3a6..c3785e29aeaa 100644 --- a/src/Processors/Formats/Impl/Parquet/ParquetLeafColReader.cpp +++ b/src/Processors/Formats/Impl/Parquet/ParquetLeafColReader.cpp @@ -467,8 +467,6 @@ void ParquetLeafColReader::initDataReader( template void ParquetLeafColReader::readPageV1(const parquet::DataPageV1 & page) { - static parquet::LevelDecoder repetition_level_decoder; - cur_page_values = page.num_values(); // refer to: VectorizedColumnReader::readPageV1 in Spark and LevelDecoder::SetData in column_reader.cc @@ -555,8 +553,6 @@ void ParquetLeafColReader::readPageV1(const parquet::DataPageV1 & page) template void ParquetLeafColReader::readPageV2(const parquet::DataPageV2 & page) { - static parquet::LevelDecoder repetition_level_decoder; - cur_page_values = page.num_values(); const auto * buffer = page.data();