diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index 1705e854fb1..cb562d19f76 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -342,6 +342,10 @@ if(ARROW_CUDA
   set(ARROW_IPC ON)
 endif()
 
+if(ARROW_ENGINE)
+  set(ARROW_COMPUTE ON)
+endif()
+
 if(ARROW_DATASET)
   set(ARROW_COMPUTE ON)
   set(ARROW_FILESYSTEM ON)
diff --git a/cpp/cmake_modules/DefineOptions.cmake b/cpp/cmake_modules/DefineOptions.cmake
index 0e92811da8c..b2423cf3c76 100644
--- a/cpp/cmake_modules/DefineOptions.cmake
+++ b/cpp/cmake_modules/DefineOptions.cmake
@@ -211,6 +211,8 @@ if("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_CURRENT_SOURCE_DIR}")
 
   define_option(ARROW_DATASET "Build the Arrow Dataset Modules" OFF)
 
+  define_option(ARROW_ENGINE "Build the Arrow Execution Engine" OFF)
+
   define_option(ARROW_FILESYSTEM "Build the Arrow Filesystem Layer" OFF)
 
   define_option(ARROW_FLIGHT
diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt
index 04756aaf8e9..5c3afc47ccb 100644
--- a/cpp/src/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/CMakeLists.txt
@@ -409,6 +409,10 @@ if(ARROW_COMPUTE)
   endif()
 endif()
 
+if(ARROW_ENGINE)
+  list(APPEND ARROW_SRCS engine/exec_plan.cc engine/query_plan.cc)
+endif()
+
 if(ARROW_FILESYSTEM)
   if(ARROW_HDFS)
     add_definitions(-DARROW_HDFS)
@@ -671,6 +675,10 @@ if(ARROW_DATASET)
   add_subdirectory(dataset)
 endif()
 
+if(ARROW_ENGINE)
+  add_subdirectory(engine)
+endif()
+
 if(ARROW_FILESYSTEM)
   add_subdirectory(filesystem)
 endif()
diff --git a/cpp/src/arrow/compute/type_fwd.h b/cpp/src/arrow/compute/type_fwd.h
index 9888e610aa7..0cd836ab944 100644
--- a/cpp/src/arrow/compute/type_fwd.h
+++ b/cpp/src/arrow/compute/type_fwd.h
@@ -29,6 +29,7 @@ struct FunctionOptions;
 
 struct CastOptions;
 
+struct ExecBatch;
 class ExecContext;
 class KernelContext;
 
diff --git a/cpp/src/arrow/engine/CMakeLists.txt b/cpp/src/arrow/engine/CMakeLists.txt
new file mode 100644
index 00000000000..f4bfede6a83
--- /dev/null
+++ b/cpp/src/arrow/engine/CMakeLists.txt
@@ -0,0 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Headers: top level
+arrow_install_all_headers("arrow/engine")
diff --git a/cpp/src/arrow/engine/api.h b/cpp/src/arrow/engine/api.h
new file mode 100644
index 00000000000..0646b210868
--- /dev/null
+++ b/cpp/src/arrow/engine/api.h
@@ -0,0 +1,20 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/engine/query_plan.h"  // IWYU pragma: export
diff --git a/cpp/src/arrow/engine/exec_plan.cc b/cpp/src/arrow/engine/exec_plan.cc
new file mode 100644
index 00000000000..b3acab9c32b
--- /dev/null
+++ b/cpp/src/arrow/engine/exec_plan.cc
@@ -0,0 +1,131 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/engine/exec_plan.h"
+
+#include "arrow/compute/exec.h"
+#include "arrow/engine/query_plan.h"
+#include "arrow/result.h"
+#include "arrow/status.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/future.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/thread_pool.h"
+
+namespace arrow {
+
+using internal::checked_cast;
+
+namespace engine {
+
+// Take ownership of the per-input slots; the readiness future starts unfinished.
+ExecNode::InputBatch::InputBatch(std::vector<util::optional<compute::ExecBatch>> batches)
+    : batches(std::move(batches)), ready_fut(Future<>::Make()) {}
+
+// Bind the node to its plan and query node.  The number of inputs is taken from
+// the query node, and the completion future starts unfinished.
+ExecNode::ExecNode(ExecPlan* plan, const QueryNode* query_node)
+    : plan_(plan),
+      query_node_(query_node),
+      num_inputs_(query_node->num_inputs()),
+      finished_fut_(Future<>::Make()) {}
+
+ExecNode::~ExecNode() = default;
+
+// Grow input_batches_ (it is never shrunk) so that every index in
+// [0, num_batches) can be subscripted.
+void ExecNode::ReserveBatches(int32_t num_batches) {
+  // Should be called with mutex locked
+  DCHECK_GE(num_batches, 0);
+  if (static_cast<size_t>(num_batches) > input_batches_.size()) {
+    input_batches_.resize(num_batches);
+  }
+}
+
+// Return the InputBatch stored at batch_index, allocating it on first access with
+// one empty slot per input.  The index must already be covered by ReserveBatches().
+ExecNode::InputBatch* ExecNode::EnsureBatch(int32_t batch_index) {
+  // Should be called with mutex locked
+  // A negative index converts to a huge size_t, so this check rejects it as well.
+  DCHECK_LT(static_cast<size_t>(batch_index), input_batches_.size());
+  if (input_batches_[batch_index] == nullptr) {
+    input_batches_[batch_index].reset(
+        new InputBatch{std::vector<util::optional<compute::ExecBatch>>(num_inputs_)});
+  }
+  return input_batches_[batch_index].get();
+}
+
+// Store `batch` as input #input_index of batch #batch_index.  When the last missing
+// input of that batch index arrives, its readiness future is marked finished.
+// Returns Invalid if an index is out of range or the slot was already filled.
+Status ExecNode::InputReceived(int32_t input_index, int32_t batch_index,
+                               compute::ExecBatch batch) {
+  auto lock = mutex_.Lock();
+
+  // Both indices are used as vector subscripts below, so negative values must be
+  // rejected along with values past the known bounds.
+  if (input_index < 0 || input_index >= num_inputs_) {
+    return Status::Invalid("Invalid input index");
+  }
+  if (batch_index < 0 || (finish_at_ >= 0 && batch_index >= finish_at_)) {
+    return Status::Invalid("Input batch index out of bounds");
+  }
+
+  ReserveBatches(batch_index + 1);
+  auto* input_batch = EnsureBatch(batch_index);
+
+  if (input_batch->batches[input_index].has_value()) {
+    return Status::Invalid("Batch #", batch_index, " for input #", input_index,
+                           " already received");
+  }
+  input_batch->batches[input_index] = std::move(batch);
+  if (++input_batch->num_ready == num_inputs_) {
+    input_batch->ready_fut.MarkFinished();
+  }
+  return Status::OK();
+}
+
+// Fix the total number of batches.  finished_fut_ is marked finished (with the
+// combined status) once the readiness futures of all `num_batches` batches are.
+// Returns Invalid if the count is negative or if this was already called.
+Status ExecNode::InputFinished(int32_t num_batches) {
+  auto lock = mutex_.Lock();
+
+  if (num_batches < 0) {
+    // finish_at_ uses a negative value to mean "not finished yet", so a negative
+    // count could not be told apart from that state.
+    return Status::Invalid("Invalid number of batches");
+  }
+  if (finish_at_ >= 0) {
+    return Status::Invalid("InputFinished already called");
+  }
+  finish_at_ = num_batches;
+  ReserveBatches(num_batches);
+
+  // Gather the readiness future of every expected batch, creating the entries of
+  // batches that have not received any input yet.  The vector starts empty, so
+  // the futures are appended rather than assigned by index.
+  std::vector<Future<>> batch_futures;
+  batch_futures.reserve(num_batches);
+  for (int32_t i = 0; i < num_batches; ++i) {
+    batch_futures.push_back(EnsureBatch(i)->ready_fut);
+  }
+
+  // TODO lifetime
+  AllComplete(std::move(batch_futures))
+      .AddCallback([this](const Result<detail::Empty>& res) {
+        finished_fut_.MarkFinished(res.status());
+      });
+  return Status::OK();
+}
+
+// Chain RunSyncInternal(batch_index) after the batch's readiness future, which is
+// first passed through executor->Transfer().  batch_index must be non-negative.
+Future<> ExecNode::RunAsync(int32_t batch_index, internal::Executor* executor) {
+  auto lock = mutex_.Lock();
+
+  DCHECK_GE(batch_index, 0);
+  ReserveBatches(batch_index + 1);
+  auto* input_batch = EnsureBatch(batch_index);
+
+  // TODO lifetime (take strong ref to ExecPlan?)
+  return executor->Transfer(input_batch->ready_fut)
+      .Then([this, batch_index](...) -> Status { return RunSyncInternal(batch_index); });
+}
+
+// Chain FinishSyncInternal() after finished_fut_, which is first passed through
+// executor->Transfer().
+Future<> ExecNode::FinishAsync(internal::Executor* executor) {
+  auto lock = mutex_.Lock();
+
+  // TODO lifetime
+  return executor->Transfer(finished_fut_).Then([this](...) -> Status {
+    return FinishSyncInternal();
+  });
+}
+
+}  // namespace engine
+}  // namespace arrow
diff --git a/cpp/src/arrow/engine/exec_plan.h b/cpp/src/arrow/engine/exec_plan.h
new file mode 100644
index 00000000000..97b9e701d2d
--- /dev/null
+++ b/cpp/src/arrow/engine/exec_plan.h
@@ -0,0 +1,131 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.
You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "arrow/compute/type_fwd.h"
+#include "arrow/type_fwd.h"
+#include "arrow/util/future.h"
+#include "arrow/util/macros.h"
+#include "arrow/util/mutex.h"
+#include "arrow/util/optional.h"
+#include "arrow/util/type_fwd.h"
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+namespace engine {
+
+class ExecNode;
+class QueryNode;
+class QueryPlan;
+
+/// \brief An executable instance of a QueryPlan
+class ARROW_EXPORT ExecPlan {
+ public:
+  // NOTE(review): element type assumed to be a non-owning node pointer; confirm.
+  using NodeVector = std::vector<ExecNode*>;
+
+  virtual ~ExecPlan() = default;
+
+  /// The query plan this ExecPlan is an instance of
+  const QueryPlan& query_plan() const { return *query_plan_; }
+
+  /// The compute context attached to this plan
+  compute::ExecContext* context() { return context_; }
+
+ protected:
+  friend class QueryPlan;
+
+  ExecPlan() = default;
+
+  // NOTE(review): pointee constness assumed from the const accessor; confirm.
+  const std::shared_ptr<const QueryPlan> query_plan_;
+  // Null until a context is attached.
+  compute::ExecContext* context_ = nullptr;
+};
+
+/// \brief A node of an ExecPlan that collects input batches and schedules execution
+class ARROW_EXPORT ExecNode {
+ public:
+  // NOTE(review): element type assumed to be a non-owning node pointer; confirm.
+  using NodeVector = std::vector<ExecNode*>;
+
+  virtual ~ExecNode();
+
+  /// The query node this ExecNode is an instance of
+  const QueryNode& query_node() const { return *query_node_; }
+
+  /// This node's exec plan
+  ExecPlan* plan() { return plan_; }
+
+  /// Transfer input batch to ExecNode
+  ///
+  /// When all inputs are received for a given batch_index, the batch is ready
+  /// for execution.
+  Status InputReceived(int32_t input_index, int32_t batch_index,
+                       compute::ExecBatch batch);
+
+  /// Mark the inputs finished after the given number of batches.
+  ///
+  /// This may be called before all inputs are received.  This simply fixes
+  /// the total number of incoming batches so that the ExecNode knows when
+  /// it has received all input.
+  Status InputFinished(int32_t num_batches);
+
+  /// Schedule batch execution once all inputs are received for the given batch_index
+  ///
+  /// The returned Future is finished once execution of the batch is finished.
+  /// Note that execution doesn't necessarily mean that any outputs are produced.
+  /// Depending on the ExecNode type, outputs may be produced on the fly,
+  /// or only at the end when all inputs have been received.
+  Future<> RunAsync(int32_t batch_index, internal::Executor* executor);
+
+  /// Schedule finalization once all batches are executed
+  ///
+  /// Return a Future that will be marked finished once all inputs are received
+  /// and all computation has finished.
+  ///
+  /// RunAsync and FinishAsync can be thought of as exposing a map/reduce pipeline.
+  Future<> FinishAsync(internal::Executor* executor);
+
+ protected:
+  /// The inputs collected so far for a single batch index
+  struct InputBatch {
+    /// One slot per input, empty until that input has been received
+    std::vector<util::optional<compute::ExecBatch>> batches;
+    /// Number of slots already filled
+    int32_t num_ready = 0;
+    /// Marked finished once all slots are filled
+    Future<> ready_fut;
+
+    explicit InputBatch(std::vector<util::optional<compute::ExecBatch>> batches);
+  };
+
+  ExecNode(ExecPlan* plan, const QueryNode* query_node);
+
+  /// Execute the batch at the given index
+  virtual Status RunSyncInternal(int32_t batch_index) = 0;
+  /// Finalize the node
+  virtual Status FinishSyncInternal() = 0;
+
+  // Both helpers should be called with mutex_ locked.
+  InputBatch* EnsureBatch(int32_t batch_index);
+  void ReserveBatches(int32_t num_batches);
+
+  ExecPlan* plan_;
+  const QueryNode* const query_node_;
+  const int32_t num_inputs_;
+
+  // XXX also use a per-input batch mutex?
+  util::Mutex mutex_;
+  // Indexed by batch index; entries are null until first accessed.
+  std::vector<std::unique_ptr<InputBatch>> input_batches_;
+  // Total number of batches, or -1 until InputFinished() has been called.
+  int32_t finish_at_ = -1;
+  Future<> finished_fut_;
+};
+
+}  // namespace engine
+}  // namespace arrow
diff --git a/cpp/src/arrow/engine/query_plan.cc b/cpp/src/arrow/engine/query_plan.cc
new file mode 100644
index 00000000000..25c61f5e419
--- /dev/null
+++ b/cpp/src/arrow/engine/query_plan.cc
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.
See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/engine/query_plan.h"
+
+#include "arrow/engine/exec_plan.h"
+#include "arrow/result.h"
+#include "arrow/util/checked_cast.h"
+
+namespace arrow {
+
+using internal::checked_cast;
+
+namespace engine {
+
+namespace {
+
+// Concrete state behind the QueryPlan interface.  QueryPlan::Make() only ever
+// creates this type, which is what the downcasts in ToDerived() rely on.
+struct QueryPlanImpl : public QueryPlan {
+  explicit QueryPlanImpl(QueryContext* ctx) : context_(ctx) {}
+
+  ~QueryPlanImpl() override = default;
+
+  QueryContext* context_;
+  // NOTE(review): element type assumed to be an owning pointer to QueryNode; confirm.
+  std::vector<std::unique_ptr<QueryNode>> nodes_;
+  NodeVector sources_;
+  NodeVector sinks_;
+};
+
+// Downcast a QueryPlan to its implementation type.
+QueryPlanImpl* ToDerived(QueryPlan* ptr) { return checked_cast<QueryPlanImpl*>(ptr); }
+
+const QueryPlanImpl* ToDerived(const QueryPlan* ptr) {
+  return checked_cast<const QueryPlanImpl*>(ptr);
+}
+
+}  // namespace
+
+// Create an empty plan that remembers `ctx`; no nodes, sources or sinks are added.
+Result<std::shared_ptr<QueryPlan>> QueryPlan::Make(QueryContext* ctx) {
+  return std::make_shared<QueryPlanImpl>(ctx);
+}
+
+// The accessors below forward to the fields of the implementation object.
+const QueryPlan::NodeVector& QueryPlan::sources() const {
+  return ToDerived(this)->sources_;
+}
+
+const QueryPlan::NodeVector& QueryPlan::sinks() const { return ToDerived(this)->sinks_; }
+
+QueryContext* QueryPlan::context() { return ToDerived(this)->context_; }
+
+const QueryContext* QueryPlan::context() const { return ToDerived(this)->context_; }
+
+}  // namespace engine
+}  // namespace arrow
diff --git a/cpp/src/arrow/engine/query_plan.h b/cpp/src/arrow/engine/query_plan.h
new file mode 100644
index 00000000000..0223d339ee1
--- /dev/null
+++
b/cpp/src/arrow/engine/query_plan.h
@@ -0,0 +1,110 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "arrow/compute/type_fwd.h"
+#include "arrow/type_fwd.h"
+#include "arrow/util/macros.h"
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+namespace engine {
+
+class ExecPlan;
+class QueryContext;
+class QueryNode;
+
+/// \brief A query plan, exposing its source and sink nodes
+class ARROW_EXPORT QueryPlan {
+ public:
+  // NOTE(review): element type assumed to be a non-owning node pointer; confirm.
+  using NodeVector = std::vector<QueryNode*>;
+
+  virtual ~QueryPlan() = default;
+
+  /// Make an empty query plan
+  static Result<std::shared_ptr<QueryPlan>> Make(QueryContext*);
+
+  /// The initial inputs
+  const NodeVector& sources() const;
+
+  /// The final outputs
+  const NodeVector& sinks() const;
+
+  /// The query context this plan was created with
+  QueryContext* context();
+  const QueryContext* context() const;
+
+  /// Make a concrete execution plan from this query plan
+  // NOTE(review): the pointer kind of the result is assumed; confirm.
+  Result<std::shared_ptr<ExecPlan>> MakeExecPlan(compute::ExecContext*);
+
+ protected:
+  QueryPlan() = default;
+};
+
+/// \brief A node of a QueryPlan, linked to its input and output nodes
+class ARROW_EXPORT QueryNode {
+ public:
+  // NOTE(review): element type assumed to be a non-owning node pointer; confirm.
+  using NodeVector = std::vector<QueryNode*>;
+
+  // Defined inline so that derived classes always have a destructor to link against.
+  virtual ~QueryNode() = default;
+
+  /// The name of this node's kind
+  virtual const char* kind_name() = 0;
+
+  /// This node's predecessors in the query plan
+  const NodeVector& inputs() const { return inputs_; }
+
+  /// This node's successors in the query plan
+  const NodeVector& outputs() const { return outputs_; }
+
+  /// The number of predecessors
+  int num_inputs() const { return static_cast<int>(inputs_.size()); }
+
+  /// The number of successors
+  int num_outputs() const { return static_cast<int>(outputs_.size()); }
+
+  /// This node's query plan
+  QueryPlan* plan() { return plan_; }
+
+  /// \brief An optional label, for display and debugging
+  ///
+  /// There is no guarantee that this value is non-empty or unique.
+  const std::string& label() const { return label_; }
+
+ protected:
+  friend class QueryPlan;
+
+  QueryNode() = default;
+
+  std::string label_;
+  // Null until assigned.
+  QueryPlan* plan_ = nullptr;
+  NodeVector inputs_;
+  NodeVector outputs_;
+};
+
+}  // namespace engine
+}  // namespace arrow