diff --git a/cpp/src/arrow/compute/exec/doc/exec_node.md b/cpp/src/arrow/compute/exec/doc/exec_node.md
deleted file mode 100644
index 797cc87d90a..00000000000
--- a/cpp/src/arrow/compute/exec/doc/exec_node.md
+++ /dev/null
@@ -1,147 +0,0 @@

# ExecNodes and logical operators

`ExecNode`s are intended to implement individual logical operators
in a streaming execution graph. Each node receives batches from
upstream nodes (inputs), processes them in some way, then pushes
results to downstream nodes (outputs). `ExecNode`s are owned and
(to an extent) coordinated by an `ExecPlan`.

> Terminology: "operator" and "node" are mostly interchangeable,
> like "interface" and "abstract base class" in C++: the latter is
> a formal, specific piece of code which implements the abstract
> concept.

## Types of logical operators

Each of these will have at least one corresponding concrete
`ExecNode`. Where possible, compatible implementations of a
logical operator will *not* be exposed as independent subclasses
of `ExecNode`. Instead we prefer that they be encapsulated
internally by a single subclass of `ExecNode` to permit switching
between them during a query.

- Scan: materializes in-memory batches from storage (e.g. Parquet
  files, a Flight stream, ...)
- Filter: evaluates an `Expression` on each input batch and outputs
  a copy excluding any rows for which the filter did not return
  `true`.
- Project: evaluates `Expression`s on each input batch to produce
  the columns of an output batch.
- Grouped Aggregate: identifies groups based on one or more key
  columns in each input batch, then updates aggregates corresponding
  to those groups. Note that this is a pipeline breaker; it will wait
  for its inputs to complete before outputting any batches.
- Union: merges two or more streams of batches into a single stream
  of batches.
- Write: writes each batch to storage.
- ToTable: collects batches into a `Table` with stable row ordering
  where possible.

#### Not in scope for Arrow 5.0:

- Join: perform an inner, left, outer, semi, or anti join given some
  join predicates.
- Sort: accumulate all input batches into a single table, reorder its
  rows by some sorting condition, then stream the sorted table out as
  batches.
- Top-K: retrieve a limited subset of rows from a table as though it
  were in sorted order.

For example, a dataset scan with only a filter and a projection will
correspond to a fairly trivial graph:

```
ScanNode -> FilterNode -> ProjectNode -> ToTableNode
```

A scan node loads batches from disk and pushes them to a filter node.
The filter node excludes some rows based on an `Expression`, then
pushes the filtered batches to a project node. The project node
materializes new columns based on `Expression`s, then pushes those
batches to a table collection node. The table collection node
assembles these batches into a `Table` which is handed off as the
result of the `ExecPlan`.

## Parallelism, pipelines

The execution graph is orthogonal to parallelism; any node may push
to any other node from any thread. A scan node causes each batch to
arrive on a thread, after which the batch passes through each node in
the example graph above without ever leaving that thread (memory and
other resource pressure permitting).

The example graph above happens to be simple enough that processing
of any batch by any node is independent of other nodes and other
batches; it is a pipeline.
Note that there is no explicit `Pipeline`
class; pipelined execution is an emergent property of some subgraphs.

Nodes which do not share this property (pipeline breakers) are
responsible for deciding when they have received sufficient input,
when they can start emitting output, and so on. For example, a
`GroupByNode` will wait for its input to be exhausted before it
begins pushing batches to its own outputs.

Parallelism is "seeded" by `ScanNode` (or other source nodes): the
scan node owns a reference to the thread pool on which the graph is
executing and fans out pushes to its outputs across that pool. A
subsequent `ProjectNode` will process a batch immediately after it is
handed off by the `ScanNode`; no explicit scheduling is required.
Eventually, individual nodes may internally parallelize processing of
individual batches (for example, if a `FilterNode`'s filter expression
is slow). This decision is also left up to each `ExecNode`
implementation.

# ExecNode interface and usage

`ExecNode`s are constructed using one of the available factory
functions, such as `arrow::compute::MakeFilterNode` or
`arrow::dataset::MakeScanNode`. Any inputs to an `ExecNode` must be
provided when the node is constructed, so the first nodes to be
constructed are source nodes with no inputs, such as `ScanNode`.

The batches yielded by an `ExecNode` always conform precisely to its
output schema; no by-name field lookups or type checks are performed
during execution. The output schema is usually derived from the
output schemas of the node's inputs. For example, a `FilterNode`'s
output schema is always identical to that of its input, since batches
are only modified by the exclusion of some rows.

An `ExecNode` will begin producing batches when
`node->StartProducing()` is invoked and will proceed until stopped
with `node->StopProducing()`. Started nodes may not be destroyed
until stopped. `ExecNode`s are not currently restartable. An
`ExecNode` pushes batches to its outputs by passing each batch to
`output->InputReceived()`. It signals exhaustion by invoking
`output->InputFinished()`.

Error recovery is permitted within a node. For example, if evaluation
of an `Expression` runs out of memory, the governing node may retry
that evaluation after some memory has been freed up. If a node
experiences an error from which it cannot recover (for example, an IO
error while parsing a CSV file), it reports this with
`output->ErrorReceived()`. An error which escapes the scope of a
single node should not be considered recoverable (no `FilterNode`
should `try/catch` the IO error above).
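
As a concrete sketch of this push contract (hypothetical code, not a node
from the library; the member names `batches_` and `outputs_` are
illustrative), a source-like node might drive its single output as follows:

```cpp
// Sketch only: push a fixed set of batches downstream, then signal
// exhaustion so the output knows no further batches will arrive.
void ExampleSourceNode::ProduceAll() {
  int total = 0;
  for (ExecBatch& batch : batches_) {
    // The output processes the batch inline, on this thread.
    outputs_[0]->InputReceived(this, std::move(batch));
    ++total;
  }
  // Report the total number of batches that were sent.
  outputs_[0]->InputFinished(this, total);
}
```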

diff --git a/docs/source/cpp/compute.rst b/docs/source/cpp/compute.rst
index 84203da7030..ad973a0cd9e 100644
--- a/docs/source/cpp/compute.rst
+++ b/docs/source/cpp/compute.rst
@@ -50,6 +50,8 @@ both array (chunked or not) and scalar inputs, however some will mandate
 either. For example, while ``sort_indices`` requires its
 first and only input to be an array.
 
+.. _invoking-compute-functions:
+
 Invoking functions
 ------------------
 
diff --git a/docs/source/cpp/getting_started.rst b/docs/source/cpp/getting_started.rst
index 3c7b7f94f01..36ea4803f73 100644
--- a/docs/source/cpp/getting_started.rst
+++ b/docs/source/cpp/getting_started.rst
@@ -31,6 +31,7 @@ User Guide
 
    datatypes
    tables
    compute
+   streaming_execution
    io
    ipc
    parquet
diff --git a/docs/source/cpp/simple_graph.svg b/docs/source/cpp/simple_graph.svg
new file mode 100644
index 00000000000..d87507224cd
--- /dev/null
+++ b/docs/source/cpp/simple_graph.svg
@@ -0,0 +1,139 @@
[139 lines of SVG markup elided: the image is a diagram of an example
execution graph, with edges "scan lineitem" -> "filter" -> "join",
"scan orders" -> "project" -> "join", "join" -> "join again",
"scan customers" -> "aggregate" -> "join again", and
"join again" -> "filter again" -> "write to disk".]
diff --git a/docs/source/cpp/streaming_execution.rst b/docs/source/cpp/streaming_execution.rst
new file mode 100644
index 00000000000..5e5b29ad2e6
--- /dev/null
+++ b/docs/source/cpp/streaming_execution.rst
@@ -0,0 +1,308 @@
.. Licensed to the Apache Software Foundation (ASF) under one
.. or more contributor license agreements. See the NOTICE file
.. distributed with this work for additional information
.. regarding copyright ownership. The ASF licenses this file
.. to you under the Apache License, Version 2.0 (the
.. "License"); you may not use this file except in compliance
.. with the License. You may obtain a copy of the License at

.. http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing,
.. software distributed under the License is distributed on an
.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
.. KIND, either express or implied. See the License for the
.. specific language governing permissions and limitations
.. under the License.

.. default-domain:: cpp
.. highlight:: cpp
.. cpp:namespace:: arrow::compute

==========================
Streaming execution engine
==========================

.. warning::

    The streaming execution engine is experimental, and a stable API
    is not yet guaranteed.

Motivation
----------

For many complex computations, successive direct :ref:`invocation of
compute functions <invoking-compute-functions>` is not feasible
in either memory or computation time. Doing so causes all intermediate
data to be fully materialized. To facilitate arbitrarily large inputs
and more efficient resource usage, Arrow also provides a streaming query
engine with which computations can be formulated and executed.

.. figure:: simple_graph.svg

   An example graph of a streaming execution workflow.

:class:`ExecNode` is provided to reify the graph of operations in a query.
Batches of data (:struct:`ExecBatch`) flow along edges of the graph from
node to node. Structuring the API around streams of batches allows the
working set for each node to be tuned for optimal performance independent
of any other nodes in the graph.
Each :class:`ExecNode` processes batches
as they are pushed to it along an edge of the graph by upstream nodes
(its inputs), and pushes batches along an edge of the graph to downstream
nodes (its outputs) as they are finalized.

.. seealso::

   SHAIKHHA, A., DASHTI, M., & KOCH, C. (2018). Push versus pull-based
   loop fusion in query engines. Journal of Functional Programming, 28.

Overview
--------

:class:`ExecNode`
    Each node in the graph is an implementation of the :class:`ExecNode`
    interface.

:class:`ExecPlan`
    A set of :class:`ExecNode` instances is contained and (to an extent)
    coordinated by an :class:`ExecPlan`.

:class:`ExecFactoryRegistry`
    Instances of :class:`ExecNode` are constructed by factory functions held
    in an :class:`ExecFactoryRegistry`.

:class:`ExecNodeOptions`
    Heterogeneous parameters for factories of :class:`ExecNode` are bundled
    in an :class:`ExecNodeOptions`.

:struct:`Declaration`
    A ``dplyr``-inspired helper for efficient construction of an
    :class:`ExecPlan`.

:struct:`ExecBatch`
    A lightweight container for a single chunk of data in the Arrow format. In
    contrast to :class:`RecordBatch`, :struct:`ExecBatch` is intended for use
    exclusively in a streaming execution context (for example, it doesn't have
    a corresponding Python binding). Furthermore, columns which happen to have
    a constant value may be represented by a :class:`Scalar` instead of an
    :class:`Array`. In addition, :struct:`ExecBatch` may carry
    execution-relevant properties, including a guaranteed-true filter
    for :class:`Expression` simplification.
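
For example, a two-column batch in which the second column holds a constant
value might be assembled as follows (a schematic sketch, assuming an existing
``std::shared_ptr<arrow::Int32Array>`` named ``scores``)::

    // ExecBatch stores columns as a vector of Datum, so each column may be
    // backed by an Array or, when every row holds the same value, a Scalar.
    ExecBatch batch({Datum(scores), Datum(std::make_shared<Int32Scalar>(42))},
                    /*length=*/scores->length());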

An example :class:`ExecNode` implementation which simply passes all input
batches through unchanged::

    class PassthruNode : public ExecNode {
     public:
      // InputReceived is the main entry point for ExecNodes. It is invoked
      // by an input of this node to push a batch here for processing.
      void InputReceived(ExecNode* input, ExecBatch batch) override {
        // Since this is a passthru node we simply push the batch to our
        // only output here.
        outputs_[0]->InputReceived(this, batch);
      }

      // ErrorReceived is called by an input of this node to report an error.
      // ExecNodes should always forward errors to their outputs unless they
      // are able to fully handle the error (this is rare).
      void ErrorReceived(ExecNode* input, Status error) override {
        outputs_[0]->ErrorReceived(this, error);
      }

      // InputFinished is used to signal how many batches will ultimately arrive.
      // It may be called with any ordering relative to InputReceived/ErrorReceived.
      void InputFinished(ExecNode* input, int total_batches) override {
        outputs_[0]->InputFinished(this, total_batches);
      }

      // ExecNodes may request that their inputs throttle production of batches
      // until they are ready for more, or stop production if no further batches
      // are required. These signals should typically be forwarded to the inputs
      // of the ExecNode.
      void ResumeProducing(ExecNode* output) override { inputs_[0]->ResumeProducing(this); }
      void PauseProducing(ExecNode* output) override { inputs_[0]->PauseProducing(this); }
      void StopProducing(ExecNode* output) override { inputs_[0]->StopProducing(this); }

      // An ExecNode has a single output schema to which all its batches conform.
      using ExecNode::output_schema;

      // ExecNodes carry basic introspection for debugging purposes.
      const char* kind_name() const override { return "PassthruNode"; }
      using ExecNode::label;
      using ExecNode::SetLabel;
      using ExecNode::ToString;

      // An ExecNode holds references to its inputs and outputs, so it is possible
      // to walk the graph of execution if necessary.
      using ExecNode::inputs;
      using ExecNode::outputs;

      // StartProducing() and StopProducing() are invoked by an ExecPlan to
      // coordinate the graph-wide execution state. These do not need to be
      // forwarded to inputs or outputs.
      Status StartProducing() override { return Status::OK(); }
      void StopProducing() override {}
      Future<> finished() override { return inputs_[0]->finished(); }
    };

Note that each method which is associated with an edge of the graph must be
invoked with an ``ExecNode*`` to identify the node which invoked it. For
example, in an :class:`ExecNode` which implements ``JOIN`` this tagging might
be used to differentiate between batches from the left and right inputs.
``InputReceived``, ``ErrorReceived``, and ``InputFinished`` may only be
invoked by the inputs of a node, while ``ResumeProducing``,
``PauseProducing``, and ``StopProducing`` may only be invoked by the outputs
of a node.

An :class:`ExecPlan` contains the associated instances of :class:`ExecNode`
and is used to start and stop execution of all nodes and for querying/awaiting
their completion::

    // construct an ExecPlan first to hold your nodes
    ARROW_ASSIGN_OR_RAISE(auto plan, ExecPlan::Make(default_exec_context()));

    // ... add nodes to your ExecPlan

    // start all nodes in the graph
    ARROW_RETURN_NOT_OK(plan->StartProducing());

    SetUserCancellationCallback([plan] {
      // stop all nodes in the graph
      plan->StopProducing();
    });

    // Complete will be marked finished when all nodes have run to completion
    // or acknowledged a StopProducing() signal. The ExecPlan should be kept
    // alive until this future is marked finished.
    Future<> complete = plan->finished();


Constructing ``ExecPlan`` objects
---------------------------------

.. warning::

    The following will be superseded by construction from Compute IR;
    see ARROW-14074.

None of the concrete implementations of :class:`ExecNode` are exposed
in headers, so they can't be constructed directly outside the
translation unit where they are defined. Instead, factories to
create them are provided in an extensible registry. This structure
provides a number of benefits:

- It enforces consistent construction.
- It decouples implementations from consumers of the interface
  (for example: we have two classes for scalar and grouped aggregate;
  we can choose which to construct within the single factory by
  checking whether grouping keys are provided).
- It expedites integration with out-of-library extensions. For example,
  "scan" nodes are implemented in the separate ``libarrow_dataset.so``
  library.
- Since the class is not referenceable outside the translation unit in
  which it is defined, compilers can optimize more aggressively.
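
As an illustration of the third point, an out-of-library component can
register its own factory so that its nodes become constructible by name. A
minimal sketch (the ``"passthru"`` name and the reuse of ``PassthruNode``
from above are illustrative only)::

    // Assumes PassthruNode (above) has a constructor taking
    // (ExecPlan*, std::vector<ExecNode*>). EmplaceNode constructs the
    // node and transfers ownership of it to the plan.
    ARROW_RETURN_NOT_OK(default_exec_factory_registry()->AddFactory(
        "passthru",
        [](ExecPlan* plan, std::vector<ExecNode*> inputs,
           const ExecNodeOptions& options) -> Result<ExecNode*> {
          return plan->EmplaceNode<PassthruNode>(plan, std::move(inputs));
        }));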

Factories of :class:`ExecNode` can be retrieved by name from the registry.
The default registry is available through
:func:`arrow::compute::default_exec_factory_registry()`
and can be queried for the built-in factories::

    // get the factory for "filter" nodes:
    ARROW_ASSIGN_OR_RAISE(auto make_filter,
                          default_exec_factory_registry()->GetFactory("filter"));

    // factories take three arguments:
    ARROW_ASSIGN_OR_RAISE(ExecNode* filter_node, make_filter(
        // the ExecPlan which should own this node
        plan.get(),

        // nodes which will send batches to this node (inputs)
        {scan_node},

        // parameters unique to "filter" nodes
        FilterNodeOptions{filter_expression}));

    // alternative shorthand:
    ARROW_ASSIGN_OR_RAISE(filter_node, MakeExecNode("filter",
        plan.get(), {scan_node}, FilterNodeOptions{filter_expression}));

Factories can also be added to the default registry as long as they are
convertible to ``std::function<Result<ExecNode*>(
ExecPlan*, std::vector<ExecNode*>, const ExecNodeOptions&)>``.

To build an :class:`ExecPlan` representing a simple pipeline which
reads from a :class:`RecordBatchReader` then filters, projects, and
writes to disk::

    std::shared_ptr<RecordBatchReader> reader = GetStreamOfBatches();
    ExecNode* source_node = *MakeExecNode("source", plan.get(), {},
                                          SourceNodeOptions::FromReader(
                                              reader,
                                              GetCpuThreadPool()));

    ExecNode* filter_node = *MakeExecNode("filter", plan.get(), {source_node},
                                          FilterNodeOptions{
                                            greater(field_ref("score"), literal(3))
                                          });

    ExecNode* project_node = *MakeExecNode("project", plan.get(), {filter_node},
                                           ProjectNodeOptions{
                                             {add(field_ref("score"), literal(1))},
                                             {"score + 1"}
                                           });

    arrow::dataset::internal::Initialize();
    MakeExecNode("write", plan.get(), {project_node},
                 WriteNodeOptions{/*base_dir=*/"/dat", /*...*/});

:struct:`Declaration` is a `dplyr <https://dplyr.tidyverse.org/>`_-inspired
helper which further decreases the boilerplate associated with populating
an :class:`ExecPlan` from C++::

    arrow::dataset::internal::Initialize();

    std::shared_ptr<RecordBatchReader> reader = GetStreamOfBatches();
    ASSERT_OK(Declaration::Sequence(
                  {
                      {"source", SourceNodeOptions::FromReader(
                                     reader,
                                     GetCpuThreadPool())},
                      {"filter", FilterNodeOptions{
                                     greater(field_ref("score"), literal(3))}},
                      {"project", ProjectNodeOptions{
                                      {add(field_ref("score"), literal(1))},
                                      {"score + 1"}}},
                      {"write", WriteNodeOptions{/*base_dir=*/"/dat", /*...*/}},
                  })
                  .AddToPlan(plan.get()));

Note that a source node can wrap anything which resembles a stream of batches.
For example, `PR#11032 <https://github.com/apache/arrow/pull/11032>`_ adds
support for use of a `DuckDB <https://duckdb.org/>`_ query as a source node.
Similarly, a sink node can wrap anything which absorbs a stream of batches.
In the example above we're writing completed batches to disk. However, we can
also collect these in memory into a :class:`Table` or forward them to a
:class:`RecordBatchReader` as an out-of-graph stream. This flexibility allows
an :class:`ExecPlan` to be used as streaming middleware between any endpoints
which support Arrow formatted batches.
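
As a sketch of that last option, the built-in ``sink`` node exposes the
stream of batches as an ``AsyncGenerator`` which can then be wrapped in a
:class:`RecordBatchReader` (assuming ``project_node`` from the pipeline above
and a ``schema`` matching its output schema)::

    // The sink node assigns a generator which yields batches as produced.
    AsyncGenerator<util::optional<ExecBatch>> sink_gen;
    ARROW_RETURN_NOT_OK(MakeExecNode("sink", plan.get(), {project_node},
                                     SinkNodeOptions{&sink_gen}));

    // Wrap the generator as a reader for out-of-graph consumption.
    std::shared_ptr<RecordBatchReader> sink_reader = MakeGeneratorReader(
        schema, std::move(sink_gen), default_exec_context()->memory_pool());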

An :class:`arrow::dataset::Dataset` can also be wrapped as a source node
which pushes all the dataset's batches into an :class:`ExecPlan`. This
factory is added to the default registry with the name ``"scan"`` by calling
``arrow::dataset::internal::Initialize()``::

    arrow::dataset::internal::Initialize();

    std::shared_ptr<arrow::dataset::Dataset> dataset = GetDataset();

    ASSERT_OK(Declaration::Sequence(
                  {
                      {"scan", ScanNodeOptions{dataset,
                                               /* push down predicate, projection, ... */}},
                      {"filter", FilterNodeOptions{/* ... */}},
                      // ...
                  })
                  .AddToPlan(plan.get()));

Datasets may be scanned multiple times; just make multiple scan nodes
from the same dataset (useful for a self-join, for example). Note that
producing two scan nodes like this will perform all reads and decodes twice.
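
For instance, a self-join could begin from two independent scans of one
dataset. A minimal sketch (``dataset`` is assumed from above, and the
downstream combining node is elided)::

    // Both scan nodes share the same Dataset but read it independently.
    auto scan_options = ScanNodeOptions{dataset, /* ... */};

    ARROW_ASSIGN_OR_RAISE(ExecNode* left_scan,
                          MakeExecNode("scan", plan.get(), {}, scan_options));
    ARROW_ASSIGN_OR_RAISE(ExecNode* right_scan,
                          MakeExecNode("scan", plan.get(), {}, scan_options));

    // ... feed left_scan and right_scan into a downstream combining node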