From fc893827e6bcf07ea68bedc53f649f470202e306 Mon Sep 17 00:00:00 2001 From: Micah Kornfield Date: Tue, 9 Jan 2024 07:56:50 +0000 Subject: [PATCH 1/8] GH-39527: [C++][Parquet] Validate page sizes before truncated to int32 Be defensive instead of writing invalid data. --- cpp/src/parquet/column_writer.cc | 17 ++++++++++-- cpp/src/parquet/column_writer_test.cc | 40 +++++++++++++++++++++++++++ 2 files changed, 55 insertions(+), 2 deletions(-) diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc index 12b2837fbfd..04f2f9b9601 100644 --- a/cpp/src/parquet/column_writer.cc +++ b/cpp/src/parquet/column_writer.cc @@ -271,7 +271,10 @@ class SerializedPageWriter : public PageWriter { } int64_t WriteDictionaryPage(const DictionaryPage& page) override { - int64_t uncompressed_size = page.size(); + int64_t uncompressed_size = page.buffer()->size(); + if (uncompressed_size > std::numeric_limits::max()) { + throw ParquetException("Uncompressed page size overflows to INT32_MAX."); + } std::shared_ptr compressed_data; if (has_compressor()) { auto buffer = std::static_pointer_cast( @@ -288,6 +291,9 @@ class SerializedPageWriter : public PageWriter { dict_page_header.__set_is_sorted(page.is_sorted()); const uint8_t* output_data_buffer = compressed_data->data(); + if (compressed_data->size() > std::numeric_limits::max()) { + throw ParquetException("Compressed page size overflows to INT32_MAX."); + } int32_t output_data_len = static_cast(compressed_data->size()); if (data_encryptor_.get()) { @@ -371,7 +377,7 @@ class SerializedPageWriter : public PageWriter { const int64_t uncompressed_size = page.uncompressed_size(); std::shared_ptr compressed_data = page.buffer(); const uint8_t* output_data_buffer = compressed_data->data(); - int32_t output_data_len = static_cast(compressed_data->size()); + int64_t output_data_len = compressed_data->size(); if (data_encryptor_.get()) { PARQUET_THROW_NOT_OK(encryption_buffer_->Resize( @@ -383,7 +389,14 @@ class SerializedPageWriter : public PageWriter { } format::PageHeader page_header; + + if (uncompressed_size > std::numeric_limits::max()) { + throw ParquetException("Uncompressed page size overflows to INT32_MAX."); + } page_header.__set_uncompressed_page_size(static_cast(uncompressed_size)); + if (output_data_len > std::numeric_limits::max()) { + throw ParquetException("Compressed page size overflows to INT32_MAX."); + } page_header.__set_compressed_page_size(static_cast(output_data_len)); if (page_checksum_verification_) { diff --git a/cpp/src/parquet/column_writer_test.cc b/cpp/src/parquet/column_writer_test.cc index 59fc848d7fd..5d8eacf5880 100644 --- a/cpp/src/parquet/column_writer_test.cc +++ b/cpp/src/parquet/column_writer_test.cc @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +#include #include #include @@ -36,6 +37,7 @@ #include "parquet/test_util.h" #include "parquet/thrift_internal.h" #include "parquet/types.h" +#include "third_party/parquet_cpp/src2/parquet/column_page.h" namespace bit_util = arrow::bit_util; @@ -889,6 +891,44 @@ TEST_F(TestByteArrayValuesWriter, CheckDefaultStats) { ASSERT_TRUE(this->metadata_is_stats_set()); } +TEST(TestPageWriter, ThrowsOnPagesToLarge) { + NodePtr item = schema::Int32("item"); // optional item + NodePtr list(GroupNode::Make("b", Repetition::REPEATED, {item}, ConvertedType::LIST)); + NodePtr bag(GroupNode::Make("bag", Repetition::OPTIONAL, {list})); // optional list + std::vector fields = {bag}; + NodePtr root = GroupNode::Make("schema", Repetition::REPEATED, fields); + + SchemaDescriptor schema; + schema.Init(root); + + auto sink = CreateOutputStream(); + auto props = WriterProperties::Builder().build(); + + auto metadata = ColumnChunkMetaDataBuilder::Make(props, schema.Column(0)); + std::unique_ptr pager = + PageWriter::Open(sink, Compression::UNCOMPRESSED, + Codec::UseDefaultCompressionLevel(), metadata.get()); + + uint8_t data; + std::shared_ptr buffer = + std::make_shared(&data, std::numeric_limits::max() + int64_t{1}); + DataPageV1 over_compressed_limit(buffer, /*num_values=*/100, Encoding::BIT_PACKED, + Encoding::BIT_PACKED, Encoding::BIT_PACKED, + /*uncompressed_size=*/100); + EXPECT_THROW(pager->WriteDataPage(over_compressed_limit), ParquetException); + DictionaryPage dictionary_over_compressed_limit(buffer, /*num_values=*/100, + Encoding::PLAIN); + EXPECT_THROW(pager->WriteDictionaryPage(dictionary_over_compressed_limit), + ParquetException); + + buffer = std::make_shared(&data, 1); + DataPageV1 over_uncompressed_limit( + buffer, /*num_values=*/100, Encoding::BIT_PACKED, Encoding::BIT_PACKED, + Encoding::BIT_PACKED, + /*uncompressed_size=*/std::numeric_limits::max() + int64_t{1}); + EXPECT_THROW(pager->WriteDataPage(over_uncompressed_limit), ParquetException); +} + TEST(TestColumnWriter, RepeatedListsUpdateSpacedBug) { // In ARROW-3930 we discovered a bug when writing from Arrow when we had data // that looks like this: From 69b5cbbb869d051699839e73658683ce5a860806 Mon Sep 17 00:00:00 2001 From: emkornfield Date: Tue, 9 Jan 2024 00:09:21 -0800 Subject: [PATCH 2/8] fix include --- cpp/src/parquet/column_writer_test.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/parquet/column_writer_test.cc b/cpp/src/parquet/column_writer_test.cc index 5d8eacf5880..a1245d98a60 100644 --- a/cpp/src/parquet/column_writer_test.cc +++ b/cpp/src/parquet/column_writer_test.cc @@ -26,6 +26,7 @@ #include "arrow/util/bit_util.h" #include "arrow/util/bitmap_builders.h" +#include "parquet/column_page.h" #include "parquet/column_reader.h" #include "parquet/column_writer.h" #include "parquet/file_reader.h" @@ -37,7 +38,6 @@ #include "parquet/test_util.h" #include "parquet/thrift_internal.h" #include "parquet/types.h" -#include "third_party/parquet_cpp/src2/parquet/column_page.h" namespace bit_util = arrow::bit_util; From 2f9c906c2f3b5749a6342719a6132f55c133c2d3 Mon Sep 17 00:00:00 2001 From: emkornfield Date: Tue, 9 Jan 2024 12:54:01 -0800 Subject: [PATCH 3/8] Update cpp/src/parquet/column_writer_test.cc Co-authored-by: Gang Wu --- cpp/src/parquet/column_writer_test.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/parquet/column_writer_test.cc b/cpp/src/parquet/column_writer_test.cc index a1245d98a60..e7400310ca9 100644 --- a/cpp/src/parquet/column_writer_test.cc +++ b/cpp/src/parquet/column_writer_test.cc @@ -891,7 +891,7 @@ TEST_F(TestByteArrayValuesWriter, CheckDefaultStats) { ASSERT_TRUE(this->metadata_is_stats_set()); } -TEST(TestPageWriter, ThrowsOnPagesToLarge) { +TEST(TestPageWriter, ThrowsOnPagesTooLarge) { NodePtr item = schema::Int32("item"); // optional item NodePtr list(GroupNode::Make("b", Repetition::REPEATED, {item}, ConvertedType::LIST)); NodePtr bag(GroupNode::Make("bag", Repetition::OPTIONAL, {list})); // optional list From e7fb451f183e52a4e96bdaa1b24dccc87e4dd41d Mon Sep 17 00:00:00 2001 From: Micah Kornfield Date: Tue, 9 Jan 2024 21:10:09 +0000 Subject: [PATCH 4/8] move check earlier and add missing cast --- cpp/src/parquet/column_writer.cc | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc index 04f2f9b9601..3ea43633693 100644 --- a/cpp/src/parquet/column_writer.cc +++ b/cpp/src/parquet/column_writer.cc @@ -379,11 +379,16 @@ class SerializedPageWriter : public PageWriter { const uint8_t* output_data_buffer = compressed_data->data(); int64_t output_data_len = compressed_data->size(); + if (output_data_len > std::numeric_limits::max()) { + throw ParquetException("Compressed page size overflows to INT32_MAX."); + } + if (data_encryptor_.get()) { PARQUET_THROW_NOT_OK(encryption_buffer_->Resize( data_encryptor_->CiphertextSizeDelta() + output_data_len, false)); UpdateEncryption(encryption::kDataPage); - output_data_len = data_encryptor_->Encrypt(compressed_data->data(), output_data_len, + output_data_len = data_encryptor_->Encrypt(compressed_data->data(), + static_cast(output_data_len), encryption_buffer_->mutable_data()); output_data_buffer = encryption_buffer_->data(); } @@ -394,9 +399,6 @@ class SerializedPageWriter : public PageWriter { throw ParquetException("Uncompressed page size overflows to INT32_MAX."); } page_header.__set_uncompressed_page_size(static_cast(uncompressed_size)); - if (output_data_len > std::numeric_limits::max()) { - throw ParquetException("Compressed page size overflows to INT32_MAX."); - } page_header.__set_compressed_page_size(static_cast(output_data_len)); if (page_checksum_verification_) { From df8ca6ce6c10599eb7eb874969056b6b68f8010a Mon Sep 17 00:00:00 2001 From: Micah Kornfield Date: Mon, 22 Jan 2024 21:40:42 +0000 Subject: [PATCH 5/8] address feedback --- cpp/src/parquet/column_writer.cc | 14 ++++++++++---- cpp/src/parquet/column_writer_test.cc | 13 +++++++++---- 2 files changed, 19 insertions(+), 8 deletions(-) diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc index 3ea43633693..e81827dd6a1 100644 --- a/cpp/src/parquet/column_writer.cc +++ b/cpp/src/parquet/column_writer.cc @@ -273,7 +273,9 @@ class SerializedPageWriter : public PageWriter { int64_t WriteDictionaryPage(const DictionaryPage& page) override { int64_t uncompressed_size = page.buffer()->size(); if (uncompressed_size > std::numeric_limits::max()) { - throw ParquetException("Uncompressed page size overflows to INT32_MAX."); + throw ParquetException( + "Uncompressed dictionary page size overflows to INT32_MAX. Size:", + uncompressed_size); } std::shared_ptr compressed_data; if (has_compressor()) { @@ -292,7 +294,9 @@ class SerializedPageWriter : public PageWriter { const uint8_t* output_data_buffer = compressed_data->data(); if (compressed_data->size() > std::numeric_limits::max()) { - throw ParquetException("Compressed page size overflows to INT32_MAX."); + throw ParquetException( + "Compressed dictionary page size overflows to INT32_MAX. Size: ", + uncompressed_size); } int32_t output_data_len = static_cast(compressed_data->size()); @@ -380,7 +384,8 @@ class SerializedPageWriter : public PageWriter { int64_t output_data_len = compressed_data->size(); if (output_data_len > std::numeric_limits::max()) { - throw ParquetException("Compressed page size overflows to INT32_MAX."); + throw ParquetException("Compressed data page size overflows to INT32_MAX. Size:", + output_data_len); } if (data_encryptor_.get()) { @@ -396,7 +401,8 @@ class SerializedPageWriter : public PageWriter { format::PageHeader page_header; if (uncompressed_size > std::numeric_limits::max()) { - throw ParquetException("Uncompressed page size overflows to INT32_MAX."); + throw ParquetException("Uncompressed data page size overflows to INT32_MAX. Size:", + uncompressed_size); } page_header.__set_uncompressed_page_size(static_cast(uncompressed_size)); page_header.__set_compressed_page_size(static_cast(output_data_len)); diff --git a/cpp/src/parquet/column_writer_test.cc b/cpp/src/parquet/column_writer_test.cc index e7400310ca9..b5304f9768b 100644 --- a/cpp/src/parquet/column_writer_test.cc +++ b/cpp/src/parquet/column_writer_test.cc @@ -19,6 +19,7 @@ #include #include +#include #include #include "arrow/io/buffered.h" @@ -481,6 +482,9 @@ using TestValuesWriterInt64Type = TestPrimitiveWriter; using TestByteArrayValuesWriter = TestPrimitiveWriter; using TestFixedLengthByteArrayValuesWriter = TestPrimitiveWriter; +using ::testing::HasSubstr; +using ::testing::ThrowsMessage; + TYPED_TEST(TestPrimitiveWriter, RequiredPlain) { this->TestRequiredWithEncoding(Encoding::PLAIN); } @@ -906,8 +910,7 @@ TEST(TestPageWriter, ThrowsOnPagesTooLarge) { auto metadata = ColumnChunkMetaDataBuilder::Make(props, schema.Column(0)); std::unique_ptr pager = - PageWriter::Open(sink, Compression::UNCOMPRESSED, - Codec::UseDefaultCompressionLevel(), metadata.get()); + PageWriter::Open(sink, Compression::UNCOMPRESSED, metadata.get()); uint8_t data; std::shared_ptr buffer = @@ -915,7 +918,8 @@ TEST(TestPageWriter, ThrowsOnPagesTooLarge) { DataPageV1 over_compressed_limit(buffer, /*num_values=*/100, Encoding::BIT_PACKED, Encoding::BIT_PACKED, Encoding::BIT_PACKED, /*uncompressed_size=*/100); - EXPECT_THROW(pager->WriteDataPage(over_compressed_limit), ParquetException); + EXPECT_THAT([&]() { pager->WriteDataPage(over_compressed_limit); }, + ThrowsMessage(HasSubstr("overflows to INT32_MAX"))); DictionaryPage dictionary_over_compressed_limit(buffer, /*num_values=*/100, Encoding::PLAIN); EXPECT_THROW(pager->WriteDictionaryPage(dictionary_over_compressed_limit), @@ -926,7 +930,8 @@ TEST(TestPageWriter, ThrowsOnPagesTooLarge) { buffer, /*num_values=*/100, Encoding::BIT_PACKED, Encoding::BIT_PACKED, Encoding::BIT_PACKED, /*uncompressed_size=*/std::numeric_limits::max() + int64_t{1}); - EXPECT_THROW(pager->WriteDataPage(over_uncompressed_limit), ParquetException); + EXPECT_THAT([&]() { pager->WriteDataPage(over_compressed_limit); }, + ThrowsMessage(HasSubstr("overflows to INT32_MAX"))); } TEST(TestColumnWriter, RepeatedListsUpdateSpacedBug) { From 7d28e6a8357aac5d24bff1625fd1c31b1b882dfb Mon Sep 17 00:00:00 2001 From: emkornfield Date: Wed, 24 Jan 2024 21:51:01 -0800 Subject: [PATCH 6/8] Update cpp/src/parquet/column_writer.cc Co-authored-by: Antoine Pitrou --- cpp/src/parquet/column_writer.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc index e81827dd6a1..19ae5afb68b 100644 --- a/cpp/src/parquet/column_writer.cc +++ b/cpp/src/parquet/column_writer.cc @@ -274,7 +274,7 @@ class SerializedPageWriter : public PageWriter { int64_t uncompressed_size = page.buffer()->size(); if (uncompressed_size > std::numeric_limits::max()) { throw ParquetException( - "Uncompressed dictionary page size overflows to INT32_MAX. Size:", + "Uncompressed dictionary page size overflows INT32_MAX. Size:", uncompressed_size); } std::shared_ptr compressed_data; From 3c57779bd269455d3698cda3e75a1e916ed517cd Mon Sep 17 00:00:00 2001 From: emkornfield Date: Wed, 24 Jan 2024 21:52:50 -0800 Subject: [PATCH 7/8] update error message --- cpp/src/parquet/column_writer.cc | 8 ++++---- cpp/src/parquet/column_writer_test.cc | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc index 19ae5afb68b..23366b2daaf 100644 --- a/cpp/src/parquet/column_writer.cc +++ b/cpp/src/parquet/column_writer.cc @@ -295,7 +295,7 @@ class SerializedPageWriter : public PageWriter { const uint8_t* output_data_buffer = compressed_data->data(); if (compressed_data->size() > std::numeric_limits::max()) { throw ParquetException( - "Compressed dictionary page size overflows to INT32_MAX. Size: ", + "Compressed dictionary page size overflows INT32_MAX. Size: ", uncompressed_size); } int32_t output_data_len = static_cast(compressed_data->size()); @@ -384,7 +384,7 @@ class SerializedPageWriter : public PageWriter { int64_t output_data_len = compressed_data->size(); if (output_data_len > std::numeric_limits::max()) { - throw ParquetException("Compressed data page size overflows to INT32_MAX. Size:", + throw ParquetException("Compressed data page size overflows INT32_MAX. Size:", output_data_len); } @@ -401,7 +401,7 @@ class SerializedPageWriter : public PageWriter { format::PageHeader page_header; if (uncompressed_size > std::numeric_limits::max()) { - throw ParquetException("Uncompressed data page size overflows to INT32_MAX. Size:", + throw ParquetException("Uncompressed data page size overflows INT32_MAX. Size:", uncompressed_size); } page_header.__set_uncompressed_page_size(static_cast(uncompressed_size)); @@ -442,7 +442,7 @@ class SerializedPageWriter : public PageWriter { if (offset_index_builder_ != nullptr) { const int64_t compressed_size = output_data_len + header_size; if (compressed_size > std::numeric_limits::max()) { - throw ParquetException("Compressed page size overflows to INT32_MAX."); + throw ParquetException("Compressed page size overflows INT32_MAX."); } if (!page.first_row_index().has_value()) { throw ParquetException("First row index is not set in data page."); diff --git a/cpp/src/parquet/column_writer_test.cc b/cpp/src/parquet/column_writer_test.cc index b5304f9768b..3b9f3ac6fe6 100644 --- a/cpp/src/parquet/column_writer_test.cc +++ b/cpp/src/parquet/column_writer_test.cc @@ -919,7 +919,7 @@ TEST(TestPageWriter, ThrowsOnPagesTooLarge) { Encoding::BIT_PACKED, Encoding::BIT_PACKED, /*uncompressed_size=*/100); EXPECT_THAT([&]() { pager->WriteDataPage(over_compressed_limit); }, - ThrowsMessage(HasSubstr("overflows to INT32_MAX"))); + ThrowsMessage(HasSubstr("overflows INT32_MAX"))); DictionaryPage dictionary_over_compressed_limit(buffer, /*num_values=*/100, Encoding::PLAIN); EXPECT_THROW(pager->WriteDictionaryPage(dictionary_over_compressed_limit), @@ -931,7 +931,7 @@ TEST(TestPageWriter, ThrowsOnPagesTooLarge) { Encoding::BIT_PACKED, /*uncompressed_size=*/std::numeric_limits::max() + int64_t{1}); EXPECT_THAT([&]() { pager->WriteDataPage(over_compressed_limit); }, - ThrowsMessage(HasSubstr("overflows to INT32_MAX"))); + ThrowsMessage(HasSubstr("overflows INT32_MAX"))); } TEST(TestColumnWriter, RepeatedListsUpdateSpacedBug) { From 96f704737238ac64da839172462594d07fad4405 Mon Sep 17 00:00:00 2001 From: mwish Date: Sat, 27 Jan 2024 12:50:48 +0800 Subject: [PATCH 8/8] Update cpp/src/parquet/column_writer_test.cc --- cpp/src/parquet/column_writer_test.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/parquet/column_writer_test.cc b/cpp/src/parquet/column_writer_test.cc index 3b9f3ac6fe6..97421629d2c 100644 --- a/cpp/src/parquet/column_writer_test.cc +++ b/cpp/src/parquet/column_writer_test.cc @@ -922,8 +922,8 @@ TEST(TestPageWriter, ThrowsOnPagesTooLarge) { ThrowsMessage(HasSubstr("overflows INT32_MAX"))); DictionaryPage dictionary_over_compressed_limit(buffer, /*num_values=*/100, Encoding::PLAIN); - EXPECT_THROW(pager->WriteDictionaryPage(dictionary_over_compressed_limit), - ParquetException); + EXPECT_THAT([&]() { pager->WriteDictionaryPage(dictionary_over_compressed_limit); }, + ThrowsMessage(HasSubstr("overflows INT32_MAX"))); buffer = std::make_shared(&data, 1); DataPageV1 over_uncompressed_limit(