From 7f887dada4ce5fff32f42fed134c2dad94c626d7 Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Mon, 4 Oct 2021 12:32:48 -0400 Subject: [PATCH 01/13] ARROW-13227: [Documentation][Compute] Document ExecNode --- cpp/src/arrow/compute/exec/doc/exec_node.md | 147 ---------- docs/source/cpp/compute.rst | 2 + docs/source/cpp/getting_started.rst | 1 + docs/source/cpp/simple_graph.svg | 139 ++++++++++ docs/source/cpp/streaming_execution.rst | 291 ++++++++++++++++++++ 5 files changed, 433 insertions(+), 147 deletions(-) delete mode 100644 cpp/src/arrow/compute/exec/doc/exec_node.md create mode 100644 docs/source/cpp/simple_graph.svg create mode 100644 docs/source/cpp/streaming_execution.rst diff --git a/cpp/src/arrow/compute/exec/doc/exec_node.md b/cpp/src/arrow/compute/exec/doc/exec_node.md deleted file mode 100644 index 797cc87d90a..00000000000 --- a/cpp/src/arrow/compute/exec/doc/exec_node.md +++ /dev/null @@ -1,147 +0,0 @@ - - -# ExecNodes and logical operators - -`ExecNode`s are intended to implement individual logical operators -in a streaming execution graph. Each node receives batches from -upstream nodes (inputs), processes them in some way, then pushes -results to downstream nodes (outputs). `ExecNode`s are owned and -(to an extent) coordinated by an `ExecPlan`. - -> Terminology: "operator" and "node" are mostly interchangable, like -> "Interface" and "Abstract Base Class" in c++ space. The latter is -> a formal and specific bit of code which implements the abstract -> concept. - -## Types of logical operators - -Each of these will have at least one corresponding concrete -`ExecNode`. Where possible, compatible implementations of a -logical operator will *not* be exposed as independent subclasses -of `ExecNode`. Instead we prefer that they be -be encapsulated internally by a single subclass of `ExecNode` -to permit switching between them during a query. - -- Scan: materializes in-memory batches from storage (e.g. Parquet - files, flight stream, ...) -- Filter: evaluates an `Expression` on each input batch and outputs - a copy with any rows excluded for which the filter did not return - `true`. -- Project: evaluates `Expression`s on each input batch to produce - the columns of an output batch. -- Grouped Aggregate: identify groups based on one or more key columns - in each input batch, then update aggregates corresponding to those - groups. Node that this is a pipeline breaker; it will wait for its - inputs to complete before outputting any batches. -- Union: merge two or more streams of batches into a single stream - of batches. -- Write: write each batch to storage -- ToTable: Collect batches into a `Table` with stable row ordering where - possible. - -#### Not in scope for Arrow 5.0: - -- Join: perform an inner, left, outer, semi, or anti join given some - join predicates. -- Sort: accumulate all input batches into a single table, reorder its - rows by some sorting condition, then stream the sorted table out as - batches -- Top-K: retrieve a limited subset of rows from a table as though it - were in sorted order. - -For example: a dataset scan with only a filter and a -projection will correspond to a fairly trivial graph: - -``` -ScanNode -> FilterNode -> ProjectNode -> ToTableNode -``` - -A scan node loads batches from disk and pushes to a filter node. -The filter node excludes some rows based on an `Expression` then -pushes filtered batches to a project node. The project node -materializes new columns based on `Expression`s then pushes those -batches to a table collection node. The table collection node -assembles these batches into a `Table` which is handed off as the -result of the `ExecPlan`. - -## Parallelism, pipelines - -The execution graph is orthogonal to parallelism; any -node may push to any other node from any thread. A scan node causes -each batch to arrive on a thread after which it will pass through -each node in the example graph above, never leaving that thread -(memory/other resource pressure permitting). - -The example graph above happens to be simple enough that processing -of any batch by any node is independent of other nodes and other -batches; it is a pipeline. Note that there is no explicit `Pipeline` -class- pipelined execution is an emergent property of some sub -graphs. - -Nodes which do not share this property (pipeline breakers) are -responsible for deciding when they have received sufficient input, -when they can start emitting output, etc. For example a `GroupByNode` -will wait for its input to be exhausted before it begins pushing -batches to its own outputs. - -Parallelism is "seeded" by `ScanNode` (or other source nodes)- it -owns a reference to the thread pool on which the graph is executing -and fans out pushing to its outputs across that pool. A subsequent -`ProjectNode` will process the batch immediately after it is handed -off by the `ScanNode`- no explicit scheduling required. -Eventually, individual nodes may internally -parallelize processing of individual batches (for example, if a -`FilterNode`'s filter expression is slow). This decision is also left -up to each `ExecNode` implementation. - -# ExecNode interface and usage - -`ExecNode`s are constructed using one of the available factory -functions, such as `arrow::compute::MakeFilterNode` -or `arrow::dataset::MakeScanNode`. Any inputs to an `ExecNode` -must be provided when the node is constructed, so the first -nodes to be constructed are source nodes with no inputs -such as `ScanNode`. - -The batches yielded by an `ExecNode` always conform precisely -to its output schema. NB: no by-name field lookups or type -checks are performed during execution. The output schema -is usually derived from the output schemas of inputs. For -example a `FilterNode`'s output schema is always identical to -that of its input since batches are only modified by exclusion -of some rows. - -An `ExecNode` will begin producing batches when -`node->StartProducing()` is invoked and will proceed until stopped -with `node->StopProducing()`. Started nodes may not be destroyed -until stopped. `ExecNode`s are not currently restartable. -An `ExecNode` pushes batches to its outputs by passing each batch -to `output->InputReceived()`. It signals exhaustion by invoking -`output->InputFinished()`. - -Error recovery is permitted within a node. For example, if evaluation -of an `Expression` runs out of memory the governing node may -try that evaluation again after some memory has been freed up. -If a node experiences an error from which it cannot recover (for -example an IO error while parsing a CSV file) then it reports this -with `output->ErrorReceived()`. An error which escapes the scope of -a single node should not be considered recoverable (no `FilterNode` -should `try/catch` the IO error above). - diff --git a/docs/source/cpp/compute.rst b/docs/source/cpp/compute.rst index 84203da7030..b86fe936bbd 100644 --- a/docs/source/cpp/compute.rst +++ b/docs/source/cpp/compute.rst @@ -50,6 +50,8 @@ both array (chunked or not) and scalar inputs, however some will mandate either. For example, while ``sort_indices`` requires its first and only input to be an array. +.. _invoking compute functions: + Invoking functions ------------------ diff --git a/docs/source/cpp/getting_started.rst b/docs/source/cpp/getting_started.rst index 3c7b7f94f01..36ea4803f73 100644 --- a/docs/source/cpp/getting_started.rst +++ b/docs/source/cpp/getting_started.rst @@ -31,6 +31,7 @@ User Guide datatypes tables compute + streaming_execution io ipc parquet diff --git a/docs/source/cpp/simple_graph.svg b/docs/source/cpp/simple_graph.svg new file mode 100644 index 00000000000..4a3db8eda16 --- /dev/null +++ b/docs/source/cpp/simple_graph.svg @@ -0,0 +1,139 @@ + + + +G + + + +scan lineitem + +scan lineitem + + + +filter + +filter + + + +scan lineitem->filter + + + + + +join + +join + + + +filter->join + + + + + +join again + +join again + + + +join->join again + + + + + +filter again + +filter again + + + +join again->filter again + + + + + +scan orders + +scan orders + + + +project + +project + + + +scan orders->project + + + + + +project->join + + + + + +scan customers + +scan customers + + + +aggregate + +aggregate + + + +scan customers->aggregate + + + + + +aggregate->join again + + + + + +write to disk + +write to disk + + + +filter again->write to disk + + + + + diff --git a/docs/source/cpp/streaming_execution.rst b/docs/source/cpp/streaming_execution.rst new file mode 100644 index 00000000000..ecfea8b5d92 --- /dev/null +++ b/docs/source/cpp/streaming_execution.rst @@ -0,0 +1,291 @@ +.. Licensed to the Apache Software Foundation (ASF) under one +.. or more contributor license agreements. See the NOTICE file +.. distributed with this work for additional information +.. regarding copyright ownership. The ASF licenses this file +.. to you under the Apache License, Version 2.0 (the +.. "License"); you may not use this file except in compliance +.. with the License. You may obtain a copy of the License at + +.. http://www.apache.org/licenses/LICENSE-2.0 + +.. Unless required by applicable law or agreed to in writing, +.. software distributed under the License is distributed on an +.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +.. KIND, either express or implied. See the License for the +.. specific language governing permissions and limitations +.. under the License. + +.. default-domain:: cpp +.. highlight:: cpp +.. cpp:namespace:: arrow::compute + +========================== +Streaming execution engine +========================== + +.. warning:: + + The streaming execution engine is experimental, and a stable API + is not yet guaranteed. + +Motivation +---------- + +For many complex computations, successive direct `invocation of +compute functions ` is not feasible +in either memory or computation time. Doing so causes all intermediate +data to be fully materialized. To facilitate arbitrarily large inputs +and more efficient resource usage, arrow also provides a streaming query +engine with which computations can be formulated and executed. + +.. image:: simple_graph.svg + +:class:`ExecNode` is provided to reify the graph of operations in a query. +Batches of data (struct:`ExecBatch`) flow along edges of the graph from +node to node. Structuring the API around a stream of batches allows the +working set for each node to be tuned for optimal performance independent +of any other nodes in the graph. Each :class:`ExecNode` processes batches +as they are pushed to it along an edge of the graph by upstream nodes +(its inputs), and pushes batches along an edge of the graph to downstream +nodes (its outputs) as they are finalized. + +.. [shaikhha et al] SHAIKHHA, A., DASHTI, M., & KOCH, C. (2018). Push versus pull-based loop fusion in query engines. Journal of Functional Programming, 28. https://doi.org/10.1017/s0956796818000102 + +Overview +-------- + +:class:`ExecNode` + Each node in the graph is an implementation of the :class:`ExecNode` interface. + +:class:`ExecPlan` + A set of :class:`ExecNode` is contained and (to an extent) coordinated by an + :class:`ExecPlan`. + +:class:`ExecFactoryRegistry` + Instances of :class:`ExecNode` are constructed by factory functions held + in a :class:`ExecFactoryRegistry`. + +:class:`ExecNodeOptions` + Heterogenous parameters for factories of :class:`ExecNode` are bundled in an + :class:`ExecNodeOptions`. + +:struct:`Declaration` + ``dplyr``-inspired helper for efficient construction of an :class:`ExecPlan`. + +:struct:`ExecBatch` + A lightweight container for a single chunk of arrow-formatted data. In contrast + to :class:`RecordBatch`, :struct:`ExecBatch` is intended for use exclusively + in a streaming execution context (for example, it will never have a corresponding + Python binding). Furthermore columns which happen to have a constant value may + be represented by a :class:`Scalar` instead of an :class:`Array`. In addition, + :struct:`ExecBatch` may carry execution-relevant properties including a + guaranteed-true-filter for :class:`Expression` simplification. + + +An example :class:`ExecNode` implementation which simlpy passes all input batches +through unchanged:: + + class PassthruNode : public ExecNode { + public: + // InputReceived is the main entry point for ExecNodes. It is invoked + // by an input of this node to push a batch here for processing. + void InputReceived(ExecNode* input, ExecBatch batch) override { + // Since this is a passthru node we simply push the batch to our + // only output here. + outputs_[0]->InputReceived(this, batch); + } + + // ErrorRecieved is called by an input of this node to report an error. + void ErrorReceived(ExecNode* input, Status error) override { + outputs_[0]->ErrorReceived(this, error); + } + + // InputFinished is used to signal how many batches will ultimately arrive. + // It may be called with any ordering relative to InputReceived/ErrorReceived. + void InputFinished(ExecNode* input, int total_batches) override { + outputs_[0]->InputFinished(this, total_batches); + } + + // ExecNodes may request that their inputs throttle production of batches + // until they are ready for more, or stop production if no further batches + // are required. + void ResumeProducing(ExecNode* output) override { inputs_[0]->ResumeProducing(this); } + void PauseProducing(ExecNode* output) override { inputs_[0]->PauseProducing(this); } + void StopProducing(ExecNode* output) override { inputs_[0]->StopProducing(this); } + + // An ExecNode has a single output schema to which all its batches conform. + using ExecNode::output_schema; + + // ExecNodes carry basic introspection for debugging purposes + const char* kind_name() const override { return "PassthruNode"; } + using ExecNode::label; + using ExecNode::SetLabel; + using ExecNode::ToString; + + // An ExecNode holds references to its inputs and outputs, so it is possible + // to walk the graph of execution if necessary. + using ExecNode::inputs; + using ExecNode::outputs; + + // StartProducing() and StopProducing() are invoked by an ExecPlan to + // coordinate the graph-wide execution state. + Status StartProducing() override { return Status::OK(); } + void StopProducing() override {} + Future<> finished() override { return inputs_[0]->finished(); } + }; + +Note that each method which is associated with an edge of the graph must be invoked +with an ``ExecNode*`` to identify the node which invoked it. For example, in an +:class:`ExecNode` which implements ``JOIN`` this tagging might be used to differentiate +between batches from the left or right inputs. +``InputReceived, ErrorReceived, InputFinished`` may only be invoked by the inputs of a +node, while ``ResumeProducing, PauseProducing, StopProducing`` may only be invoked by +outputs of a node. + +:class:`ExecPlan` contains the associated instances of :class:`ExecNode` alive +and provides convenience methods for starting and stopping execution of all nodes +and for querying/awaiting their completion:: + + // construct an ExecPlan first to hold your nodes + ARROW_ASSIGN_OR_RAISE(auto plan, ExecPlan::Make(default_exec_context())); + + // ... add nodes to your ExecPlan + + // start all nodes in the graph + ARROW_RETURN_NOT_OK(plan->StartProducing()); + + if (need_stop) { + // stop all nodes in the graph + plan->StopProducing(); + } + + // Complete will be marked finished when all nodes have run to completion + // or acknowledged a StopProducing() signal. The ExecPlan should be kept + // alive until this future is marked finished. + Future<> complete = plan->finished(); + + +Constructing ``ExecPlan``s +-------------------------- + +.. warning:: + + The following will be superceded by construction from Compute IR, see ARROW-14074. + +None of the concrete implementations of :class:`ExecNode` are exposed +in headers, so they can't be constructed directly outside the +Translation Unit where they are defined. Instead, factories to +create them are provided in an extensible registry. This structure +provides a number of benefits: + +- This enforces consistent construction. +- It decouples implementations from consumers of the interface + (for example: we have two classes for scalar and grouped aggregate, + we can choose which to construct within the single factory by + checking whether grouping keys are provided) +- This expedites integration with out-of-library extensions. For example + "scan" nodes are implemented in the separate ``libarrow_dataset.so`` library. +- Since the class is not referencable outside the Translation Unit in which it + is defined, compilers can optimize more aggressively. + +Factories of :class:`ExecNode` can be retrieved by name from the registry. +The default registry is available through +:func:`arrow::compute::default_exec_factory_registry()` +and can be queried for the built-in factories:: + + // get the factory for "filter" nodes: + ARROW_ASSIGN_OR_RAISE(auto make_filter, + default_exec_factory_registry()->GetFactory("filter")); + + // factories take three arguments: + ARROW_ASSIGN_OR_RAISE(ExecNode* filter_node, *make_filter( + // the ExecPlan which should own this node + plan.get(), + + // nodes which will send batches to this node (inputs) + {scan_node}, + + // parameters unique to "filter" nodes + FilterNodeOptions{filter_expression})); + + // alternative shorthand: + ARROW_ASSIGN_OR_RAISE(filter_node, MakeExecNode("filter", + plan.get(), {scan_node}, FilterNodeOptions{filter_expression}); + +Factories can also be added to the default registry as long as they are +convertible to ``std::function( +ExecPlan*, std::vector, const ExecNodeOptions&)>``. + +To build an :class:`ExecPlan` representing a simple pipeline which +reads from a :class:`RecordBatchReader` then filters, projects, and +writes to disk:: + + std::shared_ptr reader = GetStreamOfBatches(); + ExecNode* source_node = *MakeExecNode("source", plan.get(), {}, + SourceNodeOptions::FromReader( + reader, + GetCpuThreadPool())); + + ExecNode* filter_node = *MakeExecNode("filter", plan.get(), {source_node}, + FilterNodeOptions{ + greater(field_ref("score"), literal(3)) + }); + + ExecNode* project_node = *MakeExecNode("project", plan.get(), {filter_node}, + ProjectNodeOptions{ + {add(field_ref("score"), literal(1))}, + {"score + 1"} + }); + + MakeExecNode("write", plan.get(), {project_node}, + WriteNodeOptions{/*base_dir=*/"/dat", /*...*/}); + +:struct:`Declaration` is a dplyr-inspired helper which further decreases the +boilerplate associated with populating an :class:`ExecPlan` from C++:: + + std::shared_ptr reader = GetStreamOfBatches(); + ASSERT_OK(Declaration::Sequence( + { + {"source", SourceNodeOptions::FromReader( + reader, + GetCpuThreadPool())}, + {"filter", FilterNodeOptions{ + greater(field_ref("score"), literal(3))}}, + {"project", ProjectNodeOptions{ + {add(field_ref("score"), literal(1))}, + {"score + 1"}}}, + {"write", WriteNodeOptions{/*base_dir=*/"/dat", /*...*/}}, + }) + .AddToPlan(plan.get())); + +Note that a source node can wrap anything which resembles a stream of batches. +For example, https://github.com/apache/arrow/pull/11032 adds support for use +of a DuckDB query as a source node. Similarly, a sink node can wrap anything +which absorbs a stream of batches. In the example above we're writing completed +batches to disk. However we can also collect these in memory into a :class:`Table` +or forward them to a :class:`RecordBatchReader` as an out-of-graph stream. +This flexibility allows an :class:`ExecPlan` to be used as streaming middleware +between any endpoints which support arrow formatted batches. + +An :class:`arrow::dataset::Dataset` can also be wrapped as a source node which +pushes all the dataset's batches into an :class:`ExecPlan`. This factory is added +to the default registry with the name ``"scan"`` by calling +``arrow::dataset::internal::Initialize()``:: + + arrow::dataset::internal::Initialize(); + + std::shared_ptr dataset = GetDataset(); + + ASSERT_OK(Declaration::Sequence( + { + {"scan", ScanNodeOptions{dataset, + /* push down predicate, projection, ... */}}, + {"filter", FilterNodeOptions{/* ... */}}, + // ... + }) + .AddToPlan(plan.get())); + +Datasets may be scanned multiple times; just make multiple scan +nodes from that dataset. (Useful for a self-join, for example.) +Note that producing two scan nodes like this will perform all +reads and decodes twice. From 60fae2419582f8384e3f669b87afeffea9daa8f4 Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Tue, 12 Oct 2021 15:42:37 -0400 Subject: [PATCH 02/13] SVG typo --- docs/source/cpp/simple_graph.svg | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/source/cpp/simple_graph.svg b/docs/source/cpp/simple_graph.svg index 4a3db8eda16..d87507224cd 100644 --- a/docs/source/cpp/simple_graph.svg +++ b/docs/source/cpp/simple_graph.svg @@ -60,7 +60,7 @@ join->join again - + From 3c9030ba89af91646d5402c19b9c09789f29f928 Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Thu, 14 Oct 2021 13:34:26 -0400 Subject: [PATCH 03/13] address review comments --- docs/source/cpp/compute.rst | 2 +- docs/source/cpp/streaming_execution.rst | 43 +++++++++++++------------ 2 files changed, 24 insertions(+), 21 deletions(-) diff --git a/docs/source/cpp/compute.rst b/docs/source/cpp/compute.rst index b86fe936bbd..ad973a0cd9e 100644 --- a/docs/source/cpp/compute.rst +++ b/docs/source/cpp/compute.rst @@ -50,7 +50,7 @@ both array (chunked or not) and scalar inputs, however some will mandate either. For example, while ``sort_indices`` requires its first and only input to be an array. -.. _invoking compute functions: +.. _invoking-compute-functions: Invoking functions ------------------ diff --git a/docs/source/cpp/streaming_execution.rst b/docs/source/cpp/streaming_execution.rst index ecfea8b5d92..b08450af9bd 100644 --- a/docs/source/cpp/streaming_execution.rst +++ b/docs/source/cpp/streaming_execution.rst @@ -32,7 +32,7 @@ Motivation ---------- For many complex computations, successive direct `invocation of -compute functions ` is not feasible +compute functions ` is not feasible in either memory or computation time. Doing so causes all intermediate data to be fully materialized. To facilitate arbitrarily large inputs and more efficient resource usage, arrow also provides a streaming query @@ -41,7 +41,7 @@ engine with which computations can be formulated and executed. .. image:: simple_graph.svg :class:`ExecNode` is provided to reify the graph of operations in a query. -Batches of data (struct:`ExecBatch`) flow along edges of the graph from +Batches of data (:struct:`ExecBatch`) flow along edges of the graph from node to node. Structuring the API around a stream of batches allows the working set for each node to be tuned for optimal performance independent of any other nodes in the graph. Each :class:`ExecNode` processes batches @@ -73,16 +73,17 @@ Overview ``dplyr``-inspired helper for efficient construction of an :class:`ExecPlan`. :struct:`ExecBatch` - A lightweight container for a single chunk of arrow-formatted data. In contrast - to :class:`RecordBatch`, :struct:`ExecBatch` is intended for use exclusively - in a streaming execution context (for example, it will never have a corresponding - Python binding). Furthermore columns which happen to have a constant value may - be represented by a :class:`Scalar` instead of an :class:`Array`. In addition, - :struct:`ExecBatch` may carry execution-relevant properties including a - guaranteed-true-filter for :class:`Expression` simplification. + A lightweight container for a single chunk of data in the Arrow format. In + contrast to :class:`RecordBatch`, :struct:`ExecBatch` is intended for use + exclusively in a streaming execution context (for example, it doesn't have a + corresponding Python binding). Furthermore columns which happen to have a + constant value may be represented by a :class:`Scalar` instead of an + :class:`Array`. In addition, :struct:`ExecBatch` may carry + execution-relevant properties including a guaranteed-true-filter + for :class:`Expression` simplification. -An example :class:`ExecNode` implementation which simlpy passes all input batches +An example :class:`ExecNode` implementation which simply passes all input batches through unchanged:: class PassthruNode : public ExecNode { @@ -95,7 +96,7 @@ through unchanged:: outputs_[0]->InputReceived(this, batch); } - // ErrorRecieved is called by an input of this node to report an error. + // ErrorReceived is called by an input of this node to report an error. void ErrorReceived(ExecNode* input, Status error) override { outputs_[0]->ErrorReceived(this, error); } @@ -138,11 +139,11 @@ Note that each method which is associated with an edge of the graph must be invo with an ``ExecNode*`` to identify the node which invoked it. For example, in an :class:`ExecNode` which implements ``JOIN`` this tagging might be used to differentiate between batches from the left or right inputs. -``InputReceived, ErrorReceived, InputFinished`` may only be invoked by the inputs of a -node, while ``ResumeProducing, PauseProducing, StopProducing`` may only be invoked by -outputs of a node. +``InputReceived``, ``ErrorReceived``, ``InputFinished`` may only be invoked by +the inputs of a node, while ``ResumeProducing``, ``PauseProducing``, ``StopProducing`` +may only be invoked by outputs of a node. -:class:`ExecPlan` contains the associated instances of :class:`ExecNode` alive +:class:`ExecPlan` contains the associated instances of :class:`ExecNode` and provides convenience methods for starting and stopping execution of all nodes and for querying/awaiting their completion:: @@ -240,8 +241,9 @@ writes to disk:: MakeExecNode("write", plan.get(), {project_node}, WriteNodeOptions{/*base_dir=*/"/dat", /*...*/}); -:struct:`Declaration` is a dplyr-inspired helper which further decreases the -boilerplate associated with populating an :class:`ExecPlan` from C++:: +:struct:`Declaration` is a `dplyr `-inspired +helper which further decreases the boilerplate associated with populating +an :class:`ExecPlan` from C++:: std::shared_ptr reader = GetStreamOfBatches(); ASSERT_OK(Declaration::Sequence( @@ -259,9 +261,10 @@ boilerplate associated with populating an :class:`ExecPlan` from C++:: .AddToPlan(plan.get())); Note that a source node can wrap anything which resembles a stream of batches. -For example, https://github.com/apache/arrow/pull/11032 adds support for use -of a DuckDB query as a source node. Similarly, a sink node can wrap anything -which absorbs a stream of batches. In the example above we're writing completed +For example, `PR#11032 ` adds +support for use of a `DuckDB ` query as a source node. +Similarly, a sink node can wrap anything which absorbs a stream of batches. +In the example above we're writing completed batches to disk. However we can also collect these in memory into a :class:`Table` or forward them to a :class:`RecordBatchReader` as an out-of-graph stream. This flexibility allows an :class:`ExecPlan` to be used as streaming middleware From 2f31d1369c74a57b3e1eb30adc9e88b1fc2e1d05 Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Thu, 14 Oct 2021 16:38:45 -0400 Subject: [PATCH 04/13] arrow -> Arrow --- docs/source/cpp/streaming_execution.rst | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/source/cpp/streaming_execution.rst b/docs/source/cpp/streaming_execution.rst index b08450af9bd..405f80b532f 100644 --- a/docs/source/cpp/streaming_execution.rst +++ b/docs/source/cpp/streaming_execution.rst @@ -35,14 +35,14 @@ For many complex computations, successive direct `invocation of compute functions ` is not feasible in either memory or computation time. Doing so causes all intermediate data to be fully materialized. To facilitate arbitrarily large inputs -and more efficient resource usage, arrow also provides a streaming query +and more efficient resource usage, Arrow also provides a streaming query engine with which computations can be formulated and executed. .. image:: simple_graph.svg :class:`ExecNode` is provided to reify the graph of operations in a query. Batches of data (:struct:`ExecBatch`) flow along edges of the graph from -node to node. Structuring the API around a stream of batches allows the +node to node. Structuring the API around streams of batches allows the working set for each node to be tuned for optimal performance independent of any other nodes in the graph. Each :class:`ExecNode` processes batches as they are pushed to it along an edge of the graph by upstream nodes @@ -268,7 +268,7 @@ In the example above we're writing completed batches to disk. However we can also collect these in memory into a :class:`Table` or forward them to a :class:`RecordBatchReader` as an out-of-graph stream. This flexibility allows an :class:`ExecPlan` to be used as streaming middleware -between any endpoints which support arrow formatted batches. +between any endpoints which support Arrow formatted batches. An :class:`arrow::dataset::Dataset` can also be wrapped as a source node which pushes all the dataset's batches into an :class:`ExecPlan`. This factory is added From e1fb731630680ba534c4a6196d88edc203c5142e Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Fri, 15 Oct 2021 09:57:58 -0400 Subject: [PATCH 05/13] add seealso for push vs pull model paper --- docs/source/cpp/streaming_execution.rst | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/docs/source/cpp/streaming_execution.rst b/docs/source/cpp/streaming_execution.rst index 405f80b532f..cc81af72b7c 100644 --- a/docs/source/cpp/streaming_execution.rst +++ b/docs/source/cpp/streaming_execution.rst @@ -31,7 +31,7 @@ Streaming execution engine Motivation ---------- -For many complex computations, successive direct `invocation of +For many complex computations, successive direct :ref:`invocation of compute functions ` is not feasible in either memory or computation time. Doing so causes all intermediate data to be fully materialized. To facilitate arbitrarily large inputs @@ -49,7 +49,12 @@ as they are pushed to it along an edge of the graph by upstream nodes (its inputs), and pushes batches along an edge of the graph to downstream nodes (its outputs) as they are finalized. -.. [shaikhha et al] SHAIKHHA, A., DASHTI, M., & KOCH, C. (2018). Push versus pull-based loop fusion in query engines. Journal of Functional Programming, 28. https://doi.org/10.1017/s0956796818000102 +..seealso:: + + `SHAIKHHA, A., DASHTI, M., & KOCH, C. + (2018). Push versus pull-based loop fusion in query engines. + Journal of Functional Programming, 28. + `_ Overview -------- @@ -241,7 +246,7 @@ writes to disk:: MakeExecNode("write", plan.get(), {project_node}, WriteNodeOptions{/*base_dir=*/"/dat", /*...*/}); -:struct:`Declaration` is a `dplyr `-inspired +:struct:`Declaration` is a `dplyr `_-inspired helper which further decreases the boilerplate associated with populating an :class:`ExecPlan` from C++:: @@ -261,8 +266,8 @@ an :class:`ExecPlan` from C++:: .AddToPlan(plan.get())); Note that a source node can wrap anything which resembles a stream of batches. -For example, `PR#11032 ` adds -support for use of a `DuckDB ` query as a source node. +For example, `PR#11032 `_ adds +support for use of a `DuckDB `_ query as a source node. Similarly, a sink node can wrap anything which absorbs a stream of batches. In the example above we're writing completed batches to disk. However we can also collect these in memory into a :class:`Table` From 5471a8002ff4f40b7d059bf4b819be98bcc7c4fb Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Fri, 15 Oct 2021 10:00:42 -0400 Subject: [PATCH 06/13] Update docs/source/cpp/streaming_execution.rst Co-authored-by: Weston Pace --- docs/source/cpp/streaming_execution.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/source/cpp/streaming_execution.rst b/docs/source/cpp/streaming_execution.rst index cc81af72b7c..064211839c9 100644 --- a/docs/source/cpp/streaming_execution.rst +++ b/docs/source/cpp/streaming_execution.rst @@ -180,7 +180,7 @@ Constructing ``ExecPlan``s None of the concrete implementations of :class:`ExecNode` are exposed in headers, so they can't be constructed directly outside the -Translation Unit where they are defined. Instead, factories to +translation unit where they are defined. Instead, factories to create them are provided in an extensible registry. This structure provides a number of benefits: From 53576567ae29515fe46b4c19d73b2471b5dd5a23 Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Fri, 15 Oct 2021 10:01:10 -0400 Subject: [PATCH 07/13] Update docs/source/cpp/streaming_execution.rst Co-authored-by: Weston Pace --- docs/source/cpp/streaming_execution.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/source/cpp/streaming_execution.rst b/docs/source/cpp/streaming_execution.rst index 064211839c9..3477b70f5ee 100644 --- a/docs/source/cpp/streaming_execution.rst +++ b/docs/source/cpp/streaming_execution.rst @@ -191,7 +191,7 @@ provides a number of benefits: checking whether grouping keys are provided) - This expedites integration with out-of-library extensions. For example "scan" nodes are implemented in the separate ``libarrow_dataset.so`` library. -- Since the class is not referencable outside the Translation Unit in which it +- Since the class is not referencable outside the translation unit in which it is defined, compilers can optimize more aggressively. Factories of :class:`ExecNode` can be retrieved by name from the registry. From 6a23c685190dd4aa12a08e28abde42e10f9c236c Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Fri, 15 Oct 2021 10:04:10 -0400 Subject: [PATCH 08/13] Update docs/source/cpp/streaming_execution.rst Co-authored-by: Weston Pace --- docs/source/cpp/streaming_execution.rst | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/source/cpp/streaming_execution.rst b/docs/source/cpp/streaming_execution.rst index 3477b70f5ee..39a2de7e014 100644 --- a/docs/source/cpp/streaming_execution.rst +++ b/docs/source/cpp/streaming_execution.rst @@ -40,6 +40,8 @@ engine with which computations can be formulated and executed. .. image:: simple_graph.svg + An example graph of a streaming execution workflow. + :class:`ExecNode` is provided to reify the graph of operations in a query. Batches of data (:struct:`ExecBatch`) flow along edges of the graph from node to node. Structuring the API around streams of batches allows the From b0c881319f33771e8cad7dd4fc1e38bfcf2225fc Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Fri, 15 Oct 2021 10:11:17 -0400 Subject: [PATCH 09/13] add more calls to dataset::Initialize(), clarify usage of StopProducing --- docs/source/cpp/streaming_execution.rst | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/docs/source/cpp/streaming_execution.rst b/docs/source/cpp/streaming_execution.rst index 39a2de7e014..920702323c7 100644 --- a/docs/source/cpp/streaming_execution.rst +++ b/docs/source/cpp/streaming_execution.rst @@ -162,10 +162,10 @@ and for querying/awaiting their completion:: // start all nodes in the graph ARROW_RETURN_NOT_OK(plan->StartProducing()); - if (need_stop) { + SetUserCancellationCallback([plan] { // stop all nodes in the graph plan->StopProducing(); - } + }); // Complete will be marked finished when all nodes have run to completion // or acknowledged a StopProducing() signal. The ExecPlan should be kept @@ -245,6 +245,7 @@ writes to disk:: {"score + 1"} }); + arrow::dataset::internal::Initialize(); MakeExecNode("write", plan.get(), {project_node}, WriteNodeOptions{/*base_dir=*/"/dat", /*...*/}); @@ -252,6 +253,8 @@ writes to disk:: helper which further decreases the boilerplate associated with populating an :class:`ExecPlan` from C++:: + arrow::dataset::internal::Initialize(); + std::shared_ptr reader = GetStreamOfBatches(); ASSERT_OK(Declaration::Sequence( { From eddb8d9c262f1f96997bfe4b088f5e2517f3025d Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Fri, 15 Oct 2021 10:12:16 -0400 Subject: [PATCH 10/13] Update docs/source/cpp/streaming_execution.rst Co-authored-by: Weston Pace --- docs/source/cpp/streaming_execution.rst | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/source/cpp/streaming_execution.rst b/docs/source/cpp/streaming_execution.rst index 920702323c7..1aef81945ff 100644 --- a/docs/source/cpp/streaming_execution.rst +++ b/docs/source/cpp/streaming_execution.rst @@ -151,8 +151,8 @@ the inputs of a node, while ``ResumeProducing``, ``PauseProducing``, ``StopProdu may only be invoked by outputs of a node. :class:`ExecPlan` contains the associated instances of :class:`ExecNode` -and provides convenience methods for starting and stopping execution of all nodes -and for querying/awaiting their completion:: +and is used to start and stop execution of all nodes and for querying/awaiting +their completion:: // construct an ExecPlan first to hold your nodes ARROW_ASSIGN_OR_RAISE(auto plan, ExecPlan::Make(default_exec_context())); From b0656bc65496b7ee17668b2a0c90e2c5ac1bb55c Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Fri, 15 Oct 2021 10:14:10 -0400 Subject: [PATCH 11/13] Update docs/source/cpp/streaming_execution.rst Co-authored-by: Weston Pace --- docs/source/cpp/streaming_execution.rst | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/source/cpp/streaming_execution.rst b/docs/source/cpp/streaming_execution.rst index 1aef81945ff..59dfbc1ce23 100644 --- a/docs/source/cpp/streaming_execution.rst +++ b/docs/source/cpp/streaming_execution.rst @@ -116,7 +116,8 @@ through unchanged:: // ExecNodes may request that their inputs throttle production of batches // until they are ready for more, or stop production if no further batches - // are required. + // are required. These signals should typically be forwarded to the inputs + // of the ExecNode. void ResumeProducing(ExecNode* output) override { inputs_[0]->ResumeProducing(this); } void PauseProducing(ExecNode* output) override { inputs_[0]->PauseProducing(this); } void StopProducing(ExecNode* output) override { inputs_[0]->StopProducing(this); } From 39c6c535466fcbb04ff59c110daed6fde506cca2 Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Fri, 15 Oct 2021 10:14:15 -0400 Subject: [PATCH 12/13] Update docs/source/cpp/streaming_execution.rst Co-authored-by: Weston Pace --- docs/source/cpp/streaming_execution.rst | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/source/cpp/streaming_execution.rst b/docs/source/cpp/streaming_execution.rst index 59dfbc1ce23..6b9b4dd7856 100644 --- a/docs/source/cpp/streaming_execution.rst +++ b/docs/source/cpp/streaming_execution.rst @@ -104,6 +104,8 @@ through unchanged:: } // ErrorReceived is called by an input of this node to report an error. + // ExecNodes should always forward errors to their outputs unless they + // are able to fully handle the error (this is rare). void ErrorReceived(ExecNode* input, Status error) override { outputs_[0]->ErrorReceived(this, error); } From bdf8183440ca88252edf975253583036e4d08d62 Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Fri, 15 Oct 2021 10:14:22 -0400 Subject: [PATCH 13/13] Update docs/source/cpp/streaming_execution.rst Co-authored-by: Weston Pace --- docs/source/cpp/streaming_execution.rst | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/source/cpp/streaming_execution.rst b/docs/source/cpp/streaming_execution.rst index 6b9b4dd7856..5e5b29ad2e6 100644 --- a/docs/source/cpp/streaming_execution.rst +++ b/docs/source/cpp/streaming_execution.rst @@ -139,7 +139,8 @@ through unchanged:: using ExecNode::outputs; // StartProducing() and StopProducing() are invoked by an ExecPlan to - // coordinate the graph-wide execution state. + // coordinate the graph-wide execution state. These do not need to be + // forwarded to inputs or outputs. Status StartProducing() override { return Status::OK(); } void StopProducing() override {} Future<> finished() override { return inputs_[0]->finished(); }