Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions cpp/src/arrow/compute/exec/exec_plan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,13 @@
#include "arrow/datum.h"
#include "arrow/record_batch.h"
#include "arrow/result.h"
#include "arrow/table.h"
#include "arrow/util/async_generator.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/key_value_metadata.h"
#include "arrow/util/logging.h"
#include "arrow/util/tracing_internal.h"
#include "arrow/util/vector.h"

namespace arrow {

Expand Down Expand Up @@ -555,6 +557,61 @@ bool Declaration::IsValid(ExecFactoryRegistry* registry) const {
return !this->factory_name.empty() && this->options != nullptr;
}

/// \brief Asynchronously run `declaration` and gather its output into a Table.
///
/// A "table_sink" node is appended to `declaration`, the resulting sequence is added
/// to a freshly created ExecPlan and the plan is started.
///
/// \param declaration the declaration to run; consumed by this call
/// \param exec_context context forwarded to ExecPlan::Make
/// \return a future that yields the collected table once the plan's finished() future
///         completes; an already-failed future if the plan could not be created,
///         populated, or started
Future<std::shared_ptr<Table>> DeclarationToTableAsync(Declaration declaration,
                                                       ExecContext* exec_context) {
  // Slot the sink writes into. It lives on the heap because the sink only receives a
  // raw pointer to it, while the continuation below shares ownership of the slot.
  std::shared_ptr<std::shared_ptr<Table>> output_table =
      std::make_shared<std::shared_ptr<Table>>();
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ExecPlan> exec_plan,
                        ExecPlan::Make(exec_context));
  // `declaration` is a by-value parameter that is not used again, so move it into the
  // sequence instead of copying it.
  Declaration with_sink = Declaration::Sequence(
      {std::move(declaration), {"table_sink", TableSinkNodeOptions(output_table.get())}});
  ARROW_RETURN_NOT_OK(with_sink.AddToPlan(exec_plan.get()));
  ARROW_RETURN_NOT_OK(exec_plan->StartProducing());
  // Capturing both shared_ptrs keeps the plan and the output slot alive until the
  // continuation has run.
  return exec_plan->finished().Then([exec_plan, output_table] { return *output_table; });
}

/// Synchronous wrapper: starts DeclarationToTableAsync and returns the result of the
/// future it produces.
Result<std::shared_ptr<Table>> DeclarationToTable(Declaration declaration,
                                                  ExecContext* exec_context) {
  auto table_future = DeclarationToTableAsync(std::move(declaration), exec_context);
  return table_future.result();
}

/// Asynchronously run `declaration`, collect the output as a Table via
/// DeclarationToTableAsync, then convert that table into record batches.
Future<std::vector<std::shared_ptr<RecordBatch>>> DeclarationToBatchesAsync(
    Declaration declaration, ExecContext* exec_context) {
  auto table_future = DeclarationToTableAsync(std::move(declaration), exec_context);
  // The conversion only runs once the table future has completed successfully.
  return table_future.Then([](const std::shared_ptr<Table>& collected) {
    return TableBatchReader(collected).ToRecordBatches();
  });
}

/// Synchronous wrapper: starts DeclarationToBatchesAsync and returns the result of
/// the future it produces.
Result<std::vector<std::shared_ptr<RecordBatch>>> DeclarationToBatches(
    Declaration declaration, ExecContext* exec_context) {
  auto batches_future = DeclarationToBatchesAsync(std::move(declaration), exec_context);
  return batches_future.result();
}

/// \brief Asynchronously run `declaration` and gather its output as ExecBatches.
///
/// A "sink" node is appended to `declaration`, the resulting sequence is added to a
/// freshly created ExecPlan and the plan is started. The sink's generator is then
/// drained into a vector.
///
/// \param declaration the declaration to run; consumed by this call
/// \param exec_context context forwarded to ExecPlan::Make
/// \return a future that yields the collected batches once both the plan's finished()
///         future and the collection future have completed; an already-failed future
///         if the plan could not be created, populated, or started
Future<std::vector<ExecBatch>> DeclarationToExecBatchesAsync(Declaration declaration,
                                                             ExecContext* exec_context) {
  // The sink node options receive the address of this generator.
  // NOTE(review): presumably the generator is filled in while the node is added to
  // the plan -- confirm against SinkNodeOptions.
  AsyncGenerator<std::optional<ExecBatch>> sink_gen;
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ExecPlan> exec_plan,
                        ExecPlan::Make(exec_context));
  // `declaration` is a by-value parameter that is not used again, so move it into the
  // sequence instead of copying it.
  Declaration with_sink = Declaration::Sequence(
      {std::move(declaration), {"sink", SinkNodeOptions(&sink_gen)}});
  ARROW_RETURN_NOT_OK(with_sink.AddToPlan(exec_plan.get()));
  ARROW_RETURN_NOT_OK(exec_plan->StartProducing());
  auto collected_fut = CollectAsyncGenerator(sink_gen);
  // Wait for both the plan and the collection; capturing exec_plan keeps the plan
  // alive until the continuation has run.
  return AllComplete({exec_plan->finished(), Future<>(collected_fut)})
      .Then([collected_fut, exec_plan]() -> Result<std::vector<ExecBatch>> {
        ARROW_ASSIGN_OR_RAISE(auto collected, collected_fut.result());
        // Each collected optional is dereferenced without a has_value() check.
        // NOTE(review): assumes the collector never stores an empty optional -- confirm.
        return ::arrow::internal::MapVector(
            [](std::optional<ExecBatch> batch) { return std::move(*batch); },
            std::move(collected));
      });
}

/// Synchronous wrapper: starts DeclarationToExecBatchesAsync and returns the result
/// of the future it produces.
Result<std::vector<ExecBatch>> DeclarationToExecBatches(Declaration declaration,
                                                        ExecContext* exec_context) {
  auto exec_batches_future =
      DeclarationToExecBatchesAsync(std::move(declaration), exec_context);
  return exec_batches_future.result();
}

namespace internal {

void RegisterSourceNode(ExecFactoryRegistry*);
Expand Down
33 changes: 33 additions & 0 deletions cpp/src/arrow/compute/exec/exec_plan.h
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,39 @@ struct ARROW_EXPORT Declaration {
std::string label;
};

/// \brief Utility method to run a declaration and collect the results into a table
///
/// This method will add a sink node to the declaration to collect results into a
/// table. It will then create an ExecPlan from the declaration, start the exec plan,
/// block until the plan has finished, and return the created table.
///
/// \param declaration the declaration to run
/// \param exec_context the context used to create the ExecPlan
/// \return the collected table, or the error raised while building or running the plan
ARROW_EXPORT Result<std::shared_ptr<Table>> DeclarationToTable(
Declaration declaration, ExecContext* exec_context = default_exec_context());

/// \brief Asynchronous version of DeclarationToTable
///
/// Instead of blocking, returns a future that completes with the table once the plan
/// has finished.
///
/// \see DeclarationToTable
ARROW_EXPORT Future<std::shared_ptr<Table>> DeclarationToTableAsync(
Declaration declaration, ExecContext* exec_context = default_exec_context());

/// \brief Utility method to run a declaration and collect the results into a vector
/// of ExecBatch
///
/// \param declaration the declaration to run
/// \param exec_context the context used to create the ExecPlan
/// \return the collected batches, or the error raised while building or running the
///         plan
///
/// \see DeclarationToTable for details
ARROW_EXPORT Result<std::vector<ExecBatch>> DeclarationToExecBatches(
Declaration declaration, ExecContext* exec_context = default_exec_context());

/// \brief Asynchronous version of DeclarationToExecBatches
///
/// \see DeclarationToExecBatches
ARROW_EXPORT Future<std::vector<ExecBatch>> DeclarationToExecBatchesAsync(
Declaration declaration, ExecContext* exec_context = default_exec_context());

/// \brief Utility method to run a declaration and collect the results into a vector
/// of RecordBatch
///
/// \param declaration the declaration to run
/// \param exec_context the context used to create the ExecPlan
/// \return the collected record batches, or the error raised while building or
///         running the plan
///
/// \see DeclarationToTable for details
ARROW_EXPORT Result<std::vector<std::shared_ptr<RecordBatch>>> DeclarationToBatches(
Declaration declaration, ExecContext* exec_context = default_exec_context());

/// \brief Asynchronous version of DeclarationToBatches
///
/// \see DeclarationToBatches
ARROW_EXPORT Future<std::vector<std::shared_ptr<RecordBatch>>> DeclarationToBatchesAsync(
Declaration declaration, ExecContext* exec_context = default_exec_context());

/// \brief Wrap an ExecBatch generator in a RecordBatchReader.
///
/// The RecordBatchReader does not impose any ordering on emitted batches.
Expand Down
21 changes: 15 additions & 6 deletions cpp/src/arrow/compute/exec/expression.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "arrow/chunked_array.h"
#include "arrow/compute/api_vector.h"
#include "arrow/compute/exec/expression_internal.h"
#include "arrow/compute/exec/util.h"
#include "arrow/compute/exec_internal.h"
#include "arrow/compute/function_internal.h"
#include "arrow/io/memory.h"
Expand Down Expand Up @@ -77,9 +78,15 @@ Expression call(std::string function, std::vector<Expression> arguments,
return Expression(std::move(call));
}

const Datum* Expression::literal() const { return std::get_if<Datum>(impl_.get()); }
// Yields the held Datum when this expression is a literal. Yields nullptr when impl_
// is null or when it holds some other alternative.
const Datum* Expression::literal() const {
  return impl_ == nullptr ? nullptr : std::get_if<Datum>(impl_.get());
}

// Yields the held Parameter when this expression is one. Yields nullptr when impl_ is
// null or when it holds some other alternative.
const Expression::Parameter* Expression::parameter() const {
  return impl_ == nullptr ? nullptr : std::get_if<Parameter>(impl_.get());
}

Expand All @@ -91,6 +98,8 @@ const FieldRef* Expression::field_ref() const {
}

// Yields the held Call when this expression is a call. Yields nullptr when impl_ is
// null or when it holds some other alternative.
const Expression::Call* Expression::call() const {
  return impl_ == nullptr ? nullptr : std::get_if<Call>(impl_.get());
}

Expand Down Expand Up @@ -654,7 +663,7 @@ bool ExpressionHasFieldRefs(const Expression& expr) {
}

Result<Expression> FoldConstants(Expression expr) {
return Modify(
return ModifyExpression(
std::move(expr), [](Expression expr) { return expr; },
[](Expression expr, ...) -> Result<Expression> {
auto call = CallNotNull(expr);
Expand Down Expand Up @@ -799,7 +808,7 @@ Result<Expression> ReplaceFieldsWithKnownValues(const KnownFieldValues& known_va
"ReplaceFieldsWithKnownValues called on an unbound Expression");
}

return Modify(
return ModifyExpression(
std::move(expr),
[&known_values](Expression expr) -> Result<Expression> {
if (auto ref = expr.field_ref()) {
Expand Down Expand Up @@ -870,7 +879,7 @@ Result<Expression> Canonicalize(Expression expr, compute::ExecContext* exec_cont
}
} AlreadyCanonicalized;

return Modify(
return ModifyExpression(
std::move(expr),
[&AlreadyCanonicalized, exec_context](Expression expr) -> Result<Expression> {
auto call = expr.call();
Expand Down Expand Up @@ -1112,7 +1121,7 @@ Result<Expression> SimplifyIsValidGuarantee(Expression expr,
const Expression::Call& guarantee) {
if (guarantee.function_name != "is_valid") return expr;

return Modify(
return ModifyExpression(
std::move(expr), [](Expression expr) { return expr; },
[&](Expression expr, ...) -> Result<Expression> {
auto call = expr.call();
Expand Down Expand Up @@ -1154,7 +1163,7 @@ Result<Expression> SimplifyWithGuarantee(Expression expr,

if (auto inequality = Inequality::ExtractOne(guarantee)) {
ARROW_ASSIGN_OR_RAISE(auto simplified,
Modify(
ModifyExpression(
std::move(expr), [](Expression expr) { return expr; },
[&](Expression expr, ...) -> Result<Expression> {
return inequality->Simplify(std::move(expr));
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/arrow/compute/exec/expression.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ class ARROW_EXPORT Expression {
// XXX someday
// Result<PipelineGraph> GetPipelines();

/// Return true when this expression has a non-null implementation pointer
bool is_valid() const { return impl_.get() != NULLPTR; }

/// Access a Call or return nullptr if this expression is not a call
const Call* call() const;
/// Access a Datum or return nullptr if this expression is not a literal
Expand Down
47 changes: 0 additions & 47 deletions cpp/src/arrow/compute/exec/expression_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -287,52 +287,5 @@ inline Result<std::shared_ptr<compute::Function>> GetFunction(
return GetCastFunction(*to_type);
}

/// Rewrite an Expression tree using a pre-order and a post-order visitor.
///
/// `pre` is applied to every Expression, and to a Call before any of its arguments.
/// `post_call` is applied only to Calls, after their arguments have been processed.
/// A visitor signals "no change" by returning an expression Identical to its input,
/// which lets this function avoid rebuilding untouched Calls.
///
/// When at least one argument changed, `post_call` receives a Call rebuilt with the
/// new arguments together with a pointer to the Call as it was before its arguments
/// were replaced. When no argument changed, that pointer is nullptr.
template <typename PreVisit, typename PostVisitCall>
Result<Expression> Modify(Expression expr, const PreVisit& pre,
                          const PostVisitCall& post_call) {
  // Wrapping in Result<Expression> accepts whatever `pre` returns, as long as a
  // Result<Expression> can be constructed from it.
  ARROW_ASSIGN_OR_RAISE(expr, Result<Expression>(pre(std::move(expr))));

  const auto* call = expr.call();
  if (call == nullptr) return expr;

  const auto& original_args = call->arguments;
  std::vector<Expression> rewritten_args;
  bool any_changed = false;

  for (size_t idx = 0; idx < original_args.size(); ++idx) {
    ARROW_ASSIGN_OR_RAISE(auto rewritten, Modify(original_args[idx], pre, post_call));

    if (!Identical(rewritten, original_args[idx])) {
      if (!any_changed) {
        // Copy the argument list lazily, only once the first change shows up.
        rewritten_args = original_args;
        any_changed = true;
      }
      rewritten_args[idx] = std::move(rewritten);
    }
  }

  if (!any_changed) {
    return post_call(std::move(expr), nullptr);
  }

  // Rebuild the call around the rewritten arguments.
  auto rebuilt_call = *call;
  rebuilt_call.arguments = std::move(rewritten_args);
  return post_call(Expression(std::move(rebuilt_call)), &expr);
}

} // namespace compute
} // namespace arrow
7 changes: 3 additions & 4 deletions cpp/src/arrow/compute/exec/filter_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ namespace {
class FilterNode : public MapNode {
public:
FilterNode(ExecPlan* plan, std::vector<ExecNode*> inputs,
std::shared_ptr<Schema> output_schema, Expression filter, bool async_mode)
: MapNode(plan, std::move(inputs), std::move(output_schema), async_mode),
std::shared_ptr<Schema> output_schema, Expression filter)
: MapNode(plan, std::move(inputs), std::move(output_schema)),
filter_(std::move(filter)) {}

static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
Expand All @@ -61,8 +61,7 @@ class FilterNode : public MapNode {
filter_expression.type()->ToString());
}
return plan->EmplaceNode<FilterNode>(plan, std::move(inputs), std::move(schema),
std::move(filter_expression),
filter_options.async_mode);
std::move(filter_expression));
}

const char* kind_name() const override { return "FilterNode"; }
Expand Down
13 changes: 2 additions & 11 deletions cpp/src/arrow/compute/exec/map_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,10 @@ namespace arrow {
namespace compute {

MapNode::MapNode(ExecPlan* plan, std::vector<ExecNode*> inputs,
std::shared_ptr<Schema> output_schema, bool async_mode)
std::shared_ptr<Schema> output_schema)
: ExecNode(plan, std::move(inputs), /*input_labels=*/{"target"},
std::move(output_schema),
/*num_outputs=*/1) {
if (async_mode) {
executor_ = plan_->exec_context()->executor();
} else {
executor_ = nullptr;
}
}
/*num_outputs=*/1) {}

void MapNode::ErrorReceived(ExecNode* input, Status error) {
DCHECK_EQ(input, inputs_[0]);
Expand Down Expand Up @@ -82,9 +76,6 @@ void MapNode::StopProducing(ExecNode* output) {

void MapNode::StopProducing() {
EVENT(span_, "StopProducing");
if (executor_) {
this->stop_source_.RequestStop();
}
if (input_counter_.Cancel()) {
this->Finish();
}
Expand Down
7 changes: 1 addition & 6 deletions cpp/src/arrow/compute/exec/map_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ namespace compute {
class ARROW_EXPORT MapNode : public ExecNode {
public:
MapNode(ExecPlan* plan, std::vector<ExecNode*> inputs,
std::shared_ptr<Schema> output_schema, bool async_mode);
std::shared_ptr<Schema> output_schema);

void ErrorReceived(ExecNode* input, Status error) override;

Expand All @@ -69,11 +69,6 @@ class ARROW_EXPORT MapNode : public ExecNode {
protected:
// Counter for the number of batches received
AtomicCounter input_counter_;

::arrow::internal::Executor* executor_;

// Variable used to cancel remaining tasks in the executor
StopSource stop_source_;
};

} // namespace compute
Expand Down
12 changes: 4 additions & 8 deletions cpp/src/arrow/compute/exec/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,10 @@ class ARROW_EXPORT TableSourceNodeOptions : public ExecNodeOptions {
/// excluded in the batch emitted by this node.
class ARROW_EXPORT FilterNodeOptions : public ExecNodeOptions {
public:
explicit FilterNodeOptions(Expression filter_expression, bool async_mode = true)
: filter_expression(std::move(filter_expression)), async_mode(async_mode) {}
explicit FilterNodeOptions(Expression filter_expression)
: filter_expression(std::move(filter_expression)) {}

Expression filter_expression;
bool async_mode;
};

/// \brief Make a node which executes expressions on input batches, producing new batches.
Expand All @@ -100,14 +99,11 @@ class ARROW_EXPORT FilterNodeOptions : public ExecNodeOptions {
class ARROW_EXPORT ProjectNodeOptions : public ExecNodeOptions {
public:
explicit ProjectNodeOptions(std::vector<Expression> expressions,
std::vector<std::string> names = {}, bool async_mode = true)
: expressions(std::move(expressions)),
names(std::move(names)),
async_mode(async_mode) {}
std::vector<std::string> names = {})
: expressions(std::move(expressions)), names(std::move(names)) {}

std::vector<Expression> expressions;
std::vector<std::string> names;
bool async_mode;
};

/// \brief Make a node which aggregates input batches, optionally grouped by keys.
Expand Down
8 changes: 3 additions & 5 deletions cpp/src/arrow/compute/exec/project_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,8 @@ namespace {
class ProjectNode : public MapNode {
public:
ProjectNode(ExecPlan* plan, std::vector<ExecNode*> inputs,
std::shared_ptr<Schema> output_schema, std::vector<Expression> exprs,
bool async_mode)
: MapNode(plan, std::move(inputs), std::move(output_schema), async_mode),
std::shared_ptr<Schema> output_schema, std::vector<Expression> exprs)
: MapNode(plan, std::move(inputs), std::move(output_schema)),
exprs_(std::move(exprs)) {}

static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
Expand Down Expand Up @@ -72,8 +71,7 @@ class ProjectNode : public MapNode {
++i;
}
return plan->EmplaceNode<ProjectNode>(plan, std::move(inputs),
schema(std::move(fields)), std::move(exprs),
project_options.async_mode);
schema(std::move(fields)), std::move(exprs));
}

const char* kind_name() const override { return "ProjectNode"; }
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/compute/exec/util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ Status TableSinkNodeConsumer::Consume(ExecBatch batch) {
}

Future<> TableSinkNodeConsumer::Finish() {
ARROW_ASSIGN_OR_RAISE(*out_, Table::FromRecordBatches(batches_));
ARROW_ASSIGN_OR_RAISE(*out_, Table::FromRecordBatches(schema_, batches_));
return Status::OK();
}

Expand Down
Loading