diff --git a/cpp/src/parquet/CMakeLists.txt b/cpp/src/parquet/CMakeLists.txt index 4b910d5c432..e849d533aca 100644 --- a/cpp/src/parquet/CMakeLists.txt +++ b/cpp/src/parquet/CMakeLists.txt @@ -130,9 +130,12 @@ set(PARQUET_STATIC_TEST_LINK_LIBS ${PARQUET_MIN_TEST_LIBS} parquet_static thrift # # Generated Thrift sources -set_source_files_properties(src/generated/parquet_types.cpp src/generated/parquet_types.h - src/generated/parquet_constants.cpp - src/generated/parquet_constants.h +set(PARQUET_THRIFT_SOURCE_DIR "${ARROW_SOURCE_DIR}/src/generated/") + +set_source_files_properties("${PARQUET_THRIFT_SOURCE_DIR}/parquet_types.cpp" + "${PARQUET_THRIFT_SOURCE_DIR}/parquet_types.h" + "${PARQUET_THRIFT_SOURCE_DIR}/parquet_constants.cpp" + "${PARQUET_THRIFT_SOURCE_DIR}/parquet_constants.h" PROPERTIES SKIP_PRECOMPILE_HEADERS ON SKIP_UNITY_BUILD_INCLUSION ON) @@ -167,8 +170,8 @@ set(PARQUET_SRCS metadata.cc xxhasher.cc page_index.cc - "${ARROW_SOURCE_DIR}/src/generated/parquet_constants.cpp" - "${ARROW_SOURCE_DIR}/src/generated/parquet_types.cpp" + "${PARQUET_THRIFT_SOURCE_DIR}/parquet_constants.cpp" + "${PARQUET_THRIFT_SOURCE_DIR}/parquet_types.cpp" platform.cc printer.cc properties.cc @@ -277,8 +280,8 @@ add_arrow_lib(parquet if(WIN32 AND NOT (ARROW_TEST_LINKAGE STREQUAL "static")) add_library(parquet_test_support STATIC - "${ARROW_SOURCE_DIR}/src/generated/parquet_constants.cpp" - "${ARROW_SOURCE_DIR}/src/generated/parquet_types.cpp") + "${PARQUET_THRIFT_SOURCE_DIR}/parquet_constants.cpp" + "${PARQUET_THRIFT_SOURCE_DIR}/parquet_types.cpp") target_link_libraries(parquet_test_support thrift::thrift) set(PARQUET_SHARED_TEST_LINK_LIBS ${PARQUET_SHARED_TEST_LINK_LIBS} parquet_test_support) set(PARQUET_LIBRARIES ${PARQUET_LIBRARIES} parquet_test_support) diff --git a/cpp/src/parquet/file_reader.cc b/cpp/src/parquet/file_reader.cc index 520317539b5..2edd258715a 100644 --- a/cpp/src/parquet/file_reader.cc +++ b/cpp/src/parquet/file_reader.cc @@ -40,6 +40,7 @@ #include "parquet/exception.h" #include "parquet/file_writer.h" #include "parquet/metadata.h" +#include "parquet/page_index.h" #include "parquet/platform.h" #include "parquet/properties.h" #include "parquet/schema.h" @@ -302,6 +303,22 @@ class SerializedFile : public ParquetFileReader::Contents { std::shared_ptr metadata() const override { return file_metadata_; } + std::shared_ptr GetPageIndexReader() override { + if (!file_metadata_) { + // Usually this won't happen if user calls one of the static Open() functions + // to create a ParquetFileReader instance. But if user calls the constructor + // directly and calls GetPageIndexReader() before Open() then this could happen. + throw ParquetException( + "Cannot call GetPageIndexReader() due to missing file metadata. Did you " + "forget to call ParquetFileReader::Open() first?"); + } + if (!page_index_reader_) { + page_index_reader_ = PageIndexReader::Make(source_.get(), file_metadata_, + properties_, file_decryptor_); + } + return page_index_reader_; + } + void set_metadata(std::shared_ptr metadata) { file_metadata_ = std::move(metadata); } @@ -522,7 +539,7 @@ class SerializedFile : public ParquetFileReader::Contents { int64_t source_size_; std::shared_ptr file_metadata_; ReaderProperties properties_; - + std::shared_ptr page_index_reader_; std::shared_ptr file_decryptor_; // \return The true length of the metadata in bytes @@ -784,6 +801,10 @@ std::shared_ptr ParquetFileReader::metadata() const { return contents_->metadata(); } +std::shared_ptr ParquetFileReader::GetPageIndexReader() { + return contents_->GetPageIndexReader(); +} + std::shared_ptr ParquetFileReader::RowGroup(int i) { if (i >= metadata()->num_row_groups()) { std::stringstream ss; diff --git a/cpp/src/parquet/file_reader.h b/cpp/src/parquet/file_reader.h index 4b27575f01d..b39ff3d95b7 100644 --- a/cpp/src/parquet/file_reader.h +++ b/cpp/src/parquet/file_reader.h @@ -32,6 +32,7 @@ namespace parquet { class ColumnReader; class FileMetaData; +class PageIndexReader; class PageReader; class RowGroupMetaData; @@ -98,6 +99,7 @@ class PARQUET_EXPORT ParquetFileReader { virtual void Close() = 0; virtual std::shared_ptr GetRowGroup(int i) = 0; virtual std::shared_ptr metadata() const = 0; + virtual std::shared_ptr GetPageIndexReader() = 0; }; ParquetFileReader(); @@ -133,6 +135,17 @@ class PARQUET_EXPORT ParquetFileReader { // Returns the file metadata. Only one instance is ever created std::shared_ptr metadata() const; + /// Returns the PageIndexReader. Only one instance is ever created. + /// + /// If the file does not have the page index, nullptr may be returned. + /// Because it pays to check existence of page index in the file, it + /// is possible to return a non null value even if page index does + /// not exist. It is the caller's responsibility to check the return + /// value and follow-up calls to PageIndexReader. + /// + /// WARNING: The returned PageIndexReader must not outlive the ParquetFileReader. + std::shared_ptr GetPageIndexReader(); + /// Pre-buffer the specified column indices in all row groups. /// /// Readers can optionally call this to cache the necessary slices diff --git a/cpp/src/parquet/page_index.cc b/cpp/src/parquet/page_index.cc index 559d3659882..f24cecf43b3 100644 --- a/cpp/src/parquet/page_index.cc +++ b/cpp/src/parquet/page_index.cc @@ -18,10 +18,12 @@ #include "parquet/page_index.h" #include "parquet/encoding.h" #include "parquet/exception.h" +#include "parquet/metadata.h" #include "parquet/schema.h" #include "parquet/statistics.h" #include "parquet/thrift_internal.h" +#include "arrow/util/int_util_overflow.h" #include "arrow/util/unreachable.h" #include @@ -184,8 +186,298 @@ class OffsetIndexImpl : public OffsetIndex { std::vector page_locations_; }; +class RowGroupPageIndexReaderImpl : public RowGroupPageIndexReader { + public: + RowGroupPageIndexReaderImpl(::arrow::io::RandomAccessFile* input, + std::shared_ptr row_group_metadata, + const ReaderProperties& properties, + int32_t row_group_ordinal, + const RowGroupIndexReadRange& index_read_range, + std::shared_ptr file_decryptor) + : input_(input), + row_group_metadata_(std::move(row_group_metadata)), + properties_(properties), + row_group_ordinal_(row_group_ordinal), + index_read_range_(index_read_range), + file_decryptor_(std::move(file_decryptor)) {} + + /// Read column index of a column chunk. + std::shared_ptr GetColumnIndex(int32_t i) override { + if (i < 0 || i >= row_group_metadata_->num_columns()) { + throw ParquetException("Invalid column index at column ordinal ", i); + } + + auto col_chunk = row_group_metadata_->ColumnChunk(i); + std::unique_ptr crypto_metadata = col_chunk->crypto_metadata(); + if (crypto_metadata != nullptr) { + ParquetException::NYI("Cannot read encrypted column index yet"); + } + + auto column_index_location = col_chunk->GetColumnIndexLocation(); + if (!column_index_location.has_value()) { + return nullptr; + } + + CheckReadRangeOrThrow(*column_index_location, index_read_range_.column_index, + row_group_ordinal_); + + if (column_index_buffer_ == nullptr) { + PARQUET_ASSIGN_OR_THROW(column_index_buffer_, + input_->ReadAt(index_read_range_.column_index->offset, + index_read_range_.column_index->length)); + } + + int64_t buffer_offset = + column_index_location->offset - index_read_range_.column_index->offset; + // ColumnIndex::Make() requires the type of serialized thrift message to be + // uint32_t + uint32_t length = static_cast(column_index_location->length); + auto descr = row_group_metadata_->schema()->Column(i); + return ColumnIndex::Make(*descr, column_index_buffer_->data() + buffer_offset, length, + properties_); + } + + /// Read offset index of a column chunk. + std::shared_ptr GetOffsetIndex(int32_t i) override { + if (i < 0 || i >= row_group_metadata_->num_columns()) { + throw ParquetException("Invalid offset index at column ordinal ", i); + } + + auto col_chunk = row_group_metadata_->ColumnChunk(i); + std::unique_ptr crypto_metadata = col_chunk->crypto_metadata(); + if (crypto_metadata != nullptr) { + ParquetException::NYI("Cannot read encrypted offset index yet"); + } + + auto offset_index_location = col_chunk->GetOffsetIndexLocation(); + if (!offset_index_location.has_value()) { + return nullptr; + } + + CheckReadRangeOrThrow(*offset_index_location, index_read_range_.offset_index, + row_group_ordinal_); + + if (offset_index_buffer_ == nullptr) { + PARQUET_ASSIGN_OR_THROW(offset_index_buffer_, + input_->ReadAt(index_read_range_.offset_index->offset, + index_read_range_.offset_index->length)); + } + + int64_t buffer_offset = + offset_index_location->offset - index_read_range_.offset_index->offset; + // OffsetIndex::Make() requires the type of serialized thrift message to be + // uint32_t + uint32_t length = static_cast(offset_index_location->length); + return OffsetIndex::Make(offset_index_buffer_->data() + buffer_offset, length, + properties_); + } + + private: + static void CheckReadRangeOrThrow( + const IndexLocation& index_location, + const std::optional<::arrow::io::ReadRange>& index_read_range, + int32_t row_group_ordinal) { + if (!index_read_range.has_value()) { + throw ParquetException("Missing page index read range of row group ", + row_group_ordinal, + ", it may not exist or has not been requested"); + } + + /// The coalesced read range is invalid. + if (index_read_range->offset < 0 || index_read_range->length <= 0) { + throw ParquetException("Invalid page index read range: offset ", + index_read_range->offset, " length ", + index_read_range->length); + } + + /// The location to page index itself is corrupted. + if (index_location.offset < 0 || index_location.length <= 0) { + throw ParquetException("Invalid page index location: offset ", + index_location.offset, " length ", index_location.length); + } + + /// Page index location must be within the range of the read range. + if (index_location.offset < index_read_range->offset || + index_location.offset + index_location.length > + index_read_range->offset + index_read_range->length) { + throw ParquetException("Page index location [offset:", index_location.offset, + ",length:", index_location.length, + "] is out of range from previous WillNeed request [offset:", + index_read_range->offset, + ",length:", index_read_range->length, + "], row group: ", row_group_ordinal); + } + } + + private: + /// The input stream that can perform random access read. + ::arrow::io::RandomAccessFile* input_; + + /// The row group metadata to get column chunk metadata. + std::shared_ptr row_group_metadata_; + + /// Reader properties used to deserialize thrift object. + const ReaderProperties& properties_; + + /// The ordinal of the row group in the file. + int32_t row_group_ordinal_; + + /// File offsets and sizes of the page Index of all column chunks in the row group. + RowGroupIndexReadRange index_read_range_; + + /// File-level decryptor. + std::shared_ptr file_decryptor_; + + /// Buffer to hold the raw bytes of the page index. + /// Will be set lazily when the corresponding page index is accessed for the 1st time. + std::shared_ptr<::arrow::Buffer> column_index_buffer_; + std::shared_ptr<::arrow::Buffer> offset_index_buffer_; +}; + +class PageIndexReaderImpl : public PageIndexReader { + public: + PageIndexReaderImpl(::arrow::io::RandomAccessFile* input, + std::shared_ptr file_metadata, + const ReaderProperties& properties, + std::shared_ptr file_decryptor) + : input_(input), + file_metadata_(std::move(file_metadata)), + properties_(properties), + file_decryptor_(std::move(file_decryptor)) {} + + std::shared_ptr RowGroup(int i) override { + if (i < 0 || i >= file_metadata_->num_row_groups()) { + throw ParquetException("Invalid row group ordinal: ", i); + } + + auto row_group_metadata = file_metadata_->RowGroup(i); + + // Find the read range of the page index of the row group if provided by WillNeed() + RowGroupIndexReadRange index_read_range; + auto iter = index_read_ranges_.find(i); + if (iter != index_read_ranges_.cend()) { + /// This row group has been requested by WillNeed(). Only column index and/or + /// offset index of requested columns can be read. + index_read_range = iter->second; + } else { + /// If the row group has not been requested by WillNeed(), by default both column + /// index and offset index of all column chunks for the row group can be read. + index_read_range = + PageIndexReader::DeterminePageIndexRangesInRowGroup(*row_group_metadata, {}); + } + + if (index_read_range.column_index.has_value() || + index_read_range.offset_index.has_value()) { + return std::make_shared( + input_, std::move(row_group_metadata), properties_, i, index_read_range, + file_decryptor_); + } + + /// The row group does not has page index or has not been requested by WillNeed(). + /// Simply returns nullptr. + return nullptr; + } + + void WillNeed(const std::vector& row_group_indices, + const std::vector& column_indices, + const PageIndexSelection& selection) override { + std::vector<::arrow::io::ReadRange> read_ranges; + for (int32_t row_group_ordinal : row_group_indices) { + auto read_range = PageIndexReader::DeterminePageIndexRangesInRowGroup( + *file_metadata_->RowGroup(row_group_ordinal), column_indices); + if (selection.column_index && read_range.column_index.has_value()) { + read_ranges.push_back(*read_range.column_index); + } else { + // Mark the column index as not requested. + read_range.column_index = std::nullopt; + } + if (selection.offset_index && read_range.offset_index.has_value()) { + read_ranges.push_back(*read_range.offset_index); + } else { + // Mark the offset index as not requested. + read_range.offset_index = std::nullopt; + } + index_read_ranges_.emplace(row_group_ordinal, std::move(read_range)); + } + PARQUET_THROW_NOT_OK(input_->WillNeed(read_ranges)); + } + + void WillNotNeed(const std::vector& row_group_indices) override { + for (int32_t row_group_ordinal : row_group_indices) { + index_read_ranges_.erase(row_group_ordinal); + } + } + + private: + /// The input stream that can perform random read. + ::arrow::io::RandomAccessFile* input_; + + /// The file metadata to get row group metadata. + std::shared_ptr file_metadata_; + + /// Reader properties used to deserialize thrift object. + const ReaderProperties& properties_; + + /// File-level decrypter. + std::shared_ptr file_decryptor_; + + /// Coalesced read ranges of page index of row groups that have been suggested by + /// WillNeed(). Key is the row group ordinal. + std::unordered_map index_read_ranges_; +}; + } // namespace +RowGroupIndexReadRange PageIndexReader::DeterminePageIndexRangesInRowGroup( + const RowGroupMetaData& row_group_metadata, const std::vector& columns) { + int64_t ci_start = std::numeric_limits::max(); + int64_t oi_start = std::numeric_limits::max(); + int64_t ci_end = -1; + int64_t oi_end = -1; + + auto merge_range = [](const std::optional& index_location, + int64_t* start, int64_t* end) { + if (index_location.has_value()) { + int64_t index_end = 0; + if (index_location->offset < 0 || index_location->length <= 0 || + ::arrow::internal::AddWithOverflow(index_location->offset, + index_location->length, &index_end)) { + throw ParquetException("Invalid page index location: offset ", + index_location->offset, " length ", + index_location->length); + } + *start = std::min(*start, index_location->offset); + *end = std::max(*end, index_end); + } + }; + + if (columns.empty()) { + for (int32_t i = 0; i < row_group_metadata.num_columns(); ++i) { + auto col_chunk = row_group_metadata.ColumnChunk(i); + merge_range(col_chunk->GetColumnIndexLocation(), &ci_start, &ci_end); + merge_range(col_chunk->GetOffsetIndexLocation(), &oi_start, &oi_end); + } + } else { + for (int32_t i : columns) { + if (i < 0 || i >= row_group_metadata.num_columns()) { + throw ParquetException("Invalid column ordinal ", i); + } + auto col_chunk = row_group_metadata.ColumnChunk(i); + merge_range(col_chunk->GetColumnIndexLocation(), &ci_start, &ci_end); + merge_range(col_chunk->GetOffsetIndexLocation(), &oi_start, &oi_end); + } + } + + RowGroupIndexReadRange read_range; + if (ci_end != -1) { + read_range.column_index = {ci_start, ci_end - ci_start}; + } + if (oi_end != -1) { + read_range.offset_index = {oi_start, oi_end - oi_start}; + } + return read_range; +} + // ---------------------------------------------------------------------- // Public factory functions @@ -231,4 +523,12 @@ std::unique_ptr OffsetIndex::Make(const void* serialized_index, return std::make_unique(offset_index); } +std::shared_ptr PageIndexReader::Make( + ::arrow::io::RandomAccessFile* input, std::shared_ptr file_metadata, + const ReaderProperties& properties, + std::shared_ptr file_decryptor) { + return std::make_shared(input, file_metadata, properties, + std::move(file_decryptor)); +} + } // namespace parquet diff --git a/cpp/src/parquet/page_index.h b/cpp/src/parquet/page_index.h index 13dae40f56c..79fbce20ed3 100644 --- a/cpp/src/parquet/page_index.h +++ b/cpp/src/parquet/page_index.h @@ -17,14 +17,20 @@ #pragma once +#include "arrow/io/interfaces.h" #include "parquet/types.h" +#include #include namespace parquet { class ColumnDescriptor; +class FileMetaData; +class InternalFileDecryptor; class ReaderProperties; +class RowGroupMetaData; +class RowGroupPageIndexReader; /// \brief ColumnIndex is a proxy around format::ColumnIndex. class PARQUET_EXPORT ColumnIndex { @@ -126,4 +132,122 @@ class PARQUET_EXPORT OffsetIndex { virtual const std::vector& page_locations() const = 0; }; +/// \brief Interface for reading the page index for a Parquet row group. +class PARQUET_EXPORT RowGroupPageIndexReader { + public: + virtual ~RowGroupPageIndexReader() = default; + + /// \brief Read column index of a column chunk. + /// + /// \param[in] i column ordinal of the column chunk. + /// \returns column index of the column or nullptr if it does not exist. + /// \throws ParquetException if the index is out of bound. + virtual std::shared_ptr GetColumnIndex(int32_t i) = 0; + + /// \brief Read offset index of a column chunk. + /// + /// \param[in] i column ordinal of the column chunk. + /// \returns offset index of the column or nullptr if it does not exist. + /// \throws ParquetException if the index is out of bound. + virtual std::shared_ptr GetOffsetIndex(int32_t i) = 0; +}; + +struct PageIndexSelection { + /// Specifies whether to read the column index. + bool column_index = false; + /// Specifies whether to read the offset index. + bool offset_index = false; +}; + +struct RowGroupIndexReadRange { + /// Base start and total size of column index of all column chunks in a row group. + /// If none of the column chunks have column index, it is set to std::nullopt. + std::optional<::arrow::io::ReadRange> column_index = std::nullopt; + /// Base start and total size of offset index of all column chunks in a row group. + /// If none of the column chunks have offset index, it is set to std::nullopt. + std::optional<::arrow::io::ReadRange> offset_index = std::nullopt; +}; + +/// \brief Interface for reading the page index for a Parquet file. +class PARQUET_EXPORT PageIndexReader { + public: + virtual ~PageIndexReader() = default; + + /// \brief Create a PageIndexReader instance. + /// \returns a PageIndexReader instance. + /// WARNING: The returned PageIndexReader references to all the input parameters, so + /// it must not outlive all of the input parameters. Usually these input parameters + /// come from the same ParquetFileReader object, so it must not outlive the reader + /// that creates this PageIndexReader. + static std::shared_ptr Make( + ::arrow::io::RandomAccessFile* input, std::shared_ptr file_metadata, + const ReaderProperties& properties, + std::shared_ptr file_decryptor = NULLPTR); + + /// \brief Get the page index reader of a specific row group. + /// \param[in] i row group ordinal to get page index reader. + /// \returns RowGroupPageIndexReader of the specified row group. A nullptr may or may + /// not be returned if the page index for the row group is unavailable. It is + /// the caller's responsibility to check the return value of follow-up calls + /// to the RowGroupPageIndexReader. + /// \throws ParquetException if the index is out of bound. + virtual std::shared_ptr RowGroup(int i) = 0; + + /// \brief Advise the reader which part of page index will be read later. + /// + /// The PageIndexReader can optionally prefetch and cache page index that + /// may be read later to get better performance. + /// + /// The contract of this function is as below: + /// 1) If WillNeed() has not been called for a specific row group and the page index + /// exists, follow-up calls to get column index or offset index of all columns in + /// this row group SHOULD NOT FAIL, but the performance may not be optimal. + /// 2) If WillNeed() has been called for a specific row group, follow-up calls to get + /// page index are limited to columns and index type requested by WillNeed(). + /// So it MAY FAIL if columns that are not requested by WillNeed() are requested. + /// 3) Later calls to WillNeed() MAY OVERRIDE previous calls of same row groups. + /// For example, + /// 1) If WillNeed() is not called for row group 0, then follow-up calls to read + /// column index and/or offset index of all columns of row group 0 should not + /// fail if its page index exists. + /// 2) If WillNeed() is called for columns 0 and 1 for row group 0, then follow-up + /// call to read page index of column 2 for row group 0 MAY FAIL even if its + /// page index exists. + /// 3) If WillNeed() is called for row group 0 with offset index only, then + /// follow-up call to read column index of row group 0 MAY FAIL even if + /// the column index of this column exists. + /// 4) If WillNeed() is called for columns 0 and 1 for row group 0, then later + /// call to WillNeed() for columns 1 and 2 for row group 0. The later one + /// overrides previous call and only columns 1 and 2 of row group 0 are allowed + /// to access. + /// + /// \param[in] row_group_indices list of row group ordinal to read page index later. + /// \param[in] column_indices list of column ordinal to read page index later. If it is + /// empty, it means all columns in the row group will be read. + /// \param[in] selection which kind of page index is required later. + virtual void WillNeed(const std::vector& row_group_indices, + const std::vector& column_indices, + const PageIndexSelection& selection) = 0; + + /// \brief Advise the reader page index of these row groups will not be read any more. + /// + /// The PageIndexReader implementation has the opportunity to cancel any prefetch or + /// release resource that are related to these row groups. + /// + /// \param[in] row_group_indices list of row group ordinal that whose page index will + /// not be accessed any more. + virtual void WillNotNeed(const std::vector& row_group_indices) = 0; + + /// \brief Determine the column index and offset index ranges for the given row group. + /// + /// \param[in] row_group_metadata row group metadata to get column chunk metadata. + /// \param[in] columns list of column ordinals to get page index. If the list is empty, + /// it means all columns in the row group. + /// \returns RowGroupIndexReadRange of the specified row group. Throws ParquetException + /// if the selected column ordinal is out of bound or metadata of page index + /// is corrupted. + static RowGroupIndexReadRange DeterminePageIndexRangesInRowGroup( + const RowGroupMetaData& row_group_metadata, const std::vector& columns); +}; + } // namespace parquet diff --git a/cpp/src/parquet/page_index_test.cc b/cpp/src/parquet/page_index_test.cc index 6d1cdc2c97a..46599960b8a 100644 --- a/cpp/src/parquet/page_index_test.cc +++ b/cpp/src/parquet/page_index_test.cc @@ -256,4 +256,164 @@ TEST(PageIndex, ReadColumnIndexWithNullPage) { null_pages, min_values, max_values, has_null_counts, null_counts); } +struct PageIndexRanges { + int64_t column_index_offset; + int64_t column_index_length; + int64_t offset_index_offset; + int64_t offset_index_length; +}; + +using RowGroupRanges = std::vector; + +/// Creates an FileMetaData object w/ single row group based on data in +/// 'row_group_ranges'. It sets the offsets and sizes of the column index and offset index +/// members of the row group. It doesn't set the member if the input value is -1. +std::shared_ptr ConstructFakeMetaData( + const RowGroupRanges& row_group_ranges) { + format::RowGroup row_group; + for (auto& page_index_ranges : row_group_ranges) { + format::ColumnChunk col_chunk; + if (page_index_ranges.column_index_offset != -1) { + col_chunk.__set_column_index_offset(page_index_ranges.column_index_offset); + } + if (page_index_ranges.column_index_length != -1) { + col_chunk.__set_column_index_length( + static_cast(page_index_ranges.column_index_length)); + } + if (page_index_ranges.offset_index_offset != -1) { + col_chunk.__set_offset_index_offset(page_index_ranges.offset_index_offset); + } + if (page_index_ranges.offset_index_length != -1) { + col_chunk.__set_offset_index_length( + static_cast(page_index_ranges.offset_index_length)); + } + row_group.columns.push_back(col_chunk); + } + + format::FileMetaData metadata; + metadata.row_groups.push_back(row_group); + + metadata.schema.emplace_back(); + schema::NodeVector fields; + for (size_t i = 0; i < row_group_ranges.size(); ++i) { + fields.push_back(schema::Int64(std::to_string(i))); + metadata.schema.emplace_back(); + fields.back()->ToParquet(&metadata.schema.back()); + } + schema::GroupNode::Make("schema", Repetition::REPEATED, fields) + ->ToParquet(&metadata.schema.front()); + + auto sink = CreateOutputStream(); + ThriftSerializer{}.Serialize(&metadata, sink.get()); + auto buffer = sink->Finish().MoveValueUnsafe(); + uint32_t len = static_cast(buffer->size()); + return FileMetaData::Make(buffer->data(), &len); +} + +/// Validates that 'DeterminePageIndexRangesInRowGroup()' selects the expected file +/// offsets and sizes or returns false when the row group doesn't have a page index. +void ValidatePageIndexRange(const RowGroupRanges& row_group_ranges, + const std::vector& column_indices, + bool expected_has_column_index, + bool expected_has_offset_index, int expected_ci_start, + int expected_ci_size, int expected_oi_start, + int expected_oi_size) { + auto file_metadata = ConstructFakeMetaData(row_group_ranges); + auto read_range = PageIndexReader::DeterminePageIndexRangesInRowGroup( + *file_metadata->RowGroup(0), column_indices); + ASSERT_EQ(expected_has_column_index, read_range.column_index.has_value()); + ASSERT_EQ(expected_has_offset_index, read_range.offset_index.has_value()); + if (expected_has_column_index) { + EXPECT_EQ(expected_ci_start, read_range.column_index->offset); + EXPECT_EQ(expected_ci_size, read_range.column_index->length); + } + if (expected_has_offset_index) { + EXPECT_EQ(expected_oi_start, read_range.offset_index->offset); + EXPECT_EQ(expected_oi_size, read_range.offset_index->length); + } +} + +/// This test constructs a couple of artificial row groups with page index offsets in +/// them. Then it validates if PageIndexReader::DeterminePageIndexRangesInRowGroup() +/// properly computes the file range that contains the whole page index. +TEST(PageIndex, DeterminePageIndexRangesInRowGroup) { + // No Column chunks + ValidatePageIndexRange({}, {}, false, false, -1, -1, -1, -1); + // No page index at all. + ValidatePageIndexRange({{-1, -1, -1, -1}}, {}, false, false, -1, -1, -1, -1); + // Page index for single column chunk. + ValidatePageIndexRange({{10, 5, 15, 5}}, {}, true, true, 10, 5, 15, 5); + // Page index for two column chunks. + ValidatePageIndexRange({{10, 5, 30, 25}, {15, 15, 50, 20}}, {}, true, true, 10, 20, 30, + 40); + // Page index for second column chunk. + ValidatePageIndexRange({{-1, -1, -1, -1}, {20, 10, 30, 25}}, {}, true, true, 20, 10, 30, + 25); + // Page index for first column chunk. + ValidatePageIndexRange({{10, 5, 15, 5}, {-1, -1, -1, -1}}, {}, true, true, 10, 5, 15, + 5); + // Missing offset index for first column chunk. Gap in column index. + ValidatePageIndexRange({{10, 5, -1, -1}, {20, 10, 30, 25}}, {}, true, true, 10, 20, 30, + 25); + // Missing offset index for second column chunk. + ValidatePageIndexRange({{10, 5, 25, 5}, {20, 10, -1, -1}}, {}, true, true, 10, 20, 25, + 5); + // Four column chunks. + ValidatePageIndexRange( + {{100, 10, 220, 30}, {110, 25, 250, 10}, {140, 30, 260, 40}, {200, 10, 300, 100}}, + {}, true, true, 100, 110, 220, 180); +} + +/// This test constructs a couple of artificial row groups with page index offsets in +/// them. Then it validates if PageIndexReader::DeterminePageIndexRangesInRowGroup() +/// properly computes the file range that contains the page index of selected columns. +TEST(PageIndex, DeterminePageIndexRangesInRowGroupWithPartialColumnsSelected) { + // No page index at all. + ValidatePageIndexRange({{-1, -1, -1, -1}}, {0}, false, false, -1, -1, -1, -1); + // Page index for single column chunk. + ValidatePageIndexRange({{10, 5, 15, 5}}, {0}, true, true, 10, 5, 15, 5); + // Page index for the 1st column chunk. + ValidatePageIndexRange({{10, 5, 30, 25}, {15, 15, 50, 20}}, {0}, true, true, 10, 5, 30, + 25); + // Page index for the 2nd column chunk. + ValidatePageIndexRange({{10, 5, 30, 25}, {15, 15, 50, 20}}, {1}, true, true, 15, 15, 50, + 20); + // Only 2nd column is selected among four column chunks. + ValidatePageIndexRange( + {{100, 10, 220, 30}, {110, 25, 250, 10}, {140, 30, 260, 40}, {200, 10, 300, 100}}, + {1}, true, true, 110, 25, 250, 10); + // Only 2nd and 3rd columns are selected among four column chunks. + ValidatePageIndexRange( + {{100, 10, 220, 30}, {110, 25, 250, 10}, {140, 30, 260, 40}, {200, 10, 300, 100}}, + {1, 2}, true, true, 110, 60, 250, 50); + // Only 2nd and 4th columns are selected among four column chunks. + ValidatePageIndexRange( + {{100, 10, 220, 30}, {110, 25, 250, 10}, {140, 30, 260, 40}, {200, 10, 300, 100}}, + {1, 3}, true, true, 110, 100, 250, 150); + // Only 1st, 2nd and 4th columns are selected among four column chunks. + ValidatePageIndexRange( + {{100, 10, 220, 30}, {110, 25, 250, 10}, {140, 30, 260, 40}, {200, 10, 300, 100}}, + {0, 1, 3}, true, true, 100, 110, 220, 180); + // 3rd column is selected but not present in the row group. + EXPECT_THROW(ValidatePageIndexRange({{10, 5, 30, 25}, {15, 15, 50, 20}}, {2}, false, + false, -1, -1, -1, -1), + ParquetException); +} + +/// This test constructs a couple of artificial row groups with page index offsets in +/// them. Then it validates if PageIndexReader::DeterminePageIndexRangesInRowGroup() +/// properly detects if column index or offset index is missing. +TEST(PageIndex, DeterminePageIndexRangesInRowGroupWithMissingPageIndex) { + // No column index at all. + ValidatePageIndexRange({{-1, -1, 15, 5}}, {}, false, true, -1, -1, 15, 5); + // No offset index at all. + ValidatePageIndexRange({{10, 5, -1, -1}}, {}, true, false, 10, 5, -1, -1); + // No column index at all among two column chunks. + ValidatePageIndexRange({{-1, -1, 30, 25}, {-1, -1, 50, 20}}, {}, false, true, -1, -1, + 30, 40); + // No offset index at all among two column chunks. + ValidatePageIndexRange({{10, 5, -1, -1}, {15, 15, -1, -1}}, {}, true, false, 10, 20, -1, + -1); +} + } // namespace parquet diff --git a/cpp/src/parquet/reader_test.cc b/cpp/src/parquet/reader_test.cc index e17f7a91f9b..5e5a08fc7ae 100644 --- a/cpp/src/parquet/reader_test.cc +++ b/cpp/src/parquet/reader_test.cc @@ -37,6 +37,7 @@ #include "parquet/file_reader.h" #include "parquet/file_writer.h" #include "parquet/metadata.h" +#include "parquet/page_index.h" #include "parquet/platform.h" #include "parquet/printer.h" #include "parquet/test_util.h" @@ -1059,4 +1060,159 @@ TEST(TestFileReader, TestOverflowInt16PageOrdinal) { } } +struct PageIndexReaderParam { + std::vector row_group_indices; + std::vector column_indices; + PageIndexSelection index_selection; +}; + +class ParameterizedPageIndexReaderTest + : public ::testing::TestWithParam {}; + +// Test reading a data file with page index. +TEST_P(ParameterizedPageIndexReaderTest, TestReadPageIndex) { + ReaderProperties properties; + auto file_reader = ParquetFileReader::OpenFile(data_file("alltypes_tiny_pages.parquet"), + /*memory_map=*/false, properties); + auto metadata = file_reader->metadata(); + EXPECT_EQ(1, metadata->num_row_groups()); + EXPECT_EQ(13, metadata->num_columns()); + + // Create the page index reader and provide different read hints. + auto page_index_reader = file_reader->GetPageIndexReader(); + ASSERT_NE(nullptr, page_index_reader); + const auto params = GetParam(); + const bool call_will_need = !params.row_group_indices.empty(); + if (call_will_need) { + page_index_reader->WillNeed(params.row_group_indices, params.column_indices, + params.index_selection); + } + + auto row_group_index_reader = page_index_reader->RowGroup(0); + if (!call_will_need || params.index_selection.offset_index || + params.index_selection.column_index) { + ASSERT_NE(nullptr, row_group_index_reader); + } else { + // None of page index is requested. + ASSERT_EQ(nullptr, row_group_index_reader); + return; + } + + auto column_index_requested = [&](int32_t column_id) { + return !call_will_need || + (params.index_selection.column_index && + (params.column_indices.empty() || + (std::find(params.column_indices.cbegin(), params.column_indices.cend(), + column_id) != params.column_indices.cend()))); + }; + + auto offset_index_requested = [&](int32_t column_id) { + return !call_will_need || + (params.index_selection.offset_index && + (params.column_indices.empty() || + (std::find(params.column_indices.cbegin(), params.column_indices.cend(), + column_id) != params.column_indices.cend()))); + }; + + if (offset_index_requested(0)) { + // Verify offset index of column 0 and only partial data as it contains 325 pages. + const size_t num_pages = 325; + const std::vector page_indices = {0, 100, 200, 300}; + const std::vector page_locations = { + PageLocation{4, 109, 0}, PageLocation{11480, 133, 2244}, + PageLocation{22980, 133, 4494}, PageLocation{34480, 133, 6744}}; + + auto offset_index = row_group_index_reader->GetOffsetIndex(0); + ASSERT_NE(nullptr, offset_index); + + EXPECT_EQ(num_pages, offset_index->page_locations().size()); + for (size_t i = 0; i < page_indices.size(); ++i) { + size_t page_id = page_indices.at(i); + const auto& read_page_location = offset_index->page_locations().at(page_id); + const auto& expected_page_location = page_locations.at(i); + EXPECT_EQ(expected_page_location.offset, read_page_location.offset); + EXPECT_EQ(expected_page_location.compressed_page_size, + read_page_location.compressed_page_size); + EXPECT_EQ(expected_page_location.first_row_index, + read_page_location.first_row_index); + } + } else { + EXPECT_THROW(row_group_index_reader->GetOffsetIndex(0), ParquetException); + } + + if (column_index_requested(5)) { + // Verify column index of column 5 and only partial data as it contains 528 pages. + const size_t num_pages = 528; + const BoundaryOrder::type boundary_order = BoundaryOrder::Unordered; + const std::vector page_indices = {0, 99, 426, 520}; + const std::vector null_pages = {false, false, false, false}; + const bool has_null_counts = true; + const std::vector null_counts = {0, 0, 0, 0}; + const std::vector min_values = {0, 10, 0, 0}; + const std::vector max_values = {90, 90, 80, 70}; + + auto column_index = row_group_index_reader->GetColumnIndex(5); + ASSERT_NE(nullptr, column_index); + auto typed_column_index = std::dynamic_pointer_cast(column_index); + ASSERT_NE(nullptr, typed_column_index); + + EXPECT_EQ(num_pages, column_index->null_pages().size()); + EXPECT_EQ(has_null_counts, column_index->has_null_counts()); + EXPECT_EQ(boundary_order, column_index->boundary_order()); + for (size_t i = 0; i < page_indices.size(); ++i) { + size_t page_id = page_indices.at(i); + EXPECT_EQ(null_pages.at(i), column_index->null_pages().at(page_id)); + if (has_null_counts) { + EXPECT_EQ(null_counts.at(i), column_index->null_counts().at(page_id)); + } + if (!null_pages.at(i)) { + EXPECT_EQ(min_values.at(i), typed_column_index->min_values().at(page_id)); + EXPECT_EQ(max_values.at(i), typed_column_index->max_values().at(page_id)); + } + } + } else { + EXPECT_THROW(row_group_index_reader->GetColumnIndex(5), ParquetException); + } + + // Verify null is returned if column index does not exist. + auto column_index = row_group_index_reader->GetColumnIndex(10); + EXPECT_EQ(nullptr, column_index); +} + +INSTANTIATE_TEST_SUITE_P( + PageIndexReaderTests, ParameterizedPageIndexReaderTest, + ::testing::Values(PageIndexReaderParam{{}, {}, {true, true}}, + PageIndexReaderParam{{}, {}, {true, false}}, + PageIndexReaderParam{{}, {}, {false, true}}, + PageIndexReaderParam{{}, {}, {false, false}}, + PageIndexReaderParam{{0}, {}, {true, true}}, + PageIndexReaderParam{{0}, {}, {true, false}}, + PageIndexReaderParam{{0}, {}, {false, true}}, + PageIndexReaderParam{{0}, {}, {false, false}}, + PageIndexReaderParam{{0}, {0}, {true, true}}, + PageIndexReaderParam{{0}, {0}, {true, false}}, + PageIndexReaderParam{{0}, {0}, {false, true}}, + PageIndexReaderParam{{0}, {0}, {false, false}}, + PageIndexReaderParam{{0}, {5}, {true, true}}, + PageIndexReaderParam{{0}, {5}, {true, false}}, + PageIndexReaderParam{{0}, {5}, {false, true}}, + PageIndexReaderParam{{0}, {5}, {false, false}}, + PageIndexReaderParam{{0}, {0, 5}, {true, true}}, + PageIndexReaderParam{{0}, {0, 5}, {true, false}}, + PageIndexReaderParam{{0}, {0, 5}, {false, true}}, + PageIndexReaderParam{{0}, {0, 5}, {false, false}})); + +TEST(PageIndexReaderTest, ReadFileWithoutPageIndex) { + ReaderProperties properties; + auto file_reader = ParquetFileReader::OpenFile(data_file("int32_decimal.parquet"), + /*memory_map=*/false, properties); + auto metadata = file_reader->metadata(); + EXPECT_EQ(1, metadata->num_row_groups()); + + auto page_index_reader = file_reader->GetPageIndexReader(); + ASSERT_NE(nullptr, page_index_reader); + auto row_group_index_reader = page_index_reader->RowGroup(0); + ASSERT_EQ(nullptr, row_group_index_reader); +} + } // namespace parquet