diff --git a/cpp/examples/arrow/compute_register_example.cc b/cpp/examples/arrow/compute_register_example.cc index d8debd9c3e1..1379fa37d08 100644 --- a/cpp/examples/arrow/compute_register_example.cc +++ b/cpp/examples/arrow/compute_register_example.cc @@ -77,6 +77,7 @@ class ExampleNode : public cp::ExecNode { /*output_schema=*/input->output_schema(), /*num_outputs=*/1) {} const char* kind_name() const override { return "ExampleNode"; } + const std::vector& ordering() override { return ExecNode::kNoOrdering; } arrow::Status StartProducing() override { outputs_[0]->InputFinished(this, 0); diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index 88d72b11832..c29f6b26a2e 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -391,6 +391,7 @@ if(ARROW_COMPUTE) compute/exec/bloom_filter.cc compute/exec/exec_plan.cc compute/exec/expression.cc + compute/exec/fetch_node.cc compute/exec/filter_node.cc compute/exec/hash_join.cc compute/exec/hash_join_dict.cc diff --git a/cpp/src/arrow/compute/exec.cc b/cpp/src/arrow/compute/exec.cc index cf91bada6c6..a276bece762 100644 --- a/cpp/src/arrow/compute/exec.cc +++ b/cpp/src/arrow/compute/exec.cc @@ -71,7 +71,7 @@ ExecBatch::ExecBatch(const RecordBatch& batch) } bool ExecBatch::Equals(const ExecBatch& other) const { - return guarantee == other.guarantee && values == other.values; + return index == other.index && guarantee == other.guarantee && values == other.values; } void PrintTo(const ExecBatch& batch, std::ostream* os) { @@ -83,6 +83,9 @@ void PrintTo(const ExecBatch& batch, std::ostream* os) { if (batch.guarantee != literal(true)) { *os << indent << "Guarantee: " << batch.guarantee.ToString() << "\n"; } + if (batch.index != ExecBatch::kNoOrdering) { + *os << indent << "Index: " << batch.index << "\n"; + } int i = 0; for (const Datum& value : batch.values) { diff --git a/cpp/src/arrow/compute/exec.h b/cpp/src/arrow/compute/exec.h index 12cce42038d..75d7c388f95 100644 --- a/cpp/src/arrow/compute/exec.h +++ b/cpp/src/arrow/compute/exec.h @@ -216,6 +216,26 @@ struct ARROW_EXPORT ExecBatch { /// whether any values are Scalar. int64_t length = 0; + /// Indicates a batch is not part of an ordered stream + static constexpr int32_t kNoOrdering = -1; + /// The index of the exec batch in an ordered stream of batches + /// + /// Several operations can impose an ordering on their output. Because + /// batches travel through the execution graph at different speeds there + /// is no guarantee those batches will arrive in the same order they are + /// emitted. + /// + /// If there is no ordering then the index should be kNoOrdering. If a node rearranges + /// rows within a batch it will destroy the ordering (e.g. a hash-join node) and should + /// set the index of output batches to kNoOrdering. Other nodes which leave + /// row-in-batch ordering alone should maintain the index on their output batches. + /// Nodes that impose an ordering (e.g. sort) should assign index appropriately. + /// + /// An ordering must be monotonic and have no gaps. This can be somewhat tricky to + /// maintain. For example, when filtering, an implementation may need to emit empty + /// batches to maintain correct ordering. 
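// Illustrative sketch (assumed helper names, not part of the patch): a row-preserving node
// keeps this contract by copying the incoming index onto its output and by emitting empty
// batches rather than dropping them, so the sequence stays gapless.
Result<ExecBatch> FilterOneBatch(ExecBatch batch);  // hypothetical: may drop rows, never reorders them

Status ForwardOrdered(ExecNode* self, ExecBatch batch) {
  int32_t index = batch.index;  // position of this batch in the ordered stream
  ARROW_ASSIGN_OR_RAISE(ExecBatch transformed, FilterOneBatch(std::move(batch)));
  transformed.index = index;  // same position, even if zero rows survived
  self->outputs()[0]->InputReceived(self, std::move(transformed));
  return Status::OK();
}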
+ int32_t index = kNoOrdering; + /// \brief The sum of bytes in each buffer referenced by the batch /// /// Note: Scalars are not counted diff --git a/cpp/src/arrow/compute/exec/CMakeLists.txt b/cpp/src/arrow/compute/exec/CMakeLists.txt index 4ce73359d0f..9b438b92989 100644 --- a/cpp/src/arrow/compute/exec/CMakeLists.txt +++ b/cpp/src/arrow/compute/exec/CMakeLists.txt @@ -17,6 +17,7 @@ arrow_install_all_headers("arrow/compute/exec") +add_arrow_compute_test(accumulation_queue PREFIX "arrow-compute" SOURCES accumulation_queue_test.cc) add_arrow_compute_test(expression_test PREFIX "arrow-compute" diff --git a/cpp/src/arrow/compute/exec/accumulation_queue.cc b/cpp/src/arrow/compute/exec/accumulation_queue.cc index 192db529428..61031864c19 100644 --- a/cpp/src/arrow/compute/exec/accumulation_queue.cc +++ b/cpp/src/arrow/compute/exec/accumulation_queue.cc @@ -17,7 +17,13 @@ #include "arrow/compute/exec/accumulation_queue.h" +#include "arrow/util/future.h" +#include "arrow/util/logging.h" + #include +#include +#include +#include namespace arrow { namespace util { @@ -54,5 +60,86 @@ void AccumulationQueue::Clear() { } ExecBatch& AccumulationQueue::operator[](size_t i) { return batches_[i]; } + +struct ExecBatchCmp { + bool operator()(const ExecBatch& left, const ExecBatch& right) { + return left.index > right.index; + } +}; + +class OrderedAccumulationQueueImpl : public OrderedAccumulationQueue { + public: + OrderedAccumulationQueueImpl(TaskFactoryCallback create_task, ScheduleCallback schedule) + : create_task_(std::move(create_task)), schedule_(std::move(schedule)) {} + + ~OrderedAccumulationQueueImpl() override = default; + + Status InsertBatch(ExecBatch batch) override { + DCHECK_GE(batch.index, 0); + std::unique_lock lk(mutex_); + if (!processing_ && batch.index == next_index_) { + std::vector next_batch = PopUnlocked(std::move(batch)); + processing_ = true; + lk.unlock(); + return Deliver(std::move(next_batch)); + } + batches_.push(std::move(batch)); + return Status::OK(); + } + + Status CheckDrained() const override { + if (!batches_.empty()) { + return Status::UnknownError( + "Ordered accumulation queue has data remaining after finish"); + } + return Status::OK(); + } + + private: + std::vector PopUnlocked(std::optional batch) { + std::vector popped; + if (batch.has_value()) { + popped.push_back(std::move(*batch)); + next_index_++; + } + while (!batches_.empty() && batches_.top().index == next_index_) { + popped.push_back(std::move(batches_.top())); + batches_.pop(); + next_index_++; + } + return popped; + } + + Status Deliver(std::vector batches) { + ARROW_ASSIGN_OR_RAISE(Task task, create_task_(std::move(batches))); + Task wrapped_task = [this, task = std::move(task)] { + ARROW_RETURN_NOT_OK(task()); + std::unique_lock lk(mutex_); + if (!batches_.empty() && batches_.top().index == next_index_) { + std::vector next_batches = PopUnlocked(std::nullopt); + lk.unlock(); + ARROW_RETURN_NOT_OK(Deliver(std::move(next_batches))); + } else { + processing_ = false; + } + return Status::OK(); + }; + return schedule_(std::move(wrapped_task)); + } + + TaskFactoryCallback create_task_; + ScheduleCallback schedule_; + std::priority_queue, ExecBatchCmp> batches_; + int next_index_ = 0; + bool processing_ = false; + std::mutex mutex_; +}; + +std::unique_ptr OrderedAccumulationQueue::Make( + TaskFactoryCallback create_task, ScheduleCallback schedule) { + return std::make_unique(std::move(create_task), + std::move(schedule)); +} + } // namespace util } // namespace arrow diff --git 
a/cpp/src/arrow/compute/exec/accumulation_queue.h b/cpp/src/arrow/compute/exec/accumulation_queue.h index 4b23e5ffcac..c682e8e7916 100644 --- a/cpp/src/arrow/compute/exec/accumulation_queue.h +++ b/cpp/src/arrow/compute/exec/accumulation_queue.h @@ -53,5 +53,63 @@ class AccumulationQueue { std::vector batches_; }; +/// \brief Sequences data and allows for algorithms relying on ordered execution +/// +/// The ordered execution queue will buffer data. Typically, it is used when +/// there is an ordering in place, and we can assume the stream is roughly in +/// order, even though it may be quite jittery. For example, if we are scanning +/// a dataset that is ordered by some column then the ordered accumulation queue +/// can be used even though a parallel dataset scan wouldn't necessarily produce +/// a perfectly ordered stream due to jittery I/O. +/// +/// The downstream side of the queue is broken into two parts. The first part, +/// which should be relatively fast, runs serially, and creates tasks. The second +/// part, which can be slower, will run these tasks in parallel. +/// +/// For example, if we are doing an ordered group by operation then the serial part +/// will scan the batches to find group boundaries (places where the key value changes) +/// and will slice the input into groups. The second part will actually run the +/// aggregations on the groups and then call the downstream nodes. +/// +/// This queue is currently implemented as a pipeline breaker in the sense that it creates +/// new thread tasks. Each downstream task (the slower part) will be run as a new thread +/// task (submitted via the scheduling callback). A more sophisticated implementation +/// could probably be created that only breaks the pipeline when a batch arrives out of +/// order. +class OrderedAccumulationQueue { + public: + using Task = std::function; + using TaskFactoryCallback = std::function(std::vector)>; + using ScheduleCallback = std::function; + + virtual ~OrderedAccumulationQueue() = default; + + /// \brief Insert a new batch into the queue + /// \param batch The batch to insert + /// + /// If the batch is the next batch in the sequence then a new task will be created from + /// all available batches and submitted to the scheduler. + virtual Status InsertBatch(ExecBatch batch) = 0; + /// \brief Ensure the queue has been fully drained + /// + /// If a caller expects to process all data (e.g. not something like a fetch node) then + /// the caller should call this to ensure that all batches have been processed. This is + /// a sanity check to help detect bugs which produce streams of batches with gaps in the + /// sequencing index and is not strictly needed. + virtual Status CheckDrained() const = 0; + + /// \brief Create a new ordered accumulation queue + /// \param create_task The callback to use when a new task needs to be created + /// + /// This callback will run serially and will never be called reentrantly. It will + /// be given a vector of batches and those batches will be in sequence order. + /// + /// Ideally this callback should be as quick as possible, doing only the work that + /// needs to be truly serialized. The returned task will then be scheduled. 
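// A minimal usage sketch (not part of the patch; ProcessInOrder is a hypothetical consumer
// that must see batches in sequence order). A real plan would hand the returned task to its
// scheduler rather than running it inline as the schedule callback does here.
Status ProcessInOrder(const std::vector<ExecBatch>& batches);  // hypothetical, defined elsewhere

std::unique_ptr<OrderedAccumulationQueue> queue = OrderedAccumulationQueue::Make(
    /*create_task=*/[](std::vector<ExecBatch> batches) -> Result<OrderedAccumulationQueue::Task> {
      // Runs serially and never reentrantly; keep this cheap and just capture the ordered batches
      OrderedAccumulationQueue::Task task = [batches = std::move(batches)] {
        return ProcessInOrder(batches);  // may run concurrently with tasks created later
      };
      return task;
    },
    /*schedule=*/[](OrderedAccumulationQueue::Task task) { return task(); });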
+ /// \param schedule The callback to use to schedule a new task + static std::unique_ptr Make(TaskFactoryCallback create_task, + ScheduleCallback schedule); +}; + } // namespace util } // namespace arrow diff --git a/cpp/src/arrow/compute/exec/accumulation_queue_test.cc b/cpp/src/arrow/compute/exec/accumulation_queue_test.cc new file mode 100644 index 00000000000..6f77e13e453 --- /dev/null +++ b/cpp/src/arrow/compute/exec/accumulation_queue_test.cc @@ -0,0 +1,104 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/compute/exec/accumulation_queue.h" + +#include + +#include +#include +#include +#include +#include + +#include "arrow/testing/gtest_util.h" + +namespace arrow { +namespace util { + +TEST(AccumulationQueue, Basic) { + constexpr int kNumBatches = 1000; + constexpr int kNumIters = 100; + constexpr int kRandomSeed = 42; + + for (int i = 0; i < kNumIters; i++) { + std::vector collected(kNumBatches); + int num_seen = 0; + int num_active_tasks = 0; + std::mutex task_counter_mutex; + std::condition_variable task_counter_cv; + + OrderedAccumulationQueue::TaskFactoryCallback create_collect_task = + [&](std::vector batches) { + int start = num_seen; + num_seen += static_cast(batches.size()); + return [&, start, batches = std::move(batches)]() { + std::move(batches.begin(), batches.end(), collected.begin() + start); + return Status::OK(); + }; + }; + + std::vector threads; + + OrderedAccumulationQueue::ScheduleCallback schedule = + [&](OrderedAccumulationQueue::Task task) { + std::lock_guard lk(task_counter_mutex); + num_active_tasks++; + threads.emplace_back([&, task = std::move(task)] { + ASSERT_OK(task()); + std::lock_guard lk(task_counter_mutex); + if (--num_active_tasks == 0) { + task_counter_cv.notify_one(); + } + }); + return Status::OK(); + }; + + std::unique_ptr ordered_queue = + OrderedAccumulationQueue::Make(std::move(create_collect_task), + std::move(schedule)); + + std::vector test_batches(kNumBatches); + for (int i = 0; i < kNumBatches; i++) { + test_batches[i].index = i; + } + + std::default_random_engine gen(kRandomSeed); + std::shuffle(test_batches.begin(), test_batches.end(), gen); + + for (auto& batch : test_batches) { + ASSERT_OK(ordered_queue->InsertBatch(std::move(batch))); + } + + std::unique_lock lk(task_counter_mutex); + task_counter_cv.wait(lk, [&] { return num_active_tasks == 0; }); + + for (auto& thread : threads) { + thread.join(); + } + + ASSERT_OK(ordered_queue->CheckDrained()); + ASSERT_EQ(kNumBatches, static_cast(collected.size())); + + for (int i = 0; i < kNumBatches; i++) { + ASSERT_EQ(i, collected[i].index); + } + } +} + +} // namespace util +} // namespace arrow diff --git a/cpp/src/arrow/compute/exec/aggregate_node.cc b/cpp/src/arrow/compute/exec/aggregate_node.cc index 
cca266ad691..54a0b21aeaa 100644 --- a/cpp/src/arrow/compute/exec/aggregate_node.cc +++ b/cpp/src/arrow/compute/exec/aggregate_node.cc @@ -137,6 +137,11 @@ class ScalarAggregateNode : public ExecNode { const char* kind_name() const override { return "ScalarAggregateNode"; } + // There is currently no meaningful ordering to the output of the scalar aggregate + // although in the future we may want to allow sorting here since we will have already + // gathered all the data + const std::vector& ordering() override { return ExecNode::kNoOrdering; } + Status DoConsume(const ExecSpan& batch, size_t thread_index) { util::tracing::Span span; START_COMPUTE_SPAN(span, "Consume", @@ -211,7 +216,7 @@ class ScalarAggregateNode : public ExecNode { void StopProducing(ExecNode* output) override { DCHECK_EQ(output, outputs_[0]); - StopProducing(); + inputs_[0]->StopProducing(this); } void StopProducing() override { @@ -360,6 +365,10 @@ class GroupByNode : public ExecNode { const char* kind_name() const override { return "GroupByNode"; } + // There is currently no ordering assigned to the output although we may want + // to consider a future addition to allow ordering by grouping keys + const std::vector& ordering() override { return ExecNode::kNoOrdering; } + Status Consume(ExecSpan batch) { util::tracing::Span span; START_COMPUTE_SPAN(span, "Consume", diff --git a/cpp/src/arrow/compute/exec/asof_join_node.cc b/cpp/src/arrow/compute/exec/asof_join_node.cc index 35e7b1c6cc6..a0977be4439 100644 --- a/cpp/src/arrow/compute/exec/asof_join_node.cc +++ b/cpp/src/arrow/compute/exec/asof_join_node.cc @@ -872,6 +872,8 @@ class AsofJoinNode : public ExecNode { return indices_of_by_key_; } + const std::vector& ordering() override { return indices_of_on_key_; } + static Status is_valid_on_field(const std::shared_ptr& field) { switch (field->type()->id()) { case Type::INT8: @@ -1114,7 +1116,9 @@ class AsofJoinNode : public ExecNode { void ResumeProducing(ExecNode* output, int32_t counter) override {} void StopProducing(ExecNode* output) override { DCHECK_EQ(output, outputs_[0]); - StopProducing(); + for (auto input : inputs_) { + input->StopProducing(this); + } } void StopProducing() override { process_.Clear(); diff --git a/cpp/src/arrow/compute/exec/exec_plan.cc b/cpp/src/arrow/compute/exec/exec_plan.cc index 057e1ace5cd..745dbd6139e 100644 --- a/cpp/src/arrow/compute/exec/exec_plan.cc +++ b/cpp/src/arrow/compute/exec/exec_plan.cc @@ -31,10 +31,12 @@ #include "arrow/datum.h" #include "arrow/record_batch.h" #include "arrow/result.h" +#include "arrow/table.h" #include "arrow/util/async_generator.h" #include "arrow/util/checked_cast.h" #include "arrow/util/logging.h" #include "arrow/util/tracing_internal.h" +#include "arrow/util/vector.h" namespace arrow { @@ -420,6 +422,11 @@ ExecNode::ExecNode(ExecPlan* plan, NodeVector inputs, } } +const std::vector ExecNode::kImplicitOrdering = { + ExecNode::kImplicitOrderingColumn}; + +const std::vector ExecNode::kNoOrdering = {}; + Status ExecNode::Init() { return Status::OK(); } Status ExecNode::Validate() const { @@ -475,16 +482,10 @@ bool ExecNode::ErrorIfNotOk(Status status) { } MapNode::MapNode(ExecPlan* plan, std::vector inputs, - std::shared_ptr output_schema, bool async_mode) + std::shared_ptr output_schema) : ExecNode(plan, std::move(inputs), /*input_labels=*/{"target"}, std::move(output_schema), - /*num_outputs=*/1) { - if (async_mode) { - executor_ = plan_->exec_context()->executor(); - } else { - executor_ = nullptr; - } -} + /*num_outputs=*/1) {} void 
MapNode::ErrorReceived(ExecNode* input, Status error) { DCHECK_EQ(input, inputs_[0]); @@ -518,14 +519,12 @@ void MapNode::ResumeProducing(ExecNode* output, int32_t counter) { void MapNode::StopProducing(ExecNode* output) { DCHECK_EQ(output, outputs_[0]); - StopProducing(); + inputs_[0]->StopProducing(this); } void MapNode::StopProducing() { EVENT(span_, "StopProducing"); - if (executor_) { - this->stop_source_.RequestStop(); - } + this->stop_source_.RequestStop(); if (input_counter_.Cancel()) { this->Finish(); } @@ -542,10 +541,12 @@ void MapNode::SubmitTask(std::function(ExecBatch)> map_fn, } auto task = [this, map_fn, batch]() { auto guarantee = batch.guarantee; + auto index = batch.index; auto output_batch = map_fn(std::move(batch)); if (ErrorIfNotOk(output_batch.status())) { return output_batch.status(); } + output_batch->index = index; output_batch->guarantee = guarantee; outputs_[0]->InputReceived(this, output_batch.MoveValueUnsafe()); return Status::OK(); @@ -568,6 +569,8 @@ void MapNode::Finish(Status finish_st /*= Status::OK()*/) { this->finished_.MarkFinished(finish_st); } +const std::vector& MapNode::ordering() { return inputs_[0]->ordering(); } + std::shared_ptr MakeGeneratorReader( std::shared_ptr schema, std::function>()> gen, MemoryPool* pool) { @@ -647,9 +650,65 @@ bool Declaration::IsValid(ExecFactoryRegistry* registry) const { return !this->factory_name.empty() && this->options != nullptr; } +Future> DeclarationToTableAsync(Declaration declaration, + ExecContext* exec_context) { + std::shared_ptr> output_table = + std::make_shared>(); + ARROW_ASSIGN_OR_RAISE(std::shared_ptr exec_plan, + ExecPlan::Make(exec_context)); + Declaration with_sink = Declaration::Sequence( + {declaration, {"table_sink", TableSinkNodeOptions(output_table.get())}}); + ARROW_RETURN_NOT_OK(with_sink.AddToPlan(exec_plan.get())); + ARROW_RETURN_NOT_OK(exec_plan->StartProducing()); + return exec_plan->finished().Then([exec_plan, output_table] { return *output_table; }); +} + +Result> DeclarationToTable(Declaration declaration, + ExecContext* exec_context) { + return DeclarationToTableAsync(std::move(declaration), exec_context).result(); +} + +Future>> DeclarationToBatchesAsync( + Declaration declaration, ExecContext* exec_context) { + return DeclarationToTableAsync(std::move(declaration), exec_context) + .Then([](const std::shared_ptr& table) { + return TableBatchReader(table).ToRecordBatches(); + }); +} + +Result>> DeclarationToBatches( + Declaration declaration, ExecContext* exec_context) { + return DeclarationToBatchesAsync(std::move(declaration), exec_context).result(); +} + +Future> DeclarationToExecBatchesAsync(Declaration declaration, + ExecContext* exec_context) { + AsyncGenerator> sink_gen; + ARROW_ASSIGN_OR_RAISE(std::shared_ptr exec_plan, + ExecPlan::Make(exec_context)); + Declaration with_sink = + Declaration::Sequence({declaration, {"sink", SinkNodeOptions(&sink_gen)}}); + ARROW_RETURN_NOT_OK(with_sink.AddToPlan(exec_plan.get())); + ARROW_RETURN_NOT_OK(exec_plan->StartProducing()); + auto collected_fut = CollectAsyncGenerator(sink_gen); + return AllComplete({exec_plan->finished(), Future<>(collected_fut)}) + .Then([collected_fut, exec_plan]() -> Result> { + ARROW_ASSIGN_OR_RAISE(auto collected, collected_fut.result()); + return ::arrow::internal::MapVector( + [](std::optional batch) { return std::move(*batch); }, + std::move(collected)); + }); +} + +Result> DeclarationToExecBatches(Declaration declaration, + ExecContext* exec_context) { + return 
DeclarationToExecBatchesAsync(std::move(declaration), exec_context).result(); } + namespace internal { void RegisterSourceNode(ExecFactoryRegistry*); +void RegisterFetchNode(ExecFactoryRegistry*); void RegisterFilterNode(ExecFactoryRegistry*); void RegisterProjectNode(ExecFactoryRegistry*); void RegisterUnionNode(ExecFactoryRegistry*); @@ -665,6 +724,7 @@ ExecFactoryRegistry* default_exec_factory_registry() { public: DefaultRegistry() { internal::RegisterSourceNode(this); + internal::RegisterFetchNode(this); internal::RegisterFilterNode(this); internal::RegisterProjectNode(this); internal::RegisterUnionNode(this); diff --git a/cpp/src/arrow/compute/exec/exec_plan.h b/cpp/src/arrow/compute/exec/exec_plan.h index 3ff2340856f..cb27e9f9c31 100644 --- a/cpp/src/arrow/compute/exec/exec_plan.h +++ b/cpp/src/arrow/compute/exec/exec_plan.h @@ -337,6 +337,41 @@ class ARROW_EXPORT ExecNode { /// \brief Stop producing definitively to all outputs virtual void StopProducing() = 0; + static constexpr int32_t kImplicitOrderingColumn = -1; + static const std::vector kImplicitOrdering; + static const std::vector kNoOrdering; + + /// \brief The ordering of the node + /// + /// If a node has an ordering then output batches will be labeled with an `index` + /// which should determine the position of the batch in the stream according to + /// the ordering. + /// + /// An empty vector indicates that there is no ordering for the node. + /// + /// The ordering is a list of column indices which the data is sorted by. For + /// example, an ordering of {1, 0} would mean the data is first sorted by column + /// 1 and then by column 0. + /// + /// Nodes which impose an ordering will typically determine their ordering from + /// node options and should return that here. Nodes which pass data through will + /// typically forward the ordering of their input. Nodes which rearrange data and + /// destroy any ordering should return an empty vector. + /// + /// There is a special ordering case, represented by the vector {kImplicitOrderingColumn}, + /// which is used to represent the "implicit order" of the data. For example, if the + /// input to Acero is an in-memory table then the implicit ordering is the order of the + /// data according to the row number of the table (even though row number may not be a + /// column). If the input is a dataset then the "implicit order" is the ordering + /// (fragment_index, batch_index). Effectively, the implicit order can be any ordering + /// which is not represented by columns in the dataset. + /// + /// The implicit ordering cannot be used, for example, to implement an ordered streaming + /// group by or an ordered streaming join. However, it can be used for things like a + /// fetch node or to guarantee that the output of the plan is reassembled in the same input + /// order. + virtual const std::vector& ordering() = 0; + /// \brief A future which will be marked finished when this node has stopped producing. 
virtual Future<> finished() { return finished_; } @@ -380,7 +415,7 @@ class ARROW_EXPORT ExecNode { class ARROW_EXPORT MapNode : public ExecNode { public: MapNode(ExecPlan* plan, std::vector inputs, - std::shared_ptr output_schema, bool async_mode); + std::shared_ptr output_schema); void ErrorReceived(ExecNode* input, Status error) override; @@ -396,6 +431,8 @@ class ARROW_EXPORT MapNode : public ExecNode { void StopProducing() override; + const std::vector& ordering() override; + protected: void SubmitTask(std::function(ExecBatch)> map_fn, ExecBatch batch); @@ -405,8 +442,6 @@ class ARROW_EXPORT MapNode : public ExecNode { // Counter for the number of batches received AtomicCounter input_counter_; - ::arrow::internal::Executor* executor_; - // Variable used to cancel remaining tasks in the executor StopSource stop_source_; }; @@ -525,6 +560,39 @@ struct ARROW_EXPORT Declaration { std::string label; }; +/// \brief Utility method to run a declaration and collect the results into a table +/// +/// This method will add a sink node to the declaration to collect results into a +/// table. It will then create an ExecPlan from the declaration, start the exec plan, +/// block until the plan has finished, and return the created table. +Result> DeclarationToTable( + Declaration declaration, ExecContext* exec_context = default_exec_context()); + +/// \brief Asynchronous version of \see DeclarationToTable +Future> DeclarationToTableAsync( + Declaration declaration, ExecContext* exec_context = default_exec_context()); + +/// \brief Utility method to run a declaration and collect the results into ExecBatch +/// vector +/// +/// \see DeclarationToTable for details +Result> DeclarationToExecBatches( + Declaration declaration, ExecContext* exec_context = default_exec_context()); + +/// \brief Asynchronous version of \see DeclarationToExecBatches +Future> DeclarationToExecBatchesAsync( + Declaration declaration, ExecContext* exec_context = default_exec_context()); + +/// \brief Utility method to run a declaration and collect the results into a vector +/// +/// \see DeclarationToTable for details +Result>> DeclarationToBatches( + Declaration declaration, ExecContext* exec_context = default_exec_context()); + +/// \brief Asynchronous version of \see DeclarationToBatches +Future>> DeclarationToBatchesAsync( + Declaration declaration, ExecContext* exec_context = default_exec_context()); + /// \brief Wrap an ExecBatch generator in a RecordBatchReader. /// /// The RecordBatchReader does not impose any ordering on emitted batches. 
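A minimal sketch of the declaration helpers declared above, run end to end against an in-memory table. The input table and the column name "a" are assumptions for illustration; everything here lives in the arrow::compute namespace.

Result<std::shared_ptr<Table>> FilterPositive(std::shared_ptr<Table> input) {
  Declaration plan = Declaration::Sequence({
      {"table_source", TableSourceNodeOptions(std::move(input))},
      {"filter", FilterNodeOptions(greater(field_ref("a"), literal(0)))},
  });
  // Blocks until the plan finishes; DeclarationToTableAsync is the non-blocking variant and
  // DeclarationToExecBatches/DeclarationToBatches collect the same output in other shapes.
  return DeclarationToTable(std::move(plan));
}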
diff --git a/cpp/src/arrow/compute/exec/expression.cc b/cpp/src/arrow/compute/exec/expression.cc index d23838303f7..a0b8a6b4f5a 100644 --- a/cpp/src/arrow/compute/exec/expression.cc +++ b/cpp/src/arrow/compute/exec/expression.cc @@ -76,9 +76,15 @@ Expression call(std::string function, std::vector arguments, return Expression(std::move(call)); } -const Datum* Expression::literal() const { return std::get_if(impl_.get()); } +const Datum* Expression::literal() const { + if (impl_ == nullptr) return nullptr; + + return std::get_if(impl_.get()); +} const Expression::Parameter* Expression::parameter() const { + if (impl_ == nullptr) return nullptr; + return std::get_if(impl_.get()); } @@ -90,6 +96,8 @@ const FieldRef* Expression::field_ref() const { } const Expression::Call* Expression::call() const { + if (impl_ == nullptr) return nullptr; + return std::get_if(impl_.get()); } @@ -456,13 +464,14 @@ Result Expression::Bind(const Schema& in_schema, } Result MakeExecBatch(const Schema& full_schema, const Datum& partial, - Expression guarantee) { + int32_t index, Expression guarantee) { ExecBatch out; if (partial.kind() == Datum::RECORD_BATCH) { const auto& partial_batch = *partial.record_batch(); out.guarantee = std::move(guarantee); out.length = partial_batch.num_rows(); + out.index = index; ARROW_ASSIGN_OR_RAISE(auto known_field_values, ExtractKnownFieldValues(out.guarantee)); @@ -503,14 +512,14 @@ Result MakeExecBatch(const Schema& full_schema, const Datum& partial, ARROW_ASSIGN_OR_RAISE(auto partial_batch, RecordBatch::FromStructArray(partial.make_array())); - return MakeExecBatch(full_schema, partial_batch, std::move(guarantee)); + return MakeExecBatch(full_schema, partial_batch, index, std::move(guarantee)); } if (partial.is_scalar()) { ARROW_ASSIGN_OR_RAISE(auto partial_array, MakeArrayFromScalar(*partial.scalar(), 1)); - ARROW_ASSIGN_OR_RAISE( - auto out, MakeExecBatch(full_schema, partial_array, std::move(guarantee))); + ARROW_ASSIGN_OR_RAISE(auto out, MakeExecBatch(full_schema, partial_array, index, + std::move(guarantee))); for (Datum& value : out.values) { if (value.is_scalar()) continue; @@ -526,7 +535,8 @@ Result MakeExecBatch(const Schema& full_schema, const Datum& partial, Result ExecuteScalarExpression(const Expression& expr, const Schema& full_schema, const Datum& partial_input, compute::ExecContext* exec_context) { - ARROW_ASSIGN_OR_RAISE(auto input, MakeExecBatch(full_schema, partial_input)); + ARROW_ASSIGN_OR_RAISE( + auto input, MakeExecBatch(full_schema, partial_input, ExecBatch::kNoOrdering)); return ExecuteScalarExpression(expr, input, exec_context); } diff --git a/cpp/src/arrow/compute/exec/expression.h b/cpp/src/arrow/compute/exec/expression.h index d49fe5c893e..39453a2dfcc 100644 --- a/cpp/src/arrow/compute/exec/expression.h +++ b/cpp/src/arrow/compute/exec/expression.h @@ -100,6 +100,8 @@ class ARROW_EXPORT Expression { // XXX someday // Result GetPipelines(); + bool is_valid() const { return impl_ != NULLPTR; } + /// Access a Call or return nullptr if this expression is not a call const Call* call() const; /// Access a Datum or return nullptr if this expression is not a literal @@ -226,7 +228,7 @@ Result SimplifyWithGuarantee(Expression, /// RecordBatch which may have missing or incorrectly ordered columns. /// Missing fields will be replaced with null scalars. 
ARROW_EXPORT Result MakeExecBatch(const Schema& full_schema, - const Datum& partial, + const Datum& partial, int32_t index, Expression guarantee = literal(true)); /// Execute a scalar expression against the provided state and input ExecBatch. This @@ -277,6 +279,53 @@ ARROW_EXPORT Expression or_(Expression lhs, Expression rhs); ARROW_EXPORT Expression or_(const std::vector&); ARROW_EXPORT Expression not_(Expression operand); +/// Modify an Expression with pre-order and post-order visitation. +/// `pre` will be invoked on each Expression. `pre` will visit Calls before their +/// arguments, `post_call` will visit Calls (and no other Expressions) after their +/// arguments. Visitors should return the Identical expression to indicate no change; this +/// will prevent unnecessary construction in the common case where a modification is not +/// possible/necessary/... +/// +/// If an argument was modified, `post_call` visits a reconstructed Call with the modified +/// arguments but also receives a pointer to the unmodified Expression as a second +/// argument. If no arguments were modified the unmodified Expression* will be nullptr. +template +Result Modify(Expression expr, const PreVisit& pre, + const PostVisitCall& post_call) { + ARROW_ASSIGN_OR_RAISE(expr, Result(pre(std::move(expr)))); + + auto call = expr.call(); + if (!call) return expr; + + bool at_least_one_modified = false; + std::vector modified_arguments; + + for (size_t i = 0; i < call->arguments.size(); ++i) { + ARROW_ASSIGN_OR_RAISE(auto modified_argument, + Modify(call->arguments[i], pre, post_call)); + + if (Identical(modified_argument, call->arguments[i])) { + continue; + } + + if (!at_least_one_modified) { + modified_arguments = call->arguments; + at_least_one_modified = true; + } + + modified_arguments[i] = std::move(modified_argument); + } + + if (at_least_one_modified) { + // reconstruct the call expression with the modified arguments + auto modified_call = *call; + modified_call.arguments = std::move(modified_arguments); + return post_call(Expression(std::move(modified_call)), &expr); + } + + return post_call(std::move(expr), nullptr); +} + /// @} } // namespace compute diff --git a/cpp/src/arrow/compute/exec/expression_internal.h b/cpp/src/arrow/compute/exec/expression_internal.h index 027c954c6d0..9e29b8e27f9 100644 --- a/cpp/src/arrow/compute/exec/expression_internal.h +++ b/cpp/src/arrow/compute/exec/expression_internal.h @@ -287,52 +287,5 @@ inline Result> GetFunction( return GetCastFunction(*to_type); } -/// Modify an Expression with pre-order and post-order visitation. -/// `pre` will be invoked on each Expression. `pre` will visit Calls before their -/// arguments, `post_call` will visit Calls (and no other Expressions) after their -/// arguments. Visitors should return the Identical expression to indicate no change; this -/// will prevent unnecessary construction in the common case where a modification is not -/// possible/necessary/... -/// -/// If an argument was modified, `post_call` visits a reconstructed Call with the modified -/// arguments but also receives a pointer to the unmodified Expression as a second -/// argument. If no arguments were modified the unmodified Expression* will be nullptr. 
-template -Result Modify(Expression expr, const PreVisit& pre, - const PostVisitCall& post_call) { - ARROW_ASSIGN_OR_RAISE(expr, Result(pre(std::move(expr)))); - - auto call = expr.call(); - if (!call) return expr; - - bool at_least_one_modified = false; - std::vector modified_arguments; - - for (size_t i = 0; i < call->arguments.size(); ++i) { - ARROW_ASSIGN_OR_RAISE(auto modified_argument, - Modify(call->arguments[i], pre, post_call)); - - if (Identical(modified_argument, call->arguments[i])) { - continue; - } - - if (!at_least_one_modified) { - modified_arguments = call->arguments; - at_least_one_modified = true; - } - - modified_arguments[i] = std::move(modified_argument); - } - - if (at_least_one_modified) { - // reconstruct the call expression with the modified arguments - auto modified_call = *call; - modified_call.arguments = std::move(modified_arguments); - return post_call(Expression(std::move(modified_call)), &expr); - } - - return post_call(std::move(expr), nullptr); -} - } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/exec/expression_test.cc b/cpp/src/arrow/compute/exec/expression_test.cc index 4cb4c272485..a14ba72afb1 100644 --- a/cpp/src/arrow/compute/exec/expression_test.cc +++ b/cpp/src/arrow/compute/exec/expression_test.cc @@ -174,7 +174,8 @@ TEST(ExpressionUtils, StripOrderPreservingCasts) { TEST(ExpressionUtils, MakeExecBatch) { auto Expect = [](std::shared_ptr partial_batch) { SCOPED_TRACE(partial_batch->ToString()); - ASSERT_OK_AND_ASSIGN(auto batch, MakeExecBatch(*kBoringSchema, partial_batch)); + ASSERT_OK_AND_ASSIGN( + auto batch, MakeExecBatch(*kBoringSchema, partial_batch, ExecBatch::kNoOrdering)); ASSERT_EQ(batch.num_values(), kBoringSchema->num_fields()); for (int i = 0; i < kBoringSchema->num_fields(); ++i) { @@ -218,7 +219,8 @@ TEST(ExpressionUtils, MakeExecBatch) { auto duplicated_names = RecordBatch::Make(schema({GetField("i32"), GetField("i32")}), kNumRows, {i32, i32}); - ASSERT_RAISES(Invalid, MakeExecBatch(*kBoringSchema, duplicated_names)); + ASSERT_RAISES(Invalid, + MakeExecBatch(*kBoringSchema, duplicated_names, ExecBatch::kNoOrdering)); } class WidgetifyOptions : public compute::FunctionOptions { diff --git a/cpp/src/arrow/compute/exec/fetch_node.cc b/cpp/src/arrow/compute/exec/fetch_node.cc new file mode 100644 index 00000000000..b51e68a34ec --- /dev/null +++ b/cpp/src/arrow/compute/exec/fetch_node.cc @@ -0,0 +1,177 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include + +#include "arrow/compute/api_vector.h" +#include "arrow/compute/exec.h" +#include "arrow/compute/exec/accumulation_queue.h" +#include "arrow/compute/exec/exec_plan.h" +#include "arrow/compute/exec/expression.h" +#include "arrow/compute/exec/options.h" +#include "arrow/compute/exec/util.h" +#include "arrow/datum.h" +#include "arrow/result.h" +#include "arrow/util/checked_cast.h" +#include "arrow/util/future.h" +#include "arrow/util/logging.h" +#include "arrow/util/tracing_internal.h" + +namespace arrow { + +using internal::checked_cast; + +namespace compute { +namespace { + +class FetchNode : public ExecNode { + public: + FetchNode(ExecPlan* plan, std::vector inputs, + std::shared_ptr output_schema, int limit) + : ExecNode(plan, std::move(inputs), {"source"}, std::move(output_schema), 1), + limit_(limit) { + accumulation_queue_ = util::OrderedAccumulationQueue::Make( + [this](std::vector batch) { + return CreateFetchTask(std::move(batch)); + }, + [this](util::OrderedAccumulationQueue::Task task) { + auto task_wrapper = [task = std::move(task)](int) { return task(); }; + return plan_->ScheduleTask(task_wrapper); + }); + } + + static Result Make(ExecPlan* plan, std::vector inputs, + const ExecNodeOptions& options) { + RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, "FetchNode")); + + if (inputs.empty()) { + return Status::Invalid("Fetch node with no input"); + } + if (inputs[0]->ordering() == ExecNode::kNoOrdering) { + return Status::Invalid( + "A fetch node should not follow nodes that destroy ordering."); + } + const std::shared_ptr& schema = inputs[0]->output_schema(); + const auto& fetch_options = checked_cast(options); + + if (fetch_options.limit < 0) { + return Status::Invalid("FetchOptions::limit must be >= 0"); + } + + return plan->EmplaceNode(plan, std::move(inputs), schema, + fetch_options.limit); + } + + const char* kind_name() const override { return "FetchNode"; } + + void ErrorReceived(ExecNode* input, Status error) override { + outputs_[0]->ErrorReceived(this, std::move(error)); + } + void InputFinished(ExecNode* input, int total_batches) override { + if (input_counter_.SetTotal(total_batches)) { + outputs_[0]->InputFinished(this, total_batches); + finished_.MarkFinished(); + } + } + Status StartProducing() override { return Status::OK(); } + void PauseProducing(ExecNode* output, int32_t counter) override { + inputs_[0]->PauseProducing(this, counter); + } + void ResumeProducing(ExecNode* output, int32_t counter) override { + inputs_[0]->ResumeProducing(this, counter); + } + void StopProducing(ExecNode* output) override { inputs_[0]->StopProducing(this); } + void StopProducing() override { + if (input_counter_.Cancel()) { + finished_.MarkFinished(); + } + } + const std::vector& ordering() override { return inputs_[0]->ordering(); } + + void InputReceived(ExecNode* input, ExecBatch batch) override { + EVENT(span_, "InputReceived", {{"batch.length", batch.length}}); + DCHECK_EQ(input, inputs_[0]); + Status st = accumulation_queue_->InsertBatch(std::move(batch)); + if (!st.ok()) { + if (input_counter_.Cancel()) { + finished_.MarkFinished(st); + } + return; + } + if (input_counter_.Increment()) { + finished_.MarkFinished(); + } + } + + util::OrderedAccumulationQueue::Task CreateFetchTask(std::vector batches) { + int remaining = limit_ - seen_; + int total_delivered_rows = 0; + std::vector to_deliver; + for (const auto& batch : batches) { + if (batch.length > remaining) { + to_deliver.push_back(batch.Slice(0, remaining)); + total_delivered_rows += remaining; + 
remaining = 0; + break; + } else { + total_delivered_rows += batch.length; + remaining -= batch.length; + to_deliver.push_back(std::move(batch)); + } + } + seen_ += total_delivered_rows; + auto task = [this, task_batches = std::move(to_deliver)] { + for (auto& task_batch : task_batches) { + outputs_[0]->InputReceived(this, std::move(task_batch)); + } + return Status::OK(); + }; + if (seen_ == limit_) { + inputs_[0]->StopProducing(this); + outputs_[0]->InputFinished(this, limit_); + if (input_counter_.SetTotal(limit_)) { + finished_.MarkFinished(); + } + } + return task; + } + + protected: + std::string ToStringExtra(int indent = 0) const override { + std::stringstream ss; + ss << "limit=" << limit_; + return ss.str(); + } + + private: + int limit_; + std::unique_ptr accumulation_queue_; + int seen_ = 0; + AtomicCounter input_counter_; +}; + +} // namespace + +namespace internal { + +void RegisterFetchNode(ExecFactoryRegistry* registry) { + DCHECK_OK(registry->AddFactory("fetch", FetchNode::Make)); +} + +} // namespace internal +} // namespace compute +} // namespace arrow diff --git a/cpp/src/arrow/compute/exec/filter_node.cc b/cpp/src/arrow/compute/exec/filter_node.cc index b424da35f85..189531cee70 100644 --- a/cpp/src/arrow/compute/exec/filter_node.cc +++ b/cpp/src/arrow/compute/exec/filter_node.cc @@ -37,8 +37,8 @@ namespace { class FilterNode : public MapNode { public: FilterNode(ExecPlan* plan, std::vector inputs, - std::shared_ptr output_schema, Expression filter, bool async_mode) - : MapNode(plan, std::move(inputs), std::move(output_schema), async_mode), + std::shared_ptr output_schema, Expression filter) + : MapNode(plan, std::move(inputs), std::move(output_schema)), filter_(std::move(filter)) {} static Result Make(ExecPlan* plan, std::vector inputs, @@ -60,8 +60,7 @@ class FilterNode : public MapNode { filter_expression.type()->ToString()); } return plan->EmplaceNode(plan, std::move(inputs), std::move(schema), - std::move(filter_expression), - filter_options.async_mode); + std::move(filter_expression)); } const char* kind_name() const override { return "FilterNode"; } diff --git a/cpp/src/arrow/compute/exec/hash_join_node.cc b/cpp/src/arrow/compute/exec/hash_join_node.cc index 237f13d0a25..5ae78ddf381 100644 --- a/cpp/src/arrow/compute/exec/hash_join_node.cc +++ b/cpp/src/arrow/compute/exec/hash_join_node.cc @@ -772,6 +772,7 @@ class HashJoinNode : public ExecNode { } const char* kind_name() const override { return "HashJoinNode"; } + const std::vector& ordering() override { return ExecNode::kNoOrdering; } Status OnBuildSideBatch(size_t thread_index, ExecBatch batch) { std::lock_guard guard(build_side_mutex_); @@ -1020,7 +1021,7 @@ class HashJoinNode : public ExecNode { void StopProducing(ExecNode* output) override { DCHECK_EQ(output, outputs_[0]); - StopProducing(); + inputs_[0]->StopProducing(this); } void StopProducing() override { diff --git a/cpp/src/arrow/compute/exec/options.h b/cpp/src/arrow/compute/exec/options.h index c5edc0610c5..b6cfcb7b77f 100644 --- a/cpp/src/arrow/compute/exec/options.h +++ b/cpp/src/arrow/compute/exec/options.h @@ -57,8 +57,20 @@ class ARROW_EXPORT SourceNodeOptions : public ExecNodeOptions { static Result> FromTable(const Table& table, arrow::internal::Executor*); + /// \brief The schema of the data generated by generator std::shared_ptr output_schema; + /// \brief A (potentially asynchronous) source of data std::function>()> generator; + /// \brief The ordering of the data + /// + /// This can be set if `generator` is guaranteed to generate data 
according to some kind + /// of ordering. The source node will make no attempt to verify this fact but will + /// assign batch indices as if the data is ordered in this way. + /// + /// Data from a source node always has some kind of ordering. The default (an empty + /// vector) will actually assign the implicit ordering to outgoing data. \see + /// ExecNode::ordering for more details + std::vector asserted_ordering; }; /// \brief An extended Source node which accepts a table @@ -84,11 +96,10 @@ class ARROW_EXPORT TableSourceNodeOptions : public ExecNodeOptions { /// excluded in the batch emitted by this node. class ARROW_EXPORT FilterNodeOptions : public ExecNodeOptions { public: - explicit FilterNodeOptions(Expression filter_expression, bool async_mode = true) - : filter_expression(std::move(filter_expression)), async_mode(async_mode) {} + explicit FilterNodeOptions(Expression filter_expression) + : filter_expression(std::move(filter_expression)) {} Expression filter_expression; - bool async_mode; }; /// \brief Make a node which executes expressions on input batches, producing new batches. @@ -100,14 +111,28 @@ class ARROW_EXPORT ProjectNodeOptions : public ExecNodeOptions { public: explicit ProjectNodeOptions(std::vector expressions, - std::vector names = {}, bool async_mode = true) - : expressions(std::move(expressions)), - names(std::move(names)), - async_mode(async_mode) {} + std::vector names = {}) + : expressions(std::move(expressions)), names(std::move(names)) {} std::vector expressions; std::vector names; - bool async_mode; +}; + +/// \brief Make a node which only takes the first `limit` results, discarding the rest +/// +/// This node should not be placed after a node that destroys sequencing information +/// (e.g. a hash-join node) since "the first `limit` results" doesn't make sense in +/// such a situation. +/// +/// This node will buffer some data in order to process data in a serialized fashion +/// to determine "the first `limit` results". In most cases this buffering should be +/// quite small. However, if a source is extremely jittery then the amount of buffering +/// could be greater. +class ARROW_EXPORT FetchNodeOptions : public ExecNodeOptions { + public: + explicit FetchNodeOptions(int limit) : limit(limit) {} + + int limit; }; /// \brief Make a node which aggregates input batches, optionally grouped by keys. 
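A short sketch tying the two options above together: a source whose generator is promised to already be ordered by column 0, followed by a fetch of the first 100 rows. The schema and generator arguments are assumptions; the asserted ordering holds column indices as described in ExecNode::ordering.

Result<std::shared_ptr<Table>> FirstHundredRows(
    std::shared_ptr<Schema> schema,
    std::function<Future<std::optional<ExecBatch>>()> generator) {
  SourceNodeOptions source_options{std::move(schema), std::move(generator)};
  source_options.asserted_ordering = {0};  // the generator promises data sorted by column 0
  Declaration plan = Declaration::Sequence({
      {"source", std::move(source_options)},
      {"fetch", FetchNodeOptions(/*limit=*/100)},
  });
  return DeclarationToTable(std::move(plan));
}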
diff --git a/cpp/src/arrow/compute/exec/plan_test.cc b/cpp/src/arrow/compute/exec/plan_test.cc index 1dd071975ee..6b56bba5e68 100644 --- a/cpp/src/arrow/compute/exec/plan_test.cc +++ b/cpp/src/arrow/compute/exec/plan_test.cc @@ -19,6 +19,7 @@ #include #include +#include #include "arrow/compute/exec.h" #include "arrow/compute/exec/exec_plan.h" @@ -1426,6 +1427,34 @@ TEST(ExecPlanExecution, SelfOuterHashJoinSink) { } } +TEST(ExecPlanExecution, FetchNode) { + ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make()); + std::shared_ptr test_schema = + schema({field("a", int32()), field("b", boolean())}); + auto random_data = + MakeRandomBatches(test_schema, + /*num_batches=*/5, /*batch_size=*/10, /*ordered=*/true); + + ASSERT_OK_AND_ASSIGN( + std::vector filtered_batches, + DeclarationToExecBatches(Declaration::Sequence( + {{"source", + SourceNodeOptions{random_data.schema, random_data.gen(/*parallel=*/true, + /*slow=*/true)}}, + {"fetch", FetchNodeOptions(15)}}))); + + ASSERT_EQ(2, filtered_batches.size()); + + int total_rows = + static_cast(filtered_batches[0].length + filtered_batches[1].length); + ASSERT_EQ(15, total_rows); + + for (const auto& batch : filtered_batches) { + Int32Scalar tag = batch.values[2].scalar_as(); + ASSERT_TRUE(tag.value == 0 || tag.value == 1); + } +} + TEST(ExecPlan, RecordBatchReaderSourceSink) { ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make()); AsyncGenerator> sink_gen; @@ -1481,5 +1510,42 @@ TEST(ExecPlan, SourceEnforcesBatchLimit) { } } +TEST(BatchOrder, FilterNode) { + ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make()); + std::shared_ptr test_schema = + schema({field("a", int32()), field("b", boolean())}); + auto random_data = + MakeRandomBatches(test_schema, + /*num_batches=*/5, /*batch_size=*/10, /*ordered=*/true); + + // Insert a batch that starts empty + random_data.batches[1] = ExecBatchFromJSON({int32(), boolean()}, "[]"); + random_data.batches[1].values.emplace_back(1); + random_data.batches[1].index = 1; + + // Insert a batch that will be completely filtered + random_data.batches[2] = ExecBatchFromJSON({int32(), boolean()}, "[[-1, false]]"); + random_data.batches[2].values.emplace_back(2); + random_data.batches[2].index = 2; + + ASSERT_OK_AND_ASSIGN( + std::vector filtered_batches, + DeclarationToExecBatches(Declaration::Sequence( + {{"source", + SourceNodeOptions{random_data.schema, random_data.gen(/*parallel=*/true, + /*slow=*/true)}}, + {"filter", FilterNodeOptions(greater(field_ref("a"), literal(0)))}}))); + + std::unordered_set filtered_batch_indices; + for (const auto& batch : filtered_batches) { + filtered_batch_indices.insert(batch.index); + if (batch.index == 2) { + // Sanity check that the filter is actually applied + ASSERT_EQ(0, batch.length); + } + } + ASSERT_EQ(std::unordered_set({0, 1, 2, 3, 4}), filtered_batch_indices); +} + } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/exec/project_node.cc b/cpp/src/arrow/compute/exec/project_node.cc index 76925eb6139..a768f880461 100644 --- a/cpp/src/arrow/compute/exec/project_node.cc +++ b/cpp/src/arrow/compute/exec/project_node.cc @@ -40,9 +40,8 @@ namespace { class ProjectNode : public MapNode { public: ProjectNode(ExecPlan* plan, std::vector inputs, - std::shared_ptr output_schema, std::vector exprs, - bool async_mode) - : MapNode(plan, std::move(inputs), std::move(output_schema), async_mode), + std::shared_ptr output_schema, std::vector exprs) + : MapNode(plan, std::move(inputs), std::move(output_schema)), exprs_(std::move(exprs)) {} static Result Make(ExecPlan* plan, 
std::vector inputs, @@ -71,8 +70,7 @@ class ProjectNode : public MapNode { ++i; } return plan->EmplaceNode(plan, std::move(inputs), - schema(std::move(fields)), std::move(exprs), - project_options.async_mode); + schema(std::move(fields)), std::move(exprs)); } const char* kind_name() const override { return "ProjectNode"; } diff --git a/cpp/src/arrow/compute/exec/sink_node.cc b/cpp/src/arrow/compute/exec/sink_node.cc index 96a34bff437..0f97293c447 100644 --- a/cpp/src/arrow/compute/exec/sink_node.cc +++ b/cpp/src/arrow/compute/exec/sink_node.cc @@ -152,6 +152,9 @@ class SinkNode : public ExecNode { } [[noreturn]] void StopProducing(ExecNode* output) override { NoOutputs(); } + // There is no output and so there is no ordering + const std::vector& ordering() override { return ExecNode::kNoOrdering; } + void StopProducing() override { EVENT(span_, "StopProducing"); @@ -319,6 +322,9 @@ class ConsumingSinkNode : public ExecNode, public BackpressureControl { } [[noreturn]] void StopProducing(ExecNode* output) override { NoOutputs(); } + // sink nodes have no output and no ordering + const std::vector& ordering() override { return ExecNode::kNoOrdering; } + void Pause() override { inputs_[0]->PauseProducing(this, ++backpressure_counter_); } void Resume() override { inputs_[0]->ResumeProducing(this, ++backpressure_counter_); } diff --git a/cpp/src/arrow/compute/exec/source_node.cc b/cpp/src/arrow/compute/exec/source_node.cc index 1d51a5c1d28..fcf405c30a2 100644 --- a/cpp/src/arrow/compute/exec/source_node.cc +++ b/cpp/src/arrow/compute/exec/source_node.cc @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +#include #include #include @@ -47,17 +48,25 @@ namespace { struct SourceNode : ExecNode { SourceNode(ExecPlan* plan, std::shared_ptr output_schema, - AsyncGenerator> generator) + AsyncGenerator> generator, + std::vector asserted_ordering) : ExecNode(plan, {}, {}, std::move(output_schema), /*num_outputs=*/1), - generator_(std::move(generator)) {} + generator_(std::move(generator)) { + if (asserted_ordering.size() > 0) { + ordering_ = std::move(asserted_ordering); + } else { + ordering_ = {ExecNode::kImplicitOrderingColumn}; + } + } static Result Make(ExecPlan* plan, std::vector inputs, const ExecNodeOptions& options) { RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "SourceNode")); const auto& source_options = checked_cast(options); return plan->EmplaceNode(plan, source_options.output_schema, - source_options.generator); + source_options.generator, + source_options.asserted_ordering); } const char* kind_name() const override { return "SourceNode"; } @@ -203,6 +212,7 @@ struct SourceNode : ExecNode { void StopProducing(ExecNode* output) override { DCHECK_EQ(output, outputs_[0]); + std::cout << "Stopping source since downstream no longer needs data" << std::endl; StopProducing(); } @@ -214,6 +224,8 @@ struct SourceNode : ExecNode { } } + const std::vector& ordering() override { return ordering_; } + private: std::mutex mutex_; int32_t backpressure_counter_{0}; @@ -222,11 +234,13 @@ struct SourceNode : ExecNode { bool started_ = false; int batch_count_{0}; AsyncGenerator> generator_; + std::vector ordering_; }; struct TableSourceNode : public SourceNode { TableSourceNode(ExecPlan* plan, std::shared_ptr
table, int64_t batch_size) - : SourceNode(plan, table->schema(), TableGenerator(*table, batch_size)) {} + : SourceNode(plan, table->schema(), TableGenerator(*table, batch_size), + ExecNode::kImplicitOrdering) {} static Result Make(ExecPlan* plan, std::vector inputs, const ExecNodeOptions& options) { diff --git a/cpp/src/arrow/compute/exec/test_util.cc b/cpp/src/arrow/compute/exec/test_util.cc index 2abe6e9e029..573fa2dac76 100644 --- a/cpp/src/arrow/compute/exec/test_util.cc +++ b/cpp/src/arrow/compute/exec/test_util.cc @@ -78,6 +78,8 @@ struct DummyNode : ExecNode { void InputFinished(ExecNode* input, int total_batches) override {} + const std::vector& ordering() override { return ExecNode::kNoOrdering; } + Status StartProducing() override { if (start_producing_) { RETURN_NOT_OK(start_producing_(this)); @@ -219,7 +221,7 @@ BatchesWithSchema MakeNestedBatches() { } BatchesWithSchema MakeRandomBatches(const std::shared_ptr& schema, - int num_batches, int batch_size) { + int num_batches, int batch_size, bool ordered) { BatchesWithSchema out; random::RandomArrayGenerator rng(42); @@ -229,6 +231,11 @@ BatchesWithSchema MakeRandomBatches(const std::shared_ptr& schema, out.batches[i] = ExecBatch(*rng.BatchOf(schema->fields(), batch_size)); // add a tag scalar to ensure the batches are unique out.batches[i].values.emplace_back(i); + if (ordered) { + out.batches[i].index = i; + } else { + out.batches[i].index = ExecBatch::kNoOrdering; + } } out.schema = schema; diff --git a/cpp/src/arrow/compute/exec/test_util.h b/cpp/src/arrow/compute/exec/test_util.h index 5b6e8226b7e..ab954df1e2c 100644 --- a/cpp/src/arrow/compute/exec/test_util.h +++ b/cpp/src/arrow/compute/exec/test_util.h @@ -106,7 +106,8 @@ BatchesWithSchema MakeNestedBatches(); ARROW_TESTING_EXPORT BatchesWithSchema MakeRandomBatches(const std::shared_ptr& schema, - int num_batches = 10, int batch_size = 4); + int num_batches = 10, int batch_size = 4, + bool ordered = false); ARROW_TESTING_EXPORT BatchesWithSchema MakeBatchesFromString( diff --git a/cpp/src/arrow/compute/exec/tpch_node.cc b/cpp/src/arrow/compute/exec/tpch_node.cc index 40d44dccccf..28fc5767ec0 100644 --- a/cpp/src/arrow/compute/exec/tpch_node.cc +++ b/cpp/src/arrow/compute/exec/tpch_node.cc @@ -3374,6 +3374,8 @@ class TpchNode : public ExecNode { [[noreturn]] void InputFinished(ExecNode*, int) override { NoInputs(); } + const std::vector& ordering() override { return ExecNode::kImplicitOrdering; } + Status StartProducing() override { num_running_++; ARROW_RETURN_NOT_OK(generator_->StartProducing( diff --git a/cpp/src/arrow/compute/exec/union_node.cc b/cpp/src/arrow/compute/exec/union_node.cc index e5170c2bc91..9ea571a6969 100644 --- a/cpp/src/arrow/compute/exec/union_node.cc +++ b/cpp/src/arrow/compute/exec/union_node.cc @@ -57,6 +57,8 @@ class UnionNode : public ExecNode { const char* kind_name() const override { return "UnionNode"; } + const std::vector& ordering() override { return ExecNode::kNoOrdering; } + static Result Make(ExecPlan* plan, std::vector inputs, const ExecNodeOptions& options) { RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, static_cast(inputs.size()), diff --git a/cpp/src/arrow/compute/exec/util.cc b/cpp/src/arrow/compute/exec/util.cc index a34a9c62713..5997f2bd385 100644 --- a/cpp/src/arrow/compute/exec/util.cc +++ b/cpp/src/arrow/compute/exec/util.cc @@ -399,7 +399,7 @@ Status TableSinkNodeConsumer::Consume(ExecBatch batch) { } Future<> TableSinkNodeConsumer::Finish() { - ARROW_ASSIGN_OR_RAISE(*out_, Table::FromRecordBatches(batches_)); + 
ARROW_ASSIGN_OR_RAISE(*out_, Table::FromRecordBatches(schema_, batches_)); return Status::OK(); } diff --git a/cpp/src/arrow/dataset/CMakeLists.txt b/cpp/src/arrow/dataset/CMakeLists.txt index 2b164269ce6..261a52eb4f5 100644 --- a/cpp/src/arrow/dataset/CMakeLists.txt +++ b/cpp/src/arrow/dataset/CMakeLists.txt @@ -28,7 +28,8 @@ set(ARROW_DATASET_SRCS partition.cc plan.cc projector.cc - scanner.cc) + scanner.cc + scan_node.cc) set(ARROW_DATASET_STATIC_LINK_LIBS) set(ARROW_DATASET_SHARED_LINK_LIBS) diff --git a/cpp/src/arrow/dataset/dataset.cc b/cpp/src/arrow/dataset/dataset.cc index 6faaa953bb3..24912fd3591 100644 --- a/cpp/src/arrow/dataset/dataset.cc +++ b/cpp/src/arrow/dataset/dataset.cc @@ -24,6 +24,7 @@ #include "arrow/dataset/scanner.h" #include "arrow/table.h" #include "arrow/util/bit_util.h" +#include "arrow/util/byte_size.h" #include "arrow/util/iterator.h" #include "arrow/util/logging.h" #include "arrow/util/make_unique.h" @@ -34,11 +35,22 @@ using internal::checked_pointer_cast; namespace dataset { +const compute::Expression Fragment::kNoPartitionInformation = compute::literal(true); + Fragment::Fragment(compute::Expression partition_expression, std::shared_ptr physical_schema) : partition_expression_(std::move(partition_expression)), physical_schema_(std::move(physical_schema)) {} +Future> Fragment::InspectFragment() { + return Status::NotImplemented("Inspect fragment"); +} + +Future> Fragment::BeginScan( + const FragmentScanRequest& request, const InspectedFragment& inspected_fragment) { + return Status::NotImplemented("New scan method"); +} + Result> Fragment::ReadPhysicalSchema() { { auto lock = physical_schema_mutex_.Lock(); @@ -141,6 +153,37 @@ Future> InMemoryFragment::CountRows( return Future>::MakeFinished(total); } +Future> InMemoryFragment::InspectFragment() { + return std::make_shared(physical_schema_->field_names()); +} + +class InMemoryFragment::Scanner : public FragmentScanner { + public: + explicit Scanner(InMemoryFragment* fragment) : fragment_(fragment) {} + + Future> ScanBatch(int batch_number) override { + return Future>::MakeFinished( + fragment_->record_batches_[batch_number]); + } + + int64_t EstimatedDataBytes(int batch_number) override { + return arrow::util::TotalBufferSize(*fragment_->record_batches_[batch_number]); + } + + int NumBatches() override { + return static_cast(fragment_->record_batches_.size()); + } + + private: + InMemoryFragment* fragment_; +}; + +Future> InMemoryFragment::BeginScan( + const FragmentScanRequest& request, const InspectedFragment& inspected_fragment) { + return Future>::MakeFinished( + std::make_shared(this)); +} + Dataset::Dataset(std::shared_ptr schema, compute::Expression partition_expression) : schema_(std::move(schema)), partition_expression_(std::move(partition_expression)) {} @@ -238,5 +281,141 @@ Result UnionDataset::GetFragmentsImpl(compute::Expression pred return GetFragmentsFromDatasets(children_, predicate); } +namespace { + +class BasicFragmentEvolution : public FragmentEvolutionStrategy { + public: + BasicFragmentEvolution(std::vector ds_to_frag_map, Schema* dataset_schema) + : ds_to_frag_map(std::move(ds_to_frag_map)), dataset_schema(dataset_schema) {} + + Result GetGuarantee( + const std::vector& dataset_schema_selection) const override { + std::vector missing_fields; + for (const FieldPath& path : dataset_schema_selection) { + int top_level_field_idx = path[0]; + if (ds_to_frag_map[top_level_field_idx] < 0) { + missing_fields.push_back( + compute::is_null(compute::field_ref(top_level_field_idx))); + } + } + 
if (missing_fields.empty()) { + return compute::literal(true); + } + if (missing_fields.size() == 1) { + return missing_fields[0]; + } + return compute::and_(missing_fields); + } + + Result> DevolveSelection( + const std::vector& dataset_schema_selection) const override { + std::vector desired_columns; + for (std::size_t selection_idx = 0; selection_idx < dataset_schema_selection.size(); + selection_idx++) { + const FieldPath& path = dataset_schema_selection[selection_idx]; + int top_level_field_idx = path[0]; + int dest_top_level_idx = ds_to_frag_map[top_level_field_idx]; + if (dest_top_level_idx >= 0) { + ARROW_ASSIGN_OR_RAISE(std::shared_ptr field, path.Get(*dataset_schema)); + std::vector dest_path_indices(path.indices()); + dest_path_indices[0] = dest_top_level_idx; + desired_columns.push_back( + FragmentSelectionColumn{FieldPath(dest_path_indices), field->type().get(), + static_cast(selection_idx)}); + } + } + return std::move(desired_columns); + }; + + Result DevolveFilter( + const compute::Expression& filter) const override { + return compute::Modify( + filter, + [&](compute::Expression expr) -> Result { + const FieldRef* ref = expr.field_ref(); + if (ref) { + ARROW_ASSIGN_OR_RAISE(FieldPath path, ref->FindOne(*dataset_schema)); + int top_level_idx = path[0]; + std::vector modified_indices(path.indices()); + modified_indices[0] = ds_to_frag_map[top_level_idx]; + if (modified_indices[0] < 0) { + return Status::Invalid( + "Filter cannot be applied. It refers to a missing field ", + ref->ToString(), + " in a way that cannot be satisfied even though we know that field is " + "null"); + } + return compute::field_ref(FieldRef(std::move(modified_indices))); + } + return std::move(expr); + }, + [](compute::Expression expr, compute::Expression* old_expr) { return expr; }); + }; + + Result EvolveBatch( + const std::shared_ptr& batch, + const std::vector& dataset_selection, + const std::vector& selection) const override { + std::vector columns(dataset_selection.size()); + DCHECK_EQ(batch->num_columns(), static_cast(selection.size())); + // First go through and populate the columns we retrieved + for (int idx = 0; idx < batch->num_columns(); idx++) { + columns[selection[idx].selection_index] = batch->column(idx); + } + // Next go through and fill in the null columns + for (std::size_t idx = 0; idx < dataset_selection.size(); idx++) { + int top_level_idx = dataset_selection[idx][0]; + if (ds_to_frag_map[top_level_idx] < 0) { + columns[idx] = MakeNullScalar( + dataset_schema->field(static_cast(top_level_idx))->type()); + } + } + return compute::ExecBatch(columns, batch->num_rows()); + }; + + std::string ToString() const override { return "basic-fragment-evolution"; } + + std::vector ds_to_frag_map; + Schema* dataset_schema; + + static std::unique_ptr Make( + const std::shared_ptr& dataset_schema, + const std::vector& fragment_column_names) { + std::vector ds_to_frag_map; + std::unordered_map column_names_map; + for (size_t i = 0; i < fragment_column_names.size(); i++) { + column_names_map[fragment_column_names[i]] = static_cast(i); + } + for (int idx = 0; idx < dataset_schema->num_fields(); idx++) { + const std::string& field_name = dataset_schema->field(idx)->name(); + auto column_idx_itr = column_names_map.find(field_name); + if (column_idx_itr == column_names_map.end()) { + ds_to_frag_map.push_back(-1); + } else { + ds_to_frag_map.push_back(column_idx_itr->second); + } + } + return ::arrow::internal::make_unique( + std::move(ds_to_frag_map), dataset_schema.get()); + } +}; + +class 
BasicDatasetEvolutionStrategy : public DatasetEvolutionStrategy { + std::unique_ptr GetStrategy( + const Dataset& dataset, const Fragment& fragment, + const InspectedFragment& inspected_fragment) override { + return BasicFragmentEvolution::Make(dataset.schema(), + inspected_fragment.column_names); + } + + std::string ToString() const override { return "basic-dataset-evolution"; } +}; + +} // namespace + +std::unique_ptr MakeBasicDatasetEvolutionStrategy() { + return ::arrow::internal::make_unique(); +} + } // namespace dataset } // namespace arrow diff --git a/cpp/src/arrow/dataset/dataset.h b/cpp/src/arrow/dataset/dataset.h index 62181b60ba4..252e7ac05b3 100644 --- a/cpp/src/arrow/dataset/dataset.h +++ b/cpp/src/arrow/dataset/dataset.h @@ -29,6 +29,7 @@ #include "arrow/compute/exec/expression.h" #include "arrow/dataset/type_fwd.h" #include "arrow/dataset/visibility.h" +#include "arrow/util/future.h" #include "arrow/util/macros.h" #include "arrow/util/mutex.h" @@ -37,17 +38,91 @@ namespace dataset { using RecordBatchGenerator = std::function>()>; +/// \brief Description of a column to scan +struct FragmentSelectionColumn { + /// \brief The path to the column to load + FieldPath path; + /// \brief The type of the column in the dataset schema + /// + /// A format may choose to ignore this field completely. For example, when + /// reading from IPC the reader can just return the column in the data type + /// that is stored on disk. There is no point in doing anything special. + /// + /// However, some formats may be capable of casting on the fly. For example, + /// when reading from CSV, if we know the target type of the column, we can + /// convert from string to the target type as we read. + DataType* requested_type; + /// \brief The index in the output selection of this column + int selection_index; +}; +/// \brief Instructions for scanning a particular fragment +/// +/// The fragment scan request is dervied from ScanV2Options. The main +/// difference is that the scan options are based on the dataset schema +/// while the fragment request is based on the fragment schema. +struct FragmentScanRequest { + /// \brief A row filter + /// + /// The filter expression should be written against the fragment schema. + /// + /// \see ScanV2Options for details on how this filter should be applied + compute::Expression filter = compute::literal(true); + + /// \brief The columns to scan + /// + /// These indices refer to the fragment schema + /// + /// Note: This is NOT a simple list of top-level column indices. + /// For more details \see ScanV2Options + /// + /// If possible a fragment should only read from disk the data needed + /// to satisfy these columns. If a format cannot partially read a nested + /// column (e.g. JSON) then it must apply the column selection (in memory) + /// before returning the scanned batch. + std::vector columns; + /// \brief Options specific to the format being scanned + FragmentScanOptions* format_scan_options; +}; + +class FragmentScanner { + public: + /// This instance will only be destroyed after all ongoing scan futures + /// have been completed. 
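To make the FragmentScanRequest / FragmentSelectionColumn layout above concrete, here is a minimal hand-built request. This is only a sketch with an invented fragment layout ({x: int32, s: struct<a: int64>}); in this patch the request is normally produced by FragmentEvolutionStrategy::DevolveSelection rather than written by hand.

    // Scan "x" into output slot 0 and the nested "s.a" into output slot 1.
    std::shared_ptr<arrow::DataType> i32 = arrow::int32();
    std::shared_ptr<arrow::DataType> i64 = arrow::int64();
    arrow::dataset::FragmentScanRequest request;
    request.filter = arrow::compute::literal(true);  // no pushdown filter
    request.columns = {
        {arrow::FieldPath({0}), i32.get(), /*selection_index=*/0},      // x
        {arrow::FieldPath({1, 0}), i64.get(), /*selection_index=*/1}};  // s.a
    request.format_scan_options = nullptr;  // use the format's defaults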
+ /// + /// This means any callbacks created as part of the scan can safely + /// capture `this` + virtual ~FragmentScanner() = default; + /// \brief Scan a batch of data from the file + /// \param batch_number The index of the batch to read + virtual Future> ScanBatch(int batch_number) = 0; + /// \brief Calculate an estimate of how many data bytes the given batch will represent + /// + /// "Data bytes" should be the total size of all the buffers once the data has been + /// decoded into the Arrow format. + virtual int64_t EstimatedDataBytes(int batch_number) = 0; + /// \brief The number of batches in the fragment to scan + virtual int NumBatches() = 0; +}; + +struct InspectedFragment { + explicit InspectedFragment(std::vector column_names) + : column_names(std::move(column_names)) {} + std::vector column_names; +}; + /// \brief A granular piece of a Dataset, such as an individual file. /// /// A Fragment can be read/scanned separately from other fragments. It yields a -/// collection of RecordBatches when scanned, encapsulated in one or more -/// ScanTasks. +/// collection of RecordBatches when scanned /// /// Note that Fragments have well defined physical schemas which are reconciled by /// the Datasets which contain them; these physical schemas may differ from a parent /// Dataset's schema and the physical schemas of sibling Fragments. class ARROW_DS_EXPORT Fragment : public std::enable_shared_from_this { public: + /// \brief An expression that represents no known partition information + static const compute::Expression kNoPartitionInformation; + /// \brief Return the physical schema of the Fragment. /// /// The physical schema is also called the writer schema. @@ -59,6 +134,17 @@ class ARROW_DS_EXPORT Fragment : public std::enable_shared_from_this { virtual Result ScanBatchesAsync( const std::shared_ptr& options) = 0; + /// \brief Inspect a fragment to learn basic information + /// + /// This will be called before a scan and a fragment should attach whatever + /// information will be needed to figure out an evolution strategy. This information + /// will then be passed to the call to BeginScan + virtual Future> InspectFragment(); + + /// \brief Start a scan operation + virtual Future> BeginScan( + const FragmentScanRequest& request, const InspectedFragment& inspected_fragment); + /// \brief Count the number of rows in this fragment matching the filter using metadata /// only. That is, this method may perform I/O, but will not load data. /// @@ -113,6 +199,7 @@ class ARROW_DS_EXPORT FragmentScanOptions { /// RecordBatch. class ARROW_DS_EXPORT InMemoryFragment : public Fragment { public: + class Scanner; InMemoryFragment(std::shared_ptr schema, RecordBatchVector record_batches, compute::Expression = compute::literal(true)); explicit InMemoryFragment(RecordBatchVector record_batches, @@ -124,6 +211,11 @@ class ARROW_DS_EXPORT InMemoryFragment : public Fragment { compute::Expression predicate, const std::shared_ptr& options) override; + Future> InspectFragment() override; + Future> BeginScan( + const FragmentScanRequest& request, + const InspectedFragment& inspected_fragment) override; + std::string type_name() const override { return "in-memory"; } protected: @@ -134,6 +226,80 @@ class ARROW_DS_EXPORT InMemoryFragment : public Fragment { /// @} +/// \brief Rules for converting the dataset schema to and from fragment schemas +class ARROW_DS_EXPORT FragmentEvolutionStrategy { + public: + /// This instance will only be destroyed when all scan operations for the + /// fragment have completed. 
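The scanning entry points declared here are intended to be driven as inspect, then begin, then scan each batch. A deliberately synchronous sketch of that sequence (the ScanOneFragment helper below is illustrative only and ignores the readahead and throttling that the real scan node adds):

    arrow::Status ScanOneFragment(arrow::dataset::Fragment* fragment,
                                  const arrow::dataset::FragmentScanRequest& request) {
      // Block on each future for clarity; the scan node keeps all of this asynchronous.
      ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::InspectedFragment> inspected,
                            fragment->InspectFragment().result());
      ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::FragmentScanner> scanner,
                            fragment->BeginScan(request, *inspected).result());
      for (int i = 0; i < scanner->NumBatches(); i++) {
        ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::RecordBatch> batch,
                              scanner->ScanBatch(i).result());
        // ... hand `batch` to the evolution strategy and the rest of the pipeline ...
      }
      return arrow::Status::OK();
    }

The InspectedFragment produced by the first step is what the dataset's evolution strategy consumes to decide how the fragment schema maps onto the dataset schema.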
+ virtual ~FragmentEvolutionStrategy() = default; + /// \brief A guarantee that applies to all batches of this fragment + /// + /// For example, if a fragment is missing one of the fields in the dataset + /// schema then a typical evolution strategy is to set that field to null. + /// + /// So if the column at index 3 is missing then the guarantee is + /// FieldRef(3) == null + /// + /// Individual field guarantees should be AND'd together and returned + /// as a single expression. + virtual Result GetGuarantee( + const std::vector& dataset_schema_selection) const = 0; + + /// \brief Return a fragment schema selection given a dataset schema selection + /// + /// For example, if the user wants fields 2 & 4 of the dataset schema and + /// in this fragment the field 2 is missing and the field 4 is at index 1 then + /// this should return {1} + virtual Result> DevolveSelection( + const std::vector& dataset_schema_selection) const = 0; + + /// \brief Return a filter expression bound to the fragment schema given + /// a filter expression bound to the dataset schema + /// + /// The dataset scan filter will first be simplified by the guarantee returned + /// by GetGuarantee. This means an evolution that only handles dropping or casting + /// fields doesn't need to do anything here except return the given filter. + /// + /// On the other hand, an evolution that is doing some kind of aliasing will likely + /// need to convert field references in the filter to the aliased field references + /// where appropriate. + virtual Result DevolveFilter( + const compute::Expression& filter) const = 0; + + /// \brief Convert a batch from the fragment schema to the dataset schema + /// + /// Typically this involves casting columns from the data type stored on disk + /// to the data type of the dataset schema. For example, this fragment might + /// have columns stored as int32 and the dataset schema might have int64 for + /// the column. In this case we should cast the column from int32 to int64. + /// + /// Note: A fragment may perform this cast as the data is read from disk. In + /// that case a cast might not be needed. + virtual Result EvolveBatch( + const std::shared_ptr& batch, + const std::vector& dataset_selection, + const std::vector& selection) const = 0; + + /// \brief Return a string description of this strategy + virtual std::string ToString() const = 0; +}; + +/// \brief Lookup to create a FragmentEvolutionStrategy for a given fragment +class ARROW_DS_EXPORT DatasetEvolutionStrategy { + public: + virtual ~DatasetEvolutionStrategy() = default; + /// \brief Create a strategy for evolving from the given fragment + /// to the schema of the given dataset + virtual std::unique_ptr GetStrategy( + const Dataset& dataset, const Fragment& fragment, + const InspectedFragment& inspected_fragment) = 0; + + /// \brief Return a string description of this strategy + virtual std::string ToString() const = 0; +}; + +std::unique_ptr MakeBasicDatasetEvolutionStrategy(); + /// \brief A container of zero or more Fragments. /// /// A Dataset acts as a union of Fragments, e.g. 
files deeply nested in a @@ -166,6 +332,9 @@ class ARROW_DS_EXPORT Dataset : public std::enable_shared_from_this { virtual Result> ReplaceSchema( std::shared_ptr schema) const = 0; + /// \brief Rules used by this dataset to handle schema evolution + DatasetEvolutionStrategy* evolution_strategy() { return evolution_strategy_.get(); } + virtual ~Dataset() = default; protected: @@ -177,6 +346,8 @@ class ARROW_DS_EXPORT Dataset : public std::enable_shared_from_this { std::shared_ptr schema_; compute::Expression partition_expression_ = compute::literal(true); + std::unique_ptr evolution_strategy_ = + MakeBasicDatasetEvolutionStrategy(); }; /// \addtogroup dataset-implementations diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc index 23f4d09a9d2..2d50768ce37 100644 --- a/cpp/src/arrow/dataset/file_base.cc +++ b/cpp/src/arrow/dataset/file_base.cc @@ -475,8 +475,8 @@ class TeeNode : public compute::MapNode { TeeNode(compute::ExecPlan* plan, std::vector inputs, std::shared_ptr output_schema, std::unique_ptr dataset_writer, - FileSystemDatasetWriteOptions write_options, bool async_mode) - : MapNode(plan, std::move(inputs), std::move(output_schema), async_mode), + FileSystemDatasetWriteOptions write_options) + : MapNode(plan, std::move(inputs), std::move(output_schema)), dataset_writer_(std::move(dataset_writer)), write_options_(std::move(write_options)) { std::unique_ptr serial_throttle = @@ -505,8 +505,8 @@ class TeeNode : public compute::MapNode { internal::DatasetWriter::Make(write_options, plan->async_scheduler())); return plan->EmplaceNode(plan, std::move(inputs), std::move(schema), - std::move(dataset_writer), std::move(write_options), - /*async_mode=*/true); + std::move(dataset_writer), + std::move(write_options)); } const char* kind_name() const override { return "TeeNode"; } diff --git a/cpp/src/arrow/dataset/plan.cc b/cpp/src/arrow/dataset/plan.cc index 01169413f78..805fb25aa71 100644 --- a/cpp/src/arrow/dataset/plan.cc +++ b/cpp/src/arrow/dataset/plan.cc @@ -33,6 +33,7 @@ void Initialize() { auto registry = compute::default_exec_factory_registry(); if (registry) { InitializeScanner(registry); + InitializeScannerV2(registry); InitializeDatasetWriter(registry); } }); diff --git a/cpp/src/arrow/dataset/scan_node.cc b/cpp/src/arrow/dataset/scan_node.cc new file mode 100644 index 00000000000..78a37e0e609 --- /dev/null +++ b/cpp/src/arrow/dataset/scan_node.cc @@ -0,0 +1,421 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
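The DevolveFilter / GetGuarantee contract above relies on expression simplification: the dataset-level filter is first simplified with the fragment's guarantee, so clauses the guarantee already settles never reach the fragment. A small sketch of that interplay with invented column names (the helper below is not part of the patch):

    namespace cp = arrow::compute;

    arrow::Result<cp::Expression> SimplifiedFilterForExample() {
      auto dataset_schema = arrow::schema(
          {arrow::field("a", arrow::int32()), arrow::field("b", arrow::int32())});
      // Dataset-level filter: (a < 50) and (b > 40)
      cp::Expression filter = cp::and_(cp::less(cp::field_ref("a"), cp::literal(50)),
                                       cp::greater(cp::field_ref("b"), cp::literal(40)));
      ARROW_ASSIGN_OR_RAISE(filter, filter.Bind(*dataset_schema));
      // The basic evolution strategy guarantees that a missing column is null.
      cp::Expression guarantee = cp::is_null(cp::field_ref("a"));
      // Clauses fully determined by the guarantee can be dropped before the
      // remaining filter is devolved and handed to the fragment.
      return cp::SimplifyWithGuarantee(filter, guarantee);
    }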
+ +#include +#include +#include +#include +#include +#include +#include + +#include "arrow/compute/exec/exec_plan.h" +#include "arrow/compute/exec/expression.h" +#include "arrow/dataset/scanner.h" +#include "arrow/record_batch.h" +#include "arrow/result.h" +#include "arrow/status.h" +#include "arrow/type.h" +#include "arrow/util/checked_cast.h" +#include "arrow/util/make_unique.h" +#include "arrow/util/tracing_internal.h" +#include "arrow/util/unreachable.h" + +namespace cp = arrow::compute; + +namespace arrow { + +using internal::checked_cast; + +namespace dataset { + +namespace { + +Result> OutputSchemaFromOptions(const ScanV2Options& options) { + return FieldPath::GetAll(*options.dataset->schema(), options.columns); +} + +// In the future we should support async scanning of fragments. The +// Dataset class doesn't support this yet but we pretend it does here to +// ease future adoption of the feature. +AsyncGenerator> GetFragments(Dataset* dataset, + cp::Expression predicate) { + // In the future the dataset should be responsible for figuring out + // the I/O context. This will allow different I/O contexts to be used + // when scanning different datasets. For example, if we are scanning a + // union of a remote dataset and a local dataset. + const auto& io_context = io::default_io_context(); + auto io_executor = io_context.executor(); + Future> fragments_it_fut = + DeferNotOk(io_executor->Submit( + [dataset, predicate]() -> Result> { + ARROW_ASSIGN_OR_RAISE(FragmentIterator fragments_iter, + dataset->GetFragments(predicate)); + return std::make_shared(std::move(fragments_iter)); + })); + Future>> fragments_gen_fut = + fragments_it_fut.Then([](const std::shared_ptr& fragments_it) + -> Result>> { + ARROW_ASSIGN_OR_RAISE(std::vector> fragments, + fragments_it->ToVector()); + return MakeVectorGenerator(std::move(fragments)); + }); + return MakeFromFuture(std::move(fragments_gen_fut)); +} + +/// \brief A node that scans a dataset +/// +/// The scan node has three groups of io-tasks and one task. +/// +/// The first io-task (listing) fetches the fragments from the dataset. This may be a +/// simple iteration of paths or, if the dataset is described with wildcards, this may +/// involve I/O for listing and walking directory paths. There is one listing io-task per +/// dataset. +/// +/// Ths next step is to fetch the metadata for the fragment. For some formats (e.g. CSV) +/// this may be quite simple (get the size of the file). For other formats (e.g. parquet) +/// this is more involved and requires reading data. There is one metadata io-task per +/// fragment. The metadata io-task creates an AsyncGenerator from the +/// fragment. +/// +/// Once the metadata io-task is done we can issue read io-tasks. Each read io-task +/// requests a single batch of data from the disk by pulling the next Future from the +/// generator. +/// +/// Finally, when the future is fulfilled, we issue a pipeline task to drive the batch +/// through the pipeline. +/// +/// Most of these tasks are io-tasks. They take very few CPU resources and they run on +/// the I/O thread pool. These io-tasks are invisible to the exec plan and so we need to +/// do some custom scheduling. We limit how many fragments we read from at any one time. +/// This is referred to as "fragment readahead". +/// +/// Within a fragment there is usually also some amount of "row readahead". This row +/// readahead is handled by the fragment (and not the scanner) because the exact details +/// of how it is performed depend on the underlying format. 
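In addition to the task structure described above, each output batch is assigned a global batch index (see IndexBatch and AddScanTasks below) so that an implicit ordering survives the parallel reads. The per-fragment offsets are chained through futures because fragment batch counts become known at different times; a synchronous analogue of the arithmetic, shown with a hypothetical helper for illustration only:

    // offsets[i] is the global index of the first batch of fragment i;
    // batch j of fragment i is then assigned index offsets[i] + j.
    std::vector<int32_t> GlobalBatchOffsets(const std::vector<int32_t>& batches_per_fragment) {
      std::vector<int32_t> offsets(batches_per_fragment.size());
      int32_t running_total = 0;
      for (std::size_t i = 0; i < batches_per_fragment.size(); i++) {
        offsets[i] = running_total;
        running_total += batches_per_fragment[i];
      }
      return offsets;
    }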
+/// +/// When a scan node is aborted (StopProducing) we send a cancel signal to any active +/// fragments. On destruction we continue consuming the fragments until they complete +/// (which should be fairly quick since we cancelled the fragment). This ensures the +/// I/O work is completely finished before the node is destroyed. +class ScanNode : public cp::ExecNode { + public: + ScanNode(cp::ExecPlan* plan, ScanV2Options options, + std::shared_ptr output_schema) + : cp::ExecNode(plan, {}, {}, std::move(output_schema), + /*num_outputs=*/1), + options_(options), + fragments_throttle_( + util::AsyncTaskScheduler::MakeThrottle(options_.fragment_readahead + 1)), + batches_throttle_( + util::AsyncTaskScheduler::MakeThrottle(options_.target_bytes_readahead + 1)) { + if (options.asserted_ordering.empty()) { + ordering_ = ExecNode::kImplicitOrdering; + } else { + ordering_ = options.asserted_ordering; + } + } + + static Result NormalizeAndValidate(const ScanV2Options& options, + compute::ExecContext* ctx) { + ScanV2Options normalized(options); + if (!normalized.dataset) { + return Status::Invalid("Scan options must include a dataset"); + } + + if (options.fragment_readahead < 0) { + return Status::Invalid( + "Fragment readahead may not be less than 0. Set to 0 to disable readahead"); + } + + if (options.target_bytes_readahead < 0) { + return Status::Invalid( + "Batch readahead may not be less than 0. Set to 0 to disable readahead"); + } + + if (!normalized.filter.is_valid()) { + normalized.filter = compute::literal(true); + } + + if (normalized.filter.call() && normalized.filter.IsBound()) { + // There is no easy way to make sure a filter was bound against the same + // function registry as the one in ctx so we just require it to be unbound + // FIXME - Do we care if it was bound to a different function registry?
+ return Status::Invalid("Scan filter must be unbound"); + } else if (!normalized.filter.IsBound()) { + ARROW_ASSIGN_OR_RAISE(normalized.filter, + normalized.filter.Bind(*options.dataset->schema(), ctx)); + } // Else we must have some simple filter like literal(true) which might be bound + // but we don't care + + return std::move(normalized); + } + + static Result Make(cp::ExecPlan* plan, std::vector inputs, + const cp::ExecNodeOptions& options) { + RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "ScanNode")); + const auto& scan_options = checked_cast(options); + ARROW_ASSIGN_OR_RAISE(ScanV2Options normalized_options, + NormalizeAndValidate(scan_options, plan->exec_context())); + ARROW_ASSIGN_OR_RAISE(std::shared_ptr output_schema, + OutputSchemaFromOptions(normalized_options)); + return plan->EmplaceNode(plan, std::move(normalized_options), + std::move(output_schema)); + } + + const char* kind_name() const override { return "ScanNode"; } + + [[noreturn]] static void NoInputs() { + Unreachable("no inputs; this should never be called"); + } + [[noreturn]] void InputReceived(cp::ExecNode*, cp::ExecBatch) override { NoInputs(); } + [[noreturn]] void ErrorReceived(cp::ExecNode*, Status) override { NoInputs(); } + [[noreturn]] void InputFinished(cp::ExecNode*, int) override { NoInputs(); } + + const std::vector& ordering() override { return ordering_; } + + Status Init() override { + // batch_output_ = + // ::arrow::internal::make_unique(this, + // outputs_[0]); + return Status::OK(); + } + + struct ScanState { + std::mutex mutex; + std::shared_ptr fragment_scanner; + std::unique_ptr fragment_evolution; + FragmentScanRequest scan_request; + }; + + struct ScanBatchTask : util::AsyncTaskScheduler::Task { + ScanBatchTask(ScanNode* node, ScanState* scan_state, int batch_index, + int fragment_index) + : node_(node), + scan_(scan_state), + batch_index_(batch_index), + fragment_index_(fragment_index) { + int64_t cost = scan_state->fragment_scanner->EstimatedDataBytes(batch_index_); + // It's possible, though probably a bad idea, for a single batch of a fragment + // to be larger than 2GiB. In that case, it doesn't matter much if we underestimate + // because the largest the throttle can be is 2GiB and thus we will be in "one batch + // at a time" mode anyways which is the best we can do in this case. 
+ cost_ = static_cast( + std::min(cost, static_cast(std::numeric_limits::max()))); + } + + struct IndexedBatch { + int32_t index; + std::shared_ptr batch; + }; + + Result> operator()(util::AsyncTaskScheduler* scheduler) override { + // Prevent concurrent calls to ScanBatch which might not be thread safe + std::lock_guard lk(scan_->mutex); + return scan_->fragment_scanner->ScanBatch(batch_index_) + .Then([this](const std::shared_ptr& batch) { + return IndexBatch(batch); + }) + .Then([this](const IndexedBatch& batch) { return HandleBatch(batch); }); + } + + Future IndexBatch(const std::shared_ptr& batch) { + if (fragment_index_ == 0) { + return IndexedBatch{batch_index_, batch}; + } else { + return node_->frag_idx_to_batch_offset_[fragment_index_ - 1].Then( + [this, batch](int32_t frag_offset) { + return IndexedBatch{batch_index_ + frag_offset, batch}; + }); + } + } + + Status HandleBatch(const IndexedBatch& indexed_batch) { + ARROW_ASSIGN_OR_RAISE( + compute::ExecBatch evolved_batch, + scan_->fragment_evolution->EvolveBatch( + indexed_batch.batch, node_->options_.columns, scan_->scan_request.columns)); + evolved_batch.index = indexed_batch.index; + node_->outputs_[0]->InputReceived(node_, std::move(evolved_batch)); + return Status::OK(); + } + + int cost() const override { return cost_; } + + ScanNode* node_; + ScanState* scan_; + int batch_index_; + int fragment_index_; + int cost_; + }; + + struct ListFragmentTask : util::AsyncTaskScheduler::Task { + ListFragmentTask(ScanNode* node, std::shared_ptr fragment, + int32_t fragment_index) + : node(node), fragment(std::move(fragment)), fragment_index_(fragment_index) {} + + Result> operator()(util::AsyncTaskScheduler* scheduler) override { + return fragment->InspectFragment().Then( + [this, + scheduler](const std::shared_ptr& inspected_fragment) { + return BeginScan(inspected_fragment, scheduler); + }); + } + + Future<> BeginScan(const std::shared_ptr& inspected_fragment, + util::AsyncTaskScheduler* scan_scheduler) { + // Now that we have an inspected fragment we need to use the dataset's evolution + // strategy to figure out how to scan it + scan_state->fragment_evolution = + node->options_.dataset->evolution_strategy()->GetStrategy( + *node->options_.dataset, *fragment, *inspected_fragment); + ARROW_RETURN_NOT_OK(InitFragmentScanRequest()); + return fragment->BeginScan(scan_state->scan_request, *inspected_fragment) + .Then([this, scan_scheduler]( + const std::shared_ptr& fragment_scanner) { + return AddScanTasks(fragment_scanner, scan_scheduler); + }); + } + + Future<> AddScanTasks(const std::shared_ptr& fragment_scanner, + util::AsyncTaskScheduler* scan_scheduler) { + int32_t num_batches = fragment_scanner->NumBatches(); + if (fragment_index_ == 0) { + node->frag_idx_to_batch_offset_[0].MarkFinished(num_batches); + } else { + node->frag_idx_to_batch_offset_[fragment_index_ - 1].AddCallback( + [this, fragment_index = fragment_index_, + num_batches](Result prev_offset) { + DCHECK_OK(prev_offset.status()); + node->frag_idx_to_batch_offset_[fragment_index].MarkFinished(num_batches + + *prev_offset); + }); + } + scan_state->fragment_scanner = fragment_scanner; + ScanState* state_view = scan_state.get(); + // Finish callback keeps the scan state alive until all scan tasks done + struct StateHolder { + Status operator()() { return Status::OK(); } + std::unique_ptr scan_state; + }; + util::AsyncTaskScheduler* frag_scheduler = scan_scheduler->MakeSubScheduler( + StateHolder{std::move(scan_state)}, node->batches_throttle_.get()); + for (int i = 0; i < 
fragment_scanner->NumBatches(); i++) { + node->num_batches_.fetch_add(1); + frag_scheduler->AddTask( + std::make_unique(node, state_view, i, fragment_index_)); + } + Future<> list_and_scan_node = frag_scheduler->OnFinished(); + frag_scheduler->End(); + // The "list fragments" task doesn't actually end until the fragments are + // all scanned. This allows us to enforce fragment readahead. + return list_and_scan_node; + } + + // Take the dataset options, and the fragment evolution, and figure out exactly how + // we should scan the fragment itself. + Status InitFragmentScanRequest() { + ARROW_ASSIGN_OR_RAISE( + scan_state->scan_request.columns, + scan_state->fragment_evolution->DevolveSelection(node->options_.columns)); + ARROW_ASSIGN_OR_RAISE( + compute::Expression devolution_guarantee, + scan_state->fragment_evolution->GetGuarantee(node->options_.columns)); + ARROW_ASSIGN_OR_RAISE( + compute::Expression simplified_filter, + compute::SimplifyWithGuarantee(node->options_.filter, devolution_guarantee)); + ARROW_ASSIGN_OR_RAISE( + scan_state->scan_request.filter, + scan_state->fragment_evolution->DevolveFilter(std::move(simplified_filter))); + scan_state->scan_request.format_scan_options = node->options_.format_options; + return Status::OK(); + } + + ScanNode* node; + std::shared_ptr fragment; + int32_t fragment_index_; + std::unique_ptr scan_state = arrow::internal::make_unique(); + }; + + Status StartProducing() override { + START_COMPUTE_SPAN(span_, std::string(kind_name()) + ":" + label(), + {{"node.kind", kind_name()}, + {"node.label", label()}, + {"node.output_schema", output_schema()->ToString()}, + {"node.detail", ToString()}}); + END_SPAN_ON_FUTURE_COMPLETION(span_, finished_); + AsyncGenerator> frag_gen = + GetFragments(options_.dataset.get(), options_.filter); + util::AsyncTaskScheduler* scan_scheduler = plan_->async_scheduler()->MakeSubScheduler( + [this]() { + outputs_[0]->InputFinished(this, num_batches_.load()); + finished_.MarkFinished(); + return Status::OK(); + }, + fragments_throttle_.get()); + plan_->async_scheduler()->AddAsyncGenerator>( + std::move(frag_gen), + [this, scan_scheduler](const std::shared_ptr& fragment) { + frag_idx_to_batch_offset_.push_back(Future::Make()); + int32_t frag_idx = static_cast(frag_idx_to_batch_offset_.size()) - 1; + scan_scheduler->AddTask( + arrow::internal::make_unique(this, fragment, frag_idx)); + return Status::OK(); + }, + [scan_scheduler]() { + scan_scheduler->End(); + return Status::OK(); + }); + return Status::OK(); + } + + void PauseProducing(ExecNode* output, int32_t counter) override { + // FIXME(TODO) + // Need to ressurect AsyncToggle and then all fragment scanners + // should share the same toggle + } + + void ResumeProducing(ExecNode* output, int32_t counter) override { + // FIXME(TODO) + } + + void StopProducing(ExecNode* output) override { + DCHECK_EQ(output, outputs_[0]); + std::cout << "Would stop producing early because requested by upstream" << std::endl; + StopProducing(); + } + + void StopProducing() override {} + + private: + ScanV2Options options_; + std::vector ordering_; + std::atomic num_batches_{0}; + std::vector> frag_idx_to_batch_offset_; + std::unique_ptr fragments_throttle_; + std::unique_ptr batches_throttle_; +}; + +} // namespace + +namespace internal { +void InitializeScannerV2(arrow::compute::ExecFactoryRegistry* registry) { + DCHECK_OK(registry->AddFactory("scan2", ScanNode::Make)); +} +} // namespace internal +} // namespace dataset +} // namespace arrow diff --git a/cpp/src/arrow/dataset/scanner.cc 
b/cpp/src/arrow/dataset/scanner.cc index eb09a986c97..4a2d1d2ce8b 100644 --- a/cpp/src/arrow/dataset/scanner.cc +++ b/cpp/src/arrow/dataset/scanner.cc @@ -21,6 +21,7 @@ #include #include #include +#include #include #include "arrow/array/array_primitive.h" @@ -67,6 +68,14 @@ std::vector ScanOptions::MaterializedFields() const { return fields; } +std::vector ScanV2Options::AllColumns(const Dataset& dataset) { + std::vector selection(dataset.schema()->num_fields()); + for (std::size_t i = 0; i < selection.size(); i++) { + selection[i] = {static_cast(i)}; + } + return selection; +} + namespace { class ScannerRecordBatchReader : public RecordBatchReader { public: @@ -926,7 +935,8 @@ Result MakeScanNode(compute::ExecPlan* plan, ARROW_ASSIGN_OR_RAISE( std::optional batch, compute::MakeExecBatch(*scan_options->dataset_schema, - partial.record_batch.value, guarantee)); + partial.record_batch.value, + compute::ExecBatch::kNoOrdering, guarantee)); // tag rows with fragment- and batch-of-origin batch->values.emplace_back(partial.fragment.index); diff --git a/cpp/src/arrow/dataset/scanner.h b/cpp/src/arrow/dataset/scanner.h index 646cc0de72e..9536972b498 100644 --- a/cpp/src/arrow/dataset/scanner.h +++ b/cpp/src/arrow/dataset/scanner.h @@ -54,6 +54,7 @@ constexpr int64_t kDefaultBatchSize = 1 << 17; // 128Ki rows // This will yield 64 batches ~ 8Mi rows constexpr int32_t kDefaultBatchReadahead = 16; constexpr int32_t kDefaultFragmentReadahead = 4; +constexpr int32_t kDefaultBytesReadahead = 1 << 25; // 32MiB /// Scan-specific options, which can be changed between scans of the same dataset. struct ARROW_DS_EXPORT ScanOptions { @@ -137,6 +138,122 @@ struct ARROW_DS_EXPORT ScanOptions { compute::BackpressureOptions::DefaultBackpressure(); }; +/// Scan-specific options, which can be changed between scans of the same dataset. +/// +/// A dataset consists of one or more individual fragments. A fragment is anything +/// that is indepedently scannable, often a file. +/// +/// Batches from all fragments will be converted to a single schema. This unified +/// schema is referred to as the "dataset schema" and is the output schema for +/// this node. +/// +/// Individual fragments may have schemas that are different from the dataset +/// schema. This is sometimes referred to as the physical or fragment schema. +/// Conversion from the fragment schema to the dataset schema is a process +/// known as evolution. +struct ARROW_DS_EXPORT ScanV2Options : public compute::ExecNodeOptions { + explicit ScanV2Options(std::shared_ptr dataset) + : dataset(std::move(dataset)) {} + + /// \brief The dataset to scan + std::shared_ptr dataset; + /// \brief A row filter + /// + /// The filter expression should be written against the dataset schema. + /// The filter must be unbound. + /// + /// This is an opportunistic pushdown filter. Filtering capabilities will + /// vary between formats. If a format is not capable of applying the filter + /// then it will ignore it. + /// + /// Each fragment will do its best to filter the data based on the information + /// (partitioning guarantees, statistics) available to it. If it is able to + /// apply some filtering then it will indicate what filtering it was able to + /// apply by attaching a guarantee to the batch. + /// + /// For example, if a filter is x < 50 && y > 40 then a batch may be able to + /// apply a guarantee x < 50. Post-scan filtering would then only need to + /// consider y > 40 (for this specific batch). 
The next batch may not be able + /// to attach any guarantee and both clauses would need to be applied to that batch. + /// + /// A single guarantee-aware filtering operation should generally be applied to all + /// resulting batches. The scan node is not responsible for this. + compute::Expression filter = compute::literal(true); + + /// \brief The columns to scan + /// + /// This is not a simple list of top-level column indices but instead a set of paths + /// allowing for partial selection of columns + /// + /// These paths refer to the dataset schema + /// + /// For example, consider the following dataset schema: + /// schema({ + /// field("score", int32()), + /// field("marker", struct_({ + /// field("color", utf8()), + /// field("location", struct_({ + /// field("x", float64()), + /// field("y", float64()) + /// }) + /// }) + /// }) + /// + /// If `columns` is {{0}, {1,1,0}} then the output schema is: + /// schema({field("score", int32()), field("x", float64())}) + /// + /// If `columns` is {{1,1,1}, {1,1}} then the output schema is: + /// schema({ + /// field("y", float64()), + /// field("location", struct_({ + /// field("x", float64()), + /// field("y", float64()) + /// }) + /// }) + std::vector columns; + + /// \brief Target number of bytes to read ahead in a fragment + /// + /// This limit involves some amount of estimation. Formats typically only know + /// batch boundaries in terms of rows (not decoded bytes) and so an estimation + /// must be done to guess the average row size. Other formats like CSV and JSON + /// must make even more generalized guesses. + /// + /// This is a best-effort guide. Some formats may need to read ahead further, + /// for example, if scanning a parquet file that has batches with 100MiB of data + /// then the actual readahead will be at least 100MiB + /// + /// Set to 0 to disable readahead. When disabled, the scanner will read the + /// dataset one batch at a time + /// + /// This limit applies across all fragments. If the limit is 32MiB and the + /// fragment readahead allows for 20 fragments to be read at once then the + /// total readahead will still be 32MiB and NOT 20 * 32MiB. + int32_t target_bytes_readahead = kDefaultBytesReadahead; + + /// \brief Number of fragments to read ahead + /// + /// Higher readahead will potentially lead to more efficient I/O but will lead + /// to the scan operation using more RAM. The default is fairly conservative + /// and designed for fast local disks (or slow local spinning disks which cannot + /// handle much parallelism anyways). When using a highly parallel remote filesystem + /// you will likely want to increase these values. + /// + /// Set to 0 to disable fragment readahead. When disabled the dataset will be scanned + /// one fragment at a time.
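A quick illustration of the path-based `columns` selection documented above, using the same example schema (the empty in-memory dataset is just a stand-in for illustration):

    auto dataset_schema = arrow::schema(
        {arrow::field("score", arrow::int32()),
         arrow::field("marker",
                      arrow::struct_(
                          {arrow::field("color", arrow::utf8()),
                           arrow::field("location",
                                        arrow::struct_({arrow::field("x", arrow::float64()),
                                                        arrow::field("y", arrow::float64())}))}))});
    auto my_dataset = std::make_shared<arrow::dataset::InMemoryDataset>(
        dataset_schema, arrow::RecordBatchVector{});
    arrow::dataset::ScanV2Options options(my_dataset);
    options.columns = {arrow::FieldPath({0}),         // score
                       arrow::FieldPath({1, 1, 0})};  // marker.location.x
    // ...or select every column in the dataset:
    options.columns = arrow::dataset::ScanV2Options::AllColumns(*my_dataset);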
+ int32_t fragment_readahead = kDefaultFragmentReadahead; + /// \brief Options specific to the file format + FragmentScanOptions* format_options; + + /// \brief Utility method to get a selection representing all columns in a dataset + static std::vector AllColumns(const Dataset& dataset); + + /// \brief The ordering of the data + /// + /// \see SourceNode for details on how this assertion behaves + std::vector asserted_ordering; +}; + /// \brief Describes a projection struct ARROW_DS_EXPORT ProjectionDescr { /// \brief The projection expression itself @@ -442,6 +559,7 @@ class ARROW_DS_EXPORT ScanNodeOptions : public compute::ExecNodeOptions { namespace internal { ARROW_DS_EXPORT void InitializeScanner(arrow::compute::ExecFactoryRegistry* registry); +ARROW_DS_EXPORT void InitializeScannerV2(arrow::compute::ExecFactoryRegistry* registry); } // namespace internal } // namespace dataset } // namespace arrow diff --git a/cpp/src/arrow/dataset/scanner_benchmark.cc b/cpp/src/arrow/dataset/scanner_benchmark.cc index b0254089a95..448b40bf158 100644 --- a/cpp/src/arrow/dataset/scanner_benchmark.cc +++ b/cpp/src/arrow/dataset/scanner_benchmark.cc @@ -96,9 +96,7 @@ std::shared_ptr GetSchema() { size_t GetBytesForSchema() { return sizeof(int32_t) + sizeof(bool); } -void MinimalEndToEndScan(size_t num_batches, size_t batch_size, bool async_mode) { - // NB: This test is here for didactic purposes - +void MinimalEndToEndScan(size_t num_batches, size_t batch_size) { // Specify a MemoryPool and ThreadPool for the ExecPlan compute::ExecContext exec_context(default_memory_pool(), ::arrow::internal::GetCpuThreadPool()); @@ -120,7 +118,7 @@ void MinimalEndToEndScan(size_t num_batches, size_t batch_size, bool async_mode) auto options = std::make_shared(); // specify the filter - compute::Expression b_is_true = field_ref("b"); + compute::Expression b_is_true = equal(field_ref("b"), literal(true)); options->filter = b_is_true; // for now, specify the projection as the full project expression (eventually this can // just be a list of materialized field names) @@ -134,10 +132,9 @@ void MinimalEndToEndScan(size_t num_batches, size_t batch_size, bool async_mode) compute::MakeExecNode("scan", plan.get(), {}, ScanNodeOptions{dataset, options})); // pipe the scan node into a filter node - ASSERT_OK_AND_ASSIGN( - compute::ExecNode * filter, - compute::MakeExecNode("filter", plan.get(), {scan}, - compute::FilterNodeOptions{b_is_true, async_mode})); + ASSERT_OK_AND_ASSIGN(compute::ExecNode * filter, + compute::MakeExecNode("filter", plan.get(), {scan}, + compute::FilterNodeOptions{b_is_true})); // pipe the filter node into a project node // NB: we're using the project node factory which preserves fragment/batch index @@ -146,7 +143,7 @@ void MinimalEndToEndScan(size_t num_batches, size_t batch_size, bool async_mode) ASSERT_OK_AND_ASSIGN( compute::ExecNode * project, compute::MakeExecNode("augmented_project", plan.get(), {filter}, - compute::ProjectNodeOptions{{a_times_2}, {}, async_mode})); + compute::ProjectNodeOptions{{a_times_2}, {}})); // finally, pipe the project node into a sink node AsyncGenerator> sink_gen; @@ -172,37 +169,157 @@ void MinimalEndToEndScan(size_t num_batches, size_t batch_size, bool async_mode) ASSERT_TRUE(plan->finished().Wait(/*seconds=*/1)) << "ExecPlan didn't finish within 1s"; } +void ScanOnly( + size_t num_batches, size_t batch_size, const std::string& factory_name, + std::function>(size_t, size_t)> + options_factory) { + compute::ExecContext exec_context(default_memory_pool(), + 
::arrow::internal::GetCpuThreadPool()); + + // ensure arrow::dataset node factories are in the registry + ::arrow::dataset::internal::Initialize(); + + ASSERT_OK_AND_ASSIGN(std::shared_ptr plan, + compute::ExecPlan::Make(&exec_context)); + + RecordBatchVector batches = GetBatches(num_batches, batch_size); + + std::shared_ptr dataset = + std::make_shared(GetSchema(), batches); + + ASSERT_OK_AND_ASSIGN(std::shared_ptr node_options, + options_factory(num_batches, batch_size)); + + // construct the plan + ASSERT_OK_AND_ASSIGN( + compute::ExecNode * scan, + compute::MakeExecNode(factory_name, plan.get(), {}, *node_options)); + AsyncGenerator> sink_gen; + ASSERT_OK_AND_ASSIGN(compute::ExecNode * sink, + compute::MakeExecNode("sink", plan.get(), {scan}, + compute::SinkNodeOptions{&sink_gen})); + + ASSERT_NE(sink, nullptr); + + // translate sink_gen (async) to sink_reader (sync) + std::shared_ptr sink_reader = compute::MakeGeneratorReader( + schema({field("a * 2", int32())}), std::move(sink_gen), exec_context.memory_pool()); + + // start the ExecPlan + ASSERT_OK(plan->StartProducing()); + + // collect sink_reader into a Table + ASSERT_OK_AND_ASSIGN(auto collected, Table::FromRecordBatchReader(sink_reader.get())); + + ASSERT_GT(collected->num_rows(), 0); + + // wait 1s for completion + ASSERT_TRUE(plan->finished().Wait(/*seconds=*/1)) << "ExecPlan didn't finish within 1s"; +} + static void MinimalEndToEndBench(benchmark::State& state) { size_t num_batches = state.range(0); size_t batch_size = state.range(1); - bool async_mode = state.range(2); for (auto _ : state) { - MinimalEndToEndScan(num_batches, batch_size, async_mode); + MinimalEndToEndScan(num_batches, batch_size); } state.SetItemsProcessed(state.iterations() * num_batches); state.SetBytesProcessed(state.iterations() * num_batches * batch_size * GetBytesForSchema()); } -static const std::vector kWorkload = {100, 1000, 10000, 100000}; +const std::function>(size_t, size_t)> + kScanFactory = [](size_t num_batches, size_t batch_size) { + RecordBatchVector batches = GetBatches(num_batches, batch_size); + std::shared_ptr dataset = + std::make_shared(GetSchema(), std::move(batches)); + + std::shared_ptr options = std::make_shared(); + // specify the filter + compute::Expression b_is_true = equal(field_ref("b"), literal(true)); + options->filter = b_is_true; + options->projection = call("make_struct", {field_ref("a"), field_ref("b")}, + compute::MakeStructOptions{{"a", "b"}}); + + return std::make_shared(std::move(dataset), std::move(options)); + }; + +const std::function>(size_t, size_t)> + kScanV2Factory = + [](size_t num_batches, + size_t batch_size) -> Result> { + RecordBatchVector batches = GetBatches(num_batches, batch_size); + std::shared_ptr sch = GetSchema(); + std::shared_ptr dataset = + std::make_shared(sch, std::move(batches)); + + std::shared_ptr options = std::make_shared(dataset); + // specify the filter + compute::Expression b_is_true = equal(field_ref("b"), literal(true)); + options->filter = b_is_true; + options->columns = ScanV2Options::AllColumns(*dataset); + + return options; +}; + +static constexpr int kScanIdx = 0; +static constexpr int kScanV2Idx = 1; + +static void ScanOnlyBench(benchmark::State& state) { + size_t num_batches = state.range(0); + size_t batch_size = state.range(1); + + std::function>(size_t, size_t)> + options_factory; + std::string scan_factory = "scan"; + if (state.range(2) == kScanIdx) { + options_factory = kScanFactory; + } else if (state.range(2) == kScanV2Idx) { + options_factory = kScanV2Factory; + 
scan_factory = "scan2"; + } + + for (auto _ : state) { + ScanOnly(num_batches, batch_size, scan_factory, options_factory); + } + state.SetItemsProcessed(state.iterations() * num_batches); + state.SetBytesProcessed(state.iterations() * num_batches * batch_size * + GetBytesForSchema()); +} static void MinimalEndToEnd_Customize(benchmark::internal::Benchmark* b) { - for (const int32_t num_batches : kWorkload) { + for (const int32_t num_batches : {1000}) { + for (const int batch_size : {10, 100, 1000}) { + b->Args({num_batches, batch_size}); + RecordBatchVector batches = + ::arrow::compute::GenerateBatches(GetSchema(), num_batches, batch_size); + StoreBatches(num_batches, batch_size, batches); + } + } + b->ArgNames({"num_batches", "batch_size"}); + b->UseRealTime(); +} + +// FIXME - Combine these two customize blocks by moving the end-to-end to support +// options factories +static void ScanOnlyEndToEnd_Customize(benchmark::internal::Benchmark* b) { + for (const int32_t num_batches : {1000}) { for (const int batch_size : {10, 100, 1000}) { - for (const bool async_mode : {true, false}) { - b->Args({num_batches, batch_size, async_mode}); + for (const int scan_idx : {kScanIdx, kScanV2Idx}) { + b->Args({num_batches, batch_size, scan_idx}); RecordBatchVector batches = ::arrow::compute::GenerateBatches(GetSchema(), num_batches, batch_size); StoreBatches(num_batches, batch_size, batches); } } } - b->ArgNames({"num_batches", "batch_size", "async_mode"}); + b->ArgNames({"num_batches", "batch_size", "scan_alg"}); b->UseRealTime(); } BENCHMARK(MinimalEndToEndBench)->Apply(MinimalEndToEnd_Customize); +BENCHMARK(ScanOnlyBench)->Apply(ScanOnlyEndToEnd_Customize)->Iterations(100); } // namespace dataset } // namespace arrow diff --git a/cpp/src/arrow/dataset/scanner_test.cc b/cpp/src/arrow/dataset/scanner_test.cc index 0768014b862..344b982b7d2 100644 --- a/cpp/src/arrow/dataset/scanner_test.cc +++ b/cpp/src/arrow/dataset/scanner_test.cc @@ -28,6 +28,8 @@ #include "arrow/compute/cast.h" #include "arrow/compute/exec/exec_plan.h" #include "arrow/compute/exec/expression_internal.h" +#include "arrow/compute/exec/test_util.h" +#include "arrow/dataset/dataset_internal.h" #include "arrow/dataset/plan.h" #include "arrow/dataset/test_util.h" #include "arrow/record_batch.h" @@ -39,6 +41,7 @@ #include "arrow/testing/gtest_util.h" #include "arrow/testing/matchers.h" #include "arrow/testing/util.h" +#include "arrow/util/byte_size.h" #include "arrow/util/range.h" #include "arrow/util/thread_pool.h" #include "arrow/util/vector.h" @@ -54,6 +57,777 @@ using internal::Iota; namespace dataset { +// The basic evolution strategy doesn't really need any info from the dataset +// or the fragment other than the schema so we just make a dummy dataset/fragment +// here. 
+std::unique_ptr MakeDatasetFromSchema(std::shared_ptr sch) { + return ::arrow::internal::make_unique(std::move(sch), + RecordBatchVector{}); +} + +std::unique_ptr MakeSomeFragment(std::shared_ptr sch) { + return ::arrow::internal::make_unique(std::move(sch), + RecordBatchVector{}); +} + +TEST(BasicEvolution, MissingColumn) { + std::unique_ptr strategy = + MakeBasicDatasetEvolutionStrategy(); + + std::shared_ptr dataset_schema = + schema({field("A", int32()), field("B", int16()), field("C", int64())}); + std::unique_ptr dataset = MakeDatasetFromSchema(dataset_schema); + std::unique_ptr fragment = MakeSomeFragment(std::move(dataset_schema)); + + InspectedFragment inspected{{"A", "B"}}; + std::unique_ptr fragment_strategy = + strategy->GetStrategy(*dataset, *fragment, inspected); + + compute::Expression filter = equal(field_ref("C"), literal(INT64_C(7))); + // If, after simplification, a filter somehow still references a missing field + // then it is an error. + ASSERT_RAISES(Invalid, fragment_strategy->DevolveFilter(filter)); + std::vector selection{FieldPath({0}), FieldPath({2})}; + // Basic strategy should provide is_null guarantee for missing fields + compute::Expression expected_guarantee = is_null(field_ref(2)); + ASSERT_OK_AND_ASSIGN(compute::Expression guarantee, + fragment_strategy->GetGuarantee(selection)); + ASSERT_EQ(expected_guarantee, guarantee); + + // Basic strategy should drop missing fields from selection + ASSERT_OK_AND_ASSIGN(std::vector devolved_selection, + fragment_strategy->DevolveSelection(selection)); + ASSERT_EQ(1, devolved_selection.size()); + ASSERT_EQ(FieldPath({0}), devolved_selection[0].path); + ASSERT_EQ(*int32(), *devolved_selection[0].requested_type); + + // Basic strategy should append null column to batches for missing column + std::shared_ptr devolved_batch = + RecordBatchFromJSON(schema({field("A", int32())}), R"([[1], [2], [3]])"); + ASSERT_OK_AND_ASSIGN( + compute::ExecBatch evolved_batch, + fragment_strategy->EvolveBatch(devolved_batch, selection, devolved_selection)); + ASSERT_EQ(2, evolved_batch.values.size()); + AssertArraysEqual(*devolved_batch->column(0), *evolved_batch[0].make_array()); + ASSERT_EQ(*MakeNullScalar(int64()), *evolved_batch.values[1].scalar()); +} + +TEST(BasicEvolution, ReorderedColumns) { + std::unique_ptr strategy = + MakeBasicDatasetEvolutionStrategy(); + + std::shared_ptr dataset_schema = + schema({field("A", int32()), field("B", int16()), field("C", int64())}); + std::unique_ptr dataset = MakeDatasetFromSchema(dataset_schema); + std::unique_ptr fragment = MakeSomeFragment(std::move(dataset_schema)); + + InspectedFragment inspected{{"C", "B", "A"}}; + std::unique_ptr fragment_strategy = + strategy->GetStrategy(*dataset, *fragment, inspected); + + compute::Expression filter = equal(field_ref("C"), literal(INT64_C(7))); + compute::Expression fragment_filter = equal(field_ref(0), literal(INT64_C(7))); + // Devolved filter should have updated indices + ASSERT_OK_AND_ASSIGN(compute::Expression devolved, + fragment_strategy->DevolveFilter(filter)); + ASSERT_EQ(fragment_filter, devolved); + std::vector selection{FieldPath({0}), FieldPath({2})}; + // No guarantees if simply reordering + compute::Expression expected_guarantee = literal(true); + ASSERT_OK_AND_ASSIGN(compute::Expression guarantee, + fragment_strategy->GetGuarantee(selection)); + ASSERT_EQ(expected_guarantee, guarantee); + + // Devolved selection should have correct indices + ASSERT_OK_AND_ASSIGN(std::vector devolved_selection, + 
fragment_strategy->DevolveSelection(selection)); + ASSERT_EQ(2, devolved_selection.size()); + ASSERT_EQ(FieldPath({2}), devolved_selection[0].path); + ASSERT_EQ(FieldPath({0}), devolved_selection[1].path); + ASSERT_EQ(*int32(), *devolved_selection[0].requested_type); + ASSERT_EQ(*int64(), *devolved_selection[1].requested_type); + + // Basic strategy should append null column to batches for missing column + std::shared_ptr devolved_batch = RecordBatchFromJSON( + schema({field("C", int64()), field("A", int32())}), R"([[1,4], [2,5], [3,6]])"); + ASSERT_OK_AND_ASSIGN( + compute::ExecBatch evolved_batch, + fragment_strategy->EvolveBatch(devolved_batch, selection, devolved_selection)); + ASSERT_EQ(2, evolved_batch.values.size()); + AssertArraysEqual(*devolved_batch->column(0), *evolved_batch[0].make_array()); + AssertArraysEqual(*devolved_batch->column(1), *evolved_batch[1].make_array()); +} + +struct MockScanTask { + explicit MockScanTask(std::shared_ptr batch) : batch(std::move(batch)) {} + + std::shared_ptr batch; + Future> batch_future = + Future>::Make(); +}; + +struct MockFragmentScanner : public FragmentScanner { + explicit MockFragmentScanner(std::vector scan_tasks) + : scan_tasks_(std::move(scan_tasks)), has_started_(scan_tasks_.size(), false) {} + + // ### FragmentScanner API ### + Future> ScanBatch(int batch_number) override { + has_started_[batch_number] = true; + return scan_tasks_[batch_number].batch_future; + } + int64_t EstimatedDataBytes(int batch_number) override { + return util::TotalBufferSize(*scan_tasks_[batch_number].batch); + } + int NumBatches() override { return static_cast(scan_tasks_.size()); } + + // ### Unit Test API ### + void DeliverBatches(bool slow, const std::vector& to_deliver) { + for (MockScanTask task : to_deliver) { + if (slow) { + std::ignore = SleepABitAsync().Then( + [task]() mutable { task.batch_future.MarkFinished(task.batch); }); + } else { + task.batch_future.MarkFinished(task.batch); + } + } + } + + void DeliverBatchesInOrder(bool slow) { DeliverBatches(slow, scan_tasks_); } + + void DeliverBatchesRandomly(bool slow, std::default_random_engine* gen) { + std::vector shuffled_tasks(scan_tasks_); + std::shuffle(shuffled_tasks.begin(), shuffled_tasks.end(), *gen); + DeliverBatches(slow, shuffled_tasks); + } + + bool HasStarted(int batch_number) { return has_started_[batch_number]; } + bool HasDelivered(int batch_number) { + return scan_tasks_[batch_number].batch_future.is_finished(); + } + + std::vector scan_tasks_; + std::vector has_started_; +}; + +struct MockFragment : public Fragment { + // ### Fragment API ### + + MockFragment(std::shared_ptr fragment_schema, + std::vector scan_tasks, + std::shared_ptr inspected, + compute::Expression partition_expression) + : Fragment(std::move(partition_expression), std::move(fragment_schema)), + fragment_scanner_(std::make_shared(std::move(scan_tasks))), + inspected_(std::move(inspected)) {} + + Result ScanBatchesAsync( + const std::shared_ptr& options) override { + return Status::Invalid("Not implemented because not needed by unit tests"); + }; + + Future> InspectFragment() override { + has_inspected_ = true; + return inspected_future_; + } + + Future> BeginScan( + const FragmentScanRequest& request, + const InspectedFragment& inspected_fragment) override { + has_started_ = true; + seen_request_ = request; + return fragment_scanner_future_; + } + + Future> CountRows( + compute::Expression predicate, + const std::shared_ptr& options) override { + return Status::Invalid("Not implemented because not needed by 
unit tests"); + } + + std::string type_name() const override { return "mock"; } + + Result> ReadPhysicalSchemaImpl() override { + return physical_schema_; + }; + + // ### Unit Test API ### + + void FinishInspection() { inspected_future_.MarkFinished(inspected_); } + void FinishScanBegin() { fragment_scanner_future_.MarkFinished(fragment_scanner_); } + + Future<> DeliverInit(bool slow) { + if (slow) { + return SleepABitAsync().Then([this] { + FinishInspection(); + return SleepABitAsync().Then([this] { FinishScanBegin(); }); + }); + } else { + FinishInspection(); + FinishScanBegin(); + return Future<>::MakeFinished(); + } + } + + void DeliverBatchesInOrder(bool slow) { + std::ignore = DeliverInit(slow).Then( + [this, slow] { fragment_scanner_->DeliverBatchesInOrder(slow); }); + } + + Future<> DeliverBatchesRandomly(bool slow, std::default_random_engine* gen) { + return DeliverInit(slow).Then( + [this, slow, gen] { fragment_scanner_->DeliverBatchesRandomly(slow, gen); }); + } + + bool has_inspected() { return has_inspected_; } + bool has_started() { return has_started_; } + bool HasBatchStarted(int batch_index) { + return fragment_scanner_->HasStarted(batch_index); + } + bool HasBatchDelivered(int batch_index) { + return fragment_scanner_->HasDelivered(batch_index); + } + + std::shared_ptr fragment_scanner_; + Future> fragment_scanner_future_ = + Future>::Make(); + std::shared_ptr inspected_; + Future> inspected_future_ = + Future>::Make(); + bool has_inspected_ = false; + bool has_started_ = false; + FragmentScanRequest seen_request_; +}; + +FragmentVector AsFragmentVector( + const std::vector>& fragments) { + FragmentVector frag_vec; + frag_vec.insert(frag_vec.end(), fragments.begin(), fragments.end()); + return frag_vec; +} + +struct MockDataset : public FragmentDataset { + MockDataset(std::shared_ptr dataset_schema, + std::vector> fragments) + : FragmentDataset(std::move(dataset_schema), AsFragmentVector(fragments)), + fragments_(std::move(fragments)) {} + + // ### Dataset API ### + std::string type_name() const override { return "mock"; } + + Result> ReplaceSchema( + std::shared_ptr schema) const override { + return Status::Invalid("Not needed for unit test"); + } + + Result GetFragmentsImpl(compute::Expression predicate) override { + has_started_ = true; + return FragmentDataset::GetFragmentsImpl(std::move(predicate)); + } + + // ### Unit Test API ### + void DeliverBatchesInOrder(bool slow) { + for (const auto& fragment : fragments_) { + fragment->DeliverBatchesInOrder(slow); + } + } + + void DeliverBatchesRandomly(bool slow) { + const auto seed = ::arrow::internal::GetRandomSeed(); + std::default_random_engine gen( + static_cast(seed)); + + std::vector> fragments_shuffled(fragments_); + std::shuffle(fragments_shuffled.begin(), fragments_shuffled.end(), gen); + std::vector> deliver_futures; + for (const auto& fragment : fragments_shuffled) { + deliver_futures.push_back(fragment->DeliverBatchesRandomly(slow, &gen)); + } + // Need to wait for fragments to finish init so gen stays valid + AllComplete(deliver_futures).Wait(); + } + + bool has_started() { return has_started_; } + bool HasStartedFragment(int fragment_index) { + return fragments_[fragment_index]->has_started(); + } + bool HasStartedBatch(int fragment_index, int batch_index) { + return fragments_[fragment_index]->HasBatchStarted(batch_index); + } + + bool has_started_ = false; + std::vector> fragments_; +}; + +struct MockDatasetBuilder { + explicit MockDatasetBuilder(std::shared_ptr dataset_schema) + : 
dataset_schema(std::move(dataset_schema)) {} + + void AddFragment( + std::shared_ptr fragment_schema, + std::unique_ptr inspection = nullptr, + compute::Expression partition_expression = Fragment::kNoPartitionInformation) { + if (!inspection) { + inspection = std::make_unique(fragment_schema->field_names()); + } + fragments.push_back(std::make_shared( + std::move(fragment_schema), std::vector(), std::move(inspection), + std::move(partition_expression))); + active_fragment = fragments[fragments.size() - 1]->fragment_scanner_.get(); + } + + void AddBatch(std::shared_ptr batch) { + active_fragment->scan_tasks_.emplace_back(std::move(batch)); + active_fragment->has_started_.push_back(false); + } + + std::unique_ptr Finish() { + return arrow::internal::make_unique(std::move(dataset_schema), + std::move(fragments)); + } + + std::shared_ptr dataset_schema; + std::vector> fragments; + MockFragmentScanner* active_fragment = nullptr; +}; + +template ::value>::type> +std::shared_ptr ArrayFromRange(int start, int end, bool add_nulls) { + using ArrowBuilderType = typename arrow::TypeTraits::BuilderType; + ArrowBuilderType builder; + ARROW_EXPECT_OK(builder.Reserve(end - start)); + for (int val = start; val < end; val++) { + if (add_nulls && val % 2 == 0) { + builder.UnsafeAppendNull(); + } else { + builder.UnsafeAppend(val); + } + } + EXPECT_OK_AND_ASSIGN(std::shared_ptr range_arr, builder.Finish()); + return range_arr; +} + +struct ScannerTestParams { + bool slow; + int num_fragments; + int num_batches; + + std::string ToString() const { + std::stringstream ss; + ss << (slow ? "slow" : "fast") << num_fragments << "f" << num_batches << "b"; + return ss.str(); + } + + static std::string ToTestNameString( + const ::testing::TestParamInfo& info) { + return info.param.ToString(); + } + + static std::vector Values() { + std::vector values; + for (bool slow : {false, true}) { + values.push_back({slow, 1, 128}); + values.push_back({slow, 16, 128}); + } + return values; + } +}; + +constexpr int kRowsPerTestBatch = 1024; + +std::shared_ptr ScannerTestSchema() { + return schema({field("row_num", int32()), field("filterable", int16()), + field("nested", struct_({field("x", int32()), field("y", int32())}))}); +} + +std::shared_ptr MakeTestBatch(int idx) { + ArrayVector arrays; + // Row number + arrays.push_back(ArrayFromRange(idx * kRowsPerTestBatch, + (idx + 1) * kRowsPerTestBatch, + /*add_nulls=*/false)); + // Filterable + arrays.push_back(ArrayFromRange(0, kRowsPerTestBatch, + /*add_nulls=*/true)); + // Nested + std::shared_ptr x_vals = + ArrayFromRange(0, kRowsPerTestBatch, /*add_nulls=*/false); + std::shared_ptr y_vals = + ArrayFromRange(0, kRowsPerTestBatch, /*add_nulls=*/true); + EXPECT_OK_AND_ASSIGN(std::shared_ptr nested_arr, + StructArray::Make({std::move(x_vals), std::move(y_vals)}, + {field("x", int32()), field("y", int32())})); + arrays.push_back(std::move(nested_arr)); + return RecordBatch::Make(ScannerTestSchema(), kRowsPerTestBatch, std::move(arrays)); +} + +std::unique_ptr MakeTestDataset(int num_fragments, int batches_per_fragment, + bool empty = false) { + std::shared_ptr test_schema = ScannerTestSchema(); + MockDatasetBuilder dataset_builder(test_schema); + for (int i = 0; i < num_fragments; i++) { + dataset_builder.AddFragment( + test_schema, + ::arrow::internal::make_unique(test_schema->field_names()), + Fragment::kNoPartitionInformation); + for (int j = 0; j < batches_per_fragment; j++) { + if (empty) { + dataset_builder.AddBatch( + RecordBatch::Make(schema({}), kRowsPerTestBatch, 
ArrayVector{})); + } else { + dataset_builder.AddBatch(MakeTestBatch(i * batches_per_fragment + j)); + } + } + } + return dataset_builder.Finish(); +} + +class TestScannerBase : public ::testing::TestWithParam { + protected: + TestScannerBase() { internal::Initialize(); } + + std::shared_ptr MakeExpectedBatch() { + RecordBatchVector batches; + for (int frag_idx = 0; frag_idx < GetParam().num_fragments; frag_idx++) { + for (int batch_idx = 0; batch_idx < GetParam().num_batches; batch_idx++) { + batches.push_back(MakeTestBatch(batch_idx + (frag_idx * GetParam().num_batches))); + } + } + EXPECT_OK_AND_ASSIGN(std::shared_ptr
table, + Table::FromRecordBatches(std::move(batches))); + EXPECT_OK_AND_ASSIGN(std::shared_ptr as_one_batch, + table->CombineChunksToBatch()); + return as_one_batch; + } + + compute::Declaration MakeScanNode(std::shared_ptr dataset) { + ScanV2Options options(dataset); + options.columns = ScanV2Options::AllColumns(*dataset); + return compute::Declaration("scan2", options); + } + + std::vector RunNode(compute::Declaration scan_decl, bool ordered, + MockDataset* mock_dataset) { + Future> batches_fut = + compute::DeclarationToExecBatchesAsync(std::move(scan_decl)); + if (ordered) { + mock_dataset->DeliverBatchesInOrder(GetParam().slow); + } else { + mock_dataset->DeliverBatchesRandomly(GetParam().slow); + } + EXPECT_FINISHES_OK_AND_ASSIGN(std::vector batches, batches_fut); + return batches; + } + + void CheckScannedBatchOrdering(const std::vector& batches) { + for (const auto& batch : batches) { + ASSERT_GE(batch.index, 0); + const int32_t* row_nums = + reinterpret_cast(batch.values[0].array()->buffers[1]->data()); + ASSERT_EQ(batch.index * 1024, row_nums[0]); + } + } + + RecordBatchVector ExecBatchesToRecordBatches(std::vector batches, + const std::shared_ptr& schema) { + RecordBatchVector record_batches; + std::transform(batches.begin(), batches.end(), std::back_inserter(record_batches), + [&schema](const compute::ExecBatch& batch) { + EXPECT_OK_AND_ASSIGN(std::shared_ptr record_batch, + batch.ToRecordBatch(schema)); + return record_batch; + }); + return record_batches; + } + + void CheckScannedBatches(std::vector batches, + const std::shared_ptr& schema) { + CheckScannedBatchOrdering(batches); + RecordBatchVector record_batches = + ExecBatchesToRecordBatches(std::move(batches), schema); + ASSERT_OK_AND_ASSIGN(std::shared_ptr
batches_as_table, + Table::FromRecordBatches(std::move(record_batches))); + ASSERT_OK_AND_ASSIGN(std::shared_ptr combined_data, + batches_as_table->CombineChunksToBatch()); + + ASSERT_OK_AND_ASSIGN( + std::shared_ptr sort_indices, + compute::SortIndices(combined_data->column(0), compute::SortOptions{})); + ASSERT_OK_AND_ASSIGN(Datum sorted_data, compute::Take(combined_data, sort_indices)); + + std::shared_ptr expected_data = MakeExpectedBatch(); + AssertBatchesEqual(*expected_data, *sorted_data.record_batch()); + } + + void CheckScanner(bool ordered) { + std::shared_ptr mock_dataset = + MakeTestDataset(GetParam().num_fragments, GetParam().num_batches); + compute::Declaration scan_decl = MakeScanNode(mock_dataset); + std::vector scanned_batches = + RunNode(scan_decl, ordered, mock_dataset.get()); + CheckScannedBatches(std::move(scanned_batches), mock_dataset->schema()); + } +}; + +TEST_P(TestScannerBase, ScanOrdered) { CheckScanner(true); } +TEST_P(TestScannerBase, ScanUnordered) { CheckScanner(false); } + +// FIXME: Add test for scanning no columns + +INSTANTIATE_TEST_SUITE_P(BasicNewScannerTests, TestScannerBase, + ::testing::ValuesIn(ScannerTestParams::Values()), + [](const ::testing::TestParamInfo& info) { + return std::to_string(info.index) + info.param.ToString(); + }); + +void CheckScannerBackpressure(std::shared_ptr dataset, ScanV2Options options, + int maxConcurrentFragments, int maxConcurrentBatches, + ::arrow::internal::ThreadPool* thread_pool) { + // Start scanning + compute::Declaration scan_decl = compute::Declaration("scan2", std::move(options)); + Future batches_fut = + compute::DeclarationToBatchesAsync(std::move(scan_decl)); + + auto get_num_inspected = [&] { + int num_inspected = 0; + for (const auto& frag : dataset->fragments_) { + if (frag->has_inspected()) { + num_inspected++; + } + } + return num_inspected; + }; + BusyWait(10, [&] { + return get_num_inspected() == static_cast(maxConcurrentFragments); + }); + SleepABit(); + ASSERT_EQ(get_num_inspected(), static_cast(maxConcurrentFragments)); + + int total_batches = 0; + for (const auto& frag : dataset->fragments_) { + total_batches += frag->fragment_scanner_->NumBatches(); + frag->FinishInspection(); + frag->FinishScanBegin(); + } + + int batches_scanned = 0; + while (batches_scanned < total_batches) { + MockScanTask* next_task_to_deliver = nullptr; + thread_pool->WaitForIdle(); + int batches_started = 0; + for (const auto& frag : dataset->fragments_) { + for (int i = 0; i < frag->fragment_scanner_->NumBatches(); i++) { + if (frag->HasBatchStarted(i)) { + batches_started++; + if (next_task_to_deliver == nullptr && !frag->HasBatchDelivered(i)) { + next_task_to_deliver = &frag->fragment_scanner_->scan_tasks_[i]; + } + } + } + } + ASSERT_LE(batches_started - batches_scanned, maxConcurrentBatches) + << " too many scan tasks were allowed to run"; + ASSERT_NE(next_task_to_deliver, nullptr); + next_task_to_deliver->batch_future.MarkFinished(next_task_to_deliver->batch); + batches_scanned++; + } +} + +TEST(TestNewScanner, Backpressure) { + constexpr int kNumFragments = 4; + constexpr int kNumBatchesPerFragment = 4; + internal::Initialize(); + std::shared_ptr test_dataset = + MakeTestDataset(kNumFragments, kNumBatchesPerFragment); + + ScanV2Options options(test_dataset); + + // No readahead + options.dataset = test_dataset; + options.columns = ScanV2Options::AllColumns(*test_dataset); + options.fragment_readahead = 0; + options.target_bytes_readahead = 0; + CheckScannerBackpressure(test_dataset, options, 1, 1, + 
::arrow::internal::GetCpuThreadPool()); + + // Some readahead + test_dataset = MakeTestDataset(kNumFragments, kNumBatchesPerFragment); + options = ScanV2Options(test_dataset); + options.columns = ScanV2Options::AllColumns(*test_dataset); + options.fragment_readahead = 4; + // each batch should be 14Ki so 50Ki readahead should yield 3-at-a-time + options.target_bytes_readahead = 50 * kRowsPerTestBatch; + CheckScannerBackpressure(test_dataset, options, 4, 3, + ::arrow::internal::GetCpuThreadPool()); +} + +TEST(TestNewScanner, NestedRead) { + // This tests the case where the file format does not support + // handling nested reads (e.g. JSON) and so the scanner must + // drop the extra data + internal::Initialize(); + std::shared_ptr test_schema = ScannerTestSchema(); + MockDatasetBuilder builder(test_schema); + builder.AddFragment(test_schema); + std::shared_ptr batch = MakeTestBatch(0); + ASSERT_OK_AND_ASSIGN(std::shared_ptr nested_col, FieldPath({2, 0}).Get(*batch)); + std::shared_ptr one_column = RecordBatch::Make( + schema({field("x", int32())}), batch->num_rows(), ArrayVector{nested_col}); + builder.AddBatch(std::move(one_column)); + std::shared_ptr test_dataset = builder.Finish(); + test_dataset->DeliverBatchesInOrder(false); + + ScanV2Options options(test_dataset); + // nested.x + options.columns = {FieldPath({2, 0})}; + ASSERT_OK_AND_ASSIGN(std::vector> batches, + compute::DeclarationToBatches({"scan2", options})); + ASSERT_EQ(1, batches.size()); + for (const auto& batch : batches) { + ASSERT_EQ("x", batch->schema()->field(0)->name()); + ASSERT_EQ(*int32(), *batch->schema()->field(0)->type()); + ASSERT_EQ(*int32(), *batch->column(0)->type()); + } + const FragmentScanRequest& seen_request = test_dataset->fragments_[0]->seen_request_; + ASSERT_EQ(1, seen_request.columns.size()); + ASSERT_EQ(FieldPath({2, 0}), seen_request.columns[0].path); + ASSERT_EQ(*int32(), *seen_request.columns[0].requested_type); + ASSERT_EQ(0, seen_request.columns[0].selection_index); +} + +std::shared_ptr MakePartitionSkipDataset() { + std::shared_ptr test_schema = ScannerTestSchema(); + MockDatasetBuilder builder(test_schema); + builder.AddFragment(test_schema, /*inspection=*/nullptr, + greater(field_ref("filterable"), literal(50))); + builder.AddBatch(MakeTestBatch(0)); + builder.AddFragment(test_schema, /*inspection=*/nullptr, + less_equal(field_ref("filterable"), literal(50))); + builder.AddBatch(MakeTestBatch(1)); + return builder.Finish(); +} + +TEST(TestNewScanner, PartitionSkip) { + internal::Initialize(); + std::shared_ptr test_dataset = MakePartitionSkipDataset(); + test_dataset->DeliverBatchesInOrder(false); + + ScanV2Options options(test_dataset); + options.columns = ScanV2Options::AllColumns(*test_dataset); + options.filter = greater(field_ref("filterable"), literal(75)); + + ASSERT_OK_AND_ASSIGN(std::vector> batches, + compute::DeclarationToBatches({"scan2", options})); + ASSERT_EQ(1, batches.size()); + AssertBatchesEqual(*MakeTestBatch(0), *batches[0]); + + test_dataset = MakePartitionSkipDataset(); + test_dataset->DeliverBatchesInOrder(false); + options = ScanV2Options(test_dataset); + options.columns = ScanV2Options::AllColumns(*test_dataset); + options.filter = less(field_ref("filterable"), literal(25)); + + ASSERT_OK_AND_ASSIGN(batches, compute::DeclarationToBatches({"scan2", options})); + ASSERT_EQ(1, batches.size()); + AssertBatchesEqual(*MakeTestBatch(1), *batches[0]); +} + +TEST(TestNewScanner, NoFragments) { + internal::Initialize(); + std::shared_ptr test_schema = ScannerTestSchema(); + 
MockDatasetBuilder builder(test_schema); + std::shared_ptr test_dataset = builder.Finish(); + + ScanV2Options options(test_dataset); + options.columns = ScanV2Options::AllColumns(*test_dataset); + ASSERT_OK_AND_ASSIGN(std::vector> batches, + compute::DeclarationToBatches({"scan2", options})); + ASSERT_EQ(0, batches.size()); +} + +TEST(TestNewScanner, EmptyFragment) { + internal::Initialize(); + std::shared_ptr test_schema = ScannerTestSchema(); + MockDatasetBuilder builder(test_schema); + builder.AddFragment(test_schema); + std::shared_ptr test_dataset = builder.Finish(); + test_dataset->DeliverBatchesInOrder(false); + + ScanV2Options options(test_dataset); + options.columns = ScanV2Options::AllColumns(*test_dataset); + ASSERT_OK_AND_ASSIGN(std::vector> batches, + compute::DeclarationToBatches({"scan2", options})); + ASSERT_EQ(0, batches.size()); +} + +TEST(TestNewScanner, EmptyBatch) { + internal::Initialize(); + std::shared_ptr test_schema = ScannerTestSchema(); + MockDatasetBuilder builder(test_schema); + builder.AddFragment(test_schema); + ASSERT_OK_AND_ASSIGN(std::shared_ptr empty_batch, + RecordBatch::MakeEmpty(test_schema)); + builder.AddBatch(std::move(empty_batch)); + std::shared_ptr test_dataset = builder.Finish(); + test_dataset->DeliverBatchesInOrder(false); + + ScanV2Options options(test_dataset); + options.columns = ScanV2Options::AllColumns(*test_dataset); + ASSERT_OK_AND_ASSIGN(std::vector> batches, + compute::DeclarationToBatches({"scan2", options})); + ASSERT_EQ(0, batches.size()); +} + +TEST(TestNewScanner, NoColumns) { + constexpr int kNumFragments = 4; + constexpr int kNumBatchesPerFragment = 4; + internal::Initialize(); + std::shared_ptr test_dataset = + MakeTestDataset(kNumFragments, kNumBatchesPerFragment, /*empty=*/true); + test_dataset->DeliverBatchesInOrder(false); + + ScanV2Options options(test_dataset); + ASSERT_OK_AND_ASSIGN(std::vector batches, + compute::DeclarationToExecBatches({"scan2", options})); + ASSERT_EQ(16, batches.size()); + for (const auto& batch : batches) { + ASSERT_EQ(0, batch.values.size()); + ASSERT_EQ(kRowsPerTestBatch, batch.length); + } +} + +TEST(TestNewScanner, MissingColumn) { + internal::Initialize(); + std::shared_ptr test_schema = ScannerTestSchema(); + MockDatasetBuilder builder(test_schema); + + ASSERT_OK_AND_ASSIGN(std::shared_ptr missing_schema, + test_schema->RemoveField(2)); + builder.AddFragment(missing_schema); + std::shared_ptr batch = MakeTestBatch(0); + // Remove column 2 because we are pretending it doesn't exist + // in the fragment + ASSERT_OK_AND_ASSIGN(batch, batch->RemoveColumn(2)); + // Remove column 1 because we aren't going to ask for it + ASSERT_OK_AND_ASSIGN(batch, batch->RemoveColumn(1)); + builder.AddBatch(batch); + + std::shared_ptr test_dataset = builder.Finish(); + test_dataset->DeliverBatchesInOrder(false); + + ScanV2Options options(test_dataset); + options.columns = {FieldPath({0}), FieldPath({2})}; + + ASSERT_OK_AND_ASSIGN(std::vector> batches, + compute::DeclarationToBatches({"scan2", options})); + + ASSERT_EQ(1, batches.size()); + AssertArraysEqual(*batch->column(0), *batches[0]->column(0)); + ASSERT_OK_AND_ASSIGN(std::shared_ptr expected_nulls, + MakeArrayOfNull(test_schema->field(2)->type(), kRowsPerTestBatch)); + AssertArraysEqual(*expected_nulls, *batches[0]->column(1)); +} + struct TestScannerParams { bool use_threads; int num_child_datasets; @@ -99,8 +873,7 @@ class TestScanner : public DatasetFixtureMixinWithParam { } std::shared_ptr MakeScanner(std::shared_ptr batch) { - std::vector> batches{ - 
static_cast(GetParam().num_batches), batch}; + RecordBatchVector batches{static_cast(GetParam().num_batches), batch}; DatasetVector children{static_cast(GetParam().num_child_datasets), std::make_shared(batch->schema(), batches)}; @@ -334,7 +1107,7 @@ TEST_P(TestScanner, MaterializeMissingColumn) { TEST_P(TestScanner, ToTable) { SetSchema({field("i32", int32()), field("f64", float64())}); auto batch = ConstantArrayGenerator::Zeroes(GetParam().items_per_batch, schema_); - std::vector> batches{ + RecordBatchVector batches{ static_cast(GetParam().num_batches * GetParam().num_child_datasets), batch}; @@ -453,7 +1226,7 @@ TEST_P(TestScanner, EmptyFragment) { SetSchema({field("i32", int32()), field("f64", float64())}); auto batch = ConstantArrayGenerator::Zeroes(GetParam().items_per_batch, schema_); auto empty_batch = ConstantArrayGenerator::Zeroes(0, schema_); - std::vector> batches{ + RecordBatchVector batches{ static_cast(GetParam().num_batches * GetParam().num_child_datasets), batch}; @@ -582,7 +1355,7 @@ TEST_P(TestScanner, CountRowsWithMetadata) { TEST_P(TestScanner, ToRecordBatchReader) { SetSchema({field("i32", int32()), field("f64", float64())}); auto batch = ConstantArrayGenerator::Zeroes(GetParam().items_per_batch, schema_); - std::vector> batches{ + RecordBatchVector batches{ static_cast(GetParam().num_batches * GetParam().num_child_datasets), batch}; diff --git a/cpp/src/arrow/type.cc b/cpp/src/arrow/type.cc index c91fa234e0a..e1d41666207 100644 --- a/cpp/src/arrow/type.cc +++ b/cpp/src/arrow/type.cc @@ -1092,6 +1092,17 @@ Result> FieldPath::Get(const FieldVector& fields) const { return FieldPathGetImpl::Get(this, fields); } +Result> FieldPath::GetAll(const Schema& schm, + const std::vector& paths) { + std::vector> fields; + fields.reserve(paths.size()); + for (const auto& path : paths) { + ARROW_ASSIGN_OR_RAISE(std::shared_ptr field, path.Get(schm)); + fields.push_back(std::move(field)); + } + return schema(std::move(fields)); +} + Result> FieldPath::Get(const RecordBatch& batch) const { ARROW_ASSIGN_OR_RAISE(auto data, FieldPathGetImpl::Get(this, batch.column_data())); return MakeArray(std::move(data)); diff --git a/cpp/src/arrow/type.h b/cpp/src/arrow/type.h index 663c4765127..cfba607427e 100644 --- a/cpp/src/arrow/type.h +++ b/cpp/src/arrow/type.h @@ -1626,6 +1626,9 @@ class ARROW_EXPORT FieldPath { Result> Get(const DataType& type) const; Result> Get(const FieldVector& fields) const; + static Result> GetAll(const Schema& schema, + const std::vector& paths); + /// \brief Retrieve the referenced column from a RecordBatch or Table Result> Get(const RecordBatch& batch) const; diff --git a/cpp/src/arrow/util/async_util.cc b/cpp/src/arrow/util/async_util.cc index 3ac4519a6f0..10d2c5dd807 100644 --- a/cpp/src/arrow/util/async_util.cc +++ b/cpp/src/arrow/util/async_util.cc @@ -32,7 +32,8 @@ namespace util { class ThrottleImpl : public AsyncTaskScheduler::Throttle { public: - explicit ThrottleImpl(int max_concurrent_cost) : available_cost_(max_concurrent_cost) {} + explicit ThrottleImpl(int max_concurrent_cost) + : max_concurrent_cost_(max_concurrent_cost), available_cost_(max_concurrent_cost) {} std::optional> TryAcquire(int amt) override { std::lock_guard lk(mutex_); @@ -61,8 +62,11 @@ class ThrottleImpl : public AsyncTaskScheduler::Throttle { } } + int Capacity() override { return max_concurrent_cost_; } + private: std::mutex mutex_; + int max_concurrent_cost_; int available_cost_; Future<> backoff_; }; @@ -151,7 +155,8 @@ class AsyncTaskSchedulerImpl : public AsyncTaskScheduler { 
queue_->Push(std::move(task)); return true; } - std::optional> maybe_backoff = throttle_->TryAcquire(task->cost()); + int latched_cost = std::min(task->cost(), throttle_->Capacity()); + std::optional> maybe_backoff = throttle_->TryAcquire(latched_cost); if (maybe_backoff) { queue_->Push(std::move(task)); lk.unlock(); @@ -237,7 +242,7 @@ class AsyncTaskSchedulerImpl : public AsyncTaskScheduler { private: void ContinueTasksUnlocked(std::unique_lock&& lk) { while (!queue_->Empty()) { - int next_cost = queue_->Peek().cost(); + int next_cost = std::min(queue_->Peek().cost(), throttle_->Capacity()); std::optional> maybe_backoff = throttle_->TryAcquire(next_cost); if (maybe_backoff) { lk.unlock(); @@ -266,6 +271,9 @@ class AsyncTaskSchedulerImpl : public AsyncTaskScheduler { void DoSubmitTask(std::unique_ptr task) { int cost = task->cost(); + if (throttle_) { + cost = std::min(cost, throttle_->Capacity()); + } Result> submit_result = (*task)(this); if (!submit_result.ok()) { global_abort_->store(true); @@ -274,7 +282,9 @@ class AsyncTaskSchedulerImpl : public AsyncTaskScheduler { AbortUnlocked(submit_result.status(), std::move(lk)); return; } - submit_result->AddCallback([this, cost](const Status& st) { + // FIXME(C++17, move into lambda?) + std::shared_ptr task_holder = std::move(task); + submit_result->AddCallback([this, cost, task_holder](const Status& st) { std::unique_lock lk(mutex_); if (!st.ok()) { running_tasks_--; diff --git a/cpp/src/arrow/util/async_util.h b/cpp/src/arrow/util/async_util.h index 707f70d471f..46f3c1242b8 100644 --- a/cpp/src/arrow/util/async_util.h +++ b/cpp/src/arrow/util/async_util.h @@ -17,10 +17,13 @@ #pragma once +#include + #include "arrow/result.h" #include "arrow/status.h" #include "arrow/util/functional.h" #include "arrow/util/future.h" +#include "arrow/util/iterator.h" #include "arrow/util/make_unique.h" #include "arrow/util/mutex.h" @@ -146,6 +149,13 @@ class ARROW_EXPORT AsyncTaskScheduler { /// This will possibly complete waiting futures and should probably not be /// called while holding locks. virtual void Release(int amt) = 0; + + /// The size of the largest task that can run + /// + /// Incoming tasks will have their cost latched to this value to ensure + /// they can still run (although they will generally be the only thing allowed to + /// run at that time). + virtual int Capacity() = 0; }; /// Create a throttle /// @@ -175,6 +185,65 @@ class ARROW_EXPORT AsyncTaskScheduler { /// \return true if the task was submitted or queued, false if the task was ignored virtual bool AddTask(std::unique_ptr task) = 0; + /// Adds an async generator to the scheduler + /// + /// The async generator will be visited, one item at a time. Submitting a task + /// will consist of polling the generator for the next future. The generator's future + /// will then represent the task itself. + /// + /// This visits the task serially without readahead. If readahead or parallelism + /// is desired then it should be added in the generator itself. + /// + /// The tasks will be submitted to a subscheduler which will be ended when the generator + /// is exhausted. + /// + /// The generator itself will be kept alive until all tasks have been completed. + /// However, if the scheduler is aborted, the generator will be destroyed as soon as the + /// next item would be requested. 
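+  ///
+  /// A minimal usage sketch (it mirrors the AsyncGenerator unit test added below;
+  /// MakeVectorGenerator, TestInt, and EmptyFinishCallback are test helpers assumed to
+  /// be available and are not part of this API):
+  ///
+  ///   std::unique_ptr<AsyncTaskScheduler> scheduler = AsyncTaskScheduler::Make();
+  ///   AsyncGenerator<TestInt> gen = MakeVectorGenerator(std::vector<TestInt>{1, 2, 3});
+  ///   std::function<Status(const TestInt&)> visitor = [](const TestInt& val) {
+  ///     // Consume each generated item here
+  ///     return Status::OK();
+  ///   };
+  ///   scheduler->AddAsyncGenerator(std::move(gen), std::move(visitor),
+  ///                                EmptyFinishCallback());
+  ///   scheduler->End();
+  ///   // OnFinished() completes only after the generator is exhausted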
+ template + bool AddAsyncGenerator(std::function()> generator, + std::function visitor, + FnOnce finish_callback) { + AsyncTaskScheduler* generator_scheduler = + MakeSubScheduler(std::move(finish_callback)); + struct State { + State(std::function()> generator, std::function visitor) + : generator(std::move(generator)), visitor(std::move(visitor)) {} + std::function()> generator; + std::function visitor; + }; + std::unique_ptr state_holder = + arrow::internal::make_unique(std::move(generator), std::move(visitor)); + struct SubmitTask : public Task { + explicit SubmitTask(std::unique_ptr state_holder) + : state_holder(std::move(state_holder)) {} + struct SubmitTaskCallback { + SubmitTaskCallback(AsyncTaskScheduler* scheduler, + std::unique_ptr state_holder) + : scheduler(scheduler), state_holder(std::move(state_holder)) {} + Status operator()(const T& item) { + if (IsIterationEnd(item)) { + scheduler->End(); + return Status::OK(); + } + ARROW_RETURN_NOT_OK(state_holder->visitor(item)); + scheduler->AddTask( + arrow::internal::make_unique(std::move(state_holder))); + return Status::OK(); + } + AsyncTaskScheduler* scheduler; + std::unique_ptr state_holder; + }; + Result> operator()(AsyncTaskScheduler* scheduler) { + Future next = state_holder->generator(); + return next.Then(SubmitTaskCallback(scheduler, std::move(state_holder))); + } + std::unique_ptr state_holder; + }; + return generator_scheduler->AddTask( + arrow::internal::make_unique(std::move(state_holder))); + } + template struct SimpleTask : public Task { explicit SimpleTask(Callable callable) : callable(std::move(callable)) {} diff --git a/cpp/src/arrow/util/async_util_test.cc b/cpp/src/arrow/util/async_util_test.cc index dfb688f70d1..873f940e4aa 100644 --- a/cpp/src/arrow/util/async_util_test.cc +++ b/cpp/src/arrow/util/async_util_test.cc @@ -28,10 +28,13 @@ #include #include "arrow/result.h" +#include "arrow/testing/async_test_util.h" #include "arrow/testing/future_util.h" #include "arrow/testing/gtest_util.h" +#include "arrow/util/async_generator.h" #include "arrow/util/future.h" #include "arrow/util/make_unique.h" +#include "arrow/util/test_common.h" namespace arrow { namespace util { @@ -123,6 +126,27 @@ TEST(AsyncTaskScheduler, Abandoned) { ASSERT_FALSE(pending_task_submitted); } +TEST(AsyncTaskScheduler, TaskStaysAliveUntilFinished) { + std::unique_ptr scheduler = AsyncTaskScheduler::Make(); + Future<> task = Future<>::Make(); + bool my_task_destroyed = false; + struct MyTask : public AsyncTaskScheduler::Task { + MyTask(bool* my_task_destroyed_ptr, Future<> task_fut) + : my_task_destroyed_ptr(my_task_destroyed_ptr), task_fut(std::move(task_fut)) {} + ~MyTask() { *my_task_destroyed_ptr = true; } + Result> operator()(AsyncTaskScheduler*) { return task_fut; } + bool* my_task_destroyed_ptr; + Future<> task_fut; + }; + scheduler->AddTask(::arrow::internal::make_unique(&my_task_destroyed, task)); + SleepABit(); + ASSERT_FALSE(my_task_destroyed); + task.MarkFinished(); + ASSERT_TRUE(my_task_destroyed); + scheduler->End(); + ASSERT_FINISHES_OK(scheduler->OnFinished()); +} + TEST(AsyncTaskScheduler, TaskFailsAfterEnd) { std::unique_ptr scheduler = AsyncTaskScheduler::Make(); Future<> task = Future<>::Make(); @@ -198,6 +222,28 @@ TEST(AsyncTaskScheduler, SubSchedulerNoTasks) { ASSERT_FINISHES_OK(parent->OnFinished()); } +TEST(AsyncTaskScheduler, AsyncGenerator) { + for (bool slow : {false, true}) { + ARROW_SCOPED_TRACE("Slow: ", slow); + std::unique_ptr scheduler = AsyncTaskScheduler::Make(); + std::vector values{1, 2, 3}; + 
std::vector seen_values{}; + AsyncGenerator generator = MakeVectorGenerator(values); + if (slow) { + generator = util::SlowdownABit(generator); + } + std::function visitor = [&](const TestInt& val) { + seen_values.push_back(val); + return Status::OK(); + }; + scheduler->AddAsyncGenerator(std::move(generator), std::move(visitor), + EmptyFinishCallback()); + scheduler->End(); + ASSERT_FINISHES_OK(scheduler->OnFinished()); + ASSERT_EQ(seen_values, values); + } +} + class CustomThrottle : public AsyncTaskScheduler::Throttle { public: virtual std::optional> TryAcquire(int amt) { @@ -209,6 +255,7 @@ class CustomThrottle : public AsyncTaskScheduler::Throttle { } virtual void Release(int amt) {} void Unlock() { gate_.MarkFinished(); } + int Capacity() { return std::numeric_limits::max(); } private: Future<> gate_ = Future<>::Make(); @@ -248,6 +295,48 @@ TEST(AsyncTaskScheduler, EndWaitsForAddedButNotSubmittedTasks) { ASSERT_TRUE(was_run); } +TEST(AsyncTaskScheduler, TaskWithCostBiggerThanThrottle) { + // It can be difficult to know the maximum cost a task may have. In + // scanning this is the maximum size of a batch stored on disk which we + // cannot know ahead of time. So a task may have a cost greater than the + // size of the throttle. In that case we simply drop the cost to the + // capacity of the throttle. + constexpr int kThrottleCapacity = 5; + std::unique_ptr throttle = + AsyncTaskScheduler::MakeThrottle(kThrottleCapacity); + std::unique_ptr task_group = + AsyncTaskScheduler::Make(throttle.get()); + bool task_submitted = false; + Future<> task = Future<>::Make(); + + struct ExpensiveTask : AsyncTaskScheduler::Task { + ExpensiveTask(bool* task_submitted, Future<> task) + : task_submitted(task_submitted), task(std::move(task)) {} + Result> operator()(AsyncTaskScheduler*) override { + *task_submitted = true; + return task; + } + int cost() const override { return kThrottleCapacity * 50; } + bool* task_submitted; + Future<> task; + }; + + Future<> blocking_task = Future<>::Make(); + task_group->AddSimpleTask([&] { return blocking_task; }); + task_group->AddTask( + ::arrow::internal::make_unique(&task_submitted, task)); + task_group->End(); + + // Task should not be submitted initially because blocking_task (even though + // it has a cost of 1) is preventing it. + ASSERT_FALSE(task_submitted); + blocking_task.MarkFinished(); + // Once blocking_task is out of the way, the task is free to run + ASSERT_TRUE(task_submitted); + task.MarkFinished(); + ASSERT_FINISHES_OK(task_group->OnFinished()); +} + TEST(AsyncTaskScheduler, TaskFinishesAfterError) { /// If a task fails it shouldn't impact previously submitted tasks std::unique_ptr task_group = AsyncTaskScheduler::Make();