diff --git a/cpp/examples/arrow/execution_plan_documentation_examples.cc b/cpp/examples/arrow/execution_plan_documentation_examples.cc
index 0505af223ed..1ca3d36a349 100644
--- a/cpp/examples/arrow/execution_plan_documentation_examples.cc
+++ b/cpp/examples/arrow/execution_plan_documentation_examples.cc
@@ -855,6 +855,46 @@ arrow::Status SourceUnionSinkExample(cp::ExecContext& exec_context) {
 
 // (Doc section: Union Example)
 
+// (Doc section: Table Sink Example)
+
+/// \brief An example showing a table sink node
+/// \param exec_context The execution context to run the plan in
+/// \return OK if the plan was built and run successfully, the failing Status otherwise
+///
+/// TableSink Example
+/// This example shows how a table_sink can be used in an execution plan.
+/// The plan consists of a source node receiving data as batches and a
+/// table sink node which emits the output as a table.
+arrow::Status TableSinkExample(cp::ExecContext& exec_context) {
+  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
+                        cp::ExecPlan::Make(&exec_context));
+
+  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());
+
+  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};
+
+  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
+                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));
+
+  std::shared_ptr<arrow::Table> output_table;
+  auto table_sink_options = cp::TableSinkNodeOptions{&output_table};
+
+  ARROW_RETURN_NOT_OK(
+      cp::MakeExecNode("table_sink", plan.get(), {source}, table_sink_options));
+  // Validate the ExecPlan before running it
+  ARROW_RETURN_NOT_OK(plan->Validate());
+  std::cout << "ExecPlan created : " << plan->ToString() << std::endl;
+  // Start the ExecPlan
+  ARROW_RETURN_NOT_OK(plan->StartProducing());
+
+  // Wait for the plan to finish; Future::status() blocks until completion
+  auto finished = plan->finished();
+  ARROW_RETURN_NOT_OK(finished.status());
+  std::cout << "Results : " << output_table->ToString() << std::endl;
+  return arrow::Status::OK();
+}
+// (Doc section: Table Sink Example)
+
 enum ExampleMode {
   SOURCE_SINK = 0,
   TABLE_SOURCE_SINK = 1,
@@ -869,6 +909,7 @@ enum ExampleMode {
KSELECT = 10, WRITE = 11, UNION = 12, + TABLE_SOURCE_TABLE_SINK = 13 }; int main(int argc, char** argv) { @@ -937,6 +978,10 @@ int main(int argc, char** argv) { PrintBlock("Union Example"); status = SourceUnionSinkExample(exec_context); break; + case TABLE_SOURCE_TABLE_SINK: + PrintBlock("TableSink Example"); + status = TableSinkExample(exec_context); + break; default: break; } diff --git a/cpp/src/arrow/compute/exec/options.h b/cpp/src/arrow/compute/exec/options.h index 259e467d97e..9e99953e872 100644 --- a/cpp/src/arrow/compute/exec/options.h +++ b/cpp/src/arrow/compute/exec/options.h @@ -305,12 +305,11 @@ class ARROW_EXPORT SelectKSinkNodeOptions : public SinkNodeOptions { /// SelectK options SelectKOptions select_k_options; }; - /// @} -/// \brief Adapt an Table as a sink node +/// \brief Adapt a Table as a sink node /// -/// obtains the output of a execution plan to +/// obtains the output of an execution plan to /// a table pointer. class ARROW_EXPORT TableSinkNodeOptions : public ExecNodeOptions { public: diff --git a/docs/source/cpp/streaming_execution.rst b/docs/source/cpp/streaming_execution.rst index 5bee70eb053..649968ad43f 100644 --- a/docs/source/cpp/streaming_execution.rst +++ b/docs/source/cpp/streaming_execution.rst @@ -346,6 +346,8 @@ This is the list of operations associated with the execution plan: - :class:`arrow::dataset::WriteNodeOptions` * - ``union`` - N/A + * - ``table_sink`` + - :class:`arrow::compute::TableSinkNodeOptions` .. _stream_execution_source_docs: @@ -647,6 +649,25 @@ SelectK example: -.. _stream_execution_scan_docs: +.. _stream_execution_table_sink_docs: +``table_sink`` +---------------- + +The ``table_sink`` node provides the ability to receive the output as an in-memory table. +This is simpler to use than the other sink nodes provided by the streaming execution engine +but it only makes sense when the output fits comfortably in memory. +The node is created using :class:`arrow::compute::TableSinkNodeOptions`. 
+ +Example of using ``table_sink`` + +.. literalinclude:: ../../../cpp/examples/arrow/execution_plan_documentation_examples.cc + :language: cpp + :start-after: (Doc section: Table Sink Example) + :end-before: (Doc section: Table Sink Example) + :linenos: + :lineno-match: + +.. _stream_execution_scan_docs: + ``scan`` ---------