diff --git a/cpp/src/arrow/compute/CMakeLists.txt b/cpp/src/arrow/compute/CMakeLists.txt index 8ee87047a3d..97fbd17f07d 100644 --- a/cpp/src/arrow/compute/CMakeLists.txt +++ b/cpp/src/arrow/compute/CMakeLists.txt @@ -64,6 +64,5 @@ add_arrow_compute_test(internals_test exec_test.cc kernel_test.cc registry_test.cc) -add_arrow_compute_test(exec_test) add_subdirectory(kernels) diff --git a/cpp/src/arrow/compute/api_aggregate.cc b/cpp/src/arrow/compute/api_aggregate.cc index 0a41e4c11f0..358a5ad0cee 100644 --- a/cpp/src/arrow/compute/api_aggregate.cc +++ b/cpp/src/arrow/compute/api_aggregate.cc @@ -26,19 +26,19 @@ namespace compute { // Scalar aggregates Result Count(const Datum& value, CountOptions options, ExecContext* ctx) { - return CallFunction(ctx, "count", {value}, &options); + return CallFunction("count", {value}, &options, ctx); } Result Mean(const Datum& value, ExecContext* ctx) { - return CallFunction(ctx, "mean", {value}); + return CallFunction("mean", {value}, ctx); } Result Sum(const Datum& value, ExecContext* ctx) { - return CallFunction(ctx, "sum", {value}); + return CallFunction("sum", {value}, ctx); } Result MinMax(const Datum& value, const MinMaxOptions& options, ExecContext* ctx) { - return CallFunction(ctx, "minmax", {value}, &options); + return CallFunction("minmax", {value}, &options, ctx); } } // namespace compute diff --git a/cpp/src/arrow/compute/api_scalar.cc b/cpp/src/arrow/compute/api_scalar.cc index 07064395b68..d042d082363 100644 --- a/cpp/src/arrow/compute/api_scalar.cc +++ b/cpp/src/arrow/compute/api_scalar.cc @@ -28,12 +28,12 @@ namespace compute { #define SCALAR_EAGER_UNARY(NAME, REGISTRY_NAME) \ Result NAME(const Datum& value, ExecContext* ctx) { \ - return CallFunction(ctx, REGISTRY_NAME, {value}); \ + return CallFunction(REGISTRY_NAME, {value}, ctx); \ } #define SCALAR_EAGER_BINARY(NAME, REGISTRY_NAME) \ Result NAME(const Datum& left, const Datum& right, ExecContext* ctx) { \ - return CallFunction(ctx, REGISTRY_NAME, {left, right}); \ + return CallFunction(REGISTRY_NAME, {left, right}, ctx); \ } // ---------------------------------------------------------------------- @@ -58,7 +58,7 @@ static Result ExecSetLookup(const std::string& func_name, const Datum& da return Status::Invalid(ss.str()); } SetLookupOptions options(value_set, !add_nulls_to_hash_table); - return CallFunction(ctx, func_name, {data}, &options); + return CallFunction(func_name, {data}, &options, ctx); } Result IsIn(const Datum& values, const Datum& value_set, ExecContext* ctx) { @@ -106,7 +106,7 @@ Result Compare(const Datum& left, const Datum& right, CompareOptions opti func_name = "less_equal"; break; } - return CallFunction(ctx, func_name, {left, right}, &options); + return CallFunction(func_name, {left, right}, &options, ctx); } } // namespace compute diff --git a/cpp/src/arrow/compute/api_vector.cc b/cpp/src/arrow/compute/api_vector.cc index 6b28b02fa21..f8c09f9c381 100644 --- a/cpp/src/arrow/compute/api_vector.cc +++ b/cpp/src/arrow/compute/api_vector.cc @@ -37,27 +37,27 @@ Result> NthToIndices(const Array& values, int64_t n, ExecContext* ctx) { PartitionOptions options(/*pivot=*/n); ARROW_ASSIGN_OR_RAISE( - Datum result, CallFunction(ctx, "partition_indices", {Datum(values)}, &options)); + Datum result, CallFunction("partition_indices", {Datum(values)}, &options, ctx)); return result.make_array(); } Result> SortToIndices(const Array& values, ExecContext* ctx) { - ARROW_ASSIGN_OR_RAISE(Datum result, CallFunction(ctx, "sort_indices", {Datum(values)})); + ARROW_ASSIGN_OR_RAISE(Datum result, CallFunction("sort_indices", {Datum(values)}, ctx)); return result.make_array(); } Result Take(const Datum& values, const Datum& indices, const TakeOptions& options, ExecContext* ctx) { - return CallFunction(ctx, "take", {values, indices}, &options); + return CallFunction("take", {values, indices}, &options, ctx); } Result> Unique(const Datum& value, ExecContext* ctx) { - ARROW_ASSIGN_OR_RAISE(Datum result, CallFunction(ctx, "unique", {value})); + ARROW_ASSIGN_OR_RAISE(Datum result, CallFunction("unique", {value}, ctx)); return result.make_array(); } Result DictionaryEncode(const Datum& value, ExecContext* ctx) { - return CallFunction(ctx, "dictionary_encode", {value}); + return CallFunction("dictionary_encode", {value}, ctx); } const char kValuesFieldName[] = "values"; @@ -66,7 +66,7 @@ const int32_t kValuesFieldIndex = 0; const int32_t kCountsFieldIndex = 1; Result> ValueCounts(const Datum& value, ExecContext* ctx) { - ARROW_ASSIGN_OR_RAISE(Datum result, CallFunction(ctx, "value_counts", {value})); + ARROW_ASSIGN_OR_RAISE(Datum result, CallFunction("value_counts", {value}, ctx)); return result.make_array(); } @@ -122,7 +122,7 @@ Result Filter(const Datum& values, const Datum& filter, FilterOptions opt FilterTable(*values.table(), filter, options, ctx)); return Datum(out_table); } else { - return CallFunction(ctx, "filter", {values, filter}, &options); + return CallFunction("filter", {values, filter}, &options, ctx); } } diff --git a/cpp/src/arrow/compute/exec.cc b/cpp/src/arrow/compute/exec.cc index 7c990ea6c9d..5ff8afe59bc 100644 --- a/cpp/src/arrow/compute/exec.cc +++ b/cpp/src/arrow/compute/exec.cc @@ -926,17 +926,21 @@ Result> SelectionVector::FromMask( return Status::NotImplemented("FromMask"); } -Result CallFunction(ExecContext* ctx, const std::string& func_name, - const std::vector& args, - const FunctionOptions* options) { +Result CallFunction(const std::string& func_name, const std::vector& args, + const FunctionOptions* options, ExecContext* ctx) { if (ctx == nullptr) { ExecContext default_ctx; - return CallFunction(&default_ctx, func_name, args, options); + return CallFunction(func_name, args, options, &default_ctx); } ARROW_ASSIGN_OR_RAISE(std::shared_ptr func, ctx->func_registry()->GetFunction(func_name)); return func->Execute(args, options, ctx); } +Result CallFunction(const std::string& func_name, const std::vector& args, + ExecContext* ctx) { + return CallFunction(func_name, args, /*options=*/nullptr, ctx); +} + } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/exec.h b/cpp/src/arrow/compute/exec.h index d6ba48db366..0e8a7c68b47 100644 --- a/cpp/src/arrow/compute/exec.h +++ b/cpp/src/arrow/compute/exec.h @@ -175,9 +175,13 @@ struct ExecBatch { /// argument checking, iteration of ChunkedArray inputs, and wrapping of /// outputs ARROW_EXPORT -Result CallFunction(ExecContext* ctx, const std::string& func_name, - const std::vector& args, - const FunctionOptions* options = NULLPTR); +Result CallFunction(const std::string& func_name, const std::vector& args, + const FunctionOptions* options, ExecContext* ctx = NULLPTR); + +/// \brief Variant of CallFunction for functions not requiring options +ARROW_EXPORT +Result CallFunction(const std::string& func_name, const std::vector& args, + ExecContext* ctx = NULLPTR); } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/exec_test.cc b/cpp/src/arrow/compute/exec_test.cc index 933c260344e..45c1fe6dbc1 100644 --- a/cpp/src/arrow/compute/exec_test.cc +++ b/cpp/src/arrow/compute/exec_test.cc @@ -94,13 +94,12 @@ void AssertValidityZeroExtraBits(const ArrayData& arr) { class TestComputeInternals : public ::testing::Test { public: void SetUp() { - registry_ = FunctionRegistry::Make(); rng_.reset(new random::RandomArrayGenerator(/*seed=*/0)); ResetContexts(); } void ResetContexts() { - exec_ctx_.reset(new ExecContext(default_memory_pool(), registry_.get())); + exec_ctx_.reset(new ExecContext(default_memory_pool())); ctx_.reset(new KernelContext(exec_ctx_.get())); } @@ -127,7 +126,6 @@ class TestComputeInternals : public ::testing::Test { protected: std::unique_ptr exec_ctx_; std::unique_ptr ctx_; - std::unique_ptr registry_; std::unique_ptr rng_; }; @@ -627,40 +625,49 @@ void ExecAddInt32(KernelContext* ctx, const ExecBatch& batch, Datum* out) { } class TestCallScalarFunction : public TestComputeInternals { - public: + protected: + static bool initialized_; + void SetUp() { TestComputeInternals::SetUp(); - AddCopyFunctions(); - AddNoPreallocateFunctions(); - AddStatefulFunction(); - AddScalarFunction(); + if (!initialized_) { + initialized_ = true; + AddCopyFunctions(); + AddNoPreallocateFunctions(); + AddStatefulFunction(); + AddScalarFunction(); + } } void AddCopyFunctions() { + auto registry = GetFunctionRegistry(); + // This function simply copies memory from the input argument into the // (preallocated) output - auto func = std::make_shared("copy", 1); + auto func = std::make_shared("test_copy", 1); // Add a few kernels. Our implementation only accepts arrays ASSERT_OK(func->AddKernel({InputType::Array(uint8())}, uint8(), ExecCopy)); ASSERT_OK(func->AddKernel({InputType::Array(int32())}, int32(), ExecCopy)); ASSERT_OK(func->AddKernel({InputType::Array(float64())}, float64(), ExecCopy)); - ASSERT_OK(registry_->AddFunction(func)); + ASSERT_OK(registry->AddFunction(func)); // A version which doesn't want the executor to call PropagateNulls - auto func2 = std::make_shared("copy_computed_bitmap", 1); + auto func2 = std::make_shared("test_copy_computed_bitmap", 1); ScalarKernel kernel({InputType::Array(uint8())}, uint8(), ExecComputedBitmap); kernel.null_handling = NullHandling::COMPUTED_PREALLOCATE; ASSERT_OK(func2->AddKernel(kernel)); - ASSERT_OK(registry_->AddFunction(func2)); + ASSERT_OK(registry->AddFunction(func2)); } void AddNoPreallocateFunctions() { + auto registry = GetFunctionRegistry(); + // A function that allocates its own output memory. We have cases for both // non-preallocated data and non-preallocated validity bitmap - auto f1 = std::make_shared("nopre_data", 1); - auto f2 = std::make_shared("nopre_validity_or_data", 1); + auto f1 = std::make_shared("test_nopre_data", 1); + auto f2 = std::make_shared("test_nopre_validity_or_data", 1); ScalarKernel kernel({InputType::Array(uint8())}, uint8(), ExecNoPreallocatedData); kernel.mem_allocation = MemAllocation::NO_PREALLOCATE; @@ -670,43 +677,49 @@ class TestCallScalarFunction : public TestComputeInternals { kernel.null_handling = NullHandling::COMPUTED_NO_PREALLOCATE; ASSERT_OK(f2->AddKernel(kernel)); - ASSERT_OK(registry_->AddFunction(f1)); - ASSERT_OK(registry_->AddFunction(f2)); + ASSERT_OK(registry->AddFunction(f1)); + ASSERT_OK(registry->AddFunction(f2)); } void AddStatefulFunction() { + auto registry = GetFunctionRegistry(); + // This function's behavior depends on a static parameter that is made // available to the kernel's execution function through its Options object - auto func = std::make_shared("stateful", 1); + auto func = std::make_shared("test_stateful", 1); ScalarKernel kernel({InputType::Array(int32())}, int32(), ExecStateful, InitStateful); ASSERT_OK(func->AddKernel(kernel)); - ASSERT_OK(registry_->AddFunction(func)); + ASSERT_OK(registry->AddFunction(func)); } void AddScalarFunction() { - auto func = std::make_shared("scalar_add_int32", 2); + auto registry = GetFunctionRegistry(); + + auto func = std::make_shared("test_scalar_add_int32", 2); ASSERT_OK(func->AddKernel({InputType::Scalar(int32()), InputType::Scalar(int32())}, int32(), ExecAddInt32)); - ASSERT_OK(registry_->AddFunction(func)); + ASSERT_OK(registry->AddFunction(func)); } }; +bool TestCallScalarFunction::initialized_ = false; + TEST_F(TestCallScalarFunction, ArgumentValidation) { // Copy accepts only a single array argument Datum d1(GetInt32Array(10)); // Too many args std::vector args = {d1, d1}; - ASSERT_RAISES(Invalid, CallFunction(exec_ctx_.get(), "copy", args)); + ASSERT_RAISES(Invalid, CallFunction("test_copy", args)); // Too few args = {}; - ASSERT_RAISES(Invalid, CallFunction(exec_ctx_.get(), "copy", args)); + ASSERT_RAISES(Invalid, CallFunction("test_copy", args)); // Cannot do scalar args = {Datum(std::make_shared(5))}; - ASSERT_RAISES(NotImplemented, CallFunction(exec_ctx_.get(), "copy", args)); + ASSERT_RAISES(NotImplemented, CallFunction("test_copy", args)); } TEST_F(TestCallScalarFunction, PreallocationCases) { @@ -720,7 +733,7 @@ TEST_F(TestCallScalarFunction, PreallocationCases) { // The default should be a single array output { std::vector args = {Datum(arr)}; - ASSERT_OK_AND_ASSIGN(Datum result, CallFunction(exec_ctx_.get(), func_name, args)); + ASSERT_OK_AND_ASSIGN(Datum result, CallFunction(func_name, args)); ASSERT_EQ(Datum::ARRAY, result.kind()); AssertArraysEqual(*arr, *result.make_array()); } @@ -730,7 +743,7 @@ TEST_F(TestCallScalarFunction, PreallocationCases) { { std::vector args = {Datum(arr)}; exec_ctx_->set_exec_chunksize(80); - ASSERT_OK_AND_ASSIGN(Datum result, CallFunction(exec_ctx_.get(), func_name, args)); + ASSERT_OK_AND_ASSIGN(Datum result, CallFunction(func_name, args, exec_ctx_.get())); AssertArraysEqual(*arr, *result.make_array()); } @@ -740,7 +753,7 @@ TEST_F(TestCallScalarFunction, PreallocationCases) { { std::vector args = {Datum(arr)}; exec_ctx_->set_exec_chunksize(111); - ASSERT_OK_AND_ASSIGN(Datum result, CallFunction(exec_ctx_.get(), func_name, args)); + ASSERT_OK_AND_ASSIGN(Datum result, CallFunction(func_name, args, exec_ctx_.get())); AssertArraysEqual(*arr, *result.make_array()); } @@ -749,7 +762,7 @@ TEST_F(TestCallScalarFunction, PreallocationCases) { auto carr = std::shared_ptr( new ChunkedArray({arr->Slice(0, 100), arr->Slice(100)})); std::vector args = {Datum(carr)}; - ASSERT_OK_AND_ASSIGN(Datum result, CallFunction(exec_ctx_.get(), func_name, args)); + ASSERT_OK_AND_ASSIGN(Datum result, CallFunction(func_name, args, exec_ctx_.get())); std::shared_ptr actual = result.chunked_array(); ASSERT_EQ(1, actual->num_chunks()); AssertChunkedEquivalent(*carr, *actual); @@ -760,7 +773,7 @@ TEST_F(TestCallScalarFunction, PreallocationCases) { std::vector args = {Datum(arr)}; exec_ctx_->set_preallocate_contiguous(false); exec_ctx_->set_exec_chunksize(400); - ASSERT_OK_AND_ASSIGN(Datum result, CallFunction(exec_ctx_.get(), func_name, args)); + ASSERT_OK_AND_ASSIGN(Datum result, CallFunction(func_name, args, exec_ctx_.get())); ASSERT_EQ(Datum::CHUNKED_ARRAY, result.kind()); const ChunkedArray& carr = *result.chunked_array(); ASSERT_EQ(3, carr.num_chunks()); @@ -770,8 +783,8 @@ TEST_F(TestCallScalarFunction, PreallocationCases) { } }; - CheckFunction("copy"); - CheckFunction("copy_computed_bitmap"); + CheckFunction("test_copy"); + CheckFunction("test_copy_computed_bitmap"); } TEST_F(TestCallScalarFunction, BasicNonStandardCases) { @@ -791,15 +804,14 @@ TEST_F(TestCallScalarFunction, BasicNonStandardCases) { // The default should be a single array output { - exec_ctx_->set_exec_chunksize(kDefaultMaxChunksize); - ASSERT_OK_AND_ASSIGN(Datum result, CallFunction(exec_ctx_.get(), func_name, args)); + ASSERT_OK_AND_ASSIGN(Datum result, CallFunction(func_name, args)); AssertArraysEqual(*arr, *result.make_array(), true); } // Split execution into 3 chunks { exec_ctx_->set_exec_chunksize(400); - ASSERT_OK_AND_ASSIGN(Datum result, CallFunction(exec_ctx_.get(), func_name, args)); + ASSERT_OK_AND_ASSIGN(Datum result, CallFunction(func_name, args, exec_ctx_.get())); ASSERT_EQ(Datum::CHUNKED_ARRAY, result.kind()); const ChunkedArray& carr = *result.chunked_array(); ASSERT_EQ(3, carr.num_chunks()); @@ -809,8 +821,8 @@ TEST_F(TestCallScalarFunction, BasicNonStandardCases) { } }; - CheckFunction("nopre_data"); - CheckFunction("nopre_validity_or_data"); + CheckFunction("test_nopre_data"); + CheckFunction("test_nopre_validity_or_data"); } TEST_F(TestCallScalarFunction, StatefulKernel) { @@ -820,16 +832,14 @@ TEST_F(TestCallScalarFunction, StatefulKernel) { ExampleOptions options(multiplier); std::vector args = {Datum(input)}; - ASSERT_OK_AND_ASSIGN(Datum result, - CallFunction(exec_ctx_.get(), "stateful", args, &options)); + ASSERT_OK_AND_ASSIGN(Datum result, CallFunction("test_stateful", args, &options)); AssertArraysEqual(*expected, *result.make_array()); } TEST_F(TestCallScalarFunction, ScalarFunction) { std::vector args = {Datum(std::make_shared(5)), Datum(std::make_shared(7))}; - ASSERT_OK_AND_ASSIGN(Datum result, - CallFunction(exec_ctx_.get(), "scalar_add_int32", args)); + ASSERT_OK_AND_ASSIGN(Datum result, CallFunction("test_scalar_add_int32", args)); ASSERT_EQ(Datum::SCALAR, result.kind()); auto expected = std::make_shared(12);