Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cpp/src/arrow/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ set(ARROW_SRCS
tensor/csf_converter.cc
tensor/csx_converter.cc
type.cc
type_traits.cc
visitor.cc
c/bridge.cc
io/buffered.cc
Expand Down
41 changes: 37 additions & 4 deletions cpp/src/arrow/acero/exec_plan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "arrow/table.h"
#include "arrow/util/async_generator.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/io_util.h"
#include "arrow/util/key_value_metadata.h"
#include "arrow/util/logging.h"
#include "arrow/util/string.h"
Expand Down Expand Up @@ -358,10 +359,38 @@ std::optional<int> GetNodeIndex(const std::vector<ExecNode*>& nodes,
return std::nullopt;
}

const char* kAceroAlignmentHandlingEnvVar = "ACERO_ALIGNMENT_HANDLING";

UnalignedBufferHandling DetermineDefaultUnalignedBufferHandling() {
auto maybe_value = ::arrow::internal::GetEnvVar(kAceroAlignmentHandlingEnvVar);
if (!maybe_value.ok()) {
return UnalignedBufferHandling::kWarn;
}
std::string value = maybe_value.MoveValueUnsafe();
if (::arrow::internal::AsciiEqualsCaseInsensitive(value, "warn")) {
return UnalignedBufferHandling::kWarn;
} else if (::arrow::internal::AsciiEqualsCaseInsensitive(value, "ignore")) {
return UnalignedBufferHandling::kIgnore;
} else if (::arrow::internal::AsciiEqualsCaseInsensitive(value, "reallocate")) {
return UnalignedBufferHandling::kReallocate;
} else if (::arrow::internal::AsciiEqualsCaseInsensitive(value, "error")) {
return UnalignedBufferHandling::kError;
} else {
ARROW_LOG(WARNING) << "unrecognized value for ACERO_ALIGNMENT_HANDLING: " << value;
return UnalignedBufferHandling::kWarn;
}
}

} // namespace

const uint32_t ExecPlan::kMaxBatchSize;

UnalignedBufferHandling GetDefaultUnalignedBufferHandling() {
static UnalignedBufferHandling default_value =
DetermineDefaultUnalignedBufferHandling();
return default_value;
}

Result<std::shared_ptr<ExecPlan>> ExecPlan::Make(
QueryOptions opts, ExecContext ctx,
std::shared_ptr<const KeyValueMetadata> metadata) {
Expand Down Expand Up @@ -621,7 +650,8 @@ Future<std::shared_ptr<Table>> DeclarationToTableImpl(
query_options.function_registry);
std::shared_ptr<std::shared_ptr<Table>> output_table =
std::make_shared<std::shared_ptr<Table>>();
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ExecPlan> exec_plan, ExecPlan::Make(exec_ctx));
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ExecPlan> exec_plan,
ExecPlan::Make(query_options, exec_ctx));
TableSinkNodeOptions sink_options(output_table.get());
sink_options.sequence_output = query_options.sequence_output;
sink_options.names = std::move(query_options.field_names);
Expand All @@ -648,7 +678,8 @@ Future<BatchesWithCommonSchema> DeclarationToExecBatchesImpl(
std::shared_ptr<Schema> out_schema;
AsyncGenerator<std::optional<ExecBatch>> sink_gen;
ExecContext exec_ctx(options.memory_pool, cpu_executor, options.function_registry);
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ExecPlan> exec_plan, ExecPlan::Make(exec_ctx));
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ExecPlan> exec_plan,
ExecPlan::Make(options, exec_ctx));
SinkNodeOptions sink_options(&sink_gen, &out_schema);
sink_options.sequence_output = options.sequence_output;
Declaration with_sink = Declaration::Sequence({declaration, {"sink", sink_options}});
Expand Down Expand Up @@ -678,7 +709,8 @@ Future<BatchesWithCommonSchema> DeclarationToExecBatchesImpl(
Future<> DeclarationToStatusImpl(Declaration declaration, QueryOptions options,
::arrow::internal::Executor* cpu_executor) {
ExecContext exec_ctx(options.memory_pool, cpu_executor, options.function_registry);
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ExecPlan> exec_plan, ExecPlan::Make(exec_ctx));
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ExecPlan> exec_plan,
ExecPlan::Make(options, exec_ctx));
ARROW_ASSIGN_OR_RAISE(ExecNode * last_node, declaration.AddToPlan(exec_plan.get()));
if (!last_node->is_sink()) {
ConsumingSinkNodeOptions sink_options(NullSinkNodeConsumer::Make());
Expand Down Expand Up @@ -972,7 +1004,8 @@ Result<AsyncGenerator<std::shared_ptr<RecordBatch>>> DeclarationToRecordBatchGen
::arrow::internal::Executor* cpu_executor, std::shared_ptr<Schema>* out_schema) {
auto converter = std::make_shared<BatchConverter>();
ExecContext exec_ctx(options.memory_pool, cpu_executor, options.function_registry);
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ExecPlan> plan, ExecPlan::Make(exec_ctx));
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ExecPlan> plan,
ExecPlan::Make(options, exec_ctx));
Declaration with_sink = Declaration::Sequence(
{declaration,
{"sink", SinkNodeOptions(&converter->exec_batch_gen, &converter->schema)}});
Expand Down
40 changes: 40 additions & 0 deletions cpp/src/arrow/acero/exec_plan.h
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,16 @@ struct ARROW_ACERO_EXPORT Declaration {
std::string label;
};

/// \brief How to handle unaligned buffers
enum class UnalignedBufferHandling { kWarn, kIgnore, kReallocate, kError };

/// \brief get the default behavior of unaligned buffer handling
///
/// This is configurable via the ACERO_ALIGNMENT_HANDLING environment variable which
/// can be set to "warn", "ignore", "reallocate", or "error". If the environment
/// variable is not set, or is set to an invalid value, this will return kWarn
UnalignedBufferHandling GetDefaultUnalignedBufferHandling();

/// \brief plan-wide options that can be specified when executing an execution plan
struct ARROW_ACERO_EXPORT QueryOptions {
/// \brief Should the plan use a legacy batching strategy
Expand Down Expand Up @@ -562,6 +572,36 @@ struct ARROW_ACERO_EXPORT QueryOptions {
///
/// If set then the number of names must equal the number of output columns
std::vector<std::string> field_names;

/// \brief Policy for unaligned buffers in source data
///
/// Various compute functions and acero internals will type pun array
/// buffers from uint8_t* to some kind of value type (e.g. we might
/// cast to int32_t* to add two int32 arrays)
///
/// If the buffer is poorly aligned (e.g. an int32 array is not aligned
/// on a 4-byte boundary) then this is technically undefined behavior in C++.
/// However, most modern compilers and CPUs are fairly tolerant of this
/// behavior and nothing bad (beyond a small hit to performance) is likely
/// to happen.
///
/// Note that this only applies to source buffers. All buffers allocated internally
/// by Acero will be suitably aligned.
///
/// If this field is set to kWarn then Acero will check if any buffers are unaligned
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps this section of the comment could be broken into comments for the members of UnalignedBufferHandling?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't believe comments on enum members show up in our API docs. Though, one could argue that no one reads those :)

/// and, if they are, will emit a warning.
///
/// If this field is set to kReallocate then Acero will allocate a new, suitably aligned
/// buffer and copy the contents from the old buffer into this new buffer.
///
/// If this field is set to kError then Acero will gracefully abort the plan instead.
///
/// If this field is set to kIgnore then Acero will not even check if the buffers are
/// unaligned.
///
/// If this field is not set then it will be treated as kWarn unless overridden
/// by the ACERO_ALIGNMENT_HANDLING environment variable
std::optional<UnalignedBufferHandling> unaligned_buffer_handling;
};

/// \brief Calculate the output schema of a declaration
Expand Down
40 changes: 40 additions & 0 deletions cpp/src/arrow/acero/plan_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1704,5 +1704,45 @@ TEST(ExecPlanExecution, SegmentedAggregationWithBatchCrossingSegment) {
{expected});
}

TEST(ExecPlanExecution, UnalignedInput) {
std::shared_ptr<Array> array = ArrayFromJSON(int32(), "[1, 2, 3]");
std::shared_ptr<Array> unaligned = UnalignBuffers(*array);
ASSERT_OK_AND_ASSIGN(ExecBatch sample_batch,
ExecBatch::Make({unaligned}, array->length()));

BatchesWithSchema data;
data.batches = {std::move(sample_batch)};
data.schema = schema({field("i32", int32())});

Declaration plan = Declaration::Sequence({
{"exec_batch_source", ExecBatchSourceNodeOptions(data.schema, data.batches)},
});

int64_t initial_bytes_allocated = default_memory_pool()->total_bytes_allocated();

// By default we should warn and so the plan should finish ok
ASSERT_OK(DeclarationToStatus(plan));
ASSERT_EQ(initial_bytes_allocated, default_memory_pool()->total_bytes_allocated());

QueryOptions query_options;

#ifndef ARROW_UBSAN
// Nothing should happen if we ignore alignment
query_options.unaligned_buffer_handling = UnalignedBufferHandling::kIgnore;
ASSERT_OK(DeclarationToStatus(plan, query_options));
ASSERT_EQ(initial_bytes_allocated, default_memory_pool()->total_bytes_allocated());
Comment on lines +1731 to +1733
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
query_options.unaligned_buffer_handling = UnalignedBufferHandling::kIgnore;
ASSERT_OK(DeclarationToStatus(plan, query_options));
ASSERT_EQ(initial_bytes_allocated, default_memory_pool()->total_bytes_allocated());
#ifndef ARROW_UBSAN
query_options.unaligned_buffer_handling = UnalignedBufferHandling::kIgnore;
ASSERT_OK(DeclarationToStatus(plan, query_options));
ASSERT_EQ(initial_bytes_allocated, default_memory_pool()->total_bytes_allocated());
#endif

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I use ARROW_UBSAN regularly and so I was curious why this wasn't a problem. It turns out we never pass -fsanitize=alignment to ubsan. Perhaps we should add this flag?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps open a separate issue for that, but it may open a can of worms :-)

#endif

query_options.unaligned_buffer_handling = UnalignedBufferHandling::kError;
ASSERT_THAT(DeclarationToStatus(plan, query_options),
Raises(StatusCode::Invalid,
testing::HasSubstr("An input buffer was poorly aligned")));
ASSERT_EQ(initial_bytes_allocated, default_memory_pool()->total_bytes_allocated());

query_options.unaligned_buffer_handling = UnalignedBufferHandling::kReallocate;
ASSERT_OK(DeclarationToStatus(plan, query_options));
ASSERT_LT(initial_bytes_allocated, default_memory_pool()->total_bytes_allocated());
}

} // namespace acero
} // namespace arrow
52 changes: 45 additions & 7 deletions cpp/src/arrow/acero/source_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,46 @@ using arrow::internal::MapVector;
namespace acero {
namespace {

Status HandleUnalignedBuffers(ExecBatch* batch, UnalignedBufferHandling handling) {
if (handling == UnalignedBufferHandling::kIgnore) {
return Status::OK();
}
for (auto& value : batch->values) {
if (value.is_array()) {
switch (handling) {
case UnalignedBufferHandling::kIgnore:
// Should be impossible to get here
return Status::OK();
case UnalignedBufferHandling::kError:
if (!arrow::util::CheckAlignment(*value.array(),
arrow::util::kValueAlignment)) {
return Status::Invalid(
"An input buffer was poorly aligned and UnalignedBufferHandling is set "
"to kError");
}
break;
case UnalignedBufferHandling::kWarn:
if (!arrow::util::CheckAlignment(*value.array(),
arrow::util::kValueAlignment)) {
ARROW_LOG(WARNING)
<< "An input buffer was poorly aligned. This could lead to crashes or "
"poor performance on some hardware. Please ensure that all Acero "
"sources generate aligned buffers, or change the unaligned buffer "
"handling configuration to silence this warning.";
}
break;
case UnalignedBufferHandling::kReallocate: {
ARROW_ASSIGN_OR_RAISE(value, arrow::util::EnsureAlignment(
value.array(), arrow::util::kValueAlignment,
default_memory_pool()));
break;
}
}
}
}
return Status::OK();
}

struct SourceNode : ExecNode, public TracedNode {
SourceNode(ExecPlan* plan, std::shared_ptr<Schema> output_schema,
AsyncGenerator<std::optional<ExecBatch>> generator,
Expand Down Expand Up @@ -104,13 +144,11 @@ struct SourceNode : ExecNode, public TracedNode {
batch_size = morsel_length;
}
ExecBatch batch = morsel.Slice(offset, batch_size);
for (auto& value : batch.values) {
if (value.is_array()) {
ARROW_ASSIGN_OR_RAISE(value, arrow::util::EnsureAlignment(
value.make_array(), ipc::kArrowAlignment,
default_memory_pool()));
}
}
UnalignedBufferHandling unaligned_buffer_handling =
plan_->query_context()->options().unaligned_buffer_handling.value_or(
GetDefaultUnalignedBufferHandling());
ARROW_RETURN_NOT_OK(
HandleUnalignedBuffers(&batch, unaligned_buffer_handling));
if (has_ordering) {
batch.index = batch_index;
}
Expand Down
26 changes: 26 additions & 0 deletions cpp/src/arrow/testing/gtest_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1099,4 +1099,30 @@ std::shared_ptr<GatingTask> GatingTask::Make(double timeout_seconds) {
return std::make_shared<GatingTask>(timeout_seconds);
}

std::shared_ptr<ArrayData> UnalignBuffers(const ArrayData& array) {
std::vector<std::shared_ptr<Buffer>> new_buffers;
new_buffers.reserve(array.buffers.size());

for (const auto& buffer : array.buffers) {
if (!buffer) {
new_buffers.emplace_back();
continue;
}
EXPECT_OK_AND_ASSIGN(std::shared_ptr<Buffer> padded,
AllocateBuffer(buffer->size() + 1, default_memory_pool()));
memcpy(padded->mutable_data() + 1, buffer->data(), buffer->size());
std::shared_ptr<Buffer> unaligned = SliceBuffer(padded, 1);
new_buffers.push_back(std::move(unaligned));
}

std::shared_ptr<ArrayData> array_data = std::make_shared<ArrayData>(array);
array_data->buffers = std::move(new_buffers);
return array_data;
}

std::shared_ptr<Array> UnalignBuffers(const Array& array) {
std::shared_ptr<ArrayData> array_data = UnalignBuffers(*array.data());
return MakeArray(array_data);
}

} // namespace arrow
9 changes: 9 additions & 0 deletions cpp/src/arrow/testing/gtest_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -532,4 +532,13 @@ class ARROW_TESTING_EXPORT GatingTask {
std::shared_ptr<Impl> impl_;
};

/// \brief create an exact copy of the data where each buffer has a max alignment of 1
///
/// This method does not recurse into the dictionary or children
ARROW_TESTING_EXPORT std::shared_ptr<ArrayData> UnalignBuffers(const ArrayData& array);
/// \brief create an exact copy of the array where each buffer has a max alignment of 1
///
/// This method does not recurse into the dictionary or children
ARROW_TESTING_EXPORT std::shared_ptr<Array> UnalignBuffers(const Array& array);

} // namespace arrow
Loading