From 8214b232486d80d293e3a6f2b6d09da9645884d0 Mon Sep 17 00:00:00 2001 From: Bruno Cauet Date: Mon, 12 May 2025 18:50:32 +0200 Subject: [PATCH 1/6] Fix IPC serialization of list array slices where offset==0 && value_offsets[0] > 0. Arrow C++ slices arrays by bumping the top-level `offset` value. However, Arrow Rust slices list arrays by slicing the `value_offsets` buffer. When receiving a Rust Arrow Array in C++ (via the C data interface), its IPC serialization fails to notice that the `value_offsets` buffer needed to be updated, but it still updates the `values` buffer. This leads to a corrupt array on deserialization, with a `value_offsets` buffer that points past the end of the values array. This commit fixes the IPC serialization by also looking at value_offset(0) to determine whether the `value_offsets` buffer needs reconstructing, instead of just looking at offset(). Additionally, this commit updates the comment surrounding the logic, as it had 2 issues: 1. It hints that offset > 0 and value_offsets[0] > 0 happen together, when they actually tend to be exclusive (... unless you slice twice, once in Rust and once in C++). 2. It mentions slicing the values, when that does not happen in the function where the comment appears (GetZeroBasedValueOffsets), but at the call site (Visit(Array)). Notes: * I'm surprised the ListViewArray does not have this bug. The code it uses is slightly different. I did not dig into its precise behaviour. * The function could use a cleanup. There is no need for the `offsets` symbol, which triggers a copy of the shared_ptr of the offsets buffer for nothing. 
--- cpp/src/arrow/ipc/read_write_test.cc | 34 ++++++++++++++++++++++++++++ cpp/src/arrow/ipc/writer.cc | 8 +++---- 2 files changed, 37 insertions(+), 5 deletions(-) diff --git a/cpp/src/arrow/ipc/read_write_test.cc b/cpp/src/arrow/ipc/read_write_test.cc index 8bd28e4d584..d08dbbb79f4 100644 --- a/cpp/src/arrow/ipc/read_write_test.cc +++ b/cpp/src/arrow/ipc/read_write_test.cc @@ -579,6 +579,40 @@ TEST_F(TestIpcRoundTrip, SpecificMetadataVersion) { TestMetadataVersion(MetadataVersion::V5); } +TEST_F(TestIpcRoundTrip, AlienSlice) { + // This tests serialization of a sliced ListArray that got sliced "the Rust + // way": by slicing the value_offsets buffer, but keeping top-level offset at + // 0. + + // Values buffer [1, 2, 3, 4, 5] + TypedBufferBuilder values_builder; + ASSERT_OK(values_builder.Reserve(5)); + for (int32_t i = 1; i <= 5; i++) { + ASSERT_OK(values_builder.Append(i)); + } + ASSERT_OK_AND_ASSIGN(auto values_buffer, values_builder.Finish()); + + // Offsets buffer [2, 5] + TypedBufferBuilder offsets_builder; + ASSERT_OK(offsets_builder.Reserve(2)); + ASSERT_OK(offsets_builder.Append(2)); + ASSERT_OK(offsets_builder.Append(5)); + ASSERT_OK_AND_ASSIGN(auto offsets_buffer, offsets_builder.Finish()); + + // Construct the (nested) array. 
+ auto child_data = ArrayData::Make(arrow::int32(), + /*num_rows=*/5, + /*buffers=*/{nullptr, values_buffer}); + auto list_data = ArrayData::Make(list(int32()), + /*num_rows=*/1, + /*buffers=*/{nullptr, offsets_buffer}); + list_data->child_data = {child_data}; + std::shared_ptr list_array = MakeArray(list_data); + ASSERT_OK(list_array->ValidateFull()); + + CheckRoundtrip(list_array); +} + TEST(TestReadMessage, CorruptedSmallInput) { std::string data = "abc"; auto reader = io::BufferReader::FromString(data); diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc index 90574c7cb61..e77ef842eec 100644 --- a/cpp/src/arrow/ipc/writer.cc +++ b/cpp/src/arrow/ipc/writer.cc @@ -327,11 +327,9 @@ class RecordBatchSerializer { auto offsets = array.value_offsets(); int64_t required_bytes = sizeof(offset_type) * (array.length() + 1); - if (array.offset() != 0) { - // If we have a non-zero offset, then the value offsets do not start at - // zero. We must a) create a new offsets array with shifted offsets and - // b) slice the values array accordingly - + if (array.offset() != 0 || (array.length() > 0 && array.value_offset(0) > 0)) { + // If we have a non-zero offset, we must create a new offsets buffer with + // shifted offsets. ARROW_ASSIGN_OR_RAISE(auto shifted_offsets, AllocateBuffer(required_bytes, options_.memory_pool)); From e354e523ee2fb8d8ebd00abcf4fda92bd836d844 Mon Sep 17 00:00:00 2001 From: Bruno Cauet Date: Tue, 20 May 2025 11:54:00 +0000 Subject: [PATCH 2/6] Avoid copying the offsets buffer if it starts with 0 but top-level offset is > 0. Instead, just slice the offsets buffer. This plays nicely with the truncation logic that already exists below. 
--- cpp/src/arrow/ipc/writer.cc | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc index e77ef842eec..8c4aa4ef3b3 100644 --- a/cpp/src/arrow/ipc/writer.cc +++ b/cpp/src/arrow/ipc/writer.cc @@ -327,9 +327,9 @@ class RecordBatchSerializer { auto offsets = array.value_offsets(); int64_t required_bytes = sizeof(offset_type) * (array.length() + 1); - if (array.offset() != 0 || (array.length() > 0 && array.value_offset(0) > 0)) { - // If we have a non-zero offset, we must create a new offsets buffer with - // shifted offsets. + if (array.length() > 0 && array.value_offset(0) > 0) { + // If the offset of the first value is non-zero, then we must create a new + // offsets buffer with shifted offsets. ARROW_ASSIGN_OR_RAISE(auto shifted_offsets, AllocateBuffer(required_bytes, options_.memory_pool)); @@ -343,10 +343,10 @@ class RecordBatchSerializer { dest_offsets[array.length()] = array.value_offset(array.length()) - start_offset; offsets = std::move(shifted_offsets); } else { - // ARROW-6046: Slice offsets to used extent, in case we have a truncated - // slice - if (offsets != nullptr && offsets->size() > required_bytes) { - offsets = SliceBuffer(offsets, 0, required_bytes); + // ARROW-6046: if we have a truncated slice with unused leading or + // trailing data, slice it. + if (offsets != nullptr && (array.offset() > 0 || offsets->size() > required_bytes)) { + offsets = SliceBuffer(offsets, array.offset() * sizeof(offset_type), required_bytes); } } *value_offsets = std::move(offsets); From 00bdfcb9fa24904f37607f8afd089841268ca5c8 Mon Sep 17 00:00:00 2001 From: Bruno Cauet Date: Tue, 20 May 2025 12:05:47 +0000 Subject: [PATCH 3/6] Cleanup GetZeroBasedValueOffsets * Extend the loop to cover `i == array.length()` instead of doing it with a dedicated statement. + assign the source offsets at once, instead of calling a method on every loop body. 
* Avoid copying the `offsets_buffer` shared_ptr (i.e. avoid refcount++ into refcount--). + the function does not rely anymore on complex rules of type deduction for `auto` variables (dropping reference and cv-qualification). * Early return when array.length() == 0. --- cpp/src/arrow/ipc/writer.cc | 28 ++++++++++++++++------------ 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc index 8c4aa4ef3b3..8b7d943fc71 100644 --- a/cpp/src/arrow/ipc/writer.cc +++ b/cpp/src/arrow/ipc/writer.cc @@ -324,32 +324,36 @@ class RecordBatchSerializer { // Share slicing logic between ListArray, BinaryArray and LargeBinaryArray using offset_type = typename ArrayType::offset_type; - auto offsets = array.value_offsets(); + if (array.length() == 0) { + *value_offsets = array.value_offsets(); + return Status::OK(); + } int64_t required_bytes = sizeof(offset_type) * (array.length() + 1); - if (array.length() > 0 && array.value_offset(0) > 0) { + if (array.value_offset(0) > 0) { // If the offset of the first value is non-zero, then we must create a new // offsets buffer with shifted offsets. ARROW_ASSIGN_OR_RAISE(auto shifted_offsets, AllocateBuffer(required_bytes, options_.memory_pool)); auto dest_offsets = shifted_offsets->mutable_span_as(); - const offset_type start_offset = array.value_offset(0); + const offset_type* source_offsets = array.raw_value_offsets(); + const offset_type start_offset = source_offsets[0]; - for (int i = 0; i < array.length(); ++i) { - dest_offsets[i] = array.value_offset(i) - start_offset; + for (int i = 0; i <= array.length(); ++i) { + dest_offsets[i] = source_offsets[i] - start_offset; } - // Final offset - dest_offsets[array.length()] = array.value_offset(array.length()) - start_offset; - offsets = std::move(shifted_offsets); + *value_offsets = std::move(shifted_offsets); } else { // ARROW-6046: if we have a truncated slice with unused leading or - // trailing data, slice it. 
- if (offsets != nullptr && (array.offset() > 0 || offsets->size() > required_bytes)) { - offsets = SliceBuffer(offsets, array.offset() * sizeof(offset_type), required_bytes); + // trailing data, then we slice it. + if (array.offset() > 0 || array.value_offsets()->size() > required_bytes) { + *value_offsets = SliceBuffer( + array.value_offsets(), array.offset() * sizeof(offset_type), required_bytes); + } else { + *value_offsets = array.value_offsets(); } } - *value_offsets = std::move(offsets); return Status::OK(); } From 97a4317e0c10e1c9256b91092daa24d29c8e0be4 Mon Sep 17 00:00:00 2001 From: Bruno Cauet Date: Fri, 30 May 2025 10:57:52 +0200 Subject: [PATCH 4/6] Add tests for IPC round-trip of List & ListView data. --- cpp/src/arrow/ipc/feather_test.cc | 21 +++++++++++++++++++++ cpp/src/arrow/ipc/test_common.cc | 14 ++++++++++---- cpp/src/arrow/ipc/test_common.h | 6 ++++++ 3 files changed, 37 insertions(+), 4 deletions(-) diff --git a/cpp/src/arrow/ipc/feather_test.cc b/cpp/src/arrow/ipc/feather_test.cc index ba3f4d828c3..abc4cadf783 100644 --- a/cpp/src/arrow/ipc/feather_test.cc +++ b/cpp/src/arrow/ipc/feather_test.cc @@ -319,6 +319,27 @@ TEST_P(TestFeather, SliceBooleanRoundTrip) { CheckSlices(batch); } +TEST_P(TestFeather, SliceListsRoundTrip) { + if (GetParam().version == kFeatherV1Version) { + // IPC V1 does not support list. + return; + } + std::shared_ptr batch; + ASSERT_OK(ipc::test::MakeListRecordBatchSized(600, &batch)); + CheckSlices(batch); +} + +TEST_P(TestFeather, SliceListsViewRoundTrip) { + if (GetParam().version == kFeatherV1Version) { + // IPC V1 does not support list. 
+ return; + } + std::shared_ptr batch; + ASSERT_OK(ipc::test::MakeListViewRecordBatchSized(600, &batch)); + CheckSlices(batch); +} + + INSTANTIATE_TEST_SUITE_P( FeatherTests, TestFeather, ::testing::Values(TestParam(kFeatherV1Version), TestParam(kFeatherV2Version), diff --git a/cpp/src/arrow/ipc/test_common.cc b/cpp/src/arrow/ipc/test_common.cc index 3d7137e4965..46060a0db10 100644 --- a/cpp/src/arrow/ipc/test_common.cc +++ b/cpp/src/arrow/ipc/test_common.cc @@ -421,7 +421,7 @@ Status MakeNullRecordBatch(std::shared_ptr* out) { return Status::OK(); } -Status MakeListRecordBatch(std::shared_ptr* out) { +Status MakeListRecordBatchSized(const int length, std::shared_ptr* out) { // Make the schema auto f0 = field("f0", list(int32())); auto f1 = field("f1", list(list(int32()))); @@ -431,7 +431,6 @@ Status MakeListRecordBatch(std::shared_ptr* out) { // Example data MemoryPool* pool = default_memory_pool(); - const int length = 200; std::shared_ptr leaf_values, list_array, list_list_array, large_list_array; const bool include_nulls = true; RETURN_NOT_OK(MakeRandomInt32Array(1000, include_nulls, pool, &leaf_values)); @@ -446,7 +445,11 @@ Status MakeListRecordBatch(std::shared_ptr* out) { return Status::OK(); } -Status MakeListViewRecordBatch(std::shared_ptr* out) { +Status MakeListRecordBatch(std::shared_ptr* out) { + return MakeListRecordBatchSized(200, out); +} + +Status MakeListViewRecordBatchSized(const int length, std::shared_ptr* out) { // Make the schema auto f0 = field("f0", list_view(int32())); auto f1 = field("f1", list_view(list_view(int32()))); @@ -456,7 +459,6 @@ Status MakeListViewRecordBatch(std::shared_ptr* out) { // Example data MemoryPool* pool = default_memory_pool(); - const int length = 200; std::shared_ptr leaf_values, list_array, list_list_array, large_list_array; const bool include_nulls = true; RETURN_NOT_OK(MakeRandomInt32Array(1000, include_nulls, pool, &leaf_values)); @@ -471,6 +473,10 @@ Status MakeListViewRecordBatch(std::shared_ptr* out) { 
return Status::OK(); } +Status MakeListViewRecordBatch(std::shared_ptr* out) { + return MakeListViewRecordBatchSized(200, out); +} + Status MakeFixedSizeListRecordBatch(std::shared_ptr* out) { // Make the schema auto f0 = field("f0", fixed_size_list(int32(), 1)); diff --git a/cpp/src/arrow/ipc/test_common.h b/cpp/src/arrow/ipc/test_common.h index 189de288795..6044ef207bc 100644 --- a/cpp/src/arrow/ipc/test_common.h +++ b/cpp/src/arrow/ipc/test_common.h @@ -104,9 +104,15 @@ Status MakeStringTypesRecordBatchWithNulls(std::shared_ptr* out); ARROW_TESTING_EXPORT Status MakeNullRecordBatch(std::shared_ptr* out); +ARROW_TESTING_EXPORT +Status MakeListRecordBatchSized(int length, std::shared_ptr* out); + ARROW_TESTING_EXPORT Status MakeListRecordBatch(std::shared_ptr* out); +ARROW_TESTING_EXPORT +Status MakeListViewRecordBatchSized(int length, std::shared_ptr* out); + ARROW_TESTING_EXPORT Status MakeListViewRecordBatch(std::shared_ptr* out); From 3b8e9adf8b3fa93ab7a01bbf8491102e23e6981e Mon Sep 17 00:00:00 2001 From: Bruno Cauet Date: Fri, 30 May 2025 11:01:53 +0200 Subject: [PATCH 5/6] Simplify Array construction in AlienSlice test. --- cpp/src/arrow/ipc/read_write_test.cc | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/cpp/src/arrow/ipc/read_write_test.cc b/cpp/src/arrow/ipc/read_write_test.cc index d08dbbb79f4..cf0232f9ae4 100644 --- a/cpp/src/arrow/ipc/read_write_test.cc +++ b/cpp/src/arrow/ipc/read_write_test.cc @@ -584,13 +584,7 @@ TEST_F(TestIpcRoundTrip, AlienSlice) { // way": by slicing the value_offsets buffer, but keeping top-level offset at // 0. 
- // Values buffer [1, 2, 3, 4, 5] - TypedBufferBuilder values_builder; - ASSERT_OK(values_builder.Reserve(5)); - for (int32_t i = 1; i <= 5; i++) { - ASSERT_OK(values_builder.Append(i)); - } - ASSERT_OK_AND_ASSIGN(auto values_buffer, values_builder.Finish()); + auto child_data = ArrayFromJSON(int32(), "[1, 2, 3, 4, 5]")->data(); // Offsets buffer [2, 5] TypedBufferBuilder offsets_builder; @@ -599,10 +593,6 @@ TEST_F(TestIpcRoundTrip, AlienSlice) { ASSERT_OK(offsets_builder.Append(5)); ASSERT_OK_AND_ASSIGN(auto offsets_buffer, offsets_builder.Finish()); - // Construct the (nested) array. - auto child_data = ArrayData::Make(arrow::int32(), - /*num_rows=*/5, - /*buffers=*/{nullptr, values_buffer}); auto list_data = ArrayData::Make(list(int32()), /*num_rows=*/1, /*buffers=*/{nullptr, offsets_buffer}); From 6d9393d1b9c9879d482a000a3825b5d5665f2620 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Tue, 3 Jun 2025 17:28:15 +0200 Subject: [PATCH 6/6] Nits --- cpp/src/arrow/ipc/feather_test.cc | 11 ++++------- cpp/src/arrow/ipc/read_write_test.cc | 3 +-- 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/cpp/src/arrow/ipc/feather_test.cc b/cpp/src/arrow/ipc/feather_test.cc index abc4cadf783..e1dc6046a11 100644 --- a/cpp/src/arrow/ipc/feather_test.cc +++ b/cpp/src/arrow/ipc/feather_test.cc @@ -319,27 +319,24 @@ TEST_P(TestFeather, SliceBooleanRoundTrip) { CheckSlices(batch); } -TEST_P(TestFeather, SliceListsRoundTrip) { +TEST_P(TestFeather, SliceListRoundTrip) { if (GetParam().version == kFeatherV1Version) { - // IPC V1 does not support list. - return; + GTEST_SKIP() << "Feather V1 does not support list types"; } std::shared_ptr batch; ASSERT_OK(ipc::test::MakeListRecordBatchSized(600, &batch)); CheckSlices(batch); } -TEST_P(TestFeather, SliceListsViewRoundTrip) { +TEST_P(TestFeather, SliceListViewRoundTrip) { if (GetParam().version == kFeatherV1Version) { - // IPC V1 does not support list. 
- return; + GTEST_SKIP() << "Feather V1 does not support list view types"; } std::shared_ptr batch; ASSERT_OK(ipc::test::MakeListViewRecordBatchSized(600, &batch)); CheckSlices(batch); } - INSTANTIATE_TEST_SUITE_P( FeatherTests, TestFeather, ::testing::Values(TestParam(kFeatherV1Version), TestParam(kFeatherV2Version), diff --git a/cpp/src/arrow/ipc/read_write_test.cc b/cpp/src/arrow/ipc/read_write_test.cc index cf0232f9ae4..84ec923ce80 100644 --- a/cpp/src/arrow/ipc/read_write_test.cc +++ b/cpp/src/arrow/ipc/read_write_test.cc @@ -579,11 +579,10 @@ TEST_F(TestIpcRoundTrip, SpecificMetadataVersion) { TestMetadataVersion(MetadataVersion::V5); } -TEST_F(TestIpcRoundTrip, AlienSlice) { +TEST_F(TestIpcRoundTrip, ListWithSlicedValues) { // This tests serialization of a sliced ListArray that got sliced "the Rust // way": by slicing the value_offsets buffer, but keeping top-level offset at // 0. - auto child_data = ArrayFromJSON(int32(), "[1, 2, 3, 4, 5]")->data(); // Offsets buffer [2, 5]