diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc index 1f3d64f6228..22c36531cdb 100644 --- a/cpp/src/parquet/column_writer.cc +++ b/cpp/src/parquet/column_writer.cc @@ -1150,67 +1150,99 @@ void ColumnWriterImpl::FlushBufferedDataPages() { // ---------------------------------------------------------------------- // TypedColumnWriter -template -inline void DoInBatches(int64_t total, int64_t batch_size, Action&& action) { - int64_t num_batches = static_cast(total / batch_size); - for (int round = 0; round < num_batches; round++) { - action(round * batch_size, batch_size, /*check_page_size=*/true); - } - // Write the remaining values - if (total % batch_size > 0) { - action(num_batches * batch_size, total % batch_size, /*check_page_size=*/true); - } -} +// DoInBatches for non-repeated columns +template +inline void DoInBatchesNonRepeated(int64_t num_levels, int64_t batch_size, + int64_t max_rows_per_page, Action&& action, + GetBufferedRows&& curr_page_buffered_rows) { + int64_t offset = 0; + while (offset < num_levels) { + int64_t page_buffered_rows = curr_page_buffered_rows(); + ARROW_DCHECK_LE(page_buffered_rows, max_rows_per_page); -template -inline void DoInBatches(const int16_t* def_levels, const int16_t* rep_levels, - int64_t num_levels, int64_t batch_size, Action&& action, - bool pages_change_on_record_boundaries) { - if (!pages_change_on_record_boundaries || !rep_levels) { - // If rep_levels is null, then we are writing a non-repeated column. - // In this case, every record contains only one level. - return DoInBatches(num_levels, batch_size, std::forward(action)); + // Every record contains only one level. 
+ int64_t max_batch_size = std::min(batch_size, num_levels - offset); + max_batch_size = std::min(max_batch_size, max_rows_per_page - page_buffered_rows); + int64_t end_offset = offset + max_batch_size; + + ARROW_DCHECK_LE(offset, end_offset); + ARROW_DCHECK_LE(end_offset, num_levels); + + // Always check page limit for non-repeated columns. + action(offset, end_offset - offset, /*check_page_limit=*/true); + + offset = end_offset; } +} +// DoInBatches for repeated columns +template +inline void DoInBatchesRepeated(const int16_t* def_levels, const int16_t* rep_levels, + int64_t num_levels, int64_t batch_size, + int64_t max_rows_per_page, + bool pages_change_on_record_boundaries, Action&& action, + GetBufferedRows&& curr_page_buffered_rows) { int64_t offset = 0; while (offset < num_levels) { - int64_t end_offset = std::min(offset + batch_size, num_levels); - - // Find next record boundary (i.e. rep_level = 0) - while (end_offset < num_levels && rep_levels[end_offset] != 0) { - end_offset++; - } - - if (end_offset < num_levels) { - // This is not the last chunk of batch and end_offset is a record boundary. - // It is a good chance to check the page size. - action(offset, end_offset - offset, /*check_page_size=*/true); - } else { - DCHECK_EQ(end_offset, num_levels); - // This is the last chunk of batch, and we do not know whether end_offset is a - // record boundary. Find the offset to beginning of last record in this chunk, - // so we can check page size. 
- int64_t last_record_begin_offset = num_levels - 1; - while (last_record_begin_offset >= offset && - rep_levels[last_record_begin_offset] != 0) { - last_record_begin_offset--; + int64_t max_batch_size = std::min(batch_size, num_levels - offset); + int64_t end_offset = num_levels; // end offset of the current batch + int64_t check_page_limit_end_offset = -1; // offset to check page limit (if not -1) + + int64_t page_buffered_rows = curr_page_buffered_rows(); + ARROW_DCHECK_LE(page_buffered_rows, max_rows_per_page); + + // Iterate rep_levels to find the shortest sequence that ends before a record + // boundary (i.e. rep_levels == 0) with a size no less than max_batch_size + for (int64_t i = offset; i < num_levels; ++i) { + if (rep_levels[i] == 0) { + // Use the beginning of last record to check page limit. + check_page_limit_end_offset = i; + if (i - offset >= max_batch_size || page_buffered_rows >= max_rows_per_page) { + end_offset = i; + break; + } + page_buffered_rows += 1; + } + } - if (offset <= last_record_begin_offset) { - // We have found the beginning of last record and can check page size. - action(offset, last_record_begin_offset - offset, /*check_page_size=*/true); - offset = last_record_begin_offset; - } + ARROW_DCHECK_LE(offset, end_offset); + ARROW_DCHECK_LE(check_page_limit_end_offset, end_offset); - // Write remaining data after the record boundary, - // or all data if no boundary was found. - action(offset, end_offset - offset, /*check_page_size=*/false); + if (check_page_limit_end_offset >= 0) { + // At least one record boundary is included in this batch. + // It is a good chance to check the page limit. + action(offset, check_page_limit_end_offset - offset, /*check_page_limit=*/true); + offset = check_page_limit_end_offset; + } + if (end_offset > offset) { + // This is the last chunk of batch, and we do not know whether end_offset is a + // record boundary so we cannot check page limit if pages cannot change on + // record boundaries. 
+ ARROW_DCHECK_EQ(end_offset, num_levels); + action(offset, end_offset - offset, + /*check_page_limit=*/!pages_change_on_record_boundaries); } offset = end_offset; } } +template +inline void DoInBatches(const int16_t* def_levels, const int16_t* rep_levels, + int64_t num_levels, int64_t batch_size, int64_t max_rows_per_page, + bool pages_change_on_record_boundaries, Action&& action, + GetBufferedRows&& curr_page_buffered_rows) { + if (!rep_levels) { + DoInBatchesNonRepeated(num_levels, batch_size, max_rows_per_page, + std::forward(action), + std::forward(curr_page_buffered_rows)); + } else { + DoInBatchesRepeated(def_levels, rep_levels, num_levels, batch_size, max_rows_per_page, + pages_change_on_record_boundaries, std::forward(action), + std::forward(curr_page_buffered_rows)); + } +} + namespace { bool DictionaryDirectWriteSupported(const ::arrow::Array& array) { @@ -1318,7 +1350,8 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, CheckDictionarySizeLimit(); }; DoInBatches(def_levels, rep_levels, num_values, properties_->write_batch_size(), - WriteChunk, pages_change_on_record_boundaries()); + properties_->max_rows_per_page(), pages_change_on_record_boundaries(), + WriteChunk, [this]() { return num_buffered_rows_; }); return value_offset; } @@ -1368,7 +1401,8 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, CheckDictionarySizeLimit(); }; DoInBatches(def_levels, rep_levels, num_values, properties_->write_batch_size(), - WriteChunk, pages_change_on_record_boundaries()); + properties_->max_rows_per_page(), pages_change_on_record_boundaries(), + WriteChunk, [this]() { return num_buffered_rows_; }); } Status WriteArrow(const int16_t* def_levels, const int16_t* rep_levels, @@ -1769,13 +1803,14 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, } void CommitWriteAndCheckPageLimit(int64_t num_levels, int64_t num_values, - int64_t num_nulls, bool check_page_size) { + int64_t num_nulls, bool check_page_limit) { num_buffered_values_ += num_levels; 
num_buffered_encoded_values_ += num_values; num_buffered_nulls_ += num_nulls; - if (check_page_size && - current_encoder_->EstimatedDataEncodedSize() >= properties_->data_pagesize()) { + if (check_page_limit && + (current_encoder_->EstimatedDataEncodedSize() >= properties_->data_pagesize() || + num_buffered_rows_ >= properties_->max_rows_per_page())) { AddDataPage(); } } @@ -1996,9 +2031,10 @@ Status TypedColumnWriterImpl::WriteArrowDictionary( return WriteDense(); } - PARQUET_CATCH_NOT_OK(DoInBatches(def_levels, rep_levels, num_levels, - properties_->write_batch_size(), WriteIndicesChunk, - pages_change_on_record_boundaries())); + PARQUET_CATCH_NOT_OK( + DoInBatches(def_levels, rep_levels, num_levels, properties_->write_batch_size(), + properties_->max_rows_per_page(), pages_change_on_record_boundaries(), + WriteIndicesChunk, [this]() { return num_buffered_rows_; })); return Status::OK(); } @@ -2441,9 +2477,10 @@ Status TypedColumnWriterImpl::WriteArrowDense( value_offset += batch_num_spaced_values; }; - PARQUET_CATCH_NOT_OK(DoInBatches(def_levels, rep_levels, num_levels, - properties_->write_batch_size(), WriteChunk, - pages_change_on_record_boundaries())); + PARQUET_CATCH_NOT_OK( + DoInBatches(def_levels, rep_levels, num_levels, properties_->write_batch_size(), + properties_->max_rows_per_page(), pages_change_on_record_boundaries(), + WriteChunk, [this]() { return num_buffered_rows_; })); return Status::OK(); } diff --git a/cpp/src/parquet/column_writer_test.cc b/cpp/src/parquet/column_writer_test.cc index 990125df4e3..48cac04f071 100644 --- a/cpp/src/parquet/column_writer_test.cc +++ b/cpp/src/parquet/column_writer_test.cc @@ -37,6 +37,7 @@ #include "parquet/file_writer.h" #include "parquet/geospatial/statistics.h" #include "parquet/metadata.h" +#include "parquet/page_index.h" #include "parquet/platform.h" #include "parquet/properties.h" #include "parquet/statistics.h" @@ -108,7 +109,8 @@ class TestPrimitiveWriter : public PrimitiveTypedTest { const 
ColumnProperties& column_properties = ColumnProperties(), const ParquetVersion::type version = ParquetVersion::PARQUET_1_0, const ParquetDataPageVersion data_page_version = ParquetDataPageVersion::V1, - bool enable_checksum = false, int64_t page_size = kDefaultDataPageSize) { + bool enable_checksum = false, int64_t page_size = kDefaultDataPageSize, + int64_t max_rows_per_page = kDefaultMaxRowsPerPage) { sink_ = CreateOutputStream(); WriterProperties::Builder wp_builder; wp_builder.version(version)->data_page_version(data_page_version); @@ -125,6 +127,7 @@ class TestPrimitiveWriter : public PrimitiveTypedTest { } wp_builder.max_statistics_size(column_properties.max_statistics_size()); wp_builder.data_pagesize(page_size); + wp_builder.max_rows_per_page(max_rows_per_page); writer_properties_ = wp_builder.build(); metadata_ = ColumnChunkMetaDataBuilder::Make(writer_properties_, this->descr_); @@ -506,6 +509,44 @@ void TestPrimitiveWriter::ReadColumnFully(Compression::type compressio this->SyncValuesOut(); } +template <> +void TestPrimitiveWriter::ReadColumnFully(Compression::type compression, + bool page_checksum_verify) { + int64_t total_values = static_cast(this->values_out_.size()); + BuildReader(total_values, compression, page_checksum_verify); + this->data_buffer_.clear(); + + values_read_ = 0; + while (values_read_ < total_values) { + int64_t values_read_recently = 0; + reader_->ReadBatch( + static_cast(this->values_out_.size()) - static_cast(values_read_), + definition_levels_out_.data() + values_read_, + repetition_levels_out_.data() + values_read_, + this->values_out_ptr_ + values_read_, &values_read_recently); + + // Compute the total length of the data + int64_t total_length = 0; + for (int64_t i = 0; i < values_read_recently; i++) { + total_length += this->values_out_[i + values_read_].len; + } + + // Copy contents of the pointers + std::vector data(total_length); + uint8_t* data_ptr = data.data(); + for (int64_t i = 0; i < values_read_recently; i++) { + 
const ByteArray& value = this->values_out_ptr_[i + values_read_]; + memcpy(data_ptr, value.ptr, value.len); + this->values_out_[i + values_read_].ptr = data_ptr; + data_ptr += value.len; + } + data_buffer_.emplace_back(std::move(data)); + + values_read_ += values_read_recently; + } + this->SyncValuesOut(); +} + typedef ::testing::Types TestTypes; @@ -2075,5 +2116,162 @@ TEST_F(TestGeometryValuesWriter, TestWriteAndReadAllNull) { EXPECT_EQ(geospatial_statistics->geometry_types(), std::nullopt); } +template +class TestColumnWriterMaxRowsPerPage : public TestPrimitiveWriter { + public: + TypedColumnWriter* BuildWriter( + int64_t max_rows_per_page = kDefaultMaxRowsPerPage, + int64_t page_size = kDefaultDataPageSize) { + this->sink_ = CreateOutputStream(); + this->writer_properties_ = WriterProperties::Builder() + .max_rows_per_page(max_rows_per_page) + ->data_pagesize(page_size) + ->enable_write_page_index() + ->build(); + file_writer_ = ParquetFileWriter::Open( + this->sink_, std::static_pointer_cast(this->schema_.schema_root()), + this->writer_properties_); + return static_cast*>( + file_writer_->AppendRowGroup()->NextColumn()); + } + + void CloseWriter() const { file_writer_->Close(); } + + void BuildReader() { + ASSERT_OK_AND_ASSIGN(auto buffer, this->sink_->Finish()); + file_reader_ = ParquetFileReader::Open( + std::make_shared<::arrow::io::BufferReader>(buffer), default_reader_properties()); + this->reader_ = std::static_pointer_cast>( + file_reader_->RowGroup(0)->Column(0)); + } + + void VerifyMaxRowsPerPage(int64_t max_rows_per_page) const { + auto file_meta = file_reader_->metadata(); + int64_t num_row_groups = file_meta->num_row_groups(); + ASSERT_EQ(num_row_groups, 1); + + auto page_index_reader = file_reader_->GetPageIndexReader(); + ASSERT_NE(page_index_reader, nullptr); + + auto row_group_page_index_reader = page_index_reader->RowGroup(0); + ASSERT_NE(row_group_page_index_reader, nullptr); + + auto offset_index = 
row_group_page_index_reader->GetOffsetIndex(0); + ASSERT_NE(offset_index, nullptr); + size_t num_pages = offset_index->page_locations().size(); + int64_t num_rows = 0; + for (size_t j = 1; j < num_pages; ++j) { + int64_t page_rows = offset_index->page_locations()[j].first_row_index - + offset_index->page_locations()[j - 1].first_row_index; + EXPECT_LE(page_rows, max_rows_per_page); + num_rows += page_rows; + } + if (num_pages != 0) { + int64_t last_page_rows = file_meta->RowGroup(0)->num_rows() - + offset_index->page_locations().back().first_row_index; + EXPECT_LE(last_page_rows, max_rows_per_page); + num_rows += last_page_rows; + } + + EXPECT_EQ(num_rows, file_meta->RowGroup(0)->num_rows()); + } + + private: + std::shared_ptr file_writer_; + std::shared_ptr file_reader_; +}; + +TYPED_TEST_SUITE(TestColumnWriterMaxRowsPerPage, TestTypes); + +TYPED_TEST(TestColumnWriterMaxRowsPerPage, Optional) { + for (int64_t max_rows_per_page : {1, 10, 100}) { + this->SetUpSchema(Repetition::OPTIONAL); + this->GenerateData(SMALL_SIZE); + std::vector definition_levels(SMALL_SIZE, 1); + definition_levels[1] = 0; + + auto writer = this->BuildWriter(max_rows_per_page); + writer->WriteBatch(this->values_.size(), definition_levels.data(), nullptr, + this->values_ptr_); + this->CloseWriter(); + + this->BuildReader(); + ASSERT_NO_FATAL_FAILURE(this->VerifyMaxRowsPerPage(max_rows_per_page)); + } +} + +TYPED_TEST(TestColumnWriterMaxRowsPerPage, OptionalSpaced) { + for (int64_t max_rows_per_page : {1, 10, 100}) { + this->SetUpSchema(Repetition::OPTIONAL); + + this->GenerateData(SMALL_SIZE); + std::vector definition_levels(SMALL_SIZE, 1); + std::vector valid_bits(::arrow::bit_util::BytesForBits(SMALL_SIZE), 255); + + definition_levels[SMALL_SIZE - 1] = 0; + ::arrow::bit_util::ClearBit(valid_bits.data(), SMALL_SIZE - 1); + definition_levels[1] = 0; + ::arrow::bit_util::ClearBit(valid_bits.data(), 1); + + auto writer = this->BuildWriter(max_rows_per_page); + 
writer->WriteBatchSpaced(this->values_.size(), definition_levels.data(), nullptr, + valid_bits.data(), 0, this->values_ptr_); + this->CloseWriter(); + + this->BuildReader(); + ASSERT_NO_FATAL_FAILURE(this->VerifyMaxRowsPerPage(max_rows_per_page)); + } +} + +TYPED_TEST(TestColumnWriterMaxRowsPerPage, Repeated) { + for (int64_t max_rows_per_page : {1, 10, 100}) { + this->SetUpSchema(Repetition::REPEATED); + + this->GenerateData(SMALL_SIZE); + std::vector definition_levels(SMALL_SIZE); + std::vector repetition_levels(SMALL_SIZE); + + // Generate levels to include variable-sized lists and empty lists + for (int i = 0; i < SMALL_SIZE; i++) { + int list_length = (i % 5) + 1; + if (i % 13 == 0 || i % 17 == 0) { + list_length = 0; + } + + if (list_length == 0) { + definition_levels[i] = 0; + repetition_levels[i] = 0; + } else { + for (int j = 0; j < list_length && i + j < SMALL_SIZE; j++) { + definition_levels[i + j] = 1; + repetition_levels[i + j] = (j == 0) ? 0 : 1; + } + i += list_length - 1; + } + } + + auto writer = this->BuildWriter(max_rows_per_page); + writer->WriteBatch(this->values_.size(), definition_levels.data(), + repetition_levels.data(), this->values_ptr_); + this->CloseWriter(); + + this->BuildReader(); + ASSERT_NO_FATAL_FAILURE(this->VerifyMaxRowsPerPage(max_rows_per_page)); + } +} + +TYPED_TEST(TestColumnWriterMaxRowsPerPage, RequiredLargeChunk) { + for (int64_t max_rows_per_page : {10, 100, 10000}) { + this->GenerateData(LARGE_SIZE); + + auto writer = this->BuildWriter(max_rows_per_page); + writer->WriteBatch(this->values_.size(), nullptr, nullptr, this->values_ptr_); + this->CloseWriter(); + + this->BuildReader(); + ASSERT_NO_FATAL_FAILURE(this->VerifyMaxRowsPerPage(max_rows_per_page)); + } +} + } // namespace test } // namespace parquet diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h index 5a1799c39d7..51b549df22e 100644 --- a/cpp/src/parquet/properties.h +++ b/cpp/src/parquet/properties.h @@ -155,6 +155,7 @@ class 
PARQUET_EXPORT ReaderProperties { ReaderProperties PARQUET_EXPORT default_reader_properties(); static constexpr int64_t kDefaultDataPageSize = 1024 * 1024; +static constexpr int64_t kDefaultMaxRowsPerPage = 20'000; static constexpr bool DEFAULT_IS_DICTIONARY_ENABLED = true; static constexpr int64_t DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT = kDefaultDataPageSize; static constexpr int64_t DEFAULT_WRITE_BATCH_SIZE = 1024; @@ -293,6 +294,7 @@ class PARQUET_EXPORT WriterProperties { write_batch_size_(DEFAULT_WRITE_BATCH_SIZE), max_row_group_length_(DEFAULT_MAX_ROW_GROUP_LENGTH), pagesize_(kDefaultDataPageSize), + max_rows_per_page_(kDefaultMaxRowsPerPage), version_(ParquetVersion::PARQUET_2_6), data_page_version_(ParquetDataPageVersion::V1), created_by_(DEFAULT_CREATED_BY), @@ -308,6 +310,7 @@ class PARQUET_EXPORT WriterProperties { write_batch_size_(properties.write_batch_size()), max_row_group_length_(properties.max_row_group_length()), pagesize_(properties.data_pagesize()), + max_rows_per_page_(properties.max_rows_per_page()), version_(properties.version()), data_page_version_(properties.data_page_version()), created_by_(properties.created_by()), @@ -422,6 +425,13 @@ class PARQUET_EXPORT WriterProperties { return this; } + /// Specify the maximum number of rows per data page. + /// Default 20K rows. + Builder* max_rows_per_page(int64_t max_rows) { + max_rows_per_page_ = max_rows; + return this; + } + /// Specify the data page version. /// Default V1. 
Builder* data_page_version(ParquetDataPageVersion data_page_version) { @@ -768,7 +778,7 @@ class PARQUET_EXPORT WriterProperties { return std::shared_ptr(new WriterProperties( pool_, dictionary_pagesize_limit_, write_batch_size_, max_row_group_length_, - pagesize_, version_, created_by_, page_checksum_enabled_, + pagesize_, max_rows_per_page_, version_, created_by_, page_checksum_enabled_, size_statistics_level_, std::move(file_encryption_properties_), default_column_properties_, column_properties, data_page_version_, store_decimal_as_integer_, std::move(sorting_columns_), @@ -781,6 +791,7 @@ class PARQUET_EXPORT WriterProperties { int64_t write_batch_size_; int64_t max_row_group_length_; int64_t pagesize_; + int64_t max_rows_per_page_; ParquetVersion::type version_; ParquetDataPageVersion data_page_version_; std::string created_by_; @@ -816,6 +827,8 @@ class PARQUET_EXPORT WriterProperties { inline int64_t data_pagesize() const { return pagesize_; } + inline int64_t max_rows_per_page() const { return max_rows_per_page_; } + inline ParquetDataPageVersion data_page_version() const { return parquet_data_page_version_; } @@ -930,9 +943,9 @@ class PARQUET_EXPORT WriterProperties { private: explicit WriterProperties( MemoryPool* pool, int64_t dictionary_pagesize_limit, int64_t write_batch_size, - int64_t max_row_group_length, int64_t pagesize, ParquetVersion::type version, - const std::string& created_by, bool page_write_checksum_enabled, - SizeStatisticsLevel size_statistics_level, + int64_t max_row_group_length, int64_t pagesize, int64_t max_rows_per_page, + ParquetVersion::type version, const std::string& created_by, + bool page_write_checksum_enabled, SizeStatisticsLevel size_statistics_level, std::shared_ptr file_encryption_properties, const ColumnProperties& default_column_properties, const std::unordered_map& column_properties, @@ -944,6 +957,7 @@ class PARQUET_EXPORT WriterProperties { write_batch_size_(write_batch_size), 
max_row_group_length_(max_row_group_length), pagesize_(pagesize), + max_rows_per_page_(max_rows_per_page), parquet_data_page_version_(data_page_version), parquet_version_(version), parquet_created_by_(created_by), @@ -962,6 +976,7 @@ class PARQUET_EXPORT WriterProperties { int64_t write_batch_size_; int64_t max_row_group_length_; int64_t pagesize_; + int64_t max_rows_per_page_; ParquetDataPageVersion parquet_data_page_version_; ParquetVersion::type parquet_version_; std::string parquet_created_by_; diff --git a/cpp/src/parquet/size_statistics_test.cc b/cpp/src/parquet/size_statistics_test.cc index 90d6df57e7f..6e8cec9a130 100644 --- a/cpp/src/parquet/size_statistics_test.cc +++ b/cpp/src/parquet/size_statistics_test.cc @@ -140,6 +140,7 @@ class SizeStatisticsRoundTripTest : public ::testing::Test { auto writer_properties = WriterProperties::Builder() .max_row_group_length(max_row_group_length) ->data_pagesize(page_size) + ->max_rows_per_page(std::numeric_limits::max()) ->write_batch_size(write_batch_size) ->enable_write_page_index() ->enable_statistics()