From f0f4d2f83f07c150755ddea07d65d8cbf5047799 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Tue, 10 May 2022 18:51:14 -1000 Subject: [PATCH 1/4] ARROW-16525: Added tests for the tee node and write node. Fix bug in tee node where it was not properly marking itself finished --- cpp/src/arrow/compute/exec/exec_plan.h | 2 +- cpp/src/arrow/compute/exec/sink_node.cc | 2 +- cpp/src/arrow/compute/exec/test_util.cc | 6 ++ cpp/src/arrow/compute/exec/test_util.h | 3 + cpp/src/arrow/dataset/file_base.cc | 17 ++++ cpp/src/arrow/dataset/file_test.cc | 118 ++++++++++++++++++++++++ 6 files changed, 146 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/compute/exec/exec_plan.h b/cpp/src/arrow/compute/exec/exec_plan.h index c20dc0d048c..be2f23ad24b 100644 --- a/cpp/src/arrow/compute/exec/exec_plan.h +++ b/cpp/src/arrow/compute/exec/exec_plan.h @@ -316,7 +316,7 @@ class ARROW_EXPORT MapNode : public ExecNode { protected: void SubmitTask(std::function(ExecBatch)> map_fn, ExecBatch batch); - void Finish(Status finish_st = Status::OK()); + virtual void Finish(Status finish_st = Status::OK()); protected: // Counter for the number of batches received diff --git a/cpp/src/arrow/compute/exec/sink_node.cc b/cpp/src/arrow/compute/exec/sink_node.cc index 56573f61d7c..bd6c3b79b82 100644 --- a/cpp/src/arrow/compute/exec/sink_node.cc +++ b/cpp/src/arrow/compute/exec/sink_node.cc @@ -363,7 +363,7 @@ class ConsumingSinkNode : public ExecNode, public BackpressureControl { } protected: - virtual void Finish(const Status& finish_st) { + void Finish(const Status& finish_st) { consumer_->Finish().AddCallback([this, finish_st](const Status& st) { // Prefer the plan error over the consumer error Status final_status = finish_st & st; diff --git a/cpp/src/arrow/compute/exec/test_util.cc b/cpp/src/arrow/compute/exec/test_util.cc index 41eb401ced6..3f5d094774c 100644 --- a/cpp/src/arrow/compute/exec/test_util.cc +++ b/cpp/src/arrow/compute/exec/test_util.cc @@ -165,6 +165,12 @@ ExecBatch 
ExecBatchFromJSON(const std::vector& descrs, return batch; } +Future<> StartAndFinish(ExecPlan* plan) { + RETURN_NOT_OK(plan->Validate()); + RETURN_NOT_OK(plan->StartProducing()); + return plan->finished(); +} + Future> StartAndCollect( ExecPlan* plan, AsyncGenerator> gen) { RETURN_NOT_OK(plan->Validate()); diff --git a/cpp/src/arrow/compute/exec/test_util.h b/cpp/src/arrow/compute/exec/test_util.h index 9347d1343f1..9cb615ac450 100644 --- a/cpp/src/arrow/compute/exec/test_util.h +++ b/cpp/src/arrow/compute/exec/test_util.h @@ -82,6 +82,9 @@ struct BatchesWithSchema { } }; +ARROW_TESTING_EXPORT +Future<> StartAndFinish(ExecPlan* plan); + ARROW_TESTING_EXPORT Future> StartAndCollect( ExecPlan* plan, AsyncGenerator> gen); diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc index 822fc714623..0e1e5ab3fe8 100644 --- a/cpp/src/arrow/dataset/file_base.cc +++ b/cpp/src/arrow/dataset/file_base.cc @@ -460,6 +460,23 @@ class TeeNode : public compute::MapNode { const char* kind_name() const override { return "TeeNode"; } + void Finish(Status finish_st) override { + dataset_writer_->Finish().AddCallback([this, finish_st](const Status& dw_status) { + // Need to wait for the task group to complete regardless of dw_status + task_group_.End().AddCallback( + [this, dw_status, finish_st](const Status& tg_status) { + // Prefer dw_status then finish_st and then tg_status + if (!dw_status.ok()) { + finished_.MarkFinished(dw_status); + } + if (!finish_st.ok()) { + finished_.MarkFinished(finish_st); + } + finished_.MarkFinished(tg_status); + }); + }); + } + Result DoTee(const compute::ExecBatch& batch) { ARROW_ASSIGN_OR_RAISE(std::shared_ptr record_batch, batch.ToRecordBatch(output_schema())); diff --git a/cpp/src/arrow/dataset/file_test.cc b/cpp/src/arrow/dataset/file_test.cc index 226c23ef5e4..b176c0cb955 100644 --- a/cpp/src/arrow/dataset/file_test.cc +++ b/cpp/src/arrow/dataset/file_test.cc @@ -18,14 +18,17 @@ #include #include #include +#include 
#include #include #include #include "arrow/array/array_primitive.h" +#include "arrow/compute/exec/test_util.h" #include "arrow/dataset/api.h" #include "arrow/dataset/partition.h" +#include "arrow/dataset/plan.h" #include "arrow/dataset/test_util.h" #include "arrow/filesystem/path_util.h" #include "arrow/filesystem/test_util.h" @@ -34,6 +37,8 @@ #include "arrow/testing/gtest_util.h" #include "arrow/util/io_util.h" +namespace cp = arrow::compute; + namespace arrow { using internal::TemporaryDir; @@ -342,5 +347,118 @@ TEST_F(TestFileSystemDataset, WriteProjected) { } } } + +class FileSystemWriteTest : public testing::TestWithParam> { + protected: + bool IsParallel() { return std::get<0>(GetParam()); } + bool IsSlow() { return std::get<1>(GetParam()); } + + FileSystemWriteTest() { dataset::internal::Initialize(); } + + void TestDatasetWriteRoundTrip( + std::function>( + const cp::BatchesWithSchema& source_data, const FileSystemDatasetWriteOptions&, + std::function>()>*)> + plan_factory, + bool has_output) { + // Runs in-memory data through the plan and then scans out the written + // data to ensure it matches the source data + auto format = std::make_shared(); + auto fs = std::make_shared(fs::kNoTime); + FileSystemDatasetWriteOptions write_options; + write_options.file_write_options = format->DefaultWriteOptions(); + write_options.filesystem = fs; + write_options.base_dir = "root"; + write_options.partitioning = std::make_shared(schema({})); + write_options.basename_template = "{i}.feather"; + + cp::BatchesWithSchema source_data; + source_data.batches = { + cp::ExecBatchFromJSON({int32(), boolean()}, "[[null, true], [4, false]]"), + cp::ExecBatchFromJSON({int32(), boolean()}, + "[[5, null], [6, false], [7, false]]")}; + source_data.schema = schema({field("i32", int32()), field("bool", boolean())}); + + AsyncGenerator> sink_gen; + ASSERT_OK_AND_ASSIGN(auto plan, plan_factory(source_data, write_options, &sink_gen)); + + if (has_output) { + 
ASSERT_FINISHES_OK_AND_ASSIGN(auto out_batches, + cp::StartAndCollect(plan.get(), sink_gen)); + cp::AssertExecBatchesEqual(source_data.schema, source_data.batches, out_batches); + } else { + ASSERT_FINISHES_OK(cp::StartAndFinish(plan.get())); + } + + // Read written dataset and make sure it matches + ASSERT_OK_AND_ASSIGN(auto dataset_factory, FileSystemDatasetFactory::Make( + fs, {"root/0.feather"}, format, {})); + ASSERT_OK_AND_ASSIGN(auto written_dataset, dataset_factory->Finish(FinishOptions{})); + AssertSchemaEqual(*source_data.schema, *written_dataset->schema()); + + ASSERT_OK_AND_ASSIGN(plan, cp::ExecPlan::Make()); + ASSERT_OK_AND_ASSIGN(auto scanner_builder, written_dataset->NewScan()); + ASSERT_OK_AND_ASSIGN(auto scanner, scanner_builder->Finish()); + ASSERT_OK(cp::Declaration::Sequence( + { + {"scan", ScanNodeOptions{written_dataset, scanner->options()}}, + {"sink", cp::SinkNodeOptions{&sink_gen}}, + }) + .AddToPlan(plan.get())); + + ASSERT_FINISHES_OK_AND_ASSIGN(auto written_batches, + cp::StartAndCollect(plan.get(), sink_gen)); + cp::AssertExecBatchesEqual(source_data.schema, source_data.batches, written_batches); + } +}; + +TEST_P(FileSystemWriteTest, Write) { + auto plan_factory = + [this](const cp::BatchesWithSchema& source_data, + const FileSystemDatasetWriteOptions& write_options, + std::function>()>* sink_gen) + -> Result> { + ARROW_ASSIGN_OR_RAISE(auto plan, cp::ExecPlan::Make()); + RETURN_NOT_OK( + cp::Declaration::Sequence( + {{"source", cp::SourceNodeOptions{source_data.schema, + source_data.gen(IsParallel(), IsSlow())}}, + {"write", WriteNodeOptions{write_options}}}) + .AddToPlan(plan.get())); + return plan; + }; + TestDatasetWriteRoundTrip(plan_factory, /*has_output=*/false); +} + +TEST_P(FileSystemWriteTest, TeeWrite) { + auto plan_factory = + [this](const cp::BatchesWithSchema& source_data, + const FileSystemDatasetWriteOptions& write_options, + std::function>()>* sink_gen) + -> Result> { + ARROW_ASSIGN_OR_RAISE(auto plan, 
cp::ExecPlan::Make()); + RETURN_NOT_OK(cp::Declaration::Sequence( + { + {"source", cp::SourceNodeOptions{source_data.schema, + source_data.gen(IsParallel(), + IsSlow())}}, + {"tee", WriteNodeOptions{write_options}}, + {"sink", cp::SinkNodeOptions{sink_gen}}, + }) + .AddToPlan(plan.get())); + return plan; + }; + TestDatasetWriteRoundTrip(plan_factory, /*has_output=*/true); +} + +INSTANTIATE_TEST_SUITE_P( + FileSystemWrite, FileSystemWriteTest, + testing::Combine(testing::Values(false, true), testing::Values(false, true)), + [](const testing::TestParamInfo& info) { + std::string parallel_desc = std::get<0>(info.param) ? "parallel" : "serial"; + std::string speed_desc = std::get<1>(info.param) ? "slow" : "fast"; + return parallel_desc + "_" + speed_desc; + }); + } // namespace dataset } // namespace arrow From 0d657b30fafef5316cd350dea62752098e79ccb3 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Wed, 11 May 2022 10:51:15 -1000 Subject: [PATCH 2/4] ARROW-16525: Addressed review feedback. Added some helper functions for working with arrow::compute::Declaration. 
Minor cleanup of some error messages --- cpp/src/arrow/compute/exec/exec_plan.cc | 26 +++++++++- cpp/src/arrow/compute/exec/exec_plan.h | 9 ++++ cpp/src/arrow/compute/exec/source_node.cc | 2 +- cpp/src/arrow/compute/exec/util.cc | 2 +- cpp/src/arrow/dataset/file_base.cc | 9 +--- cpp/src/arrow/dataset/file_test.cc | 62 +++++++++-------------- 6 files changed, 61 insertions(+), 49 deletions(-) diff --git a/cpp/src/arrow/compute/exec/exec_plan.cc b/cpp/src/arrow/compute/exec/exec_plan.cc index b7a9c7e1bb0..cb1902e7e07 100644 --- a/cpp/src/arrow/compute/exec/exec_plan.cc +++ b/cpp/src/arrow/compute/exec/exec_plan.cc @@ -506,22 +506,44 @@ Result Declaration::AddToPlan(ExecPlan* plan, return node; } +Declaration* Declaration::Root() { + Declaration* current = this; + while (!current->inputs.empty()) { + DCHECK_LE(current->inputs.size(), 1) + << "No clear root when a declaration has multiple inputs"; + auto& input = current->inputs[0]; + Declaration* maybe_next = input.get(); + DCHECK(maybe_next) << "Attempt to get root when part of declaration is already built"; + current = maybe_next; + } + return current; +} + Declaration Declaration::Sequence(std::vector decls) { DCHECK(!decls.empty()); Declaration out = std::move(decls.back()); decls.pop_back(); - auto receiver = &out; + auto receiver = out.Root(); while (!decls.empty()) { Declaration input = std::move(decls.back()); decls.pop_back(); receiver->inputs.emplace_back(std::move(input)); - receiver = &util::get(receiver->inputs.front()); + receiver = util::get(receiver->inputs.front()).Root(); } return out; } +Declaration Declaration::Concat(std::vector decls) { + decls.insert(decls.begin(), *this); + return Declaration::Sequence(std::move(decls)); +} + +Declaration Declaration::Concat(Declaration next) { + return Concat(std::vector{std::move(next)}); +} + namespace internal { void RegisterSourceNode(ExecFactoryRegistry*); diff --git a/cpp/src/arrow/compute/exec/exec_plan.h b/cpp/src/arrow/compute/exec/exec_plan.h index 
be2f23ad24b..8d799c1bf7c 100644 --- a/cpp/src/arrow/compute/exec/exec_plan.h +++ b/cpp/src/arrow/compute/exec/exec_plan.h @@ -427,6 +427,15 @@ struct ARROW_EXPORT Declaration { /// {"n3", N3Opts{}}, /// }); static Declaration Sequence(std::vector decls); + /// \brief Create a declaration by appending a sequence onto the end of `this` + Declaration Concat(std::vector decls); + /// \brief Create a declaration by appending a declaration onto the end of `this` + Declaration Concat(Declaration decl); + /// \brief Gets the root node of a declaration + /// + /// Will fail if any node has multiple inputs or has already been constructed (i.e. if + /// the input is an ExecNode and not a Declaration) + Declaration* Root(); Result AddToPlan(ExecPlan* plan, ExecFactoryRegistry* registry = default_exec_factory_registry()) const; diff --git a/cpp/src/arrow/compute/exec/source_node.cc b/cpp/src/arrow/compute/exec/source_node.cc index 7e72497186e..ec2b91050df 100644 --- a/cpp/src/arrow/compute/exec/source_node.cc +++ b/cpp/src/arrow/compute/exec/source_node.cc @@ -231,7 +231,7 @@ struct TableSourceNode : public SourceNode { static arrow::Status ValidateTableSourceNodeInput(const std::shared_ptr table, const int64_t batch_size) { if (table == nullptr) { - return Status::Invalid("TableSourceNode node requires table which is not null"); + return Status::Invalid("TableSourceNode requires table which is not null"); } if (batch_size <= 0) { diff --git a/cpp/src/arrow/compute/exec/util.cc b/cpp/src/arrow/compute/exec/util.cc index ef56e6128a3..f6ac70ad45a 100644 --- a/cpp/src/arrow/compute/exec/util.cc +++ b/cpp/src/arrow/compute/exec/util.cc @@ -287,7 +287,7 @@ namespace compute { Status ValidateExecNodeInputs(ExecPlan* plan, const std::vector& inputs, int expected_num_inputs, const char* kind_name) { if (static_cast(inputs.size()) != expected_num_inputs) { - return Status::Invalid(kind_name, " node requires ", expected_num_inputs, + return Status::Invalid(kind_name, " requires ", 
expected_num_inputs, " inputs but got ", inputs.size()); } diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc index 0e1e5ab3fe8..10277810575 100644 --- a/cpp/src/arrow/dataset/file_base.cc +++ b/cpp/src/arrow/dataset/file_base.cc @@ -465,14 +465,7 @@ class TeeNode : public compute::MapNode { // Need to wait for the task group to complete regardless of dw_status task_group_.End().AddCallback( [this, dw_status, finish_st](const Status& tg_status) { - // Prefer dw_status then finish_st and then tg_status - if (!dw_status.ok()) { - finished_.MarkFinished(dw_status); - } - if (!finish_st.ok()) { - finished_.MarkFinished(finish_st); - } - finished_.MarkFinished(tg_status); + finished_.MarkFinished(dw_status & finish_st & tg_status); }); }); } diff --git a/cpp/src/arrow/dataset/file_test.cc b/cpp/src/arrow/dataset/file_test.cc index b176c0cb955..830a648108f 100644 --- a/cpp/src/arrow/dataset/file_test.cc +++ b/cpp/src/arrow/dataset/file_test.cc @@ -349,18 +349,17 @@ TEST_F(TestFileSystemDataset, WriteProjected) { } class FileSystemWriteTest : public testing::TestWithParam> { + using PlanFactory = std::function>()>*)>; + protected: bool IsParallel() { return std::get<0>(GetParam()); } bool IsSlow() { return std::get<1>(GetParam()); } FileSystemWriteTest() { dataset::internal::Initialize(); } - void TestDatasetWriteRoundTrip( - std::function>( - const cp::BatchesWithSchema& source_data, const FileSystemDatasetWriteOptions&, - std::function>()>*)> - plan_factory, - bool has_output) { + void TestDatasetWriteRoundTrip(PlanFactory plan_factory, bool has_output) { // Runs in-memory data through the plan and then scans out the written // data to ensure it matches the source data auto format = std::make_shared(); @@ -371,6 +370,7 @@ class FileSystemWriteTest : public testing::TestWithParam write_options.base_dir = "root"; write_options.partitioning = std::make_shared(schema({})); write_options.basename_template = "{i}.feather"; + const std::string 
kExpectedFilename = "root/0.feather"; cp::BatchesWithSchema source_data; source_data.batches = { @@ -380,7 +380,13 @@ class FileSystemWriteTest : public testing::TestWithParam source_data.schema = schema({field("i32", int32()), field("bool", boolean())}); AsyncGenerator> sink_gen; - ASSERT_OK_AND_ASSIGN(auto plan, plan_factory(source_data, write_options, &sink_gen)); + + ASSERT_OK_AND_ASSIGN(auto plan, cp::ExecPlan::Make()); + auto source_decl = cp::Declaration::Sequence( + {{"source", cp::SourceNodeOptions{source_data.schema, + source_data.gen(IsParallel(), IsSlow())}}}); + auto plan_decl = plan_factory(write_options, &sink_gen); + ASSERT_OK(source_decl.Concat(plan_decl).AddToPlan(plan.get())); if (has_output) { ASSERT_FINISHES_OK_AND_ASSIGN(auto out_batches, @@ -392,7 +398,7 @@ class FileSystemWriteTest : public testing::TestWithParam // Read written dataset and make sure it matches ASSERT_OK_AND_ASSIGN(auto dataset_factory, FileSystemDatasetFactory::Make( - fs, {"root/0.feather"}, format, {})); + fs, {kExpectedFilename}, format, {})); ASSERT_OK_AND_ASSIGN(auto written_dataset, dataset_factory->Finish(FinishOptions{})); AssertSchemaEqual(*source_data.schema, *written_dataset->schema()); @@ -414,40 +420,22 @@ class FileSystemWriteTest : public testing::TestWithParam TEST_P(FileSystemWriteTest, Write) { auto plan_factory = - [this](const cp::BatchesWithSchema& source_data, - const FileSystemDatasetWriteOptions& write_options, - std::function>()>* sink_gen) - -> Result> { - ARROW_ASSIGN_OR_RAISE(auto plan, cp::ExecPlan::Make()); - RETURN_NOT_OK( - cp::Declaration::Sequence( - {{"source", cp::SourceNodeOptions{source_data.schema, - source_data.gen(IsParallel(), IsSlow())}}, - {"write", WriteNodeOptions{write_options}}}) - .AddToPlan(plan.get())); - return plan; - }; + [](const FileSystemDatasetWriteOptions& write_options, + std::function>()>* sink_gen) { + return cp::Declaration::Sequence({{"write", WriteNodeOptions{write_options}}}); + }; 
 TestDatasetWriteRoundTrip(plan_factory, /*has_output=*/false); } TEST_P(FileSystemWriteTest, TeeWrite) { auto plan_factory = - [this](const cp::BatchesWithSchema& source_data, - const FileSystemDatasetWriteOptions& write_options, - std::function>()>* sink_gen) - -> Result> { - ARROW_ASSIGN_OR_RAISE(auto plan, cp::ExecPlan::Make()); - RETURN_NOT_OK(cp::Declaration::Sequence( - { - {"source", cp::SourceNodeOptions{source_data.schema, - source_data.gen(IsParallel(), - IsSlow())}}, - {"tee", WriteNodeOptions{write_options}}, - {"sink", cp::SinkNodeOptions{sink_gen}}, - }) - .AddToPlan(plan.get())); - return plan; - }; + [](const FileSystemDatasetWriteOptions& write_options, + std::function>()>* sink_gen) { + return cp::Declaration::Sequence({ + {"tee", WriteNodeOptions{write_options}}, + {"sink", cp::SinkNodeOptions{sink_gen}}, + }); + }; TestDatasetWriteRoundTrip(plan_factory, /*has_output=*/true); } From adc75d3bba58b91967def959fa384df088cb7068 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Wed, 11 May 2022 12:05:50 -1000 Subject: [PATCH 3/4] ARROW-16525: My changes to Sequence were a bit too aggressive with error checking. 
The first node should be allowed to have multiple inputs since we don't need to recurse any further --- cpp/src/arrow/compute/exec/exec_plan.cc | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/cpp/src/arrow/compute/exec/exec_plan.cc b/cpp/src/arrow/compute/exec/exec_plan.cc index cb1902e7e07..7eae5c7abba 100644 --- a/cpp/src/arrow/compute/exec/exec_plan.cc +++ b/cpp/src/arrow/compute/exec/exec_plan.cc @@ -530,7 +530,12 @@ Declaration Declaration::Sequence(std::vector decls) { decls.pop_back(); receiver->inputs.emplace_back(std::move(input)); - receiver = util::get(receiver->inputs.front()).Root(); + if (decls.empty()) { + // First node in the chain is allowed to have multiple inputs + receiver = &util::get(receiver->inputs.front()); + } else { + receiver = util::get(receiver->inputs.front()).Root(); + } } return out; } From 6f8d231926fdf38a555280eb0dd85f74d200f007 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 13 May 2022 10:56:49 -1000 Subject: [PATCH 4/4] ARROW-16525: Further refinement of unit tests. 
--- cpp/src/arrow/compute/exec/exec_plan.cc | 31 ++----------------------- cpp/src/arrow/compute/exec/exec_plan.h | 9 ------- cpp/src/arrow/dataset/file_test.cc | 13 ++++++----- 3 files changed, 9 insertions(+), 44 deletions(-) diff --git a/cpp/src/arrow/compute/exec/exec_plan.cc b/cpp/src/arrow/compute/exec/exec_plan.cc index 7eae5c7abba..b7a9c7e1bb0 100644 --- a/cpp/src/arrow/compute/exec/exec_plan.cc +++ b/cpp/src/arrow/compute/exec/exec_plan.cc @@ -506,49 +506,22 @@ Result Declaration::AddToPlan(ExecPlan* plan, return node; } -Declaration* Declaration::Root() { - Declaration* current = this; - while (!current->inputs.empty()) { - DCHECK_LE(current->inputs.size(), 1) - << "No clear root when a declaration has multiple inputs"; - auto& input = current->inputs[0]; - Declaration* maybe_next = input.get(); - DCHECK(maybe_next) << "Attempt to get root when part of declaration is already built"; - current = maybe_next; - } - return current; -} - Declaration Declaration::Sequence(std::vector decls) { DCHECK(!decls.empty()); Declaration out = std::move(decls.back()); decls.pop_back(); - auto receiver = out.Root(); + auto receiver = &out; while (!decls.empty()) { Declaration input = std::move(decls.back()); decls.pop_back(); receiver->inputs.emplace_back(std::move(input)); - if (decls.empty()) { - // First node in the chain is allowed to have multiple inputs - receiver = &util::get(receiver->inputs.front()); - } else { - receiver = util::get(receiver->inputs.front()).Root(); - } + receiver = &util::get(receiver->inputs.front()); } return out; } -Declaration Declaration::Concat(std::vector decls) { - decls.insert(decls.begin(), *this); - return Declaration::Sequence(std::move(decls)); -} - -Declaration Declaration::Concat(Declaration next) { - return Concat(std::vector{std::move(next)}); -} - namespace internal { void RegisterSourceNode(ExecFactoryRegistry*); diff --git a/cpp/src/arrow/compute/exec/exec_plan.h b/cpp/src/arrow/compute/exec/exec_plan.h index 
8d799c1bf7c..be2f23ad24b 100644 --- a/cpp/src/arrow/compute/exec/exec_plan.h +++ b/cpp/src/arrow/compute/exec/exec_plan.h @@ -427,15 +427,6 @@ struct ARROW_EXPORT Declaration { /// {"n3", N3Opts{}}, /// }); static Declaration Sequence(std::vector decls); - /// \brief Create a declaration by appending a sequence onto the end of `this` - Declaration Concat(std::vector decls); - /// \brief Create a declaration by appending a declaration onto the end of `this` - Declaration Concat(Declaration decl); - /// \brief Gets the root node of a declaration - /// - /// Will fail if any node has multiple inputs or has already been constructed (i.e. if - /// the input is an ExecNode and not a Declaration) - Declaration* Root(); Result AddToPlan(ExecPlan* plan, ExecFactoryRegistry* registry = default_exec_factory_registry()) const; diff --git a/cpp/src/arrow/dataset/file_test.cc b/cpp/src/arrow/dataset/file_test.cc index 830a648108f..4dfc6bc584d 100644 --- a/cpp/src/arrow/dataset/file_test.cc +++ b/cpp/src/arrow/dataset/file_test.cc @@ -349,7 +349,7 @@ TEST_F(TestFileSystemDataset, WriteProjected) { } class FileSystemWriteTest : public testing::TestWithParam> { - using PlanFactory = std::function( const FileSystemDatasetWriteOptions&, std::function>()>*)>; @@ -385,8 +385,9 @@ class FileSystemWriteTest : public testing::TestWithParam auto source_decl = cp::Declaration::Sequence( {{"source", cp::SourceNodeOptions{source_data.schema, source_data.gen(IsParallel(), IsSlow())}}}); - auto plan_decl = plan_factory(write_options, &sink_gen); - ASSERT_OK(source_decl.Concat(plan_decl).AddToPlan(plan.get())); + auto declarations = plan_factory(write_options, &sink_gen); + declarations.insert(declarations.begin(), std::move(source_decl)); + ASSERT_OK(cp::Declaration::Sequence(std::move(declarations)).AddToPlan(plan.get())); if (has_output) { ASSERT_FINISHES_OK_AND_ASSIGN(auto out_batches, @@ -422,7 +423,7 @@ TEST_P(FileSystemWriteTest, Write) { auto plan_factory = [](const 
FileSystemDatasetWriteOptions& write_options, std::function>()>* sink_gen) { - return cp::Declaration::Sequence({{"write", WriteNodeOptions{write_options}}}); + return std::vector{{"write", WriteNodeOptions{write_options}}}; }; TestDatasetWriteRoundTrip(plan_factory, /*has_output=*/false); } @@ -431,10 +432,10 @@ TEST_P(FileSystemWriteTest, TeeWrite) { auto plan_factory = [](const FileSystemDatasetWriteOptions& write_options, std::function>()>* sink_gen) { - return cp::Declaration::Sequence({ + return std::vector{ {"tee", WriteNodeOptions{write_options}}, {"sink", cp::SinkNodeOptions{sink_gen}}, - }); + }; }; TestDatasetWriteRoundTrip(plan_factory, /*has_output=*/true); }