From 1a6c3d1bf66bbad3bf9f0022d479b5a32cc10cbb Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Sun, 13 Jul 2025 00:47:12 +0800 Subject: [PATCH 1/7] GH-40730: [C++][Parquet] Add setting to limit the number of rows written per page --- cpp/src/parquet/column_writer.cc | 114 +++++++++++++++++-------------- cpp/src/parquet/properties.h | 24 +++++-- 2 files changed, 81 insertions(+), 57 deletions(-) diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc index 1f3d64f6228..568764b320d 100644 --- a/cpp/src/parquet/column_writer.cc +++ b/cpp/src/parquet/column_writer.cc @@ -1150,61 +1150,62 @@ void ColumnWriterImpl::FlushBufferedDataPages() { // ---------------------------------------------------------------------- // TypedColumnWriter -template -inline void DoInBatches(int64_t total, int64_t batch_size, Action&& action) { - int64_t num_batches = static_cast(total / batch_size); - for (int round = 0; round < num_batches; round++) { - action(round * batch_size, batch_size, /*check_page_size=*/true); - } - // Write the remaining values - if (total % batch_size > 0) { - action(num_batches * batch_size, total % batch_size, /*check_page_size=*/true); - } -} - template inline void DoInBatches(const int16_t* def_levels, const int16_t* rep_levels, int64_t num_levels, int64_t batch_size, Action&& action, - bool pages_change_on_record_boundaries) { - if (!pages_change_on_record_boundaries || !rep_levels) { - // If rep_levels is null, then we are writing a non-repeated column. - // In this case, every record contains only one level. 
- return DoInBatches(num_levels, batch_size, std::forward(action)); - } - + bool pages_change_on_record_boundaries, int64_t max_rows_per_page, + const std::function& curr_page_buffered_rows) { int64_t offset = 0; while (offset < num_levels) { - int64_t end_offset = std::min(offset + batch_size, num_levels); + int64_t min_batch_size = std::min(batch_size, num_levels - offset); + int64_t end_offset = num_levels; + int64_t check_page_limit_end_offset = -1; + + int64_t page_buffered_rows = curr_page_buffered_rows(); + ARROW_DCHECK_LE(page_buffered_rows, max_rows_per_page); - // Find next record boundary (i.e. rep_level = 0) - while (end_offset < num_levels && rep_levels[end_offset] != 0) { - end_offset++; + if (!rep_levels) { + min_batch_size = std::min(min_batch_size, max_rows_per_page - page_buffered_rows); + end_offset = offset + min_batch_size; + check_page_limit_end_offset = end_offset; + } else { + int64_t last_record_begin_offset = -1; + // Iterate rep_levels to find the shortest sequence that ends before a record + // boundary (i.e. rep_levels == 0) with a size no less than min_batch_size + for (int64_t i = offset; i < num_levels; ++i) { + if (rep_levels[i] == 0) { + last_record_begin_offset = i; + if (i - offset >= min_batch_size || page_buffered_rows >= max_rows_per_page) { + end_offset = i; + break; + } + page_buffered_rows += 1; + } + } + // Use the beginning of last record to check page limit. + check_page_limit_end_offset = last_record_begin_offset; } + ARROW_DCHECK_LT(offset, end_offset); + ARROW_DCHECK_LE(check_page_limit_end_offset, end_offset); + if (end_offset < num_levels) { // This is not the last chunk of batch and end_offset is a record boundary. - // It is a good chance to check the page size. - action(offset, end_offset - offset, /*check_page_size=*/true); + // It is a good chance to check the page limit. 
+ action(offset, end_offset - offset, /*check_page_limit=*/true); } else { - DCHECK_EQ(end_offset, num_levels); - // This is the last chunk of batch, and we do not know whether end_offset is a - // record boundary. Find the offset to beginning of last record in this chunk, - // so we can check page size. - int64_t last_record_begin_offset = num_levels - 1; - while (last_record_begin_offset >= offset && - rep_levels[last_record_begin_offset] != 0) { - last_record_begin_offset--; + ARROW_DCHECK_EQ(end_offset, num_levels); + if (offset <= check_page_limit_end_offset) { + action(offset, check_page_limit_end_offset - offset, /*check_page_limit=*/true); + offset = check_page_limit_end_offset; } - - if (offset <= last_record_begin_offset) { - // We have found the beginning of last record and can check page size. - action(offset, last_record_begin_offset - offset, /*check_page_size=*/true); - offset = last_record_begin_offset; + if (offset < end_offset) { + // This is the last chunk of batch, and we do not know whether end_offset is a + // record boundary so we cannot check page limit if pages cannot change on + // record boundaries. + action(offset, end_offset - offset, + /*check_page_limit=*/!pages_change_on_record_boundaries); } - - // Write remaining data after the record boundary, - // or all data if no boundary was found. 
- action(offset, end_offset - offset, /*check_page_size=*/false); } offset = end_offset; @@ -1318,7 +1319,9 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, CheckDictionarySizeLimit(); }; DoInBatches(def_levels, rep_levels, num_values, properties_->write_batch_size(), - WriteChunk, pages_change_on_record_boundaries()); + WriteChunk, pages_change_on_record_boundaries(), + properties_->max_rows_per_page(), + [this]() { return num_buffered_rows_; }); return value_offset; } @@ -1368,7 +1371,9 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, CheckDictionarySizeLimit(); }; DoInBatches(def_levels, rep_levels, num_values, properties_->write_batch_size(), - WriteChunk, pages_change_on_record_boundaries()); + WriteChunk, pages_change_on_record_boundaries(), + properties_->max_rows_per_page(), + [this]() { return num_buffered_rows_; }); } Status WriteArrow(const int16_t* def_levels, const int16_t* rep_levels, @@ -1769,13 +1774,14 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, } void CommitWriteAndCheckPageLimit(int64_t num_levels, int64_t num_values, - int64_t num_nulls, bool check_page_size) { + int64_t num_nulls, bool check_page_limit) { num_buffered_values_ += num_levels; num_buffered_encoded_values_ += num_values; num_buffered_nulls_ += num_nulls; - if (check_page_size && - current_encoder_->EstimatedDataEncodedSize() >= properties_->data_pagesize()) { + if (check_page_limit && + (current_encoder_->EstimatedDataEncodedSize() >= properties_->data_pagesize() || + num_buffered_rows_ >= properties_->max_rows_per_page())) { AddDataPage(); } } @@ -1996,9 +2002,10 @@ Status TypedColumnWriterImpl::WriteArrowDictionary( return WriteDense(); } - PARQUET_CATCH_NOT_OK(DoInBatches(def_levels, rep_levels, num_levels, - properties_->write_batch_size(), WriteIndicesChunk, - pages_change_on_record_boundaries())); + PARQUET_CATCH_NOT_OK(DoInBatches( + def_levels, rep_levels, num_levels, properties_->write_batch_size(), + WriteIndicesChunk, 
pages_change_on_record_boundaries(), + properties_->max_rows_per_page(), [this]() { return num_buffered_rows_; })); return Status::OK(); } @@ -2441,9 +2448,10 @@ Status TypedColumnWriterImpl::WriteArrowDense( value_offset += batch_num_spaced_values; }; - PARQUET_CATCH_NOT_OK(DoInBatches(def_levels, rep_levels, num_levels, - properties_->write_batch_size(), WriteChunk, - pages_change_on_record_boundaries())); + PARQUET_CATCH_NOT_OK(DoInBatches( + def_levels, rep_levels, num_levels, properties_->write_batch_size(), WriteChunk, + pages_change_on_record_boundaries(), properties_->max_rows_per_page(), + [this]() { return num_buffered_rows_; })); return Status::OK(); } diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h index 5a1799c39d7..7dd44586923 100644 --- a/cpp/src/parquet/properties.h +++ b/cpp/src/parquet/properties.h @@ -155,6 +155,8 @@ class PARQUET_EXPORT ReaderProperties { ReaderProperties PARQUET_EXPORT default_reader_properties(); static constexpr int64_t kDefaultDataPageSize = 1024 * 1024; +/// FIXME: Switch the default value to 20000 will break UTs. 
+static constexpr int64_t kDefaultMaxRowsPerPage = 1000000; static constexpr bool DEFAULT_IS_DICTIONARY_ENABLED = true; static constexpr int64_t DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT = kDefaultDataPageSize; static constexpr int64_t DEFAULT_WRITE_BATCH_SIZE = 1024; @@ -293,6 +295,7 @@ class PARQUET_EXPORT WriterProperties { write_batch_size_(DEFAULT_WRITE_BATCH_SIZE), max_row_group_length_(DEFAULT_MAX_ROW_GROUP_LENGTH), pagesize_(kDefaultDataPageSize), + max_rows_per_page_(kDefaultMaxRowsPerPage), version_(ParquetVersion::PARQUET_2_6), data_page_version_(ParquetDataPageVersion::V1), created_by_(DEFAULT_CREATED_BY), @@ -308,6 +311,7 @@ class PARQUET_EXPORT WriterProperties { write_batch_size_(properties.write_batch_size()), max_row_group_length_(properties.max_row_group_length()), pagesize_(properties.data_pagesize()), + max_rows_per_page_(properties.max_rows_per_page()), version_(properties.version()), data_page_version_(properties.data_page_version()), created_by_(properties.created_by()), @@ -422,6 +426,13 @@ class PARQUET_EXPORT WriterProperties { return this; } + /// Specify the maximum number of rows per data page. + /// Default 1M rows. + Builder* max_rows_per_page(int64_t max_rows) { + max_rows_per_page_ = max_rows; + return this; + } + /// Specify the data page version. /// Default V1. 
Builder* data_page_version(ParquetDataPageVersion data_page_version) { @@ -768,7 +779,7 @@ class PARQUET_EXPORT WriterProperties { return std::shared_ptr(new WriterProperties( pool_, dictionary_pagesize_limit_, write_batch_size_, max_row_group_length_, - pagesize_, version_, created_by_, page_checksum_enabled_, + pagesize_, max_rows_per_page_, version_, created_by_, page_checksum_enabled_, size_statistics_level_, std::move(file_encryption_properties_), default_column_properties_, column_properties, data_page_version_, store_decimal_as_integer_, std::move(sorting_columns_), @@ -781,6 +792,7 @@ class PARQUET_EXPORT WriterProperties { int64_t write_batch_size_; int64_t max_row_group_length_; int64_t pagesize_; + int64_t max_rows_per_page_; ParquetVersion::type version_; ParquetDataPageVersion data_page_version_; std::string created_by_; @@ -816,6 +828,8 @@ class PARQUET_EXPORT WriterProperties { inline int64_t data_pagesize() const { return pagesize_; } + inline int64_t max_rows_per_page() const { return max_rows_per_page_; } + inline ParquetDataPageVersion data_page_version() const { return parquet_data_page_version_; } @@ -930,9 +944,9 @@ class PARQUET_EXPORT WriterProperties { private: explicit WriterProperties( MemoryPool* pool, int64_t dictionary_pagesize_limit, int64_t write_batch_size, - int64_t max_row_group_length, int64_t pagesize, ParquetVersion::type version, - const std::string& created_by, bool page_write_checksum_enabled, - SizeStatisticsLevel size_statistics_level, + int64_t max_row_group_length, int64_t pagesize, int64_t max_rows_per_page, + ParquetVersion::type version, const std::string& created_by, + bool page_write_checksum_enabled, SizeStatisticsLevel size_statistics_level, std::shared_ptr file_encryption_properties, const ColumnProperties& default_column_properties, const std::unordered_map& column_properties, @@ -944,6 +958,7 @@ class PARQUET_EXPORT WriterProperties { write_batch_size_(write_batch_size), 
max_row_group_length_(max_row_group_length), pagesize_(pagesize), + max_rows_per_page_(max_rows_per_page), parquet_data_page_version_(data_page_version), parquet_version_(version), parquet_created_by_(created_by), @@ -962,6 +977,7 @@ class PARQUET_EXPORT WriterProperties { int64_t write_batch_size_; int64_t max_row_group_length_; int64_t pagesize_; + int64_t max_rows_per_page_; ParquetDataPageVersion parquet_data_page_version_; ParquetVersion::type parquet_version_; std::string parquet_created_by_; From 0bc0282b2816850b9a37f48896658ad31756a232 Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Fri, 18 Jul 2025 23:38:05 +0800 Subject: [PATCH 2/7] add and fix test --- cpp/src/parquet/column_writer.cc | 6 +- cpp/src/parquet/column_writer_test.cc | 202 +++++++++++++++++++++++- cpp/src/parquet/properties.h | 5 +- cpp/src/parquet/size_statistics_test.cc | 1 + 4 files changed, 207 insertions(+), 7 deletions(-) diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc index 568764b320d..2bf6640e1ca 100644 --- a/cpp/src/parquet/column_writer.cc +++ b/cpp/src/parquet/column_writer.cc @@ -1150,11 +1150,11 @@ void ColumnWriterImpl::FlushBufferedDataPages() { // ---------------------------------------------------------------------- // TypedColumnWriter -template +template inline void DoInBatches(const int16_t* def_levels, const int16_t* rep_levels, int64_t num_levels, int64_t batch_size, Action&& action, bool pages_change_on_record_boundaries, int64_t max_rows_per_page, - const std::function& curr_page_buffered_rows) { + GetBufferedRows&& curr_page_buffered_rows) { int64_t offset = 0; while (offset < num_levels) { int64_t min_batch_size = std::min(batch_size, num_levels - offset); @@ -1186,7 +1186,7 @@ inline void DoInBatches(const int16_t* def_levels, const int16_t* rep_levels, check_page_limit_end_offset = last_record_begin_offset; } - ARROW_DCHECK_LT(offset, end_offset); + ARROW_DCHECK_LE(offset, end_offset); ARROW_DCHECK_LE(check_page_limit_end_offset, 
end_offset); if (end_offset < num_levels) { diff --git a/cpp/src/parquet/column_writer_test.cc b/cpp/src/parquet/column_writer_test.cc index 990125df4e3..8269611f3c6 100644 --- a/cpp/src/parquet/column_writer_test.cc +++ b/cpp/src/parquet/column_writer_test.cc @@ -37,6 +37,7 @@ #include "parquet/file_writer.h" #include "parquet/geospatial/statistics.h" #include "parquet/metadata.h" +#include "parquet/page_index.h" #include "parquet/platform.h" #include "parquet/properties.h" #include "parquet/statistics.h" @@ -108,7 +109,8 @@ class TestPrimitiveWriter : public PrimitiveTypedTest { const ColumnProperties& column_properties = ColumnProperties(), const ParquetVersion::type version = ParquetVersion::PARQUET_1_0, const ParquetDataPageVersion data_page_version = ParquetDataPageVersion::V1, - bool enable_checksum = false, int64_t page_size = kDefaultDataPageSize) { + bool enable_checksum = false, int64_t page_size = kDefaultDataPageSize, + int64_t max_rows_per_page = kDefaultMaxRowsPerPage) { sink_ = CreateOutputStream(); WriterProperties::Builder wp_builder; wp_builder.version(version)->data_page_version(data_page_version); @@ -125,6 +127,7 @@ class TestPrimitiveWriter : public PrimitiveTypedTest { } wp_builder.max_statistics_size(column_properties.max_statistics_size()); wp_builder.data_pagesize(page_size); + wp_builder.max_rows_per_page(max_rows_per_page); writer_properties_ = wp_builder.build(); metadata_ = ColumnChunkMetaDataBuilder::Make(writer_properties_, this->descr_); @@ -506,6 +509,44 @@ void TestPrimitiveWriter::ReadColumnFully(Compression::type compressio this->SyncValuesOut(); } +template <> +void TestPrimitiveWriter::ReadColumnFully(Compression::type compression, + bool page_checksum_verify) { + int64_t total_values = static_cast(this->values_out_.size()); + BuildReader(total_values, compression, page_checksum_verify); + this->data_buffer_.clear(); + + values_read_ = 0; + while (values_read_ < total_values) { + int64_t values_read_recently = 0; + 
reader_->ReadBatch( + static_cast(this->values_out_.size()) - static_cast(values_read_), + definition_levels_out_.data() + values_read_, + repetition_levels_out_.data() + values_read_, + this->values_out_ptr_ + values_read_, &values_read_recently); + + // Compute the total length of the data + int64_t total_length = 0; + for (int64_t i = 0; i < values_read_recently; i++) { + total_length += this->values_out_[i + values_read_].len; + } + + // Copy contents of the pointers + std::vector data(total_length); + uint8_t* data_ptr = data.data(); + for (int64_t i = 0; i < values_read_recently; i++) { + const ByteArray& value = this->values_out_ptr_[i + values_read_]; + memcpy(data_ptr, value.ptr, value.len); + this->values_out_[i + values_read_].ptr = data_ptr; + data_ptr += value.len; + } + data_buffer_.emplace_back(std::move(data)); + + values_read_ += values_read_recently; + } + this->SyncValuesOut(); +} + typedef ::testing::Types TestTypes; @@ -2075,5 +2116,164 @@ TEST_F(TestGeometryValuesWriter, TestWriteAndReadAllNull) { EXPECT_EQ(geospatial_statistics->geometry_types(), std::nullopt); } +template +class TestColumnWriterMaxRowsPerPage : public TestPrimitiveWriter { + public: + TypedColumnWriter* BuildWriter( + int64_t max_rows_per_page = kDefaultMaxRowsPerPage, + int64_t page_size = kDefaultDataPageSize) { + this->sink_ = CreateOutputStream(); + this->writer_properties_ = WriterProperties::Builder() + .max_rows_per_page(max_rows_per_page) + ->data_pagesize(page_size) + ->enable_write_page_index() + ->build(); + file_writer_ = ParquetFileWriter::Open( + this->sink_, std::static_pointer_cast(this->schema_.schema_root()), + this->writer_properties_); + return static_cast*>( + file_writer_->AppendRowGroup()->NextColumn()); + } + + void CloseWriter() const { file_writer_->Close(); } + + void BuildReader() { + ASSERT_OK_AND_ASSIGN(auto buffer, this->sink_->Finish()); + file_reader_ = ParquetFileReader::Open( + std::make_shared<::arrow::io::BufferReader>(buffer), 
default_reader_properties()); + this->reader_ = std::static_pointer_cast>( + file_reader_->RowGroup(0)->Column(0)); + } + + void VerifyMaxRowsPerPage(int64_t max_rows_per_page) const { + auto file_meta = file_reader_->metadata(); + int64_t num_row_groups = file_meta->num_row_groups(); + ASSERT_EQ(num_row_groups, 1); + + auto page_index_reader = file_reader_->GetPageIndexReader(); + ASSERT_NE(page_index_reader, nullptr); + + auto row_group_page_index_reader = page_index_reader->RowGroup(0); + ASSERT_NE(row_group_page_index_reader, nullptr); + + auto offset_index = row_group_page_index_reader->GetOffsetIndex(0); + ASSERT_NE(offset_index, nullptr); + size_t num_pages = offset_index->page_locations().size(); + for (size_t j = 1; j < num_pages; ++j) { + int64_t page_rows = offset_index->page_locations()[j].first_row_index - + offset_index->page_locations()[j - 1].first_row_index; + EXPECT_LE(page_rows, max_rows_per_page); + } + if (num_pages != 0) { + int64_t last_page_rows = file_meta->RowGroup(0)->num_rows() - + offset_index->page_locations().back().first_row_index; + EXPECT_LE(last_page_rows, max_rows_per_page); + } + } + + private: + std::shared_ptr file_writer_; + std::shared_ptr file_reader_; +}; + +TYPED_TEST_SUITE(TestColumnWriterMaxRowsPerPage, TestTypes); + +TYPED_TEST(TestColumnWriterMaxRowsPerPage, Optional) { + for (int64_t max_rows_per_page : {1, 10, 100}) { + this->SetUpSchema(Repetition::OPTIONAL); + this->GenerateData(SMALL_SIZE); + std::vector definition_levels(SMALL_SIZE, 1); + definition_levels[1] = 0; + + auto writer = this->BuildWriter(max_rows_per_page); + writer->WriteBatch(this->values_.size(), definition_levels.data(), nullptr, + this->values_ptr_); + this->CloseWriter(); + + this->BuildReader(); + ASSERT_NO_FATAL_FAILURE(this->VerifyMaxRowsPerPage(max_rows_per_page)); + } +} + +TYPED_TEST(TestColumnWriterMaxRowsPerPage, OptionalSpaced) { + for (int64_t max_rows_per_page : {1, 10, 100}) { + this->SetUpSchema(Repetition::OPTIONAL); + + 
this->GenerateData(SMALL_SIZE); + std::vector definition_levels(SMALL_SIZE, 1); + std::vector valid_bits(::arrow::bit_util::BytesForBits(SMALL_SIZE), 255); + + definition_levels[SMALL_SIZE - 1] = 0; + ::arrow::bit_util::ClearBit(valid_bits.data(), SMALL_SIZE - 1); + definition_levels[1] = 0; + ::arrow::bit_util::ClearBit(valid_bits.data(), 1); + + auto writer = this->BuildWriter(max_rows_per_page); + writer->WriteBatchSpaced(this->values_.size(), definition_levels.data(), nullptr, + valid_bits.data(), 0, this->values_ptr_); + this->CloseWriter(); + + this->BuildReader(); + ASSERT_NO_FATAL_FAILURE(this->VerifyMaxRowsPerPage(max_rows_per_page)); + } +} + +TYPED_TEST(TestColumnWriterMaxRowsPerPage, Repeated) { + for (int64_t max_rows_per_page : {1, 10, 100}) { + this->SetUpSchema(Repetition::REPEATED); + + this->GenerateData(SMALL_SIZE); + std::vector definition_levels(SMALL_SIZE); + std::vector repetition_levels(SMALL_SIZE); + + // Generate levels to include variable-sized lists, null lists, and empty lists + for (int i = 0; i < SMALL_SIZE; i++) { + int list_length = (i % 5) + 1; + bool is_null = false; + if (i % 17 == 0) { + is_null = true; + list_length = 0; + } else if (i % 13 == 0) { + list_length = 0; + } + + if (is_null) { + definition_levels[i] = 0; + repetition_levels[i] = 0; + } else if (list_length == 0) { + definition_levels[i] = 1; + repetition_levels[i] = 0; + } else { + for (int j = 0; j < list_length && i + j < SMALL_SIZE; j++) { + definition_levels[i + j] = 1; + repetition_levels[i + j] = (j == 0) ? 
0 : 1; + } + i += list_length - 1; + } + } + + auto writer = this->BuildWriter(max_rows_per_page); + writer->WriteBatch(this->values_.size(), definition_levels.data(), + repetition_levels.data(), this->values_ptr_); + this->CloseWriter(); + + this->BuildReader(); + ASSERT_NO_FATAL_FAILURE(this->VerifyMaxRowsPerPage(max_rows_per_page)); + } +} + +TYPED_TEST(TestColumnWriterMaxRowsPerPage, RequiredLargeChunk) { + for (int64_t max_rows_per_page : {10, 100, 10000}) { + this->GenerateData(LARGE_SIZE); + + auto writer = this->BuildWriter(max_rows_per_page); + writer->WriteBatch(this->values_.size(), nullptr, nullptr, this->values_ptr_); + this->CloseWriter(); + + this->BuildReader(); + ASSERT_NO_FATAL_FAILURE(this->VerifyMaxRowsPerPage(max_rows_per_page)); + } +} + } // namespace test } // namespace parquet diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h index 7dd44586923..51b549df22e 100644 --- a/cpp/src/parquet/properties.h +++ b/cpp/src/parquet/properties.h @@ -155,8 +155,7 @@ class PARQUET_EXPORT ReaderProperties { ReaderProperties PARQUET_EXPORT default_reader_properties(); static constexpr int64_t kDefaultDataPageSize = 1024 * 1024; -/// FIXME: Switch the default value to 20000 will break UTs. -static constexpr int64_t kDefaultMaxRowsPerPage = 1000000; +static constexpr int64_t kDefaultMaxRowsPerPage = 20'000; static constexpr bool DEFAULT_IS_DICTIONARY_ENABLED = true; static constexpr int64_t DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT = kDefaultDataPageSize; static constexpr int64_t DEFAULT_WRITE_BATCH_SIZE = 1024; @@ -427,7 +426,7 @@ class PARQUET_EXPORT WriterProperties { } /// Specify the maximum number of rows per data page. - /// Default 1M rows. + /// Default 20K rows. 
Builder* max_rows_per_page(int64_t max_rows) { max_rows_per_page_ = max_rows; return this; diff --git a/cpp/src/parquet/size_statistics_test.cc b/cpp/src/parquet/size_statistics_test.cc index 90d6df57e7f..6e8cec9a130 100644 --- a/cpp/src/parquet/size_statistics_test.cc +++ b/cpp/src/parquet/size_statistics_test.cc @@ -140,6 +140,7 @@ class SizeStatisticsRoundTripTest : public ::testing::Test { auto writer_properties = WriterProperties::Builder() .max_row_group_length(max_row_group_length) ->data_pagesize(page_size) + ->max_rows_per_page(std::numeric_limits::max()) ->write_batch_size(write_batch_size) ->enable_write_page_index() ->enable_statistics() From 2abbfd20855156eb291c4f002b30af6025f88ef3 Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Wed, 27 Aug 2025 23:23:48 +0800 Subject: [PATCH 3/7] address comment --- cpp/src/parquet/column_writer.cc | 42 +++++++++++++-------------- cpp/src/parquet/column_writer_test.cc | 5 ++++ 2 files changed, 25 insertions(+), 22 deletions(-) diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc index 2bf6640e1ca..23229f74886 100644 --- a/cpp/src/parquet/column_writer.cc +++ b/cpp/src/parquet/column_writer.cc @@ -1152,29 +1152,31 @@ void ColumnWriterImpl::FlushBufferedDataPages() { template inline void DoInBatches(const int16_t* def_levels, const int16_t* rep_levels, - int64_t num_levels, int64_t batch_size, Action&& action, - bool pages_change_on_record_boundaries, int64_t max_rows_per_page, + int64_t num_levels, int64_t batch_size, int64_t max_rows_per_page, + bool pages_change_on_record_boundaries, Action&& action, GetBufferedRows&& curr_page_buffered_rows) { int64_t offset = 0; while (offset < num_levels) { int64_t min_batch_size = std::min(batch_size, num_levels - offset); - int64_t end_offset = num_levels; - int64_t check_page_limit_end_offset = -1; + int64_t end_offset = num_levels; // end offset of the current batch + int64_t check_page_limit_end_offset = -1; // offset to check page limit (if not -1) 
int64_t page_buffered_rows = curr_page_buffered_rows(); ARROW_DCHECK_LE(page_buffered_rows, max_rows_per_page); if (!rep_levels) { + // If rep_levels is null, then we are writing a non-repeated column. + // In this case, every record contains only one level. min_batch_size = std::min(min_batch_size, max_rows_per_page - page_buffered_rows); end_offset = offset + min_batch_size; check_page_limit_end_offset = end_offset; } else { - int64_t last_record_begin_offset = -1; // Iterate rep_levels to find the shortest sequence that ends before a record // boundary (i.e. rep_levels == 0) with a size no less than min_batch_size for (int64_t i = offset; i < num_levels; ++i) { if (rep_levels[i] == 0) { - last_record_begin_offset = i; + // Use the beginning of last record to check page limit. + check_page_limit_end_offset = i; if (i - offset >= min_batch_size || page_buffered_rows >= max_rows_per_page) { end_offset = i; break; @@ -1182,8 +1184,6 @@ inline void DoInBatches(const int16_t* def_levels, const int16_t* rep_levels, page_buffered_rows += 1; } } - // Use the beginning of last record to check page limit. 
- check_page_limit_end_offset = last_record_begin_offset; } ARROW_DCHECK_LE(offset, end_offset); @@ -1319,9 +1319,8 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, CheckDictionarySizeLimit(); }; DoInBatches(def_levels, rep_levels, num_values, properties_->write_batch_size(), - WriteChunk, pages_change_on_record_boundaries(), - properties_->max_rows_per_page(), - [this]() { return num_buffered_rows_; }); + properties_->max_rows_per_page(), pages_change_on_record_boundaries(), + WriteChunk, [this]() { return num_buffered_rows_; }); return value_offset; } @@ -1371,9 +1370,8 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, CheckDictionarySizeLimit(); }; DoInBatches(def_levels, rep_levels, num_values, properties_->write_batch_size(), - WriteChunk, pages_change_on_record_boundaries(), - properties_->max_rows_per_page(), - [this]() { return num_buffered_rows_; }); + properties_->max_rows_per_page(), pages_change_on_record_boundaries(), + WriteChunk, [this]() { return num_buffered_rows_; }); } Status WriteArrow(const int16_t* def_levels, const int16_t* rep_levels, @@ -2002,10 +2000,10 @@ Status TypedColumnWriterImpl::WriteArrowDictionary( return WriteDense(); } - PARQUET_CATCH_NOT_OK(DoInBatches( - def_levels, rep_levels, num_levels, properties_->write_batch_size(), - WriteIndicesChunk, pages_change_on_record_boundaries(), - properties_->max_rows_per_page(), [this]() { return num_buffered_rows_; })); + PARQUET_CATCH_NOT_OK( + DoInBatches(def_levels, rep_levels, num_levels, properties_->write_batch_size(), + properties_->max_rows_per_page(), pages_change_on_record_boundaries(), + WriteIndicesChunk, [this]() { return num_buffered_rows_; })); return Status::OK(); } @@ -2448,10 +2446,10 @@ Status TypedColumnWriterImpl::WriteArrowDense( value_offset += batch_num_spaced_values; }; - PARQUET_CATCH_NOT_OK(DoInBatches( - def_levels, rep_levels, num_levels, properties_->write_batch_size(), WriteChunk, - pages_change_on_record_boundaries(), 
properties_->max_rows_per_page(), - [this]() { return num_buffered_rows_; })); + PARQUET_CATCH_NOT_OK( + DoInBatches(def_levels, rep_levels, num_levels, properties_->write_batch_size(), + properties_->max_rows_per_page(), pages_change_on_record_boundaries(), + WriteChunk, [this]() { return num_buffered_rows_; })); return Status::OK(); } diff --git a/cpp/src/parquet/column_writer_test.cc b/cpp/src/parquet/column_writer_test.cc index 8269611f3c6..baa326f5a97 100644 --- a/cpp/src/parquet/column_writer_test.cc +++ b/cpp/src/parquet/column_writer_test.cc @@ -2159,16 +2159,21 @@ class TestColumnWriterMaxRowsPerPage : public TestPrimitiveWriter { auto offset_index = row_group_page_index_reader->GetOffsetIndex(0); ASSERT_NE(offset_index, nullptr); size_t num_pages = offset_index->page_locations().size(); + int64_t num_rows = 0; for (size_t j = 1; j < num_pages; ++j) { int64_t page_rows = offset_index->page_locations()[j].first_row_index - offset_index->page_locations()[j - 1].first_row_index; EXPECT_LE(page_rows, max_rows_per_page); + num_rows += page_rows; } if (num_pages != 0) { int64_t last_page_rows = file_meta->RowGroup(0)->num_rows() - offset_index->page_locations().back().first_row_index; EXPECT_LE(last_page_rows, max_rows_per_page); + num_rows += last_page_rows; } + + EXPECT_EQ(num_rows, file_meta->RowGroup(0)->num_rows()); } private: From 5a9c12cf2dd96d6ed1002afb6c612483372031f3 Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Thu, 28 Aug 2025 10:15:45 +0800 Subject: [PATCH 4/7] modify repeated test case --- cpp/src/parquet/column_writer_test.cc | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/cpp/src/parquet/column_writer_test.cc b/cpp/src/parquet/column_writer_test.cc index baa326f5a97..48cac04f071 100644 --- a/cpp/src/parquet/column_writer_test.cc +++ b/cpp/src/parquet/column_writer_test.cc @@ -2231,23 +2231,16 @@ TYPED_TEST(TestColumnWriterMaxRowsPerPage, Repeated) { std::vector definition_levels(SMALL_SIZE); std::vector 
repetition_levels(SMALL_SIZE); - // Generate levels to include variable-sized lists, null lists, and empty lists + // Generate levels to include variable-sized lists and empty lists for (int i = 0; i < SMALL_SIZE; i++) { int list_length = (i % 5) + 1; - bool is_null = false; - if (i % 17 == 0) { - is_null = true; - list_length = 0; - } else if (i % 13 == 0) { + if (i % 13 == 0 || i % 17 == 0) { list_length = 0; } - if (is_null) { + if (list_length == 0) { definition_levels[i] = 0; repetition_levels[i] = 0; - } else if (list_length == 0) { - definition_levels[i] = 1; - repetition_levels[i] = 0; } else { for (int j = 0; j < list_length && i + j < SMALL_SIZE; j++) { definition_levels[i + j] = 1; From 27833c46a53f6501002bd3e9bfd08aa76d6cd440 Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Sun, 12 Oct 2025 17:35:28 +0800 Subject: [PATCH 5/7] rename to max_batch_size --- cpp/src/parquet/column_writer.cc | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc index 23229f74886..396874b7f78 100644 --- a/cpp/src/parquet/column_writer.cc +++ b/cpp/src/parquet/column_writer.cc @@ -1157,7 +1157,7 @@ inline void DoInBatches(const int16_t* def_levels, const int16_t* rep_levels, GetBufferedRows&& curr_page_buffered_rows) { int64_t offset = 0; while (offset < num_levels) { - int64_t min_batch_size = std::min(batch_size, num_levels - offset); + int64_t max_batch_size = std::min(batch_size, num_levels - offset); int64_t end_offset = num_levels; // end offset of the current batch int64_t check_page_limit_end_offset = -1; // offset to check page limit (if not -1) @@ -1167,17 +1167,17 @@ inline void DoInBatches(const int16_t* def_levels, const int16_t* rep_levels, if (!rep_levels) { // If rep_levels is null, then we are writing a non-repeated column. // In this case, every record contains only one level. 
- min_batch_size = std::min(min_batch_size, max_rows_per_page - page_buffered_rows); - end_offset = offset + min_batch_size; + max_batch_size = std::min(max_batch_size, max_rows_per_page - page_buffered_rows); + end_offset = offset + max_batch_size; check_page_limit_end_offset = end_offset; } else { // Iterate rep_levels to find the shortest sequence that ends before a record - // boundary (i.e. rep_levels == 0) with a size no less than min_batch_size + // boundary (i.e. rep_levels == 0) with a size no less than max_batch_size for (int64_t i = offset; i < num_levels; ++i) { if (rep_levels[i] == 0) { // Use the beginning of last record to check page limit. check_page_limit_end_offset = i; - if (i - offset >= min_batch_size || page_buffered_rows >= max_rows_per_page) { + if (i - offset >= max_batch_size || page_buffered_rows >= max_rows_per_page) { end_offset = i; break; } From 547c4fb233b6b0bf7a3f0ef0d2d13e4f814381ac Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Sun, 12 Oct 2025 21:33:44 +0800 Subject: [PATCH 6/7] split DoInBatches --- cpp/src/parquet/column_writer.cc | 79 +++++++++++++++++++++++--------- 1 file changed, 57 insertions(+), 22 deletions(-) diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc index 396874b7f78..0870e9ead32 100644 --- a/cpp/src/parquet/column_writer.cc +++ b/cpp/src/parquet/column_writer.cc @@ -1150,11 +1150,38 @@ void ColumnWriterImpl::FlushBufferedDataPages() { // ---------------------------------------------------------------------- // TypedColumnWriter +// DoInBatches for non-repeated columns template -inline void DoInBatches(const int16_t* def_levels, const int16_t* rep_levels, - int64_t num_levels, int64_t batch_size, int64_t max_rows_per_page, - bool pages_change_on_record_boundaries, Action&& action, - GetBufferedRows&& curr_page_buffered_rows) { +inline void DoInBatchesNonRepeated(int64_t num_levels, int64_t batch_size, + int64_t max_rows_per_page, Action&& action, + GetBufferedRows&& 
curr_page_buffered_rows) { + int64_t offset = 0; + while (offset < num_levels) { + int64_t page_buffered_rows = curr_page_buffered_rows(); + ARROW_DCHECK_LE(page_buffered_rows, max_rows_per_page); + + // Every record contains only one level. + int64_t max_batch_size = std::min(batch_size, num_levels - offset); + max_batch_size = std::min(max_batch_size, max_rows_per_page - page_buffered_rows); + int64_t end_offset = offset + max_batch_size; + + ARROW_DCHECK_LE(offset, end_offset); + ARROW_DCHECK_LE(end_offset, num_levels); + + // Always check page limit for non-repeated columns. + action(offset, end_offset - offset, /*check_page_limit=*/true); + + offset = end_offset; + } +} + +// DoInBatches for repeated columns +template +inline void DoInBatchesRepeated(const int16_t* def_levels, const int16_t* rep_levels, + int64_t num_levels, int64_t batch_size, + int64_t max_rows_per_page, + bool pages_change_on_record_boundaries, Action&& action, + GetBufferedRows&& curr_page_buffered_rows) { int64_t offset = 0; while (offset < num_levels) { int64_t max_batch_size = std::min(batch_size, num_levels - offset); @@ -1164,25 +1191,17 @@ inline void DoInBatches(const int16_t* def_levels, const int16_t* rep_levels, int64_t page_buffered_rows = curr_page_buffered_rows(); ARROW_DCHECK_LE(page_buffered_rows, max_rows_per_page); - if (!rep_levels) { - // If rep_levels is null, then we are writing a non-repeated column. - // In this case, every record contains only one level. - max_batch_size = std::min(max_batch_size, max_rows_per_page - page_buffered_rows); - end_offset = offset + max_batch_size; - check_page_limit_end_offset = end_offset; - } else { - // Iterate rep_levels to find the shortest sequence that ends before a record - // boundary (i.e. rep_levels == 0) with a size no less than max_batch_size - for (int64_t i = offset; i < num_levels; ++i) { - if (rep_levels[i] == 0) { - // Use the beginning of last record to check page limit. 
- check_page_limit_end_offset = i; - if (i - offset >= max_batch_size || page_buffered_rows >= max_rows_per_page) { - end_offset = i; - break; - } - page_buffered_rows += 1; + // Iterate rep_levels to find the shortest sequence that ends before a record + // boundary (i.e. rep_levels == 0) with a size no less than max_batch_size + for (int64_t i = offset; i < num_levels; ++i) { + if (rep_levels[i] == 0) { + // Use the beginning of last record to check page limit. + check_page_limit_end_offset = i; + if (i - offset >= max_batch_size || page_buffered_rows >= max_rows_per_page) { + end_offset = i; + break; } + page_buffered_rows += 1; } } @@ -1212,6 +1231,22 @@ inline void DoInBatches(const int16_t* def_levels, const int16_t* rep_levels, } } +template +inline void DoInBatches(const int16_t* def_levels, const int16_t* rep_levels, + int64_t num_levels, int64_t batch_size, int64_t max_rows_per_page, + bool pages_change_on_record_boundaries, Action&& action, + GetBufferedRows&& curr_page_buffered_rows) { + if (!rep_levels) { + DoInBatchesNonRepeated(num_levels, batch_size, max_rows_per_page, + std::forward(action), + std::forward(curr_page_buffered_rows)); + } else { + DoInBatchesRepeated(def_levels, rep_levels, num_levels, batch_size, max_rows_per_page, + pages_change_on_record_boundaries, std::forward(action), + std::forward(curr_page_buffered_rows)); + } +} + namespace { bool DictionaryDirectWriteSupported(const ::arrow::Array& array) { From a41572f579bdf6e34c1c914492bad5cacdd9bf3e Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Thu, 23 Oct 2025 22:21:57 +0800 Subject: [PATCH 7/7] address feedback --- cpp/src/parquet/column_writer.cc | 26 +++++++++++--------------- 1 file changed, 11 insertions(+), 15 deletions(-) diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc index 0870e9ead32..22c36531cdb 100644 --- a/cpp/src/parquet/column_writer.cc +++ b/cpp/src/parquet/column_writer.cc @@ -1208,23 +1208,19 @@ inline void DoInBatchesRepeated(const 
int16_t* def_levels, const int16_t* rep_le ARROW_DCHECK_LE(offset, end_offset); ARROW_DCHECK_LE(check_page_limit_end_offset, end_offset); - if (end_offset < num_levels) { - // This is not the last chunk of batch and end_offset is a record boundary. + if (check_page_limit_end_offset >= 0) { + // At least one record boundary is included in this batch. // It is a good chance to check the page limit. - action(offset, end_offset - offset, /*check_page_limit=*/true); - } else { + action(offset, check_page_limit_end_offset - offset, /*check_page_limit=*/true); + offset = check_page_limit_end_offset; + } + if (end_offset > offset) { + // This is the last chunk of batch, and we do not know whether end_offset is a + // record boundary so we cannot check page limit if pages cannot change on + // record boundaries. ARROW_DCHECK_EQ(end_offset, num_levels); - if (offset <= check_page_limit_end_offset) { - action(offset, check_page_limit_end_offset - offset, /*check_page_limit=*/true); - offset = check_page_limit_end_offset; - } - if (offset < end_offset) { - // This is the last chunk of batch, and we do not know whether end_offset is a - // record boundary so we cannot check page limit if pages cannot change on - // record boundaries. - action(offset, end_offset - offset, - /*check_page_limit=*/!pages_change_on_record_boundaries); - } + action(offset, end_offset - offset, + /*check_page_limit=*/!pages_change_on_record_boundaries); } offset = end_offset;