Merged
29 changes: 25 additions & 4 deletions cpp/src/parquet/column_writer.cc
@@ -271,7 +271,12 @@ class SerializedPageWriter : public PageWriter {
}

int64_t WriteDictionaryPage(const DictionaryPage& page) override {
int64_t uncompressed_size = page.size();
int64_t uncompressed_size = page.buffer()->size();
if (uncompressed_size > std::numeric_limits<int32_t>::max()) {
throw ParquetException(
"Uncompressed dictionary page size overflows INT32_MAX. Size:",
uncompressed_size);
}
std::shared_ptr<Buffer> compressed_data;
if (has_compressor()) {
auto buffer = std::static_pointer_cast<ResizableBuffer>(
@@ -288,6 +293,11 @@ class SerializedPageWriter : public PageWriter {
dict_page_header.__set_is_sorted(page.is_sorted());

const uint8_t* output_data_buffer = compressed_data->data();
if (compressed_data->size() > std::numeric_limits<int32_t>::max()) {
throw ParquetException(
Member commented: Same here and below.
Contributor Author replied: fixed.
"Compressed dictionary page size overflows INT32_MAX. Size: ",
uncompressed_size);
}
int32_t output_data_len = static_cast<int32_t>(compressed_data->size());

if (data_encryptor_.get()) {
@@ -371,18 +381,29 @@ class SerializedPageWriter : public PageWriter {
const int64_t uncompressed_size = page.uncompressed_size();
std::shared_ptr<Buffer> compressed_data = page.buffer();
const uint8_t* output_data_buffer = compressed_data->data();
int32_t output_data_len = static_cast<int32_t>(compressed_data->size());
int64_t output_data_len = compressed_data->size();

if (output_data_len > std::numeric_limits<int32_t>::max()) {
throw ParquetException("Compressed data page size overflows INT32_MAX. Size:",
output_data_len);
}

if (data_encryptor_.get()) {
PARQUET_THROW_NOT_OK(encryption_buffer_->Resize(
data_encryptor_->CiphertextSizeDelta() + output_data_len, false));
UpdateEncryption(encryption::kDataPage);
output_data_len = data_encryptor_->Encrypt(compressed_data->data(), output_data_len,
output_data_len = data_encryptor_->Encrypt(compressed_data->data(),
static_cast<int32_t>(output_data_len),
encryption_buffer_->mutable_data());
output_data_buffer = encryption_buffer_->data();
}

format::PageHeader page_header;

if (uncompressed_size > std::numeric_limits<int32_t>::max()) {
throw ParquetException("Uncompressed data page size overflows INT32_MAX. Size:",
uncompressed_size);
}
page_header.__set_uncompressed_page_size(static_cast<int32_t>(uncompressed_size));
page_header.__set_compressed_page_size(static_cast<int32_t>(output_data_len));

@@ -421,7 +442,7 @@ class SerializedPageWriter : public PageWriter {
if (offset_index_builder_ != nullptr) {
const int64_t compressed_size = output_data_len + header_size;
if (compressed_size > std::numeric_limits<int32_t>::max()) {
throw ParquetException("Compressed page size overflows to INT32_MAX.");
throw ParquetException("Compressed page size overflows INT32_MAX.");
}
if (!page.first_row_index().has_value()) {
throw ParquetException("First row index is not set in data page.");
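For context on the pattern these hunks apply: page sizes are kept as int64_t, validated against INT32_MAX, and only then narrowed with static_cast<int32_t> for the Thrift page header, whose size fields are 32-bit. Below is a minimal standalone sketch of that guard, not part of this diff; the helper name is illustrative and does not exist in the Parquet codebase.

#include <cstdint>
#include <limits>
#include <stdexcept>
#include <string>

// Narrow an int64_t byte count to int32_t, throwing instead of silently
// truncating when the value does not fit in a 32-bit page-header field.
int32_t CheckedNarrowToInt32(int64_t size, const std::string& what) {
  if (size > std::numeric_limits<int32_t>::max()) {
    throw std::overflow_error(what + " overflows INT32_MAX. Size: " +
                              std::to_string(size));
  }
  return static_cast<int32_t>(size);
}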
45 changes: 45 additions & 0 deletions cpp/src/parquet/column_writer_test.cc
@@ -15,16 +15,19 @@
// specific language governing permissions and limitations
// under the License.

#include <memory>
#include <utility>
#include <vector>

#include <gmock/gmock.h>
#include <gtest/gtest.h>

#include "arrow/io/buffered.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/util/bit_util.h"
#include "arrow/util/bitmap_builders.h"

#include "parquet/column_page.h"
#include "parquet/column_reader.h"
#include "parquet/column_writer.h"
#include "parquet/file_reader.h"
@@ -479,6 +482,9 @@ using TestValuesWriterInt64Type = TestPrimitiveWriter<Int64Type>;
using TestByteArrayValuesWriter = TestPrimitiveWriter<ByteArrayType>;
using TestFixedLengthByteArrayValuesWriter = TestPrimitiveWriter<FLBAType>;

using ::testing::HasSubstr;
using ::testing::ThrowsMessage;

TYPED_TEST(TestPrimitiveWriter, RequiredPlain) {
this->TestRequiredWithEncoding(Encoding::PLAIN);
}
@@ -889,6 +895,45 @@ TEST_F(TestByteArrayValuesWriter, CheckDefaultStats) {
ASSERT_TRUE(this->metadata_is_stats_set());
}

TEST(TestPageWriter, ThrowsOnPagesTooLarge) {
NodePtr item = schema::Int32("item"); // optional item
NodePtr list(GroupNode::Make("b", Repetition::REPEATED, {item}, ConvertedType::LIST));
NodePtr bag(GroupNode::Make("bag", Repetition::OPTIONAL, {list})); // optional list
std::vector<NodePtr> fields = {bag};
NodePtr root = GroupNode::Make("schema", Repetition::REPEATED, fields);

SchemaDescriptor schema;
schema.Init(root);

auto sink = CreateOutputStream();
auto props = WriterProperties::Builder().build();

auto metadata = ColumnChunkMetaDataBuilder::Make(props, schema.Column(0));
std::unique_ptr<PageWriter> pager =
PageWriter::Open(sink, Compression::UNCOMPRESSED, metadata.get());

uint8_t data;
std::shared_ptr<Buffer> buffer =
std::make_shared<Buffer>(&data, std::numeric_limits<int32_t>::max() + int64_t{1});
DataPageV1 over_compressed_limit(buffer, /*num_values=*/100, Encoding::BIT_PACKED,
Encoding::BIT_PACKED, Encoding::BIT_PACKED,
/*uncompressed_size=*/100);
EXPECT_THAT([&]() { pager->WriteDataPage(over_compressed_limit); },
ThrowsMessage<ParquetException>(HasSubstr("overflows INT32_MAX")));
DictionaryPage dictionary_over_compressed_limit(buffer, /*num_values=*/100,
Encoding::PLAIN);
EXPECT_THAT([&]() { pager->WriteDictionaryPage(dictionary_over_compressed_limit); },
ThrowsMessage<ParquetException>(HasSubstr("overflows INT32_MAX")));

buffer = std::make_shared<Buffer>(&data, 1);
DataPageV1 over_uncompressed_limit(
buffer, /*num_values=*/100, Encoding::BIT_PACKED, Encoding::BIT_PACKED,
Encoding::BIT_PACKED,
/*uncompressed_size=*/std::numeric_limits<int32_t>::max() + int64_t{1});
EXPECT_THAT([&]() { pager->WriteDataPage(over_uncompressed_limit); },
ThrowsMessage<ParquetException>(HasSubstr("overflows INT32_MAX")));
}
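A note on the test fixture above, with a hedged standalone illustration (the helper name below is not from the test): no multi-gigabyte allocation ever happens, because arrow::Buffer merely wraps an existing pointer and reports the requested length, and the page writer throws after inspecting size() without reading the bytes.

#include <cstdint>
#include <limits>
#include <memory>

#include "arrow/buffer.h"

// Wrap a single byte in a Buffer that claims a length just past INT32_MAX.
// No large allocation occurs: Buffer neither owns nor copies the memory.
std::shared_ptr<arrow::Buffer> MakeOversizedView(const uint8_t* byte) {
  return std::make_shared<arrow::Buffer>(
      byte, std::numeric_limits<int32_t>::max() + int64_t{1});
}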

TEST(TestColumnWriter, RepeatedListsUpdateSpacedBug) {
// In ARROW-3930 we discovered a bug when writing from Arrow when we had data
// that looks like this: