diff --git a/cpp/src/arrow/compute/exec/plan_test.cc b/cpp/src/arrow/compute/exec/plan_test.cc index eb560da99cf..4f68cbab205 100644 --- a/cpp/src/arrow/compute/exec/plan_test.cc +++ b/cpp/src/arrow/compute/exec/plan_test.cc @@ -218,21 +218,14 @@ TEST(ExecPlanExecution, SourceSink) { for (bool parallel : {false, true}) { SCOPED_TRACE(parallel ? "parallel" : "single threaded"); - ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make()); - AsyncGenerator> sink_gen; - auto basic_data = MakeBasicBatches(); - ASSERT_OK(Declaration::Sequence( - { - {"source", SourceNodeOptions{basic_data.schema, - basic_data.gen(parallel, slow)}}, - {"sink", SinkNodeOptions{&sink_gen}}, - }) - .AddToPlan(plan.get())); - - ASSERT_THAT(StartAndCollect(plan.get(), sink_gen), - Finishes(ResultWith(UnorderedElementsAreArray(basic_data.batches)))); + Declaration plan( + "source", SourceNodeOptions{basic_data.schema, basic_data.gen(parallel, slow)}); + ASSERT_OK_AND_ASSIGN(auto result, + DeclarationToExecBatches(std::move(plan), parallel)); + AssertExecBatchesEqualIgnoringOrder(result.schema, result.batches, + basic_data.batches); } } } @@ -258,23 +251,14 @@ TEST(ExecPlanExecution, UseSinkAfterExecution) { TEST(ExecPlanExecution, TableSourceSink) { for (int batch_size : {1, 4}) { - ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make()); - AsyncGenerator> sink_gen; - auto exp_batches = MakeBasicBatches(); ASSERT_OK_AND_ASSIGN(auto table, TableFromExecBatches(exp_batches.schema, exp_batches.batches)); + Declaration plan("table_source", TableSourceNodeOptions{table, batch_size}); - ASSERT_OK(Declaration::Sequence( - { - {"table_source", TableSourceNodeOptions{table, batch_size}}, - {"sink", SinkNodeOptions{&sink_gen}}, - }) - .AddToPlan(plan.get())); - - ASSERT_FINISHES_OK_AND_ASSIGN(auto res, StartAndCollect(plan.get(), sink_gen)); - ASSERT_OK_AND_ASSIGN(auto out_table, TableFromExecBatches(exp_batches.schema, res)); - AssertTablesEqualIgnoringOrder(table, out_table); + ASSERT_OK_AND_ASSIGN(auto result_table, + DeclarationToTable(std::move(plan), /*use_threads=*/false)); + AssertTablesEqualIgnoringOrder(table, result_table); } } @@ -322,26 +306,16 @@ void TestSourceSink( std::string source_factory_name, std::function>(const BatchesWithSchema&)> to_elements) { - ASSERT_OK_AND_ASSIGN(auto executor, arrow::internal::ThreadPool::Make(1)); - ExecContext exec_context(default_memory_pool(), executor.get()); - ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make(exec_context)); - AsyncGenerator> sink_gen; - auto exp_batches = MakeBasicBatches(); ASSERT_OK_AND_ASSIGN(auto elements, to_elements(exp_batches)); auto element_it_maker = [&elements]() { return MakeVectorIterator(elements); }; - - ASSERT_OK(Declaration::Sequence({ - {source_factory_name, - OptionsType{exp_batches.schema, element_it_maker}}, - {"sink", SinkNodeOptions{&sink_gen}}, - }) - .AddToPlan(plan.get())); - - ASSERT_THAT(StartAndCollect(plan.get(), sink_gen), - Finishes(ResultWith(UnorderedElementsAreArray(exp_batches.batches)))); + Declaration plan(source_factory_name, + OptionsType{exp_batches.schema, element_it_maker}); + ASSERT_OK_AND_ASSIGN(auto result, + DeclarationToExecBatches(std::move(plan), /*use_threads=*/false)); + AssertExecBatchesEqualIgnoringOrder(result.schema, result.batches, exp_batches.batches); } void TestRecordBatchReaderSourceSink( @@ -797,22 +771,14 @@ TEST(ExecPlanExecution, StressSourceSink) { int num_batches = (slow && !parallel) ? 30 : 300; - ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make()); - AsyncGenerator> sink_gen; - auto random_data = MakeRandomBatches( schema({field("a", int32()), field("b", boolean())}), num_batches); - - ASSERT_OK(Declaration::Sequence( - { - {"source", SourceNodeOptions{random_data.schema, - random_data.gen(parallel, slow)}}, - {"sink", SinkNodeOptions{&sink_gen}}, - }) - .AddToPlan(plan.get())); - - ASSERT_THAT(StartAndCollect(plan.get(), sink_gen), - Finishes(ResultWith(UnorderedElementsAreArray(random_data.batches)))); + Declaration plan("source", SourceNodeOptions{random_data.schema, + random_data.gen(parallel, slow)}); + ASSERT_OK_AND_ASSIGN(auto result, + DeclarationToExecBatches(std::move(plan), parallel)); + AssertExecBatchesEqualIgnoringOrder(result.schema, result.batches, + random_data.batches); } } } @@ -928,52 +894,32 @@ TEST(ExecPlanExecution, StressSourceSinkStopped) { TEST(ExecPlanExecution, SourceFilterSink) { auto basic_data = MakeBasicBatches(); - - ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make()); - AsyncGenerator> sink_gen; - - ASSERT_OK(Declaration::Sequence( - { - {"source", SourceNodeOptions{basic_data.schema, - basic_data.gen(/*parallel=*/false, - /*slow=*/false)}}, - {"filter", FilterNodeOptions{equal(field_ref("i32"), literal(6))}}, - {"sink", SinkNodeOptions{&sink_gen}}, - }) - .AddToPlan(plan.get())); - - ASSERT_THAT(StartAndCollect(plan.get(), sink_gen), - Finishes(ResultWith(UnorderedElementsAreArray( - {ExecBatchFromJSON({int32(), boolean()}, "[]"), - ExecBatchFromJSON({int32(), boolean()}, "[[6, false]]")})))); + Declaration plan = Declaration::Sequence( + {{"source", SourceNodeOptions{basic_data.schema, basic_data.gen(/*parallel=*/false, + /*slow=*/false)}}, + {"filter", FilterNodeOptions{equal(field_ref("i32"), literal(6))}}}); + ASSERT_OK_AND_ASSIGN(auto result, DeclarationToExecBatches(std::move(plan))); + auto exp_batches = {ExecBatchFromJSON({int32(), boolean()}, "[]"), + ExecBatchFromJSON({int32(), boolean()}, "[[6, false]]")}; + AssertExecBatchesEqualIgnoringOrder(result.schema, result.batches, exp_batches); } TEST(ExecPlanExecution, SourceProjectSink) { auto basic_data = MakeBasicBatches(); - - ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make()); - AsyncGenerator> sink_gen; - - ASSERT_OK(Declaration::Sequence( - { - {"source", SourceNodeOptions{basic_data.schema, - basic_data.gen(/*parallel=*/false, - /*slow=*/false)}}, - {"project", - ProjectNodeOptions{{ - not_(field_ref("bool")), - call("add", {field_ref("i32"), literal(1)}), - }, - {"!bool", "i32 + 1"}}}, - {"sink", SinkNodeOptions{&sink_gen}}, - }) - .AddToPlan(plan.get())); - - ASSERT_THAT(StartAndCollect(plan.get(), sink_gen), - Finishes(ResultWith(UnorderedElementsAreArray( - {ExecBatchFromJSON({boolean(), int32()}, "[[false, null], [true, 5]]"), - ExecBatchFromJSON({boolean(), int32()}, - "[[null, 6], [true, 7], [true, 8]]")})))); + Declaration plan = Declaration::Sequence( + {{"source", SourceNodeOptions{basic_data.schema, basic_data.gen(/*parallel=*/false, + /*slow=*/false)}}, + {"project", ProjectNodeOptions{{ + not_(field_ref("bool")), + call("add", {field_ref("i32"), literal(1)}), + }, + {"!bool", "i32 + 1"}}}}); + + auto exp_batches = { + ExecBatchFromJSON({boolean(), int32()}, "[[false, null], [true, 5]]"), + ExecBatchFromJSON({boolean(), int32()}, "[[null, 6], [true, 7], [true, 8]]")}; + ASSERT_OK_AND_ASSIGN(auto result, DeclarationToExecBatches(std::move(plan))); + AssertExecBatchesEqualIgnoringOrder(result.schema, result.batches, exp_batches); } namespace { @@ -1046,26 +992,22 @@ TEST(ExecPlanExecution, SourceMinMaxScalar) { auto input = MakeGroupableBatches(/*multiplicity=*/parallel ? 100 : 1); auto minmax_opts = std::make_shared(); - auto expected_result = ExecBatch::Make( - {ScalarFromJSON(struct_({field("min", int32()), field("max", int32())}), - R"({"min": -8, "max": 12})")}); - - ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make()); - AsyncGenerator> sink_gen; + auto min_max_type = struct_({field("min", int32()), field("max", int32())}); + auto expected_table = TableFromJSON(schema({field("struct", min_max_type)}), {R"([ + [{"min": -8, "max": 12}] + ])"}); // NOTE: Test `ScalarAggregateNode` by omitting `keys` attribute - ASSERT_OK(Declaration::Sequence( - {{"source", - SourceNodeOptions{input.schema, input.gen(parallel, /*slow=*/false)}}, - {"aggregate", AggregateNodeOptions{ - /*aggregates=*/{{"min_max", std::move(minmax_opts), - "i32", "min_max"}}, - /*keys=*/{}}}, - {"sink", SinkNodeOptions{&sink_gen}}}) - .AddToPlan(plan.get())); - - ASSERT_THAT(StartAndCollect(plan.get(), sink_gen), - Finishes(ResultWith(UnorderedElementsAreArray({*expected_result})))); + Declaration plan = Declaration::Sequence( + {{"source", SourceNodeOptions{input.schema, input.gen(parallel, /*slow=*/false)}}, + {"aggregate", + AggregateNodeOptions{ + /*aggregates=*/{{"min_max", std::move(minmax_opts), "i32", "min_max"}}, + /*keys=*/{}}}}); + ASSERT_OK_AND_ASSIGN(auto result_table, + DeclarationToTable(std::move(plan), parallel)); + // No need to ignore order since there is only 1 row + AssertTablesEqual(*result_table, *expected_table); } } @@ -1074,28 +1016,20 @@ TEST(ExecPlanExecution, NestedSourceFilter) { SCOPED_TRACE(parallel ? "parallel/merged" : "serial"); auto input = MakeNestedBatches(); - auto empty = ExecBatchFromJSON({input.schema->field(0)->type()}, R"([])"); - auto expected = ExecBatchFromJSON({input.schema->field(0)->type()}, R"([ + auto expected_table = TableFromJSON(input.schema, {R"([])", + R"([ [{"i32": 5, "bool": null}], [{"i32": 6, "bool": false}], [{"i32": 7, "bool": false}] -])"); + ])"}); - ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make()); - AsyncGenerator> sink_gen; - - ASSERT_OK(Declaration::Sequence( - { - {"source", SourceNodeOptions{input.schema, - input.gen(parallel, /*slow=*/false)}}, - {"filter", FilterNodeOptions{greater_equal( - field_ref(FieldRef("struct", "i32")), literal(5))}}, - {"sink", SinkNodeOptions{&sink_gen}}, - }) - .AddToPlan(plan.get())); - - ASSERT_THAT(StartAndCollect(plan.get(), sink_gen), - Finishes(ResultWith(UnorderedElementsAreArray({empty, expected})))); + Declaration plan = Declaration::Sequence( + {{"source", SourceNodeOptions{input.schema, input.gen(parallel, /*slow=*/false)}}, + {"filter", FilterNodeOptions{greater_equal(field_ref(FieldRef("struct", "i32")), + literal(5))}}}); + ASSERT_OK_AND_ASSIGN(auto result_table, + DeclarationToTable(std::move(plan), parallel)); + AssertTablesEqual(*result_table, *expected_table); } } @@ -1232,30 +1166,19 @@ TEST(ExecPlanExecution, SourceFilterProjectGroupedSumTopK) { } TEST(ExecPlanExecution, SourceScalarAggSink) { - ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make()); - AsyncGenerator> sink_gen; - auto basic_data = MakeBasicBatches(); - ASSERT_OK( - Declaration::Sequence( - { - {"source", - SourceNodeOptions{basic_data.schema, basic_data.gen(/*parallel=*/false, - /*slow=*/false)}}, - {"aggregate", AggregateNodeOptions{ - /*aggregates=*/{{"sum", nullptr, "i32", "sum(i32)"}, - {"any", nullptr, "bool", "any(bool)"}}, - }}, - {"sink", SinkNodeOptions{&sink_gen}}, - }) - .AddToPlan(plan.get())); - - ASSERT_THAT(StartAndCollect(plan.get(), sink_gen), - Finishes(ResultWith(UnorderedElementsAreArray({ - ExecBatchFromJSON({int64(), boolean()}, - {ArgShape::SCALAR, ArgShape::SCALAR}, "[[22, true]]"), - })))); + Declaration plan = Declaration::Sequence( + {{"source", SourceNodeOptions{basic_data.schema, + basic_data.gen(/*parallel=*/false, /*slow=*/false)}}, + {"aggregate", AggregateNodeOptions{ + /*aggregates=*/{{"sum", nullptr, "i32", "sum(i32)"}, + {"any", nullptr, "bool", "any(bool)"}}, + }}}); + auto exp_batches = {ExecBatchFromJSON( + {int64(), boolean()}, {ArgShape::SCALAR, ArgShape::SCALAR}, "[[22, true]]")}; + ASSERT_OK_AND_ASSIGN(auto result, DeclarationToExecBatches(std::move(plan))); + AssertExecBatchesEqualIgnoringOrder(result.schema, result.batches, exp_batches); } TEST(ExecPlanExecution, AggregationPreservesOptions) { @@ -1308,9 +1231,6 @@ TEST(ExecPlanExecution, AggregationPreservesOptions) { TEST(ExecPlanExecution, ScalarSourceScalarAggSink) { // ARROW-9056: scalar aggregation can be done over scalars, taking // into account batch.length > 1 (e.g. a partition column) - ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make()); - AsyncGenerator> sink_gen; - BatchesWithSchema scalar_data; scalar_data.batches = { ExecBatchFromJSON({int32(), boolean()}, {ArgShape::SCALAR, ArgShape::SCALAR}, @@ -1320,36 +1240,31 @@ TEST(ExecPlanExecution, ScalarSourceScalarAggSink) { // index can't be tested as it's order-dependent // mode/quantile can't be tested as they're technically vector kernels - ASSERT_OK(Declaration::Sequence( - { - {"source", SourceNodeOptions{scalar_data.schema, - scalar_data.gen(/*parallel=*/false, - /*slow=*/false)}}, - {"aggregate", AggregateNodeOptions{/*aggregates=*/{ - {"all", nullptr, "b", "all(b)"}, - {"any", nullptr, "b", "any(b)"}, - {"count", nullptr, "a", "count(a)"}, - {"mean", nullptr, "a", "mean(a)"}, - {"product", nullptr, "a", "product(a)"}, - {"stddev", nullptr, "a", "stddev(a)"}, - {"sum", nullptr, "a", "sum(a)"}, - {"tdigest", nullptr, "a", "tdigest(a)"}, - {"variance", nullptr, "a", "variance(a)"}}}}, - {"sink", SinkNodeOptions{&sink_gen}}, - }) - .AddToPlan(plan.get())); - - ASSERT_THAT( - StartAndCollect(plan.get(), sink_gen), - Finishes(ResultWith(UnorderedElementsAreArray({ - ExecBatchFromJSON( - {boolean(), boolean(), int64(), float64(), int64(), float64(), int64(), - float64(), float64()}, - {ArgShape::SCALAR, ArgShape::SCALAR, ArgShape::SCALAR, ArgShape::SCALAR, - ArgShape::SCALAR, ArgShape::SCALAR, ArgShape::SCALAR, ArgShape::ARRAY, - ArgShape::SCALAR}, - R"([[false, true, 6, 5.5, 26250, 0.7637626158259734, 33, 5.0, 0.5833333333333334]])"), - })))); + Declaration plan = Declaration::Sequence( + {{"source", SourceNodeOptions{scalar_data.schema, + scalar_data.gen(/*parallel=*/false, /*slow=*/false)}}, + {"aggregate", AggregateNodeOptions{ + /*aggregates=*/{{"all", nullptr, "b", "all(b)"}, + {"any", nullptr, "b", "any(b)"}, + {"count", nullptr, "a", "count(a)"}, + {"mean", nullptr, "a", "mean(a)"}, + {"product", nullptr, "a", "product(a)"}, + {"stddev", nullptr, "a", "stddev(a)"}, + {"sum", nullptr, "a", "sum(a)"}, + {"tdigest", nullptr, "a", "tdigest(a)"}, + {"variance", nullptr, "a", "variance(a)"}}}}}); + + auto exp_batches = { + ExecBatchFromJSON( + {boolean(), boolean(), int64(), float64(), int64(), float64(), int64(), + float64(), float64()}, + {ArgShape::SCALAR, ArgShape::SCALAR, ArgShape::SCALAR, ArgShape::SCALAR, + ArgShape::SCALAR, ArgShape::SCALAR, ArgShape::SCALAR, ArgShape::ARRAY, + ArgShape::SCALAR}, + R"([[false, true, 6, 5.5, 26250, 0.7637626158259734, 33, 5.0, 0.5833333333333334]])"), + }; + ASSERT_OK_AND_ASSIGN(auto result, DeclarationToExecBatches(std::move(plan))); + AssertExecBatchesEqualIgnoringOrder(result.schema, result.batches, exp_batches); } TEST(ExecPlanExecution, ScalarSourceGroupedSum) { @@ -1411,7 +1326,8 @@ TEST(ExecPlanExecution, SelfInnerHashJoinSink) { auto plan = Declaration("hashjoin", {left, right}, std::move(join_opts)); - ASSERT_OK_AND_ASSIGN(auto result, DeclarationToExecBatches(plan, parallel)); + ASSERT_OK_AND_ASSIGN(auto result, + DeclarationToExecBatches(std::move(plan), parallel)); std::vector expected = { ExecBatchFromJSON({int32(), utf8(), int32(), utf8()}, R"([ @@ -1448,7 +1364,8 @@ TEST(ExecPlanExecution, SelfOuterHashJoinSink) { auto plan = Declaration("hashjoin", {left, right}, std::move(join_opts)); - ASSERT_OK_AND_ASSIGN(auto result, DeclarationToExecBatches(plan, parallel)); + ASSERT_OK_AND_ASSIGN(auto result, + DeclarationToExecBatches(std::move(plan), parallel)); std::vector expected = { ExecBatchFromJSON({int32(), utf8(), int32(), utf8()}, R"([ @@ -1463,16 +1380,13 @@ TEST(ExecPlanExecution, SelfOuterHashJoinSink) { } TEST(ExecPlan, RecordBatchReaderSourceSink) { - ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make()); - AsyncGenerator> sink_gen; - // set up a RecordBatchReader: auto input = MakeBasicBatches(); RecordBatchVector batches; for (const ExecBatch& exec_batch : input.batches) { ASSERT_OK_AND_ASSIGN(auto batch, exec_batch.ToRecordBatch(input.schema)); - batches.push_back(batch); + batches.push_back(std::move(batch)); } ASSERT_OK_AND_ASSIGN(auto table, Table::FromRecordBatches(batches)); @@ -1483,36 +1397,24 @@ TEST(ExecPlan, RecordBatchReaderSourceSink) { auto batch_gen, MakeReaderGenerator(std::move(reader), arrow::io::internal::GetIOThreadPool())); - ASSERT_OK( - Declaration::Sequence({ - {"source", SourceNodeOptions{table->schema(), batch_gen}}, - {"sink", SinkNodeOptions{&sink_gen}}, - }) - .AddToPlan(plan.get())); - - ASSERT_THAT(StartAndCollect(plan.get(), sink_gen), - Finishes(ResultWith(UnorderedElementsAreArray(input.batches)))); + Declaration plan = + Declaration::Sequence({{"source", SourceNodeOptions{table->schema(), batch_gen}}}); + ASSERT_OK_AND_ASSIGN(auto result, DeclarationToExecBatches(std::move(plan))); + AssertExecBatchesEqualIgnoringOrder(result.schema, result.batches, input.batches); } TEST(ExecPlan, SourceEnforcesBatchLimit) { - ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make()); auto random_data = MakeRandomBatches( schema({field("a", int32()), field("b", boolean())}), /*num_batches=*/3, /*batch_size=*/static_cast(std::floor(ExecPlan::kMaxBatchSize * 3.5))); - AsyncGenerator> sink_gen; - - ASSERT_OK(Declaration::Sequence( - { - {"source", SourceNodeOptions{random_data.schema, - random_data.gen(/*parallel=*/true, - /*slow=*/false)}}, - {"sink", SinkNodeOptions{&sink_gen}}, - }) - .AddToPlan(plan.get())); - ASSERT_FINISHES_OK_AND_ASSIGN(std::vector batches, - StartAndCollect(plan.get(), std::move(sink_gen))); - for (const auto& batch : batches) { + Declaration plan = Declaration::Sequence( + {{"source", + SourceNodeOptions{random_data.schema, + random_data.gen(/*parallel=*/false, /*slow=*/false)}}}); + ASSERT_OK_AND_ASSIGN(auto result, DeclarationToExecBatches(std::move(plan))); + AssertExecBatchesEqualIgnoringOrder(result.schema, result.batches, random_data.batches); + for (const auto& batch : result.batches) { ASSERT_LE(batch.length, ExecPlan::kMaxBatchSize); } }