Commits (28)
1aa49b7
ARROW-11930: [C++][Dataset][Compute] Use an ExecPlan for dataset scans
bkietz May 17, 2021
a367f60
use Loop in GeneratorNode
bkietz May 26, 2021
a6615d9
Make CollectNode public (as SinkNode)
bkietz May 26, 2021
d996e05
add ProjectNode
bkietz May 26, 2021
f2d4626
add sketch of ScanNode
bkietz May 26, 2021
0351ccd
flesh out ScanNode, tag ExecBatches with guarantees
bkietz May 27, 2021
e9468cd
add fast path for FieldRef.Name lookup in Schema
bkietz May 27, 2021
57264ee
remove seq reordering from SinkNode
bkietz Jun 2, 2021
1f8e93b
minor review comments
bkietz Jun 2, 2021
2e2612a
use compute/type_fwd.h
bkietz Jun 7, 2021
d7b4534
Add (very) basic ExecNode doc
bkietz Jun 8, 2021
1019c37
Append to ExecNode doc
bkietz Jun 9, 2021
b63e585
add Result and Status matchers
bkietz Jun 12, 2021
d0c9eac
replace output_descr with output_schema for named fields
bkietz Jun 14, 2021
86cfce5
repair r/src/dataset.cpp
bkietz Jun 14, 2021
55df44d
add accessor to check for thread membership
bkietz Jun 15, 2021
9410b10
add support for Future<> to ResultWith, Raises
bkietz Jun 15, 2021
1de1a14
replaced ScanBatchesUnorderedAsync but it hangs
bkietz Jun 16, 2021
ceac80b
gcc4.8: more explicit construction
bkietz Jun 17, 2021
84c2182
paranoid reversion in async_generator.h
bkietz Jun 17, 2021
aaaa353
move new scan path into a unit test for now
bkietz Jun 17, 2021
1da1775
reduce #includes in expression.h
bkietz Jun 19, 2021
f50a89c
repair python binding with inlined KnownFieldValues def
bkietz Jun 21, 2021
256c29c
review comments
bkietz Jun 28, 2021
02c7eb4
consistent end signaling for Enumerated
bkietz Jun 29, 2021
5133e7e
transfer from background thread
bkietz Jun 30, 2021
e91ef9f
ensure that plans are stopped before they are destroyed
bkietz Jun 30, 2021
046e057
move hash into Expression::Expression(Call) to ensure it's always ini…
bkietz Jul 1, 2021
51 changes: 48 additions & 3 deletions cpp/src/arrow/compute/exec.cc
@@ -36,6 +36,7 @@
#include "arrow/compute/registry.h"
#include "arrow/compute/util_internal.h"
#include "arrow/datum.h"
#include "arrow/pretty_print.h"
#include "arrow/record_batch.h"
#include "arrow/scalar.h"
#include "arrow/status.h"
@@ -69,6 +70,48 @@ ExecBatch::ExecBatch(const RecordBatch& batch)
std::move(columns.begin(), columns.end(), values.begin());
}

bool ExecBatch::Equals(const ExecBatch& other) const {
return guarantee == other.guarantee && values == other.values;
}

void PrintTo(const ExecBatch& batch, std::ostream* os) {
*os << "ExecBatch\n";

static const std::string indent = " ";

*os << indent << "# Rows: " << batch.length << "\n";
if (batch.guarantee != literal(true)) {
*os << indent << "Guarantee: " << batch.guarantee.ToString() << "\n";
}

int i = 0;
for (const Datum& value : batch.values) {
*os << indent << i++ << ": ";

if (value.is_scalar()) {
*os << "Scalar[" << value.scalar()->ToString() << "]\n";
continue;
}

auto array = value.make_array();
PrettyPrintOptions options;
options.skip_new_lines = true;
*os << "Array";
ARROW_CHECK_OK(PrettyPrint(*array, options, os));
*os << "\n";
}
}

ExecBatch ExecBatch::Slice(int64_t offset, int64_t length) const {
ExecBatch out = *this;
for (auto& value : out.values) {
if (value.is_scalar()) continue;
value = value.array()->Slice(offset, length);
}
out.length = length;
return out;
}

Result<ExecBatch> ExecBatch::Make(std::vector<Datum> values) {
if (values.empty()) {
return Status::Invalid("Cannot infer ExecBatch length without at least one value");
@@ -77,9 +120,6 @@ Result<ExecBatch> ExecBatch::Make(std::vector<Datum> values) {
int64_t length = -1;
for (const auto& value : values) {
if (value.is_scalar()) {
if (length == -1) {
length = 1;
}
continue;
}

@@ -94,8 +134,13 @@ Result<ExecBatch> ExecBatch::Make(std::vector<Datum> values) {
}
}

if (length == -1) {
length = 1;
}

return ExecBatch(std::move(values), length);
}

namespace {

Result<std::shared_ptr<Buffer>> AllocateDataBuffer(KernelContext* ctx, int64_t length,
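The hunk above moves the all-scalar default (length 1) after the loop so an array value always determines the length when one is present, and adds `Equals`, `Slice`, and a gtest-friendly `PrintTo`. A minimal usage sketch of those semantics (not part of the diff; `ArrayFromJSON` is Arrow's test-only helper, and error handling is elided):

```cpp
#include "arrow/compute/exec.h"
#include "arrow/compute/exec/expression.h"
#include "arrow/testing/gtest_util.h"  // ArrayFromJSON, a test-only helper

namespace cp = arrow::compute;

void ExecBatchSemanticsSketch() {
  // One array value: Make() infers the batch length (4) from it. A batch
  // holding only scalars would instead receive the semantic length 1.
  auto column = arrow::ArrayFromJSON(arrow::int32(), "[1, 2, 3, 4]");
  cp::ExecBatch batch = cp::ExecBatch::Make({arrow::Datum(column)}).ValueOrDie();

  // Tag the batch with a guarantee (illustrative only; guarantees usually
  // come from dataset partition information).
  batch.guarantee = cp::equal(cp::field_ref("year"), cp::literal(2021));

  // Slice() defers to Array::Slice for array values, copies scalars as-is,
  // and carries the guarantee along.
  cp::ExecBatch head = batch.Slice(/*offset=*/0, /*length=*/2);

  // The new Equals/operator== compare values and the guarantee expression.
  bool full_range_equal = (batch.Slice(0, batch.length) == batch);  // true
  bool prefix_equal = (head == batch);                              // false
  (void)full_range_equal;
  (void)prefix_equal;
}
```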
13 changes: 13 additions & 0 deletions cpp/src/arrow/compute/exec.h
@@ -28,6 +28,7 @@
#include <vector>

#include "arrow/array/data.h"
#include "arrow/compute/exec/expression.h"
#include "arrow/datum.h"
#include "arrow/memory_pool.h"
#include "arrow/result.h"
@@ -186,6 +187,9 @@ struct ARROW_EXPORT ExecBatch {
/// ExecBatch::length is equal to the length of this array.
std::shared_ptr<SelectionVector> selection_vector;

/// A predicate Expression guaranteed to evaluate to true for all rows in this batch.
Expression guarantee = literal(true);

/// The semantic length of the ExecBatch. When the values are all scalars,
/// the length should be set to 1, otherwise the length is taken from the
/// array values, except when there is a selection vector. When there is a
@@ -203,9 +207,13 @@
return values[i];
}

bool Equals(const ExecBatch& other) const;

/// \brief A convenience for the number of values / arguments.
int num_values() const { return static_cast<int>(values.size()); }

ExecBatch Slice(int64_t offset, int64_t length) const;

/// \brief A convenience for returning the ValueDescr objects (types and
/// shapes) from the batch.
std::vector<ValueDescr> GetDescriptors() const {
@@ -215,8 +223,13 @@
}
return result;
}

ARROW_EXPORT friend void PrintTo(const ExecBatch&, std::ostream*);
};

inline bool operator==(const ExecBatch& l, const ExecBatch& r) { return l.Equals(r); }
inline bool operator!=(const ExecBatch& l, const ExecBatch& r) { return !l.Equals(r); }

/// \defgroup compute-call-function One-shot calls to compute functions
///
/// @{
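Since `Equals`, `operator==`/`operator!=`, and the `PrintTo` friend are now declared here, `ExecBatch` works directly with googletest assertions, with failures rendered via `PrintTo`. A hedged sketch, assuming Arrow's test helpers `ASSERT_OK_AND_ASSIGN` and `ArrayFromJSON`:

```cpp
#include <gtest/gtest.h>

#include "arrow/compute/exec.h"
#include "arrow/testing/gtest_util.h"

TEST(ExecBatch, SliceAndEquality) {
  ASSERT_OK_AND_ASSIGN(
      auto batch,
      arrow::compute::ExecBatch::Make(
          {arrow::Datum(arrow::ArrayFromJSON(arrow::int32(), "[1, 2, 3]"))}));

  // Slicing the full range reproduces an equal batch; a shorter slice does
  // not. On failure, gtest prints both operands using PrintTo.
  EXPECT_EQ(batch.Slice(0, batch.length), batch);
  EXPECT_NE(batch.Slice(0, 1), batch);
}
```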
147 changes: 147 additions & 0 deletions cpp/src/arrow/compute/exec/doc/exec_node.md
@@ -0,0 +1,147 @@
<!---
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# ExecNodes and logical operators
[Review comment, Member] I'm not sure I understand the status of this document. If this is meant to be a persistent document, then can it be part of the Sphinx development docs?

[Reply from the author] I'll promote this to a Sphinx doc in a follow up. https://issues.apache.org/jira/browse/ARROW-13227


`ExecNode`s are intended to implement individual logical operators
in a streaming execution graph. Each node receives batches from
upstream nodes (inputs), processes them in some way, then pushes
results to downstream nodes (outputs). `ExecNode`s are owned and
(to an extent) coordinated by an `ExecPlan`.

> Terminology: "operator" and "node" are mostly interchangeable, much
> like "interface" and "abstract base class" in C++: the latter is the
> formal, specific piece of code which implements the abstract
> concept.

## Types of logical operators

Each of these will have at least one corresponding concrete
`ExecNode`. Where possible, compatible implementations of a
logical operator will *not* be exposed as independent subclasses
of `ExecNode`. Instead we prefer that they be encapsulated
internally by a single subclass of `ExecNode` to permit
switching between them during a query.

- Scan: materializes in-memory batches from storage (e.g. Parquet
  files, a Flight stream, ...)
- Filter: evaluates an `Expression` on each input batch and outputs
  a copy that excludes the rows for which the filter did not return
  `true`.
- Project: evaluates `Expression`s on each input batch to produce
  the columns of an output batch (see the expression sketch after
  this list).
- Grouped Aggregate: identifies groups based on one or more key
  columns in each input batch, then updates the aggregates
  corresponding to those groups. Note that this is a pipeline
  breaker; it will wait for its inputs to complete before
  outputting any batches.
- Union: merges two or more streams of batches into a single stream
  of batches.
- Write: writes each batch to storage.
- ToTable: collects batches into a `Table` with stable row ordering
  where possible.
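
For concreteness, a hedged sketch of the kinds of `Expression`s a Filter and a Project operator would evaluate; `field_ref`, `literal`, `greater`, and `call` are declared in `arrow/compute/exec/expression.h`:

```cpp
#include "arrow/compute/exec/expression.h"

namespace cp = arrow::compute;

// Filter: keep only the rows where x > 0.
cp::Expression filter = cp::greater(cp::field_ref("x"), cp::literal(0));

// Project: materialize x + 1 as a column of the output batch.
cp::Expression projection =
    cp::call("add", {cp::field_ref("x"), cp::literal(1)});
```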

### Not in scope for Arrow 5.0

- Join: performs an inner, left, outer, semi, or anti join given some
  join predicates.
- Sort: accumulates all input batches into a single table, reorders
  its rows by some sorting condition, then streams the sorted table
  out as batches.
- Top-K: retrieves a limited subset of rows from a table as though it
  were in sorted order.

For example, a dataset scan with only a filter and a
projection corresponds to a fairly trivial graph:

```
ScanNode -> FilterNode -> ProjectNode -> ToTableNode
```

A scan node loads batches from disk and pushes them to a filter node.
The filter node excludes some rows based on an `Expression`, then
pushes the filtered batches to a project node. The project node
materializes new columns based on `Expression`s, then pushes those
batches to a table collection node. The table collection node
assembles these batches into a `Table`, which is handed off as the
result of the `ExecPlan`.

## Parallelism, pipelines

The execution graph is orthogonal to parallelism; any
node may push to any other node from any thread. A scan node
delivers each batch on some thread, after which the batch passes
through each node in the example graph above without ever leaving
that thread (memory and other resource pressure permitting).

The example graph above happens to be simple enough that processing
of any batch by any node is independent of other nodes and other
batches; it is a pipeline. Note that there is no explicit `Pipeline`
class; pipelined execution is an emergent property of some
subgraphs.

Nodes which do not share this property (pipeline breakers) are
responsible for deciding when they have received sufficient input,
when they can start emitting output, etc. For example, a `GroupByNode`
will wait for its input to be exhausted before it begins pushing
batches to its own outputs.

Parallelism is "seeded" by `ScanNode` (or other source nodes): it
owns a reference to the thread pool on which the graph is executing
and fans out pushing to its outputs across that pool. A subsequent
`ProjectNode` will process each batch immediately after it is handed
off by the `ScanNode`; no explicit scheduling is required.
Eventually, individual nodes may internally parallelize processing
of individual batches (for example, if a `FilterNode`'s filter
expression is slow). This decision is also left up to each
`ExecNode` implementation.

# ExecNode interface and usage

`ExecNode`s are constructed using one of the available factory
functions, such as `arrow::compute::MakeFilterNode`
or `arrow::dataset::MakeScanNode`. Any inputs to an `ExecNode`
must be provided when the node is constructed, so the first
nodes to be constructed are source nodes with no inputs
such as `ScanNode`.

The batches yielded by an `ExecNode` always conform precisely
to its output schema. NB: no by-name field lookups or type
checks are performed during execution. The output schema
is usually derived from the output schemas of inputs. For
example a `FilterNode`'s output schema is always identical to
that of its input since batches are only modified by exclusion
of some rows.

An `ExecNode` will begin producing batches when
`node->StartProducing()` is invoked and will proceed until stopped
with `node->StopProducing()`. Started nodes may not be destroyed
until stopped. `ExecNode`s are not currently restartable.
An `ExecNode` pushes batches to its outputs by passing each batch
to `output->InputReceived()`. It signals exhaustion by invoking
`output->InputFinished()`.
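
A hedged sketch of this lifecycle from the driver's side; the `Status` return of `StartProducing` and the header name are assumptions based on this PR, and waiting for completion is elided:

```cpp
#include "arrow/compute/exec/exec_plan.h"  // assumed location of ExecNode
#include "arrow/status.h"

namespace cp = arrow::compute;

// `source` is a hypothetical, fully wired source node (e.g. a ScanNode)
// owned by some ExecPlan.
arrow::Status DriveSource(cp::ExecNode* source) {
  // Kick off production; batches will be pushed downstream through
  // output->InputReceived(), with exhaustion signaled by InputFinished().
  RETURN_NOT_OK(source->StartProducing());

  // ... wait until the terminal node has observed InputFinished() ...

  // Started nodes may not be destroyed until stopped.
  source->StopProducing();
  return arrow::Status::OK();
}
```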

Error recovery is permitted within a node. For example, if evaluation
of an `Expression` runs out of memory, the governing node may retry
that evaluation after some memory has been freed. If a node
experiences an error from which it cannot recover (for example, an IO
error while parsing a CSV file), then it reports this with
`output->ErrorReceived()`. An error which escapes the scope of a
single node should not be considered recoverable (no `FilterNode`
should `try/catch` the IO error above).
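
As a hedged illustration of that policy (only `InputReceived` and `ErrorReceived` are taken from this document; the signatures, helper, and wiring below are assumptions):

```cpp
#include <functional>
#include <utility>

#include "arrow/compute/exec/exec_plan.h"  // assumed location of ExecNode
#include "arrow/result.h"

namespace cp = arrow::compute;

// `self` is the reporting node and `output` its downstream consumer;
// `parse_next` stands in for a hypothetical CSV-parsing helper.
void PushNextOrReport(cp::ExecNode* self, cp::ExecNode* output, int seq,
                      std::function<arrow::Result<cp::ExecBatch>()> parse_next) {
  arrow::Result<cp::ExecBatch> maybe_batch = parse_next();
  if (!maybe_batch.ok()) {
    // Unrecoverable here (e.g. an IO error mid-file): report downstream
    // rather than attempting local recovery.
    output->ErrorReceived(self, maybe_batch.status());
    return;
  }
  output->InputReceived(self, seq, std::move(maybe_batch).ValueOrDie());
}
```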
