From ab580a0111919a3dcdf2687a7b934d30b1c304e3 Mon Sep 17 00:00:00 2001 From: Adam Reeve Date: Wed, 9 Jul 2025 12:56:56 +1200 Subject: [PATCH 1/2] Fix pages not being split for repeated columns --- cpp/src/parquet/column_writer.cc | 7 ++-- cpp/src/parquet/column_writer_test.cc | 57 ++++++++++++++++++++++++++- 2 files changed, 60 insertions(+), 4 deletions(-) diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc index acce754c87e..a56faa3aaef 100644 --- a/cpp/src/parquet/column_writer.cc +++ b/cpp/src/parquet/column_writer.cc @@ -1176,7 +1176,7 @@ inline void DoInBatches(const int16_t* def_levels, const int16_t* rep_levels, while (offset < num_levels) { int64_t end_offset = std::min(offset + batch_size, num_levels); - // Find next record boundary (i.e. ref_level = 0) + // Find next record boundary (i.e. rep_level = 0) while (end_offset < num_levels && rep_levels[end_offset] != 0) { end_offset++; } @@ -1196,13 +1196,14 @@ inline void DoInBatches(const int16_t* def_levels, const int16_t* rep_levels, last_record_begin_offset--; } - if (offset < last_record_begin_offset) { + if (offset <= last_record_begin_offset) { // We have found the beginning of last record and can check page size. action(offset, last_record_begin_offset - offset, /*check_page_size=*/true); offset = last_record_begin_offset; } - // There is no record boundary in this chunk and cannot check page size. + // Write remaining data after the record boundary, + // or all data if no boundary was found. action(offset, end_offset - offset, /*check_page_size=*/false); } diff --git a/cpp/src/parquet/column_writer_test.cc b/cpp/src/parquet/column_writer_test.cc index 4427950d620..024801a6072 100644 --- a/cpp/src/parquet/column_writer_test.cc +++ b/cpp/src/parquet/column_writer_test.cc @@ -108,7 +108,7 @@ class TestPrimitiveWriter : public PrimitiveTypedTest { const ColumnProperties& column_properties = ColumnProperties(), const ParquetVersion::type version = ParquetVersion::PARQUET_1_0, const ParquetDataPageVersion data_page_version = ParquetDataPageVersion::V1, - bool enable_checksum = false) { + bool enable_checksum = false, int64_t page_size = kDefaultDataPageSize) { sink_ = CreateOutputStream(); WriterProperties::Builder wp_builder; wp_builder.version(version)->data_page_version(data_page_version); @@ -124,6 +124,7 @@ class TestPrimitiveWriter : public PrimitiveTypedTest { wp_builder.enable_page_checksum(); } wp_builder.max_statistics_size(column_properties.max_statistics_size()); + wp_builder.data_pagesize(page_size); writer_properties_ = wp_builder.build(); metadata_ = ColumnChunkMetaDataBuilder::Make(writer_properties_, this->descr_); @@ -938,6 +939,60 @@ TEST_F(TestByteArrayValuesWriter, CheckDefaultStats) { ASSERT_TRUE(this->metadata_is_stats_set()); } +// Test for https://github.com/apache/arrow/issues/47027. +// When writing a repeated column with page indexes enabled +// and batches that are aligned with list boundaries, +// pages should be written after reaching the page limit. +TEST_F(TestValuesWriterInt32Type, PagesSplitWithListAlignedWrites) { + this->SetUpSchema(Repetition::REPEATED); + + constexpr int list_length = 10; + constexpr int num_rows = 100; + constexpr int64_t page_size = sizeof(int32_t) * 100; + + this->GenerateData(num_rows * list_length); + + std::vector repetition_levels(list_length, 1); + repetition_levels[0] = 0; + + ColumnProperties column_properties; + column_properties.set_dictionary_enabled(false); + column_properties.set_encoding(Encoding::PLAIN); + column_properties.set_page_index_enabled(true); + + auto writer = + this->BuildWriter(list_length, column_properties, ParquetVersion::PARQUET_1_0, + ParquetDataPageVersion::V1, false, page_size); + + int64_t pages_written = 0; + int64_t prev_bytes_written = 0; + + for (int row_idx = 0; row_idx < num_rows; ++row_idx) { + writer->WriteBatch(list_length, def_levels_.data(), repetition_levels.data(), + values_ptr_ + row_idx * list_length); + + int64_t bytes_written = writer->total_bytes_written(); + if (bytes_written != prev_bytes_written) { + pages_written++; + prev_bytes_written = bytes_written; + } + // Buffered bytes shouldn't grow larger than the specified page size + ASSERT_LE(writer->estimated_buffered_value_bytes(), page_size); + } + + writer->Close(); + + // pages_written doesn't include the last page written when closing the writer: + ASSERT_EQ(pages_written, 9); + + this->SetupValuesOut(num_rows * list_length); + definition_levels_out_.resize(num_rows * list_length); + repetition_levels_out_.resize(num_rows * list_length); + this->ReadColumnFully(); + + ASSERT_EQ(values_out_, values_); +} + TEST(TestPageWriter, ThrowsOnPagesTooLarge) { NodePtr item = schema::Int32("item"); // optional item NodePtr list(GroupNode::Make("b", Repetition::REPEATED, {item}, ConvertedType::LIST)); From 6f677e50ec19c39e27e33787829b1997ba945e13 Mon Sep 17 00:00:00 2001 From: Adam Reeve Date: Wed, 9 Jul 2025 13:16:45 +1200 Subject: [PATCH 2/2] Fix existing test --- cpp/src/parquet/column_writer_test.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/parquet/column_writer_test.cc b/cpp/src/parquet/column_writer_test.cc index 024801a6072..990125df4e3 100644 --- a/cpp/src/parquet/column_writer_test.cc +++ b/cpp/src/parquet/column_writer_test.cc @@ -1536,7 +1536,7 @@ TEST(TestColumnWriter, WriteDataPagesChangeOnRecordBoundariesWithSmallBatches) { auto row_group_reader = file_reader->RowGroup(0); // Check if pages are changed on record boundaries. - const std::array expect_num_pages_by_col = {5, 201, 397, 201}; + const std::array expect_num_pages_by_col = {5, 201, 397, 400}; const std::array expect_num_rows_1st_page_by_col = {99, 1, 1, 1}; const std::array expect_num_vals_1st_page_by_col = {99, 50, 99, 150}; for (int32_t i = 0; i < num_cols; ++i) {