From c98cfb74a4e6ad90ee07c5b14eaeb19431f17e85 Mon Sep 17 00:00:00 2001 From: Hongbin Ma Date: Mon, 15 Jan 2024 20:38:02 +0800 Subject: [PATCH 1/3] GH-38865 [C++][Parquet] support passing a RowRange to RecordBatchReader --- cpp/src/parquet/CMakeLists.txt | 2 + cpp/src/parquet/arrow/reader.cc | 221 ++++++++++- cpp/src/parquet/arrow/reader.h | 13 + cpp/src/parquet/arrow/reader_internal.h | 5 + cpp/src/parquet/column_reader.cc | 240 +++++++++++- cpp/src/parquet/column_reader.h | 151 ++++++++ cpp/src/parquet/range_reader_test.cc | 489 ++++++++++++++++++++++++ cpp/src/parquet/row_range_test.cc | 195 ++++++++++ 8 files changed, 1291 insertions(+), 25 deletions(-) create mode 100644 cpp/src/parquet/range_reader_test.cc create mode 100644 cpp/src/parquet/row_range_test.cc diff --git a/cpp/src/parquet/CMakeLists.txt b/cpp/src/parquet/CMakeLists.txt index 04028431ba15..476bb48557f0 100644 --- a/cpp/src/parquet/CMakeLists.txt +++ b/cpp/src/parquet/CMakeLists.txt @@ -366,6 +366,8 @@ add_parquet_test(reader-test level_conversion_test.cc column_scanner_test.cc reader_test.cc + range_reader_test.cc + row_range_test.cc stream_reader_test.cc test_util.cc) diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc index d6ad7c25bc7c..86a86e41fdb0 100644 --- a/cpp/src/parquet/arrow/reader.cc +++ b/cpp/src/parquet/arrow/reader.cc @@ -17,6 +17,8 @@ #include "parquet/arrow/reader.h" +#include "parquet/page_index.h" + #include #include #include @@ -199,10 +201,11 @@ class FileReaderImpl : public FileReader { return ReadRowGroups(Iota(reader_->metadata()->num_row_groups()), indices, out); } - Status GetFieldReader(int i, - const std::shared_ptr>& included_leaves, - const std::vector& row_groups, - std::unique_ptr* out) { + Status GetFieldReader( + int i, const std::shared_ptr>& included_leaves, + const std::vector& row_groups, + const std::shared_ptr>>& row_ranges_per_rg, + std::unique_ptr* out) { // Should be covered by GetRecordBatchReader checks but // manifest_.schema_fields is a separate variable so be extra careful. if (ARROW_PREDICT_FALSE(i < 0 || @@ -218,13 +221,15 @@ class FileReaderImpl : public FileReader { ctx->iterator_factory = SomeRowGroupsFactory(row_groups); ctx->filter_leaves = true; ctx->included_leaves = included_leaves; + ctx->row_ranges_per_rg = row_ranges_per_rg; // copy the shared pointer to extend its lifecycle return GetReader(manifest_.schema_fields[i], ctx, out); } - Status GetFieldReaders(const std::vector& column_indices, - const std::vector& row_groups, - std::vector>* out, - std::shared_ptr<::arrow::Schema>* out_schema) { + Status GetFieldReaders( + const std::vector& column_indices, const std::vector& row_groups, + const std::shared_ptr>> & row_ranges_per_rg, + std::vector>* out, + std::shared_ptr<::arrow::Schema>* out_schema) { // We only need to read schema fields which have columns indicated // in the indices vector ARROW_ASSIGN_OR_RAISE(std::vector field_indices, @@ -236,8 +241,8 @@ class FileReaderImpl : public FileReader { ::arrow::FieldVector out_fields(field_indices.size()); for (size_t i = 0; i < out->size(); ++i) { std::unique_ptr reader; - RETURN_NOT_OK( - GetFieldReader(field_indices[i], included_leaves, row_groups, &reader)); + RETURN_NOT_OK(GetFieldReader(field_indices[i], included_leaves, row_groups, + row_ranges_per_rg, &reader)); out_fields[i] = reader->field(); out->at(i) = std::move(reader); @@ -325,19 +330,62 @@ class FileReaderImpl : public FileReader { return ReadRowGroup(i, Iota(reader_->metadata()->num_columns()), table); } + // This is a internal API owned by FileReaderImpl, not exposed in FileReader + Status GetRecordBatchReaderWithRowRanges(const std::vector& row_group_indices, + const std::vector& column_indices, + const std::shared_ptr>> & row_ranges_per_rg, + std::unique_ptr* out); + + Status GetRecordBatchReader(const RowRanges& rows_to_return, + const std::vector& column_indices, + std::unique_ptr* out) override { + const auto metadata = reader_->metadata(); + // check if the row ranges are valid + if (!rows_to_return.IsValid()) { + return Status::Invalid("The provided row range is invalid, keep it monotone and non-interleaving: " + + rows_to_return.ToString()); + } + // check if the row ranges are within the row group boundaries + if (rows_to_return.RowCount() != 0 && rows_to_return.LastRow() >= metadata->num_rows()) { + return Status::Invalid("The provided row range " + rows_to_return.ToString() + + " exceeds the number of rows in the file: " + + std::to_string(metadata->num_rows())); + } + if (rows_to_return.RowCount() == 0) { + return GetRecordBatchReaderWithRowRanges({}, column_indices, {}, out); + } + + std::vector rows_per_rg; + for (int i = 0 ; i < metadata->num_row_groups(); i++) { + rows_per_rg.push_back( metadata->RowGroup(i)->num_rows()); + } + // We'll assign a RowRanges for each RG, even if it's not required to return any rows + std::vector> row_ranges_per_rg = rows_to_return.SplitByRowGroups(rows_per_rg); + std::vector row_group_indices; + for (int i = 0 ; i < metadata->num_row_groups(); i++) { + if (row_ranges_per_rg.at(i)->RowCount() > 0) + row_group_indices.push_back(i); + } + + return GetRecordBatchReaderWithRowRanges(row_group_indices, column_indices, + std::make_shared>>(std::move(row_ranges_per_rg)), out); + } + Status GetRecordBatchReader(const std::vector& row_group_indices, const std::vector& column_indices, - std::unique_ptr* out) override; + std::unique_ptr* out) override { + return GetRecordBatchReaderWithRowRanges(row_group_indices, column_indices, {}, out); + } Status GetRecordBatchReader(const std::vector& row_group_indices, std::unique_ptr* out) override { - return GetRecordBatchReader(row_group_indices, - Iota(reader_->metadata()->num_columns()), out); + return GetRecordBatchReaderWithRowRanges(row_group_indices, + Iota(reader_->metadata()->num_columns()), {}, out); } Status GetRecordBatchReader(std::unique_ptr* out) override { - return GetRecordBatchReader(Iota(num_row_groups()), - Iota(reader_->metadata()->num_columns()), out); + return GetRecordBatchReaderWithRowRanges(Iota(num_row_groups()), + Iota(reader_->metadata()->num_columns()), {}, out); } ::arrow::Result<::arrow::AsyncGenerator>> @@ -440,6 +488,63 @@ class RowGroupReaderImpl : public RowGroupReader { // ---------------------------------------------------------------------- // Column reader implementations +// This class is used to skip decompressing & decoding unnecessary pages by comparing user-specified row_ranges +// and page_ranges from metadata. +// Only support IntervalRange case for now. +class RowRangesPageFilter { + public: + RowRangesPageFilter(const RowRanges& row_ranges, const std::shared_ptr& page_ranges) + : row_ranges_(row_ranges), page_ranges_(page_ranges) { + } + + // To avoid error "std::function target must be copy-constructible", we must define copy constructor + RowRangesPageFilter(const RowRangesPageFilter& other) + : row_ranges_(other.row_ranges_), page_ranges_(other.page_ranges_) { + } + + bool operator()(const DataPageStats& stats) { + + if (!initted) { + row_ranges_itr_ = row_ranges_.NewIterator(); + page_ranges_itr_ = page_ranges_->NewIterator(); + + current_row_range_ = row_ranges_itr_->NextRange(); + + if (current_row_range_.index() != 0) { + throw ParquetException("RowRangesPageFilter expects first NextRange() to be a IntervalRange"); + } + initted = true; + } + + current_page_range_ = page_ranges_itr_->NextRange(); + if (current_page_range_.index() != 0) { + throw ParquetException("RowRangesPageFilter expects first NextRange() to be a IntervalRange"); + } + + while (current_row_range_.index() == 0 && + std::get(current_page_range_).IsAfter( + std::get(current_row_range_))) { + current_row_range_ = row_ranges_itr_->NextRange(); + } + + if (current_row_range_.index() != 0) { + return true; + } + + return std::get(current_page_range_).IsBefore( + std::get(current_row_range_)); + } + + private: + const RowRanges& row_ranges_; + const std::shared_ptr page_ranges_; + std::unique_ptr row_ranges_itr_ = NULLPTR; + std::unique_ptr page_ranges_itr_ = NULLPTR; + std::variant current_row_range_ = End(); + std::variant current_page_range_ = End(); + bool initted = false; +}; + // Leaf reader is for primitive arrays and primitive children of nested arrays class LeafReader : public ColumnReaderImpl { public: @@ -501,8 +606,81 @@ class LeafReader : public ColumnReaderImpl { private: std::shared_ptr out_; + + void checkAndGetPageRanges(const RowRanges& row_ranges, + std::shared_ptr& page_ranges) const { + // check offset exists + const auto rg_pg_index_reader = + ctx_->reader->GetPageIndexReader()->RowGroup(input_->current_row_group()); + + if (!rg_pg_index_reader) { + throw ParquetException( + "Attempting to read with Ranges but Page Index is not found for Row " + "Group: " + + std::to_string(input_->current_row_group())); + } + const auto offset_index = rg_pg_index_reader->GetOffsetIndex(input_->column_index()); + + if (!offset_index) { + throw ParquetException( + "Attempting to read with Ranges but Offset index is not found for " + "column: " + + field_->name()); + } + + const auto page_locations = offset_index->page_locations(); + page_ranges = std::make_shared(); + for (size_t i = 0; i < page_locations.size() - 1; i++) { + page_ranges->Add( + {page_locations[i].first_row_index, page_locations[i + 1].first_row_index - 1}); + } + if (page_locations.size() >= 1) { + page_ranges->Add( + {page_locations[page_locations.size() - 1].first_row_index, + ctx_->reader->metadata()->RowGroup(input_->current_row_group())->num_rows() - + 1}); + } + + if (row_ranges.RowCount() > 0) { + if (row_ranges.LastRow() > page_ranges->LastRow()) { + throw ParquetException( + "The provided row range " + row_ranges.ToString() + + " exceeds last page :" + page_ranges->GetRanges().back().ToString()); + } + } + } + void NextRowGroup() { std::unique_ptr page_reader = input_->NextChunk(); + + /// using page index to reduce cost + if (page_reader != nullptr && ctx_->row_ranges_per_rg) { + // reset skipper + record_reader_->set_record_skipper(NULLPTR); + + const auto & row_ranges = (*ctx_->row_ranges_per_rg)[input_->current_row_group()]; + // if specific row range is provided for this rg + if (row_ranges->RowCount() != 0) { + + // Use IntervalRanges to represent pages + std::shared_ptr page_ranges; + checkAndGetPageRanges(*row_ranges, page_ranges); + + // part 1, skip decompressing & decoding unnecessary pages + page_reader->set_data_page_filter( + RowRangesPageFilter(*row_ranges, page_ranges)); + + // part 2, skip unnecessary rows in necessary pages + record_reader_->set_record_skipper( + std::make_shared(*page_ranges, + *row_ranges)); + } else { + NextRowGroup(); + return; + } + } + + record_reader_->reset_current_rg_processed_records(); record_reader_->SetPageReader(std::move(page_reader)); } @@ -971,9 +1149,10 @@ Status GetReader(const SchemaField& field, const std::shared_ptr& } // namespace -Status FileReaderImpl::GetRecordBatchReader(const std::vector& row_groups, - const std::vector& column_indices, - std::unique_ptr* out) { +Status FileReaderImpl::GetRecordBatchReaderWithRowRanges( + const std::vector& row_groups, const std::vector& column_indices, + const std::shared_ptr>> & row_ranges_per_rg, + std::unique_ptr* out) { RETURN_NOT_OK(BoundsCheck(row_groups, column_indices)); if (reader_properties_.pre_buffer()) { @@ -986,7 +1165,8 @@ Status FileReaderImpl::GetRecordBatchReader(const std::vector& row_groups, std::vector> readers; std::shared_ptr<::arrow::Schema> batch_schema; - RETURN_NOT_OK(GetFieldReaders(column_indices, row_groups, &readers, &batch_schema)); + RETURN_NOT_OK(GetFieldReaders(column_indices, row_groups, row_ranges_per_rg, &readers, + &batch_schema)); if (readers.empty()) { // Just generate all batches right now; they're cheap since they have no columns. @@ -1241,7 +1421,8 @@ Future> FileReaderImpl::DecodeRowGroups( // in a sync context too so use `this` over `self` std::vector> readers; std::shared_ptr<::arrow::Schema> result_schema; - RETURN_NOT_OK(GetFieldReaders(column_indices, row_groups, &readers, &result_schema)); + RETURN_NOT_OK( + GetFieldReaders(column_indices, row_groups, {}, &readers, &result_schema)); // OptionalParallelForAsync requires an executor if (!cpu_executor) cpu_executor = ::arrow::internal::GetCpuThreadPool(); diff --git a/cpp/src/parquet/arrow/reader.h b/cpp/src/parquet/arrow/reader.h index 6e46ca43f7b1..4cfdad5a23e0 100644 --- a/cpp/src/parquet/arrow/reader.h +++ b/cpp/src/parquet/arrow/reader.h @@ -23,6 +23,7 @@ #include #include +#include "parquet/column_reader.h" #include "parquet/file_reader.h" #include "parquet/platform.h" #include "parquet/properties.h" @@ -180,6 +181,18 @@ class PARQUET_EXPORT FileReader { const std::vector& row_group_indices, const std::vector& column_indices, std::unique_ptr<::arrow::RecordBatchReader>* out) = 0; + /// \brief Return a RecordBatchReader of row groups selected from + /// rows_to_return, whose columns are selected by column_indices. + /// + /// Notice that rows_to_return is file based, it not only decides which row groups to read, + /// but also which rows to read in each row group. + /// + /// + /// \returns error Status if either rows_to_return or column_indices + /// contains an invalid index + virtual ::arrow::Status GetRecordBatchReader(const RowRanges& rows_to_return, + const std::vector& column_indices, std::unique_ptr<::arrow::RecordBatchReader>* out) = 0; + /// \brief Return a RecordBatchReader of row groups selected from /// row_group_indices, whose columns are selected by column_indices. /// diff --git a/cpp/src/parquet/arrow/reader_internal.h b/cpp/src/parquet/arrow/reader_internal.h index cf9dbb86577b..b30aef2691c1 100644 --- a/cpp/src/parquet/arrow/reader_internal.h +++ b/cpp/src/parquet/arrow/reader_internal.h @@ -76,6 +76,7 @@ class FileColumnIterator { } auto row_group_reader = reader_->RowGroup(row_groups_.front()); + current_rg_ = row_groups_.front(); row_groups_.pop_front(); return row_group_reader->GetColumnPageReader(column_index_); } @@ -88,11 +89,14 @@ class FileColumnIterator { int column_index() const { return column_index_; } + int current_row_group() const { return current_rg_; } + protected: int column_index_; ParquetFileReader* reader_; const SchemaDescriptor* schema_; std::deque row_groups_; + int current_rg_ = 0; }; using FileColumnIteratorFactory = @@ -109,6 +113,7 @@ struct ReaderContext { FileColumnIteratorFactory iterator_factory; bool filter_leaves; std::shared_ptr> included_leaves; + std::shared_ptr>> row_ranges_per_rg; bool IncludesLeaf(int leaf_index) const { if (this->filter_leaves) { diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc index ac4627d69c0f..9003e643df22 100644 --- a/cpp/src/parquet/column_reader.cc +++ b/cpp/src/parquet/column_reader.cc @@ -1289,6 +1289,147 @@ std::shared_ptr ColumnReader::Make(const ColumnDescriptor* descr, ::arrow::Unreachable(); } +// ---------------------------------------------------------------------- +// RowRanges and ins implementations + +IntervalRanges::IntervalRanges() = default; + +IntervalRanges::IntervalRanges(const IntervalRange& range) { ranges_.push_back(range); } + +IntervalRanges::IntervalRanges(const std::vector& ranges) { + this->ranges_ = ranges; +} + +std::unique_ptr IntervalRanges::NewIterator() const { + return std::make_unique(ranges_); +} + +size_t IntervalRanges::RowCount() const { + size_t cnt = 0; + for (const IntervalRange& range : ranges_) { + cnt += range.Count(); + } + return cnt; +} + +int64_t IntervalRanges::LastRow() const { return ranges_.back().end; } + +bool IntervalRanges::IsValid() const { + if (ranges_.size() == 0) return true; + if (ranges_[0].start < 0) { + return false; + } + for (size_t i = 0; i < ranges_.size(); i++) { + if (!ranges_[i].IsValid()) { + return false; + } + } + for (size_t i = 1; i < ranges_.size(); i++) { + if (ranges_[i].start <= ranges_[i - 1].end) { + return false; + } + } + return true; +} + +bool IntervalRanges::IsOverlapping(const IntervalRange& searchRange) const { + auto it = std::lower_bound( + ranges_.begin(), ranges_.end(), searchRange, + [](const IntervalRange& r1, const IntervalRange& r2) { return r1.IsBefore(r2); }); + return it != ranges_.end() && !(*it).IsAfter(searchRange); +} + +std::string IntervalRanges::ToString() const { + std::string result = "["; + for (const IntervalRange& range : ranges_) { + result += range.ToString() + ", "; + } + if (!ranges_.empty()) { + result = result.substr(0, result.size() - 2); + } + result += "]"; + return result; +} + +std::vector> IntervalRanges::SplitByRowGroups( + const std::vector& rows_per_rg) const { + if (rows_per_rg.size() <= 1) { + std::unique_ptr single = + std::make_unique(*this); // return a copy of itself + auto ret = std::vector>(); + ret.push_back(std::move(single)); + return ret; + } + + std::vector> result; + + IntervalRanges spaces; + int64_t rows_so_far = 0; + for (size_t i = 0; i < rows_per_rg.size(); ++i) { + auto start = rows_so_far; + rows_so_far += rows_per_rg[i]; + auto end = rows_so_far - 1; + spaces.Add({start, end}); + } + + // each RG's row range forms a space, we need to adjust RowRanges in each space to + // zero based. + for (IntervalRange space : spaces.GetRanges()) { + auto intersection = Intersection(IntervalRanges(space), *this); + + std::unique_ptr zero_based_ranges = + std::make_unique(); + for (const IntervalRange& range : intersection.GetRanges()) { + zero_based_ranges->Add({range.start - space.start, range.end - space.start}); + } + result.push_back(std::move(zero_based_ranges)); + } + + return result; +} + +IntervalRanges IntervalRanges::Intersection(const IntervalRanges& left, + const IntervalRanges& right) { + IntervalRanges result; + + size_t rightIndex = 0; + for (const IntervalRange& l : left.ranges_) { + for (size_t i = rightIndex, n = right.ranges_.size(); i < n; ++i) { + const IntervalRange& r = right.ranges_[i]; + if (l.IsBefore(r)) { + break; + } else if (l.IsAfter(r)) { + rightIndex = i + 1; + continue; + } + result.Add(IntervalRange::Intersection(l, r)); + } + } + + return result; +} + +void IntervalRanges::Add(const IntervalRange& range) { + const IntervalRange rangeToAdd = range; + if (ranges_.size() > 1 && rangeToAdd.start <= ranges_.back().end) { + throw ParquetException("Ranges must be added in order"); + } + ranges_.push_back(rangeToAdd); +} + +const std::vector& IntervalRanges::GetRanges() const { return ranges_; } + +IntervalRowRangesIterator::IntervalRowRangesIterator( + const std::vector& ranges) + : ranges_(ranges) {} +IntervalRowRangesIterator::~IntervalRowRangesIterator() {} + +std::variant IntervalRowRangesIterator::NextRange() { + if (current_index_ >= ranges_.size()) return End(); + + return ranges_[current_index_++]; +} + // ---------------------------------------------------------------------- // RecordReader @@ -1363,7 +1504,7 @@ class TypedRecordReader : public TypedColumnReaderImpl, int64_t records_read = 0; if (has_values_to_process()) { - records_read += ReadRecordData(num_records); + records_read += ReadRecordDataWithSkipCheck(num_records); } int64_t level_batch_size = std::max(kMinLevelBatchSize, num_records); @@ -1417,11 +1558,11 @@ class TypedRecordReader : public TypedColumnReaderImpl, } levels_written_ += levels_read; - records_read += ReadRecordData(num_records - records_read); + records_read += ReadRecordDataWithSkipCheck(num_records - records_read); } else { // No repetition or definition levels batch_size = std::min(num_records - records_read, batch_size); - records_read += ReadRecordData(batch_size); + records_read += ReadRecordDataWithSkipCheck(batch_size); } } @@ -1624,10 +1765,12 @@ class TypedRecordReader : public TypedColumnReaderImpl, // Top level required field. Number of records equals to number of levels, // and there is not read-ahead for levels. + int64_t skipped_records = 0; if (this->max_rep_level_ == 0 && this->max_def_level_ == 0) { - return this->Skip(num_records); + skipped_records = this->Skip(num_records); + current_rg_processed_records_ += skipped_records; + return skipped_records; } - int64_t skipped_records = 0; if (this->max_rep_level_ == 0) { // Non-repeated optional field. // First consume whatever is in the buffer. @@ -1643,6 +1786,8 @@ class TypedRecordReader : public TypedColumnReaderImpl, } else { skipped_records += this->SkipRecordsRepeated(num_records); } + + current_rg_processed_records_ += skipped_records; return skipped_records; } @@ -1971,9 +2116,27 @@ class TypedRecordReader : public TypedColumnReaderImpl, this->ConsumeBufferedValues(values_to_read); } + current_rg_processed_records_ += records_read; return records_read; } + int64_t ReadRecordDataWithSkipCheck(const int64_t num_records) { + if (!skipper_) { + return ReadRecordData(num_records); + } + + while (true) { + const auto advise = skipper_->AdviseNext(current_rg_processed_records_); + if (advise == 0) { + return 0; + } + if (advise > 0) { + return ReadRecordData(std::min(num_records, advise)); + } + SkipRecords(-advise); + } + } + void DebugPrintState() override { const int16_t* def_levels = this->def_levels(); const int16_t* rep_levels = this->rep_levels(); @@ -2300,5 +2463,72 @@ std::shared_ptr RecordReader::Make(const ColumnDescriptor* descr, return nullptr; } +RecordSkipper::RecordSkipper(IntervalRanges& pages, const RowRanges& orig_row_ranges) { + // copy row_ranges + IntervalRanges skip_pages; + for (auto& page : pages.GetRanges()) { + if (!orig_row_ranges.IsOverlapping(page)) { + skip_pages.Add(page); + } + } + + AdjustRanges(skip_pages, orig_row_ranges, row_ranges_); + range_iter_ = row_ranges_->NewIterator(); + current_range_variant = range_iter_->NextRange(); + + total_rows_to_process_ = pages.RowCount() - skip_pages.RowCount(); +} + + +int64_t RecordSkipper::AdviseNext(const int64_t current_rg_processed) { + if (current_range_variant.index() == 2) { + return 0; + } + + auto& current_range = std::get(current_range_variant); + + if (current_range.end < current_rg_processed) { + current_range_variant = range_iter_->NextRange(); + if (current_range_variant.index() == 2) { + // negative, skip the ramaining rows + return current_rg_processed - total_rows_to_process_; + } + } + + current_range = std::get(current_range_variant); + + if (current_range.start > current_rg_processed) { + // negative, skip + return current_rg_processed - current_range.start; + } + + const auto ret = current_range.end - current_rg_processed + 1; + return ret; +} + +void RecordSkipper::AdjustRanges(IntervalRanges& skip_pages, + const RowRanges& orig_row_ranges, + std::unique_ptr& ret) { + std::unique_ptr temp = std::make_unique(); + + size_t skipped_rows = 0; + const auto orig_range_iter = orig_row_ranges.NewIterator(); + auto orig_range_variant = orig_range_iter->NextRange(); + auto skip_iter = skip_pages.GetRanges().begin(); + while (orig_range_variant.index() != 2) { + const auto& origin_range = std::get(orig_range_variant); + while (skip_iter != skip_pages.GetRanges().end() && + skip_iter->IsBefore(origin_range)) { + skipped_rows += skip_iter->Count(); + ++skip_iter; + } + + temp->Add(IntervalRange(origin_range.start - skipped_rows, + origin_range.end - skipped_rows)); + orig_range_variant = orig_range_iter->NextRange(); + } + ret = std::move(temp); +} + } // namespace internal } // namespace parquet diff --git a/cpp/src/parquet/column_reader.h b/cpp/src/parquet/column_reader.h index 086f6c0e5580..45f566404940 100644 --- a/cpp/src/parquet/column_reader.h +++ b/cpp/src/parquet/column_reader.h @@ -22,6 +22,7 @@ #include #include +#include "page_index.h" #include "parquet/exception.h" #include "parquet/level_conversion.h" #include "parquet/metadata.h" @@ -302,8 +303,150 @@ class TypedColumnReader : public ColumnReader { int32_t* dict_len) = 0; }; +// Represent a range to read. The range is inclusive on both ends. +struct IntervalRange { + static IntervalRange Intersection(const IntervalRange& left, + const IntervalRange& right) { + if (left.start <= right.start) { + if (left.end >= right.start) { + return {right.start, std::min(left.end, right.end)}; + } + } else if (right.end >= left.start) { + return {left.start, std::min(left.end, right.end)}; + } + return {-1, -1}; // Return a default Range object if no intersection range found + } + + IntervalRange(const int64_t start_, const int64_t end_) : start(start_), end(end_) { + if (start > end) { + throw ParquetException("Invalid range with start: " + std::to_string(start) + + " and end: " + std::to_string(end)); + } + } + + size_t Count() const { + if(!IsValid()) { + throw ParquetException("Invalid range with start: " + std::to_string(start) + + " and end: " + std::to_string(end)); + } + return end - start + 1; + } + + bool IsBefore(const IntervalRange& other) const { return end < other.start; } + + bool IsAfter(const IntervalRange& other) const { return start > other.end; } + + bool IsOverlap(const IntervalRange& other) const { + return !IsBefore(other) && !IsAfter(other); + } + + bool IsValid() const { return start >= 0 && end >= 0 && end >= start; } + + std::string ToString() const { + return "(" + std::to_string(start) + ", " + std::to_string(end) + ")"; + } + + // inclusive + int64_t start = -1; + // inclusive + int64_t end = -1; +}; + +struct BitmapRange { + int64_t offset; + // zero added to, if there are less than 64 elements left in the column. + uint64_t bitmap; +}; + +struct End {}; + +// Represent a set of ranges to read. The ranges are sorted and non-overlapping. +class RowRanges { + public: + RowRanges() = default; + virtual ~RowRanges() = default; + virtual size_t RowCount() const = 0; + virtual int64_t LastRow() const = 0; + virtual bool IsValid() const = 0; + virtual bool IsOverlapping(const IntervalRange& searchRange) const = 0; + // Given a RowRanges with rows accross all RGs, split it into N RowRanges, where N = number of RGs + // e.g.: suppose we have 2 RGs: [0-99] and [100-199], and user is interested in RowRanges [90-110], then + // this function will return 2 RowRanges: [90-99] and [0-10] + virtual std::vector> SplitByRowGroups(const std::vector& rows_per_rg) const = 0; + virtual std::string ToString() const = 0; + + // Returns a vector of PageLocations that must be read all to get values for + // all included in this range virtual std::vector + // PageIndexesToInclude(const std::vector& all_pages) = 0; + + class Iterator { + public: + virtual std::variant NextRange() = 0; + virtual ~Iterator() = default; + }; + virtual std::unique_ptr NewIterator() const = 0; + +}; + +class IntervalRanges : public RowRanges { + public: + IntervalRanges(); + explicit IntervalRanges(const IntervalRange& range); + explicit IntervalRanges(const std::vector& ranges); + std::unique_ptr NewIterator() const override; + size_t RowCount() const override; + int64_t LastRow() const override; + bool IsValid() const override; + bool IsOverlapping(const IntervalRange& searchRange) const override; + std::string ToString() const override; + std::vector> SplitByRowGroups( + const std::vector& rows_per_rg) const override; + static IntervalRanges Intersection(const IntervalRanges& left, + const IntervalRanges& right); + void Add(const IntervalRange& range); + const std::vector& GetRanges() const; + + private: + std::vector ranges_; +}; + +class IntervalRowRangesIterator : public RowRanges::Iterator { + public: + IntervalRowRangesIterator(const std::vector& ranges); + ~IntervalRowRangesIterator() override; + std::variant NextRange() override; + + private: + const std::vector& ranges_; + size_t current_index_ = 0; +}; + namespace internal { +// A RecordSkipper is used to skip uncessary rows within each pages. +class PARQUET_EXPORT RecordSkipper { + public: + RecordSkipper(IntervalRanges& pages, const RowRanges& orig_row_ranges); + /// Return the number of records to read or to skip + /// if return values is positive, it means to read N records + /// if return values is negative, it means to skip N records + /// if return values is 0, it means end of RG + int64_t AdviseNext(const int64_t current_rg_processed); + + private: + /// Since the skipped pages will be silently skipped without updating + /// current_rg_processed_records or records_read_, we need to pre-process the row + /// ranges as if these skipped pages never existed + static void AdjustRanges(IntervalRanges& skip_pages, const RowRanges& orig_row_ranges, + std::unique_ptr& ret); + + std::unique_ptr row_ranges_; + std::unique_ptr range_iter_; + std::variant current_range_variant = End(); + + size_t total_rows_to_process_ = 0; +}; + /// \brief Stateful column reader that delimits semantic records for both flat /// and nested columns /// @@ -424,6 +567,10 @@ class PARQUET_EXPORT RecordReader { /// \brief True if reading dense for nullable columns. bool read_dense_for_nullable() const { return read_dense_for_nullable_; } + void reset_current_rg_processed_records() { current_rg_processed_records_ = 0; } + + void set_record_skipper(const std::shared_ptr& skipper) { skipper_ = skipper; } + protected: /// \brief Indicates if we can have nullable values. Note that repeated fields /// may or may not be nullable. @@ -432,6 +579,8 @@ class PARQUET_EXPORT RecordReader { bool at_record_start_; int64_t records_read_; + int64_t current_rg_processed_records_ = 0; // counting both read and skip records + /// \brief Stores values. These values are populated based on each ReadRecords /// call. No extra values are buffered for the next call. SkipRecords will not /// add any value to this buffer. @@ -473,6 +622,8 @@ class PARQUET_EXPORT RecordReader { // If true, we will not leave any space for the null values in the values_ // vector. bool read_dense_for_nullable_ = false; + + std::shared_ptr skipper_ = NULLPTR; }; class BinaryRecordReader : virtual public RecordReader { diff --git a/cpp/src/parquet/range_reader_test.cc b/cpp/src/parquet/range_reader_test.cc new file mode 100644 index 000000000000..cde60c583f50 --- /dev/null +++ b/cpp/src/parquet/range_reader_test.cc @@ -0,0 +1,489 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/api.h" +#include "arrow/io/api.h" +#include "arrow/io/memory.h" +#include "arrow/result.h" +#include "arrow/util/type_fwd.h" +#include "parquet/arrow/reader.h" +#include "parquet/arrow/writer.h" + +#include +#include +#include +#include +#include +#include + +#include +#include + +using parquet::IntervalRange; +using parquet::IntervalRanges; + +std::string random_string(std::string::size_type length) { + static auto& chrs = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"; + + static std::mt19937 rg{std::random_device{}()}; + static std::uniform_int_distribution pick(0, sizeof(chrs) - 2); + + std::string s; + s.reserve(length); + while (length--) s += chrs[pick(rg)]; + + return s; +} + +/// The table looks like (with_nulls = false): +// { +// { a: {x: 0, y: 0}, b: {0, 0, 0}, c: "0", d: 0}, +// { a: {x: 1, y: 1}, b: {1, 1, 1}, c: "1", d: 1}, +// ... +// { a: {x: 99, y: 99}, b: {99, 99, 99}, c: "99", d: 99} +// } +arrow::Result> GetTable(bool with_nulls = false) { + // if with_nulls, the generated table should null values + // set first 10 rows and last 10 rows to null + std::shared_ptr null_bitmap; + std::vector flags(100, true); + if (with_nulls) { + std::fill_n(flags.begin(), 10, false); + std::fill_n(flags.begin() + 90, 10, false); + + size_t length = flags.size(); + + ARROW_ASSIGN_OR_RAISE(null_bitmap, arrow::AllocateEmptyBitmap(length)); + + uint8_t* bitmap = null_bitmap->mutable_data(); + for (size_t i = 0; i < length; ++i) { + if (flags[i]) { + arrow::bit_util::SetBit(bitmap, i); + } + } + } + + auto int32_builder = arrow::Int32Builder(); + + // Struct col + std::shared_ptr arr_a_x; + std::shared_ptr arr_a_y; + ARROW_RETURN_NOT_OK(int32_builder.AppendValues(arrow::internal::Iota(0, 100))); + ARROW_RETURN_NOT_OK(int32_builder.Finish(&arr_a_x)); + ARROW_RETURN_NOT_OK(int32_builder.AppendValues(arrow::internal::Iota(0, 100))); + ARROW_RETURN_NOT_OK(int32_builder.Finish(&arr_a_y)); + ARROW_ASSIGN_OR_RAISE(auto arr_a, arrow::StructArray::Make( + {arr_a_x, arr_a_y}, + std::vector{"x", "y"}, null_bitmap)); + + // List col + std::shared_ptr arr_b_values; + std::shared_ptr arr_b_offsets; + std::vector b_values; + for (int i = 0; i < 100; ++i) { + for (int j = 0; j < 3; ++j) { + b_values.push_back(i); + } + } + ARROW_RETURN_NOT_OK(int32_builder.AppendValues(b_values)); + ARROW_RETURN_NOT_OK(int32_builder.Finish(&arr_b_values)); + std::vector offsets = arrow::internal::Iota(0, 101); + std::transform(offsets.begin(), offsets.end(), offsets.begin(), + [](const int x) { return x * 3; }); + ARROW_RETURN_NOT_OK(int32_builder.AppendValues(offsets)); + ARROW_RETURN_NOT_OK(int32_builder.Finish(&arr_b_offsets)); + ARROW_ASSIGN_OR_RAISE(auto arr_b, arrow::ListArray::FromArrays( + *arr_b_offsets, *arr_b_values, + arrow::default_memory_pool(), null_bitmap)); + + // string col + auto string_builder = arrow::StringBuilder(); + std::shared_ptr arr_c; + std::vector strs; + uint8_t valid_bytes[100]; + for (size_t i = 0; i < 100; i++) { + // add more chars to make this column unaligned with other columns' page + strs.push_back(std::to_string(i) + random_string(20)); + valid_bytes[i] = flags[i]; + } + ARROW_RETURN_NOT_OK(string_builder.AppendValues(strs, &valid_bytes[0])); + ARROW_RETURN_NOT_OK(string_builder.Finish(&arr_c)); + + // int col + std::shared_ptr arr_d; + ARROW_RETURN_NOT_OK(int32_builder.AppendValues(arrow::internal::Iota(0, 100), flags)); + ARROW_RETURN_NOT_OK(int32_builder.Finish(&arr_d)); + + auto schema = arrow::schema({ + // complex types prior to simple types + field("a", arr_a->type()), + field("b", list(arrow::int32())), + field("c", arrow::utf8()), + field("d", arrow::int32()), + }); + + return arrow::Table::Make(schema, {arr_a, arr_b, arr_c, arr_d}); +} + +arrow::Result> WriteFullFile( + const bool with_nulls = false) { + using parquet::ArrowWriterProperties; + using parquet::WriterProperties; + + ARROW_ASSIGN_OR_RAISE(const auto table, GetTable(with_nulls)); + + const std::shared_ptr props = + WriterProperties::Builder() + .max_row_group_length(30) + ->enable_write_page_index() + ->disable_dictionary() + ->write_batch_size(1) + ->data_pagesize(30) // small pages + ->compression(arrow::Compression::SNAPPY) + ->build(); + + const std::shared_ptr arrow_props = + ArrowWriterProperties::Builder().store_schema()->build(); + + ARROW_ASSIGN_OR_RAISE(const auto out_stream, ::arrow::io::BufferOutputStream::Create()); + + ARROW_RETURN_NOT_OK(parquet::arrow::WriteTable(*table.get(), + arrow::default_memory_pool(), out_stream, + /*chunk_size=*/100, props, arrow_props)); + + // { + // // output to a local file for debugging + // ARROW_ASSIGN_OR_RAISE(auto outfile, arrow::io::FileOutputStream::Open( + // "/tmp/range_reader_test.parquet")); + // + // ARROW_RETURN_NOT_OK( + // parquet::arrow::WriteTable(*table.get(), arrow::default_memory_pool(), outfile, + // /*chunk_size=*/100, props, arrow_props)); + // } + + return out_stream->Finish(); +} + +bool checking_col(const std::string& col_name, + const std::vector& column_names) { + return std::find(column_names.begin(), column_names.end(), col_name) != + column_names.end(); +} + +void check_rb(std::unique_ptr rb_reader, + const size_t expected_rows, const int64_t expected_sum) { + const std::vector column_names = rb_reader->schema()->field_names(); + + size_t total_rows = 0; + int64_t sum_a = 0; + int64_t sum_b = 0; + int64_t sum_c = 0; + int64_t sum_d = 0; + for (arrow::Result> maybe_batch : *rb_reader) { + ASSERT_OK_AND_ASSIGN(auto batch, maybe_batch); + total_rows += batch->num_rows(); + + if (checking_col("a", column_names)) { + auto a_array = + std::dynamic_pointer_cast(batch->GetColumnByName("a")); + auto a_x_array = std::dynamic_pointer_cast(a_array->field(0)); + auto a_y_array = std::dynamic_pointer_cast(a_array->field(1)); + for (auto iter = a_x_array->begin(); iter != a_x_array->end(); ++iter) { + sum_a += (*iter).has_value() ? (*iter).value() : 0; + } + for (auto iter = a_y_array->begin(); iter != a_y_array->end(); ++iter) { + sum_a += (*iter).has_value() ? (*iter).value() : 0; + } + } + + if (checking_col("b", column_names)) { + auto b_array = + std::dynamic_pointer_cast(batch->GetColumnByName("b")); + ASSERT_OK_AND_ASSIGN(auto flatten_b_array, b_array->Flatten()); + auto b_array_values = std::dynamic_pointer_cast(flatten_b_array); + for (auto iter = b_array_values->begin(); iter != b_array_values->end(); ++iter) { + sum_b += (*iter).has_value() ? (*iter).value() : 0; + } + } + + if (checking_col("c", column_names)) { + auto c_array = + std::dynamic_pointer_cast(batch->GetColumnByName("c")); + for (auto iter = c_array->begin(); iter != c_array->end(); ++iter) { + sum_c += std::stoi(std::string( + (*iter).has_value() ? (*iter).value().substr(0, (*iter).value().size() - 20) + : "0")); + } + } + + if (checking_col("d", column_names)) { + auto d_array = + std::dynamic_pointer_cast(batch->GetColumnByName("d")); + for (auto iter = d_array->begin(); iter != d_array->end(); ++iter) { + sum_d += (*iter).has_value() ? (*iter).value() : 0; + } + } + } + ASSERT_EQ(expected_rows, total_rows); + + if (checking_col("a", column_names)) ASSERT_EQ(expected_sum * 2, sum_a); + if (checking_col("b", column_names)) ASSERT_EQ(expected_sum * 3, sum_b); + if (checking_col("c", column_names)) ASSERT_EQ(expected_sum, sum_c); + if (checking_col("d", column_names)) ASSERT_EQ(expected_sum, sum_d); +} + +class TestRecordBatchReaderWithRanges : public testing::Test { + public: + void SetUp() { + ASSERT_OK_AND_ASSIGN(auto buffer, WriteFullFile()); + + arrow::MemoryPool* pool = arrow::default_memory_pool(); + + auto reader_properties = parquet::ReaderProperties(pool); + reader_properties.set_buffer_size(4096 * 4); + reader_properties.enable_buffered_stream(); + + auto arrow_reader_props = parquet::ArrowReaderProperties(); + arrow_reader_props.set_batch_size(10); // default 64 * 1024 + + parquet::arrow::FileReaderBuilder reader_builder; + const auto in_file = std::make_shared(buffer); + ASSERT_OK(reader_builder.Open(in_file, /*memory_map=*/reader_properties)); + reader_builder.memory_pool(pool); + reader_builder.properties(arrow_reader_props); + + ASSERT_OK_AND_ASSIGN(arrow_reader, reader_builder.Build()); + } + + void TearDown() {} + + protected: + std::unique_ptr arrow_reader; +}; + +TEST_F(TestRecordBatchReaderWithRanges, TestRangesSplit) {} + +TEST_F(TestRecordBatchReaderWithRanges, SelectOnePageForEachRG) { + std::unique_ptr rb_reader; + IntervalRanges rows{{IntervalRange{0, 9}, IntervalRange{40, 49}, IntervalRange{80, 89}, IntervalRange{90, 99}}}; + + const std::vector column_indices{0, 1, 2, 3, 4}; + ASSERT_OK(arrow_reader->GetRecordBatchReader(rows, column_indices, &rb_reader)); + + // (0+...+9) + (40+...+49) + (80+...+89) + (90+...+99) = 2280 + check_rb(std::move(rb_reader), 40, 2280); +} + +TEST_F(TestRecordBatchReaderWithRanges, SelectSomePageForOneRG) { + std::unique_ptr rb_reader; + IntervalRanges rows{{IntervalRange{0, 7}, IntervalRange{16, 23}}}; + + const std::vector column_indices{0, 1, 2, 3, 4}; + ASSERT_OK(arrow_reader->GetRecordBatchReader(rows, column_indices, &rb_reader)); + + // (0+...+7) + (16+...+23) = 184 + check_rb(std::move(rb_reader), 16, 184); +} + +TEST_F(TestRecordBatchReaderWithRanges, SelectAllRange) { + std::unique_ptr rb_reader; + IntervalRanges rows{{IntervalRange{0, 29}, IntervalRange{30, 59}, IntervalRange{60, 89}, IntervalRange{90, 99}}}; + + const std::vector column_indices{0, 1, 2, 3, 4}; + ASSERT_OK(arrow_reader->GetRecordBatchReader(rows, column_indices, &rb_reader)); + + // (0+...+99) = 4950 + check_rb(std::move(rb_reader), 100, 4950); +} + +TEST_F(TestRecordBatchReaderWithRanges, SelectEmptyRange) { + std::unique_ptr rb_reader; + IntervalRanges rows{}; + + const std::vector column_indices{0, 1, 2, 3, 4}; + const auto status = + arrow_reader->GetRecordBatchReader(rows, column_indices, &rb_reader); + ASSERT_OK(status); + check_rb(std::move(rb_reader), 0, 0); +} + +TEST_F(TestRecordBatchReaderWithRanges, SelectOneRowSkipOneRow) { + // case 1: only care about RG 0 + { + std::unique_ptr rb_reader; + std::vector ranges; + for (int64_t i = 0; i < 30; i++) { + if (i % 2 == 0) ranges.push_back({i, i}); + } + const std::vector column_indices{0, 1, 2, 3, 4}; + ASSERT_OK(arrow_reader->GetRecordBatchReader(IntervalRanges(ranges), column_indices, + &rb_reader)); + + check_rb(std::move(rb_reader), 15, 210); // 0 + 2 + ... + 28 = 210 + } + + // case 2: care about RG 0 and 2 + { + std::unique_ptr rb_reader; + std::vector ranges; + for (int64_t i = 0; i < 30; i++) { + if (i % 2 == 0) ranges.push_back({i, i}); + } + + for (int64_t i = 60; i < 90; i++) { + if (i % 2 == 0) ranges.push_back({i, i}); + } + const std::vector column_indices{0, 1, 2, 3, 4}; + ASSERT_OK(arrow_reader->GetRecordBatchReader(IntervalRanges(ranges), column_indices, + &rb_reader)); + + check_rb(std::move(rb_reader), 30, + 1320); // (0 + 2 + ... + 28) + (60 + 62 ... + 88) = 1320 + } +} + +TEST_F(TestRecordBatchReaderWithRanges, InvalidRanges) { + std::unique_ptr rb_reader; + { + IntervalRanges rows{{IntervalRange{-1, 5}}}; + const std::vector column_indices{0, 1, 2, 3, 4}; + const auto status = + arrow_reader->GetRecordBatchReader(rows, column_indices, &rb_reader); + ASSERT_NOT_OK(status); + EXPECT_TRUE(status.message().find("The provided row range is invalid, keep it " + "monotone and non-interleaving: [(-1, 5)]") != + std::string::npos); + } + + { + IntervalRanges rows{{IntervalRange{0, 4}, {2, 5}}}; + const std::vector column_indices{0, 1, 2, 3, 4}; + const auto status = + arrow_reader->GetRecordBatchReader(rows, column_indices, &rb_reader); + ASSERT_NOT_OK(status); + EXPECT_TRUE( + status.message().find("The provided row range is invalid, keep it monotone and " + "non-interleaving: [(0, 4), (2, 5)]") != std::string::npos); + } + { + // will treat as {0,99} + IntervalRanges rows{{IntervalRange{0, 100}}}; + const std::vector column_indices{0, 1, 2, 3, 4}; + const auto status = + arrow_reader->GetRecordBatchReader(rows, column_indices, &rb_reader); + ASSERT_NOT_OK(status); + EXPECT_TRUE(status.message().find("The provided row range [(0, 100)] exceeds the " + "number of rows in the file: 100") != + std::string::npos); + } +} + +TEST(TestRecordBatchReaderWithRangesBadCases, NoPageIndex) { + using parquet::ArrowWriterProperties; + using parquet::WriterProperties; + + // write a file without page index + ASSERT_OK_AND_ASSIGN(std::shared_ptr table, GetTable()); + std::shared_ptr props = + WriterProperties::Builder() + .max_row_group_length(30) + ->disable_write_page_index() // NO INDEX !!!! + ->write_batch_size(13) + ->data_pagesize(1) + ->compression(arrow::Compression::SNAPPY) + ->build(); + std::shared_ptr arrow_props = + ArrowWriterProperties::Builder().store_schema()->build(); + ASSERT_OK_AND_ASSIGN(auto out_stream, ::arrow::io::BufferOutputStream::Create()); + ASSERT_OK(parquet::arrow::WriteTable(*table.get(), arrow::default_memory_pool(), + out_stream, + /*chunk_size=*/100, props, arrow_props)); + ASSERT_OK_AND_ASSIGN(auto buffer, out_stream->Finish()); + + // try to read the file with Range + arrow::MemoryPool* pool = arrow::default_memory_pool(); + auto reader_properties = parquet::ReaderProperties(pool); + reader_properties.set_buffer_size(4096 * 4); + reader_properties.enable_buffered_stream(); + auto arrow_reader_props = parquet::ArrowReaderProperties(); + arrow_reader_props.set_batch_size(10); // default 64 * 1024 + + parquet::arrow::FileReaderBuilder reader_builder; + auto in_file = std::make_shared(buffer); + ASSERT_OK(reader_builder.Open(in_file, /*memory_map=*/reader_properties)); + reader_builder.memory_pool(pool); + reader_builder.properties(arrow_reader_props); + ASSERT_OK_AND_ASSIGN(auto arrow_reader, reader_builder.Build()); + + std::unique_ptr rb_reader; + IntervalRanges rows{{IntervalRange{0, 29}}}; + std::vector column_indices{0, 1, 2, 3, 4}; + auto status = arrow_reader->GetRecordBatchReader(rows, column_indices, &rb_reader); + ASSERT_NOT_OK(status); + EXPECT_TRUE(status.message().find("Attempting to read with Ranges but Page Index is " + "not found for Row Group: 0") != std::string::npos); +} + +class TestRecordBatchReaderWithRangesWithNulls : public testing::Test { + public: + void SetUp() { + ASSERT_OK_AND_ASSIGN(auto buffer, WriteFullFile(true)); + + arrow::MemoryPool* pool = arrow::default_memory_pool(); + + auto reader_properties = parquet::ReaderProperties(pool); + reader_properties.set_buffer_size(4096 * 4); + reader_properties.enable_buffered_stream(); + + auto arrow_reader_props = parquet::ArrowReaderProperties(); + arrow_reader_props.set_batch_size(10); // default 64 * 1024 + + parquet::arrow::FileReaderBuilder reader_builder; + const auto in_file = std::make_shared(buffer); + ASSERT_OK(reader_builder.Open(in_file, /*memory_map=*/reader_properties)); + reader_builder.memory_pool(pool); + reader_builder.properties(arrow_reader_props); + + ASSERT_OK_AND_ASSIGN(arrow_reader, reader_builder.Build()); + } + + void TearDown() {} + + protected: + std::unique_ptr arrow_reader; +}; + +TEST_F(TestRecordBatchReaderWithRangesWithNulls, SelectOneRowSkipOneRow) { + { + std::unique_ptr rb_reader; + std::vector ranges; + for (int64_t i = 0; i < 30; i++) { + if (i % 2 == 0) ranges.push_back({i, i}); + } + + for (int64_t i = 60; i < 90; i++) { + if (i % 2 == 0) ranges.push_back({i, i}); + } + const std::vector column_indices{0, 1, 2, 3, 4}; + ASSERT_OK(arrow_reader->GetRecordBatchReader(IntervalRanges(ranges), column_indices, + &rb_reader)); + + // 0-9 is masked as null, so the ramaining is: + // (10 + 12 + ... + 28) + (60 + 62 ... + 88) = 1320 + check_rb(std::move(rb_reader), 30, 1300); + } +} \ No newline at end of file diff --git a/cpp/src/parquet/row_range_test.cc b/cpp/src/parquet/row_range_test.cc new file mode 100644 index 000000000000..44327baab04c --- /dev/null +++ b/cpp/src/parquet/row_range_test.cc @@ -0,0 +1,195 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#include +#include "parquet/column_reader.h" + +using namespace parquet; + +class RowRangesTest : public ::testing::Test { + protected: + IntervalRanges row_ranges; +}; + +TEST_F(RowRangesTest, EmptyRG_ReturnsOriginalRowRanges) { + row_ranges.Add(IntervalRange(0, 10)); + std::vector rows_per_rg; + + auto result = row_ranges.SplitByRowGroups(rows_per_rg); + ASSERT_EQ(result.size(), 1); + + auto iter = result[0]->NewIterator(); + auto range = std::get(iter->NextRange()); + ASSERT_EQ(range.start, 0); + ASSERT_EQ(range.end, 10); + ASSERT_EQ(iter->NextRange().index(), 2); +} + +TEST_F(RowRangesTest, SingleRG_ReturnsOriginalRowRanges2) { + row_ranges.Add(IntervalRange(0, 10)); + std::vector rows_per_rg = {11}; + + auto result = row_ranges.SplitByRowGroups(rows_per_rg); + ASSERT_EQ(result.size(), 1); + + auto iter = result[0]->NewIterator(); + auto range = std::get(iter->NextRange()); + ASSERT_EQ(range.start, 0); + ASSERT_EQ(range.end, 10); + ASSERT_EQ(iter->NextRange().index(), 2); +} + +TEST_F(RowRangesTest, ReturnsTwoRowRanges) { + row_ranges.Add(IntervalRange(0, 10)); + std::vector rows_per_rg = {5, 6}; + + auto result = row_ranges.SplitByRowGroups(rows_per_rg); + ASSERT_EQ(result.size(), 2); + { + auto iter = result[0]->NewIterator(); + auto range = std::get(iter->NextRange()); + ASSERT_EQ(range.start, 0); + ASSERT_EQ(range.end, 4); + ASSERT_EQ(iter->NextRange().index(), 2); + } + { + auto iter = result[1]->NewIterator(); + auto range = std::get(iter->NextRange()); + ASSERT_EQ(range.start, 0); + ASSERT_EQ(range.end, 5); + ASSERT_EQ(iter->NextRange().index(), 2); + } +} + +TEST_F(RowRangesTest, ReturnsMultipleRowRanges) { + row_ranges.Add(IntervalRange(0, 11)); + std::vector rows_per_rg = {3, 4, 100}; + + auto result = row_ranges.SplitByRowGroups(rows_per_rg); + ASSERT_EQ(result.size(), 3); + { + auto iter = result[0]->NewIterator(); + auto range = std::get(iter->NextRange()); + ASSERT_EQ(range.start, 0); + ASSERT_EQ(range.end, 2); + ASSERT_EQ(iter->NextRange().index(), 2); + } + { + auto iter = result[1]->NewIterator(); + auto range = std::get(iter->NextRange()); + ASSERT_EQ(range.start, 0); + ASSERT_EQ(range.end, 3); + ASSERT_EQ(iter->NextRange().index(), 2); + } + { + auto iter = result[2]->NewIterator(); + auto range = std::get(iter->NextRange()); + ASSERT_EQ(range.start, 0); + ASSERT_EQ(range.end, 4); + ASSERT_EQ(iter->NextRange().index(), 2); + } +} + +TEST_F(RowRangesTest, MultipleInputRange) { + row_ranges.Add(IntervalRange(0, 10)); + row_ranges.Add(IntervalRange(90, 111)); + row_ranges.Add(IntervalRange(191, 210)); + + std::vector rows_per_rg = {100, 100}; + + auto result = row_ranges.SplitByRowGroups(rows_per_rg); + ASSERT_EQ(result.size(), 2); + { + auto iter = result[0]->NewIterator(); + auto range = std::get(iter->NextRange()); + ASSERT_EQ(range.start, 0); + ASSERT_EQ(range.end, 10); + + range = std::get(iter->NextRange()); + ASSERT_EQ(range.start, 90); + ASSERT_EQ(range.end, 99); + + ASSERT_EQ(iter->NextRange().index(), 2); + } + { + auto iter = result[1]->NewIterator(); + auto range = std::get(iter->NextRange()); + ASSERT_EQ(range.start, 0); + ASSERT_EQ(range.end, 11); + + range = std::get(iter->NextRange()); + ASSERT_EQ(range.start, 91); + ASSERT_EQ(range.end, 99); + + ASSERT_EQ(iter->NextRange().index(), 2); + } +} + +TEST_F(RowRangesTest, MultipleSplitPoints_ReturnWithEmptyRowRanges) { + row_ranges.Add(IntervalRange(11, 18)); + std::vector rows_per_rg = {5, 5, 5, 5, 5}; + + auto result = row_ranges.SplitByRowGroups(rows_per_rg); + ASSERT_EQ(result.size(), 5); + { + auto iter = result[0]->NewIterator(); + ASSERT_EQ(iter->NextRange().index(), 2); + } + { + auto iter = result[1]->NewIterator(); + ASSERT_EQ(iter->NextRange().index(), 2); + } + { + auto iter = result[2]->NewIterator(); + auto range = std::get(iter->NextRange()); + ASSERT_EQ(range.start, 1); + ASSERT_EQ(range.end, 4); + ASSERT_EQ(iter->NextRange().index(), 2); + } + { + auto iter = result[3]->NewIterator(); + auto range = std::get(iter->NextRange()); + ASSERT_EQ(range.start, 0); + ASSERT_EQ(range.end, 3); + ASSERT_EQ(iter->NextRange().index(), 2); + } + { + auto iter = result[4]->NewIterator(); + ASSERT_EQ(iter->NextRange().index(), 2); + } +} + +TEST_F(RowRangesTest, RangeExceedRG) { + row_ranges.Add(IntervalRange(0, 10)); + std::vector rows_per_rg = {5, 3}; + + auto result = row_ranges.SplitByRowGroups(rows_per_rg); + ASSERT_EQ(result.size(), 2); + { + auto iter = result[0]->NewIterator(); + auto range = std::get(iter->NextRange()); + ASSERT_EQ(range.start, 0); + ASSERT_EQ(range.end, 4); + ASSERT_EQ(iter->NextRange().index(), 2); + } + { + auto iter = result[1]->NewIterator(); + auto range = std::get(iter->NextRange()); + ASSERT_EQ(range.start, 0); + ASSERT_EQ(range.end, 2); + ASSERT_EQ(iter->NextRange().index(), 2); + } +} From 60db7dfdd3ea4018242a7a4ad87167ac59b062cb Mon Sep 17 00:00:00 2001 From: Hongbin Ma Date: Mon, 22 Jan 2024 16:16:26 +0800 Subject: [PATCH 2/3] fix comments --- cpp/src/parquet/CMakeLists.txt | 1 + cpp/src/parquet/arrow/reader.cc | 105 ++++++++------- cpp/src/parquet/arrow/reader.h | 9 +- cpp/src/parquet/column_reader.cc | 150 +-------------------- cpp/src/parquet/column_reader.h | 128 +----------------- cpp/src/parquet/range_reader_test.cc | 65 +++++---- cpp/src/parquet/row_range.cc | 190 +++++++++++++++++++++++++++ cpp/src/parquet/row_range.h | 156 ++++++++++++++++++++++ cpp/src/parquet/row_range_test.cc | 17 +-- 9 files changed, 461 insertions(+), 360 deletions(-) create mode 100644 cpp/src/parquet/row_range.cc create mode 100644 cpp/src/parquet/row_range.h diff --git a/cpp/src/parquet/CMakeLists.txt b/cpp/src/parquet/CMakeLists.txt index 476bb48557f0..19a9ccf58aa3 100644 --- a/cpp/src/parquet/CMakeLists.txt +++ b/cpp/src/parquet/CMakeLists.txt @@ -162,6 +162,7 @@ set(PARQUET_SRCS arrow/writer.cc bloom_filter.cc bloom_filter_reader.cc + row_range.cc column_reader.cc column_scanner.cc column_writer.cc diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc index 86a86e41fdb0..e151fb8c4dee 100644 --- a/cpp/src/parquet/arrow/reader.cc +++ b/cpp/src/parquet/arrow/reader.cc @@ -221,13 +221,14 @@ class FileReaderImpl : public FileReader { ctx->iterator_factory = SomeRowGroupsFactory(row_groups); ctx->filter_leaves = true; ctx->included_leaves = included_leaves; - ctx->row_ranges_per_rg = row_ranges_per_rg; // copy the shared pointer to extend its lifecycle + ctx->row_ranges_per_rg = + row_ranges_per_rg; // copy the shared pointer to extend its lifecycle return GetReader(manifest_.schema_fields[i], ctx, out); } Status GetFieldReaders( const std::vector& column_indices, const std::vector& row_groups, - const std::shared_ptr>> & row_ranges_per_rg, + const std::shared_ptr>>& row_ranges_per_rg, std::vector>* out, std::shared_ptr<::arrow::Schema>* out_schema) { // We only need to read schema fields which have columns indicated @@ -331,44 +332,43 @@ class FileReaderImpl : public FileReader { } // This is a internal API owned by FileReaderImpl, not exposed in FileReader - Status GetRecordBatchReaderWithRowRanges(const std::vector& row_group_indices, - const std::vector& column_indices, - const std::shared_ptr>> & row_ranges_per_rg, - std::unique_ptr* out); + Status GetRecordBatchReaderWithRowRanges( + const std::vector& row_group_indices, const std::vector& column_indices, + const std::shared_ptr>>& row_ranges_per_rg, + std::unique_ptr* out); Status GetRecordBatchReader(const RowRanges& rows_to_return, const std::vector& column_indices, std::unique_ptr* out) override { const auto metadata = reader_->metadata(); - // check if the row ranges are valid - if (!rows_to_return.IsValid()) { - return Status::Invalid("The provided row range is invalid, keep it monotone and non-interleaving: " + - rows_to_return.ToString()); - } // check if the row ranges are within the row group boundaries - if (rows_to_return.RowCount() != 0 && rows_to_return.LastRow() >= metadata->num_rows()) { + if (rows_to_return.num_rows() != 0 && + rows_to_return.last_row() >= metadata->num_rows()) { return Status::Invalid("The provided row range " + rows_to_return.ToString() + " exceeds the number of rows in the file: " + std::to_string(metadata->num_rows())); } - if (rows_to_return.RowCount() == 0) { + if (rows_to_return.num_rows() == 0) { return GetRecordBatchReaderWithRowRanges({}, column_indices, {}, out); } std::vector rows_per_rg; - for (int i = 0 ; i < metadata->num_row_groups(); i++) { - rows_per_rg.push_back( metadata->RowGroup(i)->num_rows()); + for (int i = 0; i < metadata->num_row_groups(); i++) { + rows_per_rg.push_back(metadata->RowGroup(i)->num_rows()); } // We'll assign a RowRanges for each RG, even if it's not required to return any rows - std::vector> row_ranges_per_rg = rows_to_return.SplitByRowGroups(rows_per_rg); + std::vector> row_ranges_per_rg = + rows_to_return.SplitByRowRange(rows_per_rg); std::vector row_group_indices; - for (int i = 0 ; i < metadata->num_row_groups(); i++) { - if (row_ranges_per_rg.at(i)->RowCount() > 0) - row_group_indices.push_back(i); + for (int i = 0; i < metadata->num_row_groups(); i++) { + if (row_ranges_per_rg.at(i)->num_rows() > 0) row_group_indices.push_back(i); } - return GetRecordBatchReaderWithRowRanges(row_group_indices, column_indices, - std::make_shared>>(std::move(row_ranges_per_rg)), out); + return GetRecordBatchReaderWithRowRanges( + row_group_indices, column_indices, + std::make_shared>>( + std::move(row_ranges_per_rg)), + out); } Status GetRecordBatchReader(const std::vector& row_group_indices, @@ -379,13 +379,13 @@ class FileReaderImpl : public FileReader { Status GetRecordBatchReader(const std::vector& row_group_indices, std::unique_ptr* out) override { - return GetRecordBatchReaderWithRowRanges(row_group_indices, - Iota(reader_->metadata()->num_columns()), {}, out); + return GetRecordBatchReaderWithRowRanges( + row_group_indices, Iota(reader_->metadata()->num_columns()), {}, out); } Status GetRecordBatchReader(std::unique_ptr* out) override { - return GetRecordBatchReaderWithRowRanges(Iota(num_row_groups()), - Iota(reader_->metadata()->num_columns()), {}, out); + return GetRecordBatchReaderWithRowRanges( + Iota(num_row_groups()), Iota(reader_->metadata()->num_columns()), {}, out); } ::arrow::Result<::arrow::AsyncGenerator>> @@ -488,22 +488,21 @@ class RowGroupReaderImpl : public RowGroupReader { // ---------------------------------------------------------------------- // Column reader implementations -// This class is used to skip decompressing & decoding unnecessary pages by comparing user-specified row_ranges -// and page_ranges from metadata. -// Only support IntervalRange case for now. +// This class is used to skip decompressing & decoding unnecessary pages by comparing +// user-specified row_ranges and page_ranges from metadata. Only support IntervalRange +// case for now. class RowRangesPageFilter { public: - RowRangesPageFilter(const RowRanges& row_ranges, const std::shared_ptr& page_ranges) - : row_ranges_(row_ranges), page_ranges_(page_ranges) { - } + RowRangesPageFilter(const RowRanges& row_ranges, + const std::shared_ptr& page_ranges) + : row_ranges_(row_ranges), page_ranges_(page_ranges) {} - // To avoid error "std::function target must be copy-constructible", we must define copy constructor + // To avoid error "std::function target must be copy-constructible", we must define copy + // constructor RowRangesPageFilter(const RowRangesPageFilter& other) - : row_ranges_(other.row_ranges_), page_ranges_(other.page_ranges_) { - } + : row_ranges_(other.row_ranges_), page_ranges_(other.page_ranges_) {} bool operator()(const DataPageStats& stats) { - if (!initted) { row_ranges_itr_ = row_ranges_.NewIterator(); page_ranges_itr_ = page_ranges_->NewIterator(); @@ -511,19 +510,21 @@ class RowRangesPageFilter { current_row_range_ = row_ranges_itr_->NextRange(); if (current_row_range_.index() != 0) { - throw ParquetException("RowRangesPageFilter expects first NextRange() to be a IntervalRange"); + throw ParquetException( + "RowRangesPageFilter expects first NextRange() to be a IntervalRange"); } initted = true; } current_page_range_ = page_ranges_itr_->NextRange(); if (current_page_range_.index() != 0) { - throw ParquetException("RowRangesPageFilter expects first NextRange() to be a IntervalRange"); + throw ParquetException( + "RowRangesPageFilter expects first NextRange() to be a IntervalRange"); } while (current_row_range_.index() == 0 && - std::get(current_page_range_).IsAfter( - std::get(current_row_range_))) { + IntervalRangeUtils::IsAfter(std::get(current_page_range_), + std::get(current_row_range_))) { current_row_range_ = row_ranges_itr_->NextRange(); } @@ -531,8 +532,8 @@ class RowRangesPageFilter { return true; } - return std::get(current_page_range_).IsBefore( - std::get(current_row_range_)); + return IntervalRangeUtils::IsBefore(std::get(current_page_range_), + std::get(current_row_range_)); } private: @@ -641,11 +642,11 @@ class LeafReader : public ColumnReaderImpl { 1}); } - if (row_ranges.RowCount() > 0) { - if (row_ranges.LastRow() > page_ranges->LastRow()) { + if (row_ranges.num_rows() > 0) { + if (row_ranges.last_row() > page_ranges->last_row()) { throw ParquetException( - "The provided row range " + row_ranges.ToString() + - " exceeds last page :" + page_ranges->GetRanges().back().ToString()); + "The provided row range " + row_ranges.ToString() + " exceeds last page :" + + IntervalRangeUtils::ToString(page_ranges->GetRanges().back())); } } } @@ -656,23 +657,21 @@ class LeafReader : public ColumnReaderImpl { /// using page index to reduce cost if (page_reader != nullptr && ctx_->row_ranges_per_rg) { // reset skipper - record_reader_->set_record_skipper(NULLPTR); + record_reader_->reset_record_skipper(); - const auto & row_ranges = (*ctx_->row_ranges_per_rg)[input_->current_row_group()]; + const auto& row_ranges = (*ctx_->row_ranges_per_rg)[input_->current_row_group()]; // if specific row range is provided for this rg - if (row_ranges->RowCount() != 0) { - + if (row_ranges->num_rows() != 0) { // Use IntervalRanges to represent pages std::shared_ptr page_ranges; checkAndGetPageRanges(*row_ranges, page_ranges); // part 1, skip decompressing & decoding unnecessary pages - page_reader->set_data_page_filter( - RowRangesPageFilter(*row_ranges, page_ranges)); + page_reader->set_data_page_filter(RowRangesPageFilter(*row_ranges, page_ranges)); // part 2, skip unnecessary rows in necessary pages record_reader_->set_record_skipper( - std::make_shared(*page_ranges, + std::make_unique(*page_ranges, *row_ranges)); } else { NextRowGroup(); @@ -1151,7 +1150,7 @@ Status GetReader(const SchemaField& field, const std::shared_ptr& Status FileReaderImpl::GetRecordBatchReaderWithRowRanges( const std::vector& row_groups, const std::vector& column_indices, - const std::shared_ptr>> & row_ranges_per_rg, + const std::shared_ptr>>& row_ranges_per_rg, std::unique_ptr* out) { RETURN_NOT_OK(BoundsCheck(row_groups, column_indices)); diff --git a/cpp/src/parquet/arrow/reader.h b/cpp/src/parquet/arrow/reader.h index 4cfdad5a23e0..ad330a7c7352 100644 --- a/cpp/src/parquet/arrow/reader.h +++ b/cpp/src/parquet/arrow/reader.h @@ -184,14 +184,15 @@ class PARQUET_EXPORT FileReader { /// \brief Return a RecordBatchReader of row groups selected from /// rows_to_return, whose columns are selected by column_indices. /// - /// Notice that rows_to_return is file based, it not only decides which row groups to read, - /// but also which rows to read in each row group. + /// Notice that rows_to_return is file based, it not only decides which row groups to + /// read, but also which rows to read in each row group. /// /// /// \returns error Status if either rows_to_return or column_indices /// contains an invalid index - virtual ::arrow::Status GetRecordBatchReader(const RowRanges& rows_to_return, - const std::vector& column_indices, std::unique_ptr<::arrow::RecordBatchReader>* out) = 0; + virtual ::arrow::Status GetRecordBatchReader( + const RowRanges& rows_to_return, const std::vector& column_indices, + std::unique_ptr<::arrow::RecordBatchReader>* out) = 0; /// \brief Return a RecordBatchReader of row groups selected from /// row_group_indices, whose columns are selected by column_indices. diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc index 9003e643df22..69f3f6bd824d 100644 --- a/cpp/src/parquet/column_reader.cc +++ b/cpp/src/parquet/column_reader.cc @@ -1289,147 +1289,6 @@ std::shared_ptr ColumnReader::Make(const ColumnDescriptor* descr, ::arrow::Unreachable(); } -// ---------------------------------------------------------------------- -// RowRanges and ins implementations - -IntervalRanges::IntervalRanges() = default; - -IntervalRanges::IntervalRanges(const IntervalRange& range) { ranges_.push_back(range); } - -IntervalRanges::IntervalRanges(const std::vector& ranges) { - this->ranges_ = ranges; -} - -std::unique_ptr IntervalRanges::NewIterator() const { - return std::make_unique(ranges_); -} - -size_t IntervalRanges::RowCount() const { - size_t cnt = 0; - for (const IntervalRange& range : ranges_) { - cnt += range.Count(); - } - return cnt; -} - -int64_t IntervalRanges::LastRow() const { return ranges_.back().end; } - -bool IntervalRanges::IsValid() const { - if (ranges_.size() == 0) return true; - if (ranges_[0].start < 0) { - return false; - } - for (size_t i = 0; i < ranges_.size(); i++) { - if (!ranges_[i].IsValid()) { - return false; - } - } - for (size_t i = 1; i < ranges_.size(); i++) { - if (ranges_[i].start <= ranges_[i - 1].end) { - return false; - } - } - return true; -} - -bool IntervalRanges::IsOverlapping(const IntervalRange& searchRange) const { - auto it = std::lower_bound( - ranges_.begin(), ranges_.end(), searchRange, - [](const IntervalRange& r1, const IntervalRange& r2) { return r1.IsBefore(r2); }); - return it != ranges_.end() && !(*it).IsAfter(searchRange); -} - -std::string IntervalRanges::ToString() const { - std::string result = "["; - for (const IntervalRange& range : ranges_) { - result += range.ToString() + ", "; - } - if (!ranges_.empty()) { - result = result.substr(0, result.size() - 2); - } - result += "]"; - return result; -} - -std::vector> IntervalRanges::SplitByRowGroups( - const std::vector& rows_per_rg) const { - if (rows_per_rg.size() <= 1) { - std::unique_ptr single = - std::make_unique(*this); // return a copy of itself - auto ret = std::vector>(); - ret.push_back(std::move(single)); - return ret; - } - - std::vector> result; - - IntervalRanges spaces; - int64_t rows_so_far = 0; - for (size_t i = 0; i < rows_per_rg.size(); ++i) { - auto start = rows_so_far; - rows_so_far += rows_per_rg[i]; - auto end = rows_so_far - 1; - spaces.Add({start, end}); - } - - // each RG's row range forms a space, we need to adjust RowRanges in each space to - // zero based. - for (IntervalRange space : spaces.GetRanges()) { - auto intersection = Intersection(IntervalRanges(space), *this); - - std::unique_ptr zero_based_ranges = - std::make_unique(); - for (const IntervalRange& range : intersection.GetRanges()) { - zero_based_ranges->Add({range.start - space.start, range.end - space.start}); - } - result.push_back(std::move(zero_based_ranges)); - } - - return result; -} - -IntervalRanges IntervalRanges::Intersection(const IntervalRanges& left, - const IntervalRanges& right) { - IntervalRanges result; - - size_t rightIndex = 0; - for (const IntervalRange& l : left.ranges_) { - for (size_t i = rightIndex, n = right.ranges_.size(); i < n; ++i) { - const IntervalRange& r = right.ranges_[i]; - if (l.IsBefore(r)) { - break; - } else if (l.IsAfter(r)) { - rightIndex = i + 1; - continue; - } - result.Add(IntervalRange::Intersection(l, r)); - } - } - - return result; -} - -void IntervalRanges::Add(const IntervalRange& range) { - const IntervalRange rangeToAdd = range; - if (ranges_.size() > 1 && rangeToAdd.start <= ranges_.back().end) { - throw ParquetException("Ranges must be added in order"); - } - ranges_.push_back(rangeToAdd); -} - -const std::vector& IntervalRanges::GetRanges() const { return ranges_; } - -IntervalRowRangesIterator::IntervalRowRangesIterator( - const std::vector& ranges) - : ranges_(ranges) {} -IntervalRowRangesIterator::~IntervalRowRangesIterator() {} - -std::variant IntervalRowRangesIterator::NextRange() { - if (current_index_ >= ranges_.size()) return End(); - - return ranges_[current_index_++]; -} - // ---------------------------------------------------------------------- // RecordReader @@ -2467,7 +2326,7 @@ RecordSkipper::RecordSkipper(IntervalRanges& pages, const RowRanges& orig_row_ra // copy row_ranges IntervalRanges skip_pages; for (auto& page : pages.GetRanges()) { - if (!orig_row_ranges.IsOverlapping(page)) { + if (!orig_row_ranges.IsOverlapping(page.start, page.end)) { skip_pages.Add(page); } } @@ -2476,10 +2335,9 @@ RecordSkipper::RecordSkipper(IntervalRanges& pages, const RowRanges& orig_row_ra range_iter_ = row_ranges_->NewIterator(); current_range_variant = range_iter_->NextRange(); - total_rows_to_process_ = pages.RowCount() - skip_pages.RowCount(); + total_rows_to_process_ = pages.num_rows() - skip_pages.num_rows(); } - int64_t RecordSkipper::AdviseNext(const int64_t current_rg_processed) { if (current_range_variant.index() == 2) { return 0; @@ -2518,8 +2376,8 @@ void RecordSkipper::AdjustRanges(IntervalRanges& skip_pages, while (orig_range_variant.index() != 2) { const auto& origin_range = std::get(orig_range_variant); while (skip_iter != skip_pages.GetRanges().end() && - skip_iter->IsBefore(origin_range)) { - skipped_rows += skip_iter->Count(); + IntervalRangeUtils::IsBefore(*skip_iter, origin_range)) { + skipped_rows += IntervalRangeUtils::Count(*skip_iter); ++skip_iter; } diff --git a/cpp/src/parquet/column_reader.h b/cpp/src/parquet/column_reader.h index 45f566404940..d07273deab85 100644 --- a/cpp/src/parquet/column_reader.h +++ b/cpp/src/parquet/column_reader.h @@ -22,12 +22,12 @@ #include #include -#include "page_index.h" #include "parquet/exception.h" #include "parquet/level_conversion.h" #include "parquet/metadata.h" #include "parquet/platform.h" #include "parquet/properties.h" +#include "parquet/row_range.h" #include "parquet/schema.h" #include "parquet/types.h" @@ -303,124 +303,6 @@ class TypedColumnReader : public ColumnReader { int32_t* dict_len) = 0; }; -// Represent a range to read. The range is inclusive on both ends. -struct IntervalRange { - static IntervalRange Intersection(const IntervalRange& left, - const IntervalRange& right) { - if (left.start <= right.start) { - if (left.end >= right.start) { - return {right.start, std::min(left.end, right.end)}; - } - } else if (right.end >= left.start) { - return {left.start, std::min(left.end, right.end)}; - } - return {-1, -1}; // Return a default Range object if no intersection range found - } - - IntervalRange(const int64_t start_, const int64_t end_) : start(start_), end(end_) { - if (start > end) { - throw ParquetException("Invalid range with start: " + std::to_string(start) + - " and end: " + std::to_string(end)); - } - } - - size_t Count() const { - if(!IsValid()) { - throw ParquetException("Invalid range with start: " + std::to_string(start) + - " and end: " + std::to_string(end)); - } - return end - start + 1; - } - - bool IsBefore(const IntervalRange& other) const { return end < other.start; } - - bool IsAfter(const IntervalRange& other) const { return start > other.end; } - - bool IsOverlap(const IntervalRange& other) const { - return !IsBefore(other) && !IsAfter(other); - } - - bool IsValid() const { return start >= 0 && end >= 0 && end >= start; } - - std::string ToString() const { - return "(" + std::to_string(start) + ", " + std::to_string(end) + ")"; - } - - // inclusive - int64_t start = -1; - // inclusive - int64_t end = -1; -}; - -struct BitmapRange { - int64_t offset; - // zero added to, if there are less than 64 elements left in the column. - uint64_t bitmap; -}; - -struct End {}; - -// Represent a set of ranges to read. The ranges are sorted and non-overlapping. -class RowRanges { - public: - RowRanges() = default; - virtual ~RowRanges() = default; - virtual size_t RowCount() const = 0; - virtual int64_t LastRow() const = 0; - virtual bool IsValid() const = 0; - virtual bool IsOverlapping(const IntervalRange& searchRange) const = 0; - // Given a RowRanges with rows accross all RGs, split it into N RowRanges, where N = number of RGs - // e.g.: suppose we have 2 RGs: [0-99] and [100-199], and user is interested in RowRanges [90-110], then - // this function will return 2 RowRanges: [90-99] and [0-10] - virtual std::vector> SplitByRowGroups(const std::vector& rows_per_rg) const = 0; - virtual std::string ToString() const = 0; - - // Returns a vector of PageLocations that must be read all to get values for - // all included in this range virtual std::vector - // PageIndexesToInclude(const std::vector& all_pages) = 0; - - class Iterator { - public: - virtual std::variant NextRange() = 0; - virtual ~Iterator() = default; - }; - virtual std::unique_ptr NewIterator() const = 0; - -}; - -class IntervalRanges : public RowRanges { - public: - IntervalRanges(); - explicit IntervalRanges(const IntervalRange& range); - explicit IntervalRanges(const std::vector& ranges); - std::unique_ptr NewIterator() const override; - size_t RowCount() const override; - int64_t LastRow() const override; - bool IsValid() const override; - bool IsOverlapping(const IntervalRange& searchRange) const override; - std::string ToString() const override; - std::vector> SplitByRowGroups( - const std::vector& rows_per_rg) const override; - static IntervalRanges Intersection(const IntervalRanges& left, - const IntervalRanges& right); - void Add(const IntervalRange& range); - const std::vector& GetRanges() const; - - private: - std::vector ranges_; -}; - -class IntervalRowRangesIterator : public RowRanges::Iterator { - public: - IntervalRowRangesIterator(const std::vector& ranges); - ~IntervalRowRangesIterator() override; - std::variant NextRange() override; - - private: - const std::vector& ranges_; - size_t current_index_ = 0; -}; - namespace internal { // A RecordSkipper is used to skip uncessary rows within each pages. @@ -569,7 +451,11 @@ class PARQUET_EXPORT RecordReader { void reset_current_rg_processed_records() { current_rg_processed_records_ = 0; } - void set_record_skipper(const std::shared_ptr& skipper) { skipper_ = skipper; } + void set_record_skipper(std::unique_ptr skipper) { + skipper_ = std::move(skipper); + } + + void reset_record_skipper() { skipper_.reset(); } protected: /// \brief Indicates if we can have nullable values. Note that repeated fields @@ -623,7 +509,7 @@ class PARQUET_EXPORT RecordReader { // vector. bool read_dense_for_nullable_ = false; - std::shared_ptr skipper_ = NULLPTR; + std::unique_ptr skipper_ = NULLPTR; }; class BinaryRecordReader : virtual public RecordReader { diff --git a/cpp/src/parquet/range_reader_test.cc b/cpp/src/parquet/range_reader_test.cc index cde60c583f50..04510143e54c 100644 --- a/cpp/src/parquet/range_reader_test.cc +++ b/cpp/src/parquet/range_reader_test.cc @@ -39,7 +39,7 @@ using parquet::IntervalRanges; std::string random_string(std::string::size_type length) { static auto& chrs = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"; - static std::mt19937 rg{std::random_device{}()}; + static std::mt19937 rg = std::mt19937(std::random_device()()); static std::uniform_int_distribution pick(0, sizeof(chrs) - 2); std::string s; @@ -240,10 +240,18 @@ void check_rb(std::unique_ptr rb_reader, } ASSERT_EQ(expected_rows, total_rows); - if (checking_col("a", column_names)) ASSERT_EQ(expected_sum * 2, sum_a); - if (checking_col("b", column_names)) ASSERT_EQ(expected_sum * 3, sum_b); - if (checking_col("c", column_names)) ASSERT_EQ(expected_sum, sum_c); - if (checking_col("d", column_names)) ASSERT_EQ(expected_sum, sum_d); + if (checking_col("a", column_names)) { + ASSERT_EQ(expected_sum * 2, sum_a); + } + if (checking_col("b", column_names)) { + ASSERT_EQ(expected_sum * 3, sum_b); + } + if (checking_col("c", column_names)) { + ASSERT_EQ(expected_sum, sum_c); + } + if (checking_col("d", column_names)) { + ASSERT_EQ(expected_sum, sum_d); + } } class TestRecordBatchReaderWithRanges : public testing::Test { @@ -279,7 +287,7 @@ TEST_F(TestRecordBatchReaderWithRanges, TestRangesSplit) {} TEST_F(TestRecordBatchReaderWithRanges, SelectOnePageForEachRG) { std::unique_ptr rb_reader; - IntervalRanges rows{{IntervalRange{0, 9}, IntervalRange{40, 49}, IntervalRange{80, 89}, IntervalRange{90, 99}}}; + IntervalRanges rows{{{0, 9}, {40, 49}, {80, 89}, {90, 99}}}; const std::vector column_indices{0, 1, 2, 3, 4}; ASSERT_OK(arrow_reader->GetRecordBatchReader(rows, column_indices, &rb_reader)); @@ -301,7 +309,8 @@ TEST_F(TestRecordBatchReaderWithRanges, SelectSomePageForOneRG) { TEST_F(TestRecordBatchReaderWithRanges, SelectAllRange) { std::unique_ptr rb_reader; - IntervalRanges rows{{IntervalRange{0, 29}, IntervalRange{30, 59}, IntervalRange{60, 89}, IntervalRange{90, 99}}}; + IntervalRanges rows{{IntervalRange{0, 29}, IntervalRange{30, 59}, IntervalRange{60, 89}, + IntervalRange{90, 99}}}; const std::vector column_indices{0, 1, 2, 3, 4}; ASSERT_OK(arrow_reader->GetRecordBatchReader(rows, column_indices, &rb_reader)); @@ -341,11 +350,15 @@ TEST_F(TestRecordBatchReaderWithRanges, SelectOneRowSkipOneRow) { std::unique_ptr rb_reader; std::vector ranges; for (int64_t i = 0; i < 30; i++) { - if (i % 2 == 0) ranges.push_back({i, i}); + if (i % 2 == 0) { + ranges.push_back({i, i}); + } } for (int64_t i = 60; i < 90; i++) { - if (i % 2 == 0) ranges.push_back({i, i}); + if (i % 2 == 0) { + ranges.push_back({i, i}); + } } const std::vector column_indices{0, 1, 2, 3, 4}; ASSERT_OK(arrow_reader->GetRecordBatchReader(IntervalRanges(ranges), column_indices, @@ -359,25 +372,17 @@ TEST_F(TestRecordBatchReaderWithRanges, SelectOneRowSkipOneRow) { TEST_F(TestRecordBatchReaderWithRanges, InvalidRanges) { std::unique_ptr rb_reader; { - IntervalRanges rows{{IntervalRange{-1, 5}}}; - const std::vector column_indices{0, 1, 2, 3, 4}; - const auto status = - arrow_reader->GetRecordBatchReader(rows, column_indices, &rb_reader); - ASSERT_NOT_OK(status); - EXPECT_TRUE(status.message().find("The provided row range is invalid, keep it " - "monotone and non-interleaving: [(-1, 5)]") != - std::string::npos); + auto create_ranges = []() -> IntervalRanges { + return IntervalRanges{{IntervalRange{-1, 5}}}; + }; + EXPECT_THROW(create_ranges(), parquet::ParquetException); } { - IntervalRanges rows{{IntervalRange{0, 4}, {2, 5}}}; - const std::vector column_indices{0, 1, 2, 3, 4}; - const auto status = - arrow_reader->GetRecordBatchReader(rows, column_indices, &rb_reader); - ASSERT_NOT_OK(status); - EXPECT_TRUE( - status.message().find("The provided row range is invalid, keep it monotone and " - "non-interleaving: [(0, 4), (2, 5)]") != std::string::npos); + auto create_ranges = []() -> IntervalRanges { + return IntervalRanges{{{0, 4}, {2, 5}}}; + }; + EXPECT_THROW(create_ranges(), parquet::ParquetException); } { // will treat as {0,99} @@ -472,11 +477,15 @@ TEST_F(TestRecordBatchReaderWithRangesWithNulls, SelectOneRowSkipOneRow) { std::unique_ptr rb_reader; std::vector ranges; for (int64_t i = 0; i < 30; i++) { - if (i % 2 == 0) ranges.push_back({i, i}); + if (i % 2 == 0) { + ranges.push_back({i, i}); + } } for (int64_t i = 60; i < 90; i++) { - if (i % 2 == 0) ranges.push_back({i, i}); + if (i % 2 == 0) { + ranges.push_back({i, i}); + } } const std::vector column_indices{0, 1, 2, 3, 4}; ASSERT_OK(arrow_reader->GetRecordBatchReader(IntervalRanges(ranges), column_indices, @@ -486,4 +495,4 @@ TEST_F(TestRecordBatchReaderWithRangesWithNulls, SelectOneRowSkipOneRow) { // (10 + 12 + ... + 28) + (60 + 62 ... + 88) = 1320 check_rb(std::move(rb_reader), 30, 1300); } -} \ No newline at end of file +} diff --git a/cpp/src/parquet/row_range.cc b/cpp/src/parquet/row_range.cc new file mode 100644 index 000000000000..fa996a198f43 --- /dev/null +++ b/cpp/src/parquet/row_range.cc @@ -0,0 +1,190 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "parquet/row_range.h" + +#include + +#include "parquet/exception.h" + +namespace parquet { +// ---------------------------------------------------------------------- +// RowRanges and ins implementations +bool IsValid(const std::vector& ranges) { + if (ranges.size() == 0) return true; + if (ranges[0].start < 0) { + return false; + } + for (size_t i = 0; i < ranges.size(); i++) { + if (!IntervalRangeUtils::IsValid(ranges[i])) { + return false; + } + } + for (size_t i = 1; i < ranges.size(); i++) { + if (ranges[i].start <= ranges[i - 1].end) { + return false; + } + } + return true; +} + +IntervalRanges::IntervalRanges() = default; + +IntervalRanges::IntervalRanges(const IntervalRange& range) { + ranges_.push_back(range); + if (!IsValid(ranges_)) { + throw ParquetException("Invalid range with start: " + std::to_string(range.start) + + " and end: " + std::to_string(range.end) + + ", keep it monotone and non-interleaving"); + } +} + +IntervalRanges::IntervalRanges(const std::vector& ranges) { + this->ranges_ = ranges; + if (!IsValid(ranges_)) { + throw ParquetException("Invalid ranges: " + this->IntervalRanges::ToString() + + ", keep it monotone and non-interleaving"); + } +} + +std::unique_ptr IntervalRanges::NewIterator() const { + return std::make_unique(ranges_); +} + +size_t IntervalRanges::num_rows() const { + size_t cnt = 0; + for (const IntervalRange& range : ranges_) { + cnt += IntervalRangeUtils::Count(range); + } + return cnt; +} + +int64_t IntervalRanges::first_row() const { + if (ranges_.empty()) { + throw ParquetException("first_row() called on empty IntervalRanges"); + } + return ranges_.front().start; +} + +int64_t IntervalRanges::last_row() const { + if (ranges_.empty()) { + throw ParquetException("last_row() called on empty IntervalRanges"); + } + return ranges_.back().end; +} + +bool IntervalRanges::IsOverlapping(const int64_t start, const int64_t end) const { + auto searchRange = IntervalRange{start, end}; + auto it = std::lower_bound(ranges_.begin(), ranges_.end(), searchRange, + [](const IntervalRange& r1, const IntervalRange& r2) { + return IntervalRangeUtils::IsBefore(r1, r2); + }); + return it != ranges_.end() && !IntervalRangeUtils::IsAfter(*it, searchRange); +} + +std::string IntervalRanges::ToString() const { + std::string result = "["; + for (const IntervalRange& range : ranges_) { + result += IntervalRangeUtils::ToString(range) + ", "; + } + if (!ranges_.empty()) { + result = result.substr(0, result.size() - 2); + } + result += "]"; + return result; +} + +std::vector> IntervalRanges::SplitByRowRange( + const std::vector& num_rows_per_sub_ranges) const { + if (num_rows_per_sub_ranges.size() <= 1) { + std::unique_ptr single = + std::make_unique(*this); // return a copy of itself + auto ret = std::vector>(); + ret.push_back(std::move(single)); + return ret; + } + + std::vector> result; + + IntervalRanges spaces; + int64_t rows_so_far = 0; + for (size_t i = 0; i < num_rows_per_sub_ranges.size(); ++i) { + auto start = rows_so_far; + rows_so_far += num_rows_per_sub_ranges[i]; + auto end = rows_so_far - 1; + spaces.Add({start, end}); + } + + // each RG's row range forms a space, we need to adjust RowRanges in each space to + // zero based. + for (IntervalRange space : spaces.GetRanges()) { + auto intersection = Intersection(IntervalRanges(space), *this); + + std::unique_ptr zero_based_ranges = + std::make_unique(); + for (const IntervalRange& range : intersection.GetRanges()) { + zero_based_ranges->Add({range.start - space.start, range.end - space.start}); + } + result.push_back(std::move(zero_based_ranges)); + } + + return result; +} + +IntervalRanges IntervalRanges::Intersection(const IntervalRanges& left, + const IntervalRanges& right) { + IntervalRanges result; + + size_t rightIndex = 0; + for (const IntervalRange& l : left.ranges_) { + for (size_t i = rightIndex, n = right.ranges_.size(); i < n; ++i) { + const IntervalRange& r = right.ranges_[i]; + if (IntervalRangeUtils::IsBefore(l, r)) { + break; + } else if (IntervalRangeUtils::IsAfter(l, r)) { + rightIndex = i + 1; + continue; + } + result.Add(IntervalRangeUtils::Intersection(l, r)); + } + } + + return result; +} + +void IntervalRanges::Add(const IntervalRange& range) { + const IntervalRange rangeToAdd = range; + if (ranges_.size() > 1 && rangeToAdd.start <= ranges_.back().end) { + throw ParquetException("Ranges must be added in order"); + } + ranges_.push_back(rangeToAdd); +} + +const std::vector& IntervalRanges::GetRanges() const { return ranges_; } + +IntervalRowRangesIterator::IntervalRowRangesIterator( + const std::vector& ranges) + : ranges_(ranges) {} + +IntervalRowRangesIterator::~IntervalRowRangesIterator() {} + +std::variant IntervalRowRangesIterator::NextRange() { + if (current_index_ >= ranges_.size()) return End(); + + return ranges_[current_index_++]; +} +} // namespace parquet diff --git a/cpp/src/parquet/row_range.h b/cpp/src/parquet/row_range.h new file mode 100644 index 000000000000..4e7c2631eb6a --- /dev/null +++ b/cpp/src/parquet/row_range.h @@ -0,0 +1,156 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// This module contains the logical parquet-cpp types (independent of Thrift +// structures), schema nodes, and related type tools + +#pragma once +#include + +#include "parquet/exception.h" + +namespace parquet { + +// Represent a range to read. The range is inclusive on both ends. +struct IntervalRange { + IntervalRange(const int64_t start_, const int64_t end_) : start(start_), end(end_) { + if (start > end) { + throw ParquetException("Invalid range with start: " + std::to_string(start) + + " bigger than end: " + std::to_string(end)); + } + } + + // inclusive + int64_t start = -1; + // inclusive + int64_t end = -1; +}; + +class IntervalRangeUtils { + public: + static IntervalRange Intersection(const IntervalRange& left, + const IntervalRange& right) { + if (left.start <= right.start) { + if (left.end >= right.start) { + return {right.start, std::min(left.end, right.end)}; + } + } else if (right.end >= left.start) { + return {left.start, std::min(left.end, right.end)}; + } + return {-1, -1}; // Return a default Range object if no intersection range found + } + + static std::string ToString(const IntervalRange& range) { + return "(" + std::to_string(range.start) + ", " + std::to_string(range.end) + ")"; + } + + static bool IsValid(const IntervalRange& range) { + return range.start >= 0 && range.end >= 0 && range.end >= range.start; + } + + static size_t Count(const IntervalRange& range) { + if (!IsValid(range)) { + throw ParquetException("Invalid range: " + ToString(range)); + } + return range.end - range.start + 1; + } + + static bool IsBefore(const IntervalRange& self, const IntervalRange& other) { + return self.end < other.start; + } + + static bool IsAfter(const IntervalRange& self, const IntervalRange& other) { + return self.start > other.end; + } + + static bool IsOverlap(const IntervalRange& self, const IntervalRange& other) { + return !IsBefore(self, other) && !IsAfter(self, other); + } +}; + +struct BitmapRange { + int64_t offset; + // zero added to, if there are less than 64 elements left in the column. + uint64_t bitmap; +}; + +struct End {}; + +// Represent a set of ranges to read. The ranges are sorted and non-overlapping. +class RowRanges { + public: + virtual ~RowRanges() = default; + /// \brief Total number of rows in the row ranges. + virtual size_t num_rows() const = 0; + /// \brief First row in the ranges + virtual int64_t first_row() const = 0; + /// \brief Last row in the ranges + virtual int64_t last_row() const = 0; + /// \brief Whether the given range from start to end overlaps with the row ranges. + virtual bool IsOverlapping(int64_t start, int64_t end) const = 0; + /// \brief Split the row ranges into sub row ranges according to the + /// specified number of rows per sub row ranges. A typical use case is + /// to convert file based RowRanges to row group based RowRanges. + /// + /// \param num_rows_per_sub_ranges number of rows per sub row range. + virtual std::vector> SplitByRowRange( + const std::vector& num_rows_per_sub_ranges) const = 0; + /// \brief Readable string representation + virtual std::string ToString() const = 0; + + class Iterator { + public: + virtual std::variant NextRange() = 0; + virtual ~Iterator() = default; + }; + /// \brief Create an iterator to iterate over the ranges + virtual std::unique_ptr NewIterator() const = 0; +}; + +class IntervalRanges : public RowRanges { + public: + IntervalRanges(); + explicit IntervalRanges(const IntervalRange& range); + explicit IntervalRanges(const std::vector& ranges); + std::unique_ptr NewIterator() const override; + size_t num_rows() const override; + int64_t first_row() const override; + int64_t last_row() const override; + bool IsOverlapping(int64_t start, int64_t end) const override; + std::string ToString() const override; + std::vector> SplitByRowRange( + const std::vector& num_rows_per_sub_ranges) const override; + static IntervalRanges Intersection(const IntervalRanges& left, + const IntervalRanges& right); + void Add(const IntervalRange& range); + const std::vector& GetRanges() const; + + private: + std::vector ranges_; +}; + +class IntervalRowRangesIterator : public RowRanges::Iterator { + public: + explicit IntervalRowRangesIterator(const std::vector& ranges); + ~IntervalRowRangesIterator() override; + std::variant NextRange() override; + + private: + const std::vector& ranges_; + size_t current_index_ = 0; +}; +} // namespace parquet diff --git a/cpp/src/parquet/row_range_test.cc b/cpp/src/parquet/row_range_test.cc index 44327baab04c..bf0563211b8e 100644 --- a/cpp/src/parquet/row_range_test.cc +++ b/cpp/src/parquet/row_range_test.cc @@ -17,7 +17,8 @@ #include #include "parquet/column_reader.h" -using namespace parquet; +using parquet::IntervalRange; +using parquet::IntervalRanges; class RowRangesTest : public ::testing::Test { protected: @@ -28,7 +29,7 @@ TEST_F(RowRangesTest, EmptyRG_ReturnsOriginalRowRanges) { row_ranges.Add(IntervalRange(0, 10)); std::vector rows_per_rg; - auto result = row_ranges.SplitByRowGroups(rows_per_rg); + auto result = row_ranges.SplitByRowRange(rows_per_rg); ASSERT_EQ(result.size(), 1); auto iter = result[0]->NewIterator(); @@ -42,7 +43,7 @@ TEST_F(RowRangesTest, SingleRG_ReturnsOriginalRowRanges2) { row_ranges.Add(IntervalRange(0, 10)); std::vector rows_per_rg = {11}; - auto result = row_ranges.SplitByRowGroups(rows_per_rg); + auto result = row_ranges.SplitByRowRange(rows_per_rg); ASSERT_EQ(result.size(), 1); auto iter = result[0]->NewIterator(); @@ -56,7 +57,7 @@ TEST_F(RowRangesTest, ReturnsTwoRowRanges) { row_ranges.Add(IntervalRange(0, 10)); std::vector rows_per_rg = {5, 6}; - auto result = row_ranges.SplitByRowGroups(rows_per_rg); + auto result = row_ranges.SplitByRowRange(rows_per_rg); ASSERT_EQ(result.size(), 2); { auto iter = result[0]->NewIterator(); @@ -78,7 +79,7 @@ TEST_F(RowRangesTest, ReturnsMultipleRowRanges) { row_ranges.Add(IntervalRange(0, 11)); std::vector rows_per_rg = {3, 4, 100}; - auto result = row_ranges.SplitByRowGroups(rows_per_rg); + auto result = row_ranges.SplitByRowRange(rows_per_rg); ASSERT_EQ(result.size(), 3); { auto iter = result[0]->NewIterator(); @@ -110,7 +111,7 @@ TEST_F(RowRangesTest, MultipleInputRange) { std::vector rows_per_rg = {100, 100}; - auto result = row_ranges.SplitByRowGroups(rows_per_rg); + auto result = row_ranges.SplitByRowRange(rows_per_rg); ASSERT_EQ(result.size(), 2); { auto iter = result[0]->NewIterator(); @@ -142,7 +143,7 @@ TEST_F(RowRangesTest, MultipleSplitPoints_ReturnWithEmptyRowRanges) { row_ranges.Add(IntervalRange(11, 18)); std::vector rows_per_rg = {5, 5, 5, 5, 5}; - auto result = row_ranges.SplitByRowGroups(rows_per_rg); + auto result = row_ranges.SplitByRowRange(rows_per_rg); ASSERT_EQ(result.size(), 5); { auto iter = result[0]->NewIterator(); @@ -176,7 +177,7 @@ TEST_F(RowRangesTest, RangeExceedRG) { row_ranges.Add(IntervalRange(0, 10)); std::vector rows_per_rg = {5, 3}; - auto result = row_ranges.SplitByRowGroups(rows_per_rg); + auto result = row_ranges.SplitByRowRange(rows_per_rg); ASSERT_EQ(result.size(), 2); { auto iter = result[0]->NewIterator(); From 047be8e91a92e0ee20fc677c06e8b8a28e544acb Mon Sep 17 00:00:00 2001 From: Hongbin Ma Date: Thu, 18 Jan 2024 18:21:54 +0800 Subject: [PATCH 3/3] skip io coding done one thread for each logical range fix prebuffer conflict with datappagefilter fix coalesce problem ading tests fix style --- cpp/src/arrow/io/caching.cc | 185 ++++++++++++++++++++---- cpp/src/arrow/io/caching.h | 3 +- cpp/src/arrow/io/interfaces.cc | 15 ++ cpp/src/arrow/io/interfaces.h | 4 + cpp/src/parquet/CMakeLists.txt | 7 +- cpp/src/parquet/arrow/reader.cc | 15 +- cpp/src/parquet/arrow/reader_internal.h | 1 + cpp/src/parquet/file_reader.cc | 79 ++++++++-- cpp/src/parquet/file_reader.h | 10 +- cpp/src/parquet/range_reader_test.cc | 117 ++++++++++++--- 10 files changed, 363 insertions(+), 73 deletions(-) diff --git a/cpp/src/arrow/io/caching.cc b/cpp/src/arrow/io/caching.cc index bd61c40693a2..75a3c47a3063 100644 --- a/cpp/src/arrow/io/caching.cc +++ b/cpp/src/arrow/io/caching.cc @@ -24,6 +24,11 @@ #include "arrow/buffer.h" #include "arrow/io/caching.h" + +#include +#include +#include + #include "arrow/io/util_internal.h" #include "arrow/result.h" #include "arrow/util/future.h" @@ -132,13 +137,56 @@ CacheOptions CacheOptions::MakeFromNetworkMetrics(int64_t time_to_first_byte_mil namespace internal { +std::vector GetReadRangesExcludingHoles(const ReadRange& read_range, + const std::vector& holes) { + std::vector ranges; + int64_t offset = read_range.offset; + for (const auto& hole : holes) { + if (hole.offset >= read_range.offset + read_range.length || + hole.offset + hole.length <= read_range.offset) { + throw parquet::ParquetException("Parquet error: holes not subset of read range"); + } + if (hole.offset > offset) { + ranges.push_back({offset, hole.offset - offset}); + } + offset = hole.offset + hole.length; + } + if (offset < read_range.offset + read_range.length) { + ranges.push_back({offset, read_range.offset + read_range.length - offset}); + } + return ranges; +} + +int64_t GetActualBufferOffset(const int64_t offset, const int64_t buffer_start_offset, + const std::vector& holes) { + int padding = 0; + for (const auto& hole : holes) { + if (hole.offset >= offset) { + break; + } + if (hole.offset + hole.length <= offset) { + padding += hole.length; + } else { + padding += offset - hole.offset; + } + } + return offset - padding - buffer_start_offset; +} + struct RangeCacheEntry { - ReadRange range; - Future> future; + ReadRange range; // nominal range for this entry + std::vector holes; // nominal range - holes = actual read ranges + Future future; // the future for actual read ranges + std::shared_ptr buffer; // actual read ranges are read into this buffer with + // pre-calculated position RangeCacheEntry() = default; - RangeCacheEntry(const ReadRange& range_, Future> future_) - : range(range_), future(std::move(future_)) {} + RangeCacheEntry(const ReadRange& range, std::vector& holes, + Future& future, std::unique_ptr& buffer) + : range(range), + holes(std::move(holes)), + future(std::move(future)), + buffer(std::move(buffer)) {} friend bool operator<(const RangeCacheEntry& left, const RangeCacheEntry& right) { return left.range.offset < right.range.offset; @@ -156,28 +204,87 @@ struct ReadRangeCache::Impl { virtual ~Impl() = default; - // Get the future corresponding to a range - virtual Future> MaybeRead(RangeCacheEntry* entry) { - return entry->future; + virtual Future MaybeRead(RangeCacheEntry* entry) { return entry->future; } + + Future DoAsyncRead(const ReadRange& range, const std::vector& holes, + std::unique_ptr& buffer) const { + int64_t total_size = range.length; + for (const auto& hole : holes) { + total_size -= hole.length; + } + + buffer = *AllocateBuffer(total_size, 64, ctx.pool()); + + auto actual_read_ranges = GetReadRangesExcludingHoles(range, holes); + return file->ReadAsync(ctx, actual_read_ranges, buffer->mutable_data()); } // Make cache entries for ranges virtual std::vector MakeCacheEntries( - const std::vector& ranges) { + const std::vector& ranges, + std::vector>& holes_foreach_range) { std::vector new_entries; new_entries.reserve(ranges.size()); - for (const auto& range : ranges) { - new_entries.emplace_back(range, file->ReadAsync(ctx, range.offset, range.length)); + + for (size_t i = 0; i < ranges.size(); i++) { + std::unique_ptr buffer; + auto future = DoAsyncRead(ranges[i], holes_foreach_range[i], buffer); + new_entries.emplace_back(ranges[i], holes_foreach_range[i], future, buffer); } return new_entries; } + void CoalesceRanges( + std::vector& ranges, + const std::vector>& holes_foreach_range_orig, + std::vector>& holes_foreach_range_new) const { + auto result = CoalesceReadRanges(std::move(ranges), options.hole_size_limit, + options.range_size_limit); + if (!result.ok()) { + throw parquet::ParquetException("Failed to coalesce ranges: " + + result.status().message()); + } + ranges = std::move(result.ValueOrDie()); + holes_foreach_range_new.resize(ranges.size()); + + std::vector flatten_holes; + size_t index = 0; + flatten_holes.reserve( + std::accumulate(holes_foreach_range_orig.begin(), holes_foreach_range_orig.end(), + 0, [](int sum, const auto& v) { return sum + v.size(); })); + for (const auto& v : holes_foreach_range_orig) { + std::move(v.begin(), v.end(), std::back_inserter(flatten_holes)); + } + for (size_t i = 0; i < ranges.size(); ++i) { + std::vector current_range_holes; + const auto& range = ranges.at(i); + for (; index < flatten_holes.size(); ++index) { + const auto& hole = flatten_holes.at(index); + if (hole.offset >= range.offset + range.length) { + break; + } + if (!(hole.offset >= range.offset && + hole.offset + hole.length <= range.offset + range.length)) { + throw parquet::ParquetException( + "Parquet error: holes not subset of read range"); + } + current_range_holes.push_back({hole.offset, hole.length}); + } + holes_foreach_range_new.at(i) = std::move(current_range_holes); + } + if (ranges.size() != holes_foreach_range_new.size()) { + throw parquet::ParquetException("ranges.size() != holes_foreach_range_new.size()"); + } + } + // Add the given ranges to the cache, coalescing them where possible - virtual Status Cache(std::vector ranges) { - ARROW_ASSIGN_OR_RAISE( - ranges, internal::CoalesceReadRanges(std::move(ranges), options.hole_size_limit, - options.range_size_limit)); - std::vector new_entries = MakeCacheEntries(ranges); + virtual Status Cache(std::vector ranges, + std::vector> holes_foreach_range_orig) { + std::vector> holes_foreach_range_new; + CoalesceRanges(ranges, holes_foreach_range_orig, holes_foreach_range_new); + + std::vector new_entries = + MakeCacheEntries(ranges, holes_foreach_range_new); // Add new entries, themselves ordered by offset if (entries.size() > 0) { std::vector merged(entries.size() + new_entries.size()); @@ -205,21 +312,32 @@ struct ReadRangeCache::Impl { return entry.range.offset + entry.range.length < range.offset + range.length; }); if (it != entries.end() && it->range.Contains(range)) { - auto fut = MaybeRead(&*it); - ARROW_ASSIGN_OR_RAISE(auto buf, fut.result()); + const auto fut = MaybeRead(&*it); + const auto result = fut.result(); + if (!result.ok()) { + throw parquet::ParquetException( + "Parquet error: read failed for one of the sub range"); + } + if (options.lazy && options.prefetch_limit > 0) { int64_t num_prefetched = 0; for (auto next_it = it + 1; next_it != entries.end() && num_prefetched < options.prefetch_limit; ++next_it) { if (!next_it->future.is_valid()) { - next_it->future = - file->ReadAsync(ctx, next_it->range.offset, next_it->range.length); + std::unique_ptr buffer; + next_it->future = DoAsyncRead(next_it->range, next_it->holes, buffer); + next_it->buffer = std::move(buffer); } ++num_prefetched; } } - return SliceBuffer(std::move(buf), range.offset - it->range.offset, range.length); + + const auto actual_start = + GetActualBufferOffset(range.offset, it->range.offset, it->holes); + const auto actual_end = + GetActualBufferOffset(range.offset + range.length, it->range.offset, it->holes); + return SliceBuffer(it->buffer, actual_start, actual_end - actual_start); } return Status::Invalid("ReadRangeCache did not find matching cache entry"); } @@ -264,29 +382,37 @@ struct ReadRangeCache::LazyImpl : public ReadRangeCache::Impl { virtual ~LazyImpl() = default; - Future> MaybeRead(RangeCacheEntry* entry) override { + Future MaybeRead(RangeCacheEntry* entry) override { // Called by superclass Read()/WaitFor() so we have the lock if (!entry->future.is_valid()) { - entry->future = file->ReadAsync(ctx, entry->range.offset, entry->range.length); + std::unique_ptr buffer; + entry->future = DoAsyncRead(entry->range, entry->holes, buffer); + entry->buffer = std::move(buffer); } return entry->future; } std::vector MakeCacheEntries( - const std::vector& ranges) override { + const std::vector& ranges, + std::vector>& holes_foreach_range) override { std::vector new_entries; new_entries.reserve(ranges.size()); - for (const auto& range : ranges) { + for (size_t i = 0; i < ranges.size(); ++i) { + const auto& range = ranges[i]; + auto& holes = holes_foreach_range[i]; + auto temp_buffer = std::make_unique(NULLPTR, 0); + auto temp_future = Future(); // In the lazy variant, don't read data here - later, a call to Read or WaitFor // will call back to MaybeRead (under the lock) which will fill the future. - new_entries.emplace_back(range, Future>()); + new_entries.emplace_back(range, holes, temp_future, temp_buffer); } return new_entries; } - Status Cache(std::vector ranges) override { + Status Cache(std::vector ranges, + std::vector> holes_foreach_range) override { std::unique_lock guard(entry_mutex); - return ReadRangeCache::Impl::Cache(std::move(ranges)); + return ReadRangeCache::Impl::Cache(std::move(ranges), std::move(holes_foreach_range)); } Result> Read(ReadRange range) override { @@ -317,8 +443,9 @@ ReadRangeCache::ReadRangeCache(std::shared_ptr owned_file, ReadRangeCache::~ReadRangeCache() = default; -Status ReadRangeCache::Cache(std::vector ranges) { - return impl_->Cache(std::move(ranges)); +Status ReadRangeCache::Cache(std::vector ranges, + std::vector> holes_foreach_range) { + return impl_->Cache(std::move(ranges), std::move(holes_foreach_range)); } Result> ReadRangeCache::Read(ReadRange range) { diff --git a/cpp/src/arrow/io/caching.h b/cpp/src/arrow/io/caching.h index e2b911fafdbb..4f5f80182656 100644 --- a/cpp/src/arrow/io/caching.h +++ b/cpp/src/arrow/io/caching.h @@ -131,7 +131,8 @@ class ARROW_EXPORT ReadRangeCache { /// /// The caller must ensure that the ranges do not overlap with each other, /// nor with previously cached ranges. Otherwise, behaviour will be undefined. - Status Cache(std::vector ranges); + Status Cache(std::vector ranges, + std::vector> holes_foreach_range = {}); /// \brief Read a range previously given to Cache(). Result> Read(ReadRange range); diff --git a/cpp/src/arrow/io/interfaces.cc b/cpp/src/arrow/io/interfaces.cc index 1d35549cc434..cb294db7fa1d 100644 --- a/cpp/src/arrow/io/interfaces.cc +++ b/cpp/src/arrow/io/interfaces.cc @@ -171,6 +171,21 @@ Future> RandomAccessFile::ReadAsync(const IOContext& ctx ctx, [self, position, nbytes] { return self->ReadAt(position, nbytes); })); } +Future RandomAccessFile::ReadAsync(const IOContext& ctx, + std::vector& ranges, void* out) { + auto self = checked_pointer_cast(shared_from_this()); + return DeferNotOk(internal::SubmitIO( + ctx, [self, ranges = std::move(ranges), out]() mutable -> Result { + int64_t read_size = 0; + for (const auto& r : ranges) { + RETURN_NOT_OK( + self->ReadAt(r.offset, r.length, static_cast(out) + read_size)); + read_size += r.length; + } + return read_size; + })); +} + Future> RandomAccessFile::ReadAsync(int64_t position, int64_t nbytes) { return ReadAsync(io_context(), position, nbytes); diff --git a/cpp/src/arrow/io/interfaces.h b/cpp/src/arrow/io/interfaces.h index b36c38c6d486..7e29ff1613ad 100644 --- a/cpp/src/arrow/io/interfaces.h +++ b/cpp/src/arrow/io/interfaces.h @@ -296,6 +296,10 @@ class ARROW_EXPORT RandomAccessFile : public InputStream, public Seekable { virtual Future> ReadAsync(const IOContext&, int64_t position, int64_t nbytes); + /// EXPERIMENTAL: Read data asynchronously. The dest is provided by `out` + virtual Future ReadAsync(const IOContext& ctx, std::vector& ranges, + void* out); + /// EXPERIMENTAL: Read data asynchronously, using the file's IOContext. Future> ReadAsync(int64_t position, int64_t nbytes); diff --git a/cpp/src/parquet/CMakeLists.txt b/cpp/src/parquet/CMakeLists.txt index 19a9ccf58aa3..39e0794e750b 100644 --- a/cpp/src/parquet/CMakeLists.txt +++ b/cpp/src/parquet/CMakeLists.txt @@ -360,6 +360,11 @@ add_parquet_test(internals-test set_source_files_properties(public_api_test.cc PROPERTIES SKIP_PRECOMPILE_HEADERS ON SKIP_UNITY_BUILD_INCLUSION ON) +add_parquet_test(rowrange-read-test + SOURCES + row_range_test.cc + range_reader_test.cc +) add_parquet_test(reader-test SOURCES @@ -367,8 +372,6 @@ add_parquet_test(reader-test level_conversion_test.cc column_scanner_test.cc reader_test.cc - range_reader_test.cc - row_range_test.cc stream_reader_test.cc test_util.cc) diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc index e151fb8c4dee..c258692dc467 100644 --- a/cpp/src/parquet/arrow/reader.cc +++ b/cpp/src/parquet/arrow/reader.cc @@ -223,6 +223,7 @@ class FileReaderImpl : public FileReader { ctx->included_leaves = included_leaves; ctx->row_ranges_per_rg = row_ranges_per_rg; // copy the shared pointer to extend its lifecycle + ctx->pre_buffer_enabled = properties().pre_buffer(); return GetReader(manifest_.schema_fields[i], ctx, out); } @@ -667,7 +668,13 @@ class LeafReader : public ColumnReaderImpl { checkAndGetPageRanges(*row_ranges, page_ranges); // part 1, skip decompressing & decoding unnecessary pages - page_reader->set_data_page_filter(RowRangesPageFilter(*row_ranges, page_ranges)); + if (!ctx_->pre_buffer_enabled) { + page_reader->set_data_page_filter( + RowRangesPageFilter(*row_ranges, page_ranges)); + } else { + // Pre buffer already skipped useless pages, so should not apply + // data_page_filter here again. + } // part 2, skip unnecessary rows in necessary pages record_reader_->set_record_skipper( @@ -1158,7 +1165,7 @@ Status FileReaderImpl::GetRecordBatchReaderWithRowRanges( // PARQUET-1698/PARQUET-1820: pre-buffer row groups/column chunks if enabled BEGIN_PARQUET_CATCH_EXCEPTIONS reader_->PreBuffer(row_groups, column_indices, reader_properties_.io_context(), - reader_properties_.cache_options()); + reader_properties_.cache_options(), row_ranges_per_rg); END_PARQUET_CATCH_EXCEPTIONS } @@ -1366,7 +1373,7 @@ FileReaderImpl::GetRecordBatchGenerator(std::shared_ptr reader, if (reader_properties_.pre_buffer()) { BEGIN_PARQUET_CATCH_EXCEPTIONS reader_->PreBuffer(row_group_indices, column_indices, reader_properties_.io_context(), - reader_properties_.cache_options()); + reader_properties_.cache_options(), {}); END_PARQUET_CATCH_EXCEPTIONS } ::arrow::AsyncGenerator row_group_generator = @@ -1403,7 +1410,7 @@ Status FileReaderImpl::ReadRowGroups(const std::vector& row_groups, BEGIN_PARQUET_CATCH_EXCEPTIONS parquet_reader()->PreBuffer(row_groups, column_indices, reader_properties_.io_context(), - reader_properties_.cache_options()); + reader_properties_.cache_options(), {}); END_PARQUET_CATCH_EXCEPTIONS } diff --git a/cpp/src/parquet/arrow/reader_internal.h b/cpp/src/parquet/arrow/reader_internal.h index b30aef2691c1..961e26052d3e 100644 --- a/cpp/src/parquet/arrow/reader_internal.h +++ b/cpp/src/parquet/arrow/reader_internal.h @@ -114,6 +114,7 @@ struct ReaderContext { bool filter_leaves; std::shared_ptr> included_leaves; std::shared_ptr>> row_ranges_per_rg; + bool pre_buffer_enabled; bool IncludesLeaf(int leaf_index) const { if (this->filter_leaves) { diff --git a/cpp/src/parquet/file_reader.cc b/cpp/src/parquet/file_reader.cc index b3dd1d6054ac..39c16e5bddf6 100644 --- a/cpp/src/parquet/file_reader.cc +++ b/cpp/src/parquet/file_reader.cc @@ -375,13 +375,39 @@ class SerializedFile : public ParquetFileReader::Contents { file_metadata_ = std::move(metadata); } - void PreBuffer(const std::vector& row_groups, - const std::vector& column_indices, - const ::arrow::io::IOContext& ctx, - const ::arrow::io::CacheOptions& options) { + auto GetRowGroupPageIndexReader(int row_group_index, bool required) { + auto ret = GetPageIndexReader()->RowGroup(row_group_index); + + if (required) { + if (!ret) { + throw ParquetException("Page index is required but not found for row group " + + std::to_string(row_group_index)); + } + } + return ret; + } + + static auto GetColumnOffsetIndex(std::shared_ptr rg_page_idx, + int column_index, bool required) { + auto ret = rg_page_idx->GetOffsetIndex(column_index); + + if (required) { + if (!ret) { + throw ParquetException("Offset index is required but not found for column " + + std::to_string(column_index)); + } + } + return ret; + } + + void PreBuffer( + const std::vector& row_groups, const std::vector& column_indices, + const ::arrow::io::IOContext& ctx, const ::arrow::io::CacheOptions& options, + const std::shared_ptr>>& row_ranges_per_rg) { cached_source_ = std::make_shared<::arrow::io::internal::ReadRangeCache>(source_, ctx, options); std::vector<::arrow::io::ReadRange> ranges; + std::vector> holes_foreach_range; prebuffered_column_chunks_.clear(); int num_cols = file_metadata_->num_columns(); // a bitmap for buffered columns. @@ -395,12 +421,45 @@ class SerializedFile : public ParquetFileReader::Contents { } for (int row : row_groups) { prebuffered_column_chunks_[row] = buffer_columns; + const int64_t num_rows = file_metadata_->RowGroup(row)->num_rows(); + const auto rg_page_idx = GetRowGroupPageIndexReader(row, true); + for (int col : column_indices) { ranges.push_back( ComputeColumnChunkRange(file_metadata_.get(), source_size_, row, col)); + + std::vector<::arrow::io::ReadRange> holes; + const auto offset_idx = GetColumnOffsetIndex(rg_page_idx, col, true); + const auto& page_locations = offset_idx->page_locations(); + for (size_t i = 0; i < page_locations.size(); i++) { + IntervalRange page_row_interval = {-1, -1}; + if (i != page_locations.size() - 1) { + page_row_interval = IntervalRange{page_locations[i].first_row_index, + page_locations[i + 1].first_row_index - 1}; + } else { + page_row_interval = + IntervalRange{page_locations[i].first_row_index, num_rows - 1}; + } + auto page_offset_range = ::arrow::io::ReadRange{ + page_locations[i].offset, page_locations[i].compressed_page_size}; + if (!row_ranges_per_rg->at(row)->IsOverlapping(page_row_interval.start, + page_row_interval.end)) { + if (holes.empty()) { + holes.push_back({page_offset_range.offset, page_offset_range.length}); + } else { + if (holes.back().offset + holes.back().length == page_offset_range.offset) { + holes.back().length += page_offset_range.length; + } else { + holes.push_back({page_offset_range.offset, page_offset_range.length}); + } + } + } + } + holes_foreach_range.emplace_back(std::move(holes)); } } - PARQUET_THROW_NOT_OK(cached_source_->Cache(ranges)); + PARQUET_THROW_NOT_OK( + cached_source_->Cache(std::move(ranges), std::move(holes_foreach_range))); } ::arrow::Future<> WhenBuffered(const std::vector& row_groups, @@ -887,14 +946,14 @@ std::shared_ptr ParquetFileReader::RowGroup(int i) { return contents_->GetRowGroup(i); } -void ParquetFileReader::PreBuffer(const std::vector& row_groups, - const std::vector& column_indices, - const ::arrow::io::IOContext& ctx, - const ::arrow::io::CacheOptions& options) { +void ParquetFileReader::PreBuffer( + const std::vector& row_groups, const std::vector& column_indices, + const ::arrow::io::IOContext& ctx, const ::arrow::io::CacheOptions& options, + const std::shared_ptr>>& row_ranges_per_rg) { // Access private methods here SerializedFile* file = ::arrow::internal::checked_cast(contents_.get()); - file->PreBuffer(row_groups, column_indices, ctx, options); + file->PreBuffer(row_groups, column_indices, ctx, options, row_ranges_per_rg); } ::arrow::Future<> ParquetFileReader::WhenBuffered( diff --git a/cpp/src/parquet/file_reader.h b/cpp/src/parquet/file_reader.h index b59b59f95c2d..c5e5f13eaa1e 100644 --- a/cpp/src/parquet/file_reader.h +++ b/cpp/src/parquet/file_reader.h @@ -29,7 +29,7 @@ #include "parquet/properties.h" namespace parquet { - +class RowRanges; class ColumnReader; class FileMetaData; class PageIndexReader; @@ -196,10 +196,10 @@ class PARQUET_EXPORT ParquetFileReader { /// only one row group at a time may be useful. /// /// This method may throw. - void PreBuffer(const std::vector& row_groups, - const std::vector& column_indices, - const ::arrow::io::IOContext& ctx, - const ::arrow::io::CacheOptions& options); + void PreBuffer( + const std::vector& row_groups, const std::vector& column_indices, + const ::arrow::io::IOContext& ctx, const ::arrow::io::CacheOptions& options, + const std::shared_ptr>>& row_ranges_per_rg); /// Wait for the specified row groups and column indices to be pre-buffered. /// diff --git a/cpp/src/parquet/range_reader_test.cc b/cpp/src/parquet/range_reader_test.cc index 04510143e54c..80f95f5811c5 100644 --- a/cpp/src/parquet/range_reader_test.cc +++ b/cpp/src/parquet/range_reader_test.cc @@ -30,18 +30,21 @@ #include #include +#include +#include #include #include using parquet::IntervalRange; using parquet::IntervalRanges; -std::string random_string(std::string::size_type length) { +std::string random_string() { static auto& chrs = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"; - - static std::mt19937 rg = std::mt19937(std::random_device()()); + static std::mt19937 rg{std::random_device()()}; static std::uniform_int_distribution pick(0, sizeof(chrs) - 2); + int length = std::rand() % 100; + std::string s; s.reserve(length); while (length--) s += chrs[pick(rg)]; @@ -51,10 +54,10 @@ std::string random_string(std::string::size_type length) { /// The table looks like (with_nulls = false): // { -// { a: {x: 0, y: 0}, b: {0, 0, 0}, c: "0", d: 0}, -// { a: {x: 1, y: 1}, b: {1, 1, 1}, c: "1", d: 1}, +// { a: {x: 0, y: 0}, b: {0, 0, 0}, c: "0", d: 0}, +// { a: {x: 1, y: 1}, b: {1, 1, 1}, c: "1", d: 1}, // ... -// { a: {x: 99, y: 99}, b: {99, 99, 99}, c: "99", d: 99} +// { a: {x: 99, y: 99}, b: {99, 99, 99}, c: "99", d: 99} // } arrow::Result> GetTable(bool with_nulls = false) { // if with_nulls, the generated table should null values @@ -117,7 +120,7 @@ arrow::Result> GetTable(bool with_nulls = false) { uint8_t valid_bytes[100]; for (size_t i = 0; i < 100; i++) { // add more chars to make this column unaligned with other columns' page - strs.push_back(std::to_string(i) + random_string(20)); + strs.push_back(std::to_string(i) + random_string()); valid_bytes[i] = flags[i]; } ARROW_RETURN_NOT_OK(string_builder.AppendValues(strs, &valid_bytes[0])); @@ -225,8 +228,13 @@ void check_rb(std::unique_ptr rb_reader, std::dynamic_pointer_cast(batch->GetColumnByName("c")); for (auto iter = c_array->begin(); iter != c_array->end(); ++iter) { sum_c += std::stoi(std::string( - (*iter).has_value() ? (*iter).value().substr(0, (*iter).value().size() - 20) - : "0")); + (*iter).has_value() + ? (*iter).value().substr( + 0, std::distance( + (*iter).value().begin(), + std::find_if((*iter).value().begin(), (*iter).value().end(), + [](char c) { return !std::isdigit(c); }))) + : "0")); } } @@ -254,9 +262,37 @@ void check_rb(std::unique_ptr rb_reader, } } -class TestRecordBatchReaderWithRanges : public testing::Test { +class CountingBytesBufferReader : public arrow::io::BufferReader { + public: + using BufferReader::BufferReader; + + arrow::Future> ReadAsync( + const arrow::io::IOContext& context, int64_t position, int64_t nbytes) override { + read_bytes_ += nbytes; + return BufferReader::ReadAsync(context, position, nbytes); + } + + arrow::Future ReadAsync(const arrow::io::IOContext& ctx, + std::vector& ranges, + void* out) override { + read_bytes_ += std::accumulate(ranges.begin(), ranges.end(), 0, + [](int64_t sum, const arrow::io::ReadRange& range) { + return sum + range.length; + }); + return RandomAccessFile::ReadAsync(ctx, ranges, out); + } + + int64_t read_bytes() const { return read_bytes_; } + + private: + int64_t read_bytes_ = 0; +}; + +class TestRecordBatchReaderWithRanges : public testing::TestWithParam { public: void SetUp() { + int mode = GetParam(); + ASSERT_OK_AND_ASSIGN(auto buffer, WriteFullFile()); arrow::MemoryPool* pool = arrow::default_memory_pool(); @@ -267,9 +303,20 @@ class TestRecordBatchReaderWithRanges : public testing::Test { auto arrow_reader_props = parquet::ArrowReaderProperties(); arrow_reader_props.set_batch_size(10); // default 64 * 1024 + if (mode != 0) { + arrow_reader_props.set_pre_buffer(true); + } + + if (mode == 2) { + arrow::io::CacheOptions cache_options = arrow::io::CacheOptions::Defaults(); + cache_options.hole_size_limit = 0; + cache_options.lazy = true; + cache_options.prefetch_limit = 2; + arrow_reader_props.set_cache_options(cache_options); + } parquet::arrow::FileReaderBuilder reader_builder; - const auto in_file = std::make_shared(buffer); + in_file = std::make_shared(buffer); ASSERT_OK(reader_builder.Open(in_file, /*memory_map=*/reader_properties)); reader_builder.memory_pool(pool); reader_builder.properties(arrow_reader_props); @@ -281,11 +328,10 @@ class TestRecordBatchReaderWithRanges : public testing::Test { protected: std::unique_ptr arrow_reader; + std::shared_ptr in_file; }; -TEST_F(TestRecordBatchReaderWithRanges, TestRangesSplit) {} - -TEST_F(TestRecordBatchReaderWithRanges, SelectOnePageForEachRG) { +TEST_P(TestRecordBatchReaderWithRanges, SelectOnePageForEachRG) { std::unique_ptr rb_reader; IntervalRanges rows{{{0, 9}, {40, 49}, {80, 89}, {90, 99}}}; @@ -296,7 +342,7 @@ TEST_F(TestRecordBatchReaderWithRanges, SelectOnePageForEachRG) { check_rb(std::move(rb_reader), 40, 2280); } -TEST_F(TestRecordBatchReaderWithRanges, SelectSomePageForOneRG) { +TEST_P(TestRecordBatchReaderWithRanges, SelectSomePageForOneRG) { std::unique_ptr rb_reader; IntervalRanges rows{{IntervalRange{0, 7}, IntervalRange{16, 23}}}; @@ -307,7 +353,7 @@ TEST_F(TestRecordBatchReaderWithRanges, SelectSomePageForOneRG) { check_rb(std::move(rb_reader), 16, 184); } -TEST_F(TestRecordBatchReaderWithRanges, SelectAllRange) { +TEST_P(TestRecordBatchReaderWithRanges, SelectAllRange) { std::unique_ptr rb_reader; IntervalRanges rows{{IntervalRange{0, 29}, IntervalRange{30, 59}, IntervalRange{60, 89}, IntervalRange{90, 99}}}; @@ -319,7 +365,24 @@ TEST_F(TestRecordBatchReaderWithRanges, SelectAllRange) { check_rb(std::move(rb_reader), 100, 4950); } -TEST_F(TestRecordBatchReaderWithRanges, SelectEmptyRange) { +TEST_P(TestRecordBatchReaderWithRanges, CheckSkipIOEffective) { + std::unique_ptr rb_reader; + IntervalRanges rows{{IntervalRange{3, 3}}}; + + const std::vector column_indices{0, 1, 2, 3, 4}; + ASSERT_OK(arrow_reader->GetRecordBatchReader(rows, column_indices, &rb_reader)); + + check_rb(std::move(rb_reader), 1, 3); + + // only one page should be touched when we enable pre_buffer + // the total read bytes should be small (the first RG is about 3000 bytes) + auto mode = GetParam(); + if (mode == 1 || mode == 2) { + ASSERT_LT(in_file->read_bytes(), 1000); + } +} + +TEST_P(TestRecordBatchReaderWithRanges, SelectEmptyRange) { std::unique_ptr rb_reader; IntervalRanges rows{}; @@ -330,13 +393,15 @@ TEST_F(TestRecordBatchReaderWithRanges, SelectEmptyRange) { check_rb(std::move(rb_reader), 0, 0); } -TEST_F(TestRecordBatchReaderWithRanges, SelectOneRowSkipOneRow) { +TEST_P(TestRecordBatchReaderWithRanges, SelectOneRowSkipOneRow) { // case 1: only care about RG 0 { std::unique_ptr rb_reader; std::vector ranges; for (int64_t i = 0; i < 30; i++) { - if (i % 2 == 0) ranges.push_back({i, i}); + if (i % 2 == 0) { + ranges.push_back({i, i}); + } } const std::vector column_indices{0, 1, 2, 3, 4}; ASSERT_OK(arrow_reader->GetRecordBatchReader(IntervalRanges(ranges), column_indices, @@ -369,7 +434,7 @@ TEST_F(TestRecordBatchReaderWithRanges, SelectOneRowSkipOneRow) { } } -TEST_F(TestRecordBatchReaderWithRanges, InvalidRanges) { +TEST_P(TestRecordBatchReaderWithRanges, InvalidRanges) { std::unique_ptr rb_reader; { auto create_ranges = []() -> IntervalRanges { @@ -397,6 +462,12 @@ TEST_F(TestRecordBatchReaderWithRanges, InvalidRanges) { } } +// mode 0: normal read with pre_buffer = false +// mode 1: normal read with pre_buffer = true +// mode 2: with pre_buffer = true and set cache options +INSTANTIATE_TEST_SUITE_P(ParameterizedTestRecordBatchReaderWithRanges, + TestRecordBatchReaderWithRanges, testing::Values(0, 1, 2)); + TEST(TestRecordBatchReaderWithRangesBadCases, NoPageIndex) { using parquet::ArrowWriterProperties; using parquet::WriterProperties; @@ -439,8 +510,10 @@ TEST(TestRecordBatchReaderWithRangesBadCases, NoPageIndex) { std::vector column_indices{0, 1, 2, 3, 4}; auto status = arrow_reader->GetRecordBatchReader(rows, column_indices, &rb_reader); ASSERT_NOT_OK(status); - EXPECT_TRUE(status.message().find("Attempting to read with Ranges but Page Index is " - "not found for Row Group: 0") != std::string::npos); + + EXPECT_TRUE( + status.message().find("Page index is required but not found for row group 0") != + std::string::npos); } class TestRecordBatchReaderWithRangesWithNulls : public testing::Test {