Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 10 additions & 19 deletions cpp/src/arrow/compute/exec.cc
Original file line number Diff line number Diff line change
Expand Up @@ -616,8 +616,7 @@ class ScalarExecutor : public KernelExecutorImpl<ScalarKernel> {
}
}

kernel_->exec(kernel_ctx_, batch, &out);
ARROW_CTX_RETURN_IF_ERROR(kernel_ctx_);
RETURN_NOT_OK(kernel_->exec(kernel_ctx_, batch, &out));
if (!preallocate_contiguous_) {
// If we are producing chunked output rather than one big array, then
// emit each chunk as soon as it's available
Expand Down Expand Up @@ -794,8 +793,7 @@ class VectorExecutor : public KernelExecutorImpl<VectorKernel> {
output_descr_.shape == ValueDescr::ARRAY) {
RETURN_NOT_OK(PropagateNulls(kernel_ctx_, batch, out.mutable_array()));
}
kernel_->exec(kernel_ctx_, batch, &out);
ARROW_CTX_RETURN_IF_ERROR(kernel_ctx_);
RETURN_NOT_OK(kernel_->exec(kernel_ctx_, batch, &out));
if (!kernel_->finalize) {
// If there is no result finalizer (e.g. for hash-based functions, we can
// emit the processed batch right away rather than waiting
Expand All @@ -810,8 +808,7 @@ class VectorExecutor : public KernelExecutorImpl<VectorKernel> {
if (kernel_->finalize) {
// Intermediate results require post-processing after the execution is
// completed (possibly involving some accumulated state)
kernel_->finalize(kernel_ctx_, &results_);
ARROW_CTX_RETURN_IF_ERROR(kernel_ctx_);
RETURN_NOT_OK(kernel_->finalize(kernel_ctx_, &results_));
for (const auto& result : results_) {
RETURN_NOT_OK(listener->OnResult(result));
}
Expand Down Expand Up @@ -864,8 +861,7 @@ class ScalarAggExecutor : public KernelExecutorImpl<ScalarAggregateKernel> {
}

Datum out;
kernel_->finalize(kernel_ctx_, &out);
ARROW_CTX_RETURN_IF_ERROR(kernel_ctx_);
RETURN_NOT_OK(kernel_->finalize(kernel_ctx_, &out));
RETURN_NOT_OK(listener->OnResult(std::move(out)));
return Status::OK();
}
Expand All @@ -879,24 +875,19 @@ class ScalarAggExecutor : public KernelExecutorImpl<ScalarAggregateKernel> {
private:
Status Consume(const ExecBatch& batch) {
// FIXME(ARROW-11840) don't merge *any* aggegates for every batch
auto batch_state = kernel_->init(kernel_ctx_, {kernel_, *input_descrs_, options_});
ARROW_CTX_RETURN_IF_ERROR(kernel_ctx_);
ARROW_ASSIGN_OR_RAISE(
auto batch_state,
kernel_->init(kernel_ctx_, {kernel_, *input_descrs_, options_}));

if (batch_state == nullptr) {
kernel_ctx_->SetStatus(
Status::Invalid("ScalarAggregation requires non-null kernel state"));
return kernel_ctx_->status();
return Status::Invalid("ScalarAggregation requires non-null kernel state");
}

KernelContext batch_ctx(exec_context());
batch_ctx.SetState(batch_state.get());

kernel_->consume(&batch_ctx, batch);
ARROW_CTX_RETURN_IF_ERROR(&batch_ctx);

kernel_->merge(kernel_ctx_, std::move(*batch_state), state());
ARROW_CTX_RETURN_IF_ERROR(kernel_ctx_);

RETURN_NOT_OK(kernel_->consume(&batch_ctx, batch));
RETURN_NOT_OK(kernel_->merge(kernel_ctx_, std::move(*batch_state), state()));
return Status::OK();
}

Expand Down
25 changes: 15 additions & 10 deletions cpp/src/arrow/compute/exec_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,7 @@ TEST_F(TestExecBatchIterator, ZeroLengthInputs) {
// ----------------------------------------------------------------------
// Scalar function execution

void ExecCopy(KernelContext*, const ExecBatch& batch, Datum* out) {
Status ExecCopy(KernelContext*, const ExecBatch& batch, Datum* out) {
DCHECK_EQ(1, batch.num_values());
const auto& type = checked_cast<const FixedWidthType&>(*batch[0].type());
int value_size = type.bit_width() / 8;
Expand All @@ -547,9 +547,10 @@ void ExecCopy(KernelContext*, const ExecBatch& batch, Datum* out) {
uint8_t* dst = out_arr->buffers[1]->mutable_data() + out_arr->offset * value_size;
const uint8_t* src = arg0.buffers[1]->data() + arg0.offset * value_size;
std::memcpy(dst, src, batch.length * value_size);
return Status::OK();
}

void ExecComputedBitmap(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
Status ExecComputedBitmap(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
// Propagate nulls not used. Check that the out bitmap isn't the same already
// as the input bitmap
const ArrayData& arg0 = *batch[0].array();
Expand All @@ -564,21 +565,22 @@ void ExecComputedBitmap(KernelContext* ctx, const ExecBatch& batch, Datum* out)

internal::CopyBitmap(arg0.buffers[0]->data(), arg0.offset, batch.length,
out_arr->buffers[0]->mutable_data(), out_arr->offset);
ExecCopy(ctx, batch, out);
return ExecCopy(ctx, batch, out);
}

void ExecNoPreallocatedData(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
Status ExecNoPreallocatedData(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
// Validity preallocated, but not the data
ArrayData* out_arr = out->mutable_array();
DCHECK_EQ(0, out_arr->offset);
const auto& type = checked_cast<const FixedWidthType&>(*batch[0].type());
int value_size = type.bit_width() / 8;
Status s = (ctx->Allocate(out_arr->length * value_size).Value(&out_arr->buffers[1]));
DCHECK_OK(s);
ExecCopy(ctx, batch, out);
return ExecCopy(ctx, batch, out);
}

void ExecNoPreallocatedAnything(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
Status ExecNoPreallocatedAnything(KernelContext* ctx, const ExecBatch& batch,
Datum* out) {
// Neither validity nor data preallocated
ArrayData* out_arr = out->mutable_array();
DCHECK_EQ(0, out_arr->offset);
Expand All @@ -589,7 +591,7 @@ void ExecNoPreallocatedAnything(KernelContext* ctx, const ExecBatch& batch, Datu
out_arr->buffers[0]->mutable_data(), /*offset=*/0);

// Reuse the kernel that allocates the data
ExecNoPreallocatedData(ctx, batch, out);
return ExecNoPreallocatedData(ctx, batch, out);
}

struct ExampleOptions : public FunctionOptions {
Expand All @@ -602,12 +604,13 @@ struct ExampleState : public KernelState {
explicit ExampleState(std::shared_ptr<Scalar> value) : value(std::move(value)) {}
};

std::unique_ptr<KernelState> InitStateful(KernelContext*, const KernelInitArgs& args) {
Result<std::unique_ptr<KernelState>> InitStateful(KernelContext*,
const KernelInitArgs& args) {
auto func_options = static_cast<const ExampleOptions*>(args.options);
return std::unique_ptr<KernelState>(new ExampleState{func_options->value});
}

void ExecStateful(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
Status ExecStateful(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
// We take the value from the state and multiply the data in batch[0] with it
ExampleState* state = static_cast<ExampleState*>(ctx->state());
int32_t multiplier = checked_cast<const Int32Scalar&>(*state->value).value;
Expand All @@ -619,12 +622,14 @@ void ExecStateful(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
for (int64_t i = 0; i < arg0.length; ++i) {
dst[i] = arg0_data[i] * multiplier;
}
return Status::OK();
}

void ExecAddInt32(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
Status ExecAddInt32(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
const Int32Scalar& arg0 = batch[0].scalar_as<Int32Scalar>();
const Int32Scalar& arg1 = batch[1].scalar_as<Int32Scalar>();
out->value = std::make_shared<Int32Scalar>(arg0.value + arg1.value);
return Status::OK();
}

class TestCallScalarFunction : public TestComputeInternals {
Expand Down
3 changes: 1 addition & 2 deletions cpp/src/arrow/compute/function.cc
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,7 @@ Result<Datum> Function::Execute(const std::vector<Datum>& args,

KernelContext kernel_ctx{ctx};
if (kernel->init) {
state = kernel->init(&kernel_ctx, {kernel, inputs, options});
RETURN_NOT_OK(kernel_ctx.status());
ARROW_ASSIGN_OR_RAISE(state, kernel->init(&kernel_ctx, {kernel, inputs, options}));
kernel_ctx.SetState(state.get());
}

Expand Down
14 changes: 7 additions & 7 deletions cpp/src/arrow/compute/function_benchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,16 +78,17 @@ void BM_CastDispatchBaseline(benchmark::State& state) {

ExecContext exec_context;
KernelContext kernel_context(&exec_context);
auto cast_state =
cast_kernel->init(&kernel_context, {cast_kernel, {double_type}, &cast_options});
ABORT_NOT_OK(kernel_context.status());
auto cast_state = cast_kernel
->init(&kernel_context,
KernelInitArgs{cast_kernel, {double_type}, &cast_options})
.ValueOrDie();
kernel_context.SetState(cast_state.get());

for (auto _ : state) {
Datum timestamp_scalar = MakeNullScalar(double_type);
for (Datum int_scalar : int_scalars) {
exec(&kernel_context, {{std::move(int_scalar)}, 1}, &timestamp_scalar);
ABORT_NOT_OK(kernel_context.status());
ABORT_NOT_OK(
exec(&kernel_context, {{std::move(int_scalar)}, 1}, &timestamp_scalar));
}
benchmark::DoNotOptimize(timestamp_scalar);
}
Expand Down Expand Up @@ -164,8 +165,7 @@ void BM_ExecuteScalarKernelOnScalar(benchmark::State& state) {
int64_t total = 0;
for (const auto& scalar : scalars) {
Datum result{MakeNullScalar(int64())};
exec(&kernel_context, ExecBatch{{scalar}, /*length=*/1}, &result);
ABORT_NOT_OK(kernel_context.status());
ABORT_NOT_OK(exec(&kernel_context, ExecBatch{{scalar}, /*length=*/1}, &result));
total += result.scalar()->is_valid;
}
benchmark::DoNotOptimize(total);
Expand Down
13 changes: 7 additions & 6 deletions cpp/src/arrow/compute/function_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,7 @@ TEST(VectorFunction, Basics) {
}

auto ExecNYI = [](KernelContext* ctx, const ExecBatch& args, Datum* out) {
ctx->SetStatus(Status::NotImplemented("NYI"));
return;
return Status::NotImplemented("NYI");
};

template <typename FunctionType>
Expand Down Expand Up @@ -181,13 +180,15 @@ TEST(ScalarAggregateFunction, Basics) {
ASSERT_EQ(Function::SCALAR_AGGREGATE, func.kind());
}

std::unique_ptr<KernelState> NoopInit(KernelContext*, const KernelInitArgs&) {
Result<std::unique_ptr<KernelState>> NoopInit(KernelContext*, const KernelInitArgs&) {
return nullptr;
}

void NoopConsume(KernelContext*, const ExecBatch&) {}
void NoopMerge(KernelContext*, const KernelState&, KernelState*) {}
void NoopFinalize(KernelContext*, Datum*) {}
Status NoopConsume(KernelContext*, const ExecBatch&) { return Status::OK(); }
Status NoopMerge(KernelContext*, const KernelState&, KernelState*) {
return Status::OK();
}
Status NoopFinalize(KernelContext*, Datum*) { return Status::OK(); }

TEST(ScalarAggregateFunction, DispatchExact) {
ScalarAggregateFunction func("agg_test", Arity::Unary(), /*doc=*/nullptr);
Expand Down
10 changes: 0 additions & 10 deletions cpp/src/arrow/compute/kernel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,6 @@ Result<std::shared_ptr<ResizableBuffer>> KernelContext::AllocateBitmap(int64_t n
return result;
}

void KernelContext::SetStatus(const Status& status) {
if (ARROW_PREDICT_TRUE(status.ok())) {
return;
}
status_ = status;
}

/// \brief Clear any error status
void KernelContext::ResetStatus() { status_ = Status::OK(); }

// ----------------------------------------------------------------------
// Some basic TypeMatcher implementations

Expand Down
49 changes: 10 additions & 39 deletions cpp/src/arrow/compute/kernel.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,22 +63,6 @@ class ARROW_EXPORT KernelContext {
/// byte is preemptively zeroed to help avoid ASAN or valgrind issues.
Result<std::shared_ptr<ResizableBuffer>> AllocateBitmap(int64_t num_bits);

/// \brief Indicate that an error has occurred, to be checked by a exec caller
/// \param[in] status a Status instance.
///
/// \note Will not overwrite a prior set Status, so we will have the first
/// error that occurred until ExecContext::ResetStatus is called.
void SetStatus(const Status& status);

/// \brief Clear any error status.
void ResetStatus();

/// \brief Return true if an error has occurred.
bool HasError() const { return !status_.ok(); }

/// \brief Return the current status of the context.
const Status& status() const { return status_; }

/// \brief Assign the active KernelState to be utilized for each stage of
/// kernel execution. Ownership and memory lifetime of the KernelState must
/// be minded separately.
Expand All @@ -96,21 +80,9 @@ class ARROW_EXPORT KernelContext {

private:
ExecContext* exec_ctx_;
Status status_;
KernelState* state_;
};

// A macro to invoke for error control flow after invoking functions (such as
// kernel init or exec functions) that propagate errors via KernelContext.
#define ARROW_CTX_RETURN_IF_ERROR(CTX) \
do { \
if (ARROW_PREDICT_FALSE((CTX)->HasError())) { \
Status s = (CTX)->status(); \
(CTX)->ResetStatus(); \
return s; \
} \
} while (0)

/// \brief The standard kernel execution API that must be implemented for
/// SCALAR and VECTOR kernel types. This includes both stateless and stateful
/// kernels. Kernels depending on some execution state access that state via
Expand All @@ -119,7 +91,7 @@ class ARROW_EXPORT KernelContext {
/// into pre-allocated memory if they are able, though for some kernels
/// (e.g. in cases when a builder like StringBuilder) must be employed this may
/// not be possible.
using ArrayKernelExec = std::function<void(KernelContext*, const ExecBatch&, Datum*)>;
using ArrayKernelExec = std::function<Status(KernelContext*, const ExecBatch&, Datum*)>;

/// \brief An type-checking interface to permit customizable validation rules
/// for use with InputType and KernelSignature. This is for scenarios where the
Expand Down Expand Up @@ -523,9 +495,8 @@ struct KernelInitArgs {
};

/// \brief Common initializer function for all kernel types.
/// If an error occurs it will be stored in the KernelContext; nullptr will be returned.
using KernelInit =
std::function<std::unique_ptr<KernelState>(KernelContext*, const KernelInitArgs&)>;
using KernelInit = std::function<Result<std::unique_ptr<KernelState>>(
KernelContext*, const KernelInitArgs&)>;

/// \brief Base type for kernels. Contains the function signature and
/// optionally the state initialization function, along with some common
Expand Down Expand Up @@ -608,7 +579,7 @@ struct ScalarKernel : public ArrayKernel {
// VectorKernel (for VectorFunction)

/// \brief See VectorKernel::finalize member for usage
using VectorFinalize = std::function<void(KernelContext*, std::vector<Datum>*)>;
using VectorFinalize = std::function<Status(KernelContext*, std::vector<Datum>*)>;

/// \brief Kernel data structure for implementations of VectorFunction. In
/// addition to the members found in ArrayKernel, contains an optional
Expand Down Expand Up @@ -663,13 +634,13 @@ struct VectorKernel : public ArrayKernel {
// ----------------------------------------------------------------------
// ScalarAggregateKernel (for ScalarAggregateFunction)

using ScalarAggregateConsume = std::function<void(KernelContext*, const ExecBatch&)>;
using ScalarAggregateConsume = std::function<Status(KernelContext*, const ExecBatch&)>;

using ScalarAggregateMerge =
std::function<void(KernelContext*, KernelState&&, KernelState*)>;
std::function<Status(KernelContext*, KernelState&&, KernelState*)>;

// Finalize returns Datum to permit multiple return values
using ScalarAggregateFinalize = std::function<void(KernelContext*, Datum*)>;
using ScalarAggregateFinalize = std::function<Status(KernelContext*, Datum*)>;

/// \brief Kernel data structure for implementations of
/// ScalarAggregateFunction. The four necessary components of an aggregation
Expand Down Expand Up @@ -707,13 +678,13 @@ struct ScalarAggregateKernel : public Kernel {
// ----------------------------------------------------------------------
// HashAggregateKernel (for HashAggregateFunction)

using HashAggregateConsume = std::function<void(KernelContext*, const ExecBatch&)>;
using HashAggregateConsume = std::function<Status(KernelContext*, const ExecBatch&)>;

using HashAggregateMerge =
std::function<void(KernelContext*, KernelState&&, KernelState*)>;
std::function<Status(KernelContext*, KernelState&&, KernelState*)>;

// Finalize returns Datum to permit multiple return values
using HashAggregateFinalize = std::function<void(KernelContext*, Datum*)>;
using HashAggregateFinalize = std::function<Status(KernelContext*, Datum*)>;

/// \brief Kernel data structure for implementations of
/// HashAggregateFunction. The four necessary components of an aggregation
Expand Down
Loading