From cb000ab2e1097590102e227f03d867bfab2ff78f Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Mon, 24 Jun 2019 17:52:23 -0400 Subject: [PATCH 1/5] add (failing) test which repros issue in c++ --- .../parquet/arrow/arrow-reader-writer-test.cc | 41 +++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc index 2c5c5dfe1c0..60f7772a477 100644 --- a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc +++ b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc @@ -2658,6 +2658,47 @@ TEST(TestArrowReaderAdHoc, CorruptedSchema) { TryReadDataFile(path, ::arrow::StatusCode::IOError); } +TEST(TestArrowReaderAdHoc, LargeStringColumn) { + // ARROW-3762 + ::arrow::StringBuilder builder; + int64_t length = 1 << 30; + ASSERT_OK(builder.Resize(length)); + ASSERT_OK(builder.ReserveData(length)); + for (int64_t i = 0; i < length; ++i) { + builder.UnsafeAppend("1", 1); + } + std::shared_ptr array; + ASSERT_OK(builder.Finish(&array)); + auto table = Table::Make({std::make_shared("x", array)}); + std::shared_ptr schm; + ASSERT_OK_NO_THROW( + ToParquetSchema(table->schema().get(), *default_writer_properties(), &schm)); + + auto sink = CreateOutputStream(); + + auto schm_node = std::static_pointer_cast( + GroupNode::Make("schema", Repetition::REQUIRED, {schm->group_node()->field(0)})); + + auto writer = ParquetFileWriter::Open(sink, schm_node); + FileWriter arrow_writer(default_memory_pool(), std::move(writer), table->schema()); + for (int i : {0, 1}) { + ASSERT_OK_NO_THROW(arrow_writer.WriteTable(*table, table->num_rows())) << i; + } + ASSERT_OK_NO_THROW(arrow_writer.Close()); + + std::shared_ptr tables_buffer; + ASSERT_OK_NO_THROW(sink->Finish(&tables_buffer)); + + // drop to save memory + table.reset(); + array.reset(); + + auto reader = ParquetFileReader::Open(std::make_shared(tables_buffer)); + FileReader arrow_reader(default_memory_pool(), std::move(reader)); + ASSERT_OK_NO_THROW(arrow_reader.ReadTable(&table)); + ASSERT_OK(table->Validate()); +} + TEST(TestArrowReaderAdHoc, HandleDictPageOffsetZero) { // PARQUET-1402: parquet-mr writes files this way which tripped up // some business logic From d626c073cd397e485dbcb134c921bfa85199f405 Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Mon, 24 Jun 2019 22:30:41 -0400 Subject: [PATCH 2/5] attempted fix, killed: OOM --- cpp/src/arrow/array/builder_binary.cc | 27 +++++++++++++++++++++++---- cpp/src/arrow/array/builder_binary.h | 5 +++-- 2 files changed, 26 insertions(+), 6 deletions(-) diff --git a/cpp/src/arrow/array/builder_binary.cc b/cpp/src/arrow/array/builder_binary.cc index 88c2e86c09f..78efc4a1ed9 100644 --- a/cpp/src/arrow/array/builder_binary.cc +++ b/cpp/src/arrow/array/builder_binary.cc @@ -282,9 +282,7 @@ util::string_view FixedSizeBinaryBuilder::GetView(int64_t i) const { namespace internal { ChunkedBinaryBuilder::ChunkedBinaryBuilder(int32_t max_chunk_size, MemoryPool* pool) - : max_chunk_size_(max_chunk_size), - chunk_data_size_(0), - builder_(new BinaryBuilder(pool)) {} + : max_chunk_size_(max_chunk_size), builder_(new BinaryBuilder(pool)) {} Status ChunkedBinaryBuilder::Finish(ArrayVector* out) { if (builder_->length() > 0 || chunks_.size() == 0) { @@ -300,8 +298,13 @@ Status ChunkedBinaryBuilder::NextChunk() { std::shared_ptr chunk; RETURN_NOT_OK(builder_->Finish(&chunk)); chunks_.emplace_back(std::move(chunk)); - chunk_data_size_ = 0; + + if (auto capacity = extra_capacity_) { + extra_capacity_ = 0; + return Reserve(capacity); + } + return Status::OK(); } @@ -317,6 +320,22 @@ Status ChunkedStringBuilder::Finish(ArrayVector* out) { return Status::OK(); } +Status ChunkedBinaryBuilder::Reserve(int64_t values) { + if (ARROW_PREDICT_FALSE(extra_capacity_ != 0)) { + extra_capacity_ += values; + return Status::OK(); + } + + auto min_capacity = builder_->length() + values; + auto new_capacity = BufferBuilder::GrowByFactor(builder_->capacity(), min_capacity); + if (ARROW_PREDICT_TRUE(new_capacity <= kListMaximumElements)) { + return builder_->Resize(new_capacity); + } + + extra_capacity_ = new_capacity - kListMaximumElements; + return builder_->Resize(kListMaximumElements); +} + } // namespace internal } // namespace arrow diff --git a/cpp/src/arrow/array/builder_binary.h b/cpp/src/arrow/array/builder_binary.h index 91f48dab55b..ca1cfd1e003 100644 --- a/cpp/src/arrow/array/builder_binary.h +++ b/cpp/src/arrow/array/builder_binary.h @@ -347,7 +347,7 @@ class ARROW_EXPORT ChunkedBinaryBuilder { return builder_->AppendNull(); } - Status Reserve(int64_t values) { return builder_->Reserve(values); } + Status Reserve(int64_t values); virtual Status Finish(ArrayVector* out); @@ -355,7 +355,8 @@ class ARROW_EXPORT ChunkedBinaryBuilder { Status NextChunk(); int64_t max_chunk_size_; - int64_t chunk_data_size_; + int64_t chunk_data_size_ = 0; + int64_t extra_capacity_ = 0; std::unique_ptr builder_; std::vector> chunks_; From 3eb57198b14e76aa8712efff3f16430bc1ee8611 Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Tue, 25 Jun 2019 15:23:43 -0400 Subject: [PATCH 3/5] manage ChunkedBinaryBuilder's capacity --- cpp/src/arrow/array/builder_binary.h | 20 ++++++++++++++----- .../parquet/arrow/arrow-reader-writer-test.cc | 2 +- 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/cpp/src/arrow/array/builder_binary.h b/cpp/src/arrow/array/builder_binary.h index ca1cfd1e003..1e8b86d0bf5 100644 --- a/cpp/src/arrow/array/builder_binary.h +++ b/cpp/src/arrow/array/builder_binary.h @@ -323,12 +323,22 @@ class ARROW_EXPORT ChunkedBinaryBuilder { Status Append(const uint8_t* value, int32_t length) { if (ARROW_PREDICT_FALSE(length + chunk_data_size_ > max_chunk_size_)) { - // Move onto next chunk, unless the builder length is currently 0, which - // means that max_chunk_size_ is less than the item length - if (builder_->length() > 0) { - ARROW_RETURN_NOT_OK(NextChunk()); + if (chunk_data_size_ == 0) { + // The current item is larger than max_chunk_size_; + // this chunk will be oversize and hold *only* this item + ARROW_RETURN_NOT_OK(builder_->Append(value, length)); + return NextChunk(); } - // else fall through + // The current item would cause chunk_data_size_ to exceed max_chunk_size_ + // finish this chunk and append the current item to the next chunk + ARROW_RETURN_NOT_OK(NextChunk()); + return Append(value, length); + } + + if (ARROW_PREDICT_FALSE(builder_->length() == kListMaximumElements)) { + // The current item would cause builder_ to overflow kListMaximumElements + // finish this chunk and append the current item to the next chunk + ARROW_RETURN_NOT_OK(NextChunk()); } chunk_data_size_ += length; diff --git a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc index 60f7772a477..792f7607149 100644 --- a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc +++ b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc @@ -2658,7 +2658,7 @@ TEST(TestArrowReaderAdHoc, CorruptedSchema) { TryReadDataFile(path, ::arrow::StatusCode::IOError); } -TEST(TestArrowReaderAdHoc, LargeStringColumn) { +TEST(TestArrowReaderAdHoc, DISABLED_LargeStringColumn) { // ARROW-3762 ::arrow::StringBuilder builder; int64_t length = 1 << 30; From 5331a2cb7091029bfc3986ff983985cfeb4d701f Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Wed, 26 Jun 2019 15:11:28 -0400 Subject: [PATCH 4/5] add test for ChunkedBinaryBuilder's new reserve behavior --- cpp/src/arrow/array-binary-test.cc | 28 +++++++++++++++++++++++++++- cpp/src/arrow/array.h | 3 +-- cpp/src/arrow/array/builder_binary.h | 4 +++- 3 files changed, 31 insertions(+), 4 deletions(-) diff --git a/cpp/src/arrow/array-binary-test.cc b/cpp/src/arrow/array-binary-test.cc index 227f74bd4bb..5b8712c9fbd 100644 --- a/cpp/src/arrow/array-binary-test.cc +++ b/cpp/src/arrow/array-binary-test.cc @@ -740,6 +740,32 @@ TEST_F(TestChunkedBinaryBuilder, LargeElements) { ASSERT_EQ(iterations * bufsize, total_data_size); } +TEST_F(TestChunkedBinaryBuilder, MassiveReserve) { + Init(100); + + auto length = kListMaximumElements + 1; + + // ChunkedBinaryBuilder can reserve memory for more than kListMaximumElements + ASSERT_OK(builder_->Reserve(length)); + + for (int64_t i = 0; i < length; ++i) { + // ChunkedBinaryBuilder can append more than kListMaximumElements + ASSERT_OK(builder_->Append("")); + } + + ArrayVector chunks; + ASSERT_OK(builder_->Finish(&chunks)); + + // should have one chunk full of empty strings and another with one more empty string + ASSERT_EQ(chunks.size(), 2); + ASSERT_EQ(chunks[0]->length(), length - 1); + ASSERT_EQ(chunks[1]->length(), 1); + for (auto&& boxed_chunk : chunks) { + const auto& chunk = checked_cast(*boxed_chunk); + ASSERT_EQ(chunk.value_offset(0), chunk.value_offset(chunk.length())); + } +} + TEST(TestChunkedStringBuilder, BasicOperation) { const int chunksize = 100; internal::ChunkedStringBuilder builder(chunksize); @@ -758,7 +784,7 @@ TEST(TestChunkedStringBuilder, BasicOperation) { // Type is correct for (auto chunk : chunks) { - ASSERT_TRUE(chunk->type()->Equals(*::arrow::utf8())); + ASSERT_TRUE(chunk->type()->Equals(utf8())); } } diff --git a/cpp/src/arrow/array.h b/cpp/src/arrow/array.h index 78542c6b5be..5cca9db3c5c 100644 --- a/cpp/src/arrow/array.h +++ b/cpp/src/arrow/array.h @@ -690,8 +690,7 @@ class ARROW_EXPORT BinaryArray : public FlatArray { /// Protected method for constructors void SetData(const std::shared_ptr& data); - // Constructor that allows sub-classes/builders to propagate there logical type up the - // class hierarchy. + // Constructor to allow sub-classes/builders to substitute their own logical type BinaryArray(const std::shared_ptr& type, int64_t length, const std::shared_ptr& value_offsets, const std::shared_ptr& data, diff --git a/cpp/src/arrow/array/builder_binary.h b/cpp/src/arrow/array/builder_binary.h index 1e8b86d0bf5..54fbb28f655 100644 --- a/cpp/src/arrow/array/builder_binary.h +++ b/cpp/src/arrow/array/builder_binary.h @@ -351,7 +351,7 @@ class ARROW_EXPORT ChunkedBinaryBuilder { } Status AppendNull() { - if (ARROW_PREDICT_FALSE(builder_->length() == std::numeric_limits::max())) { + if (ARROW_PREDICT_FALSE(builder_->length() == kListMaximumElements)) { ARROW_RETURN_NOT_OK(NextChunk()); } return builder_->AppendNull(); @@ -366,6 +366,8 @@ class ARROW_EXPORT ChunkedBinaryBuilder { int64_t max_chunk_size_; int64_t chunk_data_size_ = 0; + // when Reserve() would cause builder_ to exceed its maximum capacity, + // add to extra_capacity_ instead and wait to reserve until the next chunk int64_t extra_capacity_ = 0; std::unique_ptr builder_; From 95a4e30ffe1680c85fb5e61fe6850d7b7999ab0c Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Wed, 26 Jun 2019 16:18:48 -0400 Subject: [PATCH 5/5] add configurable maximum element count to ChunkedBinaryBuilder --- cpp/src/arrow/array-binary-test.cc | 28 +++++++++++++-------- cpp/src/arrow/array/builder_binary.cc | 13 +++++++--- cpp/src/arrow/array/builder_binary.h | 35 +++++++++++++++++---------- 3 files changed, 50 insertions(+), 26 deletions(-) diff --git a/cpp/src/arrow/array-binary-test.cc b/cpp/src/arrow/array-binary-test.cc index 5b8712c9fbd..cb8d6d53064 100644 --- a/cpp/src/arrow/array-binary-test.cc +++ b/cpp/src/arrow/array-binary-test.cc @@ -665,6 +665,10 @@ class TestChunkedBinaryBuilder : public ::testing::Test { builder_.reset(new internal::ChunkedBinaryBuilder(chunksize)); } + void Init(int32_t chunksize, int32_t chunklength) { + builder_.reset(new internal::ChunkedBinaryBuilder(chunksize, chunklength)); + } + protected: std::unique_ptr builder_; }; @@ -740,26 +744,30 @@ TEST_F(TestChunkedBinaryBuilder, LargeElements) { ASSERT_EQ(iterations * bufsize, total_data_size); } -TEST_F(TestChunkedBinaryBuilder, MassiveReserve) { - Init(100); +TEST_F(TestChunkedBinaryBuilder, LargeElementCount) { + int32_t max_chunk_length = 100; + Init(100, max_chunk_length); - auto length = kListMaximumElements + 1; + auto length = max_chunk_length + 1; - // ChunkedBinaryBuilder can reserve memory for more than kListMaximumElements + // ChunkedBinaryBuilder can reserve memory for more than its configured maximum + // (per chunk) element count ASSERT_OK(builder_->Reserve(length)); - for (int64_t i = 0; i < length; ++i) { - // ChunkedBinaryBuilder can append more than kListMaximumElements + for (int64_t i = 0; i < 2 * length; ++i) { + // Appending more elements than have been reserved memory simply overflows to the next + // chunk ASSERT_OK(builder_->Append("")); } ArrayVector chunks; ASSERT_OK(builder_->Finish(&chunks)); - // should have one chunk full of empty strings and another with one more empty string - ASSERT_EQ(chunks.size(), 2); - ASSERT_EQ(chunks[0]->length(), length - 1); - ASSERT_EQ(chunks[1]->length(), 1); + // should have two chunks full of empty strings and another with two more empty strings + ASSERT_EQ(chunks.size(), 3); + ASSERT_EQ(chunks[0]->length(), max_chunk_length); + ASSERT_EQ(chunks[1]->length(), max_chunk_length); + ASSERT_EQ(chunks[2]->length(), 2); for (auto&& boxed_chunk : chunks) { const auto& chunk = checked_cast(*boxed_chunk); ASSERT_EQ(chunk.value_offset(0), chunk.value_offset(chunk.length())); diff --git a/cpp/src/arrow/array/builder_binary.cc b/cpp/src/arrow/array/builder_binary.cc index 78efc4a1ed9..ccb79a11a1d 100644 --- a/cpp/src/arrow/array/builder_binary.cc +++ b/cpp/src/arrow/array/builder_binary.cc @@ -281,8 +281,16 @@ util::string_view FixedSizeBinaryBuilder::GetView(int64_t i) const { namespace internal { -ChunkedBinaryBuilder::ChunkedBinaryBuilder(int32_t max_chunk_size, MemoryPool* pool) - : max_chunk_size_(max_chunk_size), builder_(new BinaryBuilder(pool)) {} +ChunkedBinaryBuilder::ChunkedBinaryBuilder(int32_t max_chunk_value_length, + MemoryPool* pool) + : max_chunk_value_length_(max_chunk_value_length), + builder_(new BinaryBuilder(pool)) {} + +ChunkedBinaryBuilder::ChunkedBinaryBuilder(int32_t max_chunk_value_length, + int32_t max_chunk_length, MemoryPool* pool) + : max_chunk_value_length_(max_chunk_value_length), + max_chunk_length_(max_chunk_length), + builder_(new BinaryBuilder(pool)) {} Status ChunkedBinaryBuilder::Finish(ArrayVector* out) { if (builder_->length() > 0 || chunks_.size() == 0) { @@ -298,7 +306,6 @@ Status ChunkedBinaryBuilder::NextChunk() { std::shared_ptr chunk; RETURN_NOT_OK(builder_->Finish(&chunk)); chunks_.emplace_back(std::move(chunk)); - chunk_data_size_ = 0; if (auto capacity = extra_capacity_) { extra_capacity_ = 0; diff --git a/cpp/src/arrow/array/builder_binary.h b/cpp/src/arrow/array/builder_binary.h index 54fbb28f655..47d3bae4b89 100644 --- a/cpp/src/arrow/array/builder_binary.h +++ b/cpp/src/arrow/array/builder_binary.h @@ -316,32 +316,37 @@ namespace internal { class ARROW_EXPORT ChunkedBinaryBuilder { public: - ChunkedBinaryBuilder(int32_t max_chunk_size, + ChunkedBinaryBuilder(int32_t max_chunk_value_length, + MemoryPool* pool ARROW_MEMORY_POOL_DEFAULT); + + ChunkedBinaryBuilder(int32_t max_chunk_value_length, int32_t max_chunk_length, MemoryPool* pool ARROW_MEMORY_POOL_DEFAULT); virtual ~ChunkedBinaryBuilder() = default; Status Append(const uint8_t* value, int32_t length) { - if (ARROW_PREDICT_FALSE(length + chunk_data_size_ > max_chunk_size_)) { - if (chunk_data_size_ == 0) { + if (ARROW_PREDICT_FALSE(length + builder_->value_data_length() > + max_chunk_value_length_)) { + if (builder_->value_data_length() == 0) { // The current item is larger than max_chunk_size_; // this chunk will be oversize and hold *only* this item ARROW_RETURN_NOT_OK(builder_->Append(value, length)); return NextChunk(); } - // The current item would cause chunk_data_size_ to exceed max_chunk_size_ - // finish this chunk and append the current item to the next chunk + // The current item would cause builder_->value_data_length() to exceed + // max_chunk_size_, so finish this chunk and append the current item to the next + // chunk ARROW_RETURN_NOT_OK(NextChunk()); return Append(value, length); } - if (ARROW_PREDICT_FALSE(builder_->length() == kListMaximumElements)) { - // The current item would cause builder_ to overflow kListMaximumElements - // finish this chunk and append the current item to the next chunk + if (ARROW_PREDICT_FALSE(builder_->length() == max_chunk_length_)) { + // The current item would cause builder_->value_data_length() to exceed + // max_chunk_size_, so finish this chunk and append the current item to the next + // chunk ARROW_RETURN_NOT_OK(NextChunk()); } - chunk_data_size_ += length; return builder_->Append(value, length); } @@ -351,7 +356,7 @@ class ARROW_EXPORT ChunkedBinaryBuilder { } Status AppendNull() { - if (ARROW_PREDICT_FALSE(builder_->length() == kListMaximumElements)) { + if (ARROW_PREDICT_FALSE(builder_->length() == max_chunk_length_)) { ARROW_RETURN_NOT_OK(NextChunk()); } return builder_->AppendNull(); @@ -364,9 +369,13 @@ class ARROW_EXPORT ChunkedBinaryBuilder { protected: Status NextChunk(); - int64_t max_chunk_size_; - int64_t chunk_data_size_ = 0; - // when Reserve() would cause builder_ to exceed its maximum capacity, + // maximum total character data size per chunk + int64_t max_chunk_value_length_; + + // maximum elements allowed per chunk + int64_t max_chunk_length_ = kListMaximumElements; + + // when Reserve() would cause builder_ to exceed its max_chunk_length_, // add to extra_capacity_ instead and wait to reserve until the next chunk int64_t extra_capacity_ = 0;