diff --git a/cpp/src/arrow/compute/exec/exec_plan.cc b/cpp/src/arrow/compute/exec/exec_plan.cc index 502f0d06693..92654ffb252 100644 --- a/cpp/src/arrow/compute/exec/exec_plan.cc +++ b/cpp/src/arrow/compute/exec/exec_plan.cc @@ -287,6 +287,107 @@ bool ExecNode::ErrorIfNotOk(Status status) { return true; } +MapNode::MapNode(ExecPlan* plan, std::vector inputs, + std::shared_ptr output_schema, bool async_mode) + : ExecNode(plan, std::move(inputs), /*input_labels=*/{"target"}, + std::move(output_schema), + /*num_outputs=*/1) { + if (async_mode) { + executor_ = plan_->exec_context()->executor(); + } else { + executor_ = nullptr; + } +} + +void MapNode::ErrorReceived(ExecNode* input, Status error) { + DCHECK_EQ(input, inputs_[0]); + outputs_[0]->ErrorReceived(this, std::move(error)); +} + +void MapNode::InputFinished(ExecNode* input, int total_batches) { + DCHECK_EQ(input, inputs_[0]); + outputs_[0]->InputFinished(this, total_batches); + if (input_counter_.SetTotal(total_batches)) { + this->Finish(); + } +} + +Status MapNode::StartProducing() { return Status::OK(); } + +void MapNode::PauseProducing(ExecNode* output) {} + +void MapNode::ResumeProducing(ExecNode* output) {} + +void MapNode::StopProducing(ExecNode* output) { + DCHECK_EQ(output, outputs_[0]); + StopProducing(); +} + +void MapNode::StopProducing() { + if (executor_) { + this->stop_source_.RequestStop(); + } + if (input_counter_.Cancel()) { + this->Finish(); + } + inputs_[0]->StopProducing(this); +} + +Future<> MapNode::finished() { return finished_; } + +void MapNode::SubmitTask(std::function(ExecBatch)> map_fn, + ExecBatch batch) { + Status status; + if (finished_.is_finished()) { + return; + } + auto task = [this, map_fn, batch]() { + auto guarantee = batch.guarantee; + auto output_batch = map_fn(std::move(batch)); + if (ErrorIfNotOk(output_batch.status())) { + return output_batch.status(); + } + output_batch->guarantee = guarantee; + outputs_[0]->InputReceived(this, output_batch.MoveValueUnsafe()); + return Status::OK(); + }; + + if (executor_) { + status = task_group_.AddTask([this, task]() -> Result> { + return this->executor_->Submit(this->stop_source_.token(), [this, task]() { + auto status = task(); + if (this->input_counter_.Increment()) { + this->Finish(status); + } + return status; + }); + }); + } else { + status = task(); + if (input_counter_.Increment()) { + this->Finish(status); + } + } + if (!status.ok()) { + if (input_counter_.Cancel()) { + this->Finish(status); + } + inputs_[0]->StopProducing(this); + return; + } +} + +void MapNode::Finish(Status finish_st /*= Status::OK()*/) { + if (executor_) { + task_group_.End().AddCallback([this, finish_st](const Status& st) { + Status final_status = finish_st & st; + this->finished_.MarkFinished(final_status); + }); + } else { + this->finished_.MarkFinished(finish_st); + } +} + std::shared_ptr MakeGeneratorReader( std::shared_ptr schema, std::function>()> gen, MemoryPool* pool) { diff --git a/cpp/src/arrow/compute/exec/exec_plan.h b/cpp/src/arrow/compute/exec/exec_plan.h index 73611bc992c..cad087ef679 100644 --- a/cpp/src/arrow/compute/exec/exec_plan.h +++ b/cpp/src/arrow/compute/exec/exec_plan.h @@ -23,13 +23,17 @@ #include #include "arrow/compute/exec.h" +#include "arrow/compute/exec/util.h" #include "arrow/compute/type_fwd.h" #include "arrow/type_fwd.h" +#include "arrow/util/async_util.h" +#include "arrow/util/cancel.h" #include "arrow/util/macros.h" #include "arrow/util/optional.h" #include "arrow/util/visibility.h" namespace arrow { + namespace compute { class ARROW_EXPORT ExecPlan : public std::enable_shared_from_this { @@ -243,6 +247,56 @@ class ARROW_EXPORT ExecNode { NodeVector outputs_; }; +/// \brief MapNode is an ExecNode type class which process a task like filter/project +/// (See SubmitTask method) to each given ExecBatch object, which have one input, one +/// output, and are pure functions on the input +/// +/// A simple parallel runner is created with a "map_fn" which is just a function that +/// takes a batch in and returns a batch. This simple parallel runner also needs an +/// executor (use simple synchronous runner if there is no executor) + +class MapNode : public ExecNode { + public: + MapNode(ExecPlan* plan, std::vector inputs, + std::shared_ptr output_schema, bool async_mode); + + void ErrorReceived(ExecNode* input, Status error) override; + + void InputFinished(ExecNode* input, int total_batches) override; + + Status StartProducing() override; + + void PauseProducing(ExecNode* output) override; + + void ResumeProducing(ExecNode* output) override; + + void StopProducing(ExecNode* output) override; + + void StopProducing() override; + + Future<> finished() override; + + protected: + void SubmitTask(std::function(ExecBatch)> map_fn, ExecBatch batch); + + void Finish(Status finish_st = Status::OK()); + + protected: + // Counter for the number of batches received + AtomicCounter input_counter_; + + // Future to sync finished + Future<> finished_ = Future<>::Make(); + + // The task group for the corresponding batches + util::AsyncTaskGroup task_group_; + + ::arrow::internal::Executor* executor_; + + // Variable used to cancel remaining tasks in the executor + StopSource stop_source_; +}; + /// \brief An extensible registry for factories of ExecNodes class ARROW_EXPORT ExecFactoryRegistry { public: diff --git a/cpp/src/arrow/compute/exec/filter_node.cc b/cpp/src/arrow/compute/exec/filter_node.cc index 81fa9b17645..2e6d974dc13 100644 --- a/cpp/src/arrow/compute/exec/filter_node.cc +++ b/cpp/src/arrow/compute/exec/filter_node.cc @@ -21,13 +21,11 @@ #include "arrow/compute/exec.h" #include "arrow/compute/exec/expression.h" #include "arrow/compute/exec/options.h" -#include "arrow/compute/exec/util.h" #include "arrow/datum.h" #include "arrow/result.h" #include "arrow/util/checked_cast.h" #include "arrow/util/future.h" #include "arrow/util/logging.h" - namespace arrow { using internal::checked_cast; @@ -35,13 +33,11 @@ using internal::checked_cast; namespace compute { namespace { -class FilterNode : public ExecNode { +class FilterNode : public MapNode { public: FilterNode(ExecPlan* plan, std::vector inputs, - std::shared_ptr output_schema, Expression filter) - : ExecNode(plan, std::move(inputs), /*input_labels=*/{"target"}, - std::move(output_schema), - /*num_outputs=*/1), + std::shared_ptr output_schema, Expression filter, bool async_mode) + : MapNode(plan, std::move(inputs), std::move(output_schema), async_mode), filter_(std::move(filter)) {} static Result Make(ExecPlan* plan, std::vector inputs, @@ -61,9 +57,9 @@ class FilterNode : public ExecNode { filter_expression.ToString(), " evaluates to ", filter_expression.type()->ToString()); } - return plan->EmplaceNode(plan, std::move(inputs), std::move(schema), - std::move(filter_expression)); + std::move(filter_expression), + filter_options.async_mode); } const char* kind_name() const override { return "FilterNode"; } @@ -98,50 +94,19 @@ class FilterNode : public ExecNode { void InputReceived(ExecNode* input, ExecBatch batch) override { DCHECK_EQ(input, inputs_[0]); - - auto maybe_filtered = DoFilter(std::move(batch)); - if (ErrorIfNotOk(maybe_filtered.status())) return; - - maybe_filtered->guarantee = batch.guarantee; - outputs_[0]->InputReceived(this, maybe_filtered.MoveValueUnsafe()); - } - - void ErrorReceived(ExecNode* input, Status error) override { - DCHECK_EQ(input, inputs_[0]); - outputs_[0]->ErrorReceived(this, std::move(error)); - } - - void InputFinished(ExecNode* input, int total_batches) override { - DCHECK_EQ(input, inputs_[0]); - outputs_[0]->InputFinished(this, total_batches); + auto func = [this](ExecBatch batch) { return DoFilter(std::move(batch)); }; + this->SubmitTask(std::move(func), std::move(batch)); } - Status StartProducing() override { return Status::OK(); } - - void PauseProducing(ExecNode* output) override {} - - void ResumeProducing(ExecNode* output) override {} - - void StopProducing(ExecNode* output) override { - DCHECK_EQ(output, outputs_[0]); - StopProducing(); - } - - void StopProducing() override { inputs_[0]->StopProducing(this); } - - Future<> finished() override { return inputs_[0]->finished(); } - protected: std::string ToStringExtra() const override { return "filter=" + filter_.ToString(); } private: Expression filter_; }; - } // namespace namespace internal { - void RegisterFilterNode(ExecFactoryRegistry* registry) { DCHECK_OK(registry->AddFactory("filter", FilterNode::Make)); } diff --git a/cpp/src/arrow/compute/exec/options.h b/cpp/src/arrow/compute/exec/options.h index cde118acd8a..1fc6db642e0 100644 --- a/cpp/src/arrow/compute/exec/options.h +++ b/cpp/src/arrow/compute/exec/options.h @@ -58,10 +58,11 @@ class ARROW_EXPORT SourceNodeOptions : public ExecNodeOptions { /// excluded in the batch emitted by this node. class ARROW_EXPORT FilterNodeOptions : public ExecNodeOptions { public: - explicit FilterNodeOptions(Expression filter_expression) - : filter_expression(std::move(filter_expression)) {} + explicit FilterNodeOptions(Expression filter_expression, bool async_mode = true) + : filter_expression(std::move(filter_expression)), async_mode(async_mode) {} Expression filter_expression; + bool async_mode; }; /// \brief Make a node which executes expressions on input batches, producing new batches. @@ -73,11 +74,14 @@ class ARROW_EXPORT FilterNodeOptions : public ExecNodeOptions { class ARROW_EXPORT ProjectNodeOptions : public ExecNodeOptions { public: explicit ProjectNodeOptions(std::vector expressions, - std::vector names = {}) - : expressions(std::move(expressions)), names(std::move(names)) {} + std::vector names = {}, bool async_mode = true) + : expressions(std::move(expressions)), + names(std::move(names)), + async_mode(async_mode) {} std::vector expressions; std::vector names; + bool async_mode; }; /// \brief Make a node which aggregates input batches, optionally grouped by keys. diff --git a/cpp/src/arrow/compute/exec/project_node.cc b/cpp/src/arrow/compute/exec/project_node.cc index 70908dc4924..c675acb3d98 100644 --- a/cpp/src/arrow/compute/exec/project_node.cc +++ b/cpp/src/arrow/compute/exec/project_node.cc @@ -37,13 +37,12 @@ using internal::checked_cast; namespace compute { namespace { -class ProjectNode : public ExecNode { +class ProjectNode : public MapNode { public: ProjectNode(ExecPlan* plan, std::vector inputs, - std::shared_ptr output_schema, std::vector exprs) - : ExecNode(plan, std::move(inputs), /*input_labels=*/{"target"}, - std::move(output_schema), - /*num_outputs=*/1), + std::shared_ptr output_schema, std::vector exprs, + bool async_mode) + : MapNode(plan, std::move(inputs), std::move(output_schema), async_mode), exprs_(std::move(exprs)) {} static Result Make(ExecPlan* plan, std::vector inputs, @@ -70,9 +69,9 @@ class ProjectNode : public ExecNode { fields[i] = field(std::move(names[i]), expr.type()); ++i; } - return plan->EmplaceNode(plan, std::move(inputs), - schema(std::move(fields)), std::move(exprs)); + schema(std::move(fields)), std::move(exprs), + project_options.async_mode); } const char* kind_name() const override { return "ProjectNode"; } @@ -91,39 +90,10 @@ class ProjectNode : public ExecNode { void InputReceived(ExecNode* input, ExecBatch batch) override { DCHECK_EQ(input, inputs_[0]); - - auto maybe_projected = DoProject(std::move(batch)); - if (ErrorIfNotOk(maybe_projected.status())) return; - - maybe_projected->guarantee = batch.guarantee; - outputs_[0]->InputReceived(this, maybe_projected.MoveValueUnsafe()); - } - - void ErrorReceived(ExecNode* input, Status error) override { - DCHECK_EQ(input, inputs_[0]); - outputs_[0]->ErrorReceived(this, std::move(error)); - } - - void InputFinished(ExecNode* input, int total_batches) override { - DCHECK_EQ(input, inputs_[0]); - outputs_[0]->InputFinished(this, total_batches); - } - - Status StartProducing() override { return Status::OK(); } - - void PauseProducing(ExecNode* output) override {} - - void ResumeProducing(ExecNode* output) override {} - - void StopProducing(ExecNode* output) override { - DCHECK_EQ(output, outputs_[0]); - StopProducing(); + auto func = [this](ExecBatch batch) { return DoProject(std::move(batch)); }; + this->SubmitTask(std::move(func), std::move(batch)); } - void StopProducing() override { inputs_[0]->StopProducing(this); } - - Future<> finished() override { return inputs_[0]->finished(); } - protected: std::string ToStringExtra() const override { std::stringstream ss; diff --git a/cpp/src/arrow/compute/exec/sink_node.cc b/cpp/src/arrow/compute/exec/sink_node.cc index d84f3c44115..e1c10edd8a9 100644 --- a/cpp/src/arrow/compute/exec/sink_node.cc +++ b/cpp/src/arrow/compute/exec/sink_node.cc @@ -274,9 +274,9 @@ struct OrderBySinkNode final : public SinkNode { plan()->exec_context()->memory_pool()); if (ErrorIfNotOk(maybe_batch.status())) { StopProducing(); - bool cancelled = input_counter_.Cancel(); - DCHECK(cancelled); - finished_.MarkFinished(maybe_batch.status()); + if (input_counter_.Cancel()) { + finished_.MarkFinished(maybe_batch.status()); + } return; } auto record_batch = maybe_batch.MoveValueUnsafe(); diff --git a/cpp/src/arrow/dataset/CMakeLists.txt b/cpp/src/arrow/dataset/CMakeLists.txt index c601e9fb1e2..984116a0047 100644 --- a/cpp/src/arrow/dataset/CMakeLists.txt +++ b/cpp/src/arrow/dataset/CMakeLists.txt @@ -133,10 +133,13 @@ endif() if(ARROW_BUILD_BENCHMARKS) add_arrow_benchmark(file_benchmark PREFIX "arrow-dataset") + add_arrow_benchmark(scanner_benchmark PREFIX "arrow-dataset") if(ARROW_BUILD_STATIC) target_link_libraries(arrow-dataset-file-benchmark PUBLIC arrow_dataset_static) + target_link_libraries(arrow-dataset-scanner-benchmark PUBLIC arrow_dataset_static) else() target_link_libraries(arrow-dataset-file-benchmark PUBLIC arrow_dataset_shared) + target_link_libraries(arrow-dataset-scanner-benchmark PUBLIC arrow_dataset_static) endif() endif() diff --git a/cpp/src/arrow/dataset/scanner_benchmark.cc b/cpp/src/arrow/dataset/scanner_benchmark.cc new file mode 100644 index 00000000000..e3021794c5d --- /dev/null +++ b/cpp/src/arrow/dataset/scanner_benchmark.cc @@ -0,0 +1,210 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "benchmark/benchmark.h" + +#include "arrow/api.h" +#include "arrow/compute/api.h" +#include "arrow/compute/exec/options.h" +#include "arrow/compute/exec/test_util.h" +#include "arrow/dataset/dataset.h" +#include "arrow/dataset/plan.h" +#include "arrow/dataset/scanner.h" +#include "arrow/dataset/test_util.h" +#include "arrow/testing/gtest_util.h" +#include "arrow/testing/matchers.h" +#include "arrow/testing/random.h" + +namespace arrow { +namespace compute { + +constexpr auto kSeed = 0x0ff1ce; + +void GenerateBatchesFromSchema(const std::shared_ptr& schema, size_t num_batches, + BatchesWithSchema* out_batches, int multiplicity = 1, + int64_t batch_size = 4) { + ::arrow::random::RandomArrayGenerator rng_(kSeed); + if (num_batches == 0) { + auto empty_record_batch = ExecBatch(*rng_.BatchOf(schema->fields(), 0)); + out_batches->batches.push_back(empty_record_batch); + } else { + for (size_t j = 0; j < num_batches; j++) { + out_batches->batches.push_back( + ExecBatch(*rng_.BatchOf(schema->fields(), batch_size))); + } + } + + size_t batch_count = out_batches->batches.size(); + for (int repeat = 1; repeat < multiplicity; ++repeat) { + for (size_t i = 0; i < batch_count; ++i) { + out_batches->batches.push_back(out_batches->batches[i]); + } + } + out_batches->schema = schema; +} + +RecordBatchVector GenerateBatches(const std::shared_ptr& schema, + size_t num_batches, size_t batch_size) { + BatchesWithSchema input_batches; + + RecordBatchVector batches; + GenerateBatchesFromSchema(schema, num_batches, &input_batches, 1, batch_size); + + for (const auto& batch : input_batches.batches) { + batches.push_back(batch.ToRecordBatch(schema).MoveValueUnsafe()); + } + return batches; +} + +} // namespace compute + +namespace dataset { + +static std::map, RecordBatchVector> datasets; + +void StoreBatches(size_t num_batches, size_t batch_size, + const RecordBatchVector& batches) { + datasets[std::make_pair(num_batches, batch_size)] = batches; +} + +RecordBatchVector GetBatches(size_t num_batches, size_t batch_size) { + auto iter = datasets.find(std::make_pair(num_batches, batch_size)); + if (iter == datasets.end()) { + return RecordBatchVector{}; + } + return iter->second; +} + +std::shared_ptr GetSchema() { + static std::shared_ptr s = schema({field("a", int32()), field("b", boolean())}); + return s; +} + +size_t GetBytesForSchema() { return sizeof(int32_t) + sizeof(bool); } + +void MinimalEndToEndScan(size_t num_batches, size_t batch_size, bool async_mode) { + // NB: This test is here for didactic purposes + + // Specify a MemoryPool and ThreadPool for the ExecPlan + compute::ExecContext exec_context(default_memory_pool(), + ::arrow::internal::GetCpuThreadPool()); + + // ensure arrow::dataset node factories are in the registry + ::arrow::dataset::internal::Initialize(); + + // A ScanNode is constructed from an ExecPlan (into which it is inserted), + // a Dataset (whose batches will be scanned), and ScanOptions (to specify a filter for + // predicate pushdown, a projection to skip materialization of unnecessary columns, + // ...) + ASSERT_OK_AND_ASSIGN(std::shared_ptr plan, + compute::ExecPlan::Make(&exec_context)); + + RecordBatchVector batches = GetBatches(num_batches, batch_size); + + std::shared_ptr dataset = + std::make_shared(GetSchema(), batches); + + auto options = std::make_shared(); + // sync scanning is not supported by ScanNode + options->use_async = true; + // specify the filter + compute::Expression b_is_true = field_ref("b"); + options->filter = b_is_true; + // for now, specify the projection as the full project expression (eventually this can + // just be a list of materialized field names) + compute::Expression a_times_2 = call("multiply", {field_ref("a"), literal(2)}); + options->projection = + call("make_struct", {a_times_2}, compute::MakeStructOptions{{"a * 2"}}); + + // construct the scan node + ASSERT_OK_AND_ASSIGN( + compute::ExecNode * scan, + compute::MakeExecNode("scan", plan.get(), {}, ScanNodeOptions{dataset, options})); + + // pipe the scan node into a filter node + ASSERT_OK_AND_ASSIGN( + compute::ExecNode * filter, + compute::MakeExecNode("filter", plan.get(), {scan}, + compute::FilterNodeOptions{b_is_true, async_mode})); + + // pipe the filter node into a project node + // NB: we're using the project node factory which preserves fragment/batch index + // tagging, so we *can* reorder later if we choose. The tags will not appear in + // our output. + ASSERT_OK_AND_ASSIGN( + compute::ExecNode * project, + compute::MakeExecNode("augmented_project", plan.get(), {filter}, + compute::ProjectNodeOptions{{a_times_2}, {}, async_mode})); + + // finally, pipe the project node into a sink node + AsyncGenerator> sink_gen; + ASSERT_OK_AND_ASSIGN(compute::ExecNode * sink, + compute::MakeExecNode("sink", plan.get(), {project}, + compute::SinkNodeOptions{&sink_gen})); + + ASSERT_NE(sink, nullptr); + + // translate sink_gen (async) to sink_reader (sync) + std::shared_ptr sink_reader = compute::MakeGeneratorReader( + schema({field("a * 2", int32())}), std::move(sink_gen), exec_context.memory_pool()); + + // start the ExecPlan + ASSERT_OK(plan->StartProducing()); + + // collect sink_reader into a Table + ASSERT_OK_AND_ASSIGN(auto collected, Table::FromRecordBatchReader(sink_reader.get())); + + ASSERT_GT(collected->num_rows(), 0); + + // wait 1s for completion + ASSERT_TRUE(plan->finished().Wait(/*seconds=*/1)) << "ExecPlan didn't finish within 1s"; +} + +static void MinimalEndToEndBench(benchmark::State& state) { + size_t num_batches = state.range(0); + size_t batch_size = state.range(1); + bool async_mode = state.range(2); + + for (auto _ : state) { + MinimalEndToEndScan(num_batches, batch_size, async_mode); + } + state.SetItemsProcessed(state.iterations() * num_batches); + state.SetBytesProcessed(state.iterations() * num_batches * batch_size * + GetBytesForSchema()); +} + +static const std::vector kWorkload = {100, 1000, 10000, 100000}; + +static void MinimalEndToEnd_Customize(benchmark::internal::Benchmark* b) { + for (const int32_t num_batches : kWorkload) { + for (const int batch_size : {10, 100, 1000}) { + for (const bool async_mode : {true, false}) { + b->Args({num_batches, batch_size, async_mode}); + RecordBatchVector batches = + ::arrow::compute::GenerateBatches(GetSchema(), num_batches, batch_size); + StoreBatches(num_batches, batch_size, batches); + } + } + } + b->ArgNames({"num_batches", "batch_size", "async_mode"}); + b->UseRealTime(); +} + +BENCHMARK(MinimalEndToEndBench)->Apply(MinimalEndToEnd_Customize); + +} // namespace dataset +} // namespace arrow diff --git a/cpp/src/arrow/dataset/scanner_test.cc b/cpp/src/arrow/dataset/scanner_test.cc index 0c6c3277290..6235cf2fd50 100644 --- a/cpp/src/arrow/dataset/scanner_test.cc +++ b/cpp/src/arrow/dataset/scanner_test.cc @@ -1402,7 +1402,8 @@ TEST(ScanNode, MinimalEndToEnd) { // NB: This test is here for didactic purposes // Specify a MemoryPool and ThreadPool for the ExecPlan - compute::ExecContext exec_context(default_memory_pool(), GetCpuThreadPool()); + compute::ExecContext exec_context(default_memory_pool(), + ::arrow::internal::GetCpuThreadPool()); // ensure arrow::dataset node factories are in the registry arrow::dataset::internal::Initialize();