From b837dc906e4257d9b7209822dc447e6d74dd275b Mon Sep 17 00:00:00 2001 From: Vibhatha Abeykoon Date: Wed, 26 Jan 2022 18:14:57 +0530 Subject: [PATCH 1/8] resolve merge conflicts --- cpp/src/arrow/compute/exec/options.h | 13 +++++++++ cpp/src/arrow/compute/exec/order_by_impl.cc | 32 +++++++++++++++++++++ cpp/src/arrow/compute/exec/order_by_impl.h | 3 ++ 3 files changed, 48 insertions(+) diff --git a/cpp/src/arrow/compute/exec/options.h b/cpp/src/arrow/compute/exec/options.h index ec3e6fba230..25bdb8d9fad 100644 --- a/cpp/src/arrow/compute/exec/options.h +++ b/cpp/src/arrow/compute/exec/options.h @@ -280,5 +280,18 @@ class ARROW_EXPORT SelectKSinkNodeOptions : public SinkNodeOptions { /// @} +/// \brief Adapt an Table as a sink node +/// +/// plan->exec_context()->executor() will be used to parallelize pushing to +/// outputs, if provided. +class ARROW_EXPORT TableSinkNodeOptions : public SinkNodeOptions { + public: + TableSinkNodeOptions(std::shared_ptr *output_table, + std::function>()>* generator) + : SinkNodeOptions(generator), output_table(output_table) {} + + std::shared_ptr
*output_table; +}; + } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/exec/order_by_impl.cc b/cpp/src/arrow/compute/exec/order_by_impl.cc index 4afcf884f53..90576162361 100644 --- a/cpp/src/arrow/compute/exec/order_by_impl.cc +++ b/cpp/src/arrow/compute/exec/order_by_impl.cc @@ -86,6 +86,32 @@ class SelectKBasicImpl : public SortBasicImpl { const SelectKOptions options_; }; +class DefaultOrderImpl : public OrderByImpl { + public: + DefaultOrderImpl(ExecContext* ctx, const std::shared_ptr& output_schema) + : ctx_(ctx), output_schema_(output_schema) {} + + void InputReceived(const std::shared_ptr& batch) override { + std::unique_lock lock(mutex_); + batches_.push_back(batch); + } + + Result DoFinish() override { + std::unique_lock lock(mutex_); + ARROW_ASSIGN_OR_RAISE(auto table, + Table::FromRecordBatches(output_schema_, std::move(batches_))); + return table; + } + + std::string ToString() const override { return "default table generator"; } + + protected: + ExecContext* ctx_; + std::shared_ptr output_schema_; + std::mutex mutex_; + std::vector> batches_; +}; // namespace compute + Result> OrderByImpl::MakeSort( ExecContext* ctx, const std::shared_ptr& output_schema, const SortOptions& options) { @@ -100,5 +126,11 @@ Result> OrderByImpl::MakeSelectK( return std::move(impl); } +Result> OrderByImpl::MakeDefault( + ExecContext* ctx, const std::shared_ptr& output_schema) { + std::unique_ptr impl{new DefaultOrderImpl(ctx, output_schema)}; + return std::move(impl); +} + } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/exec/order_by_impl.h b/cpp/src/arrow/compute/exec/order_by_impl.h index afc92aeddb2..a0f02659ed4 100644 --- a/cpp/src/arrow/compute/exec/order_by_impl.h +++ b/cpp/src/arrow/compute/exec/order_by_impl.h @@ -47,6 +47,9 @@ class OrderByImpl { static Result> MakeSelectK( ExecContext* ctx, const std::shared_ptr& output_schema, const SelectKOptions& options); + + static Result> MakeDefault( + ExecContext* 
ctx, const std::shared_ptr& output_schema); }; } // namespace compute From 25c88694af8cb93c458302ace20f7b42ab41382b Mon Sep 17 00:00:00 2001 From: Vibhatha Abeykoon Date: Wed, 26 Jan 2022 18:55:01 +0530 Subject: [PATCH 2/8] adding update on table-sink --- cpp/src/arrow/compute/exec/sink_node.cc | 62 +++++++++++++++++++++++++ 1 file changed, 62 insertions(+) diff --git a/cpp/src/arrow/compute/exec/sink_node.cc b/cpp/src/arrow/compute/exec/sink_node.cc index f218cef9b5e..a3eb070bab4 100644 --- a/cpp/src/arrow/compute/exec/sink_node.cc +++ b/cpp/src/arrow/compute/exec/sink_node.cc @@ -324,6 +324,67 @@ struct OrderBySinkNode final : public SinkNode { std::unique_ptr impl_; }; +struct TableSinkNode final : public SinkNode { + TableSinkNode(ExecPlan* plan, std::vector inputs, + std::unique_ptr impl, + AsyncGenerator>* generator, + util::BackpressureOptions backpressure, std::shared_ptr
*output) + : SinkNode(plan, std::move(inputs), generator, std::move(backpressure)), output_table(output), + impl_{std::move(impl)} {} + + static Result Make(ExecPlan* plan, std::vector inputs, + const ExecNodeOptions& options) { + RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, "TableSinkNode")); + const auto& sink_options = checked_cast(options); + ARROW_ASSIGN_OR_RAISE( + std::unique_ptr impl, + OrderByImpl::MakeDefault(plan->exec_context(), inputs[0]->output_schema())); + return plan->EmplaceNode(plan, std::move(inputs), std::move(impl), sink_options.generator, + sink_options.backpressure, sink_options.output_table); + } + + const char* kind_name() const override { return "TableSinkNode"; } + + void InputReceived(ExecNode* input, ExecBatch batch) override { + DCHECK_EQ(input, inputs_[0]); + + auto maybe_batch = batch.ToRecordBatch(inputs_[0]->output_schema(), + plan()->exec_context()->memory_pool()); + if (ErrorIfNotOk(maybe_batch.status())) { + StopProducing(); + if (input_counter_.Cancel()) { + finished_.MarkFinished(maybe_batch.status()); + } + return; + } + auto record_batch = maybe_batch.MoveValueUnsafe(); + + impl_->InputReceived(std::move(record_batch)); + if (input_counter_.Increment()) { + Finish(); + } + } + + protected: + Status DoFinish() { + ARROW_ASSIGN_OR_RAISE(Datum gathered, impl_->DoFinish()); + *output_table = gathered.table(); + return Status::OK(); + } + + void Finish() override { + Status st = DoFinish(); + if (ErrorIfNotOk(st)) { + producer_.Push(std::move(st)); + } + SinkNode::Finish(); + } + + private: + std::shared_ptr
*output_table; + std::unique_ptr impl_; +}; + } // namespace namespace internal { @@ -333,6 +394,7 @@ void RegisterSinkNode(ExecFactoryRegistry* registry) { DCHECK_OK(registry->AddFactory("order_by_sink", OrderBySinkNode::MakeSort)); DCHECK_OK(registry->AddFactory("consuming_sink", ConsumingSinkNode::Make)); DCHECK_OK(registry->AddFactory("sink", SinkNode::Make)); + DCHECK_OK(registry->AddFactory("table_sink", TableSinkNode::Make)); } } // namespace internal From 906a48db04408fdf34bfdcfef74aaad8b51a71d9 Mon Sep 17 00:00:00 2001 From: Vibhatha Abeykoon Date: Thu, 27 Jan 2022 15:59:19 +0530 Subject: [PATCH 3/8] initial table_sink node function added --- cpp/src/arrow/compute/exec/options.h | 17 +-- cpp/src/arrow/compute/exec/plan_test.cc | 37 +++++++ cpp/src/arrow/compute/exec/sink_node.cc | 134 +++++++++++++----------- 3 files changed, 120 insertions(+), 68 deletions(-) diff --git a/cpp/src/arrow/compute/exec/options.h b/cpp/src/arrow/compute/exec/options.h index 25bdb8d9fad..20c598a1d65 100644 --- a/cpp/src/arrow/compute/exec/options.h +++ b/cpp/src/arrow/compute/exec/options.h @@ -284,13 +284,18 @@ class ARROW_EXPORT SelectKSinkNodeOptions : public SinkNodeOptions { /// /// plan->exec_context()->executor() will be used to parallelize pushing to /// outputs, if provided. -class ARROW_EXPORT TableSinkNodeOptions : public SinkNodeOptions { +class ARROW_EXPORT TableSinkNodeOptions : public ExecNodeOptions { public: - TableSinkNodeOptions(std::shared_ptr
*output_table, - std::function>()>* generator) - : SinkNodeOptions(generator), output_table(output_table) {} - - std::shared_ptr
*output_table; + TableSinkNodeOptions(std::shared_ptr
* output_table, + std::shared_ptr output_schema, + Future<> finish = Future<>::Make()) + : output_table(output_table), + output_schema(std::move(output_schema)), + finish(std::move(finish)) {} + + std::shared_ptr
* output_table; + std::shared_ptr output_schema; + Future<> finish; }; } // namespace compute diff --git a/cpp/src/arrow/compute/exec/plan_test.cc b/cpp/src/arrow/compute/exec/plan_test.cc index 258238dbb81..0ab006df7c7 100644 --- a/cpp/src/arrow/compute/exec/plan_test.cc +++ b/cpp/src/arrow/compute/exec/plan_test.cc @@ -488,6 +488,43 @@ TEST(ExecPlanExecution, SourceConsumingSink) { } } +TEST(ExecPlanExecution, SourceTableConsumingSink) { + for (bool slow : {false, true}) { + SCOPED_TRACE(slow ? "slowed" : "unslowed"); + + for (bool parallel : {false, true}) { + SCOPED_TRACE(parallel ? "parallel" : "single threaded"); + ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make()); + + Future<> finish = Future<>::Make(); + std::shared_ptr
out; + + auto basic_data = MakeBasicBatches(); + + TableSinkNodeOptions options{&out, basic_data.schema, finish}; + + ASSERT_OK_AND_ASSIGN( + auto source, MakeExecNode("source", plan.get(), {}, + SourceNodeOptions(basic_data.schema, + basic_data.gen(parallel, slow)))); + ASSERT_OK(MakeExecNode("table_sink", plan.get(), {source}, options)); + ASSERT_OK(plan->StartProducing()); + // Source should finish fairly quickly + ASSERT_FINISHES_OK(source->finished()); + SleepABit(); + ASSERT_OK_AND_ASSIGN(auto actual, + TableFromExecBatches(basic_data.schema, basic_data.batches)); + ASSERT_EQ(5, out->num_rows()); + AssertTablesEqual(*actual, *out); + // Consumer isn't finished and so plan shouldn't have finished + AssertNotFinished(plan->finished()); + // Mark consumption complete, plan should finish + finish.MarkFinished(); + ASSERT_FINISHES_OK(plan->finished()); + } + } +} + TEST(ExecPlanExecution, ConsumingSinkError) { struct ConsumeErrorConsumer : public SinkNodeConsumer { Status Consume(ExecBatch batch) override { return Status::Invalid("XYZ"); } diff --git a/cpp/src/arrow/compute/exec/sink_node.cc b/cpp/src/arrow/compute/exec/sink_node.cc index a3eb070bab4..c723be55af1 100644 --- a/cpp/src/arrow/compute/exec/sink_node.cc +++ b/cpp/src/arrow/compute/exec/sink_node.cc @@ -232,6 +232,77 @@ class ConsumingSinkNode : public ExecNode { std::shared_ptr consumer_; }; +/** + * @brief This node is an extension on ConsumingSinkNode + * to facilitate to get the output from an execution plan + * as a table. We define a custom SinkNodeConsumer to + * enable this functionality. + */ + +struct TableSinkNodeConsumer : public arrow::compute::SinkNodeConsumer { + public: + TableSinkNodeConsumer(std::shared_ptr
* out, + std::shared_ptr output_schema, MemoryPool* pool, + Future<> finish) + : out_(out), + output_schema_(output_schema), + pool_(pool), + finish_(std::move(finish)) {} + + Status Consume(ExecBatch batch) override { + ARROW_ASSIGN_OR_RAISE(auto rb, batch.ToRecordBatch(output_schema_, pool_)); + if (rb) { + batch_vector.push_back(rb); + } else { + return Status::Invalid("Invalid ExecBatch consumed"); + } + return Status::OK(); + } + + Future<> Finish() override { + ARROW_ASSIGN_OR_RAISE(auto table, Table::FromRecordBatches(batch_vector)); + *out_ = table; + return finish_; + } + + private: + std::shared_ptr
* out_; + std::shared_ptr output_schema_; + MemoryPool* pool_; + Future<> finish_; + std::vector> batch_vector; +}; + +static std::shared_ptr MakeTableSinkConsumer( + std::shared_ptr
* out, std::shared_ptr output_schema, MemoryPool* pool, + Future<> finish) { + auto tb_consumer = + std::make_shared(out, output_schema, pool, finish); + return std::move(tb_consumer); +} + +class TableConsumingSinkNode : public ConsumingSinkNode { + public: + TableConsumingSinkNode(ExecPlan* plan, std::vector inputs, + std::shared_ptr output_schema, + std::shared_ptr
* out, MemoryPool* pool, Future<> finish) + : ConsumingSinkNode(plan, inputs, + MakeTableSinkConsumer(out, output_schema, pool, finish)) {} + + static Result Make(ExecPlan* plan, std::vector inputs, + const ExecNodeOptions& options) { + RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, "TableConsumingSinkNode")); + + const auto& sink_options = checked_cast(options); + MemoryPool* pool = plan->exec_context()->memory_pool(); + return plan->EmplaceNode( + plan, std::move(inputs), sink_options.output_schema, sink_options.output_table, + pool, sink_options.finish); + } + + const char* kind_name() const override { return "TableConsumingSinkNode"; } +}; + // A sink node that accumulates inputs, then sorts them before emitting them. struct OrderBySinkNode final : public SinkNode { OrderBySinkNode(ExecPlan* plan, std::vector inputs, @@ -324,67 +395,6 @@ struct OrderBySinkNode final : public SinkNode { std::unique_ptr impl_; }; -struct TableSinkNode final : public SinkNode { - TableSinkNode(ExecPlan* plan, std::vector inputs, - std::unique_ptr impl, - AsyncGenerator>* generator, - util::BackpressureOptions backpressure, std::shared_ptr
*output) - : SinkNode(plan, std::move(inputs), generator, std::move(backpressure)), output_table(output), - impl_{std::move(impl)} {} - - static Result Make(ExecPlan* plan, std::vector inputs, - const ExecNodeOptions& options) { - RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, "TableSinkNode")); - const auto& sink_options = checked_cast(options); - ARROW_ASSIGN_OR_RAISE( - std::unique_ptr impl, - OrderByImpl::MakeDefault(plan->exec_context(), inputs[0]->output_schema())); - return plan->EmplaceNode(plan, std::move(inputs), std::move(impl), sink_options.generator, - sink_options.backpressure, sink_options.output_table); - } - - const char* kind_name() const override { return "TableSinkNode"; } - - void InputReceived(ExecNode* input, ExecBatch batch) override { - DCHECK_EQ(input, inputs_[0]); - - auto maybe_batch = batch.ToRecordBatch(inputs_[0]->output_schema(), - plan()->exec_context()->memory_pool()); - if (ErrorIfNotOk(maybe_batch.status())) { - StopProducing(); - if (input_counter_.Cancel()) { - finished_.MarkFinished(maybe_batch.status()); - } - return; - } - auto record_batch = maybe_batch.MoveValueUnsafe(); - - impl_->InputReceived(std::move(record_batch)); - if (input_counter_.Increment()) { - Finish(); - } - } - - protected: - Status DoFinish() { - ARROW_ASSIGN_OR_RAISE(Datum gathered, impl_->DoFinish()); - *output_table = gathered.table(); - return Status::OK(); - } - - void Finish() override { - Status st = DoFinish(); - if (ErrorIfNotOk(st)) { - producer_.Push(std::move(st)); - } - SinkNode::Finish(); - } - - private: - std::shared_ptr
*output_table; - std::unique_ptr impl_; -}; - } // namespace namespace internal { @@ -394,7 +404,7 @@ void RegisterSinkNode(ExecFactoryRegistry* registry) { DCHECK_OK(registry->AddFactory("order_by_sink", OrderBySinkNode::MakeSort)); DCHECK_OK(registry->AddFactory("consuming_sink", ConsumingSinkNode::Make)); DCHECK_OK(registry->AddFactory("sink", SinkNode::Make)); - DCHECK_OK(registry->AddFactory("table_sink", TableSinkNode::Make)); + DCHECK_OK(registry->AddFactory("table_sink", TableConsumingSinkNode::Make)); } } // namespace internal From 158fc8fd092d49cc1cd037fa4b9ce2634f91ec0a Mon Sep 17 00:00:00 2001 From: Vibhatha Abeykoon Date: Thu, 27 Jan 2022 16:25:13 +0530 Subject: [PATCH 4/8] code clean up --- cpp/src/arrow/compute/exec/order_by_impl.cc | 32 --------------------- cpp/src/arrow/compute/exec/order_by_impl.h | 3 -- 2 files changed, 35 deletions(-) diff --git a/cpp/src/arrow/compute/exec/order_by_impl.cc b/cpp/src/arrow/compute/exec/order_by_impl.cc index 90576162361..4afcf884f53 100644 --- a/cpp/src/arrow/compute/exec/order_by_impl.cc +++ b/cpp/src/arrow/compute/exec/order_by_impl.cc @@ -86,32 +86,6 @@ class SelectKBasicImpl : public SortBasicImpl { const SelectKOptions options_; }; -class DefaultOrderImpl : public OrderByImpl { - public: - DefaultOrderImpl(ExecContext* ctx, const std::shared_ptr& output_schema) - : ctx_(ctx), output_schema_(output_schema) {} - - void InputReceived(const std::shared_ptr& batch) override { - std::unique_lock lock(mutex_); - batches_.push_back(batch); - } - - Result DoFinish() override { - std::unique_lock lock(mutex_); - ARROW_ASSIGN_OR_RAISE(auto table, - Table::FromRecordBatches(output_schema_, std::move(batches_))); - return table; - } - - std::string ToString() const override { return "default table generator"; } - - protected: - ExecContext* ctx_; - std::shared_ptr output_schema_; - std::mutex mutex_; - std::vector> batches_; -}; // namespace compute - Result> OrderByImpl::MakeSort( ExecContext* ctx, const 
std::shared_ptr& output_schema, const SortOptions& options) { @@ -126,11 +100,5 @@ Result> OrderByImpl::MakeSelectK( return std::move(impl); } -Result> OrderByImpl::MakeDefault( - ExecContext* ctx, const std::shared_ptr& output_schema) { - std::unique_ptr impl{new DefaultOrderImpl(ctx, output_schema)}; - return std::move(impl); -} - } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/exec/order_by_impl.h b/cpp/src/arrow/compute/exec/order_by_impl.h index a0f02659ed4..afc92aeddb2 100644 --- a/cpp/src/arrow/compute/exec/order_by_impl.h +++ b/cpp/src/arrow/compute/exec/order_by_impl.h @@ -47,9 +47,6 @@ class OrderByImpl { static Result> MakeSelectK( ExecContext* ctx, const std::shared_ptr& output_schema, const SelectKOptions& options); - - static Result> MakeDefault( - ExecContext* ctx, const std::shared_ptr& output_schema); }; } // namespace compute From 4e765b29ab3ef6a2ab46edb5380a9d41be7aec5d Mon Sep 17 00:00:00 2001 From: Vibhatha Abeykoon Date: Tue, 1 Feb 2022 14:12:57 +0530 Subject: [PATCH 5/8] address review comments --- cpp/src/arrow/compute/exec/options.h | 4 +- cpp/src/arrow/compute/exec/plan_test.cc | 6 +-- cpp/src/arrow/compute/exec/sink_node.cc | 62 ++++++++----------------- 3 files changed, 22 insertions(+), 50 deletions(-) diff --git a/cpp/src/arrow/compute/exec/options.h b/cpp/src/arrow/compute/exec/options.h index 20c598a1d65..68bce44daa7 100644 --- a/cpp/src/arrow/compute/exec/options.h +++ b/cpp/src/arrow/compute/exec/options.h @@ -282,8 +282,8 @@ class ARROW_EXPORT SelectKSinkNodeOptions : public SinkNodeOptions { /// \brief Adapt an Table as a sink node /// -/// plan->exec_context()->executor() will be used to parallelize pushing to -/// outputs, if provided. +/// obtains the output of a execution plan to +/// a table pointer. class ARROW_EXPORT TableSinkNodeOptions : public ExecNodeOptions { public: TableSinkNodeOptions(std::shared_ptr
* output_table, diff --git a/cpp/src/arrow/compute/exec/plan_test.cc b/cpp/src/arrow/compute/exec/plan_test.cc index 0ab006df7c7..00d957ee6cf 100644 --- a/cpp/src/arrow/compute/exec/plan_test.cc +++ b/cpp/src/arrow/compute/exec/plan_test.cc @@ -501,7 +501,7 @@ TEST(ExecPlanExecution, SourceTableConsumingSink) { auto basic_data = MakeBasicBatches(); - TableSinkNodeOptions options{&out, basic_data.schema, finish}; + TableSinkNodeOptions options{&out, basic_data.schema}; ASSERT_OK_AND_ASSIGN( auto source, MakeExecNode("source", plan.get(), {}, @@ -516,10 +516,6 @@ TEST(ExecPlanExecution, SourceTableConsumingSink) { TableFromExecBatches(basic_data.schema, basic_data.batches)); ASSERT_EQ(5, out->num_rows()); AssertTablesEqual(*actual, *out); - // Consumer isn't finished and so plan shouldn't have finished - AssertNotFinished(plan->finished()); - // Mark consumption complete, plan should finish - finish.MarkFinished(); ASSERT_FINISHES_OK(plan->finished()); } } diff --git a/cpp/src/arrow/compute/exec/sink_node.cc b/cpp/src/arrow/compute/exec/sink_node.cc index c723be55af1..d1ad81a107c 100644 --- a/cpp/src/arrow/compute/exec/sink_node.cc +++ b/cpp/src/arrow/compute/exec/sink_node.cc @@ -242,67 +242,43 @@ class ConsumingSinkNode : public ExecNode { struct TableSinkNodeConsumer : public arrow::compute::SinkNodeConsumer { public: TableSinkNodeConsumer(std::shared_ptr
* out, - std::shared_ptr output_schema, MemoryPool* pool, - Future<> finish) + std::shared_ptr output_schema, MemoryPool* pool) : out_(out), - output_schema_(output_schema), - pool_(pool), - finish_(std::move(finish)) {} + output_schema_(std::move(output_schema)), + pool_(pool) {} Status Consume(ExecBatch batch) override { + std::lock_guard guard(consum_mutex); ARROW_ASSIGN_OR_RAISE(auto rb, batch.ToRecordBatch(output_schema_, pool_)); - if (rb) { - batch_vector.push_back(rb); - } else { - return Status::Invalid("Invalid ExecBatch consumed"); - } + batch_vector.push_back(rb); return Status::OK(); } Future<> Finish() override { - ARROW_ASSIGN_OR_RAISE(auto table, Table::FromRecordBatches(batch_vector)); - *out_ = table; - return finish_; + ARROW_ASSIGN_OR_RAISE(*out_, Table::FromRecordBatches(batch_vector)); + return Status::OK(); } private: std::shared_ptr
* out_; std::shared_ptr output_schema_; MemoryPool* pool_; - Future<> finish_; std::vector> batch_vector; + std::mutex consum_mutex; }; -static std::shared_ptr MakeTableSinkConsumer( - std::shared_ptr
* out, std::shared_ptr output_schema, MemoryPool* pool, - Future<> finish) { - auto tb_consumer = - std::make_shared(out, output_schema, pool, finish); - return std::move(tb_consumer); +static Result MakeTableConsumingSinkNode( + compute::ExecPlan* plan, std::vector inputs, + const compute::ExecNodeOptions& options) { + RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, "TableConsumingSinkNode")); + const auto& sink_options = checked_cast(options); + MemoryPool* pool = plan->exec_context()->memory_pool(); + auto tb_consumer = std::make_shared( + sink_options.output_table, sink_options.output_schema, pool); + auto consuming_sink_node_options = ConsumingSinkNodeOptions{tb_consumer}; + return MakeExecNode("consuming_sink", plan, inputs, consuming_sink_node_options); } -class TableConsumingSinkNode : public ConsumingSinkNode { - public: - TableConsumingSinkNode(ExecPlan* plan, std::vector inputs, - std::shared_ptr output_schema, - std::shared_ptr
* out, MemoryPool* pool, Future<> finish) - : ConsumingSinkNode(plan, inputs, - MakeTableSinkConsumer(out, output_schema, pool, finish)) {} - - static Result Make(ExecPlan* plan, std::vector inputs, - const ExecNodeOptions& options) { - RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, "TableConsumingSinkNode")); - - const auto& sink_options = checked_cast(options); - MemoryPool* pool = plan->exec_context()->memory_pool(); - return plan->EmplaceNode( - plan, std::move(inputs), sink_options.output_schema, sink_options.output_table, - pool, sink_options.finish); - } - - const char* kind_name() const override { return "TableConsumingSinkNode"; } -}; - // A sink node that accumulates inputs, then sorts them before emitting them. struct OrderBySinkNode final : public SinkNode { OrderBySinkNode(ExecPlan* plan, std::vector inputs, @@ -404,7 +380,7 @@ void RegisterSinkNode(ExecFactoryRegistry* registry) { DCHECK_OK(registry->AddFactory("order_by_sink", OrderBySinkNode::MakeSort)); DCHECK_OK(registry->AddFactory("consuming_sink", ConsumingSinkNode::Make)); DCHECK_OK(registry->AddFactory("sink", SinkNode::Make)); - DCHECK_OK(registry->AddFactory("table_sink", TableConsumingSinkNode::Make)); + DCHECK_OK(registry->AddFactory("table_sink", MakeTableConsumingSinkNode)); } } // namespace internal From 855af491391f54f01c8441e1a260d685fe110d03 Mon Sep 17 00:00:00 2001 From: Vibhatha Abeykoon Date: Tue, 1 Feb 2022 14:56:33 +0530 Subject: [PATCH 6/8] fix format issue --- cpp/src/arrow/compute/exec/options.h | 4 ++-- cpp/src/arrow/compute/exec/sink_node.cc | 4 +--- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/cpp/src/arrow/compute/exec/options.h b/cpp/src/arrow/compute/exec/options.h index 68bce44daa7..ff3bf0c2a9f 100644 --- a/cpp/src/arrow/compute/exec/options.h +++ b/cpp/src/arrow/compute/exec/options.h @@ -282,8 +282,8 @@ class ARROW_EXPORT SelectKSinkNodeOptions : public SinkNodeOptions { /// \brief Adapt an Table as a sink node /// -/// obtains the output of 
a execution plan to -/// a table pointer. +/// obtains the output of an execution plan to +/// a table pointer. class ARROW_EXPORT TableSinkNodeOptions : public ExecNodeOptions { public: TableSinkNodeOptions(std::shared_ptr
* output_table, diff --git a/cpp/src/arrow/compute/exec/sink_node.cc b/cpp/src/arrow/compute/exec/sink_node.cc index d1ad81a107c..ea7e655455e 100644 --- a/cpp/src/arrow/compute/exec/sink_node.cc +++ b/cpp/src/arrow/compute/exec/sink_node.cc @@ -243,9 +243,7 @@ struct TableSinkNodeConsumer : public arrow::compute::SinkNodeConsumer { public: TableSinkNodeConsumer(std::shared_ptr
* out, std::shared_ptr output_schema, MemoryPool* pool) - : out_(out), - output_schema_(std::move(output_schema)), - pool_(pool) {} + : out_(out), output_schema_(std::move(output_schema)), pool_(pool) {} Status Consume(ExecBatch batch) override { std::lock_guard guard(consum_mutex); From dfa6806c7f4422d6cd3d78bfd3d718a2097a8149 Mon Sep 17 00:00:00 2001 From: Vibhatha Abeykoon Date: Wed, 2 Feb 2022 07:19:19 +0530 Subject: [PATCH 7/8] updated pr for review comments --- cpp/src/arrow/compute/exec/plan_test.cc | 1 - cpp/src/arrow/compute/exec/sink_node.cc | 10 +++++----- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/cpp/src/arrow/compute/exec/plan_test.cc b/cpp/src/arrow/compute/exec/plan_test.cc index 00d957ee6cf..b4b24e832ef 100644 --- a/cpp/src/arrow/compute/exec/plan_test.cc +++ b/cpp/src/arrow/compute/exec/plan_test.cc @@ -496,7 +496,6 @@ TEST(ExecPlanExecution, SourceTableConsumingSink) { SCOPED_TRACE(parallel ? "parallel" : "single threaded"); ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make()); - Future<> finish = Future<>::Make(); std::shared_ptr
out; auto basic_data = MakeBasicBatches(); diff --git a/cpp/src/arrow/compute/exec/sink_node.cc b/cpp/src/arrow/compute/exec/sink_node.cc index ea7e655455e..c9808c0a3e3 100644 --- a/cpp/src/arrow/compute/exec/sink_node.cc +++ b/cpp/src/arrow/compute/exec/sink_node.cc @@ -246,14 +246,14 @@ struct TableSinkNodeConsumer : public arrow::compute::SinkNodeConsumer { : out_(out), output_schema_(std::move(output_schema)), pool_(pool) {} Status Consume(ExecBatch batch) override { - std::lock_guard guard(consum_mutex); + std::lock_guard guard(consume_mutex_); ARROW_ASSIGN_OR_RAISE(auto rb, batch.ToRecordBatch(output_schema_, pool_)); - batch_vector.push_back(rb); + batches_.push_back(rb); return Status::OK(); } Future<> Finish() override { - ARROW_ASSIGN_OR_RAISE(*out_, Table::FromRecordBatches(batch_vector)); + ARROW_ASSIGN_OR_RAISE(*out_, Table::FromRecordBatches(batches_)); return Status::OK(); } @@ -261,8 +261,8 @@ struct TableSinkNodeConsumer : public arrow::compute::SinkNodeConsumer { std::shared_ptr
* out_; std::shared_ptr output_schema_; MemoryPool* pool_; - std::vector> batch_vector; - std::mutex consum_mutex; + std::vector> batches_; + std::mutex consume_mutex_; }; static Result MakeTableConsumingSinkNode( From 1125c3e0904573f9af0a0fb3cde9d0cea18fd873 Mon Sep 17 00:00:00 2001 From: Vibhatha Abeykoon Date: Fri, 4 Feb 2022 08:05:56 +0530 Subject: [PATCH 8/8] removing the future from options --- cpp/src/arrow/compute/exec/options.h | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/cpp/src/arrow/compute/exec/options.h b/cpp/src/arrow/compute/exec/options.h index ff3bf0c2a9f..a6a97553154 100644 --- a/cpp/src/arrow/compute/exec/options.h +++ b/cpp/src/arrow/compute/exec/options.h @@ -287,15 +287,11 @@ class ARROW_EXPORT SelectKSinkNodeOptions : public SinkNodeOptions { class ARROW_EXPORT TableSinkNodeOptions : public ExecNodeOptions { public: TableSinkNodeOptions(std::shared_ptr
* output_table, - std::shared_ptr output_schema, - Future<> finish = Future<>::Make()) - : output_table(output_table), - output_schema(std::move(output_schema)), - finish(std::move(finish)) {} + std::shared_ptr output_schema) + : output_table(output_table), output_schema(std::move(output_schema)) {} std::shared_ptr
* output_table; std::shared_ptr output_schema; - Future<> finish; }; } // namespace compute