Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
202 changes: 202 additions & 0 deletions cpp/src/arrow/compute/exec.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <cstddef>
#include <cstdint>
#include <memory>
#include <set>
#include <sstream>
#include <utility>
#include <vector>
Expand All @@ -40,6 +41,7 @@
#include "arrow/record_batch.h"
#include "arrow/scalar.h"
#include "arrow/status.h"
#include "arrow/table.h"
#include "arrow/type.h"
#include "arrow/type_traits.h"
#include "arrow/util/bit_util.h"
Expand Down Expand Up @@ -110,6 +112,109 @@ int64_t ExecBatch::TotalBufferSize() const {
return sum;
}

struct BufferProperties {
uint64_t address;
int64_t capacity;
friend bool operator<(const BufferProperties& lhs, const BufferProperties& rhs) {
if (lhs.address == rhs.address) {
return (lhs.capacity > rhs.capacity);
} else {
return (lhs.address > rhs.address);
}
}
};

bool AddBuffersToSet(std::shared_ptr<Buffer> const& buffer,
std::set<BufferProperties>* seen_buffers) {
return (buffer &&
seen_buffers->insert(BufferProperties{buffer->address(), buffer->capacity()})
.second);
}

bool AddBuffersToSet(std::vector<std::shared_ptr<Buffer>> const& buffers,
std::set<BufferProperties>* seen_buffers) {
bool insertion_occured = false;
for (const auto& buffer : buffers) {
insertion_occured |=
(buffer &&
seen_buffers->insert(BufferProperties{buffer->address(), buffer->capacity()})
.second);
}
return insertion_occured;
}

bool AddBuffersToSet(const ArrayData& array_data,
std::set<BufferProperties>* seen_buffers) {
bool insertion_occured = false;
for (const auto& buffer : array_data.buffers) {
insertion_occured |=
(buffer &&
seen_buffers->insert(BufferProperties{buffer->address(), buffer->capacity()})
.second);
}
for (const auto& child : array_data.child_data) {
insertion_occured |= AddBuffersToSet(*child, seen_buffers);
}
if (array_data.dictionary) {
insertion_occured |= AddBuffersToSet(*array_data.dictionary, seen_buffers);
}
return insertion_occured;
}

bool AddBuffersToSet(const Array& array, std::set<BufferProperties>* seen_buffers) {
return AddBuffersToSet(*array.data(), seen_buffers);
}

bool AddBuffersToSet(const ChunkedArray& chunked_array,
std::set<BufferProperties>* seen_buffers) {
bool insertion_occured = false;
for (const auto& chunk : chunked_array.chunks()) {
insertion_occured |= AddBuffersToSet(*chunk, seen_buffers);
}
return insertion_occured;
}

bool AddBuffersToSet(const RecordBatch& record_batch,
std::set<BufferProperties>* seen_buffers) {
bool insertion_occured = false;
for (const auto& column : record_batch.columns()) {
insertion_occured |= AddBuffersToSet(*column, seen_buffers);
}
return insertion_occured;
}

bool AddBuffersToSet(const Table& table, std::set<BufferProperties>* seen_buffers) {
bool insertion_occured = false;
for (const auto& column : table.columns()) {
insertion_occured |= AddBuffersToSet(*column, seen_buffers);
}
return insertion_occured;
}

// Add all Buffers to a given set, return true if anything was actually added.
// If all the buffers in the datum were already in the set, this will return false.
bool AddBuffersToSet(Datum datum, std::set<BufferProperties>* seen_buffers) {
switch (datum.kind()) {
case Datum::ARRAY:
return AddBuffersToSet(*util::get<std::shared_ptr<ArrayData>>(datum.value),
seen_buffers);
case Datum::CHUNKED_ARRAY:
return AddBuffersToSet(*util::get<std::shared_ptr<ChunkedArray>>(datum.value),
seen_buffers);
case Datum::RECORD_BATCH:
return AddBuffersToSet(*util::get<std::shared_ptr<RecordBatch>>(datum.value),
seen_buffers);
case Datum::TABLE:
return AddBuffersToSet(*util::get<std::shared_ptr<Table>>(datum.value),
seen_buffers);
Copy link
Contributor

@edponce edponce Jan 6, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am curious if hashing all the values-buffers pointers would suffice instead of storing them in a set. We would still need to traverse nested data structures to capture all the child buffers. This could be solved using a Visitor.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hashing would be nicer, but it would also trigger the error if a kernel deletes a buffer it doesn't need anymore. And I don't think we care about that, just newly allocated ones. What do you think?

case Datum::SCALAR:
return false;
default:
DCHECK(false);
return false;
}
}

std::string ExecBatch::ToString() const {
std::stringstream ss;
PrintTo(*this, &ss);
Expand Down Expand Up @@ -697,7 +802,26 @@ class ScalarExecutor : public KernelExecutorImpl<ScalarKernel> {
}
}

#ifndef NDEBUG

// To check whether the kernel allocated new Buffers,
// insert all the preallocated ones into a set
BufferProperties validity_buffer;
if (validity_preallocated_) {
validity_buffer = {out.array()->buffers[0]->address(),
out.array()->buffers[0]->capacity()};
}
std::set<BufferProperties> pre_buffers;
for (size_t i = 0; i < data_preallocated_.size(); ++i) {
const auto& prealloc = data_preallocated_[i];
if (prealloc.bit_width >= 0) {
AddBuffersToSet(out.array()->buffers[i + 1], &pre_buffers);
}
}
#endif // NDEBUG

RETURN_NOT_OK(kernel_->exec(kernel_ctx_, batch, &out));

if (preallocate_contiguous_) {
// Some kernels may like to simply nullify the validity bitmap when
// they know the output will have 0 nulls. However, this is not compatible
Expand All @@ -706,6 +830,31 @@ class ScalarExecutor : public KernelExecutorImpl<ScalarKernel> {
DCHECK(out.array()->buffers[0])
<< "Null bitmap deleted by kernel but can_write_into_slices = true";
}

#ifndef NDEBUG
// Check whether the kernel allocated new Buffers
// (instead of using the preallocated ones)
if (validity_preallocated_) {
if (out.array()->buffers[0]) { // it is possible the validity buffer was deleted
if (validity_buffer.address != out.array()->buffers[0]->address() ||
validity_buffer.capacity != out.array()->buffers[0]->capacity()) {
return Status::Invalid(
"Pre-allocated validity buffer was modified "
"in function kernel");
}
}
}
for (size_t i = 0; i < data_preallocated_.size(); ++i) {
const auto& prealloc = data_preallocated_[i];
if (prealloc.bit_width >= 0) {
if (AddBuffersToSet(out.array()->buffers[i + 1], &pre_buffers)) {
return Status::Invalid(
"Unauthorized memory allocations "
"in function kernel");
}
}
}
#endif // NDEBUG
} else {
// If we are producing chunked output rather than one big array, then
// emit each chunk as soon as it's available
Expand Down Expand Up @@ -877,7 +1026,60 @@ class VectorExecutor : public KernelExecutorImpl<VectorKernel> {
output_descr_.shape == ValueDescr::ARRAY) {
RETURN_NOT_OK(PropagateNulls(kernel_ctx_, batch, out.mutable_array()));
}

#ifndef NDEBUG
// To check whether the kernel allocated new Buffers,
// insert all the preallocated ones into a set
// To check whether the kernel allocated new Buffers,
// insert all the preallocated ones into a set
auto pre_kind = out.kind();
BufferProperties validity_buffer;
if (validity_preallocated_) {
validity_buffer = {out.array()->buffers[0]->address(),
out.array()->buffers[0]->capacity()};
}
std::set<BufferProperties> pre_buffers;
for (size_t i = 0; i < data_preallocated_.size(); ++i) {
const auto& prealloc = data_preallocated_[i];
if (prealloc.bit_width >= 0) {
AddBuffersToSet(out.array()->buffers[i + 1], &pre_buffers);
}
}
#endif // NDEBUG

RETURN_NOT_OK(kernel_->exec(kernel_ctx_, batch, &out));

#ifndef NDEBUG
// Check whether the kernel allocated new Buffers
// (instead of using the preallocated ones)
if (validity_preallocated_) {
// it is possible the validity buffer was deleted
if (out.is_array() && out.array()->buffers[0]) {
if (validity_buffer.address != out.array()->buffers[0]->address() ||
validity_buffer.capacity != out.array()->buffers[0]->capacity()) {
return Status::Invalid(
"Pre-allocated validity buffer was modified "
"in function kernel");
}
}
}
for (size_t i = 0; i < data_preallocated_.size(); ++i) {
const auto& prealloc = data_preallocated_[i];
if (prealloc.bit_width >= 0) {
if (pre_kind != out.kind()) {
return Status::Invalid(
"Pre-allocated out Datum was changed into another type "
"in function kernel");
}
if (AddBuffersToSet(out.array()->buffers[i + 1], &pre_buffers)) {
return Status::Invalid(
"Unauthorized memory allocations "
"in function kernel");
}
}
}
#endif // NDEBUG

if (!kernel_->finalize) {
// If there is no result finalizer (e.g. for hash-based functions, we can
// emit the processed batch right away rather than waiting
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/arrow/compute/kernels/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ add_arrow_compute_test(scalar_test
scalar_cast_test.cc
scalar_compare_test.cc
scalar_if_else_test.cc
scalar_misbehave_test.cc
scalar_nested_test.cc
scalar_random_test.cc
scalar_set_lookup_test.cc
Expand All @@ -48,6 +49,7 @@ add_arrow_benchmark(scalar_string_benchmark PREFIX "arrow-compute")
add_arrow_compute_test(vector_test
SOURCES
vector_hash_test.cc
vector_misbehave_test.cc
vector_nested_test.cc
vector_replace_test.cc
vector_selection_test.cc
Expand Down
88 changes: 88 additions & 0 deletions cpp/src/arrow/compute/kernels/scalar_misbehave_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <gmock/gmock-matchers.h>
#include <gtest/gtest.h>
#include "arrow/array/concatenate.h"
#include "arrow/compute/api_scalar.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/util/logging.h"

namespace arrow {
namespace compute {

struct ScalarReAllocValidBufExec {
static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
// allocate a validity buffer even though we've promised not to
ARROW_ASSIGN_OR_RAISE(out->mutable_array()->buffers[0], ctx->AllocateBitmap(8));
return Status::OK();
}
};

struct ScalarReAllocDataBufExec {
static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
// allocate a validity buffer even though we've promised not to
ARROW_ASSIGN_OR_RAISE(out->mutable_array()->buffers[1], ctx->Allocate(64));
return Status::OK();
}
};

const FunctionDoc misbehave_doc{
"Test kernel that does nothing but allocate memory "
"while it shouldn't",
"This Kernel only exists for testing purposes.\n"
"It allocates memory while it promised not to \n"
"(because of MemAllocation::PREALLOCATE).",
{}};

TEST(Misbehave, ReallocValidBufferScalarKernel) {
ExecContext ctx;
auto func = std::make_shared<ScalarFunction>("scalar_misbehave", Arity::Unary(),
&misbehave_doc);
DCHECK_OK(func->AddKernel({InputType(Type::FIXED_SIZE_BINARY)},
OutputType(ValueDescr(fixed_size_binary(2))),
ScalarReAllocValidBufExec::Exec));
Datum datum(ArrayFromJSON(fixed_size_binary(6), R"(["123456"])"));
const std::vector<Datum>& args = {datum};
const FunctionOptions* options = nullptr;
EXPECT_RAISES_WITH_MESSAGE_THAT(
Invalid,
testing::HasSubstr("Invalid: "
"Pre-allocated validity buffer was modified "
"in function kernel"),
func->Execute(args, options, &ctx));
}

TEST(Misbehave, ReallocDataBufferScalarKernel) {
ExecContext ctx;
auto func = std::make_shared<ScalarFunction>("scalar_misbehave", Arity::Unary(),
&misbehave_doc);
DCHECK_OK(func->AddKernel({InputType(Type::FIXED_SIZE_BINARY)},
OutputType(ValueDescr(fixed_size_binary(2))),
ScalarReAllocDataBufExec::Exec));
Datum datum(ArrayFromJSON(fixed_size_binary(6), R"(["123456"])"));
const std::vector<Datum>& args = {datum};
const FunctionOptions* options = nullptr;
EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid,
testing::HasSubstr("Invalid: "
"Unauthorized memory allocations "
"in function kernel"),
func->Execute(args, options, &ctx));
}

} // namespace compute
} // namespace arrow
Loading