From 0f4919db02584702f1681339fe8f43b4b266f158 Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Thu, 16 Dec 2021 14:43:46 +0100 Subject: [PATCH 01/13] Initial changes for checking whether additional buffers were created by a kernel --- cpp/src/arrow/compute/exec.cc | 17 +++++++++ cpp/src/arrow/datum.cc | 71 +++++++++++++++++++++++++++++++++++ cpp/src/arrow/datum.h | 3 ++ 3 files changed, 91 insertions(+) diff --git a/cpp/src/arrow/compute/exec.cc b/cpp/src/arrow/compute/exec.cc index 78280ba3fc0..64eaa3eb467 100644 --- a/cpp/src/arrow/compute/exec.cc +++ b/cpp/src/arrow/compute/exec.cc @@ -22,6 +22,7 @@ #include #include #include +#include #include #include @@ -697,7 +698,23 @@ class ScalarExecutor : public KernelExecutorImpl { } } + std::unordered_set pre_buffers; + for (auto dat : batch.values) { + dat.AddBuffersToSet(&pre_buffers); + } + out.AddBuffersToSet(&pre_buffers); + RETURN_NOT_OK(kernel_->exec(kernel_ctx_, batch, &out)); + + bool insertion_occured = false; + for (auto dat : batch.values) { + insertion_occured |= dat.AddBuffersToSet(&pre_buffers); + } + insertion_occured |= out.AddBuffersToSet(&pre_buffers); + if (insertion_occured) { + printf("error! Additional buffers were made\n"); + } + if (preallocate_contiguous_) { // Some kernels may like to simply nullify the validity bitmap when // they know the output will have 0 nulls. However, this is not compatible diff --git a/cpp/src/arrow/datum.cc b/cpp/src/arrow/datum.cc index e51c64d8e7a..d115bd783ab 100644 --- a/cpp/src/arrow/datum.cc +++ b/cpp/src/arrow/datum.cc @@ -20,6 +20,7 @@ #include #include #include +#include #include #include "arrow/array/array_base.h" @@ -132,6 +133,76 @@ int64_t Datum::TotalBufferSize() const { return 0; } } +namespace util { +bool AddBuffersToSet(const ArrayData& array_data, + std::unordered_set* seen_buffers) { + bool insertion_occured = false; + for (const auto& buffer : array_data.buffers) { + insertion_occured = (buffer && seen_buffers->insert(buffer->data()).second); + } + for (const auto& child : array_data.child_data) { + insertion_occured |= AddBuffersToSet(*child, seen_buffers); + } + if (array_data.dictionary) { + insertion_occured |= AddBuffersToSet(*array_data.dictionary, seen_buffers); + } + return insertion_occured; +} + +bool AddBuffersToSet(const Array& array, + std::unordered_set* seen_buffers) { + return AddBuffersToSet(*array.data(), seen_buffers); +} + +bool AddBuffersToSet(const ChunkedArray& chunked_array, + std::unordered_set* seen_buffers) { + bool insertion_occured = false; + for (const auto& chunk : chunked_array.chunks()) { + insertion_occured |= AddBuffersToSet(*chunk, seen_buffers); + } + return insertion_occured; +} + +bool AddBuffersToSet(const RecordBatch& record_batch, + std::unordered_set* seen_buffers) { + bool insertion_occured = false; + for (const auto& column : record_batch.columns()) { + insertion_occured |= AddBuffersToSet(*column, seen_buffers); + } + return insertion_occured; +} + +bool AddBuffersToSet(const Table& table, + std::unordered_set* seen_buffers) { + bool insertion_occured = false; + for (const auto& column : table.columns()) { + insertion_occured |= AddBuffersToSet(*column, seen_buffers); + } + return insertion_occured; +} +} // namespace util + +bool Datum::AddBuffersToSet(std::unordered_set* seen_buffers) const { + switch (this->kind()) { + case Datum::ARRAY: + return util::AddBuffersToSet(*util::get>(this->value), + seen_buffers); + case Datum::CHUNKED_ARRAY: + return util::AddBuffersToSet(*util::get>(this->value), + seen_buffers); + case Datum::RECORD_BATCH: + return util::AddBuffersToSet(*util::get>(this->value), + seen_buffers); + case Datum::TABLE: + return util::AddBuffersToSet(*util::get>(this->value), + seen_buffers); + case Datum::SCALAR: + return false; + default: + DCHECK(false); + return false; + } +} int64_t Datum::null_count() const { if (this->kind() == Datum::ARRAY) { diff --git a/cpp/src/arrow/datum.h b/cpp/src/arrow/datum.h index 514b4247316..ea207f645ba 100644 --- a/cpp/src/arrow/datum.h +++ b/cpp/src/arrow/datum.h @@ -21,6 +21,7 @@ #include #include #include +#include #include #include @@ -196,6 +197,8 @@ struct ARROW_EXPORT Datum { /// \see arrow::util::TotalBufferSize for caveats int64_t TotalBufferSize() const; + bool AddBuffersToSet(std::unordered_set* seen_buffers) const; + ArrayData* mutable_array() const { return this->array().get(); } std::shared_ptr make_array() const; From 69c62bab4432827983591a9d13cea97417c2310f Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Tue, 4 Jan 2022 14:33:43 +0100 Subject: [PATCH 02/13] Moved AddBuffersToSet into exec.cc, added check to Vector Executor --- cpp/src/arrow/compute/exec.cc | 126 ++++++++++++++++++++++++++++++---- cpp/src/arrow/datum.cc | 71 ------------------- cpp/src/arrow/datum.h | 3 - 3 files changed, 111 insertions(+), 89 deletions(-) diff --git a/cpp/src/arrow/compute/exec.cc b/cpp/src/arrow/compute/exec.cc index 64eaa3eb467..5b9be400a0a 100644 --- a/cpp/src/arrow/compute/exec.cc +++ b/cpp/src/arrow/compute/exec.cc @@ -41,6 +41,7 @@ #include "arrow/record_batch.h" #include "arrow/scalar.h" #include "arrow/status.h" +#include "arrow/table.h" #include "arrow/type.h" #include "arrow/type_traits.h" #include "arrow/util/bit_util.h" @@ -111,6 +112,77 @@ int64_t ExecBatch::TotalBufferSize() const { return sum; } +bool AddBuffersToSet(const ArrayData& array_data, + std::unordered_set>* seen_buffers) { + bool insertion_occured = false; + for (const auto& buffer : array_data.buffers) { + insertion_occured = (buffer && seen_buffers->insert(buffer).second); + } + for (const auto& child : array_data.child_data) { + insertion_occured |= AddBuffersToSet(*child, seen_buffers); + } + if (array_data.dictionary) { + insertion_occured |= AddBuffersToSet(*array_data.dictionary, seen_buffers); + } + return insertion_occured; +} + +bool AddBuffersToSet(const Array& array, + std::unordered_set>* seen_buffers) { + return AddBuffersToSet(*array.data(), seen_buffers); +} + +bool AddBuffersToSet(const ChunkedArray& chunked_array, + std::unordered_set>* seen_buffers) { + bool insertion_occured = false; + for (const auto& chunk : chunked_array.chunks()) { + insertion_occured |= AddBuffersToSet(*chunk, seen_buffers); + } + return insertion_occured; +} + +bool AddBuffersToSet(const RecordBatch& record_batch, + std::unordered_set>* seen_buffers) { + bool insertion_occured = false; + for (const auto& column : record_batch.columns()) { + insertion_occured |= AddBuffersToSet(*column, seen_buffers); + } + return insertion_occured; +} + +bool AddBuffersToSet(const Table& table, + std::unordered_set>* seen_buffers) { + bool insertion_occured = false; + for (const auto& column : table.columns()) { + insertion_occured |= AddBuffersToSet(*column, seen_buffers); + } + return insertion_occured; +} + +// Add all Buffers to a given set, return true if anything was actually added. +// If all the buffers in the datum were already in the set, this will return false. +bool AddBuffersToSet(Datum datum, std::unordered_set>* seen_buffers) { + switch (datum.kind()) { + case Datum::ARRAY: + return AddBuffersToSet(*util::get>(datum.value), + seen_buffers); + case Datum::CHUNKED_ARRAY: + return AddBuffersToSet(*util::get>(datum.value), + seen_buffers); + case Datum::RECORD_BATCH: + return AddBuffersToSet(*util::get>(datum.value), + seen_buffers); + case Datum::TABLE: + return AddBuffersToSet(*util::get>(datum.value), + seen_buffers); + case Datum::SCALAR: + return false; + default: + DCHECK(false); + return false; + } +} + std::string ExecBatch::ToString() const { std::stringstream ss; PrintTo(*this, &ss); @@ -698,31 +770,33 @@ class ScalarExecutor : public KernelExecutorImpl { } } - std::unordered_set pre_buffers; - for (auto dat : batch.values) { - dat.AddBuffersToSet(&pre_buffers); + // To check whether the kernel allocated new Buffers, insert all the preallocated ones into a set + std::unordered_set> pre_buffers; + if (preallocate_contiguous_) { + for (auto dat: batch.values) { + AddBuffersToSet(dat, &pre_buffers); + } + AddBuffersToSet(out, &pre_buffers); } - out.AddBuffersToSet(&pre_buffers); RETURN_NOT_OK(kernel_->exec(kernel_ctx_, batch, &out)); - bool insertion_occured = false; - for (auto dat : batch.values) { - insertion_occured |= dat.AddBuffersToSet(&pre_buffers); - } - insertion_occured |= out.AddBuffersToSet(&pre_buffers); - if (insertion_occured) { - printf("error! Additional buffers were made\n"); - } - if (preallocate_contiguous_) { // Some kernels may like to simply nullify the validity bitmap when // they know the output will have 0 nulls. However, this is not compatible // with writing into slices. if (output_descr_.shape == ValueDescr::ARRAY) { - DCHECK(out.array()->buffers[0]) - << "Null bitmap deleted by kernel but can_write_into_slices = true"; + DCHECK(out.array()->buffers[0]) + << "Null bitmap deleted by kernel but can_write_into_slices = true"; } + + // Check whether the kernel allocated new Buffers (instead of using the preallocated ones) + bool insertion_occured = false; + for (auto dat : batch.values) { + insertion_occured |= AddBuffersToSet(dat, &pre_buffers); + } + insertion_occured |= AddBuffersToSet(out, &pre_buffers); + DCHECK_EQ(insertion_occured, false) << "Unauthorized memory allocations in function kernel"; } else { // If we are producing chunked output rather than one big array, then // emit each chunk as soon as it's available @@ -894,7 +968,29 @@ class VectorExecutor : public KernelExecutorImpl { output_descr_.shape == ValueDescr::ARRAY) { RETURN_NOT_OK(PropagateNulls(kernel_ctx_, batch, out.mutable_array())); } + + // To check whether the kernel allocated new Buffers, insert all the preallocated ones into a set + std::unordered_set> pre_buffers; + if (kernel_->mem_allocation == MemAllocation::PREALLOCATE) { + for (auto dat: batch.values) { + AddBuffersToSet(dat, &pre_buffers); + } + AddBuffersToSet(out, &pre_buffers); + } + RETURN_NOT_OK(kernel_->exec(kernel_ctx_, batch, &out)); + + if (kernel_->mem_allocation == MemAllocation::PREALLOCATE) { + + // Check whether the kernel allocated new Buffers (instead of using the preallocated ones) + bool insertion_occured = false; + for (auto dat: batch.values) { + insertion_occured |= AddBuffersToSet(dat, &pre_buffers); + } + insertion_occured |= AddBuffersToSet(out, &pre_buffers); + DCHECK_EQ(insertion_occured, false) << "Unauthorized memory allocations in function kernel"; + } + if (!kernel_->finalize) { // If there is no result finalizer (e.g. for hash-based functions, we can // emit the processed batch right away rather than waiting diff --git a/cpp/src/arrow/datum.cc b/cpp/src/arrow/datum.cc index d115bd783ab..e51c64d8e7a 100644 --- a/cpp/src/arrow/datum.cc +++ b/cpp/src/arrow/datum.cc @@ -20,7 +20,6 @@ #include #include #include -#include #include #include "arrow/array/array_base.h" @@ -133,76 +132,6 @@ int64_t Datum::TotalBufferSize() const { return 0; } } -namespace util { -bool AddBuffersToSet(const ArrayData& array_data, - std::unordered_set* seen_buffers) { - bool insertion_occured = false; - for (const auto& buffer : array_data.buffers) { - insertion_occured = (buffer && seen_buffers->insert(buffer->data()).second); - } - for (const auto& child : array_data.child_data) { - insertion_occured |= AddBuffersToSet(*child, seen_buffers); - } - if (array_data.dictionary) { - insertion_occured |= AddBuffersToSet(*array_data.dictionary, seen_buffers); - } - return insertion_occured; -} - -bool AddBuffersToSet(const Array& array, - std::unordered_set* seen_buffers) { - return AddBuffersToSet(*array.data(), seen_buffers); -} - -bool AddBuffersToSet(const ChunkedArray& chunked_array, - std::unordered_set* seen_buffers) { - bool insertion_occured = false; - for (const auto& chunk : chunked_array.chunks()) { - insertion_occured |= AddBuffersToSet(*chunk, seen_buffers); - } - return insertion_occured; -} - -bool AddBuffersToSet(const RecordBatch& record_batch, - std::unordered_set* seen_buffers) { - bool insertion_occured = false; - for (const auto& column : record_batch.columns()) { - insertion_occured |= AddBuffersToSet(*column, seen_buffers); - } - return insertion_occured; -} - -bool AddBuffersToSet(const Table& table, - std::unordered_set* seen_buffers) { - bool insertion_occured = false; - for (const auto& column : table.columns()) { - insertion_occured |= AddBuffersToSet(*column, seen_buffers); - } - return insertion_occured; -} -} // namespace util - -bool Datum::AddBuffersToSet(std::unordered_set* seen_buffers) const { - switch (this->kind()) { - case Datum::ARRAY: - return util::AddBuffersToSet(*util::get>(this->value), - seen_buffers); - case Datum::CHUNKED_ARRAY: - return util::AddBuffersToSet(*util::get>(this->value), - seen_buffers); - case Datum::RECORD_BATCH: - return util::AddBuffersToSet(*util::get>(this->value), - seen_buffers); - case Datum::TABLE: - return util::AddBuffersToSet(*util::get>(this->value), - seen_buffers); - case Datum::SCALAR: - return false; - default: - DCHECK(false); - return false; - } -} int64_t Datum::null_count() const { if (this->kind() == Datum::ARRAY) { diff --git a/cpp/src/arrow/datum.h b/cpp/src/arrow/datum.h index ea207f645ba..514b4247316 100644 --- a/cpp/src/arrow/datum.h +++ b/cpp/src/arrow/datum.h @@ -21,7 +21,6 @@ #include #include #include -#include #include #include @@ -197,8 +196,6 @@ struct ARROW_EXPORT Datum { /// \see arrow::util::TotalBufferSize for caveats int64_t TotalBufferSize() const; - bool AddBuffersToSet(std::unordered_set* seen_buffers) const; - ArrayData* mutable_array() const { return this->array().get(); } std::shared_ptr make_array() const; From a7ec5e47cab48990fa66c5a901d2ea12c1cb6bdc Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Thu, 6 Jan 2022 11:10:19 +0100 Subject: [PATCH 03/13] Added test for detecting allocation misbehavior in Scalar kernels # Conflicts: # cpp/src/arrow/compute/kernels/CMakeLists.txt --- cpp/src/arrow/CMakeLists.txt | 2 + cpp/src/arrow/compute/api_scalar.cc | 5 ++ cpp/src/arrow/compute/exec.cc | 34 +++++--- cpp/src/arrow/compute/kernels/CMakeLists.txt | 1 + .../arrow/compute/kernels/scalar_misbehave.cc | 84 +++++++++++++++++++ .../compute/kernels/scalar_misbehave_test.cc | 43 ++++++++++ .../arrow/compute/kernels/vector_misbehave.cc | 83 ++++++++++++++++++ cpp/src/arrow/compute/registry.cc | 2 + cpp/src/arrow/compute/registry_internal.h | 2 + 9 files changed, 245 insertions(+), 11 deletions(-) create mode 100644 cpp/src/arrow/compute/kernels/scalar_misbehave.cc create mode 100644 cpp/src/arrow/compute/kernels/scalar_misbehave_test.cc create mode 100644 cpp/src/arrow/compute/kernels/vector_misbehave.cc diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index beda9079c1e..90a46191a3f 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -421,6 +421,7 @@ if(ARROW_COMPUTE) compute/kernels/scalar_cast_temporal.cc compute/kernels/scalar_compare.cc compute/kernels/scalar_if_else.cc + compute/kernels/scalar_misbehave.cc compute/kernels/scalar_nested.cc compute/kernels/scalar_random.cc compute/kernels/scalar_set_lookup.cc @@ -432,6 +433,7 @@ if(ARROW_COMPUTE) compute/kernels/util_internal.cc compute/kernels/vector_array_sort.cc compute/kernels/vector_hash.cc + compute/kernels/vector_misbehave.cc compute/kernels/vector_nested.cc compute/kernels/vector_replace.cc compute/kernels/vector_selection.cc diff --git a/cpp/src/arrow/compute/api_scalar.cc b/cpp/src/arrow/compute/api_scalar.cc index f247510b6f0..8c30dea198f 100644 --- a/cpp/src/arrow/compute/api_scalar.cc +++ b/cpp/src/arrow/compute/api_scalar.cc @@ -732,6 +732,11 @@ Result IsNull(const Datum& arg, NullOptions options, ExecContext* ctx) { return CallFunction("is_null", {arg}, &options, ctx); } +Result Misbehave(const Datum& cond, const Datum& if_true, const Datum& if_false, + ExecContext* ctx) { + return CallFunction("misbehave", {}, ctx); +} + // ---------------------------------------------------------------------- // Temporal functions diff --git a/cpp/src/arrow/compute/exec.cc b/cpp/src/arrow/compute/exec.cc index 5b9be400a0a..535aaf9cee6 100644 --- a/cpp/src/arrow/compute/exec.cc +++ b/cpp/src/arrow/compute/exec.cc @@ -161,7 +161,8 @@ bool AddBuffersToSet(const Table& table, // Add all Buffers to a given set, return true if anything was actually added. // If all the buffers in the datum were already in the set, this will return false. -bool AddBuffersToSet(Datum datum, std::unordered_set>* seen_buffers) { +bool AddBuffersToSet(Datum datum, + std::unordered_set>* seen_buffers) { switch (datum.kind()) { case Datum::ARRAY: return AddBuffersToSet(*util::get>(datum.value), @@ -770,10 +771,11 @@ class ScalarExecutor : public KernelExecutorImpl { } } - // To check whether the kernel allocated new Buffers, insert all the preallocated ones into a set + // To check whether the kernel allocated new Buffers, + // insert all the preallocated ones into a set std::unordered_set> pre_buffers; if (preallocate_contiguous_) { - for (auto dat: batch.values) { + for (auto dat : batch.values) { AddBuffersToSet(dat, &pre_buffers); } AddBuffersToSet(out, &pre_buffers); @@ -790,13 +792,21 @@ class ScalarExecutor : public KernelExecutorImpl { << "Null bitmap deleted by kernel but can_write_into_slices = true"; } - // Check whether the kernel allocated new Buffers (instead of using the preallocated ones) + // Check whether the kernel allocated new Buffers + // (instead of using the preallocated ones) bool insertion_occured = false; for (auto dat : batch.values) { insertion_occured |= AddBuffersToSet(dat, &pre_buffers); } insertion_occured |= AddBuffersToSet(out, &pre_buffers); - DCHECK_EQ(insertion_occured, false) << "Unauthorized memory allocations in function kernel"; + DCHECK_EQ(insertion_occured, false) << "Unauthorized memory allocations " + "in function kernel"; +// TODO: instead of aborting (which is what DCHECK_EQ() above does), +// throw an Error we can catch in a test +// if (insertion_occured) { +// return Status::ExecutionError("Unauthorized memory allocations " +// "in function kernel"); +// } } else { // If we are producing chunked output rather than one big array, then // emit each chunk as soon as it's available @@ -969,10 +979,11 @@ class VectorExecutor : public KernelExecutorImpl { RETURN_NOT_OK(PropagateNulls(kernel_ctx_, batch, out.mutable_array())); } - // To check whether the kernel allocated new Buffers, insert all the preallocated ones into a set + // To check whether the kernel allocated new Buffers, + // insert all the preallocated ones into a set std::unordered_set> pre_buffers; if (kernel_->mem_allocation == MemAllocation::PREALLOCATE) { - for (auto dat: batch.values) { + for (auto dat : batch.values) { AddBuffersToSet(dat, &pre_buffers); } AddBuffersToSet(out, &pre_buffers); @@ -981,14 +992,15 @@ class VectorExecutor : public KernelExecutorImpl { RETURN_NOT_OK(kernel_->exec(kernel_ctx_, batch, &out)); if (kernel_->mem_allocation == MemAllocation::PREALLOCATE) { - - // Check whether the kernel allocated new Buffers (instead of using the preallocated ones) + // Check whether the kernel allocated new Buffers + // (instead of using the preallocated ones) bool insertion_occured = false; - for (auto dat: batch.values) { + for (auto dat : batch.values) { insertion_occured |= AddBuffersToSet(dat, &pre_buffers); } insertion_occured |= AddBuffersToSet(out, &pre_buffers); - DCHECK_EQ(insertion_occured, false) << "Unauthorized memory allocations in function kernel"; + DCHECK_EQ(insertion_occured, false) << "Unauthorized memory allocations " + "in function kernel"; } if (!kernel_->finalize) { diff --git a/cpp/src/arrow/compute/kernels/CMakeLists.txt b/cpp/src/arrow/compute/kernels/CMakeLists.txt index 93a02cdb1f1..4d11b52474e 100644 --- a/cpp/src/arrow/compute/kernels/CMakeLists.txt +++ b/cpp/src/arrow/compute/kernels/CMakeLists.txt @@ -25,6 +25,7 @@ add_arrow_compute_test(scalar_test scalar_cast_test.cc scalar_compare_test.cc scalar_if_else_test.cc + scalar_misbehave_test.cc scalar_nested_test.cc scalar_random_test.cc scalar_set_lookup_test.cc diff --git a/cpp/src/arrow/compute/kernels/scalar_misbehave.cc b/cpp/src/arrow/compute/kernels/scalar_misbehave.cc new file mode 100644 index 00000000000..076d35319b4 --- /dev/null +++ b/cpp/src/arrow/compute/kernels/scalar_misbehave.cc @@ -0,0 +1,84 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/array/builder_nested.h" +#include "arrow/array/builder_primitive.h" +#include "arrow/array/builder_time.h" +#include "arrow/array/builder_union.h" +#include "arrow/compute/api.h" +#include "arrow/compute/kernels/codegen_internal.h" +#include "arrow/util/bit_block_counter.h" +#include "arrow/util/bit_run_reader.h" +#include "arrow/util/bitmap.h" +#include "arrow/util/bitmap_ops.h" +#include "arrow/util/bitmap_reader.h" + +namespace arrow { + +namespace compute { +namespace internal { + +namespace { + +template +struct ScalarMisbehaveExec { + static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { + // allocate a buffer even though we've promised not to + ARROW_ASSIGN_OR_RAISE(out->mutable_array()->buffers[0], + ctx->Allocate(64)); + //return Status::NotImplemented("This kernel only exists for testing purposes"); + // The function should return OK, otherwise the buffer check is not performed + return Status::OK(); + } +}; + +struct ScalarMisbehaveFunction : ScalarFunction { + using ScalarFunction::ScalarFunction; +}; + +void AddScalarMisbehaveKernels(const std::shared_ptr& scalar_function) { + ScalarKernel kernel({}, null(), + ScalarMisbehaveExec::Exec); + kernel.null_handling = NullHandling::COMPUTED_PREALLOCATE; + kernel.mem_allocation = MemAllocation::PREALLOCATE; + kernel.can_write_into_slices = true; + + DCHECK_OK(scalar_function->AddKernel(std::move(kernel))); + } +} // namespace + +const FunctionDoc misbehave_doc{"Test kernel that does nothing but allocate memory " + "while it shouldn't", + ("This Kernel only exists for testing purposes.\n" + "It allocates memory while it promised not to \n" + "(because of MemAllocation::PREALLOCATE)."), + {}}; + +void RegisterScalarMisbehave(FunctionRegistry* registry) { +auto func = + std::make_shared("misbehave", Arity::Nullary(), + &misbehave_doc); + + AddScalarMisbehaveKernels(func); +DCHECK_OK(registry->AddFunction(std::move(func))); +//TODO: We want to prevent people from actually using this kernel. +// Can we add the function only for testing? +} + +} // namespace internal +} // namespace compute +} // namespace arrow diff --git a/cpp/src/arrow/compute/kernels/scalar_misbehave_test.cc b/cpp/src/arrow/compute/kernels/scalar_misbehave_test.cc new file mode 100644 index 00000000000..b76cb3e4c68 --- /dev/null +++ b/cpp/src/arrow/compute/kernels/scalar_misbehave_test.cc @@ -0,0 +1,43 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include "arrow/array.h" +#include "arrow/array/concatenate.h" +#include "arrow/compute/api_scalar.h" +#include "arrow/compute/cast.h" +#include "arrow/compute/kernels/test_util.h" +#include "arrow/compute/registry.h" +#include "arrow/compute/exec_internal.h" +#include "arrow/testing/gtest_util.h" + +namespace arrow { +namespace compute { +/* +class ScalarExecutor : public KernelExecutorImpl { +public: + Status Execute(const std::vector& args, ExecListener* listener) override { +*/ +TEST(Misbehave, MisbehavingScalarKernel) { +// ASSERT_RAISES_WITH_MESSAGE(ExecutionError, +// "Unauthorized memory allocations in function kernel", +// CallFunction("misbehave", {})); + ASSERT_NOT_OK(CallFunction("misbehave", {})); +} + +} // namespace compute +} // namespace arrow diff --git a/cpp/src/arrow/compute/kernels/vector_misbehave.cc b/cpp/src/arrow/compute/kernels/vector_misbehave.cc new file mode 100644 index 00000000000..427ef656b50 --- /dev/null +++ b/cpp/src/arrow/compute/kernels/vector_misbehave.cc @@ -0,0 +1,83 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include +#include +#include +#include +#include +#include + +#include "arrow/array/data.h" +#include "arrow/compute/api_vector.h" +#include "arrow/compute/kernels/common.h" +#include "arrow/compute/kernels/util_internal.h" +#include "arrow/compute/kernels/vector_sort_internal.h" +#include "arrow/type_traits.h" +#include "arrow/util/bit_block_counter.h" +#include "arrow/util/bitmap.h" +#include "arrow/util/bitmap_ops.h" +#include "arrow/util/checked_cast.h" +#include "arrow/visitor_inline.h" + +namespace arrow { + +using internal::checked_cast; + +namespace compute { +namespace internal { + +namespace { + +struct VectorMisbehave { + static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { + auto out_arr = out->mutable_array(); + ARROW_ASSIGN_OR_RAISE(out_arr->buffers[0], ctx->Allocate(64)); + //return Status::NotImplemented("This kernel only exists for testing purposes"); + return Status::OK(); + } +}; + +const FunctionDoc misbehave_doc{"Test kernel that does nothing but allocate memory " + "while it shouldn't", + ("This Kernel only exists for testing purposes.\n" + "It allocates memory while it promised not to \n" + "(because of MemAllocation::PREALLOCATE)."), + {}}; + +} // namespace + +void RegisterVectorMisbehave(FunctionRegistry* registry) { + // The kernel outputs into preallocated memory and is never null + VectorKernel base; + base.mem_allocation = MemAllocation::PREALLOCATE; + base.null_handling = NullHandling::COMPUTED_PREALLOCATE; + + auto func = std::make_shared( + "vector_misbehave", Arity::Nullary(), &misbehave_doc); + // TODO: Does this need to be defined? Not for scalar kernels, it seems + // base.init = PartitionNthToIndicesState::Init; + base.signature = KernelSignature::Make({}, uint64()); + base.exec = VectorMisbehave::Exec; + DCHECK_OK(func->AddKernel(base)); + DCHECK_OK(registry->AddFunction(std::move(func))); +} + +} // namespace internal +} // namespace compute +} // namespace arrow diff --git a/cpp/src/arrow/compute/registry.cc b/cpp/src/arrow/compute/registry.cc index 8ab83a72e5e..eb30cefe988 100644 --- a/cpp/src/arrow/compute/registry.cc +++ b/cpp/src/arrow/compute/registry.cc @@ -161,6 +161,7 @@ static std::unique_ptr CreateBuiltInRegistry() { RegisterScalarCast(registry.get()); RegisterScalarComparison(registry.get()); RegisterScalarIfElse(registry.get()); + RegisterScalarMisbehave(registry.get()); RegisterScalarNested(registry.get()); RegisterScalarRandom(registry.get()); // Nullary RegisterScalarSetLookup(registry.get()); @@ -175,6 +176,7 @@ static std::unique_ptr CreateBuiltInRegistry() { // Vector functions RegisterVectorArraySort(registry.get()); RegisterVectorHash(registry.get()); + RegisterVectorMisbehave(registry.get()); RegisterVectorNested(registry.get()); RegisterVectorReplace(registry.get()); RegisterVectorSelection(registry.get()); diff --git a/cpp/src/arrow/compute/registry_internal.h b/cpp/src/arrow/compute/registry_internal.h index 35f7b079529..0689437ef49 100644 --- a/cpp/src/arrow/compute/registry_internal.h +++ b/cpp/src/arrow/compute/registry_internal.h @@ -30,6 +30,7 @@ void RegisterScalarBoolean(FunctionRegistry* registry); void RegisterScalarCast(FunctionRegistry* registry); void RegisterScalarComparison(FunctionRegistry* registry); void RegisterScalarIfElse(FunctionRegistry* registry); +void RegisterScalarMisbehave(FunctionRegistry* registry); void RegisterScalarNested(FunctionRegistry* registry); void RegisterScalarRandom(FunctionRegistry* registry); // Nullary void RegisterScalarSetLookup(FunctionRegistry* registry); @@ -44,6 +45,7 @@ void RegisterScalarOptions(FunctionRegistry* registry); // Vector functions void RegisterVectorArraySort(FunctionRegistry* registry); void RegisterVectorHash(FunctionRegistry* registry); +void RegisterVectorMisbehave(FunctionRegistry* registry); void RegisterVectorNested(FunctionRegistry* registry); void RegisterVectorReplace(FunctionRegistry* registry); void RegisterVectorSelection(FunctionRegistry* registry); From 905a1931fc2811e4c2b1bcdd97978ba46692ac19 Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Thu, 6 Jan 2022 11:04:41 +0100 Subject: [PATCH 04/13] Fixes to formatting, removed unused includes --- cpp/src/arrow/compute/api_scalar.cc | 2 +- cpp/src/arrow/compute/exec.cc | 118 +++++++++--------- .../arrow/compute/kernels/scalar_misbehave.cc | 66 ++++------ .../compute/kernels/scalar_misbehave_test.cc | 12 +- .../arrow/compute/kernels/vector_misbehave.cc | 30 ++--- 5 files changed, 101 insertions(+), 127 deletions(-) diff --git a/cpp/src/arrow/compute/api_scalar.cc b/cpp/src/arrow/compute/api_scalar.cc index 8c30dea198f..f13b48a7d7b 100644 --- a/cpp/src/arrow/compute/api_scalar.cc +++ b/cpp/src/arrow/compute/api_scalar.cc @@ -733,7 +733,7 @@ Result IsNull(const Datum& arg, NullOptions options, ExecContext* ctx) { } Result Misbehave(const Datum& cond, const Datum& if_true, const Datum& if_false, - ExecContext* ctx) { + ExecContext* ctx) { return CallFunction("misbehave", {}, ctx); } diff --git a/cpp/src/arrow/compute/exec.cc b/cpp/src/arrow/compute/exec.cc index 535aaf9cee6..48097e07a34 100644 --- a/cpp/src/arrow/compute/exec.cc +++ b/cpp/src/arrow/compute/exec.cc @@ -114,74 +114,74 @@ int64_t ExecBatch::TotalBufferSize() const { bool AddBuffersToSet(const ArrayData& array_data, std::unordered_set>* seen_buffers) { - bool insertion_occured = false; - for (const auto& buffer : array_data.buffers) { - insertion_occured = (buffer && seen_buffers->insert(buffer).second); - } - for (const auto& child : array_data.child_data) { - insertion_occured |= AddBuffersToSet(*child, seen_buffers); - } - if (array_data.dictionary) { - insertion_occured |= AddBuffersToSet(*array_data.dictionary, seen_buffers); - } - return insertion_occured; + bool insertion_occured = false; + for (const auto& buffer : array_data.buffers) { + insertion_occured = (buffer && seen_buffers->insert(buffer).second); + } + for (const auto& child : array_data.child_data) { + insertion_occured |= AddBuffersToSet(*child, seen_buffers); + } + if (array_data.dictionary) { + insertion_occured |= AddBuffersToSet(*array_data.dictionary, seen_buffers); + } + return insertion_occured; } bool AddBuffersToSet(const Array& array, std::unordered_set>* seen_buffers) { - return AddBuffersToSet(*array.data(), seen_buffers); + return AddBuffersToSet(*array.data(), seen_buffers); } bool AddBuffersToSet(const ChunkedArray& chunked_array, std::unordered_set>* seen_buffers) { - bool insertion_occured = false; - for (const auto& chunk : chunked_array.chunks()) { - insertion_occured |= AddBuffersToSet(*chunk, seen_buffers); - } - return insertion_occured; + bool insertion_occured = false; + for (const auto& chunk : chunked_array.chunks()) { + insertion_occured |= AddBuffersToSet(*chunk, seen_buffers); + } + return insertion_occured; } bool AddBuffersToSet(const RecordBatch& record_batch, std::unordered_set>* seen_buffers) { - bool insertion_occured = false; - for (const auto& column : record_batch.columns()) { - insertion_occured |= AddBuffersToSet(*column, seen_buffers); - } - return insertion_occured; + bool insertion_occured = false; + for (const auto& column : record_batch.columns()) { + insertion_occured |= AddBuffersToSet(*column, seen_buffers); + } + return insertion_occured; } bool AddBuffersToSet(const Table& table, std::unordered_set>* seen_buffers) { - bool insertion_occured = false; - for (const auto& column : table.columns()) { - insertion_occured |= AddBuffersToSet(*column, seen_buffers); - } - return insertion_occured; + bool insertion_occured = false; + for (const auto& column : table.columns()) { + insertion_occured |= AddBuffersToSet(*column, seen_buffers); + } + return insertion_occured; } // Add all Buffers to a given set, return true if anything was actually added. // If all the buffers in the datum were already in the set, this will return false. bool AddBuffersToSet(Datum datum, std::unordered_set>* seen_buffers) { - switch (datum.kind()) { - case Datum::ARRAY: - return AddBuffersToSet(*util::get>(datum.value), - seen_buffers); - case Datum::CHUNKED_ARRAY: - return AddBuffersToSet(*util::get>(datum.value), - seen_buffers); - case Datum::RECORD_BATCH: - return AddBuffersToSet(*util::get>(datum.value), - seen_buffers); - case Datum::TABLE: - return AddBuffersToSet(*util::get>(datum.value), - seen_buffers); - case Datum::SCALAR: - return false; - default: - DCHECK(false); - return false; - } + switch (datum.kind()) { + case Datum::ARRAY: + return AddBuffersToSet(*util::get>(datum.value), + seen_buffers); + case Datum::CHUNKED_ARRAY: + return AddBuffersToSet(*util::get>(datum.value), + seen_buffers); + case Datum::RECORD_BATCH: + return AddBuffersToSet(*util::get>(datum.value), + seen_buffers); + case Datum::TABLE: + return AddBuffersToSet(*util::get>(datum.value), + seen_buffers); + case Datum::SCALAR: + return false; + default: + DCHECK(false); + return false; + } } std::string ExecBatch::ToString() const { @@ -775,10 +775,10 @@ class ScalarExecutor : public KernelExecutorImpl { // insert all the preallocated ones into a set std::unordered_set> pre_buffers; if (preallocate_contiguous_) { - for (auto dat : batch.values) { - AddBuffersToSet(dat, &pre_buffers); - } - AddBuffersToSet(out, &pre_buffers); + for (auto dat : batch.values) { + AddBuffersToSet(dat, &pre_buffers); + } + AddBuffersToSet(out, &pre_buffers); } RETURN_NOT_OK(kernel_->exec(kernel_ctx_, batch, &out)); @@ -788,25 +788,25 @@ class ScalarExecutor : public KernelExecutorImpl { // they know the output will have 0 nulls. However, this is not compatible // with writing into slices. if (output_descr_.shape == ValueDescr::ARRAY) { - DCHECK(out.array()->buffers[0]) - << "Null bitmap deleted by kernel but can_write_into_slices = true"; + DCHECK(out.array()->buffers[0]) + << "Null bitmap deleted by kernel but can_write_into_slices = true"; } // Check whether the kernel allocated new Buffers // (instead of using the preallocated ones) bool insertion_occured = false; for (auto dat : batch.values) { - insertion_occured |= AddBuffersToSet(dat, &pre_buffers); + insertion_occured |= AddBuffersToSet(dat, &pre_buffers); } insertion_occured |= AddBuffersToSet(out, &pre_buffers); DCHECK_EQ(insertion_occured, false) << "Unauthorized memory allocations " "in function kernel"; -// TODO: instead of aborting (which is what DCHECK_EQ() above does), -// throw an Error we can catch in a test -// if (insertion_occured) { -// return Status::ExecutionError("Unauthorized memory allocations " -// "in function kernel"); -// } + // TODO: instead of aborting (which is what DCHECK_EQ() above does), + // consider throwing an Error we can catch in a test + // if (insertion_occured) { + // return Status::ExecutionError("Unauthorized memory allocations " + // "in function kernel"); + // } } else { // If we are producing chunked output rather than one big array, then // emit each chunk as soon as it's available diff --git a/cpp/src/arrow/compute/kernels/scalar_misbehave.cc b/cpp/src/arrow/compute/kernels/scalar_misbehave.cc index 076d35319b4..34fc545f7f3 100644 --- a/cpp/src/arrow/compute/kernels/scalar_misbehave.cc +++ b/cpp/src/arrow/compute/kernels/scalar_misbehave.cc @@ -15,17 +15,10 @@ // specific language governing permissions and limitations // under the License. -#include "arrow/array/builder_nested.h" -#include "arrow/array/builder_primitive.h" -#include "arrow/array/builder_time.h" -#include "arrow/array/builder_union.h" #include "arrow/compute/api.h" #include "arrow/compute/kernels/codegen_internal.h" #include "arrow/util/bit_block_counter.h" -#include "arrow/util/bit_run_reader.h" -#include "arrow/util/bitmap.h" #include "arrow/util/bitmap_ops.h" -#include "arrow/util/bitmap_reader.h" namespace arrow { @@ -36,47 +29,42 @@ namespace { template struct ScalarMisbehaveExec { - static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { - // allocate a buffer even though we've promised not to - ARROW_ASSIGN_OR_RAISE(out->mutable_array()->buffers[0], - ctx->Allocate(64)); - //return Status::NotImplemented("This kernel only exists for testing purposes"); - // The function should return OK, otherwise the buffer check is not performed - return Status::OK(); - } -}; - -struct ScalarMisbehaveFunction : ScalarFunction { - using ScalarFunction::ScalarFunction; + static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { + // allocate a buffer even though we've promised not to + ARROW_ASSIGN_OR_RAISE(out->mutable_array()->buffers[0], ctx->Allocate(64)); + // return Status::NotImplemented("This kernel only exists for testing purposes"); + // The function should return OK, otherwise the buffer check is not performed + return Status::OK(); + } }; void AddScalarMisbehaveKernels(const std::shared_ptr& scalar_function) { - ScalarKernel kernel({}, null(), - ScalarMisbehaveExec::Exec); - kernel.null_handling = NullHandling::COMPUTED_PREALLOCATE; - kernel.mem_allocation = MemAllocation::PREALLOCATE; - kernel.can_write_into_slices = true; + ScalarKernel kernel({}, null(), + ScalarMisbehaveExec::Exec); + kernel.null_handling = NullHandling::COMPUTED_PREALLOCATE; + kernel.mem_allocation = MemAllocation::PREALLOCATE; + kernel.can_write_into_slices = true; - DCHECK_OK(scalar_function->AddKernel(std::move(kernel))); - } -} // namespace + DCHECK_OK(scalar_function->AddKernel(std::move(kernel))); +} -const FunctionDoc misbehave_doc{"Test kernel that does nothing but allocate memory " - "while it shouldn't", - ("This Kernel only exists for testing purposes.\n" - "It allocates memory while it promised not to \n" - "(because of MemAllocation::PREALLOCATE)."), - {}}; +const FunctionDoc misbehave_doc{ + "Test kernel that does nothing but allocate memory " + "while it shouldn't", + "This Kernel only exists for testing purposes.\n" + "It allocates memory while it promised not to \n" + "(because of MemAllocation::PREALLOCATE).", + {}}; +} // namespace void RegisterScalarMisbehave(FunctionRegistry* registry) { -auto func = - std::make_shared("misbehave", Arity::Nullary(), - &misbehave_doc); + auto func = std::make_shared("scalar_misbehave", Arity::Nullary(), + &misbehave_doc); AddScalarMisbehaveKernels(func); -DCHECK_OK(registry->AddFunction(std::move(func))); -//TODO: We want to prevent people from actually using this kernel. -// Can we add the function only for testing? + DCHECK_OK(registry->AddFunction(std::move(func))); + // TODO: We want to prevent people from actually using this kernel. + // Can we add the function only for testing? } } // namespace internal diff --git a/cpp/src/arrow/compute/kernels/scalar_misbehave_test.cc b/cpp/src/arrow/compute/kernels/scalar_misbehave_test.cc index b76cb3e4c68..8c44dc9ee8f 100644 --- a/cpp/src/arrow/compute/kernels/scalar_misbehave_test.cc +++ b/cpp/src/arrow/compute/kernels/scalar_misbehave_test.cc @@ -16,13 +16,9 @@ // under the License. #include -#include "arrow/array.h" #include "arrow/array/concatenate.h" #include "arrow/compute/api_scalar.h" -#include "arrow/compute/cast.h" -#include "arrow/compute/kernels/test_util.h" #include "arrow/compute/registry.h" -#include "arrow/compute/exec_internal.h" #include "arrow/testing/gtest_util.h" namespace arrow { @@ -33,10 +29,10 @@ class ScalarExecutor : public KernelExecutorImpl { Status Execute(const std::vector& args, ExecListener* listener) override { */ TEST(Misbehave, MisbehavingScalarKernel) { -// ASSERT_RAISES_WITH_MESSAGE(ExecutionError, -// "Unauthorized memory allocations in function kernel", -// CallFunction("misbehave", {})); - ASSERT_NOT_OK(CallFunction("misbehave", {})); + // ASSERT_RAISES_WITH_MESSAGE(ExecutionError, + // "Unauthorized memory allocations in function kernel", + // CallFunction("misbehave", {})); + ASSERT_NOT_OK(CallFunction("scalar_misbehave", {})); } } // namespace compute diff --git a/cpp/src/arrow/compute/kernels/vector_misbehave.cc b/cpp/src/arrow/compute/kernels/vector_misbehave.cc index 427ef656b50..e83236cdb59 100644 --- a/cpp/src/arrow/compute/kernels/vector_misbehave.cc +++ b/cpp/src/arrow/compute/kernels/vector_misbehave.cc @@ -15,25 +15,14 @@ // specific language governing permissions and limitations // under the License. -#include -#include -#include -#include -#include -#include #include -#include "arrow/array/data.h" #include "arrow/compute/api_vector.h" #include "arrow/compute/kernels/common.h" -#include "arrow/compute/kernels/util_internal.h" #include "arrow/compute/kernels/vector_sort_internal.h" -#include "arrow/type_traits.h" #include "arrow/util/bit_block_counter.h" -#include "arrow/util/bitmap.h" #include "arrow/util/bitmap_ops.h" #include "arrow/util/checked_cast.h" -#include "arrow/visitor_inline.h" namespace arrow { @@ -48,17 +37,18 @@ struct VectorMisbehave { static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { auto out_arr = out->mutable_array(); ARROW_ASSIGN_OR_RAISE(out_arr->buffers[0], ctx->Allocate(64)); - //return Status::NotImplemented("This kernel only exists for testing purposes"); + // return Status::NotImplemented("This kernel only exists for testing purposes"); return Status::OK(); } }; -const FunctionDoc misbehave_doc{"Test kernel that does nothing but allocate memory " - "while it shouldn't", - ("This Kernel only exists for testing purposes.\n" - "It allocates memory while it promised not to \n" - "(because of MemAllocation::PREALLOCATE)."), - {}}; +const FunctionDoc misbehave_doc{ + "Test kernel that does nothing but allocate memory " + "while it shouldn't", + "This Kernel only exists for testing purposes.\n" + "It allocates memory while it promised not to \n" + "(because of MemAllocation::PREALLOCATE).", + {}}; } // namespace @@ -68,8 +58,8 @@ void RegisterVectorMisbehave(FunctionRegistry* registry) { base.mem_allocation = MemAllocation::PREALLOCATE; base.null_handling = NullHandling::COMPUTED_PREALLOCATE; - auto func = std::make_shared( - "vector_misbehave", Arity::Nullary(), &misbehave_doc); + auto func = std::make_shared("vector_misbehave", Arity::Nullary(), + &misbehave_doc); // TODO: Does this need to be defined? Not for scalar kernels, it seems // base.init = PartitionNthToIndicesState::Init; base.signature = KernelSignature::Make({}, uint64()); From 9997e6dc8feb2ad9d68e76602aadf41416f7be75 Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Wed, 12 Jan 2022 17:42:50 +0100 Subject: [PATCH 05/13] Removed test kernel from registry and API, no longer using DCHECK --- cpp/src/arrow/compute/api_scalar.cc | 5 -- cpp/src/arrow/compute/exec.cc | 41 +++++------ cpp/src/arrow/compute/kernels/CMakeLists.txt | 1 + .../arrow/compute/kernels/scalar_misbehave.cc | 34 +++++---- .../compute/kernels/scalar_misbehave_test.cc | 23 +++--- .../arrow/compute/kernels/vector_misbehave.cc | 71 +++++++++---------- .../compute/kernels/vector_misbehave_test.cc | 41 +++++++++++ cpp/src/arrow/compute/registry.cc | 2 - cpp/src/arrow/compute/registry_internal.h | 2 - 9 files changed, 124 insertions(+), 96 deletions(-) create mode 100644 cpp/src/arrow/compute/kernels/vector_misbehave_test.cc diff --git a/cpp/src/arrow/compute/api_scalar.cc b/cpp/src/arrow/compute/api_scalar.cc index f13b48a7d7b..f247510b6f0 100644 --- a/cpp/src/arrow/compute/api_scalar.cc +++ b/cpp/src/arrow/compute/api_scalar.cc @@ -732,11 +732,6 @@ Result IsNull(const Datum& arg, NullOptions options, ExecContext* ctx) { return CallFunction("is_null", {arg}, &options, ctx); } -Result Misbehave(const Datum& cond, const Datum& if_true, const Datum& if_false, - ExecContext* ctx) { - return CallFunction("misbehave", {}, ctx); -} - // ---------------------------------------------------------------------- // Temporal functions diff --git a/cpp/src/arrow/compute/exec.cc b/cpp/src/arrow/compute/exec.cc index 48097e07a34..b7179126a3c 100644 --- a/cpp/src/arrow/compute/exec.cc +++ b/cpp/src/arrow/compute/exec.cc @@ -771,18 +771,18 @@ class ScalarExecutor : public KernelExecutorImpl { } } +#ifndef NDEBUG // To check whether the kernel allocated new Buffers, // insert all the preallocated ones into a set std::unordered_set> pre_buffers; if (preallocate_contiguous_) { - for (auto dat : batch.values) { - AddBuffersToSet(dat, &pre_buffers); - } AddBuffersToSet(out, &pre_buffers); } +#endif // NDEBUG RETURN_NOT_OK(kernel_->exec(kernel_ctx_, batch, &out)); + if (preallocate_contiguous_) { // Some kernels may like to simply nullify the validity bitmap when // they know the output will have 0 nulls. However, this is not compatible @@ -792,21 +792,15 @@ class ScalarExecutor : public KernelExecutorImpl { << "Null bitmap deleted by kernel but can_write_into_slices = true"; } +#ifndef NDEBUG // Check whether the kernel allocated new Buffers // (instead of using the preallocated ones) - bool insertion_occured = false; - for (auto dat : batch.values) { - insertion_occured |= AddBuffersToSet(dat, &pre_buffers); + bool insertion_occured = AddBuffersToSet(out, &pre_buffers); + if (insertion_occured) { + return Status::ExecutionError("Unauthorized memory allocations " + "in function kernel"); } - insertion_occured |= AddBuffersToSet(out, &pre_buffers); - DCHECK_EQ(insertion_occured, false) << "Unauthorized memory allocations " - "in function kernel"; - // TODO: instead of aborting (which is what DCHECK_EQ() above does), - // consider throwing an Error we can catch in a test - // if (insertion_occured) { - // return Status::ExecutionError("Unauthorized memory allocations " - // "in function kernel"); - // } +#endif // NDEBUG } else { // If we are producing chunked output rather than one big array, then // emit each chunk as soon as it's available @@ -979,29 +973,28 @@ class VectorExecutor : public KernelExecutorImpl { RETURN_NOT_OK(PropagateNulls(kernel_ctx_, batch, out.mutable_array())); } +#ifndef NDEBUG // To check whether the kernel allocated new Buffers, // insert all the preallocated ones into a set std::unordered_set> pre_buffers; if (kernel_->mem_allocation == MemAllocation::PREALLOCATE) { - for (auto dat : batch.values) { - AddBuffersToSet(dat, &pre_buffers); - } AddBuffersToSet(out, &pre_buffers); } +#endif // NDEBUG RETURN_NOT_OK(kernel_->exec(kernel_ctx_, batch, &out)); +#ifndef NDEBUG if (kernel_->mem_allocation == MemAllocation::PREALLOCATE) { // Check whether the kernel allocated new Buffers // (instead of using the preallocated ones) - bool insertion_occured = false; - for (auto dat : batch.values) { - insertion_occured |= AddBuffersToSet(dat, &pre_buffers); + bool insertion_occured = AddBuffersToSet(out, &pre_buffers); + if (insertion_occured) { + return Status::ExecutionError("Unauthorized memory allocations " + "in function kernel"); } - insertion_occured |= AddBuffersToSet(out, &pre_buffers); - DCHECK_EQ(insertion_occured, false) << "Unauthorized memory allocations " - "in function kernel"; } +#endif // NDEBUG if (!kernel_->finalize) { // If there is no result finalizer (e.g. for hash-based functions, we can diff --git a/cpp/src/arrow/compute/kernels/CMakeLists.txt b/cpp/src/arrow/compute/kernels/CMakeLists.txt index 4d11b52474e..c1ce4923c27 100644 --- a/cpp/src/arrow/compute/kernels/CMakeLists.txt +++ b/cpp/src/arrow/compute/kernels/CMakeLists.txt @@ -49,6 +49,7 @@ add_arrow_benchmark(scalar_string_benchmark PREFIX "arrow-compute") add_arrow_compute_test(vector_test SOURCES vector_hash_test.cc + vector_misbehave_test.cc vector_nested_test.cc vector_replace_test.cc vector_selection_test.cc diff --git a/cpp/src/arrow/compute/kernels/scalar_misbehave.cc b/cpp/src/arrow/compute/kernels/scalar_misbehave.cc index 34fc545f7f3..54daba7771f 100644 --- a/cpp/src/arrow/compute/kernels/scalar_misbehave.cc +++ b/cpp/src/arrow/compute/kernels/scalar_misbehave.cc @@ -31,7 +31,11 @@ template struct ScalarMisbehaveExec { static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { // allocate a buffer even though we've promised not to - ARROW_ASSIGN_OR_RAISE(out->mutable_array()->buffers[0], ctx->Allocate(64)); + ARROW_ASSIGN_OR_RAISE(auto buffer, ctx->Allocate(64)); + BufferVector buffers{nullptr, std::move(buffer)}; + auto array = std::make_shared(int64(), 8, std::move(buffers), + 0); + *out = array; // return Status::NotImplemented("This kernel only exists for testing purposes"); // The function should return OK, otherwise the buffer check is not performed return Status::OK(); @@ -39,13 +43,16 @@ struct ScalarMisbehaveExec { }; void AddScalarMisbehaveKernels(const std::shared_ptr& scalar_function) { - ScalarKernel kernel({}, null(), - ScalarMisbehaveExec::Exec); - kernel.null_handling = NullHandling::COMPUTED_PREALLOCATE; - kernel.mem_allocation = MemAllocation::PREALLOCATE; - kernel.can_write_into_slices = true; +// ScalarKernel kernel({InputType(Type::FIXED_SIZE_BINARY)}, +// OutputType(ValueDescr(fixed_size_binary(2)))), +// ScalarMisbehaveExec::Exec); +// kernel.null_handling = NullHandling::COMPUTED_PREALLOCATE; +// kernel.mem_allocation = MemAllocation::PREALLOCATE; //is the default +// kernel.can_write_into_slices = true; - DCHECK_OK(scalar_function->AddKernel(std::move(kernel))); + DCHECK_OK(scalar_function->AddKernel({InputType(Type::FIXED_SIZE_BINARY)}, + OutputType(ValueDescr(fixed_size_binary(2))), + ScalarMisbehaveExec::Exec)); } const FunctionDoc misbehave_doc{ @@ -57,16 +64,13 @@ const FunctionDoc misbehave_doc{ {}}; } // namespace -void RegisterScalarMisbehave(FunctionRegistry* registry) { - auto func = std::make_shared("scalar_misbehave", Arity::Nullary(), +} // namespace internal +using arrow::compute::internal::AddScalarMisbehaveKernels; +std::shared_ptr CreateScalarMisbehaveFunction() { + auto func = std::make_shared("scalar_misbehave", Arity::Unary(), &misbehave_doc); - AddScalarMisbehaveKernels(func); - DCHECK_OK(registry->AddFunction(std::move(func))); - // TODO: We want to prevent people from actually using this kernel. - // Can we add the function only for testing? + return func; } - -} // namespace internal } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/kernels/scalar_misbehave_test.cc b/cpp/src/arrow/compute/kernels/scalar_misbehave_test.cc index 8c44dc9ee8f..1f1eb6ee9a1 100644 --- a/cpp/src/arrow/compute/kernels/scalar_misbehave_test.cc +++ b/cpp/src/arrow/compute/kernels/scalar_misbehave_test.cc @@ -18,22 +18,23 @@ #include #include "arrow/array/concatenate.h" #include "arrow/compute/api_scalar.h" -#include "arrow/compute/registry.h" #include "arrow/testing/gtest_util.h" namespace arrow { namespace compute { -/* -class ScalarExecutor : public KernelExecutorImpl { -public: - Status Execute(const std::vector& args, ExecListener* listener) override { -*/ +std::shared_ptr CreateScalarMisbehaveFunction(); + TEST(Misbehave, MisbehavingScalarKernel) { - // ASSERT_RAISES_WITH_MESSAGE(ExecutionError, - // "Unauthorized memory allocations in function kernel", - // CallFunction("misbehave", {})); - ASSERT_NOT_OK(CallFunction("scalar_misbehave", {})); + ExecContext ctx; + auto func = CreateScalarMisbehaveFunction(); + Datum datum(ArrayFromJSON(fixed_size_binary(6), R"(["123456"])")); + const std::vector &args = {datum}; + const FunctionOptions *options = nullptr; + ASSERT_RAISES_WITH_MESSAGE(ExecutionError, + "ExecutionError in Gandiva: " + "Unauthorized memory allocations " + "in function kernel", + func->Execute(args, options, &ctx)); } - } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/kernels/vector_misbehave.cc b/cpp/src/arrow/compute/kernels/vector_misbehave.cc index e83236cdb59..b808d492aaa 100644 --- a/cpp/src/arrow/compute/kernels/vector_misbehave.cc +++ b/cpp/src/arrow/compute/kernels/vector_misbehave.cc @@ -15,59 +15,56 @@ // specific language governing permissions and limitations // under the License. -#include - -#include "arrow/compute/api_vector.h" -#include "arrow/compute/kernels/common.h" -#include "arrow/compute/kernels/vector_sort_internal.h" +#include "arrow/compute/api.h" +#include "arrow/compute/kernels/codegen_internal.h" #include "arrow/util/bit_block_counter.h" #include "arrow/util/bitmap_ops.h" -#include "arrow/util/checked_cast.h" namespace arrow { -using internal::checked_cast; - namespace compute { namespace internal { namespace { -struct VectorMisbehave { - static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { - auto out_arr = out->mutable_array(); - ARROW_ASSIGN_OR_RAISE(out_arr->buffers[0], ctx->Allocate(64)); - // return Status::NotImplemented("This kernel only exists for testing purposes"); - return Status::OK(); - } +template +struct VectorMisbehaveExec { + static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { + // allocate new buffers even though we've promised not to + ARROW_ASSIGN_OR_RAISE(out->mutable_array()->buffers[0], ctx->AllocateBitmap(8)); + ARROW_ASSIGN_OR_RAISE(out->mutable_array()->buffers[1], ctx->Allocate(64)); + return Status::OK(); + } }; -const FunctionDoc misbehave_doc{ - "Test kernel that does nothing but allocate memory " - "while it shouldn't", - "This Kernel only exists for testing purposes.\n" - "It allocates memory while it promised not to \n" - "(because of MemAllocation::PREALLOCATE).", - {}}; +void AddVectorMisbehaveKernels(const std::shared_ptr& Vector_function) { + VectorKernel kernel({int32()}, int32(), + VectorMisbehaveExec::Exec); + kernel.null_handling = NullHandling::COMPUTED_PREALLOCATE; + kernel.mem_allocation = MemAllocation::PREALLOCATE; + kernel.can_write_into_slices = true; + kernel.can_execute_chunkwise = false; + kernel.output_chunked = false; -} // namespace + DCHECK_OK(Vector_function->AddKernel(std::move(kernel))); +} -void RegisterVectorMisbehave(FunctionRegistry* registry) { - // The kernel outputs into preallocated memory and is never null - VectorKernel base; - base.mem_allocation = MemAllocation::PREALLOCATE; - base.null_handling = NullHandling::COMPUTED_PREALLOCATE; +const FunctionDoc misbehave_doc{ + "Test kernel that does nothing but allocate memory " + "while it shouldn't", + "This Kernel only exists for testing purposes.\n" + "It allocates memory while it promised not to \n" + "(because of MemAllocation::PREALLOCATE).", + {}}; +} // namespace - auto func = std::make_shared("vector_misbehave", Arity::Nullary(), +} // namespace internal +using arrow::compute::internal::AddVectorMisbehaveKernels; +std::shared_ptr CreateVectorMisbehaveFunction() { + auto func = std::make_shared("vector_misbehave", Arity::Unary(), &misbehave_doc); - // TODO: Does this need to be defined? Not for scalar kernels, it seems - // base.init = PartitionNthToIndicesState::Init; - base.signature = KernelSignature::Make({}, uint64()); - base.exec = VectorMisbehave::Exec; - DCHECK_OK(func->AddKernel(base)); - DCHECK_OK(registry->AddFunction(std::move(func))); + AddVectorMisbehaveKernels(func); + return func; } - -} // namespace internal } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/kernels/vector_misbehave_test.cc b/cpp/src/arrow/compute/kernels/vector_misbehave_test.cc new file mode 100644 index 00000000000..754a8777547 --- /dev/null +++ b/cpp/src/arrow/compute/kernels/vector_misbehave_test.cc @@ -0,0 +1,41 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include "arrow/array/concatenate.h" +#include "arrow/chunked_array.h" +#include "arrow/compute/api_vector.h" +#include "arrow/testing/gtest_util.h" + +namespace arrow { +namespace compute { +std::shared_ptr CreateVectorMisbehaveFunction(); + +TEST(Misbehave, MisbehavingVectorKernel) { + ExecContext ctx; + auto func = CreateVectorMisbehaveFunction(); + Datum datum(ChunkedArray(ArrayVector{}, int32())); + const std::vector &args = {datum}; + const FunctionOptions *options = nullptr; + ASSERT_RAISES_WITH_MESSAGE(ExecutionError, + "ExecutionError in Gandiva: " + "Unauthorized memory allocations " + "in function kernel", + func->Execute(args, options, &ctx)); +} +} // namespace compute +} // namespace arrow diff --git a/cpp/src/arrow/compute/registry.cc b/cpp/src/arrow/compute/registry.cc index eb30cefe988..8ab83a72e5e 100644 --- a/cpp/src/arrow/compute/registry.cc +++ b/cpp/src/arrow/compute/registry.cc @@ -161,7 +161,6 @@ static std::unique_ptr CreateBuiltInRegistry() { RegisterScalarCast(registry.get()); RegisterScalarComparison(registry.get()); RegisterScalarIfElse(registry.get()); - RegisterScalarMisbehave(registry.get()); RegisterScalarNested(registry.get()); RegisterScalarRandom(registry.get()); // Nullary RegisterScalarSetLookup(registry.get()); @@ -176,7 +175,6 @@ static std::unique_ptr CreateBuiltInRegistry() { // Vector functions RegisterVectorArraySort(registry.get()); RegisterVectorHash(registry.get()); - RegisterVectorMisbehave(registry.get()); RegisterVectorNested(registry.get()); RegisterVectorReplace(registry.get()); RegisterVectorSelection(registry.get()); diff --git a/cpp/src/arrow/compute/registry_internal.h b/cpp/src/arrow/compute/registry_internal.h index 0689437ef49..35f7b079529 100644 --- a/cpp/src/arrow/compute/registry_internal.h +++ b/cpp/src/arrow/compute/registry_internal.h @@ -30,7 +30,6 @@ void RegisterScalarBoolean(FunctionRegistry* registry); void RegisterScalarCast(FunctionRegistry* registry); void RegisterScalarComparison(FunctionRegistry* registry); void RegisterScalarIfElse(FunctionRegistry* registry); -void RegisterScalarMisbehave(FunctionRegistry* registry); void RegisterScalarNested(FunctionRegistry* registry); void RegisterScalarRandom(FunctionRegistry* registry); // Nullary void RegisterScalarSetLookup(FunctionRegistry* registry); @@ -45,7 +44,6 @@ void RegisterScalarOptions(FunctionRegistry* registry); // Vector functions void RegisterVectorArraySort(FunctionRegistry* registry); void RegisterVectorHash(FunctionRegistry* registry); -void RegisterVectorMisbehave(FunctionRegistry* registry); void RegisterVectorNested(FunctionRegistry* registry); void RegisterVectorReplace(FunctionRegistry* registry); void RegisterVectorSelection(FunctionRegistry* registry); From bd6d2819bee885691219bb41410487587d9866fb Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Thu, 13 Jan 2022 17:34:41 +0100 Subject: [PATCH 06/13] Moved the misbehaving test kernels inside the test file --- cpp/src/arrow/CMakeLists.txt | 2 - .../arrow/compute/kernels/scalar_misbehave.cc | 76 ------------------- .../compute/kernels/scalar_misbehave_test.cc | 58 ++++++++++++-- .../arrow/compute/kernels/vector_misbehave.cc | 70 ----------------- .../compute/kernels/vector_misbehave_test.cc | 53 +++++++++++-- 5 files changed, 99 insertions(+), 160 deletions(-) delete mode 100644 cpp/src/arrow/compute/kernels/scalar_misbehave.cc delete mode 100644 cpp/src/arrow/compute/kernels/vector_misbehave.cc diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index 90a46191a3f..beda9079c1e 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -421,7 +421,6 @@ if(ARROW_COMPUTE) compute/kernels/scalar_cast_temporal.cc compute/kernels/scalar_compare.cc compute/kernels/scalar_if_else.cc - compute/kernels/scalar_misbehave.cc compute/kernels/scalar_nested.cc compute/kernels/scalar_random.cc compute/kernels/scalar_set_lookup.cc @@ -433,7 +432,6 @@ if(ARROW_COMPUTE) compute/kernels/util_internal.cc compute/kernels/vector_array_sort.cc compute/kernels/vector_hash.cc - compute/kernels/vector_misbehave.cc compute/kernels/vector_nested.cc compute/kernels/vector_replace.cc compute/kernels/vector_selection.cc diff --git a/cpp/src/arrow/compute/kernels/scalar_misbehave.cc b/cpp/src/arrow/compute/kernels/scalar_misbehave.cc deleted file mode 100644 index 54daba7771f..00000000000 --- a/cpp/src/arrow/compute/kernels/scalar_misbehave.cc +++ /dev/null @@ -1,76 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "arrow/compute/api.h" -#include "arrow/compute/kernels/codegen_internal.h" -#include "arrow/util/bit_block_counter.h" -#include "arrow/util/bitmap_ops.h" - -namespace arrow { - -namespace compute { -namespace internal { - -namespace { - -template -struct ScalarMisbehaveExec { - static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { - // allocate a buffer even though we've promised not to - ARROW_ASSIGN_OR_RAISE(auto buffer, ctx->Allocate(64)); - BufferVector buffers{nullptr, std::move(buffer)}; - auto array = std::make_shared(int64(), 8, std::move(buffers), - 0); - *out = array; - // return Status::NotImplemented("This kernel only exists for testing purposes"); - // The function should return OK, otherwise the buffer check is not performed - return Status::OK(); - } -}; - -void AddScalarMisbehaveKernels(const std::shared_ptr& scalar_function) { -// ScalarKernel kernel({InputType(Type::FIXED_SIZE_BINARY)}, -// OutputType(ValueDescr(fixed_size_binary(2)))), -// ScalarMisbehaveExec::Exec); -// kernel.null_handling = NullHandling::COMPUTED_PREALLOCATE; -// kernel.mem_allocation = MemAllocation::PREALLOCATE; //is the default -// kernel.can_write_into_slices = true; - - DCHECK_OK(scalar_function->AddKernel({InputType(Type::FIXED_SIZE_BINARY)}, - OutputType(ValueDescr(fixed_size_binary(2))), - ScalarMisbehaveExec::Exec)); -} - -const FunctionDoc misbehave_doc{ - "Test kernel that does nothing but allocate memory " - "while it shouldn't", - "This Kernel only exists for testing purposes.\n" - "It allocates memory while it promised not to \n" - "(because of MemAllocation::PREALLOCATE).", - {}}; -} // namespace - -} // namespace internal -using arrow::compute::internal::AddScalarMisbehaveKernels; -std::shared_ptr CreateScalarMisbehaveFunction() { - auto func = std::make_shared("scalar_misbehave", Arity::Unary(), - &misbehave_doc); - AddScalarMisbehaveKernels(func); - return func; -} -} // namespace compute -} // namespace arrow diff --git a/cpp/src/arrow/compute/kernels/scalar_misbehave_test.cc b/cpp/src/arrow/compute/kernels/scalar_misbehave_test.cc index 1f1eb6ee9a1..81a056c3a06 100644 --- a/cpp/src/arrow/compute/kernels/scalar_misbehave_test.cc +++ b/cpp/src/arrow/compute/kernels/scalar_misbehave_test.cc @@ -15,26 +15,72 @@ // specific language governing permissions and limitations // under the License. +#include #include #include "arrow/array/concatenate.h" #include "arrow/compute/api_scalar.h" #include "arrow/testing/gtest_util.h" +#include "arrow/util/logging.h" namespace arrow { namespace compute { -std::shared_ptr CreateScalarMisbehaveFunction(); +namespace { + +template +struct ScalarMisbehaveExec { + static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { + // allocate a buffer even though we've promised not to + ARROW_ASSIGN_OR_RAISE(auto buffer, ctx->Allocate(64)); + BufferVector buffers{nullptr, std::move(buffer)}; + auto array = std::make_shared(int64(), 8, std::move(buffers)); + *out = array; + // return Status::NotImplemented("This kernel only exists for testing purposes"); + // The function should return OK, otherwise the buffer check is not performed + return Status::OK(); + } +}; +} // namespace internal + +void AddScalarMisbehaveKernels(const std::shared_ptr& scalar_function) { +// ScalarKernel kernel({InputType(Type::FIXED_SIZE_BINARY)}, +// OutputType(ValueDescr(fixed_size_binary(2)))), +// ScalarMisbehaveExec::Exec); +// kernel.null_handling = NullHandling::COMPUTED_PREALLOCATE; +// kernel.mem_allocation = MemAllocation::PREALLOCATE; //is the default +// kernel.can_write_into_slices = true; + + DCHECK_OK(scalar_function->AddKernel({InputType(Type::FIXED_SIZE_BINARY)}, + OutputType(ValueDescr(fixed_size_binary(2))), + ScalarMisbehaveExec::Exec)); +} + +const FunctionDoc misbehave_doc{ + "Test kernel that does nothing but allocate memory " + "while it shouldn't", + "This Kernel only exists for testing purposes.\n" + "It allocates memory while it promised not to \n" + "(because of MemAllocation::PREALLOCATE).", + {}}; + +std::shared_ptr CreateScalarMisbehaveFunction() { + auto func = std::make_shared("scalar_misbehave", Arity::Unary(), + &misbehave_doc); + AddScalarMisbehaveKernels(func); + return func; +} TEST(Misbehave, MisbehavingScalarKernel) { ExecContext ctx; auto func = CreateScalarMisbehaveFunction(); Datum datum(ArrayFromJSON(fixed_size_binary(6), R"(["123456"])")); const std::vector &args = {datum}; const FunctionOptions *options = nullptr; - ASSERT_RAISES_WITH_MESSAGE(ExecutionError, - "ExecutionError in Gandiva: " - "Unauthorized memory allocations " - "in function kernel", - func->Execute(args, options, &ctx)); + EXPECT_RAISES_WITH_MESSAGE_THAT(ExecutionError, + testing::HasSubstr( + "ExecutionError in Gandiva: " + "Unauthorized memory allocations " + "in function kernel"), + func->Execute(args, options, &ctx)); } } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/kernels/vector_misbehave.cc b/cpp/src/arrow/compute/kernels/vector_misbehave.cc deleted file mode 100644 index b808d492aaa..00000000000 --- a/cpp/src/arrow/compute/kernels/vector_misbehave.cc +++ /dev/null @@ -1,70 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "arrow/compute/api.h" -#include "arrow/compute/kernels/codegen_internal.h" -#include "arrow/util/bit_block_counter.h" -#include "arrow/util/bitmap_ops.h" - -namespace arrow { - -namespace compute { -namespace internal { - -namespace { - -template -struct VectorMisbehaveExec { - static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { - // allocate new buffers even though we've promised not to - ARROW_ASSIGN_OR_RAISE(out->mutable_array()->buffers[0], ctx->AllocateBitmap(8)); - ARROW_ASSIGN_OR_RAISE(out->mutable_array()->buffers[1], ctx->Allocate(64)); - return Status::OK(); - } -}; - -void AddVectorMisbehaveKernels(const std::shared_ptr& Vector_function) { - VectorKernel kernel({int32()}, int32(), - VectorMisbehaveExec::Exec); - kernel.null_handling = NullHandling::COMPUTED_PREALLOCATE; - kernel.mem_allocation = MemAllocation::PREALLOCATE; - kernel.can_write_into_slices = true; - kernel.can_execute_chunkwise = false; - kernel.output_chunked = false; - - DCHECK_OK(Vector_function->AddKernel(std::move(kernel))); -} - -const FunctionDoc misbehave_doc{ - "Test kernel that does nothing but allocate memory " - "while it shouldn't", - "This Kernel only exists for testing purposes.\n" - "It allocates memory while it promised not to \n" - "(because of MemAllocation::PREALLOCATE).", - {}}; -} // namespace - -} // namespace internal -using arrow::compute::internal::AddVectorMisbehaveKernels; -std::shared_ptr CreateVectorMisbehaveFunction() { - auto func = std::make_shared("vector_misbehave", Arity::Unary(), - &misbehave_doc); - AddVectorMisbehaveKernels(func); - return func; -} -} // namespace compute -} // namespace arrow diff --git a/cpp/src/arrow/compute/kernels/vector_misbehave_test.cc b/cpp/src/arrow/compute/kernels/vector_misbehave_test.cc index 754a8777547..d00062f3313 100644 --- a/cpp/src/arrow/compute/kernels/vector_misbehave_test.cc +++ b/cpp/src/arrow/compute/kernels/vector_misbehave_test.cc @@ -15,15 +15,55 @@ // specific language governing permissions and limitations // under the License. +#include #include #include "arrow/array/concatenate.h" #include "arrow/chunked_array.h" #include "arrow/compute/api_vector.h" #include "arrow/testing/gtest_util.h" +#include "arrow/util/logging.h" namespace arrow { namespace compute { -std::shared_ptr CreateVectorMisbehaveFunction(); +namespace { + +template +struct VectorMisbehaveExec { + static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { + // allocate new buffers even though we've promised not to + ARROW_ASSIGN_OR_RAISE(out->mutable_array()->buffers[0], ctx->AllocateBitmap(8)); + ARROW_ASSIGN_OR_RAISE(out->mutable_array()->buffers[1], ctx->Allocate(64)); + return Status::OK(); + } +}; + +void AddVectorMisbehaveKernels(const std::shared_ptr& Vector_function) { + VectorKernel kernel({int32()}, int32(), + VectorMisbehaveExec::Exec); + kernel.null_handling = NullHandling::COMPUTED_PREALLOCATE; + kernel.mem_allocation = MemAllocation::PREALLOCATE; + kernel.can_write_into_slices = true; + kernel.can_execute_chunkwise = false; + kernel.output_chunked = false; + + DCHECK_OK(Vector_function->AddKernel(std::move(kernel))); +} +} // namespace + +const FunctionDoc misbehave_doc{ + "Test kernel that does nothing but allocate memory " + "while it shouldn't", + "This Kernel only exists for testing purposes.\n" + "It allocates memory while it promised not to \n" + "(because of MemAllocation::PREALLOCATE).", + {}}; + +std::shared_ptr CreateVectorMisbehaveFunction() { + auto func = std::make_shared("vector_misbehave", Arity::Unary(), + &misbehave_doc); + AddVectorMisbehaveKernels(func); + return func; +} TEST(Misbehave, MisbehavingVectorKernel) { ExecContext ctx; @@ -31,11 +71,12 @@ TEST(Misbehave, MisbehavingVectorKernel) { Datum datum(ChunkedArray(ArrayVector{}, int32())); const std::vector &args = {datum}; const FunctionOptions *options = nullptr; - ASSERT_RAISES_WITH_MESSAGE(ExecutionError, - "ExecutionError in Gandiva: " - "Unauthorized memory allocations " - "in function kernel", - func->Execute(args, options, &ctx)); + EXPECT_RAISES_WITH_MESSAGE_THAT(ExecutionError, + testing::HasSubstr( + "ExecutionError in Gandiva: " + "Unauthorized memory allocations " + "in function kernel"), + func->Execute(args, options, &ctx)); } } // namespace compute } // namespace arrow From 9ede680b0c5bf0fe5bc1c9e40449d3e01f418356 Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Fri, 14 Jan 2022 12:28:50 +0100 Subject: [PATCH 07/13] Code cleanup and formatting --- cpp/src/arrow/compute/exec.cc | 11 ++-- .../compute/kernels/scalar_misbehave_test.cc | 58 ++++++++----------- .../compute/kernels/vector_misbehave_test.cc | 41 ++++++------- 3 files changed, 47 insertions(+), 63 deletions(-) diff --git a/cpp/src/arrow/compute/exec.cc b/cpp/src/arrow/compute/exec.cc index b7179126a3c..199f8e700e3 100644 --- a/cpp/src/arrow/compute/exec.cc +++ b/cpp/src/arrow/compute/exec.cc @@ -782,7 +782,6 @@ class ScalarExecutor : public KernelExecutorImpl { RETURN_NOT_OK(kernel_->exec(kernel_ctx_, batch, &out)); - if (preallocate_contiguous_) { // Some kernels may like to simply nullify the validity bitmap when // they know the output will have 0 nulls. However, this is not compatible @@ -797,8 +796,9 @@ class ScalarExecutor : public KernelExecutorImpl { // (instead of using the preallocated ones) bool insertion_occured = AddBuffersToSet(out, &pre_buffers); if (insertion_occured) { - return Status::ExecutionError("Unauthorized memory allocations " - "in function kernel"); + return Status::ExecutionError( + "Unauthorized memory allocations " + "in function kernel"); } #endif // NDEBUG } else { @@ -990,8 +990,9 @@ class VectorExecutor : public KernelExecutorImpl { // (instead of using the preallocated ones) bool insertion_occured = AddBuffersToSet(out, &pre_buffers); if (insertion_occured) { - return Status::ExecutionError("Unauthorized memory allocations " - "in function kernel"); + return Status::ExecutionError( + "Unauthorized memory allocations " + "in function kernel"); } } #endif // NDEBUG diff --git a/cpp/src/arrow/compute/kernels/scalar_misbehave_test.cc b/cpp/src/arrow/compute/kernels/scalar_misbehave_test.cc index 81a056c3a06..982183a4369 100644 --- a/cpp/src/arrow/compute/kernels/scalar_misbehave_test.cc +++ b/cpp/src/arrow/compute/kernels/scalar_misbehave_test.cc @@ -25,43 +25,31 @@ namespace arrow { namespace compute { -namespace { - -template struct ScalarMisbehaveExec { - static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { - // allocate a buffer even though we've promised not to - ARROW_ASSIGN_OR_RAISE(auto buffer, ctx->Allocate(64)); - BufferVector buffers{nullptr, std::move(buffer)}; - auto array = std::make_shared(int64(), 8, std::move(buffers)); - *out = array; - // return Status::NotImplemented("This kernel only exists for testing purposes"); - // The function should return OK, otherwise the buffer check is not performed - return Status::OK(); - } + static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { + // allocate a buffer even though we've promised not to + ARROW_ASSIGN_OR_RAISE(auto buffer, ctx->Allocate(64)); + ARROW_ASSIGN_OR_RAISE(auto nullbitmap, ctx->AllocateBitmap(8)); + BufferVector buffers{nullbitmap, buffer}; + auto array = std::make_shared(int64(), 8, buffers); + *out = Datum(array); + return Status::OK(); + } }; -} // namespace internal void AddScalarMisbehaveKernels(const std::shared_ptr& scalar_function) { -// ScalarKernel kernel({InputType(Type::FIXED_SIZE_BINARY)}, -// OutputType(ValueDescr(fixed_size_binary(2)))), -// ScalarMisbehaveExec::Exec); -// kernel.null_handling = NullHandling::COMPUTED_PREALLOCATE; -// kernel.mem_allocation = MemAllocation::PREALLOCATE; //is the default -// kernel.can_write_into_slices = true; - DCHECK_OK(scalar_function->AddKernel({InputType(Type::FIXED_SIZE_BINARY)}, OutputType(ValueDescr(fixed_size_binary(2))), - ScalarMisbehaveExec::Exec)); + ScalarMisbehaveExec::Exec)); } const FunctionDoc misbehave_doc{ - "Test kernel that does nothing but allocate memory " - "while it shouldn't", - "This Kernel only exists for testing purposes.\n" - "It allocates memory while it promised not to \n" - "(because of MemAllocation::PREALLOCATE).", - {}}; + "Test kernel that does nothing but allocate memory " + "while it shouldn't", + "This Kernel only exists for testing purposes.\n" + "It allocates memory while it promised not to \n" + "(because of MemAllocation::PREALLOCATE).", + {}}; std::shared_ptr CreateScalarMisbehaveFunction() { auto func = std::make_shared("scalar_misbehave", Arity::Unary(), @@ -69,18 +57,18 @@ std::shared_ptr CreateScalarMisbehaveFunction() { AddScalarMisbehaveKernels(func); return func; } + TEST(Misbehave, MisbehavingScalarKernel) { ExecContext ctx; auto func = CreateScalarMisbehaveFunction(); Datum datum(ArrayFromJSON(fixed_size_binary(6), R"(["123456"])")); - const std::vector &args = {datum}; - const FunctionOptions *options = nullptr; + const std::vector& args = {datum}; + const FunctionOptions* options = nullptr; EXPECT_RAISES_WITH_MESSAGE_THAT(ExecutionError, - testing::HasSubstr( - "ExecutionError in Gandiva: " - "Unauthorized memory allocations " - "in function kernel"), - func->Execute(args, options, &ctx)); + testing::HasSubstr("ExecutionError in Gandiva: " + "Unauthorized memory allocations " + "in function kernel"), + func->Execute(args, options, &ctx)); } } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/kernels/vector_misbehave_test.cc b/cpp/src/arrow/compute/kernels/vector_misbehave_test.cc index d00062f3313..434db15d888 100644 --- a/cpp/src/arrow/compute/kernels/vector_misbehave_test.cc +++ b/cpp/src/arrow/compute/kernels/vector_misbehave_test.cc @@ -25,21 +25,18 @@ namespace arrow { namespace compute { -namespace { -template struct VectorMisbehaveExec { - static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { - // allocate new buffers even though we've promised not to - ARROW_ASSIGN_OR_RAISE(out->mutable_array()->buffers[0], ctx->AllocateBitmap(8)); - ARROW_ASSIGN_OR_RAISE(out->mutable_array()->buffers[1], ctx->Allocate(64)); - return Status::OK(); - } + static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { + // allocate new buffers even though we've promised not to + ARROW_ASSIGN_OR_RAISE(out->mutable_array()->buffers[0], ctx->AllocateBitmap(8)); + ARROW_ASSIGN_OR_RAISE(out->mutable_array()->buffers[1], ctx->Allocate(64)); + return Status::OK(); + } }; void AddVectorMisbehaveKernels(const std::shared_ptr& Vector_function) { - VectorKernel kernel({int32()}, int32(), - VectorMisbehaveExec::Exec); + VectorKernel kernel({int32()}, int32(), VectorMisbehaveExec::Exec); kernel.null_handling = NullHandling::COMPUTED_PREALLOCATE; kernel.mem_allocation = MemAllocation::PREALLOCATE; kernel.can_write_into_slices = true; @@ -48,15 +45,14 @@ void AddVectorMisbehaveKernels(const std::shared_ptr& Vector_fun DCHECK_OK(Vector_function->AddKernel(std::move(kernel))); } -} // namespace const FunctionDoc misbehave_doc{ - "Test kernel that does nothing but allocate memory " - "while it shouldn't", - "This Kernel only exists for testing purposes.\n" - "It allocates memory while it promised not to \n" - "(because of MemAllocation::PREALLOCATE).", - {}}; + "Test kernel that does nothing but allocate memory " + "while it shouldn't", + "This Kernel only exists for testing purposes.\n" + "It allocates memory while it promised not to \n" + "(because of MemAllocation::PREALLOCATE).", + {}}; std::shared_ptr CreateVectorMisbehaveFunction() { auto func = std::make_shared("vector_misbehave", Arity::Unary(), @@ -69,13 +65,12 @@ TEST(Misbehave, MisbehavingVectorKernel) { ExecContext ctx; auto func = CreateVectorMisbehaveFunction(); Datum datum(ChunkedArray(ArrayVector{}, int32())); - const std::vector &args = {datum}; - const FunctionOptions *options = nullptr; + const std::vector& args = {datum}; + const FunctionOptions* options = nullptr; EXPECT_RAISES_WITH_MESSAGE_THAT(ExecutionError, - testing::HasSubstr( - "ExecutionError in Gandiva: " - "Unauthorized memory allocations " - "in function kernel"), + testing::HasSubstr("ExecutionError in Gandiva: " + "Unauthorized memory allocations " + "in function kernel"), func->Execute(args, options, &ctx)); } } // namespace compute From 817893e5dab59b061e91e134d9111c9e463cddf9 Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Thu, 3 Feb 2022 11:32:14 +0100 Subject: [PATCH 08/13] Changed to Buffer* for performance --- cpp/src/arrow/compute/exec.cc | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/cpp/src/arrow/compute/exec.cc b/cpp/src/arrow/compute/exec.cc index 199f8e700e3..3055d0c6289 100644 --- a/cpp/src/arrow/compute/exec.cc +++ b/cpp/src/arrow/compute/exec.cc @@ -113,10 +113,10 @@ int64_t ExecBatch::TotalBufferSize() const { } bool AddBuffersToSet(const ArrayData& array_data, - std::unordered_set>* seen_buffers) { + std::unordered_set* seen_buffers) { bool insertion_occured = false; for (const auto& buffer : array_data.buffers) { - insertion_occured = (buffer && seen_buffers->insert(buffer).second); + insertion_occured = (buffer && seen_buffers->insert(buffer.get()).second); } for (const auto& child : array_data.child_data) { insertion_occured |= AddBuffersToSet(*child, seen_buffers); @@ -128,12 +128,12 @@ bool AddBuffersToSet(const ArrayData& array_data, } bool AddBuffersToSet(const Array& array, - std::unordered_set>* seen_buffers) { + std::unordered_set* seen_buffers) { return AddBuffersToSet(*array.data(), seen_buffers); } bool AddBuffersToSet(const ChunkedArray& chunked_array, - std::unordered_set>* seen_buffers) { + std::unordered_set* seen_buffers) { bool insertion_occured = false; for (const auto& chunk : chunked_array.chunks()) { insertion_occured |= AddBuffersToSet(*chunk, seen_buffers); @@ -142,7 +142,7 @@ bool AddBuffersToSet(const ChunkedArray& chunked_array, } bool AddBuffersToSet(const RecordBatch& record_batch, - std::unordered_set>* seen_buffers) { + std::unordered_set* seen_buffers) { bool insertion_occured = false; for (const auto& column : record_batch.columns()) { insertion_occured |= AddBuffersToSet(*column, seen_buffers); @@ -151,7 +151,7 @@ bool AddBuffersToSet(const RecordBatch& record_batch, } bool AddBuffersToSet(const Table& table, - std::unordered_set>* seen_buffers) { + std::unordered_set* seen_buffers) { bool insertion_occured = false; for (const auto& column : table.columns()) { insertion_occured |= AddBuffersToSet(*column, seen_buffers); @@ -162,7 +162,7 @@ bool AddBuffersToSet(const Table& table, // Add all Buffers to a given set, return true if anything was actually added. // If all the buffers in the datum were already in the set, this will return false. bool AddBuffersToSet(Datum datum, - std::unordered_set>* seen_buffers) { + std::unordered_set* seen_buffers) { switch (datum.kind()) { case Datum::ARRAY: return AddBuffersToSet(*util::get>(datum.value), @@ -774,7 +774,7 @@ class ScalarExecutor : public KernelExecutorImpl { #ifndef NDEBUG // To check whether the kernel allocated new Buffers, // insert all the preallocated ones into a set - std::unordered_set> pre_buffers; + std::unordered_set pre_buffers; if (preallocate_contiguous_) { AddBuffersToSet(out, &pre_buffers); } @@ -976,7 +976,7 @@ class VectorExecutor : public KernelExecutorImpl { #ifndef NDEBUG // To check whether the kernel allocated new Buffers, // insert all the preallocated ones into a set - std::unordered_set> pre_buffers; + std::unordered_set pre_buffers; if (kernel_->mem_allocation == MemAllocation::PREALLOCATE) { AddBuffersToSet(out, &pre_buffers); } From cf720a0f59dd286bd338692aa54c758cae380861 Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Thu, 3 Feb 2022 12:03:01 +0100 Subject: [PATCH 09/13] Fixed bug in AddBuffersToSet --- cpp/src/arrow/compute/exec.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/compute/exec.cc b/cpp/src/arrow/compute/exec.cc index 3055d0c6289..1657a83d2bc 100644 --- a/cpp/src/arrow/compute/exec.cc +++ b/cpp/src/arrow/compute/exec.cc @@ -116,7 +116,7 @@ bool AddBuffersToSet(const ArrayData& array_data, std::unordered_set* seen_buffers) { bool insertion_occured = false; for (const auto& buffer : array_data.buffers) { - insertion_occured = (buffer && seen_buffers->insert(buffer.get()).second); + insertion_occured |= (buffer && seen_buffers->insert(buffer.get()).second); } for (const auto& child : array_data.child_data) { insertion_occured |= AddBuffersToSet(*child, seen_buffers); From 58a964ca50aab844d1499aed6e7803601420b98e Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Thu, 3 Feb 2022 12:03:20 +0100 Subject: [PATCH 10/13] Changed Error type to "Invalid" --- cpp/src/arrow/compute/exec.cc | 4 ++-- cpp/src/arrow/compute/kernels/scalar_misbehave_test.cc | 4 ++-- cpp/src/arrow/compute/kernels/vector_misbehave_test.cc | 5 +++-- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/cpp/src/arrow/compute/exec.cc b/cpp/src/arrow/compute/exec.cc index 1657a83d2bc..5212d22a1c1 100644 --- a/cpp/src/arrow/compute/exec.cc +++ b/cpp/src/arrow/compute/exec.cc @@ -796,7 +796,7 @@ class ScalarExecutor : public KernelExecutorImpl { // (instead of using the preallocated ones) bool insertion_occured = AddBuffersToSet(out, &pre_buffers); if (insertion_occured) { - return Status::ExecutionError( + return Status::Invalid( "Unauthorized memory allocations " "in function kernel"); } @@ -990,7 +990,7 @@ class VectorExecutor : public KernelExecutorImpl { // (instead of using the preallocated ones) bool insertion_occured = AddBuffersToSet(out, &pre_buffers); if (insertion_occured) { - return Status::ExecutionError( + return Status::Invalid( "Unauthorized memory allocations " "in function kernel"); } diff --git a/cpp/src/arrow/compute/kernels/scalar_misbehave_test.cc b/cpp/src/arrow/compute/kernels/scalar_misbehave_test.cc index 982183a4369..ef5912d41c1 100644 --- a/cpp/src/arrow/compute/kernels/scalar_misbehave_test.cc +++ b/cpp/src/arrow/compute/kernels/scalar_misbehave_test.cc @@ -64,8 +64,8 @@ TEST(Misbehave, MisbehavingScalarKernel) { Datum datum(ArrayFromJSON(fixed_size_binary(6), R"(["123456"])")); const std::vector& args = {datum}; const FunctionOptions* options = nullptr; - EXPECT_RAISES_WITH_MESSAGE_THAT(ExecutionError, - testing::HasSubstr("ExecutionError in Gandiva: " + EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, + testing::HasSubstr("Invalid: " "Unauthorized memory allocations " "in function kernel"), func->Execute(args, options, &ctx)); diff --git a/cpp/src/arrow/compute/kernels/vector_misbehave_test.cc b/cpp/src/arrow/compute/kernels/vector_misbehave_test.cc index 434db15d888..6a1707fd655 100644 --- a/cpp/src/arrow/compute/kernels/vector_misbehave_test.cc +++ b/cpp/src/arrow/compute/kernels/vector_misbehave_test.cc @@ -31,6 +31,7 @@ struct VectorMisbehaveExec { // allocate new buffers even though we've promised not to ARROW_ASSIGN_OR_RAISE(out->mutable_array()->buffers[0], ctx->AllocateBitmap(8)); ARROW_ASSIGN_OR_RAISE(out->mutable_array()->buffers[1], ctx->Allocate(64)); + printf("boom\n"); return Status::OK(); } }; @@ -67,8 +68,8 @@ TEST(Misbehave, MisbehavingVectorKernel) { Datum datum(ChunkedArray(ArrayVector{}, int32())); const std::vector& args = {datum}; const FunctionOptions* options = nullptr; - EXPECT_RAISES_WITH_MESSAGE_THAT(ExecutionError, - testing::HasSubstr("ExecutionError in Gandiva: " + EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, + testing::HasSubstr("Invalid: " "Unauthorized memory allocations " "in function kernel"), func->Execute(args, options, &ctx)); From ca2e719a083c3d8d95706896c3a1e33408fc985c Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Thu, 3 Feb 2022 16:59:57 +0100 Subject: [PATCH 11/13] Separate checks for validity and other buffers being modified --- cpp/src/arrow/compute/exec.cc | 122 ++++++++++++++---- .../compute/kernels/scalar_misbehave_test.cc | 51 +++++--- .../compute/kernels/vector_misbehave_test.cc | 37 ++++-- 3 files changed, 157 insertions(+), 53 deletions(-) diff --git a/cpp/src/arrow/compute/exec.cc b/cpp/src/arrow/compute/exec.cc index 5212d22a1c1..4a0ab479639 100644 --- a/cpp/src/arrow/compute/exec.cc +++ b/cpp/src/arrow/compute/exec.cc @@ -22,7 +22,7 @@ #include #include #include -#include +#include #include #include @@ -112,11 +112,40 @@ int64_t ExecBatch::TotalBufferSize() const { return sum; } +struct BufferProperties { + uint64_t address; + int64_t capacity; + friend bool operator< (const BufferProperties &lhs, const BufferProperties &rhs) { + if (lhs.address == rhs.address) { + return (lhs.capacity > rhs.capacity); + } else { + return (lhs.address > rhs.address); + } + } +}; + +bool AddBuffersToSet(std::shared_ptr buffer, + std::set* seen_buffers) { + return (buffer && seen_buffers->insert( + BufferProperties{buffer->address(), buffer->capacity()}).second); +} + +bool AddBuffersToSet(std::vector> buffers, + std::set* seen_buffers) { + bool insertion_occured = false; + for (const auto& buffer : buffers) { + insertion_occured |= (buffer && seen_buffers->insert( + BufferProperties{buffer->address(), buffer->capacity()}).second); + } + return insertion_occured; +} + bool AddBuffersToSet(const ArrayData& array_data, - std::unordered_set* seen_buffers) { + std::set* seen_buffers) { bool insertion_occured = false; for (const auto& buffer : array_data.buffers) { - insertion_occured |= (buffer && seen_buffers->insert(buffer.get()).second); + insertion_occured |= (buffer && seen_buffers->insert( + BufferProperties{buffer->address(), buffer->capacity()}).second); } for (const auto& child : array_data.child_data) { insertion_occured |= AddBuffersToSet(*child, seen_buffers); @@ -128,12 +157,12 @@ bool AddBuffersToSet(const ArrayData& array_data, } bool AddBuffersToSet(const Array& array, - std::unordered_set* seen_buffers) { + std::set* seen_buffers) { return AddBuffersToSet(*array.data(), seen_buffers); } bool AddBuffersToSet(const ChunkedArray& chunked_array, - std::unordered_set* seen_buffers) { + std::set* seen_buffers) { bool insertion_occured = false; for (const auto& chunk : chunked_array.chunks()) { insertion_occured |= AddBuffersToSet(*chunk, seen_buffers); @@ -142,7 +171,7 @@ bool AddBuffersToSet(const ChunkedArray& chunked_array, } bool AddBuffersToSet(const RecordBatch& record_batch, - std::unordered_set* seen_buffers) { + std::set* seen_buffers) { bool insertion_occured = false; for (const auto& column : record_batch.columns()) { insertion_occured |= AddBuffersToSet(*column, seen_buffers); @@ -151,7 +180,7 @@ bool AddBuffersToSet(const RecordBatch& record_batch, } bool AddBuffersToSet(const Table& table, - std::unordered_set* seen_buffers) { + std::set* seen_buffers) { bool insertion_occured = false; for (const auto& column : table.columns()) { insertion_occured |= AddBuffersToSet(*column, seen_buffers); @@ -162,7 +191,7 @@ bool AddBuffersToSet(const Table& table, // Add all Buffers to a given set, return true if anything was actually added. // If all the buffers in the datum were already in the set, this will return false. bool AddBuffersToSet(Datum datum, - std::unordered_set* seen_buffers) { + std::set* seen_buffers) { switch (datum.kind()) { case Datum::ARRAY: return AddBuffersToSet(*util::get>(datum.value), @@ -772,11 +801,20 @@ class ScalarExecutor : public KernelExecutorImpl { } #ifndef NDEBUG + // To check whether the kernel allocated new Buffers, // insert all the preallocated ones into a set - std::unordered_set pre_buffers; - if (preallocate_contiguous_) { - AddBuffersToSet(out, &pre_buffers); + BufferProperties validity_buffer; + if (validity_preallocated_) { + validity_buffer = {out.array()->buffers[0]->address(), + out.array()->buffers[0]->capacity()}; + } + std::set pre_buffers; + for (size_t i = 0; i < data_preallocated_.size(); ++i) { + const auto &prealloc = data_preallocated_[i]; + if (prealloc.bit_width >= 0) { + AddBuffersToSet(out.array()->buffers[i + 1], &pre_buffers); + } } #endif // NDEBUG @@ -794,11 +832,23 @@ class ScalarExecutor : public KernelExecutorImpl { #ifndef NDEBUG // Check whether the kernel allocated new Buffers // (instead of using the preallocated ones) - bool insertion_occured = AddBuffersToSet(out, &pre_buffers); - if (insertion_occured) { - return Status::Invalid( - "Unauthorized memory allocations " - "in function kernel"); + if (validity_preallocated_) { + if (validity_buffer.address != out.array()->buffers[0]->address() || + validity_buffer.capacity != out.array()->buffers[0]->capacity()) { + return Status::Invalid( + "Pre-allocated validity buffer was modified " + "in function kernel"); + } + } + for (size_t i = 0; i < data_preallocated_.size(); ++i) { + const auto &prealloc = data_preallocated_[i]; + if (prealloc.bit_width >= 0) { + if (AddBuffersToSet(out.array()->buffers[i + 1], &pre_buffers)) { + return Status::Invalid( + "Unauthorized memory allocations " + "in function kernel"); + } + } } #endif // NDEBUG } else { @@ -976,23 +1026,43 @@ class VectorExecutor : public KernelExecutorImpl { #ifndef NDEBUG // To check whether the kernel allocated new Buffers, // insert all the preallocated ones into a set - std::unordered_set pre_buffers; - if (kernel_->mem_allocation == MemAllocation::PREALLOCATE) { - AddBuffersToSet(out, &pre_buffers); + // To check whether the kernel allocated new Buffers, + // insert all the preallocated ones into a set + BufferProperties validity_buffer; + if (validity_preallocated_) { + validity_buffer = {out.array()->buffers[0]->address(), + out.array()->buffers[0]->capacity()}; + } + std::set pre_buffers; + for (size_t i = 0; i < data_preallocated_.size(); ++i) { + const auto &prealloc = data_preallocated_[i]; + if (prealloc.bit_width >= 0) { + AddBuffersToSet(out.array()->buffers[i + 1], &pre_buffers); + } } #endif // NDEBUG RETURN_NOT_OK(kernel_->exec(kernel_ctx_, batch, &out)); #ifndef NDEBUG - if (kernel_->mem_allocation == MemAllocation::PREALLOCATE) { - // Check whether the kernel allocated new Buffers - // (instead of using the preallocated ones) - bool insertion_occured = AddBuffersToSet(out, &pre_buffers); - if (insertion_occured) { + // Check whether the kernel allocated new Buffers + // (instead of using the preallocated ones) + if (validity_preallocated_) { + if (validity_buffer.address != out.array()->buffers[0]->address() || + validity_buffer.capacity != out.array()->buffers[0]->capacity()) { return Status::Invalid( - "Unauthorized memory allocations " - "in function kernel"); + "Pre-allocated validity buffer was modified " + "in function kernel"); + } + } + for (size_t i = 0; i < data_preallocated_.size(); ++i) { + const auto &prealloc = data_preallocated_[i]; + if (prealloc.bit_width >= 0) { + if (AddBuffersToSet(out.array()->buffers[i + 1], &pre_buffers)) { + return Status::Invalid( + "Unauthorized memory allocations " + "in function kernel"); + } } } #endif // NDEBUG diff --git a/cpp/src/arrow/compute/kernels/scalar_misbehave_test.cc b/cpp/src/arrow/compute/kernels/scalar_misbehave_test.cc index ef5912d41c1..8ce28fd3a96 100644 --- a/cpp/src/arrow/compute/kernels/scalar_misbehave_test.cc +++ b/cpp/src/arrow/compute/kernels/scalar_misbehave_test.cc @@ -25,23 +25,22 @@ namespace arrow { namespace compute { -struct ScalarMisbehaveExec { +struct ScalarReAllocValidBufExec { static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { - // allocate a buffer even though we've promised not to - ARROW_ASSIGN_OR_RAISE(auto buffer, ctx->Allocate(64)); - ARROW_ASSIGN_OR_RAISE(auto nullbitmap, ctx->AllocateBitmap(8)); - BufferVector buffers{nullbitmap, buffer}; - auto array = std::make_shared(int64(), 8, buffers); - *out = Datum(array); + // allocate a validity buffer even though we've promised not to + ARROW_ASSIGN_OR_RAISE(out->mutable_array()->buffers[0], ctx->AllocateBitmap(8)); return Status::OK(); } }; -void AddScalarMisbehaveKernels(const std::shared_ptr& scalar_function) { - DCHECK_OK(scalar_function->AddKernel({InputType(Type::FIXED_SIZE_BINARY)}, - OutputType(ValueDescr(fixed_size_binary(2))), - ScalarMisbehaveExec::Exec)); -} +struct ScalarReAllocDataBufExec { + static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { + // allocate a validity buffer even though we've promised not to + ARROW_ASSIGN_OR_RAISE(out->mutable_array()->buffers[1], ctx->Allocate(64)); + return Status::OK(); + } +}; + const FunctionDoc misbehave_doc{ "Test kernel that does nothing but allocate memory " @@ -51,16 +50,31 @@ const FunctionDoc misbehave_doc{ "(because of MemAllocation::PREALLOCATE).", {}}; -std::shared_ptr CreateScalarMisbehaveFunction() { + +TEST(Misbehave, ReallocValidBufferScalarKernel) { + ExecContext ctx; auto func = std::make_shared("scalar_misbehave", Arity::Unary(), &misbehave_doc); - AddScalarMisbehaveKernels(func); - return func; + DCHECK_OK(func->AddKernel({InputType(Type::FIXED_SIZE_BINARY)}, + OutputType(ValueDescr(fixed_size_binary(2))), + ScalarReAllocValidBufExec::Exec)); + Datum datum(ArrayFromJSON(fixed_size_binary(6), R"(["123456"])")); + const std::vector& args = {datum}; + const FunctionOptions* options = nullptr; + EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, + testing::HasSubstr("Invalid: " + "Pre-allocated validity buffer was modified " + "in function kernel"), + func->Execute(args, options, &ctx)); } -TEST(Misbehave, MisbehavingScalarKernel) { +TEST(Misbehave, ReallocDataBufferScalarKernel) { ExecContext ctx; - auto func = CreateScalarMisbehaveFunction(); + auto func = std::make_shared("scalar_misbehave", Arity::Unary(), + &misbehave_doc); + DCHECK_OK(func->AddKernel({InputType(Type::FIXED_SIZE_BINARY)}, + OutputType(ValueDescr(fixed_size_binary(2))), + ScalarReAllocDataBufExec::Exec)); Datum datum(ArrayFromJSON(fixed_size_binary(6), R"(["123456"])")); const std::vector& args = {datum}; const FunctionOptions* options = nullptr; @@ -70,5 +84,8 @@ TEST(Misbehave, MisbehavingScalarKernel) { "in function kernel"), func->Execute(args, options, &ctx)); } + +//TODO: add tests for only pre-allocating validity bitmap, +//batched pre-allocation vs allocate_contiguous, } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/kernels/vector_misbehave_test.cc b/cpp/src/arrow/compute/kernels/vector_misbehave_test.cc index 6a1707fd655..a2b1f4be8c6 100644 --- a/cpp/src/arrow/compute/kernels/vector_misbehave_test.cc +++ b/cpp/src/arrow/compute/kernels/vector_misbehave_test.cc @@ -26,18 +26,25 @@ namespace arrow { namespace compute { -struct VectorMisbehaveExec { +struct VectorReAllocValidBufExec { static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { // allocate new buffers even though we've promised not to ARROW_ASSIGN_OR_RAISE(out->mutable_array()->buffers[0], ctx->AllocateBitmap(8)); - ARROW_ASSIGN_OR_RAISE(out->mutable_array()->buffers[1], ctx->Allocate(64)); - printf("boom\n"); return Status::OK(); } }; -void AddVectorMisbehaveKernels(const std::shared_ptr& Vector_function) { - VectorKernel kernel({int32()}, int32(), VectorMisbehaveExec::Exec); +struct VectorReAllocDataBufExec { + static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { + // allocate new buffers even though we've promised not to + ARROW_ASSIGN_OR_RAISE(out->mutable_array()->buffers[1], ctx->Allocate(64)); + return Status::OK(); + } +}; + +void AddVectorMisbehaveKernel(const std::shared_ptr& Vector_function, + Status (*kernel_exec)(KernelContext*, const ExecBatch&, Datum*)) { + VectorKernel kernel({int32()}, int32(),kernel_exec); kernel.null_handling = NullHandling::COMPUTED_PREALLOCATE; kernel.mem_allocation = MemAllocation::PREALLOCATE; kernel.can_write_into_slices = true; @@ -55,16 +62,26 @@ const FunctionDoc misbehave_doc{ "(because of MemAllocation::PREALLOCATE).", {}}; -std::shared_ptr CreateVectorMisbehaveFunction() { +TEST(Misbehave, ReallocValidBufferVectorKernel) { + ExecContext ctx; auto func = std::make_shared("vector_misbehave", Arity::Unary(), &misbehave_doc); - AddVectorMisbehaveKernels(func); - return func; + AddVectorMisbehaveKernel(func, VectorReAllocValidBufExec::Exec); + Datum datum(ChunkedArray(ArrayVector{}, int32())); + const std::vector& args = {datum}; + const FunctionOptions* options = nullptr; + EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, + testing::HasSubstr("Invalid: " + "Pre-allocated validity buffer was modified " + "in function kernel"), + func->Execute(args, options, &ctx)); } -TEST(Misbehave, MisbehavingVectorKernel) { +TEST(Misbehave, ReallocDataBufferVectorKernel) { ExecContext ctx; - auto func = CreateVectorMisbehaveFunction(); + auto func = std::make_shared("vector_misbehave", Arity::Unary(), + &misbehave_doc); + AddVectorMisbehaveKernel(func, VectorReAllocDataBufExec::Exec); Datum datum(ChunkedArray(ArrayVector{}, int32())); const std::vector& args = {datum}; const FunctionOptions* options = nullptr; From 64205600454a94968356ad1875354c286d7b78f9 Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Fri, 4 Feb 2022 14:24:18 +0100 Subject: [PATCH 12/13] Added check for changed Datum type --- cpp/src/arrow/compute/exec.cc | 30 ++++++++++++++++++++---------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/cpp/src/arrow/compute/exec.cc b/cpp/src/arrow/compute/exec.cc index 4a0ab479639..3a869130844 100644 --- a/cpp/src/arrow/compute/exec.cc +++ b/cpp/src/arrow/compute/exec.cc @@ -833,11 +833,13 @@ class ScalarExecutor : public KernelExecutorImpl { // Check whether the kernel allocated new Buffers // (instead of using the preallocated ones) if (validity_preallocated_) { - if (validity_buffer.address != out.array()->buffers[0]->address() || - validity_buffer.capacity != out.array()->buffers[0]->capacity()) { - return Status::Invalid( - "Pre-allocated validity buffer was modified " - "in function kernel"); + if (out.array()->buffers[0]) { // it is possible the validity buffer was deleted + if (validity_buffer.address != out.array()->buffers[0]->address() || + validity_buffer.capacity != out.array()->buffers[0]->capacity()) { + return Status::Invalid( + "Pre-allocated validity buffer was modified " + "in function kernel"); + } } } for (size_t i = 0; i < data_preallocated_.size(); ++i) { @@ -1028,6 +1030,7 @@ class VectorExecutor : public KernelExecutorImpl { // insert all the preallocated ones into a set // To check whether the kernel allocated new Buffers, // insert all the preallocated ones into a set + auto pre_kind = out.kind(); BufferProperties validity_buffer; if (validity_preallocated_) { validity_buffer = {out.array()->buffers[0]->address(), @@ -1048,16 +1051,23 @@ class VectorExecutor : public KernelExecutorImpl { // Check whether the kernel allocated new Buffers // (instead of using the preallocated ones) if (validity_preallocated_) { - if (validity_buffer.address != out.array()->buffers[0]->address() || - validity_buffer.capacity != out.array()->buffers[0]->capacity()) { - return Status::Invalid( - "Pre-allocated validity buffer was modified " - "in function kernel"); + if (out.is_array() && out.array()->buffers[0]) { // it is possible the validity buffer was deleted + if (validity_buffer.address != out.array()->buffers[0]->address() || + validity_buffer.capacity != out.array()->buffers[0]->capacity()) { + return Status::Invalid( + "Pre-allocated validity buffer was modified " + "in function kernel"); + } } } for (size_t i = 0; i < data_preallocated_.size(); ++i) { const auto &prealloc = data_preallocated_[i]; if (prealloc.bit_width >= 0) { + if (pre_kind != out.kind()) { + return Status::Invalid( + "Pre-allocated out Datum was changed into another type " + "in function kernel"); + } if (AddBuffersToSet(out.array()->buffers[i + 1], &pre_buffers)) { return Status::Invalid( "Unauthorized memory allocations " From c3901e19cfcd39b9994f1fe4aa67b9d6b60e76ba Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Fri, 4 Feb 2022 14:52:21 +0100 Subject: [PATCH 13/13] Code formatting --- cpp/src/arrow/compute/exec.cc | 81 ++++++++++--------- .../compute/kernels/scalar_misbehave_test.cc | 29 +++---- .../compute/kernels/vector_misbehave_test.cc | 26 +++--- 3 files changed, 69 insertions(+), 67 deletions(-) diff --git a/cpp/src/arrow/compute/exec.cc b/cpp/src/arrow/compute/exec.cc index 3a869130844..7d8aea7bfb2 100644 --- a/cpp/src/arrow/compute/exec.cc +++ b/cpp/src/arrow/compute/exec.cc @@ -21,8 +21,8 @@ #include #include #include -#include #include +#include #include #include @@ -113,29 +113,32 @@ int64_t ExecBatch::TotalBufferSize() const { } struct BufferProperties { - uint64_t address; - int64_t capacity; - friend bool operator< (const BufferProperties &lhs, const BufferProperties &rhs) { - if (lhs.address == rhs.address) { - return (lhs.capacity > rhs.capacity); - } else { - return (lhs.address > rhs.address); - } + uint64_t address; + int64_t capacity; + friend bool operator<(const BufferProperties& lhs, const BufferProperties& rhs) { + if (lhs.address == rhs.address) { + return (lhs.capacity > rhs.capacity); + } else { + return (lhs.address > rhs.address); } + } }; -bool AddBuffersToSet(std::shared_ptr buffer, +bool AddBuffersToSet(std::shared_ptr const& buffer, std::set* seen_buffers) { - return (buffer && seen_buffers->insert( - BufferProperties{buffer->address(), buffer->capacity()}).second); + return (buffer && + seen_buffers->insert(BufferProperties{buffer->address(), buffer->capacity()}) + .second); } -bool AddBuffersToSet(std::vector> buffers, +bool AddBuffersToSet(std::vector> const& buffers, std::set* seen_buffers) { bool insertion_occured = false; for (const auto& buffer : buffers) { - insertion_occured |= (buffer && seen_buffers->insert( - BufferProperties{buffer->address(), buffer->capacity()}).second); + insertion_occured |= + (buffer && + seen_buffers->insert(BufferProperties{buffer->address(), buffer->capacity()}) + .second); } return insertion_occured; } @@ -144,8 +147,10 @@ bool AddBuffersToSet(const ArrayData& array_data, std::set* seen_buffers) { bool insertion_occured = false; for (const auto& buffer : array_data.buffers) { - insertion_occured |= (buffer && seen_buffers->insert( - BufferProperties{buffer->address(), buffer->capacity()}).second); + insertion_occured |= + (buffer && + seen_buffers->insert(BufferProperties{buffer->address(), buffer->capacity()}) + .second); } for (const auto& child : array_data.child_data) { insertion_occured |= AddBuffersToSet(*child, seen_buffers); @@ -156,8 +161,7 @@ bool AddBuffersToSet(const ArrayData& array_data, return insertion_occured; } -bool AddBuffersToSet(const Array& array, - std::set* seen_buffers) { +bool AddBuffersToSet(const Array& array, std::set* seen_buffers) { return AddBuffersToSet(*array.data(), seen_buffers); } @@ -179,8 +183,7 @@ bool AddBuffersToSet(const RecordBatch& record_batch, return insertion_occured; } -bool AddBuffersToSet(const Table& table, - std::set* seen_buffers) { +bool AddBuffersToSet(const Table& table, std::set* seen_buffers) { bool insertion_occured = false; for (const auto& column : table.columns()) { insertion_occured |= AddBuffersToSet(*column, seen_buffers); @@ -190,8 +193,7 @@ bool AddBuffersToSet(const Table& table, // Add all Buffers to a given set, return true if anything was actually added. // If all the buffers in the datum were already in the set, this will return false. -bool AddBuffersToSet(Datum datum, - std::set* seen_buffers) { +bool AddBuffersToSet(Datum datum, std::set* seen_buffers) { switch (datum.kind()) { case Datum::ARRAY: return AddBuffersToSet(*util::get>(datum.value), @@ -811,7 +813,7 @@ class ScalarExecutor : public KernelExecutorImpl { } std::set pre_buffers; for (size_t i = 0; i < data_preallocated_.size(); ++i) { - const auto &prealloc = data_preallocated_[i]; + const auto& prealloc = data_preallocated_[i]; if (prealloc.bit_width >= 0) { AddBuffersToSet(out.array()->buffers[i + 1], &pre_buffers); } @@ -833,22 +835,22 @@ class ScalarExecutor : public KernelExecutorImpl { // Check whether the kernel allocated new Buffers // (instead of using the preallocated ones) if (validity_preallocated_) { - if (out.array()->buffers[0]) { // it is possible the validity buffer was deleted + if (out.array()->buffers[0]) { // it is possible the validity buffer was deleted if (validity_buffer.address != out.array()->buffers[0]->address() || validity_buffer.capacity != out.array()->buffers[0]->capacity()) { return Status::Invalid( - "Pre-allocated validity buffer was modified " - "in function kernel"); + "Pre-allocated validity buffer was modified " + "in function kernel"); } } } for (size_t i = 0; i < data_preallocated_.size(); ++i) { - const auto &prealloc = data_preallocated_[i]; + const auto& prealloc = data_preallocated_[i]; if (prealloc.bit_width >= 0) { if (AddBuffersToSet(out.array()->buffers[i + 1], &pre_buffers)) { return Status::Invalid( - "Unauthorized memory allocations " - "in function kernel"); + "Unauthorized memory allocations " + "in function kernel"); } } } @@ -1038,7 +1040,7 @@ class VectorExecutor : public KernelExecutorImpl { } std::set pre_buffers; for (size_t i = 0; i < data_preallocated_.size(); ++i) { - const auto &prealloc = data_preallocated_[i]; + const auto& prealloc = data_preallocated_[i]; if (prealloc.bit_width >= 0) { AddBuffersToSet(out.array()->buffers[i + 1], &pre_buffers); } @@ -1051,27 +1053,28 @@ class VectorExecutor : public KernelExecutorImpl { // Check whether the kernel allocated new Buffers // (instead of using the preallocated ones) if (validity_preallocated_) { - if (out.is_array() && out.array()->buffers[0]) { // it is possible the validity buffer was deleted + // it is possible the validity buffer was deleted + if (out.is_array() && out.array()->buffers[0]) { if (validity_buffer.address != out.array()->buffers[0]->address() || validity_buffer.capacity != out.array()->buffers[0]->capacity()) { return Status::Invalid( - "Pre-allocated validity buffer was modified " - "in function kernel"); + "Pre-allocated validity buffer was modified " + "in function kernel"); } } } for (size_t i = 0; i < data_preallocated_.size(); ++i) { - const auto &prealloc = data_preallocated_[i]; + const auto& prealloc = data_preallocated_[i]; if (prealloc.bit_width >= 0) { if (pre_kind != out.kind()) { return Status::Invalid( - "Pre-allocated out Datum was changed into another type " - "in function kernel"); + "Pre-allocated out Datum was changed into another type " + "in function kernel"); } if (AddBuffersToSet(out.array()->buffers[i + 1], &pre_buffers)) { return Status::Invalid( - "Unauthorized memory allocations " - "in function kernel"); + "Unauthorized memory allocations " + "in function kernel"); } } } diff --git a/cpp/src/arrow/compute/kernels/scalar_misbehave_test.cc b/cpp/src/arrow/compute/kernels/scalar_misbehave_test.cc index 8ce28fd3a96..8bb1c6b428c 100644 --- a/cpp/src/arrow/compute/kernels/scalar_misbehave_test.cc +++ b/cpp/src/arrow/compute/kernels/scalar_misbehave_test.cc @@ -34,14 +34,13 @@ struct ScalarReAllocValidBufExec { }; struct ScalarReAllocDataBufExec { - static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { - // allocate a validity buffer even though we've promised not to - ARROW_ASSIGN_OR_RAISE(out->mutable_array()->buffers[1], ctx->Allocate(64)); - return Status::OK(); - } + static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { + // allocate a validity buffer even though we've promised not to + ARROW_ASSIGN_OR_RAISE(out->mutable_array()->buffers[1], ctx->Allocate(64)); + return Status::OK(); + } }; - const FunctionDoc misbehave_doc{ "Test kernel that does nothing but allocate memory " "while it shouldn't", @@ -50,22 +49,22 @@ const FunctionDoc misbehave_doc{ "(because of MemAllocation::PREALLOCATE).", {}}; - TEST(Misbehave, ReallocValidBufferScalarKernel) { ExecContext ctx; auto func = std::make_shared("scalar_misbehave", Arity::Unary(), &misbehave_doc); DCHECK_OK(func->AddKernel({InputType(Type::FIXED_SIZE_BINARY)}, - OutputType(ValueDescr(fixed_size_binary(2))), - ScalarReAllocValidBufExec::Exec)); + OutputType(ValueDescr(fixed_size_binary(2))), + ScalarReAllocValidBufExec::Exec)); Datum datum(ArrayFromJSON(fixed_size_binary(6), R"(["123456"])")); const std::vector& args = {datum}; const FunctionOptions* options = nullptr; - EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, - testing::HasSubstr("Invalid: " - "Pre-allocated validity buffer was modified " - "in function kernel"), - func->Execute(args, options, &ctx)); + EXPECT_RAISES_WITH_MESSAGE_THAT( + Invalid, + testing::HasSubstr("Invalid: " + "Pre-allocated validity buffer was modified " + "in function kernel"), + func->Execute(args, options, &ctx)); } TEST(Misbehave, ReallocDataBufferScalarKernel) { @@ -85,7 +84,5 @@ TEST(Misbehave, ReallocDataBufferScalarKernel) { func->Execute(args, options, &ctx)); } -//TODO: add tests for only pre-allocating validity bitmap, -//batched pre-allocation vs allocate_contiguous, } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/kernels/vector_misbehave_test.cc b/cpp/src/arrow/compute/kernels/vector_misbehave_test.cc index a2b1f4be8c6..ff7839af41c 100644 --- a/cpp/src/arrow/compute/kernels/vector_misbehave_test.cc +++ b/cpp/src/arrow/compute/kernels/vector_misbehave_test.cc @@ -35,16 +35,17 @@ struct VectorReAllocValidBufExec { }; struct VectorReAllocDataBufExec { - static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { - // allocate new buffers even though we've promised not to - ARROW_ASSIGN_OR_RAISE(out->mutable_array()->buffers[1], ctx->Allocate(64)); - return Status::OK(); - } + static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { + // allocate new buffers even though we've promised not to + ARROW_ASSIGN_OR_RAISE(out->mutable_array()->buffers[1], ctx->Allocate(64)); + return Status::OK(); + } }; void AddVectorMisbehaveKernel(const std::shared_ptr& Vector_function, - Status (*kernel_exec)(KernelContext*, const ExecBatch&, Datum*)) { - VectorKernel kernel({int32()}, int32(),kernel_exec); + Status (*kernel_exec)(KernelContext*, const ExecBatch&, + Datum*)) { + VectorKernel kernel({int32()}, int32(), kernel_exec); kernel.null_handling = NullHandling::COMPUTED_PREALLOCATE; kernel.mem_allocation = MemAllocation::PREALLOCATE; kernel.can_write_into_slices = true; @@ -70,11 +71,12 @@ TEST(Misbehave, ReallocValidBufferVectorKernel) { Datum datum(ChunkedArray(ArrayVector{}, int32())); const std::vector& args = {datum}; const FunctionOptions* options = nullptr; - EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, - testing::HasSubstr("Invalid: " - "Pre-allocated validity buffer was modified " - "in function kernel"), - func->Execute(args, options, &ctx)); + EXPECT_RAISES_WITH_MESSAGE_THAT( + Invalid, + testing::HasSubstr("Invalid: " + "Pre-allocated validity buffer was modified " + "in function kernel"), + func->Execute(args, options, &ctx)); } TEST(Misbehave, ReallocDataBufferVectorKernel) {