diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index 4e6826bc61f..d928cdf58b1 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -180,6 +180,7 @@ set(ARROW_SRCS tensor/csf_converter.cc tensor/csx_converter.cc type.cc + type_traits.cc visitor.cc c/bridge.cc io/buffered.cc diff --git a/cpp/src/arrow/acero/exec_plan.cc b/cpp/src/arrow/acero/exec_plan.cc index 2fe8c484e40..2f03159de56 100644 --- a/cpp/src/arrow/acero/exec_plan.cc +++ b/cpp/src/arrow/acero/exec_plan.cc @@ -36,6 +36,7 @@ #include "arrow/table.h" #include "arrow/util/async_generator.h" #include "arrow/util/checked_cast.h" +#include "arrow/util/io_util.h" #include "arrow/util/key_value_metadata.h" #include "arrow/util/logging.h" #include "arrow/util/string.h" @@ -358,10 +359,38 @@ std::optional GetNodeIndex(const std::vector& nodes, return std::nullopt; } +const char* kAceroAlignmentHandlingEnvVar = "ACERO_ALIGNMENT_HANDLING"; + +UnalignedBufferHandling DetermineDefaultUnalignedBufferHandling() { + auto maybe_value = ::arrow::internal::GetEnvVar(kAceroAlignmentHandlingEnvVar); + if (!maybe_value.ok()) { + return UnalignedBufferHandling::kWarn; + } + std::string value = maybe_value.MoveValueUnsafe(); + if (::arrow::internal::AsciiEqualsCaseInsensitive(value, "warn")) { + return UnalignedBufferHandling::kWarn; + } else if (::arrow::internal::AsciiEqualsCaseInsensitive(value, "ignore")) { + return UnalignedBufferHandling::kIgnore; + } else if (::arrow::internal::AsciiEqualsCaseInsensitive(value, "reallocate")) { + return UnalignedBufferHandling::kReallocate; + } else if (::arrow::internal::AsciiEqualsCaseInsensitive(value, "error")) { + return UnalignedBufferHandling::kError; + } else { + ARROW_LOG(WARNING) << "unrecognized value for ACERO_ALIGNMENT_HANDLING: " << value; + return UnalignedBufferHandling::kWarn; + } +} + } // namespace const uint32_t ExecPlan::kMaxBatchSize; +UnalignedBufferHandling GetDefaultUnalignedBufferHandling() { + static 
UnalignedBufferHandling default_value = + DetermineDefaultUnalignedBufferHandling(); + return default_value; +} + Result> ExecPlan::Make( QueryOptions opts, ExecContext ctx, std::shared_ptr metadata) { @@ -621,7 +650,8 @@ Future> DeclarationToTableImpl( query_options.function_registry); std::shared_ptr> output_table = std::make_shared>(); - ARROW_ASSIGN_OR_RAISE(std::shared_ptr exec_plan, ExecPlan::Make(exec_ctx)); + ARROW_ASSIGN_OR_RAISE(std::shared_ptr exec_plan, + ExecPlan::Make(query_options, exec_ctx)); TableSinkNodeOptions sink_options(output_table.get()); sink_options.sequence_output = query_options.sequence_output; sink_options.names = std::move(query_options.field_names); @@ -648,7 +678,8 @@ Future DeclarationToExecBatchesImpl( std::shared_ptr out_schema; AsyncGenerator> sink_gen; ExecContext exec_ctx(options.memory_pool, cpu_executor, options.function_registry); - ARROW_ASSIGN_OR_RAISE(std::shared_ptr exec_plan, ExecPlan::Make(exec_ctx)); + ARROW_ASSIGN_OR_RAISE(std::shared_ptr exec_plan, + ExecPlan::Make(options, exec_ctx)); SinkNodeOptions sink_options(&sink_gen, &out_schema); sink_options.sequence_output = options.sequence_output; Declaration with_sink = Declaration::Sequence({declaration, {"sink", sink_options}}); @@ -678,7 +709,8 @@ Future DeclarationToExecBatchesImpl( Future<> DeclarationToStatusImpl(Declaration declaration, QueryOptions options, ::arrow::internal::Executor* cpu_executor) { ExecContext exec_ctx(options.memory_pool, cpu_executor, options.function_registry); - ARROW_ASSIGN_OR_RAISE(std::shared_ptr exec_plan, ExecPlan::Make(exec_ctx)); + ARROW_ASSIGN_OR_RAISE(std::shared_ptr exec_plan, + ExecPlan::Make(options, exec_ctx)); ARROW_ASSIGN_OR_RAISE(ExecNode * last_node, declaration.AddToPlan(exec_plan.get())); if (!last_node->is_sink()) { ConsumingSinkNodeOptions sink_options(NullSinkNodeConsumer::Make()); @@ -972,7 +1004,8 @@ Result>> DeclarationToRecordBatchGen ::arrow::internal::Executor* cpu_executor, std::shared_ptr* out_schema) { 
auto converter = std::make_shared(); ExecContext exec_ctx(options.memory_pool, cpu_executor, options.function_registry); - ARROW_ASSIGN_OR_RAISE(std::shared_ptr plan, ExecPlan::Make(exec_ctx)); + ARROW_ASSIGN_OR_RAISE(std::shared_ptr plan, + ExecPlan::Make(options, exec_ctx)); Declaration with_sink = Declaration::Sequence( {declaration, {"sink", SinkNodeOptions(&converter->exec_batch_gen, &converter->schema)}}); diff --git a/cpp/src/arrow/acero/exec_plan.h b/cpp/src/arrow/acero/exec_plan.h index 424d9107d27..f82a0f8106b 100644 --- a/cpp/src/arrow/acero/exec_plan.h +++ b/cpp/src/arrow/acero/exec_plan.h @@ -496,6 +496,16 @@ struct ARROW_ACERO_EXPORT Declaration { std::string label; }; +/// \brief How to handle unaligned buffers +enum class UnalignedBufferHandling { kWarn, kIgnore, kReallocate, kError }; + +/// \brief get the default behavior of unaligned buffer handling +/// +/// This is configurable via the ACERO_ALIGNMENT_HANDLING environment variable which +/// can be set to "warn", "ignore", "reallocate", or "error". If the environment +/// variable is not set, or is set to an invalid value, this will return kWarn +UnalignedBufferHandling GetDefaultUnalignedBufferHandling(); + /// \brief plan-wide options that can be specified when executing an execution plan struct ARROW_ACERO_EXPORT QueryOptions { /// \brief Should the plan use a legacy batching strategy @@ -562,6 +572,36 @@ struct ARROW_ACERO_EXPORT QueryOptions { /// /// If set then the number of names must equal the number of output columns std::vector field_names; + + /// \brief Policy for unaligned buffers in source data + /// + /// Various compute functions and acero internals will type pun array + /// buffers from uint8_t* to some kind of value type (e.g. we might + /// cast to int32_t* to add two int32 arrays) + /// + /// If the buffer is poorly aligned (e.g. an int32 array is not aligned + /// on a 4-byte boundary) then this is technically undefined behavior in C++. 
+ /// However, most modern compilers and CPUs are fairly tolerant of this + /// behavior and nothing bad (beyond a small hit to performance) is likely + /// to happen. + /// + /// Note that this only applies to source buffers. All buffers allocated internally + /// by Acero will be suitably aligned. + /// + /// If this field is set to kWarn then Acero will check if any buffers are unaligned + /// and, if they are, will emit a warning. + /// + /// If this field is set to kReallocate then Acero will allocate a new, suitably aligned + /// buffer and copy the contents from the old buffer into this new buffer. + /// + /// If this field is set to kError then Acero will gracefully abort the plan instead. + /// + /// If this field is set to kIgnore then Acero will not even check if the buffers are + /// unaligned. + /// + /// If this field is not set then it will be treated as kWarn unless overridden + /// by the ACERO_ALIGNMENT_HANDLING environment variable + std::optional unaligned_buffer_handling; }; /// \brief Calculate the output schema of a declaration diff --git a/cpp/src/arrow/acero/plan_test.cc b/cpp/src/arrow/acero/plan_test.cc index 8ec5c0f70a9..5f62550177f 100644 --- a/cpp/src/arrow/acero/plan_test.cc +++ b/cpp/src/arrow/acero/plan_test.cc @@ -1704,5 +1704,45 @@ TEST(ExecPlanExecution, SegmentedAggregationWithBatchCrossingSegment) { {expected}); } +TEST(ExecPlanExecution, UnalignedInput) { + std::shared_ptr array = ArrayFromJSON(int32(), "[1, 2, 3]"); + std::shared_ptr unaligned = UnalignBuffers(*array); + ASSERT_OK_AND_ASSIGN(ExecBatch sample_batch, + ExecBatch::Make({unaligned}, array->length())); + + BatchesWithSchema data; + data.batches = {std::move(sample_batch)}; + data.schema = schema({field("i32", int32())}); + + Declaration plan = Declaration::Sequence({ + {"exec_batch_source", ExecBatchSourceNodeOptions(data.schema, data.batches)}, + }); + + int64_t initial_bytes_allocated = default_memory_pool()->total_bytes_allocated(); + + // By default we should 
warn and so the plan should finish ok + ASSERT_OK(DeclarationToStatus(plan)); + ASSERT_EQ(initial_bytes_allocated, default_memory_pool()->total_bytes_allocated()); + + QueryOptions query_options; + +#ifndef ARROW_UBSAN + // Nothing should happen if we ignore alignment + query_options.unaligned_buffer_handling = UnalignedBufferHandling::kIgnore; + ASSERT_OK(DeclarationToStatus(plan, query_options)); + ASSERT_EQ(initial_bytes_allocated, default_memory_pool()->total_bytes_allocated()); +#endif + + query_options.unaligned_buffer_handling = UnalignedBufferHandling::kError; + ASSERT_THAT(DeclarationToStatus(plan, query_options), + Raises(StatusCode::Invalid, + testing::HasSubstr("An input buffer was poorly aligned"))); + ASSERT_EQ(initial_bytes_allocated, default_memory_pool()->total_bytes_allocated()); + + query_options.unaligned_buffer_handling = UnalignedBufferHandling::kReallocate; + ASSERT_OK(DeclarationToStatus(plan, query_options)); + ASSERT_LT(initial_bytes_allocated, default_memory_pool()->total_bytes_allocated()); +} + } // namespace acero } // namespace arrow diff --git a/cpp/src/arrow/acero/source_node.cc b/cpp/src/arrow/acero/source_node.cc index 6c138d8dccd..9cacb237b20 100644 --- a/cpp/src/arrow/acero/source_node.cc +++ b/cpp/src/arrow/acero/source_node.cc @@ -52,6 +52,46 @@ using arrow::internal::MapVector; namespace acero { namespace { +Status HandleUnalignedBuffers(ExecBatch* batch, UnalignedBufferHandling handling) { + if (handling == UnalignedBufferHandling::kIgnore) { + return Status::OK(); + } + for (auto& value : batch->values) { + if (value.is_array()) { + switch (handling) { + case UnalignedBufferHandling::kIgnore: + // Should be impossible to get here + return Status::OK(); + case UnalignedBufferHandling::kError: + if (!arrow::util::CheckAlignment(*value.array(), + arrow::util::kValueAlignment)) { + return Status::Invalid( + "An input buffer was poorly aligned and UnalignedBufferHandling is set " + "to kError"); + } + break; + case 
UnalignedBufferHandling::kWarn: + if (!arrow::util::CheckAlignment(*value.array(), + arrow::util::kValueAlignment)) { + ARROW_LOG(WARNING) + << "An input buffer was poorly aligned. This could lead to crashes or " + "poor performance on some hardware. Please ensure that all Acero " + "sources generate aligned buffers, or change the unaligned buffer " + "handling configuration to silence this warning."; + } + break; + case UnalignedBufferHandling::kReallocate: { + ARROW_ASSIGN_OR_RAISE(value, arrow::util::EnsureAlignment( + value.array(), arrow::util::kValueAlignment, + default_memory_pool())); + break; + } + } + } + } + return Status::OK(); +} + struct SourceNode : ExecNode, public TracedNode { SourceNode(ExecPlan* plan, std::shared_ptr output_schema, AsyncGenerator> generator, @@ -104,13 +144,11 @@ struct SourceNode : ExecNode, public TracedNode { batch_size = morsel_length; } ExecBatch batch = morsel.Slice(offset, batch_size); - for (auto& value : batch.values) { - if (value.is_array()) { - ARROW_ASSIGN_OR_RAISE(value, arrow::util::EnsureAlignment( - value.make_array(), ipc::kArrowAlignment, - default_memory_pool())); - } - } + UnalignedBufferHandling unaligned_buffer_handling = + plan_->query_context()->options().unaligned_buffer_handling.value_or( + GetDefaultUnalignedBufferHandling()); + ARROW_RETURN_NOT_OK( + HandleUnalignedBuffers(&batch, unaligned_buffer_handling)); if (has_ordering) { batch.index = batch_index; } diff --git a/cpp/src/arrow/testing/gtest_util.cc b/cpp/src/arrow/testing/gtest_util.cc index 9569375bda9..6fc709874e7 100644 --- a/cpp/src/arrow/testing/gtest_util.cc +++ b/cpp/src/arrow/testing/gtest_util.cc @@ -1099,4 +1099,30 @@ std::shared_ptr GatingTask::Make(double timeout_seconds) { return std::make_shared(timeout_seconds); } +std::shared_ptr UnalignBuffers(const ArrayData& array) { + std::vector> new_buffers; + new_buffers.reserve(array.buffers.size()); + + for (const auto& buffer : array.buffers) { + if (!buffer) { + 
new_buffers.emplace_back(); + continue; + } + EXPECT_OK_AND_ASSIGN(std::shared_ptr padded, + AllocateBuffer(buffer->size() + 1, default_memory_pool())); + memcpy(padded->mutable_data() + 1, buffer->data(), buffer->size()); + std::shared_ptr unaligned = SliceBuffer(padded, 1); + new_buffers.push_back(std::move(unaligned)); + } + + std::shared_ptr array_data = std::make_shared(array); + array_data->buffers = std::move(new_buffers); + return array_data; +} + +std::shared_ptr UnalignBuffers(const Array& array) { + std::shared_ptr array_data = UnalignBuffers(*array.data()); + return MakeArray(array_data); +} + } // namespace arrow diff --git a/cpp/src/arrow/testing/gtest_util.h b/cpp/src/arrow/testing/gtest_util.h index 55bd307b12c..13fc0b3e81d 100644 --- a/cpp/src/arrow/testing/gtest_util.h +++ b/cpp/src/arrow/testing/gtest_util.h @@ -532,4 +532,13 @@ class ARROW_TESTING_EXPORT GatingTask { std::shared_ptr impl_; }; +/// \brief create an exact copy of the data where each buffer has a max alignment of 1 +/// +/// This method does not recurse into the dictionary or children +ARROW_TESTING_EXPORT std::shared_ptr UnalignBuffers(const ArrayData& array); +/// \brief create an exact copy of the array where each buffer has a max alignment of 1 +/// +/// This method does not recurse into the dictionary or children +ARROW_TESTING_EXPORT std::shared_ptr UnalignBuffers(const Array& array); + } // namespace arrow diff --git a/cpp/src/arrow/type_traits.cc b/cpp/src/arrow/type_traits.cc new file mode 100644 index 00000000000..ac16afe4b8c --- /dev/null +++ b/cpp/src/arrow/type_traits.cc @@ -0,0 +1,102 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/type_traits.h" + +#include "arrow/util/logging.h" + +namespace arrow { + +int RequiredValueAlignmentForBuffer(Type::type type_id, int buffer_index) { + if (buffer_index == 2 && type_id == Type::DENSE_UNION) { + // A dense union array is the only array (so far) that requires alignment + // on a buffer with a buffer_index that is not equal to 1 + return 4; + } + if (buffer_index != 1) { + // If the buffer index is 0 then either: + // * The array type has no buffers, thus this shouldn't be called anyways + // * The array has a validity buffer at 0, no alignment needed + // * The array is a union array and has a types buffer at 0, no alignment needed + // If the buffer index is > 1 then, in all current cases, it represents binary + // data and no alignment is needed. The only exception is dense union buffers + // which are checked above. + return 1; + } + DCHECK_NE(type_id, Type::DICTIONARY); + DCHECK_NE(type_id, Type::EXTENSION); + + switch (type_id) { + case Type::NA: // No buffers + case Type::FIXED_SIZE_LIST: // No second buffer (values in child array) + case Type::FIXED_SIZE_BINARY: // Fixed size binary could be dangerous but the + // compute kernels don't type pun this. E.g. 
if + // an extension type is storing some kind of struct + // here then the user should do their own alignment + // check before casting to an array of structs + case Type::BOOL: // Always treated as uint8_t* + case Type::INT8: // Always treated as uint8_t* + case Type::UINT8: // Always treated as uint8_t* + case Type::DENSE_UNION: // Union arrays have a uint8_t* types buffer here + case Type::SPARSE_UNION: // Union arrays have a uint8_t* types buffer here + case Type::RUN_END_ENCODED: // No buffers + case Type::STRUCT: // No second buffer + return 1; + case Type::INT16: + case Type::UINT16: + case Type::HALF_FLOAT: + return 2; + case Type::INT32: + case Type::UINT32: + case Type::FLOAT: + case Type::STRING: // Offsets may be cast to int32_t* + case Type::BINARY: // Offsets may be cast to int32_t* + case Type::DATE32: + case Type::TIME32: + case Type::LIST: // Offsets may be cast to int32_t*, data is in child array + case Type::MAP: // This is a list array + case Type::INTERVAL_MONTHS: // Stored as int32_t* + case Type::INTERVAL_DAY_TIME: // Stored as two contiguous 32-bit integers + return 4; + case Type::INT64: + case Type::UINT64: + case Type::DOUBLE: + case Type::DECIMAL128: // May be cast to GenericBasicDecimal* which requires + // alignment of 8 + case Type::DECIMAL256: // May be cast to GenericBasicDecimal* which requires + // alignment of 8 + case Type::LARGE_BINARY: // Offsets may be cast to int64_t* + case Type::LARGE_LIST: // Offsets may be cast to int64_t* + case Type::LARGE_STRING: // Offsets may be cast to int64_t* + case Type::DATE64: + case Type::TIME64: + case Type::TIMESTAMP: + case Type::DURATION: + case Type::INTERVAL_MONTH_DAY_NANO: // Stored as two 32-bit integers and a 64-bit + // integer + return 8; + case Type::DICTIONARY: + case Type::EXTENSION: + case Type::MAX_ID: + break; + } + Status::Invalid("RequiredValueAlignmentForBuffer called with invalid type id ", type_id) + .Warn(); + return 1; +} + +} // namespace arrow diff --git 
a/cpp/src/arrow/type_traits.h b/cpp/src/arrow/type_traits.h index 2d20d87d143..7204fd6d85d 100644 --- a/cpp/src/arrow/type_traits.h +++ b/cpp/src/arrow/type_traits.h @@ -1309,6 +1309,29 @@ static inline int offset_bit_width(Type::type type_id) { return 0; } +/// \brief Get the alignment a buffer should have to be considered "value aligned" +/// +/// Some buffers are frequently type-punned. For example, in an int32 array the +/// values buffer is frequently cast to int32_t* +/// +/// This sort of punning is technically only valid if the pointer is aligned to a +/// proper width (e.g. 4 bytes in the case of int32). However, most modern compilers +/// are quite permissive if we get this wrong. Note that this alignment is something +/// that is guaranteed by malloc (e.g. new int32_t[] will return a buffer that is 4 +/// byte aligned) or common libraries (e.g. numpy) but it is not currently guaranteed +/// by flight (GH-32276). +/// +/// We call this "value aligned" and this method will calculate that required alignment. +/// +/// \param type_id the type of the array containing the buffer +/// Note: this should be the indices type for a dictionary array since +/// a dictionary array's buffers are indices. It should be the storage +/// type for an extension array.
+/// \param buffer_index the index of the buffer to check, for example 0 will typically +/// give you the alignment expected of the validity buffer +/// \return the required value alignment in bytes (1 if no alignment required) +int RequiredValueAlignmentForBuffer(Type::type type_id, int buffer_index); + /// \brief Check for an integer type (signed or unsigned) /// /// \param[in] type the type to check diff --git a/cpp/src/arrow/util/align_util.cc b/cpp/src/arrow/util/align_util.cc index d77650fcd61..7bc687b1550 100644 --- a/cpp/src/arrow/util/align_util.cc +++ b/cpp/src/arrow/util/align_util.cc @@ -21,23 +21,68 @@ #include "arrow/chunked_array.h" #include "arrow/record_batch.h" #include "arrow/table.h" +#include "arrow/type_fwd.h" +#include "arrow/type_traits.h" +#include "arrow/util/checked_cast.h" +#include "arrow/util/logging.h" namespace arrow { namespace util { bool CheckAlignment(const Buffer& buffer, int64_t alignment) { + if (alignment <= 0) { + return true; + } return buffer.address() % alignment == 0; } -bool CheckAlignment(const ArrayData& array, int64_t alignment) { - for (const auto& buffer : array.buffers) { - if (buffer) { - if (!CheckAlignment(*buffer, alignment)) return false; +namespace { + +// Returns the type that controls how the buffers of this ArrayData (not its children) +// should behave +Type::type GetTypeForBuffers(const ArrayData& array) { + Type::type type_id = array.type->storage_id(); + if (type_id == Type::DICTIONARY) { + return ::arrow::internal::checked_pointer_cast(array.type) + ->index_type() + ->id(); + } + return type_id; +} + +// Checks to see if an array's own buffers are aligned but doesn't check +// children +bool CheckSelfAlignment(const ArrayData& array, int64_t alignment) { + if (alignment == kValueAlignment) { + Type::type type_id = GetTypeForBuffers(array); + for (std::size_t i = 0; i < array.buffers.size(); i++) { + if (array.buffers[i]) { + int expected_alignment = + RequiredValueAlignmentForBuffer(type_id, 
static_cast(i)); + if (!CheckAlignment(*array.buffers[i], expected_alignment)) { + return false; + } + } + } + } else { + for (const auto& buffer : array.buffers) { + if (buffer) { + if (!CheckAlignment(*buffer, alignment)) return false; + } } } + return true; +} + +} // namespace + +bool CheckAlignment(const ArrayData& array, int64_t alignment) { + if (!CheckSelfAlignment(array, alignment)) { + return false; + } - if (array.type->id() == Type::DICTIONARY) { + if (array.dictionary) { if (!CheckAlignment(*array.dictionary, alignment)) return false; } @@ -97,9 +142,22 @@ bool CheckAlignment(const Table& table, int64_t alignment, Result> EnsureAlignment(std::shared_ptr buffer, int64_t alignment, MemoryPool* memory_pool) { + if (alignment == kValueAlignment) { + return Status::Invalid( + "The kValueAlignment option may only be used to call EnsureAlignment on arrays " + "or tables and cannot be used with buffers"); + } + if (alignment <= 0) { + return Status::Invalid("Alignment must be a positive integer"); + } if (!CheckAlignment(*buffer, alignment)) { - ARROW_ASSIGN_OR_RAISE(auto new_buffer, - AllocateBuffer(buffer->size(), alignment, memory_pool)); + if (!buffer->is_cpu()) { + return Status::NotImplemented("Reallocating an unaligned non-CPU buffer."); + } + int64_t minimum_desired_alignment = std::max(kDefaultBufferAlignment, alignment); + ARROW_ASSIGN_OR_RAISE( + auto new_buffer, + AllocateBuffer(buffer->size(), minimum_desired_alignment, memory_pool)); std::memcpy(new_buffer->mutable_data(), buffer->data(), buffer->size()); return std::move(new_buffer); } else { @@ -111,11 +169,18 @@ Result> EnsureAlignment(std::shared_ptr ar int64_t alignment, MemoryPool* memory_pool) { if (!CheckAlignment(*array_data, alignment)) { - std::vector> buffers_ = array_data->buffers; - for (size_t i = 0; i < buffers_.size(); ++i) { - if (buffers_[i]) { + std::vector> buffers = array_data->buffers; + Type::type type_id = GetTypeForBuffers(*array_data); + for (size_t i = 0; i < 
buffers.size(); ++i) { + if (buffers[i]) { + int64_t expected_alignment = alignment; + if (alignment == kValueAlignment) { + expected_alignment = + RequiredValueAlignmentForBuffer(type_id, static_cast(i)); + } ARROW_ASSIGN_OR_RAISE( - buffers_[i], EnsureAlignment(std::move(buffers_[i]), alignment, memory_pool)); + buffers[i], + EnsureAlignment(std::move(buffers[i]), expected_alignment, memory_pool)); } } @@ -130,10 +195,9 @@ Result> EnsureAlignment(std::shared_ptr ar } auto new_array_data = ArrayData::Make( - array_data->type, array_data->length, std::move(buffers_), array_data->child_data, + array_data->type, array_data->length, std::move(buffers), array_data->child_data, array_data->dictionary, array_data->GetNullCount(), array_data->offset); return std::move(new_array_data); - } else { return std::move(array_data); } diff --git a/cpp/src/arrow/util/align_util.h b/cpp/src/arrow/util/align_util.h index abdabe22fca..63df63749cf 100644 --- a/cpp/src/arrow/util/align_util.h +++ b/cpp/src/arrow/util/align_util.h @@ -70,8 +70,33 @@ inline BitmapWordAlignParams BitmapWordAlign(const uint8_t* data, int64_t bit_of namespace util { // Functions to check if the provided Arrow object is aligned by the specified alignment + +/// \brief Special alignment value to use data type-specific alignment +/// +/// If this is passed as the `alignment` in one of the CheckAlignment or EnsureAlignment +/// functions, then the function will ensure each buffer is suitably aligned +/// for the data type of the array. For example, given an int32 buffer the values +/// buffer's address must be a multiple of 4. Given a large_string buffer the offsets +/// buffer's address must be a multiple of 8.
+constexpr int64_t kValueAlignment = -3; + +/// \brief Calculate if the buffer's address is a multiple of `alignment` +/// +/// If `alignment` is less than or equal to 0 then this method will always return true +/// \param buffer the buffer to check +/// \param alignment the alignment (in bytes) to check for ARROW_EXPORT bool CheckAlignment(const Buffer& buffer, int64_t alignment); +/// \brief Calculate if all buffers in the array data are aligned +/// +/// This will also check the buffers in the dictionary and any children +/// \param array the array data to check +/// \param alignment the alignment (in bytes) to check for ARROW_EXPORT bool CheckAlignment(const ArrayData& array, int64_t alignment); +/// \brief Calculate if all buffers in the array are aligned +/// +/// This will also check the buffers in the dictionary and any children +/// \param array the array to check +/// \param alignment the alignment (in bytes) to check for ARROW_EXPORT bool CheckAlignment(const Array& array, int64_t alignment); // Following functions require an additional boolean vector which stores the @@ -82,29 +107,112 @@ ARROW_EXPORT bool CheckAlignment(const Array& array, int64_t alignment); // of the constituent objects during the EnsureAlignment function where certain // objects can be ignored for further checking if we already know that they are // completely aligned. + +/// \brief Calculate which (if any) chunks in a chunked array are unaligned +/// \param array the array to check +/// \param alignment the alignment (in bytes) to check for +/// \param needs_alignment an output vector that will store the results of the check. +/// It must be set to a valid vector. Extra elements will be added to the end +/// of the vector for each chunk that is checked. `true` will be stored if +/// the chunk is unaligned.
+/// \param offset the index of the chunk to start checking +/// \return true if all chunks (starting at `offset`) are aligned, false otherwise ARROW_EXPORT bool CheckAlignment(const ChunkedArray& array, int64_t alignment, std::vector* needs_alignment, int offset = 0); + +/// \brief calculate which (if any) columns in a record batch are unaligned +/// \param batch the batch to check +/// \param alignment the alignment (in bytes) to check for +/// \param needs_alignment an output vector that will store the results of the +/// check. It must be set to a valid vector. Extra elements will be added +/// to the end of the vector for each column that is checked. `true` will be +/// stored if the column is unaligned. ARROW_EXPORT bool CheckAlignment(const RecordBatch& batch, int64_t alignment, std::vector* needs_alignment); + +/// \brief calculate which (if any) columns in a table are unaligned +/// \param table the table to check +/// \param alignment the alignment (in bytes) to check for +/// \param needs_alignment an output vector that will store the results of the +/// check. It must be set to a valid vector. Extra elements will be added +/// to the end of the vector for each column that is checked. `true` will be +/// stored if the column is unaligned. ARROW_EXPORT bool CheckAlignment(const Table& table, int64_t alignment, std::vector* needs_alignment); +/// \brief return a buffer that has the given alignment and the same data as the input +/// buffer +/// +/// If the input buffer is already aligned then this method will return the input buffer. +/// If the input buffer is not already aligned then this method will allocate a new +/// buffer. The new buffer will have at least +/// max(kDefaultBufferAlignment, alignment) bytes of alignment.
+/// +/// \param buffer the buffer to check +/// \param alignment the alignment (in bytes) to check for +/// \param memory_pool a memory pool that will be used to allocate a new buffer if the +/// input buffer is not sufficiently aligned ARROW_EXPORT Result> EnsureAlignment( std::shared_ptr buffer, int64_t alignment, MemoryPool* memory_pool); +/// \brief return an array data where all buffers are aligned by the given alignment +/// +/// If any input buffer is already aligned then this method will reuse that same input +/// buffer. +/// +/// \param array_data the array data to check +/// \param alignment the alignment (in bytes) to check for +/// \param memory_pool a memory pool that will be used to allocate new buffers if any +/// input buffer is not sufficiently aligned ARROW_EXPORT Result> EnsureAlignment( std::shared_ptr array_data, int64_t alignment, MemoryPool* memory_pool); +/// \brief return an array where all buffers are aligned by the given alignment +/// +/// If any input buffer is already aligned then this method will reuse that same input +/// buffer. +/// +/// \param array the array to check +/// \param alignment the alignment (in bytes) to check for +/// \param memory_pool a memory pool that will be used to allocate new buffers if any +/// input buffer is not sufficiently aligned ARROW_EXPORT Result> EnsureAlignment(std::shared_ptr array, int64_t alignment, MemoryPool* memory_pool); +/// \brief return a chunked array where all buffers are aligned by the given alignment +/// +/// If any input buffer is already aligned then this method will reuse that same input +/// buffer. 
+/// +/// \param array the chunked array to check +/// \param alignment the alignment (in bytes) to check for +/// \param memory_pool a memory pool that will be used to allocate new buffers if any +/// input buffer is not sufficiently aligned ARROW_EXPORT Result> EnsureAlignment( std::shared_ptr array, int64_t alignment, MemoryPool* memory_pool); +/// \brief return a record batch where all buffers are aligned by the given alignment +/// +/// If any input buffer is already aligned then this method will reuse that same input +/// buffer. +/// +/// \param batch the batch to check +/// \param alignment the alignment (in bytes) to check for +/// \param memory_pool a memory pool that will be used to allocate new buffers if any +/// input buffer is not sufficiently aligned ARROW_EXPORT Result> EnsureAlignment( std::shared_ptr batch, int64_t alignment, MemoryPool* memory_pool); +/// \brief return a table where all buffers are aligned by the given alignment +/// +/// If any input buffer is already aligned then this method will reuse that same input +/// buffer. +/// +/// \param table the table to check +/// \param alignment the alignment (in bytes) to check for +/// \param memory_pool a memory pool that will be used to allocate new buffers if any +/// input buffer is not sufficiently aligned ARROW_EXPORT Result> EnsureAlignment(std::shared_ptr table, int64_t alignment, MemoryPool* memory_pool); diff --git a/cpp/src/arrow/util/align_util_test.cc b/cpp/src/arrow/util/align_util_test.cc index c4ec83de3ee..a14041597a7 100644 --- a/cpp/src/arrow/util/align_util_test.cc +++ b/cpp/src/arrow/util/align_util_test.cc @@ -15,17 +15,24 @@ // specific language governing permissions and limitations // under the License. 
-#include #include #include #include #include +#include +#include + #include "arrow/array.h" +#include "arrow/buffer.h" #include "arrow/record_batch.h" #include "arrow/table.h" +#include "arrow/testing/extension_type.h" #include "arrow/testing/gtest_util.h" +#include "arrow/testing/matchers.h" #include "arrow/testing/random.h" +#include "arrow/type.h" +#include "arrow/type_fwd.h" #include "arrow/util/align_util.h" namespace arrow { @@ -166,6 +173,62 @@ TEST(BitmapWordAlign, UnalignedDataStart) { } } // namespace internal +TEST(EnsureAlignment, Buffer) { + ASSERT_OK_AND_ASSIGN(std::shared_ptr buffer, AllocateBuffer(/*size=*/1024)); + std::shared_ptr unaligned_view = SliceBuffer(buffer, 1); + std::shared_ptr aligned_view = SliceBuffer(buffer, 0); + + ASSERT_TRUE(util::CheckAlignment(*aligned_view, kDefaultBufferAlignment)); + ASSERT_FALSE(util::CheckAlignment(*unaligned_view, /*alignment=*/2)); + + ASSERT_OK_AND_ASSIGN( + std::shared_ptr aligned_dupe, + util::EnsureAlignment(aligned_view, /*alignment=*/8, default_memory_pool())); + + ASSERT_EQ(aligned_view->data(), aligned_dupe->data()); + ASSERT_TRUE(util::CheckAlignment(*aligned_dupe, kDefaultBufferAlignment)); + + ASSERT_OK_AND_ASSIGN( + std::shared_ptr realigned, + util::EnsureAlignment(unaligned_view, /*alignment=*/8, default_memory_pool())); + + ASSERT_NE(realigned->data(), unaligned_view->data()); + // Even though we only asked to check for 8 bytes of alignment, any reallocation will + // always allocate at least kDefaultBufferAlignment bytes of alignment + ASSERT_TRUE(util::CheckAlignment(*realigned, /*alignment=*/kDefaultBufferAlignment)); + + ASSERT_OK_AND_ASSIGN( + std::shared_ptr realigned_large, + util::EnsureAlignment(unaligned_view, /*alignment=*/256, default_memory_pool())); + // If the user wants more than kDefaultBufferAlignment they should get it + ASSERT_TRUE(util::CheckAlignment(*realigned_large, /*alignment=*/256)); + + ASSERT_OK_AND_ASSIGN( + std::shared_ptr realigned_huge, + 
util::EnsureAlignment(unaligned_view, /*alignment=*/2048, default_memory_pool())); + // It should even be valid for the alignment to be larger than the buffer size itself + ASSERT_TRUE(util::CheckAlignment(*realigned_huge, /*alignment=*/2048)); +} + +TEST(EnsureAlignment, BufferInvalid) { + ASSERT_OK_AND_ASSIGN(std::shared_ptr buffer, AllocateBuffer(/*size=*/1024)); + + // This is nonsense but not worth introducing a Status return. We just return true. + ASSERT_TRUE(util::CheckAlignment(*buffer, 0)); + ASSERT_TRUE(util::CheckAlignment(*buffer, -1)); + + ASSERT_THAT(util::EnsureAlignment(buffer, /*alignment=*/0, default_memory_pool()), + Raises(StatusCode::Invalid, + testing::HasSubstr("Alignment must be a positive integer"))); + + ASSERT_THAT( + util::EnsureAlignment(buffer, /*alignment=*/util::kValueAlignment, + default_memory_pool()), + Raises(StatusCode::Invalid, + testing::HasSubstr( + "may only be used to call EnsureAlignment on arrays or tables"))); +} + TEST(EnsureAlignment, Array) { MemoryPool* pool = default_memory_pool(); auto rand = ::arrow::random::RandomArrayGenerator(1923); @@ -278,4 +341,272 @@ TEST(EnsureAlignment, Table) { ASSERT_EQ(util::CheckAlignment(*aligned_table, 2048, &needs_alignment), true); } +using TypesRequiringSomeKindOfAlignment = + testing::Types; + +using TypesNotRequiringAlignment = + testing::Types; + +template +std::shared_ptr sample_type() { + return TypeTraits::type_singleton(); +} + +template <> +std::shared_ptr sample_type() { + return fixed_size_binary(16); +} + +template <> +std::shared_ptr sample_type() { + return fixed_size_list(uint8(), 16); +} + +template <> +std::shared_ptr sample_type() { + return decimal128(32, 6); +} + +template <> +std::shared_ptr sample_type() { + return decimal256(60, 10); +} + +template <> +std::shared_ptr sample_type() { + return large_list(int8()); +} + +template <> +std::shared_ptr sample_type() { + return dense_union({field("x", int8()), field("y", uint8())}); +} + +template <> 
+std::shared_ptr sample_type() { + return map(utf8(), field("item", utf8())); +} + +template <> +std::shared_ptr sample_type() { + return duration(TimeUnit::NANO); +} + +template <> +std::shared_ptr sample_type() { + return timestamp(TimeUnit::NANO); +} + +template <> +std::shared_ptr sample_type() { + return time32(TimeUnit::SECOND); +} + +template <> +std::shared_ptr sample_type() { + return time64(TimeUnit::NANO); +} + +template <> +std::shared_ptr sample_type() { + return sparse_union({field("x", uint8()), field("y", int8())}, {1, 2}); +} + +template +std::shared_ptr SampleArray() { + random::RandomArrayGenerator gen(42); + return gen.ArrayOf(sample_type(), 100)->data(); +} + +template <> +std::shared_ptr SampleArray() { + auto ty = sparse_union({field("ints", int64()), field("strs", utf8())}, {2, 7}); + auto ints = ArrayFromJSON(int64(), "[0, 1, 2, 3]"); + auto strs = ArrayFromJSON(utf8(), R"(["a", null, "c", "d"])"); + auto ids = ArrayFromJSON(int8(), "[2, 7, 2, 7]")->data()->buffers[1]; + const int length = 4; + SparseUnionArray arr(ty, length, {ints, strs}, ids); + return arr.data(); +} + +class ValueAlignment : public ::testing::Test { + public: + void CheckModified(const ArrayData& src, const ArrayData& dst) { + ASSERT_EQ(src.buffers.size(), dst.buffers.size()); + for (std::size_t i = 0; i < src.buffers.size(); i++) { + if (!src.buffers[i] || !dst.buffers[i]) { + continue; + } + if (src.buffers[i]->address() != dst.buffers[i]->address()) { + return; + } + } + FAIL() << "Expected at least one buffer to have been modified by EnsureAlignment"; + } + + void CheckUnmodified(const ArrayData& src, const ArrayData& dst) { + ASSERT_EQ(src.buffers.size(), dst.buffers.size()); + for (std::size_t i = 0; i < src.buffers.size(); i++) { + if (!src.buffers[i] || !dst.buffers[i]) { + continue; + } + ASSERT_EQ(src.buffers[i]->address(), dst.buffers[i]->address()); + } + } +}; + +template +class ValueAlignmentRequired : public ValueAlignment {}; +template +class 
ValueAlignmentNotRequired : public ValueAlignment {}; + +TYPED_TEST_SUITE(ValueAlignmentRequired, TypesRequiringSomeKindOfAlignment); +TYPED_TEST_SUITE(ValueAlignmentNotRequired, TypesNotRequiringAlignment); + +// The default buffer alignment should always be large enough for value alignment +TYPED_TEST(ValueAlignmentRequired, DefaultAlignmentSufficient) { + std::shared_ptr data = SampleArray(); + ASSERT_OK_AND_ASSIGN( + std::shared_ptr aligned, + util::EnsureAlignment(data, util::kValueAlignment, default_memory_pool())); + + ASSERT_TRUE(util::CheckAlignment(*aligned, util::kValueAlignment)); + AssertArraysEqual(*MakeArray(data), *MakeArray(aligned)); + this->CheckUnmodified(*data, *aligned); +} + +TYPED_TEST(ValueAlignmentRequired, RoundTrip) { + std::shared_ptr data = SampleArray(); + std::shared_ptr unaligned = UnalignBuffers(*data); + ASSERT_OK_AND_ASSIGN( + std::shared_ptr aligned, + util::EnsureAlignment(unaligned, util::kValueAlignment, default_memory_pool())); + + ASSERT_TRUE(util::CheckAlignment(*aligned, util::kValueAlignment)); + AssertArraysEqual(*MakeArray(data), *MakeArray(aligned)); + this->CheckModified(*unaligned, *aligned); +} + +TYPED_TEST(ValueAlignmentNotRequired, RoundTrip) { + std::shared_ptr data = SampleArray(); + std::shared_ptr unaligned = UnalignBuffers(*data); + ASSERT_OK_AND_ASSIGN( + std::shared_ptr aligned, + util::EnsureAlignment(unaligned, util::kValueAlignment, default_memory_pool())); + + ASSERT_TRUE(util::CheckAlignment(*aligned, util::kValueAlignment)); + AssertArraysEqual(*MakeArray(data), *MakeArray(aligned)); + this->CheckUnmodified(*unaligned, *aligned); +} + +TYPED_TEST(ValueAlignmentNotRequired, DefaultAlignmentSufficient) { + std::shared_ptr data = SampleArray(); + ASSERT_OK_AND_ASSIGN( + std::shared_ptr aligned, + util::EnsureAlignment(data, util::kValueAlignment, default_memory_pool())); + + ASSERT_TRUE(util::CheckAlignment(*aligned, util::kValueAlignment)); + AssertArraysEqual(*MakeArray(data), *MakeArray(aligned)); + 
this->CheckUnmodified(*data, *aligned); +} + +TEST_F(ValueAlignment, DenseUnion) { + std::shared_ptr data = SampleArray(); + ASSERT_TRUE(util::CheckAlignment(*data, util::kValueAlignment)); + + std::shared_ptr unaligned = UnalignBuffers(*data); + ASSERT_FALSE(util::CheckAlignment(*unaligned, util::kValueAlignment)); + // Dense union arrays are the only array type where the buffer at index 2 is expected + // to be aligned (it contains 32-bit offsets and should be 4-byte aligned) + ASSERT_FALSE(util::CheckAlignment(*unaligned->buffers[2], 4)); + + ASSERT_OK_AND_ASSIGN( + std::shared_ptr realigned, + util::EnsureAlignment(unaligned, util::kValueAlignment, default_memory_pool())); + + ASSERT_TRUE(util::CheckAlignment(*realigned, util::kValueAlignment)); + ASSERT_TRUE(util::CheckAlignment(*realigned->buffers[2], 4)); + // The buffer at index 1 is the types buffer which does not require realignment + ASSERT_EQ(unaligned->buffers[1]->data(), realigned->buffers[1]->data()); +} + +TEST_F(ValueAlignment, RunEndEncoded) { + // Run end requires alignment, value type does not + std::shared_ptr run_ends = ArrayFromJSON(int32(), "[3, 5]"); + std::shared_ptr values = ArrayFromJSON(int8(), "[50, 100]"); + ASSERT_OK_AND_ASSIGN(std::shared_ptr array, + RunEndEncodedArray::Make(/*logical_length=*/5, std::move(run_ends), + std::move(values), 0)); + + std::shared_ptr unaligned_ree = std::make_shared(*array->data()); + unaligned_ree->child_data[0] = UnalignBuffers(*unaligned_ree->child_data[0]); + unaligned_ree->child_data[1] = UnalignBuffers(*unaligned_ree->child_data[1]); + + std::shared_ptr aligned_ree = std::make_shared(*unaligned_ree); + + ASSERT_OK_AND_ASSIGN( + aligned_ree, + util::EnsureAlignment(aligned_ree, util::kValueAlignment, default_memory_pool())); + ASSERT_TRUE(util::CheckAlignment(*aligned_ree, util::kValueAlignment)); + + this->CheckModified(*unaligned_ree->child_data[0], *aligned_ree->child_data[0]); + this->CheckUnmodified(*unaligned_ree->child_data[1], 
*aligned_ree->child_data[1]); +} + +TEST_F(ValueAlignment, Dictionary) { + // Dictionary values require alignment, dictionary indices do not + std::shared_ptr int8_utf8 = dictionary(int8(), utf8()); + std::shared_ptr array = ArrayFromJSON(int8_utf8, R"(["x", "x", "y"])"); + + std::shared_ptr unaligned_dict = std::make_shared(*array->data()); + unaligned_dict->dictionary = UnalignBuffers(*unaligned_dict->dictionary); + unaligned_dict = UnalignBuffers(*unaligned_dict); + + std::shared_ptr aligned_dict = std::make_shared(*unaligned_dict); + + ASSERT_OK_AND_ASSIGN( + aligned_dict, + util::EnsureAlignment(aligned_dict, util::kValueAlignment, default_memory_pool())); + + ASSERT_TRUE(util::CheckAlignment(*aligned_dict, util::kValueAlignment)); + this->CheckUnmodified(*unaligned_dict, *aligned_dict); + this->CheckModified(*unaligned_dict->dictionary, *aligned_dict->dictionary); + + // Dictionary values do not require alignment, dictionary indices do + std::shared_ptr int16_int8 = dictionary(int16(), int8()); + array = ArrayFromJSON(int16_int8, R"([7, 11])"); + + unaligned_dict = std::make_shared(*array->data()); + unaligned_dict->dictionary = UnalignBuffers(*unaligned_dict->dictionary); + unaligned_dict = UnalignBuffers(*unaligned_dict); + + aligned_dict = std::make_shared(*unaligned_dict); + + ASSERT_OK_AND_ASSIGN( + aligned_dict, + util::EnsureAlignment(aligned_dict, util::kValueAlignment, default_memory_pool())); + + ASSERT_TRUE(util::CheckAlignment(*aligned_dict, util::kValueAlignment)); + this->CheckModified(*unaligned_dict, *aligned_dict); + this->CheckUnmodified(*unaligned_dict->dictionary, *aligned_dict->dictionary); +} + +TEST_F(ValueAlignment, Extension) { + std::shared_ptr array = ExampleSmallint(); + + std::shared_ptr unaligned = UnalignBuffers(*array->data()); + + ASSERT_OK_AND_ASSIGN( + std::shared_ptr aligned, + util::EnsureAlignment(unaligned, util::kValueAlignment, default_memory_pool())); + + ASSERT_TRUE(util::CheckAlignment(*aligned, 
util::kValueAlignment)); + this->CheckModified(*unaligned, *aligned); +} + } // namespace arrow