Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cpp/src/arrow/chunked_array.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ class ARROW_EXPORT ChunkedArray {
/// there are zero chunks
Result<std::shared_ptr<ChunkedArray>> View(const std::shared_ptr<DataType>& type) const;

std::shared_ptr<DataType> type() const { return type_; }
const std::shared_ptr<DataType>& type() const { return type_; }

/// \brief Determine if two chunked arrays are equal.
///
Expand Down
10 changes: 10 additions & 0 deletions cpp/src/arrow/compute/exec.cc
Original file line number Diff line number Diff line change
Expand Up @@ -594,6 +594,16 @@ class KernelExecutorImpl : public KernelExecutor {
return out;
}

Status CheckResultType(const Datum& out, const char* function_name) override {
const auto& type = out.type();
if (type != nullptr && !type->Equals(output_descr_.type)) {
return Status::TypeError(
"kernel type result mismatch for function '", function_name, "': declared as ",
output_descr_.type->ToString(), ", actual is ", type->ToString());
}
return Status::OK();
}

ExecContext* exec_context() { return kernel_ctx_->exec_context(); }
KernelState* state() { return kernel_ctx_->state(); }

Expand Down
10 changes: 7 additions & 3 deletions cpp/src/arrow/compute/exec/expression.cc
Original file line number Diff line number Diff line change
Expand Up @@ -540,9 +540,13 @@ Result<Datum> ExecuteScalarExpression(const Expression& expr, const ExecBatch& i
auto options = call->options.get();
RETURN_NOT_OK(executor->Init(&kernel_context, {kernel, descrs, options}));

auto listener = std::make_shared<compute::detail::DatumAccumulator>();
RETURN_NOT_OK(executor->Execute(arguments, listener.get()));
return executor->WrapResults(arguments, listener->values());
compute::detail::DatumAccumulator listener;
RETURN_NOT_OK(executor->Execute(arguments, &listener));
const auto out = executor->WrapResults(arguments, listener.values());
#ifndef NDEBUG
DCHECK_OK(executor->CheckResultType(out, call->function_name.c_str()));
#endif
return out;
}

namespace {
Expand Down
3 changes: 3 additions & 0 deletions cpp/src/arrow/compute/exec_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@ class ARROW_EXPORT KernelExecutor {
virtual Datum WrapResults(const std::vector<Datum>& args,
const std::vector<Datum>& outputs) = 0;

/// \brief Check the actual result type against the resolved output type
virtual Status CheckResultType(const Datum& out, const char* function_name) = 0;

static std::unique_ptr<KernelExecutor> MakeScalar();
static std::unique_ptr<KernelExecutor> MakeVector();
static std::unique_ptr<KernelExecutor> MakeScalarAggregate();
Expand Down
11 changes: 8 additions & 3 deletions cpp/src/arrow/compute/function.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "arrow/compute/registry.h"
#include "arrow/datum.h"
#include "arrow/util/cpu_info.h"
#include "arrow/util/logging.h"

namespace arrow {

Expand Down Expand Up @@ -230,9 +231,13 @@ Result<Datum> Function::Execute(const std::vector<Datum>& args,
}
RETURN_NOT_OK(executor->Init(&kernel_ctx, {kernel, inputs, options}));

auto listener = std::make_shared<detail::DatumAccumulator>();
RETURN_NOT_OK(executor->Execute(implicitly_cast_args, listener.get()));
return executor->WrapResults(implicitly_cast_args, listener->values());
detail::DatumAccumulator listener;
RETURN_NOT_OK(executor->Execute(implicitly_cast_args, &listener));
const auto out = executor->WrapResults(implicitly_cast_args, listener.values());
#ifndef NDEBUG
DCHECK_OK(executor->CheckResultType(out, name_.c_str()));
#endif
return out;
}

Status Function::Validate() const {
Expand Down
10 changes: 6 additions & 4 deletions cpp/src/arrow/datum.cc
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ std::shared_ptr<Array> Datum::make_array() const {
return MakeArray(util::get<std::shared_ptr<ArrayData>>(this->value));
}

std::shared_ptr<DataType> Datum::type() const {
const std::shared_ptr<DataType>& Datum::type() const {
if (this->kind() == Datum::ARRAY) {
return util::get<std::shared_ptr<ArrayData>>(this->value)->type;
}
Expand All @@ -96,17 +96,19 @@ std::shared_ptr<DataType> Datum::type() const {
if (this->kind() == Datum::SCALAR) {
return util::get<std::shared_ptr<Scalar>>(this->value)->type;
}
return nullptr;
static std::shared_ptr<DataType> no_type;
return no_type;
}

std::shared_ptr<Schema> Datum::schema() const {
const std::shared_ptr<Schema>& Datum::schema() const {
if (this->kind() == Datum::RECORD_BATCH) {
return util::get<std::shared_ptr<RecordBatch>>(this->value)->schema();
}
if (this->kind() == Datum::TABLE) {
return util::get<std::shared_ptr<Table>>(this->value)->schema();
}
return nullptr;
static std::shared_ptr<Schema> no_schema;
return no_schema;
}

int64_t Datum::length() const {
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/arrow/datum.h
Original file line number Diff line number Diff line change
Expand Up @@ -251,12 +251,12 @@ struct ARROW_EXPORT Datum {
/// \brief The value type of the variant, if any
///
/// \return nullptr if no type
std::shared_ptr<DataType> type() const;
const std::shared_ptr<DataType>& type() const;

/// \brief The schema of the variant, if any
///
/// \return nullptr if no schema
std::shared_ptr<Schema> schema() const;
const std::shared_ptr<Schema>& schema() const;

/// \brief The value length of the variant, if any
///
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/table.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ class ARROW_EXPORT Table {
const std::shared_ptr<ChunkedArray>& array);

/// \brief Return the table schema
std::shared_ptr<Schema> schema() const { return schema_; }
const std::shared_ptr<Schema>& schema() const { return schema_; }

/// \brief Return a column by index
virtual std::shared_ptr<ChunkedArray> column(int i) const = 0;
Expand Down