diff --git a/cpp/src/arrow/compute/exec.cc b/cpp/src/arrow/compute/exec.cc index 78280ba3fc0..7d8aea7bfb2 100644 --- a/cpp/src/arrow/compute/exec.cc +++ b/cpp/src/arrow/compute/exec.cc @@ -21,6 +21,7 @@ #include #include #include +#include #include #include #include @@ -40,6 +41,7 @@ #include "arrow/record_batch.h" #include "arrow/scalar.h" #include "arrow/status.h" +#include "arrow/table.h" #include "arrow/type.h" #include "arrow/type_traits.h" #include "arrow/util/bit_util.h" @@ -110,6 +112,109 @@ int64_t ExecBatch::TotalBufferSize() const { return sum; } +struct BufferProperties { + uint64_t address; + int64_t capacity; + friend bool operator<(const BufferProperties& lhs, const BufferProperties& rhs) { + if (lhs.address == rhs.address) { + return (lhs.capacity > rhs.capacity); + } else { + return (lhs.address > rhs.address); + } + } +}; + +bool AddBuffersToSet(std::shared_ptr const& buffer, + std::set* seen_buffers) { + return (buffer && + seen_buffers->insert(BufferProperties{buffer->address(), buffer->capacity()}) + .second); +} + +bool AddBuffersToSet(std::vector> const& buffers, + std::set* seen_buffers) { + bool insertion_occured = false; + for (const auto& buffer : buffers) { + insertion_occured |= + (buffer && + seen_buffers->insert(BufferProperties{buffer->address(), buffer->capacity()}) + .second); + } + return insertion_occured; +} + +bool AddBuffersToSet(const ArrayData& array_data, + std::set* seen_buffers) { + bool insertion_occured = false; + for (const auto& buffer : array_data.buffers) { + insertion_occured |= + (buffer && + seen_buffers->insert(BufferProperties{buffer->address(), buffer->capacity()}) + .second); + } + for (const auto& child : array_data.child_data) { + insertion_occured |= AddBuffersToSet(*child, seen_buffers); + } + if (array_data.dictionary) { + insertion_occured |= AddBuffersToSet(*array_data.dictionary, seen_buffers); + } + return insertion_occured; +} + +bool AddBuffersToSet(const Array& array, std::set* seen_buffers) { + return AddBuffersToSet(*array.data(), seen_buffers); +} + +bool AddBuffersToSet(const ChunkedArray& chunked_array, + std::set* seen_buffers) { + bool insertion_occured = false; + for (const auto& chunk : chunked_array.chunks()) { + insertion_occured |= AddBuffersToSet(*chunk, seen_buffers); + } + return insertion_occured; +} + +bool AddBuffersToSet(const RecordBatch& record_batch, + std::set* seen_buffers) { + bool insertion_occured = false; + for (const auto& column : record_batch.columns()) { + insertion_occured |= AddBuffersToSet(*column, seen_buffers); + } + return insertion_occured; +} + +bool AddBuffersToSet(const Table& table, std::set* seen_buffers) { + bool insertion_occured = false; + for (const auto& column : table.columns()) { + insertion_occured |= AddBuffersToSet(*column, seen_buffers); + } + return insertion_occured; +} + +// Add all Buffers to a given set, return true if anything was actually added. +// If all the buffers in the datum were already in the set, this will return false. +bool AddBuffersToSet(Datum datum, std::set* seen_buffers) { + switch (datum.kind()) { + case Datum::ARRAY: + return AddBuffersToSet(*util::get>(datum.value), + seen_buffers); + case Datum::CHUNKED_ARRAY: + return AddBuffersToSet(*util::get>(datum.value), + seen_buffers); + case Datum::RECORD_BATCH: + return AddBuffersToSet(*util::get>(datum.value), + seen_buffers); + case Datum::TABLE: + return AddBuffersToSet(*util::get>(datum.value), + seen_buffers); + case Datum::SCALAR: + return false; + default: + DCHECK(false); + return false; + } +} + std::string ExecBatch::ToString() const { std::stringstream ss; PrintTo(*this, &ss); @@ -697,7 +802,26 @@ class ScalarExecutor : public KernelExecutorImpl { } } +#ifndef NDEBUG + + // To check whether the kernel allocated new Buffers, + // insert all the preallocated ones into a set + BufferProperties validity_buffer; + if (validity_preallocated_) { + validity_buffer = {out.array()->buffers[0]->address(), + out.array()->buffers[0]->capacity()}; + } + std::set pre_buffers; + for (size_t i = 0; i < data_preallocated_.size(); ++i) { + const auto& prealloc = data_preallocated_[i]; + if (prealloc.bit_width >= 0) { + AddBuffersToSet(out.array()->buffers[i + 1], &pre_buffers); + } + } +#endif // NDEBUG + RETURN_NOT_OK(kernel_->exec(kernel_ctx_, batch, &out)); + if (preallocate_contiguous_) { // Some kernels may like to simply nullify the validity bitmap when // they know the output will have 0 nulls. However, this is not compatible @@ -706,6 +830,31 @@ class ScalarExecutor : public KernelExecutorImpl { DCHECK(out.array()->buffers[0]) << "Null bitmap deleted by kernel but can_write_into_slices = true"; } + +#ifndef NDEBUG + // Check whether the kernel allocated new Buffers + // (instead of using the preallocated ones) + if (validity_preallocated_) { + if (out.array()->buffers[0]) { // it is possible the validity buffer was deleted + if (validity_buffer.address != out.array()->buffers[0]->address() || + validity_buffer.capacity != out.array()->buffers[0]->capacity()) { + return Status::Invalid( + "Pre-allocated validity buffer was modified " + "in function kernel"); + } + } + } + for (size_t i = 0; i < data_preallocated_.size(); ++i) { + const auto& prealloc = data_preallocated_[i]; + if (prealloc.bit_width >= 0) { + if (AddBuffersToSet(out.array()->buffers[i + 1], &pre_buffers)) { + return Status::Invalid( + "Unauthorized memory allocations " + "in function kernel"); + } + } + } +#endif // NDEBUG } else { // If we are producing chunked output rather than one big array, then // emit each chunk as soon as it's available @@ -877,7 +1026,60 @@ class VectorExecutor : public KernelExecutorImpl { output_descr_.shape == ValueDescr::ARRAY) { RETURN_NOT_OK(PropagateNulls(kernel_ctx_, batch, out.mutable_array())); } + +#ifndef NDEBUG + // To check whether the kernel allocated new Buffers, + // insert all the preallocated ones into a set + // To check whether the kernel allocated new Buffers, + // insert all the preallocated ones into a set + auto pre_kind = out.kind(); + BufferProperties validity_buffer; + if (validity_preallocated_) { + validity_buffer = {out.array()->buffers[0]->address(), + out.array()->buffers[0]->capacity()}; + } + std::set pre_buffers; + for (size_t i = 0; i < data_preallocated_.size(); ++i) { + const auto& prealloc = data_preallocated_[i]; + if (prealloc.bit_width >= 0) { + AddBuffersToSet(out.array()->buffers[i + 1], &pre_buffers); + } + } +#endif // NDEBUG + RETURN_NOT_OK(kernel_->exec(kernel_ctx_, batch, &out)); + +#ifndef NDEBUG + // Check whether the kernel allocated new Buffers + // (instead of using the preallocated ones) + if (validity_preallocated_) { + // it is possible the validity buffer was deleted + if (out.is_array() && out.array()->buffers[0]) { + if (validity_buffer.address != out.array()->buffers[0]->address() || + validity_buffer.capacity != out.array()->buffers[0]->capacity()) { + return Status::Invalid( + "Pre-allocated validity buffer was modified " + "in function kernel"); + } + } + } + for (size_t i = 0; i < data_preallocated_.size(); ++i) { + const auto& prealloc = data_preallocated_[i]; + if (prealloc.bit_width >= 0) { + if (pre_kind != out.kind()) { + return Status::Invalid( + "Pre-allocated out Datum was changed into another type " + "in function kernel"); + } + if (AddBuffersToSet(out.array()->buffers[i + 1], &pre_buffers)) { + return Status::Invalid( + "Unauthorized memory allocations " + "in function kernel"); + } + } + } +#endif // NDEBUG + if (!kernel_->finalize) { // If there is no result finalizer (e.g. for hash-based functions, we can // emit the processed batch right away rather than waiting diff --git a/cpp/src/arrow/compute/kernels/CMakeLists.txt b/cpp/src/arrow/compute/kernels/CMakeLists.txt index 93a02cdb1f1..c1ce4923c27 100644 --- a/cpp/src/arrow/compute/kernels/CMakeLists.txt +++ b/cpp/src/arrow/compute/kernels/CMakeLists.txt @@ -25,6 +25,7 @@ add_arrow_compute_test(scalar_test scalar_cast_test.cc scalar_compare_test.cc scalar_if_else_test.cc + scalar_misbehave_test.cc scalar_nested_test.cc scalar_random_test.cc scalar_set_lookup_test.cc @@ -48,6 +49,7 @@ add_arrow_benchmark(scalar_string_benchmark PREFIX "arrow-compute") add_arrow_compute_test(vector_test SOURCES vector_hash_test.cc + vector_misbehave_test.cc vector_nested_test.cc vector_replace_test.cc vector_selection_test.cc diff --git a/cpp/src/arrow/compute/kernels/scalar_misbehave_test.cc b/cpp/src/arrow/compute/kernels/scalar_misbehave_test.cc new file mode 100644 index 00000000000..8bb1c6b428c --- /dev/null +++ b/cpp/src/arrow/compute/kernels/scalar_misbehave_test.cc @@ -0,0 +1,88 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include +#include "arrow/array/concatenate.h" +#include "arrow/compute/api_scalar.h" +#include "arrow/testing/gtest_util.h" +#include "arrow/util/logging.h" + +namespace arrow { +namespace compute { + +struct ScalarReAllocValidBufExec { + static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { + // allocate a validity buffer even though we've promised not to + ARROW_ASSIGN_OR_RAISE(out->mutable_array()->buffers[0], ctx->AllocateBitmap(8)); + return Status::OK(); + } +}; + +struct ScalarReAllocDataBufExec { + static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { + // allocate a validity buffer even though we've promised not to + ARROW_ASSIGN_OR_RAISE(out->mutable_array()->buffers[1], ctx->Allocate(64)); + return Status::OK(); + } +}; + +const FunctionDoc misbehave_doc{ + "Test kernel that does nothing but allocate memory " + "while it shouldn't", + "This Kernel only exists for testing purposes.\n" + "It allocates memory while it promised not to \n" + "(because of MemAllocation::PREALLOCATE).", + {}}; + +TEST(Misbehave, ReallocValidBufferScalarKernel) { + ExecContext ctx; + auto func = std::make_shared("scalar_misbehave", Arity::Unary(), + &misbehave_doc); + DCHECK_OK(func->AddKernel({InputType(Type::FIXED_SIZE_BINARY)}, + OutputType(ValueDescr(fixed_size_binary(2))), + ScalarReAllocValidBufExec::Exec)); + Datum datum(ArrayFromJSON(fixed_size_binary(6), R"(["123456"])")); + const std::vector& args = {datum}; + const FunctionOptions* options = nullptr; + EXPECT_RAISES_WITH_MESSAGE_THAT( + Invalid, + testing::HasSubstr("Invalid: " + "Pre-allocated validity buffer was modified " + "in function kernel"), + func->Execute(args, options, &ctx)); +} + +TEST(Misbehave, ReallocDataBufferScalarKernel) { + ExecContext ctx; + auto func = std::make_shared("scalar_misbehave", Arity::Unary(), + &misbehave_doc); + DCHECK_OK(func->AddKernel({InputType(Type::FIXED_SIZE_BINARY)}, + OutputType(ValueDescr(fixed_size_binary(2))), + ScalarReAllocDataBufExec::Exec)); + Datum datum(ArrayFromJSON(fixed_size_binary(6), R"(["123456"])")); + const std::vector& args = {datum}; + const FunctionOptions* options = nullptr; + EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, + testing::HasSubstr("Invalid: " + "Unauthorized memory allocations " + "in function kernel"), + func->Execute(args, options, &ctx)); +} + +} // namespace compute +} // namespace arrow diff --git a/cpp/src/arrow/compute/kernels/vector_misbehave_test.cc b/cpp/src/arrow/compute/kernels/vector_misbehave_test.cc new file mode 100644 index 00000000000..ff7839af41c --- /dev/null +++ b/cpp/src/arrow/compute/kernels/vector_misbehave_test.cc @@ -0,0 +1,97 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include +#include "arrow/array/concatenate.h" +#include "arrow/chunked_array.h" +#include "arrow/compute/api_vector.h" +#include "arrow/testing/gtest_util.h" +#include "arrow/util/logging.h" + +namespace arrow { +namespace compute { + +struct VectorReAllocValidBufExec { + static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { + // allocate new buffers even though we've promised not to + ARROW_ASSIGN_OR_RAISE(out->mutable_array()->buffers[0], ctx->AllocateBitmap(8)); + return Status::OK(); + } +}; + +struct VectorReAllocDataBufExec { + static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { + // allocate new buffers even though we've promised not to + ARROW_ASSIGN_OR_RAISE(out->mutable_array()->buffers[1], ctx->Allocate(64)); + return Status::OK(); + } +}; + +void AddVectorMisbehaveKernel(const std::shared_ptr& Vector_function, + Status (*kernel_exec)(KernelContext*, const ExecBatch&, + Datum*)) { + VectorKernel kernel({int32()}, int32(), kernel_exec); + kernel.null_handling = NullHandling::COMPUTED_PREALLOCATE; + kernel.mem_allocation = MemAllocation::PREALLOCATE; + kernel.can_write_into_slices = true; + kernel.can_execute_chunkwise = false; + kernel.output_chunked = false; + + DCHECK_OK(Vector_function->AddKernel(std::move(kernel))); +} + +const FunctionDoc misbehave_doc{ + "Test kernel that does nothing but allocate memory " + "while it shouldn't", + "This Kernel only exists for testing purposes.\n" + "It allocates memory while it promised not to \n" + "(because of MemAllocation::PREALLOCATE).", + {}}; + +TEST(Misbehave, ReallocValidBufferVectorKernel) { + ExecContext ctx; + auto func = std::make_shared("vector_misbehave", Arity::Unary(), + &misbehave_doc); + AddVectorMisbehaveKernel(func, VectorReAllocValidBufExec::Exec); + Datum datum(ChunkedArray(ArrayVector{}, int32())); + const std::vector& args = {datum}; + const FunctionOptions* options = nullptr; + EXPECT_RAISES_WITH_MESSAGE_THAT( + Invalid, + testing::HasSubstr("Invalid: " + "Pre-allocated validity buffer was modified " + "in function kernel"), + func->Execute(args, options, &ctx)); +} + +TEST(Misbehave, ReallocDataBufferVectorKernel) { + ExecContext ctx; + auto func = std::make_shared("vector_misbehave", Arity::Unary(), + &misbehave_doc); + AddVectorMisbehaveKernel(func, VectorReAllocDataBufExec::Exec); + Datum datum(ChunkedArray(ArrayVector{}, int32())); + const std::vector& args = {datum}; + const FunctionOptions* options = nullptr; + EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, + testing::HasSubstr("Invalid: " + "Unauthorized memory allocations " + "in function kernel"), + func->Execute(args, options, &ctx)); +} +} // namespace compute +} // namespace arrow