From 3855d4348e8617e52d6171b6381293986cb01c7c Mon Sep 17 00:00:00 2001 From: Yaron Gvili Date: Mon, 5 Sep 2022 06:44:46 -0400 Subject: [PATCH 01/19] ARROW-17613: [C++] Add function execution API for a preconfigured kernel --- cpp/src/arrow/compute/exec.cc | 26 ++++ cpp/src/arrow/compute/exec.h | 40 ++++++ cpp/src/arrow/compute/function.cc | 225 +++++++++++++++++++----------- cpp/src/arrow/compute/function.h | 17 +++ 4 files changed, 224 insertions(+), 84 deletions(-) diff --git a/cpp/src/arrow/compute/exec.cc b/cpp/src/arrow/compute/exec.cc index cf91bada6c6..e8def5d7315 100644 --- a/cpp/src/arrow/compute/exec.cc +++ b/cpp/src/arrow/compute/exec.cc @@ -1295,5 +1295,31 @@ Result CallFunction(const std::string& func_name, const ExecBatch& batch, return CallFunction(func_name, batch, /*options=*/nullptr, ctx); } +Result> GetFunctionExecutor( + const std::string& func_name, const std::vector& args, + const FunctionOptions* options, ExecContext* ctx) { + ARROW_ASSIGN_OR_RAISE(std::shared_ptr func, + ctx->func_registry()->GetFunction(func_name)); + return func->BestExecutor(args, options, ctx); +} + +Result> GetFunctionExecutor( + const std::string& func_name, const std::vector& args, ExecContext* ctx) { + return GetFunctionExecutor(func_name, args, /*options=*/nullptr, ctx); +} + +Result> GetFunctionExecutor( + const std::string& func_name, const ExecBatch& batch, const FunctionOptions* options, + ExecContext* ctx) { + ARROW_ASSIGN_OR_RAISE(std::shared_ptr func, + ctx->func_registry()->GetFunction(func_name)); + return func->BestExecutor(batch, options, ctx); +} + +Result> GetFunctionExecutor( + const std::string& func_name, const ExecBatch& batch, ExecContext* ctx) { + return GetFunctionExecutor(func_name, batch, /*options=*/nullptr, ctx); +} + } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/exec.h b/cpp/src/arrow/compute/exec.h index d03b073bb88..07d5bed9c41 100644 --- a/cpp/src/arrow/compute/exec.h +++ b/cpp/src/arrow/compute/exec.h @@ -47,6 +47,7 @@ class CpuInfo; namespace compute { +class FunctionExecutor; class FunctionOptions; class FunctionRegistry; @@ -442,5 +443,44 @@ Result CallFunction(const std::string& func_name, const ExecBatch& batch, /// @} +/// \defgroup compute-function-executor One-shot calls to obtain function executors +/// +/// @{ + +/// \brief One-shot executor provider for all types of functions. +/// +/// Does kernel dispatch and argument checking, while iteration of ChunkedArray inputs +/// and wrapping of outputs are deferred to the executor. +ARROW_EXPORT +Result> GetFunctionExecutor( + const std::string& func_name, const std::vector& args, + const FunctionOptions* options, ExecContext* ctx = NULLPTR); + +/// \brief Variant of GetFunctionExecutor which uses a function's default options. +/// +/// NB: Some functions require FunctionOptions be provided. +ARROW_EXPORT +Result> GetFunctionExecutor( + const std::string& func_name, const std::vector& args, + ExecContext* ctx = NULLPTR); + +/// \brief One-shot executor provider for all types of functions. +/// +/// Does kernel dispatch and argument checking, while iteration of ChunkedArray inputs +/// and wrapping of outputs are deferred to the executor. +ARROW_EXPORT +Result> GetFunctionExecutor( + const std::string& func_name, const ExecBatch& batch, const FunctionOptions* options, + ExecContext* ctx = NULLPTR); + +/// \brief Variant of GetFunctionExecutor which uses a function's default options. +/// +/// NB: Some functions require FunctionOptions be provided. +ARROW_EXPORT +Result> GetFunctionExecutor( + const std::string& func_name, const ExecBatch& batch, ExecContext* ctx = NULLPTR); + +/// @} + } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/function.cc b/cpp/src/arrow/compute/function.cc index 12d80a8c9ae..34c867e766a 100644 --- a/cpp/src/arrow/compute/function.cc +++ b/cpp/src/arrow/compute/function.cc @@ -97,6 +97,28 @@ Status Function::CheckArity(size_t num_args) const { return CheckArityImpl(*this, static_cast(num_args)); } +namespace { + +Status CheckAllArrayOrScalar(const std::vector& values) { + for (const auto& value : values) { + if (!value.is_value()) { + return Status::Invalid("Tried executing function with non-value type: ", + value.ToString()); + } + } + return Status::OK(); +} + +Status CheckOptions(const Function& function, const FunctionOptions* options) { + if (options == nullptr && function.doc().options_required) { + return Status::Invalid("Function '", function.name(), + "' cannot be called without options"); + } + return Status::OK(); +} + +} // namespace + namespace detail { Status NoMatchingKernel(const Function* func, const std::vector& types) { @@ -167,6 +189,93 @@ const Kernel* DispatchExactImpl(const Function* func, return nullptr; } +struct FunctionExecutorImpl : public FunctionExecutor { + FunctionExecutorImpl(std::vector inputs, const Kernel* kernel, + ExecContext* ctx, std::unique_ptr executor, + Function::Kind func_kind, const std::string& func_name) + : inputs(std::move(inputs)), + kernel(kernel), + kernel_ctx(ctx, kernel), + executor(std::move(executor)), + func_kind(func_kind), + func_name(func_name), + state(), + options(NULLPTR) {} + virtual ~FunctionExecutorImpl() {} + + Status Init(const FunctionOptions* options) { + if (kernel->init) { + ARROW_ASSIGN_OR_RAISE(state, kernel->init(&kernel_ctx, {kernel, inputs, options})); + kernel_ctx.SetState(state.get()); + } + + RETURN_NOT_OK(executor->Init(&kernel_ctx, {kernel, inputs, options})); + this->options = options; + return Status::OK(); + } + + Result Execute(const std::vector& args, int64_t passed_length) override { + util::tracing::Span span; + + START_COMPUTE_SPAN(span, func_name, + {{"function.name", func_name}, + {"function.options", options ? options->ToString() : ""}, + {"function.kind", func_kind}}); + + ExecContext* ctx = kernel_ctx.exec_context(); + // Cast arguments if necessary + std::vector args_with_cast; + for (size_t i = 0; i != args.size(); ++i) { + std::shared_ptr in_type = inputs[i].GetSharedPtr(); + auto arg = args[i]; + if (in_type != args[i].type()) { + ARROW_ASSIGN_OR_RAISE(arg, Cast(args[i], CastOptions::Safe(in_type), ctx)); + } + args_with_cast.push_back(arg); + } + + detail::DatumAccumulator listener; + + ExecBatch input(std::move(args_with_cast), /*length=*/0); + if (input.num_values() == 0) { + if (passed_length != -1) { + input.length = passed_length; + } + } else { + bool all_same_length = false; + int64_t inferred_length = detail::InferBatchLength(input.values, &all_same_length); + input.length = inferred_length; + if (func_kind == Function::SCALAR) { + if (passed_length != -1 && passed_length != inferred_length) { + return Status::Invalid( + "Passed batch length for execution did not match actual" + " length of values for scalar function execution"); + } + } else if (func_kind == Function::VECTOR) { + auto vkernel = static_cast(kernel); + if (!(all_same_length || !vkernel->can_execute_chunkwise)) { + return Status::Invalid("Vector kernel arguments must all be the same length"); + } + } + } + RETURN_NOT_OK(executor->Execute(input, &listener)); + const auto out = executor->WrapResults(input.values, listener.values()); +#ifndef NDEBUG + DCHECK_OK(executor->CheckResultType(out, func_name.c_str())); +#endif + return out; + } + + std::vector inputs; + const Kernel* kernel; + KernelContext kernel_ctx; + std::unique_ptr executor; + Function::Kind func_kind; + std::string func_name; + std::unique_ptr state; + const FunctionOptions* options; +}; + } // namespace detail Result Function::DispatchExact( @@ -187,114 +296,62 @@ Result Function::DispatchBest(std::vector* values) co return DispatchExact(*values); } -namespace { - -Status CheckAllArrayOrScalar(const std::vector& values) { - for (const auto& value : values) { - if (!value.is_value()) { - return Status::Invalid("Tried executing function with non-value type: ", - value.ToString()); - } - } - return Status::OK(); -} - -Status CheckOptions(const Function& function, const FunctionOptions* options) { - if (options == nullptr && function.doc().options_required) { - return Status::Invalid("Function '", function.name(), - "' cannot be called without options"); - } - return Status::OK(); -} - -Result ExecuteInternal(const Function& func, std::vector args, - int64_t passed_length, const FunctionOptions* options, - ExecContext* ctx) { - std::unique_ptr default_ctx; +Result> Function::BestExecutor( + const std::vector& args, const FunctionOptions* options, + ExecContext* ctx) const { if (options == nullptr) { - RETURN_NOT_OK(CheckOptions(func, options)); - options = func.default_options(); + RETURN_NOT_OK(CheckOptions(*this, options)); + options = default_options(); } if (ctx == nullptr) { - default_ctx.reset(new ExecContext()); - ctx = default_ctx.get(); + ExecContext default_ctx; + return BestExecutor(args, options, &default_ctx); } - util::tracing::Span span; - - START_COMPUTE_SPAN(span, func.name(), - {{"function.name", func.name()}, - {"function.options", options ? options->ToString() : ""}, - {"function.kind", func.kind()}}); - // type-check Datum arguments here. Really we'd like to avoid this as much as // possible RETURN_NOT_OK(CheckAllArrayOrScalar(args)); - std::vector in_types(args.size()); + std::vector inputs(args.size()); for (size_t i = 0; i != args.size(); ++i) { - in_types[i] = args[i].type().get(); + inputs[i] = TypeHolder(args[i].type()); } std::unique_ptr executor; - if (func.kind() == Function::SCALAR) { + if (kind() == Function::SCALAR) { executor = detail::KernelExecutor::MakeScalar(); - } else if (func.kind() == Function::VECTOR) { + } else if (kind() == Function::VECTOR) { executor = detail::KernelExecutor::MakeVector(); - } else if (func.kind() == Function::SCALAR_AGGREGATE) { + } else if (kind() == Function::SCALAR_AGGREGATE) { executor = detail::KernelExecutor::MakeScalarAggregate(); } else { return Status::NotImplemented("Direct execution of HASH_AGGREGATE functions"); } - ARROW_ASSIGN_OR_RAISE(const Kernel* kernel, func.DispatchBest(&in_types)); + ARROW_ASSIGN_OR_RAISE(const Kernel* kernel, DispatchBest(&inputs)); - // Cast arguments if necessary - for (size_t i = 0; i != args.size(); ++i) { - if (in_types[i] != args[i].type()) { - ARROW_ASSIGN_OR_RAISE(args[i], Cast(args[i], CastOptions::Safe(in_types[i]), ctx)); - } - } - - KernelContext kernel_ctx{ctx, kernel}; - - std::unique_ptr state; - if (kernel->init) { - ARROW_ASSIGN_OR_RAISE(state, kernel->init(&kernel_ctx, {kernel, in_types, options})); - kernel_ctx.SetState(state.get()); - } + auto func_exec = std::make_shared( + std::move(inputs), kernel, ctx, std::move(executor), kind(), name_); + RETURN_NOT_OK(func_exec->Init(options)); + return func_exec; +} - RETURN_NOT_OK(executor->Init(&kernel_ctx, {kernel, in_types, options})); +Result> Function::BestExecutor( + const ExecBatch& batch, const FunctionOptions* options, ExecContext* ctx) const { + return BestExecutor(batch.values, options, ctx); +} - detail::DatumAccumulator listener; +namespace { - ExecBatch input(std::move(args), /*length=*/0); - if (input.num_values() == 0) { - if (passed_length != -1) { - input.length = passed_length; - } - } else { - bool all_same_length = false; - int64_t inferred_length = detail::InferBatchLength(input.values, &all_same_length); - input.length = inferred_length; - if (func.kind() == Function::SCALAR) { - if (passed_length != -1 && passed_length != inferred_length) { - return Status::Invalid( - "Passed batch length for execution did not match actual" - " length of values for scalar function execution"); - } - } else if (func.kind() == Function::VECTOR) { - auto vkernel = static_cast(kernel); - if (!(all_same_length || !vkernel->can_execute_chunkwise)) { - return Status::Invalid("Vector kernel arguments must all be the same length"); - } - } +Result ExecuteInternal(const Function& func, std::vector args, + int64_t passed_length, const FunctionOptions* options, + ExecContext* ctx) { + if (ctx == nullptr) { + ExecContext default_ctx; + return ExecuteInternal(func, args, passed_length, options, &default_ctx); } - RETURN_NOT_OK(executor->Execute(input, &listener)); - const auto out = executor->WrapResults(input.values, listener.values()); -#ifndef NDEBUG - DCHECK_OK(executor->CheckResultType(out, func.name().c_str())); -#endif - return out; + + ARROW_ASSIGN_OR_RAISE(auto executor, func.BestExecutor(args, options, ctx)); + return executor->Execute(args, passed_length); } } // namespace diff --git a/cpp/src/arrow/compute/function.h b/cpp/src/arrow/compute/function.h index 7f2fba68caf..56dca2f0493 100644 --- a/cpp/src/arrow/compute/function.h +++ b/cpp/src/arrow/compute/function.h @@ -159,6 +159,14 @@ struct ARROW_EXPORT FunctionDoc { static const FunctionDoc& Empty(); }; +/// \brief An executor of a function with a preconfigured kernel +struct FunctionExecutor { + virtual ~FunctionExecutor() {} + /// \brief Execute the preconfigured kernel with arguments that must fit it + virtual Result Execute(const std::vector& args, + int64_t passed_length = -1) = 0; +}; + /// \brief Base class for compute functions. Function implementations contain a /// collection of "kernels" which are implementations of the function for /// specific argument types. Selecting a viable kernel for executing a function @@ -225,6 +233,15 @@ class ARROW_EXPORT Function { /// required by the kernel. virtual Result DispatchBest(std::vector* values) const; + /// \brief Get a function executor with a best-matching kernel + virtual Result> BestExecutor( + const std::vector& args, const FunctionOptions* options, + ExecContext* ctx) const; + + /// \brief Get a function executor with a best-matching kernel + virtual Result> BestExecutor( + const ExecBatch& batch, const FunctionOptions* options, ExecContext* ctx) const; + /// \brief Execute the function eagerly with the passed input arguments with /// kernel dispatch, batch iteration, and memory allocation details taken /// care of. From 7f6c1894685115a85569ceb97e98a6151f81c42e Mon Sep 17 00:00:00 2001 From: Yaron Gvili Date: Mon, 5 Sep 2022 07:15:33 -0400 Subject: [PATCH 02/19] Fix declarations --- cpp/src/arrow/compute/exec.h | 2 +- cpp/src/arrow/compute/function.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/compute/exec.h b/cpp/src/arrow/compute/exec.h index 07d5bed9c41..dfc061f73c5 100644 --- a/cpp/src/arrow/compute/exec.h +++ b/cpp/src/arrow/compute/exec.h @@ -47,7 +47,7 @@ class CpuInfo; namespace compute { -class FunctionExecutor; +struct FunctionExecutor; class FunctionOptions; class FunctionRegistry; diff --git a/cpp/src/arrow/compute/function.h b/cpp/src/arrow/compute/function.h index 56dca2f0493..c27545b4efe 100644 --- a/cpp/src/arrow/compute/function.h +++ b/cpp/src/arrow/compute/function.h @@ -160,7 +160,7 @@ struct ARROW_EXPORT FunctionDoc { }; /// \brief An executor of a function with a preconfigured kernel -struct FunctionExecutor { +struct ARROW_EXPORT FunctionExecutor { virtual ~FunctionExecutor() {} /// \brief Execute the preconfigured kernel with arguments that must fit it virtual Result Execute(const std::vector& args, From eb93a9ce8c7ad4e1fe02199551cc25ac71b387e8 Mon Sep 17 00:00:00 2001 From: Yaron Gvili Date: Mon, 5 Sep 2022 11:31:57 -0400 Subject: [PATCH 03/19] cleanup --- cpp/src/arrow/compute/function.cc | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/cpp/src/arrow/compute/function.cc b/cpp/src/arrow/compute/function.cc index 34c867e766a..a2dbd4507b9 100644 --- a/cpp/src/arrow/compute/function.cc +++ b/cpp/src/arrow/compute/function.cc @@ -190,10 +190,10 @@ const Kernel* DispatchExactImpl(const Function* func, } struct FunctionExecutorImpl : public FunctionExecutor { - FunctionExecutorImpl(std::vector inputs, const Kernel* kernel, + FunctionExecutorImpl(std::vector in_types, const Kernel* kernel, ExecContext* ctx, std::unique_ptr executor, Function::Kind func_kind, const std::string& func_name) - : inputs(std::move(inputs)), + : in_types(std::move(in_types)), kernel(kernel), kernel_ctx(ctx, kernel), executor(std::move(executor)), @@ -205,11 +205,12 @@ struct FunctionExecutorImpl : public FunctionExecutor { Status Init(const FunctionOptions* options) { if (kernel->init) { - ARROW_ASSIGN_OR_RAISE(state, kernel->init(&kernel_ctx, {kernel, inputs, options})); + ARROW_ASSIGN_OR_RAISE(state, + kernel->init(&kernel_ctx, {kernel, in_types, options})); kernel_ctx.SetState(state.get()); } - RETURN_NOT_OK(executor->Init(&kernel_ctx, {kernel, inputs, options})); + RETURN_NOT_OK(executor->Init(&kernel_ctx, {kernel, in_types, options})); this->options = options; return Status::OK(); } @@ -226,7 +227,7 @@ struct FunctionExecutorImpl : public FunctionExecutor { // Cast arguments if necessary std::vector args_with_cast; for (size_t i = 0; i != args.size(); ++i) { - std::shared_ptr in_type = inputs[i].GetSharedPtr(); + const auto& in_type = in_types[i]; auto arg = args[i]; if (in_type != args[i].type()) { ARROW_ASSIGN_OR_RAISE(arg, Cast(args[i], CastOptions::Safe(in_type), ctx)); @@ -266,7 +267,7 @@ struct FunctionExecutorImpl : public FunctionExecutor { return out; } - std::vector inputs; + std::vector in_types; const Kernel* kernel; KernelContext kernel_ctx; std::unique_ptr executor; From d47b901a1c633660c0a9453c8808a09afb43a530 Mon Sep 17 00:00:00 2001 From: Yaron Gvili Date: Fri, 9 Sep 2022 16:00:28 -0400 Subject: [PATCH 04/19] requested changes --- cpp/src/arrow/compute/exec.cc | 31 +++---- cpp/src/arrow/compute/exec.h | 27 +----- cpp/src/arrow/compute/function.cc | 95 +++++++++------------- cpp/src/arrow/compute/function.h | 17 ++-- cpp/src/arrow/compute/function_internal.cc | 21 +++++ cpp/src/arrow/compute/function_internal.h | 4 + 6 files changed, 83 insertions(+), 112 deletions(-) diff --git a/cpp/src/arrow/compute/exec.cc b/cpp/src/arrow/compute/exec.cc index e8def5d7315..22c5effa26c 100644 --- a/cpp/src/arrow/compute/exec.cc +++ b/cpp/src/arrow/compute/exec.cc @@ -33,6 +33,7 @@ #include "arrow/chunked_array.h" #include "arrow/compute/exec_internal.h" #include "arrow/compute/function.h" +#include "arrow/compute/function_internal.h" #include "arrow/compute/kernel.h" #include "arrow/compute/registry.h" #include "arrow/datum.h" @@ -1297,28 +1298,16 @@ Result CallFunction(const std::string& func_name, const ExecBatch& batch, Result> GetFunctionExecutor( const std::string& func_name, const std::vector& args, - const FunctionOptions* options, ExecContext* ctx) { - ARROW_ASSIGN_OR_RAISE(std::shared_ptr func, - ctx->func_registry()->GetFunction(func_name)); - return func->BestExecutor(args, options, ctx); -} - -Result> GetFunctionExecutor( - const std::string& func_name, const std::vector& args, ExecContext* ctx) { - return GetFunctionExecutor(func_name, args, /*options=*/nullptr, ctx); -} - -Result> GetFunctionExecutor( - const std::string& func_name, const ExecBatch& batch, const FunctionOptions* options, - ExecContext* ctx) { + const FunctionOptions* options, FunctionRegistry* func_registry) { + if (func_registry == NULLPTR) { + func_registry = GetFunctionRegistry(); + } + ARROW_ASSIGN_OR_RAISE(auto inputs, internal::GetFunctionArgumentTypes(args)); ARROW_ASSIGN_OR_RAISE(std::shared_ptr func, - ctx->func_registry()->GetFunction(func_name)); - return func->BestExecutor(batch, options, ctx); -} - -Result> GetFunctionExecutor( - const std::string& func_name, const ExecBatch& batch, ExecContext* ctx) { - return GetFunctionExecutor(func_name, batch, /*options=*/nullptr, ctx); + func_registry->GetFunction(func_name)); + ARROW_ASSIGN_OR_RAISE(auto func_exec, func->GetBestExecutor(inputs)); + ARROW_RETURN_NOT_OK(func_exec->Init(options)); + return func_exec; } } // namespace compute diff --git a/cpp/src/arrow/compute/exec.h b/cpp/src/arrow/compute/exec.h index dfc061f73c5..7db083fe9c9 100644 --- a/cpp/src/arrow/compute/exec.h +++ b/cpp/src/arrow/compute/exec.h @@ -30,6 +30,7 @@ #include "arrow/array/data.h" #include "arrow/compute/exec/expression.h" +#include "arrow/compute/registry.h" #include "arrow/datum.h" #include "arrow/memory_pool.h" #include "arrow/result.h" @@ -454,31 +455,7 @@ Result CallFunction(const std::string& func_name, const ExecBatch& batch, ARROW_EXPORT Result> GetFunctionExecutor( const std::string& func_name, const std::vector& args, - const FunctionOptions* options, ExecContext* ctx = NULLPTR); - -/// \brief Variant of GetFunctionExecutor which uses a function's default options. -/// -/// NB: Some functions require FunctionOptions be provided. -ARROW_EXPORT -Result> GetFunctionExecutor( - const std::string& func_name, const std::vector& args, - ExecContext* ctx = NULLPTR); - -/// \brief One-shot executor provider for all types of functions. -/// -/// Does kernel dispatch and argument checking, while iteration of ChunkedArray inputs -/// and wrapping of outputs are deferred to the executor. -ARROW_EXPORT -Result> GetFunctionExecutor( - const std::string& func_name, const ExecBatch& batch, const FunctionOptions* options, - ExecContext* ctx = NULLPTR); - -/// \brief Variant of GetFunctionExecutor which uses a function's default options. -/// -/// NB: Some functions require FunctionOptions be provided. -ARROW_EXPORT -Result> GetFunctionExecutor( - const std::string& func_name, const ExecBatch& batch, ExecContext* ctx = NULLPTR); + const FunctionOptions* options = NULLPTR, FunctionRegistry* func_registry = NULLPTR); /// @} diff --git a/cpp/src/arrow/compute/function.cc b/cpp/src/arrow/compute/function.cc index a2dbd4507b9..5df63052b3c 100644 --- a/cpp/src/arrow/compute/function.cc +++ b/cpp/src/arrow/compute/function.cc @@ -31,6 +31,7 @@ #include "arrow/datum.h" #include "arrow/util/cpu_info.h" #include "arrow/util/logging.h" +#include "arrow/util/make_unique.h" #include "arrow/util/tracing_internal.h" namespace arrow { @@ -99,16 +100,6 @@ Status Function::CheckArity(size_t num_args) const { namespace { -Status CheckAllArrayOrScalar(const std::vector& values) { - for (const auto& value : values) { - if (!value.is_value()) { - return Status::Invalid("Tried executing function with non-value type: ", - value.ToString()); - } - } - return Status::OK(); -} - Status CheckOptions(const Function& function, const FunctionOptions* options) { if (options == nullptr && function.doc().options_required) { return Status::Invalid("Function '", function.name(), @@ -191,39 +182,55 @@ const Kernel* DispatchExactImpl(const Function* func, struct FunctionExecutorImpl : public FunctionExecutor { FunctionExecutorImpl(std::vector in_types, const Kernel* kernel, - ExecContext* ctx, std::unique_ptr executor, - Function::Kind func_kind, const std::string& func_name) + std::unique_ptr executor, + const Function& func) : in_types(std::move(in_types)), kernel(kernel), - kernel_ctx(ctx, kernel), + kernel_ctx(), executor(std::move(executor)), - func_kind(func_kind), - func_name(func_name), + func(func), state(), options(NULLPTR) {} virtual ~FunctionExecutorImpl() {} - Status Init(const FunctionOptions* options) { + Status KernelInit(const FunctionOptions* options) { + if (!kernel_ctx) { + return Status::Invalid("function executor with null kernel context"); + } + if (options == nullptr) { + RETURN_NOT_OK(CheckOptions(func, options)); + options = func.default_options(); + } if (kernel->init) { ARROW_ASSIGN_OR_RAISE(state, - kernel->init(&kernel_ctx, {kernel, in_types, options})); - kernel_ctx.SetState(state.get()); + kernel->init(kernel_ctx.get(), {kernel, in_types, options})); + kernel_ctx->SetState(state.get()); } - RETURN_NOT_OK(executor->Init(&kernel_ctx, {kernel, in_types, options})); + RETURN_NOT_OK(executor->Init(kernel_ctx.get(), {kernel, in_types, options})); this->options = options; return Status::OK(); } + Status Init(const FunctionOptions* options, ExecContext* exec_ctx) override { + kernel_ctx.reset(new KernelContext{exec_ctx, kernel}); + return KernelInit(options); + } + Result Execute(const std::vector& args, int64_t passed_length) override { util::tracing::Span span; + auto func_kind = func.kind(); + auto func_name = func.name(); START_COMPUTE_SPAN(span, func_name, {{"function.name", func_name}, {"function.options", options ? options->ToString() : ""}, {"function.kind", func_kind}}); - ExecContext* ctx = kernel_ctx.exec_context(); + if (!kernel_ctx) { + ARROW_RETURN_NOT_OK(Init(NULLPTR, default_exec_context())); + } + ExecContext* ctx = kernel_ctx->exec_context(); // Cast arguments if necessary std::vector args_with_cast; for (size_t i = 0; i != args.size(); ++i) { @@ -269,10 +276,9 @@ struct FunctionExecutorImpl : public FunctionExecutor { std::vector in_types; const Kernel* kernel; - KernelContext kernel_ctx; + std::unique_ptr kernel_ctx; std::unique_ptr executor; - Function::Kind func_kind; - std::string func_name; + const Function& func; std::unique_ptr state; const FunctionOptions* options; }; @@ -297,25 +303,8 @@ Result Function::DispatchBest(std::vector* values) co return DispatchExact(*values); } -Result> Function::BestExecutor( - const std::vector& args, const FunctionOptions* options, - ExecContext* ctx) const { - if (options == nullptr) { - RETURN_NOT_OK(CheckOptions(*this, options)); - options = default_options(); - } - if (ctx == nullptr) { - ExecContext default_ctx; - return BestExecutor(args, options, &default_ctx); - } - - // type-check Datum arguments here. Really we'd like to avoid this as much as - // possible - RETURN_NOT_OK(CheckAllArrayOrScalar(args)); - std::vector inputs(args.size()); - for (size_t i = 0; i != args.size(); ++i) { - inputs[i] = TypeHolder(args[i].type()); - } +Result> Function::GetBestExecutor( + std::vector& inputs) const { std::unique_ptr executor; if (kind() == Function::SCALAR) { @@ -330,15 +319,8 @@ Result> Function::BestExecutor( ARROW_ASSIGN_OR_RAISE(const Kernel* kernel, DispatchBest(&inputs)); - auto func_exec = std::make_shared( - std::move(inputs), kernel, ctx, std::move(executor), kind(), name_); - RETURN_NOT_OK(func_exec->Init(options)); - return func_exec; -} - -Result> Function::BestExecutor( - const ExecBatch& batch, const FunctionOptions* options, ExecContext* ctx) const { - return BestExecutor(batch.values, options, ctx); + return std::make_shared( + std::move(inputs), kernel, std::move(executor), *this); } namespace { @@ -346,13 +328,10 @@ namespace { Result ExecuteInternal(const Function& func, std::vector args, int64_t passed_length, const FunctionOptions* options, ExecContext* ctx) { - if (ctx == nullptr) { - ExecContext default_ctx; - return ExecuteInternal(func, args, passed_length, options, &default_ctx); - } - - ARROW_ASSIGN_OR_RAISE(auto executor, func.BestExecutor(args, options, ctx)); - return executor->Execute(args, passed_length); + ARROW_ASSIGN_OR_RAISE(auto inputs, internal::GetFunctionArgumentTypes(args)); + ARROW_ASSIGN_OR_RAISE(auto func_exec, func.GetBestExecutor(inputs)); + ARROW_RETURN_NOT_OK(func_exec->Init(options, ctx)); + return func_exec->Execute(args, passed_length); } } // namespace diff --git a/cpp/src/arrow/compute/function.h b/cpp/src/arrow/compute/function.h index c27545b4efe..647f87030e9 100644 --- a/cpp/src/arrow/compute/function.h +++ b/cpp/src/arrow/compute/function.h @@ -161,7 +161,13 @@ struct ARROW_EXPORT FunctionDoc { /// \brief An executor of a function with a preconfigured kernel struct ARROW_EXPORT FunctionExecutor { - virtual ~FunctionExecutor() {} + virtual ~FunctionExecutor() = default; + /// \brief Initialize a preconfigured kernel + /// + /// This method may be called zero or more times. By default, the kernel is initialized + /// with default function options and exec context. + virtual Status Init(const FunctionOptions* options = NULLPTR, + ExecContext* exec_ctx = NULLPTR) = 0; /// \brief Execute the preconfigured kernel with arguments that must fit it virtual Result Execute(const std::vector& args, int64_t passed_length = -1) = 0; @@ -234,13 +240,8 @@ class ARROW_EXPORT Function { virtual Result DispatchBest(std::vector* values) const; /// \brief Get a function executor with a best-matching kernel - virtual Result> BestExecutor( - const std::vector& args, const FunctionOptions* options, - ExecContext* ctx) const; - - /// \brief Get a function executor with a best-matching kernel - virtual Result> BestExecutor( - const ExecBatch& batch, const FunctionOptions* options, ExecContext* ctx) const; + virtual Result> GetBestExecutor( + std::vector& inputs) const; /// \brief Execute the function eagerly with the passed input arguments with /// kernel dispatch, batch iteration, and memory allocation details taken diff --git a/cpp/src/arrow/compute/function_internal.cc b/cpp/src/arrow/compute/function_internal.cc index 0a926e0a39c..99daaeb2641 100644 --- a/cpp/src/arrow/compute/function_internal.cc +++ b/cpp/src/arrow/compute/function_internal.cc @@ -108,6 +108,27 @@ Result> DeserializeFunctionOptions( return FunctionOptionsFromStructScalar(scalar); } +Status CheckAllArrayOrScalar(const std::vector& values) { + for (const auto& value : values) { + if (!value.is_value()) { + return Status::Invalid("Tried executing function with non-value type: ", + value.ToString()); + } + } + return Status::OK(); +} + +Result> GetFunctionArgumentTypes(const std::vector& args) { + // type-check Datum arguments here. Really we'd like to avoid this as much as + // possible + RETURN_NOT_OK(CheckAllArrayOrScalar(args)); + std::vector inputs(args.size()); + for (size_t i = 0; i != args.size(); ++i) { + inputs[i] = TypeHolder(args[i].type()); + } + return inputs; +} + } // namespace internal } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/function_internal.h b/cpp/src/arrow/compute/function_internal.h index 17261332619..26f6a6c51bc 100644 --- a/cpp/src/arrow/compute/function_internal.h +++ b/cpp/src/arrow/compute/function_internal.h @@ -664,6 +664,10 @@ const FunctionOptionsType* GetFunctionOptionsType(const Properties&... propertie return &instance; } +Status CheckAllArrayOrScalar(const std::vector& values); + +Result> GetFunctionArgumentTypes(const std::vector& args); + } // namespace internal } // namespace compute } // namespace arrow From 8c3d07bc09402b6732c1082c83c1e7b6aaa239a0 Mon Sep 17 00:00:00 2001 From: Yaron Gvili Date: Fri, 9 Sep 2022 17:05:58 -0400 Subject: [PATCH 05/19] lint --- cpp/src/arrow/compute/function.cc | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/cpp/src/arrow/compute/function.cc b/cpp/src/arrow/compute/function.cc index 5df63052b3c..e88adb5edb7 100644 --- a/cpp/src/arrow/compute/function.cc +++ b/cpp/src/arrow/compute/function.cc @@ -305,7 +305,6 @@ Result Function::DispatchBest(std::vector* values) co Result> Function::GetBestExecutor( std::vector& inputs) const { - std::unique_ptr executor; if (kind() == Function::SCALAR) { executor = detail::KernelExecutor::MakeScalar(); @@ -319,8 +318,8 @@ Result> Function::GetBestExecutor( ARROW_ASSIGN_OR_RAISE(const Kernel* kernel, DispatchBest(&inputs)); - return std::make_shared( - std::move(inputs), kernel, std::move(executor), *this); + return std::make_shared(std::move(inputs), kernel, + std::move(executor), *this); } namespace { From 9366eb99e154a7c13a6cb1cbdbdef50f5b553893 Mon Sep 17 00:00:00 2001 From: Yaron Gvili Date: Sun, 25 Sep 2022 04:53:30 -0400 Subject: [PATCH 06/19] requested fixed, add test --- cpp/src/arrow/compute/exec.cc | 12 +++-- cpp/src/arrow/compute/exec.h | 11 +++- cpp/src/arrow/compute/function.cc | 30 +++++------ cpp/src/arrow/compute/function.h | 2 +- cpp/src/arrow/compute/function_test.cc | 71 +++++++++++++++++++++++++- 5 files changed, 105 insertions(+), 21 deletions(-) diff --git a/cpp/src/arrow/compute/exec.cc b/cpp/src/arrow/compute/exec.cc index 22c5effa26c..881e942a319 100644 --- a/cpp/src/arrow/compute/exec.cc +++ b/cpp/src/arrow/compute/exec.cc @@ -1297,18 +1297,24 @@ Result CallFunction(const std::string& func_name, const ExecBatch& batch, } Result> GetFunctionExecutor( - const std::string& func_name, const std::vector& args, + const std::string& func_name, std::vector in_types, const FunctionOptions* options, FunctionRegistry* func_registry) { if (func_registry == NULLPTR) { func_registry = GetFunctionRegistry(); } - ARROW_ASSIGN_OR_RAISE(auto inputs, internal::GetFunctionArgumentTypes(args)); ARROW_ASSIGN_OR_RAISE(std::shared_ptr func, func_registry->GetFunction(func_name)); - ARROW_ASSIGN_OR_RAISE(auto func_exec, func->GetBestExecutor(inputs)); + ARROW_ASSIGN_OR_RAISE(auto func_exec, func->GetBestExecutor(std::move(in_types))); ARROW_RETURN_NOT_OK(func_exec->Init(options)); return func_exec; } +Result> GetFunctionExecutor( + const std::string& func_name, const std::vector& args, + const FunctionOptions* options, FunctionRegistry* func_registry) { + ARROW_ASSIGN_OR_RAISE(auto in_types, internal::GetFunctionArgumentTypes(args)); + return GetFunctionExecutor(func_name, std::move(in_types), options, func_registry); +} + } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/exec.h b/cpp/src/arrow/compute/exec.h index 7db083fe9c9..894fd98ed8f 100644 --- a/cpp/src/arrow/compute/exec.h +++ b/cpp/src/arrow/compute/exec.h @@ -450,7 +450,16 @@ Result CallFunction(const std::string& func_name, const ExecBatch& batch, /// \brief One-shot executor provider for all types of functions. /// -/// Does kernel dispatch and argument checking, while iteration of ChunkedArray inputs +/// Does kernel dispatch and type checking while iteration of ChunkedArray inputs +/// and wrapping of outputs are deferred to the executor. +ARROW_EXPORT +Result> GetFunctionExecutor( + const std::string& func_name, std::vector in_types, + const FunctionOptions* options = NULLPTR, FunctionRegistry* func_registry = NULLPTR); + +/// \brief One-shot executor provider for all types of functions. +/// +/// Does kernel dispatch and argument type-checking while iteration of ChunkedArray inputs /// and wrapping of outputs are deferred to the executor. ARROW_EXPORT Result> GetFunctionExecutor( diff --git a/cpp/src/arrow/compute/function.cc b/cpp/src/arrow/compute/function.cc index e88adb5edb7..4a63a34a630 100644 --- a/cpp/src/arrow/compute/function.cc +++ b/cpp/src/arrow/compute/function.cc @@ -186,34 +186,33 @@ struct FunctionExecutorImpl : public FunctionExecutor { const Function& func) : in_types(std::move(in_types)), kernel(kernel), - kernel_ctx(), + kernel_ctx(default_exec_context(), kernel), executor(std::move(executor)), func(func), state(), - options(NULLPTR) {} + options(NULLPTR), + inited(false) {} virtual ~FunctionExecutorImpl() {} Status KernelInit(const FunctionOptions* options) { - if (!kernel_ctx) { - return Status::Invalid("function executor with null kernel context"); - } - if (options == nullptr) { - RETURN_NOT_OK(CheckOptions(func, options)); + RETURN_NOT_OK(CheckOptions(func, options)); + if (options == NULLPTR) { options = func.default_options(); } if (kernel->init) { ARROW_ASSIGN_OR_RAISE(state, - kernel->init(kernel_ctx.get(), {kernel, in_types, options})); - kernel_ctx->SetState(state.get()); + kernel->init(&kernel_ctx, {kernel, in_types, options})); + kernel_ctx.SetState(state.get()); } - RETURN_NOT_OK(executor->Init(kernel_ctx.get(), {kernel, in_types, options})); + RETURN_NOT_OK(executor->Init(&kernel_ctx, {kernel, in_types, options})); this->options = options; + inited = true; return Status::OK(); } Status Init(const FunctionOptions* options, ExecContext* exec_ctx) override { - kernel_ctx.reset(new KernelContext{exec_ctx, kernel}); + kernel_ctx = KernelContext{exec_ctx, kernel}; return KernelInit(options); } @@ -227,10 +226,10 @@ struct FunctionExecutorImpl : public FunctionExecutor { {"function.options", options ? options->ToString() : ""}, {"function.kind", func_kind}}); - if (!kernel_ctx) { + if (!inited) { ARROW_RETURN_NOT_OK(Init(NULLPTR, default_exec_context())); } - ExecContext* ctx = kernel_ctx->exec_context(); + ExecContext* ctx = kernel_ctx.exec_context(); // Cast arguments if necessary std::vector args_with_cast; for (size_t i = 0; i != args.size(); ++i) { @@ -276,11 +275,12 @@ struct FunctionExecutorImpl : public FunctionExecutor { std::vector in_types; const Kernel* kernel; - std::unique_ptr kernel_ctx; + KernelContext kernel_ctx; std::unique_ptr executor; const Function& func; std::unique_ptr state; const FunctionOptions* options; + bool inited; }; } // namespace detail @@ -304,7 +304,7 @@ Result Function::DispatchBest(std::vector* values) co } Result> Function::GetBestExecutor( - std::vector& inputs) const { + std::vector inputs) const { std::unique_ptr executor; if (kind() == Function::SCALAR) { executor = detail::KernelExecutor::MakeScalar(); diff --git a/cpp/src/arrow/compute/function.h b/cpp/src/arrow/compute/function.h index 647f87030e9..6e846b65519 100644 --- a/cpp/src/arrow/compute/function.h +++ b/cpp/src/arrow/compute/function.h @@ -241,7 +241,7 @@ class ARROW_EXPORT Function { /// \brief Get a function executor with a best-matching kernel virtual Result> GetBestExecutor( - std::vector& inputs) const; + std::vector inputs) const; /// \brief Execute the function eagerly with the passed input arguments with /// kernel dispatch, batch iteration, and memory allocation details taken diff --git a/cpp/src/arrow/compute/function_test.cc b/cpp/src/arrow/compute/function_test.cc index ea151e81f0b..eec3c22aab2 100644 --- a/cpp/src/arrow/compute/function_test.cc +++ b/cpp/src/arrow/compute/function_test.cc @@ -23,6 +23,7 @@ #include #include +#include "arrow/array/builder_primitive.h" #include "arrow/compute/api_aggregate.h" #include "arrow/compute/api_scalar.h" #include "arrow/compute/api_vector.h" @@ -33,6 +34,7 @@ #include "arrow/testing/gtest_util.h" #include "arrow/type.h" #include "arrow/util/key_value_metadata.h" +#include "arrow/util/logging.h" namespace arrow { namespace compute { @@ -307,7 +309,7 @@ TEST(ScalarAggregateFunction, Basics) { } Result> NoopInit(KernelContext*, const KernelInitArgs&) { - return nullptr; + return NULLPTR; } Status NoopConsume(KernelContext*, const ExecSpan&) { return Status::OK(); } @@ -351,5 +353,72 @@ TEST(ScalarAggregateFunction, DispatchExact) { ASSERT_TRUE(selected_kernel->signature->MatchesInputs(dispatch_args)); } +TEST(FunctionExecutor, Basics) { + VectorFunction func("vector_test", Arity::Binary(), /*doc=*/FunctionDoc::Empty()); + bool init_called = false; + ExecContext exec_ctx; + struct TestFunctionOptions : public FunctionOptions { + TestFunctionOptions() : FunctionOptions(NULLPTR) {} + } options; + auto init = + [&init_called, &exec_ctx, &options]( + KernelContext* kernel_ctx, + const KernelInitArgs& init_args) -> Result> { + init_called = true; + if (&exec_ctx != kernel_ctx->exec_context()) { + return Status::Invalid("expected exec context not found in kernel context"); + } + if (&options != init_args.options) { + return Status::Invalid("expected options not found in kernel init args"); + } + return NULLPTR; + }; + auto exec = [](KernelContext* ctx, const ExecSpan& args, ExecResult* out) { + DCHECK_EQ(2, args.values.size()); + const int32_t* vals[2]; + for (size_t i = 0; i < 2; i++) { + DCHECK(args.values[i].is_array()); + const ArraySpan& array = args.values[i].array; + DCHECK_EQ(*int32(), *array.type); + vals[i] = array.GetValues(1); + } + DCHECK(out->is_array_data()); + auto out_data = out->array_data(); + Int32Builder builder; + for (int64_t i = 0; i < args.length; i++) { + builder.Append(vals[0][i] + vals[1][i]); + } + ARROW_ASSIGN_OR_RAISE(auto array, builder.Finish()); + *out_data.get() = *array->data(); + return Status::OK(); + }; + std::vector in_types = {int32(), int32()}; + OutputType out_type = int32(); + ASSERT_OK(func.AddKernel(in_types, out_type, exec, init)); + ASSERT_OK_AND_ASSIGN(const Kernel* dispatched, func.DispatchExact({int32(), int32()})); + ASSERT_EQ(exec, static_cast(dispatched)->exec); + std::vector inputs = {int32(), int32()}; + ASSERT_OK_AND_ASSIGN(auto func_exec, func.GetBestExecutor(inputs)); + ASSERT_FALSE(init_called); + ASSERT_OK(func_exec->Init(&options, &exec_ctx)); + ASSERT_TRUE(init_called); + auto build_array = [](int32_t i) -> Result { + Int32Builder builder; + builder.Append(i); + ARROW_ASSIGN_OR_RAISE(auto array, builder.Finish()); + return Datum(array->data()); + }; + for (int32_t i = 1; i <= 3; i++) { + ASSERT_OK_AND_ASSIGN(auto value0, build_array(i)); + ASSERT_OK_AND_ASSIGN(auto value1, build_array(i + 1)); + std::vector values = {value0, value1}; + ASSERT_OK_AND_ASSIGN(auto result, func_exec->Execute(values, 1)); + ASSERT_TRUE(result.is_array()); + auto int32_data = dynamic_cast(result.array().get()); + ASSERT_NE(NULLPTR, int32_data); + EXPECT_EQ(2 * i + 1, int32_data->GetValues(1)[0]); + } +} + } // namespace compute } // namespace arrow From 181584460b5eafc58bac527e0799808d8537cbca Mon Sep 17 00:00:00 2001 From: Yaron Gvili Date: Sun, 25 Sep 2022 07:38:47 -0400 Subject: [PATCH 07/19] fix for C++17 --- cpp/src/arrow/compute/function.cc | 1 - 1 file changed, 1 deletion(-) diff --git a/cpp/src/arrow/compute/function.cc b/cpp/src/arrow/compute/function.cc index 4a63a34a630..17eff6c87e6 100644 --- a/cpp/src/arrow/compute/function.cc +++ b/cpp/src/arrow/compute/function.cc @@ -31,7 +31,6 @@ #include "arrow/datum.h" #include "arrow/util/cpu_info.h" #include "arrow/util/logging.h" -#include "arrow/util/make_unique.h" #include "arrow/util/tracing_internal.h" namespace arrow { From a025492a9ec851900e477c22810589488b999fa3 Mon Sep 17 00:00:00 2001 From: Yaron Gvili Date: Sun, 25 Sep 2022 08:41:38 -0400 Subject: [PATCH 08/19] fix status checking --- cpp/src/arrow/compute/function_test.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/compute/function_test.cc b/cpp/src/arrow/compute/function_test.cc index eec3c22aab2..0e5ebf93028 100644 --- a/cpp/src/arrow/compute/function_test.cc +++ b/cpp/src/arrow/compute/function_test.cc @@ -386,7 +386,7 @@ TEST(FunctionExecutor, Basics) { auto out_data = out->array_data(); Int32Builder builder; for (int64_t i = 0; i < args.length; i++) { - builder.Append(vals[0][i] + vals[1][i]); + ARROW_RETURN_NOT_OK(builder.Append(vals[0][i] + vals[1][i])); } ARROW_ASSIGN_OR_RAISE(auto array, builder.Finish()); *out_data.get() = *array->data(); @@ -404,7 +404,7 @@ TEST(FunctionExecutor, Basics) { ASSERT_TRUE(init_called); auto build_array = [](int32_t i) -> Result { Int32Builder builder; - builder.Append(i); + ARROW_RETURN_NOT_OK(builder.Append(i)); ARROW_ASSIGN_OR_RAISE(auto array, builder.Finish()); return Datum(array->data()); }; From 65ef921fcd5784f1ae57c1e2403b800eb4a169af Mon Sep 17 00:00:00 2001 From: Yaron Gvili Date: Mon, 26 Sep 2022 08:12:24 -0400 Subject: [PATCH 09/19] fix test function options --- cpp/src/arrow/compute/function_test.cc | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/cpp/src/arrow/compute/function_test.cc b/cpp/src/arrow/compute/function_test.cc index 0e5ebf93028..621594bffe6 100644 --- a/cpp/src/arrow/compute/function_test.cc +++ b/cpp/src/arrow/compute/function_test.cc @@ -28,6 +28,7 @@ #include "arrow/compute/api_scalar.h" #include "arrow/compute/api_vector.h" #include "arrow/compute/cast.h" +#include "arrow/compute/function_internal.h" #include "arrow/compute/kernel.h" #include "arrow/datum.h" #include "arrow/status.h" @@ -353,13 +354,28 @@ TEST(ScalarAggregateFunction, DispatchExact) { ASSERT_TRUE(selected_kernel->signature->MatchesInputs(dispatch_args)); } +namespace { + +struct TestFunctionOptions : public FunctionOptions { + TestFunctionOptions(); + + static const char* kTypeName; +}; + +static auto kTestFunctionOptionsType = + internal::GetFunctionOptionsType(); + +TestFunctionOptions::TestFunctionOptions() : FunctionOptions(kTestFunctionOptionsType) {} + +const char* TestFunctionOptions::kTypeName = "test_options"; + +} // namespace + TEST(FunctionExecutor, Basics) { VectorFunction func("vector_test", Arity::Binary(), /*doc=*/FunctionDoc::Empty()); bool init_called = false; ExecContext exec_ctx; - struct TestFunctionOptions : public FunctionOptions { - TestFunctionOptions() : FunctionOptions(NULLPTR) {} - } options; + TestFunctionOptions options; auto init = [&init_called, &exec_ctx, &options]( KernelContext* kernel_ctx, From 516452eec5fbfd078ab8dd5db13c75a1cd750d47 Mon Sep 17 00:00:00 2001 From: Yaron Gvili Date: Tue, 27 Sep 2022 02:43:05 -0400 Subject: [PATCH 10/19] fix symbol exporting --- cpp/src/arrow/compute/function_internal.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/compute/function_internal.h b/cpp/src/arrow/compute/function_internal.h index 445d0d44675..e88175f4b48 100644 --- a/cpp/src/arrow/compute/function_internal.h +++ b/cpp/src/arrow/compute/function_internal.h @@ -74,7 +74,7 @@ Result ValidateEnumValue(CType raw) { return Status::Invalid("Invalid value for ", EnumTraits::name(), ": ", raw); } -class GenericOptionsType : public FunctionOptionsType { +class ARROW_EXPORT GenericOptionsType : public FunctionOptionsType { public: Result> Serialize(const FunctionOptions&) const override; Result> Deserialize( From 62f8884bedcff98753de946900ad8b9b5acffc4b Mon Sep 17 00:00:00 2001 From: Yaron Gvili Date: Thu, 13 Oct 2022 15:50:52 -0400 Subject: [PATCH 11/19] suggested changes --- cpp/src/arrow/compute/exec.h | 5 +++-- cpp/src/arrow/compute/function.h | 6 +++--- cpp/src/arrow/compute/function_internal.cc | 4 ++-- cpp/src/arrow/compute/function_internal.h | 1 + 4 files changed, 9 insertions(+), 7 deletions(-) diff --git a/cpp/src/arrow/compute/exec.h b/cpp/src/arrow/compute/exec.h index 37e00b43fa3..4aac5863923 100644 --- a/cpp/src/arrow/compute/exec.h +++ b/cpp/src/arrow/compute/exec.h @@ -456,8 +456,9 @@ Result> GetFunctionExecutor( /// \brief One-shot executor provider for all types of functions. /// -/// Does kernel dispatch and argument type-checking while iteration of ChunkedArray inputs -/// and wrapping of outputs are deferred to the executor. +/// This function creates and initializes a `FunctionExecutor` appropriate +/// for the given function name, input types (taken from the Datum arguments) +/// and function options. ARROW_EXPORT Result> GetFunctionExecutor( const std::string& func_name, const std::vector& args, diff --git a/cpp/src/arrow/compute/function.h b/cpp/src/arrow/compute/function.h index 6e846b65519..cab7ec3280c 100644 --- a/cpp/src/arrow/compute/function.h +++ b/cpp/src/arrow/compute/function.h @@ -162,10 +162,10 @@ struct ARROW_EXPORT FunctionDoc { /// \brief An executor of a function with a preconfigured kernel struct ARROW_EXPORT FunctionExecutor { virtual ~FunctionExecutor() = default; - /// \brief Initialize a preconfigured kernel + /// \brief Initialize or re-initialize the preconfigured kernel /// - /// This method may be called zero or more times. By default, the kernel is initialized - /// with default function options and exec context. + /// This method may be called zero or more times. Depending on how + /// the FunctionExecutor was obtained, it may already have been initialized. virtual Status Init(const FunctionOptions* options = NULLPTR, ExecContext* exec_ctx = NULLPTR) = 0; /// \brief Execute the preconfigured kernel with arguments that must fit it diff --git a/cpp/src/arrow/compute/function_internal.cc b/cpp/src/arrow/compute/function_internal.cc index 99daaeb2641..7ec2952f442 100644 --- a/cpp/src/arrow/compute/function_internal.cc +++ b/cpp/src/arrow/compute/function_internal.cc @@ -111,8 +111,8 @@ Result> DeserializeFunctionOptions( Status CheckAllArrayOrScalar(const std::vector& values) { for (const auto& value : values) { if (!value.is_value()) { - return Status::Invalid("Tried executing function with non-value type: ", - value.ToString()); + return Status::TypeError("Tried executing function with non-value type: ", + value.ToString()); } } return Status::OK(); diff --git a/cpp/src/arrow/compute/function_internal.h b/cpp/src/arrow/compute/function_internal.h index e88175f4b48..8ed4493e742 100644 --- a/cpp/src/arrow/compute/function_internal.h +++ b/cpp/src/arrow/compute/function_internal.h @@ -666,6 +666,7 @@ const FunctionOptionsType* GetFunctionOptionsType(const Properties&... propertie Status CheckAllArrayOrScalar(const std::vector& values); +ARROW_EXPORT Result> GetFunctionArgumentTypes(const std::vector& args); } // namespace internal From 115a59cec50e362f6b0c1099e4c62ca2856e8a7d Mon Sep 17 00:00:00 2001 From: rtpsw Date: Sun, 6 Nov 2022 16:47:42 +0200 Subject: [PATCH 12/19] Update cpp/src/arrow/compute/exec.h Fix doc Co-authored-by: Antoine Pitrou --- cpp/src/arrow/compute/exec.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/compute/exec.h b/cpp/src/arrow/compute/exec.h index 4aac5863923..7c00b1d72cb 100644 --- a/cpp/src/arrow/compute/exec.h +++ b/cpp/src/arrow/compute/exec.h @@ -447,8 +447,8 @@ Result CallFunction(const std::string& func_name, const ExecBatch& batch, /// \brief One-shot executor provider for all types of functions. /// -/// Does kernel dispatch and type checking while iteration of ChunkedArray inputs -/// and wrapping of outputs are deferred to the executor. +/// This function creates and initializes a `FunctionExecutor` appropriate +/// for the given function name, input types and function options. ARROW_EXPORT Result> GetFunctionExecutor( const std::string& func_name, std::vector in_types, From 88f26507339545326c7643e1fcc9ec0757dd4c61 Mon Sep 17 00:00:00 2001 From: rtpsw Date: Sun, 6 Nov 2022 16:48:00 +0200 Subject: [PATCH 13/19] Update cpp/src/arrow/compute/function.h Fix doc Co-authored-by: Antoine Pitrou --- cpp/src/arrow/compute/function.h | 3 +++ 1 file changed, 3 insertions(+) diff --git a/cpp/src/arrow/compute/function.h b/cpp/src/arrow/compute/function.h index cab7ec3280c..b094a0e5c8f 100644 --- a/cpp/src/arrow/compute/function.h +++ b/cpp/src/arrow/compute/function.h @@ -240,6 +240,9 @@ class ARROW_EXPORT Function { virtual Result DispatchBest(std::vector* values) const; /// \brief Get a function executor with a best-matching kernel + /// + /// The returned executor will by default work with the default FunctionOptions + /// and KernelContext. If you want to change that, call `FunctionExecutor::Init`. virtual Result> GetBestExecutor( std::vector inputs) const; From cdb7ded82322a6c9a5f5061b28c8e7c0917423f0 Mon Sep 17 00:00:00 2001 From: Yaron Gvili Date: Tue, 8 Nov 2022 03:22:21 -0500 Subject: [PATCH 14/19] requested fixes --- cpp/src/arrow/compute/exec.h | 6 +-- cpp/src/arrow/compute/function.cc | 16 +++--- cpp/src/arrow/compute/function.h | 15 ++++-- cpp/src/arrow/compute/function_internal.cc | 4 +- cpp/src/arrow/compute/function_test.cc | 61 +++++++++++----------- cpp/src/arrow/compute/type_fwd.h | 2 + docs/source/cpp/api/compute.rst | 7 +++ 7 files changed, 63 insertions(+), 48 deletions(-) diff --git a/cpp/src/arrow/compute/exec.h b/cpp/src/arrow/compute/exec.h index 7c00b1d72cb..a1f72b3e501 100644 --- a/cpp/src/arrow/compute/exec.h +++ b/cpp/src/arrow/compute/exec.h @@ -30,7 +30,7 @@ #include "arrow/array/data.h" #include "arrow/compute/exec/expression.h" -#include "arrow/compute/registry.h" +#include "arrow/compute/type_fwd.h" #include "arrow/datum.h" #include "arrow/result.h" #include "arrow/type_fwd.h" @@ -47,10 +47,6 @@ class CpuInfo; namespace compute { -struct FunctionExecutor; -class FunctionOptions; -class FunctionRegistry; - // It seems like 64K might be a good default chunksize to use for execution // based on the experience of other query processing systems. The current // default is not to chunk contiguous arrays, though, but this may change in diff --git a/cpp/src/arrow/compute/function.cc b/cpp/src/arrow/compute/function.cc index 17eff6c87e6..732c92ccc45 100644 --- a/cpp/src/arrow/compute/function.cc +++ b/cpp/src/arrow/compute/function.cc @@ -219,7 +219,7 @@ struct FunctionExecutorImpl : public FunctionExecutor { util::tracing::Span span; auto func_kind = func.kind(); - auto func_name = func.name(); + const auto& func_name = func.name(); START_COMPUTE_SPAN(span, func_name, {{"function.name", func_name}, {"function.options", options ? options->ToString() : ""}, @@ -230,14 +230,14 @@ struct FunctionExecutorImpl : public FunctionExecutor { } ExecContext* ctx = kernel_ctx.exec_context(); // Cast arguments if necessary - std::vector args_with_cast; + std::vector args_with_cast(args.size()); for (size_t i = 0; i != args.size(); ++i) { const auto& in_type = in_types[i]; auto arg = args[i]; if (in_type != args[i].type()) { ARROW_ASSIGN_OR_RAISE(arg, Cast(args[i], CastOptions::Safe(in_type), ctx)); } - args_with_cast.push_back(arg); + args_with_cast[i] = std::move(arg); } detail::DatumAccumulator listener; @@ -255,12 +255,14 @@ struct FunctionExecutorImpl : public FunctionExecutor { if (passed_length != -1 && passed_length != inferred_length) { return Status::Invalid( "Passed batch length for execution did not match actual" - " length of values for scalar function execution"); + " length of values for execution of scalar function '", + func_name, "'"); } } else if (func_kind == Function::VECTOR) { - auto vkernel = static_cast(kernel); - if (!(all_same_length || !vkernel->can_execute_chunkwise)) { - return Status::Invalid("Vector kernel arguments must all be the same length"); + auto vkernel = checked_cast(kernel); + if (!all_same_length && vkernel->can_execute_chunkwise) { + return Status::Invalid("Arguments for execution of vector kernel function '", + func_name, "' must all be the same length"); } } } diff --git a/cpp/src/arrow/compute/function.h b/cpp/src/arrow/compute/function.h index b094a0e5c8f..8a1b0da424a 100644 --- a/cpp/src/arrow/compute/function.h +++ b/cpp/src/arrow/compute/function.h @@ -160,7 +160,8 @@ struct ARROW_EXPORT FunctionDoc { }; /// \brief An executor of a function with a preconfigured kernel -struct ARROW_EXPORT FunctionExecutor { +class ARROW_EXPORT FunctionExecutor { + public: virtual ~FunctionExecutor() = default; /// \brief Initialize or re-initialize the preconfigured kernel /// @@ -169,8 +170,16 @@ struct ARROW_EXPORT FunctionExecutor { virtual Status Init(const FunctionOptions* options = NULLPTR, ExecContext* exec_ctx = NULLPTR) = 0; /// \brief Execute the preconfigured kernel with arguments that must fit it - virtual Result Execute(const std::vector& args, - int64_t passed_length = -1) = 0; + /// + /// The method requires the arguments be castable to the preconfigured types. + /// + /// \param[in] args Arguments to execute the function on + /// \param[in] length Length of arguments batch or -1 to default it. If the + /// function has no parameters, this determines the batch length, defaulting + /// to 0. Otherwise, if the function is scalar, this must equal the argument + /// batch's inferred length or be -1 to default to it. This is ignored for + /// vector functions. + virtual Result Execute(const std::vector& args, int64_t length = -1) = 0; }; /// \brief Base class for compute functions. Function implementations contain a diff --git a/cpp/src/arrow/compute/function_internal.cc b/cpp/src/arrow/compute/function_internal.cc index 7ec2952f442..cd73462e953 100644 --- a/cpp/src/arrow/compute/function_internal.cc +++ b/cpp/src/arrow/compute/function_internal.cc @@ -111,8 +111,8 @@ Result> DeserializeFunctionOptions( Status CheckAllArrayOrScalar(const std::vector& values) { for (const auto& value : values) { if (!value.is_value()) { - return Status::TypeError("Tried executing function with non-value type: ", - value.ToString()); + return Status::TypeError( + "Tried executing function with non-array, non-scalar type: ", value.ToString()); } } return Status::OK(); diff --git a/cpp/src/arrow/compute/function_test.cc b/cpp/src/arrow/compute/function_test.cc index 621594bffe6..4e56c4fcb1b 100644 --- a/cpp/src/arrow/compute/function_test.cc +++ b/cpp/src/arrow/compute/function_test.cc @@ -310,7 +310,7 @@ TEST(ScalarAggregateFunction, Basics) { } Result> NoopInit(KernelContext*, const KernelInitArgs&) { - return NULLPTR; + return nullptr; } Status NoopConsume(KernelContext*, const ExecSpan&) { return Status::OK(); } @@ -387,25 +387,27 @@ TEST(FunctionExecutor, Basics) { if (&options != init_args.options) { return Status::Invalid("expected options not found in kernel init args"); } - return NULLPTR; + return nullptr; }; - auto exec = [](KernelContext* ctx, const ExecSpan& args, ExecResult* out) { - DCHECK_EQ(2, args.values.size()); - const int32_t* vals[2]; - for (size_t i = 0; i < 2; i++) { - DCHECK(args.values[i].is_array()); - const ArraySpan& array = args.values[i].array; - DCHECK_EQ(*int32(), *array.type); - vals[i] = array.GetValues(1); - } - DCHECK(out->is_array_data()); - auto out_data = out->array_data(); - Int32Builder builder; - for (int64_t i = 0; i < args.length; i++) { - ARROW_RETURN_NOT_OK(builder.Append(vals[0][i] + vals[1][i])); - } - ARROW_ASSIGN_OR_RAISE(auto array, builder.Finish()); - *out_data.get() = *array->data(); + auto exec = [](KernelContext* ctx, const ExecSpan& args, ExecResult* out) -> Status { + [&]() { // gtest ASSERT macros require a void function + ASSERT_EQ(2, args.values.size()); + const int32_t* vals[2]; + for (size_t i = 0; i < 2; i++) { + ASSERT_TRUE(args.values[i].is_array()); + const ArraySpan& array = args.values[i].array; + ASSERT_EQ(*int32(), *array.type); + vals[i] = array.GetValues(1); + } + ASSERT_TRUE(out->is_array_data()); + auto out_data = out->array_data(); + Int32Builder builder; + for (int64_t i = 0; i < args.length; i++) { + ASSERT_OK(builder.Append(vals[0][i] + vals[1][i])); + } + ASSERT_OK_AND_ASSIGN(auto array, builder.Finish()); + *out_data.get() = *array->data(); + }(); return Status::OK(); }; std::vector in_types = {int32(), int32()}; @@ -418,21 +420,18 @@ TEST(FunctionExecutor, Basics) { ASSERT_FALSE(init_called); ASSERT_OK(func_exec->Init(&options, &exec_ctx)); ASSERT_TRUE(init_called); - auto build_array = [](int32_t i) -> Result { - Int32Builder builder; - ARROW_RETURN_NOT_OK(builder.Append(i)); - ARROW_ASSIGN_OR_RAISE(auto array, builder.Finish()); - return Datum(array->data()); - }; + std::vector> arrays = { + ArrayFromJSON(int32(), "[1]"), ArrayFromJSON(int32(), "[2]"), + ArrayFromJSON(int32(), "[3]"), ArrayFromJSON(int32(), "[4]")}; + std::vector> expected = {ArrayFromJSON(int32(), "[3]"), + ArrayFromJSON(int32(), "[5]"), + ArrayFromJSON(int32(), "[7]")}; for (int32_t i = 1; i <= 3; i++) { - ASSERT_OK_AND_ASSIGN(auto value0, build_array(i)); - ASSERT_OK_AND_ASSIGN(auto value1, build_array(i + 1)); - std::vector values = {value0, value1}; + std::vector values = {arrays[i - 1], arrays[i]}; ASSERT_OK_AND_ASSIGN(auto result, func_exec->Execute(values, 1)); ASSERT_TRUE(result.is_array()); - auto int32_data = dynamic_cast(result.array().get()); - ASSERT_NE(NULLPTR, int32_data); - EXPECT_EQ(2 * i + 1, int32_data->GetValues(1)[0]); + auto actual = result.make_array(); + AssertArraysEqual(*expected[i - 1], *actual); } } diff --git a/cpp/src/arrow/compute/type_fwd.h b/cpp/src/arrow/compute/type_fwd.h index 11c45fde091..5f70e98931d 100644 --- a/cpp/src/arrow/compute/type_fwd.h +++ b/cpp/src/arrow/compute/type_fwd.h @@ -28,6 +28,8 @@ namespace compute { class Function; class FunctionOptions; +struct FunctionExecutor; +class FunctionRegistry; class CastOptions; diff --git a/docs/source/cpp/api/compute.rst b/docs/source/cpp/api/compute.rst index 288e280d0cf..5e490fc4089 100644 --- a/docs/source/cpp/api/compute.rst +++ b/docs/source/cpp/api/compute.rst @@ -31,6 +31,13 @@ Abstract Function classes :content-only: :members: +Function execution +------------------ + +.. doxygengroup:: compute-functions-executor + :content-only: + :members: + Function registry ----------------- From d02b58bc7eba1211c6b11b0da75ed15856db826c Mon Sep 17 00:00:00 2001 From: Yaron Gvili Date: Tue, 8 Nov 2022 03:35:16 -0500 Subject: [PATCH 15/19] fix cast --- cpp/src/arrow/compute/function.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/compute/function.cc b/cpp/src/arrow/compute/function.cc index 732c92ccc45..f7ba5224175 100644 --- a/cpp/src/arrow/compute/function.cc +++ b/cpp/src/arrow/compute/function.cc @@ -259,7 +259,7 @@ struct FunctionExecutorImpl : public FunctionExecutor { func_name, "'"); } } else if (func_kind == Function::VECTOR) { - auto vkernel = checked_cast(kernel); + auto vkernel = static_cast(kernel); if (!all_same_length && vkernel->can_execute_chunkwise) { return Status::Invalid("Arguments for execution of vector kernel function '", func_name, "' must all be the same length"); From 43c529aa7be1fdcdb02c8e32cc351b05cc761679 Mon Sep 17 00:00:00 2001 From: Yaron Gvili Date: Tue, 8 Nov 2022 03:50:09 -0500 Subject: [PATCH 16/19] fix forward declaration --- cpp/src/arrow/compute/type_fwd.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/compute/type_fwd.h b/cpp/src/arrow/compute/type_fwd.h index 5f70e98931d..3849525c02c 100644 --- a/cpp/src/arrow/compute/type_fwd.h +++ b/cpp/src/arrow/compute/type_fwd.h @@ -28,7 +28,7 @@ namespace compute { class Function; class FunctionOptions; -struct FunctionExecutor; +class FunctionExecutor; class FunctionRegistry; class CastOptions; From 798b17e77814e66a5dd79f3ee1e963f9cdadfd5e Mon Sep 17 00:00:00 2001 From: Yaron Gvili Date: Thu, 10 Nov 2022 03:41:09 -0500 Subject: [PATCH 17/19] add tests --- cpp/src/arrow/compute/exec.cc | 2 + cpp/src/arrow/compute/exec_test.cc | 207 ++++++++++++++++++++++--- cpp/src/arrow/compute/function.cc | 7 + cpp/src/arrow/compute/function_test.cc | 46 ++++-- 4 files changed, 224 insertions(+), 38 deletions(-) diff --git a/cpp/src/arrow/compute/exec.cc b/cpp/src/arrow/compute/exec.cc index 96ecf627864..e7ba61faa09 100644 --- a/cpp/src/arrow/compute/exec.cc +++ b/cpp/src/arrow/compute/exec.cc @@ -862,6 +862,7 @@ class ScalarExecutor : public KernelExecutorImpl { } } if (kernel_->mem_allocation == MemAllocation::PREALLOCATE) { + data_preallocated_.clear(); ComputeDataPreallocate(*output_type_.type, &data_preallocated_); } @@ -945,6 +946,7 @@ class VectorExecutor : public KernelExecutorImpl { (kernel_->null_handling != NullHandling::COMPUTED_NO_PREALLOCATE && kernel_->null_handling != NullHandling::OUTPUT_NOT_NULL); if (kernel_->mem_allocation == MemAllocation::PREALLOCATE) { + data_preallocated_.clear(); ComputeDataPreallocate(*output_type_.type, &data_preallocated_); } diff --git a/cpp/src/arrow/compute/exec_test.cc b/cpp/src/arrow/compute/exec_test.cc index eac18f194d2..602f5e4ca03 100644 --- a/cpp/src/arrow/compute/exec_test.cc +++ b/cpp/src/arrow/compute/exec_test.cc @@ -896,7 +896,7 @@ struct ExampleState : public KernelState { Result> InitStateful(KernelContext*, const KernelInitArgs& args) { auto func_options = static_cast(args.options); - return std::make_unique(func_options->value); + return std::make_unique(func_options ? func_options->value : nullptr); } Status ExecStateful(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) { @@ -1010,36 +1010,134 @@ class TestCallScalarFunction : public TestComputeInternals { bool TestCallScalarFunction::initialized_ = false; -TEST_F(TestCallScalarFunction, ArgumentValidation) { +class FunctionCaller { + public: + virtual ~FunctionCaller() = default; + + virtual Result Call(const std::vector& args, + const FunctionOptions* options, + ExecContext* ctx = NULLPTR) = 0; + virtual Result Call(const std::vector& args, + ExecContext* ctx = NULLPTR) = 0; +}; + +using FunctionCallerMaker = std::function>( + const std::string& func_name, std::vector in_types)>; + +class SimpleFunctionCaller : public FunctionCaller { + public: + explicit SimpleFunctionCaller(const std::string& func_name) : func_name(func_name) {} + + static Result> Make(const std::string& func_name) { + return std::make_shared(func_name); + } + + static Result> Maker(const std::string& func_name, + std::vector in_types) { + return Make(func_name); + } + + Result Call(const std::vector& args, const FunctionOptions* options, + ExecContext* ctx) override { + return CallFunction(func_name, args, options, ctx); + } + Result Call(const std::vector& args, ExecContext* ctx) override { + return CallFunction(func_name, args, ctx); + } + + std::string func_name; +}; + +class ExecFunctionCaller : public FunctionCaller { + public: + explicit ExecFunctionCaller(std::shared_ptr func_exec) + : func_exec(std::move(func_exec)) {} + + static Result> Make( + const std::string& func_name, const std::vector& args, + const FunctionOptions* options = nullptr, + FunctionRegistry* func_registry = nullptr) { + ARROW_ASSIGN_OR_RAISE(auto func_exec, + GetFunctionExecutor(func_name, args, options, func_registry)); + return std::make_shared(std::move(func_exec)); + } + + static Result> Make( + const std::string& func_name, std::vector in_types, + const FunctionOptions* options = nullptr, + FunctionRegistry* func_registry = nullptr) { + ARROW_ASSIGN_OR_RAISE( + auto func_exec, GetFunctionExecutor(func_name, in_types, options, func_registry)); + return std::make_shared(std::move(func_exec)); + } + + static Result> Maker(const std::string& func_name, + std::vector in_types) { + return Make(func_name, std::move(in_types)); + } + + Result Call(const std::vector& args, const FunctionOptions* options, + ExecContext* ctx) override { + ARROW_RETURN_NOT_OK(func_exec->Init(options, ctx)); + return func_exec->Execute(args); + } + Result Call(const std::vector& args, ExecContext* ctx) override { + return Call(args, nullptr, ctx); + } + + std::shared_ptr func_exec; +}; + +class TestCallScalarFunctionArgumentValidation : public TestCallScalarFunction { + protected: + void DoTest(FunctionCallerMaker caller_maker); +}; + +void TestCallScalarFunctionArgumentValidation::DoTest(FunctionCallerMaker caller_maker) { + ASSERT_OK_AND_ASSIGN(auto test_copy, caller_maker("test_copy", {int32()})); + // Copy accepts only a single array argument Datum d1(GetInt32Array(10)); // Too many args std::vector args = {d1, d1}; - ASSERT_RAISES(Invalid, CallFunction("test_copy", args)); + ASSERT_RAISES(Invalid, test_copy->Call(args)); // Too few args = {}; - ASSERT_RAISES(Invalid, CallFunction("test_copy", args)); + ASSERT_RAISES(Invalid, test_copy->Call(args)); // Cannot do scalar Datum d1_scalar(std::make_shared(5)); - ASSERT_OK_AND_ASSIGN(auto result, CallFunction("test_copy", {d1})); - ASSERT_OK_AND_ASSIGN(result, CallFunction("test_copy", {d1_scalar})); + ASSERT_OK_AND_ASSIGN(auto result, test_copy->Call({d1})); + ASSERT_OK_AND_ASSIGN(result, test_copy->Call({d1_scalar})); +} + +TEST_F(TestCallScalarFunctionArgumentValidation, SimpleCall) { + TestCallScalarFunctionArgumentValidation::DoTest(SimpleFunctionCaller::Maker); +} + +TEST_F(TestCallScalarFunctionArgumentValidation, ExecCall) { + TestCallScalarFunctionArgumentValidation::DoTest(ExecFunctionCaller::Maker); } -TEST_F(TestCallScalarFunction, PreallocationCases) { +class TestCallScalarFunctionPreallocationCases : public TestCallScalarFunction { + protected: + void DoTest(FunctionCallerMaker caller_maker); +}; + +void TestCallScalarFunctionPreallocationCases::DoTest(FunctionCallerMaker caller_maker) { double null_prob = 0.2; auto arr = GetUInt8Array(100, null_prob); - auto CheckFunction = [&](std::string func_name) { + auto CheckFunction = [&](std::shared_ptr test_copy) { ResetContexts(); // The default should be a single array output { std::vector args = {Datum(arr)}; - ASSERT_OK_AND_ASSIGN(Datum result, CallFunction(func_name, args)); + ASSERT_OK_AND_ASSIGN(Datum result, test_copy->Call(args)); ASSERT_EQ(Datum::ARRAY, result.kind()); AssertArraysEqual(*arr, *result.make_array()); } @@ -1049,7 +1147,7 @@ TEST_F(TestCallScalarFunction, PreallocationCases) { { std::vector args = {Datum(arr)}; exec_ctx_->set_exec_chunksize(80); - ASSERT_OK_AND_ASSIGN(Datum result, CallFunction(func_name, args, exec_ctx_.get())); + ASSERT_OK_AND_ASSIGN(Datum result, test_copy->Call(args, exec_ctx_.get())); AssertArraysEqual(*arr, *result.make_array()); } @@ -1057,7 +1155,7 @@ TEST_F(TestCallScalarFunction, PreallocationCases) { // Chunksize not multiple of 8 std::vector args = {Datum(arr)}; exec_ctx_->set_exec_chunksize(11); - ASSERT_OK_AND_ASSIGN(Datum result, CallFunction(func_name, args, exec_ctx_.get())); + ASSERT_OK_AND_ASSIGN(Datum result, test_copy->Call(args, exec_ctx_.get())); AssertArraysEqual(*arr, *result.make_array()); } @@ -1066,7 +1164,7 @@ TEST_F(TestCallScalarFunction, PreallocationCases) { auto carr = std::make_shared(ArrayVector{arr->Slice(0, 10), arr->Slice(10)}); std::vector args = {Datum(carr)}; - ASSERT_OK_AND_ASSIGN(Datum result, CallFunction(func_name, args, exec_ctx_.get())); + ASSERT_OK_AND_ASSIGN(Datum result, test_copy->Call(args, exec_ctx_.get())); std::shared_ptr actual = result.chunked_array(); ASSERT_EQ(1, actual->num_chunks()); AssertChunkedEquivalent(*carr, *actual); @@ -1077,7 +1175,7 @@ TEST_F(TestCallScalarFunction, PreallocationCases) { std::vector args = {Datum(arr)}; exec_ctx_->set_preallocate_contiguous(false); exec_ctx_->set_exec_chunksize(40); - ASSERT_OK_AND_ASSIGN(Datum result, CallFunction(func_name, args, exec_ctx_.get())); + ASSERT_OK_AND_ASSIGN(Datum result, test_copy->Call(args, exec_ctx_.get())); ASSERT_EQ(Datum::CHUNKED_ARRAY, result.kind()); const ChunkedArray& carr = *result.chunked_array(); ASSERT_EQ(3, carr.num_chunks()); @@ -1087,11 +1185,28 @@ TEST_F(TestCallScalarFunction, PreallocationCases) { } }; - CheckFunction("test_copy"); - CheckFunction("test_copy_computed_bitmap"); + ASSERT_OK_AND_ASSIGN(auto test_copy, caller_maker("test_copy", {uint8()})); + CheckFunction(test_copy); + ASSERT_OK_AND_ASSIGN(auto test_copy_computed_bitmap, + caller_maker("test_copy_computed_bitmap", {uint8()})); + CheckFunction(test_copy_computed_bitmap); +} + +TEST_F(TestCallScalarFunctionPreallocationCases, SimpleCaller) { + TestCallScalarFunctionPreallocationCases::DoTest(SimpleFunctionCaller::Maker); } -TEST_F(TestCallScalarFunction, BasicNonStandardCases) { +TEST_F(TestCallScalarFunctionPreallocationCases, ExecCaller) { + TestCallScalarFunctionPreallocationCases::DoTest(ExecFunctionCaller::Maker); +} + +class TestCallScalarFunctionBasicNonStandardCases : public TestCallScalarFunction { + protected: + void DoTest(FunctionCallerMaker caller_maker); +}; + +void TestCallScalarFunctionBasicNonStandardCases::DoTest( + FunctionCallerMaker caller_maker) { // Test a handful of cases // // * Validity bitmap computed by kernel rather than using PropagateNulls @@ -1103,19 +1218,19 @@ TEST_F(TestCallScalarFunction, BasicNonStandardCases) { auto arr = GetUInt8Array(1000, null_prob); std::vector args = {Datum(arr)}; - auto CheckFunction = [&](std::string func_name) { + auto CheckFunction = [&](std::shared_ptr test_nopre) { ResetContexts(); // The default should be a single array output { - ASSERT_OK_AND_ASSIGN(Datum result, CallFunction(func_name, args)); + ASSERT_OK_AND_ASSIGN(Datum result, test_nopre->Call(args)); AssertArraysEqual(*arr, *result.make_array(), true); } // Split execution into 3 chunks { exec_ctx_->set_exec_chunksize(400); - ASSERT_OK_AND_ASSIGN(Datum result, CallFunction(func_name, args, exec_ctx_.get())); + ASSERT_OK_AND_ASSIGN(Datum result, test_nopre->Call(args, exec_ctx_.get())); ASSERT_EQ(Datum::CHUNKED_ARRAY, result.kind()); const ChunkedArray& carr = *result.chunked_array(); ASSERT_EQ(3, carr.num_chunks()); @@ -1125,31 +1240,73 @@ TEST_F(TestCallScalarFunction, BasicNonStandardCases) { } }; - CheckFunction("test_nopre_data"); - CheckFunction("test_nopre_validity_or_data"); + ASSERT_OK_AND_ASSIGN(auto test_nopre_data, caller_maker("test_nopre_data", {uint8()})); + CheckFunction(test_nopre_data); + ASSERT_OK_AND_ASSIGN(auto test_nopre_validity_or_data, + caller_maker("test_nopre_validity_or_data", {uint8()})); + CheckFunction(test_nopre_validity_or_data); +} + +TEST_F(TestCallScalarFunctionBasicNonStandardCases, SimpleCall) { + TestCallScalarFunctionBasicNonStandardCases::DoTest(SimpleFunctionCaller::Maker); +} + +TEST_F(TestCallScalarFunctionBasicNonStandardCases, ExecCall) { + TestCallScalarFunctionBasicNonStandardCases::DoTest(ExecFunctionCaller::Maker); } -TEST_F(TestCallScalarFunction, StatefulKernel) { +class TestCallScalarFunctionStatefulKernel : public TestCallScalarFunction { + protected: + void DoTest(FunctionCallerMaker caller_maker); +}; + +void TestCallScalarFunctionStatefulKernel::DoTest(FunctionCallerMaker caller_maker) { + ASSERT_OK_AND_ASSIGN(auto test_stateful, caller_maker("test_stateful", {int32()})); + auto input = ArrayFromJSON(int32(), "[1, 2, 3, null, 5]"); auto multiplier = std::make_shared(2); auto expected = ArrayFromJSON(int32(), "[2, 4, 6, null, 10]"); ExampleOptions options(multiplier); std::vector args = {Datum(input)}; - ASSERT_OK_AND_ASSIGN(Datum result, CallFunction("test_stateful", args, &options)); + ASSERT_OK_AND_ASSIGN(Datum result, test_stateful->Call(args, &options)); AssertArraysEqual(*expected, *result.make_array()); } -TEST_F(TestCallScalarFunction, ScalarFunction) { +TEST_F(TestCallScalarFunctionStatefulKernel, Simplecall) { + TestCallScalarFunctionStatefulKernel::DoTest(SimpleFunctionCaller::Maker); +} + +TEST_F(TestCallScalarFunctionStatefulKernel, ExecCall) { + TestCallScalarFunctionStatefulKernel::DoTest(ExecFunctionCaller::Maker); +} + +class TestCallScalarFunctionScalarFunction : public TestCallScalarFunction { + protected: + void DoTest(FunctionCallerMaker caller_maker); +}; + +void TestCallScalarFunctionScalarFunction::DoTest(FunctionCallerMaker caller_maker) { + ASSERT_OK_AND_ASSIGN(auto test_scalar_add_int32, + caller_maker("test_scalar_add_int32", {int32(), int32()})); + std::vector args = {Datum(std::make_shared(5)), Datum(std::make_shared(7))}; - ASSERT_OK_AND_ASSIGN(Datum result, CallFunction("test_scalar_add_int32", args)); + ASSERT_OK_AND_ASSIGN(Datum result, test_scalar_add_int32->Call(args)); ASSERT_EQ(Datum::SCALAR, result.kind()); auto expected = std::make_shared(12); ASSERT_TRUE(expected->Equals(*result.scalar())); } +TEST_F(TestCallScalarFunctionScalarFunction, SimpleCall) { + TestCallScalarFunctionScalarFunction::DoTest(SimpleFunctionCaller::Maker); +} + +TEST_F(TestCallScalarFunctionScalarFunction, ExecCall) { + TestCallScalarFunctionScalarFunction::DoTest(ExecFunctionCaller::Maker); +} + } // namespace detail } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/function.cc b/cpp/src/arrow/compute/function.cc index f7ba5224175..90e754f6150 100644 --- a/cpp/src/arrow/compute/function.cc +++ b/cpp/src/arrow/compute/function.cc @@ -211,6 +211,9 @@ struct FunctionExecutorImpl : public FunctionExecutor { } Status Init(const FunctionOptions* options, ExecContext* exec_ctx) override { + if (exec_ctx == NULLPTR) { + exec_ctx = default_exec_context(); + } kernel_ctx = KernelContext{exec_ctx, kernel}; return KernelInit(options); } @@ -225,6 +228,10 @@ struct FunctionExecutorImpl : public FunctionExecutor { {"function.options", options ? options->ToString() : ""}, {"function.kind", func_kind}}); + if (in_types.size() != args.size()) { + return Status::Invalid("Execution of '", func_name, "' expected ", in_types.size(), + " arguments but got ", args.size()); + } if (!inited) { ARROW_RETURN_NOT_OK(Init(NULLPTR, default_exec_context())); } diff --git a/cpp/src/arrow/compute/function_test.cc b/cpp/src/arrow/compute/function_test.cc index 4e56c4fcb1b..470325db1b9 100644 --- a/cpp/src/arrow/compute/function_test.cc +++ b/cpp/src/arrow/compute/function_test.cc @@ -33,6 +33,7 @@ #include "arrow/datum.h" #include "arrow/status.h" #include "arrow/testing/gtest_util.h" +#include "arrow/testing/matchers.h" #include "arrow/type.h" #include "arrow/util/key_value_metadata.h" #include "arrow/util/logging.h" @@ -360,6 +361,8 @@ struct TestFunctionOptions : public FunctionOptions { TestFunctionOptions(); static const char* kTypeName; + + int value; }; static auto kTestFunctionOptionsType = @@ -373,20 +376,27 @@ const char* TestFunctionOptions::kTypeName = "test_options"; TEST(FunctionExecutor, Basics) { VectorFunction func("vector_test", Arity::Binary(), /*doc=*/FunctionDoc::Empty()); - bool init_called = false; + int init_calls = 0; + int expected_optval = 0; ExecContext exec_ctx; TestFunctionOptions options; + options.value = 1; auto init = - [&init_called, &exec_ctx, &options]( - KernelContext* kernel_ctx, + [&](KernelContext* kernel_ctx, const KernelInitArgs& init_args) -> Result> { - init_called = true; if (&exec_ctx != kernel_ctx->exec_context()) { return Status::Invalid("expected exec context not found in kernel context"); } + if (init_args.options != nullptr) { + const auto* test_opts = checked_cast(init_args.options); + if (test_opts->value != expected_optval) { + return Status::Invalid("bad options value"); + } + } if (&options != init_args.options) { return Status::Invalid("expected options not found in kernel init args"); } + ++init_calls; return nullptr; }; auto exec = [](KernelContext* ctx, const ExecSpan& args, ExecResult* out) -> Status { @@ -417,21 +427,31 @@ TEST(FunctionExecutor, Basics) { ASSERT_EQ(exec, static_cast(dispatched)->exec); std::vector inputs = {int32(), int32()}; ASSERT_OK_AND_ASSIGN(auto func_exec, func.GetBestExecutor(inputs)); - ASSERT_FALSE(init_called); - ASSERT_OK(func_exec->Init(&options, &exec_ctx)); - ASSERT_TRUE(init_called); + ASSERT_EQ(0, init_calls); + EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, ::testing::HasSubstr("options not found"), + func_exec->Init(nullptr, &exec_ctx)); + EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, ::testing::HasSubstr("bad options value"), + func_exec->Init(&options, &exec_ctx)); + ExecContext other_exec_ctx; + EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, ::testing::HasSubstr("exec context not found"), + func_exec->Init(&options, &other_exec_ctx)); std::vector> arrays = { ArrayFromJSON(int32(), "[1]"), ArrayFromJSON(int32(), "[2]"), ArrayFromJSON(int32(), "[3]"), ArrayFromJSON(int32(), "[4]")}; std::vector> expected = {ArrayFromJSON(int32(), "[3]"), ArrayFromJSON(int32(), "[5]"), ArrayFromJSON(int32(), "[7]")}; - for (int32_t i = 1; i <= 3; i++) { - std::vector values = {arrays[i - 1], arrays[i]}; - ASSERT_OK_AND_ASSIGN(auto result, func_exec->Execute(values, 1)); - ASSERT_TRUE(result.is_array()); - auto actual = result.make_array(); - AssertArraysEqual(*expected[i - 1], *actual); + for (int n = 1; n <= 3; n++) { + expected_optval = options.value = n; + ASSERT_OK(func_exec->Init(&options, &exec_ctx)); + ASSERT_EQ(n, init_calls); + for (int32_t i = 1; i <= 3; i++) { + std::vector values = {arrays[i - 1], arrays[i]}; + ASSERT_OK_AND_ASSIGN(auto result, func_exec->Execute(values, 1)); + ASSERT_TRUE(result.is_array()); + auto actual = result.make_array(); + AssertArraysEqual(*expected[i - 1], *actual); + } } } From 0530446c788abb2df5baf8e6c36c9550104d7c39 Mon Sep 17 00:00:00 2001 From: Yaron Gvili Date: Thu, 10 Nov 2022 06:37:52 -0500 Subject: [PATCH 18/19] fix test_compute.py --- python/pyarrow/tests/test_compute.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyarrow/tests/test_compute.py b/python/pyarrow/tests/test_compute.py index c41c4fa9b3b..fb0c209e9a7 100644 --- a/python/pyarrow/tests/test_compute.py +++ b/python/pyarrow/tests/test_compute.py @@ -2894,5 +2894,5 @@ def test_expression_call_function(): def test_cast_table_raises(): table = pa.table({'a': [1, 2]}) - with pytest.raises(pa.lib.ArrowInvalid): + with pytest.raises(pa.lib.ArrowTypeError): pc.cast(table, pa.int64()) From 8e29636d170afafd1301e3dca8c7ac028be8ca73 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Thu, 10 Nov 2022 14:38:04 +0100 Subject: [PATCH 19/19] Nits --- cpp/src/arrow/compute/function_test.cc | 15 ++++++++------- cpp/src/arrow/compute/type_fwd.h | 2 +- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/cpp/src/arrow/compute/function_test.cc b/cpp/src/arrow/compute/function_test.cc index 470325db1b9..b71e5a12b50 100644 --- a/cpp/src/arrow/compute/function_test.cc +++ b/cpp/src/arrow/compute/function_test.cc @@ -406,7 +406,7 @@ TEST(FunctionExecutor, Basics) { for (size_t i = 0; i < 2; i++) { ASSERT_TRUE(args.values[i].is_array()); const ArraySpan& array = args.values[i].array; - ASSERT_EQ(*int32(), *array.type); + ASSERT_EQ(array.type->id(), Type::INT32); vals[i] = array.GetValues(1); } ASSERT_TRUE(out->is_array_data()); @@ -423,9 +423,11 @@ TEST(FunctionExecutor, Basics) { std::vector in_types = {int32(), int32()}; OutputType out_type = int32(); ASSERT_OK(func.AddKernel(in_types, out_type, exec, init)); + ASSERT_OK_AND_ASSIGN(const Kernel* dispatched, func.DispatchExact({int32(), int32()})); ASSERT_EQ(exec, static_cast(dispatched)->exec); std::vector inputs = {int32(), int32()}; + ASSERT_OK_AND_ASSIGN(auto func_exec, func.GetBestExecutor(inputs)); ASSERT_EQ(0, init_calls); EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, ::testing::HasSubstr("options not found"), @@ -435,12 +437,11 @@ TEST(FunctionExecutor, Basics) { ExecContext other_exec_ctx; EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, ::testing::HasSubstr("exec context not found"), func_exec->Init(&options, &other_exec_ctx)); - std::vector> arrays = { - ArrayFromJSON(int32(), "[1]"), ArrayFromJSON(int32(), "[2]"), - ArrayFromJSON(int32(), "[3]"), ArrayFromJSON(int32(), "[4]")}; - std::vector> expected = {ArrayFromJSON(int32(), "[3]"), - ArrayFromJSON(int32(), "[5]"), - ArrayFromJSON(int32(), "[7]")}; + + ArrayVector arrays = {ArrayFromJSON(int32(), "[1]"), ArrayFromJSON(int32(), "[2]"), + ArrayFromJSON(int32(), "[3]"), ArrayFromJSON(int32(), "[4]")}; + ArrayVector expected = {ArrayFromJSON(int32(), "[3]"), ArrayFromJSON(int32(), "[5]"), + ArrayFromJSON(int32(), "[7]")}; for (int n = 1; n <= 3; n++) { expected_optval = options.value = n; ASSERT_OK(func_exec->Init(&options, &exec_ctx)); diff --git a/cpp/src/arrow/compute/type_fwd.h b/cpp/src/arrow/compute/type_fwd.h index 3849525c02c..67dc5a278b4 100644 --- a/cpp/src/arrow/compute/type_fwd.h +++ b/cpp/src/arrow/compute/type_fwd.h @@ -27,8 +27,8 @@ struct TypeHolder; namespace compute { class Function; -class FunctionOptions; class FunctionExecutor; +class FunctionOptions; class FunctionRegistry; class CastOptions;