From 8ff3ea927401b97e94d115639b99a1bb912933ce Mon Sep 17 00:00:00 2001 From: vibhatha Date: Tue, 10 Jan 2023 09:52:12 +0530 Subject: [PATCH 01/12] feat(batch-declr): adding DeclarationToExecBatches --- cpp/src/arrow/compute/exec/plan_test.cc | 361 ++++++++++-------------- 1 file changed, 155 insertions(+), 206 deletions(-) diff --git a/cpp/src/arrow/compute/exec/plan_test.cc b/cpp/src/arrow/compute/exec/plan_test.cc index eb560da99cf..424f1c69570 100644 --- a/cpp/src/arrow/compute/exec/plan_test.cc +++ b/cpp/src/arrow/compute/exec/plan_test.cc @@ -218,21 +218,13 @@ TEST(ExecPlanExecution, SourceSink) { for (bool parallel : {false, true}) { SCOPED_TRACE(parallel ? "parallel" : "single threaded"); - ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make()); - AsyncGenerator> sink_gen; - auto basic_data = MakeBasicBatches(); - - ASSERT_OK(Declaration::Sequence( - { - {"source", SourceNodeOptions{basic_data.schema, - basic_data.gen(parallel, slow)}}, - {"sink", SinkNodeOptions{&sink_gen}}, - }) - .AddToPlan(plan.get())); - - ASSERT_THAT(StartAndCollect(plan.get(), sink_gen), - Finishes(ResultWith(UnorderedElementsAreArray(basic_data.batches)))); + Declaration plan( + "source", SourceNodeOptions{basic_data.schema, basic_data.gen(parallel, slow)}); + ASSERT_OK_AND_ASSIGN(auto result, + DeclarationToExecBatches(std::move(plan), parallel)); + AssertExecBatchesEqualIgnoringOrder(result.schema, result.batches, + basic_data.batches); } } } @@ -257,24 +249,18 @@ TEST(ExecPlanExecution, UseSinkAfterExecution) { } TEST(ExecPlanExecution, TableSourceSink) { - for (int batch_size : {1, 4}) { - ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make()); - AsyncGenerator> sink_gen; - - auto exp_batches = MakeBasicBatches(); - ASSERT_OK_AND_ASSIGN(auto table, - TableFromExecBatches(exp_batches.schema, exp_batches.batches)); - - ASSERT_OK(Declaration::Sequence( - { - {"table_source", TableSourceNodeOptions{table, batch_size}}, - {"sink", SinkNodeOptions{&sink_gen}}, - }) - .AddToPlan(plan.get())); - - ASSERT_FINISHES_OK_AND_ASSIGN(auto res, StartAndCollect(plan.get(), sink_gen)); - ASSERT_OK_AND_ASSIGN(auto out_table, TableFromExecBatches(exp_batches.schema, res)); - AssertTablesEqualIgnoringOrder(table, out_table); + for (bool parallel : {false, true}) { + SCOPED_TRACE(parallel ? "parallel" : "single threaded"); + for (int batch_size : {1, 4}) { + auto exp_batches = MakeBasicBatches(); + ASSERT_OK_AND_ASSIGN(auto table, + TableFromExecBatches(exp_batches.schema, exp_batches.batches)); + Declaration plan("table_source", TableSourceNodeOptions{table, batch_size}); + + ASSERT_OK_AND_ASSIGN(auto result_table, + DeclarationToTable(std::move(plan), parallel)); + AssertTablesEqualIgnoringOrder(table, result_table); + } } } @@ -322,26 +308,19 @@ void TestSourceSink( std::string source_factory_name, std::function>(const BatchesWithSchema&)> to_elements) { - ASSERT_OK_AND_ASSIGN(auto executor, arrow::internal::ThreadPool::Make(1)); - ExecContext exec_context(default_memory_pool(), executor.get()); - ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make(exec_context)); - AsyncGenerator> sink_gen; - - auto exp_batches = MakeBasicBatches(); - ASSERT_OK_AND_ASSIGN(auto elements, to_elements(exp_batches)); - auto element_it_maker = [&elements]() { - return MakeVectorIterator(elements); - }; - - ASSERT_OK(Declaration::Sequence({ - {source_factory_name, - OptionsType{exp_batches.schema, element_it_maker}}, - {"sink", SinkNodeOptions{&sink_gen}}, - }) - .AddToPlan(plan.get())); - - ASSERT_THAT(StartAndCollect(plan.get(), sink_gen), - Finishes(ResultWith(UnorderedElementsAreArray(exp_batches.batches)))); + for (bool parallel : {false, true}) { + SCOPED_TRACE(parallel ? "parallel" : "single threaded"); + auto exp_batches = MakeBasicBatches(); + ASSERT_OK_AND_ASSIGN(auto elements, to_elements(exp_batches)); + auto element_it_maker = [&elements]() { + return MakeVectorIterator(elements); + }; + Declaration plan(source_factory_name, + OptionsType{exp_batches.schema, element_it_maker}); + ASSERT_OK_AND_ASSIGN(auto result, DeclarationToExecBatches(plan, parallel)); + AssertExecBatchesEqualIgnoringOrder(result.schema, result.batches, + exp_batches.batches); + } } void TestRecordBatchReaderSourceSink( @@ -797,22 +776,13 @@ TEST(ExecPlanExecution, StressSourceSink) { int num_batches = (slow && !parallel) ? 30 : 300; - ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make()); - AsyncGenerator> sink_gen; - auto random_data = MakeRandomBatches( schema({field("a", int32()), field("b", boolean())}), num_batches); - - ASSERT_OK(Declaration::Sequence( - { - {"source", SourceNodeOptions{random_data.schema, - random_data.gen(parallel, slow)}}, - {"sink", SinkNodeOptions{&sink_gen}}, - }) - .AddToPlan(plan.get())); - - ASSERT_THAT(StartAndCollect(plan.get(), sink_gen), - Finishes(ResultWith(UnorderedElementsAreArray(random_data.batches)))); + Declaration plan("source", SourceNodeOptions{random_data.schema, + random_data.gen(parallel, slow)}); + ASSERT_OK_AND_ASSIGN(auto result, DeclarationToExecBatches(plan, parallel)); + AssertExecBatchesEqualIgnoringOrder(result.schema, result.batches, + random_data.batches); } } } @@ -927,53 +897,41 @@ TEST(ExecPlanExecution, StressSourceSinkStopped) { } TEST(ExecPlanExecution, SourceFilterSink) { - auto basic_data = MakeBasicBatches(); - - ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make()); - AsyncGenerator> sink_gen; - - ASSERT_OK(Declaration::Sequence( - { - {"source", SourceNodeOptions{basic_data.schema, - basic_data.gen(/*parallel=*/false, - /*slow=*/false)}}, - {"filter", FilterNodeOptions{equal(field_ref("i32"), literal(6))}}, - {"sink", SinkNodeOptions{&sink_gen}}, - }) - .AddToPlan(plan.get())); - - ASSERT_THAT(StartAndCollect(plan.get(), sink_gen), - Finishes(ResultWith(UnorderedElementsAreArray( - {ExecBatchFromJSON({int32(), boolean()}, "[]"), - ExecBatchFromJSON({int32(), boolean()}, "[[6, false]]")})))); + for (bool parallel : {false, true}) { + SCOPED_TRACE(parallel ? "parallel" : "single threaded"); + auto basic_data = MakeBasicBatches(); + Declaration plan = Declaration::Sequence( + {{"source", + SourceNodeOptions{basic_data.schema, basic_data.gen(/*parallel=*/false, + /*slow=*/false)}}, + {"filter", FilterNodeOptions{equal(field_ref("i32"), literal(6))}}}); + ASSERT_OK_AND_ASSIGN(auto result, DeclarationToExecBatches(plan, parallel)); + auto exp_batches = {ExecBatchFromJSON({int32(), boolean()}, "[]"), + ExecBatchFromJSON({int32(), boolean()}, "[[6, false]]")}; + AssertExecBatchesEqualIgnoringOrder(result.schema, result.batches, exp_batches); + } } TEST(ExecPlanExecution, SourceProjectSink) { - auto basic_data = MakeBasicBatches(); - - ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make()); - AsyncGenerator> sink_gen; - - ASSERT_OK(Declaration::Sequence( - { - {"source", SourceNodeOptions{basic_data.schema, - basic_data.gen(/*parallel=*/false, - /*slow=*/false)}}, - {"project", - ProjectNodeOptions{{ + for (bool parallel : {false, true}) { + SCOPED_TRACE(parallel ? "parallel" : "single threaded"); + auto basic_data = MakeBasicBatches(); + Declaration plan = Declaration::Sequence( + {{"source", + SourceNodeOptions{basic_data.schema, basic_data.gen(/*parallel=*/false, + /*slow=*/false)}}, + {"project", ProjectNodeOptions{{ not_(field_ref("bool")), call("add", {field_ref("i32"), literal(1)}), }, - {"!bool", "i32 + 1"}}}, - {"sink", SinkNodeOptions{&sink_gen}}, - }) - .AddToPlan(plan.get())); + {"!bool", "i32 + 1"}}}}); - ASSERT_THAT(StartAndCollect(plan.get(), sink_gen), - Finishes(ResultWith(UnorderedElementsAreArray( - {ExecBatchFromJSON({boolean(), int32()}, "[[false, null], [true, 5]]"), - ExecBatchFromJSON({boolean(), int32()}, - "[[null, 6], [true, 7], [true, 8]]")})))); + auto exp_batches = { + ExecBatchFromJSON({boolean(), int32()}, "[[false, null], [true, 5]]"), + ExecBatchFromJSON({boolean(), int32()}, "[[null, 6], [true, 7], [true, 8]]")}; + ASSERT_OK_AND_ASSIGN(auto result, DeclarationToExecBatches(plan, parallel)); + AssertExecBatchesEqualIgnoringOrder(result.schema, result.batches, exp_batches); + } } namespace { @@ -1232,30 +1190,27 @@ TEST(ExecPlanExecution, SourceFilterProjectGroupedSumTopK) { } TEST(ExecPlanExecution, SourceScalarAggSink) { - ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make()); - AsyncGenerator> sink_gen; - - auto basic_data = MakeBasicBatches(); - - ASSERT_OK( - Declaration::Sequence( - { - {"source", - SourceNodeOptions{basic_data.schema, basic_data.gen(/*parallel=*/false, - /*slow=*/false)}}, - {"aggregate", AggregateNodeOptions{ - /*aggregates=*/{{"sum", nullptr, "i32", "sum(i32)"}, - {"any", nullptr, "bool", "any(bool)"}}, - }}, - {"sink", SinkNodeOptions{&sink_gen}}, - }) - .AddToPlan(plan.get())); + for (bool slow : {false, true}) { + SCOPED_TRACE(slow ? "slowed" : "unslowed"); + for (bool parallel : {false, true}) { + SCOPED_TRACE(parallel ? "parallel/merged" : "serial"); + auto basic_data = MakeBasicBatches(); - ASSERT_THAT(StartAndCollect(plan.get(), sink_gen), - Finishes(ResultWith(UnorderedElementsAreArray({ - ExecBatchFromJSON({int64(), boolean()}, - {ArgShape::SCALAR, ArgShape::SCALAR}, "[[22, true]]"), - })))); + Declaration plan = Declaration::Sequence( + {{"source", + SourceNodeOptions{basic_data.schema, basic_data.gen(parallel, slow)}}, + {"aggregate", AggregateNodeOptions{ + /*aggregates=*/{{"sum", nullptr, "i32", "sum(i32)"}, + {"any", nullptr, "bool", "any(bool)"}}, + }}}); + auto exp_batches = { + ExecBatchFromJSON({int64(), boolean()}, {ArgShape::SCALAR, ArgShape::SCALAR}, + "[[22, true]]"), + }; + ASSERT_OK_AND_ASSIGN(auto result, DeclarationToExecBatches(plan, false)); + AssertExecBatchesEqualIgnoringOrder(result.schema, result.batches, exp_batches); + } + } } TEST(ExecPlanExecution, AggregationPreservesOptions) { @@ -1308,40 +1263,34 @@ TEST(ExecPlanExecution, AggregationPreservesOptions) { TEST(ExecPlanExecution, ScalarSourceScalarAggSink) { // ARROW-9056: scalar aggregation can be done over scalars, taking // into account batch.length > 1 (e.g. a partition column) - ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make()); - AsyncGenerator> sink_gen; - - BatchesWithSchema scalar_data; - scalar_data.batches = { - ExecBatchFromJSON({int32(), boolean()}, {ArgShape::SCALAR, ArgShape::SCALAR}, - "[[5, false], [5, false], [5, false]]"), - ExecBatchFromJSON({int32(), boolean()}, "[[5, true], [6, false], [7, true]]")}; - scalar_data.schema = schema({field("a", int32()), field("b", boolean())}); - - // index can't be tested as it's order-dependent - // mode/quantile can't be tested as they're technically vector kernels - ASSERT_OK(Declaration::Sequence( - { - {"source", SourceNodeOptions{scalar_data.schema, - scalar_data.gen(/*parallel=*/false, - /*slow=*/false)}}, - {"aggregate", AggregateNodeOptions{/*aggregates=*/{ - {"all", nullptr, "b", "all(b)"}, - {"any", nullptr, "b", "any(b)"}, - {"count", nullptr, "a", "count(a)"}, - {"mean", nullptr, "a", "mean(a)"}, - {"product", nullptr, "a", "product(a)"}, - {"stddev", nullptr, "a", "stddev(a)"}, - {"sum", nullptr, "a", "sum(a)"}, - {"tdigest", nullptr, "a", "tdigest(a)"}, - {"variance", nullptr, "a", "variance(a)"}}}}, - {"sink", SinkNodeOptions{&sink_gen}}, - }) - .AddToPlan(plan.get())); - - ASSERT_THAT( - StartAndCollect(plan.get(), sink_gen), - Finishes(ResultWith(UnorderedElementsAreArray({ + for (bool slow : {false, true}) { + SCOPED_TRACE(slow ? "slowed" : "unslowed"); + for (bool parallel : {false, true}) { + SCOPED_TRACE(parallel ? "parallel/merged" : "serial"); + BatchesWithSchema scalar_data; + scalar_data.batches = { + ExecBatchFromJSON({int32(), boolean()}, {ArgShape::SCALAR, ArgShape::SCALAR}, + "[[5, false], [5, false], [5, false]]"), + ExecBatchFromJSON({int32(), boolean()}, "[[5, true], [6, false], [7, true]]")}; + scalar_data.schema = schema({field("a", int32()), field("b", boolean())}); + + // index can't be tested as it's order-dependent + // mode/quantile can't be tested as they're technically vector kernels + Declaration plan = Declaration::Sequence( + {{"source", + SourceNodeOptions{scalar_data.schema, scalar_data.gen(parallel, slow)}}, + {"aggregate", AggregateNodeOptions{/*aggregates=*/{ + {"all", nullptr, "b", "all(b)"}, + {"any", nullptr, "b", "any(b)"}, + {"count", nullptr, "a", "count(a)"}, + {"mean", nullptr, "a", "mean(a)"}, + {"product", nullptr, "a", "product(a)"}, + {"stddev", nullptr, "a", "stddev(a)"}, + {"sum", nullptr, "a", "sum(a)"}, + {"tdigest", nullptr, "a", "tdigest(a)"}, + {"variance", nullptr, "a", "variance(a)"}}}}}); + + auto exp_batches = { ExecBatchFromJSON( {boolean(), boolean(), int64(), float64(), int64(), float64(), int64(), float64(), float64()}, @@ -1349,7 +1298,11 @@ TEST(ExecPlanExecution, ScalarSourceScalarAggSink) { ArgShape::SCALAR, ArgShape::SCALAR, ArgShape::SCALAR, ArgShape::ARRAY, ArgShape::SCALAR}, R"([[false, true, 6, 5.5, 26250, 0.7637626158259734, 33, 5.0, 0.5833333333333334]])"), - })))); + }; + ASSERT_OK_AND_ASSIGN(auto result, DeclarationToExecBatches(plan, false)); + AssertExecBatchesEqualIgnoringOrder(result.schema, result.batches, exp_batches); + } + } } TEST(ExecPlanExecution, ScalarSourceGroupedSum) { @@ -1463,57 +1416,53 @@ TEST(ExecPlanExecution, SelfOuterHashJoinSink) { } TEST(ExecPlan, RecordBatchReaderSourceSink) { - ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make()); - AsyncGenerator> sink_gen; - // set up a RecordBatchReader: - auto input = MakeBasicBatches(); - - RecordBatchVector batches; - for (const ExecBatch& exec_batch : input.batches) { - ASSERT_OK_AND_ASSIGN(auto batch, exec_batch.ToRecordBatch(input.schema)); - batches.push_back(batch); - } + for (bool parallel : {false, true}) { + SCOPED_TRACE(parallel ? "parallel/merged" : "serial"); + auto input = MakeBasicBatches(); - ASSERT_OK_AND_ASSIGN(auto table, Table::FromRecordBatches(batches)); - std::shared_ptr reader = std::make_shared(*table); + RecordBatchVector batches; + for (const ExecBatch& exec_batch : input.batches) { + ASSERT_OK_AND_ASSIGN(auto batch, exec_batch.ToRecordBatch(input.schema)); + batches.push_back(batch); + } - // Map the RecordBatchReader to a SourceNode - ASSERT_OK_AND_ASSIGN( - auto batch_gen, - MakeReaderGenerator(std::move(reader), arrow::io::internal::GetIOThreadPool())); + ASSERT_OK_AND_ASSIGN(auto table, Table::FromRecordBatches(batches)); + std::shared_ptr reader = + std::make_shared(*table); - ASSERT_OK( - Declaration::Sequence({ - {"source", SourceNodeOptions{table->schema(), batch_gen}}, - {"sink", SinkNodeOptions{&sink_gen}}, - }) - .AddToPlan(plan.get())); + // Map the RecordBatchReader to a SourceNode + ASSERT_OK_AND_ASSIGN( + auto batch_gen, + MakeReaderGenerator(std::move(reader), arrow::io::internal::GetIOThreadPool())); - ASSERT_THAT(StartAndCollect(plan.get(), sink_gen), - Finishes(ResultWith(UnorderedElementsAreArray(input.batches)))); + Declaration plan = Declaration::Sequence( + {{"source", SourceNodeOptions{table->schema(), batch_gen}}}); + ASSERT_OK_AND_ASSIGN(auto result, DeclarationToExecBatches(plan, false)); + AssertExecBatchesEqualIgnoringOrder(result.schema, result.batches, input.batches); + } } TEST(ExecPlan, SourceEnforcesBatchLimit) { - ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make()); - auto random_data = MakeRandomBatches( - schema({field("a", int32()), field("b", boolean())}), /*num_batches=*/3, - /*batch_size=*/static_cast(std::floor(ExecPlan::kMaxBatchSize * 3.5))); - - AsyncGenerator> sink_gen; + for (bool slow : {false, true}) { + SCOPED_TRACE(slow ? "slowed" : "unslowed"); + for (bool parallel : {false, true}) { + SCOPED_TRACE(parallel ? "parallel/merged" : "serial"); + auto random_data = MakeRandomBatches( + schema({field("a", int32()), field("b", boolean())}), /*num_batches=*/3, + /*batch_size=*/static_cast(std::floor(ExecPlan::kMaxBatchSize * 3.5))); - ASSERT_OK(Declaration::Sequence( - { - {"source", SourceNodeOptions{random_data.schema, - random_data.gen(/*parallel=*/true, - /*slow=*/false)}}, - {"sink", SinkNodeOptions{&sink_gen}}, - }) - .AddToPlan(plan.get())); - ASSERT_FINISHES_OK_AND_ASSIGN(std::vector batches, - StartAndCollect(plan.get(), std::move(sink_gen))); - for (const auto& batch : batches) { - ASSERT_LE(batch.length, ExecPlan::kMaxBatchSize); + // AsyncGenerator> sink_gen; + Declaration plan = Declaration::Sequence( + {{"source", + SourceNodeOptions{random_data.schema, random_data.gen(parallel, slow)}}}); + ASSERT_OK_AND_ASSIGN(auto result, DeclarationToExecBatches(plan, true)); + AssertExecBatchesEqualIgnoringOrder(result.schema, result.batches, + random_data.batches); + for (const auto& batch : result.batches) { + ASSERT_LE(batch.length, ExecPlan::kMaxBatchSize); + } + } } } From 49f9bda02b457923da01ba8cc7878d8bfce91db6 Mon Sep 17 00:00:00 2001 From: vibhatha Date: Thu, 12 Jan 2023 09:30:15 +0530 Subject: [PATCH 02/12] fix(startAndCollect): replaced with DeclarationToXyz methods --- cpp/src/arrow/compute/exec/plan_test.cc | 61 ++++++++++--------------- 1 file changed, 23 insertions(+), 38 deletions(-) diff --git a/cpp/src/arrow/compute/exec/plan_test.cc b/cpp/src/arrow/compute/exec/plan_test.cc index 424f1c69570..edf035fa339 100644 --- a/cpp/src/arrow/compute/exec/plan_test.cc +++ b/cpp/src/arrow/compute/exec/plan_test.cc @@ -1004,56 +1004,41 @@ TEST(ExecPlanExecution, SourceMinMaxScalar) { auto input = MakeGroupableBatches(/*multiplicity=*/parallel ? 100 : 1); auto minmax_opts = std::make_shared(); - auto expected_result = ExecBatch::Make( - {ScalarFromJSON(struct_({field("min", int32()), field("max", int32())}), - R"({"min": -8, "max": 12})")}); - - ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make()); - AsyncGenerator> sink_gen; + auto ty = struct_({field("min", int32()), field("max", int32())}); + auto expected_table = TableFromJSON(schema({field("struct", ty)}), {R"([ + [{"min": -8, "max": 12}] + ])"}); // NOTE: Test `ScalarAggregateNode` by omitting `keys` attribute - ASSERT_OK(Declaration::Sequence( - {{"source", - SourceNodeOptions{input.schema, input.gen(parallel, /*slow=*/false)}}, - {"aggregate", AggregateNodeOptions{ - /*aggregates=*/{{"min_max", std::move(minmax_opts), - "i32", "min_max"}}, - /*keys=*/{}}}, - {"sink", SinkNodeOptions{&sink_gen}}}) - .AddToPlan(plan.get())); - - ASSERT_THAT(StartAndCollect(plan.get(), sink_gen), - Finishes(ResultWith(UnorderedElementsAreArray({*expected_result})))); + Declaration plan = Declaration::Sequence( + {{"source", SourceNodeOptions{input.schema, input.gen(parallel, /*slow=*/false)}}, + {"aggregate", + AggregateNodeOptions{ + /*aggregates=*/{{"min_max", std::move(minmax_opts), "i32", "min_max"}}, + /*keys=*/{}}}}); + ASSERT_OK_AND_ASSIGN(auto result_table, DeclarationToTable(plan, parallel)); + AssertTablesEqual(*result_table, *expected_table); } } - +/// VIBHATHA WORKING TEST(ExecPlanExecution, NestedSourceFilter) { for (bool parallel : {false, true}) { SCOPED_TRACE(parallel ? "parallel/merged" : "serial"); auto input = MakeNestedBatches(); - auto empty = ExecBatchFromJSON({input.schema->field(0)->type()}, R"([])"); - auto expected = ExecBatchFromJSON({input.schema->field(0)->type()}, R"([ + auto expected_table = TableFromJSON(input.schema, {R"([])", + R"([ [{"i32": 5, "bool": null}], [{"i32": 6, "bool": false}], [{"i32": 7, "bool": false}] -])"); - - ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make()); - AsyncGenerator> sink_gen; - - ASSERT_OK(Declaration::Sequence( - { - {"source", SourceNodeOptions{input.schema, - input.gen(parallel, /*slow=*/false)}}, - {"filter", FilterNodeOptions{greater_equal( - field_ref(FieldRef("struct", "i32")), literal(5))}}, - {"sink", SinkNodeOptions{&sink_gen}}, - }) - .AddToPlan(plan.get())); +])"}); - ASSERT_THAT(StartAndCollect(plan.get(), sink_gen), - Finishes(ResultWith(UnorderedElementsAreArray({empty, expected})))); + Declaration plan = Declaration::Sequence( + {{"source", SourceNodeOptions{input.schema, input.gen(parallel, /*slow=*/false)}}, + {"filter", FilterNodeOptions{greater_equal(field_ref(FieldRef("struct", "i32")), + literal(5))}}}); + ASSERT_OK_AND_ASSIGN(auto result_table, DeclarationToTable(plan, parallel)); + AssertTablesEqual(*result_table, *expected_table); } } @@ -1424,7 +1409,7 @@ TEST(ExecPlan, RecordBatchReaderSourceSink) { RecordBatchVector batches; for (const ExecBatch& exec_batch : input.batches) { ASSERT_OK_AND_ASSIGN(auto batch, exec_batch.ToRecordBatch(input.schema)); - batches.push_back(batch); + batches.push_back(std::move(batch)); } ASSERT_OK_AND_ASSIGN(auto table, Table::FromRecordBatches(batches)); From a5c8ae39db23e5ca68a243ec3c227fec10ed681a Mon Sep 17 00:00:00 2001 From: vibhatha Date: Thu, 12 Jan 2023 10:46:43 +0530 Subject: [PATCH 03/12] fix(clean): adding move and cleaning up PR --- cpp/src/arrow/compute/exec/plan_test.cc | 258 +++++++++++------------- 1 file changed, 120 insertions(+), 138 deletions(-) diff --git a/cpp/src/arrow/compute/exec/plan_test.cc b/cpp/src/arrow/compute/exec/plan_test.cc index edf035fa339..7634be6e948 100644 --- a/cpp/src/arrow/compute/exec/plan_test.cc +++ b/cpp/src/arrow/compute/exec/plan_test.cc @@ -317,7 +317,8 @@ void TestSourceSink( }; Declaration plan(source_factory_name, OptionsType{exp_batches.schema, element_it_maker}); - ASSERT_OK_AND_ASSIGN(auto result, DeclarationToExecBatches(plan, parallel)); + ASSERT_OK_AND_ASSIGN(auto result, + DeclarationToExecBatches(std::move(plan), parallel)); AssertExecBatchesEqualIgnoringOrder(result.schema, result.batches, exp_batches.batches); } @@ -780,7 +781,8 @@ TEST(ExecPlanExecution, StressSourceSink) { schema({field("a", int32()), field("b", boolean())}), num_batches); Declaration plan("source", SourceNodeOptions{random_data.schema, random_data.gen(parallel, slow)}); - ASSERT_OK_AND_ASSIGN(auto result, DeclarationToExecBatches(plan, parallel)); + ASSERT_OK_AND_ASSIGN(auto result, + DeclarationToExecBatches(std::move(plan), parallel)); AssertExecBatchesEqualIgnoringOrder(result.schema, result.batches, random_data.batches); } @@ -897,41 +899,35 @@ TEST(ExecPlanExecution, StressSourceSinkStopped) { } TEST(ExecPlanExecution, SourceFilterSink) { - for (bool parallel : {false, true}) { - SCOPED_TRACE(parallel ? "parallel" : "single threaded"); - auto basic_data = MakeBasicBatches(); - Declaration plan = Declaration::Sequence( - {{"source", - SourceNodeOptions{basic_data.schema, basic_data.gen(/*parallel=*/false, - /*slow=*/false)}}, - {"filter", FilterNodeOptions{equal(field_ref("i32"), literal(6))}}}); - ASSERT_OK_AND_ASSIGN(auto result, DeclarationToExecBatches(plan, parallel)); - auto exp_batches = {ExecBatchFromJSON({int32(), boolean()}, "[]"), - ExecBatchFromJSON({int32(), boolean()}, "[[6, false]]")}; - AssertExecBatchesEqualIgnoringOrder(result.schema, result.batches, exp_batches); - } + auto basic_data = MakeBasicBatches(); + Declaration plan = Declaration::Sequence( + {{"source", SourceNodeOptions{basic_data.schema, basic_data.gen(/*parallel=*/false, + /*slow=*/false)}}, + {"filter", FilterNodeOptions{equal(field_ref("i32"), literal(6))}}}); + ASSERT_OK_AND_ASSIGN(auto result, + DeclarationToExecBatches(std::move(plan), /*user_threads=*/false)); + auto exp_batches = {ExecBatchFromJSON({int32(), boolean()}, "[]"), + ExecBatchFromJSON({int32(), boolean()}, "[[6, false]]")}; + AssertExecBatchesEqualIgnoringOrder(result.schema, result.batches, exp_batches); } TEST(ExecPlanExecution, SourceProjectSink) { - for (bool parallel : {false, true}) { - SCOPED_TRACE(parallel ? "parallel" : "single threaded"); - auto basic_data = MakeBasicBatches(); - Declaration plan = Declaration::Sequence( - {{"source", - SourceNodeOptions{basic_data.schema, basic_data.gen(/*parallel=*/false, - /*slow=*/false)}}, - {"project", ProjectNodeOptions{{ - not_(field_ref("bool")), - call("add", {field_ref("i32"), literal(1)}), - }, - {"!bool", "i32 + 1"}}}}); - - auto exp_batches = { - ExecBatchFromJSON({boolean(), int32()}, "[[false, null], [true, 5]]"), - ExecBatchFromJSON({boolean(), int32()}, "[[null, 6], [true, 7], [true, 8]]")}; - ASSERT_OK_AND_ASSIGN(auto result, DeclarationToExecBatches(plan, parallel)); - AssertExecBatchesEqualIgnoringOrder(result.schema, result.batches, exp_batches); - } + auto basic_data = MakeBasicBatches(); + Declaration plan = Declaration::Sequence( + {{"source", SourceNodeOptions{basic_data.schema, basic_data.gen(/*parallel=*/false, + /*slow=*/false)}}, + {"project", ProjectNodeOptions{{ + not_(field_ref("bool")), + call("add", {field_ref("i32"), literal(1)}), + }, + {"!bool", "i32 + 1"}}}}); + + auto exp_batches = { + ExecBatchFromJSON({boolean(), int32()}, "[[false, null], [true, 5]]"), + ExecBatchFromJSON({boolean(), int32()}, "[[null, 6], [true, 7], [true, 8]]")}; + ASSERT_OK_AND_ASSIGN(auto result, + DeclarationToExecBatches(std::move(plan), /*use_threads=*/false)); + AssertExecBatchesEqualIgnoringOrder(result.schema, result.batches, exp_batches); } namespace { @@ -1016,11 +1012,12 @@ TEST(ExecPlanExecution, SourceMinMaxScalar) { AggregateNodeOptions{ /*aggregates=*/{{"min_max", std::move(minmax_opts), "i32", "min_max"}}, /*keys=*/{}}}}); - ASSERT_OK_AND_ASSIGN(auto result_table, DeclarationToTable(plan, parallel)); + ASSERT_OK_AND_ASSIGN(auto result_table, + DeclarationToTable(std::move(plan), parallel)); AssertTablesEqual(*result_table, *expected_table); } } -/// VIBHATHA WORKING + TEST(ExecPlanExecution, NestedSourceFilter) { for (bool parallel : {false, true}) { SCOPED_TRACE(parallel ? "parallel/merged" : "serial"); @@ -1037,7 +1034,8 @@ TEST(ExecPlanExecution, NestedSourceFilter) { {{"source", SourceNodeOptions{input.schema, input.gen(parallel, /*slow=*/false)}}, {"filter", FilterNodeOptions{greater_equal(field_ref(FieldRef("struct", "i32")), literal(5))}}}); - ASSERT_OK_AND_ASSIGN(auto result_table, DeclarationToTable(plan, parallel)); + ASSERT_OK_AND_ASSIGN(auto result_table, + DeclarationToTable(std::move(plan), parallel)); AssertTablesEqual(*result_table, *expected_table); } } @@ -1175,27 +1173,22 @@ TEST(ExecPlanExecution, SourceFilterProjectGroupedSumTopK) { } TEST(ExecPlanExecution, SourceScalarAggSink) { - for (bool slow : {false, true}) { - SCOPED_TRACE(slow ? "slowed" : "unslowed"); - for (bool parallel : {false, true}) { - SCOPED_TRACE(parallel ? "parallel/merged" : "serial"); - auto basic_data = MakeBasicBatches(); + auto basic_data = MakeBasicBatches(); - Declaration plan = Declaration::Sequence( - {{"source", - SourceNodeOptions{basic_data.schema, basic_data.gen(parallel, slow)}}, - {"aggregate", AggregateNodeOptions{ - /*aggregates=*/{{"sum", nullptr, "i32", "sum(i32)"}, - {"any", nullptr, "bool", "any(bool)"}}, - }}}); - auto exp_batches = { - ExecBatchFromJSON({int64(), boolean()}, {ArgShape::SCALAR, ArgShape::SCALAR}, - "[[22, true]]"), - }; - ASSERT_OK_AND_ASSIGN(auto result, DeclarationToExecBatches(plan, false)); - AssertExecBatchesEqualIgnoringOrder(result.schema, result.batches, exp_batches); - } - } + Declaration plan = Declaration::Sequence( + {{"source", SourceNodeOptions{basic_data.schema, + basic_data.gen(/*parallel=*/false, /*slow=*/false)}}, + {"aggregate", AggregateNodeOptions{ + /*aggregates=*/{{"sum", nullptr, "i32", "sum(i32)"}, + {"any", nullptr, "bool", "any(bool)"}}, + }}}); + auto exp_batches = { + ExecBatchFromJSON({int64(), boolean()}, {ArgShape::SCALAR, ArgShape::SCALAR}, + "[[22, true]]"), + }; + ASSERT_OK_AND_ASSIGN(auto result, + DeclarationToExecBatches(std::move(plan), /*use_threads=*/false)); + AssertExecBatchesEqualIgnoringOrder(result.schema, result.batches, exp_batches); } TEST(ExecPlanExecution, AggregationPreservesOptions) { @@ -1248,46 +1241,41 @@ TEST(ExecPlanExecution, AggregationPreservesOptions) { TEST(ExecPlanExecution, ScalarSourceScalarAggSink) { // ARROW-9056: scalar aggregation can be done over scalars, taking // into account batch.length > 1 (e.g. a partition column) - for (bool slow : {false, true}) { - SCOPED_TRACE(slow ? "slowed" : "unslowed"); - for (bool parallel : {false, true}) { - SCOPED_TRACE(parallel ? "parallel/merged" : "serial"); - BatchesWithSchema scalar_data; - scalar_data.batches = { - ExecBatchFromJSON({int32(), boolean()}, {ArgShape::SCALAR, ArgShape::SCALAR}, - "[[5, false], [5, false], [5, false]]"), - ExecBatchFromJSON({int32(), boolean()}, "[[5, true], [6, false], [7, true]]")}; - scalar_data.schema = schema({field("a", int32()), field("b", boolean())}); - - // index can't be tested as it's order-dependent - // mode/quantile can't be tested as they're technically vector kernels - Declaration plan = Declaration::Sequence( - {{"source", - SourceNodeOptions{scalar_data.schema, scalar_data.gen(parallel, slow)}}, - {"aggregate", AggregateNodeOptions{/*aggregates=*/{ - {"all", nullptr, "b", "all(b)"}, - {"any", nullptr, "b", "any(b)"}, - {"count", nullptr, "a", "count(a)"}, - {"mean", nullptr, "a", "mean(a)"}, - {"product", nullptr, "a", "product(a)"}, - {"stddev", nullptr, "a", "stddev(a)"}, - {"sum", nullptr, "a", "sum(a)"}, - {"tdigest", nullptr, "a", "tdigest(a)"}, - {"variance", nullptr, "a", "variance(a)"}}}}}); - - auto exp_batches = { - ExecBatchFromJSON( - {boolean(), boolean(), int64(), float64(), int64(), float64(), int64(), - float64(), float64()}, - {ArgShape::SCALAR, ArgShape::SCALAR, ArgShape::SCALAR, ArgShape::SCALAR, - ArgShape::SCALAR, ArgShape::SCALAR, ArgShape::SCALAR, ArgShape::ARRAY, - ArgShape::SCALAR}, - R"([[false, true, 6, 5.5, 26250, 0.7637626158259734, 33, 5.0, 0.5833333333333334]])"), - }; - ASSERT_OK_AND_ASSIGN(auto result, DeclarationToExecBatches(plan, false)); - AssertExecBatchesEqualIgnoringOrder(result.schema, result.batches, exp_batches); - } - } + BatchesWithSchema scalar_data; + scalar_data.batches = { + ExecBatchFromJSON({int32(), boolean()}, {ArgShape::SCALAR, ArgShape::SCALAR}, + "[[5, false], [5, false], [5, false]]"), + ExecBatchFromJSON({int32(), boolean()}, "[[5, true], [6, false], [7, true]]")}; + scalar_data.schema = schema({field("a", int32()), field("b", boolean())}); + + // index can't be tested as it's order-dependent + // mode/quantile can't be tested as they're technically vector kernels + Declaration plan = Declaration::Sequence( + {{"source", SourceNodeOptions{scalar_data.schema, + scalar_data.gen(/*parallel=*/false, /*slow=*/false)}}, + {"aggregate", AggregateNodeOptions{ + /*aggregates=*/{{"all", nullptr, "b", "all(b)"}, + {"any", nullptr, "b", "any(b)"}, + {"count", nullptr, "a", "count(a)"}, + {"mean", nullptr, "a", "mean(a)"}, + {"product", nullptr, "a", "product(a)"}, + {"stddev", nullptr, "a", "stddev(a)"}, + {"sum", nullptr, "a", "sum(a)"}, + {"tdigest", nullptr, "a", "tdigest(a)"}, + {"variance", nullptr, "a", "variance(a)"}}}}}); + + auto exp_batches = { + ExecBatchFromJSON( + {boolean(), boolean(), int64(), float64(), int64(), float64(), int64(), + float64(), float64()}, + {ArgShape::SCALAR, ArgShape::SCALAR, ArgShape::SCALAR, ArgShape::SCALAR, + ArgShape::SCALAR, ArgShape::SCALAR, ArgShape::SCALAR, ArgShape::ARRAY, + ArgShape::SCALAR}, + R"([[false, true, 6, 5.5, 26250, 0.7637626158259734, 33, 5.0, 0.5833333333333334]])"), + }; + ASSERT_OK_AND_ASSIGN(auto result, + DeclarationToExecBatches(std::move(plan), /*use_threads=*/false)); + AssertExecBatchesEqualIgnoringOrder(result.schema, result.batches, exp_batches); } TEST(ExecPlanExecution, ScalarSourceGroupedSum) { @@ -1349,7 +1337,8 @@ TEST(ExecPlanExecution, SelfInnerHashJoinSink) { auto plan = Declaration("hashjoin", {left, right}, std::move(join_opts)); - ASSERT_OK_AND_ASSIGN(auto result, DeclarationToExecBatches(plan, parallel)); + ASSERT_OK_AND_ASSIGN(auto result, + DeclarationToExecBatches(std::move(plan), parallel)); std::vector expected = { ExecBatchFromJSON({int32(), utf8(), int32(), utf8()}, R"([ @@ -1386,7 +1375,8 @@ TEST(ExecPlanExecution, SelfOuterHashJoinSink) { auto plan = Declaration("hashjoin", {left, right}, std::move(join_opts)); - ASSERT_OK_AND_ASSIGN(auto result, DeclarationToExecBatches(plan, parallel)); + ASSERT_OK_AND_ASSIGN(auto result, + DeclarationToExecBatches(std::move(plan), parallel)); std::vector expected = { ExecBatchFromJSON({int32(), utf8(), int32(), utf8()}, R"([ @@ -1402,52 +1392,44 @@ TEST(ExecPlanExecution, SelfOuterHashJoinSink) { TEST(ExecPlan, RecordBatchReaderSourceSink) { // set up a RecordBatchReader: - for (bool parallel : {false, true}) { - SCOPED_TRACE(parallel ? "parallel/merged" : "serial"); - auto input = MakeBasicBatches(); + auto input = MakeBasicBatches(); - RecordBatchVector batches; - for (const ExecBatch& exec_batch : input.batches) { - ASSERT_OK_AND_ASSIGN(auto batch, exec_batch.ToRecordBatch(input.schema)); - batches.push_back(std::move(batch)); - } + RecordBatchVector batches; + for (const ExecBatch& exec_batch : input.batches) { + ASSERT_OK_AND_ASSIGN(auto batch, exec_batch.ToRecordBatch(input.schema)); + batches.push_back(std::move(batch)); + } - ASSERT_OK_AND_ASSIGN(auto table, Table::FromRecordBatches(batches)); - std::shared_ptr reader = - std::make_shared(*table); + ASSERT_OK_AND_ASSIGN(auto table, Table::FromRecordBatches(batches)); + std::shared_ptr reader = std::make_shared(*table); - // Map the RecordBatchReader to a SourceNode - ASSERT_OK_AND_ASSIGN( - auto batch_gen, - MakeReaderGenerator(std::move(reader), arrow::io::internal::GetIOThreadPool())); + // Map the RecordBatchReader to a SourceNode + ASSERT_OK_AND_ASSIGN( + auto batch_gen, + MakeReaderGenerator(std::move(reader), arrow::io::internal::GetIOThreadPool())); - Declaration plan = Declaration::Sequence( - {{"source", SourceNodeOptions{table->schema(), batch_gen}}}); - ASSERT_OK_AND_ASSIGN(auto result, DeclarationToExecBatches(plan, false)); - AssertExecBatchesEqualIgnoringOrder(result.schema, result.batches, input.batches); - } + Declaration plan = + Declaration::Sequence({{"source", SourceNodeOptions{table->schema(), batch_gen}}}); + ASSERT_OK_AND_ASSIGN(auto result, + DeclarationToExecBatches(std::move(plan), /*use_threads=*/false)); + AssertExecBatchesEqualIgnoringOrder(result.schema, result.batches, input.batches); } TEST(ExecPlan, SourceEnforcesBatchLimit) { - for (bool slow : {false, true}) { - SCOPED_TRACE(slow ? "slowed" : "unslowed"); - for (bool parallel : {false, true}) { - SCOPED_TRACE(parallel ? "parallel/merged" : "serial"); - auto random_data = MakeRandomBatches( - schema({field("a", int32()), field("b", boolean())}), /*num_batches=*/3, - /*batch_size=*/static_cast(std::floor(ExecPlan::kMaxBatchSize * 3.5))); - - // AsyncGenerator> sink_gen; - Declaration plan = Declaration::Sequence( - {{"source", - SourceNodeOptions{random_data.schema, random_data.gen(parallel, slow)}}}); - ASSERT_OK_AND_ASSIGN(auto result, DeclarationToExecBatches(plan, true)); - AssertExecBatchesEqualIgnoringOrder(result.schema, result.batches, - random_data.batches); - for (const auto& batch : result.batches) { - ASSERT_LE(batch.length, ExecPlan::kMaxBatchSize); - } - } + auto random_data = MakeRandomBatches( + schema({field("a", int32()), field("b", boolean())}), /*num_batches=*/3, + /*batch_size=*/static_cast(std::floor(ExecPlan::kMaxBatchSize * 3.5))); + + // AsyncGenerator> sink_gen; + Declaration plan = Declaration::Sequence( + {{"source", + SourceNodeOptions{random_data.schema, + random_data.gen(/*parallel=*/false, /*slow=*/false)}}}); + ASSERT_OK_AND_ASSIGN(auto result, + DeclarationToExecBatches(std::move(plan), /*use_threads=*/false)); + AssertExecBatchesEqualIgnoringOrder(result.schema, result.batches, random_data.batches); + for (const auto& batch : result.batches) { + ASSERT_LE(batch.length, ExecPlan::kMaxBatchSize); } } From 3d9065ab5d5d7dfb304c2f79c5ee717fbe7c3911 Mon Sep 17 00:00:00 2001 From: vibhatha Date: Thu, 12 Jan 2023 16:05:01 +0530 Subject: [PATCH 04/12] fix(self-review): s1 --- cpp/src/arrow/compute/exec/plan_test.cc | 45 +++++++++++-------------- 1 file changed, 19 insertions(+), 26 deletions(-) diff --git a/cpp/src/arrow/compute/exec/plan_test.cc b/cpp/src/arrow/compute/exec/plan_test.cc index 7634be6e948..8b890a4fd92 100644 --- a/cpp/src/arrow/compute/exec/plan_test.cc +++ b/cpp/src/arrow/compute/exec/plan_test.cc @@ -249,18 +249,15 @@ TEST(ExecPlanExecution, UseSinkAfterExecution) { } TEST(ExecPlanExecution, TableSourceSink) { - for (bool parallel : {false, true}) { - SCOPED_TRACE(parallel ? "parallel" : "single threaded"); - for (int batch_size : {1, 4}) { - auto exp_batches = MakeBasicBatches(); - ASSERT_OK_AND_ASSIGN(auto table, - TableFromExecBatches(exp_batches.schema, exp_batches.batches)); - Declaration plan("table_source", TableSourceNodeOptions{table, batch_size}); - - ASSERT_OK_AND_ASSIGN(auto result_table, - DeclarationToTable(std::move(plan), parallel)); - AssertTablesEqualIgnoringOrder(table, result_table); - } + for (int batch_size : {1, 4}) { + auto exp_batches = MakeBasicBatches(); + ASSERT_OK_AND_ASSIGN(auto table, + TableFromExecBatches(exp_batches.schema, exp_batches.batches)); + Declaration plan("table_source", TableSourceNodeOptions{table, batch_size}); + + ASSERT_OK_AND_ASSIGN(auto result_table, + DeclarationToTable(std::move(plan), /*use_threads=*/false)); + AssertTablesEqualIgnoringOrder(table, result_table); } } @@ -308,20 +305,16 @@ void TestSourceSink( std::string source_factory_name, std::function>(const BatchesWithSchema&)> to_elements) { - for (bool parallel : {false, true}) { - SCOPED_TRACE(parallel ? "parallel" : "single threaded"); - auto exp_batches = MakeBasicBatches(); - ASSERT_OK_AND_ASSIGN(auto elements, to_elements(exp_batches)); - auto element_it_maker = [&elements]() { - return MakeVectorIterator(elements); - }; - Declaration plan(source_factory_name, - OptionsType{exp_batches.schema, element_it_maker}); - ASSERT_OK_AND_ASSIGN(auto result, - DeclarationToExecBatches(std::move(plan), parallel)); - AssertExecBatchesEqualIgnoringOrder(result.schema, result.batches, - exp_batches.batches); - } + auto exp_batches = MakeBasicBatches(); + ASSERT_OK_AND_ASSIGN(auto elements, to_elements(exp_batches)); + auto element_it_maker = [&elements]() { + return MakeVectorIterator(elements); + }; + Declaration plan(source_factory_name, + OptionsType{exp_batches.schema, element_it_maker}); + ASSERT_OK_AND_ASSIGN(auto result, + DeclarationToExecBatches(std::move(plan), /*use_threads=*/false)); + AssertExecBatchesEqualIgnoringOrder(result.schema, result.batches, exp_batches.batches); } void TestRecordBatchReaderSourceSink( From f356a733b294a367391edbba0c38db88e27ef15f Mon Sep 17 00:00:00 2001 From: vibhatha Date: Thu, 12 Jan 2023 20:03:07 +0530 Subject: [PATCH 05/12] fix(issue): ide sync issue --- cpp/src/arrow/compute/exec/plan_test.cc | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/cpp/src/arrow/compute/exec/plan_test.cc b/cpp/src/arrow/compute/exec/plan_test.cc index 8b890a4fd92..7d076cea483 100644 --- a/cpp/src/arrow/compute/exec/plan_test.cc +++ b/cpp/src/arrow/compute/exec/plan_test.cc @@ -219,6 +219,7 @@ TEST(ExecPlanExecution, SourceSink) { SCOPED_TRACE(parallel ? "parallel" : "single threaded"); auto basic_data = MakeBasicBatches(); + Declaration plan( "source", SourceNodeOptions{basic_data.schema, basic_data.gen(parallel, slow)}); ASSERT_OK_AND_ASSIGN(auto result, @@ -311,10 +312,11 @@ void TestSourceSink( return MakeVectorIterator(elements); }; Declaration plan(source_factory_name, - OptionsType{exp_batches.schema, element_it_maker}); + OptionsType{exp_batches.schema, element_it_maker}); ASSERT_OK_AND_ASSIGN(auto result, - DeclarationToExecBatches(std::move(plan), /*use_threads=*/false)); - AssertExecBatchesEqualIgnoringOrder(result.schema, result.batches, exp_batches.batches); + DeclarationToExecBatches(std::move(plan), /*use_threads=*/false)); + AssertExecBatchesEqualIgnoringOrder(result.schema, result.batches, + exp_batches.batches); } void TestRecordBatchReaderSourceSink( @@ -1017,11 +1019,11 @@ TEST(ExecPlanExecution, NestedSourceFilter) { auto input = MakeNestedBatches(); auto expected_table = TableFromJSON(input.schema, {R"([])", - R"([ + R"([ [{"i32": 5, "bool": null}], [{"i32": 6, "bool": false}], [{"i32": 7, "bool": false}] -])"}); + ])"}); Declaration plan = Declaration::Sequence( {{"source", SourceNodeOptions{input.schema, input.gen(parallel, /*slow=*/false)}}, @@ -1330,8 +1332,7 @@ TEST(ExecPlanExecution, SelfInnerHashJoinSink) { auto plan = Declaration("hashjoin", {left, right}, std::move(join_opts)); - ASSERT_OK_AND_ASSIGN(auto result, - DeclarationToExecBatches(std::move(plan), parallel)); + ASSERT_OK_AND_ASSIGN(auto result, DeclarationToExecBatches(std::move(plan), parallel)); std::vector expected = { ExecBatchFromJSON({int32(), utf8(), int32(), utf8()}, R"([ @@ -1368,8 +1369,7 @@ TEST(ExecPlanExecution, SelfOuterHashJoinSink) { auto plan = Declaration("hashjoin", {left, right}, std::move(join_opts)); - ASSERT_OK_AND_ASSIGN(auto result, - DeclarationToExecBatches(std::move(plan), parallel)); + ASSERT_OK_AND_ASSIGN(auto result, DeclarationToExecBatches(std::move(plan), parallel)); std::vector expected = { ExecBatchFromJSON({int32(), utf8(), int32(), utf8()}, R"([ @@ -1413,7 +1413,6 @@ TEST(ExecPlan, SourceEnforcesBatchLimit) { schema({field("a", int32()), field("b", boolean())}), /*num_batches=*/3, /*batch_size=*/static_cast(std::floor(ExecPlan::kMaxBatchSize * 3.5))); - // AsyncGenerator> sink_gen; Declaration plan = Declaration::Sequence( {{"source", SourceNodeOptions{random_data.schema, From 55997b8f9be005f19cd5770ba599db50014b21f7 Mon Sep 17 00:00:00 2001 From: vibhatha Date: Thu, 12 Jan 2023 20:04:50 +0530 Subject: [PATCH 06/12] fix(format) --- cpp/src/arrow/compute/exec/plan_test.cc | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/cpp/src/arrow/compute/exec/plan_test.cc b/cpp/src/arrow/compute/exec/plan_test.cc index 7d076cea483..a40e8a95f89 100644 --- a/cpp/src/arrow/compute/exec/plan_test.cc +++ b/cpp/src/arrow/compute/exec/plan_test.cc @@ -219,7 +219,7 @@ TEST(ExecPlanExecution, SourceSink) { SCOPED_TRACE(parallel ? "parallel" : "single threaded"); auto basic_data = MakeBasicBatches(); - + Declaration plan( "source", SourceNodeOptions{basic_data.schema, basic_data.gen(parallel, slow)}); ASSERT_OK_AND_ASSIGN(auto result, @@ -312,11 +312,10 @@ void TestSourceSink( return MakeVectorIterator(elements); }; Declaration plan(source_factory_name, - OptionsType{exp_batches.schema, element_it_maker}); + OptionsType{exp_batches.schema, element_it_maker}); ASSERT_OK_AND_ASSIGN(auto result, - DeclarationToExecBatches(std::move(plan), /*use_threads=*/false)); - AssertExecBatchesEqualIgnoringOrder(result.schema, result.batches, - exp_batches.batches); + DeclarationToExecBatches(std::move(plan), /*use_threads=*/false)); + AssertExecBatchesEqualIgnoringOrder(result.schema, result.batches, exp_batches.batches); } void TestRecordBatchReaderSourceSink( @@ -1019,7 +1018,7 @@ TEST(ExecPlanExecution, NestedSourceFilter) { auto input = MakeNestedBatches(); auto expected_table = TableFromJSON(input.schema, {R"([])", - R"([ + R"([ [{"i32": 5, "bool": null}], [{"i32": 6, "bool": false}], [{"i32": 7, "bool": false}] @@ -1332,7 +1331,8 @@ TEST(ExecPlanExecution, SelfInnerHashJoinSink) { auto plan = Declaration("hashjoin", {left, right}, std::move(join_opts)); - ASSERT_OK_AND_ASSIGN(auto result, DeclarationToExecBatches(std::move(plan), parallel)); + ASSERT_OK_AND_ASSIGN(auto result, + DeclarationToExecBatches(std::move(plan), parallel)); std::vector expected = { ExecBatchFromJSON({int32(), utf8(), int32(), utf8()}, R"([ @@ -1369,7 +1369,8 @@ TEST(ExecPlanExecution, SelfOuterHashJoinSink) { auto plan = Declaration("hashjoin", {left, right}, std::move(join_opts)); - ASSERT_OK_AND_ASSIGN(auto result, DeclarationToExecBatches(std::move(plan), parallel)); + ASSERT_OK_AND_ASSIGN(auto result, + DeclarationToExecBatches(std::move(plan), parallel)); std::vector expected = { ExecBatchFromJSON({int32(), utf8(), int32(), utf8()}, R"([ From bf54801ac1dc5d1372aac0373af4ae142d15dfdc Mon Sep 17 00:00:00 2001 From: Vibhatha Lakmal Abeykoon Date: Sat, 14 Jan 2023 06:37:40 +0530 Subject: [PATCH 07/12] fix(suggestion) Co-authored-by: Weston Pace --- cpp/src/arrow/compute/exec/plan_test.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/compute/exec/plan_test.cc b/cpp/src/arrow/compute/exec/plan_test.cc index a40e8a95f89..c819dd87d91 100644 --- a/cpp/src/arrow/compute/exec/plan_test.cc +++ b/cpp/src/arrow/compute/exec/plan_test.cc @@ -1178,7 +1178,7 @@ TEST(ExecPlanExecution, SourceScalarAggSink) { }}}); auto exp_batches = { ExecBatchFromJSON({int64(), boolean()}, {ArgShape::SCALAR, ArgShape::SCALAR}, - "[[22, true]]"), + "[[22, true]]") }; ASSERT_OK_AND_ASSIGN(auto result, DeclarationToExecBatches(std::move(plan), /*use_threads=*/false)); From bd64d3716553ac1b8621f45a2631d6b1951c55ad Mon Sep 17 00:00:00 2001 From: Vibhatha Lakmal Abeykoon Date: Sat, 14 Jan 2023 06:41:42 +0530 Subject: [PATCH 08/12] fix(suggestion) Co-authored-by: Weston Pace --- cpp/src/arrow/compute/exec/plan_test.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/compute/exec/plan_test.cc b/cpp/src/arrow/compute/exec/plan_test.cc index c819dd87d91..67a84afe47b 100644 --- a/cpp/src/arrow/compute/exec/plan_test.cc +++ b/cpp/src/arrow/compute/exec/plan_test.cc @@ -899,7 +899,7 @@ TEST(ExecPlanExecution, SourceFilterSink) { /*slow=*/false)}}, {"filter", FilterNodeOptions{equal(field_ref("i32"), literal(6))}}}); ASSERT_OK_AND_ASSIGN(auto result, - DeclarationToExecBatches(std::move(plan), /*user_threads=*/false)); + DeclarationToExecBatches(std::move(plan), /*use_threads=*/false)); auto exp_batches = {ExecBatchFromJSON({int32(), boolean()}, "[]"), ExecBatchFromJSON({int32(), boolean()}, "[[6, false]]")}; AssertExecBatchesEqualIgnoringOrder(result.schema, result.batches, exp_batches); From 72f96bd866c780b20185d055e82102839d9b1c63 Mon Sep 17 00:00:00 2001 From: Vibhatha Lakmal Abeykoon Date: Sat, 14 Jan 2023 06:42:06 +0530 Subject: [PATCH 09/12] fix(suggestion) Co-authored-by: Weston Pace --- cpp/src/arrow/compute/exec/plan_test.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/compute/exec/plan_test.cc b/cpp/src/arrow/compute/exec/plan_test.cc index 67a84afe47b..d9389897795 100644 --- a/cpp/src/arrow/compute/exec/plan_test.cc +++ b/cpp/src/arrow/compute/exec/plan_test.cc @@ -994,8 +994,8 @@ TEST(ExecPlanExecution, SourceMinMaxScalar) { auto input = MakeGroupableBatches(/*multiplicity=*/parallel ? 100 : 1); auto minmax_opts = std::make_shared(); - auto ty = struct_({field("min", int32()), field("max", int32())}); - auto expected_table = TableFromJSON(schema({field("struct", ty)}), {R"([ + auto min_max_type = struct_({field("min", int32()), field("max", int32())}); + auto expected_table = TableFromJSON(schema({field("struct", min_max_type)}), {R"([ [{"min": -8, "max": 12}] ])"}); From c305dec7a0afe1953cddfa36c7852e4ada0326e4 Mon Sep 17 00:00:00 2001 From: Vibhatha Lakmal Abeykoon Date: Sat, 14 Jan 2023 06:42:32 +0530 Subject: [PATCH 10/12] fix(suggestion) Co-authored-by: Weston Pace --- cpp/src/arrow/compute/exec/plan_test.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/cpp/src/arrow/compute/exec/plan_test.cc b/cpp/src/arrow/compute/exec/plan_test.cc index d9389897795..d894a382397 100644 --- a/cpp/src/arrow/compute/exec/plan_test.cc +++ b/cpp/src/arrow/compute/exec/plan_test.cc @@ -1008,6 +1008,7 @@ TEST(ExecPlanExecution, SourceMinMaxScalar) { /*keys=*/{}}}}); ASSERT_OK_AND_ASSIGN(auto result_table, DeclarationToTable(std::move(plan), parallel)); + // No need to ignore order since there is only 1 row AssertTablesEqual(*result_table, *expected_table); } } From a4c4ffca3815f9869b6e42f46e16be05a5b78460 Mon Sep 17 00:00:00 2001 From: vibhatha Date: Sat, 14 Jan 2023 07:55:49 +0530 Subject: [PATCH 11/12] fix(format): formatted added suggestions --- cpp/src/arrow/compute/exec/plan_test.cc | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/cpp/src/arrow/compute/exec/plan_test.cc b/cpp/src/arrow/compute/exec/plan_test.cc index d894a382397..9faaedfa754 100644 --- a/cpp/src/arrow/compute/exec/plan_test.cc +++ b/cpp/src/arrow/compute/exec/plan_test.cc @@ -1177,10 +1177,8 @@ TEST(ExecPlanExecution, SourceScalarAggSink) { /*aggregates=*/{{"sum", nullptr, "i32", "sum(i32)"}, {"any", nullptr, "bool", "any(bool)"}}, }}}); - auto exp_batches = { - ExecBatchFromJSON({int64(), boolean()}, {ArgShape::SCALAR, ArgShape::SCALAR}, - "[[22, true]]") - }; + auto exp_batches = {ExecBatchFromJSON( + {int64(), boolean()}, {ArgShape::SCALAR, ArgShape::SCALAR}, "[[22, true]]")}; ASSERT_OK_AND_ASSIGN(auto result, DeclarationToExecBatches(std::move(plan), /*use_threads=*/false)); AssertExecBatchesEqualIgnoringOrder(result.schema, result.batches, exp_batches); From ff85011e992e5cb5473ba96316ae5af9acade1fa Mon Sep 17 00:00:00 2001 From: vibhatha Date: Sat, 14 Jan 2023 08:54:28 +0530 Subject: [PATCH 12/12] fix(defaults): setting default value to DeclaraToExecBatches call --- cpp/src/arrow/compute/exec/plan_test.cc | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/cpp/src/arrow/compute/exec/plan_test.cc b/cpp/src/arrow/compute/exec/plan_test.cc index 9faaedfa754..4f68cbab205 100644 --- a/cpp/src/arrow/compute/exec/plan_test.cc +++ b/cpp/src/arrow/compute/exec/plan_test.cc @@ -898,8 +898,7 @@ TEST(ExecPlanExecution, SourceFilterSink) { {{"source", SourceNodeOptions{basic_data.schema, basic_data.gen(/*parallel=*/false, /*slow=*/false)}}, {"filter", FilterNodeOptions{equal(field_ref("i32"), literal(6))}}}); - ASSERT_OK_AND_ASSIGN(auto result, - DeclarationToExecBatches(std::move(plan), /*use_threads=*/false)); + ASSERT_OK_AND_ASSIGN(auto result, DeclarationToExecBatches(std::move(plan))); auto exp_batches = {ExecBatchFromJSON({int32(), boolean()}, "[]"), ExecBatchFromJSON({int32(), boolean()}, "[[6, false]]")}; AssertExecBatchesEqualIgnoringOrder(result.schema, result.batches, exp_batches); @@ -919,8 +918,7 @@ TEST(ExecPlanExecution, SourceProjectSink) { auto exp_batches = { ExecBatchFromJSON({boolean(), int32()}, "[[false, null], [true, 5]]"), ExecBatchFromJSON({boolean(), int32()}, "[[null, 6], [true, 7], [true, 8]]")}; - ASSERT_OK_AND_ASSIGN(auto result, - DeclarationToExecBatches(std::move(plan), /*use_threads=*/false)); + ASSERT_OK_AND_ASSIGN(auto result, DeclarationToExecBatches(std::move(plan))); AssertExecBatchesEqualIgnoringOrder(result.schema, result.batches, exp_batches); } @@ -1179,8 +1177,7 @@ TEST(ExecPlanExecution, SourceScalarAggSink) { }}}); auto exp_batches = {ExecBatchFromJSON( {int64(), boolean()}, {ArgShape::SCALAR, ArgShape::SCALAR}, "[[22, true]]")}; - ASSERT_OK_AND_ASSIGN(auto result, - DeclarationToExecBatches(std::move(plan), /*use_threads=*/false)); + ASSERT_OK_AND_ASSIGN(auto result, DeclarationToExecBatches(std::move(plan))); AssertExecBatchesEqualIgnoringOrder(result.schema, result.batches, exp_batches); } @@ -1266,8 +1263,7 @@ TEST(ExecPlanExecution, ScalarSourceScalarAggSink) { ArgShape::SCALAR}, R"([[false, true, 6, 5.5, 26250, 0.7637626158259734, 33, 5.0, 0.5833333333333334]])"), }; - ASSERT_OK_AND_ASSIGN(auto result, - DeclarationToExecBatches(std::move(plan), /*use_threads=*/false)); + ASSERT_OK_AND_ASSIGN(auto result, DeclarationToExecBatches(std::move(plan))); AssertExecBatchesEqualIgnoringOrder(result.schema, result.batches, exp_batches); } @@ -1403,8 +1399,7 @@ TEST(ExecPlan, RecordBatchReaderSourceSink) { Declaration plan = Declaration::Sequence({{"source", SourceNodeOptions{table->schema(), batch_gen}}}); - ASSERT_OK_AND_ASSIGN(auto result, - DeclarationToExecBatches(std::move(plan), /*use_threads=*/false)); + ASSERT_OK_AND_ASSIGN(auto result, DeclarationToExecBatches(std::move(plan))); AssertExecBatchesEqualIgnoringOrder(result.schema, result.batches, input.batches); } @@ -1417,8 +1412,7 @@ TEST(ExecPlan, SourceEnforcesBatchLimit) { {{"source", SourceNodeOptions{random_data.schema, random_data.gen(/*parallel=*/false, /*slow=*/false)}}}); - ASSERT_OK_AND_ASSIGN(auto result, - DeclarationToExecBatches(std::move(plan), /*use_threads=*/false)); + ASSERT_OK_AND_ASSIGN(auto result, DeclarationToExecBatches(std::move(plan))); AssertExecBatchesEqualIgnoringOrder(result.schema, result.batches, random_data.batches); for (const auto& batch : result.batches) { ASSERT_LE(batch.length, ExecPlan::kMaxBatchSize);