Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions cpp/examples/arrow/execution_plan_documentation_examples.cc
Original file line number Diff line number Diff line change
Expand Up @@ -855,6 +855,46 @@ arrow::Status SourceUnionSinkExample(cp::ExecContext& exec_context) {

// (Doc section: Union Example)

// (Doc section: Table Sink Example)

/// \brief An example showing a table sink node
/// \param exec_context The execution context to run the plan in
///
/// TableSink Example
/// This example shows how a table_sink can be used
/// in an execution plan. This includes a source node
/// receiving data as batches and the table sink node
/// which emits the output as a table.
arrow::Status TableSinkExample(cp::ExecContext& exec_context) {
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
cp::ExecPlan::Make(&exec_context));

ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};

ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
cp::MakeExecNode("source", plan.get(), {}, source_node_options));

std::shared_ptr<arrow::Table> output_table;
auto table_sink_options = cp::TableSinkNodeOptions{&output_table};

ARROW_RETURN_NOT_OK(
cp::MakeExecNode("table_sink", plan.get(), {source}, table_sink_options));
// validate the ExecPlan
ARROW_RETURN_NOT_OK(plan->Validate());
std::cout << "ExecPlan created : " << plan->ToString() << std::endl;
// start the ExecPlan
ARROW_RETURN_NOT_OK(plan->StartProducing());

// Wait for the plan to finish
auto finished = plan->finished();
RETURN_NOT_OK(finished.status());
std::cout << "Results : " << output_table->ToString() << std::endl;
return arrow::Status::OK();
}
// (Doc section: Table Sink Example)

enum ExampleMode {
SOURCE_SINK = 0,
TABLE_SOURCE_SINK = 1,
Expand All @@ -869,6 +909,7 @@ enum ExampleMode {
KSELECT = 10,
WRITE = 11,
UNION = 12,
TABLE_SOURCE_TABLE_SINK = 13
};

int main(int argc, char** argv) {
Expand Down Expand Up @@ -937,6 +978,10 @@ int main(int argc, char** argv) {
PrintBlock("Union Example");
status = SourceUnionSinkExample(exec_context);
break;
case TABLE_SOURCE_TABLE_SINK:
PrintBlock("TableSink Example");
status = TableSinkExample(exec_context);
break;
default:
break;
}
Expand Down
5 changes: 2 additions & 3 deletions cpp/src/arrow/compute/exec/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -305,12 +305,11 @@ class ARROW_EXPORT SelectKSinkNodeOptions : public SinkNodeOptions {
/// SelectK options
SelectKOptions select_k_options;
};

/// @}

/// \brief Adapt an Table as a sink node
/// \brief Adapt a Table as a sink node
///
/// obtains the output of a execution plan to
/// obtains the output of an execution plan to
/// a table pointer.
class ARROW_EXPORT TableSinkNodeOptions : public ExecNodeOptions {
public:
Expand Down
21 changes: 21 additions & 0 deletions docs/source/cpp/streaming_execution.rst
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,8 @@ This is the list of operations associated with the execution plan:
- :class:`arrow::dataset::WriteNodeOptions`
* - ``union``
- N/A
* - ``table_sink``
- :class:`arrow::compute::TableSinkNodeOptions`

.. _stream_execution_source_docs:

Expand Down Expand Up @@ -647,6 +649,25 @@ SelectK example:

.. _stream_execution_scan_docs:

``table_sink``
----------------

.. _stream_execution_table_sink_docs:

The ``table_sink`` node provides the ability to receive the output as an in-memory table.
This is simpler to use than the other sink nodes provided by the streaming execution engine
but it only makes sense when the output fits comfortably in memory.
The node is created using :class:`arrow::compute::TableSinkNodeOptions`.

Example of using ``table_sink``

.. literalinclude:: ../../../cpp/examples/arrow/execution_plan_documentation_examples.cc
:language: cpp
:start-after: (Doc section: Table Sink Example)
:end-before: (Doc section: Table Sink Example)
:linenos:
:lineno-match:

``scan``
---------

Expand Down