Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions cpp/src/arrow/compute/light_array.cc
Original file line number Diff line number Diff line change
Expand Up @@ -398,9 +398,12 @@ int ExecBatchBuilder::NumRowsToSkip(const std::shared_ptr<ArrayData>& column,
} else {
--num_rows_left;
int row_id_removed = row_ids[num_rows_left];
const uint32_t* offsets =
reinterpret_cast<const uint32_t*>(column->buffers[1]->data());
const int32_t* offsets = column->GetValues<int32_t>(1);
num_bytes_skipped += offsets[row_id_removed + 1] - offsets[row_id_removed];
// Skip consecutive rows with the same id
while (num_rows_left > 0 && row_id_removed == row_ids[num_rows_left - 1]) {
--num_rows_left;
}
}
}

Expand Down
4 changes: 3 additions & 1 deletion cpp/src/arrow/compute/light_array.h
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,9 @@ class ARROW_EXPORT ExecBatchBuilder {
// without checking buffer bounds (useful with SIMD or fixed size memory loads
// and stores).
//
// The sequence of row_ids provided must be non-decreasing.
// The sequence of row_ids provided must be non-decreasing. In case of consecutive rows
// with the same row id, they are skipped all at once because they occupy the same
// space.
//
static int NumRowsToSkip(const std::shared_ptr<ArrayData>& column, int num_rows,
const uint16_t* row_ids, int num_tail_bytes_to_skip);
Expand Down
26 changes: 26 additions & 0 deletions cpp/src/arrow/compute/light_array_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,32 @@ TEST(ExecBatchBuilder, AppendBatchesSomeRows) {
ASSERT_EQ(0, pool->bytes_allocated());
}

TEST(ExecBatchBuilder, AppendBatchDupRows) {
std::unique_ptr<MemoryPool> owned_pool = MemoryPool::CreateDefault();
MemoryPool* pool = owned_pool.get();
// Case of cross-word copying for the last row, which may exceed the buffer boundary.
// This is a simplified case of GH-32570
{
// 64-byte data fully occupying one minimal 64-byte aligned memory region.
ExecBatch batch_string = JSONToExecBatch({binary()}, R"([["123456789ABCDEF0"],
["123456789ABCDEF0"],
["123456789ABCDEF0"],
["ABCDEF0"],
["123456789"]])"); // 9-byte tail row, larger than a word.
ASSERT_EQ(batch_string[0].array()->buffers[1]->capacity(), 64);
ASSERT_EQ(batch_string[0].array()->buffers[2]->capacity(), 64);
ExecBatchBuilder builder;
uint16_t row_ids[2] = {4, 4};
ASSERT_OK(builder.AppendSelected(pool, batch_string, 2, row_ids, /*num_cols=*/1));
ExecBatch built = builder.Flush();
ExecBatch batch_string_appended =
JSONToExecBatch({binary()}, R"([["123456789"], ["123456789"]])");
ASSERT_EQ(batch_string_appended, built);
ASSERT_NE(0, pool->bytes_allocated());
}
ASSERT_EQ(0, pool->bytes_allocated());
}

TEST(ExecBatchBuilder, AppendBatchesSomeCols) {
std::unique_ptr<MemoryPool> owned_pool = MemoryPool::CreateDefault();
MemoryPool* pool = owned_pool.get();
Expand Down