From 3b6ef6be57fa78eb4259d09a8de9be74a13f2149 Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Thu, 15 Dec 2022 21:03:39 +0800 Subject: [PATCH 01/12] ARROW-18434: [C++][Parquet] Parquet page index read support --- cpp/src/parquet/file_reader.cc | 18 ++- cpp/src/parquet/file_reader.h | 5 + cpp/src/parquet/page_index.cc | 242 +++++++++++++++++++++++++++++ cpp/src/parquet/page_index.h | 75 +++++++++ cpp/src/parquet/page_index_test.cc | 107 +++++++++++++ 5 files changed, 446 insertions(+), 1 deletion(-) diff --git a/cpp/src/parquet/file_reader.cc b/cpp/src/parquet/file_reader.cc index 520317539b5..2ae67db3ef8 100644 --- a/cpp/src/parquet/file_reader.cc +++ b/cpp/src/parquet/file_reader.cc @@ -40,6 +40,7 @@ #include "parquet/exception.h" #include "parquet/file_writer.h" #include "parquet/metadata.h" +#include "parquet/page_index.h" #include "parquet/platform.h" #include "parquet/properties.h" #include "parquet/schema.h" @@ -302,6 +303,17 @@ class SerializedFile : public ParquetFileReader::Contents { std::shared_ptr metadata() const override { return file_metadata_; } + std::shared_ptr page_index_reader() override { + if (!file_metadata_) { + throw ParquetException("Cannot get PageIndexReader as file metadata is not ready"); + } + if (!page_index_reader_) { + page_index_reader_ = PageIndexReader::Make(source_.get(), file_metadata_, + properties_, file_decryptor_); + } + return page_index_reader_; + } + void set_metadata(std::shared_ptr metadata) { file_metadata_ = std::move(metadata); } @@ -522,7 +534,7 @@ class SerializedFile : public ParquetFileReader::Contents { int64_t source_size_; std::shared_ptr file_metadata_; ReaderProperties properties_; - + std::shared_ptr page_index_reader_; std::shared_ptr file_decryptor_; // \return The true length of the metadata in bytes @@ -784,6 +796,10 @@ std::shared_ptr ParquetFileReader::metadata() const { return contents_->metadata(); } +std::shared_ptr ParquetFileReader::page_index_reader() { + return contents_->page_index_reader(); +} 
+ std::shared_ptr ParquetFileReader::RowGroup(int i) { if (i >= metadata()->num_row_groups()) { std::stringstream ss; diff --git a/cpp/src/parquet/file_reader.h b/cpp/src/parquet/file_reader.h index 4b27575f01d..803f6a38fcb 100644 --- a/cpp/src/parquet/file_reader.h +++ b/cpp/src/parquet/file_reader.h @@ -32,6 +32,7 @@ namespace parquet { class ColumnReader; class FileMetaData; +class PageIndexReader; class PageReader; class RowGroupMetaData; @@ -98,6 +99,7 @@ class PARQUET_EXPORT ParquetFileReader { virtual void Close() = 0; virtual std::shared_ptr GetRowGroup(int i) = 0; virtual std::shared_ptr metadata() const = 0; + virtual std::shared_ptr page_index_reader() = 0; }; ParquetFileReader(); @@ -133,6 +135,9 @@ class PARQUET_EXPORT ParquetFileReader { // Returns the file metadata. Only one instance is ever created std::shared_ptr metadata() const; + // Returns the page index reader. Only one instance is ever created + std::shared_ptr page_index_reader(); + /// Pre-buffer the specified column indices in all row groups. 
/// /// Readers can optionally call this to cache the necessary slices diff --git a/cpp/src/parquet/page_index.cc b/cpp/src/parquet/page_index.cc index 559d3659882..cfc81f4b226 100644 --- a/cpp/src/parquet/page_index.cc +++ b/cpp/src/parquet/page_index.cc @@ -18,6 +18,7 @@ #include "parquet/page_index.h" #include "parquet/encoding.h" #include "parquet/exception.h" +#include "parquet/metadata.h" #include "parquet/schema.h" #include "parquet/statistics.h" #include "parquet/thrift_internal.h" @@ -184,8 +185,241 @@ class OffsetIndexImpl : public OffsetIndex { std::vector page_locations_; }; +class RowGroupPageIndexReaderImpl : public RowGroupPageIndexReader { + public: + RowGroupPageIndexReaderImpl(::arrow::io::RandomAccessFile* input, + std::shared_ptr row_group_metadata, + const ReaderProperties& properties, + int32_t row_group_ordinal, + std::shared_ptr file_decryptor) + : input_(input), + row_group_metadata_(row_group_metadata), + properties_(properties), + file_decryptor_(std::move(file_decryptor)) { + bool has_column_index = false; + bool has_offset_index = false; + PageIndexReader::DeterminePageIndexRangesInRowGroup( + *row_group_metadata, &column_index_base_offset_, &column_index_size_, + &offset_index_base_offset_, &offset_index_size_, &has_column_index, + &has_offset_index); + } + + /// Read column index of a column chunk. 
+ ::arrow::Result> GetColumnIndex(int32_t i) override { + if (i < 0 || i >= row_group_metadata_->num_columns()) { + return ::arrow::Status::IndexError("Invalid column {} to get column index", i); + } + + auto col_chunk = row_group_metadata_->ColumnChunk(i); + + std::unique_ptr crypto_metadata = col_chunk->crypto_metadata(); + if (crypto_metadata != nullptr && file_decryptor_ == nullptr) { + ParquetException::NYI("Cannot read encrypted column index yet"); + } + + auto column_index_location = col_chunk->GetColumnIndexLocation(); + if (!column_index_location.has_value()) { + return ::arrow::Status::Invalid("Column index does not exist for column {}", i); + } + + if (column_index_buffer_ == nullptr) { + ARROW_ASSIGN_OR_RAISE( + column_index_buffer_, + input_->ReadAt(column_index_base_offset_, column_index_size_)); + } + + auto buffer = column_index_buffer_.get(); + int64_t buffer_offset = column_index_location->offset - column_index_base_offset_; + uint32_t length = column_index_location->length; + DCHECK_GE(buffer_offset, 0); + DCHECK_LE(buffer_offset + length, column_index_size_); + + auto descr = row_group_metadata_->schema()->Column(i); + std::shared_ptr column_index; + try { + column_index = + ColumnIndex::Make(*descr, buffer->data() + buffer_offset, length, properties_); + } catch (...) { + return ::arrow::Status::SerializationError( + "Cannot deserialize column index for column {}", i); + } + return column_index; + } + + /// Read offset index of a column chunk. 
+ ::arrow::Result> GetOffsetIndex(int32_t i) override { + if (i < 0 || i >= row_group_metadata_->num_columns()) { + return ::arrow::Status::IndexError("Invalid column {} to get offset index", i); + } + + auto col_chunk = row_group_metadata_->ColumnChunk(i); + + std::unique_ptr crypto_metadata = col_chunk->crypto_metadata(); + if (crypto_metadata != nullptr && file_decryptor_ == nullptr) { + ParquetException::NYI("Cannot read encrypted offset index yet"); + } + + auto offset_index_location = col_chunk->GetOffsetIndexLocation(); + if (!offset_index_location.has_value()) { + return ::arrow::Status::Invalid("Offset index does not exist for column {}", i); + } + + if (offset_index_buffer_ == nullptr) { + ARROW_ASSIGN_OR_RAISE( + offset_index_buffer_, + input_->ReadAt(offset_index_base_offset_, offset_index_size_)); + } + + auto buffer = offset_index_buffer_.get(); + int64_t buffer_offset = offset_index_location->offset - offset_index_base_offset_; + uint32_t length = offset_index_location->length; + DCHECK_GE(buffer_offset, 0); + DCHECK_LE(buffer_offset + length, offset_index_size_); + + std::shared_ptr offset_index; + try { + offset_index = + OffsetIndex::Make(buffer->data() + buffer_offset, length, properties_); + } catch (...) { + return ::arrow::Status::SerializationError( + "Cannot deserialize offset index for column {}", i); + } + return offset_index; + } + + private: + /// The input stream that can perform random access read. + ::arrow::io::RandomAccessFile* input_; + + /// The row group metadata to get column chunk metadata. + std::shared_ptr row_group_metadata_; + + /// Reader properties used to deserialize thrift object. + const ReaderProperties& properties_; + + /// File-level decryptor. + std::shared_ptr file_decryptor_; + + /// File offsets and sizes of the page Index. + int64_t column_index_base_offset_; + int64_t column_index_size_; + int64_t offset_index_base_offset_; + int64_t offset_index_size_; + + /// Buffer to hold the raw bytes of the page index. 
+ /// Will be set lazily when the corresponding page index is accessed for the 1st time. + std::shared_ptr<::arrow::Buffer> column_index_buffer_; + std::shared_ptr<::arrow::Buffer> offset_index_buffer_; +}; + +class PageIndexReaderImpl : public PageIndexReader { + public: + PageIndexReaderImpl(::arrow::io::RandomAccessFile* input, + std::shared_ptr file_metadata, + const ReaderProperties& properties, + std::shared_ptr file_decryptor) + : input_(input), + file_metadata_(std::move(file_metadata)), + properties_(properties), + file_decryptor_(std::move(file_decryptor)) {} + + ::arrow::Result> RowGroup(int i) override { + if (i < 0 || i >= file_metadata_->num_row_groups()) { + return ::arrow::Status::IndexError("Invalid row group ordinal {}", i); + } + return std::make_shared( + input_, file_metadata_->RowGroup(i), properties_, i, file_decryptor_); + } + + void WillNeed(const std::vector& row_group_indices, bool need_column_index, + bool need_offset_index) override { + std::vector<::arrow::io::ReadRange> read_ranges; + for (int32_t row_group_ordinal : row_group_indices) { + int64_t column_index_offset; + int64_t column_index_size; + int64_t offset_index_offset; + int64_t offset_index_size; + bool has_column_index; + bool has_offset_index; + PageIndexReader::DeterminePageIndexRangesInRowGroup( + *file_metadata_->RowGroup(row_group_ordinal), &column_index_offset, + &column_index_size, &offset_index_offset, &offset_index_size, &has_column_index, + &has_offset_index); + if (need_column_index && has_column_index) { + read_ranges.emplace_back( + ::arrow::io::ReadRange{column_index_offset, column_index_size}); + } + if (need_offset_index && has_offset_index) { + read_ranges.emplace_back( + ::arrow::io::ReadRange{offset_index_offset, offset_index_size}); + } + } + PARQUET_IGNORE_NOT_OK(input_->WillNeed(read_ranges)); + } + + void WillNotNeed(const std::vector& row_group_indices) override { + // No-op for now. 
+ } + + private: + /// The input stream that can perform random read. + ::arrow::io::RandomAccessFile* input_; + + /// The file metadata to get row group metadata. + std::shared_ptr file_metadata_; + + /// Reader properties used to deserialize thrift object. + const ReaderProperties& properties_; + + /// File-level decryptor. + std::shared_ptr file_decryptor_; +}; + } // namespace +void PageIndexReader::DeterminePageIndexRangesInRowGroup( + const RowGroupMetaData& row_group_metadata, int64_t* column_index_start, + int64_t* column_index_size, int64_t* offset_index_start, int64_t* offset_index_size, + bool* has_column_index, bool* has_offset_index) { + int64_t ci_start = std::numeric_limits::max(); + int64_t oi_start = std::numeric_limits::max(); + int64_t ci_end = -1; + int64_t oi_end = -1; + for (int i = 0; i < row_group_metadata.num_columns(); ++i) { + auto col_chunk = row_group_metadata.ColumnChunk(i); + + auto column_index_location = col_chunk->GetColumnIndexLocation(); + if (column_index_location.has_value()) { + ci_start = std::min(ci_start, column_index_location->offset); + ci_end = + std::max(ci_end, column_index_location->offset + column_index_location->length); + } + + auto offset_index_location = col_chunk->GetOffsetIndexLocation(); + if (offset_index_location.has_value()) { + oi_start = std::min(oi_start, offset_index_location->offset); + oi_end = + std::max(oi_end, offset_index_location->offset + offset_index_location->length); + } + } + + if (ci_end != -1) { + *has_column_index = true; + *column_index_start = ci_start; + *column_index_size = ci_end - ci_start; + } else { + *has_column_index = false; + } + + if (oi_end != -1) { + *has_offset_index = true; + *offset_index_start = oi_start; + *offset_index_size = oi_end - oi_start; + } else { + *has_offset_index = false; + } +} + // ---------------------------------------------------------------------- // Public factory functions @@ -231,4 +465,12 @@ std::unique_ptr OffsetIndex::Make(const void* 
serialized_index, return std::make_unique(offset_index); } +std::shared_ptr PageIndexReader::Make( + ::arrow::io::RandomAccessFile* input, std::shared_ptr file_metadata, + const ReaderProperties& properties, + std::shared_ptr file_decryptor) { + return std::make_shared(input, file_metadata, properties, + std::move(file_decryptor)); +} + } // namespace parquet diff --git a/cpp/src/parquet/page_index.h b/cpp/src/parquet/page_index.h index 13dae40f56c..11b4efd1584 100644 --- a/cpp/src/parquet/page_index.h +++ b/cpp/src/parquet/page_index.h @@ -24,7 +24,11 @@ namespace parquet { class ColumnDescriptor; +class FileMetaData; +class InternalFileDecryptor; class ReaderProperties; +class RowGroupMetaData; +class RowGroupPageIndexReader; /// \brief ColumnIndex is a proxy around format::ColumnIndex. class PARQUET_EXPORT ColumnIndex { @@ -126,4 +130,75 @@ class PARQUET_EXPORT OffsetIndex { virtual const std::vector& page_locations() const = 0; }; +/// \brief Interface for reading the page index from a Parquet row group. +class PARQUET_EXPORT RowGroupPageIndexReader { + public: + virtual ~RowGroupPageIndexReader() = default; + + /// \brief Read column index of a column chunk. + /// + /// \param[in] i column id of the column chunk. + /// \returns error Result if either column id is invalid or column index does not exist. + virtual ::arrow::Result> GetColumnIndex(int32_t i) = 0; + + /// \brief Read offset index of a column chunk. + /// + /// \param[in] i column id of the column chunk. + /// \returns error Result if either column id is invalid or offset index does not exist. + virtual ::arrow::Result> GetOffsetIndex(int32_t i) = 0; +}; + +/// \brief Interface for reading the page index from a Parquet file. +class PARQUET_EXPORT PageIndexReader { + public: + virtual ~PageIndexReader() = default; + + /// \brief Create a PageIndexReader instance. 
+ static std::shared_ptr Make( + ::arrow::io::RandomAccessFile* input, std::shared_ptr file_metadata, + const ReaderProperties& properties, + std::shared_ptr file_decryptor = NULLPTR); + + /// \brief Get the page index reader of a specific row group. + /// \param[in] i row group ordinal to get page index reader. + /// \returns RowGroupPageIndexReader of the specified row group. + virtual ::arrow::Result> RowGroup(int i) = 0; + + /// \brief Advise the reader which part of page index will be read later. + /// + /// The PageIndexReader implementation can optionally prefetch and cache page index + /// that may be read later. Follow-up read should not fail even if WillNeed() is not + /// called, or the requested page index is out of range from WillNeed() call. + /// + /// \param[in] row_group_indices list of row group ordinal to read page index later. + /// \param[in] need_column_index tell if column index is required later. + /// \param[in] need_offset_index tell if offset index is required later. + virtual void WillNeed(const std::vector& row_group_indices, + bool need_column_index, bool need_offset_index) = 0; + + /// \brief Advise the reader which part of page index will be read later. + /// + /// The PageIndexReader implementation has the opportunity to cancel any prefetch or + /// release resource that are related to these row groups. + /// + /// \param[in] row_group_indices list of row group ordinal that whose page index will + /// not be needed any more. + virtual void WillNotNeed(const std::vector& row_group_indices) = 0; + + /// \brief Determines the column index and offset index ranges for the given row group. + /// \param[in] row_group_metadata row group metadata to get column chunk metadata. + /// \param[out] column_index_start Base start of column index of all column chunks. + /// \param[out] column_index_size Total size of column index of all column chunks. + /// \param[out] offset_index_start Base start of offset index of all column chunks. 
+ /// \param[out] offset_index_size Total size of offset index of all column chunks. + /// \param[out] has_column_index Returns true when at least a partial column index are + /// found. Returns false when there is absolutely no column index for the row group. + /// \param[out] has_offset_index Returns true when at least a partial offset index are + /// found. Returns false when there is absolutely no offsets index for the row group. + static void DeterminePageIndexRangesInRowGroup( + const RowGroupMetaData& row_group_metadata, int64_t* column_index_start, + int64_t* column_index_size, int64_t* offset_index_start, int64_t* offset_index_size, + bool* has_column_index, bool* has_offset_index); +}; + } // namespace parquet diff --git a/cpp/src/parquet/page_index_test.cc b/cpp/src/parquet/page_index_test.cc index 6d1cdc2c97a..3539eadb745 100644 --- a/cpp/src/parquet/page_index_test.cc +++ b/cpp/src/parquet/page_index_test.cc @@ -256,4 +256,111 @@ TEST(PageIndex, ReadColumnIndexWithNullPage) { null_pages, min_values, max_values, has_null_counts, null_counts); } +struct PageIndexRanges { + int64_t column_index_offset; + int64_t column_index_length; + int64_t offset_index_offset; + int64_t offset_index_length; +}; + +using RowGroupRanges = std::vector; + +/// Creates an FileMetaData object w/ single row group based on data in +/// 'row_group_ranges'. It sets the offsets and sizes of the column index and offset index +/// members of the row group. It doesn't set the member if the input value is -1. 
+std::shared_ptr ConstructFakeMetaData( + const RowGroupRanges& row_group_ranges) { + format::RowGroup row_group; + for (auto& page_index_ranges : row_group_ranges) { + format::ColumnChunk col_chunk; + if (page_index_ranges.column_index_offset != -1) { + col_chunk.__set_column_index_offset(page_index_ranges.column_index_offset); + } + if (page_index_ranges.column_index_length != -1) { + col_chunk.__set_column_index_length( + static_cast(page_index_ranges.column_index_length)); + } + if (page_index_ranges.offset_index_offset != -1) { + col_chunk.__set_offset_index_offset(page_index_ranges.offset_index_offset); + } + if (page_index_ranges.offset_index_length != -1) { + col_chunk.__set_offset_index_length( + static_cast(page_index_ranges.offset_index_length)); + } + row_group.columns.push_back(col_chunk); + } + + format::FileMetaData metadata; + metadata.row_groups.push_back(row_group); + + metadata.schema.emplace_back(); + schema::NodeVector fields; + for (size_t i = 0; i < row_group_ranges.size(); ++i) { + fields.push_back(schema::Int64(std::to_string(i))); + metadata.schema.emplace_back(); + fields.back()->ToParquet(&metadata.schema.back()); + } + schema::GroupNode::Make("schema", Repetition::REPEATED, fields) + ->ToParquet(&metadata.schema.front()); + + auto sink = CreateOutputStream(); + ThriftSerializer{}.Serialize(&metadata, sink.get()); + auto buffer = sink->Finish().MoveValueUnsafe(); + uint32_t len = static_cast(buffer->size()); + return FileMetaData::Make(buffer->data(), &len); +} + +/// Validates that 'DeterminePageIndexRangesInRowGroup()' selects the expected file +/// offsets and sizes or returns false when the row group doesn't have a page index. 
+void ValidatePageIndexRange(const RowGroupRanges& row_group_ranges, + bool expected_has_page_index, int expected_ci_start, + int expected_ci_size, int expected_oi_start, + int expected_oi_size) { + auto file_metadata = ConstructFakeMetaData(row_group_ranges); + + int64_t ci_start; + int64_t ci_size; + int64_t oi_start; + int64_t oi_size; + bool has_column_index; + bool has_offset_index; + PageIndexReader::DeterminePageIndexRangesInRowGroup( + *file_metadata->RowGroup(0), &ci_start, &ci_size, &oi_start, &oi_size, + &has_column_index, &has_offset_index); + ASSERT_EQ(expected_has_page_index, has_column_index); + ASSERT_EQ(expected_has_page_index, has_offset_index); + if (expected_has_page_index) { + EXPECT_EQ(expected_ci_start, ci_start); + EXPECT_EQ(expected_ci_size, ci_size); + EXPECT_EQ(expected_oi_start, oi_start); + EXPECT_EQ(expected_oi_size, oi_size); + } +} + +/// This test constructs a couple of artificial row groups with page index offsets in +/// them. Then it validates if PageIndexReader::DeterminePageIndexRangesInRowGroup() +/// properly computes the file range that contains the whole page index. +TEST(PageIndex, DeterminePageIndexRangesInRowGroup) { + // No column chunks. + ValidatePageIndexRange({}, false, -1, -1, -1, -1); + // No page index at all. + ValidatePageIndexRange({{-1, -1, -1, -1}}, false, -1, -1, -1, -1); + // Page index for single column chunk. + ValidatePageIndexRange({{10, 5, 15, 5}}, true, 10, 5, 15, 5); + // Page index for two column chunks. + ValidatePageIndexRange({{10, 5, 30, 25}, {15, 15, 50, 20}}, true, 10, 20, 30, 40); + // Page index for second column chunk. + ValidatePageIndexRange({{-1, -1, -1, -1}, {20, 10, 30, 25}}, true, 20, 10, 30, 25); + // Page index for first column chunk. + ValidatePageIndexRange({{10, 5, 15, 5}, {-1, -1, -1, -1}}, true, 10, 5, 15, 5); + // Missing offset index for first column chunk. Gap in column index. 
+ ValidatePageIndexRange({{10, 5, -1, -1}, {20, 10, 30, 25}}, true, 10, 20, 30, 25); + // Missing offset index for second column chunk. + ValidatePageIndexRange({{10, 5, 25, 5}, {20, 10, -1, -1}}, true, 10, 20, 25, 5); + // Four column chunks. + ValidatePageIndexRange( + {{100, 10, 220, 30}, {110, 25, 250, 10}, {140, 30, 260, 40}, {200, 10, 300, 100}}, + true, 100, 110, 220, 180); +} + } // namespace parquet From f94284fa4782517a65c01a02d7d89509b3ebfdb2 Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Sun, 8 Jan 2023 21:06:12 +0800 Subject: [PATCH 02/12] use exception instead of status and add more comments --- cpp/src/parquet/file_reader.cc | 4 +- cpp/src/parquet/file_reader.h | 8 +- cpp/src/parquet/metadata.h | 2 +- cpp/src/parquet/page_index.cc | 121 +++++++++++++---------------- cpp/src/parquet/page_index.h | 60 +++++++++----- cpp/src/parquet/page_index_test.cc | 16 ++-- 6 files changed, 112 insertions(+), 99 deletions(-) diff --git a/cpp/src/parquet/file_reader.cc b/cpp/src/parquet/file_reader.cc index 2ae67db3ef8..ea0a0cb6839 100644 --- a/cpp/src/parquet/file_reader.cc +++ b/cpp/src/parquet/file_reader.cc @@ -303,7 +303,7 @@ class SerializedFile : public ParquetFileReader::Contents { std::shared_ptr metadata() const override { return file_metadata_; } - std::shared_ptr page_index_reader() override { + std::shared_ptr GetPageIndexReader() override { if (!file_metadata_) { throw ParquetException("Cannot get PageIndexReader as file metadata is not ready"); } @@ -797,7 +797,7 @@ std::shared_ptr ParquetFileReader::metadata() const { } std::shared_ptr ParquetFileReader::page_index_reader() { - return contents_->page_index_reader(); + return contents_->GetPageIndexReader(); } std::shared_ptr ParquetFileReader::RowGroup(int i) { diff --git a/cpp/src/parquet/file_reader.h b/cpp/src/parquet/file_reader.h index 803f6a38fcb..97cbcf2242b 100644 --- a/cpp/src/parquet/file_reader.h +++ b/cpp/src/parquet/file_reader.h @@ -99,7 +99,13 @@ class PARQUET_EXPORT ParquetFileReader { 
virtual void Close() = 0; virtual std::shared_ptr GetRowGroup(int i) = 0; virtual std::shared_ptr metadata() const = 0; - virtual std::shared_ptr page_index_reader() = 0; + // Returns a PageIndexReader instance for the parquet file. + // If the file does not have page index, nullptr may be returned. Because checking + // the existence of page index in the file is costly, a non-null reader may be returned + // even if page index does not exist. It is the caller's responsibility to check + // the return value of follow-up calls to PageIndexReader. + // WARNING: The returned PageIndexReader must not outlive the ParquetFileReader. + virtual std::shared_ptr GetPageIndexReader() = 0; }; ParquetFileReader(); diff --git a/cpp/src/parquet/metadata.h b/cpp/src/parquet/metadata.h index 277f59a1b60..48c3fd9f7b8 100644 --- a/cpp/src/parquet/metadata.h +++ b/cpp/src/parquet/metadata.h @@ -124,7 +124,7 @@ struct IndexLocation { /// File offset of the given index, in bytes int64_t offset; /// Length of the given index, in bytes - int32_t length; + int64_t length; }; /// \brief ColumnChunkMetaData is a proxy around format::ColumnChunkMetaData. 
diff --git a/cpp/src/parquet/page_index.cc b/cpp/src/parquet/page_index.cc index cfc81f4b226..8dee1b2cfc9 100644 --- a/cpp/src/parquet/page_index.cc +++ b/cpp/src/parquet/page_index.cc @@ -193,21 +193,20 @@ class RowGroupPageIndexReaderImpl : public RowGroupPageIndexReader { int32_t row_group_ordinal, std::shared_ptr file_decryptor) : input_(input), - row_group_metadata_(row_group_metadata), + row_group_metadata_(std::move(row_group_metadata)), properties_(properties), file_decryptor_(std::move(file_decryptor)) { bool has_column_index = false; bool has_offset_index = false; PageIndexReader::DeterminePageIndexRangesInRowGroup( - *row_group_metadata, &column_index_base_offset_, &column_index_size_, - &offset_index_base_offset_, &offset_index_size_, &has_column_index, - &has_offset_index); + *row_group_metadata, &column_index_location_, &offset_index_location_, + &has_column_index, &has_offset_index); } /// Read column index of a column chunk. - ::arrow::Result> GetColumnIndex(int32_t i) override { + std::shared_ptr GetColumnIndex(int32_t i) override { if (i < 0 || i >= row_group_metadata_->num_columns()) { - return ::arrow::Status::IndexError("Invalid column {} to get column index", i); + throw ParquetException("Invalid column {} to get column index", i); } auto col_chunk = row_group_metadata_->ColumnChunk(i); @@ -219,20 +218,20 @@ class RowGroupPageIndexReaderImpl : public RowGroupPageIndexReader { auto column_index_location = col_chunk->GetColumnIndexLocation(); if (!column_index_location.has_value()) { - return ::arrow::Status::Invalid("Column index does not exist for column {}", i); + return nullptr; } if (column_index_buffer_ == nullptr) { - ARROW_ASSIGN_OR_RAISE( + PARQUET_ASSIGN_OR_THROW( column_index_buffer_, - input_->ReadAt(column_index_base_offset_, column_index_size_)); + input_->ReadAt(column_index_location_.offset, column_index_location_.length)); } auto buffer = column_index_buffer_.get(); - int64_t buffer_offset = column_index_location->offset - 
column_index_base_offset_; - uint32_t length = column_index_location->length; + int64_t buffer_offset = column_index_location->offset - column_index_location_.offset; + uint32_t length = static_cast(column_index_location->length); DCHECK_GE(buffer_offset, 0); - DCHECK_LE(buffer_offset + length, column_index_size_); + DCHECK_LE(buffer_offset + length, column_index_location_.length); auto descr = row_group_metadata_->schema()->Column(i); std::shared_ptr column_index; @@ -240,16 +239,15 @@ class RowGroupPageIndexReaderImpl : public RowGroupPageIndexReader { column_index = ColumnIndex::Make(*descr, buffer->data() + buffer_offset, length, properties_); } catch (...) { - return ::arrow::Status::SerializationError( - "Cannot deserialize column index for column {}", i); + throw ParquetException("Cannot deserialize column index for column {}", i); } return column_index; } /// Read offset index of a column chunk. - ::arrow::Result> GetOffsetIndex(int32_t i) override { + std::shared_ptr GetOffsetIndex(int32_t i) override { if (i < 0 || i >= row_group_metadata_->num_columns()) { - return ::arrow::Status::IndexError("Invalid column {} to get offset index", i); + throw ParquetException("Invalid column {} to get offset index", i); } auto col_chunk = row_group_metadata_->ColumnChunk(i); @@ -261,28 +259,27 @@ class RowGroupPageIndexReaderImpl : public RowGroupPageIndexReader { auto offset_index_location = col_chunk->GetOffsetIndexLocation(); if (!offset_index_location.has_value()) { - return ::arrow::Status::Invalid("Offset index does not exist for column {}", i); + return nullptr; } if (offset_index_buffer_ == nullptr) { - ARROW_ASSIGN_OR_RAISE( + PARQUET_ASSIGN_OR_THROW( offset_index_buffer_, - input_->ReadAt(offset_index_base_offset_, offset_index_size_)); + input_->ReadAt(offset_index_location_.offset, offset_index_location_.length)); } auto buffer = offset_index_buffer_.get(); - int64_t buffer_offset = offset_index_location->offset - offset_index_base_offset_; - uint32_t 
length = offset_index_location->length; + int64_t buffer_offset = offset_index_location->offset - offset_index_location_.offset; + uint32_t length = static_cast(offset_index_location->length); DCHECK_GE(buffer_offset, 0); - DCHECK_LE(buffer_offset + length, offset_index_size_); + DCHECK_LE(buffer_offset + length, offset_index_location_.length); std::shared_ptr offset_index; try { offset_index = OffsetIndex::Make(buffer->data() + buffer_offset, length, properties_); } catch (...) { - return ::arrow::Status::SerializationError( - "Cannot deserialize offset index for column {}", i); + throw ParquetException("Cannot deserialize offset index for column {}", i); } return offset_index; } @@ -300,11 +297,9 @@ class RowGroupPageIndexReaderImpl : public RowGroupPageIndexReader { /// File-level decryptor. std::shared_ptr file_decryptor_; - /// File offsets and sizes of the page Index. - int64_t column_index_base_offset_; - int64_t column_index_size_; - int64_t offset_index_base_offset_; - int64_t offset_index_size_; + /// File offsets and sizes of the page Index of all column chunks. + IndexLocation column_index_location_; + IndexLocation offset_index_location_; /// Buffer to hold the raw bytes of the page index. /// Will be set lazily when the corresponding page index is accessed for the 1st time. 
@@ -323,35 +318,32 @@ class PageIndexReaderImpl : public PageIndexReader { properties_(properties), file_decryptor_(std::move(file_decryptor)) {} - ::arrow::Result> RowGroup(int i) override { + std::shared_ptr RowGroup(int i) override { if (i < 0 || i >= file_metadata_->num_row_groups()) { - return ::arrow::Status::IndexError("Invalid row group ordinal {}", i); + throw ParquetException("Invalid row group ordinal {}", i); } return std::make_shared( input_, file_metadata_->RowGroup(i), properties_, i, file_decryptor_); } - void WillNeed(const std::vector& row_group_indices, bool need_column_index, - bool need_offset_index) override { + void WillNeed(const std::vector& row_group_indices, + IndexSelection index_selection) override { std::vector<::arrow::io::ReadRange> read_ranges; for (int32_t row_group_ordinal : row_group_indices) { - int64_t column_index_offset; - int64_t column_index_size; - int64_t offset_index_offset; - int64_t offset_index_size; + IndexLocation column_index_location; + IndexLocation offset_index_location; bool has_column_index; bool has_offset_index; PageIndexReader::DeterminePageIndexRangesInRowGroup( - *file_metadata_->RowGroup(row_group_ordinal), &column_index_offset, - &column_index_size, &offset_index_offset, &offset_index_size, &has_column_index, - &has_offset_index); - if (need_column_index && has_column_index) { - read_ranges.emplace_back( - ::arrow::io::ReadRange{column_index_offset, column_index_size}); + *file_metadata_->RowGroup(row_group_ordinal), &column_index_location, + &offset_index_location, &has_column_index, &has_offset_index); + if (index_selection.need_column_index && has_column_index) { + read_ranges.emplace_back(::arrow::io::ReadRange{column_index_location.offset, + column_index_location.length}); } - if (need_offset_index && has_offset_index) { - read_ranges.emplace_back( - ::arrow::io::ReadRange{offset_index_offset, offset_index_size}); + if (index_selection.need_offset_index && has_offset_index) { + 
read_ranges.emplace_back(::arrow::io::ReadRange{offset_index_location.offset, + offset_index_location.length}); } } PARQUET_IGNORE_NOT_OK(input_->WillNeed(read_ranges)); @@ -378,43 +370,40 @@ class PageIndexReaderImpl : public PageIndexReader { } // namespace void PageIndexReader::DeterminePageIndexRangesInRowGroup( - const RowGroupMetaData& row_group_metadata, int64_t* column_index_start, - int64_t* column_index_size, int64_t* offset_index_start, int64_t* offset_index_size, - bool* has_column_index, bool* has_offset_index) { + const RowGroupMetaData& row_group_metadata, IndexLocation* column_index_location, + IndexLocation* offset_index_location, bool* has_column_index, + bool* has_offset_index) { int64_t ci_start = std::numeric_limits::max(); int64_t oi_start = std::numeric_limits::max(); int64_t ci_end = -1; int64_t oi_end = -1; - for (int i = 0; i < row_group_metadata.num_columns(); ++i) { - auto col_chunk = row_group_metadata.ColumnChunk(i); - auto column_index_location = col_chunk->GetColumnIndexLocation(); - if (column_index_location.has_value()) { - ci_start = std::min(ci_start, column_index_location->offset); - ci_end = - std::max(ci_end, column_index_location->offset + column_index_location->length); + auto merge_range = [](const std::optional& index_location, + int64_t* start, int64_t* end) { + if (index_location.has_value()) { + *start = std::min(*start, index_location->offset); + *end = std::max(*end, index_location->offset + index_location->length); } + }; - auto offset_index_location = col_chunk->GetOffsetIndexLocation(); - if (offset_index_location.has_value()) { - oi_start = std::min(oi_start, offset_index_location->offset); - oi_end = - std::max(oi_end, offset_index_location->offset + offset_index_location->length); - } + for (int i = 0; i < row_group_metadata.num_columns(); ++i) { + auto col_chunk = row_group_metadata.ColumnChunk(i); + merge_range(col_chunk->GetColumnIndexLocation(), &ci_start, &ci_end); + 
merge_range(col_chunk->GetOffsetIndexLocation(), &oi_start, &oi_end); } if (ci_end != -1) { *has_column_index = true; - *column_index_start = ci_start; - *column_index_size = ci_end - ci_start; + column_index_location->offset = ci_start; + column_index_location->length = ci_end - ci_start; } else { *has_column_index = false; } if (oi_end != -1) { *has_offset_index = true; - *offset_index_start = oi_start; - *offset_index_size = oi_end - oi_start; + offset_index_location->offset = oi_start; + offset_index_location->length = oi_end - oi_start; } else { *has_offset_index = false; } diff --git a/cpp/src/parquet/page_index.h b/cpp/src/parquet/page_index.h index 11b4efd1584..337690a06f2 100644 --- a/cpp/src/parquet/page_index.h +++ b/cpp/src/parquet/page_index.h @@ -19,6 +19,7 @@ #include "parquet/types.h" +#include #include namespace parquet { @@ -30,6 +31,8 @@ class ReaderProperties; class RowGroupMetaData; class RowGroupPageIndexReader; +struct IndexLocation; + /// \brief ColumnIndex is a proxy around format::ColumnIndex. class PARQUET_EXPORT ColumnIndex { public: @@ -130,30 +133,44 @@ class PARQUET_EXPORT OffsetIndex { virtual const std::vector& page_locations() const = 0; }; -/// \brief Interface for reading the page index from a Parquet row group. +/// \brief Interface for reading the page index for a Parquet row group. class PARQUET_EXPORT RowGroupPageIndexReader { public: virtual ~RowGroupPageIndexReader() = default; /// \brief Read column index of a column chunk. /// - /// \param[in] i column id of the column chunk. - /// \returns error Result if either column id is invalid or column index does not exist. - virtual ::arrow::Result> GetColumnIndex(int32_t i) = 0; + /// \param[in] i column ordinal of the column chunk. + /// \returns column index of the column or nullptr if it does not exist. + /// \throws ParquetException if the index is out of bound. + virtual std::shared_ptr GetColumnIndex(int32_t i) = 0; /// \brief Read offset index of a column chunk. 
/// - /// \param[in] i column id of the column chunk. - /// \returns error Result if either column id is invalid or offset index does not exist. - virtual ::arrow::Result> GetOffsetIndex(int32_t i) = 0; + /// \param[in] i column ordinal of the column chunk. + /// \returns offset index of the column or nullptr if it does not exist. + /// \throws ParquetException if the index is out of bound. + virtual std::shared_ptr GetOffsetIndex(int32_t i) = 0; +}; + +struct IndexSelection { + /// Specifies whether to read the column index. + bool need_column_index = false; + /// Specifies whether to read the offset index. + bool need_offset_index = false; }; -/// \brief Interface for reading the page index from a Parquet file. +/// \brief Interface for reading the page index for a Parquet file. class PARQUET_EXPORT PageIndexReader { public: virtual ~PageIndexReader() = default; /// \brief Create a PageIndexReader instance. + /// \returns a PageIndexReader instance. + /// WARNING: The returned PageIndexReader references to all the input parameters, so + /// it must not outlive all of the input parameters. Usually these input parameters + /// come from the same ParquetFileReader object, so it must not outlive the reader + /// that creates this PageIndexReader. static std::shared_ptr Make( ::arrow::io::RandomAccessFile* input, std::shared_ptr file_metadata, const ReaderProperties& properties, @@ -161,8 +178,12 @@ class PARQUET_EXPORT PageIndexReader { /// \brief Get the page index reader of a specific row group. /// \param[in] i row group ordinal to get page index reader. - /// \returns RowGroupPageIndexReader of the specified row group. - virtual ::arrow::Result> RowGroup(int i) = 0; + /// \returns RowGroupPageIndexReader of the specified row group. A nullptr may or may + /// not be returned if the page index for the row group is unavailable. It is + /// the caller's responsibility to check the return value of follow-up calls + /// to the RowGroupPageIndexReader. 
+ /// \throws ParquetException if the index is out of bound. + virtual std::shared_ptr RowGroup(int i) = 0; /// \brief Advise the reader which part of page index will be read later. /// @@ -171,10 +192,9 @@ class PARQUET_EXPORT PageIndexReader { /// called, or the requested page index is out of range from WillNeed() call. /// /// \param[in] row_group_indices list of row group ordinal to read page index later. - /// \param[in] need_column_index tell if column index is required later. - /// \param[in] need_offset_index tell if offset index is required later. + /// \param[in] index_selection tell if any of the page index is required later. virtual void WillNeed(const std::vector& row_group_indices, - bool need_column_index, bool need_offset_index) = 0; + IndexSelection index_selection) = 0; /// \brief Advise the reader which part of page index will be read later. /// @@ -187,18 +207,18 @@ class PARQUET_EXPORT PageIndexReader { /// \brief Determines the column index and offset index ranges for the given row group. /// \param[in] row_group_metadata row group metadata to get column chunk metadata. - /// \param[out] column_index_start Base start of column index of all column chunks. - /// \param[out] column_index_size Total size of column index of all column chunks. - /// \param[out] offset_index_start Base start of offset index of all column chunks. - /// \param[out] offset_index_size Total size of offset index of all column chunks. + /// \param[out] column_index_location Base start and total size of column index of + /// all column chunks. + /// \param[out] offset_index_location Base start and total size of offset index of + /// all column chunks. /// \param[out] has_column_index Returns true when at least a partial column index are /// found. Returns false when there is absolutely no column index for the row group. /// \param[out] has_offset_index Returns true when at least a partial offset index are /// found. 
Returns false when there is absolutely no offsets index for the row group. static void DeterminePageIndexRangesInRowGroup( - const RowGroupMetaData& row_group_metadata, int64_t* column_index_start, - int64_t* column_index_size, int64_t* offset_index_start, int64_t* offset_index_size, - bool* has_column_index, bool* has_offset_index); + const RowGroupMetaData& row_group_metadata, IndexLocation* column_index_location, + IndexLocation* offset_index_location, bool* has_column_index, + bool* has_offset_index); }; } // namespace parquet diff --git a/cpp/src/parquet/page_index_test.cc b/cpp/src/parquet/page_index_test.cc index 3539eadb745..0c9936e0afa 100644 --- a/cpp/src/parquet/page_index_test.cc +++ b/cpp/src/parquet/page_index_test.cc @@ -318,22 +318,20 @@ void ValidatePageIndexRange(const RowGroupRanges& row_group_ranges, int expected_oi_size) { auto file_metadata = ConstructFakeMetaData(row_group_ranges); - int64_t ci_start; - int64_t ci_size; - int64_t oi_start; - int64_t oi_size; + IndexLocation column_index_location; + IndexLocation offset_index_location; bool has_column_index; bool has_offset_index; PageIndexReader::DeterminePageIndexRangesInRowGroup( - *file_metadata->RowGroup(0), &ci_start, &ci_size, &oi_start, &oi_size, + *file_metadata->RowGroup(0), &column_index_location, &offset_index_location, &has_column_index, &has_offset_index); ASSERT_EQ(expected_has_page_index, has_column_index); ASSERT_EQ(expected_has_page_index, has_offset_index); if (expected_has_page_index) { - EXPECT_EQ(expected_ci_start, ci_start); - EXPECT_EQ(expected_ci_size, ci_size); - EXPECT_EQ(expected_oi_start, oi_start); - EXPECT_EQ(expected_oi_size, oi_size); + EXPECT_EQ(expected_ci_start, column_index_location.offset); + EXPECT_EQ(expected_ci_size, column_index_location.length); + EXPECT_EQ(expected_oi_start, offset_index_location.offset); + EXPECT_EQ(expected_oi_size, offset_index_location.length); } } From ae55ad808e645e266bf7b94948180bc24a03c55a Mon Sep 17 00:00:00 2001 From: Gang 
Wu Date: Fri, 13 Jan 2023 17:43:42 +0800 Subject: [PATCH 03/12] refine interface and add reader test --- cpp/src/parquet/file_reader.cc | 2 +- cpp/src/parquet/file_reader.h | 18 ++--- cpp/src/parquet/metadata.h | 2 +- cpp/src/parquet/page_index.cc | 85 +++++++++++------------- cpp/src/parquet/page_index.h | 31 ++++----- cpp/src/parquet/page_index_test.cc | 22 +++--- cpp/src/parquet/reader_test.cc | 103 +++++++++++++++++++++++++++++ 7 files changed, 176 insertions(+), 87 deletions(-) diff --git a/cpp/src/parquet/file_reader.cc b/cpp/src/parquet/file_reader.cc index ea0a0cb6839..95d6a57c1d6 100644 --- a/cpp/src/parquet/file_reader.cc +++ b/cpp/src/parquet/file_reader.cc @@ -796,7 +796,7 @@ std::shared_ptr ParquetFileReader::metadata() const { return contents_->metadata(); } -std::shared_ptr ParquetFileReader::page_index_reader() { +std::shared_ptr ParquetFileReader::GetPageIndexReader() { return contents_->GetPageIndexReader(); } diff --git a/cpp/src/parquet/file_reader.h b/cpp/src/parquet/file_reader.h index 97cbcf2242b..b39ff3d95b7 100644 --- a/cpp/src/parquet/file_reader.h +++ b/cpp/src/parquet/file_reader.h @@ -99,12 +99,6 @@ class PARQUET_EXPORT ParquetFileReader { virtual void Close() = 0; virtual std::shared_ptr GetRowGroup(int i) = 0; virtual std::shared_ptr metadata() const = 0; - // Returns a PageIndexReader instance for the parquet file. - // If the file does not have page index, nullptr may be returned. Since it pays to - // check existence of page index in the file, it is possible to return a non-nullptr - // even if page index does not exist. It is the caller's responsibility to check - // the return value of follow-up calls to PageIndexReader. - // WARNING: The returned PageIndexReader must not outlive the ParquetFileReader. virtual std::shared_ptr GetPageIndexReader() = 0; }; @@ -141,8 +135,16 @@ class PARQUET_EXPORT ParquetFileReader { // Returns the file metadata. 
Only one instance is ever created std::shared_ptr metadata() const; - // Returns the page index reader. Only one instance is ever created - std::shared_ptr page_index_reader(); + /// Returns the PageIndexReader. Only one instance is ever created. + /// + /// If the file does not have the page index, nullptr may be returned. + /// Because it pays to check existence of page index in the file, it + /// is possible to return a non null value even if page index does + /// not exist. It is the caller's responsibility to check the return + /// value and follow-up calls to PageIndexReader. + /// + /// WARNING: The returned PageIndexReader must not outlive the ParquetFileReader. + std::shared_ptr GetPageIndexReader(); /// Pre-buffer the specified column indices in all row groups. /// diff --git a/cpp/src/parquet/metadata.h b/cpp/src/parquet/metadata.h index 48c3fd9f7b8..277f59a1b60 100644 --- a/cpp/src/parquet/metadata.h +++ b/cpp/src/parquet/metadata.h @@ -124,7 +124,7 @@ struct IndexLocation { /// File offset of the given index, in bytes int64_t offset; /// Length of the given index, in bytes - int64_t length; + int32_t length; }; /// \brief ColumnChunkMetaData is a proxy around format::ColumnChunkMetaData. 
diff --git a/cpp/src/parquet/page_index.cc b/cpp/src/parquet/page_index.cc index 8dee1b2cfc9..ac32950cfae 100644 --- a/cpp/src/parquet/page_index.cc +++ b/cpp/src/parquet/page_index.cc @@ -195,13 +195,9 @@ class RowGroupPageIndexReaderImpl : public RowGroupPageIndexReader { : input_(input), row_group_metadata_(std::move(row_group_metadata)), properties_(properties), - file_decryptor_(std::move(file_decryptor)) { - bool has_column_index = false; - bool has_offset_index = false; - PageIndexReader::DeterminePageIndexRangesInRowGroup( - *row_group_metadata, &column_index_location_, &offset_index_location_, - &has_column_index, &has_offset_index); - } + file_decryptor_(std::move(file_decryptor)), + index_read_range_( + PageIndexReader::DeterminePageIndexRangesInRowGroup(*row_group_metadata_)) {} /// Read column index of a column chunk. std::shared_ptr GetColumnIndex(int32_t i) override { @@ -221,17 +217,22 @@ class RowGroupPageIndexReaderImpl : public RowGroupPageIndexReader { return nullptr; } + if (!index_read_range_.column_index.has_value()) { + throw ParquetException("Missing column index read range"); + } + if (column_index_buffer_ == nullptr) { - PARQUET_ASSIGN_OR_THROW( - column_index_buffer_, - input_->ReadAt(column_index_location_.offset, column_index_location_.length)); + PARQUET_ASSIGN_OR_THROW(column_index_buffer_, + input_->ReadAt(index_read_range_.column_index->offset, + index_read_range_.column_index->length)); } auto buffer = column_index_buffer_.get(); - int64_t buffer_offset = column_index_location->offset - column_index_location_.offset; + int64_t buffer_offset = + column_index_location->offset - index_read_range_.column_index->offset; uint32_t length = static_cast(column_index_location->length); DCHECK_GE(buffer_offset, 0); - DCHECK_LE(buffer_offset + length, column_index_location_.length); + DCHECK_LE(buffer_offset + length, index_read_range_.column_index->length); auto descr = row_group_metadata_->schema()->Column(i); std::shared_ptr column_index; 
@@ -262,17 +263,22 @@ class RowGroupPageIndexReaderImpl : public RowGroupPageIndexReader { return nullptr; } + if (!index_read_range_.offset_index.has_value()) { + throw ParquetException("Missing column index read range"); + } + if (offset_index_buffer_ == nullptr) { - PARQUET_ASSIGN_OR_THROW( - offset_index_buffer_, - input_->ReadAt(offset_index_location_.offset, offset_index_location_.length)); + PARQUET_ASSIGN_OR_THROW(offset_index_buffer_, + input_->ReadAt(index_read_range_.offset_index->offset, + index_read_range_.offset_index->length)); } auto buffer = offset_index_buffer_.get(); - int64_t buffer_offset = offset_index_location->offset - offset_index_location_.offset; + int64_t buffer_offset = + offset_index_location->offset - index_read_range_.offset_index->offset; uint32_t length = static_cast(offset_index_location->length); DCHECK_GE(buffer_offset, 0); - DCHECK_LE(buffer_offset + length, offset_index_location_.length); + DCHECK_LE(buffer_offset + length, index_read_range_.offset_index->length); std::shared_ptr offset_index; try { @@ -297,9 +303,8 @@ class RowGroupPageIndexReaderImpl : public RowGroupPageIndexReader { /// File-level decryptor. std::shared_ptr file_decryptor_; - /// File offsets and sizes of the page Index of all column chunks. - IndexLocation column_index_location_; - IndexLocation offset_index_location_; + /// File offsets and sizes of the page Index of all column chunks in the row group. + RowGroupIndexReadRange index_read_range_; /// Buffer to hold the raw bytes of the page index. /// Will be set lazily when the corresponding page index is accessed for the 1st time. 
@@ -330,20 +335,13 @@ class PageIndexReaderImpl : public PageIndexReader { IndexSelection index_selection) override { std::vector<::arrow::io::ReadRange> read_ranges; for (int32_t row_group_ordinal : row_group_indices) { - IndexLocation column_index_location; - IndexLocation offset_index_location; - bool has_column_index; - bool has_offset_index; - PageIndexReader::DeterminePageIndexRangesInRowGroup( - *file_metadata_->RowGroup(row_group_ordinal), &column_index_location, - &offset_index_location, &has_column_index, &has_offset_index); - if (index_selection.need_column_index && has_column_index) { - read_ranges.emplace_back(::arrow::io::ReadRange{column_index_location.offset, - column_index_location.length}); + auto read_range = PageIndexReader::DeterminePageIndexRangesInRowGroup( + *file_metadata_->RowGroup(row_group_ordinal)); + if (index_selection.column_index && read_range.column_index.has_value()) { + read_ranges.emplace_back(*read_range.column_index); } - if (index_selection.need_offset_index && has_offset_index) { - read_ranges.emplace_back(::arrow::io::ReadRange{offset_index_location.offset, - offset_index_location.length}); + if (index_selection.offset_index && read_range.offset_index.has_value()) { + read_ranges.emplace_back(*read_range.offset_index); } } PARQUET_IGNORE_NOT_OK(input_->WillNeed(read_ranges)); @@ -369,10 +367,8 @@ class PageIndexReaderImpl : public PageIndexReader { } // namespace -void PageIndexReader::DeterminePageIndexRangesInRowGroup( - const RowGroupMetaData& row_group_metadata, IndexLocation* column_index_location, - IndexLocation* offset_index_location, bool* has_column_index, - bool* has_offset_index) { +RowGroupIndexReadRange PageIndexReader::DeterminePageIndexRangesInRowGroup( + const RowGroupMetaData& row_group_metadata) { int64_t ci_start = std::numeric_limits::max(); int64_t oi_start = std::numeric_limits::max(); int64_t ci_end = -1; @@ -392,21 +388,14 @@ void PageIndexReader::DeterminePageIndexRangesInRowGroup( 
merge_range(col_chunk->GetOffsetIndexLocation(), &oi_start, &oi_end); } + RowGroupIndexReadRange read_range; if (ci_end != -1) { - *has_column_index = true; - column_index_location->offset = ci_start; - column_index_location->length = ci_end - ci_start; - } else { - *has_column_index = false; + read_range.column_index = ::arrow::io::ReadRange{ci_start, ci_end - ci_start}; } - if (oi_end != -1) { - *has_offset_index = true; - offset_index_location->offset = oi_start; - offset_index_location->length = oi_end - oi_start; - } else { - *has_offset_index = false; + read_range.offset_index = ::arrow::io::ReadRange{oi_start, oi_end - oi_start}; } + return read_range; } // ---------------------------------------------------------------------- diff --git a/cpp/src/parquet/page_index.h b/cpp/src/parquet/page_index.h index 337690a06f2..cb660796164 100644 --- a/cpp/src/parquet/page_index.h +++ b/cpp/src/parquet/page_index.h @@ -17,6 +17,7 @@ #pragma once +#include "arrow/io/interfaces.h" #include "parquet/types.h" #include @@ -31,8 +32,6 @@ class ReaderProperties; class RowGroupMetaData; class RowGroupPageIndexReader; -struct IndexLocation; - /// \brief ColumnIndex is a proxy around format::ColumnIndex. class PARQUET_EXPORT ColumnIndex { public: @@ -155,9 +154,18 @@ class PARQUET_EXPORT RowGroupPageIndexReader { struct IndexSelection { /// Specifies whether to read the column index. - bool need_column_index = false; + bool column_index = false; /// Specifies whether to read the offset index. - bool need_offset_index = false; + bool offset_index = false; +}; + +struct RowGroupIndexReadRange { + /// Base start and total size of column index of all column chunks in a row group. + /// If none of the column chunks have column index, it is set to std::nullopt. + std::optional<::arrow::io::ReadRange> column_index = std::nullopt; + /// Base start and total size of offset index of all column chunks in a row group. 
+ /// If none of the column chunks have offset index, it is set to std::nullopt. + std::optional<::arrow::io::ReadRange> offset_index = std::nullopt; }; /// \brief Interface for reading the page index for a Parquet file. @@ -206,19 +214,12 @@ class PARQUET_EXPORT PageIndexReader { virtual void WillNotNeed(const std::vector& row_group_indices) = 0; /// \brief Determines the column index and offset index ranges for the given row group. + /// /// \param[in] row_group_metadata row group metadata to get column chunk metadata. - /// \param[out] column_index_location Base start and total size of column index of - /// all column chunks. - /// \param[out] offset_index_location Base start and total size of offset index of - /// all column chunks. - /// \param[out] has_column_index Returns true when at least a partial column index are - /// found. Returns false when there is absolutely no column index for the row group. - /// \param[out] has_offset_index Returns true when at least a partial offset index are + /// \returns RowGroupIndexReadRange of the specified row group. /// found. Returns false when there is absolutely no offsets index for the row group. 
- static void DeterminePageIndexRangesInRowGroup( - const RowGroupMetaData& row_group_metadata, IndexLocation* column_index_location, - IndexLocation* offset_index_location, bool* has_column_index, - bool* has_offset_index); + static RowGroupIndexReadRange DeterminePageIndexRangesInRowGroup( + const RowGroupMetaData& row_group_metadata); }; } // namespace parquet diff --git a/cpp/src/parquet/page_index_test.cc b/cpp/src/parquet/page_index_test.cc index 0c9936e0afa..7020cc77a20 100644 --- a/cpp/src/parquet/page_index_test.cc +++ b/cpp/src/parquet/page_index_test.cc @@ -317,21 +317,15 @@ void ValidatePageIndexRange(const RowGroupRanges& row_group_ranges, int expected_ci_size, int expected_oi_start, int expected_oi_size) { auto file_metadata = ConstructFakeMetaData(row_group_ranges); - - IndexLocation column_index_location; - IndexLocation offset_index_location; - bool has_column_index; - bool has_offset_index; - PageIndexReader::DeterminePageIndexRangesInRowGroup( - *file_metadata->RowGroup(0), &column_index_location, &offset_index_location, - &has_column_index, &has_offset_index); - ASSERT_EQ(expected_has_page_index, has_column_index); - ASSERT_EQ(expected_has_page_index, has_offset_index); + auto read_range = + PageIndexReader::DeterminePageIndexRangesInRowGroup(*file_metadata->RowGroup(0)); + ASSERT_EQ(expected_has_page_index, read_range.column_index.has_value()); + ASSERT_EQ(expected_has_page_index, read_range.offset_index.has_value()); if (expected_has_page_index) { - EXPECT_EQ(expected_ci_start, column_index_location.offset); - EXPECT_EQ(expected_ci_size, column_index_location.length); - EXPECT_EQ(expected_oi_start, offset_index_location.offset); - EXPECT_EQ(expected_oi_size, offset_index_location.length); + EXPECT_EQ(expected_ci_start, read_range.column_index->offset); + EXPECT_EQ(expected_ci_size, read_range.column_index->length); + EXPECT_EQ(expected_oi_start, read_range.offset_index->offset); + EXPECT_EQ(expected_oi_size, read_range.offset_index->length); } 
} diff --git a/cpp/src/parquet/reader_test.cc b/cpp/src/parquet/reader_test.cc index e17f7a91f9b..b5be0c79bb6 100644 --- a/cpp/src/parquet/reader_test.cc +++ b/cpp/src/parquet/reader_test.cc @@ -37,6 +37,7 @@ #include "parquet/file_reader.h" #include "parquet/file_writer.h" #include "parquet/metadata.h" +#include "parquet/page_index.h" #include "parquet/platform.h" #include "parquet/printer.h" #include "parquet/test_util.h" @@ -1059,4 +1060,106 @@ TEST(TestFileReader, TestOverflowInt16PageOrdinal) { } } +struct PageIndexReaderParam { + PageIndexReaderParam(const std::vector& row_group_indices, + bool need_column_index, bool need_offset_index) + : row_group_indices(row_group_indices), + index_selection{need_column_index, need_offset_index} {} + std::vector row_group_indices; + IndexSelection index_selection; +}; + +class ParameterizedPageIndexReaderTest + : public ::testing::TestWithParam {}; + +// Test reading a data file with page index. +TEST_P(ParameterizedPageIndexReaderTest, TestReadPageIndex) { + ReaderProperties properties; + auto file_reader = ParquetFileReader::OpenFile(data_file("alltypes_tiny_pages.parquet"), + /*memory_map=*/false, properties); + auto metadata = file_reader->metadata(); + EXPECT_EQ(1, metadata->num_row_groups()); + EXPECT_EQ(13, metadata->num_columns()); + + // Create the page index reader and provide different read hints. + auto page_index_reader = file_reader->GetPageIndexReader(); + ASSERT_NE(nullptr, page_index_reader); + const auto params = GetParam(); + page_index_reader->WillNeed(params.row_group_indices, params.index_selection); + auto row_group_index_reader = page_index_reader->RowGroup(0); + ASSERT_NE(nullptr, row_group_index_reader); + + // Verify offset index of column 0 and only partial data as it contains 325 pages. 
+ { + const size_t num_pages = 325; + const std::vector page_indices = {0, 100, 200, 300}; + const std::vector page_locations = { + PageLocation{4, 109, 0}, PageLocation{11480, 133, 2244}, + PageLocation{22980, 133, 4494}, PageLocation{34480, 133, 6744}}; + + auto offset_index = row_group_index_reader->GetOffsetIndex(0); + ASSERT_NE(nullptr, offset_index); + + EXPECT_EQ(num_pages, offset_index->page_locations().size()); + for (size_t i = 0; i < page_indices.size(); ++i) { + size_t page_id = page_indices.at(i); + const auto& read_page_location = offset_index->page_locations().at(page_id); + const auto& expected_page_location = page_locations.at(i); + EXPECT_EQ(expected_page_location.offset, read_page_location.offset); + EXPECT_EQ(expected_page_location.compressed_page_size, + read_page_location.compressed_page_size); + EXPECT_EQ(expected_page_location.first_row_index, + read_page_location.first_row_index); + } + } + + // Verify column index of column 5 and only partial data as it contains 528 pages. 
+ { + const size_t num_pages = 528; + const BoundaryOrder::type boundary_order = BoundaryOrder::Unordered; + const std::vector page_indices = {0, 99, 426, 520}; + const std::vector null_pages = {false, false, false, false}; + const bool has_null_counts = true; + const std::vector null_counts = {0, 0, 0, 0}; + const std::vector min_values = {0, 10, 0, 0}; + const std::vector max_values = {90, 90, 80, 70}; + + auto column_index = row_group_index_reader->GetColumnIndex(5); + ASSERT_NE(nullptr, column_index); + auto typed_column_index = std::dynamic_pointer_cast(column_index); + ASSERT_NE(nullptr, typed_column_index); + + EXPECT_EQ(num_pages, column_index->null_pages().size()); + EXPECT_EQ(has_null_counts, column_index->has_null_counts()); + EXPECT_EQ(boundary_order, column_index->boundary_order()); + for (size_t i = 0; i < page_indices.size(); ++i) { + size_t page_id = page_indices.at(i); + EXPECT_EQ(null_pages.at(i), column_index->null_pages().at(page_id)); + if (has_null_counts) { + EXPECT_EQ(null_counts.at(i), column_index->null_counts().at(page_id)); + } + if (!null_pages.at(i)) { + EXPECT_EQ(min_values.at(i), typed_column_index->min_values().at(page_id)); + EXPECT_EQ(max_values.at(i), typed_column_index->max_values().at(page_id)); + } + } + } + + // Verify null is returned if column index does not exist. 
+ { + auto column_index = row_group_index_reader->GetColumnIndex(10); + EXPECT_EQ(nullptr, column_index); + } +} + +INSTANTIATE_TEST_SUITE_P(PageIndexReaderTests, ParameterizedPageIndexReaderTest, + ::testing::Values(PageIndexReaderParam({0}, true, true), + PageIndexReaderParam({0}, true, false), + PageIndexReaderParam({0}, false, true), + PageIndexReaderParam({0}, false, false), + PageIndexReaderParam({}, true, true), + PageIndexReaderParam({}, true, false), + PageIndexReaderParam({}, false, true), + PageIndexReaderParam({}, false, false))); + } // namespace parquet From dbc46ad271444a2dfafa361d2f9ac470509a9829 Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Fri, 13 Jan 2023 19:41:27 +0800 Subject: [PATCH 04/12] remove dependency of internal read range --- cpp/src/parquet/page_index.cc | 12 ++++++++---- cpp/src/parquet/page_index.h | 9 ++++++--- 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/cpp/src/parquet/page_index.cc b/cpp/src/parquet/page_index.cc index ac32950cfae..ab5516848b3 100644 --- a/cpp/src/parquet/page_index.cc +++ b/cpp/src/parquet/page_index.cc @@ -338,10 +338,12 @@ class PageIndexReaderImpl : public PageIndexReader { auto read_range = PageIndexReader::DeterminePageIndexRangesInRowGroup( *file_metadata_->RowGroup(row_group_ordinal)); if (index_selection.column_index && read_range.column_index.has_value()) { - read_ranges.emplace_back(*read_range.column_index); + read_ranges.emplace_back(::arrow::io::ReadRange{read_range.column_index->offset, + read_range.column_index->length}); } if (index_selection.offset_index && read_range.offset_index.has_value()) { - read_ranges.emplace_back(*read_range.offset_index); + read_ranges.emplace_back(::arrow::io::ReadRange{read_range.offset_index->offset, + read_range.offset_index->length}); } } PARQUET_IGNORE_NOT_OK(input_->WillNeed(read_ranges)); @@ -390,10 +392,12 @@ RowGroupIndexReadRange PageIndexReader::DeterminePageIndexRangesInRowGroup( RowGroupIndexReadRange read_range; if (ci_end != -1) { - 
read_range.column_index = ::arrow::io::ReadRange{ci_start, ci_end - ci_start}; + read_range.column_index = + RowGroupIndexReadRange::ReadRange{ci_start, ci_end - ci_start}; } if (oi_end != -1) { - read_range.offset_index = ::arrow::io::ReadRange{oi_start, oi_end - oi_start}; + read_range.offset_index = + RowGroupIndexReadRange::ReadRange{oi_start, oi_end - oi_start}; } return read_range; } diff --git a/cpp/src/parquet/page_index.h b/cpp/src/parquet/page_index.h index cb660796164..1563bb522e6 100644 --- a/cpp/src/parquet/page_index.h +++ b/cpp/src/parquet/page_index.h @@ -17,7 +17,6 @@ #pragma once -#include "arrow/io/interfaces.h" #include "parquet/types.h" #include @@ -160,12 +159,16 @@ struct IndexSelection { }; struct RowGroupIndexReadRange { + struct ReadRange { + int64_t offset; + int64_t length; + }; /// Base start and total size of column index of all column chunks in a row group. /// If none of the column chunks have column index, it is set to std::nullopt. - std::optional<::arrow::io::ReadRange> column_index = std::nullopt; + std::optional column_index = std::nullopt; /// Base start and total size of offset index of all column chunks in a row group. /// If none of the column chunks have offset index, it is set to std::nullopt. - std::optional<::arrow::io::ReadRange> offset_index = std::nullopt; + std::optional offset_index = std::nullopt; }; /// \brief Interface for reading the page index for a Parquet file. 
From c3412bd4f4a001bd6bf69b49e07bcf5d8ea7ad48 Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Fri, 13 Jan 2023 21:01:04 +0800 Subject: [PATCH 05/12] use ::arrow::io::ReadRange --- cpp/src/parquet/page_index.cc | 12 ++++-------- cpp/src/parquet/page_index.h | 9 +++------ 2 files changed, 7 insertions(+), 14 deletions(-) diff --git a/cpp/src/parquet/page_index.cc b/cpp/src/parquet/page_index.cc index ab5516848b3..3ac1f6986fd 100644 --- a/cpp/src/parquet/page_index.cc +++ b/cpp/src/parquet/page_index.cc @@ -338,12 +338,10 @@ class PageIndexReaderImpl : public PageIndexReader { auto read_range = PageIndexReader::DeterminePageIndexRangesInRowGroup( *file_metadata_->RowGroup(row_group_ordinal)); if (index_selection.column_index && read_range.column_index.has_value()) { - read_ranges.emplace_back(::arrow::io::ReadRange{read_range.column_index->offset, - read_range.column_index->length}); + read_ranges.push_back(*read_range.column_index); } if (index_selection.offset_index && read_range.offset_index.has_value()) { - read_ranges.emplace_back(::arrow::io::ReadRange{read_range.offset_index->offset, - read_range.offset_index->length}); + read_ranges.push_back(*read_range.offset_index); } } PARQUET_IGNORE_NOT_OK(input_->WillNeed(read_ranges)); @@ -392,12 +390,10 @@ RowGroupIndexReadRange PageIndexReader::DeterminePageIndexRangesInRowGroup( RowGroupIndexReadRange read_range; if (ci_end != -1) { - read_range.column_index = - RowGroupIndexReadRange::ReadRange{ci_start, ci_end - ci_start}; + read_range.column_index = {ci_start, ci_end - ci_start}; } if (oi_end != -1) { - read_range.offset_index = - RowGroupIndexReadRange::ReadRange{oi_start, oi_end - oi_start}; + read_range.offset_index = {oi_start, oi_end - oi_start}; } return read_range; } diff --git a/cpp/src/parquet/page_index.h b/cpp/src/parquet/page_index.h index 1563bb522e6..cb660796164 100644 --- a/cpp/src/parquet/page_index.h +++ b/cpp/src/parquet/page_index.h @@ -17,6 +17,7 @@ #pragma once +#include "arrow/io/interfaces.h" 
#include "parquet/types.h" #include @@ -159,16 +160,12 @@ struct IndexSelection { }; struct RowGroupIndexReadRange { - struct ReadRange { - int64_t offset; - int64_t length; - }; /// Base start and total size of column index of all column chunks in a row group. /// If none of the column chunks have column index, it is set to std::nullopt. - std::optional column_index = std::nullopt; + std::optional<::arrow::io::ReadRange> column_index = std::nullopt; /// Base start and total size of offset index of all column chunks in a row group. /// If none of the column chunks have offset index, it is set to std::nullopt. - std::optional offset_index = std::nullopt; + std::optional<::arrow::io::ReadRange> offset_index = std::nullopt; }; /// \brief Interface for reading the page index for a Parquet file. From bb0c9076836a4f662b044794253947eef48fe71f Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Sat, 21 Jan 2023 16:19:09 +0800 Subject: [PATCH 06/12] validate range and simply error handling --- cpp/src/parquet/file_reader.cc | 7 +++- cpp/src/parquet/page_index.cc | 64 ++++++++++++++++++++-------------- 2 files changed, 43 insertions(+), 28 deletions(-) diff --git a/cpp/src/parquet/file_reader.cc b/cpp/src/parquet/file_reader.cc index 95d6a57c1d6..2edd258715a 100644 --- a/cpp/src/parquet/file_reader.cc +++ b/cpp/src/parquet/file_reader.cc @@ -305,7 +305,12 @@ class SerializedFile : public ParquetFileReader::Contents { std::shared_ptr GetPageIndexReader() override { if (!file_metadata_) { - throw ParquetException("Cannot get PageIndexReader as file metadata is not ready"); + // Usually this won't happen if user calls one of the static Open() functions + // to create a ParquetFileReader instance. But if user calls the constructor + // directly and calls GetPageIndexReader() before Open() then this could happen. + throw ParquetException( + "Cannot call GetPageIndexReader() due to missing file metadata. 
Did you " + "forget to call ParquetFileReader::Open() first?"); } if (!page_index_reader_) { page_index_reader_ = PageIndexReader::Make(source_.get(), file_metadata_, diff --git a/cpp/src/parquet/page_index.cc b/cpp/src/parquet/page_index.cc index 3ac1f6986fd..44bd6e791ae 100644 --- a/cpp/src/parquet/page_index.cc +++ b/cpp/src/parquet/page_index.cc @@ -208,7 +208,7 @@ class RowGroupPageIndexReaderImpl : public RowGroupPageIndexReader { auto col_chunk = row_group_metadata_->ColumnChunk(i); std::unique_ptr crypto_metadata = col_chunk->crypto_metadata(); - if (crypto_metadata != nullptr && file_decryptor_ == nullptr) { + if (crypto_metadata != nullptr) { ParquetException::NYI("Cannot read encrypted column index yet"); } @@ -227,22 +227,25 @@ class RowGroupPageIndexReaderImpl : public RowGroupPageIndexReader { index_read_range_.column_index->length)); } - auto buffer = column_index_buffer_.get(); + // Validate range of column index + if (column_index_location->offset < index_read_range_.column_index->offset || + column_index_location->length <= 0 || + column_index_location->offset + column_index_location->length > + index_read_range_.column_index->offset + + index_read_range_.column_index->length) { + throw ParquetException("Invalid column index location: offset {} length {}", + column_index_location->offset, + column_index_location->length); + } + int64_t buffer_offset = column_index_location->offset - index_read_range_.column_index->offset; + // ColumnIndex::Make() requires the type of serialized thrift message to be + // uint32_t uint32_t length = static_cast(column_index_location->length); - DCHECK_GE(buffer_offset, 0); - DCHECK_LE(buffer_offset + length, index_read_range_.column_index->length); - auto descr = row_group_metadata_->schema()->Column(i); - std::shared_ptr column_index; - try { - column_index = - ColumnIndex::Make(*descr, buffer->data() + buffer_offset, length, properties_); - } catch (...) 
{ - throw ParquetException("Cannot deserialize column index for column {}", i); - } - return column_index; + return ColumnIndex::Make(*descr, column_index_buffer_->data() + buffer_offset, length, + properties_); } /// Read offset index of a column chunk. @@ -254,7 +257,7 @@ class RowGroupPageIndexReaderImpl : public RowGroupPageIndexReader { auto col_chunk = row_group_metadata_->ColumnChunk(i); std::unique_ptr crypto_metadata = col_chunk->crypto_metadata(); - if (crypto_metadata != nullptr && file_decryptor_ == nullptr) { + if (crypto_metadata != nullptr) { ParquetException::NYI("Cannot read encrypted offset index yet"); } @@ -273,21 +276,24 @@ class RowGroupPageIndexReaderImpl : public RowGroupPageIndexReader { index_read_range_.offset_index->length)); } - auto buffer = offset_index_buffer_.get(); + // Validate range of offset index + if (offset_index_location->offset < index_read_range_.offset_index->offset || + offset_index_location->length <= 0 || + offset_index_location->offset + offset_index_location->length > + index_read_range_.offset_index->offset + + index_read_range_.offset_index->length) { + throw ParquetException("Invalid offset index location: offset {} length {}", + offset_index_location->offset, + offset_index_location->length); + } + int64_t buffer_offset = offset_index_location->offset - index_read_range_.offset_index->offset; + // OffsetIndex::Make() requires the type of serialized thrift message to be + // uint32_t uint32_t length = static_cast(offset_index_location->length); - DCHECK_GE(buffer_offset, 0); - DCHECK_LE(buffer_offset + length, index_read_range_.offset_index->length); - - std::shared_ptr offset_index; - try { - offset_index = - OffsetIndex::Make(buffer->data() + buffer_offset, length, properties_); - } catch (...) 
{ - throw ParquetException("Cannot deserialize offset index for column {}", i); - } - return offset_index; + return OffsetIndex::Make(offset_index_buffer_->data() + buffer_offset, length, + properties_); } private: @@ -344,7 +350,7 @@ class PageIndexReaderImpl : public PageIndexReader { read_ranges.push_back(*read_range.offset_index); } } - PARQUET_IGNORE_NOT_OK(input_->WillNeed(read_ranges)); + PARQUET_THROW_NOT_OK(input_->WillNeed(read_ranges)); } void WillNotNeed(const std::vector& row_group_indices) override { @@ -377,6 +383,10 @@ RowGroupIndexReadRange PageIndexReader::DeterminePageIndexRangesInRowGroup( auto merge_range = [](const std::optional& index_location, int64_t* start, int64_t* end) { if (index_location.has_value()) { + if (index_location->offset < 0 || index_location->length <= 0) { + throw ParquetException("Invalid index location: offset {} length {}", + index_location->offset, index_location->length); + } *start = std::min(*start, index_location->offset); *end = std::max(*end, index_location->offset + index_location->length); } From 171853b840545b0ea51a3be20dfc967d1fd5dd87 Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Tue, 24 Jan 2023 23:56:04 +0800 Subject: [PATCH 07/12] support provide hint of selected columns --- cpp/src/parquet/page_index.cc | 140 ++++++++++++++++++++--------- cpp/src/parquet/page_index.h | 42 +++++++-- cpp/src/parquet/page_index_test.cc | 6 +- cpp/src/parquet/reader_test.cc | 2 +- 4 files changed, 138 insertions(+), 52 deletions(-) diff --git a/cpp/src/parquet/page_index.cc b/cpp/src/parquet/page_index.cc index 44bd6e791ae..ea3a9fa568f 100644 --- a/cpp/src/parquet/page_index.cc +++ b/cpp/src/parquet/page_index.cc @@ -191,13 +191,25 @@ class RowGroupPageIndexReaderImpl : public RowGroupPageIndexReader { std::shared_ptr row_group_metadata, const ReaderProperties& properties, int32_t row_group_ordinal, + std::optional index_read_range, std::shared_ptr file_decryptor) : input_(input), 
row_group_metadata_(std::move(row_group_metadata)), properties_(properties), - file_decryptor_(std::move(file_decryptor)), - index_read_range_( - PageIndexReader::DeterminePageIndexRangesInRowGroup(*row_group_metadata_)) {} + row_group_ordinal_(row_group_ordinal), + file_decryptor_(std::move(file_decryptor)) { + if (index_read_range.has_value()) { + /// This row group has been requested by WillNeed(). Only column index and/or + /// offset index of requested columns can be read. Will throw if columns are + /// out of range. + index_read_range_ = index_read_range.value(); + } else { + /// If the row group has not been requested by WillNeed(), by default both column + /// index and offset index of all column chunks for the row group can be read. + index_read_range_ = + PageIndexReader::DeterminePageIndexRangesInRowGroup(*row_group_metadata_, {}); + } + } /// Read column index of a column chunk. std::shared_ptr GetColumnIndex(int32_t i) override { @@ -206,7 +218,6 @@ class RowGroupPageIndexReaderImpl : public RowGroupPageIndexReader { } auto col_chunk = row_group_metadata_->ColumnChunk(i); - std::unique_ptr crypto_metadata = col_chunk->crypto_metadata(); if (crypto_metadata != nullptr) { ParquetException::NYI("Cannot read encrypted column index yet"); @@ -217,9 +228,8 @@ class RowGroupPageIndexReaderImpl : public RowGroupPageIndexReader { return nullptr; } - if (!index_read_range_.column_index.has_value()) { - throw ParquetException("Missing column index read range"); - } + CheckReadRangeOrThrow(*column_index_location, index_read_range_.column_index, + row_group_ordinal_); if (column_index_buffer_ == nullptr) { PARQUET_ASSIGN_OR_THROW(column_index_buffer_, @@ -227,17 +237,6 @@ class RowGroupPageIndexReaderImpl : public RowGroupPageIndexReader { index_read_range_.column_index->length)); } - // Validate range of column index - if (column_index_location->offset < index_read_range_.column_index->offset || - column_index_location->length <= 0 || - 
column_index_location->offset + column_index_location->length > - index_read_range_.column_index->offset + - index_read_range_.column_index->length) { - throw ParquetException("Invalid column index location: offset {} length {}", - column_index_location->offset, - column_index_location->length); - } - int64_t buffer_offset = column_index_location->offset - index_read_range_.column_index->offset; // ColumnIndex::Make() requires the type of serialized thrift message to be @@ -255,7 +254,6 @@ class RowGroupPageIndexReaderImpl : public RowGroupPageIndexReader { } auto col_chunk = row_group_metadata_->ColumnChunk(i); - std::unique_ptr crypto_metadata = col_chunk->crypto_metadata(); if (crypto_metadata != nullptr) { ParquetException::NYI("Cannot read encrypted offset index yet"); @@ -266,9 +264,8 @@ class RowGroupPageIndexReaderImpl : public RowGroupPageIndexReader { return nullptr; } - if (!index_read_range_.offset_index.has_value()) { - throw ParquetException("Missing column index read range"); - } + CheckReadRangeOrThrow(*offset_index_location, index_read_range_.offset_index, + row_group_ordinal_); if (offset_index_buffer_ == nullptr) { PARQUET_ASSIGN_OR_THROW(offset_index_buffer_, @@ -276,17 +273,6 @@ class RowGroupPageIndexReaderImpl : public RowGroupPageIndexReader { index_read_range_.offset_index->length)); } - // Validate range of offset index - if (offset_index_location->offset < index_read_range_.offset_index->offset || - offset_index_location->length <= 0 || - offset_index_location->offset + offset_index_location->length > - index_read_range_.offset_index->offset + - index_read_range_.offset_index->length) { - throw ParquetException("Invalid offset index location: offset {} length {}", - offset_index_location->offset, - offset_index_location->length); - } - int64_t buffer_offset = offset_index_location->offset - index_read_range_.offset_index->offset; // OffsetIndex::Make() requires the type of serialized thrift message to be @@ -296,6 +282,42 @@ class 
RowGroupPageIndexReaderImpl : public RowGroupPageIndexReader { properties_); } + private: + static void CheckReadRangeOrThrow( + const IndexLocation& index_location, + const std::optional<::arrow::io::ReadRange>& index_read_range, + int32_t row_group_ordinal) { + if (!index_read_range.has_value()) { + throw ParquetException( + "Missing page index read range of row group {}, it may not exist or has not " + "been requested", + row_group_ordinal); + } + + /// The coalesced read range is invalid. + if (index_read_range->offset < 0 || index_read_range->length <= 0) { + throw ParquetException("Invalid page index read range: offset {} length {}", + index_read_range->offset, index_read_range->length); + } + + /// The location to page index itself is corrupted. + if (index_location.offset < 0 || index_location.length <= 0) { + throw ParquetException("Invalid page index location: offset {} length {}", + index_location.offset, index_location.length); + } + + /// Page index location must be within the range of the read range. + if (index_location.offset < index_read_range->offset || + index_location.offset + index_location.length > + index_read_range->offset + index_read_range->length) { + throw ParquetException( + "Page index location [offset:{},length:{}] is out of range from previous " + "WillNeed request [offset:{},length:{}], row group: {}", + index_location.offset, index_location.length, index_read_range->offset, + index_read_range->length, row_group_ordinal); + } + } + private: /// The input stream that can perform random access read. ::arrow::io::RandomAccessFile* input_; @@ -306,6 +328,9 @@ class RowGroupPageIndexReaderImpl : public RowGroupPageIndexReader { /// Reader properties used to deserialize thrift object. const ReaderProperties& properties_; + /// The ordinal of the row group in the file. + int32_t row_group_ordinal_; + /// File-level decryptor. 
std::shared_ptr file_decryptor_; @@ -333,28 +358,46 @@ class PageIndexReaderImpl : public PageIndexReader { if (i < 0 || i >= file_metadata_->num_row_groups()) { throw ParquetException("Invalid row group ordinal {}", i); } + + // Find the read range of the page index of the row group if provided by WillNeed() + std::optional index_read_range = std::nullopt; + auto iter = index_read_ranges_.find(i); + if (iter != index_read_ranges_.cend()) { + index_read_range = iter->second; + } + return std::make_shared( - input_, file_metadata_->RowGroup(i), properties_, i, file_decryptor_); + input_, file_metadata_->RowGroup(i), properties_, i, index_read_range, + file_decryptor_); } void WillNeed(const std::vector& row_group_indices, + const std::vector& column_indices, IndexSelection index_selection) override { + if (!index_selection.column_index && !index_selection.offset_index) { + // Neither column index nor offset index has been requested, simply return. + return; + } + std::vector<::arrow::io::ReadRange> read_ranges; for (int32_t row_group_ordinal : row_group_indices) { auto read_range = PageIndexReader::DeterminePageIndexRangesInRowGroup( - *file_metadata_->RowGroup(row_group_ordinal)); + *file_metadata_->RowGroup(row_group_ordinal), column_indices); if (index_selection.column_index && read_range.column_index.has_value()) { read_ranges.push_back(*read_range.column_index); } if (index_selection.offset_index && read_range.offset_index.has_value()) { read_ranges.push_back(*read_range.offset_index); } + index_read_ranges_.emplace(row_group_ordinal, std::move(read_range)); } PARQUET_THROW_NOT_OK(input_->WillNeed(read_ranges)); } void WillNotNeed(const std::vector& row_group_indices) override { - // No-op for now. + for (int32_t row_group_ordinal : row_group_indices) { + index_read_ranges_.erase(row_group_ordinal); + } } private: @@ -369,12 +412,16 @@ class PageIndexReaderImpl : public PageIndexReader { /// File-level decrypter. 
std::shared_ptr file_decryptor_; + + /// Coalesced read ranges of page index of row groups that have been suggested by + /// WillNeed(). Key is the row group ordinal. + std::unordered_map index_read_ranges_; }; } // namespace RowGroupIndexReadRange PageIndexReader::DeterminePageIndexRangesInRowGroup( - const RowGroupMetaData& row_group_metadata) { + const RowGroupMetaData& row_group_metadata, const std::vector& columns) { int64_t ci_start = std::numeric_limits::max(); int64_t oi_start = std::numeric_limits::max(); int64_t ci_end = -1; @@ -392,10 +439,21 @@ RowGroupIndexReadRange PageIndexReader::DeterminePageIndexRangesInRowGroup( } }; - for (int i = 0; i < row_group_metadata.num_columns(); ++i) { - auto col_chunk = row_group_metadata.ColumnChunk(i); - merge_range(col_chunk->GetColumnIndexLocation(), &ci_start, &ci_end); - merge_range(col_chunk->GetOffsetIndexLocation(), &oi_start, &oi_end); + if (columns.empty()) { + for (int32_t i = 0; i < row_group_metadata.num_columns(); ++i) { + auto col_chunk = row_group_metadata.ColumnChunk(i); + merge_range(col_chunk->GetColumnIndexLocation(), &ci_start, &ci_end); + merge_range(col_chunk->GetOffsetIndexLocation(), &oi_start, &oi_end); + } + } else { + for (int32_t i : columns) { + if (i < 0 || i >= row_group_metadata.num_columns()) { + throw ParquetException("Invalid column ordinal {}", i); + } + auto col_chunk = row_group_metadata.ColumnChunk(i); + merge_range(col_chunk->GetColumnIndexLocation(), &ci_start, &ci_end); + merge_range(col_chunk->GetOffsetIndexLocation(), &oi_start, &oi_end); + } } RowGroupIndexReadRange read_range; diff --git a/cpp/src/parquet/page_index.h b/cpp/src/parquet/page_index.h index cb660796164..f2c6f11bd88 100644 --- a/cpp/src/parquet/page_index.h +++ b/cpp/src/parquet/page_index.h @@ -195,31 +195,59 @@ class PARQUET_EXPORT PageIndexReader { /// \brief Advise the reader which part of page index will be read later. 
/// - /// The PageIndexReader implementation can optionally prefetch and cache page index - /// that may be read later. Follow-up read should not fail even if WillNeed() is not - /// called, or the requested page index is out of range from WillNeed() call. + /// The PageIndexReader can optionally prefetch and cache page index that + /// may be read later to get better performance. + /// + /// The contract of this function is as below: + /// 1) If WillNeed() has not been called for a specific row group and the page index + /// exists, follow-up calls to get column index or offset index of all columns in + /// this row group SHOULD NOT FAIL, but the performance may not be optimal. + /// 2) If WillNeed() has been called for a specific row group, follow-up calls MAY + /// FAIL if columns that are not requested by WillNeed() are requested. + /// 3) Later calls to WillNeed() MAY override previous calls of same row groups. + /// For example, + /// 1) If WillNeed() is not called for row group 0, then follow-up calls to read + /// column index and/or offset index of all columns of row group 0 should not + /// fail if its page index exists. + /// 2) If WillNeed() is called for columns 0 and 1 for row group 0, then follow-up + /// call to read page index of column 2 for row group 0 WILL FAIL even if its + /// page index exists. + /// 3) If WillNeed() is called for row group 0 with offset index only, then + /// follow-up call to read column index of row group 0 WILL FAIL even if + /// the column index of this column exists. + /// 4) If WillNeed() is called for columns 0 and 1 for row group 0, then later + /// call to WillNeed() for columns 1 and 2 for row group 0. The later one + /// overrides previous call and only columns 1 and 2 of row group 0 are allowed + /// to access. /// /// \param[in] row_group_indices list of row group ordinal to read page index later. + /// \param[in] column_indices list of column ordinal to read page index later. 
If it is + /// empty, it means all columns in the row group will be read. /// \param[in] index_selection tell if any of the page index is required later. virtual void WillNeed(const std::vector& row_group_indices, + const std::vector& column_indices, IndexSelection index_selection) = 0; - /// \brief Advise the reader which part of page index will be read later. + /// \brief Advise the reader page index of these row groups will not be read any more. /// /// The PageIndexReader implementation has the opportunity to cancel any prefetch or /// release resource that are related to these row groups. /// /// \param[in] row_group_indices list of row group ordinal that whose page index will - /// not be needed any more. + /// not be accessed any more. virtual void WillNotNeed(const std::vector& row_group_indices) = 0; /// \brief Determines the column index and offset index ranges for the given row group. /// /// \param[in] row_group_metadata row group metadata to get column chunk metadata. - /// \returns RowGroupIndexReadRange of the specified row group. + /// \param[in] columns list of column ordinals to get page index. If the list is empty, + /// it means all columns in the row group. + /// \returns RowGroupIndexReadRange of the specified row group. Throws ParquetException + /// if the selected column ordinal is out of bound or metadata of page index + /// is corrupted. /// found. Returns false when there is absolutely no offsets index for the row group. 
static RowGroupIndexReadRange DeterminePageIndexRangesInRowGroup( - const RowGroupMetaData& row_group_metadata); + const RowGroupMetaData& row_group_metadata, const std::vector& columns); }; } // namespace parquet diff --git a/cpp/src/parquet/page_index_test.cc b/cpp/src/parquet/page_index_test.cc index 7020cc77a20..4ecee076218 100644 --- a/cpp/src/parquet/page_index_test.cc +++ b/cpp/src/parquet/page_index_test.cc @@ -317,8 +317,8 @@ void ValidatePageIndexRange(const RowGroupRanges& row_group_ranges, int expected_ci_size, int expected_oi_start, int expected_oi_size) { auto file_metadata = ConstructFakeMetaData(row_group_ranges); - auto read_range = - PageIndexReader::DeterminePageIndexRangesInRowGroup(*file_metadata->RowGroup(0)); + auto read_range = PageIndexReader::DeterminePageIndexRangesInRowGroup( + *file_metadata->RowGroup(0), {}); ASSERT_EQ(expected_has_page_index, read_range.column_index.has_value()); ASSERT_EQ(expected_has_page_index, read_range.offset_index.has_value()); if (expected_has_page_index) { @@ -341,7 +341,7 @@ TEST(PageIndex, DeterminePageIndexRangesInRowGroup) { ValidatePageIndexRange({{10, 5, 15, 5}}, true, 10, 5, 15, 5); // Page index for two column chunks. ValidatePageIndexRange({{10, 5, 30, 25}, {15, 15, 50, 20}}, true, 10, 20, 30, 40); - // Page index for second column chunk.. + // Page index for second column chunk. ValidatePageIndexRange({{-1, -1, -1, -1}, {20, 10, 30, 25}}, true, 20, 10, 30, 25); // Page index for first column chunk. 
ValidatePageIndexRange({{10, 5, 15, 5}, {-1, -1, -1, -1}}, true, 10, 5, 15, 5); diff --git a/cpp/src/parquet/reader_test.cc b/cpp/src/parquet/reader_test.cc index b5be0c79bb6..df052e74ad4 100644 --- a/cpp/src/parquet/reader_test.cc +++ b/cpp/src/parquet/reader_test.cc @@ -1085,7 +1085,7 @@ TEST_P(ParameterizedPageIndexReaderTest, TestReadPageIndex) { auto page_index_reader = file_reader->GetPageIndexReader(); ASSERT_NE(nullptr, page_index_reader); const auto params = GetParam(); - page_index_reader->WillNeed(params.row_group_indices, params.index_selection); + page_index_reader->WillNeed(params.row_group_indices, {}, params.index_selection); auto row_group_index_reader = page_index_reader->RowGroup(0); ASSERT_NE(nullptr, row_group_index_reader); From d11f3bb018f22aad1067c67ad5e62e1571ad7fe8 Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Wed, 25 Jan 2023 10:58:52 +0800 Subject: [PATCH 08/12] add test cases for column selections and refine exception message --- cpp/src/parquet/page_index.cc | 97 ++++++++++++++++-------------- cpp/src/parquet/page_index.h | 7 ++- cpp/src/parquet/page_index_test.cc | 91 +++++++++++++++++++++++----- cpp/src/parquet/reader_test.cc | 97 ++++++++++++++++++++++++------ 4 files changed, 210 insertions(+), 82 deletions(-) diff --git a/cpp/src/parquet/page_index.cc b/cpp/src/parquet/page_index.cc index ea3a9fa568f..0df37d1d786 100644 --- a/cpp/src/parquet/page_index.cc +++ b/cpp/src/parquet/page_index.cc @@ -191,30 +191,19 @@ class RowGroupPageIndexReaderImpl : public RowGroupPageIndexReader { std::shared_ptr row_group_metadata, const ReaderProperties& properties, int32_t row_group_ordinal, - std::optional index_read_range, + const RowGroupIndexReadRange& index_read_range, std::shared_ptr file_decryptor) : input_(input), row_group_metadata_(std::move(row_group_metadata)), properties_(properties), row_group_ordinal_(row_group_ordinal), - file_decryptor_(std::move(file_decryptor)) { - if (index_read_range.has_value()) { - /// This row group has 
been requested by WillNeed(). Only column index and/or - /// offset index of requested columns can be read. Will throw if columns are - /// out of range. - index_read_range_ = index_read_range.value(); - } else { - /// If the row group has not been requested by WillNeed(), by default both column - /// index and offset index of all column chunks for the row group can be read. - index_read_range_ = - PageIndexReader::DeterminePageIndexRangesInRowGroup(*row_group_metadata_, {}); - } - } + index_read_range_(index_read_range), + file_decryptor_(std::move(file_decryptor)) {} /// Read column index of a column chunk. std::shared_ptr GetColumnIndex(int32_t i) override { if (i < 0 || i >= row_group_metadata_->num_columns()) { - throw ParquetException("Invalid column {} to get column index", i); + throw ParquetException("Invalid column index at column ordinal ", i); } auto col_chunk = row_group_metadata_->ColumnChunk(i); @@ -250,7 +239,7 @@ class RowGroupPageIndexReaderImpl : public RowGroupPageIndexReader { /// Read offset index of a column chunk. std::shared_ptr GetOffsetIndex(int32_t i) override { if (i < 0 || i >= row_group_metadata_->num_columns()) { - throw ParquetException("Invalid column {} to get offset index", i); + throw ParquetException("Invalid offset index at column ordinal ", i); } auto col_chunk = row_group_metadata_->ColumnChunk(i); @@ -288,33 +277,34 @@ class RowGroupPageIndexReaderImpl : public RowGroupPageIndexReader { const std::optional<::arrow::io::ReadRange>& index_read_range, int32_t row_group_ordinal) { if (!index_read_range.has_value()) { - throw ParquetException( - "Missing page index read range of row group {}, it may not exist or has not " - "been requested", - row_group_ordinal); + throw ParquetException("Missing page index read range of row group ", + row_group_ordinal, + ", it may not exist or has not been requested"); } /// The coalesced read range is invalid. 
if (index_read_range->offset < 0 || index_read_range->length <= 0) { - throw ParquetException("Invalid page index read range: offset {} length {}", - index_read_range->offset, index_read_range->length); + throw ParquetException("Invalid page index read range: offset ", + index_read_range->offset, " length ", + index_read_range->length); } /// The location to page index itself is corrupted. if (index_location.offset < 0 || index_location.length <= 0) { - throw ParquetException("Invalid page index location: offset {} length {}", - index_location.offset, index_location.length); + throw ParquetException("Invalid page index location: offset ", + index_location.offset, " length ", index_location.length); } /// Page index location must be within the range of the read range. if (index_location.offset < index_read_range->offset || index_location.offset + index_location.length > index_read_range->offset + index_read_range->length) { - throw ParquetException( - "Page index location [offset:{},length:{}] is out of range from previous " - "WillNeed request [offset:{},length:{}], row group: {}", - index_location.offset, index_location.length, index_read_range->offset, - index_read_range->length, row_group_ordinal); + throw ParquetException("Page index location [offset:", index_location.offset, + ",length:", index_location.length, + "] is out of range from previous WillNeed request [offset:", + index_read_range->offset, + ",length:", index_read_range->length, + "], row group: ", row_group_ordinal); } } @@ -331,12 +321,12 @@ class RowGroupPageIndexReaderImpl : public RowGroupPageIndexReader { /// The ordinal of the row group in the file. int32_t row_group_ordinal_; - /// File-level decryptor. - std::shared_ptr file_decryptor_; - /// File offsets and sizes of the page Index of all column chunks in the row group. RowGroupIndexReadRange index_read_range_; + /// File-level decryptor. + std::shared_ptr file_decryptor_; + /// Buffer to hold the raw bytes of the page index. 
/// Will be set lazily when the corresponding page index is accessed for the 1st time. std::shared_ptr<::arrow::Buffer> column_index_buffer_; @@ -356,38 +346,55 @@ class PageIndexReaderImpl : public PageIndexReader { std::shared_ptr RowGroup(int i) override { if (i < 0 || i >= file_metadata_->num_row_groups()) { - throw ParquetException("Invalid row group ordinal {}", i); + throw ParquetException("Invalid row group ordinal: ", i); } + auto row_group_metadata = file_metadata_->RowGroup(i); + // Find the read range of the page index of the row group if provided by WillNeed() - std::optional index_read_range = std::nullopt; + RowGroupIndexReadRange index_read_range; auto iter = index_read_ranges_.find(i); if (iter != index_read_ranges_.cend()) { + /// This row group has been requested by WillNeed(). Only column index and/or + /// offset index of requested columns can be read. index_read_range = iter->second; + } else { + /// If the row group has not been requested by WillNeed(), by default both column + /// index and offset index of all column chunks for the row group can be read. + index_read_range = + PageIndexReader::DeterminePageIndexRangesInRowGroup(*row_group_metadata, {}); + } + + if (index_read_range.column_index.has_value() || + index_read_range.offset_index.has_value()) { + return std::make_shared( + input_, std::move(row_group_metadata), properties_, i, index_read_range, + file_decryptor_); } - return std::make_shared( - input_, file_metadata_->RowGroup(i), properties_, i, index_read_range, - file_decryptor_); + /// The row group does not has page index or has not been requested by WillNeed(). + /// Simply returns nullptr. + return nullptr; } void WillNeed(const std::vector& row_group_indices, const std::vector& column_indices, IndexSelection index_selection) override { - if (!index_selection.column_index && !index_selection.offset_index) { - // Neither column index nor offset index has been requested, simply return. 
- return; - } - std::vector<::arrow::io::ReadRange> read_ranges; for (int32_t row_group_ordinal : row_group_indices) { auto read_range = PageIndexReader::DeterminePageIndexRangesInRowGroup( *file_metadata_->RowGroup(row_group_ordinal), column_indices); if (index_selection.column_index && read_range.column_index.has_value()) { read_ranges.push_back(*read_range.column_index); + } else { + // Mark the column index as not requested. + read_range.column_index = std::nullopt; } if (index_selection.offset_index && read_range.offset_index.has_value()) { read_ranges.push_back(*read_range.offset_index); + } else { + // Mark the offset index as not requested. + read_range.offset_index = std::nullopt; } index_read_ranges_.emplace(row_group_ordinal, std::move(read_range)); } @@ -431,8 +438,8 @@ RowGroupIndexReadRange PageIndexReader::DeterminePageIndexRangesInRowGroup( int64_t* start, int64_t* end) { if (index_location.has_value()) { if (index_location->offset < 0 || index_location->length <= 0) { - throw ParquetException("Invalid index location: offset {} length {}", - index_location->offset, index_location->length); + throw ParquetException("Invalid index location: offset ", index_location->offset, + " length ", index_location->length); } *start = std::min(*start, index_location->offset); *end = std::max(*end, index_location->offset + index_location->length); @@ -448,7 +455,7 @@ RowGroupIndexReadRange PageIndexReader::DeterminePageIndexRangesInRowGroup( } else { for (int32_t i : columns) { if (i < 0 || i >= row_group_metadata.num_columns()) { - throw ParquetException("Invalid column ordinal {}", i); + throw ParquetException("Invalid column ordinal ", i); } auto col_chunk = row_group_metadata.ColumnChunk(i); merge_range(col_chunk->GetColumnIndexLocation(), &ci_start, &ci_end); diff --git a/cpp/src/parquet/page_index.h b/cpp/src/parquet/page_index.h index f2c6f11bd88..b61e675071e 100644 --- a/cpp/src/parquet/page_index.h +++ b/cpp/src/parquet/page_index.h @@ -202,9 +202,10 @@ 
class PARQUET_EXPORT PageIndexReader { /// 1) If WillNeed() has not been called for a specific row group and the page index /// exists, follow-up calls to get column index or offset index of all columns in /// this row group SHOULD NOT FAIL, but the performance may not be optimal. - /// 2) If WillNeed() has been called for a specific row group, follow-up calls MAY - /// FAIL if columns that are not requested by WillNeed() are requested. - /// 3) Later calls to WillNeed() MAY override previous calls of same row groups. + /// 2) If WillNeed() has been called for a specific row group, follow-up calls to get + /// page index are limited to columns and index type requested by WillNeed(). + /// So it MAY FAIL if columns that are not requested by WillNeed() are requested. + /// 3) Later calls to WillNeed() MAY OVERRIDE previous calls of same row groups. /// For example, /// 1) If WillNeed() is not called for row group 0, then follow-up calls to read /// column index and/or offset index of all columns of row group 0 should not diff --git a/cpp/src/parquet/page_index_test.cc b/cpp/src/parquet/page_index_test.cc index 4ecee076218..46599960b8a 100644 --- a/cpp/src/parquet/page_index_test.cc +++ b/cpp/src/parquet/page_index_test.cc @@ -313,17 +313,21 @@ std::shared_ptr ConstructFakeMetaData( /// Validates that 'DeterminePageIndexRangesInRowGroup()' selects the expected file /// offsets and sizes or returns false when the row group doesn't have a page index. 
void ValidatePageIndexRange(const RowGroupRanges& row_group_ranges, - bool expected_has_page_index, int expected_ci_start, + const std::vector& column_indices, + bool expected_has_column_index, + bool expected_has_offset_index, int expected_ci_start, int expected_ci_size, int expected_oi_start, int expected_oi_size) { auto file_metadata = ConstructFakeMetaData(row_group_ranges); auto read_range = PageIndexReader::DeterminePageIndexRangesInRowGroup( - *file_metadata->RowGroup(0), {}); - ASSERT_EQ(expected_has_page_index, read_range.column_index.has_value()); - ASSERT_EQ(expected_has_page_index, read_range.offset_index.has_value()); - if (expected_has_page_index) { + *file_metadata->RowGroup(0), column_indices); + ASSERT_EQ(expected_has_column_index, read_range.column_index.has_value()); + ASSERT_EQ(expected_has_offset_index, read_range.offset_index.has_value()); + if (expected_has_column_index) { EXPECT_EQ(expected_ci_start, read_range.column_index->offset); EXPECT_EQ(expected_ci_size, read_range.column_index->length); + } + if (expected_has_offset_index) { EXPECT_EQ(expected_oi_start, read_range.offset_index->offset); EXPECT_EQ(expected_oi_size, read_range.offset_index->length); } @@ -334,25 +338,82 @@ void ValidatePageIndexRange(const RowGroupRanges& row_group_ranges, /// properly computes the file range that contains the whole page index. TEST(PageIndex, DeterminePageIndexRangesInRowGroup) { // No Column chunks - ValidatePageIndexRange({}, false, -1, -1, -1, -1); + ValidatePageIndexRange({}, {}, false, false, -1, -1, -1, -1); // No page index at all. - ValidatePageIndexRange({{-1, -1, -1, -1}}, false, -1, -1, -1, -1); + ValidatePageIndexRange({{-1, -1, -1, -1}}, {}, false, false, -1, -1, -1, -1); // Page index for single column chunk. - ValidatePageIndexRange({{10, 5, 15, 5}}, true, 10, 5, 15, 5); + ValidatePageIndexRange({{10, 5, 15, 5}}, {}, true, true, 10, 5, 15, 5); // Page index for two column chunks. 
- ValidatePageIndexRange({{10, 5, 30, 25}, {15, 15, 50, 20}}, true, 10, 20, 30, 40); + ValidatePageIndexRange({{10, 5, 30, 25}, {15, 15, 50, 20}}, {}, true, true, 10, 20, 30, + 40); // Page index for second column chunk. - ValidatePageIndexRange({{-1, -1, -1, -1}, {20, 10, 30, 25}}, true, 20, 10, 30, 25); + ValidatePageIndexRange({{-1, -1, -1, -1}, {20, 10, 30, 25}}, {}, true, true, 20, 10, 30, + 25); // Page index for first column chunk. - ValidatePageIndexRange({{10, 5, 15, 5}, {-1, -1, -1, -1}}, true, 10, 5, 15, 5); + ValidatePageIndexRange({{10, 5, 15, 5}, {-1, -1, -1, -1}}, {}, true, true, 10, 5, 15, + 5); // Missing offset index for first column chunk. Gap in column index. - ValidatePageIndexRange({{10, 5, -1, -1}, {20, 10, 30, 25}}, true, 10, 20, 30, 25); + ValidatePageIndexRange({{10, 5, -1, -1}, {20, 10, 30, 25}}, {}, true, true, 10, 20, 30, + 25); // Missing offset index for second column chunk. - ValidatePageIndexRange({{10, 5, 25, 5}, {20, 10, -1, -1}}, true, 10, 20, 25, 5); - // Three column chunks. + ValidatePageIndexRange({{10, 5, 25, 5}, {20, 10, -1, -1}}, {}, true, true, 10, 20, 25, + 5); + // Four column chunks. + ValidatePageIndexRange( + {{100, 10, 220, 30}, {110, 25, 250, 10}, {140, 30, 260, 40}, {200, 10, 300, 100}}, + {}, true, true, 100, 110, 220, 180); +} + +/// This test constructs a couple of artificial row groups with page index offsets in +/// them. Then it validates if PageIndexReader::DeterminePageIndexRangesInRowGroup() +/// properly computes the file range that contains the page index of selected columns. +TEST(PageIndex, DeterminePageIndexRangesInRowGroupWithPartialColumnsSelected) { + // No page index at all. + ValidatePageIndexRange({{-1, -1, -1, -1}}, {0}, false, false, -1, -1, -1, -1); + // Page index for single column chunk. + ValidatePageIndexRange({{10, 5, 15, 5}}, {0}, true, true, 10, 5, 15, 5); + // Page index for the 1st column chunk. 
+ ValidatePageIndexRange({{10, 5, 30, 25}, {15, 15, 50, 20}}, {0}, true, true, 10, 5, 30, + 25); + // Page index for the 2nd column chunk. + ValidatePageIndexRange({{10, 5, 30, 25}, {15, 15, 50, 20}}, {1}, true, true, 15, 15, 50, + 20); + // Only 2nd column is selected among four column chunks. ValidatePageIndexRange( {{100, 10, 220, 30}, {110, 25, 250, 10}, {140, 30, 260, 40}, {200, 10, 300, 100}}, - true, 100, 110, 220, 180); + {1}, true, true, 110, 25, 250, 10); + // Only 2nd and 3rd columns are selected among four column chunks. + ValidatePageIndexRange( + {{100, 10, 220, 30}, {110, 25, 250, 10}, {140, 30, 260, 40}, {200, 10, 300, 100}}, + {1, 2}, true, true, 110, 60, 250, 50); + // Only 2nd and 4th columns are selected among four column chunks. + ValidatePageIndexRange( + {{100, 10, 220, 30}, {110, 25, 250, 10}, {140, 30, 260, 40}, {200, 10, 300, 100}}, + {1, 3}, true, true, 110, 100, 250, 150); + // Only 1st, 2nd and 4th columns are selected among four column chunks. + ValidatePageIndexRange( + {{100, 10, 220, 30}, {110, 25, 250, 10}, {140, 30, 260, 40}, {200, 10, 300, 100}}, + {0, 1, 3}, true, true, 100, 110, 220, 180); + // 3rd column is selected but not present in the row group. + EXPECT_THROW(ValidatePageIndexRange({{10, 5, 30, 25}, {15, 15, 50, 20}}, {2}, false, + false, -1, -1, -1, -1), + ParquetException); +} + +/// This test constructs a couple of artificial row groups with page index offsets in +/// them. Then it validates if PageIndexReader::DeterminePageIndexRangesInRowGroup() +/// properly detects if column index or offset index is missing. +TEST(PageIndex, DeterminePageIndexRangesInRowGroupWithMissingPageIndex) { + // No column index at all. + ValidatePageIndexRange({{-1, -1, 15, 5}}, {}, false, true, -1, -1, 15, 5); + // No offset index at all. + ValidatePageIndexRange({{10, 5, -1, -1}}, {}, true, false, 10, 5, -1, -1); + // No column index at all among two column chunks. 
+ ValidatePageIndexRange({{-1, -1, 30, 25}, {-1, -1, 50, 20}}, {}, false, true, -1, -1, + 30, 40); + // No offset index at all among two column chunks. + ValidatePageIndexRange({{10, 5, -1, -1}, {15, 15, -1, -1}}, {}, true, false, 10, 20, -1, + -1); } } // namespace parquet diff --git a/cpp/src/parquet/reader_test.cc b/cpp/src/parquet/reader_test.cc index df052e74ad4..7a0168aa3b2 100644 --- a/cpp/src/parquet/reader_test.cc +++ b/cpp/src/parquet/reader_test.cc @@ -1062,10 +1062,13 @@ TEST(TestFileReader, TestOverflowInt16PageOrdinal) { struct PageIndexReaderParam { PageIndexReaderParam(const std::vector& row_group_indices, - bool need_column_index, bool need_offset_index) + const std::vector& column_indices, bool need_column_index, + bool need_offset_index) : row_group_indices(row_group_indices), + column_indices(column_indices), index_selection{need_column_index, need_offset_index} {} std::vector row_group_indices; + std::vector column_indices; IndexSelection index_selection; }; @@ -1085,12 +1088,40 @@ TEST_P(ParameterizedPageIndexReaderTest, TestReadPageIndex) { auto page_index_reader = file_reader->GetPageIndexReader(); ASSERT_NE(nullptr, page_index_reader); const auto params = GetParam(); - page_index_reader->WillNeed(params.row_group_indices, {}, params.index_selection); + const bool call_will_need = !params.row_group_indices.empty(); + if (call_will_need) { + page_index_reader->WillNeed(params.row_group_indices, params.column_indices, + params.index_selection); + } + auto row_group_index_reader = page_index_reader->RowGroup(0); - ASSERT_NE(nullptr, row_group_index_reader); + if (!call_will_need || params.index_selection.offset_index || + params.index_selection.column_index) { + ASSERT_NE(nullptr, row_group_index_reader); + } else { + // None of page index is requested. + ASSERT_EQ(nullptr, row_group_index_reader); + return; + } - // Verify offset index of column 0 and only partial data as it contains 325 pages. 
- { + auto column_index_requested = [&](int32_t column_id) { + return !call_will_need || + (params.index_selection.column_index && + (params.column_indices.empty() || + (std::find(params.column_indices.cbegin(), params.column_indices.cend(), + column_id) != params.column_indices.cend()))); + }; + + auto offset_index_requested = [&](int32_t column_id) { + return !call_will_need || + (params.index_selection.offset_index && + (params.column_indices.empty() || + (std::find(params.column_indices.cbegin(), params.column_indices.cend(), + column_id) != params.column_indices.cend()))); + }; + + if (offset_index_requested(0)) { + // Verify offset index of column 0 and only partial data as it contains 325 pages. const size_t num_pages = 325; const std::vector page_indices = {0, 100, 200, 300}; const std::vector page_locations = { @@ -1111,10 +1142,12 @@ TEST_P(ParameterizedPageIndexReaderTest, TestReadPageIndex) { EXPECT_EQ(expected_page_location.first_row_index, read_page_location.first_row_index); } + } else { + EXPECT_THROW(row_group_index_reader->GetOffsetIndex(0), ParquetException); } - // Verify column index of column 5 and only partial data as it contains 528 pages. - { + if (column_index_requested(5)) { + // Verify column index of column 5 and only partial data as it contains 528 pages. const size_t num_pages = 528; const BoundaryOrder::type boundary_order = BoundaryOrder::Unordered; const std::vector page_indices = {0, 99, 426, 520}; @@ -1143,23 +1176,49 @@ TEST_P(ParameterizedPageIndexReaderTest, TestReadPageIndex) { EXPECT_EQ(max_values.at(i), typed_column_index->max_values().at(page_id)); } } + } else { + EXPECT_THROW(row_group_index_reader->GetColumnIndex(5), ParquetException); } // Verify null is returned if column index does not exist. 
- { - auto column_index = row_group_index_reader->GetColumnIndex(10); - EXPECT_EQ(nullptr, column_index); - } + auto column_index = row_group_index_reader->GetColumnIndex(10); + EXPECT_EQ(nullptr, column_index); } INSTANTIATE_TEST_SUITE_P(PageIndexReaderTests, ParameterizedPageIndexReaderTest, - ::testing::Values(PageIndexReaderParam({0}, true, true), - PageIndexReaderParam({0}, true, false), - PageIndexReaderParam({0}, false, true), - PageIndexReaderParam({0}, false, false), - PageIndexReaderParam({}, true, true), - PageIndexReaderParam({}, true, false), - PageIndexReaderParam({}, false, true), - PageIndexReaderParam({}, false, false))); + ::testing::Values(PageIndexReaderParam({}, {}, true, true), + PageIndexReaderParam({}, {}, true, false), + PageIndexReaderParam({}, {}, false, true), + PageIndexReaderParam({}, {}, false, false), + PageIndexReaderParam({0}, {}, true, true), + PageIndexReaderParam({0}, {}, true, false), + PageIndexReaderParam({0}, {}, false, true), + PageIndexReaderParam({0}, {}, false, false), + PageIndexReaderParam({0}, {0}, true, true), + PageIndexReaderParam({0}, {0}, true, false), + PageIndexReaderParam({0}, {0}, false, true), + PageIndexReaderParam({0}, {0}, false, false), + PageIndexReaderParam({0}, {5}, true, true), + PageIndexReaderParam({0}, {5}, true, false), + PageIndexReaderParam({0}, {5}, false, true), + PageIndexReaderParam({0}, {5}, false, false), + PageIndexReaderParam({0}, {0, 5}, true, true), + PageIndexReaderParam({0}, {0, 5}, true, false), + PageIndexReaderParam({0}, {0, 5}, false, true), + PageIndexReaderParam({0}, {0, 5}, false, + false))); + +TEST(PageIndexReaderTest, ReadFileWithoutPageIndex) { + ReaderProperties properties; + auto file_reader = ParquetFileReader::OpenFile(data_file("int32_decimal.parquet"), + /*memory_map=*/false, properties); + auto metadata = file_reader->metadata(); + EXPECT_EQ(1, metadata->num_row_groups()); + + auto page_index_reader = file_reader->GetPageIndexReader(); + ASSERT_NE(nullptr, 
page_index_reader); + auto row_group_index_reader = page_index_reader->RowGroup(0); + ASSERT_EQ(nullptr, row_group_index_reader); +} } // namespace parquet From d3109eb1f465d74dab03b5d364f95111569013e2 Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Thu, 2 Feb 2023 10:04:34 +0800 Subject: [PATCH 09/12] change signature of WillNeed --- cpp/src/parquet/page_index.cc | 2 +- cpp/src/parquet/page_index.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/parquet/page_index.cc b/cpp/src/parquet/page_index.cc index 0df37d1d786..14de032e521 100644 --- a/cpp/src/parquet/page_index.cc +++ b/cpp/src/parquet/page_index.cc @@ -379,7 +379,7 @@ class PageIndexReaderImpl : public PageIndexReader { void WillNeed(const std::vector& row_group_indices, const std::vector& column_indices, - IndexSelection index_selection) override { + const IndexSelection& index_selection) override { std::vector<::arrow::io::ReadRange> read_ranges; for (int32_t row_group_ordinal : row_group_indices) { auto read_range = PageIndexReader::DeterminePageIndexRangesInRowGroup( diff --git a/cpp/src/parquet/page_index.h b/cpp/src/parquet/page_index.h index b61e675071e..f8ce0e453fe 100644 --- a/cpp/src/parquet/page_index.h +++ b/cpp/src/parquet/page_index.h @@ -227,7 +227,7 @@ class PARQUET_EXPORT PageIndexReader { /// \param[in] index_selection tell if any of the page index is required later. virtual void WillNeed(const std::vector& row_group_indices, const std::vector& column_indices, - IndexSelection index_selection) = 0; + const IndexSelection& index_selection) = 0; /// \brief Advise the reader page index of these row groups will not be read any more. 
/// From dd146434e8ef87e9f0fb5281d03b2d89cc78d8e6 Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Thu, 2 Feb 2023 19:34:05 +0800 Subject: [PATCH 10/12] change comments and simplify tests --- cpp/src/parquet/page_index.cc | 12 +++++--- cpp/src/parquet/page_index.h | 13 ++++----- cpp/src/parquet/reader_test.cc | 52 +++++++++++++++------------------- 3 files changed, 37 insertions(+), 40 deletions(-) diff --git a/cpp/src/parquet/page_index.cc b/cpp/src/parquet/page_index.cc index 14de032e521..877628c31ed 100644 --- a/cpp/src/parquet/page_index.cc +++ b/cpp/src/parquet/page_index.cc @@ -23,6 +23,7 @@ #include "parquet/statistics.h" #include "parquet/thrift_internal.h" +#include "arrow/util/int_util_overflow.h" #include "arrow/util/unreachable.h" #include @@ -379,18 +380,18 @@ class PageIndexReaderImpl : public PageIndexReader { void WillNeed(const std::vector& row_group_indices, const std::vector& column_indices, - const IndexSelection& index_selection) override { + const PageIndexSelection& selection) override { std::vector<::arrow::io::ReadRange> read_ranges; for (int32_t row_group_ordinal : row_group_indices) { auto read_range = PageIndexReader::DeterminePageIndexRangesInRowGroup( *file_metadata_->RowGroup(row_group_ordinal), column_indices); - if (index_selection.column_index && read_range.column_index.has_value()) { + if (selection.column_index && read_range.column_index.has_value()) { read_ranges.push_back(*read_range.column_index); } else { // Mark the column index as not requested. read_range.column_index = std::nullopt; } - if (index_selection.offset_index && read_range.offset_index.has_value()) { + if (selection.offset_index && read_range.offset_index.has_value()) { read_ranges.push_back(*read_range.offset_index); } else { // Mark the offset index as not requested. 
@@ -441,8 +442,11 @@ RowGroupIndexReadRange PageIndexReader::DeterminePageIndexRangesInRowGroup( throw ParquetException("Invalid index location: offset ", index_location->offset, " length ", index_location->length); } + int64_t index_end = 0; + ::arrow::internal::AddWithOverflow(index_location->offset, index_location->length, + &index_end); *start = std::min(*start, index_location->offset); - *end = std::max(*end, index_location->offset + index_location->length); + *end = std::max(*end, index_end); } }; diff --git a/cpp/src/parquet/page_index.h b/cpp/src/parquet/page_index.h index f8ce0e453fe..79fbce20ed3 100644 --- a/cpp/src/parquet/page_index.h +++ b/cpp/src/parquet/page_index.h @@ -152,7 +152,7 @@ class PARQUET_EXPORT RowGroupPageIndexReader { virtual std::shared_ptr GetOffsetIndex(int32_t i) = 0; }; -struct IndexSelection { +struct PageIndexSelection { /// Specifies whether to read the column index. bool column_index = false; /// Specifies whether to read the offset index. @@ -211,10 +211,10 @@ class PARQUET_EXPORT PageIndexReader { /// column index and/or offset index of all columns of row group 0 should not /// fail if its page index exists. /// 2) If WillNeed() is called for columns 0 and 1 for row group 0, then follow-up - /// call to read page index of column 2 for row group 0 WILL FAIL even if its + /// call to read page index of column 2 for row group 0 MAY FAIL even if its /// page index exists. /// 3) If WillNeed() is called for row group 0 with offset index only, then - /// follow-up call to read column index of row group 0 WILL FAIL even if + /// follow-up call to read column index of row group 0 MAY FAIL even if /// the column index of this column exists. /// 4) If WillNeed() is called for columns 0 and 1 for row group 0, then later /// call to WillNeed() for columns 1 and 2 for row group 0. The later one @@ -224,10 +224,10 @@ class PARQUET_EXPORT PageIndexReader { /// \param[in] row_group_indices list of row group ordinal to read page index later. 
/// \param[in] column_indices list of column ordinal to read page index later. If it is /// empty, it means all columns in the row group will be read. - /// \param[in] index_selection tell if any of the page index is required later. + /// \param[in] selection which kind of page index is required later. virtual void WillNeed(const std::vector& row_group_indices, const std::vector& column_indices, - const IndexSelection& index_selection) = 0; + const PageIndexSelection& selection) = 0; /// \brief Advise the reader page index of these row groups will not be read any more. /// @@ -238,7 +238,7 @@ class PARQUET_EXPORT PageIndexReader { /// not be accessed any more. virtual void WillNotNeed(const std::vector& row_group_indices) = 0; - /// \brief Determines the column index and offset index ranges for the given row group. + /// \brief Determine the column index and offset index ranges for the given row group. /// /// \param[in] row_group_metadata row group metadata to get column chunk metadata. /// \param[in] columns list of column ordinals to get page index. If the list is empty, @@ -246,7 +246,6 @@ class PARQUET_EXPORT PageIndexReader { /// \returns RowGroupIndexReadRange of the specified row group. Throws ParquetException /// if the selected column ordinal is out of bound or metadata of page index /// is corrupted. - /// found. Returns false when there is absolutely no offsets index for the row group. 
static RowGroupIndexReadRange DeterminePageIndexRangesInRowGroup( const RowGroupMetaData& row_group_metadata, const std::vector& columns); }; diff --git a/cpp/src/parquet/reader_test.cc b/cpp/src/parquet/reader_test.cc index 7a0168aa3b2..5e5a08fc7ae 100644 --- a/cpp/src/parquet/reader_test.cc +++ b/cpp/src/parquet/reader_test.cc @@ -1061,15 +1061,9 @@ TEST(TestFileReader, TestOverflowInt16PageOrdinal) { } struct PageIndexReaderParam { - PageIndexReaderParam(const std::vector& row_group_indices, - const std::vector& column_indices, bool need_column_index, - bool need_offset_index) - : row_group_indices(row_group_indices), - column_indices(column_indices), - index_selection{need_column_index, need_offset_index} {} std::vector row_group_indices; std::vector column_indices; - IndexSelection index_selection; + PageIndexSelection index_selection; }; class ParameterizedPageIndexReaderTest @@ -1185,28 +1179,28 @@ TEST_P(ParameterizedPageIndexReaderTest, TestReadPageIndex) { EXPECT_EQ(nullptr, column_index); } -INSTANTIATE_TEST_SUITE_P(PageIndexReaderTests, ParameterizedPageIndexReaderTest, - ::testing::Values(PageIndexReaderParam({}, {}, true, true), - PageIndexReaderParam({}, {}, true, false), - PageIndexReaderParam({}, {}, false, true), - PageIndexReaderParam({}, {}, false, false), - PageIndexReaderParam({0}, {}, true, true), - PageIndexReaderParam({0}, {}, true, false), - PageIndexReaderParam({0}, {}, false, true), - PageIndexReaderParam({0}, {}, false, false), - PageIndexReaderParam({0}, {0}, true, true), - PageIndexReaderParam({0}, {0}, true, false), - PageIndexReaderParam({0}, {0}, false, true), - PageIndexReaderParam({0}, {0}, false, false), - PageIndexReaderParam({0}, {5}, true, true), - PageIndexReaderParam({0}, {5}, true, false), - PageIndexReaderParam({0}, {5}, false, true), - PageIndexReaderParam({0}, {5}, false, false), - PageIndexReaderParam({0}, {0, 5}, true, true), - PageIndexReaderParam({0}, {0, 5}, true, false), - PageIndexReaderParam({0}, {0, 5}, false, 
true), - PageIndexReaderParam({0}, {0, 5}, false, - false))); +INSTANTIATE_TEST_SUITE_P( + PageIndexReaderTests, ParameterizedPageIndexReaderTest, + ::testing::Values(PageIndexReaderParam{{}, {}, {true, true}}, + PageIndexReaderParam{{}, {}, {true, false}}, + PageIndexReaderParam{{}, {}, {false, true}}, + PageIndexReaderParam{{}, {}, {false, false}}, + PageIndexReaderParam{{0}, {}, {true, true}}, + PageIndexReaderParam{{0}, {}, {true, false}}, + PageIndexReaderParam{{0}, {}, {false, true}}, + PageIndexReaderParam{{0}, {}, {false, false}}, + PageIndexReaderParam{{0}, {0}, {true, true}}, + PageIndexReaderParam{{0}, {0}, {true, false}}, + PageIndexReaderParam{{0}, {0}, {false, true}}, + PageIndexReaderParam{{0}, {0}, {false, false}}, + PageIndexReaderParam{{0}, {5}, {true, true}}, + PageIndexReaderParam{{0}, {5}, {true, false}}, + PageIndexReaderParam{{0}, {5}, {false, true}}, + PageIndexReaderParam{{0}, {5}, {false, false}}, + PageIndexReaderParam{{0}, {0, 5}, {true, true}}, + PageIndexReaderParam{{0}, {0, 5}, {true, false}}, + PageIndexReaderParam{{0}, {0, 5}, {false, true}}, + PageIndexReaderParam{{0}, {0, 5}, {false, false}})); TEST(PageIndexReaderTest, ReadFileWithoutPageIndex) { ReaderProperties properties; From d9c7b258f34b50d9d7dfcdd3056b7a0028e99860 Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Thu, 2 Feb 2023 23:08:16 +0800 Subject: [PATCH 11/12] check return value --- cpp/src/parquet/page_index.cc | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/cpp/src/parquet/page_index.cc b/cpp/src/parquet/page_index.cc index 877628c31ed..f24cecf43b3 100644 --- a/cpp/src/parquet/page_index.cc +++ b/cpp/src/parquet/page_index.cc @@ -438,13 +438,14 @@ RowGroupIndexReadRange PageIndexReader::DeterminePageIndexRangesInRowGroup( auto merge_range = [](const std::optional& index_location, int64_t* start, int64_t* end) { if (index_location.has_value()) { - if (index_location->offset < 0 || index_location->length <= 0) { - throw 
ParquetException("Invalid index location: offset ", index_location->offset, - " length ", index_location->length); - } int64_t index_end = 0; - ::arrow::internal::AddWithOverflow(index_location->offset, index_location->length, - &index_end); + if (index_location->offset < 0 || index_location->length <= 0 || + ::arrow::internal::AddWithOverflow(index_location->offset, + index_location->length, &index_end)) { + throw ParquetException("Invalid page index location: offset ", + index_location->offset, " length ", + index_location->length); + } *start = std::min(*start, index_location->offset); *end = std::max(*end, index_end); } From a5a9be79ce45ab2545f6f2cff175b9b445104813 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Thu, 2 Feb 2023 16:54:10 +0100 Subject: [PATCH 12/12] Try to fix Unity build issue --- cpp/src/parquet/CMakeLists.txt | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/cpp/src/parquet/CMakeLists.txt b/cpp/src/parquet/CMakeLists.txt index 4b910d5c432..e849d533aca 100644 --- a/cpp/src/parquet/CMakeLists.txt +++ b/cpp/src/parquet/CMakeLists.txt @@ -130,9 +130,12 @@ set(PARQUET_STATIC_TEST_LINK_LIBS ${PARQUET_MIN_TEST_LIBS} parquet_static thrift # # Generated Thrift sources -set_source_files_properties(src/generated/parquet_types.cpp src/generated/parquet_types.h - src/generated/parquet_constants.cpp - src/generated/parquet_constants.h +set(PARQUET_THRIFT_SOURCE_DIR "${ARROW_SOURCE_DIR}/src/generated/") + +set_source_files_properties("${PARQUET_THRIFT_SOURCE_DIR}/parquet_types.cpp" + "${PARQUET_THRIFT_SOURCE_DIR}/parquet_types.h" + "${PARQUET_THRIFT_SOURCE_DIR}/parquet_constants.cpp" + "${PARQUET_THRIFT_SOURCE_DIR}/parquet_constants.h" PROPERTIES SKIP_PRECOMPILE_HEADERS ON SKIP_UNITY_BUILD_INCLUSION ON) @@ -167,8 +170,8 @@ set(PARQUET_SRCS metadata.cc xxhasher.cc page_index.cc - "${ARROW_SOURCE_DIR}/src/generated/parquet_constants.cpp" - "${ARROW_SOURCE_DIR}/src/generated/parquet_types.cpp" + 
"${PARQUET_THRIFT_SOURCE_DIR}/parquet_constants.cpp" + "${PARQUET_THRIFT_SOURCE_DIR}/parquet_types.cpp" platform.cc printer.cc properties.cc @@ -277,8 +280,8 @@ add_arrow_lib(parquet if(WIN32 AND NOT (ARROW_TEST_LINKAGE STREQUAL "static")) add_library(parquet_test_support STATIC - "${ARROW_SOURCE_DIR}/src/generated/parquet_constants.cpp" - "${ARROW_SOURCE_DIR}/src/generated/parquet_types.cpp") + "${PARQUET_THRIFT_SOURCE_DIR}/parquet_constants.cpp" + "${PARQUET_THRIFT_SOURCE_DIR}/parquet_types.cpp") target_link_libraries(parquet_test_support thrift::thrift) set(PARQUET_SHARED_TEST_LINK_LIBS ${PARQUET_SHARED_TEST_LINK_LIBS} parquet_test_support) set(PARQUET_LIBRARIES ${PARQUET_LIBRARIES} parquet_test_support)