diff --git a/cpp/src/arrow/compute/exec.h b/cpp/src/arrow/compute/exec.h index 0e8a7c68b47..75d02f15713 100644 --- a/cpp/src/arrow/compute/exec.h +++ b/cpp/src/arrow/compute/exec.h @@ -60,29 +60,38 @@ static constexpr int64_t kDefaultExecChunksize = UINT16_MAX; /// function evaluation class ARROW_EXPORT ExecContext { public: - // If no function registry passed, the default is used + // If no function registry passed, the default is used. explicit ExecContext(MemoryPool* pool = default_memory_pool(), FunctionRegistry* func_registry = NULLPTR); + /// \brief The MemoryPool used for allocations, default is + /// default_memory_pool(). MemoryPool* memory_pool() const { return pool_; } ::arrow::internal::CpuInfo* cpu_info() const; + /// \brief The FunctionRegistry for looking up functions by name and + /// selecting kernels for execution. Defaults to the library-global function + /// registry provided by GetFunctionRegistry. FunctionRegistry* func_registry() const { return func_registry_; } - // \brief Set maximum length unit of work for kernel execution. Larger inputs - // will be split into smaller chunks, and, if desired, processed in - // parallel. Set to -1 for no limit + // \brief Set maximum length unit of work for kernel execution. Larger + // contiguous array inputs will be split into smaller chunks, and, if + // possible and enabled, processed in parallel. The default chunksize is + // INT64_MAX, so contiguous arrays are not split. void set_exec_chunksize(int64_t chunksize) { exec_chunksize_ = chunksize; } - // \brief Maximum length unit of work for kernel execution. + // \brief Maximum length for ExecBatch data chunks processed by + // kernels. Contiguous array inputs with longer length will be split into + // smaller chunks. int64_t exec_chunksize() const { return exec_chunksize_; } - /// \brief Set whether to use multiple threads for function execution + /// \brief Set whether to use multiple threads for function execution. This + /// is not yet used. void set_use_threads(bool use_threads = true) { use_threads_ = use_threads; } /// \brief If true, then utilize multiple threads where relevant for function - /// execution + /// execution. This is not yet used. bool use_threads() const { return use_threads_; } // Set the preallocation strategy for kernel execution as it relates to @@ -94,12 +103,15 @@ class ARROW_EXPORT ExecContext { // chunk of execution // // TODO: At some point we might want the limit the size of contiguous - // preallocations (for example, merging small ChunkedArray chunks until - // reaching some desired size) + // preallocations. For example, even if the exec_chunksize is 64K or less, we + // might limit contiguous allocations to 1M records, say. void set_preallocate_contiguous(bool preallocate) { preallocate_contiguous_ = preallocate; } + /// \brief If contiguous preallocations should be used when doing chunked + /// execution as specified by exec_chunksize(). See + /// set_preallocate_contiguous() for more information. bool preallocate_contiguous() const { return preallocate_contiguous_; } private: @@ -146,22 +158,53 @@ class ARROW_EXPORT SelectionVector { /// Array and Scalar values and an optional SelectionVector indicating that /// there is an unmaterialized filter that either must be materialized, or (if /// the kernel supports it) pushed down into the kernel implementation. +/// +/// ExecBatch is semantically similar to RecordBatch in that in a SQL context +/// it represents a collection of records, but constant "columns" are +/// represented by Scalar values rather than having to be converted into arrays +/// with repeated values. +/// +/// TODO: Datum uses arrow/util/variant.h which may be a bit heavier-weight +/// than is desirable for this class. Microbenchmarks would help determine for +/// sure. See ARROW-8928. struct ExecBatch { ExecBatch() {} ExecBatch(std::vector values, int64_t length) : values(std::move(values)), length(length) {} + /// The values representing positional arguments to be passed to a kernel's + /// exec function for processing. std::vector values; + + /// A deferred filter represented as an array of indices into the values. + /// + /// For example, the filter [true, true, false, true] would be represented as + /// the selection vector [0, 1, 3]. When the selection vector is set, + /// ExecBatch::length is equal to the length of this array. std::shared_ptr selection_vector; + + /// The semantic length of the ExecBatch. When the values are all scalars, + /// the length should be set to 1, otherwise the length is taken from the + /// array values, except when there is a selection vector. When there is a + /// selection vector set, the length of the batch is the length of the + /// selection. + /// + /// If the array values are of length 0 then the length is 0 regardless of + /// whether any values are Scalar. In general ExecBatch objects are produced + /// by ExecBatchIterator which by design does not yield length-0 batches. int64_t length; + /// \brief Return the value at the i-th index template inline const Datum& operator[](index_type i) const { return values[i]; } + /// \brief A convenience for the number of values / arguments. int num_values() const { return static_cast(values.size()); } + /// \brief A convenience for returning the ValueDescr objects (types and + /// shapes) from the batch. std::vector GetDescriptors() const { std::vector result; for (const auto& value : this->values) { @@ -178,7 +221,7 @@ ARROW_EXPORT Result CallFunction(const std::string& func_name, const std::vector& args, const FunctionOptions* options, ExecContext* ctx = NULLPTR); -/// \brief Variant of CallFunction for functions not requiring options +/// \brief Variant of CallFunction for functions not requiring options. ARROW_EXPORT Result CallFunction(const std::string& func_name, const std::vector& args, ExecContext* ctx = NULLPTR); diff --git a/cpp/src/arrow/compute/exec_test.cc b/cpp/src/arrow/compute/exec_test.cc index 45c1fe6dbc1..da297e9b084 100644 --- a/cpp/src/arrow/compute/exec_test.cc +++ b/cpp/src/arrow/compute/exec_test.cc @@ -645,7 +645,7 @@ class TestCallScalarFunction : public TestComputeInternals { // This function simply copies memory from the input argument into the // (preallocated) output - auto func = std::make_shared("test_copy", 1); + auto func = std::make_shared("test_copy", Arity::Unary()); // Add a few kernels. Our implementation only accepts arrays ASSERT_OK(func->AddKernel({InputType::Array(uint8())}, uint8(), ExecCopy)); @@ -654,7 +654,8 @@ class TestCallScalarFunction : public TestComputeInternals { ASSERT_OK(registry->AddFunction(func)); // A version which doesn't want the executor to call PropagateNulls - auto func2 = std::make_shared("test_copy_computed_bitmap", 1); + auto func2 = + std::make_shared("test_copy_computed_bitmap", Arity::Unary()); ScalarKernel kernel({InputType::Array(uint8())}, uint8(), ExecComputedBitmap); kernel.null_handling = NullHandling::COMPUTED_PREALLOCATE; ASSERT_OK(func2->AddKernel(kernel)); @@ -666,8 +667,9 @@ class TestCallScalarFunction : public TestComputeInternals { // A function that allocates its own output memory. We have cases for both // non-preallocated data and non-preallocated validity bitmap - auto f1 = std::make_shared("test_nopre_data", 1); - auto f2 = std::make_shared("test_nopre_validity_or_data", 1); + auto f1 = std::make_shared("test_nopre_data", Arity::Unary()); + auto f2 = + std::make_shared("test_nopre_validity_or_data", Arity::Unary()); ScalarKernel kernel({InputType::Array(uint8())}, uint8(), ExecNoPreallocatedData); kernel.mem_allocation = MemAllocation::NO_PREALLOCATE; @@ -686,7 +688,7 @@ class TestCallScalarFunction : public TestComputeInternals { // This function's behavior depends on a static parameter that is made // available to the kernel's execution function through its Options object - auto func = std::make_shared("test_stateful", 1); + auto func = std::make_shared("test_stateful", Arity::Unary()); ScalarKernel kernel({InputType::Array(int32())}, int32(), ExecStateful, InitStateful); ASSERT_OK(func->AddKernel(kernel)); @@ -696,7 +698,8 @@ class TestCallScalarFunction : public TestComputeInternals { void AddScalarFunction() { auto registry = GetFunctionRegistry(); - auto func = std::make_shared("test_scalar_add_int32", 2); + auto func = + std::make_shared("test_scalar_add_int32", Arity::Binary()); ASSERT_OK(func->AddKernel({InputType::Scalar(int32()), InputType::Scalar(int32())}, int32(), ExecAddInt32)); ASSERT_OK(registry->AddFunction(func)); diff --git a/cpp/src/arrow/compute/function.h b/cpp/src/arrow/compute/function.h index 28dc975dcc9..ed04b8916c2 100644 --- a/cpp/src/arrow/compute/function.h +++ b/cpp/src/arrow/compute/function.h @@ -16,7 +16,7 @@ // under the License. // NOTE: API is EXPERIMENTAL and will change without going through a -// deprecation cycle +// deprecation cycle. #pragma once @@ -39,33 +39,48 @@ namespace compute { class ExecContext; +/// \brief Base class for specifying options configuring a function's behavior, +/// such as error handling. struct ARROW_EXPORT FunctionOptions {}; -/// \brief Contains the number of required arguments for the function +/// \brief Contains the number of required arguments for the function. +/// +/// Naming conventions taken from https://en.wikipedia.org/wiki/Arity. struct ARROW_EXPORT Arity { + /// \brief A function taking no arguments static Arity Nullary() { return Arity(0, false); } + + /// \brief A function taking 1 argument static Arity Unary() { return Arity(1, false); } + + /// \brief A function taking 2 arguments static Arity Binary() { return Arity(2, false); } + + /// \brief A function taking 3 arguments static Arity Ternary() { return Arity(3, false); } + + /// \brief A function taking a variable number of arguments static Arity VarArgs(int min_args = 1) { return Arity(min_args, true); } - Arity(int num_args, bool is_varargs = false) // NOLINT implicit conversion + explicit Arity(int num_args, bool is_varargs = false) : num_args(num_args), is_varargs(is_varargs) {} /// The number of required arguments (or the minimum number for varargs - /// functions) + /// functions). int num_args; - /// If true, then the num_args is the minimum number of required arguments + /// If true, then the num_args is the minimum number of required arguments. bool is_varargs = false; }; -/// \brief Base class for function containers that are capable of dispatch to -/// kernel implementations +/// \brief Base class for compute functions. Function implementations contain a +/// collection of "kernels" which are implementations of the function for +/// specific argument types. Selecting a viable kernel for executing a function +/// is referred to as "dispatching". class ARROW_EXPORT Function { public: /// \brief The kind of function, which indicates in what contexts it is - /// valid for use + /// valid for use. enum Kind { /// A function that performs scalar data operations on whole arrays of /// data. Can generally process Array or Scalar values. The size of the @@ -84,23 +99,25 @@ class ARROW_EXPORT Function { virtual ~Function() = default; - /// \brief The name of the kernel. The registry enforces uniqueness of names + /// \brief The name of the kernel. The registry enforces uniqueness of names. const std::string& name() const { return name_; } /// \brief The kind of kernel, which indicates in what contexts it is valid - /// for use + /// for use. Function::Kind kind() const { return kind_; } - /// \brief Contains the number of arguments the function requires + /// \brief Contains the number of arguments the function requires, or if the + /// function accepts variable numbers of arguments. const Arity& arity() const { return arity_; } - /// \brief Returns the number of registered kernels for this function + /// \brief Returns the number of registered kernels for this function. virtual int num_kernels() const = 0; - /// \brief Convenience for invoking a function with kernel dispatch and - /// memory allocation details taken care of + /// \brief Execute the function eagerly with the passed input arguments with + /// kernel dispatch, batch iteration, and memory allocation details taken + /// care of. /// - /// This function can be overridden in subclasses + /// This function can be overridden in subclasses. virtual Result Execute(const std::vector& args, const FunctionOptions* options, ExecContext* ctx = NULLPTR) const; @@ -150,22 +167,20 @@ class ARROW_EXPORT ScalarFunction : public detail::FunctionImpl { ScalarFunction(std::string name, const Arity& arity) : detail::FunctionImpl(std::move(name), Function::SCALAR, arity) {} - /// \brief Add a simple kernel (function implementation) with given - /// input/output types, no required state initialization, preallocation for - /// fixed-width types, and default null handling (intersect validity bitmaps - /// of inputs) + /// \brief Add a kernel with given input/output types, no required state + /// initialization, preallocation for fixed-width types, and default null + /// handling (intersect validity bitmaps of inputs). Status AddKernel(std::vector in_types, OutputType out_type, ArrayKernelExec exec, KernelInit init = NULLPTR); - /// \brief Add a kernel (function implementation). Returns error if fails - /// to match the other parameters of the function + /// \brief Add a kernel (function implementation). Returns error if the + /// kernel's signature does not match the function's arity. Status AddKernel(ScalarKernel kernel); - /// \brief Return the first kernel that can execute the function given the - /// exact argument types (without implicit type casts or scalar->array - /// promotions) + /// \brief Return a kernel that can execute the function given the exact + /// argument types (without implicit type casts or scalar->array promotions). /// - /// This function is overridden in CastFunction + /// NB: This function is overridden in CastFunction. virtual Result DispatchExact( const std::vector& values) const; }; @@ -173,7 +188,7 @@ class ARROW_EXPORT ScalarFunction : public detail::FunctionImpl { /// \brief A function that executes general array operations that may yield /// outputs of different sizes or have results that depend on the whole array /// contents. These functions roughly correspond to the functions found in -/// non-SQL array languages like APL and its derivatives +/// non-SQL array languages like APL and its derivatives. class ARROW_EXPORT VectorFunction : public detail::FunctionImpl { public: using KernelType = VectorKernel; @@ -181,20 +196,18 @@ class ARROW_EXPORT VectorFunction : public detail::FunctionImpl { VectorFunction(std::string name, const Arity& arity) : detail::FunctionImpl(std::move(name), Function::VECTOR, arity) {} - /// \brief Add a simple kernel (function implementation) with given - /// input/output types, no required state initialization, preallocation for - /// fixed-width types, and default null handling (intersect validity bitmaps - /// of inputs) + /// \brief Add a simple kernel with given input/output types, no required + /// state initialization, no data preallocation, and no preallocation of the + /// validity bitmap. Status AddKernel(std::vector in_types, OutputType out_type, ArrayKernelExec exec, KernelInit init = NULLPTR); - /// \brief Add a kernel (function implementation). Returns error if fails - /// to match the other parameters of the function + /// \brief Add a kernel (function implementation). Returns error if the + /// kernel's signature does not match the function's arity. Status AddKernel(VectorKernel kernel); - /// \brief Return the first kernel that can execute the function given the - /// exact argument types (without implicit type casts or scalar->array - /// promotions) + /// \brief Return a kernel that can execute the function given the exact + /// argument types (without implicit type casts or scalar->array promotions) Result DispatchExact(const std::vector& values) const; }; @@ -207,10 +220,12 @@ class ARROW_EXPORT ScalarAggregateFunction : detail::FunctionImpl(std::move(name), Function::SCALAR_AGGREGATE, arity) {} - /// \brief Add a kernel (function implementation). Returns error if fails - /// to match the other parameters of the function + /// \brief Add a kernel (function implementation). Returns error if the + /// kernel's signature does not match the function's arity. Status AddKernel(ScalarAggregateKernel kernel); + /// \brief Return a kernel that can execute the function given the exact + /// argument types (without implicit type casts or scalar->array promotions) Result DispatchExact( const std::vector& values) const; }; diff --git a/cpp/src/arrow/compute/function_test.cc b/cpp/src/arrow/compute/function_test.cc index 0c1d6241ef4..5e48e8bbd09 100644 --- a/cpp/src/arrow/compute/function_test.cc +++ b/cpp/src/arrow/compute/function_test.cc @@ -57,7 +57,7 @@ TEST(Arity, Basics) { } TEST(ScalarFunction, Basics) { - ScalarFunction func("scalar_test", 2); + ScalarFunction func("scalar_test", Arity::Binary()); ScalarFunction varargs_func("varargs_test", Arity::VarArgs(1)); ASSERT_EQ("scalar_test", func.name()); @@ -72,7 +72,7 @@ TEST(ScalarFunction, Basics) { } TEST(VectorFunction, Basics) { - VectorFunction func("vector_test", 2); + VectorFunction func("vector_test", Arity::Binary()); VectorFunction varargs_func("varargs_test", Arity::VarArgs(1)); ASSERT_EQ("vector_test", func.name()); @@ -139,8 +139,8 @@ void CheckAddDispatch(FunctionType* func) { } TEST(ScalarVectorFunction, DispatchExact) { - ScalarFunction func1("scalar_test", 2); - VectorFunction func2("vector_test", 2); + ScalarFunction func1("scalar_test", Arity::Binary()); + VectorFunction func2("vector_test", Arity::Binary()); CheckAddDispatch(&func1); CheckAddDispatch(&func2); @@ -173,7 +173,7 @@ TEST(ArrayFunction, VarArgs) { } TEST(ScalarAggregateFunction, Basics) { - ScalarAggregateFunction func("agg_test", 1); + ScalarAggregateFunction func("agg_test", Arity::Unary()); ASSERT_EQ("agg_test", func.name()); ASSERT_EQ(1, func.arity().num_args); @@ -190,7 +190,7 @@ void NoopMerge(KernelContext*, const KernelState&, KernelState*) {} void NoopFinalize(KernelContext*, Datum*) {} TEST(ScalarAggregateFunction, DispatchExact) { - ScalarAggregateFunction func("agg_test", 1); + ScalarAggregateFunction func("agg_test", Arity::Unary()); std::vector in_args = {ValueDescr::Array(int8())}; ScalarAggregateKernel kernel(std::move(in_args), int64(), NoopInit, NoopConsume, diff --git a/cpp/src/arrow/compute/kernel.h b/cpp/src/arrow/compute/kernel.h index b76c27a7ab8..9df8d232b6a 100644 --- a/cpp/src/arrow/compute/kernel.h +++ b/cpp/src/arrow/compute/kernel.h @@ -45,46 +45,54 @@ namespace compute { struct FunctionOptions; /// \brief Base class for opaque kernel-specific state. For example, if there -/// is some kind of initialization required -struct KernelState { +/// is some kind of initialization required. +struct ARROW_EXPORT KernelState { virtual ~KernelState() = default; }; -/// \brief Context/state for the execution of a particular kernel +/// \brief Context/state for the execution of a particular kernel. class ARROW_EXPORT KernelContext { public: explicit KernelContext(ExecContext* exec_ctx) : exec_ctx_(exec_ctx) {} - /// \brief Allocate buffer from the context's memory pool + /// \brief Allocate buffer from the context's memory pool. The contents are + /// not uninitialized. Result> Allocate(int64_t nbytes); - /// \brief Allocate buffer for bitmap from the context's memory pool + /// \brief Allocate buffer for bitmap from the context's memory pool. Like + /// Allocate, the contents of the buffer are not initialized but the last + /// byte is preemptively zeroed to help avoid ASAN or valgrind issues. Result> AllocateBitmap(int64_t num_bits); /// \brief Indicate that an error has occurred, to be checked by a exec caller - /// \param[in] status a Status instance + /// \param[in] status a Status instance. /// /// \note Will not overwrite a prior set Status, so we will have the first - /// error that occurred until ExecContext::ResetStatus is called + /// error that occurred until ExecContext::ResetStatus is called. void SetStatus(const Status& status); - /// \brief Clear any error status + /// \brief Clear any error status. void ResetStatus(); - /// \brief Return true if an error has occurred + /// \brief Return true if an error has occurred. bool HasError() const { return !status_.ok(); } - /// \brief Return the current status of the context + /// \brief Return the current status of the context. const Status& status() const { return status_; } - // For passing kernel state to + /// \brief Assign the active KernelState to be utilized for each stage of + /// kernel execution. Ownership and memory lifetime of the KernelState must + /// be minded separately. void SetState(KernelState* state) { state_ = state; } KernelState* state() { return state_; } - /// \brief Common state related to function execution + /// \brief Configuration related to function execution that is to be shared + /// across multiple kernels. ExecContext* exec_context() { return exec_ctx_; } + /// \brief The memory pool to use for allocations. For now, it uses the + /// MemoryPool contained in the ExecContext used to create the KernelContext. MemoryPool* memory_pool() { return exec_ctx_->memory_pool(); } private: @@ -93,6 +101,8 @@ class ARROW_EXPORT KernelContext { KernelState* state_; }; +// A macro to invoke for error control flow after invoking functions (such as +// kernel init or exec functions) that propagate errors via KernelContext. #define ARROW_CTX_RETURN_IF_ERROR(CTX) \ do { \ if (ARROW_PREDICT_FALSE((CTX)->HasError())) { \ @@ -102,19 +112,24 @@ class ARROW_EXPORT KernelContext { } \ } while (0) -/// A standard function taking zero or more Array/Scalar values and returning -/// Array/Scalar output. May be used for SCALAR and VECTOR kernel kinds. Should -/// write into pre-allocated memory except in cases when a builder -/// (e.g. StringBuilder) must be employed +/// \brief The standard kernel execution API that must be implemented for +/// SCALAR and VECTOR kernel types. This includes both stateless and stateful +/// kernels. Kernels depending on some execution state access that state via +/// subclasses of KernelState set on the KernelContext object. May be used for +/// SCALAR and VECTOR kernel kinds. Implementations should endeavor to write +/// into pre-allocated memory if they are able, though for some kernels +/// (e.g. in cases when a builder like StringBuilder) must be employed this may +/// not be possible. using ArrayKernelExec = std::function; -/// \brief An abstract type-checking interface to permit customizable -/// validation rules. This is for scenarios where the acceptance is not an -/// exact type instance along with its unit. -struct TypeMatcher { +/// \brief An type-checking interface to permit customizable validation rules +/// for use with InputType and KernelSignature. This is for scenarios where the +/// acceptance is not an exact type instance, such as a TIMESTAMP type for a +/// specific TimeUnit, but permitting any time zone. +struct ARROW_EXPORT TypeMatcher { virtual ~TypeMatcher() = default; - /// \brief Return true if this matcher accepts the data type + /// \brief Return true if this matcher accepts the data type. virtual bool Matches(const DataType& type) const = 0; /// \brief A human-interpretable string representation of what the type @@ -122,106 +137,129 @@ struct TypeMatcher { /// error messages. virtual std::string ToString() const = 0; + /// \brief Return true if this TypeMatcher contains the same matching rule as + /// the other. Currently depends on RTTI. virtual bool Equals(const TypeMatcher& other) const = 0; }; namespace match { -/// \brief Match any DataType instance having the same DataType::id +/// \brief Match any DataType instance having the same DataType::id. ARROW_EXPORT std::shared_ptr SameTypeId(Type::type type_id); /// \brief Match any TimestampType instance having the same unit, but the time -/// zones can be different +/// zones can be different. ARROW_EXPORT std::shared_ptr TimestampUnit(TimeUnit::type unit); } // namespace match -/// \brief A container to express what kernel argument input types are accepted +/// \brief An object used for type- and shape-checking arguments to be passed +/// to a kernel and stored in a KernelSignature. Distinguishes between ARRAY +/// and SCALAR arguments using ValueDescr::Shape. The type-checking rule can be +/// supplied either with an exact DataType instance or a custom TypeMatcher. class ARROW_EXPORT InputType { public: + /// \brief The kind of type-checking rule that the InputType contains. enum Kind { - /// Accept any value type + /// \brief Accept any value type. ANY_TYPE, - /// A fixed arrow::DataType and will only exact match having this exact - /// type (e.g. same TimestampType unit, same decimal scale and precision, - /// or same nested child types + /// \brief A fixed arrow::DataType and will only exact match having this + /// exact type (e.g. same TimestampType unit, same decimal scale and + /// precision, or same nested child types). EXACT_TYPE, - /// Uses an TypeMatcher implementation to check the type + /// \brief Uses a TypeMatcher implementation to check the type. USE_TYPE_MATCHER }; + /// \brief Accept any value type but with a specific shape (e.g. any Array or + /// any Scalar). InputType(ValueDescr::Shape shape = ValueDescr::ANY) // NOLINT implicit construction : kind_(ANY_TYPE), shape_(shape) {} + /// \brief Accept an exact value type. InputType(std::shared_ptr type, ValueDescr::Shape shape = ValueDescr::ANY) // NOLINT implicit construction : kind_(EXACT_TYPE), shape_(shape), type_(std::move(type)) {} + /// \brief Accept an exact value type and shape provided by a ValueDescr. InputType(const ValueDescr& descr) // NOLINT implicit construction : InputType(descr.type, descr.shape) {} + /// \brief Use the passed TypeMatcher to type check. InputType(std::shared_ptr type_matcher, ValueDescr::Shape shape = ValueDescr::ANY) : kind_(USE_TYPE_MATCHER), shape_(shape), type_matcher_(std::move(type_matcher)) {} + /// \brief Match any type with the given Type::type. Uses a TypeMatcher for + /// its implementation. explicit InputType(Type::type type_id, ValueDescr::Shape shape = ValueDescr::ANY) : InputType(match::SameTypeId(type_id), shape) {} InputType(const InputType& other) { CopyInto(other); } - // Convenience ctors + void operator=(const InputType& other) { CopyInto(other); } + + InputType(InputType&& other) { MoveInto(std::forward(other)); } + + void operator=(InputType&& other) { MoveInto(std::forward(other)); } + + // \brief Match an array with the given exact type. Convenience constructor. static InputType Array(std::shared_ptr type) { return InputType(std::move(type), ValueDescr::ARRAY); } + // \brief Match a scalar with the given exact type. Convenience constructor. static InputType Scalar(std::shared_ptr type) { return InputType(std::move(type), ValueDescr::SCALAR); } + // \brief Match an array with the given Type::type id. Convenience + // constructor. static InputType Array(Type::type id) { return InputType(id, ValueDescr::ARRAY); } + // \brief Match a scalar with the given Type::type id. Convenience + // constructor. static InputType Scalar(Type::type id) { return InputType(id, ValueDescr::SCALAR); } - void operator=(const InputType& other) { CopyInto(other); } - - InputType(InputType&& other) { MoveInto(std::forward(other)); } - - void operator=(InputType&& other) { MoveInto(std::forward(other)); } - - /// \brief Return true if this type exactly matches another + /// \brief Return true if this input type matches the same type cases as the + /// other. bool Equals(const InputType& other) const; bool operator==(const InputType& other) const { return this->Equals(other); } bool operator!=(const InputType& other) const { return !(*this == other); } - /// \brief Return hash code + /// \brief Return hash code. size_t Hash() const; - /// \brief Render a human-readable string representation + /// \brief Render a human-readable string representation. std::string ToString() const; /// \brief Return true if the value matches this argument kind in type - /// and shape + /// and shape. bool Matches(const Datum& value) const; /// \brief Return true if the value descriptor matches this argument kind in - /// type and shape + /// type and shape. bool Matches(const ValueDescr& value) const; - /// \brief The type matching rule that this InputType uses + /// \brief The type matching rule that this InputType uses. Kind kind() const { return kind_; } + /// \brief Indicates whether this InputType matches Array (ValueDescr::ARRAY), + /// Scalar (ValueDescr::SCALAR) values, or both (ValueDescr::ANY). ValueDescr::Shape shape() const { return shape_; } - /// \brief For InputType::EXACT_TYPE, the exact type that this InputType must - /// match. Otherwise this function should not be used + /// \brief For InputType::EXACT_TYPE kind, the exact type that this InputType + /// must match. Otherwise this function should not be used and will assert in + /// debug builds. const std::shared_ptr& type() const; - /// \brief For InputType::, the Type::type that this InputType must - /// match, Otherwise this function should not be used + /// \brief For InputType::USE_TYPE_MATCHER, the TypeMatcher to be used for + /// checking the type of a value. Otherwise this function should not be used + /// and will assert in debug builds. const TypeMatcher& type_matcher() const; private: @@ -250,31 +288,33 @@ class ARROW_EXPORT InputType { std::shared_ptr type_matcher_; }; -/// \brief Container to capture both exact and input-dependent output types +/// \brief Container to capture both exact and input-dependent output types. /// /// The value shape returned by Resolve will be determined by broadcasting the /// shapes of the input arguments, otherwise this is handled by the -/// user-defined resolver function +/// user-defined resolver function: /// /// * Any ARRAY shape -> output shape is ARRAY /// * All SCALAR shapes -> output shape is SCALAR class ARROW_EXPORT OutputType { public: /// \brief An enum indicating whether the value type is an invariant fixed - /// value or one that's computed by a kernel-defined resolver function + /// value or one that's computed by a kernel-defined resolver function. enum ResolveKind { FIXED, COMPUTED }; /// Type resolution function. Given input types and shapes, return output /// type and shape. This function SHOULD _not_ be used to check for arity, - /// that SHOULD be performed one or more layers above. May make use of kernel - /// state to know what type to output + /// that is to be performed one or more layers above. May make use of kernel + /// state to know what type to output in some cases. using Resolver = std::function(KernelContext*, const std::vector&)>; + /// \brief Output an exact type, but with shape determined by promoting the + /// shapes of the inputs (any ARRAY argument yields ARRAY). OutputType(std::shared_ptr type) // NOLINT implicit construction : kind_(FIXED), type_(std::move(type)) {} - /// For outputting a particular type and shape + /// \brief Output the exact type and shape provided by a ValueDescr OutputType(ValueDescr descr); // NOLINT implicit construction explicit OutputType(Resolver resolver) : kind_(COMPUTED), resolver_(resolver) {} @@ -294,28 +334,29 @@ class ARROW_EXPORT OutputType { } /// \brief Return the shape and type of the expected output value of the - /// kernel given the value descriptors (shapes and types). The resolver may - /// make use of state information kept in the KernelContext + /// kernel given the value descriptors (shapes and types) of the input + /// arguments. The resolver may make use of state information kept in the + /// KernelContext. Result Resolve(KernelContext* ctx, const std::vector& args) const; - /// \brief The value type for the FIXED kind rule + /// \brief The exact output value type for the FIXED kind. const std::shared_ptr& type() const; - /// \brief For use with COMPUTED resolution strategy, the output type depends - /// on the input type. It may be more convenient to invoke this with - /// OutputType::Resolve returned from this method + /// \brief For use with COMPUTED resolution strategy. It may be more + /// convenient to invoke this with OutputType::Resolve returned from this + /// method. const Resolver& resolver() const; - /// \brief Render a human-readable string representation + /// \brief Render a human-readable string representation. std::string ToString() const; /// \brief Return the kind of type resolution of this output type, whether - /// fixed/invariant or computed by a "user"-defined resolver + /// fixed/invariant or computed by a resolver. ResolveKind kind() const { return kind_; } /// \brief If the shape is ANY, then Resolve will compute the shape based on - /// the input arguments + /// the input arguments. ValueDescr::Shape shape() const { return shape_; } private: @@ -324,16 +365,18 @@ class ARROW_EXPORT OutputType { // For FIXED resolution std::shared_ptr type_; + /// \brief The shape of the output type to return when using Resolve. If ANY + /// will promote the input shapes. ValueDescr::Shape shape_ = ValueDescr::ANY; // For COMPUTED resolution Resolver resolver_; }; -/// \brief Holds the input types and output type of the kernel +/// \brief Holds the input types and output type of the kernel. /// -/// Varargs functions should pass a single input type to be used to validate -/// the the input types of a function invocation +/// VarArgs functions should pass a single input type to be used to validate +/// the input types of a function invocation. class ARROW_EXPORT KernelSignature { public: KernelSignature(std::vector in_types, OutputType out_type, @@ -345,13 +388,13 @@ class ARROW_EXPORT KernelSignature { bool is_varargs = false); /// \brief Return true if the signature if compatible with the list of input - /// value descriptors + /// value descriptors. bool MatchesInputs(const std::vector& descriptors) const; /// \brief Returns true if the input types of each signature are /// equal. Well-formed functions should have a deterministic output type /// given input types, but currently it is the responsibility of the - /// developer to ensure this + /// developer to ensure this. bool Equals(const KernelSignature& other) const; bool operator==(const KernelSignature& other) const { return this->Equals(other); } @@ -361,8 +404,14 @@ class ARROW_EXPORT KernelSignature { /// \brief Compute a hash code for the signature size_t Hash() const; + /// \brief The input types for the kernel. For VarArgs functions, this should + /// generally contain a single validator to use for validating all of the + /// function arguments. const std::vector& in_types() const { return in_types_; } + /// \brief The output type for the kernel. Use Resolve to return the exact + /// output given input argument ValueDescrs, since many kernels' output types + /// depend on their input types (or their type metadata). const OutputType& out_type() const { return out_type_; } /// \brief Render a human-readable string representation @@ -386,47 +435,78 @@ struct SimdLevel { enum type { NONE, SSE4_2, AVX, AVX2, AVX512, NEON }; }; +/// \brief The strategy to use for propagating or otherwise populating the +/// validity bitmap of a kernel output. struct NullHandling { enum type { /// Compute the output validity bitmap by intersecting the validity bitmaps - /// of the arguments. Kernel does not do anything with the bitmap + /// of the arguments using bitwise-and operations. This means that values + /// in the output are valid/non-null only if the corresponding values in + /// all input arguments were valid/non-null. Kernel generally need not + /// touch the bitmap thereafter, but a kernel's exec function is permitted + /// to alter the bitmap after the null intersection is computed if it needs + /// to. INTERSECTION, - /// Kernel expects a pre-allocated buffer to write the result bitmap into + /// Kernel expects a pre-allocated buffer to write the result bitmap + /// into. The preallocated memory is not zeroed (except for the last byte), + /// so the kernel should ensure to completely populate the bitmap. COMPUTED_PREALLOCATE, - /// Kernel allocates and populates the validity bitmap of the output + /// Kernel allocates and sets the validity bitmap of the output. COMPUTED_NO_PREALLOCATE, - /// Output is never null + /// Kernel output is never null and a validity bitmap does not need to be + /// allocated. OUTPUT_NOT_NULL }; }; +/// \brief The preference for memory preallocation of fixed-width type outputs +/// in kernel execution. struct MemAllocation { enum type { - // For data types that support pre-allocation (fixed-type), the kernel - // expects to be provided pre-allocated memory to write - // into. Non-fixed-width must always allocate their own memory but perhaps - // not their validity bitmaps. The allocation made for the same length as - // the execution batch, so vector kernels yielding differently sized output - // should not use this + // For data types that support pre-allocation (i.e. fixed-width), the + // kernel expects to be provided a pre-allocated data buffer to write + // into. Non-fixed-width types must always allocate their own data + // buffers. The allocation made for the same length as the execution batch, + // so vector kernels yielding differently sized output should not use this. + // + // It is valid for the data to not be preallocated but the validity bitmap + // is (or is computed using the intersection/bitwise-and method). + // + // For variable-size output types like BinaryType or StringType, or for + // nested types, this option has no effect. PREALLOCATE, - // The kernel does its own memory allocation + // The kernel is responsible for allocating its own data buffer for + // fixed-width type outputs. NO_PREALLOCATE }; }; struct Kernel; +/// \brief Arguments to pass to a KernelInit function. A struct is used to help +/// avoid API breakage should the arguments passed need to be expanded. struct KernelInitArgs { + /// \brief A pointer to the kernel being initialized. The init function may + /// depend on the kernel's KernelSignature or other data contained there. const Kernel* kernel; + + /// \brief The types and shapes of the input arguments that the kernel is + /// about to be executed against. + /// + /// TODO: should this be const std::vector*? const-ref is being + /// used to avoid the cost of copying the struct into the args struct. const std::vector& inputs; + + /// \brief Opaque options specific to this kernel. Is nullptr for functions + /// that do not require options. const FunctionOptions* options; }; -// Kernel initializer (context, argument descriptors, options) +/// \brief Common initializer function for all kernel types. using KernelInit = std::function(KernelContext*, const KernelInitArgs&)>; @@ -442,24 +522,33 @@ struct Kernel { Kernel(std::vector in_types, OutputType out_type, KernelInit init) : Kernel(KernelSignature::Make(std::move(in_types), out_type), init) {} + /// \brief The "signature" of the kernel containing the InputType input + /// argument validators and OutputType output type and shape resolver. std::shared_ptr signature; /// \brief Create a new KernelState for invocations of this kernel, e.g. to /// set up any options or state relevant for execution. May be nullptr KernelInit init; - // Does execution benefit from parallelization (splitting large chunks into - // smaller chunks and using multiple threads). Some vector kernels may - // require single-threaded execution. + /// \brief Indicates whether execution can benefit from parallelization + /// (splitting large chunks into smaller chunks and using multiple + /// threads). Some kernels may not support parallel execution at + /// all. Synchronization and concurrency-related issues are currently the + /// responsibility of the Kernel's implementation. bool parallelizable = true; - /// \brief What level of SIMD instruction support in the host CPU is required - /// to use the function + /// \brief Indicates the level of SIMD instruction support in the host CPU is + /// required to use the function. Currently this is not used, but the + /// intention is for functions to be able to contain multiple kernels with + /// the same signature but different levels of SIMD, so that the most + /// optimized kernel supported on a host's processor can be chosen. SimdLevel::type simd_level = SimdLevel::NONE; }; -/// \brief Descriptor to hold signature and execution function implementations -/// for a particular kernel +/// \brief Common kernel base data structure for ScalarKernel and +/// VectorKernel. It is called "ArrayKernel" in that the functions generally +/// output array values (as opposed to scalar values in the case of aggregate +/// functions). struct ArrayKernel : public Kernel { ArrayKernel() {} @@ -473,16 +562,21 @@ struct ArrayKernel : public Kernel { /// \brief Perform a single invocation of this kernel. Depending on the /// implementation, it may only write into preallocated memory, while in some - /// cases it will allocate its own memory. + /// cases it will allocate its own memory. Any required state is managed + /// through the KernelContext. ArrayKernelExec exec; /// \brief Writing execution results into larger contiguous allocations - /// requires that the kernel be able to write into sliced output - /// ArrayData*. Some kernel implementations may not be able to do this, so - /// setting this to false disables this functionality + /// requires that the kernel be able to write into sliced output ArrayData*, + /// including sliced output validity bitmaps. Some kernel implementations may + /// not be able to do this, so setting this to false disables this + /// functionality. bool can_write_into_slices = true; }; +/// \brief Kernel data structure for implementations of ScalarFunction. In +/// addition to the members found in ArrayKernel, contains the null handling +/// and memory pre-allocation preferences. struct ScalarKernel : public ArrayKernel { using ArrayKernel::ArrayKernel; @@ -492,9 +586,17 @@ struct ScalarKernel : public ArrayKernel { MemAllocation::type mem_allocation = MemAllocation::PREALLOCATE; }; -// Convert intermediate results into finalized results. Mutates input argument +// ---------------------------------------------------------------------- +// VectorKernel (for VectorFunction) + +/// \brief See VectorKernel::finalize member for usage using VectorFinalize = std::function*)>; +/// \brief Kernel data structure for implementations of VectorFunction. In +/// addition to the members found in ArrayKernel, contains an optional +/// finalizer function, the null handling and memory pre-allocation preferences +/// (which have different defaults from ScalarKernel), and some other +/// execution-related options. struct VectorKernel : public ArrayKernel { VectorKernel() {} @@ -509,6 +611,11 @@ struct VectorKernel : public ArrayKernel { KernelInit init = NULLPTR, VectorFinalize finalize = NULLPTR) : ArrayKernel(std::move(sig), exec, init), finalize(finalize) {} + /// \brief For VectorKernel, convert intermediate results into finalized + /// results. Mutates input argument. Some kernels may accumulate state + /// (example: hashing-related functions) through processing chunked inputs, and + /// then need to attach some accumulated state to each of the outputs of + /// processing each chunk of data. VectorFinalize finalize; /// Since vector kernels generally are implemented rather differently from @@ -529,11 +636,12 @@ struct VectorKernel : public ArrayKernel { /// /// true -> ChunkedArray /// false -> Array - /// - /// TODO: Where is a better place to deal with this issue? bool output_chunked = true; }; +// ---------------------------------------------------------------------- +// ScalarAggregateKernel (for ScalarAggregateFunction) + using ScalarAggregateConsume = std::function; using ScalarAggregateMerge = @@ -542,6 +650,16 @@ using ScalarAggregateMerge = // Finalize returns Datum to permit multiple return values using ScalarAggregateFinalize = std::function; +/// \brief Kernel data structure for implementations of +/// ScalarAggregateFunction. The four necessary components of an aggregation +/// kernel are the init, consume, merge, and finalize functions. +/// +/// * init: creates a new KernelState for a kernel. +/// * consume: processes an ExecBatch and updates the KernelState found in the +/// KernelContext. +/// * merge: combines one KernelState with another. +/// * finalize: produces the end result of the aggregation using the +/// KernelState in the KernelContext. struct ScalarAggregateKernel : public Kernel { ScalarAggregateKernel() {} diff --git a/cpp/src/arrow/compute/kernels/scalar_boolean.cc b/cpp/src/arrow/compute/kernels/scalar_boolean.cc index 336d104f136..1a05f384ef8 100644 --- a/cpp/src/arrow/compute/kernels/scalar_boolean.cc +++ b/cpp/src/arrow/compute/kernels/scalar_boolean.cc @@ -149,7 +149,7 @@ struct Xor { void MakeFunction(std::string name, int arity, ArrayKernelExec exec, FunctionRegistry* registry, bool can_write_into_slices = true, NullHandling::type null_handling = NullHandling::INTERSECTION) { - auto func = std::make_shared(name, arity); + auto func = std::make_shared(name, Arity(arity)); // Scalar arguments not yet supported std::vector in_types(arity, InputType::Array(boolean())); diff --git a/cpp/src/arrow/compute/registry.h b/cpp/src/arrow/compute/registry.h index c52618fcf2b..bb3ded47b9e 100644 --- a/cpp/src/arrow/compute/registry.h +++ b/cpp/src/arrow/compute/registry.h @@ -33,25 +33,32 @@ namespace compute { class Function; -/// \brief A mutable central function registry for built-in functions -/// and user-defined functions. +/// \brief A mutable central function registry for built-in functions as well +/// as user-defined functions. Functions are implementations of +/// arrow::compute::Function. +/// +/// Generally, each function contains kernels which are implementations of a +/// function for a specific argument signature. After looking up a function in +/// the registry, one can either execute it eagerly with Function::Execute or +/// use one of the function's dispatch methods to pick a suitable kernel for +/// lower-level function execution. class ARROW_EXPORT FunctionRegistry { public: ~FunctionRegistry(); - /// \brief Construct a new kernel registry. Most users only need to use the - /// global registry + /// \brief Construct a new registry. Most users only need to use the global + /// registry static std::unique_ptr Make(); - /// \brief Add a new kernel to the registry. Returns Status::KeyError if a - /// kernel with the same name is already registered + /// \brief Add a new function to the registry. Returns Status::KeyError if a + /// function with the same name is already registered Status AddFunction(std::shared_ptr function, bool allow_overwrite = false); - /// \brief Retrieve a kernel by name from the registry + /// \brief Retrieve a function by name from the registry Result> GetFunction(const std::string& name) const; /// \brief Return vector of all entry names in the registry. Helpful for - /// displaying a manifest of available kernels + /// displaying a manifest of available functions std::vector GetFunctionNames() const; /// \brief The number of currently registered functions @@ -60,12 +67,12 @@ class ARROW_EXPORT FunctionRegistry { private: FunctionRegistry(); - /// Use PIMPL pattern to not have std::unordered_map here + // Use PIMPL pattern to not have std::unordered_map here class FunctionRegistryImpl; std::unique_ptr impl_; }; -// \brief Return the process-global kernel registry +// \brief Return the process-global function registry ARROW_EXPORT FunctionRegistry* GetFunctionRegistry(); } // namespace compute diff --git a/cpp/src/arrow/compute/registry_test.cc b/cpp/src/arrow/compute/registry_test.cc index 155b5255530..4d2b52f79f8 100644 --- a/cpp/src/arrow/compute/registry_test.cc +++ b/cpp/src/arrow/compute/registry_test.cc @@ -53,11 +53,11 @@ TEST_F(TestRegistry, CreateBuiltInRegistry) { TEST_F(TestRegistry, Basics) { ASSERT_EQ(0, registry_->num_functions()); - std::shared_ptr func = std::make_shared("f1", 1); + std::shared_ptr func = std::make_shared("f1", Arity::Unary()); ASSERT_OK(registry_->AddFunction(func)); ASSERT_EQ(1, registry_->num_functions()); - func = std::make_shared("f0", 2); + func = std::make_shared("f0", Arity::Binary()); ASSERT_OK(registry_->AddFunction(func)); ASSERT_EQ(2, registry_->num_functions()); @@ -68,7 +68,7 @@ TEST_F(TestRegistry, Basics) { ASSERT_RAISES(KeyError, registry_->GetFunction("f2")); // Try adding a function with name collision - func = std::make_shared("f1", 1); + func = std::make_shared("f1", Arity::Unary()); ASSERT_RAISES(KeyError, registry_->AddFunction(func)); // Allow overwriting by flag