From 38eb84f61dba988474fa62d082215d38e85706aa Mon Sep 17 00:00:00 2001 From: Vibhatha Abeykoon Date: Fri, 4 Mar 2022 18:57:55 +0530 Subject: [PATCH 1/7] updating submodule --- testing | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/testing b/testing index 53b49804710..634739c6644 160000 --- a/testing +++ b/testing @@ -1 +1 @@ -Subproject commit 53b498047109d9940fcfab388bd9d6aeb8c57425 +Subproject commit 634739c664433cec366b4b9a81d1e1044a8c5eda From b51d90617f7632759f5896a4edfb51e83ed41e0b Mon Sep 17 00:00:00 2001 From: Vibhatha Abeykoon Date: Sat, 19 Mar 2022 19:14:39 +0530 Subject: [PATCH 2/7] temp commit to remove files in submodule --- .gitmodules | 3 --- testing | 1 - 2 files changed, 4 deletions(-) delete mode 160000 testing diff --git a/.gitmodules b/.gitmodules index 6efc4871542..71722b21777 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,6 +1,3 @@ [submodule "cpp/submodules/parquet-testing"] path = cpp/submodules/parquet-testing url = https://github.com/apache/parquet-testing.git -[submodule "testing"] - path = testing - url = https://github.com/apache/arrow-testing diff --git a/testing b/testing deleted file mode 160000 index 634739c6644..00000000000 --- a/testing +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 634739c664433cec366b4b9a81d1e1044a8c5eda From 6fc769ac527af80d30c481742049ae9ac45fd245 Mon Sep 17 00:00:00 2001 From: Vibhatha Abeykoon Date: Sat, 19 Mar 2022 19:15:36 +0530 Subject: [PATCH 3/7] adding submodule --- .gitmodules | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.gitmodules b/.gitmodules index 71722b21777..6efc4871542 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,3 +1,6 @@ [submodule "cpp/submodules/parquet-testing"] path = cpp/submodules/parquet-testing url = https://github.com/apache/parquet-testing.git +[submodule "testing"] + path = testing + url = https://github.com/apache/arrow-testing From c9812e3f9d494d8a9125895bc3428a3356181716 Mon Sep 17 00:00:00 2001 From: Vibhatha Abeykoon Date: Sun, 20 Mar 2022 23:09:21 +0530 Subject: [PATCH 4/7] updating testing submodule --- testing | 1 + 1 file changed, 1 insertion(+) create mode 160000 testing diff --git a/testing b/testing new file mode 160000 index 00000000000..d315f798520 --- /dev/null +++ b/testing @@ -0,0 +1 @@ +Subproject commit d315f7985207d2d67fc2c8e41053e9d97d573f4b From 39610ffa1126200f56fe48c3d634920f8082457d Mon Sep 17 00:00:00 2001 From: Vibhatha Abeykoon Date: Sun, 20 Mar 2022 23:11:24 +0530 Subject: [PATCH 5/7] revert to uupstream version --- testing | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/testing b/testing index d315f798520..53b49804710 160000 --- a/testing +++ b/testing @@ -1 +1 @@ -Subproject commit d315f7985207d2d67fc2c8e41053e9d97d573f4b +Subproject commit 53b498047109d9940fcfab388bd9d6aeb8c57425 From e198581c644b45e054cbe5b01125f369cdd8f8d9 Mon Sep 17 00:00:00 2001 From: Vibhatha Abeykoon Date: Fri, 25 Mar 2022 18:07:22 +0530 Subject: [PATCH 6/7] initial commit on sink node output_schema fix --- cpp/src/arrow/compute/exec/sink_node.cc | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/cpp/src/arrow/compute/exec/sink_node.cc b/cpp/src/arrow/compute/exec/sink_node.cc index 13564c736b5..d8886a581df 100644 --- a/cpp/src/arrow/compute/exec/sink_node.cc +++ b/cpp/src/arrow/compute/exec/sink_node.cc @@ -49,18 +49,19 @@ namespace { class SinkNode : public ExecNode { public: SinkNode(ExecPlan* plan, std::vector inputs, + std::shared_ptr output_schema, AsyncGenerator>* generator, util::BackpressureOptions backpressure) - : ExecNode(plan, std::move(inputs), {"collected"}, {}, + : ExecNode(plan, std::move(inputs), {"collected"}, std::move(output_schema), /*num_outputs=*/0), producer_(MakeProducer(generator, std::move(backpressure))) {} static Result Make(ExecPlan* plan, std::vector inputs, const ExecNodeOptions& options) { RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, "SinkNode")); - + auto schema = inputs[0]->output_schema(); const auto& sink_options = checked_cast(options); - return plan->EmplaceNode(plan, std::move(inputs), sink_options.generator, + return plan->EmplaceNode(plan, std::move(inputs), std::move(schema), sink_options.generator, sink_options.backpressure); } @@ -307,10 +308,11 @@ static Result MakeTableConsumingSinkNode( // A sink node that accumulates inputs, then sorts them before emitting them. struct OrderBySinkNode final : public SinkNode { OrderBySinkNode(ExecPlan* plan, std::vector inputs, + std::shared_ptr output_schema, std::unique_ptr impl, AsyncGenerator>* generator, util::BackpressureOptions backpressure) - : SinkNode(plan, std::move(inputs), generator, std::move(backpressure)), + : SinkNode(plan, std::move(inputs), std::move(output_schema), generator, std::move(backpressure)), impl_{std::move(impl)} {} const char* kind_name() const override { return "OrderBySinkNode"; } @@ -319,13 +321,13 @@ struct OrderBySinkNode final : public SinkNode { static Result MakeSort(ExecPlan* plan, std::vector inputs, const ExecNodeOptions& options) { RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, "OrderBySinkNode")); - + auto schema = inputs[0]->output_schema(); const auto& sink_options = checked_cast(options); ARROW_ASSIGN_OR_RAISE( std::unique_ptr impl, - OrderByImpl::MakeSort(plan->exec_context(), inputs[0]->output_schema(), + OrderByImpl::MakeSort(plan->exec_context(), schema, sink_options.sort_options)); - return plan->EmplaceNode(plan, std::move(inputs), std::move(impl), + return plan->EmplaceNode(plan, std::move(inputs), std::move(schema), std::move(impl), sink_options.generator, sink_options.backpressure); } @@ -334,13 +336,13 @@ struct OrderBySinkNode final : public SinkNode { static Result MakeSelectK(ExecPlan* plan, std::vector inputs, const ExecNodeOptions& options) { RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, "OrderBySinkNode")); - + auto schema = inputs[0]->output_schema(); const auto& sink_options = checked_cast(options); ARROW_ASSIGN_OR_RAISE( std::unique_ptr impl, - OrderByImpl::MakeSelectK(plan->exec_context(), inputs[0]->output_schema(), + OrderByImpl::MakeSelectK(plan->exec_context(), schema, sink_options.select_k_options)); - return plan->EmplaceNode(plan, std::move(inputs), std::move(impl), + return plan->EmplaceNode(plan, std::move(inputs), std::move(schema), std::move(impl), sink_options.generator, sink_options.backpressure); } From b44adfe3173c714213204bbc06e9e77b67034b96 Mon Sep 17 00:00:00 2001 From: Vibhatha Abeykoon Date: Fri, 25 Mar 2022 19:07:59 +0530 Subject: [PATCH 7/7] adding test cases --- cpp/src/arrow/compute/exec/plan_test.cc | 65 +++++++++++++++++++++++++ cpp/src/arrow/compute/exec/sink_node.cc | 36 +++++++------- 2 files changed, 83 insertions(+), 18 deletions(-) diff --git a/cpp/src/arrow/compute/exec/plan_test.cc b/cpp/src/arrow/compute/exec/plan_test.cc index e176c701b65..04225e31343 100644 --- a/cpp/src/arrow/compute/exec/plan_test.cc +++ b/cpp/src/arrow/compute/exec/plan_test.cc @@ -559,6 +559,71 @@ TEST(ExecPlanExecution, SourceTableConsumingSink) { } } +TEST(ExecPlanExecution, SinkNodeOutputSchema) { + ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make()); + AsyncGenerator> sink_gen; + + auto basic_data = MakeBasicBatches(); + + ASSERT_OK( + Declaration::Sequence({ + {"source", SourceNodeOptions{basic_data.schema, + basic_data.gen(true, true)}}, + {"sink", SinkNodeOptions{&sink_gen}}, + }) + .AddToPlan(plan.get())); + ASSERT_EQ(plan->sources()[0]->output_schema(), plan->sinks()[0]->output_schema()); +} + +TEST(ExecPlanExecution, ConsumingSinkNodeOutputSchema) { + ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make()); + std::atomic batches_seen{0}; + Future<> finish = Future<>::Make(); + struct TestConsumer : public SinkNodeConsumer { + TestConsumer(std::atomic* batches_seen, Future<> finish) + : batches_seen(batches_seen), finish(std::move(finish)) {} + + Status Consume(ExecBatch batch) override { + (*batches_seen)++; + return Status::OK(); + } + + Future<> Finish() override { return finish; } + + std::atomic* batches_seen; + Future<> finish; + }; + std::shared_ptr consumer = + std::make_shared(&batches_seen, finish); + + auto basic_data = MakeBasicBatches(); + ASSERT_OK_AND_ASSIGN( + auto source, + MakeExecNode("source", plan.get(), {}, + SourceNodeOptions(basic_data.schema, basic_data.gen(true, true)))); + ASSERT_OK(MakeExecNode("consuming_sink", plan.get(), {source}, + ConsumingSinkNodeOptions(consumer))); + ASSERT_EQ(plan->sources()[0]->output_schema(), plan->sinks()[0]->output_schema()); +} + +TEST(ExecPlanExecution, OrderBySinkNodeOutputSchema) { + ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make()); + auto input_schema = schema({field("a", int32()), field("b", boolean())}); + AsyncGenerator> sink_gen; + + auto random_data = MakeRandomBatches(input_schema, 10); + + SortOptions options({SortKey("a", SortOrder::Ascending)}); + ASSERT_OK(Declaration::Sequence( + { + {"source", + SourceNodeOptions{random_data.schema, random_data.gen(true, true)}}, + {"order_by_sink", OrderBySinkNodeOptions{options, &sink_gen}}, + }) + .AddToPlan(plan.get())); + ASSERT_EQ(plan->sources()[0]->output_schema(), plan->sinks()[0]->output_schema()); +} + TEST(ExecPlanExecution, ConsumingSinkError) { struct ConsumeErrorConsumer : public SinkNodeConsumer { Status Consume(ExecBatch batch) override { return Status::Invalid("XYZ"); } diff --git a/cpp/src/arrow/compute/exec/sink_node.cc b/cpp/src/arrow/compute/exec/sink_node.cc index d8886a581df..3a31288e6b6 100644 --- a/cpp/src/arrow/compute/exec/sink_node.cc +++ b/cpp/src/arrow/compute/exec/sink_node.cc @@ -49,7 +49,7 @@ namespace { class SinkNode : public ExecNode { public: SinkNode(ExecPlan* plan, std::vector inputs, - std::shared_ptr output_schema, + std::shared_ptr output_schema, AsyncGenerator>* generator, util::BackpressureOptions backpressure) : ExecNode(plan, std::move(inputs), {"collected"}, std::move(output_schema), @@ -61,8 +61,8 @@ class SinkNode : public ExecNode { RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, "SinkNode")); auto schema = inputs[0]->output_schema(); const auto& sink_options = checked_cast(options); - return plan->EmplaceNode(plan, std::move(inputs), std::move(schema), sink_options.generator, - sink_options.backpressure); + return plan->EmplaceNode(plan, std::move(inputs), std::move(schema), + sink_options.generator, sink_options.backpressure); } static PushGenerator>::Producer MakeProducer( @@ -158,18 +158,19 @@ class SinkNode : public ExecNode { class ConsumingSinkNode : public ExecNode { public: ConsumingSinkNode(ExecPlan* plan, std::vector inputs, + std::shared_ptr output_schema, std::shared_ptr consumer) - : ExecNode(plan, std::move(inputs), {"to_consume"}, {}, + : ExecNode(plan, std::move(inputs), {"to_consume"}, std::move(output_schema), /*num_outputs=*/0), consumer_(std::move(consumer)) {} static Result Make(ExecPlan* plan, std::vector inputs, const ExecNodeOptions& options) { RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, "SinkNode")); - + auto schema = inputs[0]->output_schema(); const auto& sink_options = checked_cast(options); - return plan->EmplaceNode(plan, std::move(inputs), - std::move(sink_options.consumer)); + return plan->EmplaceNode( + plan, std::move(inputs), std::move(schema), std::move(sink_options.consumer)); } const char* kind_name() const override { return "ConsumingSinkNode"; } @@ -312,7 +313,8 @@ struct OrderBySinkNode final : public SinkNode { std::unique_ptr impl, AsyncGenerator>* generator, util::BackpressureOptions backpressure) - : SinkNode(plan, std::move(inputs), std::move(output_schema), generator, std::move(backpressure)), + : SinkNode(plan, std::move(inputs), std::move(output_schema), generator, + std::move(backpressure)), impl_{std::move(impl)} {} const char* kind_name() const override { return "OrderBySinkNode"; } @@ -325,10 +327,9 @@ struct OrderBySinkNode final : public SinkNode { const auto& sink_options = checked_cast(options); ARROW_ASSIGN_OR_RAISE( std::unique_ptr impl, - OrderByImpl::MakeSort(plan->exec_context(), schema, - sink_options.sort_options)); - return plan->EmplaceNode(plan, std::move(inputs), std::move(schema), std::move(impl), - sink_options.generator, + OrderByImpl::MakeSort(plan->exec_context(), schema, sink_options.sort_options)); + return plan->EmplaceNode(plan, std::move(inputs), std::move(schema), + std::move(impl), sink_options.generator, sink_options.backpressure); } @@ -338,12 +339,11 @@ struct OrderBySinkNode final : public SinkNode { RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, "OrderBySinkNode")); auto schema = inputs[0]->output_schema(); const auto& sink_options = checked_cast(options); - ARROW_ASSIGN_OR_RAISE( - std::unique_ptr impl, - OrderByImpl::MakeSelectK(plan->exec_context(), schema, - sink_options.select_k_options)); - return plan->EmplaceNode(plan, std::move(inputs), std::move(schema), std::move(impl), - sink_options.generator, + ARROW_ASSIGN_OR_RAISE(std::unique_ptr impl, + OrderByImpl::MakeSelectK(plan->exec_context(), schema, + sink_options.select_k_options)); + return plan->EmplaceNode(plan, std::move(inputs), std::move(schema), + std::move(impl), sink_options.generator, sink_options.backpressure); }