diff --git a/cpp/src/arrow/compute/exec/options.h b/cpp/src/arrow/compute/exec/options.h
index 9544b9ed4db..d2ad45d37b9 100644
--- a/cpp/src/arrow/compute/exec/options.h
+++ b/cpp/src/arrow/compute/exec/options.h
@@ -55,6 +55,20 @@ class ARROW_EXPORT SourceNodeOptions : public ExecNodeOptions {
   std::function<Future<util::optional<ExecBatch>>()> generator;
 };

+/// \brief An extended Source node which accepts a table
+class ARROW_EXPORT TableSourceNodeOptions : public ExecNodeOptions {
+ public:
+  TableSourceNodeOptions(std::shared_ptr<Table> table, int64_t batch_size)
+      : table(table), batch_size(batch_size) {}
+
+  // arrow table which acts as the data source; must not be null
+  std::shared_ptr<Table> table;
+  // Size of batches to emit from this node; must be greater than zero.
+  // If the table is larger the node will emit multiple batches from
+  // the table to be processed in parallel.
+  int64_t batch_size;
+};
+
 /// \brief Make a node which excludes some rows from batches passed through it
 ///
 /// filter_expression will be evaluated against each batch which is pushed to
diff --git a/cpp/src/arrow/compute/exec/plan_test.cc b/cpp/src/arrow/compute/exec/plan_test.cc
index b4b24e832ef..e176c701b65 100644
--- a/cpp/src/arrow/compute/exec/plan_test.cc
+++ b/cpp/src/arrow/compute/exec/plan_test.cc
@@ -238,6 +238,45 @@ TEST(ExecPlanExecution, SourceSink) {
   }
 }

+TEST(ExecPlanExecution, TableSourceSink) {
+  for (int batch_size : {1, 4}) {
+    ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
+    AsyncGenerator<util::optional<ExecBatch>> sink_gen;
+
+    auto exp_batches = MakeBasicBatches();
+    ASSERT_OK_AND_ASSIGN(auto table,
+                         TableFromExecBatches(exp_batches.schema, exp_batches.batches));
+
+    ASSERT_OK(Declaration::Sequence(
+                  {
+                      {"table_source", TableSourceNodeOptions{table, batch_size}},
+                      {"sink", SinkNodeOptions{&sink_gen}},
+                  })
+                  .AddToPlan(plan.get()));
+
+    ASSERT_FINISHES_OK_AND_ASSIGN(auto res, StartAndCollect(plan.get(), sink_gen));
+    ASSERT_OK_AND_ASSIGN(auto out_table, TableFromExecBatches(exp_batches.schema, res));
+    AssertTablesEqual(table, out_table);
+  }
+}
+
+TEST(ExecPlanExecution, TableSourceSinkError) {
+  ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
+  AsyncGenerator<util::optional<ExecBatch>> sink_gen;
+
+  auto exp_batches = MakeBasicBatches();
+  ASSERT_OK_AND_ASSIGN(auto table,
+                       TableFromExecBatches(exp_batches.schema, exp_batches.batches));
+
+  auto null_table_options = TableSourceNodeOptions{NULLPTR, 1};
+  ASSERT_THAT(MakeExecNode("table_source", plan.get(), {}, null_table_options),
+              Raises(StatusCode::Invalid, HasSubstr("not null")));
+
+  auto negative_batch_size_options = TableSourceNodeOptions{table, -1};
+  ASSERT_THAT(MakeExecNode("table_source", plan.get(), {}, negative_batch_size_options),
+              Raises(StatusCode::Invalid, HasSubstr("batch_size > 0")));
+}
+
 TEST(ExecPlanExecution, SinkNodeBackpressure) {
   constexpr uint32_t kPauseIfAbove = 4;
   constexpr uint32_t kResumeIfBelow = 2;
diff --git a/cpp/src/arrow/compute/exec/source_node.cc b/cpp/src/arrow/compute/exec/source_node.cc
index 6d47609d2a2..6b9a2ee9132 100644
--- a/cpp/src/arrow/compute/exec/source_node.cc
+++ b/cpp/src/arrow/compute/exec/source_node.cc
@@ -25,6 +25,7 @@
 #include "arrow/compute/exec_internal.h"
 #include "arrow/datum.h"
 #include "arrow/result.h"
+#include "arrow/table.h"
 #include "arrow/util/async_generator.h"
 #include "arrow/util/async_util.h"
 #include "arrow/util/checked_cast.h"
@@ -34,10 +35,12 @@
 #include "arrow/util/thread_pool.h"
 #include "arrow/util/tracing_internal.h"
 #include "arrow/util/unreachable.h"
+#include "arrow/util/vector.h"

 namespace arrow {

 using internal::checked_cast;
+using internal::MapVector;

 namespace compute {
 namespace {
@@ -174,12 +177,90 @@ struct SourceNode : ExecNode {
   AsyncGenerator<util::optional<ExecBatch>> generator_;
 };

+/// \brief A source node which emits the contents of a table as exec batches
+struct TableSourceNode : public SourceNode {
+  // table must be non-null here (it is dereferenced); Make() validates it first
+  TableSourceNode(ExecPlan* plan, std::shared_ptr<Table> table, int64_t batch_size)
+      : SourceNode(plan, table->schema(), TableGenerator(*table, batch_size)) {}
+
+  // Factory registered under "table_source"; options must be TableSourceNodeOptions
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "TableSourceNode"));
+    const auto& table_options = checked_cast<const TableSourceNodeOptions&>(options);
+    const auto& table = table_options.table;
+    const int64_t batch_size = table_options.batch_size;
+
+    RETURN_NOT_OK(ValidateTableSourceNodeInput(table, batch_size));
+
+    return plan->EmplaceNode<TableSourceNode>(plan, table, batch_size);
+  }
+
+  const char* kind_name() const override { return "TableSourceNode"; }
+
+  // Returns Invalid if table is null or batch_size is not strictly positive
+  static arrow::Status ValidateTableSourceNodeInput(const std::shared_ptr<Table>& table,
+                                                    const int64_t batch_size) {
+    if (table == nullptr) {
+      return Status::Invalid("TableSourceNode node requires table which is not null");
+    }
+
+    if (batch_size <= 0) {
+      return Status::Invalid(
+          "TableSourceNode node requires, batch_size > 0 , but got batch size ",
+          batch_size);
+    }
+
+    return Status::OK();
+  }
+
+  // Slices the whole table into batches up front and wraps them in a generator
+  static arrow::AsyncGenerator<util::optional<ExecBatch>> TableGenerator(
+      const Table& table, const int64_t batch_size) {
+    auto batches = ConvertTableToExecBatches(table, batch_size);
+    auto opt_batches =
+        MapVector([](ExecBatch batch) { return util::make_optional(std::move(batch)); },
+                  std::move(batches));
+    AsyncGenerator<util::optional<ExecBatch>> gen;
+    gen = MakeVectorGenerator(std::move(opt_batches));
+    return gen;
+  }
+
+  // Reads the table through a TableBatchReader with chunk size batch_size and
+  // converts each RecordBatch it yields into an ExecBatch
+  static std::vector<ExecBatch> ConvertTableToExecBatches(const Table& table,
+                                                          const int64_t batch_size) {
+    std::shared_ptr<TableBatchReader> reader = std::make_shared<TableBatchReader>(table);
+
+    // setting chunksize for the batch reader
+    reader->set_chunksize(batch_size);
+
+    std::vector<ExecBatch> exec_batches;
+    while (true) {
+      auto batch_res = reader->Next();
+      // A failed read yields no new batch. Stop instead of carrying on with the
+      // previous batch, which would be appended again on every iteration.
+      if (!batch_res.ok()) {
+        break;
+      }
+      std::shared_ptr<RecordBatch> batch = std::move(batch_res).MoveValueUnsafe();
+      // The reader signals the end of the table with a null batch
+      if (batch == NULLPTR) {
+        break;
+      }
+      exec_batches.emplace_back(*batch);
+    }
+    return exec_batches;
+  }
+};
+
 }  // namespace

 namespace internal {

 void RegisterSourceNode(ExecFactoryRegistry* registry) {
   DCHECK_OK(registry->AddFactory("source", SourceNode::Make));
+  DCHECK_OK(registry->AddFactory("table_source", TableSourceNode::Make));
 }

 }  // namespace internal