From 87e7dfac327aeff7e28e9b0ca0383476984d602b Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Thu, 11 May 2023 13:32:12 -0700 Subject: [PATCH 01/18] Added an option to EnsureAlignment to require only malloc-compatible alignment and not full 64-byte alignment --- cpp/src/arrow/acero/source_node.cc | 7 +- cpp/src/arrow/util/align_util.cc | 122 +++++++++++++++++++++++++++-- cpp/src/arrow/util/align_util.h | 98 +++++++++++++++++++++++ 3 files changed, 217 insertions(+), 10 deletions(-) diff --git a/cpp/src/arrow/acero/source_node.cc b/cpp/src/arrow/acero/source_node.cc index 6c138d8dccd..d1945bf2d5d 100644 --- a/cpp/src/arrow/acero/source_node.cc +++ b/cpp/src/arrow/acero/source_node.cc @@ -106,9 +106,10 @@ struct SourceNode : ExecNode, public TracedNode { ExecBatch batch = morsel.Slice(offset, batch_size); for (auto& value : batch.values) { if (value.is_array()) { - ARROW_ASSIGN_OR_RAISE(value, arrow::util::EnsureAlignment( - value.make_array(), ipc::kArrowAlignment, - default_memory_pool())); + ARROW_ASSIGN_OR_RAISE( + value, arrow::util::EnsureAlignment(value.array(), + arrow::util::kMallocAlignment, + default_memory_pool())); } } if (has_ordering) { diff --git a/cpp/src/arrow/util/align_util.cc b/cpp/src/arrow/util/align_util.cc index d77650fcd61..8d583ec7aee 100644 --- a/cpp/src/arrow/util/align_util.cc +++ b/cpp/src/arrow/util/align_util.cc @@ -21,6 +21,8 @@ #include "arrow/chunked_array.h" #include "arrow/record_batch.h" #include "arrow/table.h" +#include "arrow/util/checked_cast.h" +#include "arrow/util/logging.h" namespace arrow { @@ -30,10 +32,109 @@ bool CheckAlignment(const Buffer& buffer, int64_t alignment) { return buffer.address() % alignment == 0; } +namespace { + +// Some buffers are frequently type-punned. For example, in an int32 array the +// values buffer is frequently cast to int32_t* +// +// This sort of punning is only valid if the pointer is aligned to a proper width +// (e.g. 4 bytes in the case of int32). 
+// +// We generally assume that all buffers are at least 8-bit aligned and so we only +// need to worry about buffers that are commonly cast to wider data types. Note that +// this alignment is something that is guaranteed by malloc (e.g. new int32_t[] will +// return a buffer that is 4 byte aligned) or common libraries (e.g. numpy) but it is +// not currently guaranteed by flight (GH-32276). +// +// By happy coincedence, for every data type, the only buffer that might need wider +// alignment is the second buffer (at index 1). This function returns the expected +// alignment (in bits) of the second buffer for the given array to safely allow this cast. +// +// If the array's type doesn't have a second buffer or the second buffer is not expected +// to be type punned, then we return 8. +int GetMallocValuesAlignment(const ArrayData& array) { + // Make sure to use the storage type id + auto type_id = array.type->storage_id(); + if (type_id == Type::DICTIONARY) { + // The values buffer is in a different ArrayData and so we only check the indices + // buffer here. The values array data will be checked by the calling method. + type_id = ::arrow::internal::checked_pointer_cast(array.type) + ->index_type() + ->id(); + } + switch (type_id) { + case Type::NA: // No buffers + case Type::FIXED_SIZE_LIST: // No second buffer (values in child array) + case Type::FIXED_SIZE_BINARY: // Fixed size binary could be dangerous but the + // compute kernels don't type pun this. E.g. 
if + // an extension type is storing some kind of struct + // here then the user should do their own alignment + // check before casting to an array of structs + case Type::BOOL: // Always treated as uint8_t* + case Type::INT8: // Always treated as uint8_t* + case Type::UINT8: // Always treated as uint8_t* + case Type::DECIMAL128: // Always treated as uint8_t* + case Type::DECIMAL256: // Always treated as uint8_t* + case Type::SPARSE_UNION: // Types array is uint8_t, no offsets array + case Type::RUN_END_ENCODED: // No buffers + case Type::STRUCT: // No buffers beyond validity + // These have no buffers or all buffers need only byte alignment + return 1; + case Type::INT16: + case Type::UINT16: + case Type::HALF_FLOAT: + return 2; + case Type::INT32: + case Type::UINT32: + case Type::FLOAT: + case Type::STRING: // Offsets may be cast to int32_t*, data is only uint8_t* + case Type::BINARY: // Offsets may be cast to int32_t*, data is only uint8_t* + case Type::DATE32: + case Type::TIME32: + case Type::LIST: // Offsets may be cast to int32_t*, data is in child array + case Type::MAP: // This is a list array + case Type::DENSE_UNION: // Has an offsets buffer of int32_t* + case Type::INTERVAL_MONTHS: // Stored as int32_t* + return 4; + case Type::INT64: + case Type::UINT64: + case Type::DOUBLE: + case Type::LARGE_BINARY: // Offsets may be cast to int64_t* + case Type::LARGE_LIST: // Offsets may be cast to int64_t* + case Type::LARGE_STRING: // Offsets may be cast to int64_t* + case Type::DATE64: + case Type::TIME64: + case Type::TIMESTAMP: + case Type::DURATION: + case Type::INTERVAL_DAY_TIME: // Stored as two contiguous 32-bit integers but may be + // cast to struct* containing both integers + return 8; + case Type::INTERVAL_MONTH_DAY_NANO: // Stored as two 32-bit integers and a 64-bit + // integer + return 16; + default: + Status::Invalid("Could not check alignment for type id ", type_id, + " keeping existing alignment") + .Warn(); + return 8; + } +} + +} // namespace 
+ bool CheckAlignment(const ArrayData& array, int64_t alignment) { - for (const auto& buffer : array.buffers) { - if (buffer) { - if (!CheckAlignment(*buffer, alignment)) return false; + if (alignment == kMallocAlignment) { + int malloc_alignment = GetMallocValuesAlignment(array); + if (array.buffers.size() >= 2) { + if (!CheckAlignment(*array.buffers[1], malloc_alignment)) { + return false; + } + } + } else { + for (const auto& buffer : array.buffers) { + if (buffer) { + if (!CheckAlignment(*buffer, alignment)) return false; + } } } @@ -112,10 +213,17 @@ Result> EnsureAlignment(std::shared_ptr ar MemoryPool* memory_pool) { if (!CheckAlignment(*array_data, alignment)) { std::vector> buffers_ = array_data->buffers; - for (size_t i = 0; i < buffers_.size(); ++i) { - if (buffers_[i]) { - ARROW_ASSIGN_OR_RAISE( - buffers_[i], EnsureAlignment(std::move(buffers_[i]), alignment, memory_pool)); + if (alignment == kMallocAlignment) { + int malloc_alignment = GetMallocValuesAlignment(*array_data); + DCHECK(buffers_.size() >= 2); + ARROW_ASSIGN_OR_RAISE(buffers_[1], EnsureAlignment(std::move(buffers_[1]), + malloc_alignment, memory_pool)); + } else { + for (size_t i = 0; i < buffers_.size(); ++i) { + if (buffers_[i]) { + ARROW_ASSIGN_OR_RAISE(buffers_[i], EnsureAlignment(std::move(buffers_[i]), + alignment, memory_pool)); + } } } diff --git a/cpp/src/arrow/util/align_util.h b/cpp/src/arrow/util/align_util.h index abdabe22fca..a2b7660ab6f 100644 --- a/cpp/src/arrow/util/align_util.h +++ b/cpp/src/arrow/util/align_util.h @@ -70,8 +70,26 @@ inline BitmapWordAlignParams BitmapWordAlign(const uint8_t* data, int64_t bit_of namespace util { // Functions to check if the provided Arrow object is aligned by the specified alignment + +/// \brief if this is specified in one of the CheckAlignment or EnsureAlignment functions +/// then the funciton will ensure each buffer is suitably aligned for the data type of the +/// array. 
For example, given an int32 buffer the validity buffer must be a multiple of 8 +/// and the values buffer must be a multiple of 32. Given a large_string buffer the +/// validity buffer and values buffers must be multiples of 8 and the offsets buffer must +/// be a multiple of 64. +constexpr int64_t kMallocAlignment = -3; + +/// \brief calculate if the buffer's address is a multiple of `alignment` +/// \param buffer the buffer to check +/// \param alignment the alignment to check for ARROW_EXPORT bool CheckAlignment(const Buffer& buffer, int64_t alignment); +/// \brief calculate if all buffer's in the array data are aligned +/// \param array the array data to check +/// \param alignment the alignment to check for ARROW_EXPORT bool CheckAlignment(const ArrayData& array, int64_t alignment); +/// \brief calculate if all buffer's in the array are aligned +/// \param array the array to check +/// \param alignment the alignment to check for ARROW_EXPORT bool CheckAlignment(const Array& array, int64_t alignment); // Following functions require an additional boolean vector which stores the @@ -82,29 +100,109 @@ ARROW_EXPORT bool CheckAlignment(const Array& array, int64_t alignment); // of the constituent objects during the EnsureAlignment function where certain // objects can be ignored for further checking if we already know that they are // completely aligned. + +/// \brief calculate which (if any) chunks in a chunked array are unaligned +/// \param array the array to check +/// \param alignment the alignment to check for +/// \param needs_alignment an output vector that will store the results of the check +/// it must be set to a valid vector. Extra elements will be added to the end +/// of the vector for each chunk that is checked. `true` will be stored if +/// the chunk is unaligned. 
+/// \param offset an optional offset to specify which chunk to start checking at +/// \return true if all chunks (starting at `offset`) are aligned, false otherwise ARROW_EXPORT bool CheckAlignment(const ChunkedArray& array, int64_t alignment, std::vector* needs_alignment, int offset = 0); + +/// \brief calculate which (if any) columns in a record batch are unaligned +/// \param batch the batch to check +/// \param alignment the alignment to check for +/// \param needs_alignment an output vector that will store the results of the +/// check. It must be set to a valid vector. Extra elements will be added +/// to the end of the vector for each column that is checked. `true` will be +/// stored if the column is unaligned. ARROW_EXPORT bool CheckAlignment(const RecordBatch& batch, int64_t alignment, std::vector* needs_alignment); + +/// \brief calculate which (if any) columns in a table are unaligned +/// \param table the table to check +/// \param alignment the alignment to check for +/// \param needs_alignment an output vector that will store the results of the +/// check. It must be set to a valid vector. Extra elements will be added +/// to the end of the vector for each column that is checked. `true` will be +/// stored if the column is unaligned. 
ARROW_EXPORT bool CheckAlignment(const Table& table, int64_t alignment, std::vector* needs_alignment); +/// \brief return a buffer that has the given alignment and the same data as the input +/// buffer +/// +/// If the input buffer is already aligned then this method will return the input buffer +/// +/// \param buffer the buffer to check +/// \param alignment the alignment to check for +/// \param memory_pool a memory pool that will be used to allocate a new buffer if the +/// input buffer is not sufficiently aligned ARROW_EXPORT Result> EnsureAlignment( std::shared_ptr buffer, int64_t alignment, MemoryPool* memory_pool); +/// \brief return an array data where all buffers are aligned by the given alignment +/// +/// If any input buffer is already aligned then this method will reuse that same input +/// buffer. +/// +/// \param array_data the array data to check +/// \param alignment the alignment to check for +/// \param memory_pool a memory pool that will be used to allocate new buffers if any +/// input buffer is not sufficiently aligned ARROW_EXPORT Result> EnsureAlignment( std::shared_ptr array_data, int64_t alignment, MemoryPool* memory_pool); +/// \brief return an array where all buffers are aligned by the given alignment +/// +/// If any input buffer is already aligned then this method will reuse that same input +/// buffer. +/// +/// \param array_data the array to check +/// \param alignment the alignment to check for +/// \param memory_pool a memory pool that will be used to allocate new buffers if any +/// input buffer is not sufficiently aligned ARROW_EXPORT Result> EnsureAlignment(std::shared_ptr array, int64_t alignment, MemoryPool* memory_pool); +/// \brief return a chunked array where all buffers are aligned by the given alignment +/// +/// If any input buffer is already aligned then this method will reuse that same input +/// buffer. 
+/// +/// \param array the chunked array to check +/// \param alignment the alignment to check for +/// \param memory_pool a memory pool that will be used to allocate new buffers if any +/// input buffer is not sufficiently aligned ARROW_EXPORT Result> EnsureAlignment( std::shared_ptr array, int64_t alignment, MemoryPool* memory_pool); +/// \brief return a record batch where all buffers are aligned by the given alignment +/// +/// If any input buffer is already aligned then this method will reuse that same input +/// buffer. +/// +/// \param batch the batch to check +/// \param alignment the alignment to check for +/// \param memory_pool a memory pool that will be used to allocate new buffers if any +/// input buffer is not sufficiently aligned ARROW_EXPORT Result> EnsureAlignment( std::shared_ptr batch, int64_t alignment, MemoryPool* memory_pool); +/// \brief return a table where all buffers are aligned by the given alignment +/// +/// If any input buffer is already aligned then this method will reuse that same input +/// buffer. +/// +/// \param table the table to check +/// \param alignment the alignment to check for +/// \param memory_pool a memory pool that will be used to allocate new buffers if any +/// input buffer is not sufficiently aligned ARROW_EXPORT Result> EnsureAlignment(std::shared_ptr table, int64_t alignment, MemoryPool* memory_pool); From 9124352907292d2808f3e2ab40969c8e935cb262 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Thu, 11 May 2023 13:38:47 -0700 Subject: [PATCH 02/18] Minor doc comment fix --- cpp/src/arrow/util/align_util.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/util/align_util.h b/cpp/src/arrow/util/align_util.h index a2b7660ab6f..b6376c63305 100644 --- a/cpp/src/arrow/util/align_util.h +++ b/cpp/src/arrow/util/align_util.h @@ -162,7 +162,7 @@ ARROW_EXPORT Result> EnsureAlignment( /// If any input buffer is already aligned then this method will reuse that same input /// buffer. 
/// -/// \param array_data the array to check +/// \param array the array to check /// \param alignment the alignment to check for /// \param memory_pool a memory pool that will be used to allocate new buffers if any /// input buffer is not sufficiently aligned From bd673963f5258524437efe82df8d5522f501f7ab Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Thu, 11 May 2023 19:26:20 -0700 Subject: [PATCH 03/18] Lint --- cpp/src/arrow/util/align_util.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/util/align_util.cc b/cpp/src/arrow/util/align_util.cc index 8d583ec7aee..3ab671595cf 100644 --- a/cpp/src/arrow/util/align_util.cc +++ b/cpp/src/arrow/util/align_util.cc @@ -215,7 +215,7 @@ Result> EnsureAlignment(std::shared_ptr ar std::vector> buffers_ = array_data->buffers; if (alignment == kMallocAlignment) { int malloc_alignment = GetMallocValuesAlignment(*array_data); - DCHECK(buffers_.size() >= 2); + DCHECK_GE(buffers_.size(), 2); ARROW_ASSIGN_OR_RAISE(buffers_[1], EnsureAlignment(std::move(buffers_[1]), malloc_alignment, memory_pool)); } else { From 692c32c022773487d3122c5a0a31bf094680b4e8 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 12 May 2023 01:48:03 -0700 Subject: [PATCH 04/18] Added unit tests. 
Fixed a few minor issues when actually aligning buffers --- cpp/src/arrow/util/align_util.cc | 42 +++-- cpp/src/arrow/util/align_util_test.cc | 249 ++++++++++++++++++++++++++ 2 files changed, 278 insertions(+), 13 deletions(-) diff --git a/cpp/src/arrow/util/align_util.cc b/cpp/src/arrow/util/align_util.cc index 3ab671595cf..10b751e9393 100644 --- a/cpp/src/arrow/util/align_util.cc +++ b/cpp/src/arrow/util/align_util.cc @@ -120,9 +120,9 @@ int GetMallocValuesAlignment(const ArrayData& array) { } } -} // namespace - -bool CheckAlignment(const ArrayData& array, int64_t alignment) { +// Checks to see if an array's own buffers are aligned but doesn't check +// children +bool CheckSelfAlignment(const ArrayData& array, int64_t alignment) { if (alignment == kMallocAlignment) { int malloc_alignment = GetMallocValuesAlignment(array); if (array.buffers.size() >= 2) { @@ -137,6 +137,15 @@ bool CheckAlignment(const ArrayData& array, int64_t alignment) { } } } + return true; +} + +} // namespace + +bool CheckAlignment(const ArrayData& array, int64_t alignment) { + if (!CheckSelfAlignment(array, alignment)) { + return false; + } if (array.type->id() == Type::DICTIONARY) { if (!CheckAlignment(*array.dictionary, alignment)) return false; @@ -213,16 +222,23 @@ Result> EnsureAlignment(std::shared_ptr ar MemoryPool* memory_pool) { if (!CheckAlignment(*array_data, alignment)) { std::vector> buffers_ = array_data->buffers; - if (alignment == kMallocAlignment) { - int malloc_alignment = GetMallocValuesAlignment(*array_data); - DCHECK_GE(buffers_.size(), 2); - ARROW_ASSIGN_OR_RAISE(buffers_[1], EnsureAlignment(std::move(buffers_[1]), - malloc_alignment, memory_pool)); - } else { - for (size_t i = 0; i < buffers_.size(); ++i) { - if (buffers_[i]) { - ARROW_ASSIGN_OR_RAISE(buffers_[i], EnsureAlignment(std::move(buffers_[i]), - alignment, memory_pool)); + if (!CheckSelfAlignment(*array_data, alignment)) { + if (alignment == kMallocAlignment) { + DCHECK_GE(buffers_.size(), 2); + // If we get 
here then we know that the values buffer is not aligned properly. + // Since we need to copy the buffer we might as well update it to + // kDefaultBufferAlignment. This helps to avoid cases where the minimum required + // alignment is less than the allocator's minimum alignment (e.g. malloc requires + // a minimum of 8 byte alignment on a 64-bit system) + ARROW_ASSIGN_OR_RAISE( + buffers_[1], EnsureAlignment(std::move(buffers_[1]), kDefaultBufferAlignment, + memory_pool)); + } else { + for (size_t i = 0; i < buffers_.size(); ++i) { + if (buffers_[i]) { + ARROW_ASSIGN_OR_RAISE(buffers_[i], EnsureAlignment(std::move(buffers_[i]), + alignment, memory_pool)); + } } } } diff --git a/cpp/src/arrow/util/align_util_test.cc b/cpp/src/arrow/util/align_util_test.cc index c4ec83de3ee..08c4868bed2 100644 --- a/cpp/src/arrow/util/align_util_test.cc +++ b/cpp/src/arrow/util/align_util_test.cc @@ -24,6 +24,7 @@ #include "arrow/array.h" #include "arrow/record_batch.h" #include "arrow/table.h" +#include "arrow/testing/extension_type.h" #include "arrow/testing/gtest_util.h" #include "arrow/testing/random.h" #include "arrow/util/align_util.h" @@ -278,4 +279,252 @@ TEST(EnsureAlignment, Table) { ASSERT_EQ(util::CheckAlignment(*aligned_table, 2048, &needs_alignment), true); } +using TypesRequiringSomeKindOfAlignment = + testing::Types; + +using TypesNotRequiringAlignment = + testing::Types; + +TEST(EnsureAlignment, Malloc) {} + +template +std::shared_ptr sample_type() { + return TypeTraits::type_singleton(); +} + +template <> +std::shared_ptr sample_type() { + return fixed_size_binary(16); +} + +template <> +std::shared_ptr sample_type() { + return fixed_size_list(uint8(), 16); +} + +template <> +std::shared_ptr sample_type() { + return decimal128(32, 6); +} + +template <> +std::shared_ptr sample_type() { + return decimal256(60, 10); +} + +template <> +std::shared_ptr sample_type() { + return large_list(int8()); +} + +template <> +std::shared_ptr sample_type() { + return 
dense_union({field("x", int8()), field("y", uint8())}); +} + +template <> +std::shared_ptr sample_type() { + return map(utf8(), field("item", utf8())); +} + +template <> +std::shared_ptr sample_type() { + return duration(TimeUnit::NANO); +} + +template <> +std::shared_ptr sample_type() { + return timestamp(TimeUnit::NANO); +} + +template <> +std::shared_ptr sample_type() { + return time32(TimeUnit::SECOND); +} + +template <> +std::shared_ptr sample_type() { + return time64(TimeUnit::NANO); +} + +template <> +std::shared_ptr sample_type() { + return sparse_union({field("x", uint8()), field("y", int8())}, {1, 2}); +} + +template +std::shared_ptr SampleArray() { + random::RandomArrayGenerator gen(42); + return gen.ArrayOf(sample_type(), 100)->data(); +} + +template <> +std::shared_ptr SampleArray() { + auto ty = sparse_union({field("ints", int64()), field("strs", utf8())}, {2, 7}); + auto ints = ArrayFromJSON(int64(), "[0, 1, 2, 3]"); + auto strs = ArrayFromJSON(utf8(), R"(["a", null, "c", "d"])"); + auto ids = ArrayFromJSON(int8(), "[2, 7, 2, 7]")->data()->buffers[1]; + const int length = 4; + SparseUnionArray arr(ty, length, {ints, strs}, ids); + return arr.data(); +} + +class MallocAlignment : public ::testing::Test { + public: + void CheckModified(const ArrayData& src, const ArrayData& dst) { + ASSERT_EQ(src.buffers.size(), dst.buffers.size()); + for (std::size_t i = 0; i < src.buffers.size(); i++) { + if (!src.buffers[i] || !dst.buffers[i]) { + continue; + } + if (src.buffers[i]->address() != dst.buffers[i]->address()) { + return; + } + } + FAIL() << "Expected at least one buffer to have been modified by EnsureAlignment"; + } + + void CheckUnmodified(const ArrayData& src, const ArrayData& dst) { + ASSERT_EQ(src.buffers.size(), dst.buffers.size()); + for (std::size_t i = 0; i < src.buffers.size(); i++) { + if (!src.buffers[i] || !dst.buffers[i]) { + continue; + } + ASSERT_EQ(src.buffers[i]->address(), dst.buffers[i]->address()); + } + } + + std::shared_ptr 
UnalignValues(const ArrayData& array) { + if (array.buffers.size() < 2) { + // We can't unalign the values if there isn't a values buffer but we can + // still make sure EnsureAligned is a no-op + return std::make_shared(array); + } + std::vector> new_buffers(array.buffers); + + const auto& buffer_to_modify = array.buffers[1]; + EXPECT_OK_AND_ASSIGN( + std::shared_ptr padded, + AllocateBuffer(buffer_to_modify->size() + 1, default_memory_pool())); + memcpy(padded->mutable_data() + 1, buffer_to_modify->data(), + buffer_to_modify->size()); + std::shared_ptr unaligned = SliceBuffer(padded, 1); + new_buffers[1] = std::move(unaligned); + + std::shared_ptr array_data = std::make_shared(array); + array_data->buffers = std::move(new_buffers); + return array_data; + } + + std::shared_ptr UnalignValues(const Array& array) { + std::shared_ptr array_data = UnalignValues(*array.data()); + return MakeArray(array_data); + } +}; + +template +class MallocAlignmentRequired : public MallocAlignment {}; +template +class MallocAlignmentNotRequired : public MallocAlignment {}; + +TYPED_TEST_SUITE(MallocAlignmentRequired, TypesRequiringSomeKindOfAlignment); +TYPED_TEST_SUITE(MallocAlignmentNotRequired, TypesNotRequiringAlignment); + +TYPED_TEST(MallocAlignmentRequired, RoundTrip) { + std::shared_ptr data = SampleArray(); + std::shared_ptr unaligned = this->UnalignValues(*data); + ASSERT_OK_AND_ASSIGN( + std::shared_ptr aligned, + util::EnsureAlignment(unaligned, util::kMallocAlignment, default_memory_pool())); + + AssertArraysEqual(*MakeArray(data), *MakeArray(aligned)); + this->CheckModified(*unaligned, *aligned); +} + +TYPED_TEST(MallocAlignmentNotRequired, RoundTrip) { + std::shared_ptr data = SampleArray(); + std::shared_ptr unaligned = this->UnalignValues(*data); + ASSERT_OK_AND_ASSIGN( + std::shared_ptr aligned, + util::EnsureAlignment(unaligned, util::kMallocAlignment, default_memory_pool())); + + AssertArraysEqual(*MakeArray(data), *MakeArray(aligned)); + 
this->CheckUnmodified(*unaligned, *aligned); +} + +TEST_F(MallocAlignment, RunEndEncoded) { + // Run end requires alignment, value type does not + std::shared_ptr run_ends = ArrayFromJSON(int32(), "[3, 5]"); + std::shared_ptr values = ArrayFromJSON(int8(), "[50, 100]"); + ASSERT_OK_AND_ASSIGN(std::shared_ptr array, + RunEndEncodedArray::Make(/*logical_length=*/5, std::move(run_ends), + std::move(values), 0)); + + std::shared_ptr unaligned_ree = std::make_shared(*array->data()); + unaligned_ree->child_data[0] = this->UnalignValues(*unaligned_ree->child_data[0]); + unaligned_ree->child_data[1] = this->UnalignValues(*unaligned_ree->child_data[1]); + + std::shared_ptr aligned_ree = std::make_shared(*unaligned_ree); + + ASSERT_OK_AND_ASSIGN( + aligned_ree, + util::EnsureAlignment(aligned_ree, util::kMallocAlignment, default_memory_pool())); + + this->CheckModified(*unaligned_ree->child_data[0], *aligned_ree->child_data[0]); + this->CheckUnmodified(*unaligned_ree->child_data[1], *aligned_ree->child_data[1]); +} + +TEST_F(MallocAlignment, Dictionary) { + // Dictionary values require alignment, dictionary keys do not + std::shared_ptr int8_utf8 = dictionary(int8(), utf8()); + std::shared_ptr array = ArrayFromJSON(int8_utf8, R"(["x", "x", "y"])"); + + std::shared_ptr unaligned_dict = std::make_shared(*array->data()); + unaligned_dict->dictionary = this->UnalignValues(*unaligned_dict->dictionary); + unaligned_dict = this->UnalignValues(*unaligned_dict); + + std::shared_ptr aligned_dict = std::make_shared(*unaligned_dict); + + ASSERT_OK_AND_ASSIGN( + aligned_dict, + util::EnsureAlignment(aligned_dict, util::kMallocAlignment, default_memory_pool())); + + this->CheckUnmodified(*unaligned_dict, *aligned_dict); + this->CheckModified(*unaligned_dict->dictionary, *aligned_dict->dictionary); + + // Dictionary values do not require alignment, dictionary keys do + std::shared_ptr int16_dec128 = dictionary(int16(), decimal128(5, 2)); + array = ArrayFromJSON(int16_dec128, R"(["123.45", 
"111.22"])"); + + unaligned_dict = std::make_shared(*array->data()); + unaligned_dict->dictionary = this->UnalignValues(*unaligned_dict->dictionary); + unaligned_dict = this->UnalignValues(*unaligned_dict); + + aligned_dict = std::make_shared(*unaligned_dict); + + ASSERT_OK_AND_ASSIGN( + aligned_dict, + util::EnsureAlignment(aligned_dict, util::kMallocAlignment, default_memory_pool())); + + this->CheckModified(*unaligned_dict, *aligned_dict); + this->CheckUnmodified(*unaligned_dict->dictionary, *aligned_dict->dictionary); +} + +TEST_F(MallocAlignment, Extension) { + std::shared_ptr array = ExampleSmallint(); + + std::shared_ptr unaligned = this->UnalignValues(*array->data()); + + ASSERT_OK_AND_ASSIGN( + std::shared_ptr aligned, + util::EnsureAlignment(unaligned, util::kMallocAlignment, default_memory_pool())); + + this->CheckModified(*unaligned, *aligned); +} + } // namespace arrow From b3c54a91ea55adb6834e11702b9751a041fb012f Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 12 May 2023 02:08:02 -0700 Subject: [PATCH 05/18] Fixing default in alignment check --- cpp/src/arrow/util/align_util.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/util/align_util.cc b/cpp/src/arrow/util/align_util.cc index 10b751e9393..2069ca2f794 100644 --- a/cpp/src/arrow/util/align_util.cc +++ b/cpp/src/arrow/util/align_util.cc @@ -116,7 +116,7 @@ int GetMallocValuesAlignment(const ArrayData& array) { Status::Invalid("Could not check alignment for type id ", type_id, " keeping existing alignment") .Warn(); - return 8; + return 1; } } From 29189369ebc912ae0d0d859542d5eed214b8d2a3 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 19 May 2023 11:27:36 -0700 Subject: [PATCH 06/18] Addressed PR feedback --- cpp/src/arrow/CMakeLists.txt | 1 + cpp/src/arrow/acero/source_node.cc | 2 +- cpp/src/arrow/type_traits.cc | 94 +++++++++++++++++ cpp/src/arrow/type_traits.h | 22 ++++ cpp/src/arrow/util/align_util.cc | 142 +++++++------------------- 
cpp/src/arrow/util/align_util.h | 3 +- cpp/src/arrow/util/align_util_test.cc | 24 ++--- 7 files changed, 167 insertions(+), 121 deletions(-) create mode 100644 cpp/src/arrow/type_traits.cc diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index 4e6826bc61f..d928cdf58b1 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -180,6 +180,7 @@ set(ARROW_SRCS tensor/csf_converter.cc tensor/csx_converter.cc type.cc + type_traits.cc visitor.cc c/bridge.cc io/buffered.cc diff --git a/cpp/src/arrow/acero/source_node.cc b/cpp/src/arrow/acero/source_node.cc index d1945bf2d5d..6f8843e76c7 100644 --- a/cpp/src/arrow/acero/source_node.cc +++ b/cpp/src/arrow/acero/source_node.cc @@ -108,7 +108,7 @@ struct SourceNode : ExecNode, public TracedNode { if (value.is_array()) { ARROW_ASSIGN_OR_RAISE( value, arrow::util::EnsureAlignment(value.array(), - arrow::util::kMallocAlignment, + arrow::util::kValueAlignment, default_memory_pool())); } } diff --git a/cpp/src/arrow/type_traits.cc b/cpp/src/arrow/type_traits.cc new file mode 100644 index 00000000000..094138eab7a --- /dev/null +++ b/cpp/src/arrow/type_traits.cc @@ -0,0 +1,94 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "arrow/type_traits.h" + +#include "arrow/util/logging.h" + +namespace arrow { + +namespace {} // namespace + +int RequiredValueAlignmentForBuffer(Type::type type_id, int buffer_index) { + if (buffer_index != 1) { + // If the buffer index is 0 then either: + // * The array type has no buffers, thus this shouldn't be called anyways + // * The array has a validity buffer at 0, no alignment needed + // * The array is a union array and has a types buffer at 0, no alignment needed + // If the buffer index is > 1 then, in all current cases, it represents binary + // data and no alignment is needed + return 1; + } + DCHECK_NE(type_id, Type::DICTIONARY); + DCHECK_NE(type_id, Type::EXTENSION); + + switch (type_id) { + case Type::NA: // No buffers + case Type::FIXED_SIZE_LIST: // No second buffer (values in child array) + case Type::FIXED_SIZE_BINARY: // Fixed size binary could be dangerous but the + // compute kernels don't type pun this. E.g. if + // an extension type is storing some kind of struct + // here then the user should do their own alignment + // check before casting to an array of structs + case Type::BOOL: // Always treated as uint8_t* + case Type::INT8: // Always treated as uint8_t* + case Type::UINT8: // Always treated as uint8_t* + case Type::SPARSE_UNION: // No second buffer + case Type::RUN_END_ENCODED: // No buffers + case Type::STRUCT: // No second buffer + return 1; + case Type::INT16: + case Type::UINT16: + case Type::HALF_FLOAT: + return 2; + case Type::INT32: + case Type::UINT32: + case Type::FLOAT: + case Type::STRING: // Offsets may be cast to int32_t* + case Type::BINARY: // Offsets may be cast to int32_t* + case Type::DATE32: + case Type::TIME32: + case Type::LIST: // Offsets may be cast to int32_t*, data is in child array + case Type::MAP: // This is a list array + case Type::DENSE_UNION: // Has an offsets buffer of int32_t* + case Type::INTERVAL_MONTHS: // Stored as int32_t* + case Type::INTERVAL_DAY_TIME: // Stored as two contiguous 
32-bit integers + return 4; + case Type::INT64: + case Type::UINT64: + case Type::DOUBLE: + case Type::DECIMAL128: // May be cast to GenericBasicDecimal* which requires + // alignment of 8 + case Type::DECIMAL256: // May be cast to GenericBasicDecimal* which requires + // alignment of 8 + case Type::LARGE_BINARY: // Offsets may be cast to int64_t* + case Type::LARGE_LIST: // Offsets may be cast to int64_t* + case Type::LARGE_STRING: // Offsets may be cast to int64_t* + case Type::DATE64: + case Type::TIME64: + case Type::TIMESTAMP: + case Type::DURATION: + case Type::INTERVAL_MONTH_DAY_NANO: // Stored as two 32-bit integers and a 64-bit + // integer + return 8; + default: + Status::Invalid("Could not check alignment for type id ", type_id).Warn(); + return 1; + } +} + +} // namespace arrow \ No newline at end of file diff --git a/cpp/src/arrow/type_traits.h b/cpp/src/arrow/type_traits.h index 2d20d87d143..4ee0a7163db 100644 --- a/cpp/src/arrow/type_traits.h +++ b/cpp/src/arrow/type_traits.h @@ -1309,6 +1309,28 @@ static inline int offset_bit_width(Type::type type_id) { return 0; } +/// \brief get the alignment a buffer should have to be considered "value aligned" +/// +/// Some buffers are frequently type-punned. For example, in an int32 array the +/// values buffer is frequently cast to int32_t* +/// +/// This sort of punning is technically only valid if the pointer is aligned to a +/// proper width (e.g. 4 bytes in the case of int32). However, most modern compilers +/// are quite permissive if we get this wrong. Note that this alignment is something +/// that is guaranteed by malloc (e.g. new int32_t[] will return a buffer that is 4 +/// byte aligned) or common libraries (e.g. numpy) but it is not currently guaranteed +/// by flight (GH-32276). +/// +/// We call this "value aligned" and this method will calculate that required alignment. 
+/// +/// \param type_id the type of the array containing the buffer +/// Note: this should be the indices type for a dictionary array since +/// A dictionary array's buffers are indices. It should be the storage +/// type for an extension array. +/// \param buffer_index the index of the buffer to check, for example 0 will typically +/// give you the alignment expected of the validity buffer +int RequiredValueAlignmentForBuffer(Type::type type_id, int buffer_index); + /// \brief Check for an integer type (signed or unsigned) /// /// \param[in] type the type to check diff --git a/cpp/src/arrow/util/align_util.cc b/cpp/src/arrow/util/align_util.cc index 2069ca2f794..8bb65ed1012 100644 --- a/cpp/src/arrow/util/align_util.cc +++ b/cpp/src/arrow/util/align_util.cc @@ -21,6 +21,7 @@ #include "arrow/chunked_array.h" #include "arrow/record_batch.h" #include "arrow/table.h" +#include "arrow/type_traits.h" #include "arrow/util/checked_cast.h" #include "arrow/util/logging.h" @@ -34,100 +35,30 @@ bool CheckAlignment(const Buffer& buffer, int64_t alignment) { namespace { -// Some buffers are frequently type-punned. For example, in an int32 array the -// values buffer is frequently cast to int32_t* -// -// This sort of punning is only valid if the pointer is aligned to a proper width -// (e.g. 4 bytes in the case of int32). -// -// We generally assume that all buffers are at least 8-bit aligned and so we only -// need to worry about buffers that are commonly cast to wider data types. Note that -// this alignment is something that is guaranteed by malloc (e.g. new int32_t[] will -// return a buffer that is 4 byte aligned) or common libraries (e.g. numpy) but it is -// not currently guaranteed by flight (GH-32276). -// -// By happy coincedence, for every data type, the only buffer that might need wider -// alignment is the second buffer (at index 1). This function returns the expected -// alignment (in bits) of the second buffer for the given array to safely allow this cast. 
-// -// If the array's type doesn't have a second buffer or the second buffer is not expected -// to be type punned, then we return 8. -int GetMallocValuesAlignment(const ArrayData& array) { - // Make sure to use the storage type id - auto type_id = array.type->storage_id(); +// Returns the type that controls how the buffers of this ArrayData (not its children) +// should behave +Type::type GetTypeForBuffers(const ArrayData& array) { + Type::type type_id = array.type->storage_id(); if (type_id == Type::DICTIONARY) { - // The values buffer is in a different ArrayData and so we only check the indices - // buffer here. The values array data will be checked by the calling method. - type_id = ::arrow::internal::checked_pointer_cast(array.type) - ->index_type() - ->id(); - } - switch (type_id) { - case Type::NA: // No buffers - case Type::FIXED_SIZE_LIST: // No second buffer (values in child array) - case Type::FIXED_SIZE_BINARY: // Fixed size binary could be dangerous but the - // compute kernels don't type pun this. E.g. 
if - // an extension type is storing some kind of struct - // here then the user should do their own alignment - // check before casting to an array of structs - case Type::BOOL: // Always treated as uint8_t* - case Type::INT8: // Always treated as uint8_t* - case Type::UINT8: // Always treated as uint8_t* - case Type::DECIMAL128: // Always treated as uint8_t* - case Type::DECIMAL256: // Always treated as uint8_t* - case Type::SPARSE_UNION: // Types array is uint8_t, no offsets array - case Type::RUN_END_ENCODED: // No buffers - case Type::STRUCT: // No buffers beyond validity - // These have no buffers or all buffers need only byte alignment - return 1; - case Type::INT16: - case Type::UINT16: - case Type::HALF_FLOAT: - return 2; - case Type::INT32: - case Type::UINT32: - case Type::FLOAT: - case Type::STRING: // Offsets may be cast to int32_t*, data is only uint8_t* - case Type::BINARY: // Offsets may be cast to int32_t*, data is only uint8_t* - case Type::DATE32: - case Type::TIME32: - case Type::LIST: // Offsets may be cast to int32_t*, data is in child array - case Type::MAP: // This is a list array - case Type::DENSE_UNION: // Has an offsets buffer of int32_t* - case Type::INTERVAL_MONTHS: // Stored as int32_t* - return 4; - case Type::INT64: - case Type::UINT64: - case Type::DOUBLE: - case Type::LARGE_BINARY: // Offsets may be cast to int64_t* - case Type::LARGE_LIST: // Offsets may be cast to int64_t* - case Type::LARGE_STRING: // Offsets may be cast to int64_t* - case Type::DATE64: - case Type::TIME64: - case Type::TIMESTAMP: - case Type::DURATION: - case Type::INTERVAL_DAY_TIME: // Stored as two contiguous 32-bit integers but may be - // cast to struct* containing both integers - return 8; - case Type::INTERVAL_MONTH_DAY_NANO: // Stored as two 32-bit integers and a 64-bit - // integer - return 16; - default: - Status::Invalid("Could not check alignment for type id ", type_id, - " keeping existing alignment") - .Warn(); - return 1; + return 
::arrow::internal::checked_pointer_cast(array.type) + ->index_type() + ->id(); } + return type_id; } // Checks to see if an array's own buffers are aligned but doesn't check // children bool CheckSelfAlignment(const ArrayData& array, int64_t alignment) { - if (alignment == kMallocAlignment) { - int malloc_alignment = GetMallocValuesAlignment(array); - if (array.buffers.size() >= 2) { - if (!CheckAlignment(*array.buffers[1], malloc_alignment)) { - return false; + if (alignment == kValueAlignment) { + Type::type type_id = GetTypeForBuffers(array); + for (std::size_t i = 0; i < array.buffers.size(); i++) { + if (array.buffers[i]) { + int expected_alignment = + RequiredValueAlignmentForBuffer(type_id, static_cast(i)); + if (!CheckAlignment(*array.buffers[i], expected_alignment)) { + return false; + } } } } else { @@ -204,12 +135,17 @@ bool CheckAlignment(const Table& table, int64_t alignment, return all_aligned; } +// Most allocators require a minimum of 8-byte alignment. +constexpr int64_t kMinimumAlignment = 8; + Result> EnsureAlignment(std::shared_ptr buffer, int64_t alignment, MemoryPool* memory_pool) { if (!CheckAlignment(*buffer, alignment)) { - ARROW_ASSIGN_OR_RAISE(auto new_buffer, - AllocateBuffer(buffer->size(), alignment, memory_pool)); + int64_t minimal_compatible_alignment = std::max(kMinimumAlignment, alignment); + ARROW_ASSIGN_OR_RAISE( + auto new_buffer, + AllocateBuffer(buffer->size(), minimal_compatible_alignment, memory_pool)); std::memcpy(new_buffer->mutable_data(), buffer->data(), buffer->size()); return std::move(new_buffer); } else { @@ -222,24 +158,17 @@ Result> EnsureAlignment(std::shared_ptr ar MemoryPool* memory_pool) { if (!CheckAlignment(*array_data, alignment)) { std::vector> buffers_ = array_data->buffers; - if (!CheckSelfAlignment(*array_data, alignment)) { - if (alignment == kMallocAlignment) { - DCHECK_GE(buffers_.size(), 2); - // If we get here then we know that the values buffer is not aligned properly. 
- // Since we need to copy the buffer we might as well update it to - // kDefaultBufferAlignment. This helps to avoid cases where the minimum required - // alignment is less than the allocator's minimum alignment (e.g. malloc requires - // a minimum of 8 byte alignment on a 64-bit system) - ARROW_ASSIGN_OR_RAISE( - buffers_[1], EnsureAlignment(std::move(buffers_[1]), kDefaultBufferAlignment, - memory_pool)); - } else { - for (size_t i = 0; i < buffers_.size(); ++i) { - if (buffers_[i]) { - ARROW_ASSIGN_OR_RAISE(buffers_[i], EnsureAlignment(std::move(buffers_[i]), - alignment, memory_pool)); - } + Type::type type_id = GetTypeForBuffers(*array_data); + for (size_t i = 0; i < buffers_.size(); ++i) { + if (buffers_[i]) { + int64_t expected_alignment = alignment; + if (alignment == kValueAlignment) { + expected_alignment = + RequiredValueAlignmentForBuffer(type_id, static_cast(i)); } + ARROW_ASSIGN_OR_RAISE( + buffers_[i], + EnsureAlignment(std::move(buffers_[i]), expected_alignment, memory_pool)); } } @@ -257,7 +186,6 @@ Result> EnsureAlignment(std::shared_ptr ar array_data->type, array_data->length, std::move(buffers_), array_data->child_data, array_data->dictionary, array_data->GetNullCount(), array_data->offset); return std::move(new_array_data); - } else { return std::move(array_data); } diff --git a/cpp/src/arrow/util/align_util.h b/cpp/src/arrow/util/align_util.h index b6376c63305..f9640892f43 100644 --- a/cpp/src/arrow/util/align_util.h +++ b/cpp/src/arrow/util/align_util.h @@ -72,12 +72,13 @@ namespace util { // Functions to check if the provided Arrow object is aligned by the specified alignment /// \brief if this is specified in one of the CheckAlignment or EnsureAlignment functions +/// /// then the funciton will ensure each buffer is suitably aligned for the data type of the /// array. For example, given an int32 buffer the validity buffer must be a multiple of 8 /// and the values buffer must be a multiple of 32. 
Given a large_string buffer the /// validity buffer and values buffers must be multiples of 8 and the offsets buffer must /// be a multiple of 64. -constexpr int64_t kMallocAlignment = -3; +constexpr int64_t kValueAlignment = -3; /// \brief calculate if the buffer's address is a multiple of `alignment` /// \param buffer the buffer to check diff --git a/cpp/src/arrow/util/align_util_test.cc b/cpp/src/arrow/util/align_util_test.cc index 08c4868bed2..e9358705869 100644 --- a/cpp/src/arrow/util/align_util_test.cc +++ b/cpp/src/arrow/util/align_util_test.cc @@ -282,13 +282,13 @@ TEST(EnsureAlignment, Table) { using TypesRequiringSomeKindOfAlignment = testing::Types; + Decimal128Type, Decimal256Type, TimestampType, DurationType, MapType, + DenseUnionType, LargeBinaryType, LargeListType, LargeStringType, + MonthIntervalType, DayTimeIntervalType, MonthDayNanoIntervalType>; using TypesNotRequiringAlignment = testing::Types; + BooleanType, SparseUnionType>; TEST(EnsureAlignment, Malloc) {} @@ -440,7 +440,7 @@ TYPED_TEST(MallocAlignmentRequired, RoundTrip) { std::shared_ptr unaligned = this->UnalignValues(*data); ASSERT_OK_AND_ASSIGN( std::shared_ptr aligned, - util::EnsureAlignment(unaligned, util::kMallocAlignment, default_memory_pool())); + util::EnsureAlignment(unaligned, util::kValueAlignment, default_memory_pool())); AssertArraysEqual(*MakeArray(data), *MakeArray(aligned)); this->CheckModified(*unaligned, *aligned); @@ -451,7 +451,7 @@ TYPED_TEST(MallocAlignmentNotRequired, RoundTrip) { std::shared_ptr unaligned = this->UnalignValues(*data); ASSERT_OK_AND_ASSIGN( std::shared_ptr aligned, - util::EnsureAlignment(unaligned, util::kMallocAlignment, default_memory_pool())); + util::EnsureAlignment(unaligned, util::kValueAlignment, default_memory_pool())); AssertArraysEqual(*MakeArray(data), *MakeArray(aligned)); this->CheckUnmodified(*unaligned, *aligned); @@ -473,7 +473,7 @@ TEST_F(MallocAlignment, RunEndEncoded) { ASSERT_OK_AND_ASSIGN( aligned_ree, - 
util::EnsureAlignment(aligned_ree, util::kMallocAlignment, default_memory_pool())); + util::EnsureAlignment(aligned_ree, util::kValueAlignment, default_memory_pool())); this->CheckModified(*unaligned_ree->child_data[0], *aligned_ree->child_data[0]); this->CheckUnmodified(*unaligned_ree->child_data[1], *aligned_ree->child_data[1]); @@ -492,14 +492,14 @@ TEST_F(MallocAlignment, Dictionary) { ASSERT_OK_AND_ASSIGN( aligned_dict, - util::EnsureAlignment(aligned_dict, util::kMallocAlignment, default_memory_pool())); + util::EnsureAlignment(aligned_dict, util::kValueAlignment, default_memory_pool())); this->CheckUnmodified(*unaligned_dict, *aligned_dict); this->CheckModified(*unaligned_dict->dictionary, *aligned_dict->dictionary); // Dictionary values do not require alignment, dictionary keys do - std::shared_ptr int16_dec128 = dictionary(int16(), decimal128(5, 2)); - array = ArrayFromJSON(int16_dec128, R"(["123.45", "111.22"])"); + std::shared_ptr int16_int8 = dictionary(int16(), int8()); + array = ArrayFromJSON(int16_int8, R"([7, 11])"); unaligned_dict = std::make_shared(*array->data()); unaligned_dict->dictionary = this->UnalignValues(*unaligned_dict->dictionary); @@ -509,7 +509,7 @@ TEST_F(MallocAlignment, Dictionary) { ASSERT_OK_AND_ASSIGN( aligned_dict, - util::EnsureAlignment(aligned_dict, util::kMallocAlignment, default_memory_pool())); + util::EnsureAlignment(aligned_dict, util::kValueAlignment, default_memory_pool())); this->CheckModified(*unaligned_dict, *aligned_dict); this->CheckUnmodified(*unaligned_dict->dictionary, *aligned_dict->dictionary); @@ -522,7 +522,7 @@ TEST_F(MallocAlignment, Extension) { ASSERT_OK_AND_ASSIGN( std::shared_ptr aligned, - util::EnsureAlignment(unaligned, util::kMallocAlignment, default_memory_pool())); + util::EnsureAlignment(unaligned, util::kValueAlignment, default_memory_pool())); this->CheckModified(*unaligned, *aligned); } From a429a7cdf6f2d7dee957703943136fcda9ba6e01 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 19 
May 2023 11:30:24 -0700 Subject: [PATCH 07/18] Update cpp/src/arrow/util/align_util.h Co-authored-by: Benjamin Kietzman --- cpp/src/arrow/util/align_util.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/util/align_util.h b/cpp/src/arrow/util/align_util.h index f9640892f43..3ea0f11d925 100644 --- a/cpp/src/arrow/util/align_util.h +++ b/cpp/src/arrow/util/align_util.h @@ -73,7 +73,7 @@ namespace util { /// \brief if this is specified in one of the CheckAlignment or EnsureAlignment functions /// -/// then the funciton will ensure each buffer is suitably aligned for the data type of the +/// then the function will ensure each buffer is suitably aligned for the data type of the /// array. For example, given an int32 buffer the validity buffer must be a multiple of 8 /// and the values buffer must be a multiple of 32. Given a large_string buffer the /// validity buffer and values buffers must be multiples of 8 and the offsets buffer must From 84d91db2bcc5987115d51ab99636adf7feedce3c Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 19 May 2023 11:33:05 -0700 Subject: [PATCH 08/18] Minor typo --- cpp/src/arrow/type_traits.cc | 2 -- 1 file changed, 2 deletions(-) diff --git a/cpp/src/arrow/type_traits.cc b/cpp/src/arrow/type_traits.cc index 094138eab7a..25382101eab 100644 --- a/cpp/src/arrow/type_traits.cc +++ b/cpp/src/arrow/type_traits.cc @@ -21,8 +21,6 @@ namespace arrow { -namespace {} // namespace - int RequiredValueAlignmentForBuffer(Type::type type_id, int buffer_index) { if (buffer_index != 1) { // If the buffer index is 0 then either: From 55079a37b650df1c5c98854945f7e8cc09a93f40 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 19 May 2023 11:33:38 -0700 Subject: [PATCH 09/18] Newline at end of file --- cpp/src/arrow/type_traits.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/type_traits.cc b/cpp/src/arrow/type_traits.cc index 25382101eab..22ecac8720f 100644 --- 
a/cpp/src/arrow/type_traits.cc +++ b/cpp/src/arrow/type_traits.cc @@ -89,4 +89,4 @@ int RequiredValueAlignmentForBuffer(Type::type type_id, int buffer_index) { } } -} // namespace arrow \ No newline at end of file +} // namespace arrow From feb917e9da121bd94a5bc3f227c5ce9c3805a692 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 19 May 2023 12:24:25 -0700 Subject: [PATCH 10/18] Made the unaligned buffer handling behavior in Acero configurable with a default to warn --- cpp/src/arrow/acero/exec_plan.cc | 12 ++++--- cpp/src/arrow/acero/exec_plan.h | 28 +++++++++++++++ cpp/src/arrow/acero/plan_test.cc | 38 ++++++++++++++++++++ cpp/src/arrow/acero/source_node.cc | 51 ++++++++++++++++++++++----- cpp/src/arrow/testing/gtest_util.cc | 26 ++++++++++++++ cpp/src/arrow/testing/gtest_util.h | 5 +++ cpp/src/arrow/util/align_util_test.cc | 45 +++++------------------ 7 files changed, 157 insertions(+), 48 deletions(-) diff --git a/cpp/src/arrow/acero/exec_plan.cc b/cpp/src/arrow/acero/exec_plan.cc index 2fe8c484e40..19e075662c4 100644 --- a/cpp/src/arrow/acero/exec_plan.cc +++ b/cpp/src/arrow/acero/exec_plan.cc @@ -621,7 +621,8 @@ Future> DeclarationToTableImpl( query_options.function_registry); std::shared_ptr> output_table = std::make_shared>(); - ARROW_ASSIGN_OR_RAISE(std::shared_ptr exec_plan, ExecPlan::Make(exec_ctx)); + ARROW_ASSIGN_OR_RAISE(std::shared_ptr exec_plan, + ExecPlan::Make(query_options, exec_ctx)); TableSinkNodeOptions sink_options(output_table.get()); sink_options.sequence_output = query_options.sequence_output; sink_options.names = std::move(query_options.field_names); @@ -648,7 +649,8 @@ Future DeclarationToExecBatchesImpl( std::shared_ptr out_schema; AsyncGenerator> sink_gen; ExecContext exec_ctx(options.memory_pool, cpu_executor, options.function_registry); - ARROW_ASSIGN_OR_RAISE(std::shared_ptr exec_plan, ExecPlan::Make(exec_ctx)); + ARROW_ASSIGN_OR_RAISE(std::shared_ptr exec_plan, + ExecPlan::Make(options, exec_ctx)); SinkNodeOptions 
sink_options(&sink_gen, &out_schema); sink_options.sequence_output = options.sequence_output; Declaration with_sink = Declaration::Sequence({declaration, {"sink", sink_options}}); @@ -678,7 +680,8 @@ Future DeclarationToExecBatchesImpl( Future<> DeclarationToStatusImpl(Declaration declaration, QueryOptions options, ::arrow::internal::Executor* cpu_executor) { ExecContext exec_ctx(options.memory_pool, cpu_executor, options.function_registry); - ARROW_ASSIGN_OR_RAISE(std::shared_ptr exec_plan, ExecPlan::Make(exec_ctx)); + ARROW_ASSIGN_OR_RAISE(std::shared_ptr exec_plan, + ExecPlan::Make(options, exec_ctx)); ARROW_ASSIGN_OR_RAISE(ExecNode * last_node, declaration.AddToPlan(exec_plan.get())); if (!last_node->is_sink()) { ConsumingSinkNodeOptions sink_options(NullSinkNodeConsumer::Make()); @@ -972,7 +975,8 @@ Result>> DeclarationToRecordBatchGen ::arrow::internal::Executor* cpu_executor, std::shared_ptr* out_schema) { auto converter = std::make_shared(); ExecContext exec_ctx(options.memory_pool, cpu_executor, options.function_registry); - ARROW_ASSIGN_OR_RAISE(std::shared_ptr plan, ExecPlan::Make(exec_ctx)); + ARROW_ASSIGN_OR_RAISE(std::shared_ptr plan, + ExecPlan::Make(options, exec_ctx)); Declaration with_sink = Declaration::Sequence( {declaration, {"sink", SinkNodeOptions(&converter->exec_batch_gen, &converter->schema)}}); diff --git a/cpp/src/arrow/acero/exec_plan.h b/cpp/src/arrow/acero/exec_plan.h index 424d9107d27..c4c8820d418 100644 --- a/cpp/src/arrow/acero/exec_plan.h +++ b/cpp/src/arrow/acero/exec_plan.h @@ -496,6 +496,9 @@ struct ARROW_ACERO_EXPORT Declaration { std::string label; }; +/// \brief describes how to handle unaligned buffers +enum class UnalignedBufferHandling { kWarn, kIgnore, kReallocate, kAbort }; + /// \brief plan-wide options that can be specified when executing an execution plan struct ARROW_ACERO_EXPORT QueryOptions { /// \brief Should the plan use a legacy batching strategy @@ -562,6 +565,31 @@ struct ARROW_ACERO_EXPORT QueryOptions { /// 
/// If set then the number of names must equal the number of output columns std::vector field_names; + + /// Various compute functions and acero internals will type pun array + /// buffers from uint8_t* to some kind of value type (e.g. we might + /// cast to int32_t* to add two int32 arrays) + /// + /// If the buffer is poorly aligned (e.g. an int32 array is not aligned + /// on a 4-byte boundary) then this is technically undefined behavior. + /// However, most modern compilers and CPUs are fairly tolerant of this + /// behavior and nothing bad (beyond a small hit to performance) is likely + /// to happen. + /// + /// Note that this only applies to source buffers. All buffers allocated internally + /// by Acero will be suitably aligned. + /// + /// If this field is set to kWarn then Acero will check if any buffers are unaligned + /// and, if they are, will emit a warning. + /// + /// If this field is set to kReallocate then Acero will allocate a new, suitably aligned + /// buffer and copy the contents from the old buffer into this new buffer. + /// + /// If this field is set to kAbort then Acero will gracefully abort the plan instead. + /// + /// If this field is set to kIgnore then Acero will not even check if the buffers are + /// unaligned. 
+ UnalignedBufferHandling unaligned_buffer_handling = UnalignedBufferHandling::kWarn; }; /// \brief Calculate the output schema of a declaration diff --git a/cpp/src/arrow/acero/plan_test.cc b/cpp/src/arrow/acero/plan_test.cc index 8ec5c0f70a9..379cb7dac6d 100644 --- a/cpp/src/arrow/acero/plan_test.cc +++ b/cpp/src/arrow/acero/plan_test.cc @@ -1704,5 +1704,43 @@ TEST(ExecPlanExecution, SegmentedAggregationWithBatchCrossingSegment) { {expected}); } +TEST(ExecPlanExecution, UnalignedInput) { + std::shared_ptr array = ArrayFromJSON(int32(), "[1, 2, 3]"); + std::shared_ptr unaligned = UnalignValues(*array); + ASSERT_OK_AND_ASSIGN(ExecBatch sample_batch, + ExecBatch::Make({unaligned}, array->length())); + + BatchesWithSchema data; + data.batches = {std::move(sample_batch)}; + data.schema = schema({field("i32", int32())}); + + Declaration plan = Declaration::Sequence({ + {"exec_batch_source", ExecBatchSourceNodeOptions(data.schema, data.batches)}, + }); + + int64_t initial_bytes_allocated = default_memory_pool()->total_bytes_allocated(); + + // By default we should warn and so the plan should finish ok + ASSERT_OK(DeclarationToStatus(plan)); + ASSERT_EQ(initial_bytes_allocated, default_memory_pool()->total_bytes_allocated()); + + QueryOptions query_options; + + // Nothing should happen if we ignore alignment + query_options.unaligned_buffer_handling = UnalignedBufferHandling::kIgnore; + ASSERT_OK(DeclarationToStatus(plan, query_options)); + ASSERT_EQ(initial_bytes_allocated, default_memory_pool()->total_bytes_allocated()); + + query_options.unaligned_buffer_handling = UnalignedBufferHandling::kAbort; + ASSERT_THAT(DeclarationToStatus(plan, query_options), + Raises(StatusCode::Invalid, + testing::HasSubstr("An input buffer was poorly aligned"))); + ASSERT_EQ(initial_bytes_allocated, default_memory_pool()->total_bytes_allocated()); + + query_options.unaligned_buffer_handling = UnalignedBufferHandling::kReallocate; + ASSERT_OK(DeclarationToStatus(plan, query_options)); + 
ASSERT_LT(initial_bytes_allocated, default_memory_pool()->total_bytes_allocated()); +} + } // namespace acero } // namespace arrow diff --git a/cpp/src/arrow/acero/source_node.cc b/cpp/src/arrow/acero/source_node.cc index 6f8843e76c7..abcb3b50c80 100644 --- a/cpp/src/arrow/acero/source_node.cc +++ b/cpp/src/arrow/acero/source_node.cc @@ -52,6 +52,45 @@ using arrow::internal::MapVector; namespace acero { namespace { +Status HandleUnalignedBuffers(ExecBatch* batch, UnalignedBufferHandling handling) { + if (handling == UnalignedBufferHandling::kIgnore) { + return Status::OK(); + } + for (auto& value : batch->values) { + if (value.is_array()) { + switch (handling) { + case UnalignedBufferHandling::kAbort: + if (!arrow::util::CheckAlignment(*value.array(), + arrow::util::kValueAlignment)) { + return Status::Invalid( + "An input buffer was poorly aligned and UnalignedBufferHandling is set " + "to kAbort"); + } + break; + case UnalignedBufferHandling::kWarn: + if (!arrow::util::CheckAlignment(*value.array(), + arrow::util::kValueAlignment)) { + ARROW_LOG(WARNING) + << "An input buffer was poorly aligned. This could lead to crashes or " + "poor performance on some hardware. 
Please ensure that all Acero " + "sources generate aligned buffers."; + } + break; + case UnalignedBufferHandling::kReallocate: { + ARROW_ASSIGN_OR_RAISE(value, arrow::util::EnsureAlignment( + value.array(), arrow::util::kValueAlignment, + default_memory_pool())); + break; + } + default: + return Status::UnknownError("Unexpected UnalignedBufferHandling option: ", + static_cast(handling)); + } + } + } + return Status::OK(); +} + struct SourceNode : ExecNode, public TracedNode { SourceNode(ExecPlan* plan, std::shared_ptr output_schema, AsyncGenerator> generator, @@ -104,14 +143,10 @@ struct SourceNode : ExecNode, public TracedNode { batch_size = morsel_length; } ExecBatch batch = morsel.Slice(offset, batch_size); - for (auto& value : batch.values) { - if (value.is_array()) { - ARROW_ASSIGN_OR_RAISE( - value, arrow::util::EnsureAlignment(value.array(), - arrow::util::kValueAlignment, - default_memory_pool())); - } - } + UnalignedBufferHandling unaligned_buffer_handling = + plan_->query_context()->options().unaligned_buffer_handling; + ARROW_RETURN_NOT_OK( + HandleUnalignedBuffers(&batch, unaligned_buffer_handling)); if (has_ordering) { batch.index = batch_index; } diff --git a/cpp/src/arrow/testing/gtest_util.cc b/cpp/src/arrow/testing/gtest_util.cc index 9569375bda9..a1af4099c40 100644 --- a/cpp/src/arrow/testing/gtest_util.cc +++ b/cpp/src/arrow/testing/gtest_util.cc @@ -1099,4 +1099,30 @@ std::shared_ptr GatingTask::Make(double timeout_seconds) { return std::make_shared(timeout_seconds); } +std::shared_ptr UnalignValues(const ArrayData& array) { + if (array.buffers.size() < 2) { + // We can't unalign the values if there isn't a values buffer but we can + // still make sure EnsureAligned is a no-op + return std::make_shared(array); + } + std::vector> new_buffers(array.buffers); + + const auto& buffer_to_modify = array.buffers[1]; + EXPECT_OK_AND_ASSIGN( + std::shared_ptr padded, + AllocateBuffer(buffer_to_modify->size() + 1, default_memory_pool())); + 
memcpy(padded->mutable_data() + 1, buffer_to_modify->data(), buffer_to_modify->size()); + std::shared_ptr unaligned = SliceBuffer(padded, 1); + new_buffers[1] = std::move(unaligned); + + std::shared_ptr array_data = std::make_shared(array); + array_data->buffers = std::move(new_buffers); + return array_data; +} + +std::shared_ptr UnalignValues(const Array& array) { + std::shared_ptr array_data = UnalignValues(*array.data()); + return MakeArray(array_data); +} + } // namespace arrow diff --git a/cpp/src/arrow/testing/gtest_util.h b/cpp/src/arrow/testing/gtest_util.h index 55bd307b12c..e22834d593e 100644 --- a/cpp/src/arrow/testing/gtest_util.h +++ b/cpp/src/arrow/testing/gtest_util.h @@ -532,4 +532,9 @@ class ARROW_TESTING_EXPORT GatingTask { std::shared_ptr impl_; }; +/// \brief modify an array so that the buffer at index 1 (if it has one) is unaligned +std::shared_ptr UnalignValues(const ArrayData& array); +/// \brief modify an array so that the buffer at index 1 (if it has one) is unaligned +std::shared_ptr UnalignValues(const Array& array); + } // namespace arrow diff --git a/cpp/src/arrow/util/align_util_test.cc b/cpp/src/arrow/util/align_util_test.cc index e9358705869..15c5ef75923 100644 --- a/cpp/src/arrow/util/align_util_test.cc +++ b/cpp/src/arrow/util/align_util_test.cc @@ -398,33 +398,6 @@ class MallocAlignment : public ::testing::Test { ASSERT_EQ(src.buffers[i]->address(), dst.buffers[i]->address()); } } - - std::shared_ptr UnalignValues(const ArrayData& array) { - if (array.buffers.size() < 2) { - // We can't unalign the values if there isn't a values buffer but we can - // still make sure EnsureAligned is a no-op - return std::make_shared(array); - } - std::vector> new_buffers(array.buffers); - - const auto& buffer_to_modify = array.buffers[1]; - EXPECT_OK_AND_ASSIGN( - std::shared_ptr padded, - AllocateBuffer(buffer_to_modify->size() + 1, default_memory_pool())); - memcpy(padded->mutable_data() + 1, buffer_to_modify->data(), - 
buffer_to_modify->size()); - std::shared_ptr unaligned = SliceBuffer(padded, 1); - new_buffers[1] = std::move(unaligned); - - std::shared_ptr array_data = std::make_shared(array); - array_data->buffers = std::move(new_buffers); - return array_data; - } - - std::shared_ptr UnalignValues(const Array& array) { - std::shared_ptr array_data = UnalignValues(*array.data()); - return MakeArray(array_data); - } }; template @@ -437,7 +410,7 @@ TYPED_TEST_SUITE(MallocAlignmentNotRequired, TypesNotRequiringAlignment); TYPED_TEST(MallocAlignmentRequired, RoundTrip) { std::shared_ptr data = SampleArray(); - std::shared_ptr unaligned = this->UnalignValues(*data); + std::shared_ptr unaligned = UnalignValues(*data); ASSERT_OK_AND_ASSIGN( std::shared_ptr aligned, util::EnsureAlignment(unaligned, util::kValueAlignment, default_memory_pool())); @@ -448,7 +421,7 @@ TYPED_TEST(MallocAlignmentRequired, RoundTrip) { TYPED_TEST(MallocAlignmentNotRequired, RoundTrip) { std::shared_ptr data = SampleArray(); - std::shared_ptr unaligned = this->UnalignValues(*data); + std::shared_ptr unaligned = UnalignValues(*data); ASSERT_OK_AND_ASSIGN( std::shared_ptr aligned, util::EnsureAlignment(unaligned, util::kValueAlignment, default_memory_pool())); @@ -466,8 +439,8 @@ TEST_F(MallocAlignment, RunEndEncoded) { std::move(values), 0)); std::shared_ptr unaligned_ree = std::make_shared(*array->data()); - unaligned_ree->child_data[0] = this->UnalignValues(*unaligned_ree->child_data[0]); - unaligned_ree->child_data[1] = this->UnalignValues(*unaligned_ree->child_data[1]); + unaligned_ree->child_data[0] = UnalignValues(*unaligned_ree->child_data[0]); + unaligned_ree->child_data[1] = UnalignValues(*unaligned_ree->child_data[1]); std::shared_ptr aligned_ree = std::make_shared(*unaligned_ree); @@ -485,8 +458,8 @@ TEST_F(MallocAlignment, Dictionary) { std::shared_ptr array = ArrayFromJSON(int8_utf8, R"(["x", "x", "y"])"); std::shared_ptr unaligned_dict = std::make_shared(*array->data()); - 
unaligned_dict->dictionary = this->UnalignValues(*unaligned_dict->dictionary); - unaligned_dict = this->UnalignValues(*unaligned_dict); + unaligned_dict->dictionary = UnalignValues(*unaligned_dict->dictionary); + unaligned_dict = UnalignValues(*unaligned_dict); std::shared_ptr aligned_dict = std::make_shared(*unaligned_dict); @@ -502,8 +475,8 @@ TEST_F(MallocAlignment, Dictionary) { array = ArrayFromJSON(int16_int8, R"([7, 11])"); unaligned_dict = std::make_shared(*array->data()); - unaligned_dict->dictionary = this->UnalignValues(*unaligned_dict->dictionary); - unaligned_dict = this->UnalignValues(*unaligned_dict); + unaligned_dict->dictionary = UnalignValues(*unaligned_dict->dictionary); + unaligned_dict = UnalignValues(*unaligned_dict); aligned_dict = std::make_shared(*unaligned_dict); @@ -518,7 +491,7 @@ TEST_F(MallocAlignment, Dictionary) { TEST_F(MallocAlignment, Extension) { std::shared_ptr array = ExampleSmallint(); - std::shared_ptr unaligned = this->UnalignValues(*array->data()); + std::shared_ptr unaligned = UnalignValues(*array->data()); ASSERT_OK_AND_ASSIGN( std::shared_ptr aligned, From 28a046b685809127712eef6eef6c0cb6d5356100 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 19 May 2023 12:35:19 -0700 Subject: [PATCH 11/18] Removed default from switch paths where it is better to let default be a compiler error --- cpp/src/arrow/acero/source_node.cc | 6 +++--- cpp/src/arrow/type_traits.cc | 4 +++- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/cpp/src/arrow/acero/source_node.cc b/cpp/src/arrow/acero/source_node.cc index abcb3b50c80..96f58b0c4b6 100644 --- a/cpp/src/arrow/acero/source_node.cc +++ b/cpp/src/arrow/acero/source_node.cc @@ -59,6 +59,9 @@ Status HandleUnalignedBuffers(ExecBatch* batch, UnalignedBufferHandling handling for (auto& value : batch->values) { if (value.is_array()) { switch (handling) { + case UnalignedBufferHandling::kIgnore: + // Should be impossible to get here + return Status::OK(); case 
UnalignedBufferHandling::kAbort: if (!arrow::util::CheckAlignment(*value.array(), arrow::util::kValueAlignment)) { @@ -82,9 +85,6 @@ Status HandleUnalignedBuffers(ExecBatch* batch, UnalignedBufferHandling handling default_memory_pool())); break; } - default: - return Status::UnknownError("Unexpected UnalignedBufferHandling option: ", - static_cast(handling)); } } } diff --git a/cpp/src/arrow/type_traits.cc b/cpp/src/arrow/type_traits.cc index 22ecac8720f..67fe879505b 100644 --- a/cpp/src/arrow/type_traits.cc +++ b/cpp/src/arrow/type_traits.cc @@ -83,7 +83,9 @@ int RequiredValueAlignmentForBuffer(Type::type type_id, int buffer_index) { case Type::INTERVAL_MONTH_DAY_NANO: // Stored as two 32-bit integers and a 64-bit // integer return 8; - default: + case Type::DICTIONARY: + case Type::EXTENSION: + case Type::MAX_ID: Status::Invalid("Could not check alignment for type id ", type_id).Warn(); return 1; } From e7545ed1b8bc2eb65cf3aa4aa35c7a2629353e7b Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 19 May 2023 13:53:17 -0700 Subject: [PATCH 12/18] Adding windows export macros --- cpp/src/arrow/testing/gtest_util.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/testing/gtest_util.h b/cpp/src/arrow/testing/gtest_util.h index e22834d593e..46806bdd924 100644 --- a/cpp/src/arrow/testing/gtest_util.h +++ b/cpp/src/arrow/testing/gtest_util.h @@ -533,8 +533,8 @@ class ARROW_TESTING_EXPORT GatingTask { }; /// \brief modify an array so that the buffer at index 1 (if it has one) is unaligned -std::shared_ptr UnalignValues(const ArrayData& array); +ARROW_TESTING_EXPORT std::shared_ptr UnalignValues(const ArrayData& array); /// \brief modify an array so that the buffer at index 1 (if it has one) is unaligned -std::shared_ptr UnalignValues(const Array& array); +ARROW_TESTING_EXPORT std::shared_ptr UnalignValues(const Array& array); } // namespace arrow From 9d1071d5d5c01386010c5e822a98a683cedece5e Mon Sep 17 00:00:00 2001 From: Weston Pace 
Date: Mon, 22 May 2023 13:51:30 -0700 Subject: [PATCH 13/18] Added an environment variable that can be used to control the default unaligned buffer behavior --- cpp/src/arrow/acero/exec_plan.cc | 29 +++++++++++++++++++++++++++++ cpp/src/arrow/acero/exec_plan.h | 12 +++++++++++- cpp/src/arrow/acero/source_node.cc | 8 +++++++- cpp/src/arrow/type_traits.cc | 6 +++++- 4 files changed, 52 insertions(+), 3 deletions(-) diff --git a/cpp/src/arrow/acero/exec_plan.cc b/cpp/src/arrow/acero/exec_plan.cc index 19e075662c4..1d2585e6ef1 100644 --- a/cpp/src/arrow/acero/exec_plan.cc +++ b/cpp/src/arrow/acero/exec_plan.cc @@ -36,6 +36,7 @@ #include "arrow/table.h" #include "arrow/util/async_generator.h" #include "arrow/util/checked_cast.h" +#include "arrow/util/io_util.h" #include "arrow/util/key_value_metadata.h" #include "arrow/util/logging.h" #include "arrow/util/string.h" @@ -358,10 +359,38 @@ std::optional GetNodeIndex(const std::vector& nodes, return std::nullopt; } +const std::string kAceroAlignmentHandlingEnvVar = "ACERO_ALIGNMENT_HANDLING"; + +UnalignedBufferHandling DetermineDefaultUnalignedBufferHandling() { + auto maybe_value = ::arrow::internal::GetEnvVar(kAceroAlignmentHandlingEnvVar); + if (!maybe_value.ok()) { + return UnalignedBufferHandling::kWarn; + } + std::string value = maybe_value.MoveValueUnsafe(); + if (::arrow::internal::AsciiEqualsCaseInsensitive(value, "warn")) { + return UnalignedBufferHandling::kWarn; + } else if (::arrow::internal::AsciiEqualsCaseInsensitive(value, "ignore")) { + return UnalignedBufferHandling::kIgnore; + } else if (::arrow::internal::AsciiEqualsCaseInsensitive(value, "reallocate")) { + return UnalignedBufferHandling::kReallocate; + } else if (::arrow::internal::AsciiEqualsCaseInsensitive(value, "abort")) { + return UnalignedBufferHandling::kAbort; + } else { + ARROW_LOG(WARNING) << "unrecognized value for ACERO_ALIGNMENT_HANDLING: " << value; + return UnalignedBufferHandling::kWarn; + } +} + } // namespace const uint32_t 
ExecPlan::kMaxBatchSize; +UnalignedBufferHandling GetDefaultUnalignedBufferHandling() { + static UnalignedBufferHandling default_value = + DetermineDefaultUnalignedBufferHandling(); + return default_value; +} + Result> ExecPlan::Make( QueryOptions opts, ExecContext ctx, std::shared_ptr metadata) { diff --git a/cpp/src/arrow/acero/exec_plan.h b/cpp/src/arrow/acero/exec_plan.h index c4c8820d418..06db7f22881 100644 --- a/cpp/src/arrow/acero/exec_plan.h +++ b/cpp/src/arrow/acero/exec_plan.h @@ -499,6 +499,13 @@ struct ARROW_ACERO_EXPORT Declaration { /// \brief describes how to handle unaligned buffers enum class UnalignedBufferHandling { kWarn, kIgnore, kReallocate, kAbort }; +/// \brief get the default behavior of unaligned buffer handling +/// +/// This is configurable via the ACERO_ALIGNMENT_HANDLING environment variable which +/// can be set to "warn", "ignore", "reallocate", or "abort". If the environment +/// variable is not set, or is set to an invalid value, this will return kWarn +UnalignedBufferHandling GetDefaultUnalignedBufferHandling(); + /// \brief plan-wide options that can be specified when executing an execution plan struct ARROW_ACERO_EXPORT QueryOptions { /// \brief Should the plan use a legacy batching strategy @@ -589,7 +596,10 @@ struct ARROW_ACERO_EXPORT QueryOptions { /// /// If this field is set to kIgnore then Acero will not even check if the buffers are /// unaligned. 
- UnalignedBufferHandling unaligned_buffer_handling = UnalignedBufferHandling::kWarn; + /// + /// If this field is not set then it will be treated as kWarn unless overridden + /// by the ACERO_ALIGNMENT_HANDLING environment variable + std::optional unaligned_buffer_handling; }; /// \brief Calculate the output schema of a declaration diff --git a/cpp/src/arrow/acero/source_node.cc b/cpp/src/arrow/acero/source_node.cc index 96f58b0c4b6..10198663bfb 100644 --- a/cpp/src/arrow/acero/source_node.cc +++ b/cpp/src/arrow/acero/source_node.cc @@ -143,8 +143,14 @@ struct SourceNode : ExecNode, public TracedNode { batch_size = morsel_length; } ExecBatch batch = morsel.Slice(offset, batch_size); - UnalignedBufferHandling unaligned_buffer_handling = + std::optional opt_unaligned_buffer_handling = plan_->query_context()->options().unaligned_buffer_handling; + UnalignedBufferHandling unaligned_buffer_handling; + if (opt_unaligned_buffer_handling.has_value()) { + unaligned_buffer_handling = *opt_unaligned_buffer_handling; + } else { + unaligned_buffer_handling = GetDefaultUnalignedBufferHandling(); + } ARROW_RETURN_NOT_OK( HandleUnalignedBuffers(&batch, unaligned_buffer_handling)); if (has_ordering) { diff --git a/cpp/src/arrow/type_traits.cc b/cpp/src/arrow/type_traits.cc index 67fe879505b..eb489951835 100644 --- a/cpp/src/arrow/type_traits.cc +++ b/cpp/src/arrow/type_traits.cc @@ -86,9 +86,13 @@ int RequiredValueAlignmentForBuffer(Type::type type_id, int buffer_index) { case Type::DICTIONARY: case Type::EXTENSION: case Type::MAX_ID: - Status::Invalid("Could not check alignment for type id ", type_id).Warn(); + Status::Invalid("RequiredValueAlignmentForBuffer called with invalid type id ", + type_id) + .Warn(); return 1; } + Status::Invalid("Could not check alignment for type id ", type_id).Warn(); + return 1; } } // namespace arrow From e8696505c7ec356b4367259f9eecdfdc4866d856 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Mon, 22 May 2023 22:19:54 -0700 Subject: [PATCH 14/18] 
Addressed review feedback --- cpp/src/arrow/acero/exec_plan.cc | 2 +- cpp/src/arrow/acero/source_node.cc | 11 +++-------- cpp/src/arrow/type_traits.cc | 8 +++----- 3 files changed, 7 insertions(+), 14 deletions(-) diff --git a/cpp/src/arrow/acero/exec_plan.cc b/cpp/src/arrow/acero/exec_plan.cc index 1d2585e6ef1..27cd96456bb 100644 --- a/cpp/src/arrow/acero/exec_plan.cc +++ b/cpp/src/arrow/acero/exec_plan.cc @@ -359,7 +359,7 @@ std::optional GetNodeIndex(const std::vector& nodes, return std::nullopt; } -const std::string kAceroAlignmentHandlingEnvVar = "ACERO_ALIGNMENT_HANDLING"; +const char* kAceroAlignmentHandlingEnvVar = "ACERO_ALIGNMENT_HANDLING"; UnalignedBufferHandling DetermineDefaultUnalignedBufferHandling() { auto maybe_value = ::arrow::internal::GetEnvVar(kAceroAlignmentHandlingEnvVar); diff --git a/cpp/src/arrow/acero/source_node.cc b/cpp/src/arrow/acero/source_node.cc index 10198663bfb..cafb4ef2b44 100644 --- a/cpp/src/arrow/acero/source_node.cc +++ b/cpp/src/arrow/acero/source_node.cc @@ -143,14 +143,9 @@ struct SourceNode : ExecNode, public TracedNode { batch_size = morsel_length; } ExecBatch batch = morsel.Slice(offset, batch_size); - std::optional opt_unaligned_buffer_handling = - plan_->query_context()->options().unaligned_buffer_handling; - UnalignedBufferHandling unaligned_buffer_handling; - if (opt_unaligned_buffer_handling.has_value()) { - unaligned_buffer_handling = *opt_unaligned_buffer_handling; - } else { - unaligned_buffer_handling = GetDefaultUnalignedBufferHandling(); - } + UnalignedBufferHandling unaligned_buffer_handling = + plan_->query_context()->options().unaligned_buffer_handling.value_or( + GetDefaultUnalignedBufferHandling()); ARROW_RETURN_NOT_OK( HandleUnalignedBuffers(&batch, unaligned_buffer_handling)); if (has_ordering) { diff --git a/cpp/src/arrow/type_traits.cc b/cpp/src/arrow/type_traits.cc index eb489951835..181dfd3f3c8 100644 --- a/cpp/src/arrow/type_traits.cc +++ b/cpp/src/arrow/type_traits.cc @@ -86,12 +86,10 @@ int 
RequiredValueAlignmentForBuffer(Type::type type_id, int buffer_index) { case Type::DICTIONARY: case Type::EXTENSION: case Type::MAX_ID: - Status::Invalid("RequiredValueAlignmentForBuffer called with invalid type id ", - type_id) - .Warn(); - return 1; + break; } - Status::Invalid("Could not check alignment for type id ", type_id).Warn(); + Status::Invalid("RequiredValueAlignmentForBuffer called with invalid type id ", type_id) + .Warn(); return 1; } From d6f9b00ba91e5cfb49ea50f3ebd298c30d9cd00f Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Thu, 25 May 2023 02:27:17 -0700 Subject: [PATCH 15/18] Addressing comments from the review --- cpp/src/arrow/acero/exec_plan.cc | 4 +-- cpp/src/arrow/acero/exec_plan.h | 14 +++++---- cpp/src/arrow/acero/plan_test.cc | 4 ++- cpp/src/arrow/acero/source_node.cc | 7 +++-- cpp/src/arrow/testing/gtest_util.h | 4 +++ cpp/src/arrow/type_traits.h | 3 +- cpp/src/arrow/util/align_util.cc | 25 ++++++++++----- cpp/src/arrow/util/align_util.h | 45 ++++++++++++++------------- cpp/src/arrow/util/align_util_test.cc | 32 ++++++++++--------- 9 files changed, 83 insertions(+), 55 deletions(-) diff --git a/cpp/src/arrow/acero/exec_plan.cc b/cpp/src/arrow/acero/exec_plan.cc index 27cd96456bb..2f03159de56 100644 --- a/cpp/src/arrow/acero/exec_plan.cc +++ b/cpp/src/arrow/acero/exec_plan.cc @@ -373,8 +373,8 @@ UnalignedBufferHandling DetermineDefaultUnalignedBufferHandling() { return UnalignedBufferHandling::kIgnore; } else if (::arrow::internal::AsciiEqualsCaseInsensitive(value, "reallocate")) { return UnalignedBufferHandling::kReallocate; - } else if (::arrow::internal::AsciiEqualsCaseInsensitive(value, "abort")) { - return UnalignedBufferHandling::kAbort; + } else if (::arrow::internal::AsciiEqualsCaseInsensitive(value, "error")) { + return UnalignedBufferHandling::kError; } else { ARROW_LOG(WARNING) << "unrecognized value for ACERO_ALIGNMENT_HANDLING: " << value; return UnalignedBufferHandling::kWarn; diff --git a/cpp/src/arrow/acero/exec_plan.h 
b/cpp/src/arrow/acero/exec_plan.h index 06db7f22881..f82a0f8106b 100644 --- a/cpp/src/arrow/acero/exec_plan.h +++ b/cpp/src/arrow/acero/exec_plan.h @@ -496,13 +496,13 @@ struct ARROW_ACERO_EXPORT Declaration { std::string label; }; -/// \brief describes how to handle unaligned buffers -enum class UnalignedBufferHandling { kWarn, kIgnore, kReallocate, kAbort }; +/// \brief How to handle unaligned buffers +enum class UnalignedBufferHandling { kWarn, kIgnore, kReallocate, kError }; /// \brief get the default behavior of unaligned buffer handling /// /// This is configurable via the ACERO_ALIGNMENT_HANDLING environment variable which -/// can be set to "warn", "ignore", "reallocate", or "abort". If the environment +/// can be set to "warn", "ignore", "reallocate", or "error". If the environment /// variable is not set, or is set to an invalid value, this will return kWarn UnalignedBufferHandling GetDefaultUnalignedBufferHandling(); @@ -573,17 +573,19 @@ struct ARROW_ACERO_EXPORT QueryOptions { /// If set then the number of names must equal the number of output columns std::vector field_names; + /// \brief Policy for unaligned buffers in source data + /// /// Various compute functions and acero internals will type pun array /// buffers from uint8_t* to some kind of value type (e.g. we might /// cast to int32_t* to add two int32 arrays) /// - /// If the buffer is poorly algined (e.g. an int32 array is not aligned - /// on a 4-byte boundary) then this is technically undefined behavior. + /// If the buffer is poorly aligned (e.g. an int32 array is not aligned + /// on a 4-byte boundary) then this is technically undefined behavior in C++. /// However, most modern compilers and CPUs are fairly tolerant of this /// behavior and nothing bad (beyond a small hit to performance) is likely /// to happen. /// - /// Note that this only applies to source buffers. All buffers allocated interally + /// Note that this only applies to source buffers. 
All buffers allocated internally /// by Acero will be suitably aligned. /// /// If this field is set to kWarn then Acero will check if any buffers are unaligned diff --git a/cpp/src/arrow/acero/plan_test.cc b/cpp/src/arrow/acero/plan_test.cc index 379cb7dac6d..f4cb2b48496 100644 --- a/cpp/src/arrow/acero/plan_test.cc +++ b/cpp/src/arrow/acero/plan_test.cc @@ -1726,12 +1726,14 @@ TEST(ExecPlanExecution, UnalignedInput) { QueryOptions query_options; +#ifndef ARROW_UBSAN // Nothing should happen if we ignore alignment query_options.unaligned_buffer_handling = UnalignedBufferHandling::kIgnore; ASSERT_OK(DeclarationToStatus(plan, query_options)); ASSERT_EQ(initial_bytes_allocated, default_memory_pool()->total_bytes_allocated()); +#endif - query_options.unaligned_buffer_handling = UnalignedBufferHandling::kAbort; + query_options.unaligned_buffer_handling = UnalignedBufferHandling::kError; ASSERT_THAT(DeclarationToStatus(plan, query_options), Raises(StatusCode::Invalid, testing::HasSubstr("An input buffer was poorly aligned"))); diff --git a/cpp/src/arrow/acero/source_node.cc b/cpp/src/arrow/acero/source_node.cc index cafb4ef2b44..9cacb237b20 100644 --- a/cpp/src/arrow/acero/source_node.cc +++ b/cpp/src/arrow/acero/source_node.cc @@ -62,12 +62,12 @@ Status HandleUnalignedBuffers(ExecBatch* batch, UnalignedBufferHandling handling case UnalignedBufferHandling::kIgnore: // Should be impossible to get here return Status::OK(); - case UnalignedBufferHandling::kAbort: + case UnalignedBufferHandling::kError: if (!arrow::util::CheckAlignment(*value.array(), arrow::util::kValueAlignment)) { return Status::Invalid( "An input buffer was poorly aligned and UnalignedBufferHandling is set " - "to kAbort"); + "to kError"); } break; case UnalignedBufferHandling::kWarn: @@ -76,7 +76,8 @@ Status HandleUnalignedBuffers(ExecBatch* batch, UnalignedBufferHandling handling ARROW_LOG(WARNING) << "An input buffer was poorly aligned. 
This could lead to crashes or " "poor performance on some hardware. Please ensure that all Acero " - "sources generate aligned buffers."; + "sources generate aligned buffers, or change the unaligned buffer " + "handling configuration to silence this warning."; } break; case UnalignedBufferHandling::kReallocate: { diff --git a/cpp/src/arrow/testing/gtest_util.h b/cpp/src/arrow/testing/gtest_util.h index 46806bdd924..1f93fc9ccc3 100644 --- a/cpp/src/arrow/testing/gtest_util.h +++ b/cpp/src/arrow/testing/gtest_util.h @@ -533,8 +533,12 @@ class ARROW_TESTING_EXPORT GatingTask { }; /// \brief modify an array so that the buffer at index 1 (if it has one) is unaligned +/// +/// This method does not recurse into the dictionary or children ARROW_TESTING_EXPORT std::shared_ptr UnalignValues(const ArrayData& array); /// \brief modify an array so that the buffer at index 1 (if it has one) is unaligned +/// +/// This method does not recurse into the dictionary or children ARROW_TESTING_EXPORT std::shared_ptr UnalignValues(const Array& array); } // namespace arrow diff --git a/cpp/src/arrow/type_traits.h b/cpp/src/arrow/type_traits.h index 4ee0a7163db..7204fd6d85d 100644 --- a/cpp/src/arrow/type_traits.h +++ b/cpp/src/arrow/type_traits.h @@ -1309,7 +1309,7 @@ static inline int offset_bit_width(Type::type type_id) { return 0; } -/// \brief get the alignment a buffer should have to be considered "value aligned" +/// \brief Get the alignment a buffer should have to be considered "value aligned" /// /// Some buffers are frequently type-punned. For example, in an int32 array the /// values buffer is frequently cast to int32_t* @@ -1329,6 +1329,7 @@ static inline int offset_bit_width(Type::type type_id) { /// type for an extension array. 
/// \param buffer_index the index of the buffer to check, for example 0 will typically /// give you the alignment expected of the validity buffer +/// \return the required value alignment in bytes (1 if no alignment required) int RequiredValueAlignmentForBuffer(Type::type type_id, int buffer_index); /// \brief Check for an integer type (signed or unsigned) diff --git a/cpp/src/arrow/util/align_util.cc b/cpp/src/arrow/util/align_util.cc index 8bb65ed1012..f0122bb3d27 100644 --- a/cpp/src/arrow/util/align_util.cc +++ b/cpp/src/arrow/util/align_util.cc @@ -78,7 +78,7 @@ bool CheckAlignment(const ArrayData& array, int64_t alignment) { return false; } - if (array.type->id() == Type::DICTIONARY) { + if (array.dictionary) { if (!CheckAlignment(*array.dictionary, alignment)) return false; } @@ -141,7 +141,18 @@ constexpr int64_t kMinimumAlignment = 8; Result> EnsureAlignment(std::shared_ptr buffer, int64_t alignment, MemoryPool* memory_pool) { + if (alignment == kValueAlignment) { + return Status::Invalid( + "The kValueAlignment option may only be used to call EnsureAlignment on arrays " + "or tables and cannot be used with buffers"); + } + if (alignment <= 0) { + return Status::Invalid("Alignment must be a positive integer"); + } if (!CheckAlignment(*buffer, alignment)) { + if (!buffer->is_cpu()) { + return Status::NotImplemented("Reallocating an unaligned non-CPU buffer."); + } int64_t minimal_compatible_alignment = std::max(kMinimumAlignment, alignment); ARROW_ASSIGN_OR_RAISE( auto new_buffer, @@ -157,18 +168,18 @@ Result> EnsureAlignment(std::shared_ptr ar int64_t alignment, MemoryPool* memory_pool) { if (!CheckAlignment(*array_data, alignment)) { - std::vector> buffers_ = array_data->buffers; + std::vector> buffers = array_data->buffers; Type::type type_id = GetTypeForBuffers(*array_data); - for (size_t i = 0; i < buffers_.size(); ++i) { - if (buffers_[i]) { + for (size_t i = 0; i < buffers.size(); ++i) { + if (buffers[i]) { int64_t expected_alignment = alignment; if 
(alignment == kValueAlignment) { expected_alignment = RequiredValueAlignmentForBuffer(type_id, static_cast(i)); } ARROW_ASSIGN_OR_RAISE( - buffers_[i], - EnsureAlignment(std::move(buffers_[i]), expected_alignment, memory_pool)); + buffers[i], + EnsureAlignment(std::move(buffers[i]), expected_alignment, memory_pool)); } } @@ -183,7 +194,7 @@ Result> EnsureAlignment(std::shared_ptr ar } auto new_array_data = ArrayData::Make( - array_data->type, array_data->length, std::move(buffers_), array_data->child_data, + array_data->type, array_data->length, std::move(buffers), array_data->child_data, array_data->dictionary, array_data->GetNullCount(), array_data->offset); return std::move(new_array_data); } else { diff --git a/cpp/src/arrow/util/align_util.h b/cpp/src/arrow/util/align_util.h index 3ea0f11d925..24b75fe472a 100644 --- a/cpp/src/arrow/util/align_util.h +++ b/cpp/src/arrow/util/align_util.h @@ -74,23 +74,26 @@ namespace util { /// \brief if this is specified in one of the CheckAlignment or EnsureAlignment functions /// /// then the function will ensure each buffer is suitably aligned for the data type of the -/// array. For example, given an int32 buffer the validity buffer must be a multiple of 8 -/// and the values buffer must be a multiple of 32. Given a large_string buffer the -/// validity buffer and values buffers must be multiples of 8 and the offsets buffer must -/// be a multiple of 64. +/// array. For example, given an int32 buffer the values buffer's address must be a +/// multiple of 4. Given a large_string buffer the offsets buffer's address must be a +/// multiple of 8. 
constexpr int64_t kValueAlignment = -3; -/// \brief calculate if the buffer's address is a multiple of `alignment` +/// \brief Calculate if the buffer's address is a multiple of `alignment` /// \param buffer the buffer to check -/// \param alignment the alignment to check for +/// \param alignment the alignment (in bytes) to check for ARROW_EXPORT bool CheckAlignment(const Buffer& buffer, int64_t alignment); -/// \brief calculate if all buffer's in the array data are aligned +/// \brief Calculate if all buffers in the array data are aligned +/// +/// This will also check the buffers in the dictionary and any children /// \param array the array data to check -/// \param alignment the alignment to check for +/// \param alignment the alignment (in bytes) to check for ARROW_EXPORT bool CheckAlignment(const ArrayData& array, int64_t alignment); -/// \brief calculate if all buffer's in the array are aligned +/// \brief Calculate if all buffers in the array are aligned +/// +/// This will also check the buffers in the dictionary and any children /// \param array the array to check -/// \param alignment the alignment to check for +/// \param alignment the alignment (in bytes) to check for ARROW_EXPORT bool CheckAlignment(const Array& array, int64_t alignment); // Following functions require an additional boolean vector which stores the @@ -102,21 +105,21 @@ ARROW_EXPORT bool CheckAlignment(const Array& array, int64_t alignment); // objects can be ignored for further checking if we already know that they are // completely aligned. -/// \brief calculate which (if any) chunks in a chunked array are unaligned +/// \brief Calculate which (if any) chunks in a chunked array are unaligned /// \param array the array to check -/// \param alignment the alignment to check for +/// \param alignment the alignment (in bytes) to check for /// \param needs_alignment an output vector that will store the results of the check /// it must be set to a valid vector. 
Extra elements will be added to the end /// of the vector for each chunk that is checked. `true` will be stored if /// the chunk is unaligned. -/// \param offset an optional offset to specify which chunk to start checking at +/// \param offset the index of the chunk to start checking /// \return true if all chunks (starting at `offset`) are aligned, false otherwise ARROW_EXPORT bool CheckAlignment(const ChunkedArray& array, int64_t alignment, std::vector* needs_alignment, int offset = 0); /// \brief calculate which (if any) columns in a record batch are unaligned /// \param batch the batch to check -/// \param alignment the alignment to check for +/// \param alignment the alignment (in bytes) to check for /// \param needs_alignment an output vector that will store the results of the /// check. It must be set to a valid vector. Extra elements will be added /// to the end of the vector for each column that is checked. `true` will be @@ -126,7 +129,7 @@ ARROW_EXPORT bool CheckAlignment(const RecordBatch& batch, int64_t alignment, /// \brief calculate which (if any) columns in a table are unaligned /// \param table the table to check -/// \param alignment the alignment to check for +/// \param alignment the alignment (in bytes) to check for /// \param needs_alignment an output vector that will store the results of the /// check. It must be set to a valid vector. Extra elements will be added /// to the end of the vector for each column that is checked. 
`true` will be @@ -140,7 +143,7 @@ ARROW_EXPORT bool CheckAlignment(const Table& table, int64_t alignment, /// If the input buffer is already aligned then this method will return the input buffer /// /// \param buffer the buffer to check -/// \param alignment the alignment to check for +/// \param alignment the alignment (in bytes) to check for /// \param memory_pool a memory pool that will be used to allocate a new buffer if the /// input buffer is not sufficiently aligned ARROW_EXPORT Result> EnsureAlignment( @@ -152,7 +155,7 @@ ARROW_EXPORT Result> EnsureAlignment( /// buffer. /// /// \param array_data the array data to check -/// \param alignment the alignment to check for +/// \param alignment the alignment (in bytes) to check for /// \param memory_pool a memory pool that will be used to allocate new buffers if any /// input buffer is not sufficiently aligned ARROW_EXPORT Result> EnsureAlignment( @@ -164,7 +167,7 @@ ARROW_EXPORT Result> EnsureAlignment( /// buffer. /// /// \param array the array to check -/// \param alignment the alignment to check for +/// \param alignment the alignment (in bytes) to check for /// \param memory_pool a memory pool that will be used to allocate new buffers if any /// input buffer is not sufficiently aligned ARROW_EXPORT Result> EnsureAlignment(std::shared_ptr array, @@ -177,7 +180,7 @@ ARROW_EXPORT Result> EnsureAlignment(std::shared_ptr> EnsureAlignment( @@ -189,7 +192,7 @@ ARROW_EXPORT Result> EnsureAlignment( /// buffer. /// /// \param batch the batch to check -/// \param alignment the alignment to check for +/// \param alignment the alignment (in bytes) to check for /// \param memory_pool a memory pool that will be used to allocate new buffers if any /// input buffer is not sufficiently aligned ARROW_EXPORT Result> EnsureAlignment( @@ -201,7 +204,7 @@ ARROW_EXPORT Result> EnsureAlignment( /// buffer. 
/// /// \param table the table to check -/// \param alignment the alignment to check for +/// \param alignment the alignment (in bytes) to check for /// \param memory_pool a memory pool that will be used to allocate new buffers if any /// input buffer is not sufficiently aligned ARROW_EXPORT Result> EnsureAlignment(std::shared_ptr
table, diff --git a/cpp/src/arrow/util/align_util_test.cc b/cpp/src/arrow/util/align_util_test.cc index 15c5ef75923..79dd72db687 100644 --- a/cpp/src/arrow/util/align_util_test.cc +++ b/cpp/src/arrow/util/align_util_test.cc @@ -290,8 +290,6 @@ using TypesNotRequiringAlignment = testing::Types; -TEST(EnsureAlignment, Malloc) {} - template std::shared_ptr sample_type() { return TypeTraits::type_singleton(); @@ -374,7 +372,7 @@ std::shared_ptr SampleArray() { return arr.data(); } -class MallocAlignment : public ::testing::Test { +class ValueAlignment : public ::testing::Test { public: void CheckModified(const ArrayData& src, const ArrayData& dst) { ASSERT_EQ(src.buffers.size(), dst.buffers.size()); @@ -401,36 +399,38 @@ class MallocAlignment : public ::testing::Test { }; template -class MallocAlignmentRequired : public MallocAlignment {}; +class ValueAlignmentRequired : public ValueAlignment {}; template -class MallocAlignmentNotRequired : public MallocAlignment {}; +class ValueAlignmentNotRequired : public ValueAlignment {}; -TYPED_TEST_SUITE(MallocAlignmentRequired, TypesRequiringSomeKindOfAlignment); -TYPED_TEST_SUITE(MallocAlignmentNotRequired, TypesNotRequiringAlignment); +TYPED_TEST_SUITE(ValueAlignmentRequired, TypesRequiringSomeKindOfAlignment); +TYPED_TEST_SUITE(ValueAlignmentNotRequired, TypesNotRequiringAlignment); -TYPED_TEST(MallocAlignmentRequired, RoundTrip) { +TYPED_TEST(ValueAlignmentRequired, RoundTrip) { std::shared_ptr data = SampleArray(); std::shared_ptr unaligned = UnalignValues(*data); ASSERT_OK_AND_ASSIGN( std::shared_ptr aligned, util::EnsureAlignment(unaligned, util::kValueAlignment, default_memory_pool())); + ASSERT_TRUE(util::CheckAlignment(*aligned, util::kValueAlignment)); AssertArraysEqual(*MakeArray(data), *MakeArray(aligned)); this->CheckModified(*unaligned, *aligned); } -TYPED_TEST(MallocAlignmentNotRequired, RoundTrip) { +TYPED_TEST(ValueAlignmentNotRequired, RoundTrip) { std::shared_ptr data = SampleArray(); std::shared_ptr 
unaligned = UnalignValues(*data); ASSERT_OK_AND_ASSIGN( std::shared_ptr aligned, util::EnsureAlignment(unaligned, util::kValueAlignment, default_memory_pool())); + ASSERT_TRUE(util::CheckAlignment(*aligned, util::kValueAlignment)); AssertArraysEqual(*MakeArray(data), *MakeArray(aligned)); this->CheckUnmodified(*unaligned, *aligned); } -TEST_F(MallocAlignment, RunEndEncoded) { +TEST_F(ValueAlignment, RunEndEncoded) { // Run end requires alignment, value type does not std::shared_ptr run_ends = ArrayFromJSON(int32(), "[3, 5]"); std::shared_ptr values = ArrayFromJSON(int8(), "[50, 100]"); @@ -447,13 +447,14 @@ TEST_F(MallocAlignment, RunEndEncoded) { ASSERT_OK_AND_ASSIGN( aligned_ree, util::EnsureAlignment(aligned_ree, util::kValueAlignment, default_memory_pool())); + ASSERT_TRUE(util::CheckAlignment(*aligned_ree, util::kValueAlignment)); this->CheckModified(*unaligned_ree->child_data[0], *aligned_ree->child_data[0]); this->CheckUnmodified(*unaligned_ree->child_data[1], *aligned_ree->child_data[1]); } -TEST_F(MallocAlignment, Dictionary) { - // Dictionary values require alignment, dictionary keys do not +TEST_F(ValueAlignment, Dictionary) { + // Dictionary values require alignment, dictionary indices do not std::shared_ptr int8_utf8 = dictionary(int8(), utf8()); std::shared_ptr array = ArrayFromJSON(int8_utf8, R"(["x", "x", "y"])"); @@ -467,10 +468,11 @@ TEST_F(MallocAlignment, Dictionary) { aligned_dict, util::EnsureAlignment(aligned_dict, util::kValueAlignment, default_memory_pool())); + ASSERT_TRUE(util::CheckAlignment(*aligned_dict, util::kValueAlignment)); this->CheckUnmodified(*unaligned_dict, *aligned_dict); this->CheckModified(*unaligned_dict->dictionary, *aligned_dict->dictionary); - // Dictionary values do not require alignment, dictionary keys do + // Dictionary values do not require alignment, dictionary indices do std::shared_ptr int16_int8 = dictionary(int16(), int8()); array = ArrayFromJSON(int16_int8, R"([7, 11])"); @@ -484,11 +486,12 @@ 
TEST_F(MallocAlignment, Dictionary) { aligned_dict, util::EnsureAlignment(aligned_dict, util::kValueAlignment, default_memory_pool())); + ASSERT_TRUE(util::CheckAlignment(*aligned_dict, util::kValueAlignment)); this->CheckModified(*unaligned_dict, *aligned_dict); this->CheckUnmodified(*unaligned_dict->dictionary, *aligned_dict->dictionary); } -TEST_F(MallocAlignment, Extension) { +TEST_F(ValueAlignment, Extension) { std::shared_ptr array = ExampleSmallint(); std::shared_ptr unaligned = UnalignValues(*array->data()); @@ -497,6 +500,7 @@ TEST_F(MallocAlignment, Extension) { std::shared_ptr aligned, util::EnsureAlignment(unaligned, util::kValueAlignment, default_memory_pool())); + ASSERT_TRUE(util::CheckAlignment(*aligned, util::kValueAlignment)); this->CheckModified(*unaligned, *aligned); } From c8dfd402967ac7e0158222c1d84e9112f42dfba3 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Thu, 25 May 2023 02:29:57 -0700 Subject: [PATCH 16/18] Update cpp/src/arrow/util/align_util.h Co-authored-by: Antoine Pitrou --- cpp/src/arrow/util/align_util.h | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/util/align_util.h b/cpp/src/arrow/util/align_util.h index 24b75fe472a..8fe819478e3 100644 --- a/cpp/src/arrow/util/align_util.h +++ b/cpp/src/arrow/util/align_util.h @@ -71,9 +71,11 @@ namespace util { // Functions to check if the provided Arrow object is aligned by the specified alignment -/// \brief if this is specified in one of the CheckAlignment or EnsureAlignment functions +/// \brief Special alignment value to use data type-specific alignment /// -/// then the function will ensure each buffer is suitably aligned for the data type of the +/// If this is passed as the `alignment` in one of the CheckAlignment or EnsureAlignment +/// functions, then the function will ensure each buffer is suitably aligned +/// for the data type of the /// array. For example, given an int32 buffer the values buffer's address must be a /// multiple of 4.
Given a large_string buffer the offsets buffer's address must be a /// multiple of 8. From ad6e3dfd656b8aab622b3fbfdce8885148496dc8 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 26 May 2023 09:12:09 -0700 Subject: [PATCH 17/18] Address review comments. Expand unit tests. Fix dense union arrays so we are properly looking for the offsets at index 2 and not index 1 --- cpp/src/arrow/acero/plan_test.cc | 2 +- cpp/src/arrow/testing/gtest_util.cc | 32 +++---- cpp/src/arrow/testing/gtest_util.h | 8 +- cpp/src/arrow/type_traits.cc | 16 ++-- cpp/src/arrow/util/align_util.cc | 11 +-- cpp/src/arrow/util/align_util.h | 5 ++ cpp/src/arrow/util/align_util_test.cc | 125 +++++++++++++++++++++++--- 7 files changed, 158 insertions(+), 41 deletions(-) diff --git a/cpp/src/arrow/acero/plan_test.cc b/cpp/src/arrow/acero/plan_test.cc index f4cb2b48496..5f62550177f 100644 --- a/cpp/src/arrow/acero/plan_test.cc +++ b/cpp/src/arrow/acero/plan_test.cc @@ -1706,7 +1706,7 @@ TEST(ExecPlanExecution, SegmentedAggregationWithBatchCrossingSegment) { TEST(ExecPlanExecution, UnalignedInput) { std::shared_ptr array = ArrayFromJSON(int32(), "[1, 2, 3]"); - std::shared_ptr unaligned = UnalignValues(*array); + std::shared_ptr unaligned = UnalignBuffers(*array); ASSERT_OK_AND_ASSIGN(ExecBatch sample_batch, ExecBatch::Make({unaligned}, array->length())); diff --git a/cpp/src/arrow/testing/gtest_util.cc b/cpp/src/arrow/testing/gtest_util.cc index a1af4099c40..6fc709874e7 100644 --- a/cpp/src/arrow/testing/gtest_util.cc +++ b/cpp/src/arrow/testing/gtest_util.cc @@ -1099,29 +1099,29 @@ std::shared_ptr GatingTask::Make(double timeout_seconds) { return std::make_shared(timeout_seconds); } -std::shared_ptr UnalignValues(const ArrayData& array) { - if (array.buffers.size() < 2) { - // We can't unalign the values if there isn't a values buffer but we can - // still make sure EnsureAligned is a no-op - return std::make_shared(array); - } - std::vector> new_buffers(array.buffers); +std::shared_ptr 
UnalignBuffers(const ArrayData& array) { + std::vector> new_buffers; + new_buffers.reserve(array.buffers.size()); - const auto& buffer_to_modify = array.buffers[1]; - EXPECT_OK_AND_ASSIGN( - std::shared_ptr padded, - AllocateBuffer(buffer_to_modify->size() + 1, default_memory_pool())); - memcpy(padded->mutable_data() + 1, buffer_to_modify->data(), buffer_to_modify->size()); - std::shared_ptr unaligned = SliceBuffer(padded, 1); - new_buffers[1] = std::move(unaligned); + for (const auto& buffer : array.buffers) { + if (!buffer) { + new_buffers.emplace_back(); + continue; + } + EXPECT_OK_AND_ASSIGN(std::shared_ptr padded, + AllocateBuffer(buffer->size() + 1, default_memory_pool())); + memcpy(padded->mutable_data() + 1, buffer->data(), buffer->size()); + std::shared_ptr unaligned = SliceBuffer(padded, 1); + new_buffers.push_back(std::move(unaligned)); + } std::shared_ptr array_data = std::make_shared(array); array_data->buffers = std::move(new_buffers); return array_data; } -std::shared_ptr UnalignValues(const Array& array) { - std::shared_ptr array_data = UnalignValues(*array.data()); +std::shared_ptr UnalignBuffers(const Array& array) { + std::shared_ptr array_data = UnalignBuffers(*array.data()); return MakeArray(array_data); } diff --git a/cpp/src/arrow/testing/gtest_util.h b/cpp/src/arrow/testing/gtest_util.h index 1f93fc9ccc3..13fc0b3e81d 100644 --- a/cpp/src/arrow/testing/gtest_util.h +++ b/cpp/src/arrow/testing/gtest_util.h @@ -532,13 +532,13 @@ class ARROW_TESTING_EXPORT GatingTask { std::shared_ptr impl_; }; -/// \brief modify an array so that the buffer at index 1 (if it has one) is unaligned +/// \brief create an exact copy of the data where each buffer has a max alignment of 1 /// /// This method does not recurse into the dictionary or children -ARROW_TESTING_EXPORT std::shared_ptr UnalignValues(const ArrayData& array); -/// \brief modify an array so that the buffer at index 1 (if it has one) is unaligned +ARROW_TESTING_EXPORT std::shared_ptr 
UnalignBuffers(const ArrayData& array); +/// \brief create an exact copy of the array where each buffer has a max alignment of 1 /// /// This method does not recurse into the dictionary or children -ARROW_TESTING_EXPORT std::shared_ptr UnalignValues(const Array& array); +ARROW_TESTING_EXPORT std::shared_ptr UnalignBuffers(const Array& array); } // namespace arrow diff --git a/cpp/src/arrow/type_traits.cc b/cpp/src/arrow/type_traits.cc index 181dfd3f3c8..ac16afe4b8c 100644 --- a/cpp/src/arrow/type_traits.cc +++ b/cpp/src/arrow/type_traits.cc @@ -22,13 +22,19 @@ namespace arrow { int RequiredValueAlignmentForBuffer(Type::type type_id, int buffer_index) { + if (buffer_index == 2 && type_id == Type::DENSE_UNION) { + // A dense union array is the only array (so far) that requires alignment + // on a buffer with a buffer_index that is not equal to 1 + return 4; + } if (buffer_index != 1) { // If the buffer index is 0 then either: // * The array type has no buffers, thus this shouldn't be called anyways // * The array has a validity buffer at 0, no alignment needed // * The array is a union array and has a types buffer at 0, no alignment needed // If the buffer index is > 1 then, in all current cases, it represents binary - // data and no alignment is needed + // data and no alignment is needed. The only exception is dense union buffers + // which are checked above. 
return 1; } DCHECK_NE(type_id, Type::DICTIONARY); @@ -45,7 +51,8 @@ int RequiredValueAlignmentForBuffer(Type::type type_id, int buffer_index) { case Type::BOOL: // Always treated as uint8_t* case Type::INT8: // Always treated as uint8_t* case Type::UINT8: // Always treated as uint8_t* - case Type::SPARSE_UNION: // No second buffer + case Type::DENSE_UNION: // Union arrays have a uint8_t* types buffer here + case Type::SPARSE_UNION: // Union arrays have a uint8_t* types buffer here case Type::RUN_END_ENCODED: // No buffers case Type::STRUCT: // No second buffer return 1; @@ -60,9 +67,8 @@ int RequiredValueAlignmentForBuffer(Type::type type_id, int buffer_index) { case Type::BINARY: // Offsets may be cast to int32_t* case Type::DATE32: case Type::TIME32: - case Type::LIST: // Offsets may be cast to int32_t*, data is in child array - case Type::MAP: // This is a list array - case Type::DENSE_UNION: // Has an offsets buffer of int32_t* + case Type::LIST: // Offsets may be cast to int32_t*, data is in child array + case Type::MAP: // This is a list array case Type::INTERVAL_MONTHS: // Stored as int32_t* case Type::INTERVAL_DAY_TIME: // Stored as two contiguous 32-bit integers return 4; diff --git a/cpp/src/arrow/util/align_util.cc b/cpp/src/arrow/util/align_util.cc index f0122bb3d27..7bc687b1550 100644 --- a/cpp/src/arrow/util/align_util.cc +++ b/cpp/src/arrow/util/align_util.cc @@ -21,6 +21,7 @@ #include "arrow/chunked_array.h" #include "arrow/record_batch.h" #include "arrow/table.h" +#include "arrow/type_fwd.h" #include "arrow/type_traits.h" #include "arrow/util/checked_cast.h" #include "arrow/util/logging.h" @@ -30,6 +31,9 @@ namespace arrow { namespace util { bool CheckAlignment(const Buffer& buffer, int64_t alignment) { + if (alignment <= 0) { + return true; + } return buffer.address() % alignment == 0; } @@ -135,9 +139,6 @@ bool CheckAlignment(const Table& table, int64_t alignment, return all_aligned; } -// Most allocators require a minimum of 8-byte alignment. 
-constexpr int64_t kMinimumAlignment = 8; - Result> EnsureAlignment(std::shared_ptr buffer, int64_t alignment, MemoryPool* memory_pool) { @@ -153,10 +154,10 @@ Result> EnsureAlignment(std::shared_ptr buffer, if (!buffer->is_cpu()) { return Status::NotImplemented("Reallocating an unaligned non-CPU buffer."); } - int64_t minimal_compatible_alignment = std::max(kMinimumAlignment, alignment); + int64_t minimum_desired_alignment = std::max(kDefaultBufferAlignment, alignment); ARROW_ASSIGN_OR_RAISE( auto new_buffer, - AllocateBuffer(buffer->size(), minimal_compatible_alignment, memory_pool)); + AllocateBuffer(buffer->size(), minimum_desired_alignment, memory_pool)); std::memcpy(new_buffer->mutable_data(), buffer->data(), buffer->size()); return std::move(new_buffer); } else { diff --git a/cpp/src/arrow/util/align_util.h b/cpp/src/arrow/util/align_util.h index 8fe819478e3..a08c9233973 100644 --- a/cpp/src/arrow/util/align_util.h +++ b/cpp/src/arrow/util/align_util.h @@ -82,6 +82,8 @@ namespace util { constexpr int64_t kValueAlignment = -3; /// \brief Calculate if the buffer's address is a multiple of `alignment` +/// +/// If `alignment` is less than or equal to 0 then this method will always return true /// \param buffer the buffer to check /// \param alignment the alignment (in bytes) to check for ARROW_EXPORT bool CheckAlignment(const Buffer& buffer, int64_t alignment); @@ -143,6 +145,9 @@ ARROW_EXPORT bool CheckAlignment(const Table& table, int64_t alignment, /// buffer /// /// If the input buffer is already aligned then this method will return the input buffer +/// If the input buffer is not already aligned then this method will allocate a new +/// buffer. The alignment of the new buffer will have at least +/// max(kDefaultBufferAlignment, alignment) bytes of alignment. 
/// /// \param buffer the buffer to check /// \param alignment the alignment (in bytes) to check for diff --git a/cpp/src/arrow/util/align_util_test.cc b/cpp/src/arrow/util/align_util_test.cc index 79dd72db687..a14041597a7 100644 --- a/cpp/src/arrow/util/align_util_test.cc +++ b/cpp/src/arrow/util/align_util_test.cc @@ -15,18 +15,24 @@ // specific language governing permissions and limitations // under the License. -#include #include #include #include #include +#include +#include + #include "arrow/array.h" +#include "arrow/buffer.h" #include "arrow/record_batch.h" #include "arrow/table.h" #include "arrow/testing/extension_type.h" #include "arrow/testing/gtest_util.h" +#include "arrow/testing/matchers.h" #include "arrow/testing/random.h" +#include "arrow/type.h" +#include "arrow/type_fwd.h" #include "arrow/util/align_util.h" namespace arrow { @@ -167,6 +173,62 @@ TEST(BitmapWordAlign, UnalignedDataStart) { } } // namespace internal +TEST(EnsureAlignment, Buffer) { + ASSERT_OK_AND_ASSIGN(std::shared_ptr buffer, AllocateBuffer(/*size=*/1024)); + std::shared_ptr unaligned_view = SliceBuffer(buffer, 1); + std::shared_ptr aligned_view = SliceBuffer(buffer, 0); + + ASSERT_TRUE(util::CheckAlignment(*aligned_view, kDefaultBufferAlignment)); + ASSERT_FALSE(util::CheckAlignment(*unaligned_view, /*alignment=*/2)); + + ASSERT_OK_AND_ASSIGN( + std::shared_ptr aligned_dupe, + util::EnsureAlignment(aligned_view, /*alignment=*/8, default_memory_pool())); + + ASSERT_EQ(aligned_view->data(), aligned_dupe->data()); + ASSERT_TRUE(util::CheckAlignment(*aligned_dupe, kDefaultBufferAlignment)); + + ASSERT_OK_AND_ASSIGN( + std::shared_ptr realigned, + util::EnsureAlignment(unaligned_view, /*alignment=*/8, default_memory_pool())); + + ASSERT_NE(realigned->data(), unaligned_view->data()); + // Even though we only asked to check for 8 bytes of alignment, any reallocation will + // always allocate at least kDefaultBufferAlignment bytes of alignment + 
ASSERT_TRUE(util::CheckAlignment(*realigned, /*alignment=*/kDefaultBufferAlignment)); + + ASSERT_OK_AND_ASSIGN( + std::shared_ptr realigned_large, + util::EnsureAlignment(unaligned_view, /*alignment=*/256, default_memory_pool())); + // If the user wants more than kDefaultBufferAlignment they should get it + ASSERT_TRUE(util::CheckAlignment(*realigned_large, /*alignment=*/256)); + + ASSERT_OK_AND_ASSIGN( + std::shared_ptr realigned_huge, + util::EnsureAlignment(unaligned_view, /*alignment=*/2048, default_memory_pool())); + // It should even be valid for the alignment to be larger than the buffer size itself + ASSERT_TRUE(util::CheckAlignment(*realigned_huge, /*alignment=*/2048)); +} + +TEST(EnsureAlignment, BufferInvalid) { + ASSERT_OK_AND_ASSIGN(std::shared_ptr buffer, AllocateBuffer(/*size=*/1024)); + + // This is nonsense but not worth introducing a Status return. We just return true. + ASSERT_TRUE(util::CheckAlignment(*buffer, 0)); + ASSERT_TRUE(util::CheckAlignment(*buffer, -1)); + + ASSERT_THAT(util::EnsureAlignment(buffer, /*alignment=*/0, default_memory_pool()), + Raises(StatusCode::Invalid, + testing::HasSubstr("Alignment must be a positive integer"))); + + ASSERT_THAT( + util::EnsureAlignment(buffer, /*alignment=*/util::kValueAlignment, + default_memory_pool()), + Raises(StatusCode::Invalid, + testing::HasSubstr( + "may only be used to call EnsureAlignment on arrays or tables"))); +} + TEST(EnsureAlignment, Array) { MemoryPool* pool = default_memory_pool(); auto rand = ::arrow::random::RandomArrayGenerator(1923); @@ -406,9 +468,21 @@ class ValueAlignmentNotRequired : public ValueAlignment {}; TYPED_TEST_SUITE(ValueAlignmentRequired, TypesRequiringSomeKindOfAlignment); TYPED_TEST_SUITE(ValueAlignmentNotRequired, TypesNotRequiringAlignment); +// The default buffer alignment should always be large enough for value alignment +TYPED_TEST(ValueAlignmentRequired, DefaultAlignmentSufficient) { + std::shared_ptr data = SampleArray(); + ASSERT_OK_AND_ASSIGN( + 
std::shared_ptr aligned, + util::EnsureAlignment(data, util::kValueAlignment, default_memory_pool())); + + ASSERT_TRUE(util::CheckAlignment(*aligned, util::kValueAlignment)); + AssertArraysEqual(*MakeArray(data), *MakeArray(aligned)); + this->CheckUnmodified(*data, *aligned); +} + TYPED_TEST(ValueAlignmentRequired, RoundTrip) { std::shared_ptr data = SampleArray(); - std::shared_ptr unaligned = UnalignValues(*data); + std::shared_ptr unaligned = UnalignBuffers(*data); ASSERT_OK_AND_ASSIGN( std::shared_ptr aligned, util::EnsureAlignment(unaligned, util::kValueAlignment, default_memory_pool())); @@ -420,7 +494,7 @@ TYPED_TEST(ValueAlignmentRequired, RoundTrip) { TYPED_TEST(ValueAlignmentNotRequired, RoundTrip) { std::shared_ptr data = SampleArray(); - std::shared_ptr unaligned = UnalignValues(*data); + std::shared_ptr unaligned = UnalignBuffers(*data); ASSERT_OK_AND_ASSIGN( std::shared_ptr aligned, util::EnsureAlignment(unaligned, util::kValueAlignment, default_memory_pool())); @@ -430,6 +504,37 @@ TYPED_TEST(ValueAlignmentNotRequired, RoundTrip) { this->CheckUnmodified(*unaligned, *aligned); } +TYPED_TEST(ValueAlignmentNotRequired, DefaultAlignmentSufficient) { + std::shared_ptr data = SampleArray(); + ASSERT_OK_AND_ASSIGN( + std::shared_ptr aligned, + util::EnsureAlignment(data, util::kValueAlignment, default_memory_pool())); + + ASSERT_TRUE(util::CheckAlignment(*aligned, util::kValueAlignment)); + AssertArraysEqual(*MakeArray(data), *MakeArray(aligned)); + this->CheckUnmodified(*data, *aligned); +} + +TEST_F(ValueAlignment, DenseUnion) { + std::shared_ptr data = SampleArray(); + ASSERT_TRUE(util::CheckAlignment(*data, util::kValueAlignment)); + + std::shared_ptr unaligned = UnalignBuffers(*data); + ASSERT_FALSE(util::CheckAlignment(*unaligned, util::kValueAlignment)); + // Dense union arrays are the only array type where the buffer at index 2 is expected + // to be aligned (it contains 32-bit offsets and should be 4-byte aligned) + 
ASSERT_FALSE(util::CheckAlignment(*unaligned->buffers[2], 4)); + + ASSERT_OK_AND_ASSIGN( + std::shared_ptr realigned, + util::EnsureAlignment(unaligned, util::kValueAlignment, default_memory_pool())); + + ASSERT_TRUE(util::CheckAlignment(*realigned, util::kValueAlignment)); + ASSERT_TRUE(util::CheckAlignment(*realigned->buffers[2], 4)); + // The buffer at index 1 is the types buffer which does not require realignment + ASSERT_EQ(unaligned->buffers[1]->data(), realigned->buffers[1]->data()); +} + TEST_F(ValueAlignment, RunEndEncoded) { // Run end requires alignment, value type does not std::shared_ptr run_ends = ArrayFromJSON(int32(), "[3, 5]"); @@ -439,8 +544,8 @@ TEST_F(ValueAlignment, RunEndEncoded) { std::move(values), 0)); std::shared_ptr unaligned_ree = std::make_shared(*array->data()); - unaligned_ree->child_data[0] = UnalignValues(*unaligned_ree->child_data[0]); - unaligned_ree->child_data[1] = UnalignValues(*unaligned_ree->child_data[1]); + unaligned_ree->child_data[0] = UnalignBuffers(*unaligned_ree->child_data[0]); + unaligned_ree->child_data[1] = UnalignBuffers(*unaligned_ree->child_data[1]); std::shared_ptr aligned_ree = std::make_shared(*unaligned_ree); @@ -459,8 +564,8 @@ TEST_F(ValueAlignment, Dictionary) { std::shared_ptr array = ArrayFromJSON(int8_utf8, R"(["x", "x", "y"])"); std::shared_ptr unaligned_dict = std::make_shared(*array->data()); - unaligned_dict->dictionary = UnalignValues(*unaligned_dict->dictionary); - unaligned_dict = UnalignValues(*unaligned_dict); + unaligned_dict->dictionary = UnalignBuffers(*unaligned_dict->dictionary); + unaligned_dict = UnalignBuffers(*unaligned_dict); std::shared_ptr aligned_dict = std::make_shared(*unaligned_dict); @@ -477,8 +582,8 @@ TEST_F(ValueAlignment, Dictionary) { array = ArrayFromJSON(int16_int8, R"([7, 11])"); unaligned_dict = std::make_shared(*array->data()); - unaligned_dict->dictionary = UnalignValues(*unaligned_dict->dictionary); - unaligned_dict = UnalignValues(*unaligned_dict); + 
unaligned_dict->dictionary = UnalignBuffers(*unaligned_dict->dictionary); + unaligned_dict = UnalignBuffers(*unaligned_dict); aligned_dict = std::make_shared(*unaligned_dict); @@ -494,7 +599,7 @@ TEST_F(ValueAlignment, Dictionary) { TEST_F(ValueAlignment, Extension) { std::shared_ptr array = ExampleSmallint(); - std::shared_ptr unaligned = UnalignValues(*array->data()); + std::shared_ptr unaligned = UnalignBuffers(*array->data()); ASSERT_OK_AND_ASSIGN( std::shared_ptr aligned, From c434bb254f04450c25a502662af22cd9e771ae8f Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 26 May 2023 11:08:05 -0700 Subject: [PATCH 18/18] Cleaning up comment --- cpp/src/arrow/util/align_util.h | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/cpp/src/arrow/util/align_util.h b/cpp/src/arrow/util/align_util.h index a08c9233973..63df63749cf 100644 --- a/cpp/src/arrow/util/align_util.h +++ b/cpp/src/arrow/util/align_util.h @@ -75,10 +75,9 @@ namespace util { /// /// If this is passed as the `alignment` in one of the CheckAlignment or EnsureAlignment /// functions, then the function will ensure ensure each buffer is suitably aligned -/// for the data type of the -/// array. For example, given an int32 buffer the values buffer's address must be a -/// multiple of 4. Given a large_string buffer the offsets buffer's address must be a -/// multiple of 8. +/// for the data type of the array. For example, given an int32 buffer the values +/// buffer's address must be a multiple of 4. Given a large_string buffer the offsets +/// buffer's address must be a multiple of 8. constexpr int64_t kValueAlignment = -3; /// \brief Calculate if the buffer's address is a multiple of `alignment`