From 3ac0674756d9db02253381991de449bcf959d48e Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Tue, 10 May 2022 13:52:31 -1000 Subject: [PATCH 1/7] ARROW-17287: Initial creation of a "scan node" which doesn't use async generators to do scanning. Formally defined a new scan options and interfaces for schema evolution. --- cpp/src/arrow/compute/exec/exec_plan.cc | 57 ++ cpp/src/arrow/compute/exec/exec_plan.h | 33 + cpp/src/arrow/compute/exec/expression.cc | 10 +- cpp/src/arrow/compute/exec/expression.h | 49 ++ .../arrow/compute/exec/expression_internal.h | 47 -- cpp/src/arrow/compute/exec/filter_node.cc | 7 +- cpp/src/arrow/compute/exec/map_node.cc | 10 +- cpp/src/arrow/compute/exec/map_node.h | 4 +- cpp/src/arrow/compute/exec/options.h | 12 +- cpp/src/arrow/compute/exec/project_node.cc | 8 +- cpp/src/arrow/compute/exec/util.cc | 2 +- cpp/src/arrow/dataset/CMakeLists.txt | 3 +- cpp/src/arrow/dataset/dataset.cc | 179 +++++ cpp/src/arrow/dataset/dataset.h | 175 +++- cpp/src/arrow/dataset/file_base.cc | 8 +- cpp/src/arrow/dataset/plan.cc | 1 + cpp/src/arrow/dataset/scan_node.cc | 372 +++++++++ cpp/src/arrow/dataset/scanner.cc | 9 + cpp/src/arrow/dataset/scanner.h | 113 +++ cpp/src/arrow/dataset/scanner_benchmark.cc | 149 +++- cpp/src/arrow/dataset/scanner_test.cc | 757 +++++++++++++++++- cpp/src/arrow/type.cc | 11 + cpp/src/arrow/type.h | 3 + cpp/src/arrow/util/async_util.cc | 18 +- cpp/src/arrow/util/async_util.h | 68 ++ cpp/src/arrow/util/async_util_test.cc | 88 ++ 26 files changed, 2084 insertions(+), 109 deletions(-) create mode 100644 cpp/src/arrow/dataset/scan_node.cc diff --git a/cpp/src/arrow/compute/exec/exec_plan.cc b/cpp/src/arrow/compute/exec/exec_plan.cc index 6b02b76916c..83cd6b1ba80 100644 --- a/cpp/src/arrow/compute/exec/exec_plan.cc +++ b/cpp/src/arrow/compute/exec/exec_plan.cc @@ -32,11 +32,13 @@ #include "arrow/datum.h" #include "arrow/record_batch.h" #include "arrow/result.h" +#include "arrow/table.h" #include "arrow/util/async_generator.h" #include "arrow/util/checked_cast.h" #include "arrow/util/key_value_metadata.h" #include "arrow/util/logging.h" #include "arrow/util/tracing_internal.h" +#include "arrow/util/vector.h" namespace arrow { @@ -555,6 +557,61 @@ bool Declaration::IsValid(ExecFactoryRegistry* registry) const { return !this->factory_name.empty() && this->options != nullptr; } +Future> DeclarationToTableAsync(Declaration declaration, + ExecContext* exec_context) { + std::shared_ptr> output_table = + std::make_shared>(); + ARROW_ASSIGN_OR_RAISE(std::shared_ptr exec_plan, + ExecPlan::Make(exec_context)); + Declaration with_sink = Declaration::Sequence( + {declaration, {"table_sink", TableSinkNodeOptions(output_table.get())}}); + ARROW_RETURN_NOT_OK(with_sink.AddToPlan(exec_plan.get())); + ARROW_RETURN_NOT_OK(exec_plan->StartProducing()); + return exec_plan->finished().Then([exec_plan, output_table] { return *output_table; }); +} + +Result> DeclarationToTable(Declaration declaration, + ExecContext* exec_context) { + return DeclarationToTableAsync(std::move(declaration), exec_context).result(); +} + +Future>> DeclarationToBatchesAsync( + Declaration declaration, ExecContext* exec_context) { + return DeclarationToTableAsync(std::move(declaration), exec_context) + .Then([](const std::shared_ptr& table) { + return TableBatchReader(table).ToRecordBatches(); + }); +} + +Result>> DeclarationToBatches( + Declaration declaration, ExecContext* exec_context) { + return DeclarationToBatchesAsync(std::move(declaration), exec_context).result(); +} + +Future> 
DeclarationToExecBatchesAsync(Declaration declaration,
+                                                             ExecContext* exec_context) {
+  AsyncGenerator<std::optional<ExecBatch>> sink_gen;
+  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ExecPlan> exec_plan,
+                        ExecPlan::Make(exec_context));
+  Declaration with_sink =
+      Declaration::Sequence({declaration, {"sink", SinkNodeOptions(&sink_gen)}});
+  ARROW_RETURN_NOT_OK(with_sink.AddToPlan(exec_plan.get()));
+  ARROW_RETURN_NOT_OK(exec_plan->StartProducing());
+  auto collected_fut = CollectAsyncGenerator(sink_gen);
+  return AllComplete({exec_plan->finished(), Future<>(collected_fut)})
+      .Then([collected_fut, exec_plan]() -> Result<std::vector<ExecBatch>> {
+        ARROW_ASSIGN_OR_RAISE(auto collected, collected_fut.result());
+        return ::arrow::internal::MapVector(
+            [](std::optional<ExecBatch> batch) { return std::move(*batch); },
+            std::move(collected));
+      });
+}
+
+Result<std::vector<ExecBatch>> DeclarationToExecBatches(Declaration declaration,
+                                                        ExecContext* exec_context) {
+  return DeclarationToExecBatchesAsync(std::move(declaration), exec_context).result();
+}
+
 namespace internal {
 
 void RegisterSourceNode(ExecFactoryRegistry*);
diff --git a/cpp/src/arrow/compute/exec/exec_plan.h b/cpp/src/arrow/compute/exec/exec_plan.h
index e9af46be261..c867a1b1ac3 100644
--- a/cpp/src/arrow/compute/exec/exec_plan.h
+++ b/cpp/src/arrow/compute/exec/exec_plan.h
@@ -483,6 +483,39 @@ struct ARROW_EXPORT Declaration {
   std::string label;
 };
 
+/// \brief Utility method to run a declaration and collect the results into a table
+///
+/// This method will add a sink node to the declaration to collect results into a
+/// table. It will then create an ExecPlan from the declaration, start the exec plan,
+/// block until the plan has finished, and return the created table.
+Result<std::shared_ptr<Table>> DeclarationToTable(
+    Declaration declaration, ExecContext* exec_context = default_exec_context());
+
+/// \brief Asynchronous version of \see DeclarationToTable
+Future<std::shared_ptr<Table>> DeclarationToTableAsync(
+    Declaration declaration, ExecContext* exec_context = default_exec_context());
+
+/// \brief Utility method to run a declaration and collect the results into an ExecBatch
+/// vector
+///
+/// \see DeclarationToTable for details
+Result<std::vector<ExecBatch>> DeclarationToExecBatches(
+    Declaration declaration, ExecContext* exec_context = default_exec_context());
+
+/// \brief Asynchronous version of \see DeclarationToExecBatches
+Future<std::vector<ExecBatch>> DeclarationToExecBatchesAsync(
+    Declaration declaration, ExecContext* exec_context = default_exec_context());
+
+/// \brief Utility method to run a declaration and collect the results into a vector
+///
+/// \see DeclarationToTable for details
+Result<std::vector<std::shared_ptr<RecordBatch>>> DeclarationToBatches(
+    Declaration declaration, ExecContext* exec_context = default_exec_context());
+
+/// \brief Asynchronous version of \see DeclarationToBatches
+Future<std::vector<std::shared_ptr<RecordBatch>>> DeclarationToBatchesAsync(
+    Declaration declaration, ExecContext* exec_context = default_exec_context());
+
 /// \brief Wrap an ExecBatch generator in a RecordBatchReader.
 ///
 /// The RecordBatchReader does not impose any ordering on emitted batches.
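A minimal usage sketch for the DeclarationTo* helpers added above. The input table, the
"b" column, and the batch size below are placeholder assumptions, not taken from the
patch; the Declaration/options pattern mirrors the sequences used in exec_plan.cc.

#include "arrow/compute/exec/exec_plan.h"
#include "arrow/compute/exec/expression.h"
#include "arrow/compute/exec/options.h"
#include "arrow/result.h"
#include "arrow/table.h"

arrow::Result<std::shared_ptr<arrow::Table>> FilterToTable(
    std::shared_ptr<arrow::Table> input) {
  namespace cp = arrow::compute;
  // Declare a small plan (table_source -> filter) and let the new helper build
  // the ExecPlan, run it to completion, and collect the output into a table.
  cp::Declaration plan = cp::Declaration::Sequence(
      {{"table_source",
        cp::TableSourceNodeOptions(std::move(input), /*max_batch_size=*/1024)},
       {"filter",
        cp::FilterNodeOptions(cp::equal(cp::field_ref("b"), cp::literal(true)))}});
  return cp::DeclarationToTable(std::move(plan));
}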
diff --git a/cpp/src/arrow/compute/exec/expression.cc b/cpp/src/arrow/compute/exec/expression.cc index ff59977b671..eecc8024d78 100644 --- a/cpp/src/arrow/compute/exec/expression.cc +++ b/cpp/src/arrow/compute/exec/expression.cc @@ -77,9 +77,15 @@ Expression call(std::string function, std::vector arguments, return Expression(std::move(call)); } -const Datum* Expression::literal() const { return std::get_if(impl_.get()); } +const Datum* Expression::literal() const { + if (impl_ == nullptr) return nullptr; + + return std::get_if(impl_.get()); +} const Expression::Parameter* Expression::parameter() const { + if (impl_ == nullptr) return nullptr; + return std::get_if(impl_.get()); } @@ -91,6 +97,8 @@ const FieldRef* Expression::field_ref() const { } const Expression::Call* Expression::call() const { + if (impl_ == nullptr) return nullptr; + return std::get_if(impl_.get()); } diff --git a/cpp/src/arrow/compute/exec/expression.h b/cpp/src/arrow/compute/exec/expression.h index 51f67cb63f2..9c4eeffa211 100644 --- a/cpp/src/arrow/compute/exec/expression.h +++ b/cpp/src/arrow/compute/exec/expression.h @@ -100,6 +100,8 @@ class ARROW_EXPORT Expression { // XXX someday // Result GetPipelines(); + bool is_valid() const { return impl_ != NULLPTR; } + /// Access a Call or return nullptr if this expression is not a call const Call* call() const; /// Access a Datum or return nullptr if this expression is not a literal @@ -277,6 +279,53 @@ ARROW_EXPORT Expression or_(Expression lhs, Expression rhs); ARROW_EXPORT Expression or_(const std::vector&); ARROW_EXPORT Expression not_(Expression operand); +/// Modify an Expression with pre-order and post-order visitation. +/// `pre` will be invoked on each Expression. `pre` will visit Calls before their +/// arguments, `post_call` will visit Calls (and no other Expressions) after their +/// arguments. Visitors should return the Identical expression to indicate no change; this +/// will prevent unnecessary construction in the common case where a modification is not +/// possible/necessary/... +/// +/// If an argument was modified, `post_call` visits a reconstructed Call with the modified +/// arguments but also receives a pointer to the unmodified Expression as a second +/// argument. If no arguments were modified the unmodified Expression* will be nullptr. 
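+///
+/// For example, a visitor that rewrites field references and leaves calls
+/// untouched has the shape:
+///
+///   Modify(expr,
+///          [](Expression e) -> Result<Expression> { /*rewrite refs*/ return e; },
+///          [](Expression call, Expression*) { return call; });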
+template +Result Modify(Expression expr, const PreVisit& pre, + const PostVisitCall& post_call) { + ARROW_ASSIGN_OR_RAISE(expr, Result(pre(std::move(expr)))); + + auto call = expr.call(); + if (!call) return expr; + + bool at_least_one_modified = false; + std::vector modified_arguments; + + for (size_t i = 0; i < call->arguments.size(); ++i) { + ARROW_ASSIGN_OR_RAISE(auto modified_argument, + Modify(call->arguments[i], pre, post_call)); + + if (Identical(modified_argument, call->arguments[i])) { + continue; + } + + if (!at_least_one_modified) { + modified_arguments = call->arguments; + at_least_one_modified = true; + } + + modified_arguments[i] = std::move(modified_argument); + } + + if (at_least_one_modified) { + // reconstruct the call expression with the modified arguments + auto modified_call = *call; + modified_call.arguments = std::move(modified_arguments); + return post_call(Expression(std::move(modified_call)), &expr); + } + + return post_call(std::move(expr), nullptr); +} + /// @} } // namespace compute diff --git a/cpp/src/arrow/compute/exec/expression_internal.h b/cpp/src/arrow/compute/exec/expression_internal.h index 027c954c6d0..9e29b8e27f9 100644 --- a/cpp/src/arrow/compute/exec/expression_internal.h +++ b/cpp/src/arrow/compute/exec/expression_internal.h @@ -287,52 +287,5 @@ inline Result> GetFunction( return GetCastFunction(*to_type); } -/// Modify an Expression with pre-order and post-order visitation. -/// `pre` will be invoked on each Expression. `pre` will visit Calls before their -/// arguments, `post_call` will visit Calls (and no other Expressions) after their -/// arguments. Visitors should return the Identical expression to indicate no change; this -/// will prevent unnecessary construction in the common case where a modification is not -/// possible/necessary/... -/// -/// If an argument was modified, `post_call` visits a reconstructed Call with the modified -/// arguments but also receives a pointer to the unmodified Expression as a second -/// argument. If no arguments were modified the unmodified Expression* will be nullptr. 
-template -Result Modify(Expression expr, const PreVisit& pre, - const PostVisitCall& post_call) { - ARROW_ASSIGN_OR_RAISE(expr, Result(pre(std::move(expr)))); - - auto call = expr.call(); - if (!call) return expr; - - bool at_least_one_modified = false; - std::vector modified_arguments; - - for (size_t i = 0; i < call->arguments.size(); ++i) { - ARROW_ASSIGN_OR_RAISE(auto modified_argument, - Modify(call->arguments[i], pre, post_call)); - - if (Identical(modified_argument, call->arguments[i])) { - continue; - } - - if (!at_least_one_modified) { - modified_arguments = call->arguments; - at_least_one_modified = true; - } - - modified_arguments[i] = std::move(modified_argument); - } - - if (at_least_one_modified) { - // reconstruct the call expression with the modified arguments - auto modified_call = *call; - modified_call.arguments = std::move(modified_arguments); - return post_call(Expression(std::move(modified_call)), &expr); - } - - return post_call(std::move(expr), nullptr); -} - } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/exec/filter_node.cc b/cpp/src/arrow/compute/exec/filter_node.cc index 280d1e9ae00..19d18ca6081 100644 --- a/cpp/src/arrow/compute/exec/filter_node.cc +++ b/cpp/src/arrow/compute/exec/filter_node.cc @@ -38,8 +38,8 @@ namespace { class FilterNode : public MapNode { public: FilterNode(ExecPlan* plan, std::vector inputs, - std::shared_ptr output_schema, Expression filter, bool async_mode) - : MapNode(plan, std::move(inputs), std::move(output_schema), async_mode), + std::shared_ptr output_schema, Expression filter) + : MapNode(plan, std::move(inputs), std::move(output_schema)), filter_(std::move(filter)) {} static Result Make(ExecPlan* plan, std::vector inputs, @@ -61,8 +61,7 @@ class FilterNode : public MapNode { filter_expression.type()->ToString()); } return plan->EmplaceNode(plan, std::move(inputs), std::move(schema), - std::move(filter_expression), - filter_options.async_mode); + std::move(filter_expression)); } const char* kind_name() const override { return "FilterNode"; } diff --git a/cpp/src/arrow/compute/exec/map_node.cc b/cpp/src/arrow/compute/exec/map_node.cc index b99d0644905..98ea084192f 100644 --- a/cpp/src/arrow/compute/exec/map_node.cc +++ b/cpp/src/arrow/compute/exec/map_node.cc @@ -34,16 +34,10 @@ namespace arrow { namespace compute { MapNode::MapNode(ExecPlan* plan, std::vector inputs, - std::shared_ptr output_schema, bool async_mode) + std::shared_ptr output_schema) : ExecNode(plan, std::move(inputs), /*input_labels=*/{"target"}, std::move(output_schema), - /*num_outputs=*/1) { - if (async_mode) { - executor_ = plan_->exec_context()->executor(); - } else { - executor_ = nullptr; - } -} + /*num_outputs=*/1) {} void MapNode::ErrorReceived(ExecNode* input, Status error) { DCHECK_EQ(input, inputs_[0]); diff --git a/cpp/src/arrow/compute/exec/map_node.h b/cpp/src/arrow/compute/exec/map_node.h index 63d9db4a782..ea5264aa87e 100644 --- a/cpp/src/arrow/compute/exec/map_node.h +++ b/cpp/src/arrow/compute/exec/map_node.h @@ -45,7 +45,7 @@ namespace compute { class ARROW_EXPORT MapNode : public ExecNode { public: MapNode(ExecPlan* plan, std::vector inputs, - std::shared_ptr output_schema, bool async_mode); + std::shared_ptr output_schema); void ErrorReceived(ExecNode* input, Status error) override; @@ -70,8 +70,6 @@ class ARROW_EXPORT MapNode : public ExecNode { // Counter for the number of batches received AtomicCounter input_counter_; - ::arrow::internal::Executor* executor_; - // Variable used to cancel remaining tasks in 
the executor StopSource stop_source_; }; diff --git a/cpp/src/arrow/compute/exec/options.h b/cpp/src/arrow/compute/exec/options.h index c5edc0610c5..68178ea9f21 100644 --- a/cpp/src/arrow/compute/exec/options.h +++ b/cpp/src/arrow/compute/exec/options.h @@ -84,11 +84,10 @@ class ARROW_EXPORT TableSourceNodeOptions : public ExecNodeOptions { /// excluded in the batch emitted by this node. class ARROW_EXPORT FilterNodeOptions : public ExecNodeOptions { public: - explicit FilterNodeOptions(Expression filter_expression, bool async_mode = true) - : filter_expression(std::move(filter_expression)), async_mode(async_mode) {} + explicit FilterNodeOptions(Expression filter_expression) + : filter_expression(std::move(filter_expression)) {} Expression filter_expression; - bool async_mode; }; /// \brief Make a node which executes expressions on input batches, producing new batches. @@ -100,14 +99,11 @@ class ARROW_EXPORT FilterNodeOptions : public ExecNodeOptions { class ARROW_EXPORT ProjectNodeOptions : public ExecNodeOptions { public: explicit ProjectNodeOptions(std::vector expressions, - std::vector names = {}, bool async_mode = true) - : expressions(std::move(expressions)), - names(std::move(names)), - async_mode(async_mode) {} + std::vector names = {}) + : expressions(std::move(expressions)), names(std::move(names)) {} std::vector expressions; std::vector names; - bool async_mode; }; /// \brief Make a node which aggregates input batches, optionally grouped by keys. diff --git a/cpp/src/arrow/compute/exec/project_node.cc b/cpp/src/arrow/compute/exec/project_node.cc index 678925901c4..5ce5428a15d 100644 --- a/cpp/src/arrow/compute/exec/project_node.cc +++ b/cpp/src/arrow/compute/exec/project_node.cc @@ -41,9 +41,8 @@ namespace { class ProjectNode : public MapNode { public: ProjectNode(ExecPlan* plan, std::vector inputs, - std::shared_ptr output_schema, std::vector exprs, - bool async_mode) - : MapNode(plan, std::move(inputs), std::move(output_schema), async_mode), + std::shared_ptr output_schema, std::vector exprs) + : MapNode(plan, std::move(inputs), std::move(output_schema)), exprs_(std::move(exprs)) {} static Result Make(ExecPlan* plan, std::vector inputs, @@ -72,8 +71,7 @@ class ProjectNode : public MapNode { ++i; } return plan->EmplaceNode(plan, std::move(inputs), - schema(std::move(fields)), std::move(exprs), - project_options.async_mode); + schema(std::move(fields)), std::move(exprs)); } const char* kind_name() const override { return "ProjectNode"; } diff --git a/cpp/src/arrow/compute/exec/util.cc b/cpp/src/arrow/compute/exec/util.cc index a34a9c62713..5997f2bd385 100644 --- a/cpp/src/arrow/compute/exec/util.cc +++ b/cpp/src/arrow/compute/exec/util.cc @@ -399,7 +399,7 @@ Status TableSinkNodeConsumer::Consume(ExecBatch batch) { } Future<> TableSinkNodeConsumer::Finish() { - ARROW_ASSIGN_OR_RAISE(*out_, Table::FromRecordBatches(batches_)); + ARROW_ASSIGN_OR_RAISE(*out_, Table::FromRecordBatches(schema_, batches_)); return Status::OK(); } diff --git a/cpp/src/arrow/dataset/CMakeLists.txt b/cpp/src/arrow/dataset/CMakeLists.txt index 2b164269ce6..261a52eb4f5 100644 --- a/cpp/src/arrow/dataset/CMakeLists.txt +++ b/cpp/src/arrow/dataset/CMakeLists.txt @@ -28,7 +28,8 @@ set(ARROW_DATASET_SRCS partition.cc plan.cc projector.cc - scanner.cc) + scanner.cc + scan_node.cc) set(ARROW_DATASET_STATIC_LINK_LIBS) set(ARROW_DATASET_SHARED_LINK_LIBS) diff --git a/cpp/src/arrow/dataset/dataset.cc b/cpp/src/arrow/dataset/dataset.cc index 57ee6726cd1..9fdc2734a45 100644 --- a/cpp/src/arrow/dataset/dataset.cc +++ 
b/cpp/src/arrow/dataset/dataset.cc @@ -24,6 +24,7 @@ #include "arrow/table.h" #include "arrow/util/async_generator.h" #include "arrow/util/bit_util.h" +#include "arrow/util/byte_size.h" #include "arrow/util/iterator.h" #include "arrow/util/logging.h" #include "arrow/util/thread_pool.h" @@ -34,11 +35,22 @@ using internal::checked_pointer_cast; namespace dataset { +const compute::Expression Fragment::kNoPartitionInformation = compute::literal(true); + Fragment::Fragment(compute::Expression partition_expression, std::shared_ptr physical_schema) : partition_expression_(std::move(partition_expression)), physical_schema_(std::move(physical_schema)) {} +Future> Fragment::InspectFragment() { + return Status::NotImplemented("Inspect fragment"); +} + +Future> Fragment::BeginScan( + const FragmentScanRequest& request, const InspectedFragment& inspected_fragment) { + return Status::NotImplemented("New scan method"); +} + Result> Fragment::ReadPhysicalSchema() { { auto lock = physical_schema_mutex_.Lock(); @@ -141,6 +153,37 @@ Future> InMemoryFragment::CountRows( return Future>::MakeFinished(total); } +Future> InMemoryFragment::InspectFragment() { + return std::make_shared(physical_schema_->field_names()); +} + +class InMemoryFragment::Scanner : public FragmentScanner { + public: + explicit Scanner(InMemoryFragment* fragment) : fragment_(fragment) {} + + Future> ScanBatch(int batch_number) override { + return Future>::MakeFinished( + fragment_->record_batches_[batch_number]); + } + + int64_t EstimatedDataBytes(int batch_number) override { + return arrow::util::TotalBufferSize(*fragment_->record_batches_[batch_number]); + } + + int NumBatches() override { + return static_cast(fragment_->record_batches_.size()); + } + + private: + InMemoryFragment* fragment_; +}; + +Future> InMemoryFragment::BeginScan( + const FragmentScanRequest& request, const InspectedFragment& inspected_fragment) { + return Future>::MakeFinished( + std::make_shared(this)); +} + Dataset::Dataset(std::shared_ptr schema, compute::Expression partition_expression) : schema_(std::move(schema)), partition_expression_(std::move(partition_expression)) {} @@ -265,5 +308,141 @@ Result UnionDataset::GetFragmentsImpl(compute::Expression pred return GetFragmentsFromDatasets(children_, predicate); } +namespace { + +class BasicFragmentEvolution : public FragmentEvolutionStrategy { + public: + BasicFragmentEvolution(std::vector ds_to_frag_map, Schema* dataset_schema) + : ds_to_frag_map(std::move(ds_to_frag_map)), dataset_schema(dataset_schema) {} + + Result GetGuarantee( + const std::vector& dataset_schema_selection) const override { + std::vector missing_fields; + for (const FieldPath& path : dataset_schema_selection) { + int top_level_field_idx = path[0]; + if (ds_to_frag_map[top_level_field_idx] < 0) { + missing_fields.push_back( + compute::is_null(compute::field_ref(top_level_field_idx))); + } + } + if (missing_fields.empty()) { + return compute::literal(true); + } + if (missing_fields.size() == 1) { + return missing_fields[0]; + } + return compute::and_(missing_fields); + } + + Result> DevolveSelection( + const std::vector& dataset_schema_selection) const override { + std::vector desired_columns; + for (std::size_t selection_idx = 0; selection_idx < dataset_schema_selection.size(); + selection_idx++) { + const FieldPath& path = dataset_schema_selection[selection_idx]; + int top_level_field_idx = path[0]; + int dest_top_level_idx = ds_to_frag_map[top_level_field_idx]; + if (dest_top_level_idx >= 0) { + ARROW_ASSIGN_OR_RAISE(std::shared_ptr field, 
path.Get(*dataset_schema)); + std::vector dest_path_indices(path.indices()); + dest_path_indices[0] = dest_top_level_idx; + desired_columns.push_back( + FragmentSelectionColumn{FieldPath(dest_path_indices), field->type().get(), + static_cast(selection_idx)}); + } + } + return std::move(desired_columns); + }; + + Result DevolveFilter( + const compute::Expression& filter) const override { + return compute::Modify( + filter, + [&](compute::Expression expr) -> Result { + const FieldRef* ref = expr.field_ref(); + if (ref) { + ARROW_ASSIGN_OR_RAISE(FieldPath path, ref->FindOne(*dataset_schema)); + int top_level_idx = path[0]; + std::vector modified_indices(path.indices()); + modified_indices[0] = ds_to_frag_map[top_level_idx]; + if (modified_indices[0] < 0) { + return Status::Invalid( + "Filter cannot be applied. It refers to a missing field ", + ref->ToString(), + " in a way that cannot be satisfied even though we know that field is " + "null"); + } + return compute::field_ref(FieldRef(std::move(modified_indices))); + } + return std::move(expr); + }, + [](compute::Expression expr, compute::Expression* old_expr) { return expr; }); + }; + + Result EvolveBatch( + const std::shared_ptr& batch, + const std::vector& dataset_selection, + const std::vector& selection) const override { + std::vector columns(dataset_selection.size()); + DCHECK_EQ(batch->num_columns(), static_cast(selection.size())); + // First go through and populate the columns we retrieved + for (int idx = 0; idx < batch->num_columns(); idx++) { + columns[selection[idx].selection_index] = batch->column(idx); + } + // Next go through and fill in the null columns + for (std::size_t idx = 0; idx < dataset_selection.size(); idx++) { + int top_level_idx = dataset_selection[idx][0]; + if (ds_to_frag_map[top_level_idx] < 0) { + columns[idx] = MakeNullScalar( + dataset_schema->field(static_cast(top_level_idx))->type()); + } + } + return compute::ExecBatch(columns, batch->num_rows()); + }; + + std::string ToString() const override { return "basic-fragment-evolution"; } + + std::vector ds_to_frag_map; + Schema* dataset_schema; + + static std::unique_ptr Make( + const std::shared_ptr& dataset_schema, + const std::vector& fragment_column_names) { + std::vector ds_to_frag_map; + std::unordered_map column_names_map; + for (size_t i = 0; i < fragment_column_names.size(); i++) { + column_names_map[fragment_column_names[i]] = static_cast(i); + } + for (int idx = 0; idx < dataset_schema->num_fields(); idx++) { + const std::string& field_name = dataset_schema->field(idx)->name(); + auto column_idx_itr = column_names_map.find(field_name); + if (column_idx_itr == column_names_map.end()) { + ds_to_frag_map.push_back(-1); + } else { + ds_to_frag_map.push_back(column_idx_itr->second); + } + } + return ::arrow::internal::make_unique( + std::move(ds_to_frag_map), dataset_schema.get()); + } +}; + +class BasicDatasetEvolutionStrategy : public DatasetEvolutionStrategy { + std::unique_ptr GetStrategy( + const Dataset& dataset, const Fragment& fragment, + const InspectedFragment& inspected_fragment) override { + return BasicFragmentEvolution::Make(dataset.schema(), + inspected_fragment.column_names); + } + + std::string ToString() const override { return "basic-dataset-evolution"; } +}; + +} // namespace + +std::unique_ptr MakeBasicDatasetEvolutionStrategy() { + return ::arrow::internal::make_unique(); +} + } // namespace dataset } // namespace arrow diff --git a/cpp/src/arrow/dataset/dataset.h b/cpp/src/arrow/dataset/dataset.h index 3a5030b6be8..5b20c804418 100644 
--- a/cpp/src/arrow/dataset/dataset.h +++ b/cpp/src/arrow/dataset/dataset.h @@ -30,6 +30,7 @@ #include "arrow/dataset/type_fwd.h" #include "arrow/dataset/visibility.h" #include "arrow/util/async_generator_fwd.h" +#include "arrow/util/future.h" #include "arrow/util/macros.h" #include "arrow/util/mutex.h" @@ -43,17 +44,91 @@ namespace dataset { using RecordBatchGenerator = std::function>()>; +/// \brief Description of a column to scan +struct FragmentSelectionColumn { + /// \brief The path to the column to load + FieldPath path; + /// \brief The type of the column in the dataset schema + /// + /// A format may choose to ignore this field completely. For example, when + /// reading from IPC the reader can just return the column in the data type + /// that is stored on disk. There is no point in doing anything special. + /// + /// However, some formats may be capable of casting on the fly. For example, + /// when reading from CSV, if we know the target type of the column, we can + /// convert from string to the target type as we read. + DataType* requested_type; + /// \brief The index in the output selection of this column + int selection_index; +}; +/// \brief Instructions for scanning a particular fragment +/// +/// The fragment scan request is dervied from ScanV2Options. The main +/// difference is that the scan options are based on the dataset schema +/// while the fragment request is based on the fragment schema. +struct FragmentScanRequest { + /// \brief A row filter + /// + /// The filter expression should be written against the fragment schema. + /// + /// \see ScanV2Options for details on how this filter should be applied + compute::Expression filter = compute::literal(true); + + /// \brief The columns to scan + /// + /// These indices refer to the fragment schema + /// + /// Note: This is NOT a simple list of top-level column indices. + /// For more details \see ScanV2Options + /// + /// If possible a fragment should only read from disk the data needed + /// to satisfy these columns. If a format cannot partially read a nested + /// column (e.g. JSON) then it must apply the column selection (in memory) + /// before returning the scanned batch. + std::vector columns; + /// \brief Options specific to the format being scanned + FragmentScanOptions* format_scan_options; +}; + +class FragmentScanner { + public: + /// This instance will only be destroyed after all ongoing scan futures + /// have been completed. + /// + /// This means any callbacks created as part of the scan can safely + /// capture `this` + virtual ~FragmentScanner() = default; + /// \brief Scan a batch of data from the file + /// \param batch_number The index of the batch to read + virtual Future> ScanBatch(int batch_number) = 0; + /// \brief Calculate an estimate of how many data bytes the given batch will represent + /// + /// "Data bytes" should be the total size of all the buffers once the data has been + /// decoded into the Arrow format. + virtual int64_t EstimatedDataBytes(int batch_number) = 0; + /// \brief The number of batches in the fragment to scan + virtual int NumBatches() = 0; +}; + +struct InspectedFragment { + explicit InspectedFragment(std::vector column_names) + : column_names(std::move(column_names)) {} + std::vector column_names; +}; + /// \brief A granular piece of a Dataset, such as an individual file. /// /// A Fragment can be read/scanned separately from other fragments. It yields a -/// collection of RecordBatches when scanned, encapsulated in one or more -/// ScanTasks. 
+/// collection of RecordBatches when scanned /// /// Note that Fragments have well defined physical schemas which are reconciled by /// the Datasets which contain them; these physical schemas may differ from a parent /// Dataset's schema and the physical schemas of sibling Fragments. class ARROW_DS_EXPORT Fragment : public std::enable_shared_from_this { public: + /// \brief An expression that represents no known partition information + static const compute::Expression kNoPartitionInformation; + /// \brief Return the physical schema of the Fragment. /// /// The physical schema is also called the writer schema. @@ -65,6 +140,17 @@ class ARROW_DS_EXPORT Fragment : public std::enable_shared_from_this { virtual Result ScanBatchesAsync( const std::shared_ptr& options) = 0; + /// \brief Inspect a fragment to learn basic information + /// + /// This will be called before a scan and a fragment should attach whatever + /// information will be needed to figure out an evolution strategy. This information + /// will then be passed to the call to BeginScan + virtual Future> InspectFragment(); + + /// \brief Start a scan operation + virtual Future> BeginScan( + const FragmentScanRequest& request, const InspectedFragment& inspected_fragment); + /// \brief Count the number of rows in this fragment matching the filter using metadata /// only. That is, this method may perform I/O, but will not load data. /// @@ -119,6 +205,7 @@ class ARROW_DS_EXPORT FragmentScanOptions { /// RecordBatch. class ARROW_DS_EXPORT InMemoryFragment : public Fragment { public: + class Scanner; InMemoryFragment(std::shared_ptr schema, RecordBatchVector record_batches, compute::Expression = compute::literal(true)); explicit InMemoryFragment(RecordBatchVector record_batches, @@ -130,6 +217,11 @@ class ARROW_DS_EXPORT InMemoryFragment : public Fragment { compute::Expression predicate, const std::shared_ptr& options) override; + Future> InspectFragment() override; + Future> BeginScan( + const FragmentScanRequest& request, + const InspectedFragment& inspected_fragment) override; + std::string type_name() const override { return "in-memory"; } protected: @@ -142,6 +234,80 @@ class ARROW_DS_EXPORT InMemoryFragment : public Fragment { using FragmentGenerator = AsyncGenerator>; +/// \brief Rules for converting the dataset schema to and from fragment schemas +class ARROW_DS_EXPORT FragmentEvolutionStrategy { + public: + /// This instance will only be destroyed when all scan operations for the + /// fragment have completed. + virtual ~FragmentEvolutionStrategy() = default; + /// \brief A guarantee that applies to all batches of this fragment + /// + /// For example, if a fragment is missing one of the fields in the dataset + /// schema then a typical evolution strategy is to set that field to null. + /// + /// So if the column at index 3 is missing then the guarantee is + /// FieldRef(3) == null + /// + /// Individual field guarantees should be AND'd together and returned + /// as a single expression. 
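+  ///
+  /// For example, if the columns at indices 3 and 5 are both missing from the
+  /// fragment then the combined guarantee would be
+  /// and_(is_null(field_ref(3)), is_null(field_ref(5))).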
+ virtual Result GetGuarantee( + const std::vector& dataset_schema_selection) const = 0; + + /// \brief Return a fragment schema selection given a dataset schema selection + /// + /// For example, if the user wants fields 2 & 4 of the dataset schema and + /// in this fragment the field 2 is missing and the field 4 is at index 1 then + /// this should return {1} + virtual Result> DevolveSelection( + const std::vector& dataset_schema_selection) const = 0; + + /// \brief Return a filter expression bound to the fragment schema given + /// a filter expression bound to the dataset schema + /// + /// The dataset scan filter will first be simplified by the guarantee returned + /// by GetGuarantee. This means an evolution that only handles dropping or casting + /// fields doesn't need to do anything here except return the given filter. + /// + /// On the other hand, an evolution that is doing some kind of aliasing will likely + /// need to convert field references in the filter to the aliased field references + /// where appropriate. + virtual Result DevolveFilter( + const compute::Expression& filter) const = 0; + + /// \brief Convert a batch from the fragment schema to the dataset schema + /// + /// Typically this involves casting columns from the data type stored on disk + /// to the data type of the dataset schema. For example, this fragment might + /// have columns stored as int32 and the dataset schema might have int64 for + /// the column. In this case we should cast the column from int32 to int64. + /// + /// Note: A fragment may perform this cast as the data is read from disk. In + /// that case a cast might not be needed. + virtual Result EvolveBatch( + const std::shared_ptr& batch, + const std::vector& dataset_selection, + const std::vector& selection) const = 0; + + /// \brief Return a string description of this strategy + virtual std::string ToString() const = 0; +}; + +/// \brief Lookup to create a FragmentEvolutionStrategy for a given fragment +class ARROW_DS_EXPORT DatasetEvolutionStrategy { + public: + virtual ~DatasetEvolutionStrategy() = default; + /// \brief Create a strategy for evolving from the given fragment + /// to the schema of the given dataset + virtual std::unique_ptr GetStrategy( + const Dataset& dataset, const Fragment& fragment, + const InspectedFragment& inspected_fragment) = 0; + + /// \brief Return a string description of this strategy + virtual std::string ToString() const = 0; +}; + +std::unique_ptr MakeBasicDatasetEvolutionStrategy(); + /// \brief A container of zero or more Fragments. /// /// A Dataset acts as a union of Fragments, e.g. 
files deeply nested in a @@ -178,6 +344,9 @@ class ARROW_DS_EXPORT Dataset : public std::enable_shared_from_this { virtual Result> ReplaceSchema( std::shared_ptr schema) const = 0; + /// \brief Rules used by this dataset to handle schema evolution + DatasetEvolutionStrategy* evolution_strategy() { return evolution_strategy_.get(); } + virtual ~Dataset() = default; protected: @@ -201,6 +370,8 @@ class ARROW_DS_EXPORT Dataset : public std::enable_shared_from_this { std::shared_ptr schema_; compute::Expression partition_expression_ = compute::literal(true); + std::unique_ptr evolution_strategy_ = + MakeBasicDatasetEvolutionStrategy(); }; /// \addtogroup dataset-implementations diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc index 31606d8cbce..a4aaaee99e9 100644 --- a/cpp/src/arrow/dataset/file_base.cc +++ b/cpp/src/arrow/dataset/file_base.cc @@ -476,8 +476,8 @@ class TeeNode : public compute::MapNode { TeeNode(compute::ExecPlan* plan, std::vector inputs, std::shared_ptr output_schema, std::unique_ptr dataset_writer, - FileSystemDatasetWriteOptions write_options, bool async_mode) - : MapNode(plan, std::move(inputs), std::move(output_schema), async_mode), + FileSystemDatasetWriteOptions write_options) + : MapNode(plan, std::move(inputs), std::move(output_schema)), dataset_writer_(std::move(dataset_writer)), write_options_(std::move(write_options)) { std::unique_ptr serial_throttle = @@ -503,8 +503,8 @@ class TeeNode : public compute::MapNode { internal::DatasetWriter::Make(write_options, plan->async_scheduler())); return plan->EmplaceNode(plan, std::move(inputs), std::move(schema), - std::move(dataset_writer), std::move(write_options), - /*async_mode=*/true); + std::move(dataset_writer), + std::move(write_options)); } const char* kind_name() const override { return "TeeNode"; } diff --git a/cpp/src/arrow/dataset/plan.cc b/cpp/src/arrow/dataset/plan.cc index 01169413f78..805fb25aa71 100644 --- a/cpp/src/arrow/dataset/plan.cc +++ b/cpp/src/arrow/dataset/plan.cc @@ -33,6 +33,7 @@ void Initialize() { auto registry = compute::default_exec_factory_registry(); if (registry) { InitializeScanner(registry); + InitializeScannerV2(registry); InitializeDatasetWriter(registry); } }); diff --git a/cpp/src/arrow/dataset/scan_node.cc b/cpp/src/arrow/dataset/scan_node.cc new file mode 100644 index 00000000000..6f3d4477bba --- /dev/null +++ b/cpp/src/arrow/dataset/scan_node.cc @@ -0,0 +1,372 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
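+
+// This file implements the "scan2" ExecNode.  It lists fragments from a
+// Dataset, inspects each one, uses the dataset's evolution strategy to map the
+// fragment schema onto the dataset schema, and then streams the evolved
+// batches into the plan while throttling fragment and byte readahead.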
+ +#include +#include +#include +#include +#include +#include + +#include "arrow/compute/exec/exec_plan.h" +#include "arrow/compute/exec/expression.h" +#include "arrow/dataset/scanner.h" +#include "arrow/record_batch.h" +#include "arrow/result.h" +#include "arrow/status.h" +#include "arrow/type.h" +#include "arrow/util/checked_cast.h" +#include "arrow/util/make_unique.h" +#include "arrow/util/tracing_internal.h" +#include "arrow/util/unreachable.h" + +namespace cp = arrow::compute; + +namespace arrow { + +using internal::checked_cast; + +namespace dataset { + +namespace { + +Result> OutputSchemaFromOptions(const ScanV2Options& options) { + return FieldPath::GetAll(*options.dataset->schema(), options.columns); +} + +// In the future we should support async scanning of fragments. The +// Dataset class doesn't support this yet but we pretend it does here to +// ease future adoption of the feature. +AsyncGenerator> GetFragments(Dataset* dataset, + cp::Expression predicate) { + // In the future the dataset should be responsible for figuring out + // the I/O context. This will allow different I/O contexts to be used + // when scanning different datasets. For example, if we are scanning a + // union of a remote dataset and a local dataset. + const auto& io_context = io::default_io_context(); + auto io_executor = io_context.executor(); + Future> fragments_it_fut = + DeferNotOk(io_executor->Submit( + [dataset, predicate]() -> Result> { + ARROW_ASSIGN_OR_RAISE(FragmentIterator fragments_iter, + dataset->GetFragments(predicate)); + return std::make_shared(std::move(fragments_iter)); + })); + Future>> fragments_gen_fut = + fragments_it_fut.Then([](const std::shared_ptr& fragments_it) + -> Result>> { + ARROW_ASSIGN_OR_RAISE(std::vector> fragments, + fragments_it->ToVector()); + return MakeVectorGenerator(std::move(fragments)); + }); + return MakeFromFuture(std::move(fragments_gen_fut)); +} + +/// \brief A node that scans a dataset +/// +/// The scan node has three groups of io-tasks and one task. +/// +/// The first io-task (listing) fetches the fragments from the dataset. This may be a +/// simple iteration of paths or, if the dataset is described with wildcards, this may +/// involve I/O for listing and walking directory paths. There is one listing io-task per +/// dataset. +/// +/// Ths next step is to fetch the metadata for the fragment. For some formats (e.g. CSV) +/// this may be quite simple (get the size of the file). For other formats (e.g. parquet) +/// this is more involved and requires reading data. There is one metadata io-task per +/// fragment. The metadata io-task creates an AsyncGenerator from the +/// fragment. +/// +/// Once the metadata io-task is done we can issue read io-tasks. Each read io-task +/// requests a single batch of data from the disk by pulling the next Future from the +/// generator. +/// +/// Finally, when the future is fulfilled, we issue a pipeline task to drive the batch +/// through the pipeline. +/// +/// Most of these tasks are io-tasks. They take very few CPU resources and they run on +/// the I/O thread pool. These io-tasks are invisible to the exec plan and so we need to +/// do some custom scheduling. We limit how many fragments we read from at any one time. +/// This is referred to as "fragment readahead". +/// +/// Within a fragment there is usually also some amount of "row readahead". This row +/// readahead is handled by the fragment (and not the scanner) because the exact details +/// of how it is performed depend on the underlying format. 
+/// +/// When a scan node is aborted (StopProducing) we send a cancel signal to any active +/// fragments. On destruction we continue consuming the fragments until they complete +/// (which should be fairly quick since we cancelled the fragment). This ensures the +/// I/O work is completely finished before the node is destroyed. +class ScanNode : public cp::ExecNode { + public: + ScanNode(cp::ExecPlan* plan, ScanV2Options options, + std::shared_ptr output_schema) + : cp::ExecNode(plan, {}, {}, std::move(output_schema), + /*num_outputs=*/1), + options_(options), + fragments_throttle_( + util::AsyncTaskScheduler::MakeThrottle(options_.fragment_readahead + 1)), + batches_throttle_( + util::AsyncTaskScheduler::MakeThrottle(options_.target_bytes_readahead + 1)) { + } + + static Result NormalizeAndValidate(const ScanV2Options& options, + compute::ExecContext* ctx) { + ScanV2Options normalized(options); + if (!normalized.dataset) { + return Status::Invalid("Scan options must include a dataset"); + } + + if (options.fragment_readahead < 0) { + return Status::Invalid( + "Fragment readahead may not be less than 0. Set to 0 to disable readahead"); + } + + if (options.target_bytes_readahead < 0) { + return Status::Invalid( + "Batch readahead may not be less than 0. Set to 0 to disable readahead"); + } + + if (!normalized.filter.is_valid()) { + normalized.filter = compute::literal(true); + } + + if (normalized.filter.call() && normalized.filter.IsBound()) { + // There is no easy way to make sure a filter was bound agaisnt the same + // function registry as the one in ctx so we just require it to be unbound + // FIXME - Do we care if it was bound to a different function registry? + return Status::Invalid("Scan filter must be unbound"); + } else if (!normalized.filter.IsBound()) { + ARROW_ASSIGN_OR_RAISE(normalized.filter, + normalized.filter.Bind(*options.dataset->schema(), ctx)); + } // Else we must have some simple filter like literal(true) which might be bound + // but we don't care + + return std::move(normalized); + } + + static Result Make(cp::ExecPlan* plan, std::vector inputs, + const cp::ExecNodeOptions& options) { + RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "ScanNode")); + const auto& scan_options = checked_cast(options); + ARROW_ASSIGN_OR_RAISE(ScanV2Options normalized_options, + NormalizeAndValidate(scan_options, plan->exec_context())); + ARROW_ASSIGN_OR_RAISE(std::shared_ptr output_schema, + OutputSchemaFromOptions(normalized_options)); + return plan->EmplaceNode(plan, std::move(normalized_options), + std::move(output_schema)); + } + + const char* kind_name() const override { return "ScanNode"; } + + [[noreturn]] static void NoInputs() { + Unreachable("no inputs; this should never be called"); + } + [[noreturn]] void InputReceived(cp::ExecNode*, cp::ExecBatch) override { NoInputs(); } + [[noreturn]] void ErrorReceived(cp::ExecNode*, Status) override { NoInputs(); } + [[noreturn]] void InputFinished(cp::ExecNode*, int) override { NoInputs(); } + + Status Init() override { + // batch_output_ = + // ::arrow::internal::make_unique(this, + // outputs_[0]); + return Status::OK(); + } + + struct ScanState { + std::mutex mutex; + std::shared_ptr fragment_scanner; + std::unique_ptr fragment_evolution; + FragmentScanRequest scan_request; + }; + + struct ScanBatchTask : util::AsyncTaskScheduler::Task { + ScanBatchTask(ScanNode* node, ScanState* scan_state, int batch_index) + : node_(node), scan_(scan_state), batch_index_(batch_index) { + int64_t cost = 
scan_state->fragment_scanner->EstimatedDataBytes(batch_index_); + // It's possible, though probably a bad idea, for a single batch of a fragment + // to be larger than 2GiB. In that case, it doesn't matter much if we underestimate + // because the largest the throttle can be is 2GiB and thus we will be in "one batch + // at a time" mode anyways which is the best we can do in this case. + cost_ = static_cast( + std::min(cost, static_cast(std::numeric_limits::max()))); + } + + Result> operator()(util::AsyncTaskScheduler* scheduler) override { + // Prevent concurrent calls to ScanBatch which might not be thread safe + std::lock_guard lk(scan_->mutex); + return scan_->fragment_scanner->ScanBatch(batch_index_) + .Then([this](const std::shared_ptr batch) { + return HandleBatch(batch); + }); + } + + Status HandleBatch(const std::shared_ptr& batch) { + ARROW_ASSIGN_OR_RAISE( + compute::ExecBatch evolved_batch, + scan_->fragment_evolution->EvolveBatch(batch, node_->options_.columns, + scan_->scan_request.columns)); + node_->outputs_[0]->InputReceived(node_, std::move(evolved_batch)); + return Status::OK(); + } + + int cost() const override { return cost_; } + + ScanNode* node_; + ScanState* scan_; + int batch_index_; + int cost_; + }; + + struct ListFragmentTask : util::AsyncTaskScheduler::Task { + ListFragmentTask(ScanNode* node, std::shared_ptr fragment) + : node(node), fragment(std::move(fragment)) {} + + Result> operator()(util::AsyncTaskScheduler* scheduler) override { + return fragment->InspectFragment().Then( + [this, + scheduler](const std::shared_ptr& inspected_fragment) { + return BeginScan(inspected_fragment, scheduler); + }); + } + + Future<> BeginScan(const std::shared_ptr& inspected_fragment, + util::AsyncTaskScheduler* scan_scheduler) { + // Now that we have an inspected fragment we need to use the dataset's evolution + // strategy to figure out how to scan it + scan_state->fragment_evolution = + node->options_.dataset->evolution_strategy()->GetStrategy( + *node->options_.dataset, *fragment, *inspected_fragment); + ARROW_RETURN_NOT_OK(InitFragmentScanRequest()); + return fragment->BeginScan(scan_state->scan_request, *inspected_fragment) + .Then([this, scan_scheduler]( + const std::shared_ptr& fragment_scanner) { + return AddScanTasks(fragment_scanner, scan_scheduler); + }); + } + + Future<> AddScanTasks(const std::shared_ptr& fragment_scanner, + util::AsyncTaskScheduler* scan_scheduler) { + scan_state->fragment_scanner = fragment_scanner; + ScanState* state_view = scan_state.get(); + // Finish callback keeps the scan state alive until all scan tasks done + struct StateHolder { + Status operator()() { return Status::OK(); } + std::unique_ptr scan_state; + }; + util::AsyncTaskScheduler* frag_scheduler = scan_scheduler->MakeSubScheduler( + StateHolder{std::move(scan_state)}, node->batches_throttle_.get()); + for (int i = 0; i < fragment_scanner->NumBatches(); i++) { + node->num_batches_.fetch_add(1); + frag_scheduler->AddTask( + arrow::internal::make_unique(node, state_view, i)); + } + Future<> list_and_scan_node = frag_scheduler->OnFinished(); + frag_scheduler->End(); + // The "list fragments" task doesn't actually end until the fragments are + // all scanned. This allows us to enforce fragment readahead. + return list_and_scan_node; + } + + // Take the dataset options, and the fragment evolution, and figure out exactly how + // we should scan the fragment itself. 
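+    // Concretely: devolve the dataset-schema column selection into fragment
+    // terms, compute the fragment's guarantee for that selection, simplify the
+    // scan filter against that guarantee, and devolve the simplified filter so
+    // that it only references fields the fragment can actually resolve.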
+ Status InitFragmentScanRequest() { + ARROW_ASSIGN_OR_RAISE( + scan_state->scan_request.columns, + scan_state->fragment_evolution->DevolveSelection(node->options_.columns)); + ARROW_ASSIGN_OR_RAISE( + compute::Expression devolution_guarantee, + scan_state->fragment_evolution->GetGuarantee(node->options_.columns)); + ARROW_ASSIGN_OR_RAISE( + compute::Expression simplified_filter, + compute::SimplifyWithGuarantee(node->options_.filter, devolution_guarantee)); + ARROW_ASSIGN_OR_RAISE( + scan_state->scan_request.filter, + scan_state->fragment_evolution->DevolveFilter(std::move(simplified_filter))); + scan_state->scan_request.format_scan_options = node->options_.format_options; + return Status::OK(); + } + + ScanNode* node; + std::shared_ptr fragment; + std::unique_ptr scan_state = arrow::internal::make_unique(); + }; + + Status StartProducing() override { + START_COMPUTE_SPAN(span_, std::string(kind_name()) + ":" + label(), + {{"node.kind", kind_name()}, + {"node.label", label()}, + {"node.output_schema", output_schema()->ToString()}, + {"node.detail", ToString()}}); + END_SPAN_ON_FUTURE_COMPLETION(span_, finished_); + AsyncGenerator> frag_gen = + GetFragments(options_.dataset.get(), options_.filter); + util::AsyncTaskScheduler* scan_scheduler = plan_->async_scheduler()->MakeSubScheduler( + [this]() { + outputs_[0]->InputFinished(this, num_batches_.load()); + finished_.MarkFinished(); + return Status::OK(); + }, + fragments_throttle_.get()); + plan_->async_scheduler()->AddAsyncGenerator>( + std::move(frag_gen), + [this, scan_scheduler](const std::shared_ptr& fragment) { + scan_scheduler->AddTask( + arrow::internal::make_unique(this, fragment)); + return Status::OK(); + }, + [scan_scheduler]() { + scan_scheduler->End(); + return Status::OK(); + }); + return Status::OK(); + } + + void PauseProducing(ExecNode* output, int32_t counter) override { + // FIXME(TODO) + // Need to ressurect AsyncToggle and then all fragment scanners + // should share the same toggle + } + + void ResumeProducing(ExecNode* output, int32_t counter) override { + // FIXME(TODO) + } + + void StopProducing(ExecNode* output) override { + DCHECK_EQ(output, outputs_[0]); + StopProducing(); + } + + void StopProducing() override {} + + private: + ScanV2Options options_; + std::atomic num_batches_{0}; + // std::unique_ptr batch_output_; + std::unique_ptr fragments_throttle_; + std::unique_ptr batches_throttle_; +}; + +} // namespace + +namespace internal { +void InitializeScannerV2(arrow::compute::ExecFactoryRegistry* registry) { + DCHECK_OK(registry->AddFactory("scan2", ScanNode::Make)); +} +} // namespace internal +} // namespace dataset +} // namespace arrow diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc index eb09a986c97..e74db5b5c5a 100644 --- a/cpp/src/arrow/dataset/scanner.cc +++ b/cpp/src/arrow/dataset/scanner.cc @@ -21,6 +21,7 @@ #include #include #include +#include #include #include "arrow/array/array_primitive.h" @@ -67,6 +68,14 @@ std::vector ScanOptions::MaterializedFields() const { return fields; } +std::vector ScanV2Options::AllColumns(const Dataset& dataset) { + std::vector selection(dataset.schema()->num_fields()); + for (std::size_t i = 0; i < selection.size(); i++) { + selection[i] = {static_cast(i)}; + } + return selection; +} + namespace { class ScannerRecordBatchReader : public RecordBatchReader { public: diff --git a/cpp/src/arrow/dataset/scanner.h b/cpp/src/arrow/dataset/scanner.h index 646cc0de72e..0a6fee7b652 100644 --- a/cpp/src/arrow/dataset/scanner.h +++ 
b/cpp/src/arrow/dataset/scanner.h @@ -54,6 +54,7 @@ constexpr int64_t kDefaultBatchSize = 1 << 17; // 128Ki rows // This will yield 64 batches ~ 8Mi rows constexpr int32_t kDefaultBatchReadahead = 16; constexpr int32_t kDefaultFragmentReadahead = 4; +constexpr int32_t kDefaultBytesReadahead = 1 << 25; // 32MiB /// Scan-specific options, which can be changed between scans of the same dataset. struct ARROW_DS_EXPORT ScanOptions { @@ -137,6 +138,117 @@ struct ARROW_DS_EXPORT ScanOptions { compute::BackpressureOptions::DefaultBackpressure(); }; +/// Scan-specific options, which can be changed between scans of the same dataset. +/// +/// A dataset consists of one or more individual fragments. A fragment is anything +/// that is indepedently scannable, often a file. +/// +/// Batches from all fragments will be converted to a single schema. This unified +/// schema is referred to as the "dataset schema" and is the output schema for +/// this node. +/// +/// Individual fragments may have schemas that are different from the dataset +/// schema. This is sometimes referred to as the physical or fragment schema. +/// Conversion from the fragment schema to the dataset schema is a process +/// known as evolution. +struct ARROW_DS_EXPORT ScanV2Options : public compute::ExecNodeOptions { + explicit ScanV2Options(std::shared_ptr dataset) + : dataset(std::move(dataset)) {} + + /// \brief The dataset to scan + std::shared_ptr dataset; + /// \brief A row filter + /// + /// The filter expression should be written against the dataset schema. + /// The filter must be unbound. + /// + /// This is an opportunistic pushdown filter. Filtering capabilities will + /// vary between formats. If a format is not capable of applying the filter + /// then it will ignore it. + /// + /// Each fragment will do its best to filter the data based on the information + /// (partitioning guarantees, statistics) available to it. If it is able to + /// apply some filtering then it will indicate what filtering it was able to + /// apply by attaching a guarantee to the batch. + /// + /// For example, if a filter is x < 50 && y > 40 then a batch may be able to + /// apply a guarantee x < 50. Post-scan filtering would then only need to + /// consider y > 40 (for this specific batch). The next batch may not be able + /// to attach any guarantee and both clauses would need to be applied to that batch. + /// + /// A single guarantee-aware filtering operation should generally be applied to all + /// resulting batches. The scan node is not responsible for this. 
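+  /// (In practice this usually means running the scan node's output through a
+  /// downstream "filter" node configured with the same expression.)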
+ compute::Expression filter = compute::literal(true); + + /// \brief The columns to scan + /// + /// This is not a simple list of top-level column indices but instead a set of paths + /// allowing for partial selection of columns + /// + /// These paths refer to the dataset schema + /// + /// For example, consider the following dataset schema: + /// schema({ + /// field("score", int32()), + /// "marker", struct_({ + /// field("color", utf8()), + /// field("location", struct_({ + /// field("x", float64()), + /// field("y", float64()) + /// }) + /// }) + /// }) + /// + /// If `columns` is {{0}, {1,1,0}} then the output schema is: + /// schema({field("score", int32()), field("x", float64())}) + /// + /// If `columns` is {{1,1,1}, {1,1}} then the output schema is: + /// schema({ + /// field("y", float64()), + /// field("location", struct_({ + /// field("x", float64()), + /// field("y", float64()) + /// }) + /// }) + std::vector columns; + + /// \brief Target number of bytes to read ahead in a fragment + /// + /// This limit involves some amount of estimation. Formats typically only know + /// batch boundaries in terms of rows (not decoded bytes) and so an estimation + /// must be done to guess the average row size. Other formats like CSV and JSON + /// must make even more generalized guesses. + /// + /// This is a best-effort guide. Some formats may need to read ahead further, + /// for example, if scanning a parquet file that has batches with 100MiB of data + /// then the actual readahead will be at least 100MiB + /// + /// Set to 0 to disable readhead. When disabled, the scanner will read the + /// dataset one batch at a time + /// + /// This limit applies across all fragments. If the limit is 32MiB and the + /// fragment readahead allows for 20 fragments to be read at once then the + /// total readahead will still be 32MiB and NOT 20 * 32MiB. + int32_t target_bytes_readahead = kDefaultBytesReadahead; + + /// \brief Number of fragments to read ahead + /// + /// Higher readahead will potentially lead to more efficient I/O but will lead + /// to the scan operation using more RAM. The default is fairly conservative + /// and designed for fast local disks (or slow local spinning disks which cannot + /// handle much parallelism anyways). When using a highly parallel remote filesystem + /// you will likely want to increase these values. + /// + /// Set to 0 to disable fragment readahead. When disabled the dataset will be scanned + /// one fragment at a time. 
+ int32_t fragment_readahead = kDefaultFragmentReadahead; + /// \brief Options specific to the file format + FragmentScanOptions* format_options; + + /// \brief Utility method to get a selection representing all columns in a dataset + static std::vector AllColumns(const Dataset& dataset); +}; + /// \brief Describes a projection struct ARROW_DS_EXPORT ProjectionDescr { /// \brief The projection expression itself @@ -442,6 +554,7 @@ class ARROW_DS_EXPORT ScanNodeOptions : public compute::ExecNodeOptions { namespace internal { ARROW_DS_EXPORT void InitializeScanner(arrow::compute::ExecFactoryRegistry* registry); +ARROW_DS_EXPORT void InitializeScannerV2(arrow::compute::ExecFactoryRegistry* registry); } // namespace internal } // namespace dataset } // namespace arrow diff --git a/cpp/src/arrow/dataset/scanner_benchmark.cc b/cpp/src/arrow/dataset/scanner_benchmark.cc index b0254089a95..448b40bf158 100644 --- a/cpp/src/arrow/dataset/scanner_benchmark.cc +++ b/cpp/src/arrow/dataset/scanner_benchmark.cc @@ -96,9 +96,7 @@ std::shared_ptr GetSchema() { size_t GetBytesForSchema() { return sizeof(int32_t) + sizeof(bool); } -void MinimalEndToEndScan(size_t num_batches, size_t batch_size, bool async_mode) { - // NB: This test is here for didactic purposes - +void MinimalEndToEndScan(size_t num_batches, size_t batch_size) { // Specify a MemoryPool and ThreadPool for the ExecPlan compute::ExecContext exec_context(default_memory_pool(), ::arrow::internal::GetCpuThreadPool()); @@ -120,7 +118,7 @@ void MinimalEndToEndScan(size_t num_batches, size_t batch_size, bool async_mode) auto options = std::make_shared(); // specify the filter - compute::Expression b_is_true = field_ref("b"); + compute::Expression b_is_true = equal(field_ref("b"), literal(true)); options->filter = b_is_true; // for now, specify the projection as the full project expression (eventually this can // just be a list of materialized field names) @@ -134,10 +132,9 @@ void MinimalEndToEndScan(size_t num_batches, size_t batch_size, bool async_mode) compute::MakeExecNode("scan", plan.get(), {}, ScanNodeOptions{dataset, options})); // pipe the scan node into a filter node - ASSERT_OK_AND_ASSIGN( - compute::ExecNode * filter, - compute::MakeExecNode("filter", plan.get(), {scan}, - compute::FilterNodeOptions{b_is_true, async_mode})); + ASSERT_OK_AND_ASSIGN(compute::ExecNode * filter, + compute::MakeExecNode("filter", plan.get(), {scan}, + compute::FilterNodeOptions{b_is_true})); // pipe the filter node into a project node // NB: we're using the project node factory which preserves fragment/batch index @@ -146,7 +143,7 @@ void MinimalEndToEndScan(size_t num_batches, size_t batch_size, bool async_mode) ASSERT_OK_AND_ASSIGN( compute::ExecNode * project, compute::MakeExecNode("augmented_project", plan.get(), {filter}, - compute::ProjectNodeOptions{{a_times_2}, {}, async_mode})); + compute::ProjectNodeOptions{{a_times_2}, {}})); // finally, pipe the project node into a sink node AsyncGenerator> sink_gen; @@ -172,37 +169,157 @@ void MinimalEndToEndScan(size_t num_batches, size_t batch_size, bool async_mode) ASSERT_TRUE(plan->finished().Wait(/*seconds=*/1)) << "ExecPlan didn't finish within 1s"; } +void ScanOnly( + size_t num_batches, size_t batch_size, const std::string& factory_name, + std::function>(size_t, size_t)> + options_factory) { + compute::ExecContext exec_context(default_memory_pool(), + ::arrow::internal::GetCpuThreadPool()); + + // ensure arrow::dataset node factories are in the registry + ::arrow::dataset::internal::Initialize(); + + 
ASSERT_OK_AND_ASSIGN(std::shared_ptr plan, + compute::ExecPlan::Make(&exec_context)); + + RecordBatchVector batches = GetBatches(num_batches, batch_size); + + std::shared_ptr dataset = + std::make_shared(GetSchema(), batches); + + ASSERT_OK_AND_ASSIGN(std::shared_ptr node_options, + options_factory(num_batches, batch_size)); + + // construct the plan + ASSERT_OK_AND_ASSIGN( + compute::ExecNode * scan, + compute::MakeExecNode(factory_name, plan.get(), {}, *node_options)); + AsyncGenerator> sink_gen; + ASSERT_OK_AND_ASSIGN(compute::ExecNode * sink, + compute::MakeExecNode("sink", plan.get(), {scan}, + compute::SinkNodeOptions{&sink_gen})); + + ASSERT_NE(sink, nullptr); + + // translate sink_gen (async) to sink_reader (sync) + std::shared_ptr sink_reader = compute::MakeGeneratorReader( + schema({field("a * 2", int32())}), std::move(sink_gen), exec_context.memory_pool()); + + // start the ExecPlan + ASSERT_OK(plan->StartProducing()); + + // collect sink_reader into a Table + ASSERT_OK_AND_ASSIGN(auto collected, Table::FromRecordBatchReader(sink_reader.get())); + + ASSERT_GT(collected->num_rows(), 0); + + // wait 1s for completion + ASSERT_TRUE(plan->finished().Wait(/*seconds=*/1)) << "ExecPlan didn't finish within 1s"; +} + static void MinimalEndToEndBench(benchmark::State& state) { size_t num_batches = state.range(0); size_t batch_size = state.range(1); - bool async_mode = state.range(2); for (auto _ : state) { - MinimalEndToEndScan(num_batches, batch_size, async_mode); + MinimalEndToEndScan(num_batches, batch_size); } state.SetItemsProcessed(state.iterations() * num_batches); state.SetBytesProcessed(state.iterations() * num_batches * batch_size * GetBytesForSchema()); } -static const std::vector kWorkload = {100, 1000, 10000, 100000}; +const std::function>(size_t, size_t)> + kScanFactory = [](size_t num_batches, size_t batch_size) { + RecordBatchVector batches = GetBatches(num_batches, batch_size); + std::shared_ptr dataset = + std::make_shared(GetSchema(), std::move(batches)); + + std::shared_ptr options = std::make_shared(); + // specify the filter + compute::Expression b_is_true = equal(field_ref("b"), literal(true)); + options->filter = b_is_true; + options->projection = call("make_struct", {field_ref("a"), field_ref("b")}, + compute::MakeStructOptions{{"a", "b"}}); + + return std::make_shared(std::move(dataset), std::move(options)); + }; + +const std::function>(size_t, size_t)> + kScanV2Factory = + [](size_t num_batches, + size_t batch_size) -> Result> { + RecordBatchVector batches = GetBatches(num_batches, batch_size); + std::shared_ptr sch = GetSchema(); + std::shared_ptr dataset = + std::make_shared(sch, std::move(batches)); + + std::shared_ptr options = std::make_shared(dataset); + // specify the filter + compute::Expression b_is_true = equal(field_ref("b"), literal(true)); + options->filter = b_is_true; + options->columns = ScanV2Options::AllColumns(*dataset); + + return options; +}; + +static constexpr int kScanIdx = 0; +static constexpr int kScanV2Idx = 1; + +static void ScanOnlyBench(benchmark::State& state) { + size_t num_batches = state.range(0); + size_t batch_size = state.range(1); + + std::function>(size_t, size_t)> + options_factory; + std::string scan_factory = "scan"; + if (state.range(2) == kScanIdx) { + options_factory = kScanFactory; + } else if (state.range(2) == kScanV2Idx) { + options_factory = kScanV2Factory; + scan_factory = "scan2"; + } + + for (auto _ : state) { + ScanOnly(num_batches, batch_size, scan_factory, options_factory); + } + 
state.SetItemsProcessed(state.iterations() * num_batches); + state.SetBytesProcessed(state.iterations() * num_batches * batch_size * + GetBytesForSchema()); +} static void MinimalEndToEnd_Customize(benchmark::internal::Benchmark* b) { - for (const int32_t num_batches : kWorkload) { + for (const int32_t num_batches : {1000}) { + for (const int batch_size : {10, 100, 1000}) { + b->Args({num_batches, batch_size}); + RecordBatchVector batches = + ::arrow::compute::GenerateBatches(GetSchema(), num_batches, batch_size); + StoreBatches(num_batches, batch_size, batches); + } + } + b->ArgNames({"num_batches", "batch_size"}); + b->UseRealTime(); +} + +// FIXME - Combine these two customize blocks by moving the end-to-end to support +// options factories +static void ScanOnlyEndToEnd_Customize(benchmark::internal::Benchmark* b) { + for (const int32_t num_batches : {1000}) { for (const int batch_size : {10, 100, 1000}) { - for (const bool async_mode : {true, false}) { - b->Args({num_batches, batch_size, async_mode}); + for (const int scan_idx : {kScanIdx, kScanV2Idx}) { + b->Args({num_batches, batch_size, scan_idx}); RecordBatchVector batches = ::arrow::compute::GenerateBatches(GetSchema(), num_batches, batch_size); StoreBatches(num_batches, batch_size, batches); } } } - b->ArgNames({"num_batches", "batch_size", "async_mode"}); + b->ArgNames({"num_batches", "batch_size", "scan_alg"}); b->UseRealTime(); } BENCHMARK(MinimalEndToEndBench)->Apply(MinimalEndToEnd_Customize); +BENCHMARK(ScanOnlyBench)->Apply(ScanOnlyEndToEnd_Customize)->Iterations(100); } // namespace dataset } // namespace arrow diff --git a/cpp/src/arrow/dataset/scanner_test.cc b/cpp/src/arrow/dataset/scanner_test.cc index 6cafe10f78a..65b60f2cfd0 100644 --- a/cpp/src/arrow/dataset/scanner_test.cc +++ b/cpp/src/arrow/dataset/scanner_test.cc @@ -28,6 +28,8 @@ #include "arrow/compute/cast.h" #include "arrow/compute/exec/exec_plan.h" #include "arrow/compute/exec/expression_internal.h" +#include "arrow/compute/exec/test_util.h" +#include "arrow/dataset/dataset_internal.h" #include "arrow/dataset/plan.h" #include "arrow/dataset/test_util.h" #include "arrow/record_batch.h" @@ -39,6 +41,7 @@ #include "arrow/testing/gtest_util.h" #include "arrow/testing/matchers.h" #include "arrow/testing/util.h" +#include "arrow/util/byte_size.h" #include "arrow/util/range.h" #include "arrow/util/thread_pool.h" #include "arrow/util/vector.h" @@ -53,6 +56,751 @@ using internal::Iota; namespace dataset { +// The basic evolution strategy doesn't really need any info from the dataset +// or the fragment other than the schema so we just make a dummy dataset/fragment +// here. 
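+//
+// Rough shape of how a scanner is expected to use these strategies (a sketch based
+// on the interfaces exercised in the tests below, not a copy of the actual scan
+// node code):
+//
+//   inspected = fragment->InspectFragment();                     // learn column names, etc.
+//   frag_strategy = strategy->GetStrategy(*dataset, *fragment, *inspected);
+//   frag_filter = frag_strategy->DevolveFilter(dataset_filter);  // dataset -> fragment refs
+//   frag_columns = frag_strategy->DevolveSelection(dataset_columns);
+//   ...scan the fragment using frag_filter / frag_columns...
+//   evolved = frag_strategy->EvolveBatch(batch, dataset_columns, frag_columns);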
+std::unique_ptr MakeDatasetFromSchema(std::shared_ptr sch) { + return ::arrow::internal::make_unique(std::move(sch), + RecordBatchVector{}); +} + +std::unique_ptr MakeSomeFragment(std::shared_ptr sch) { + return ::arrow::internal::make_unique(std::move(sch), + RecordBatchVector{}); +} + +TEST(BasicEvolution, MissingColumn) { + std::unique_ptr strategy = + MakeBasicDatasetEvolutionStrategy(); + + std::shared_ptr dataset_schema = + schema({field("A", int32()), field("B", int16()), field("C", int64())}); + std::unique_ptr dataset = MakeDatasetFromSchema(dataset_schema); + std::unique_ptr fragment = MakeSomeFragment(std::move(dataset_schema)); + + InspectedFragment inspected{{"A", "B"}}; + std::unique_ptr fragment_strategy = + strategy->GetStrategy(*dataset, *fragment, inspected); + + compute::Expression filter = equal(field_ref("C"), literal(INT64_C(7))); + // If, after simplification, a filter somehow still references a missing field + // then it is an error. + ASSERT_RAISES(Invalid, fragment_strategy->DevolveFilter(filter)); + std::vector selection{FieldPath({0}), FieldPath({2})}; + // Basic strategy should provide is_null guarantee for missing fields + compute::Expression expected_guarantee = is_null(field_ref(2)); + ASSERT_OK_AND_ASSIGN(compute::Expression guarantee, + fragment_strategy->GetGuarantee(selection)); + ASSERT_EQ(expected_guarantee, guarantee); + + // Basic strategy should drop missing fields from selection + ASSERT_OK_AND_ASSIGN(std::vector devolved_selection, + fragment_strategy->DevolveSelection(selection)); + ASSERT_EQ(1, devolved_selection.size()); + ASSERT_EQ(FieldPath({0}), devolved_selection[0].path); + ASSERT_EQ(*int32(), *devolved_selection[0].requested_type); + + // Basic strategy should append null column to batches for missing column + std::shared_ptr devolved_batch = + RecordBatchFromJSON(schema({field("A", int32())}), R"([[1], [2], [3]])"); + ASSERT_OK_AND_ASSIGN( + compute::ExecBatch evolved_batch, + fragment_strategy->EvolveBatch(devolved_batch, selection, devolved_selection)); + ASSERT_EQ(2, evolved_batch.values.size()); + AssertArraysEqual(*devolved_batch->column(0), *evolved_batch[0].make_array()); + ASSERT_EQ(*MakeNullScalar(int64()), *evolved_batch.values[1].scalar()); +} + +TEST(BasicEvolution, ReorderedColumns) { + std::unique_ptr strategy = + MakeBasicDatasetEvolutionStrategy(); + + std::shared_ptr dataset_schema = + schema({field("A", int32()), field("B", int16()), field("C", int64())}); + std::unique_ptr dataset = MakeDatasetFromSchema(dataset_schema); + std::unique_ptr fragment = MakeSomeFragment(std::move(dataset_schema)); + + InspectedFragment inspected{{"C", "B", "A"}}; + std::unique_ptr fragment_strategy = + strategy->GetStrategy(*dataset, *fragment, inspected); + + compute::Expression filter = equal(field_ref("C"), literal(INT64_C(7))); + compute::Expression fragment_filter = equal(field_ref(0), literal(INT64_C(7))); + // Devolved filter should have updated indices + ASSERT_OK_AND_ASSIGN(compute::Expression devolved, + fragment_strategy->DevolveFilter(filter)); + ASSERT_EQ(fragment_filter, devolved); + std::vector selection{FieldPath({0}), FieldPath({2})}; + // No guarantees if simply reordering + compute::Expression expected_guarantee = literal(true); + ASSERT_OK_AND_ASSIGN(compute::Expression guarantee, + fragment_strategy->GetGuarantee(selection)); + ASSERT_EQ(expected_guarantee, guarantee); + + // Devolved selection should have correct indices + ASSERT_OK_AND_ASSIGN(std::vector devolved_selection, + 
fragment_strategy->DevolveSelection(selection)); + ASSERT_EQ(2, devolved_selection.size()); + ASSERT_EQ(FieldPath({2}), devolved_selection[0].path); + ASSERT_EQ(FieldPath({0}), devolved_selection[1].path); + ASSERT_EQ(*int32(), *devolved_selection[0].requested_type); + ASSERT_EQ(*int64(), *devolved_selection[1].requested_type); + + // Basic strategy should append null column to batches for missing column + std::shared_ptr devolved_batch = RecordBatchFromJSON( + schema({field("C", int64()), field("A", int32())}), R"([[1,4], [2,5], [3,6]])"); + ASSERT_OK_AND_ASSIGN( + compute::ExecBatch evolved_batch, + fragment_strategy->EvolveBatch(devolved_batch, selection, devolved_selection)); + ASSERT_EQ(2, evolved_batch.values.size()); + AssertArraysEqual(*devolved_batch->column(0), *evolved_batch[0].make_array()); + AssertArraysEqual(*devolved_batch->column(1), *evolved_batch[1].make_array()); +} + +struct MockScanTask { + explicit MockScanTask(std::shared_ptr batch) : batch(std::move(batch)) {} + + std::shared_ptr batch; + Future> batch_future = + Future>::Make(); +}; + +struct MockFragmentScanner : public FragmentScanner { + explicit MockFragmentScanner(std::vector scan_tasks) + : scan_tasks_(std::move(scan_tasks)), has_started_(scan_tasks_.size(), false) {} + + // ### FragmentScanner API ### + Future> ScanBatch(int batch_number) override { + has_started_[batch_number] = true; + return scan_tasks_[batch_number].batch_future; + } + int64_t EstimatedDataBytes(int batch_number) override { + return util::TotalBufferSize(*scan_tasks_[batch_number].batch); + } + int NumBatches() override { return static_cast(scan_tasks_.size()); } + + // ### Unit Test API ### + void DeliverBatches(bool slow, const std::vector& to_deliver) { + for (MockScanTask task : to_deliver) { + if (slow) { + std::ignore = SleepABitAsync().Then( + [task]() mutable { task.batch_future.MarkFinished(task.batch); }); + } else { + task.batch_future.MarkFinished(task.batch); + } + } + } + + void DeliverBatchesInOrder(bool slow) { DeliverBatches(slow, scan_tasks_); } + + void DeliverBatchesRandomly(bool slow, std::default_random_engine* gen) { + std::vector shuffled_tasks(scan_tasks_); + std::shuffle(shuffled_tasks.begin(), shuffled_tasks.end(), *gen); + DeliverBatches(slow, shuffled_tasks); + } + + bool HasStarted(int batch_number) { return has_started_[batch_number]; } + bool HasDelivered(int batch_number) { + return scan_tasks_[batch_number].batch_future.is_finished(); + } + + std::vector scan_tasks_; + std::vector has_started_; +}; + +struct MockFragment : public Fragment { + // ### Fragment API ### + + MockFragment(std::shared_ptr fragment_schema, + std::vector scan_tasks, + std::shared_ptr inspected, + compute::Expression partition_expression) + : Fragment(std::move(partition_expression), std::move(fragment_schema)), + fragment_scanner_(std::make_shared(std::move(scan_tasks))), + inspected_(std::move(inspected)) {} + + Result ScanBatchesAsync( + const std::shared_ptr& options) override { + return Status::Invalid("Not implemented because not needed by unit tests"); + }; + + Future> InspectFragment() override { + has_inspected_ = true; + return inspected_future_; + } + + Future> BeginScan( + const FragmentScanRequest& request, + const InspectedFragment& inspected_fragment) override { + has_started_ = true; + seen_request_ = request; + return fragment_scanner_future_; + } + + Future> CountRows( + compute::Expression predicate, + const std::shared_ptr& options) override { + return Status::Invalid("Not implemented because not needed by 
unit tests"); + } + + std::string type_name() const override { return "mock"; } + + Result> ReadPhysicalSchemaImpl() override { + return physical_schema_; + }; + + // ### Unit Test API ### + + void FinishInspection() { inspected_future_.MarkFinished(inspected_); } + void FinishScanBegin() { fragment_scanner_future_.MarkFinished(fragment_scanner_); } + + Future<> DeliverInit(bool slow) { + if (slow) { + return SleepABitAsync().Then([this] { + FinishInspection(); + return SleepABitAsync().Then([this] { FinishScanBegin(); }); + }); + } else { + FinishInspection(); + FinishScanBegin(); + return Future<>::MakeFinished(); + } + } + + void DeliverBatchesInOrder(bool slow) { + std::ignore = DeliverInit(slow).Then( + [this, slow] { fragment_scanner_->DeliverBatchesInOrder(slow); }); + } + + Future<> DeliverBatchesRandomly(bool slow, std::default_random_engine* gen) { + return DeliverInit(slow).Then( + [this, slow, gen] { fragment_scanner_->DeliverBatchesRandomly(slow, gen); }); + } + + bool has_inspected() { return has_inspected_; } + bool has_started() { return has_started_; } + bool HasBatchStarted(int batch_index) { + return fragment_scanner_->HasStarted(batch_index); + } + bool HasBatchDelivered(int batch_index) { + return fragment_scanner_->HasDelivered(batch_index); + } + + std::shared_ptr fragment_scanner_; + Future> fragment_scanner_future_ = + Future>::Make(); + std::shared_ptr inspected_; + Future> inspected_future_ = + Future>::Make(); + bool has_inspected_ = false; + bool has_started_ = false; + FragmentScanRequest seen_request_; +}; + +FragmentVector AsFragmentVector( + const std::vector>& fragments) { + FragmentVector frag_vec; + frag_vec.insert(frag_vec.end(), fragments.begin(), fragments.end()); + return frag_vec; +} + +struct MockDataset : public FragmentDataset { + MockDataset(std::shared_ptr dataset_schema, + std::vector> fragments) + : FragmentDataset(std::move(dataset_schema), AsFragmentVector(fragments)), + fragments_(std::move(fragments)) {} + + // ### Dataset API ### + std::string type_name() const override { return "mock"; } + + Result> ReplaceSchema( + std::shared_ptr schema) const override { + return Status::Invalid("Not needed for unit test"); + } + + Result GetFragmentsImpl(compute::Expression predicate) override { + has_started_ = true; + return FragmentDataset::GetFragmentsImpl(std::move(predicate)); + } + + // ### Unit Test API ### + void DeliverBatchesInOrder(bool slow) { + for (const auto& fragment : fragments_) { + fragment->DeliverBatchesInOrder(slow); + } + } + + void DeliverBatchesRandomly(bool slow) { + const auto seed = ::arrow::internal::GetRandomSeed(); + std::default_random_engine gen( + static_cast(seed)); + + std::vector> fragments_shuffled(fragments_); + std::shuffle(fragments_shuffled.begin(), fragments_shuffled.end(), gen); + std::vector> deliver_futures; + for (const auto& fragment : fragments_shuffled) { + deliver_futures.push_back(fragment->DeliverBatchesRandomly(slow, &gen)); + } + // Need to wait for fragments to finish init so gen stays valid + AllComplete(deliver_futures).Wait(); + } + + bool has_started() { return has_started_; } + bool HasStartedFragment(int fragment_index) { + return fragments_[fragment_index]->has_started(); + } + bool HasStartedBatch(int fragment_index, int batch_index) { + return fragments_[fragment_index]->HasBatchStarted(batch_index); + } + + bool has_started_ = false; + std::vector> fragments_; +}; + +struct MockDatasetBuilder { + explicit MockDatasetBuilder(std::shared_ptr dataset_schema) + : 
dataset_schema(std::move(dataset_schema)) {} + + void AddFragment( + std::shared_ptr fragment_schema, + std::unique_ptr inspection = nullptr, + compute::Expression partition_expression = Fragment::kNoPartitionInformation) { + if (!inspection) { + inspection = std::make_unique(fragment_schema->field_names()); + } + fragments.push_back(std::make_shared( + std::move(fragment_schema), std::vector(), std::move(inspection), + std::move(partition_expression))); + active_fragment = fragments[fragments.size() - 1]->fragment_scanner_.get(); + } + + void AddBatch(std::shared_ptr batch) { + active_fragment->scan_tasks_.emplace_back(std::move(batch)); + active_fragment->has_started_.push_back(false); + } + + std::unique_ptr Finish() { + return arrow::internal::make_unique(std::move(dataset_schema), + std::move(fragments)); + } + + std::shared_ptr dataset_schema; + std::vector> fragments; + MockFragmentScanner* active_fragment = nullptr; +}; + +template ::value>::type> +std::shared_ptr ArrayFromRange(int start, int end, bool add_nulls) { + using ArrowBuilderType = typename arrow::TypeTraits::BuilderType; + ArrowBuilderType builder; + ARROW_EXPECT_OK(builder.Reserve(end - start)); + for (int val = start; val < end; val++) { + if (add_nulls && val % 2 == 0) { + builder.UnsafeAppendNull(); + } else { + builder.UnsafeAppend(val); + } + } + EXPECT_OK_AND_ASSIGN(std::shared_ptr range_arr, builder.Finish()); + return range_arr; +} + +struct ScannerTestParams { + bool slow; + int num_fragments; + int num_batches; + + std::string ToString() const { + std::stringstream ss; + ss << (slow ? "slow" : "fast") << num_fragments << "f" << num_batches << "b"; + return ss.str(); + } + + static std::string ToTestNameString( + const ::testing::TestParamInfo& info) { + return info.param.ToString(); + } + + static std::vector Values() { + std::vector values; + for (bool slow : {false, true}) { + values.push_back({slow, 1, 128}); + values.push_back({slow, 16, 128}); + } + return values; + } +}; + +constexpr int kRowsPerTestBatch = 1024; + +std::shared_ptr ScannerTestSchema() { + return schema({field("row_num", int32()), field("filterable", int16()), + field("nested", struct_({field("x", int32()), field("y", int32())}))}); +} + +std::shared_ptr MakeTestBatch(int idx) { + ArrayVector arrays; + // Row number + arrays.push_back(ArrayFromRange(idx * kRowsPerTestBatch, + (idx + 1) * kRowsPerTestBatch, + /*add_nulls=*/false)); + // Filterable + arrays.push_back(ArrayFromRange(0, kRowsPerTestBatch, + /*add_nulls=*/true)); + // Nested + std::shared_ptr x_vals = + ArrayFromRange(0, kRowsPerTestBatch, /*add_nulls=*/false); + std::shared_ptr y_vals = + ArrayFromRange(0, kRowsPerTestBatch, /*add_nulls=*/true); + EXPECT_OK_AND_ASSIGN(std::shared_ptr nested_arr, + StructArray::Make({std::move(x_vals), std::move(y_vals)}, + {field("x", int32()), field("y", int32())})); + arrays.push_back(std::move(nested_arr)); + return RecordBatch::Make(ScannerTestSchema(), kRowsPerTestBatch, std::move(arrays)); +} + +std::unique_ptr MakeTestDataset(int num_fragments, int batches_per_fragment, + bool empty = false) { + std::shared_ptr test_schema = ScannerTestSchema(); + MockDatasetBuilder dataset_builder(test_schema); + for (int i = 0; i < num_fragments; i++) { + dataset_builder.AddFragment( + test_schema, + ::arrow::internal::make_unique(test_schema->field_names()), + Fragment::kNoPartitionInformation); + for (int j = 0; j < batches_per_fragment; j++) { + if (empty) { + dataset_builder.AddBatch( + RecordBatch::Make(schema({}), kRowsPerTestBatch, 
ArrayVector{})); + } else { + dataset_builder.AddBatch(MakeTestBatch(i * batches_per_fragment + j)); + } + } + } + return dataset_builder.Finish(); +} + +class TestScannerBase : public ::testing::TestWithParam { + protected: + TestScannerBase() { internal::Initialize(); } + + std::shared_ptr MakeExpectedBatch() { + RecordBatchVector batches; + for (int frag_idx = 0; frag_idx < GetParam().num_fragments; frag_idx++) { + for (int batch_idx = 0; batch_idx < GetParam().num_batches; batch_idx++) { + batches.push_back(MakeTestBatch(batch_idx + (frag_idx * GetParam().num_batches))); + } + } + EXPECT_OK_AND_ASSIGN(std::shared_ptr
table, + Table::FromRecordBatches(std::move(batches))); + EXPECT_OK_AND_ASSIGN(std::shared_ptr as_one_batch, + table->CombineChunksToBatch()); + return as_one_batch; + } + + compute::Declaration MakeScanNode(std::shared_ptr dataset) { + ScanV2Options options(dataset); + options.columns = ScanV2Options::AllColumns(*dataset); + return compute::Declaration("scan2", options); + } + + RecordBatchVector RunNode(compute::Declaration scan_decl, bool ordered, + MockDataset* mock_dataset) { + Future batches_fut = + compute::DeclarationToBatchesAsync(std::move(scan_decl)); + if (ordered) { + mock_dataset->DeliverBatchesInOrder(GetParam().slow); + } else { + mock_dataset->DeliverBatchesRandomly(GetParam().slow); + } + EXPECT_FINISHES_OK_AND_ASSIGN(RecordBatchVector record_batches, batches_fut); + return record_batches; + } + + void CheckScannedBatches(RecordBatchVector batches) { + ASSERT_OK_AND_ASSIGN(std::shared_ptr
batches_as_table, + Table::FromRecordBatches(std::move(batches))); + ASSERT_OK_AND_ASSIGN(std::shared_ptr combined_data, + batches_as_table->CombineChunksToBatch()); + + ASSERT_OK_AND_ASSIGN( + std::shared_ptr sort_indices, + compute::SortIndices(combined_data->column(0), compute::SortOptions{})); + ASSERT_OK_AND_ASSIGN(Datum sorted_data, compute::Take(combined_data, sort_indices)); + + std::shared_ptr expected_data = MakeExpectedBatch(); + AssertBatchesEqual(*expected_data, *sorted_data.record_batch()); + } + + void CheckScanner(bool ordered) { + std::shared_ptr mock_dataset = + MakeTestDataset(GetParam().num_fragments, GetParam().num_batches); + compute::Declaration scan_decl = MakeScanNode(mock_dataset); + RecordBatchVector scanned_batches = RunNode(scan_decl, ordered, mock_dataset.get()); + CheckScannedBatches(std::move(scanned_batches)); + } +}; + +TEST_P(TestScannerBase, ScanOrdered) { CheckScanner(true); } +TEST_P(TestScannerBase, ScanUnordered) { CheckScanner(false); } + +// FIXME: Add test for scanning no columns + +INSTANTIATE_TEST_SUITE_P(BasicNewScannerTests, TestScannerBase, + ::testing::ValuesIn(ScannerTestParams::Values()), + [](const ::testing::TestParamInfo& info) { + return std::to_string(info.index) + info.param.ToString(); + }); + +void CheckScannerBackpressure(std::shared_ptr dataset, ScanV2Options options, + int maxConcurrentFragments, int maxConcurrentBatches, + ::arrow::internal::ThreadPool* thread_pool) { + // Start scanning + compute::Declaration scan_decl = compute::Declaration("scan2", std::move(options)); + Future batches_fut = + compute::DeclarationToBatchesAsync(std::move(scan_decl)); + + auto get_num_inspected = [&] { + int num_inspected = 0; + for (const auto& frag : dataset->fragments_) { + if (frag->has_inspected()) { + num_inspected++; + } + } + return num_inspected; + }; + BusyWait(10, [&] { + return get_num_inspected() == static_cast(maxConcurrentFragments); + }); + SleepABit(); + ASSERT_EQ(get_num_inspected(), static_cast(maxConcurrentFragments)); + + int total_batches = 0; + for (const auto& frag : dataset->fragments_) { + total_batches += frag->fragment_scanner_->NumBatches(); + frag->FinishInspection(); + frag->FinishScanBegin(); + } + + int batches_scanned = 0; + while (batches_scanned < total_batches) { + MockScanTask* next_task_to_deliver = nullptr; + thread_pool->WaitForIdle(); + int batches_started = 0; + for (const auto& frag : dataset->fragments_) { + for (int i = 0; i < frag->fragment_scanner_->NumBatches(); i++) { + if (frag->HasBatchStarted(i)) { + batches_started++; + if (next_task_to_deliver == nullptr && !frag->HasBatchDelivered(i)) { + next_task_to_deliver = &frag->fragment_scanner_->scan_tasks_[i]; + } + } + } + } + ASSERT_LE(batches_started - batches_scanned, maxConcurrentBatches) + << " too many scan tasks were allowed to run"; + ASSERT_NE(next_task_to_deliver, nullptr); + next_task_to_deliver->batch_future.MarkFinished(next_task_to_deliver->batch); + batches_scanned++; + } +} + +TEST(TestNewScanner, Backpressure) { + constexpr int kNumFragments = 4; + constexpr int kNumBatchesPerFragment = 4; + internal::Initialize(); + std::shared_ptr test_dataset = + MakeTestDataset(kNumFragments, kNumBatchesPerFragment); + + ScanV2Options options(test_dataset); + + // No readahead + options.dataset = test_dataset; + options.columns = ScanV2Options::AllColumns(*test_dataset); + options.fragment_readahead = 0; + options.target_bytes_readahead = 0; + CheckScannerBackpressure(test_dataset, options, 1, 1, + ::arrow::internal::GetCpuThreadPool()); + 
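+  // For reference: each test batch holds 1024 rows across four numeric leaf arrays
+  // (int32 + int16 + int32 + int32 = 14 bytes per row), so a batch is roughly 14Ki
+  // of buffer data; that is where the "14Ki" estimate below comes from.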
+ // Some readahead + test_dataset = MakeTestDataset(kNumFragments, kNumBatchesPerFragment); + options = ScanV2Options(test_dataset); + options.columns = ScanV2Options::AllColumns(*test_dataset); + options.fragment_readahead = 4; + // each batch should be 14Ki so 50Ki readahead should yield 3-at-a-time + options.target_bytes_readahead = 50 * kRowsPerTestBatch; + CheckScannerBackpressure(test_dataset, options, 4, 3, + ::arrow::internal::GetCpuThreadPool()); +} + +TEST(TestNewScanner, NestedRead) { + // This tests the case where the file format does not support + // handling nested reads (e.g. JSON) and so the scanner must + // drop the extra data + internal::Initialize(); + std::shared_ptr test_schema = ScannerTestSchema(); + MockDatasetBuilder builder(test_schema); + builder.AddFragment(test_schema); + std::shared_ptr batch = MakeTestBatch(0); + ASSERT_OK_AND_ASSIGN(std::shared_ptr nested_col, FieldPath({2, 0}).Get(*batch)); + std::shared_ptr one_column = RecordBatch::Make( + schema({field("x", int32())}), batch->num_rows(), ArrayVector{nested_col}); + builder.AddBatch(std::move(one_column)); + std::shared_ptr test_dataset = builder.Finish(); + test_dataset->DeliverBatchesInOrder(false); + + ScanV2Options options(test_dataset); + // nested.x + options.columns = {FieldPath({2, 0})}; + ASSERT_OK_AND_ASSIGN(std::vector> batches, + compute::DeclarationToBatches({"scan2", options})); + ASSERT_EQ(1, batches.size()); + for (const auto& batch : batches) { + ASSERT_EQ("x", batch->schema()->field(0)->name()); + ASSERT_EQ(*int32(), *batch->schema()->field(0)->type()); + ASSERT_EQ(*int32(), *batch->column(0)->type()); + } + const FragmentScanRequest& seen_request = test_dataset->fragments_[0]->seen_request_; + ASSERT_EQ(1, seen_request.columns.size()); + ASSERT_EQ(FieldPath({2, 0}), seen_request.columns[0].path); + ASSERT_EQ(*int32(), *seen_request.columns[0].requested_type); + ASSERT_EQ(0, seen_request.columns[0].selection_index); +} + +std::shared_ptr MakePartitionSkipDataset() { + std::shared_ptr test_schema = ScannerTestSchema(); + MockDatasetBuilder builder(test_schema); + builder.AddFragment(test_schema, /*inspection=*/nullptr, + greater(field_ref("filterable"), literal(50))); + builder.AddBatch(MakeTestBatch(0)); + builder.AddFragment(test_schema, /*inspection=*/nullptr, + less_equal(field_ref("filterable"), literal(50))); + builder.AddBatch(MakeTestBatch(1)); + return builder.Finish(); +} + +TEST(TestNewScanner, PartitionSkip) { + internal::Initialize(); + std::shared_ptr test_dataset = MakePartitionSkipDataset(); + test_dataset->DeliverBatchesInOrder(false); + + ScanV2Options options(test_dataset); + options.columns = ScanV2Options::AllColumns(*test_dataset); + options.filter = greater(field_ref("filterable"), literal(75)); + + ASSERT_OK_AND_ASSIGN(std::vector> batches, + compute::DeclarationToBatches({"scan2", options})); + ASSERT_EQ(1, batches.size()); + AssertBatchesEqual(*MakeTestBatch(0), *batches[0]); + + test_dataset = MakePartitionSkipDataset(); + test_dataset->DeliverBatchesInOrder(false); + options = ScanV2Options(test_dataset); + options.columns = ScanV2Options::AllColumns(*test_dataset); + options.filter = less(field_ref("filterable"), literal(25)); + + ASSERT_OK_AND_ASSIGN(batches, compute::DeclarationToBatches({"scan2", options})); + ASSERT_EQ(1, batches.size()); + AssertBatchesEqual(*MakeTestBatch(1), *batches[0]); +} + +TEST(TestNewScanner, NoFragments) { + internal::Initialize(); + std::shared_ptr test_schema = ScannerTestSchema(); + MockDatasetBuilder builder(test_schema); + 
std::shared_ptr test_dataset = builder.Finish(); + + ScanV2Options options(test_dataset); + options.columns = ScanV2Options::AllColumns(*test_dataset); + ASSERT_OK_AND_ASSIGN(std::vector> batches, + compute::DeclarationToBatches({"scan2", options})); + ASSERT_EQ(0, batches.size()); +} + +TEST(TestNewScanner, EmptyFragment) { + internal::Initialize(); + std::shared_ptr test_schema = ScannerTestSchema(); + MockDatasetBuilder builder(test_schema); + builder.AddFragment(test_schema); + std::shared_ptr test_dataset = builder.Finish(); + test_dataset->DeliverBatchesInOrder(false); + + ScanV2Options options(test_dataset); + options.columns = ScanV2Options::AllColumns(*test_dataset); + ASSERT_OK_AND_ASSIGN(std::vector> batches, + compute::DeclarationToBatches({"scan2", options})); + ASSERT_EQ(0, batches.size()); +} + +TEST(TestNewScanner, EmptyBatch) { + internal::Initialize(); + std::shared_ptr test_schema = ScannerTestSchema(); + MockDatasetBuilder builder(test_schema); + builder.AddFragment(test_schema); + ASSERT_OK_AND_ASSIGN(std::shared_ptr empty_batch, + RecordBatch::MakeEmpty(test_schema)); + builder.AddBatch(std::move(empty_batch)); + std::shared_ptr test_dataset = builder.Finish(); + test_dataset->DeliverBatchesInOrder(false); + + ScanV2Options options(test_dataset); + options.columns = ScanV2Options::AllColumns(*test_dataset); + ASSERT_OK_AND_ASSIGN(std::vector> batches, + compute::DeclarationToBatches({"scan2", options})); + ASSERT_EQ(0, batches.size()); +} + +TEST(TestNewScanner, NoColumns) { + constexpr int kNumFragments = 4; + constexpr int kNumBatchesPerFragment = 4; + internal::Initialize(); + std::shared_ptr test_dataset = + MakeTestDataset(kNumFragments, kNumBatchesPerFragment, /*empty=*/true); + test_dataset->DeliverBatchesInOrder(false); + + ScanV2Options options(test_dataset); + ASSERT_OK_AND_ASSIGN(std::vector batches, + compute::DeclarationToExecBatches({"scan2", options})); + ASSERT_EQ(16, batches.size()); + for (const auto& batch : batches) { + ASSERT_EQ(0, batch.values.size()); + ASSERT_EQ(kRowsPerTestBatch, batch.length); + } +} + +TEST(TestNewScanner, MissingColumn) { + internal::Initialize(); + std::shared_ptr test_schema = ScannerTestSchema(); + MockDatasetBuilder builder(test_schema); + + ASSERT_OK_AND_ASSIGN(std::shared_ptr missing_schema, + test_schema->RemoveField(2)); + builder.AddFragment(missing_schema); + std::shared_ptr batch = MakeTestBatch(0); + // Remove column 2 because we are pretending it doesn't exist + // in the fragment + ASSERT_OK_AND_ASSIGN(batch, batch->RemoveColumn(2)); + // Remove column 1 because we aren't going to ask for it + ASSERT_OK_AND_ASSIGN(batch, batch->RemoveColumn(1)); + builder.AddBatch(batch); + + std::shared_ptr test_dataset = builder.Finish(); + test_dataset->DeliverBatchesInOrder(false); + + ScanV2Options options(test_dataset); + options.columns = {FieldPath({0}), FieldPath({2})}; + + ASSERT_OK_AND_ASSIGN(std::vector> batches, + compute::DeclarationToBatches({"scan2", options})); + + ASSERT_EQ(1, batches.size()); + AssertArraysEqual(*batch->column(0), *batches[0]->column(0)); + ASSERT_OK_AND_ASSIGN(std::shared_ptr expected_nulls, + MakeArrayOfNull(test_schema->field(2)->type(), kRowsPerTestBatch)); + AssertArraysEqual(*expected_nulls, *batches[0]->column(1)); +} + struct TestScannerParams { bool use_threads; int num_child_datasets; @@ -98,8 +846,7 @@ class TestScanner : public DatasetFixtureMixinWithParam { } std::shared_ptr MakeScanner(std::shared_ptr batch) { - std::vector> batches{ - static_cast(GetParam().num_batches), 
batch}; + RecordBatchVector batches{static_cast(GetParam().num_batches), batch}; DatasetVector children{static_cast(GetParam().num_child_datasets), std::make_shared(batch->schema(), batches)}; @@ -333,7 +1080,7 @@ TEST_P(TestScanner, MaterializeMissingColumn) { TEST_P(TestScanner, ToTable) { SetSchema({field("i32", int32()), field("f64", float64())}); auto batch = ConstantArrayGenerator::Zeroes(GetParam().items_per_batch, schema_); - std::vector> batches{ + RecordBatchVector batches{ static_cast(GetParam().num_batches * GetParam().num_child_datasets), batch}; @@ -452,7 +1199,7 @@ TEST_P(TestScanner, EmptyFragment) { SetSchema({field("i32", int32()), field("f64", float64())}); auto batch = ConstantArrayGenerator::Zeroes(GetParam().items_per_batch, schema_); auto empty_batch = ConstantArrayGenerator::Zeroes(0, schema_); - std::vector> batches{ + RecordBatchVector batches{ static_cast(GetParam().num_batches * GetParam().num_child_datasets), batch}; @@ -581,7 +1328,7 @@ TEST_P(TestScanner, CountRowsWithMetadata) { TEST_P(TestScanner, ToRecordBatchReader) { SetSchema({field("i32", int32()), field("f64", float64())}); auto batch = ConstantArrayGenerator::Zeroes(GetParam().items_per_batch, schema_); - std::vector> batches{ + RecordBatchVector batches{ static_cast(GetParam().num_batches * GetParam().num_child_datasets), batch}; diff --git a/cpp/src/arrow/type.cc b/cpp/src/arrow/type.cc index f12eca607de..52a1ea2fa7f 100644 --- a/cpp/src/arrow/type.cc +++ b/cpp/src/arrow/type.cc @@ -1092,6 +1092,17 @@ Result> FieldPath::Get(const FieldVector& fields) const { return FieldPathGetImpl::Get(this, fields); } +Result> FieldPath::GetAll(const Schema& schm, + const std::vector& paths) { + std::vector> fields; + fields.reserve(paths.size()); + for (const auto& path : paths) { + ARROW_ASSIGN_OR_RAISE(std::shared_ptr field, path.Get(schm)); + fields.push_back(std::move(field)); + } + return schema(std::move(fields)); +} + Result> FieldPath::Get(const RecordBatch& batch) const { ARROW_ASSIGN_OR_RAISE(auto data, FieldPathGetImpl::Get(this, batch.column_data())); return MakeArray(std::move(data)); diff --git a/cpp/src/arrow/type.h b/cpp/src/arrow/type.h index 2bdfdec84c4..beb023c8a81 100644 --- a/cpp/src/arrow/type.h +++ b/cpp/src/arrow/type.h @@ -1626,6 +1626,9 @@ class ARROW_EXPORT FieldPath { Result> Get(const DataType& type) const; Result> Get(const FieldVector& fields) const; + static Result> GetAll(const Schema& schema, + const std::vector& paths); + /// \brief Retrieve the referenced column from a RecordBatch or Table Result> Get(const RecordBatch& batch) const; diff --git a/cpp/src/arrow/util/async_util.cc b/cpp/src/arrow/util/async_util.cc index 68b7e30e3e5..a1341417665 100644 --- a/cpp/src/arrow/util/async_util.cc +++ b/cpp/src/arrow/util/async_util.cc @@ -32,7 +32,8 @@ namespace util { class ThrottleImpl : public AsyncTaskScheduler::Throttle { public: - explicit ThrottleImpl(int max_concurrent_cost) : available_cost_(max_concurrent_cost) {} + explicit ThrottleImpl(int max_concurrent_cost) + : max_concurrent_cost_(max_concurrent_cost), available_cost_(max_concurrent_cost) {} std::optional> TryAcquire(int amt) override { std::lock_guard lk(mutex_); @@ -61,8 +62,11 @@ class ThrottleImpl : public AsyncTaskScheduler::Throttle { } } + int Capacity() override { return max_concurrent_cost_; } + private: std::mutex mutex_; + int max_concurrent_cost_; int available_cost_; Future<> backoff_; }; @@ -151,7 +155,8 @@ class AsyncTaskSchedulerImpl : public AsyncTaskScheduler { queue_->Push(std::move(task)); return 
true; } - std::optional> maybe_backoff = throttle_->TryAcquire(task->cost()); + int latched_cost = std::min(task->cost(), throttle_->Capacity()); + std::optional> maybe_backoff = throttle_->TryAcquire(latched_cost); if (maybe_backoff) { queue_->Push(std::move(task)); lk.unlock(); @@ -237,7 +242,7 @@ class AsyncTaskSchedulerImpl : public AsyncTaskScheduler { private: void ContinueTasksUnlocked(std::unique_lock&& lk) { while (!queue_->Empty()) { - int next_cost = queue_->Peek().cost(); + int next_cost = std::min(queue_->Peek().cost(), throttle_->Capacity()); std::optional> maybe_backoff = throttle_->TryAcquire(next_cost); if (maybe_backoff) { lk.unlock(); @@ -266,6 +271,9 @@ class AsyncTaskSchedulerImpl : public AsyncTaskScheduler { void DoSubmitTask(std::unique_ptr task) { int cost = task->cost(); + if (throttle_) { + cost = std::min(cost, throttle_->Capacity()); + } Result> submit_result = (*task)(this); if (!submit_result.ok()) { global_abort_->store(true); @@ -274,7 +282,9 @@ class AsyncTaskSchedulerImpl : public AsyncTaskScheduler { AbortUnlocked(submit_result.status(), std::move(lk)); return; } - submit_result->AddCallback([this, cost](const Status& st) { + // FIXME(C++17, move into lambda?) + std::shared_ptr task_holder = std::move(task); + submit_result->AddCallback([this, cost, task_holder](const Status& st) { std::unique_lock lk(mutex_); if (!st.ok()) { running_tasks_--; diff --git a/cpp/src/arrow/util/async_util.h b/cpp/src/arrow/util/async_util.h index 9e47bf77b78..e636eb805de 100644 --- a/cpp/src/arrow/util/async_util.h +++ b/cpp/src/arrow/util/async_util.h @@ -17,10 +17,13 @@ #pragma once +#include + #include "arrow/result.h" #include "arrow/status.h" #include "arrow/util/functional.h" #include "arrow/util/future.h" +#include "arrow/util/iterator.h" #include "arrow/util/mutex.h" #include @@ -147,6 +150,13 @@ class ARROW_EXPORT AsyncTaskScheduler { /// This will possibly complete waiting futures and should probably not be /// called while holding locks. virtual void Release(int amt) = 0; + + /// The size of the largest task that can run + /// + /// Incoming tasks will have their cost latched to this value to ensure + /// they can still run (although they will generally be the only thing allowed to + /// run at that time). + virtual int Capacity() = 0; }; /// Create a throttle /// @@ -176,6 +186,64 @@ class ARROW_EXPORT AsyncTaskScheduler { /// \return true if the task was submitted or queued, false if the task was ignored virtual bool AddTask(std::unique_ptr task) = 0; + /// Adds an async generator to the scheduler + /// + /// The async generator will be visited, one item at a time. Submitting a task + /// will consist of polling the generator for the next future. The generator's future + /// will then represent the task itself. + /// + /// This visits the task serially without readahead. If readahead or parallelism + /// is desired then it should be added in the generator itself. + /// + /// The tasks will be submitted to a subscheduler which will be ended when the generator + /// is exhausted. + /// + /// The generator itself will be kept alive until all tasks have been completed. + /// However, if the scheduler is aborted, the generator will be destroyed as soon as the + /// next item would be requested. 
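+  ///
+  /// For example, a caller might drain a generator of items like this (an
+  /// illustrative sketch; `Item`, `item_gen`, and `ProcessItem` are placeholders):
+  ///
+  ///   scheduler->AddAsyncGenerator<Item>(
+  ///       std::move(item_gen),
+  ///       [](const Item& item) { return ProcessItem(item); },
+  ///       /*finish_callback=*/[] { return Status::OK(); });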
+ template + bool AddAsyncGenerator(std::function()> generator, + std::function visitor, + FnOnce finish_callback) { + AsyncTaskScheduler* generator_scheduler = + MakeSubScheduler(std::move(finish_callback)); + struct State { + State(std::function()> generator, std::function visitor) + : generator(std::move(generator)), visitor(std::move(visitor)) {} + std::function()> generator; + std::function visitor; + }; + std::unique_ptr state_holder = + std::make_unique(std::move(generator), std::move(visitor)); + struct SubmitTask : public Task { + explicit SubmitTask(std::unique_ptr state_holder) + : state_holder(std::move(state_holder)) {} + struct SubmitTaskCallback { + SubmitTaskCallback(AsyncTaskScheduler* scheduler, + std::unique_ptr state_holder) + : scheduler(scheduler), state_holder(std::move(state_holder)) {} + Status operator()(const T& item) { + if (IsIterationEnd(item)) { + scheduler->End(); + return Status::OK(); + } + ARROW_RETURN_NOT_OK(state_holder->visitor(item)); + scheduler->AddTask(std::make_unique(std::move(state_holder))); + return Status::OK(); + } + AsyncTaskScheduler* scheduler; + std::unique_ptr state_holder; + }; + Result> operator()(AsyncTaskScheduler* scheduler) { + Future next = state_holder->generator(); + return next.Then(SubmitTaskCallback(scheduler, std::move(state_holder))); + } + std::unique_ptr state_holder; + }; + return generator_scheduler->AddTask( + std::make_unique(std::move(state_holder))); + } + template struct SimpleTask : public Task { explicit SimpleTask(Callable callable) : callable(std::move(callable)) {} diff --git a/cpp/src/arrow/util/async_util_test.cc b/cpp/src/arrow/util/async_util_test.cc index be1b7c4a636..ea03b5431df 100644 --- a/cpp/src/arrow/util/async_util_test.cc +++ b/cpp/src/arrow/util/async_util_test.cc @@ -29,9 +29,12 @@ #include #include "arrow/result.h" +#include "arrow/testing/async_test_util.h" #include "arrow/testing/future_util.h" #include "arrow/testing/gtest_util.h" +#include "arrow/util/async_generator.h" #include "arrow/util/future.h" +#include "arrow/util/test_common.h" namespace arrow { namespace util { @@ -123,6 +126,27 @@ TEST(AsyncTaskScheduler, Abandoned) { ASSERT_FALSE(pending_task_submitted); } +TEST(AsyncTaskScheduler, TaskStaysAliveUntilFinished) { + std::unique_ptr scheduler = AsyncTaskScheduler::Make(); + Future<> task = Future<>::Make(); + bool my_task_destroyed = false; + struct MyTask : public AsyncTaskScheduler::Task { + MyTask(bool* my_task_destroyed_ptr, Future<> task_fut) + : my_task_destroyed_ptr(my_task_destroyed_ptr), task_fut(std::move(task_fut)) {} + ~MyTask() { *my_task_destroyed_ptr = true; } + Result> operator()(AsyncTaskScheduler*) { return task_fut; } + bool* my_task_destroyed_ptr; + Future<> task_fut; + }; + scheduler->AddTask(std::make_unique(&my_task_destroyed, task)); + SleepABit(); + ASSERT_FALSE(my_task_destroyed); + task.MarkFinished(); + ASSERT_TRUE(my_task_destroyed); + scheduler->End(); + ASSERT_FINISHES_OK(scheduler->OnFinished()); +} + TEST(AsyncTaskScheduler, TaskFailsAfterEnd) { std::unique_ptr scheduler = AsyncTaskScheduler::Make(); Future<> task = Future<>::Make(); @@ -198,6 +222,28 @@ TEST(AsyncTaskScheduler, SubSchedulerNoTasks) { ASSERT_FINISHES_OK(parent->OnFinished()); } +TEST(AsyncTaskScheduler, AsyncGenerator) { + for (bool slow : {false, true}) { + ARROW_SCOPED_TRACE("Slow: ", slow); + std::unique_ptr scheduler = AsyncTaskScheduler::Make(); + std::vector values{1, 2, 3}; + std::vector seen_values{}; + AsyncGenerator generator = MakeVectorGenerator(values); + if (slow) { + 
generator = util::SlowdownABit(generator); + } + std::function visitor = [&](const TestInt& val) { + seen_values.push_back(val); + return Status::OK(); + }; + scheduler->AddAsyncGenerator(std::move(generator), std::move(visitor), + EmptyFinishCallback()); + scheduler->End(); + ASSERT_FINISHES_OK(scheduler->OnFinished()); + ASSERT_EQ(seen_values, values); + } +} // namespace util + class CustomThrottle : public AsyncTaskScheduler::Throttle { public: virtual std::optional> TryAcquire(int amt) { @@ -209,6 +255,7 @@ class CustomThrottle : public AsyncTaskScheduler::Throttle { } virtual void Release(int amt) {} void Unlock() { gate_.MarkFinished(); } + int Capacity() { return std::numeric_limits::max(); } private: Future<> gate_ = Future<>::Make(); @@ -248,6 +295,47 @@ TEST(AsyncTaskScheduler, EndWaitsForAddedButNotSubmittedTasks) { ASSERT_TRUE(was_run); } +TEST(AsyncTaskScheduler, TaskWithCostBiggerThanThrottle) { + // It can be difficult to know the maximum cost a task may have. In + // scanning this is the maximum size of a batch stored on disk which we + // cannot know ahead of time. So a task may have a cost greater than the + // size of the throttle. In that case we simply drop the cost to the + // capacity of the throttle. + constexpr int kThrottleCapacity = 5; + std::unique_ptr throttle = + AsyncTaskScheduler::MakeThrottle(kThrottleCapacity); + std::unique_ptr task_group = + AsyncTaskScheduler::Make(throttle.get()); + bool task_submitted = false; + Future<> task = Future<>::Make(); + + struct ExpensiveTask : AsyncTaskScheduler::Task { + ExpensiveTask(bool* task_submitted, Future<> task) + : task_submitted(task_submitted), task(std::move(task)) {} + Result> operator()(AsyncTaskScheduler*) override { + *task_submitted = true; + return task; + } + int cost() const override { return kThrottleCapacity * 50; } + bool* task_submitted; + Future<> task; + }; + + Future<> blocking_task = Future<>::Make(); + task_group->AddSimpleTask([&] { return blocking_task; }); + task_group->AddTask(std::make_unique(&task_submitted, task)); + task_group->End(); + + // Task should not be submitted initially because blocking_task (even though + // it has a cost of 1) is preventing it. 
+ ASSERT_FALSE(task_submitted); + blocking_task.MarkFinished(); + // One blocking_task is out of the way the task is free to run + ASSERT_TRUE(task_submitted); + task.MarkFinished(); + ASSERT_FINISHES_OK(task_group->OnFinished()); +} + TEST(AsyncTaskScheduler, TaskFinishesAfterError) { /// If a task fails it shouldn't impact previously submitted tasks std::unique_ptr task_group = AsyncTaskScheduler::Make(); From f626342be9d250e4f35753c3dc5d7af4eda8dfc7 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Mon, 26 Sep 2022 14:41:22 -0700 Subject: [PATCH 2/7] ARROW-17287: Update after rebase --- cpp/src/arrow/compute/exec/map_node.cc | 3 --- cpp/src/arrow/compute/exec/map_node.h | 3 --- cpp/src/arrow/dataset/dataset.cc | 6 +++--- cpp/src/arrow/dataset/scan_node.cc | 13 ++++++------- cpp/src/arrow/dataset/scanner_test.cc | 12 ++++-------- 5 files changed, 13 insertions(+), 24 deletions(-) diff --git a/cpp/src/arrow/compute/exec/map_node.cc b/cpp/src/arrow/compute/exec/map_node.cc index 98ea084192f..16201ea1290 100644 --- a/cpp/src/arrow/compute/exec/map_node.cc +++ b/cpp/src/arrow/compute/exec/map_node.cc @@ -76,9 +76,6 @@ void MapNode::StopProducing(ExecNode* output) { void MapNode::StopProducing() { EVENT(span_, "StopProducing"); - if (executor_) { - this->stop_source_.RequestStop(); - } if (input_counter_.Cancel()) { this->Finish(); } diff --git a/cpp/src/arrow/compute/exec/map_node.h b/cpp/src/arrow/compute/exec/map_node.h index ea5264aa87e..88241ece592 100644 --- a/cpp/src/arrow/compute/exec/map_node.h +++ b/cpp/src/arrow/compute/exec/map_node.h @@ -69,9 +69,6 @@ class ARROW_EXPORT MapNode : public ExecNode { protected: // Counter for the number of batches received AtomicCounter input_counter_; - - // Variable used to cancel remaining tasks in the executor - StopSource stop_source_; }; } // namespace compute diff --git a/cpp/src/arrow/dataset/dataset.cc b/cpp/src/arrow/dataset/dataset.cc index 9fdc2734a45..6c5797e5c9d 100644 --- a/cpp/src/arrow/dataset/dataset.cc +++ b/cpp/src/arrow/dataset/dataset.cc @@ -422,8 +422,8 @@ class BasicFragmentEvolution : public FragmentEvolutionStrategy { ds_to_frag_map.push_back(column_idx_itr->second); } } - return ::arrow::internal::make_unique( - std::move(ds_to_frag_map), dataset_schema.get()); + return std::make_unique(std::move(ds_to_frag_map), + dataset_schema.get()); } }; @@ -441,7 +441,7 @@ class BasicDatasetEvolutionStrategy : public DatasetEvolutionStrategy { } // namespace std::unique_ptr MakeBasicDatasetEvolutionStrategy() { - return ::arrow::internal::make_unique(); + return std::make_unique(); } } // namespace dataset diff --git a/cpp/src/arrow/dataset/scan_node.cc b/cpp/src/arrow/dataset/scan_node.cc index 6f3d4477bba..2fa4f1d79df 100644 --- a/cpp/src/arrow/dataset/scan_node.cc +++ b/cpp/src/arrow/dataset/scan_node.cc @@ -24,13 +24,14 @@ #include "arrow/compute/exec/exec_plan.h" #include "arrow/compute/exec/expression.h" +#include "arrow/compute/exec/util.h" #include "arrow/dataset/scanner.h" #include "arrow/record_batch.h" #include "arrow/result.h" #include "arrow/status.h" #include "arrow/type.h" #include "arrow/util/checked_cast.h" -#include "arrow/util/make_unique.h" +#include "arrow/util/logging.h" #include "arrow/util/tracing_internal.h" #include "arrow/util/unreachable.h" @@ -182,7 +183,7 @@ class ScanNode : public cp::ExecNode { Status Init() override { // batch_output_ = - // ::arrow::internal::make_unique(this, + // ::std::make_unique(this, // outputs_[0]); return Status::OK(); } @@ -272,8 +273,7 @@ class ScanNode : public cp::ExecNode 
{ StateHolder{std::move(scan_state)}, node->batches_throttle_.get()); for (int i = 0; i < fragment_scanner->NumBatches(); i++) { node->num_batches_.fetch_add(1); - frag_scheduler->AddTask( - arrow::internal::make_unique(node, state_view, i)); + frag_scheduler->AddTask(std::make_unique(node, state_view, i)); } Future<> list_and_scan_node = frag_scheduler->OnFinished(); frag_scheduler->End(); @@ -303,7 +303,7 @@ class ScanNode : public cp::ExecNode { ScanNode* node; std::shared_ptr fragment; - std::unique_ptr scan_state = arrow::internal::make_unique(); + std::unique_ptr scan_state = std::make_unique(); }; Status StartProducing() override { @@ -325,8 +325,7 @@ class ScanNode : public cp::ExecNode { plan_->async_scheduler()->AddAsyncGenerator>( std::move(frag_gen), [this, scan_scheduler](const std::shared_ptr& fragment) { - scan_scheduler->AddTask( - arrow::internal::make_unique(this, fragment)); + scan_scheduler->AddTask(std::make_unique(this, fragment)); return Status::OK(); }, [scan_scheduler]() { diff --git a/cpp/src/arrow/dataset/scanner_test.cc b/cpp/src/arrow/dataset/scanner_test.cc index 65b60f2cfd0..19f5abe06a1 100644 --- a/cpp/src/arrow/dataset/scanner_test.cc +++ b/cpp/src/arrow/dataset/scanner_test.cc @@ -60,13 +60,11 @@ namespace dataset { // or the fragment other than the schema so we just make a dummy dataset/fragment // here. std::unique_ptr MakeDatasetFromSchema(std::shared_ptr sch) { - return ::arrow::internal::make_unique(std::move(sch), - RecordBatchVector{}); + return std::make_unique(std::move(sch), RecordBatchVector{}); } std::unique_ptr MakeSomeFragment(std::shared_ptr sch) { - return ::arrow::internal::make_unique(std::move(sch), - RecordBatchVector{}); + return std::make_unique(std::move(sch), RecordBatchVector{}); } TEST(BasicEvolution, MissingColumn) { @@ -380,8 +378,7 @@ struct MockDatasetBuilder { } std::unique_ptr Finish() { - return arrow::internal::make_unique(std::move(dataset_schema), - std::move(fragments)); + return std::make_unique(std::move(dataset_schema), std::move(fragments)); } std::shared_ptr dataset_schema; @@ -466,8 +463,7 @@ std::unique_ptr MakeTestDataset(int num_fragments, int batches_per_ MockDatasetBuilder dataset_builder(test_schema); for (int i = 0; i < num_fragments; i++) { dataset_builder.AddFragment( - test_schema, - ::arrow::internal::make_unique(test_schema->field_names()), + test_schema, std::make_unique(test_schema->field_names()), Fragment::kNoPartitionInformation); for (int j = 0; j < batches_per_fragment; j++) { if (empty) { From 0a83cb0da215db67fec14498647009ad6ed7f6b2 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Mon, 26 Sep 2022 15:10:36 -0700 Subject: [PATCH 3/7] ARROW-17287: Addressing PR review comments --- cpp/src/arrow/compute/exec/exec_plan.h | 12 ++--- cpp/src/arrow/compute/exec/expression.h | 47 ------------------- .../arrow/compute/exec/expression_internal.h | 47 +++++++++++++++++++ cpp/src/arrow/dataset/dataset.cc | 3 +- cpp/src/arrow/dataset/dataset.h | 19 ++++++-- cpp/src/arrow/dataset/scan_node.cc | 9 +--- cpp/src/arrow/util/async_util.cc | 5 +- 7 files changed, 74 insertions(+), 68 deletions(-) diff --git a/cpp/src/arrow/compute/exec/exec_plan.h b/cpp/src/arrow/compute/exec/exec_plan.h index c867a1b1ac3..5d929aa3057 100644 --- a/cpp/src/arrow/compute/exec/exec_plan.h +++ b/cpp/src/arrow/compute/exec/exec_plan.h @@ -488,32 +488,32 @@ struct ARROW_EXPORT Declaration { /// This method will add a sink node to the declaration to collect results into a /// table. 
It will then create an ExecPlan from the declaration, start the exec plan, /// block until the plan has finished, and return the created table. -Result> DeclarationToTable( +ARROW_EXPORT Result> DeclarationToTable( Declaration declaration, ExecContext* exec_context = default_exec_context()); /// \brief Asynchronous version of \see DeclarationToTable -Future> DeclarationToTableAsync( +ARROW_EXPORT Future> DeclarationToTableAsync( Declaration declaration, ExecContext* exec_context = default_exec_context()); /// \brief Utility method to run a declaration and collect the results into ExecBatch /// vector /// /// \see DeclarationToTable for details -Result> DeclarationToExecBatches( +ARROW_EXPORT Result> DeclarationToExecBatches( Declaration declaration, ExecContext* exec_context = default_exec_context()); /// \brief Asynchronous version of \see DeclarationToExecBatches -Future> DeclarationToExecBatchesAsync( +ARROW_EXPORT Future> DeclarationToExecBatchesAsync( Declaration declaration, ExecContext* exec_context = default_exec_context()); /// \brief Utility method to run a declaration and collect the results into a vector /// /// \see DeclarationToTable for details -Result>> DeclarationToBatches( +ARROW_EXPORT Result>> DeclarationToBatches( Declaration declaration, ExecContext* exec_context = default_exec_context()); /// \brief Asynchronous version of \see DeclarationToBatches -Future>> DeclarationToBatchesAsync( +ARROW_EXPORT Future>> DeclarationToBatchesAsync( Declaration declaration, ExecContext* exec_context = default_exec_context()); /// \brief Wrap an ExecBatch generator in a RecordBatchReader. diff --git a/cpp/src/arrow/compute/exec/expression.h b/cpp/src/arrow/compute/exec/expression.h index 9c4eeffa211..7aeb135c994 100644 --- a/cpp/src/arrow/compute/exec/expression.h +++ b/cpp/src/arrow/compute/exec/expression.h @@ -279,53 +279,6 @@ ARROW_EXPORT Expression or_(Expression lhs, Expression rhs); ARROW_EXPORT Expression or_(const std::vector&); ARROW_EXPORT Expression not_(Expression operand); -/// Modify an Expression with pre-order and post-order visitation. -/// `pre` will be invoked on each Expression. `pre` will visit Calls before their -/// arguments, `post_call` will visit Calls (and no other Expressions) after their -/// arguments. Visitors should return the Identical expression to indicate no change; this -/// will prevent unnecessary construction in the common case where a modification is not -/// possible/necessary/... -/// -/// If an argument was modified, `post_call` visits a reconstructed Call with the modified -/// arguments but also receives a pointer to the unmodified Expression as a second -/// argument. If no arguments were modified the unmodified Expression* will be nullptr. 
-template -Result Modify(Expression expr, const PreVisit& pre, - const PostVisitCall& post_call) { - ARROW_ASSIGN_OR_RAISE(expr, Result(pre(std::move(expr)))); - - auto call = expr.call(); - if (!call) return expr; - - bool at_least_one_modified = false; - std::vector modified_arguments; - - for (size_t i = 0; i < call->arguments.size(); ++i) { - ARROW_ASSIGN_OR_RAISE(auto modified_argument, - Modify(call->arguments[i], pre, post_call)); - - if (Identical(modified_argument, call->arguments[i])) { - continue; - } - - if (!at_least_one_modified) { - modified_arguments = call->arguments; - at_least_one_modified = true; - } - - modified_arguments[i] = std::move(modified_argument); - } - - if (at_least_one_modified) { - // reconstruct the call expression with the modified arguments - auto modified_call = *call; - modified_call.arguments = std::move(modified_arguments); - return post_call(Expression(std::move(modified_call)), &expr); - } - - return post_call(std::move(expr), nullptr); -} - /// @} } // namespace compute diff --git a/cpp/src/arrow/compute/exec/expression_internal.h b/cpp/src/arrow/compute/exec/expression_internal.h index 9e29b8e27f9..027c954c6d0 100644 --- a/cpp/src/arrow/compute/exec/expression_internal.h +++ b/cpp/src/arrow/compute/exec/expression_internal.h @@ -287,5 +287,52 @@ inline Result> GetFunction( return GetCastFunction(*to_type); } +/// Modify an Expression with pre-order and post-order visitation. +/// `pre` will be invoked on each Expression. `pre` will visit Calls before their +/// arguments, `post_call` will visit Calls (and no other Expressions) after their +/// arguments. Visitors should return the Identical expression to indicate no change; this +/// will prevent unnecessary construction in the common case where a modification is not +/// possible/necessary/... +/// +/// If an argument was modified, `post_call` visits a reconstructed Call with the modified +/// arguments but also receives a pointer to the unmodified Expression as a second +/// argument. If no arguments were modified the unmodified Expression* will be nullptr. 
+template +Result Modify(Expression expr, const PreVisit& pre, + const PostVisitCall& post_call) { + ARROW_ASSIGN_OR_RAISE(expr, Result(pre(std::move(expr)))); + + auto call = expr.call(); + if (!call) return expr; + + bool at_least_one_modified = false; + std::vector modified_arguments; + + for (size_t i = 0; i < call->arguments.size(); ++i) { + ARROW_ASSIGN_OR_RAISE(auto modified_argument, + Modify(call->arguments[i], pre, post_call)); + + if (Identical(modified_argument, call->arguments[i])) { + continue; + } + + if (!at_least_one_modified) { + modified_arguments = call->arguments; + at_least_one_modified = true; + } + + modified_arguments[i] = std::move(modified_argument); + } + + if (at_least_one_modified) { + // reconstruct the call expression with the modified arguments + auto modified_call = *call; + modified_call.arguments = std::move(modified_arguments); + return post_call(Expression(std::move(modified_call)), &expr); + } + + return post_call(std::move(expr), nullptr); +} + } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/dataset/dataset.cc b/cpp/src/arrow/dataset/dataset.cc index 6c5797e5c9d..bf91ec92507 100644 --- a/cpp/src/arrow/dataset/dataset.cc +++ b/cpp/src/arrow/dataset/dataset.cc @@ -18,6 +18,7 @@ #include #include +#include "arrow/compute/exec/expression_internal.h" #include "arrow/dataset/dataset.h" #include "arrow/dataset/dataset_internal.h" #include "arrow/dataset/scanner.h" @@ -331,7 +332,7 @@ class BasicFragmentEvolution : public FragmentEvolutionStrategy { if (missing_fields.size() == 1) { return missing_fields[0]; } - return compute::and_(missing_fields); + return compute::and_(std::move(missing_fields)); } Result> DevolveSelection( diff --git a/cpp/src/arrow/dataset/dataset.h b/cpp/src/arrow/dataset/dataset.h index 5b20c804418..3f635dd7992 100644 --- a/cpp/src/arrow/dataset/dataset.h +++ b/cpp/src/arrow/dataset/dataset.h @@ -45,7 +45,7 @@ namespace dataset { using RecordBatchGenerator = std::function>()>; /// \brief Description of a column to scan -struct FragmentSelectionColumn { +struct ARROW_DS_EXPORT FragmentSelectionColumn { /// \brief The path to the column to load FieldPath path; /// \brief The type of the column in the dataset schema @@ -61,12 +61,13 @@ struct FragmentSelectionColumn { /// \brief The index in the output selection of this column int selection_index; }; + /// \brief Instructions for scanning a particular fragment /// /// The fragment scan request is dervied from ScanV2Options. The main /// difference is that the scan options are based on the dataset schema /// while the fragment request is based on the fragment schema. -struct FragmentScanRequest { +struct ARROW_DS_EXPORT FragmentScanRequest { /// \brief A row filter /// /// The filter expression should be written against the fragment schema. @@ -90,7 +91,8 @@ struct FragmentScanRequest { FragmentScanOptions* format_scan_options; }; -class FragmentScanner { +/// \brief An iterator-like object that can yield batches created from a fragment +class ARROW_DS_EXPORT FragmentScanner { public: /// This instance will only be destroyed after all ongoing scan futures /// have been completed. @@ -110,7 +112,16 @@ class FragmentScanner { virtual int NumBatches() = 0; }; -struct InspectedFragment { +/// \brief Information learned about a fragment through inspection +/// +/// This information can be used to figure out which fields need +/// to be read from a file and how the data read in should be evolved +/// to match the dataset schema. 
+/// +/// For example, from a CSV file we can inspect and learn the column +/// names and use those column names to determine which columns to load +/// from the CSV file. +struct ARROW_DS_EXPORT InspectedFragment { explicit InspectedFragment(std::vector column_names) : column_names(std::move(column_names)) {} std::vector column_names; diff --git a/cpp/src/arrow/dataset/scan_node.cc b/cpp/src/arrow/dataset/scan_node.cc index 2fa4f1d79df..6a5f4842d9b 100644 --- a/cpp/src/arrow/dataset/scan_node.cc +++ b/cpp/src/arrow/dataset/scan_node.cc @@ -181,12 +181,7 @@ class ScanNode : public cp::ExecNode { [[noreturn]] void ErrorReceived(cp::ExecNode*, Status) override { NoInputs(); } [[noreturn]] void InputFinished(cp::ExecNode*, int) override { NoInputs(); } - Status Init() override { - // batch_output_ = - // ::std::make_unique(this, - // outputs_[0]); - return Status::OK(); - } + Status Init() override { return Status::OK(); } struct ScanState { std::mutex mutex; @@ -211,7 +206,7 @@ class ScanNode : public cp::ExecNode { // Prevent concurrent calls to ScanBatch which might not be thread safe std::lock_guard lk(scan_->mutex); return scan_->fragment_scanner->ScanBatch(batch_index_) - .Then([this](const std::shared_ptr batch) { + .Then([this](const std::shared_ptr& batch) { return HandleBatch(batch); }); } diff --git a/cpp/src/arrow/util/async_util.cc b/cpp/src/arrow/util/async_util.cc index a1341417665..a65b538d89f 100644 --- a/cpp/src/arrow/util/async_util.cc +++ b/cpp/src/arrow/util/async_util.cc @@ -282,9 +282,8 @@ class AsyncTaskSchedulerImpl : public AsyncTaskScheduler { AbortUnlocked(submit_result.status(), std::move(lk)); return; } - // FIXME(C++17, move into lambda?) - std::shared_ptr task_holder = std::move(task); - submit_result->AddCallback([this, cost, task_holder](const Status& st) { + // Capture `task` to keep it alive until finished + submit_result->AddCallback([this, cost, task = std::move(task)](const Status& st) { std::unique_lock lk(mutex_); if (!st.ok()) { running_tasks_--; From 7c325b63023e11a2cde53a02e6b0cc2368116df9 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Mon, 26 Sep 2022 16:35:31 -0700 Subject: [PATCH 4/7] ARROW-17287: Moved Modify to compute/exec/util and renamed to ModifyExpression --- cpp/src/arrow/compute/exec/expression.cc | 11 +++-- .../arrow/compute/exec/expression_internal.h | 47 ------------------ cpp/src/arrow/compute/exec/util.h | 48 +++++++++++++++++++ cpp/src/arrow/dataset/dataset.cc | 4 +- 4 files changed, 56 insertions(+), 54 deletions(-) diff --git a/cpp/src/arrow/compute/exec/expression.cc b/cpp/src/arrow/compute/exec/expression.cc index eecc8024d78..e99f69fbca7 100644 --- a/cpp/src/arrow/compute/exec/expression.cc +++ b/cpp/src/arrow/compute/exec/expression.cc @@ -24,6 +24,7 @@ #include "arrow/chunked_array.h" #include "arrow/compute/api_vector.h" #include "arrow/compute/exec/expression_internal.h" +#include "arrow/compute/exec/util.h" #include "arrow/compute/exec_internal.h" #include "arrow/compute/function_internal.h" #include "arrow/io/memory.h" @@ -662,7 +663,7 @@ bool ExpressionHasFieldRefs(const Expression& expr) { } Result FoldConstants(Expression expr) { - return Modify( + return ModifyExpression( std::move(expr), [](Expression expr) { return expr; }, [](Expression expr, ...) 
-> Result { auto call = CallNotNull(expr); @@ -807,7 +808,7 @@ Result ReplaceFieldsWithKnownValues(const KnownFieldValues& known_va "ReplaceFieldsWithKnownValues called on an unbound Expression"); } - return Modify( + return ModifyExpression( std::move(expr), [&known_values](Expression expr) -> Result { if (auto ref = expr.field_ref()) { @@ -878,7 +879,7 @@ Result Canonicalize(Expression expr, compute::ExecContext* exec_cont } } AlreadyCanonicalized; - return Modify( + return ModifyExpression( std::move(expr), [&AlreadyCanonicalized, exec_context](Expression expr) -> Result { auto call = expr.call(); @@ -1120,7 +1121,7 @@ Result SimplifyIsValidGuarantee(Expression expr, const Expression::Call& guarantee) { if (guarantee.function_name != "is_valid") return expr; - return Modify( + return ModifyExpression( std::move(expr), [](Expression expr) { return expr; }, [&](Expression expr, ...) -> Result { auto call = expr.call(); @@ -1162,7 +1163,7 @@ Result SimplifyWithGuarantee(Expression expr, if (auto inequality = Inequality::ExtractOne(guarantee)) { ARROW_ASSIGN_OR_RAISE(auto simplified, - Modify( + ModifyExpression( std::move(expr), [](Expression expr) { return expr; }, [&](Expression expr, ...) -> Result { return inequality->Simplify(std::move(expr)); diff --git a/cpp/src/arrow/compute/exec/expression_internal.h b/cpp/src/arrow/compute/exec/expression_internal.h index 027c954c6d0..9e29b8e27f9 100644 --- a/cpp/src/arrow/compute/exec/expression_internal.h +++ b/cpp/src/arrow/compute/exec/expression_internal.h @@ -287,52 +287,5 @@ inline Result> GetFunction( return GetCastFunction(*to_type); } -/// Modify an Expression with pre-order and post-order visitation. -/// `pre` will be invoked on each Expression. `pre` will visit Calls before their -/// arguments, `post_call` will visit Calls (and no other Expressions) after their -/// arguments. Visitors should return the Identical expression to indicate no change; this -/// will prevent unnecessary construction in the common case where a modification is not -/// possible/necessary/... -/// -/// If an argument was modified, `post_call` visits a reconstructed Call with the modified -/// arguments but also receives a pointer to the unmodified Expression as a second -/// argument. If no arguments were modified the unmodified Expression* will be nullptr. 
-template <typename PreVisit, typename PostVisitCall>
-Result<Expression> Modify(Expression expr, const PreVisit& pre,
-                          const PostVisitCall& post_call) {
-  ARROW_ASSIGN_OR_RAISE(expr, Result<Expression>(pre(std::move(expr))));
-
-  auto call = expr.call();
-  if (!call) return expr;
-
-  bool at_least_one_modified = false;
-  std::vector<Expression> modified_arguments;
-
-  for (size_t i = 0; i < call->arguments.size(); ++i) {
-    ARROW_ASSIGN_OR_RAISE(auto modified_argument,
-                          Modify(call->arguments[i], pre, post_call));
-
-    if (Identical(modified_argument, call->arguments[i])) {
-      continue;
-    }
-
-    if (!at_least_one_modified) {
-      modified_arguments = call->arguments;
-      at_least_one_modified = true;
-    }
-
-    modified_arguments[i] = std::move(modified_argument);
-  }
-
-  if (at_least_one_modified) {
-    // reconstruct the call expression with the modified arguments
-    auto modified_call = *call;
-    modified_call.arguments = std::move(modified_arguments);
-    return post_call(Expression(std::move(modified_call)), &expr);
-  }
-
-  return post_call(std::move(expr), nullptr);
-}
-
 } // namespace compute
 } // namespace arrow
diff --git a/cpp/src/arrow/compute/exec/util.h b/cpp/src/arrow/compute/exec/util.h
index e1797771fe0..3cab83c608b 100644
--- a/cpp/src/arrow/compute/exec/util.h
+++ b/cpp/src/arrow/compute/exec/util.h
@@ -25,6 +25,7 @@
 #include
 
 #include "arrow/buffer.h"
+#include "arrow/compute/exec/expression.h"
 #include "arrow/compute/exec/options.h"
 #include "arrow/compute/type_fwd.h"
 #include "arrow/memory_pool.h"
@@ -361,5 +362,52 @@ struct ARROW_EXPORT TableSinkNodeConsumer : public SinkNodeConsumer {
   util::Mutex consume_mutex_;
 };
 
+/// Modify an Expression with pre-order and post-order visitation.
+/// `pre` will be invoked on each Expression. `pre` will visit Calls before their
+/// arguments, `post_call` will visit Calls (and no other Expressions) after their
+/// arguments. Visitors should return the Identical expression to indicate no change; this
+/// will prevent unnecessary construction in the common case where a modification is not
+/// possible/necessary/...
+///
+/// If an argument was modified, `post_call` visits a reconstructed Call with the modified
+/// arguments but also receives a pointer to the unmodified Expression as a second
+/// argument. If no arguments were modified the unmodified Expression* will be nullptr.
+template <typename PreVisit, typename PostVisitCall>
+Result<Expression> ModifyExpression(Expression expr, const PreVisit& pre,
+                                    const PostVisitCall& post_call) {
+  ARROW_ASSIGN_OR_RAISE(expr, Result<Expression>(pre(std::move(expr))));
+
+  auto call = expr.call();
+  if (!call) return expr;
+
+  bool at_least_one_modified = false;
+  std::vector<Expression> modified_arguments;
+
+  for (size_t i = 0; i < call->arguments.size(); ++i) {
+    ARROW_ASSIGN_OR_RAISE(auto modified_argument,
+                          ModifyExpression(call->arguments[i], pre, post_call));
+
+    if (Identical(modified_argument, call->arguments[i])) {
+      continue;
+    }
+
+    if (!at_least_one_modified) {
+      modified_arguments = call->arguments;
+      at_least_one_modified = true;
+    }
+
+    modified_arguments[i] = std::move(modified_argument);
+  }
+
+  if (at_least_one_modified) {
+    // reconstruct the call expression with the modified arguments
+    auto modified_call = *call;
+    modified_call.arguments = std::move(modified_arguments);
+    return post_call(Expression(std::move(modified_call)), &expr);
+  }
+
+  return post_call(std::move(expr), nullptr);
+}
+
 } // namespace compute
 } // namespace arrow
diff --git a/cpp/src/arrow/dataset/dataset.cc b/cpp/src/arrow/dataset/dataset.cc
index bf91ec92507..4e1d7ac20ec 100644
--- a/cpp/src/arrow/dataset/dataset.cc
+++ b/cpp/src/arrow/dataset/dataset.cc
@@ -18,7 +18,7 @@
 #include
 #include
 
-#include "arrow/compute/exec/expression_internal.h"
+#include "arrow/compute/exec/util.h"
 #include "arrow/dataset/dataset.h"
 #include "arrow/dataset/dataset_internal.h"
 #include "arrow/dataset/scanner.h"
@@ -357,7 +357,7 @@ class BasicFragmentEvolution : public FragmentEvolutionStrategy {
 
   Result<compute::Expression> DevolveFilter(
       const compute::Expression& filter) const override {
-    return compute::Modify(
+    return compute::ModifyExpression(
         filter,
         [&](compute::Expression expr) -> Result<compute::Expression> {
           const FieldRef* ref = expr.field_ref();

From 641be9157a3a24850229a587232519d1875c70fc Mon Sep 17 00:00:00 2001
From: Weston Pace
Date: Mon, 26 Sep 2022 17:20:48 -0700
Subject: [PATCH 5/7] ARROW-17287: Lint. Fix python build.
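For orientation: the ModifyExpression helper relocated by the previous commit is driven with a pre-visitor and a post-call visitor, in the same shape as the DevolveFilter call site above. A minimal usage sketch, not part of any patch in this series (ReplaceBWithTrue, the field name "b", and the include set are illustrative assumptions):

#include "arrow/compute/exec/expression.h"
#include "arrow/compute/exec/util.h"
#include "arrow/result.h"
#include "arrow/type.h"

namespace cp = arrow::compute;

// Rewrite every reference to the field "b" into the literal true.  The
// pre-visitor performs the substitution; the post-call visitor forwards any
// (re)constructed call unchanged.
arrow::Result<cp::Expression> ReplaceBWithTrue(cp::Expression expr) {
  return cp::ModifyExpression(
      std::move(expr),
      [](cp::Expression e) -> arrow::Result<cp::Expression> {
        if (const arrow::FieldRef* ref = e.field_ref()) {
          if (*ref == arrow::FieldRef("b")) return cp::literal(true);
        }
        return e;
      },
      [](cp::Expression e, ...) -> arrow::Result<cp::Expression> { return e; });
}

Because both visitors hand back the expression they were given when nothing changes, ModifyExpression can detect the no-op case via Identical and avoid rebuilding the call tree.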
---
 cpp/src/arrow/compute/exec/util.h    | 2 +-
 python/pyarrow/_exec_plan.pyx        | 2 +-
 python/pyarrow/includes/libarrow.pxd | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/cpp/src/arrow/compute/exec/util.h b/cpp/src/arrow/compute/exec/util.h
index 3cab83c608b..c8d8e8d9149 100644
--- a/cpp/src/arrow/compute/exec/util.h
+++ b/cpp/src/arrow/compute/exec/util.h
@@ -406,7 +406,7 @@ Result<Expression> ModifyExpression(Expression expr, const PreVisit& pre,
     return post_call(Expression(std::move(modified_call)), &expr);
   }
 
-  return post_call(std::move(expr), nullptr);
+  return post_call(std::move(expr), NULLPTR);
 }
 
 } // namespace compute
diff --git a/python/pyarrow/_exec_plan.pyx b/python/pyarrow/_exec_plan.pyx
index 9506caf7d28..526e4cb73ad 100644
--- a/python/pyarrow/_exec_plan.pyx
+++ b/python/pyarrow/_exec_plan.pyx
@@ -384,7 +384,7 @@ def _filter_table(table, expression, output_type=Table):
     c_decl_plan.push_back(
         CDeclaration(tobytes("filter"), CFilterNodeOptions(
-            expr.unwrap(), True
+            expr.unwrap()
        ))
    )
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index d9dde7803ab..e44fa2615e2 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -2533,7 +2533,7 @@ cdef extern from "arrow/compute/exec/options.h" namespace "arrow::compute" nogil
         pass
 
     cdef cppclass CFilterNodeOptions "arrow::compute::FilterNodeOptions"(CExecNodeOptions):
-        CFilterNodeOptions(CExpression, c_bool async_mode)
+        CFilterNodeOptions(CExpression)
 
     cdef cppclass CProjectNodeOptions "arrow::compute::ProjectNodeOptions"(CExecNodeOptions):
         CProjectNodeOptions(vector[CExpression] expressions)

From dc6fbf7d2d623a5b1a67be655ef852fbd87e5d39 Mon Sep 17 00:00:00 2001
From: Weston Pace
Date: Mon, 26 Sep 2022 19:02:53 -0700
Subject: [PATCH 6/7] ARROW-17287: Add ARROW_DS_EXPORT to MakeBasicDatasetEvolutionStrategy

---
 cpp/src/arrow/dataset/dataset.h | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/cpp/src/arrow/dataset/dataset.h b/cpp/src/arrow/dataset/dataset.h
index 3f635dd7992..80c46568a7c 100644
--- a/cpp/src/arrow/dataset/dataset.h
+++ b/cpp/src/arrow/dataset/dataset.h
@@ -317,7 +317,8 @@ class ARROW_DS_EXPORT DatasetEvolutionStrategy {
   virtual std::string ToString() const = 0;
 };
 
-std::unique_ptr<DatasetEvolutionStrategy> MakeBasicDatasetEvolutionStrategy();
+ARROW_DS_EXPORT std::unique_ptr<DatasetEvolutionStrategy>
+MakeBasicDatasetEvolutionStrategy();
 
 /// \brief A container of zero or more Fragments.
 ///

From a75fbdcacc97440c13958ae455c1f6804d92f912 Mon Sep 17 00:00:00 2001
From: Weston Pace
Date: Tue, 27 Sep 2022 19:04:11 -0700
Subject: [PATCH 7/7] ARROW-17287: Fixed end-to-end benchmark to also test scanv2 node. Modified scan node to call downstream pipeline on a new thread task.
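Concretely, the scan node stops calling InputReceived on its output from inside the callback attached to the fragment scanner's future and instead schedules that hand-off as a task on the ExecPlan. A sketch of the pattern in free-function form (DeliverDownstream is a stand-in name, not the patch's code; the real change is in the scan_node.cc hunk below):

#include "arrow/compute/exec.h"
#include "arrow/compute/exec/exec_plan.h"
#include "arrow/status.h"

namespace arrow {
namespace dataset {

// Hand the downstream InputReceived call to the plan's scheduler instead of
// invoking it inline on whatever thread completed the scan future.
Status DeliverDownstream(compute::ExecPlan* plan, compute::ExecNode* scan_node,
                         compute::ExecNode* output, compute::ExecBatch batch) {
  return plan->ScheduleTask([scan_node, output, batch = std::move(batch)]() mutable {
    output->InputReceived(scan_node, std::move(batch));
    return Status::OK();
  });
}

}  // namespace dataset
}  // namespace arrow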
--- cpp/src/arrow/dataset/scan_node.cc | 14 ++-- cpp/src/arrow/dataset/scanner_benchmark.cc | 90 +++++++++++----------- 2 files changed, 50 insertions(+), 54 deletions(-) diff --git a/cpp/src/arrow/dataset/scan_node.cc b/cpp/src/arrow/dataset/scan_node.cc index 6a5f4842d9b..da397312b55 100644 --- a/cpp/src/arrow/dataset/scan_node.cc +++ b/cpp/src/arrow/dataset/scan_node.cc @@ -216,8 +216,11 @@ class ScanNode : public cp::ExecNode { compute::ExecBatch evolved_batch, scan_->fragment_evolution->EvolveBatch(batch, node_->options_.columns, scan_->scan_request.columns)); - node_->outputs_[0]->InputReceived(node_, std::move(evolved_batch)); - return Status::OK(); + return node_->plan_->ScheduleTask( + [node = node_, evolved_batch = std::move(evolved_batch)] { + node->outputs_[0]->InputReceived(node, std::move(evolved_batch)); + return Status::OK(); + }); } int cost() const override { return cost_; } @@ -331,13 +334,11 @@ class ScanNode : public cp::ExecNode { } void PauseProducing(ExecNode* output, int32_t counter) override { - // FIXME(TODO) - // Need to ressurect AsyncToggle and then all fragment scanners - // should share the same toggle + // TODO(ARROW-17755) } void ResumeProducing(ExecNode* output, int32_t counter) override { - // FIXME(TODO) + // TODO(ARROW-17755) } void StopProducing(ExecNode* output) override { @@ -350,7 +351,6 @@ class ScanNode : public cp::ExecNode { private: ScanV2Options options_; std::atomic num_batches_{0}; - // std::unique_ptr batch_output_; std::unique_ptr fragments_throttle_; std::unique_ptr batches_throttle_; }; diff --git a/cpp/src/arrow/dataset/scanner_benchmark.cc b/cpp/src/arrow/dataset/scanner_benchmark.cc index 448b40bf158..0184fcce192 100644 --- a/cpp/src/arrow/dataset/scanner_benchmark.cc +++ b/cpp/src/arrow/dataset/scanner_benchmark.cc @@ -96,7 +96,10 @@ std::shared_ptr GetSchema() { size_t GetBytesForSchema() { return sizeof(int32_t) + sizeof(bool); } -void MinimalEndToEndScan(size_t num_batches, size_t batch_size) { +void MinimalEndToEndScan( + size_t num_batches, size_t batch_size, const std::string& factory_name, + std::function>(size_t, size_t)> + options_factory) { // Specify a MemoryPool and ThreadPool for the ExecPlan compute::ExecContext exec_context(default_memory_pool(), ::arrow::internal::GetCpuThreadPool()); @@ -116,22 +119,16 @@ void MinimalEndToEndScan(size_t num_batches, size_t batch_size) { std::shared_ptr dataset = std::make_shared(GetSchema(), batches); - auto options = std::make_shared(); - // specify the filter - compute::Expression b_is_true = equal(field_ref("b"), literal(true)); - options->filter = b_is_true; - // for now, specify the projection as the full project expression (eventually this can - // just be a list of materialized field names) - compute::Expression a_times_2 = call("multiply", {field_ref("a"), literal(2)}); - options->projection = - call("make_struct", {a_times_2}, compute::MakeStructOptions{{"a * 2"}}); + ASSERT_OK_AND_ASSIGN(std::shared_ptr node_options, + options_factory(num_batches, batch_size)); // construct the scan node ASSERT_OK_AND_ASSIGN( compute::ExecNode * scan, - compute::MakeExecNode("scan", plan.get(), {}, ScanNodeOptions{dataset, options})); + compute::MakeExecNode(factory_name, plan.get(), {}, *node_options)); // pipe the scan node into a filter node + compute::Expression b_is_true = equal(field_ref("b"), literal(true)); ASSERT_OK_AND_ASSIGN(compute::ExecNode * filter, compute::MakeExecNode("filter", plan.get(), {scan}, compute::FilterNodeOptions{b_is_true})); @@ -140,10 +137,11 @@ void 
MinimalEndToEndScan(size_t num_batches, size_t batch_size) { // NB: we're using the project node factory which preserves fragment/batch index // tagging, so we *can* reorder later if we choose. The tags will not appear in // our output. + compute::Expression a_times_2 = call("multiply", {field_ref("a"), literal(2)}); ASSERT_OK_AND_ASSIGN( compute::ExecNode * project, - compute::MakeExecNode("augmented_project", plan.get(), {filter}, - compute::ProjectNodeOptions{{a_times_2}, {}})); + compute::MakeExecNode("project", plan.get(), {filter}, + compute::ProjectNodeOptions{{a_times_2}, {"a*2"}})); // finally, pipe the project node into a sink node AsyncGenerator> sink_gen; @@ -155,7 +153,7 @@ void MinimalEndToEndScan(size_t num_batches, size_t batch_size) { // translate sink_gen (async) to sink_reader (sync) std::shared_ptr sink_reader = compute::MakeGeneratorReader( - schema({field("a * 2", int32())}), std::move(sink_gen), exec_context.memory_pool()); + schema({field("a*2", int32())}), std::move(sink_gen), exec_context.memory_pool()); // start the ExecPlan ASSERT_OK(plan->StartProducing()); @@ -202,8 +200,9 @@ void ScanOnly( ASSERT_NE(sink, nullptr); // translate sink_gen (async) to sink_reader (sync) - std::shared_ptr sink_reader = compute::MakeGeneratorReader( - schema({field("a * 2", int32())}), std::move(sink_gen), exec_context.memory_pool()); + std::shared_ptr sink_reader = + compute::MakeGeneratorReader(schema({field("a", int32()), field("b", boolean())}), + std::move(sink_gen), exec_context.memory_pool()); // start the ExecPlan ASSERT_OK(plan->StartProducing()); @@ -212,22 +211,14 @@ void ScanOnly( ASSERT_OK_AND_ASSIGN(auto collected, Table::FromRecordBatchReader(sink_reader.get())); ASSERT_GT(collected->num_rows(), 0); + ASSERT_EQ(collected->num_columns(), 2); // wait 1s for completion ASSERT_TRUE(plan->finished().Wait(/*seconds=*/1)) << "ExecPlan didn't finish within 1s"; } -static void MinimalEndToEndBench(benchmark::State& state) { - size_t num_batches = state.range(0); - size_t batch_size = state.range(1); - - for (auto _ : state) { - MinimalEndToEndScan(num_batches, batch_size); - } - state.SetItemsProcessed(state.iterations() * num_batches); - state.SetBytesProcessed(state.iterations() * num_batches * batch_size * - GetBytesForSchema()); -} +static constexpr int kScanIdx = 0; +static constexpr int kScanV2Idx = 1; const std::function>(size_t, size_t)> kScanFactory = [](size_t num_batches, size_t batch_size) { @@ -263,10 +254,7 @@ const std::function>(size_t, si return options; }; -static constexpr int kScanIdx = 0; -static constexpr int kScanV2Idx = 1; - -static void ScanOnlyBench(benchmark::State& state) { +static void MinimalEndToEndBench(benchmark::State& state) { size_t num_batches = state.range(0); size_t batch_size = state.range(1); @@ -281,29 +269,37 @@ static void ScanOnlyBench(benchmark::State& state) { } for (auto _ : state) { - ScanOnly(num_batches, batch_size, scan_factory, options_factory); + MinimalEndToEndScan(num_batches, batch_size, scan_factory, options_factory); } + state.SetItemsProcessed(state.iterations() * num_batches); state.SetBytesProcessed(state.iterations() * num_batches * batch_size * GetBytesForSchema()); } -static void MinimalEndToEnd_Customize(benchmark::internal::Benchmark* b) { - for (const int32_t num_batches : {1000}) { - for (const int batch_size : {10, 100, 1000}) { - b->Args({num_batches, batch_size}); - RecordBatchVector batches = - ::arrow::compute::GenerateBatches(GetSchema(), num_batches, batch_size); - StoreBatches(num_batches, 
batch_size, batches); - } +static void ScanOnlyBench(benchmark::State& state) { + size_t num_batches = state.range(0); + size_t batch_size = state.range(1); + + std::function>(size_t, size_t)> + options_factory; + std::string scan_factory = "scan"; + if (state.range(2) == kScanIdx) { + options_factory = kScanFactory; + } else if (state.range(2) == kScanV2Idx) { + options_factory = kScanV2Factory; + scan_factory = "scan2"; } - b->ArgNames({"num_batches", "batch_size"}); - b->UseRealTime(); + + for (auto _ : state) { + ScanOnly(num_batches, batch_size, scan_factory, options_factory); + } + state.SetItemsProcessed(state.iterations() * num_batches); + state.SetBytesProcessed(state.iterations() * num_batches * batch_size * + GetBytesForSchema()); } -// FIXME - Combine these two customize blocks by moving the end-to-end to support -// options factories -static void ScanOnlyEndToEnd_Customize(benchmark::internal::Benchmark* b) { +static void ScanBenchmark_Customize(benchmark::internal::Benchmark* b) { for (const int32_t num_batches : {1000}) { for (const int batch_size : {10, 100, 1000}) { for (const int scan_idx : {kScanIdx, kScanV2Idx}) { @@ -318,8 +314,8 @@ static void ScanOnlyEndToEnd_Customize(benchmark::internal::Benchmark* b) { b->UseRealTime(); } -BENCHMARK(MinimalEndToEndBench)->Apply(MinimalEndToEnd_Customize); -BENCHMARK(ScanOnlyBench)->Apply(ScanOnlyEndToEnd_Customize)->Iterations(100); +BENCHMARK(MinimalEndToEndBench)->Apply(ScanBenchmark_Customize)->Iterations(10); +BENCHMARK(ScanOnlyBench)->Apply(ScanBenchmark_Customize)->Iterations(10); } // namespace dataset } // namespace arrow