From 8edc1d1621e725042886f0a78253b4bda64b9137 Mon Sep 17 00:00:00 2001 From: zanmato Date: Thu, 14 Dec 2023 10:58:50 -0800 Subject: [PATCH 1/2] Fix the issue of `ExecBatchBuilder` when appending consecutive tail rows with the same id may exceed buffer boundary --- cpp/src/arrow/compute/light_array.cc | 6 +++++- cpp/src/arrow/compute/light_array_test.cc | 25 +++++++++++++++++++++++ 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/cpp/src/arrow/compute/light_array.cc b/cpp/src/arrow/compute/light_array.cc index 37d4421fd79..7aaf01ab918 100644 --- a/cpp/src/arrow/compute/light_array.cc +++ b/cpp/src/arrow/compute/light_array.cc @@ -395,8 +395,12 @@ int ExecBatchBuilder::NumRowsToSkip(const std::shared_ptr<ArrayData>& column, --num_rows_left; int row_id_removed = row_ids[num_rows_left]; const uint32_t* offsets = - reinterpret_cast<const uint32_t*>(column->buffers[1]->data()); + reinterpret_cast<const uint32_t*>(column->buffers[1]->data()) + column->offset; num_bytes_skipped += offsets[row_id_removed + 1] - offsets[row_id_removed]; + // Skip consecutive rows with the same id + while (num_rows_left > 0 && row_id_removed == row_ids[num_rows_left - 1]) { + --num_rows_left; + } } } diff --git a/cpp/src/arrow/compute/light_array_test.cc b/cpp/src/arrow/compute/light_array_test.cc index 71d228bf3f6..827537c2514 100644 --- a/cpp/src/arrow/compute/light_array_test.cc +++ b/cpp/src/arrow/compute/light_array_test.cc @@ -411,6 +411,31 @@ TEST(ExecBatchBuilder, AppendBatchesSomeRows) { ASSERT_EQ(0, pool->bytes_allocated()); } +TEST(ExecBatchBuilder, AppendBatchDupRows) { + std::unique_ptr<MemoryPool> owned_pool = MemoryPool::CreateDefault(); + MemoryPool* pool = owned_pool.get(); + // Case of cross-word copying for the last row, which may exceed the buffer boundary. + { + // 64-byte data fully occupying one minimal 64-byte aligned memory region. 
+ ExecBatch batch_string = JSONToExecBatch({binary()}, R"([["123456789ABCDEF0"], + ["123456789ABCDEF0"], + ["123456789ABCDEF0"], + ["ABCDEF0"], + ["123456789"]])"); // 9-byte tail row, larger than a word. + ASSERT_EQ(batch_string[0].array()->buffers[1]->capacity(), 64); + ASSERT_EQ(batch_string[0].array()->buffers[2]->capacity(), 64); + ExecBatchBuilder builder; + uint16_t row_ids[2] = {4, 4}; + ASSERT_OK(builder.AppendSelected(pool, batch_string, 2, row_ids, /*num_cols=*/1)); + ExecBatch built = builder.Flush(); + ExecBatch batch_string_appended = + JSONToExecBatch({binary()}, R"([["123456789"], ["123456789"]])"); + ASSERT_EQ(batch_string_appended, built); + ASSERT_NE(0, pool->bytes_allocated()); + } + ASSERT_EQ(0, pool->bytes_allocated()); +} + TEST(ExecBatchBuilder, AppendBatchesSomeCols) { std::unique_ptr<MemoryPool> owned_pool = MemoryPool::CreateDefault(); MemoryPool* pool = owned_pool.get(); From 79f766402d671c2db0df277e022b3f9742e67631 Mon Sep 17 00:00:00 2001 From: zanmato Date: Thu, 21 Dec 2023 10:32:50 -0800 Subject: [PATCH 2/2] Address comments --- cpp/src/arrow/compute/light_array.cc | 3 +-- cpp/src/arrow/compute/light_array.h | 4 +++- cpp/src/arrow/compute/light_array_test.cc | 1 + 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/cpp/src/arrow/compute/light_array.cc b/cpp/src/arrow/compute/light_array.cc index 3e9e2879f31..93a054de195 100644 --- a/cpp/src/arrow/compute/light_array.cc +++ b/cpp/src/arrow/compute/light_array.cc @@ -398,8 +398,7 @@ int ExecBatchBuilder::NumRowsToSkip(const std::shared_ptr<ArrayData>& column, } else { --num_rows_left; int row_id_removed = row_ids[num_rows_left]; - const uint32_t* offsets = - reinterpret_cast<const uint32_t*>(column->buffers[1]->data()) + column->offset; + const int32_t* offsets = column->GetValues<int32_t>(1); num_bytes_skipped += offsets[row_id_removed + 1] - offsets[row_id_removed]; // Skip consecutive rows with the same id while (num_rows_left > 0 && row_id_removed == row_ids[num_rows_left - 1]) { diff --git 
a/cpp/src/arrow/compute/light_array.h b/cpp/src/arrow/compute/light_array.h index 87f6b6c76a1..84aa86d64bb 100644 --- a/cpp/src/arrow/compute/light_array.h +++ b/cpp/src/arrow/compute/light_array.h @@ -416,7 +416,9 @@ class ARROW_EXPORT ExecBatchBuilder { // without checking buffer bounds (useful with SIMD or fixed size memory loads // and stores). // - // The sequence of row_ids provided must be non-decreasing. + // The sequence of row_ids provided must be non-decreasing. In case of consecutive rows + // with the same row id, they are skipped all at once because they occupy the same + // space. // static int NumRowsToSkip(const std::shared_ptr<ArrayData>& column, int num_rows, const uint16_t* row_ids, int num_tail_bytes_to_skip); diff --git a/cpp/src/arrow/compute/light_array_test.cc b/cpp/src/arrow/compute/light_array_test.cc index ca0d96b492f..52121530fe9 100644 --- a/cpp/src/arrow/compute/light_array_test.cc +++ b/cpp/src/arrow/compute/light_array_test.cc @@ -475,6 +475,7 @@ TEST(ExecBatchBuilder, AppendBatchDupRows) { std::unique_ptr<MemoryPool> owned_pool = MemoryPool::CreateDefault(); MemoryPool* pool = owned_pool.get(); // Case of cross-word copying for the last row, which may exceed the buffer boundary. + // This is a simplified case of GH-32570 { // 64-byte data fully occupying one minimal 64-byte aligned memory region. ExecBatch batch_string = JSONToExecBatch({binary()}, R"([["123456789ABCDEF0"],