diff --git a/cpp/src/arrow/array-binary-test.cc b/cpp/src/arrow/array-binary-test.cc index 227f74bd4bb..cb8d6d53064 100644 --- a/cpp/src/arrow/array-binary-test.cc +++ b/cpp/src/arrow/array-binary-test.cc @@ -665,6 +665,10 @@ class TestChunkedBinaryBuilder : public ::testing::Test { builder_.reset(new internal::ChunkedBinaryBuilder(chunksize)); } + void Init(int32_t chunksize, int32_t chunklength) { + builder_.reset(new internal::ChunkedBinaryBuilder(chunksize, chunklength)); + } + protected: std::unique_ptr builder_; }; @@ -740,6 +744,36 @@ TEST_F(TestChunkedBinaryBuilder, LargeElements) { ASSERT_EQ(iterations * bufsize, total_data_size); } +TEST_F(TestChunkedBinaryBuilder, LargeElementCount) { + int32_t max_chunk_length = 100; + Init(100, max_chunk_length); + + auto length = max_chunk_length + 1; + + // ChunkedBinaryBuilder can reserve memory for more than its configured maximum + // (per chunk) element count + ASSERT_OK(builder_->Reserve(length)); + + for (int64_t i = 0; i < 2 * length; ++i) { + // Appending more elements than have been reserved memory simply overflows to the next + // chunk + ASSERT_OK(builder_->Append("")); + } + + ArrayVector chunks; + ASSERT_OK(builder_->Finish(&chunks)); + + // should have two chunks full of empty strings and another with two more empty strings + ASSERT_EQ(chunks.size(), 3); + ASSERT_EQ(chunks[0]->length(), max_chunk_length); + ASSERT_EQ(chunks[1]->length(), max_chunk_length); + ASSERT_EQ(chunks[2]->length(), 2); + for (auto&& boxed_chunk : chunks) { + const auto& chunk = checked_cast(*boxed_chunk); + ASSERT_EQ(chunk.value_offset(0), chunk.value_offset(chunk.length())); + } +} + TEST(TestChunkedStringBuilder, BasicOperation) { const int chunksize = 100; internal::ChunkedStringBuilder builder(chunksize); @@ -758,7 +792,7 @@ TEST(TestChunkedStringBuilder, BasicOperation) { // Type is correct for (auto chunk : chunks) { - ASSERT_TRUE(chunk->type()->Equals(*::arrow::utf8())); + ASSERT_TRUE(chunk->type()->Equals(utf8())); } } diff --git a/cpp/src/arrow/array.h b/cpp/src/arrow/array.h index 78542c6b5be..5cca9db3c5c 100644 --- a/cpp/src/arrow/array.h +++ b/cpp/src/arrow/array.h @@ -690,8 +690,7 @@ class ARROW_EXPORT BinaryArray : public FlatArray { /// Protected method for constructors void SetData(const std::shared_ptr& data); - // Constructor that allows sub-classes/builders to propagate there logical type up the - // class hierarchy. + // Constructor to allow sub-classes/builders to substitute their own logical type BinaryArray(const std::shared_ptr& type, int64_t length, const std::shared_ptr& value_offsets, const std::shared_ptr& data, diff --git a/cpp/src/arrow/array/builder_binary.cc b/cpp/src/arrow/array/builder_binary.cc index 88c2e86c09f..ccb79a11a1d 100644 --- a/cpp/src/arrow/array/builder_binary.cc +++ b/cpp/src/arrow/array/builder_binary.cc @@ -281,9 +281,15 @@ util::string_view FixedSizeBinaryBuilder::GetView(int64_t i) const { namespace internal { -ChunkedBinaryBuilder::ChunkedBinaryBuilder(int32_t max_chunk_size, MemoryPool* pool) - : max_chunk_size_(max_chunk_size), - chunk_data_size_(0), +ChunkedBinaryBuilder::ChunkedBinaryBuilder(int32_t max_chunk_value_length, + MemoryPool* pool) + : max_chunk_value_length_(max_chunk_value_length), + builder_(new BinaryBuilder(pool)) {} + +ChunkedBinaryBuilder::ChunkedBinaryBuilder(int32_t max_chunk_value_length, + int32_t max_chunk_length, MemoryPool* pool) + : max_chunk_value_length_(max_chunk_value_length), + max_chunk_length_(max_chunk_length), builder_(new BinaryBuilder(pool)) {} Status ChunkedBinaryBuilder::Finish(ArrayVector* out) { @@ -301,7 +307,11 @@ Status ChunkedBinaryBuilder::NextChunk() { RETURN_NOT_OK(builder_->Finish(&chunk)); chunks_.emplace_back(std::move(chunk)); - chunk_data_size_ = 0; + if (auto capacity = extra_capacity_) { + extra_capacity_ = 0; + return Reserve(capacity); + } + return Status::OK(); } @@ -317,6 +327,22 @@ Status ChunkedStringBuilder::Finish(ArrayVector* out) { return Status::OK(); } +Status ChunkedBinaryBuilder::Reserve(int64_t values) { + if (ARROW_PREDICT_FALSE(extra_capacity_ != 0)) { + extra_capacity_ += values; + return Status::OK(); + } + + auto min_capacity = builder_->length() + values; + auto new_capacity = BufferBuilder::GrowByFactor(builder_->capacity(), min_capacity); + if (ARROW_PREDICT_TRUE(new_capacity <= kListMaximumElements)) { + return builder_->Resize(new_capacity); + } + + extra_capacity_ = new_capacity - kListMaximumElements; + return builder_->Resize(kListMaximumElements); +} + } // namespace internal } // namespace arrow diff --git a/cpp/src/arrow/array/builder_binary.h b/cpp/src/arrow/array/builder_binary.h index 91f48dab55b..47d3bae4b89 100644 --- a/cpp/src/arrow/array/builder_binary.h +++ b/cpp/src/arrow/array/builder_binary.h @@ -316,22 +316,37 @@ namespace internal { class ARROW_EXPORT ChunkedBinaryBuilder { public: - ChunkedBinaryBuilder(int32_t max_chunk_size, + ChunkedBinaryBuilder(int32_t max_chunk_value_length, + MemoryPool* pool ARROW_MEMORY_POOL_DEFAULT); + + ChunkedBinaryBuilder(int32_t max_chunk_value_length, int32_t max_chunk_length, MemoryPool* pool ARROW_MEMORY_POOL_DEFAULT); virtual ~ChunkedBinaryBuilder() = default; Status Append(const uint8_t* value, int32_t length) { - if (ARROW_PREDICT_FALSE(length + chunk_data_size_ > max_chunk_size_)) { - // Move onto next chunk, unless the builder length is currently 0, which - // means that max_chunk_size_ is less than the item length - if (builder_->length() > 0) { - ARROW_RETURN_NOT_OK(NextChunk()); + if (ARROW_PREDICT_FALSE(length + builder_->value_data_length() > + max_chunk_value_length_)) { + if (builder_->value_data_length() == 0) { + // The current item is larger than max_chunk_size_; + // this chunk will be oversize and hold *only* this item + ARROW_RETURN_NOT_OK(builder_->Append(value, length)); + return NextChunk(); } - // else fall through + // The current item would cause builder_->value_data_length() to exceed + // max_chunk_size_, so finish this chunk and append the current item to the next + // chunk + ARROW_RETURN_NOT_OK(NextChunk()); + return Append(value, length); + } + + if (ARROW_PREDICT_FALSE(builder_->length() == max_chunk_length_)) { + // The current item would cause builder_->value_data_length() to exceed + // max_chunk_size_, so finish this chunk and append the current item to the next + // chunk + ARROW_RETURN_NOT_OK(NextChunk()); } - chunk_data_size_ += length; return builder_->Append(value, length); } @@ -341,21 +356,28 @@ class ARROW_EXPORT ChunkedBinaryBuilder { } Status AppendNull() { - if (ARROW_PREDICT_FALSE(builder_->length() == std::numeric_limits::max())) { + if (ARROW_PREDICT_FALSE(builder_->length() == max_chunk_length_)) { ARROW_RETURN_NOT_OK(NextChunk()); } return builder_->AppendNull(); } - Status Reserve(int64_t values) { return builder_->Reserve(values); } + Status Reserve(int64_t values); virtual Status Finish(ArrayVector* out); protected: Status NextChunk(); - int64_t max_chunk_size_; - int64_t chunk_data_size_; + // maximum total character data size per chunk + int64_t max_chunk_value_length_; + + // maximum elements allowed per chunk + int64_t max_chunk_length_ = kListMaximumElements; + + // when Reserve() would cause builder_ to exceed its max_chunk_length_, + // add to extra_capacity_ instead and wait to reserve until the next chunk + int64_t extra_capacity_ = 0; std::unique_ptr builder_; std::vector> chunks_; diff --git a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc index 2c5c5dfe1c0..792f7607149 100644 --- a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc +++ b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc @@ -2658,6 +2658,47 @@ TEST(TestArrowReaderAdHoc, CorruptedSchema) { TryReadDataFile(path, ::arrow::StatusCode::IOError); } +TEST(TestArrowReaderAdHoc, DISABLED_LargeStringColumn) { + // ARROW-3762 + ::arrow::StringBuilder builder; + int64_t length = 1 << 30; + ASSERT_OK(builder.Resize(length)); + ASSERT_OK(builder.ReserveData(length)); + for (int64_t i = 0; i < length; ++i) { + builder.UnsafeAppend("1", 1); + } + std::shared_ptr array; + ASSERT_OK(builder.Finish(&array)); + auto table = Table::Make({std::make_shared("x", array)}); + std::shared_ptr schm; + ASSERT_OK_NO_THROW( + ToParquetSchema(table->schema().get(), *default_writer_properties(), &schm)); + + auto sink = CreateOutputStream(); + + auto schm_node = std::static_pointer_cast( + GroupNode::Make("schema", Repetition::REQUIRED, {schm->group_node()->field(0)})); + + auto writer = ParquetFileWriter::Open(sink, schm_node); + FileWriter arrow_writer(default_memory_pool(), std::move(writer), table->schema()); + for (int i : {0, 1}) { + ASSERT_OK_NO_THROW(arrow_writer.WriteTable(*table, table->num_rows())) << i; + } + ASSERT_OK_NO_THROW(arrow_writer.Close()); + + std::shared_ptr tables_buffer; + ASSERT_OK_NO_THROW(sink->Finish(&tables_buffer)); + + // drop to save memory + table.reset(); + array.reset(); + + auto reader = ParquetFileReader::Open(std::make_shared(tables_buffer)); + FileReader arrow_reader(default_memory_pool(), std::move(reader)); + ASSERT_OK_NO_THROW(arrow_reader.ReadTable(&table)); + ASSERT_OK(table->Validate()); +} + TEST(TestArrowReaderAdHoc, HandleDictPageOffsetZero) { // PARQUET-1402: parquet-mr writes files this way which tripped up // some business logic