Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 53 additions & 10 deletions cpp/src/arrow/compute/exec.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,29 +60,38 @@ static constexpr int64_t kDefaultExecChunksize = UINT16_MAX;
/// function evaluation
class ARROW_EXPORT ExecContext {
public:
// If no function registry passed, the default is used
// If no function registry passed, the default is used.
explicit ExecContext(MemoryPool* pool = default_memory_pool(),
FunctionRegistry* func_registry = NULLPTR);

/// \brief The MemoryPool used for allocations, default is
/// default_memory_pool().
MemoryPool* memory_pool() const { return pool_; }

::arrow::internal::CpuInfo* cpu_info() const;

/// \brief The FunctionRegistry for looking up functions by name and
/// selecting kernels for execution. Defaults to the library-global function
/// registry provided by GetFunctionRegistry.
FunctionRegistry* func_registry() const { return func_registry_; }

// \brief Set maximum length unit of work for kernel execution. Larger inputs
// will be split into smaller chunks, and, if desired, processed in
// parallel. Set to -1 for no limit
/// \brief Set the maximum length of a unit of work for kernel execution.
/// Larger contiguous array inputs will be split into smaller chunks, and, if
/// possible and enabled, processed in parallel. The default chunksize is
/// INT64_MAX, so contiguous arrays are not split.
void set_exec_chunksize(int64_t chunksize) { exec_chunksize_ = chunksize; }

// \brief Maximum length unit of work for kernel execution.
/// \brief Maximum length for ExecBatch data chunks processed by
/// kernels. Contiguous array inputs with longer length will be split into
/// smaller chunks.
int64_t exec_chunksize() const { return exec_chunksize_; }

/// \brief Set whether to use multiple threads for function execution
/// \brief Set whether to use multiple threads for function execution. This
/// is not yet used.
void set_use_threads(bool use_threads = true) { use_threads_ = use_threads; }

/// \brief If true, then utilize multiple threads where relevant for function
/// execution
/// execution. This is not yet used.
bool use_threads() const { return use_threads_; }

// Set the preallocation strategy for kernel execution as it relates to
Expand All @@ -94,12 +103,15 @@ class ARROW_EXPORT ExecContext {
// chunk of execution
//
// TODO: At some point we might want to limit the size of contiguous
// preallocations (for example, merging small ChunkedArray chunks until
// reaching some desired size)
// preallocations. For example, even if the exec_chunksize is 64K or less, we
// might limit contiguous allocations to 1M records, say.
void set_preallocate_contiguous(bool preallocate) {
preallocate_contiguous_ = preallocate;
}

/// \brief If contiguous preallocations should be used when doing chunked
/// execution as specified by exec_chunksize(). See
/// set_preallocate_contiguous() for more information.
bool preallocate_contiguous() const { return preallocate_contiguous_; }

private:
Expand Down Expand Up @@ -146,22 +158,53 @@ class ARROW_EXPORT SelectionVector {
/// Array and Scalar values and an optional SelectionVector indicating that
/// there is an unmaterialized filter that either must be materialized, or (if
/// the kernel supports it) pushed down into the kernel implementation.
///
/// ExecBatch is semantically similar to RecordBatch in that in a SQL context
/// it represents a collection of records, but constant "columns" are
/// represented by Scalar values rather than having to be converted into arrays
/// with repeated values.
///
/// TODO: Datum uses arrow/util/variant.h which may be a bit heavier-weight
/// than is desirable for this class. Microbenchmarks would help determine for
/// sure. See ARROW-8928.
struct ExecBatch {
ExecBatch() {}
ExecBatch(std::vector<Datum> values, int64_t length)
: values(std::move(values)), length(length) {}

/// The values representing positional arguments to be passed to a kernel's
/// exec function for processing.
std::vector<Datum> values;

/// A deferred filter represented as an array of indices into the values.
///
/// For example, the filter [true, true, false, true] would be represented as
/// the selection vector [0, 1, 3]. When the selection vector is set,
/// ExecBatch::length is equal to the length of this array.
std::shared_ptr<SelectionVector> selection_vector;

/// The semantic length of the ExecBatch. When the values are all scalars,
/// the length should be set to 1, otherwise the length is taken from the
/// array values, except when there is a selection vector. When there is a
/// selection vector set, the length of the batch is the length of the
/// selection.
///
/// If the array values are of length 0 then the length is 0 regardless of
/// whether any values are Scalar. In general ExecBatch objects are produced
/// by ExecBatchIterator which by design does not yield length-0 batches.
int64_t length;

/// \brief Return the value at the i-th index
template <typename index_type>
inline const Datum& operator[](index_type i) const {
return values[i];
}

/// \brief A convenience for the number of values / arguments.
int num_values() const { return static_cast<int>(values.size()); }

/// \brief A convenience for returning the ValueDescr objects (types and
/// shapes) from the batch.
std::vector<ValueDescr> GetDescriptors() const {
std::vector<ValueDescr> result;
for (const auto& value : this->values) {
Expand All @@ -178,7 +221,7 @@ ARROW_EXPORT
Result<Datum> CallFunction(const std::string& func_name, const std::vector<Datum>& args,
const FunctionOptions* options, ExecContext* ctx = NULLPTR);

/// \brief Variant of CallFunction for functions not requiring options
/// \brief Variant of CallFunction for functions not requiring options.
ARROW_EXPORT
Result<Datum> CallFunction(const std::string& func_name, const std::vector<Datum>& args,
ExecContext* ctx = NULLPTR);
Expand Down
15 changes: 9 additions & 6 deletions cpp/src/arrow/compute/exec_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -645,7 +645,7 @@ class TestCallScalarFunction : public TestComputeInternals {

// This function simply copies memory from the input argument into the
// (preallocated) output
auto func = std::make_shared<ScalarFunction>("test_copy", 1);
auto func = std::make_shared<ScalarFunction>("test_copy", Arity::Unary());

// Add a few kernels. Our implementation only accepts arrays
ASSERT_OK(func->AddKernel({InputType::Array(uint8())}, uint8(), ExecCopy));
Expand All @@ -654,7 +654,8 @@ class TestCallScalarFunction : public TestComputeInternals {
ASSERT_OK(registry->AddFunction(func));

// A version which doesn't want the executor to call PropagateNulls
auto func2 = std::make_shared<ScalarFunction>("test_copy_computed_bitmap", 1);
auto func2 =
std::make_shared<ScalarFunction>("test_copy_computed_bitmap", Arity::Unary());
ScalarKernel kernel({InputType::Array(uint8())}, uint8(), ExecComputedBitmap);
kernel.null_handling = NullHandling::COMPUTED_PREALLOCATE;
ASSERT_OK(func2->AddKernel(kernel));
Expand All @@ -666,8 +667,9 @@ class TestCallScalarFunction : public TestComputeInternals {

// A function that allocates its own output memory. We have cases for both
// non-preallocated data and non-preallocated validity bitmap
auto f1 = std::make_shared<ScalarFunction>("test_nopre_data", 1);
auto f2 = std::make_shared<ScalarFunction>("test_nopre_validity_or_data", 1);
auto f1 = std::make_shared<ScalarFunction>("test_nopre_data", Arity::Unary());
auto f2 =
std::make_shared<ScalarFunction>("test_nopre_validity_or_data", Arity::Unary());

ScalarKernel kernel({InputType::Array(uint8())}, uint8(), ExecNoPreallocatedData);
kernel.mem_allocation = MemAllocation::NO_PREALLOCATE;
Expand All @@ -686,7 +688,7 @@ class TestCallScalarFunction : public TestComputeInternals {

// This function's behavior depends on a static parameter that is made
// available to the kernel's execution function through its Options object
auto func = std::make_shared<ScalarFunction>("test_stateful", 1);
auto func = std::make_shared<ScalarFunction>("test_stateful", Arity::Unary());

ScalarKernel kernel({InputType::Array(int32())}, int32(), ExecStateful, InitStateful);
ASSERT_OK(func->AddKernel(kernel));
Expand All @@ -696,7 +698,8 @@ class TestCallScalarFunction : public TestComputeInternals {
void AddScalarFunction() {
auto registry = GetFunctionRegistry();

auto func = std::make_shared<ScalarFunction>("test_scalar_add_int32", 2);
auto func =
std::make_shared<ScalarFunction>("test_scalar_add_int32", Arity::Binary());
ASSERT_OK(func->AddKernel({InputType::Scalar(int32()), InputType::Scalar(int32())},
int32(), ExecAddInt32));
ASSERT_OK(registry->AddFunction(func));
Expand Down
89 changes: 52 additions & 37 deletions cpp/src/arrow/compute/function.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
// under the License.

// NOTE: API is EXPERIMENTAL and will change without going through a
// deprecation cycle
// deprecation cycle.

#pragma once

Expand All @@ -39,33 +39,48 @@ namespace compute {

class ExecContext;

/// \brief Base class for specifying options configuring a function's behavior,
/// such as error handling.
struct ARROW_EXPORT FunctionOptions {};

/// \brief Contains the number of required arguments for the function
/// \brief Contains the number of required arguments for the function.
///
/// Naming conventions taken from https://en.wikipedia.org/wiki/Arity.
struct ARROW_EXPORT Arity {
/// \brief A function taking no arguments
static Arity Nullary() { return Arity(0, false); }

/// \brief A function taking 1 argument
static Arity Unary() { return Arity(1, false); }

/// \brief A function taking 2 arguments
static Arity Binary() { return Arity(2, false); }

/// \brief A function taking 3 arguments
static Arity Ternary() { return Arity(3, false); }

/// \brief A function taking a variable number of arguments
static Arity VarArgs(int min_args = 1) { return Arity(min_args, true); }

Arity(int num_args, bool is_varargs = false) // NOLINT implicit conversion
explicit Arity(int num_args, bool is_varargs = false)
: num_args(num_args), is_varargs(is_varargs) {}

/// The number of required arguments (or the minimum number for varargs
/// functions)
/// functions).
int num_args;

/// If true, then the num_args is the minimum number of required arguments
/// If true, then the num_args is the minimum number of required arguments.
bool is_varargs = false;
};

/// \brief Base class for function containers that are capable of dispatch to
/// kernel implementations
/// \brief Base class for compute functions. Function implementations contain a
/// collection of "kernels" which are implementations of the function for
/// specific argument types. Selecting a viable kernel for executing a function
/// is referred to as "dispatching".
class ARROW_EXPORT Function {
public:
/// \brief The kind of function, which indicates in what contexts it is
/// valid for use
/// valid for use.
enum Kind {
/// A function that performs scalar data operations on whole arrays of
/// data. Can generally process Array or Scalar values. The size of the
Expand All @@ -84,23 +99,25 @@ class ARROW_EXPORT Function {

virtual ~Function() = default;

/// \brief The name of the kernel. The registry enforces uniqueness of names
/// \brief The name of the function. The registry enforces uniqueness of names.
const std::string& name() const { return name_; }

/// \brief The kind of function, which indicates in what contexts it is valid
/// for use
/// for use.
Function::Kind kind() const { return kind_; }

/// \brief Contains the number of arguments the function requires
/// \brief Contains the number of arguments the function requires, and
/// whether the function accepts a variable number of arguments.
const Arity& arity() const { return arity_; }

/// \brief Returns the number of registered kernels for this function
/// \brief Returns the number of registered kernels for this function.
virtual int num_kernels() const = 0;

/// \brief Convenience for invoking a function with kernel dispatch and
/// memory allocation details taken care of
/// \brief Execute the function eagerly with the passed input arguments with
/// kernel dispatch, batch iteration, and memory allocation details taken
/// care of.
///
/// This function can be overridden in subclasses
/// This function can be overridden in subclasses.
virtual Result<Datum> Execute(const std::vector<Datum>& args,
const FunctionOptions* options,
ExecContext* ctx = NULLPTR) const;
Expand Down Expand Up @@ -150,51 +167,47 @@ class ARROW_EXPORT ScalarFunction : public detail::FunctionImpl<ScalarKernel> {
ScalarFunction(std::string name, const Arity& arity)
: detail::FunctionImpl<ScalarKernel>(std::move(name), Function::SCALAR, arity) {}

/// \brief Add a simple kernel (function implementation) with given
/// input/output types, no required state initialization, preallocation for
/// fixed-width types, and default null handling (intersect validity bitmaps
/// of inputs)
/// \brief Add a kernel with given input/output types, no required state
/// initialization, preallocation for fixed-width types, and default null
/// handling (intersect validity bitmaps of inputs).
Status AddKernel(std::vector<InputType> in_types, OutputType out_type,
ArrayKernelExec exec, KernelInit init = NULLPTR);

/// \brief Add a kernel (function implementation). Returns error if fails
/// to match the other parameters of the function
/// \brief Add a kernel (function implementation). Returns error if the
/// kernel's signature does not match the function's arity.
Status AddKernel(ScalarKernel kernel);

/// \brief Return the first kernel that can execute the function given the
/// exact argument types (without implicit type casts or scalar->array
/// promotions)
/// \brief Return a kernel that can execute the function given the exact
/// argument types (without implicit type casts or scalar->array promotions).
///
/// This function is overridden in CastFunction
/// NB: This function is overridden in CastFunction.
virtual Result<const ScalarKernel*> DispatchExact(
const std::vector<ValueDescr>& values) const;
};

/// \brief A function that executes general array operations that may yield
/// outputs of different sizes or have results that depend on the whole array
/// contents. These functions roughly correspond to the functions found in
/// non-SQL array languages like APL and its derivatives
/// non-SQL array languages like APL and its derivatives.
class ARROW_EXPORT VectorFunction : public detail::FunctionImpl<VectorKernel> {
public:
using KernelType = VectorKernel;

VectorFunction(std::string name, const Arity& arity)
: detail::FunctionImpl<VectorKernel>(std::move(name), Function::VECTOR, arity) {}

/// \brief Add a simple kernel (function implementation) with given
/// input/output types, no required state initialization, preallocation for
/// fixed-width types, and default null handling (intersect validity bitmaps
/// of inputs)
/// \brief Add a simple kernel with given input/output types, no required
/// state initialization, no data preallocation, and no preallocation of the
/// validity bitmap.
Status AddKernel(std::vector<InputType> in_types, OutputType out_type,
ArrayKernelExec exec, KernelInit init = NULLPTR);

/// \brief Add a kernel (function implementation). Returns error if fails
/// to match the other parameters of the function
/// \brief Add a kernel (function implementation). Returns error if the
/// kernel's signature does not match the function's arity.
Status AddKernel(VectorKernel kernel);

/// \brief Return the first kernel that can execute the function given the
/// exact argument types (without implicit type casts or scalar->array
/// promotions)
/// \brief Return a kernel that can execute the function given the exact
/// argument types (without implicit type casts or scalar->array promotions).
Result<const VectorKernel*> DispatchExact(const std::vector<ValueDescr>& values) const;
};

Expand All @@ -207,10 +220,12 @@ class ARROW_EXPORT ScalarAggregateFunction
: detail::FunctionImpl<ScalarAggregateKernel>(std::move(name),
Function::SCALAR_AGGREGATE, arity) {}

/// \brief Add a kernel (function implementation). Returns error if fails
/// to match the other parameters of the function
/// \brief Add a kernel (function implementation). Returns error if the
/// kernel's signature does not match the function's arity.
Status AddKernel(ScalarAggregateKernel kernel);

/// \brief Return a kernel that can execute the function given the exact
/// argument types (without implicit type casts or scalar->array promotions).
Result<const ScalarAggregateKernel*> DispatchExact(
const std::vector<ValueDescr>& values) const;
};
Expand Down
Loading