Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 35 additions & 1 deletion cpp/src/arrow/array-binary-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,10 @@ class TestChunkedBinaryBuilder : public ::testing::Test {
builder_.reset(new internal::ChunkedBinaryBuilder(chunksize));
}

void Init(int32_t chunksize, int32_t chunklength) {
builder_.reset(new internal::ChunkedBinaryBuilder(chunksize, chunklength));
}

protected:
std::unique_ptr<internal::ChunkedBinaryBuilder> builder_;
};
Expand Down Expand Up @@ -740,6 +744,36 @@ TEST_F(TestChunkedBinaryBuilder, LargeElements) {
ASSERT_EQ(iterations * bufsize, total_data_size);
}

// Exercises the per-chunk element-count limit: more elements are reserved and
// appended than a single chunk may hold, and the result must be split across
// chunks accordingly.
TEST_F(TestChunkedBinaryBuilder, LargeElementCount) {
  const int32_t chunk_length_limit = 100;
  Init(100, chunk_length_limit);

  // One element more than a single chunk is allowed to contain
  const auto reserved_count = chunk_length_limit + 1;

  // Reserving beyond the configured per-chunk element count must succeed
  ASSERT_OK(builder_->Reserve(reserved_count));

  // Append twice as many (empty) values as were reserved; whatever does not fit
  // in the current chunk is expected to overflow into the next one
  for (int64_t remaining = 2 * reserved_count; remaining > 0; --remaining) {
    ASSERT_OK(builder_->Append(""));
  }

  ArrayVector chunks;
  ASSERT_OK(builder_->Finish(&chunks));

  // 202 empty strings in total: two full chunks of 100 plus a final chunk of 2
  ASSERT_EQ(chunks.size(), 3);
  ASSERT_EQ(chunks[0]->length(), chunk_length_limit);
  ASSERT_EQ(chunks[1]->length(), chunk_length_limit);
  ASSERT_EQ(chunks[2]->length(), 2);

  // Every value is empty, so the first and one-past-last offsets of each chunk
  // must coincide
  for (const auto& boxed_chunk : chunks) {
    const auto& binary_chunk = checked_cast<const BinaryArray&>(*boxed_chunk);
    const auto first_offset = binary_chunk.value_offset(0);
    const auto end_offset = binary_chunk.value_offset(binary_chunk.length());
    ASSERT_EQ(first_offset, end_offset);
  }
}

TEST(TestChunkedStringBuilder, BasicOperation) {
const int chunksize = 100;
internal::ChunkedStringBuilder builder(chunksize);
Expand All @@ -758,7 +792,7 @@ TEST(TestChunkedStringBuilder, BasicOperation) {

// Type is correct
for (auto chunk : chunks) {
ASSERT_TRUE(chunk->type()->Equals(*::arrow::utf8()));
ASSERT_TRUE(chunk->type()->Equals(utf8()));
}
}

Expand Down
3 changes: 1 addition & 2 deletions cpp/src/arrow/array.h
Original file line number Diff line number Diff line change
Expand Up @@ -690,8 +690,7 @@ class ARROW_EXPORT BinaryArray : public FlatArray {
/// Protected method for constructors
void SetData(const std::shared_ptr<ArrayData>& data);

// Constructor that allows sub-classes/builders to propagate there logical type up the
// class hierarchy.
// Constructor to allow sub-classes/builders to substitute their own logical type
BinaryArray(const std::shared_ptr<DataType>& type, int64_t length,
const std::shared_ptr<Buffer>& value_offsets,
const std::shared_ptr<Buffer>& data,
Expand Down
34 changes: 30 additions & 4 deletions cpp/src/arrow/array/builder_binary.cc
Original file line number Diff line number Diff line change
Expand Up @@ -281,9 +281,15 @@ util::string_view FixedSizeBinaryBuilder::GetView(int64_t i) const {

namespace internal {

ChunkedBinaryBuilder::ChunkedBinaryBuilder(int32_t max_chunk_size, MemoryPool* pool)
: max_chunk_size_(max_chunk_size),
chunk_data_size_(0),
// Constructs a chunked builder limited only by the total value data size per
// chunk (max_chunk_value_length). The per-chunk element limit is not set here,
// so max_chunk_length_ keeps its in-class default. The underlying BinaryBuilder
// allocates from `pool`.
ChunkedBinaryBuilder::ChunkedBinaryBuilder(int32_t max_chunk_value_length,
MemoryPool* pool)
: max_chunk_value_length_(max_chunk_value_length),
builder_(new BinaryBuilder(pool)) {}

// Constructs a chunked builder limited both by the total value data size per
// chunk (max_chunk_value_length) and by the number of elements per chunk
// (max_chunk_length). The underlying BinaryBuilder allocates from `pool`.
ChunkedBinaryBuilder::ChunkedBinaryBuilder(int32_t max_chunk_value_length,
int32_t max_chunk_length, MemoryPool* pool)
: max_chunk_value_length_(max_chunk_value_length),
max_chunk_length_(max_chunk_length),
builder_(new BinaryBuilder(pool)) {}

Status ChunkedBinaryBuilder::Finish(ArrayVector* out) {
Expand All @@ -301,7 +307,11 @@ Status ChunkedBinaryBuilder::NextChunk() {
RETURN_NOT_OK(builder_->Finish(&chunk));
chunks_.emplace_back(std::move(chunk));

chunk_data_size_ = 0;
if (auto capacity = extra_capacity_) {
extra_capacity_ = 0;
return Reserve(capacity);
}

return Status::OK();
}

Expand All @@ -317,6 +327,22 @@ Status ChunkedStringBuilder::Finish(ArrayVector* out) {
return Status::OK();
}

// Ensures room for `values` additional elements without growing the current
// chunk's builder past the per-chunk element limit.
//
// Capacity that does not fit in the current chunk is not allocated right away;
// it is accumulated in extra_capacity_ and reserved by NextChunk() once the
// current chunk has been finished.
//
// Returns the status of the underlying builder's Resize(), or OK when no
// allocation is needed.
Status ChunkedBinaryBuilder::Reserve(int64_t values) {
  if (ARROW_PREDICT_FALSE(extra_capacity_ != 0)) {
    // The current chunk is already saturated by an earlier request; keep
    // deferring until the next chunk is started.
    extra_capacity_ += values;
    return Status::OK();
  }

  const int64_t current_capacity = builder_->capacity();
  const int64_t min_capacity = builder_->length() + values;
  if (current_capacity >= min_capacity) {
    // Already enough room: do not grow (or reallocate) needlessly.
    return Status::OK();
  }

  // A chunk may hold at most max_chunk_length_ elements, and never more than a
  // single builder supports.
  const int64_t chunk_limit = max_chunk_length_ < kListMaximumElements
                                  ? max_chunk_length_
                                  : kListMaximumElements;

  const int64_t new_capacity =
      BufferBuilder::GrowByFactor(current_capacity, min_capacity);
  if (ARROW_PREDICT_TRUE(new_capacity <= chunk_limit)) {
    return builder_->Resize(new_capacity);
  }

  // Allocate only up to the chunk limit now and remember the remainder.
  extra_capacity_ = new_capacity - chunk_limit;
  if (current_capacity >= chunk_limit) {
    // Never ask the builder to shrink; there is nothing left to grow in this chunk.
    return Status::OK();
  }
  return builder_->Resize(chunk_limit);
}

} // namespace internal

} // namespace arrow
46 changes: 34 additions & 12 deletions cpp/src/arrow/array/builder_binary.h
Original file line number Diff line number Diff line change
Expand Up @@ -316,22 +316,37 @@ namespace internal {

class ARROW_EXPORT ChunkedBinaryBuilder {
public:
ChunkedBinaryBuilder(int32_t max_chunk_size,
ChunkedBinaryBuilder(int32_t max_chunk_value_length,
MemoryPool* pool ARROW_MEMORY_POOL_DEFAULT);

ChunkedBinaryBuilder(int32_t max_chunk_value_length, int32_t max_chunk_length,
MemoryPool* pool ARROW_MEMORY_POOL_DEFAULT);

virtual ~ChunkedBinaryBuilder() = default;

Status Append(const uint8_t* value, int32_t length) {
if (ARROW_PREDICT_FALSE(length + chunk_data_size_ > max_chunk_size_)) {
// Move onto next chunk, unless the builder length is currently 0, which
// means that max_chunk_size_ is less than the item length
if (builder_->length() > 0) {
ARROW_RETURN_NOT_OK(NextChunk());
if (ARROW_PREDICT_FALSE(length + builder_->value_data_length() >
max_chunk_value_length_)) {
if (builder_->value_data_length() == 0) {
// The current item is larger than max_chunk_value_length_;
// this chunk will be oversize and hold *only* this item
ARROW_RETURN_NOT_OK(builder_->Append(value, length));
return NextChunk();
}
// else fall through
// The current item would cause builder_->value_data_length() to exceed
// max_chunk_value_length_, so finish this chunk and append the current item to the
// next chunk
ARROW_RETURN_NOT_OK(NextChunk());
return Append(value, length);
}

if (ARROW_PREDICT_FALSE(builder_->length() == max_chunk_length_)) {
// The current chunk already holds max_chunk_length_ elements, so finish this
// chunk and append the current item to the next chunk
ARROW_RETURN_NOT_OK(NextChunk());
}

chunk_data_size_ += length;
return builder_->Append(value, length);
}

Expand All @@ -341,21 +356,28 @@ class ARROW_EXPORT ChunkedBinaryBuilder {
}

Status AppendNull() {
if (ARROW_PREDICT_FALSE(builder_->length() == std::numeric_limits<int32_t>::max())) {
if (ARROW_PREDICT_FALSE(builder_->length() == max_chunk_length_)) {
ARROW_RETURN_NOT_OK(NextChunk());
}
return builder_->AppendNull();
}

Status Reserve(int64_t values) { return builder_->Reserve(values); }
Status Reserve(int64_t values);

virtual Status Finish(ArrayVector* out);

protected:
Status NextChunk();

int64_t max_chunk_size_;
int64_t chunk_data_size_;
// maximum total character data size per chunk
int64_t max_chunk_value_length_;

// maximum elements allowed per chunk
int64_t max_chunk_length_ = kListMaximumElements;

// when Reserve() would cause builder_ to exceed its max_chunk_length_,
// add to extra_capacity_ instead and wait to reserve until the next chunk
int64_t extra_capacity_ = 0;

std::unique_ptr<BinaryBuilder> builder_;
std::vector<std::shared_ptr<Array>> chunks_;
Expand Down
41 changes: 41 additions & 0 deletions cpp/src/parquet/arrow/arrow-reader-writer-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2658,6 +2658,47 @@ TEST(TestArrowReaderAdHoc, CorruptedSchema) {
TryReadDataFile(path, ::arrow::StatusCode::IOError);
}

// ARROW-3762: round-trips a string column of 2^30 one-byte values through an
// in-memory Parquet file (the table is written twice) and validates the table
// that is read back. The test carries the DISABLED_ prefix.
TEST(TestArrowReaderAdHoc, DISABLED_LargeStringColumn) {
  // ARROW-3762
  const int64_t num_values = 1 << 30;

  // Build the column: pre-size both the slots and the character data so the
  // unchecked appends below are valid
  ::arrow::StringBuilder builder;
  ASSERT_OK(builder.Resize(num_values));
  ASSERT_OK(builder.ReserveData(num_values));
  for (int64_t remaining = num_values; remaining > 0; --remaining) {
    builder.UnsafeAppend("1", 1);
  }
  std::shared_ptr<Array> array;
  ASSERT_OK(builder.Finish(&array));
  auto table = Table::Make({std::make_shared<Column>("x", array)});

  // Derive the Parquet schema for the single-column table
  std::shared_ptr<SchemaDescriptor> schema_descr;
  ASSERT_OK_NO_THROW(ToParquetSchema(table->schema().get(),
                                     *default_writer_properties(), &schema_descr));

  auto sink = CreateOutputStream();

  auto root_node = std::static_pointer_cast<GroupNode>(GroupNode::Make(
      "schema", Repetition::REQUIRED, {schema_descr->group_node()->field(0)}));

  // Write the same table twice into the sink
  auto writer = ParquetFileWriter::Open(sink, root_node);
  FileWriter arrow_writer(default_memory_pool(), std::move(writer), table->schema());
  for (int i = 0; i < 2; ++i) {
    ASSERT_OK_NO_THROW(arrow_writer.WriteTable(*table, table->num_rows())) << i;
  }
  ASSERT_OK_NO_THROW(arrow_writer.Close());

  std::shared_ptr<Buffer> file_contents;
  ASSERT_OK_NO_THROW(sink->Finish(&file_contents));

  // drop to save memory
  table.reset();
  array.reset();

  // Read everything back and check that the resulting table is well-formed
  auto reader = ParquetFileReader::Open(std::make_shared<BufferReader>(file_contents));
  FileReader arrow_reader(default_memory_pool(), std::move(reader));
  ASSERT_OK_NO_THROW(arrow_reader.ReadTable(&table));
  ASSERT_OK(table->Validate());
}

TEST(TestArrowReaderAdHoc, HandleDictPageOffsetZero) {
// PARQUET-1402: parquet-mr writes files this way which tripped up
// some business logic
Expand Down