From 8f9e2c677b445093e8fd8649ef5fddc43b6e0355 Mon Sep 17 00:00:00 2001 From: Vibhatha Abeykoon Date: Fri, 4 Mar 2022 18:57:55 +0530 Subject: [PATCH 1/9] updating submodule --- testing | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/testing b/testing index 53b49804710..634739c6644 160000 --- a/testing +++ b/testing @@ -1 +1 @@ -Subproject commit 53b498047109d9940fcfab388bd9d6aeb8c57425 +Subproject commit 634739c664433cec366b4b9a81d1e1044a8c5eda From 5fa9c19d3bbb6cc3cbf37b29f811438220b4270d Mon Sep 17 00:00:00 2001 From: Vibhatha Abeykoon Date: Sat, 19 Mar 2022 19:14:39 +0530 Subject: [PATCH 2/9] temp commit to remove files in submodule --- .gitmodules | 3 --- testing | 1 - 2 files changed, 4 deletions(-) delete mode 160000 testing diff --git a/.gitmodules b/.gitmodules index 6efc4871542..71722b21777 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,6 +1,3 @@ [submodule "cpp/submodules/parquet-testing"] path = cpp/submodules/parquet-testing url = https://github.com/apache/parquet-testing.git -[submodule "testing"] - path = testing - url = https://github.com/apache/arrow-testing diff --git a/testing b/testing deleted file mode 160000 index 634739c6644..00000000000 --- a/testing +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 634739c664433cec366b4b9a81d1e1044a8c5eda From 5538474d7e96ec461ce321817bbc3478d239f258 Mon Sep 17 00:00:00 2001 From: Vibhatha Abeykoon Date: Sat, 19 Mar 2022 19:15:36 +0530 Subject: [PATCH 3/9] adding submodule --- .gitmodules | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.gitmodules b/.gitmodules index 71722b21777..6efc4871542 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,3 +1,6 @@ [submodule "cpp/submodules/parquet-testing"] path = cpp/submodules/parquet-testing url = https://github.com/apache/parquet-testing.git +[submodule "testing"] + path = testing + url = https://github.com/apache/arrow-testing From 20619c19bb92100e7dc537048255400db9a34dd0 Mon Sep 17 00:00:00 2001 From: Vibhatha Abeykoon Date: Sun, 20 Mar 2022 
23:09:21 +0530 Subject: [PATCH 4/9] updating testing submodule --- testing | 1 + 1 file changed, 1 insertion(+) create mode 160000 testing diff --git a/testing b/testing new file mode 160000 index 00000000000..d315f798520 --- /dev/null +++ b/testing @@ -0,0 +1 @@ +Subproject commit d315f7985207d2d67fc2c8e41053e9d97d573f4b From 16d5064d28e81f8660432199bcf0476dd4b02b7f Mon Sep 17 00:00:00 2001 From: Vibhatha Abeykoon Date: Sun, 20 Mar 2022 23:11:24 +0530 Subject: [PATCH 5/9] revert to upstream version --- testing | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/testing b/testing index d315f798520..53b49804710 160000 --- a/testing +++ b/testing @@ -1 +1 @@ -Subproject commit d315f7985207d2d67fc2c8e41053e9d97d573f4b +Subproject commit 53b498047109d9940fcfab388bd9d6aeb8c57425 From 0311621751e3b9b7458302f5633884a6c7ec18fd Mon Sep 17 00:00:00 2001 From: Vibhatha Abeykoon Date: Tue, 22 Mar 2022 15:57:06 +0530 Subject: [PATCH 6/9] adding docs and updated example for table-sink --- .../execution_plan_documentation_examples.cc | 49 +++++++++++++++++++ cpp/src/arrow/compute/exec/options.h | 1 - docs/source/cpp/streaming_execution.rst | 21 ++++++++ 3 files changed, 70 insertions(+), 1 deletion(-) diff --git a/cpp/examples/arrow/execution_plan_documentation_examples.cc b/cpp/examples/arrow/execution_plan_documentation_examples.cc index 0505af223ed..ee0baf77950 100644 --- a/cpp/examples/arrow/execution_plan_documentation_examples.cc +++ b/cpp/examples/arrow/execution_plan_documentation_examples.cc @@ -855,6 +855,50 @@ arrow::Status SourceUnionSinkExample(cp::ExecContext& exec_context) { // (Doc section: Union Example) +// (Doc section: Table Sink Example) + +/// \brief An example showing a table sink node +/// \param exec_context The execution context to run the plan in +/// +/// TableSink Example +/// This example shows how a table_sink can be used +/// in an execution plan. 
This includes a source node +/// receiving data as batches and the table sink node +/// which emits the output as a table. +arrow::Status TableSinkExample(cp::ExecContext& exec_context) { + ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan, + cp::ExecPlan::Make(&exec_context)); + + ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches()); + + auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()}; + + ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source, + cp::MakeExecNode("source", plan.get(), {}, source_node_options)); + + std::shared_ptr<arrow::Table> output_table; + auto table_sink_options = cp::TableSinkNodeOptions{&output_table, basic_data.schema}; + + ARROW_RETURN_NOT_OK( + cp::MakeExecNode("table_sink", plan.get(), {source}, table_sink_options)); + // validate the ExecPlan + ARROW_RETURN_NOT_OK(plan->Validate()); + std::cout << "ExecPlan created : " << plan->ToString() << std::endl; + // start the ExecPlan + ARROW_RETURN_NOT_OK(plan->StartProducing()); + + auto finish = source->finished(); + + RETURN_NOT_OK(finish.status()); + + std::cout << "Results : " << output_table->ToString() << std::endl; + + // plan mark finished + auto future = plan->finished(); + return future.status(); +} +// (Doc section: Table Sink Example) + enum ExampleMode { SOURCE_SINK = 0, TABLE_SOURCE_SINK = 1, @@ -869,6 +913,7 @@ enum ExampleMode { KSELECT = 10, WRITE = 11, UNION = 12, + TABLE_SOURCE_TABLE_SINK = 13 }; int main(int argc, char** argv) { @@ -937,6 +982,10 @@ int main(int argc, char** argv) { PrintBlock("Union Example"); status = SourceUnionSinkExample(exec_context); break; + case TABLE_SOURCE_TABLE_SINK: + PrintBlock("TableSink Example"); + status = TableSinkExample(exec_context); + break; default: break; } diff --git a/cpp/src/arrow/compute/exec/options.h b/cpp/src/arrow/compute/exec/options.h index 259e467d97e..3ede26a0182 100644 --- a/cpp/src/arrow/compute/exec/options.h +++ b/cpp/src/arrow/compute/exec/options.h @@ -305,7 +305,6 @@ class ARROW_EXPORT 
SelectKSinkNodeOptions : public SinkNodeOptions { /// SelectK options SelectKOptions select_k_options; }; - /// @} /// \brief Adapt an Table as a sink node diff --git a/docs/source/cpp/streaming_execution.rst b/docs/source/cpp/streaming_execution.rst index 5bee70eb053..a55ebb198ad 100644 --- a/docs/source/cpp/streaming_execution.rst +++ b/docs/source/cpp/streaming_execution.rst @@ -346,6 +346,8 @@ This is the list of operations associated with the execution plan: - :class:`arrow::dataset::WriteNodeOptions` * - ``union`` - N/A + * - ``table_sink`` + - :class:`arrow::compute::TableSinkNodeOptions` .. _stream_execution_source_docs: @@ -647,6 +649,25 @@ SelectK example: .. _stream_execution_scan_docs: +``table_sink`` +---------------- + +.. _stream_execution_table_sink_docs: + +Considering the variety of sink nodes provided in the streaming execution engine, the ``table_sink`` node +provides the ability to take the output as a table. It is much easier to use +:class:`arrow::compute::TableSinkNodeOptions`. +The output data can be obtained as a ``std::shared_ptr`` along with the output ``schema``. + +Example of using ``table_sink`` + +.. 
literalinclude:: ../../../cpp/examples/arrow/execution_plan_documentation_examples.cc + :language: cpp + :start-after: (Doc section: Table Sink Example) + :end-before: (Doc section: Table Sink Example) + :linenos: + :lineno-match: + ``scan`` --------- From 138b0f6d911a28cd8318c2cc18ab4023e7eccce5 Mon Sep 17 00:00:00 2001 From: Vibhatha Abeykoon Date: Thu, 31 Mar 2022 09:39:16 +0530 Subject: [PATCH 7/9] addressing reviews --- .../arrow/execution_plan_documentation_examples.cc | 12 ++++-------- docs/source/cpp/streaming_execution.rst | 8 ++++---- 2 files changed, 8 insertions(+), 12 deletions(-) diff --git a/cpp/examples/arrow/execution_plan_documentation_examples.cc b/cpp/examples/arrow/execution_plan_documentation_examples.cc index ee0baf77950..836a648a591 100644 --- a/cpp/examples/arrow/execution_plan_documentation_examples.cc +++ b/cpp/examples/arrow/execution_plan_documentation_examples.cc @@ -887,15 +887,11 @@ arrow::Status TableSinkExample(cp::ExecContext& exec_context) { // start the ExecPlan ARROW_RETURN_NOT_OK(plan->StartProducing()); - auto finish = source->finished(); - - RETURN_NOT_OK(finish.status()); - + // Wait for the plan to finish + auto finished = plan->finished(); + RETURN_NOT_OK(finished.status()); std::cout << "Results : " << output_table->ToString() << std::endl; - - // plan mark finished - auto future = plan->finished(); - return future.status(); + return arrow::Status::OK(); } // (Doc section: Table Sink Example) diff --git a/docs/source/cpp/streaming_execution.rst b/docs/source/cpp/streaming_execution.rst index a55ebb198ad..50a4c6fec37 100644 --- a/docs/source/cpp/streaming_execution.rst +++ b/docs/source/cpp/streaming_execution.rst @@ -654,10 +654,10 @@ SelectK example: .. _stream_execution_table_sink_docs: -Considering the variety of sink nodes provided in the streaming execution engine, the ``table_sink`` node -provides the ability to take the output as a table. It is much easier to use -:class:`arrow::compute::TableSinkNodeOptions`. 
-The output data can be obtained as a ``std::shared_ptr`` along with the output ``schema``. +The ``table_sink`` node provides the ability to take the output as an in-memory table. +This is much simpler to use than the other sink nodes provided by the streaming execution engine +but it only makes sense when the output fits comfortably in memory. +The node is created using :class:`arrow::compute::TableSinkNodeOptions`. Example of using ``table_sink`` From 75e0567d8f8dfc91ccb42e4421a41738a0f14cde Mon Sep 17 00:00:00 2001 From: Vibhatha Abeykoon Date: Thu, 7 Apr 2022 10:58:05 +0530 Subject: [PATCH 8/9] updating example --- cpp/examples/arrow/execution_plan_documentation_examples.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/examples/arrow/execution_plan_documentation_examples.cc b/cpp/examples/arrow/execution_plan_documentation_examples.cc index 836a648a591..1ca3d36a349 100644 --- a/cpp/examples/arrow/execution_plan_documentation_examples.cc +++ b/cpp/examples/arrow/execution_plan_documentation_examples.cc @@ -877,7 +877,7 @@ arrow::Status TableSinkExample(cp::ExecContext& exec_context) { cp::MakeExecNode("source", plan.get(), {}, source_node_options)); std::shared_ptr<arrow::Table> output_table; - auto table_sink_options = cp::TableSinkNodeOptions{&output_table, basic_data.schema}; + auto table_sink_options = cp::TableSinkNodeOptions{&output_table}; ARROW_RETURN_NOT_OK( cp::MakeExecNode("table_sink", plan.get(), {source}, table_sink_options)); From 28395bf714e51f7d360aa279adb76812041d7fb6 Mon Sep 17 00:00:00 2001 From: Vibhatha Abeykoon Date: Thu, 7 Apr 2022 11:04:48 +0530 Subject: [PATCH 9/9] addressing review comments --- cpp/src/arrow/compute/exec/options.h | 4 ++-- docs/source/cpp/streaming_execution.rst | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/cpp/src/arrow/compute/exec/options.h b/cpp/src/arrow/compute/exec/options.h index 3ede26a0182..9e99953e872 100644 --- a/cpp/src/arrow/compute/exec/options.h +++ 
b/cpp/src/arrow/compute/exec/options.h @@ -307,9 +307,9 @@ class ARROW_EXPORT SelectKSinkNodeOptions : public SinkNodeOptions { }; /// @} -/// \brief Adapt an Table as a sink node +/// \brief Adapt a Table as a sink node /// -/// obtains the output of a execution plan to +/// obtains the output of an execution plan to /// a table pointer. class ARROW_EXPORT TableSinkNodeOptions : public ExecNodeOptions { public: diff --git a/docs/source/cpp/streaming_execution.rst b/docs/source/cpp/streaming_execution.rst index 50a4c6fec37..649968ad43f 100644 --- a/docs/source/cpp/streaming_execution.rst +++ b/docs/source/cpp/streaming_execution.rst @@ -654,8 +654,8 @@ SelectK example: .. _stream_execution_table_sink_docs: -The ``table_sink`` node provides the ability to take the output as an in-memory table. -This is much simpler to use than the other sink nodes provided by the streaming execution engine +The ``table_sink`` node provides the ability to receive the output as an in-memory table. +This is simpler to use than the other sink nodes provided by the streaming execution engine but it only makes sense when the output fits comfortably in memory. The node is created using :class:`arrow::compute::TableSinkNodeOptions`.