From 7df10f583c1e8d5c27eddcae9875f03273a8c63e Mon Sep 17 00:00:00 2001
From: Weston Pace
Date: Mon, 3 Apr 2023 14:36:35 -0700
Subject: [PATCH] Fixed the add_dataset_test function to properly refer to the
 test file. Fixed tests which needed some compute->acero refactoring

---
 cpp/src/arrow/dataset/CMakeLists.txt       |   2 +
 cpp/src/arrow/dataset/file_csv_test.cc     |   9 +-
 cpp/src/arrow/dataset/file_test.cc         |  57 +++---
 cpp/src/arrow/dataset/scanner_test.cc      | 203 ++++++++++-----------
 cpp/src/arrow/dataset/subtree_test.cc      |   2 +-
 cpp/src/arrow/dataset/test_util_internal.h |  20 +-
 6 files changed, 150 insertions(+), 143 deletions(-)

diff --git a/cpp/src/arrow/dataset/CMakeLists.txt b/cpp/src/arrow/dataset/CMakeLists.txt
index 13083e4bdd1..e1b14b77c47 100644
--- a/cpp/src/arrow/dataset/CMakeLists.txt
+++ b/cpp/src/arrow/dataset/CMakeLists.txt
@@ -140,7 +140,9 @@ function(ADD_ARROW_DATASET_TEST REL_TEST_NAME)
                  PREFIX
                  ${PREFIX}
                  SOURCES
+                 ${REL_TEST_NAME}.cc
                  test_util_internal.cc
+                 ../acero/test_util_internal.cc
                  LABELS
                  ${LABELS}
                  ${ARG_UNPARSED_ARGUMENTS})
diff --git a/cpp/src/arrow/dataset/file_csv_test.cc b/cpp/src/arrow/dataset/file_csv_test.cc
index 7ee2a48d0d5..755b202439b 100644
--- a/cpp/src/arrow/dataset/file_csv_test.cc
+++ b/cpp/src/arrow/dataset/file_csv_test.cc
@@ -21,6 +21,7 @@
 #include 
 #include 
 
+#include "arrow/acero/exec_plan.h"
 #include "arrow/csv/writer.h"
 #include "arrow/dataset/dataset_internal.h"
 #include "arrow/dataset/file_base.h"
@@ -135,10 +136,10 @@ class TestCsvFileFormat : public FileFormatFixtureMixin,
     if (UseScanV2()) {
       ScanV2Options v2_options = MigrateLegacyOptions(fragment);
       EXPECT_TRUE(ScanV2Options::AddFieldsNeededForFilter(&v2_options).ok());
-      EXPECT_OK_AND_ASSIGN(std::unique_ptr<RecordBatchReader> reader,
-                           compute::DeclarationToReader(
-                               compute::Declaration("scan2", std::move(v2_options)),
-                               /*use_threads=*/false));
+      EXPECT_OK_AND_ASSIGN(
+          std::unique_ptr<RecordBatchReader> reader,
+          acero::DeclarationToReader(acero::Declaration("scan2", std::move(v2_options)),
+                                     /*use_threads=*/false));
       struct ReaderIterator {
         Result<std::shared_ptr<RecordBatch>> Next() { return reader->Next(); }
         std::unique_ptr<RecordBatchReader> reader;
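For context, the consuming pattern this file moves to is acero::DeclarationToReader,
which compiles a declaration into an exec plan and exposes it as a synchronous
RecordBatchReader. A minimal sketch of that pattern, outside the patch; it assumes a
prepared ScanV2Options value, the hypothetical helper name ConsumeScan, and "scan2"
being the factory name the V2 scan node is registered under (as in the test above):

    #include <memory>
    #include <utility>

    #include "arrow/acero/exec_plan.h"
    #include "arrow/dataset/scanner.h"
    #include "arrow/record_batch.h"
    #include "arrow/result.h"
    #include "arrow/status.h"

    // Drain a V2 scan through the synchronous reader interface.
    arrow::Status ConsumeScan(arrow::dataset::ScanV2Options v2_options) {
      ARROW_ASSIGN_OR_RAISE(
          std::unique_ptr<arrow::RecordBatchReader> reader,
          arrow::acero::DeclarationToReader(
              arrow::acero::Declaration("scan2", std::move(v2_options)),
              /*use_threads=*/false));
      while (true) {
        ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::RecordBatch> batch,
                              reader->Next());
        if (batch == nullptr) break;  // end of stream
      }
      return arrow::Status::OK();
    }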
diff --git a/cpp/src/arrow/dataset/file_test.cc b/cpp/src/arrow/dataset/file_test.cc
index 79c11fdffd4..dd64849526b 100644
--- a/cpp/src/arrow/dataset/file_test.cc
+++ b/cpp/src/arrow/dataset/file_test.cc
@@ -24,6 +24,8 @@
 #include 
 #include 
 
+#include "arrow/acero/exec_plan.h"
+#include "arrow/acero/test_util_internal.h"
 #include "arrow/array/array_primitive.h"
 #include "arrow/dataset/api.h"
 #include "arrow/dataset/partition.h"
@@ -350,7 +352,7 @@ TEST_F(TestFileSystemDataset, WriteProjected) {
 }
 
 class FileSystemWriteTest : public testing::TestWithParam<std::tuple<bool, bool>> {
-  using PlanFactory = std::function<std::vector<compute::Declaration>(
+  using PlanFactory = std::function<std::vector<acero::Declaration>(
       const FileSystemDatasetWriteOptions&,
       std::function<Future<std::optional<ExecBatch>>()>*)>;
 
@@ -373,30 +375,31 @@ class FileSystemWriteTest : public testing::TestWithParam<std::tuple<bool, bool>>
     write_options.basename_template = "{i}.feather";
     const std::string kExpectedFilename = "root/0.feather";
 
-    cp::BatchesWithSchema source_data;
+    acero::BatchesWithSchema source_data;
     source_data.batches = {
-        cp::ExecBatchFromJSON({int32(), boolean()}, "[[null, true], [4, false]]"),
-        cp::ExecBatchFromJSON({int32(), boolean()},
-                              "[[5, null], [6, false], [7, false]]")};
+        acero::ExecBatchFromJSON({int32(), boolean()}, "[[null, true], [4, false]]"),
+        acero::ExecBatchFromJSON({int32(), boolean()},
+                                 "[[5, null], [6, false], [7, false]]")};
     source_data.schema = schema({field("i32", int32()), field("bool", boolean())});
 
     AsyncGenerator<std::optional<ExecBatch>> sink_gen;
 
-    ASSERT_OK_AND_ASSIGN(auto plan, cp::ExecPlan::Make());
-    auto source_decl = cp::Declaration::Sequence(
-        {{"source", cp::SourceNodeOptions{source_data.schema,
-                                          source_data.gen(IsParallel(), IsSlow())}}});
+    ASSERT_OK_AND_ASSIGN(auto plan, acero::ExecPlan::Make());
+    auto source_decl = acero::Declaration::Sequence(
+        {{"source", acero::SourceNodeOptions{source_data.schema,
+                                             source_data.gen(IsParallel(), IsSlow())}}});
     auto declarations = plan_factory(write_options, &sink_gen);
     declarations.insert(declarations.begin(), std::move(source_decl));
-    ASSERT_OK(cp::Declaration::Sequence(std::move(declarations)).AddToPlan(plan.get()));
+    ASSERT_OK(
+        acero::Declaration::Sequence(std::move(declarations)).AddToPlan(plan.get()));
 
     if (has_output) {
       ASSERT_FINISHES_OK_AND_ASSIGN(auto out_batches,
-                                    cp::StartAndCollect(plan.get(), sink_gen));
-      cp::AssertExecBatchesEqualIgnoringOrder(source_data.schema, source_data.batches,
-                                              out_batches);
+                                    acero::StartAndCollect(plan.get(), sink_gen));
+      acero::AssertExecBatchesEqualIgnoringOrder(source_data.schema, source_data.batches,
+                                                 out_batches);
     } else {
-      ASSERT_FINISHES_OK(cp::StartAndFinish(plan.get()));
+      ASSERT_FINISHES_OK(acero::StartAndFinish(plan.get()));
     }
 
     // Read written dataset and make sure it matches
@@ -405,29 +408,29 @@ class FileSystemWriteTest : public testing::TestWithParam<std::tuple<bool, bool>>
     ASSERT_OK_AND_ASSIGN(auto written_dataset, dataset_factory->Finish(FinishOptions{}));
     AssertSchemaEqual(*source_data.schema, *written_dataset->schema());
 
-    ASSERT_OK_AND_ASSIGN(plan, cp::ExecPlan::Make());
+    ASSERT_OK_AND_ASSIGN(plan, acero::ExecPlan::Make());
     ASSERT_OK_AND_ASSIGN(auto scanner_builder, written_dataset->NewScan());
     ASSERT_OK_AND_ASSIGN(auto scanner, scanner_builder->Finish());
-    ASSERT_OK(cp::Declaration::Sequence(
+    ASSERT_OK(acero::Declaration::Sequence(
                   {
                       {"scan", ScanNodeOptions{written_dataset, scanner->options()}},
-                      {"sink", cp::SinkNodeOptions{&sink_gen}},
+                      {"sink", acero::SinkNodeOptions{&sink_gen}},
                   })
                   .AddToPlan(plan.get()));
 
     ASSERT_FINISHES_OK_AND_ASSIGN(auto written_batches,
-                                  cp::StartAndCollect(plan.get(), sink_gen));
-    cp::AssertExecBatchesEqualIgnoringOrder(source_data.schema, source_data.batches,
-                                            written_batches);
+                                  acero::StartAndCollect(plan.get(), sink_gen));
+    acero::AssertExecBatchesEqualIgnoringOrder(source_data.schema, source_data.batches,
+                                               written_batches);
   }
 };
 
 TEST_P(FileSystemWriteTest, Write) {
-  auto plan_factory =
-      [](const FileSystemDatasetWriteOptions& write_options,
-         std::function<Future<std::optional<ExecBatch>>()>* sink_gen) {
-        return std::vector<compute::Declaration>{{"write", WriteNodeOptions{write_options}}};
-      };
+  auto plan_factory = [](const FileSystemDatasetWriteOptions& write_options,
+                         std::function<Future<std::optional<ExecBatch>>()>*
+                             sink_gen) {
+    return std::vector<acero::Declaration>{{"write", WriteNodeOptions{write_options}}};
+  };
   TestDatasetWriteRoundTrip(plan_factory, /*has_output=*/false);
 }
@@ -435,9 +438,9 @@ TEST_P(FileSystemWriteTest, TeeWrite) {
   auto plan_factory =
       [](const FileSystemDatasetWriteOptions& write_options,
          std::function<Future<std::optional<ExecBatch>>()>* sink_gen) {
-        return std::vector<compute::Declaration>{
+        return std::vector<acero::Declaration>{
            {"tee", WriteNodeOptions{write_options}},
-           {"sink", cp::SinkNodeOptions{sink_gen}},
+           {"sink", acero::SinkNodeOptions{sink_gen}},
        };
      };
   TestDatasetWriteRoundTrip(plan_factory, /*has_output=*/true);
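The round-trip tests above compose plans from acero::Declaration::Sequence rather than
wiring nodes by hand. As a rough standalone illustration of that shape (not from the
patch; the helper name RoundTrip is hypothetical, and DeclarationToTable supplies the
implicit sink and drives execution):

    #include <memory>
    #include <utility>

    #include "arrow/acero/exec_plan.h"
    #include "arrow/acero/options.h"
    #include "arrow/result.h"
    #include "arrow/table.h"

    // Feed a table through a trivial source-only plan and collect the result.
    arrow::Result<std::shared_ptr<arrow::Table>> RoundTrip(
        std::shared_ptr<arrow::Table> table) {
      arrow::acero::Declaration plan = arrow::acero::Declaration::Sequence(
          {{"table_source",
            arrow::acero::TableSourceNodeOptions{std::move(table)}}});
      return arrow::acero::DeclarationToTable(std::move(plan));
    }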
diff --git a/cpp/src/arrow/dataset/scanner_test.cc b/cpp/src/arrow/dataset/scanner_test.cc
index b0dd9bfede3..159d1ac0335 100644
--- a/cpp/src/arrow/dataset/scanner_test.cc
+++ b/cpp/src/arrow/dataset/scanner_test.cc
@@ -527,16 +527,16 @@ class TestScannerBase : public ::testing::TestWithParam {
     return as_one_batch;
   }
 
-  compute::Declaration MakeScanNode(std::shared_ptr<Dataset> dataset) {
+  acero::Declaration MakeScanNode(std::shared_ptr<Dataset> dataset) {
     ScanV2Options options(dataset);
     options.columns = ScanV2Options::AllColumns(*dataset->schema());
-    return compute::Declaration("scan2", options);
+    return acero::Declaration("scan2", options);
   }
 
-  RecordBatchVector RunNode(compute::Declaration scan_decl, bool ordered,
+  RecordBatchVector RunNode(acero::Declaration scan_decl, bool ordered,
                             MockDataset* mock_dataset) {
     Future<RecordBatchVector> batches_fut =
-        compute::DeclarationToBatchesAsync(std::move(scan_decl));
+        acero::DeclarationToBatchesAsync(std::move(scan_decl));
     if (ordered) {
       mock_dataset->DeliverBatchesInOrder(GetParam().slow);
     } else {
@@ -564,7 +564,7 @@ class TestScannerBase : public ::testing::TestWithParam {
   void CheckScanner(bool ordered) {
     std::shared_ptr<MockDataset> mock_dataset =
         MakeTestDataset(GetParam().num_fragments, GetParam().num_batches);
-    compute::Declaration scan_decl = MakeScanNode(mock_dataset);
+    acero::Declaration scan_decl = MakeScanNode(mock_dataset);
     RecordBatchVector scanned_batches = RunNode(scan_decl, ordered, mock_dataset.get());
     CheckScannedBatches(std::move(scanned_batches));
   }
@@ -585,9 +585,9 @@ void CheckScannerBackpressure(std::shared_ptr<MockDataset> dataset, ScanV2Option
                               int maxConcurrentFragments, int maxConcurrentBatches,
                               ::arrow::internal::ThreadPool* thread_pool) {
   // Start scanning
-  compute::Declaration scan_decl = compute::Declaration("scan2", std::move(options));
+  acero::Declaration scan_decl = acero::Declaration("scan2", std::move(options));
   Future<RecordBatchVector> batches_fut =
-      compute::DeclarationToBatchesAsync(std::move(scan_decl));
+      acero::DeclarationToBatchesAsync(std::move(scan_decl));
 
   auto get_num_inspected = [&] {
     int num_inspected = 0;
@@ -682,7 +682,7 @@ TEST(TestNewScanner, NestedRead) {
   // nested.x
   options.columns = {FieldPath({2, 0})};
   ASSERT_OK_AND_ASSIGN(std::vector<std::shared_ptr<RecordBatch>> batches,
-                       compute::DeclarationToBatches({"scan2", options}));
+                       acero::DeclarationToBatches({"scan2", options}));
   ASSERT_EQ(1, batches.size());
   for (const auto& batch : batches) {
     ASSERT_EQ("x", batch->schema()->field(0)->name());
@@ -738,7 +738,7 @@ TEST(TestNewScanner, PartitionSkip) {
   options.filter = greater(field_ref("filterable"), literal(75));
 
   ASSERT_OK_AND_ASSIGN(std::vector<std::shared_ptr<RecordBatch>> batches,
-                       compute::DeclarationToBatches({"scan2", options}));
+                       acero::DeclarationToBatches({"scan2", options}));
   ASSERT_EQ(1, batches.size());
   std::shared_ptr<RecordBatch> expected = MakeTestBatch(0);
   ASSERT_OK_AND_ASSIGN(expected, expected->SetColumn(1, field("filterable", int16()),
@@ -756,7 +756,7 @@ TEST(TestNewScanner, PartitionSkip) {
   options.filter = less(field_ref("filterable"), literal(75));
 
   ASSERT_OK_AND_ASSIGN(std::vector<std::shared_ptr<RecordBatch>> batches,
-                       compute::DeclarationToBatches({"scan2", options}));
+                       acero::DeclarationToBatches({"scan2", options}));
   ASSERT_EQ(1, batches.size());
   std::shared_ptr<RecordBatch> expected = MakeTestBatch(1);
   ASSERT_OK_AND_ASSIGN(expected, expected->SetColumn(1, field("filterable", int16()),
@@ -776,7 +776,7 @@ TEST(TestNewScanner, PartitionSkip) {
         Invalid,
         ::testing::HasSubstr(
             "The dataset schema defines the field FieldRef.FieldPath(1)"),
-        compute::DeclarationToBatches({"scan2", options}));
+        acero::DeclarationToBatches({"scan2", options}));
   }
 }
@@ -789,7 +789,7 @@ TEST(TestNewScanner, NoFragments) {
   ScanV2Options options(test_dataset);
   options.columns = ScanV2Options::AllColumns(*test_dataset->schema());
   ASSERT_OK_AND_ASSIGN(std::vector<std::shared_ptr<RecordBatch>> batches,
-                       compute::DeclarationToBatches({"scan2", options}));
+                       acero::DeclarationToBatches({"scan2", options}));
   ASSERT_EQ(0, batches.size());
 }
@@ -804,7 +804,7 @@ TEST(TestNewScanner, EmptyFragment) {
   ScanV2Options options(test_dataset);
   options.columns = ScanV2Options::AllColumns(*test_dataset->schema());
   ASSERT_OK_AND_ASSIGN(std::vector<std::shared_ptr<RecordBatch>> batches,
-                       compute::DeclarationToBatches({"scan2", options}));
+                       acero::DeclarationToBatches({"scan2", options}));
   ASSERT_EQ(0, batches.size());
 }
@@ -822,7 +822,7 @@ TEST(TestNewScanner, EmptyBatch) {
   ScanV2Options options(test_dataset);
   options.columns = ScanV2Options::AllColumns(*test_dataset->schema());
   ASSERT_OK_AND_ASSIGN(std::vector<std::shared_ptr<RecordBatch>> batches,
-                       compute::DeclarationToBatches({"scan2", options}));
+                       acero::DeclarationToBatches({"scan2", options}));
   ASSERT_EQ(0, batches.size());
 }
@@ -835,8 +835,8 @@ TEST(TestNewScanner, NoColumns) {
   test_dataset->DeliverBatchesInOrder(false);
 
   ScanV2Options options(test_dataset);
-  ASSERT_OK_AND_ASSIGN(compute::BatchesWithCommonSchema batches_and_schema,
-                       compute::DeclarationToExecBatches({"scan2", options}));
+  ASSERT_OK_AND_ASSIGN(acero::BatchesWithCommonSchema batches_and_schema,
+                       acero::DeclarationToExecBatches({"scan2", options}));
   ASSERT_EQ(16, batches_and_schema.batches.size());
   for (const auto& batch : batches_and_schema.batches) {
     ASSERT_EQ(0, batch.values.size());
@@ -867,7 +867,7 @@ TEST(TestNewScanner, MissingColumn) {
   options.columns = {FieldPath({0}), FieldPath({2})};
 
   ASSERT_OK_AND_ASSIGN(std::vector<std::shared_ptr<RecordBatch>> batches,
-                       compute::DeclarationToBatches({"scan2", options}));
+                       acero::DeclarationToBatches({"scan2", options}));
   ASSERT_EQ(1, batches.size());
 
   AssertArraysEqual(*batch->column(0), *batches[0]->column(0));
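All of the TestNewScanner cases above funnel through the same call shape: wrap a
ScanV2Options in a "scan2" declaration and collect record batches. A condensed sketch
of that shape (the helper name ScanAll is hypothetical; it assumes a ready Dataset and
mirrors the test code):

    #include <memory>
    #include <utility>
    #include <vector>

    #include "arrow/acero/exec_plan.h"
    #include "arrow/dataset/scanner.h"
    #include "arrow/record_batch.h"
    #include "arrow/result.h"

    // Scan every column of a dataset with the V2 scan node.
    arrow::Result<std::vector<std::shared_ptr<arrow::RecordBatch>>> ScanAll(
        std::shared_ptr<arrow::dataset::Dataset> dataset) {
      arrow::dataset::ScanV2Options options(dataset);
      options.columns = arrow::dataset::ScanV2Options::AllColumns(*dataset->schema());
      return arrow::acero::DeclarationToBatches({"scan2", std::move(options)});
    }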
@@ -1906,8 +1906,7 @@ class TestBackpressure : public ::testing::Test {
   }
 
   std::shared_ptr<Scanner> MakeScanner(::arrow::internal::Executor* io_executor) {
-    compute::BackpressureOptions low_backpressure(kResumeIfBelowBytes,
-                                                  kPauseIfAboveBytes);
+    acero::BackpressureOptions low_backpressure(kResumeIfBelowBytes, kPauseIfAboveBytes);
     io::IOContext io_context(default_memory_pool(), io_executor);
     std::shared_ptr<Dataset> dataset = MakeDataset();
     std::shared_ptr<ScanOptions> options = std::make_shared<ScanOptions>();
@@ -2193,7 +2192,7 @@ TEST(ScanOptions, TestMaterializedFields) {
 namespace {
 struct TestPlan {
   explicit TestPlan(compute::ExecContext* ctx = compute::threaded_exec_context())
-      : plan(compute::ExecPlan::Make(*ctx).ValueOrDie()) {
+      : plan(acero::ExecPlan::Make(*ctx).ValueOrDie()) {
     internal::Initialize();
   }
@@ -2214,9 +2213,9 @@ struct TestPlan {
     });
   }
 
-  compute::ExecPlan* get() { return plan.get(); }
+  acero::ExecPlan* get() { return plan.get(); }
 
-  std::shared_ptr<compute::ExecPlan> plan;
+  std::shared_ptr<acero::ExecPlan> plan;
   AsyncGenerator<std::optional<ExecBatch>> sink_gen;
 };
@@ -2387,8 +2386,8 @@ TEST(ScanNode, Schema) {
   options->projection = Materialize({});  // set an empty projection
 
   ASSERT_OK_AND_ASSIGN(auto scan,
-                       compute::MakeExecNode("scan", plan.get(), {},
-                                             ScanNodeOptions{basic.dataset, options}));
+                       acero::MakeExecNode("scan", plan.get(), {},
+                                           ScanNodeOptions{basic.dataset, options}));
 
   auto fields = basic.dataset->schema()->fields();
   fields.push_back(field("__fragment_index", int32()));
@@ -2409,12 +2408,12 @@ TEST(ScanNode, Trivial) {
   // ensure all fields are materialized
   options->projection = Materialize({"a", "b", "c"}, /*include_aug_fields=*/true);
 
-  ASSERT_OK(compute::Declaration::Sequence(
-                {
-                    {"scan", ScanNodeOptions{basic.dataset, options}},
-                    {"sink", compute::SinkNodeOptions{&plan.sink_gen}},
-                })
-                .AddToPlan(plan.get()));
+  ASSERT_OK(
+      acero::Declaration::Sequence({
+          {"scan", ScanNodeOptions{basic.dataset, options}},
+          {"sink", acero::SinkNodeOptions{&plan.sink_gen}},
+      })
+          .AddToPlan(plan.get()));
 
   // trivial scan: the batches are returned unmodified
   auto expected = basic.batches;
@@ -2431,12 +2430,12 @@ TEST(ScanNode, FilteredOnVirtualColumn) {
   // ensure all fields are materialized
   options->projection = Materialize({"a", "b", "c"}, /*include_aug_fields=*/true);
 
-  ASSERT_OK(compute::Declaration::Sequence(
-                {
-                    {"scan", ScanNodeOptions{basic.dataset, options}},
-                    {"sink", compute::SinkNodeOptions{&plan.sink_gen}},
-                })
-                .AddToPlan(plan.get()));
+  ASSERT_OK(
+      acero::Declaration::Sequence({
+          {"scan", ScanNodeOptions{basic.dataset, options}},
+          {"sink", acero::SinkNodeOptions{&plan.sink_gen}},
+      })
+          .AddToPlan(plan.get()));
 
   auto expected = basic.batches;
@@ -2457,12 +2456,12 @@ TEST(ScanNode, DeferredFilterOnPhysicalColumn) {
   // ensure all fields are materialized
   options->projection = Materialize({"a", "b", "c"}, /*include_aug_fields=*/true);
 
-  ASSERT_OK(compute::Declaration::Sequence(
-                {
-                    {"scan", ScanNodeOptions{basic.dataset, options}},
-                    {"sink", compute::SinkNodeOptions{&plan.sink_gen}},
-                })
-                .AddToPlan(plan.get()));
+  ASSERT_OK(
+      acero::Declaration::Sequence({
+          {"scan", ScanNodeOptions{basic.dataset, options}},
+          {"sink", acero::SinkNodeOptions{&plan.sink_gen}},
+      })
+          .AddToPlan(plan.get()));
 
   // No post filtering is performed by ScanNode: all batches will be yielded whole.
   // To filter out rows from individual batches, construct a FilterNode.
@@ -2480,12 +2479,12 @@ TEST(ScanNode, DISABLED_ProjectionPushdown) {
   auto options = std::make_shared<ScanOptions>();
   options->projection = Materialize({"b"}, /*include_aug_fields=*/true);
 
-  ASSERT_OK(compute::Declaration::Sequence(
-                {
-                    {"scan", ScanNodeOptions{basic.dataset, options}},
-                    {"sink", compute::SinkNodeOptions{&plan.sink_gen}},
-                })
-                .AddToPlan(plan.get()));
+  ASSERT_OK(
+      acero::Declaration::Sequence({
+          {"scan", ScanNodeOptions{basic.dataset, options}},
+          {"sink", acero::SinkNodeOptions{&plan.sink_gen}},
+      })
+          .AddToPlan(plan.get()));
 
   auto expected = basic.batches;
@@ -2508,13 +2507,13 @@ TEST(ScanNode, MaterializationOfVirtualColumn) {
   auto options = std::make_shared<ScanOptions>();
   options->projection = Materialize({"a", "b", "c"}, /*include_aug_fields=*/true);
 
-  ASSERT_OK(compute::Declaration::Sequence(
+  ASSERT_OK(acero::Declaration::Sequence(
                 {
                     {"scan", ScanNodeOptions{basic.dataset, options}},
                     {"augmented_project",
-                     compute::ProjectNodeOptions{
+                     acero::ProjectNodeOptions{
                          {field_ref("a"), field_ref("b"), field_ref("c")}}},
-                    {"sink", compute::SinkNodeOptions{&plan.sink_gen}},
+                    {"sink", acero::SinkNodeOptions{&plan.sink_gen}},
                 })
                 .AddToPlan(plan.get()));
@@ -2537,13 +2536,13 @@ TEST(ScanNode, MaterializationOfNestedVirtualColumn) {
   auto options = std::make_shared<ScanOptions>();
   options->projection = Materialize({"a", "b", "c"}, /*include_aug_fields=*/true);
 
-  ASSERT_OK(compute::Declaration::Sequence(
+  ASSERT_OK(acero::Declaration::Sequence(
                 {
                     {"scan", ScanNodeOptions{basic.dataset, options}},
                     {"augmented_project",
-                     compute::ProjectNodeOptions{
+                     acero::ProjectNodeOptions{
                          {field_ref("a"), field_ref("b"), field_ref("c")}}},
-                    {"sink", compute::SinkNodeOptions{&plan.sink_gen}},
+                    {"sink", acero::SinkNodeOptions{&plan.sink_gen}},
                 })
                 .AddToPlan(plan.get()));
@@ -2568,8 +2567,8 @@ TEST(ScanNode, MinimalEndToEnd) {
   // a Dataset (whose batches will be scanned), and ScanOptions (to specify a filter for
   // predicate pushdown, a projection to skip materialization of unnecessary columns,
   // ...)
-  ASSERT_OK_AND_ASSIGN(std::shared_ptr<compute::ExecPlan> plan,
-                       compute::ExecPlan::Make(exec_context));
+  ASSERT_OK_AND_ASSIGN(std::shared_ptr<acero::ExecPlan> plan,
+                       acero::ExecPlan::Make(exec_context));
 
   std::shared_ptr<Dataset> dataset = std::make_shared<InMemoryDataset>(
       TableFromJSON(schema({field("a", int32()), field("b", boolean())}),
@@ -2600,29 +2599,29 @@ TEST(ScanNode, MinimalEndToEnd) {
 
   // construct the scan node
   ASSERT_OK_AND_ASSIGN(
-      compute::ExecNode * scan,
-      compute::MakeExecNode("scan", plan.get(), {}, ScanNodeOptions{dataset, options}));
+      acero::ExecNode * scan,
+      acero::MakeExecNode("scan", plan.get(), {}, ScanNodeOptions{dataset, options}));
 
   // pipe the scan node into a filter node
-  ASSERT_OK_AND_ASSIGN(compute::ExecNode * filter,
-                       compute::MakeExecNode("filter", plan.get(), {scan},
-                                             compute::FilterNodeOptions{b_is_true}));
+  ASSERT_OK_AND_ASSIGN(acero::ExecNode * filter,
+                       acero::MakeExecNode("filter", plan.get(), {scan},
+                                           acero::FilterNodeOptions{b_is_true}));
 
   // pipe the filter node into a project node
   // NB: we're using the project node factory which preserves fragment/batch index
   // tagging, so we *can* reorder later if we choose. The tags will not appear in
   // our output.
-  ASSERT_OK_AND_ASSIGN(compute::ExecNode * project,
-                       compute::MakeExecNode("augmented_project", plan.get(), {filter},
-                                             compute::ProjectNodeOptions{{a_times_2}}));
+  ASSERT_OK_AND_ASSIGN(acero::ExecNode * project,
+                       acero::MakeExecNode("augmented_project", plan.get(), {filter},
+                                           acero::ProjectNodeOptions{{a_times_2}}));
 
   // finally, pipe the project node into a sink node
   AsyncGenerator<std::optional<ExecBatch>> sink_gen;
-  ASSERT_OK(compute::MakeExecNode("ordered_sink", plan.get(), {project},
-                                  compute::SinkNodeOptions{&sink_gen}));
+  ASSERT_OK(acero::MakeExecNode("ordered_sink", plan.get(), {project},
+                                acero::SinkNodeOptions{&sink_gen}));
 
   // translate sink_gen (async) to sink_reader (sync)
-  std::shared_ptr<RecordBatchReader> sink_reader = compute::MakeGeneratorReader(
+  std::shared_ptr<RecordBatchReader> sink_reader = acero::MakeGeneratorReader(
       schema({field("a * 2", int32())}), std::move(sink_gen),
       exec_context.memory_pool());
 
   // start the ExecPlan
@@ -2663,8 +2662,8 @@ TEST(ScanNode, MinimalScalarAggEndToEnd) {
   // a Dataset (whose batches will be scanned), and ScanOptions (to specify a filter for
   // predicate pushdown, a projection to skip materialization of unnecessary columns,
   // ...)
-  ASSERT_OK_AND_ASSIGN(std::shared_ptr<compute::ExecPlan> plan,
-                       compute::ExecPlan::Make(exec_context));
+  ASSERT_OK_AND_ASSIGN(std::shared_ptr<acero::ExecPlan> plan,
+                       acero::ExecPlan::Make(exec_context));
 
   std::shared_ptr<Dataset> dataset = std::make_shared<InMemoryDataset>(
       TableFromJSON(schema({field("a", int32()), field("b", boolean())}),
@@ -2695,36 +2694,36 @@ TEST(ScanNode, MinimalScalarAggEndToEnd) {
 
   // construct the scan node
   ASSERT_OK_AND_ASSIGN(
-      compute::ExecNode * scan,
-      compute::MakeExecNode("scan", plan.get(), {}, ScanNodeOptions{dataset, options}));
+      acero::ExecNode * scan,
+      acero::MakeExecNode("scan", plan.get(), {}, ScanNodeOptions{dataset, options}));
 
   // pipe the scan node into a filter node
-  ASSERT_OK_AND_ASSIGN(compute::ExecNode * filter,
-                       compute::MakeExecNode("filter", plan.get(), {scan},
-                                             compute::FilterNodeOptions{b_is_true}));
+  ASSERT_OK_AND_ASSIGN(acero::ExecNode * filter,
+                       acero::MakeExecNode("filter", plan.get(), {scan},
+                                           acero::FilterNodeOptions{b_is_true}));
 
   // pipe the filter node into a project node
   ASSERT_OK_AND_ASSIGN(
-      compute::ExecNode * project,
-      compute::MakeExecNode("project", plan.get(), {filter},
-                            compute::ProjectNodeOptions{{a_times_2}, {"a * 2"}}));
+      acero::ExecNode * project,
+      acero::MakeExecNode("project", plan.get(), {filter},
+                          acero::ProjectNodeOptions{{a_times_2}, {"a * 2"}}));
 
   // pipe the projection into a scalar aggregate node
   ASSERT_OK_AND_ASSIGN(
-      compute::ExecNode * aggregate,
-      compute::MakeExecNode("aggregate", plan.get(), {project},
-                            compute::AggregateNodeOptions{{compute::Aggregate{
-                                "sum", nullptr, "a * 2", "sum(a * 2)"}}}));
+      acero::ExecNode * aggregate,
+      acero::MakeExecNode("aggregate", plan.get(), {project},
+                          acero::AggregateNodeOptions{{compute::Aggregate{
+                              "sum", nullptr, "a * 2", "sum(a * 2)"}}}));
 
   // finally, pipe the aggregate node into a sink node
   AsyncGenerator<std::optional<ExecBatch>> sink_gen;
-  ASSERT_OK(compute::MakeExecNode("sink", plan.get(), {aggregate},
-                                  compute::SinkNodeOptions{&sink_gen}));
+  ASSERT_OK(acero::MakeExecNode("sink", plan.get(), {aggregate},
+                                acero::SinkNodeOptions{&sink_gen}));
 
   // translate sink_gen (async) to sink_reader (sync)
   std::shared_ptr<RecordBatchReader> sink_reader =
-      compute::MakeGeneratorReader(schema({field("a*2 sum", int64())}),
-                                   std::move(sink_gen), exec_context.memory_pool());
+      acero::MakeGeneratorReader(schema({field("a*2 sum", int64())}), std::move(sink_gen),
+                                 exec_context.memory_pool());
 
   // start the ExecPlan
   plan->StartProducing();
@@ -2755,8 +2754,8 @@ TEST(ScanNode, MinimalGroupedAggEndToEnd) {
   // a Dataset (whose batches will be scanned), and ScanOptions (to specify a filter for
   // predicate pushdown, a projection to skip materialization of unnecessary columns,
   // ...)
-  ASSERT_OK_AND_ASSIGN(std::shared_ptr<compute::ExecPlan> plan,
-                       compute::ExecPlan::Make(exec_context));
+  ASSERT_OK_AND_ASSIGN(std::shared_ptr<acero::ExecPlan> plan,
+                       acero::ExecPlan::Make(exec_context));
 
   std::shared_ptr<Dataset> dataset = std::make_shared<InMemoryDataset>(
       TableFromJSON(schema({field("a", int32()), field("b", boolean())}),
@@ -2788,31 +2787,31 @@ TEST(ScanNode, MinimalGroupedAggEndToEnd) {
 
   // construct the scan node
   ASSERT_OK_AND_ASSIGN(
-      compute::ExecNode * scan,
-      compute::MakeExecNode("scan", plan.get(), {}, ScanNodeOptions{dataset, options}));
+      acero::ExecNode * scan,
+      acero::MakeExecNode("scan", plan.get(), {}, ScanNodeOptions{dataset, options}));
 
   // pipe the scan node into a project node
   ASSERT_OK_AND_ASSIGN(
-      compute::ExecNode * project,
-      compute::MakeExecNode("project", plan.get(), {scan},
-                            compute::ProjectNodeOptions{{a_times_2, b}, {"a * 2", "b"}}));
+      acero::ExecNode * project,
+      acero::MakeExecNode("project", plan.get(), {scan},
+                          acero::ProjectNodeOptions{{a_times_2, b}, {"a * 2", "b"}}));
 
   // pipe the projection into a grouped aggregate node
   ASSERT_OK_AND_ASSIGN(
-      compute::ExecNode * aggregate,
-      compute::MakeExecNode(
+      acero::ExecNode * aggregate,
+      acero::MakeExecNode(
           "aggregate", plan.get(), {project},
-          compute::AggregateNodeOptions{
+          acero::AggregateNodeOptions{
               {compute::Aggregate{"hash_sum", nullptr, "a * 2", "sum(a * 2)"}},
               /*keys=*/{"b"}}));
 
   // finally, pipe the aggregate node into a sink node
   AsyncGenerator<std::optional<ExecBatch>> sink_gen;
-  ASSERT_OK(compute::MakeExecNode("sink", plan.get(), {aggregate},
-                                  compute::SinkNodeOptions{&sink_gen}));
+  ASSERT_OK(acero::MakeExecNode("sink", plan.get(), {aggregate},
+                                acero::SinkNodeOptions{&sink_gen}));
 
   // translate sink_gen (async) to sink_reader (sync)
-  std::shared_ptr<RecordBatchReader> sink_reader = compute::MakeGeneratorReader(
+  std::shared_ptr<RecordBatchReader> sink_reader = acero::MakeGeneratorReader(
       schema({field("sum(a * 2)", int64()), field("b", boolean())}), std::move(sink_gen),
       exec_context.memory_pool());
 
   // start the ExecPlan
@@ -2845,7 +2844,7 @@ TEST(ScanNode, MinimalGroupedAggEndToEnd) {
 TEST(ScanNode, OnlyLoadProjectedFields) {
   compute::ExecContext exec_context;
   arrow::dataset::internal::Initialize();
-  ASSERT_OK_AND_ASSIGN(auto plan, compute::ExecPlan::Make());
+  ASSERT_OK_AND_ASSIGN(auto plan, acero::ExecPlan::Make());
 
   auto dummy_schema = schema(
       {field("key", int64()), field("shared", int64()), field("distinct", int64())});
@@ -2885,9 +2884,9 @@ TEST(ScanNode, OnlyLoadProjectedFields) {
   scan_options->projection =
       call("make_struct", {extract_expr}, compute::MakeStructOptions{{"shared"}});
 
-  auto declarations = compute::Declaration::Sequence(
-      {compute::Declaration({"scan", dataset::ScanNodeOptions{dataset, scan_options}})});
-  ASSERT_OK_AND_ASSIGN(auto actual, compute::DeclarationToTable(declarations));
+  auto declarations = acero::Declaration::Sequence(
+      {acero::Declaration({"scan", dataset::ScanNodeOptions{dataset, scan_options}})});
+  ASSERT_OK_AND_ASSIGN(auto actual, acero::DeclarationToTable(declarations));
   // Scan node always emits augmented fields so we drop those
   ASSERT_OK_AND_ASSIGN(auto actualMinusAgumented, actual->SelectColumns({0, 1, 2}));
   auto expected = TableFromJSON(dummy_schema, {R"([
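The MinimalEndToEnd tests deliberately keep the verbose MakeExecNode wiring to exercise
the low-level plan API; the same scan-filter-project pipeline can be expressed more
compactly with declarations. A rough sketch under these assumptions: the helper name
ScanFilterProject is hypothetical, the expressions mirror b_is_true / a_times_2 from
the tests, and the "scan" factory has been registered via dataset::internal::Initialize()
as the tests do:

    #include <memory>
    #include <utility>

    #include "arrow/acero/exec_plan.h"
    #include "arrow/acero/options.h"
    #include "arrow/compute/expression.h"
    #include "arrow/dataset/scanner.h"
    #include "arrow/result.h"
    #include "arrow/table.h"

    namespace acero = arrow::acero;
    namespace cp = arrow::compute;
    namespace ds = arrow::dataset;

    // scan -> filter -> project, executed by DeclarationToTable.
    arrow::Result<std::shared_ptr<arrow::Table>> ScanFilterProject(
        std::shared_ptr<ds::Dataset> dataset,
        std::shared_ptr<ds::ScanOptions> options) {
      cp::Expression b_is_true = cp::field_ref("b");
      cp::Expression a_times_2 =
          cp::call("multiply", {cp::field_ref("a"), cp::literal(2)});
      acero::Declaration plan = acero::Declaration::Sequence(
          {{"scan", ds::ScanNodeOptions{std::move(dataset), std::move(options)}},
           {"filter", acero::FilterNodeOptions{b_is_true}},
           {"project", acero::ProjectNodeOptions{{a_times_2}, {"a * 2"}}}});
      return acero::DeclarationToTable(std::move(plan));
    }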
diff --git a/cpp/src/arrow/dataset/subtree_test.cc b/cpp/src/arrow/dataset/subtree_test.cc
index 3535d4771a6..75429a5fb7f 100644
--- a/cpp/src/arrow/dataset/subtree_test.cc
+++ b/cpp/src/arrow/dataset/subtree_test.cc
@@ -24,7 +24,7 @@
 #include 
 #include 
 
-#include "arrow/acero/forest_internal.h"
+#include "arrow/dataset/forest_internal.h"
 #include "arrow/dataset/subtree_internal.h"
 #include "arrow/testing/gtest_util.h"
 #include "arrow/util/string.h"
"arrow/testing/gtest_util.h" #include "arrow/util/string.h" diff --git a/cpp/src/arrow/dataset/test_util_internal.h b/cpp/src/arrow/dataset/test_util_internal.h index 4488ee88be6..765e22c3fd6 100644 --- a/cpp/src/arrow/dataset/test_util_internal.h +++ b/cpp/src/arrow/dataset/test_util_internal.h @@ -156,7 +156,8 @@ using compute::project; using fs::internal::GetAbstractPathExtension; /// \brief Assert a dataset produces data with the schema -void AssertDatasetHasSchema(std::shared_ptr ds, std::shared_ptr schema) { +inline void AssertDatasetHasSchema(std::shared_ptr ds, + std::shared_ptr schema) { ASSERT_OK_AND_ASSIGN(auto scanner_builder, ds->NewScan()); ASSERT_OK_AND_ASSIGN(auto scanner, scanner_builder->Finish()); ASSERT_OK_AND_ASSIGN(auto table, scanner->ToTable()); @@ -191,7 +192,7 @@ std::unique_ptr> MakeGeneratedRecordBatch( return std::make_unique>(schema, std::forward(gen)); } -std::unique_ptr MakeGeneratedRecordBatch( +inline std::unique_ptr MakeGeneratedRecordBatch( std::shared_ptr schema, int64_t batch_size, int64_t batch_repetitions) { auto batch = random::GenerateBatch(schema->fields(), batch_size, /*seed=*/0); int64_t i = 0; @@ -202,7 +203,7 @@ std::unique_ptr MakeGeneratedRecordBatch( }); } -void EnsureRecordBatchReaderDrained(RecordBatchReader* reader) { +inline void EnsureRecordBatchReaderDrained(RecordBatchReader* reader) { ASSERT_OK_AND_ASSIGN(auto batch, reader->Next()); EXPECT_EQ(batch, nullptr); } @@ -438,7 +439,7 @@ struct TestFormatParams { } }; -std::ostream& operator<<(std::ostream& out, const TestFormatParams& params) { +inline std::ostream& operator<<(std::ostream& out, const TestFormatParams& params) { out << params.ToString(); return out; } @@ -1744,13 +1745,14 @@ static std::vector PathsOf(const FragmentVector& fragments) { return paths; } -void AssertFilesAre(const std::shared_ptr& dataset, - std::vector expected) { +inline void AssertFilesAre(const std::shared_ptr& dataset, + std::vector expected) { auto fs_dataset = checked_cast(dataset.get()); EXPECT_THAT(fs_dataset->files(), testing::UnorderedElementsAreArray(expected)); } -void AssertFragmentsAreFromPath(FragmentIterator it, std::vector expected) { +inline void AssertFragmentsAreFromPath(FragmentIterator it, + std::vector expected) { // Ordering is not guaranteed. EXPECT_THAT(PathsOf(IteratorToVector(std::move(it))), testing::UnorderedElementsAreArray(expected)); @@ -1767,8 +1769,8 @@ static std::vector PartitionExpressionsOf( return partition_expressions; } -void AssertFragmentsHavePartitionExpressions(std::shared_ptr dataset, - std::vector expected) { +inline void AssertFragmentsHavePartitionExpressions( + std::shared_ptr dataset, std::vector expected) { ASSERT_OK_AND_ASSIGN(auto fragment_it, dataset->GetFragments()); // Ordering is not guaranteed. EXPECT_THAT(PartitionExpressionsOf(IteratorToVector(std::move(fragment_it))),