From d30cfe77d1702395ef27565790d6accb77a2353f Mon Sep 17 00:00:00 2001 From: Yaron Gvili Date: Thu, 22 Sep 2022 08:53:32 -0400 Subject: [PATCH 1/4] ARROW-17610: [C++] Support additional source types in SourceNode --- cpp/src/arrow/compute/exec/options.h | 29 +++++ cpp/src/arrow/compute/exec/plan_test.cc | 79 +++++++++++++ cpp/src/arrow/compute/exec/source_node.cc | 133 ++++++++++++++++++++++ cpp/src/arrow/compute/exec/test_util.cc | 32 ++++++ cpp/src/arrow/compute/exec/test_util.h | 24 ++++ 5 files changed, 297 insertions(+) diff --git a/cpp/src/arrow/compute/exec/options.h b/cpp/src/arrow/compute/exec/options.h index ffb9f169833..3612946654d 100644 --- a/cpp/src/arrow/compute/exec/options.h +++ b/cpp/src/arrow/compute/exec/options.h @@ -27,6 +27,7 @@ #include "arrow/compute/api_vector.h" #include "arrow/compute/exec.h" #include "arrow/compute/exec/expression.h" +#include "arrow/record_batch.h" #include "arrow/result.h" #include "arrow/util/async_generator.h" #include "arrow/util/async_util.h" @@ -77,6 +78,34 @@ class ARROW_EXPORT TableSourceNodeOptions : public ExecNodeOptions { int64_t max_batch_size; }; +/// \brief An extended Source node which accepts a schema +/// +/// ItMaker is a maker of an iterator of tabular data. 
+template +class ARROW_EXPORT SchemaSourceNodeOptions : public ExecNodeOptions { + public: + SchemaSourceNodeOptions(std::shared_ptr schema, ItMaker it_maker) + : schema(schema), it_maker(std::move(it_maker)) {} + + // the schema of the record batches from the iterator + std::shared_ptr schema; + + // maker of an iterator which acts as the data source + ItMaker it_maker; +}; + +/// \brief An extended Source node which accepts a schema and array-vectors +using ArrayVectorIteratorMaker = std::function>()>; +using ArrayVectorSourceNodeOptions = SchemaSourceNodeOptions; + +using ExecBatchIteratorMaker = std::function>()>; +/// \brief An extended Source node which accepts a schema and exec-batches +using ExecBatchSourceNodeOptions = SchemaSourceNodeOptions; + +using RecordBatchIteratorMaker = std::function>()>; +/// \brief An extended Source node which accepts a schema and record-batches +using RecordBatchSourceNodeOptions = SchemaSourceNodeOptions; + /// \brief Make a node which excludes some rows from batches passed through it /// /// filter_expression will be evaluated against each batch which is pushed to diff --git a/cpp/src/arrow/compute/exec/plan_test.cc b/cpp/src/arrow/compute/exec/plan_test.cc index a33337fcfee..f75551b3394 100644 --- a/cpp/src/arrow/compute/exec/plan_test.cc +++ b/cpp/src/arrow/compute/exec/plan_test.cc @@ -295,6 +295,85 @@ TEST(ExecPlanExecution, TableSourceSinkError) { Raises(StatusCode::Invalid, HasSubstr("batch_size > 0"))); } +template +void test_source_sink_error( + std::string source_factory_name, + std::function>(const BatchesWithSchema&)> + to_elements) { + ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make()); + std::shared_ptr no_schema; + + auto exp_batches = MakeBasicBatches(); + ASSERT_OK_AND_ASSIGN(auto elements, to_elements(exp_batches)); + auto element_it_maker = [&elements]() { + return MakeVectorIterator(elements); + }; + + auto null_executor_options = OptionsType{exp_batches.schema, element_it_maker}; + 
ASSERT_OK(MakeExecNode(source_factory_name, plan.get(), {}, null_executor_options)); + + auto null_schema_options = OptionsType{no_schema, element_it_maker}; + ASSERT_THAT(MakeExecNode(source_factory_name, plan.get(), {}, null_schema_options), + Raises(StatusCode::Invalid, HasSubstr("not null"))); +} + +template +void test_source_sink( + std::string source_factory_name, + std::function>(const BatchesWithSchema&)> + to_elements) { + ASSERT_OK_AND_ASSIGN(auto io_executor, arrow::internal::ThreadPool::Make(1)); + ExecContext exec_context(default_memory_pool(), io_executor.get()); + ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make(&exec_context)); + AsyncGenerator> sink_gen; + + auto exp_batches = MakeBasicBatches(); + ASSERT_OK_AND_ASSIGN(auto elements, to_elements(exp_batches)); + auto element_it_maker = [&elements]() { + return MakeVectorIterator(elements); + }; + + ASSERT_OK(Declaration::Sequence({ + {source_factory_name, + OptionsType{exp_batches.schema, element_it_maker}}, + {"sink", SinkNodeOptions{&sink_gen}}, + }) + .AddToPlan(plan.get())); + + ASSERT_THAT(StartAndCollect(plan.get(), sink_gen), + Finishes(ResultWith(UnorderedElementsAreArray(exp_batches.batches)))); +} + +TEST(ExecPlanExecution, ArrayVectorSourceSink) { + test_source_sink, ArrayVectorSourceNodeOptions>( + "array_vector_source", ToArrayVectors); +} + +TEST(ExecPlanExecution, ArrayVectorSourceSinkError) { + test_source_sink_error, ArrayVectorSourceNodeOptions>( + "array_vector_source", ToArrayVectors); +} + +TEST(ExecPlanExecution, ExecBatchSourceSink) { + test_source_sink, ExecBatchSourceNodeOptions>( + "exec_batch_source", ToExecBatches); +} + +TEST(ExecPlanExecution, ExecBatchSourceSinkError) { + test_source_sink_error, ExecBatchSourceNodeOptions>( + "exec_batch_source", ToExecBatches); +} + +TEST(ExecPlanExecution, RecordBatchSourceSink) { + test_source_sink, RecordBatchSourceNodeOptions>( + "record_batch_source", ToRecordBatches); +} + +TEST(ExecPlanExecution, RecordBatchSourceSinkError) { + 
test_source_sink_error, RecordBatchSourceNodeOptions>( + "record_batch_source", ToRecordBatches); +} + TEST(ExecPlanExecution, SinkNodeBackpressure) { std::optional batch = ExecBatchFromJSON({int32(), boolean()}, diff --git a/cpp/src/arrow/compute/exec/source_node.cc b/cpp/src/arrow/compute/exec/source_node.cc index 59a287b0c44..ecc82571076 100644 --- a/cpp/src/arrow/compute/exec/source_node.cc +++ b/cpp/src/arrow/compute/exec/source_node.cc @@ -26,6 +26,7 @@ #include "arrow/compute/exec/util.h" #include "arrow/compute/exec_internal.h" #include "arrow/datum.h" +#include "arrow/io/util_internal.h" #include "arrow/result.h" #include "arrow/table.h" #include "arrow/util/async_generator.h" @@ -293,6 +294,135 @@ struct TableSourceNode : public SourceNode { } }; +template +struct SchemaSourceNode : public SourceNode { + SchemaSourceNode(ExecPlan* plan, std::shared_ptr schema, + arrow::AsyncGenerator> generator) + : SourceNode(plan, schema, generator) {} + + static Result Make(ExecPlan* plan, std::vector inputs, + const ExecNodeOptions& options) { + RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, This::kKindName)); + const auto& cast_options = checked_cast(options); + auto& it_maker = cast_options.it_maker; + auto& schema = cast_options.schema; + + auto io_executor = plan->exec_context()->executor(); + auto it = it_maker(); + + if (schema == NULLPTR) { + return Status::Invalid(This::kKindName, " requires schema which is not null"); + } + if (io_executor == NULLPTR) { + io_executor = io::internal::GetIOThreadPool(); + } + + ARROW_ASSIGN_OR_RAISE(auto generator, This::MakeGenerator(it, io_executor, schema)); + return plan->EmplaceNode(plan, schema, generator); + } +}; + +struct RecordBatchSourceNode + : public SchemaSourceNode { + using RecordBatchSchemaSourceNode = + SchemaSourceNode; + + using RecordBatchSchemaSourceNode::RecordBatchSchemaSourceNode; + + static Result Make(ExecPlan* plan, std::vector inputs, + const ExecNodeOptions& options) { + return 
RecordBatchSchemaSourceNode::Make(plan, inputs, options); + } + + const char* kind_name() const override { return kKindName; } + + static Result>> MakeGenerator( + Iterator>& batch_it, + arrow::internal::Executor* io_executor, const std::shared_ptr& schema) { + auto to_exec_batch = + [schema](const std::shared_ptr& batch) -> std::optional { + if (batch == NULLPTR || *batch->schema() != *schema) { + return std::nullopt; + } + return std::optional(ExecBatch(*batch)); + }; + auto exec_batch_it = MakeMapIterator(to_exec_batch, std::move(batch_it)); + return MakeBackgroundGenerator(std::move(exec_batch_it), io_executor); + } + + static const char kKindName[]; +}; + +const char RecordBatchSourceNode::kKindName[] = "RecordBatchSourceNode"; + +struct ExecBatchSourceNode + : public SchemaSourceNode { + using ExecBatchSchemaSourceNode = + SchemaSourceNode; + + using ExecBatchSchemaSourceNode::ExecBatchSchemaSourceNode; + + static Result Make(ExecPlan* plan, std::vector inputs, + const ExecNodeOptions& options) { + return ExecBatchSchemaSourceNode::Make(plan, inputs, options); + } + + const char* kind_name() const override { return kKindName; } + + static Result>> MakeGenerator( + Iterator>& batch_it, + arrow::internal::Executor* io_executor, const std::shared_ptr& schema) { + auto to_exec_batch = + [](const std::shared_ptr& batch) -> std::optional { + return batch == NULLPTR ? 
std::nullopt : std::optional(*batch); + }; + auto exec_batch_it = MakeMapIterator(to_exec_batch, std::move(batch_it)); + return MakeBackgroundGenerator(std::move(exec_batch_it), io_executor); + } + + static const char kKindName[]; +}; + +const char ExecBatchSourceNode::kKindName[] = "ExecBatchSourceNode"; + +struct ArrayVectorSourceNode + : public SchemaSourceNode { + using ArrayVectorSchemaSourceNode = + SchemaSourceNode; + + using ArrayVectorSchemaSourceNode::ArrayVectorSchemaSourceNode; + + static Result Make(ExecPlan* plan, std::vector inputs, + const ExecNodeOptions& options) { + return ArrayVectorSchemaSourceNode::Make(plan, inputs, options); + } + + const char* kind_name() const override { return kKindName; } + + static Result>> MakeGenerator( + Iterator>& arrayvec_it, + arrow::internal::Executor* io_executor, const std::shared_ptr& schema) { + auto to_exec_batch = + [](const std::shared_ptr& arrayvec) -> std::optional { + if (arrayvec == NULLPTR || arrayvec->size() == 0) { + return std::nullopt; + } + std::vector datumvec; + for (const auto& array : *arrayvec) { + datumvec.push_back(Datum(array)); + } + return std::optional( + ExecBatch(std::move(datumvec), (*arrayvec)[0]->length())); + }; + auto exec_batch_it = MakeMapIterator(to_exec_batch, std::move(arrayvec_it)); + return MakeBackgroundGenerator(std::move(exec_batch_it), io_executor); + } + + static const char kKindName[]; +}; + +const char ArrayVectorSourceNode::kKindName[] = "ArrayVectorSourceNode"; + } // namespace namespace internal { @@ -300,6 +430,9 @@ namespace internal { void RegisterSourceNode(ExecFactoryRegistry* registry) { DCHECK_OK(registry->AddFactory("source", SourceNode::Make)); DCHECK_OK(registry->AddFactory("table_source", TableSourceNode::Make)); + DCHECK_OK(registry->AddFactory("record_batch_source", RecordBatchSourceNode::Make)); + DCHECK_OK(registry->AddFactory("exec_batch_source", ExecBatchSourceNode::Make)); + DCHECK_OK(registry->AddFactory("array_vector_source", 
ArrayVectorSourceNode::Make)); } } // namespace internal diff --git a/cpp/src/arrow/compute/exec/test_util.cc b/cpp/src/arrow/compute/exec/test_util.cc index 13c07d540c4..189310fb93f 100644 --- a/cpp/src/arrow/compute/exec/test_util.cc +++ b/cpp/src/arrow/compute/exec/test_util.cc @@ -258,6 +258,38 @@ BatchesWithSchema MakeBatchesFromString(const std::shared_ptr& schema, return out_batches; } +Result>> ToArrayVectors( + const BatchesWithSchema& batches_with_schema) { + std::vector> arrayvecs; + for (auto batch : batches_with_schema.batches) { + ARROW_ASSIGN_OR_RAISE(auto record_batch, + batch.ToRecordBatch(batches_with_schema.schema)); + arrayvecs.push_back(std::make_shared(record_batch->columns())); + } + return arrayvecs; +} + +Result>> ToExecBatches( + const BatchesWithSchema& batches_with_schema) { + std::vector> exec_batches; + for (auto batch : batches_with_schema.batches) { + auto exec_batch = std::make_shared(batch); + exec_batches.push_back(exec_batch); + } + return exec_batches; +} + +Result>> ToRecordBatches( + const BatchesWithSchema& batches_with_schema) { + std::vector> record_batches; + for (auto batch : batches_with_schema.batches) { + ARROW_ASSIGN_OR_RAISE(auto record_batch, + batch.ToRecordBatch(batches_with_schema.schema)); + record_batches.push_back(record_batch); + } + return record_batches; +} + Result> SortTableOnAllFields(const std::shared_ptr& tab) { std::vector sort_keys; for (auto&& f : tab->schema()->fields()) { diff --git a/cpp/src/arrow/compute/exec/test_util.h b/cpp/src/arrow/compute/exec/test_util.h index ae7eac61e95..2984ef6a562 100644 --- a/cpp/src/arrow/compute/exec/test_util.h +++ b/cpp/src/arrow/compute/exec/test_util.h @@ -113,6 +113,30 @@ BatchesWithSchema MakeBatchesFromString(const std::shared_ptr& schema, const std::vector& json_strings, int multiplicity = 1); +ARROW_TESTING_EXPORT +Result>> ToArrayVectors( + const BatchesWithSchema& batches_with_schema); + +ARROW_TESTING_EXPORT +Result>> ToExecBatches( + const 
BatchesWithSchema& batches); + +ARROW_TESTING_EXPORT +Result>> ToRecordBatches( + const BatchesWithSchema& batches); + +ARROW_TESTING_EXPORT +Result>> ToArrayVectors( + const BatchesWithSchema& batches_with_schema); + +ARROW_TESTING_EXPORT +Result>> ToExecBatches( + const BatchesWithSchema& batches); + +ARROW_TESTING_EXPORT +Result>> ToRecordBatches( + const BatchesWithSchema& batches); + ARROW_TESTING_EXPORT Result> SortTableOnAllFields(const std::shared_ptr
& tab); From 42ab9e886ab2ac69069156660d6941bf2394bf4e Mon Sep 17 00:00:00 2001 From: Yaron Gvili Date: Sun, 25 Sep 2022 09:26:58 -0400 Subject: [PATCH 2/4] requested fixes --- cpp/src/arrow/compute/exec/options.h | 21 +++++++++++++++++---- cpp/src/arrow/compute/exec/plan_test.cc | 16 ++++++++-------- cpp/src/arrow/compute/exec/source_node.cc | 5 ++++- 3 files changed, 29 insertions(+), 13 deletions(-) diff --git a/cpp/src/arrow/compute/exec/options.h b/cpp/src/arrow/compute/exec/options.h index 3612946654d..7a28415f51f 100644 --- a/cpp/src/arrow/compute/exec/options.h +++ b/cpp/src/arrow/compute/exec/options.h @@ -34,6 +34,13 @@ #include "arrow/util/visibility.h" namespace arrow { + +namespace internal { + +class Executor; + +} // namespace internal + namespace compute { using AsyncExecBatchGenerator = AsyncGenerator>; @@ -84,14 +91,20 @@ class ARROW_EXPORT TableSourceNodeOptions : public ExecNodeOptions { template class ARROW_EXPORT SchemaSourceNodeOptions : public ExecNodeOptions { public: - SchemaSourceNodeOptions(std::shared_ptr schema, ItMaker it_maker) - : schema(schema), it_maker(std::move(it_maker)) {} + SchemaSourceNodeOptions(std::shared_ptr schema, ItMaker it_maker, + arrow::internal::Executor* io_executor = NULLPTR) + : schema(schema), it_maker(std::move(it_maker)), io_executor(io_executor) {} - // the schema of the record batches from the iterator + /// \brief The schema of the record batches from the iterator std::shared_ptr schema; - // maker of an iterator which acts as the data source + /// \brief A maker of an iterator which acts as the data source ItMaker it_maker; + + /// \brief The executor to use for scanning the iterator + /// + /// If null, the plan's executor is used, falling back to the default I/O executor. 
+ arrow::internal::Executor* io_executor; }; /// \brief An extended Source node which accepts a schema and array-vectors diff --git a/cpp/src/arrow/compute/exec/plan_test.cc b/cpp/src/arrow/compute/exec/plan_test.cc index f75551b3394..6c8d497a1d6 100644 --- a/cpp/src/arrow/compute/exec/plan_test.cc +++ b/cpp/src/arrow/compute/exec/plan_test.cc @@ -296,7 +296,7 @@ TEST(ExecPlanExecution, TableSourceSinkError) { } template -void test_source_sink_error( +void TestSourceSinkError( std::string source_factory_name, std::function>(const BatchesWithSchema&)> to_elements) { @@ -318,7 +318,7 @@ void test_source_sink_error( } template -void test_source_sink( +void TestSourceSink( std::string source_factory_name, std::function>(const BatchesWithSchema&)> to_elements) { @@ -345,32 +345,32 @@ void test_source_sink( } TEST(ExecPlanExecution, ArrayVectorSourceSink) { - test_source_sink, ArrayVectorSourceNodeOptions>( + TestSourceSink, ArrayVectorSourceNodeOptions>( "array_vector_source", ToArrayVectors); } TEST(ExecPlanExecution, ArrayVectorSourceSinkError) { - test_source_sink_error, ArrayVectorSourceNodeOptions>( + TestSourceSinkError, ArrayVectorSourceNodeOptions>( "array_vector_source", ToArrayVectors); } TEST(ExecPlanExecution, ExecBatchSourceSink) { - test_source_sink, ExecBatchSourceNodeOptions>( + TestSourceSink, ExecBatchSourceNodeOptions>( "exec_batch_source", ToExecBatches); } TEST(ExecPlanExecution, ExecBatchSourceSinkError) { - test_source_sink_error, ExecBatchSourceNodeOptions>( + TestSourceSinkError, ExecBatchSourceNodeOptions>( "exec_batch_source", ToExecBatches); } TEST(ExecPlanExecution, RecordBatchSourceSink) { - test_source_sink, RecordBatchSourceNodeOptions>( + TestSourceSink, RecordBatchSourceNodeOptions>( "record_batch_source", ToRecordBatches); } TEST(ExecPlanExecution, RecordBatchSourceSinkError) { - test_source_sink_error, RecordBatchSourceNodeOptions>( + TestSourceSinkError, RecordBatchSourceNodeOptions>( "record_batch_source", ToRecordBatches); } diff 
--git a/cpp/src/arrow/compute/exec/source_node.cc b/cpp/src/arrow/compute/exec/source_node.cc index ecc82571076..3fd4c6fd5be 100644 --- a/cpp/src/arrow/compute/exec/source_node.cc +++ b/cpp/src/arrow/compute/exec/source_node.cc @@ -306,8 +306,11 @@ struct SchemaSourceNode : public SourceNode { const auto& cast_options = checked_cast(options); auto& it_maker = cast_options.it_maker; auto& schema = cast_options.schema; + auto io_executor = cast_options.io_executor; - auto io_executor = plan->exec_context()->executor(); + if (io_executor == NULLPTR) { + io_executor = plan->exec_context()->executor(); + } auto it = it_maker(); if (schema == NULLPTR) { From 879285c572a6e41d4107ac6972a53cac6c4b66f7 Mon Sep 17 00:00:00 2001 From: Yaron Gvili Date: Sun, 25 Sep 2022 09:57:24 -0400 Subject: [PATCH 3/4] fix for windows --- cpp/src/arrow/compute/exec/options.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/compute/exec/options.h b/cpp/src/arrow/compute/exec/options.h index 7a28415f51f..7e1e24543d7 100644 --- a/cpp/src/arrow/compute/exec/options.h +++ b/cpp/src/arrow/compute/exec/options.h @@ -91,8 +91,8 @@ class ARROW_EXPORT TableSourceNodeOptions : public ExecNodeOptions { template class ARROW_EXPORT SchemaSourceNodeOptions : public ExecNodeOptions { public: - SchemaSourceNodeOptions(std::shared_ptr schema, ItMaker it_maker, - arrow::internal::Executor* io_executor = NULLPTR) + inline SchemaSourceNodeOptions(std::shared_ptr schema, ItMaker it_maker, + arrow::internal::Executor* io_executor = NULLPTR) : schema(schema), it_maker(std::move(it_maker)), io_executor(io_executor) {} /// \brief The schema of the record batches from the iterator From d44ea27b9f9367dd9ef17cdc1c79885c6196aca9 Mon Sep 17 00:00:00 2001 From: Yaron Gvili Date: Mon, 26 Sep 2022 07:15:55 -0400 Subject: [PATCH 4/4] fix exporting --- cpp/src/arrow/compute/exec/options.h | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git 
a/cpp/src/arrow/compute/exec/options.h b/cpp/src/arrow/compute/exec/options.h index 7e1e24543d7..8600b113489 100644 --- a/cpp/src/arrow/compute/exec/options.h +++ b/cpp/src/arrow/compute/exec/options.h @@ -91,8 +91,8 @@ class ARROW_EXPORT TableSourceNodeOptions : public ExecNodeOptions { template class ARROW_EXPORT SchemaSourceNodeOptions : public ExecNodeOptions { public: - inline SchemaSourceNodeOptions(std::shared_ptr schema, ItMaker it_maker, - arrow::internal::Executor* io_executor = NULLPTR) + SchemaSourceNodeOptions(std::shared_ptr schema, ItMaker it_maker, + arrow::internal::Executor* io_executor = NULLPTR) : schema(schema), it_maker(std::move(it_maker)), io_executor(io_executor) {} /// \brief The schema of the record batches from the iterator @@ -107,17 +107,26 @@ class ARROW_EXPORT SchemaSourceNodeOptions : public ExecNodeOptions { arrow::internal::Executor* io_executor; }; -/// \brief An extended Source node which accepts a schema and array-vectors using ArrayVectorIteratorMaker = std::function>()>; -using ArrayVectorSourceNodeOptions = SchemaSourceNodeOptions; +/// \brief An extended Source node which accepts a schema and array-vectors +class ARROW_EXPORT ArrayVectorSourceNodeOptions + : public SchemaSourceNodeOptions { + using SchemaSourceNodeOptions::SchemaSourceNodeOptions; +}; using ExecBatchIteratorMaker = std::function>()>; /// \brief An extended Source node which accepts a schema and exec-batches -using ExecBatchSourceNodeOptions = SchemaSourceNodeOptions; +class ARROW_EXPORT ExecBatchSourceNodeOptions + : public SchemaSourceNodeOptions { + using SchemaSourceNodeOptions::SchemaSourceNodeOptions; +}; using RecordBatchIteratorMaker = std::function>()>; /// \brief An extended Source node which accepts a schema and record-batches -using RecordBatchSourceNodeOptions = SchemaSourceNodeOptions; +class ARROW_EXPORT RecordBatchSourceNodeOptions + : public SchemaSourceNodeOptions { + using SchemaSourceNodeOptions::SchemaSourceNodeOptions; +}; /// \brief 
Make a node which excludes some rows from batches passed through it ///