diff --git a/cpp/src/arrow/compute/light_array.cc b/cpp/src/arrow/compute/light_array.cc index 4e8b2b2d7cc..93a054de195 100644 --- a/cpp/src/arrow/compute/light_array.cc +++ b/cpp/src/arrow/compute/light_array.cc @@ -398,9 +398,12 @@ int ExecBatchBuilder::NumRowsToSkip(const std::shared_ptr& column, } else { --num_rows_left; int row_id_removed = row_ids[num_rows_left]; - const uint32_t* offsets = - reinterpret_cast(column->buffers[1]->data()); + const int32_t* offsets = column->GetValues(1); num_bytes_skipped += offsets[row_id_removed + 1] - offsets[row_id_removed]; + // Skip consecutive rows with the same id + while (num_rows_left > 0 && row_id_removed == row_ids[num_rows_left - 1]) { + --num_rows_left; + } } } diff --git a/cpp/src/arrow/compute/light_array.h b/cpp/src/arrow/compute/light_array.h index 87f6b6c76a1..84aa86d64bb 100644 --- a/cpp/src/arrow/compute/light_array.h +++ b/cpp/src/arrow/compute/light_array.h @@ -416,7 +416,9 @@ class ARROW_EXPORT ExecBatchBuilder { // without checking buffer bounds (useful with SIMD or fixed size memory loads // and stores). // - // The sequence of row_ids provided must be non-decreasing. + // The sequence of row_ids provided must be non-decreasing. In case of consecutive rows + // with the same row id, they are skipped all at once because they occupy the same + // space. // static int NumRowsToSkip(const std::shared_ptr& column, int num_rows, const uint16_t* row_ids, int num_tail_bytes_to_skip); diff --git a/cpp/src/arrow/compute/light_array_test.cc b/cpp/src/arrow/compute/light_array_test.cc index 4e33f7b578e..52121530fe9 100644 --- a/cpp/src/arrow/compute/light_array_test.cc +++ b/cpp/src/arrow/compute/light_array_test.cc @@ -471,6 +471,32 @@ TEST(ExecBatchBuilder, AppendBatchesSomeRows) { ASSERT_EQ(0, pool->bytes_allocated()); } +TEST(ExecBatchBuilder, AppendBatchDupRows) { + std::unique_ptr owned_pool = MemoryPool::CreateDefault(); + MemoryPool* pool = owned_pool.get(); + // Case of cross-word copying for the last row, which may exceed the buffer boundary. + // This is a simplified case of GH-32570 + { + // 64-byte data fully occupying one minimal 64-byte aligned memory region. + ExecBatch batch_string = JSONToExecBatch({binary()}, R"([["123456789ABCDEF0"], + ["123456789ABCDEF0"], + ["123456789ABCDEF0"], + ["ABCDEF0"], + ["123456789"]])"); // 9-byte tail row, larger than a word. + ASSERT_EQ(batch_string[0].array()->buffers[1]->capacity(), 64); + ASSERT_EQ(batch_string[0].array()->buffers[2]->capacity(), 64); + ExecBatchBuilder builder; + uint16_t row_ids[2] = {4, 4}; + ASSERT_OK(builder.AppendSelected(pool, batch_string, 2, row_ids, /*num_cols=*/1)); + ExecBatch built = builder.Flush(); + ExecBatch batch_string_appended = + JSONToExecBatch({binary()}, R"([["123456789"], ["123456789"]])"); + ASSERT_EQ(batch_string_appended, built); + ASSERT_NE(0, pool->bytes_allocated()); + } + ASSERT_EQ(0, pool->bytes_allocated()); +} + TEST(ExecBatchBuilder, AppendBatchesSomeCols) { std::unique_ptr owned_pool = MemoryPool::CreateDefault(); MemoryPool* pool = owned_pool.get();