Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions cpp/examples/arrow/compute_register_example.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@

#include <arrow/api.h>
#include <arrow/compute/api.h>
#include <arrow/compute/exec/exec_plan.h>
#include <arrow/compute/exec/expression.h>
#include <arrow/compute/exec/options.h>
#include <arrow/util/async_generator.h>
#include <arrow/util/future.h>

#include <cstdlib>
#include <iostream>
Expand Down Expand Up @@ -65,6 +69,45 @@ arrow::Status ExampleFunctionImpl(cp::KernelContext* ctx, const cp::ExecBatch& b
return arrow::Status::OK();
}

class ExampleNodeOptions : public cp::ExecNodeOptions {};

// a basic ExecNode which ignores all input batches
class ExampleNode : public cp::ExecNode {
public:
ExampleNode(ExecNode* input, const ExampleNodeOptions&)
: ExecNode(/*plan=*/input->plan(), /*inputs=*/{input},
/*input_labels=*/{"ignored"},
/*output_schema=*/input->output_schema(), /*num_outputs=*/1) {}

const char* kind_name() override { return "ExampleNode"; }

arrow::Status StartProducing() override {
outputs_[0]->InputFinished(this, 0);
return arrow::Status::OK();
}

void ResumeProducing(ExecNode* output) override {}
void PauseProducing(ExecNode* output) override {}

void StopProducing(ExecNode* output) override { inputs_[0]->StopProducing(this); }
void StopProducing() override { inputs_[0]->StopProducing(); }

void InputReceived(ExecNode* input, int seq_num, cp::ExecBatch batch) override {}
void ErrorReceived(ExecNode* input, arrow::Status error) override {}
void InputFinished(ExecNode* input, int seq_stop) override {}

arrow::Future<> finished() override { return inputs_[0]->finished(); }
};

arrow::Result<cp::ExecNode*> ExampleExecNodeFactory(cp::ExecPlan* plan,
std::vector<cp::ExecNode*> inputs,
const cp::ExecNodeOptions& options) {
const auto& example_options =
arrow::internal::checked_cast<const ExampleNodeOptions&>(options);

return plan->EmplaceNode<ExampleNode>(inputs[0], example_options);
}

const cp::FunctionDoc func_doc{
"Example function to demonstrate registering an out-of-tree function",
"",
Expand Down Expand Up @@ -96,5 +139,24 @@ int main(int argc, char** argv) {
auto maybe_serialized = cp::Serialize(expr);
std::cerr << maybe_serialized.status().ToString() << std::endl;

auto exec_registry = cp::default_exec_factory_registry();
ABORT_ON_FAILURE(
exec_registry->AddFactory("compute_register_example", ExampleExecNodeFactory));

auto maybe_plan = cp::ExecPlan::Make();
ABORT_ON_FAILURE(maybe_plan.status());
auto plan = maybe_plan.ValueOrDie();

arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> source_gen, sink_gen;
ABORT_ON_FAILURE(
cp::Declaration::Sequence(
{
{"source", cp::SourceNodeOptions{arrow::schema({}), source_gen}},
{"compute_register_example", ExampleNodeOptions{}},
{"sink", cp::SinkNodeOptions{&sink_gen}},
})
.AddToPlan(plan.get())
.status());

return EXIT_SUCCESS;
}
5 changes: 5 additions & 0 deletions cpp/src/arrow/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -370,8 +370,13 @@ if(ARROW_COMPUTE)
compute/api_vector.cc
compute/cast.cc
compute/exec.cc
compute/exec/aggregate_node.cc
compute/exec/exec_plan.cc
compute/exec/expression.cc
compute/exec/filter_node.cc
compute/exec/project_node.cc
compute/exec/source_node.cc
compute/exec/sink_node.cc
compute/function.cc
compute/function_internal.cc
compute/kernel.cc
Expand Down
Loading