diff --git a/ci/appveyor-cpp-build.bat b/ci/appveyor-cpp-build.bat index 6b930939660..534f73c2d50 100644 --- a/ci/appveyor-cpp-build.bat +++ b/ci/appveyor-cpp-build.bat @@ -97,6 +97,7 @@ cmake -G "%GENERATOR%" %CMAKE_ARGS% ^ -DARROW_CXXFLAGS="%ARROW_CXXFLAGS%" ^ -DARROW_DATASET=ON ^ -DARROW_ENABLE_TIMING_TESTS=OFF ^ + -DARROW_ENGINE=ON ^ -DARROW_FLIGHT=%ARROW_BUILD_FLIGHT% ^ -DARROW_GANDIVA=%ARROW_BUILD_GANDIVA% ^ -DARROW_MIMALLOC=ON ^ diff --git a/ci/scripts/cpp_build.sh b/ci/scripts/cpp_build.sh index 8a1e4f32f3a..d47a6696e8f 100755 --- a/ci/scripts/cpp_build.sh +++ b/ci/scripts/cpp_build.sh @@ -59,6 +59,7 @@ cmake -G "${CMAKE_GENERATOR:-Ninja}" \ -DARROW_CUDA=${ARROW_CUDA:-OFF} \ -DARROW_CXXFLAGS=${ARROW_CXXFLAGS:-} \ -DARROW_DATASET=${ARROW_DATASET:-ON} \ + -DARROW_ENGINE=${ARROW_ENGINE:-ON} \ -DARROW_DEPENDENCY_SOURCE=${ARROW_DEPENDENCY_SOURCE:-AUTO} \ -DARROW_EXTRA_ERROR_CONTEXT=${ARROW_EXTRA_ERROR_CONTEXT:-OFF} \ -DARROW_ENABLE_TIMING_TESTS=${ARROW_ENABLE_TIMING_TESTS:-ON} \ diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index a6946403deb..a31af74f68e 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -343,6 +343,10 @@ if(ARROW_CUDA set(ARROW_IPC ON) endif() +if(ARROW_ENGINE) + set(ARROW_COMPUTE ON) +endif() + if(ARROW_DATASET) set(ARROW_COMPUTE ON) set(ARROW_FILESYSTEM ON) diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index 62ea94b8d02..bee14ae4ce3 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -367,6 +367,7 @@ if(ARROW_COMPUTE) compute/api_vector.cc compute/cast.cc compute/exec.cc + compute/exec/exec_plan.cc compute/exec/expression.cc compute/function.cc compute/kernel.cc @@ -405,6 +406,7 @@ if(ARROW_COMPUTE) set_source_files_properties(compute/kernels/aggregate_basic_avx2.cc PROPERTIES COMPILE_FLAGS ${ARROW_AVX2_FLAG}) endif() + if(ARROW_HAVE_RUNTIME_AVX512) list(APPEND ARROW_SRCS compute/kernels/aggregate_basic_avx512.cc) set_source_files_properties(compute/kernels/aggregate_basic_avx512.cc PROPERTIES @@ -412,6 +414,8 @@ if(ARROW_COMPUTE) set_source_files_properties(compute/kernels/aggregate_basic_avx512.cc PROPERTIES COMPILE_FLAGS ${ARROW_AVX512_FLAG}) endif() + + list(APPEND ARROW_TESTING_SRCS compute/exec/test_util.cc) endif() if(ARROW_FILESYSTEM) diff --git a/cpp/src/arrow/compute/exec/CMakeLists.txt b/cpp/src/arrow/compute/exec/CMakeLists.txt index a10c1dad469..ac6ddc51dff 100644 --- a/cpp/src/arrow/compute/exec/CMakeLists.txt +++ b/cpp/src/arrow/compute/exec/CMakeLists.txt @@ -19,4 +19,6 @@ arrow_install_all_headers("arrow/compute/exec") add_arrow_compute_test(expression_test PREFIX "arrow-compute") +add_arrow_compute_test(plan_test PREFIX "arrow-compute") + add_arrow_benchmark(expression_benchmark PREFIX "arrow-compute") diff --git a/cpp/src/arrow/compute/exec/exec_plan.cc b/cpp/src/arrow/compute/exec/exec_plan.cc new file mode 100644 index 00000000000..f765ceccf0c --- /dev/null +++ b/cpp/src/arrow/compute/exec/exec_plan.cc @@ -0,0 +1,218 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/compute/exec/exec_plan.h" + +#include + +#include "arrow/datum.h" +#include "arrow/result.h" +#include "arrow/util/checked_cast.h" +#include "arrow/util/logging.h" +#include "arrow/util/optional.h" + +namespace arrow { + +using internal::checked_cast; + +namespace compute { + +namespace { + +struct ExecPlanImpl : public ExecPlan { + ExecPlanImpl() = default; + + ~ExecPlanImpl() override = default; + + ExecNode* AddNode(std::unique_ptr node) { + if (node->num_inputs() == 0) { + sources_.push_back(node.get()); + } + if (node->num_outputs() == 0) { + sinks_.push_back(node.get()); + } + nodes_.push_back(std::move(node)); + return nodes_.back().get(); + } + + Status Validate() const { + if (nodes_.empty()) { + return Status::Invalid("ExecPlan has no node"); + } + for (const auto& node : nodes_) { + RETURN_NOT_OK(node->Validate()); + } + return Status::OK(); + } + + Status StartProducing() { + ARROW_ASSIGN_OR_RAISE(auto sorted_nodes, ReverseTopoSort()); + Status st; + auto it = sorted_nodes.begin(); + while (it != sorted_nodes.end() && st.ok()) { + st &= (*it++)->StartProducing(); + } + if (!st.ok()) { + // Stop nodes that successfully started, in reverse order + // (`it` now points after the node that failed starting, so need to rewind) + --it; + while (it != sorted_nodes.begin()) { + (*--it)->StopProducing(); + } + } + return st; + } + + Result ReverseTopoSort() { + struct TopoSort { + const std::vector>& nodes; + std::unordered_set visited; + std::unordered_set visiting; + NodeVector sorted; + + explicit TopoSort(const std::vector>& nodes) + : nodes(nodes) { + visited.reserve(nodes.size()); + sorted.reserve(nodes.size()); + } + + Status Sort() { + for (const auto& node : nodes) { + RETURN_NOT_OK(Visit(node.get())); + } + DCHECK_EQ(sorted.size(), nodes.size()); + DCHECK_EQ(visited.size(), nodes.size()); + DCHECK_EQ(visiting.size(), 0); + return Status::OK(); + } + + Status Visit(ExecNode* node) { + if (visited.count(node) != 0) { + return Status::OK(); + } + + auto it_success = visiting.insert(node); + if (!it_success.second) { + // Insertion failed => node is already being visited + return Status::Invalid("Cycle detected in execution plan"); + } + + for (auto input : node->inputs()) { + // Ensure that producers are inserted before this consumer + RETURN_NOT_OK(Visit(input)); + } + + visiting.erase(it_success.first); + visited.insert(node); + sorted.push_back(node); + return Status::OK(); + } + + NodeVector Reverse() { + std::reverse(sorted.begin(), sorted.end()); + return std::move(sorted); + } + } topo_sort(nodes_); + + RETURN_NOT_OK(topo_sort.Sort()); + return topo_sort.Reverse(); + } + + std::vector> nodes_; + NodeVector sources_, sinks_; +}; + +ExecPlanImpl* ToDerived(ExecPlan* ptr) { return checked_cast(ptr); } + +const ExecPlanImpl* ToDerived(const ExecPlan* ptr) { + return checked_cast(ptr); +} + +util::optional GetNodeIndex(const std::vector& nodes, + const ExecNode* node) { + for (int i = 0; i < static_cast(nodes.size()); ++i) { + if (nodes[i] == node) return i; + } + return util::nullopt; +} + +} // namespace + +Result> ExecPlan::Make() { + return 
std::make_shared(); +} + +ExecNode* ExecPlan::AddNode(std::unique_ptr node) { + return ToDerived(this)->AddNode(std::move(node)); +} + +const ExecPlan::NodeVector& ExecPlan::sources() const { + return ToDerived(this)->sources_; +} + +const ExecPlan::NodeVector& ExecPlan::sinks() const { return ToDerived(this)->sinks_; } + +Status ExecPlan::Validate() { return ToDerived(this)->Validate(); } + +Status ExecPlan::StartProducing() { return ToDerived(this)->StartProducing(); } + +ExecNode::ExecNode(ExecPlan* plan, std::string label, + std::vector input_descrs, + std::vector input_labels, BatchDescr output_descr, + int num_outputs) + : plan_(plan), + label_(std::move(label)), + input_descrs_(std::move(input_descrs)), + input_labels_(std::move(input_labels)), + output_descr_(std::move(output_descr)), + num_outputs_(num_outputs) {} + +Status ExecNode::Validate() const { + if (inputs_.size() != input_descrs_.size()) { + return Status::Invalid("Invalid number of inputs for '", label(), "' (expected ", + num_inputs(), ", actual ", inputs_.size(), ")"); + } + + if (static_cast(outputs_.size()) != num_outputs_) { + return Status::Invalid("Invalid number of outputs for '", label(), "' (expected ", + num_outputs(), ", actual ", outputs_.size(), ")"); + } + + DCHECK_EQ(input_descrs_.size(), input_labels_.size()); + + for (auto out : outputs_) { + auto input_index = GetNodeIndex(out->inputs(), this); + if (!input_index) { + return Status::Invalid("Node '", label(), "' outputs to node '", out->label(), + "' but is not listed as an input."); + } + + const auto& in_descr = out->input_descrs_[*input_index]; + if (in_descr != output_descr_) { + return Status::Invalid( + "Node '", label(), "' (bound to input ", input_labels_[*input_index], + ") produces batches with type '", ValueDescr::ToString(output_descr_), + "' inconsistent with consumer '", out->label(), "' which accepts '", + ValueDescr::ToString(in_descr), "'"); + } + } + + return Status::OK(); +} + +} // namespace compute +} // namespace arrow diff --git a/cpp/src/arrow/compute/exec/exec_plan.h b/cpp/src/arrow/compute/exec/exec_plan.h new file mode 100644 index 00000000000..0d2faea0ddc --- /dev/null +++ b/cpp/src/arrow/compute/exec/exec_plan.h @@ -0,0 +1,246 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include + +#include "arrow/compute/type_fwd.h" +#include "arrow/type_fwd.h" +#include "arrow/util/macros.h" +#include "arrow/util/visibility.h" + +// NOTES: +// - ExecBatches only have arrays or scalars +// - data streams may be ordered, so add input number? 
+// - node to combine input needs to reorder + +namespace arrow { +namespace compute { + +class ExecNode; + +class ARROW_EXPORT ExecPlan : public std::enable_shared_from_this { + public: + using NodeVector = std::vector; + + virtual ~ExecPlan() = default; + + /// Make an empty exec plan + static Result> Make(); + + ExecNode* AddNode(std::unique_ptr node); + + template + ExecNode* EmplaceNode(Args&&... args) { + return AddNode(std::unique_ptr(new Node{std::forward(args)...})); + } + + /// The initial inputs + const NodeVector& sources() const; + + /// The final outputs + const NodeVector& sinks() const; + + // XXX API question: + // There are clearly two phases in the ExecPlan lifecycle: + // - one construction phase where AddNode() and ExecNode::AddInput() is called + // (with optional validation at the end) + // - one execution phase where the nodes are topo-sorted and then started + // + // => Should we separate out those APIs? e.g. have a ExecPlanBuilder + // for the first phase. + + Status Validate(); + + /// Start producing on all nodes + /// + /// Nodes are started in reverse topological order, such that any node + /// is started before all of its inputs. + Status StartProducing(); + + // XXX should we also have `void StopProducing()`? + + protected: + ExecPlan() = default; +}; + +class ARROW_EXPORT ExecNode { + public: + using NodeVector = std::vector; + using BatchDescr = std::vector; + + virtual ~ExecNode() = default; + + virtual const char* kind_name() = 0; + + // The number of inputs/outputs expected by this node + int num_inputs() const { return static_cast(input_descrs_.size()); } + int num_outputs() const { return num_outputs_; } + + /// This node's predecessors in the exec plan + const NodeVector& inputs() const { return inputs_; } + + /// The datatypes accepted by this node for each input + const std::vector& input_descrs() const { return input_descrs_; } + + /// \brief Labels identifying the function of each input. + /// + /// For example, FilterNode accepts "target" and "filter" inputs. + const std::vector& input_labels() const { return input_labels_; } + + /// This node's successors in the exec plan + const NodeVector& outputs() const { return outputs_; } + + /// The datatypes for batches produced by this node + const BatchDescr& output_descr() const { return output_descr_; } + + /// This node's exec plan + ExecPlan* plan() { return plan_; } + + /// \brief An optional label, for display and debugging + /// + /// There is no guarantee that this value is non-empty or unique. + const std::string& label() const { return label_; } + + void AddInput(ExecNode* input) { + inputs_.push_back(input); + input->outputs_.push_back(this); + } + + Status Validate() const; + + /// Upstream API: + /// These functions are called by input nodes that want to inform this node + /// about an updated condition (a new input batch, an error, an impeding + /// end of stream). + /// + /// Implementation rules: + /// - these may be called anytime after StartProducing() has succeeded + /// (and even during or after StopProducing()) + /// - these may be called concurrently + /// - these are allowed to call back into PauseProducing(), ResumeProducing() + /// and StopProducing() + + /// Transfer input batch to ExecNode + virtual void InputReceived(ExecNode* input, int seq_num, compute::ExecBatch batch) = 0; + + /// Signal error to ExecNode + virtual void ErrorReceived(ExecNode* input, Status error) = 0; + + /// Mark the inputs finished after the given number of batches. 
+ /// + /// This may be called before all inputs are received. This simply fixes + /// the total number of incoming batches for an input, so that the ExecNode + /// knows when it has received all input, regardless of order. + virtual void InputFinished(ExecNode* input, int seq_stop) = 0; + + /// Lifecycle API: + /// - start / stop to initiate and terminate production + /// - pause / resume to apply backpressure + /// + /// Implementation rules: + /// - StartProducing() should not recurse into the inputs, as it is + /// handled by ExecPlan::StartProducing() + /// - PauseProducing(), ResumeProducing(), StopProducing() may be called + /// concurrently (but only after StartProducing() has returned successfully) + /// - PauseProducing(), ResumeProducing(), StopProducing() may be called + /// by the downstream nodes' InputReceived(), ErrorReceived(), InputFinished() + /// methods + /// - StopProducing() should recurse into the inputs + /// - StopProducing() must be idempotent + + // XXX What happens if StartProducing() calls an output's InputReceived() + // synchronously, and InputReceived() decides to call back into StopProducing() + // (or PauseProducing()) because it received enough data? + // + // Right now, since synchronous calls happen in both directions (input to + // output and then output to input), a node must be careful to be reentrant + // against synchronous calls from its output, *and* also concurrent calls from + // other threads. The most reliable solution is to update the internal state + // first, and notify outputs only at the end. + // + // Alternate rules: + // - StartProducing(), ResumeProducing() can call synchronously into + // its ouputs' consuming methods (InputReceived() etc.) + // - InputReceived(), ErrorReceived(), InputFinished() can call asynchronously + // into its inputs' PauseProducing(), StopProducing() + // + // Alternate API: + // - InputReceived(), ErrorReceived(), InputFinished() return a ProductionHint + // enum: either None (default), PauseProducing, ResumeProducing, StopProducing + // - A method allows passing a ProductionHint asynchronously from an output node + // (replacing PauseProducing(), ResumeProducing(), StopProducing()) + + /// \brief Start producing + /// + /// This must only be called once. If this fails, then other lifecycle + /// methods must not be called. + /// + /// This is typically called automatically by ExecPlan::StartProducing(). + virtual Status StartProducing() = 0; + + /// \brief Pause producing temporarily + /// + /// This call is a hint that an output node is currently not willing + /// to receive data. + /// + /// This may be called any number of times after StartProducing() succeeds. + /// However, the node is still free to produce data (which may be difficult + /// to prevent anyway if data is produced using multiple threads). + virtual void PauseProducing(ExecNode* output) = 0; + + /// \brief Resume producing after a temporary pause + /// + /// This call is a hint that an output node is willing to receive data again. + /// + /// This may be called any number of times after StartProducing() succeeds. + /// This may also be called concurrently with PauseProducing(), which suggests + /// the implementation may use an atomic counter. + virtual void ResumeProducing(ExecNode* output) = 0; + + /// \brief Stop producing definitively to a single output + /// + /// This call is a hint that an output node has completed and is not willing + /// to not receive any further data. 
+ virtual void StopProducing(ExecNode* output) = 0; + + /// \brief Stop producing definitively + virtual void StopProducing() = 0; + + protected: + ExecNode(ExecPlan* plan, std::string label, std::vector input_descrs, + std::vector input_labels, BatchDescr output_descr, + int num_outputs); + + ExecPlan* plan_; + + std::string label_; + + std::vector input_descrs_; + std::vector input_labels_; + NodeVector inputs_; + + BatchDescr output_descr_; + int num_outputs_; + NodeVector outputs_; +}; + +} // namespace compute +} // namespace arrow diff --git a/cpp/src/arrow/compute/exec/plan_test.cc b/cpp/src/arrow/compute/exec/plan_test.cc new file mode 100644 index 00000000000..d809409b28d --- /dev/null +++ b/cpp/src/arrow/compute/exec/plan_test.cc @@ -0,0 +1,402 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include + +#include +#include + +#include "arrow/compute/exec/exec_plan.h" +#include "arrow/compute/exec/test_util.h" +#include "arrow/record_batch.h" +#include "arrow/testing/future_util.h" +#include "arrow/testing/gtest_util.h" +#include "arrow/testing/random.h" +#include "arrow/util/logging.h" +#include "arrow/util/thread_pool.h" + +namespace arrow { + +using internal::Executor; + +namespace compute { + +void AssertBatchesEqual(const RecordBatchVector& expected, + const RecordBatchVector& actual) { + ASSERT_EQ(expected.size(), actual.size()); + for (size_t i = 0; i < expected.size(); ++i) { + AssertBatchesEqual(*expected[i], *actual[i]); + } +} + +TEST(ExecPlanConstruction, Empty) { + ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make()); + + ASSERT_RAISES(Invalid, plan->Validate()); +} + +TEST(ExecPlanConstruction, SingleNode) { + ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make()); + auto node = MakeDummyNode(plan.get(), "dummy", /*num_inputs=*/0, /*num_outputs=*/0); + ASSERT_OK(plan->Validate()); + ASSERT_THAT(plan->sources(), ::testing::ElementsAre(node)); + ASSERT_THAT(plan->sinks(), ::testing::ElementsAre(node)); + + ASSERT_OK_AND_ASSIGN(plan, ExecPlan::Make()); + node = MakeDummyNode(plan.get(), "dummy", /*num_inputs=*/1, /*num_outputs=*/0); + // Input not bound + ASSERT_RAISES(Invalid, plan->Validate()); + + ASSERT_OK_AND_ASSIGN(plan, ExecPlan::Make()); + node = MakeDummyNode(plan.get(), "dummy", /*num_inputs=*/0, /*num_outputs=*/1); + // Output not bound + ASSERT_RAISES(Invalid, plan->Validate()); +} + +TEST(ExecPlanConstruction, SourceSink) { + ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make()); + auto source = MakeDummyNode(plan.get(), "source", /*num_inputs=*/0, /*num_outputs=*/1); + auto sink = MakeDummyNode(plan.get(), "sink", /*num_inputs=*/1, /*num_outputs=*/0); + // Input / output not bound + ASSERT_RAISES(Invalid, plan->Validate()); + + sink->AddInput(source); + ASSERT_OK(plan->Validate()); + EXPECT_THAT(plan->sources(), 
::testing::ElementsAre(source)); + EXPECT_THAT(plan->sinks(), ::testing::ElementsAre(sink)); +} + +TEST(ExecPlanConstruction, MultipleNode) { + ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make()); + + auto source1 = + MakeDummyNode(plan.get(), "source1", /*num_inputs=*/0, /*num_outputs=*/2); + + auto source2 = + MakeDummyNode(plan.get(), "source2", /*num_inputs=*/0, /*num_outputs=*/1); + + auto process1 = + MakeDummyNode(plan.get(), "process1", /*num_inputs=*/1, /*num_outputs=*/2); + + auto process2 = + MakeDummyNode(plan.get(), "process1", /*num_inputs=*/2, /*num_outputs=*/1); + + auto process3 = + MakeDummyNode(plan.get(), "process3", /*num_inputs=*/3, /*num_outputs=*/1); + + auto sink = MakeDummyNode(plan.get(), "sink", /*num_inputs=*/1, /*num_outputs=*/0); + + sink->AddInput(process3); + + process3->AddInput(process1); + process3->AddInput(process2); + process3->AddInput(process1); + + process2->AddInput(source1); + process2->AddInput(source2); + + process1->AddInput(source1); + + ASSERT_OK(plan->Validate()); + ASSERT_THAT(plan->sources(), ::testing::ElementsAre(source1, source2)); + ASSERT_THAT(plan->sinks(), ::testing::ElementsAre(sink)); +} + +struct StartStopTracker { + std::vector started, stopped; + + StartProducingFunc start_producing_func(Status st = Status::OK()) { + return [this, st](ExecNode* node) { + started.push_back(node->label()); + return st; + }; + } + + StopProducingFunc stop_producing_func() { + return [this](ExecNode* node) { stopped.push_back(node->label()); }; + } +}; + +TEST(ExecPlan, DummyStartProducing) { + StartStopTracker t; + + ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make()); + auto source1 = MakeDummyNode(plan.get(), "source1", /*num_inputs=*/0, /*num_outputs=*/2, + t.start_producing_func(), t.stop_producing_func()); + auto source2 = MakeDummyNode(plan.get(), "source2", /*num_inputs=*/0, /*num_outputs=*/1, + t.start_producing_func(), t.stop_producing_func()); + auto process1 = + MakeDummyNode(plan.get(), "process1", /*num_inputs=*/1, /*num_outputs=*/2, + t.start_producing_func(), t.stop_producing_func()); + auto process2 = + MakeDummyNode(plan.get(), "process2", /*num_inputs=*/2, /*num_outputs=*/1, + t.start_producing_func(), t.stop_producing_func()); + auto process3 = + MakeDummyNode(plan.get(), "process3", /*num_inputs=*/3, /*num_outputs=*/1, + t.start_producing_func(), t.stop_producing_func()); + + auto sink = MakeDummyNode(plan.get(), "sink", /*num_inputs=*/1, /*num_outputs=*/0, + t.start_producing_func(), t.stop_producing_func()); + + process1->AddInput(source1); + process2->AddInput(process1); + process2->AddInput(source2); + process3->AddInput(process1); + process3->AddInput(source1); + process3->AddInput(process2); + sink->AddInput(process3); + + ASSERT_OK(plan->Validate()); + ASSERT_EQ(t.started.size(), 0); + ASSERT_EQ(t.stopped.size(), 0); + + ASSERT_OK(plan->StartProducing()); + // Note that any correct reverse topological order may do + ASSERT_THAT(t.started, ::testing::ElementsAre("sink", "process3", "process2", + "process1", "source2", "source1")); + ASSERT_EQ(t.stopped.size(), 0); +} + +TEST(ExecPlan, DummyStartProducingCycle) { + // A trivial cycle + ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make()); + auto node = MakeDummyNode(plan.get(), "dummy", /*num_inputs=*/1, /*num_outputs=*/1); + node->AddInput(node); + ASSERT_OK(plan->Validate()); + ASSERT_RAISES(Invalid, plan->StartProducing()); + + // A less trivial one + ASSERT_OK_AND_ASSIGN(plan, ExecPlan::Make()); + auto source = MakeDummyNode(plan.get(), "source", /*num_inputs=*/0, 
/*num_outputs=*/1); + auto process1 = + MakeDummyNode(plan.get(), "process1", /*num_inputs=*/2, /*num_outputs=*/2); + auto process2 = + MakeDummyNode(plan.get(), "process2", /*num_inputs=*/1, /*num_outputs=*/1); + auto process3 = + MakeDummyNode(plan.get(), "process3", /*num_inputs=*/2, /*num_outputs=*/2); + auto sink = MakeDummyNode(plan.get(), "sink", /*num_inputs=*/1, /*num_outputs=*/0); + + process1->AddInput(source); + process2->AddInput(process1); + process3->AddInput(process2); + process3->AddInput(process1); + process1->AddInput(process3); + sink->AddInput(process3); + + ASSERT_OK(plan->Validate()); + ASSERT_RAISES(Invalid, plan->StartProducing()); +} + +TEST(ExecPlan, DummyStartProducingError) { + StartStopTracker t; + + ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make()); + auto source1 = MakeDummyNode(plan.get(), "source1", /*num_inputs=*/0, /*num_outputs=*/2, + t.start_producing_func(Status::NotImplemented("zzz")), + t.stop_producing_func()); + auto source2 = MakeDummyNode(plan.get(), "source2", /*num_inputs=*/0, /*num_outputs=*/1, + t.start_producing_func(), t.stop_producing_func()); + auto process1 = MakeDummyNode( + plan.get(), "process1", /*num_inputs=*/1, /*num_outputs=*/2, + t.start_producing_func(Status::IOError("xxx")), t.stop_producing_func()); + auto process2 = + MakeDummyNode(plan.get(), "process2", /*num_inputs=*/2, /*num_outputs=*/1, + t.start_producing_func(), t.stop_producing_func()); + process1->AddInput(source1); + process2->AddInput(process1); + process2->AddInput(source2); + auto process3 = + MakeDummyNode(plan.get(), "process3", /*num_inputs=*/3, /*num_outputs=*/1, + t.start_producing_func(), t.stop_producing_func()); + process3->AddInput(process1); + process3->AddInput(source1); + process3->AddInput(process2); + auto sink = MakeDummyNode(plan.get(), "sink", /*num_inputs=*/1, /*num_outputs=*/0, + t.start_producing_func(), t.stop_producing_func()); + sink->AddInput(process3); + + ASSERT_OK(plan->Validate()); + ASSERT_EQ(t.started.size(), 0); + ASSERT_EQ(t.stopped.size(), 0); + + // `process1` raises IOError + ASSERT_RAISES(IOError, plan->StartProducing()); + ASSERT_THAT(t.started, + ::testing::ElementsAre("sink", "process3", "process2", "process1")); + // Nodes that started successfully were stopped in reverse order + ASSERT_THAT(t.stopped, ::testing::ElementsAre("process2", "process3", "sink")); +} + +// TODO move this to gtest_util.h? + +class SlowRecordBatchReader : public RecordBatchReader { + public: + explicit SlowRecordBatchReader(std::shared_ptr reader) + : reader_(std::move(reader)) {} + + std::shared_ptr schema() const override { return reader_->schema(); } + + Status ReadNext(std::shared_ptr* batch) override { + SleepABit(); + return reader_->ReadNext(batch); + } + + static Result> Make( + RecordBatchVector batches, std::shared_ptr schema = nullptr) { + ARROW_ASSIGN_OR_RAISE(auto reader, + RecordBatchReader::Make(std::move(batches), std::move(schema))); + return std::make_shared(std::move(reader)); + } + + protected: + std::shared_ptr reader_; +}; + +static Result MakeSlowRecordBatchGenerator( + RecordBatchVector batches, std::shared_ptr schema) { + auto gen = MakeVectorGenerator(batches); + // TODO move this into testing/async_generator_util.h? 
+ auto delayed_gen = MakeMappedGenerator>( + std::move(gen), [](const std::shared_ptr& batch) { + auto fut = Future>::Make(); + SleepABitAsync().AddCallback( + [fut, batch](const Result<::arrow::detail::Empty>&) mutable { + fut.MarkFinished(batch); + }); + return fut; + }); + // Adding readahead implicitly adds parallelism by pulling reentrantly from + // the delayed generator + return MakeReadaheadGenerator(std::move(delayed_gen), /*max_readahead=*/64); +} + +class TestExecPlanExecution : public ::testing::Test { + public: + void SetUp() override { + ASSERT_OK_AND_ASSIGN(io_executor_, internal::ThreadPool::Make(8)); + } + + RecordBatchVector MakeRandomBatches(const std::shared_ptr& schema, + int num_batches = 10, int batch_size = 4) { + random::RandomArrayGenerator rng(42); + RecordBatchVector batches; + batches.reserve(num_batches); + for (int i = 0; i < num_batches; ++i) { + batches.push_back(rng.BatchOf(schema->fields(), batch_size)); + } + return batches; + } + + struct CollectorPlan { + std::shared_ptr plan; + RecordBatchCollectNode* sink; + }; + + Result MakeSourceSink(std::shared_ptr reader, + const std::shared_ptr& schema) { + ARROW_ASSIGN_OR_RAISE(auto plan, ExecPlan::Make()); + auto source = + MakeRecordBatchReaderNode(plan.get(), "source", reader, io_executor_.get()); + auto sink = MakeRecordBatchCollectNode(plan.get(), "sink", schema); + sink->AddInput(source); + return CollectorPlan{plan, sink}; + } + + Result MakeSourceSink(RecordBatchGenerator generator, + const std::shared_ptr& schema) { + ARROW_ASSIGN_OR_RAISE(auto plan, ExecPlan::Make()); + auto source = MakeRecordBatchReaderNode(plan.get(), "source", schema, generator, + io_executor_.get()); + auto sink = MakeRecordBatchCollectNode(plan.get(), "sink", schema); + sink->AddInput(source); + return CollectorPlan{plan, sink}; + } + + Result MakeSourceSink(const RecordBatchVector& batches, + const std::shared_ptr& schema) { + ARROW_ASSIGN_OR_RAISE(auto reader, RecordBatchReader::Make(batches, schema)); + return MakeSourceSink(std::move(reader), schema); + } + + Result StartAndCollect(ExecPlan* plan, + RecordBatchCollectNode* sink) { + RETURN_NOT_OK(plan->StartProducing()); + auto fut = CollectAsyncGenerator(sink->generator()); + return fut.result(); + } + + template + void TestSourceSink(RecordBatchReaderFactory reader_factory) { + auto schema = ::arrow::schema({field("a", int32()), field("b", boolean())}); + RecordBatchVector batches{ + RecordBatchFromJSON(schema, R"([{"a": null, "b": true}, + {"a": 4, "b": false}])"), + RecordBatchFromJSON(schema, R"([{"a": 5, "b": null}, + {"a": 6, "b": false}, + {"a": 7, "b": false}])"), + }; + + ASSERT_OK_AND_ASSIGN(auto reader, reader_factory(batches, schema)); + ASSERT_OK_AND_ASSIGN(auto cp, MakeSourceSink(reader, schema)); + ASSERT_OK(cp.plan->Validate()); + + ASSERT_OK_AND_ASSIGN(auto got_batches, StartAndCollect(cp.plan.get(), cp.sink)); + AssertBatchesEqual(batches, got_batches); + } + + template + void TestStressSourceSink(int num_batches, RecordBatchReaderFactory batch_factory) { + auto schema = ::arrow::schema({field("a", int32()), field("b", boolean())}); + auto batches = MakeRandomBatches(schema, num_batches); + + ASSERT_OK_AND_ASSIGN(auto reader, batch_factory(batches, schema)); + ASSERT_OK_AND_ASSIGN(auto cp, MakeSourceSink(reader, schema)); + ASSERT_OK(cp.plan->Validate()); + + ASSERT_OK_AND_ASSIGN(auto got_batches, StartAndCollect(cp.plan.get(), cp.sink)); + AssertBatchesEqual(batches, got_batches); + } + + protected: + std::shared_ptr io_executor_; +}; + 
+TEST_F(TestExecPlanExecution, SourceSink) { TestSourceSink(RecordBatchReader::Make); } + +TEST_F(TestExecPlanExecution, SlowSourceSink) { + TestSourceSink(SlowRecordBatchReader::Make); +} + +TEST_F(TestExecPlanExecution, SlowSourceSinkParallel) { + TestSourceSink(MakeSlowRecordBatchGenerator); +} + +TEST_F(TestExecPlanExecution, StressSourceSink) { + TestStressSourceSink(/*num_batches=*/200, RecordBatchReader::Make); +} + +TEST_F(TestExecPlanExecution, StressSlowSourceSink) { + // This doesn't create parallelism as the RecordBatchReader is iterated serially. + TestStressSourceSink(/*num_batches=*/30, SlowRecordBatchReader::Make); +} + +TEST_F(TestExecPlanExecution, StressSlowSourceSinkParallel) { + TestStressSourceSink(/*num_batches=*/300, MakeSlowRecordBatchGenerator); +} + +} // namespace compute +} // namespace arrow diff --git a/cpp/src/arrow/compute/exec/test_util.cc b/cpp/src/arrow/compute/exec/test_util.cc new file mode 100644 index 00000000000..f2cd7d2a740 --- /dev/null +++ b/cpp/src/arrow/compute/exec/test_util.cc @@ -0,0 +1,400 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/compute/exec/test_util.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "arrow/compute/exec.h" +#include "arrow/compute/exec/exec_plan.h" +#include "arrow/datum.h" +#include "arrow/record_batch.h" +#include "arrow/type.h" +#include "arrow/util/async_generator.h" +#include "arrow/util/iterator.h" +#include "arrow/util/logging.h" +#include "arrow/util/optional.h" + +namespace arrow { + +using internal::Executor; + +namespace compute { +namespace { + +// TODO expose this as `static ValueDescr::FromSchemaColumns`? 
+std::vector DescrFromSchemaColumns(const Schema& schema) { + std::vector descr(schema.num_fields()); + std::transform(schema.fields().begin(), schema.fields().end(), descr.begin(), + [](const std::shared_ptr& field) { + return ValueDescr::Array(field->type()); + }); + return descr; +} + +struct DummyNode : ExecNode { + DummyNode(ExecPlan* plan, std::string label, int num_inputs, int num_outputs, + StartProducingFunc start_producing, StopProducingFunc stop_producing) + : ExecNode(plan, std::move(label), std::vector(num_inputs, descr()), {}, + descr(), num_outputs), + start_producing_(std::move(start_producing)), + stop_producing_(std::move(stop_producing)) { + for (int i = 0; i < num_inputs; ++i) { + input_labels_.push_back(std::to_string(i)); + } + } + + const char* kind_name() override { return "Dummy"; } + + void InputReceived(ExecNode* input, int seq_num, compute::ExecBatch batch) override {} + + void ErrorReceived(ExecNode* input, Status error) override {} + + void InputFinished(ExecNode* input, int seq_stop) override {} + + Status StartProducing() override { + if (start_producing_) { + RETURN_NOT_OK(start_producing_(this)); + } + started_ = true; + return Status::OK(); + } + + void PauseProducing(ExecNode* output) override { + ASSERT_GE(num_outputs(), 0) << "Sink nodes should not experience backpressure"; + AssertIsOutput(output); + } + + void ResumeProducing(ExecNode* output) override { + ASSERT_GE(num_outputs(), 0) << "Sink nodes should not experience backpressure"; + AssertIsOutput(output); + } + + void StopProducing(ExecNode* output) override { + ASSERT_GE(num_outputs(), 0) << "Sink nodes should not experience backpressure"; + AssertIsOutput(output); + StopProducing(); + } + + void StopProducing() override { + if (started_) { + started_ = false; + for (const auto& input : inputs_) { + input->StopProducing(this); + } + if (stop_producing_) { + stop_producing_(this); + } + } + } + + private: + void AssertIsOutput(ExecNode* output) { + ASSERT_NE(std::find(outputs_.begin(), outputs_.end(), output), outputs_.end()); + } + + BatchDescr descr() const { return std::vector{ValueDescr(null())}; } + + StartProducingFunc start_producing_; + StopProducingFunc stop_producing_; + bool started_ = false; +}; + +struct RecordBatchReaderNode : ExecNode { + RecordBatchReaderNode(ExecPlan* plan, std::string label, + std::shared_ptr reader, Executor* io_executor) + : ExecNode(plan, std::move(label), {}, {}, + DescrFromSchemaColumns(*reader->schema()), /*num_outputs=*/1), + schema_(reader->schema()), + reader_(std::move(reader)), + io_executor_(io_executor) {} + + RecordBatchReaderNode(ExecPlan* plan, std::string label, std::shared_ptr schema, + RecordBatchGenerator generator, Executor* io_executor) + : ExecNode(plan, std::move(label), {}, {}, DescrFromSchemaColumns(*schema), + /*num_outputs=*/1), + schema_(std::move(schema)), + generator_(std::move(generator)), + io_executor_(io_executor) {} + + const char* kind_name() override { return "RecordBatchReader"; } + + void InputReceived(ExecNode* input, int seq_num, compute::ExecBatch batch) override {} + + void ErrorReceived(ExecNode* input, Status error) override {} + + void InputFinished(ExecNode* input, int seq_stop) override {} + + Status StartProducing() override { + next_batch_index_ = 0; + if (!generator_) { + auto it = MakeIteratorFromReader(reader_); + ARROW_ASSIGN_OR_RAISE(generator_, + MakeBackgroundGenerator(std::move(it), io_executor_)); + } + GenerateOne(std::unique_lock{mutex_}); + return Status::OK(); + } + + void PauseProducing(ExecNode* 
output) override {} + + void ResumeProducing(ExecNode* output) override {} + + void StopProducing(ExecNode* output) override { + ASSERT_EQ(output, outputs_[0]); + std::unique_lock lock(mutex_); + generator_ = nullptr; // null function + } + + void StopProducing() override { StopProducing(outputs_[0]); } + + private: + void GenerateOne(std::unique_lock&& lock) { + if (!generator_) { + // Stopped + return; + } + auto plan = this->plan()->shared_from_this(); + auto fut = generator_(); + const auto batch_index = next_batch_index_++; + + lock.unlock(); + // TODO we want to transfer always here + io_executor_->Transfer(std::move(fut)) + .AddCallback( + [plan, batch_index, this](const Result>& res) { + std::unique_lock lock(mutex_); + if (!res.ok()) { + for (auto out : outputs_) { + out->ErrorReceived(this, res.status()); + } + return; + } + const auto& batch = *res; + if (IsIterationEnd(batch)) { + lock.unlock(); + for (auto out : outputs_) { + out->InputFinished(this, batch_index); + } + } else { + lock.unlock(); + for (auto out : outputs_) { + out->InputReceived(this, batch_index, compute::ExecBatch(*batch)); + } + lock.lock(); + GenerateOne(std::move(lock)); + } + }); + } + + std::mutex mutex_; + const std::shared_ptr schema_; + const std::shared_ptr reader_; + RecordBatchGenerator generator_; + int next_batch_index_; + + Executor* const io_executor_; +}; + +struct RecordBatchCollectNodeImpl : public RecordBatchCollectNode { + RecordBatchCollectNodeImpl(ExecPlan* plan, std::string label, + std::shared_ptr schema) + : RecordBatchCollectNode(plan, std::move(label), {DescrFromSchemaColumns(*schema)}, + {"batches_to_collect"}, {}, 0), + schema_(std::move(schema)) {} + + RecordBatchGenerator generator() override { return generator_; } + + const char* kind_name() override { return "RecordBatchReader"; } + + Status StartProducing() override { + num_received_ = 0; + num_emitted_ = 0; + emit_stop_ = -1; + stopped_ = false; + producer_.emplace(generator_.producer()); + return Status::OK(); + } + + // sink nodes have no outputs from which to feel backpressure + void ResumeProducing(ExecNode* output) override { + FAIL() << "no outputs; this should never be called"; + } + void PauseProducing(ExecNode* output) override { + FAIL() << "no outputs; this should never be called"; + } + void StopProducing(ExecNode* output) override { + FAIL() << "no outputs; this should never be called"; + } + + void StopProducing() override { + std::unique_lock lock(mutex_); + StopProducingUnlocked(); + } + + void InputReceived(ExecNode* input, int seq_num, + compute::ExecBatch exec_batch) override { + std::unique_lock lock(mutex_); + if (stopped_) { + return; + } + auto maybe_batch = MakeBatch(std::move(exec_batch)); + if (!maybe_batch.ok()) { + lock.unlock(); + producer_->Push(std::move(maybe_batch)); + return; + } + + // TODO would be nice to factor this out in a ReorderQueue + auto batch = *std::move(maybe_batch); + if (seq_num <= static_cast(received_batches_.size())) { + received_batches_.resize(seq_num + 1, nullptr); + } + DCHECK_EQ(received_batches_[seq_num], nullptr); + received_batches_[seq_num] = std::move(batch); + ++num_received_; + + if (seq_num != num_emitted_) { + // Cannot emit yet as there is a hole at `num_emitted_` + DCHECK_GT(seq_num, num_emitted_); + DCHECK_EQ(received_batches_[num_emitted_], nullptr); + return; + } + if (num_received_ == emit_stop_) { + StopProducingUnlocked(); + } + + // Emit batches in order as far as possible + // First collect these batches, then unlock before producing. 
+ const auto seq_start = seq_num; + while (seq_num < static_cast(received_batches_.size()) && + received_batches_[seq_num] != nullptr) { + ++seq_num; + } + DCHECK_GT(seq_num, seq_start); + // By moving the values now, we make sure another thread won't emit the same values + // below + RecordBatchVector to_emit( + std::make_move_iterator(received_batches_.begin() + seq_start), + std::make_move_iterator(received_batches_.begin() + seq_num)); + + lock.unlock(); + for (auto&& batch : to_emit) { + producer_->Push(std::move(batch)); + } + lock.lock(); + + DCHECK_EQ(seq_start, num_emitted_); // num_emitted_ wasn't bumped in the meantime + num_emitted_ = seq_num; + } + + void ErrorReceived(ExecNode* input, Status error) override { + // XXX do we care about properly sequencing the error? + producer_->Push(std::move(error)); + std::unique_lock lock(mutex_); + StopProducingUnlocked(); + } + + void InputFinished(ExecNode* input, int seq_stop) override { + std::unique_lock lock(mutex_); + DCHECK_GE(seq_stop, static_cast(received_batches_.size())); + received_batches_.reserve(seq_stop); + emit_stop_ = seq_stop; + if (emit_stop_ == num_received_) { + DCHECK_EQ(emit_stop_, num_emitted_); + StopProducingUnlocked(); + } + } + + private: + void StopProducingUnlocked() { + if (!stopped_) { + stopped_ = true; + producer_->Close(); + inputs_[0]->StopProducing(this); + } + } + + // TODO factor this out as ExecBatch::ToRecordBatch()? + Result> MakeBatch(compute::ExecBatch&& exec_batch) { + ArrayDataVector columns; + columns.reserve(exec_batch.values.size()); + for (auto&& value : exec_batch.values) { + if (!value.is_array()) { + return Status::TypeError("Expected array input"); + } + columns.push_back(std::move(value).array()); + } + return RecordBatch::Make(schema_, exec_batch.length, std::move(columns)); + } + + const std::shared_ptr schema_; + + std::mutex mutex_; + RecordBatchVector received_batches_; + int num_received_; + int num_emitted_; + int emit_stop_; + bool stopped_; + + PushGenerator> generator_; + util::optional>::Producer> producer_; +}; + +} // namespace + +ExecNode* MakeRecordBatchReaderNode(ExecPlan* plan, std::string label, + std::shared_ptr reader, + Executor* io_executor) { + return plan->EmplaceNode(plan, std::move(label), + std::move(reader), io_executor); +} + +ExecNode* MakeRecordBatchReaderNode(ExecPlan* plan, std::string label, + std::shared_ptr schema, + RecordBatchGenerator generator, + ::arrow::internal::Executor* io_executor) { + return plan->EmplaceNode( + plan, std::move(label), std::move(schema), std::move(generator), io_executor); +} + +ExecNode* MakeDummyNode(ExecPlan* plan, std::string label, int num_inputs, + int num_outputs, StartProducingFunc start_producing, + StopProducingFunc stop_producing) { + return plan->EmplaceNode(plan, std::move(label), num_inputs, num_outputs, + std::move(start_producing), + std::move(stop_producing)); +} + +RecordBatchCollectNode* MakeRecordBatchCollectNode( + ExecPlan* plan, std::string label, const std::shared_ptr& schema) { + return internal::checked_cast( + plan->EmplaceNode(plan, std::move(label), schema)); +} + +} // namespace compute +} // namespace arrow diff --git a/cpp/src/arrow/compute/exec/test_util.h b/cpp/src/arrow/compute/exec/test_util.h new file mode 100644 index 00000000000..c2dc785a501 --- /dev/null +++ b/cpp/src/arrow/compute/exec/test_util.h @@ -0,0 +1,70 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include +#include + +#include "arrow/compute/exec/exec_plan.h" +#include "arrow/record_batch.h" +#include "arrow/testing/visibility.h" +#include "arrow/util/async_generator.h" +#include "arrow/util/type_fwd.h" + +namespace arrow { +namespace compute { + +using StartProducingFunc = std::function; +using StopProducingFunc = std::function; + +// Make a dummy node that has no execution behaviour +ARROW_TESTING_EXPORT +ExecNode* MakeDummyNode(ExecPlan* plan, std::string label, int num_inputs, + int num_outputs, StartProducingFunc = {}, StopProducingFunc = {}); + +using RecordBatchGenerator = AsyncGenerator>; + +// Make a source node (no inputs) that produces record batches by reading in the +// background from a RecordBatchReader. +ARROW_TESTING_EXPORT +ExecNode* MakeRecordBatchReaderNode(ExecPlan* plan, std::string label, + std::shared_ptr reader, + ::arrow::internal::Executor* io_executor); + +ARROW_TESTING_EXPORT +ExecNode* MakeRecordBatchReaderNode(ExecPlan* plan, std::string label, + std::shared_ptr schema, + RecordBatchGenerator generator, + ::arrow::internal::Executor* io_executor); + +class RecordBatchCollectNode : public ExecNode { + public: + virtual RecordBatchGenerator generator() = 0; + + protected: + using ExecNode::ExecNode; +}; + +ARROW_TESTING_EXPORT +RecordBatchCollectNode* MakeRecordBatchCollectNode(ExecPlan* plan, std::string label, + const std::shared_ptr& schema); + +} // namespace compute +} // namespace arrow diff --git a/cpp/src/arrow/compute/type_fwd.h b/cpp/src/arrow/compute/type_fwd.h index 4f4393486ff..5370837f1b9 100644 --- a/cpp/src/arrow/compute/type_fwd.h +++ b/cpp/src/arrow/compute/type_fwd.h @@ -29,6 +29,7 @@ struct FunctionOptions; struct CastOptions; +struct ExecBatch; class ExecContext; class KernelContext; diff --git a/cpp/src/arrow/type_fwd.h b/cpp/src/arrow/type_fwd.h index 7eb318c8b41..d541209a314 100644 --- a/cpp/src/arrow/type_fwd.h +++ b/cpp/src/arrow/type_fwd.h @@ -81,6 +81,7 @@ class RecordBatchReader; class Table; struct Datum; +struct ValueDescr; using ChunkedArrayVector = std::vector>; using RecordBatchVector = std::vector>; diff --git a/cpp/src/arrow/util/iterator_test.cc b/cpp/src/arrow/util/iterator_test.cc index 60b57dea1e2..ab62fcb7034 100644 --- a/cpp/src/arrow/util/iterator_test.cc +++ b/cpp/src/arrow/util/iterator_test.cc @@ -31,6 +31,7 @@ #include "arrow/util/iterator.h" #include "arrow/util/test_common.h" #include "arrow/util/vector.h" + namespace arrow { template
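
Note (editorial, not part of the patch): the pieces above fit together as source -> sink plans driven by ExecPlan::StartProducing(). The sketch below shows the intended wiring, assembled from the test helpers declared in test_util.h and the MakeSourceSink/StartAndCollect pattern in plan_test.cc. Template arguments in angle brackets were stripped from the diff text by extraction, so they are reconstructed here from context and should be checked against the actual headers; the helper RunSourceSinkPlan is a hypothetical name introduced only for this example.

#include "arrow/compute/exec/exec_plan.h"
#include "arrow/compute/exec/test_util.h"
#include "arrow/record_batch.h"
#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/util/async_generator.h"

namespace arrow {
namespace compute {

// Build a trivial source -> sink plan from a RecordBatchReader, start it,
// and collect the produced batches (mirrors MakeSourceSink/StartAndCollect
// in plan_test.cc). Sketch only; uses the testing-only node factories.
Result<RecordBatchVector> RunSourceSinkPlan(
    std::shared_ptr<RecordBatchReader> reader,
    const std::shared_ptr<Schema>& schema,
    ::arrow::internal::Executor* io_executor) {
  ARROW_ASSIGN_OR_RAISE(auto plan, ExecPlan::Make());

  // Source node (no inputs): reads batches in the background on the IO executor.
  ExecNode* source =
      MakeRecordBatchReaderNode(plan.get(), "source", std::move(reader), io_executor);
  // Sink node (no outputs): re-sequences incoming batches and exposes them as
  // an async generator of record batches.
  RecordBatchCollectNode* sink =
      MakeRecordBatchCollectNode(plan.get(), "sink", schema);
  sink->AddInput(source);

  RETURN_NOT_OK(plan->Validate());
  // Nodes are started in reverse topological order: sink first, source last.
  RETURN_NOT_OK(plan->StartProducing());

  // Block until the sink's generator is exhausted and return the batches.
  return CollectAsyncGenerator(sink->generator()).result();
}

}  // namespace compute
}  // namespace arrow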
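
Relatedly, RecordBatchCollectNodeImpl::InputReceived() carries a TODO suggesting the re-sequencing logic be factored out into a ReorderQueue. A minimal sketch of what such a helper could look like follows; the class name comes from that TODO, while the interface and the resize-on-`seq_num >= size` convention are assumptions made for illustration (locking stays with the caller, as in the node implementation).

#include <memory>
#include <utility>
#include <vector>

#include "arrow/record_batch.h"

namespace arrow {
namespace compute {

// Hypothetical helper: batches may arrive tagged with arbitrary sequence
// numbers, but must be emitted in sequence-number order with no holes.
class ReorderQueue {
 public:
  // Record a batch received with the given sequence number.
  void Insert(int seq_num, std::shared_ptr<RecordBatch> batch) {
    if (seq_num >= static_cast<int>(slots_.size())) {
      slots_.resize(seq_num + 1, nullptr);
    }
    slots_[seq_num] = std::move(batch);
  }

  // Pop the longest contiguous run of batches starting at the next expected
  // sequence number; returns an empty vector if there is still a hole.
  std::vector<std::shared_ptr<RecordBatch>> PopReady() {
    std::vector<std::shared_ptr<RecordBatch>> ready;
    while (next_to_emit_ < static_cast<int>(slots_.size()) &&
           slots_[next_to_emit_] != nullptr) {
      ready.push_back(std::move(slots_[next_to_emit_]));
      ++next_to_emit_;
    }
    return ready;
  }

  // Number of batches emitted so far, i.e. the next expected sequence number.
  int num_emitted() const { return next_to_emit_; }

 private:
  std::vector<std::shared_ptr<RecordBatch>> slots_;
  int next_to_emit_ = 0;
};

}  // namespace compute
}  // namespace arrow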