From ae22b3e5b0d1060f43eb549dba80add4c4ab14c6 Mon Sep 17 00:00:00 2001 From: Vibhatha Abeykoon Date: Fri, 4 Mar 2022 18:57:55 +0530 Subject: [PATCH 01/26] updating submodule --- testing | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/testing b/testing index 53b49804710..634739c6644 160000 --- a/testing +++ b/testing @@ -1 +1 @@ -Subproject commit 53b498047109d9940fcfab388bd9d6aeb8c57425 +Subproject commit 634739c664433cec366b4b9a81d1e1044a8c5eda From dcc0c4ee02b836db14f2e522d88078157bc3c4b0 Mon Sep 17 00:00:00 2001 From: Vibhatha Abeykoon Date: Sat, 19 Mar 2022 19:14:39 +0530 Subject: [PATCH 02/26] temp commit to remove files in submodule --- .gitmodules | 3 --- testing | 1 - 2 files changed, 4 deletions(-) delete mode 160000 testing diff --git a/.gitmodules b/.gitmodules index 6efc4871542..71722b21777 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,6 +1,3 @@ [submodule "cpp/submodules/parquet-testing"] path = cpp/submodules/parquet-testing url = https://github.com/apache/parquet-testing.git -[submodule "testing"] - path = testing - url = https://github.com/apache/arrow-testing diff --git a/testing b/testing deleted file mode 160000 index 634739c6644..00000000000 --- a/testing +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 634739c664433cec366b4b9a81d1e1044a8c5eda From 3ea6720c092b637f8152a996ca9f55a44b9d70b4 Mon Sep 17 00:00:00 2001 From: Vibhatha Abeykoon Date: Sat, 19 Mar 2022 19:15:36 +0530 Subject: [PATCH 03/26] adding submodule --- .gitmodules | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.gitmodules b/.gitmodules index 71722b21777..6efc4871542 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,3 +1,6 @@ [submodule "cpp/submodules/parquet-testing"] path = cpp/submodules/parquet-testing url = https://github.com/apache/parquet-testing.git +[submodule "testing"] + path = testing + url = https://github.com/apache/arrow-testing From f8480adc13c18ab231cf75622810caebe2ba5e87 Mon Sep 17 00:00:00 2001 From: Vibhatha Abeykoon Date: Sun, 20 Mar 2022 23:09:21 +0530 Subject: [PATCH 04/26] updating testing submodule --- testing | 1 + 1 file changed, 1 insertion(+) create mode 160000 testing diff --git a/testing b/testing new file mode 160000 index 00000000000..d315f798520 --- /dev/null +++ b/testing @@ -0,0 +1 @@ +Subproject commit d315f7985207d2d67fc2c8e41053e9d97d573f4b From 2261cea4cca1991171b121f945e9578f945aa530 Mon Sep 17 00:00:00 2001 From: Vibhatha Abeykoon Date: Sun, 20 Mar 2022 23:11:24 +0530 Subject: [PATCH 05/26] revert to uupstream version --- testing | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/testing b/testing index d315f798520..53b49804710 160000 --- a/testing +++ b/testing @@ -1 +1 @@ -Subproject commit d315f7985207d2d67fc2c8e41053e9d97d573f4b +Subproject commit 53b498047109d9940fcfab388bd9d6aeb8c57425 From d0a1ade49ee3cc505bf732117f8380884c49531a Mon Sep 17 00:00:00 2001 From: Vibhatha Abeykoon Date: Mon, 30 May 2022 12:51:36 +0530 Subject: [PATCH 06/26] tmp test code: --- cpp/examples/arrow/compute_register_example.cc | 1 - cpp/src/arrow/engine/substrait/plan_internal.cc | 7 +++++++ cpp/src/arrow/engine/substrait/plan_internal.h | 4 ++++ cpp/src/arrow/engine/substrait/relation_internal.cc | 12 ++++++++++++ cpp/src/arrow/engine/substrait/relation_internal.h | 4 ++++ cpp/src/arrow/engine/substrait/serde.cc | 7 +++++++ cpp/src/arrow/engine/substrait/serde.h | 9 ++++++--- cpp/src/arrow/engine/substrait/serde_test.cc | 5 +++++ 8 files changed, 45 insertions(+), 4 deletions(-) diff --git a/cpp/examples/arrow/compute_register_example.cc b/cpp/examples/arrow/compute_register_example.cc index 13d80b29631..87395c5b8b5 100644 --- a/cpp/examples/arrow/compute_register_example.cc +++ b/cpp/examples/arrow/compute_register_example.cc @@ -169,6 +169,5 @@ int main(int argc, char** argv) { }) .AddToPlan(plan.get()) .status()); - return EXIT_SUCCESS; } diff --git a/cpp/src/arrow/engine/substrait/plan_internal.cc b/cpp/src/arrow/engine/substrait/plan_internal.cc index fcee0b2188f..89c7679ace5 100644 --- a/cpp/src/arrow/engine/substrait/plan_internal.cc +++ b/cpp/src/arrow/engine/substrait/plan_internal.cc @@ -133,5 +133,12 @@ Result GetExtensionSetFromPlan(const substrait::Plan& plan, registry); } +Result ToProto(const compute::ExecPlan& exec_plan, + ExtensionSet* ext_set) { + std::cout << "ToProto[ExecPlan]" << std::endl; + auto plan = internal::make_unique(); + return *std::move(plan); +} + } // namespace engine } // namespace arrow diff --git a/cpp/src/arrow/engine/substrait/plan_internal.h b/cpp/src/arrow/engine/substrait/plan_internal.h index dce23cdceba..654dbffcb1c 100644 --- a/cpp/src/arrow/engine/substrait/plan_internal.h +++ b/cpp/src/arrow/engine/substrait/plan_internal.h @@ -19,6 +19,7 @@ #pragma once +#include "arrow/compute/exec/exec_plan.h" #include "arrow/engine/substrait/extension_set.h" #include "arrow/engine/substrait/visibility.h" #include "arrow/type_fwd.h" @@ -51,5 +52,8 @@ Result GetExtensionSetFromPlan( const substrait::Plan& plan, const ExtensionIdRegistry* registry = default_extension_id_registry()); +ARROW_ENGINE_EXPORT Result ToProto(const compute::ExecPlan&, + ExtensionSet*); + } // namespace engine } // namespace arrow diff --git a/cpp/src/arrow/engine/substrait/relation_internal.cc b/cpp/src/arrow/engine/substrait/relation_internal.cc index 89ab7ca4dc3..951b3c639af 100644 --- a/cpp/src/arrow/engine/substrait/relation_internal.cc +++ b/cpp/src/arrow/engine/substrait/relation_internal.cc @@ -27,10 +27,15 @@ #include "arrow/engine/substrait/type_internal.h" #include "arrow/filesystem/localfs.h" #include "arrow/filesystem/util_internal.h" +#include "arrow/util/make_unique.h" namespace arrow { namespace engine { +namespace internal { +using ::arrow::internal::make_unique; +} // namespace internal + template Status CheckRelCommon(const RelMessage& rel) { if (rel.has_common()) { @@ -316,5 +321,12 @@ Result FromProto(const substrait::Rel& rel, rel.DebugString()); } +Result> ToProto(const compute::Declaration& declaration, + ExtensionSet* ext_set) { + auto out = internal::make_unique(); + + return nullptr; +} + } // namespace engine } // namespace arrow diff --git a/cpp/src/arrow/engine/substrait/relation_internal.h b/cpp/src/arrow/engine/substrait/relation_internal.h index 77d47c586b4..c40ecd87390 100644 --- a/cpp/src/arrow/engine/substrait/relation_internal.h +++ b/cpp/src/arrow/engine/substrait/relation_internal.h @@ -33,5 +33,9 @@ namespace engine { ARROW_ENGINE_EXPORT Result FromProto(const substrait::Rel&, const ExtensionSet&); +ARROW_ENGINE_EXPORT +Result> ToProto(const compute::Declaration&, + ExtensionSet*); + } // namespace engine } // namespace arrow diff --git a/cpp/src/arrow/engine/substrait/serde.cc b/cpp/src/arrow/engine/substrait/serde.cc index 2012f1fc26a..766ff193ed5 100644 --- a/cpp/src/arrow/engine/substrait/serde.cc +++ b/cpp/src/arrow/engine/substrait/serde.cc @@ -105,6 +105,13 @@ Result DeserializePlan(const Buffer& buf, } } +Result> SerializePlan(const compute::ExecPlan& exec_plan, + ExtensionSet* ext_set) { + ARROW_ASSIGN_OR_RAISE(auto plan, ToProto(exec_plan, ext_set)); + std::string serialized = ""; // named_struct->SerializeAsString(); + return Buffer::FromString(std::move(serialized)); +} + Result> DeserializeSchema(const Buffer& buf, const ExtensionSet& ext_set) { ARROW_ASSIGN_OR_RAISE(auto named_struct, ParseFromBuffer(buf)); diff --git a/cpp/src/arrow/engine/substrait/serde.h b/cpp/src/arrow/engine/substrait/serde.h index 4af9f89ac87..92c866e9719 100644 --- a/cpp/src/arrow/engine/substrait/serde.h +++ b/cpp/src/arrow/engine/substrait/serde.h @@ -52,9 +52,12 @@ ARROW_ENGINE_EXPORT Result> DeserializePlans( const Buffer& buf, const ConsumerFactory& consumer_factory, ExtensionSet* ext_set_out = NULLPTR); -Result DeserializePlan(const Buffer& buf, - const ConsumerFactory& consumer_factory, - ExtensionSet* ext_set_out = NULLPTR); +ARROW_ENGINE_EXPORT Result DeserializePlan( + const Buffer& buf, const ConsumerFactory& consumer_factory, + ExtensionSet* ext_set_out = NULLPTR); + +ARROW_ENGINE_EXPORT Result> SerializePlan( + const compute::ExecPlan& exec_plan, ExtensionSet* ext_set); /// \brief Deserializes a Substrait Type message to the corresponding Arrow type /// diff --git a/cpp/src/arrow/engine/substrait/serde_test.cc b/cpp/src/arrow/engine/substrait/serde_test.cc index 9a0e93fc7a0..8f8c4c145bd 100644 --- a/cpp/src/arrow/engine/substrait/serde_test.cc +++ b/cpp/src/arrow/engine/substrait/serde_test.cc @@ -1171,6 +1171,11 @@ TEST(Substrait, JoinPlanInvalidKeys) { DeserializePlans( *buf, [] { return std::shared_ptr{nullptr}; }, &ext_set)); +TEST(Substrait, SerializePlan) { + ExtensionSet ext_set; + ASSERT_OK_AND_ASSIGN(auto plan, compute::ExecPlan::Make()); + ASSERT_OK_AND_ASSIGN(auto serialized, SerializePlan(*plan, &ext_set)); + } } // namespace engine From 3edf966d23c191ca899ba40e6dc8652b5ef61101 Mon Sep 17 00:00:00 2001 From: Vibhatha Abeykoon Date: Mon, 30 May 2022 22:34:52 +0530 Subject: [PATCH 07/26] tmp fixes on toproto plan --- .../arrow/engine/substrait/plan_internal.cc | 4 ++-- .../arrow/engine/substrait/plan_internal.h | 2 +- cpp/src/arrow/engine/substrait/serde.cc | 2 +- cpp/src/arrow/engine/substrait/serde_test.cc | 23 +++++++++++++++++++ 4 files changed, 27 insertions(+), 4 deletions(-) diff --git a/cpp/src/arrow/engine/substrait/plan_internal.cc b/cpp/src/arrow/engine/substrait/plan_internal.cc index 89c7679ace5..7079f154717 100644 --- a/cpp/src/arrow/engine/substrait/plan_internal.cc +++ b/cpp/src/arrow/engine/substrait/plan_internal.cc @@ -133,11 +133,11 @@ Result GetExtensionSetFromPlan(const substrait::Plan& plan, registry); } -Result ToProto(const compute::ExecPlan& exec_plan, +Result> ToProto(const compute::ExecPlan& exec_plan, ExtensionSet* ext_set) { std::cout << "ToProto[ExecPlan]" << std::endl; auto plan = internal::make_unique(); - return *std::move(plan); + return plan; } } // namespace engine diff --git a/cpp/src/arrow/engine/substrait/plan_internal.h b/cpp/src/arrow/engine/substrait/plan_internal.h index 654dbffcb1c..4f2eccc74ab 100644 --- a/cpp/src/arrow/engine/substrait/plan_internal.h +++ b/cpp/src/arrow/engine/substrait/plan_internal.h @@ -52,7 +52,7 @@ Result GetExtensionSetFromPlan( const substrait::Plan& plan, const ExtensionIdRegistry* registry = default_extension_id_registry()); -ARROW_ENGINE_EXPORT Result ToProto(const compute::ExecPlan&, +ARROW_ENGINE_EXPORT Result> ToProto(const compute::ExecPlan&, ExtensionSet*); } // namespace engine diff --git a/cpp/src/arrow/engine/substrait/serde.cc b/cpp/src/arrow/engine/substrait/serde.cc index 766ff193ed5..d37be402fe7 100644 --- a/cpp/src/arrow/engine/substrait/serde.cc +++ b/cpp/src/arrow/engine/substrait/serde.cc @@ -108,7 +108,7 @@ Result DeserializePlan(const Buffer& buf, Result> SerializePlan(const compute::ExecPlan& exec_plan, ExtensionSet* ext_set) { ARROW_ASSIGN_OR_RAISE(auto plan, ToProto(exec_plan, ext_set)); - std::string serialized = ""; // named_struct->SerializeAsString(); + std::string serialized = plan->SerializeAsString(); return Buffer::FromString(std::move(serialized)); } diff --git a/cpp/src/arrow/engine/substrait/serde_test.cc b/cpp/src/arrow/engine/substrait/serde_test.cc index 8f8c4c145bd..e31db477ec5 100644 --- a/cpp/src/arrow/engine/substrait/serde_test.cc +++ b/cpp/src/arrow/engine/substrait/serde_test.cc @@ -1174,7 +1174,30 @@ TEST(Substrait, JoinPlanInvalidKeys) { TEST(Substrait, SerializePlan) { ExtensionSet ext_set; ASSERT_OK_AND_ASSIGN(auto plan, compute::ExecPlan::Make()); + auto dummy_schema = schema({field("a", int32()), + field("b", int32()) + }); + ASSERT_OK_AND_ASSIGN(auto tb, Table::MakeEmpty(dummy_schema)); + auto ds = std::make_shared(tb); + + auto options = std::make_shared(); + options->projection = compute::project({}, {}); // create empty projection + + // construct the scan node + compute::ExecNode* scan; + auto scan_node_options = dataset::ScanNodeOptions{ds, options}; + + ASSERT_OK_AND_ASSIGN(scan, + compute::MakeExecNode("scan", plan.get(), {}, scan_node_options)); + + arrow::AsyncGenerator> sink_gen; + + ASSERT_OK(compute::MakeExecNode("sink", plan.get(), {scan}, compute::SinkNodeOptions{&sink_gen})); + + std::cout << "Plan " << std::endl; + std::cout << plan->ToString() << std::endl; ASSERT_OK_AND_ASSIGN(auto serialized, SerializePlan(*plan, &ext_set)); + } From 3c040599e09ee43601951fac5fadf8204ed8ec02 Mon Sep 17 00:00:00 2001 From: Vibhatha Abeykoon Date: Wed, 1 Jun 2022 09:31:19 +0530 Subject: [PATCH 08/26] temp --- cpp/src/arrow/engine/substrait/relation_internal.cc | 1 - 1 file changed, 1 deletion(-) diff --git a/cpp/src/arrow/engine/substrait/relation_internal.cc b/cpp/src/arrow/engine/substrait/relation_internal.cc index 951b3c639af..ebc997dacb8 100644 --- a/cpp/src/arrow/engine/substrait/relation_internal.cc +++ b/cpp/src/arrow/engine/substrait/relation_internal.cc @@ -324,7 +324,6 @@ Result FromProto(const substrait::Rel& rel, Result> ToProto(const compute::Declaration& declaration, ExtensionSet* ext_set) { auto out = internal::make_unique(); - return nullptr; } From c0dc1007c0630198ad3854c1b686c860738524d7 Mon Sep 17 00:00:00 2001 From: Vibhatha Abeykoon Date: Wed, 15 Jun 2022 16:21:53 +0530 Subject: [PATCH 09/26] intermediate commit for releation plan --- cpp/src/arrow/compute/exec/exec_plan.cc | 4 ++ cpp/src/arrow/compute/exec/exec_plan.h | 3 + .../arrow/engine/substrait/plan_internal.cc | 5 +- .../arrow/engine/substrait/plan_internal.h | 4 +- .../engine/substrait/relation_internal.cc | 57 ++++++++++++++++++- cpp/src/arrow/engine/substrait/serde.cc | 7 +++ cpp/src/arrow/engine/substrait/serde.h | 6 ++ cpp/src/arrow/engine/substrait/serde_test.cc | 32 +++++++---- 8 files changed, 101 insertions(+), 17 deletions(-) diff --git a/cpp/src/arrow/compute/exec/exec_plan.cc b/cpp/src/arrow/compute/exec/exec_plan.cc index 468d0accead..ebcf0cf0a55 100644 --- a/cpp/src/arrow/compute/exec/exec_plan.cc +++ b/cpp/src/arrow/compute/exec/exec_plan.cc @@ -283,6 +283,10 @@ const ExecPlan::NodeVector& ExecPlan::sources() const { const ExecPlan::NodeVector& ExecPlan::sinks() const { return ToDerived(this)->sinks_; } +const ExecPlan::NodeVector& ExecPlan::nodes() const { + return ToDerived(this)->sorted_nodes_; +} + Status ExecPlan::Validate() { return ToDerived(this)->Validate(); } Status ExecPlan::StartProducing() { return ToDerived(this)->StartProducing(); } diff --git a/cpp/src/arrow/compute/exec/exec_plan.h b/cpp/src/arrow/compute/exec/exec_plan.h index dcf271bd360..f1bb7bbf48f 100644 --- a/cpp/src/arrow/compute/exec/exec_plan.h +++ b/cpp/src/arrow/compute/exec/exec_plan.h @@ -67,6 +67,9 @@ class ARROW_EXPORT ExecPlan : public std::enable_shared_from_this { /// The final outputs const NodeVector& sinks() const; + /// Nodes in the Plan + const NodeVector& nodes() const; + Status Validate(); /// \brief Start producing on all nodes diff --git a/cpp/src/arrow/engine/substrait/plan_internal.cc b/cpp/src/arrow/engine/substrait/plan_internal.cc index 7079f154717..d1d2765ca6f 100644 --- a/cpp/src/arrow/engine/substrait/plan_internal.cc +++ b/cpp/src/arrow/engine/substrait/plan_internal.cc @@ -134,9 +134,10 @@ Result GetExtensionSetFromPlan(const substrait::Plan& plan, } Result> ToProto(const compute::ExecPlan& exec_plan, - ExtensionSet* ext_set) { - std::cout << "ToProto[ExecPlan]" << std::endl; + ExtensionSet* ext_set) { + std::cout << "ToProto[ExecPlan]" << std::endl; auto plan = internal::make_unique(); + return plan; } diff --git a/cpp/src/arrow/engine/substrait/plan_internal.h b/cpp/src/arrow/engine/substrait/plan_internal.h index 4f2eccc74ab..699d1073266 100644 --- a/cpp/src/arrow/engine/substrait/plan_internal.h +++ b/cpp/src/arrow/engine/substrait/plan_internal.h @@ -52,8 +52,8 @@ Result GetExtensionSetFromPlan( const substrait::Plan& plan, const ExtensionIdRegistry* registry = default_extension_id_registry()); -ARROW_ENGINE_EXPORT Result> ToProto(const compute::ExecPlan&, - ExtensionSet*); +ARROW_ENGINE_EXPORT Result> ToProto( + const compute::ExecPlan&, ExtensionSet*); } // namespace engine } // namespace arrow diff --git a/cpp/src/arrow/engine/substrait/relation_internal.cc b/cpp/src/arrow/engine/substrait/relation_internal.cc index ebc997dacb8..62f729febe5 100644 --- a/cpp/src/arrow/engine/substrait/relation_internal.cc +++ b/cpp/src/arrow/engine/substrait/relation_internal.cc @@ -321,10 +321,65 @@ Result FromProto(const substrait::Rel& rel, rel.DebugString()); } +namespace { + +enum ArrowRelationType : u_int8_t { + SCAN, + SINK, +}; + +const std::map enum_map{ + {"scan", ArrowRelationType::SCAN}, + {"sink", ArrowRelationType::SINK}, +}; + +struct ExtractRelation { + explicit ExtractRelation(substrait::Rel* rel, ExtensionSet* ext_set) + : rel_(rel), ext_set_(ext_set) {} + + Status AddRelation(const compute::Declaration& declaration) { + const std::string& rel_name = declaration.factory_name; + switch (enum_map.find(rel_name)->second) { + case ArrowRelationType::SCAN: + std::cout << "Scan Relation" << std::endl; + return AddReadRelation(declaration); + case ArrowRelationType::SINK: + // Do nothing, Substrait doesn't have a concept called Sink + return Status::OK(); + default: + return Status::Invalid("Unsupported factory name :", rel_name); + } + } + + Status AddReadRelation(const compute::Declaration& declaration) { + auto read_rel = internal::make_unique(); + const auto& scan_node_options = + arrow::internal::checked_cast( + *declaration.options); + auto dataset = scan_node_options.dataset; + ARROW_ASSIGN_OR_RAISE(auto named_struct, ToProto(*dataset->schema(), ext_set_)); + read_rel->set_allocated_base_schema(named_struct.release()); + // read_rel->set_allocated_local_files(); + rel_->set_allocated_read(read_rel.release()); + return Status::OK(); + } + + Status operator()(const compute::Declaration& declaration) { + return AddRelation(declaration); + } + + substrait::Rel* rel_; + ExtensionSet* ext_set_; +}; + +} // namespace + Result> ToProto(const compute::Declaration& declaration, ExtensionSet* ext_set) { + std::cout << ">>>>> ToProto[Rel] >>>> " << std::endl; auto out = internal::make_unique(); - return nullptr; + RETURN_NOT_OK(ExtractRelation(out.get(), ext_set)(declaration)); + return std::move(out); } } // namespace engine diff --git a/cpp/src/arrow/engine/substrait/serde.cc b/cpp/src/arrow/engine/substrait/serde.cc index d37be402fe7..4ea3f2f2f2b 100644 --- a/cpp/src/arrow/engine/substrait/serde.cc +++ b/cpp/src/arrow/engine/substrait/serde.cc @@ -52,6 +52,13 @@ Result ParseFromBuffer(const Buffer& buf) { return message; } +Result> SerializeRelation(const compute::Declaration& declaration, + ExtensionSet* ext_set) { + ARROW_ASSIGN_OR_RAISE(auto relation, ToProto(declaration, ext_set)); + std::string serialized = relation->SerializeAsString(); + return Buffer::FromString(std::move(serialized)); +} + Result DeserializeRelation(const Buffer& buf, const ExtensionSet& ext_set) { ARROW_ASSIGN_OR_RAISE(auto rel, ParseFromBuffer(buf)); diff --git a/cpp/src/arrow/engine/substrait/serde.h b/cpp/src/arrow/engine/substrait/serde.h index 92c866e9719..49bdb447c7d 100644 --- a/cpp/src/arrow/engine/substrait/serde.h +++ b/cpp/src/arrow/engine/substrait/serde.h @@ -56,6 +56,7 @@ ARROW_ENGINE_EXPORT Result DeserializePlan( const Buffer& buf, const ConsumerFactory& consumer_factory, ExtensionSet* ext_set_out = NULLPTR); +/// TODO: add docstring ARROW_ENGINE_EXPORT Result> SerializePlan( const compute::ExecPlan& exec_plan, ExtensionSet* ext_set); @@ -125,6 +126,11 @@ ARROW_ENGINE_EXPORT Result> SerializeExpression(const compute::Expression& expr, ExtensionSet* ext_set); +/// TODO: add docstring + +Result> SerializeRelation(const compute::Declaration& declaration, + ExtensionSet* ext_set); + /// \brief Deserializes a Substrait Rel (relation) message to an ExecNode declaration /// /// \param[in] buf a buffer containing the protobuf serialization of a Substrait diff --git a/cpp/src/arrow/engine/substrait/serde_test.cc b/cpp/src/arrow/engine/substrait/serde_test.cc index e31db477ec5..7a4638ee1ad 100644 --- a/cpp/src/arrow/engine/substrait/serde_test.cc +++ b/cpp/src/arrow/engine/substrait/serde_test.cc @@ -1171,34 +1171,42 @@ TEST(Substrait, JoinPlanInvalidKeys) { DeserializePlans( *buf, [] { return std::shared_ptr{nullptr}; }, &ext_set)); +} + TEST(Substrait, SerializePlan) { ExtensionSet ext_set; ASSERT_OK_AND_ASSIGN(auto plan, compute::ExecPlan::Make()); - auto dummy_schema = schema({field("a", int32()), - field("b", int32()) - }); + auto dummy_schema = schema({field("a", int32()), field("b", int32())}); ASSERT_OK_AND_ASSIGN(auto tb, Table::MakeEmpty(dummy_schema)); auto ds = std::make_shared(tb); auto options = std::make_shared(); options->projection = compute::project({}, {}); // create empty projection - // construct the scan node - compute::ExecNode* scan; auto scan_node_options = dataset::ScanNodeOptions{ds, options}; - ASSERT_OK_AND_ASSIGN(scan, - compute::MakeExecNode("scan", plan.get(), {}, scan_node_options)); - arrow::AsyncGenerator> sink_gen; - ASSERT_OK(compute::MakeExecNode("sink", plan.get(), {scan}, compute::SinkNodeOptions{&sink_gen})); + auto sink_node_options = compute::SinkNodeOptions{&sink_gen}; + + auto scan_declaration = compute::Declaration({"scan", scan_node_options}); + auto sink_declaration = compute::Declaration({"sink", sink_node_options}); + + auto declarations = + compute::Declaration::Sequence({scan_declaration, sink_declaration}); + + ASSERT_OK_AND_ASSIGN(auto decl, declarations.AddToPlan(plan.get())); + + ASSERT_OK(decl->Validate()); std::cout << "Plan " << std::endl; std::cout << plan->ToString() << std::endl; - ASSERT_OK_AND_ASSIGN(auto serialized, SerializePlan(*plan, &ext_set)); - - + // ASSERT_OK_AND_ASSIGN(auto serialized, SerializePlan(*plan, &ext_set)); + std::cout << "Serialize Scan Relation" << std::endl; + ASSERT_OK_AND_ASSIGN(auto serialized_rel, + SerializeRelation(scan_declaration, &ext_set)); + ASSERT_OK_AND_ASSIGN(auto deserialized_rel, + DeserializeRelation(*serialized_rel, ext_set)); } } // namespace engine From 6d3ebd92289b55f2cd3cbdb93a70b39f3e89fe2c Mon Sep 17 00:00:00 2001 From: Vibhatha Abeykoon Date: Thu, 16 Jun 2022 13:08:53 +0530 Subject: [PATCH 10/26] adding initial functional Relation deserialization --- .../arrow/engine/substrait/relation_internal.cc | 14 +++++++++++++- cpp/src/arrow/engine/substrait/serde_test.cc | 12 +++++++++++- 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/engine/substrait/relation_internal.cc b/cpp/src/arrow/engine/substrait/relation_internal.cc index 62f729febe5..a3fedb5c709 100644 --- a/cpp/src/arrow/engine/substrait/relation_internal.cc +++ b/cpp/src/arrow/engine/substrait/relation_internal.cc @@ -356,10 +356,21 @@ struct ExtractRelation { const auto& scan_node_options = arrow::internal::checked_cast( *declaration.options); + // TODO: do necessary validations or throw Invalids + + // set schema auto dataset = scan_node_options.dataset; ARROW_ASSIGN_OR_RAISE(auto named_struct, ToProto(*dataset->schema(), ext_set_)); read_rel->set_allocated_base_schema(named_struct.release()); - // read_rel->set_allocated_local_files(); + + // set local files + auto read_rel_lfs = internal::make_unique(); + auto read_rel_lfs_lfs = + internal::make_unique(); + std::string path = "file:///some_path/data.arrow"; + read_rel_lfs_lfs->set_uri_path(path); + read_rel_lfs->mutable_items()->AddAllocated(read_rel_lfs_lfs.release()); + *read_rel->mutable_local_files() = *read_rel_lfs.get(); rel_->set_allocated_read(read_rel.release()); return Status::OK(); } @@ -368,6 +379,7 @@ struct ExtractRelation { return AddRelation(declaration); } + private: substrait::Rel* rel_; ExtensionSet* ext_set_; }; diff --git a/cpp/src/arrow/engine/substrait/serde_test.cc b/cpp/src/arrow/engine/substrait/serde_test.cc index 7a4638ee1ad..f6db4247609 100644 --- a/cpp/src/arrow/engine/substrait/serde_test.cc +++ b/cpp/src/arrow/engine/substrait/serde_test.cc @@ -1205,8 +1205,18 @@ TEST(Substrait, SerializePlan) { std::cout << "Serialize Scan Relation" << std::endl; ASSERT_OK_AND_ASSIGN(auto serialized_rel, SerializeRelation(scan_declaration, &ext_set)); - ASSERT_OK_AND_ASSIGN(auto deserialized_rel, + ASSERT_OK_AND_ASSIGN(auto deserialized_decl, DeserializeRelation(*serialized_rel, ext_set)); + + auto t_decls = compute::Declaration::Sequence({deserialized_decl, sink_declaration}); + + ASSERT_OK_AND_ASSIGN(auto t_plan, compute::ExecPlan::Make()); + ASSERT_OK_AND_ASSIGN(auto t_decl, t_decls.AddToPlan(t_plan.get())); + + ASSERT_OK(t_decl->Validate()); + + std::cout << "Des Plan " << std::endl; + std::cout << t_plan->ToString() << std::endl; } } // namespace engine From a8cff44b0a7e03401f955e026ad8b0cf20bbf929 Mon Sep 17 00:00:00 2001 From: Vibhatha Abeykoon Date: Thu, 16 Jun 2022 20:01:01 +0530 Subject: [PATCH 11/26] end-to-end initial test on scan node ToProto --- .../engine/substrait/relation_internal.cc | 40 +++++++++++-------- cpp/src/arrow/engine/substrait/serde_test.cc | 22 ++++++++-- 2 files changed, 43 insertions(+), 19 deletions(-) diff --git a/cpp/src/arrow/engine/substrait/relation_internal.cc b/cpp/src/arrow/engine/substrait/relation_internal.cc index a3fedb5c709..3a6db084517 100644 --- a/cpp/src/arrow/engine/substrait/relation_internal.cc +++ b/cpp/src/arrow/engine/substrait/relation_internal.cc @@ -27,12 +27,14 @@ #include "arrow/engine/substrait/type_internal.h" #include "arrow/filesystem/localfs.h" #include "arrow/filesystem/util_internal.h" +#include "arrow/util/checked_cast.h" #include "arrow/util/make_unique.h" namespace arrow { namespace engine { namespace internal { +using ::arrow::internal::checked_cast; using ::arrow::internal::make_unique; } // namespace internal @@ -323,14 +325,12 @@ Result FromProto(const substrait::Rel& rel, namespace { -enum ArrowRelationType : u_int8_t { +enum ArrowRelationType : uint8_t { SCAN, - SINK, }; const std::map enum_map{ {"scan", ArrowRelationType::SCAN}, - {"sink", ArrowRelationType::SINK}, }; struct ExtractRelation { @@ -343,9 +343,6 @@ struct ExtractRelation { case ArrowRelationType::SCAN: std::cout << "Scan Relation" << std::endl; return AddReadRelation(declaration); - case ArrowRelationType::SINK: - // Do nothing, Substrait doesn't have a concept called Sink - return Status::OK(); default: return Status::Invalid("Unsupported factory name :", rel_name); } @@ -354,23 +351,34 @@ struct ExtractRelation { Status AddReadRelation(const compute::Declaration& declaration) { auto read_rel = internal::make_unique(); const auto& scan_node_options = - arrow::internal::checked_cast( - *declaration.options); - // TODO: do necessary validations or throw Invalids + internal::checked_cast(*declaration.options); + const auto& fds = internal::checked_cast( + *scan_node_options.dataset); // set schema - auto dataset = scan_node_options.dataset; - ARROW_ASSIGN_OR_RAISE(auto named_struct, ToProto(*dataset->schema(), ext_set_)); + ARROW_ASSIGN_OR_RAISE(auto named_struct, ToProto(*fds.schema(), ext_set_)); read_rel->set_allocated_base_schema(named_struct.release()); // set local files auto read_rel_lfs = internal::make_unique(); - auto read_rel_lfs_lfs = - internal::make_unique(); - std::string path = "file:///some_path/data.arrow"; - read_rel_lfs_lfs->set_uri_path(path); - read_rel_lfs->mutable_items()->AddAllocated(read_rel_lfs_lfs.release()); + for (const auto& file : fds.files()) { + auto read_rel_lfs_ffs = + internal::make_unique(); + read_rel_lfs_ffs->set_uri_path("file://" + file); + + // set file format + auto format_type_name = fds.format()->type_name(); + if (format_type_name == "parquet" || format_type_name == "arrow" || + format_type_name == "feather") { + read_rel_lfs_ffs->set_format( + substrait::ReadRel::LocalFiles::FileOrFiles::FILE_FORMAT_PARQUET); + } else { + return Status::Invalid("Unsupported file type : ", format_type_name); + } + read_rel_lfs->mutable_items()->AddAllocated(read_rel_lfs_ffs.release()); + } *read_rel->mutable_local_files() = *read_rel_lfs.get(); + rel_->set_allocated_read(read_rel.release()); return Status::OK(); } diff --git a/cpp/src/arrow/engine/substrait/serde_test.cc b/cpp/src/arrow/engine/substrait/serde_test.cc index f6db4247609..9cdb8010183 100644 --- a/cpp/src/arrow/engine/substrait/serde_test.cc +++ b/cpp/src/arrow/engine/substrait/serde_test.cc @@ -25,8 +25,10 @@ #include "arrow/compute/exec/expression_internal.h" #include "arrow/dataset/file_base.h" +#include "arrow/dataset/file_parquet.h" #include "arrow/dataset/scanner.h" #include "arrow/engine/substrait/extension_types.h" +#include "arrow/filesystem/localfs.h" #include "arrow/testing/gtest_util.h" #include "arrow/testing/matchers.h" #include "arrow/util/key_value_metadata.h" @@ -1177,13 +1179,27 @@ TEST(Substrait, SerializePlan) { ExtensionSet ext_set; ASSERT_OK_AND_ASSIGN(auto plan, compute::ExecPlan::Make()); auto dummy_schema = schema({field("a", int32()), field("b", int32())}); - ASSERT_OK_AND_ASSIGN(auto tb, Table::MakeEmpty(dummy_schema)); - auto ds = std::make_shared(tb); + + auto format = std::make_shared(); + auto filesystem = std::make_shared(); + std::vector files; + const std::string f_path = "/tmp/data1.parquet"; + // const std::string s_path = "file:///tmp/data2.parquet"; + ASSERT_OK_AND_ASSIGN(auto f_file, filesystem->GetFileInfo(f_path)); + // ASSERT_OK_AND_ASSIGN(auto s_file, filesystem->GetFileInfo(s_path)); + files.push_back(std::move(f_file)); + // files.push_back(std::move(s_file)); + + ASSERT_OK_AND_ASSIGN(auto ds_factory, dataset::FileSystemDatasetFactory::Make( + std::move(filesystem), std::move(files), + std::move(format), {})); + + ASSERT_OK_AND_ASSIGN(auto dataset, ds_factory->Finish(std::move(dummy_schema))); auto options = std::make_shared(); options->projection = compute::project({}, {}); // create empty projection - auto scan_node_options = dataset::ScanNodeOptions{ds, options}; + auto scan_node_options = dataset::ScanNodeOptions{dataset, options}; arrow::AsyncGenerator> sink_gen; From 3ec4d320b15161f54baba6cced535a9b70ad7bba Mon Sep 17 00:00:00 2001 From: Vibhatha Abeykoon Date: Thu, 16 Jun 2022 21:51:34 +0530 Subject: [PATCH 12/26] updating test case (imd) --- cpp/src/arrow/engine/substrait/serde_test.cc | 47 +++++++++++++++----- 1 file changed, 37 insertions(+), 10 deletions(-) diff --git a/cpp/src/arrow/engine/substrait/serde_test.cc b/cpp/src/arrow/engine/substrait/serde_test.cc index 9cdb8010183..544b805b8ff 100644 --- a/cpp/src/arrow/engine/substrait/serde_test.cc +++ b/cpp/src/arrow/engine/substrait/serde_test.cc @@ -1175,15 +1175,22 @@ TEST(Substrait, JoinPlanInvalidKeys) { &ext_set)); } -TEST(Substrait, SerializePlan) { +TEST(Substrait, SerializeRelation) { ExtensionSet ext_set; - ASSERT_OK_AND_ASSIGN(auto plan, compute::ExecPlan::Make()); - auto dummy_schema = schema({field("a", int32()), field("b", int32())}); + + ASSERT_OK_AND_ASSIGN(std::string dir_string, + arrow::internal::GetEnvVar("PARQUET_TEST_DATA")); + auto file_name = + arrow::internal::PlatformFilename::FromString(dir_string)->Join("binary.parquet"); + + compute::ExecContext exec_context; + ASSERT_OK_AND_ASSIGN(auto plan, compute::ExecPlan::Make(&exec_context)); + auto dummy_schema = schema({field("foo", binary())}); auto format = std::make_shared(); auto filesystem = std::make_shared(); std::vector files; - const std::string f_path = "/tmp/data1.parquet"; + const std::string f_path = file_name->ToString();//"/tmp/data1.parquet"; // const std::string s_path = "file:///tmp/data2.parquet"; ASSERT_OK_AND_ASSIGN(auto f_file, filesystem->GetFileInfo(f_path)); // ASSERT_OK_AND_ASSIGN(auto s_file, filesystem->GetFileInfo(s_path)); @@ -1202,7 +1209,7 @@ TEST(Substrait, SerializePlan) { auto scan_node_options = dataset::ScanNodeOptions{dataset, options}; arrow::AsyncGenerator> sink_gen; - + auto sink_node_options = compute::SinkNodeOptions{&sink_gen}; auto scan_declaration = compute::Declaration({"scan", scan_node_options}); @@ -1215,9 +1222,24 @@ TEST(Substrait, SerializePlan) { ASSERT_OK(decl->Validate()); - std::cout << "Plan " << std::endl; - std::cout << plan->ToString() << std::endl; - // ASSERT_OK_AND_ASSIGN(auto serialized, SerializePlan(*plan, &ext_set)); + // std::shared_ptr sink_reader = + // compute::MakeGeneratorReader(dummy_schema, std::move(sink_gen), exec_context.memory_pool()); + + // ASSERT_OK(plan->Validate()); + // ASSERT_OK(plan->StartProducing()); + + // std::cout << "Plan " << std::endl; + // std::cout << plan->ToString() << std::endl; + + // collect sink_reader into a Table + std::shared_ptr response_table; + + ASSERT_OK_AND_ASSIGN(response_table, + arrow::Table::FromRecordBatchReader(sink_reader.get())); + + std::cout << "Results : " << response_table->ToString() << std::endl; + + std::cout << "Serialize Scan Relation" << std::endl; ASSERT_OK_AND_ASSIGN(auto serialized_rel, SerializeRelation(scan_declaration, &ext_set)); @@ -1231,8 +1253,13 @@ TEST(Substrait, SerializePlan) { ASSERT_OK(t_decl->Validate()); - std::cout << "Des Plan " << std::endl; - std::cout << t_plan->ToString() << std::endl; + // std::cout << "Des Plan " << std::endl; + // std::cout << t_plan->ToString() << std::endl; + + + //ASSERT_OK(t_plan->Validate()); + //ASSERT_OK(t_plan->StartProducing()); + } } // namespace engine From 780e32f065830e2ada858d7308932e839e389a65 Mon Sep 17 00:00:00 2001 From: Vibhatha Abeykoon Date: Sat, 18 Jun 2022 07:01:32 +0530 Subject: [PATCH 13/26] add an end-to-end test case --- cpp/src/arrow/engine/substrait/serde_test.cc | 59 ++++++++++---------- 1 file changed, 30 insertions(+), 29 deletions(-) diff --git a/cpp/src/arrow/engine/substrait/serde_test.cc b/cpp/src/arrow/engine/substrait/serde_test.cc index 544b805b8ff..4c162a172fc 100644 --- a/cpp/src/arrow/engine/substrait/serde_test.cc +++ b/cpp/src/arrow/engine/substrait/serde_test.cc @@ -1177,31 +1177,27 @@ TEST(Substrait, JoinPlanInvalidKeys) { TEST(Substrait, SerializeRelation) { ExtensionSet ext_set; + compute::ExecContext exec_context; ASSERT_OK_AND_ASSIGN(std::string dir_string, - arrow::internal::GetEnvVar("PARQUET_TEST_DATA")); + arrow::internal::GetEnvVar("PARQUET_TEST_DATA")); auto file_name = arrow::internal::PlatformFilename::FromString(dir_string)->Join("binary.parquet"); - compute::ExecContext exec_context; - ASSERT_OK_AND_ASSIGN(auto plan, compute::ExecPlan::Make(&exec_context)); auto dummy_schema = schema({field("foo", binary())}); - auto format = std::make_shared(); auto filesystem = std::make_shared(); + std::vector files; - const std::string f_path = file_name->ToString();//"/tmp/data1.parquet"; - // const std::string s_path = "file:///tmp/data2.parquet"; + const std::string f_path = file_name->ToString(); ASSERT_OK_AND_ASSIGN(auto f_file, filesystem->GetFileInfo(f_path)); - // ASSERT_OK_AND_ASSIGN(auto s_file, filesystem->GetFileInfo(s_path)); files.push_back(std::move(f_file)); - // files.push_back(std::move(s_file)); ASSERT_OK_AND_ASSIGN(auto ds_factory, dataset::FileSystemDatasetFactory::Make( std::move(filesystem), std::move(files), std::move(format), {})); - ASSERT_OK_AND_ASSIGN(auto dataset, ds_factory->Finish(std::move(dummy_schema))); + ASSERT_OK_AND_ASSIGN(auto dataset, ds_factory->Finish(dummy_schema)); auto options = std::make_shared(); options->projection = compute::project({}, {}); // create empty projection @@ -1209,7 +1205,7 @@ TEST(Substrait, SerializeRelation) { auto scan_node_options = dataset::ScanNodeOptions{dataset, options}; arrow::AsyncGenerator> sink_gen; - + auto sink_node_options = compute::SinkNodeOptions{&sink_gen}; auto scan_declaration = compute::Declaration({"scan", scan_node_options}); @@ -1218,48 +1214,53 @@ TEST(Substrait, SerializeRelation) { auto declarations = compute::Declaration::Sequence({scan_declaration, sink_declaration}); + ASSERT_OK_AND_ASSIGN(auto plan, compute::ExecPlan::Make(&exec_context)); ASSERT_OK_AND_ASSIGN(auto decl, declarations.AddToPlan(plan.get())); ASSERT_OK(decl->Validate()); - // std::shared_ptr sink_reader = - // compute::MakeGeneratorReader(dummy_schema, std::move(sink_gen), exec_context.memory_pool()); - - // ASSERT_OK(plan->Validate()); - // ASSERT_OK(plan->StartProducing()); + std::shared_ptr sink_reader = compute::MakeGeneratorReader( + dummy_schema, std::move(sink_gen), exec_context.memory_pool()); - // std::cout << "Plan " << std::endl; - // std::cout << plan->ToString() << std::endl; + ASSERT_OK(plan->Validate()); + ASSERT_OK(plan->StartProducing()); - // collect sink_reader into a Table std::shared_ptr response_table; ASSERT_OK_AND_ASSIGN(response_table, - arrow::Table::FromRecordBatchReader(sink_reader.get())); + arrow::Table::FromRecordBatchReader(sink_reader.get())); - std::cout << "Results : " << response_table->ToString() << std::endl; - - - std::cout << "Serialize Scan Relation" << std::endl; ASSERT_OK_AND_ASSIGN(auto serialized_rel, SerializeRelation(scan_declaration, &ext_set)); ASSERT_OK_AND_ASSIGN(auto deserialized_decl, DeserializeRelation(*serialized_rel, ext_set)); - auto t_decls = compute::Declaration::Sequence({deserialized_decl, sink_declaration}); + arrow::AsyncGenerator> des_sink_gen; + auto des_sink_node_options = compute::SinkNodeOptions{&des_sink_gen}; + + auto des_sink_declaration = compute::Declaration({"sink", des_sink_node_options}); + + auto t_decls = + compute::Declaration::Sequence({deserialized_decl, des_sink_declaration}); ASSERT_OK_AND_ASSIGN(auto t_plan, compute::ExecPlan::Make()); ASSERT_OK_AND_ASSIGN(auto t_decl, t_decls.AddToPlan(t_plan.get())); ASSERT_OK(t_decl->Validate()); - // std::cout << "Des Plan " << std::endl; - // std::cout << t_plan->ToString() << std::endl; + std::shared_ptr des_sink_reader = + compute::MakeGeneratorReader(dummy_schema, std::move(des_sink_gen), + exec_context.memory_pool()); + + ASSERT_OK(t_plan->Validate()); + ASSERT_OK(t_plan->StartProducing()); + + std::shared_ptr des_response_table; - - //ASSERT_OK(t_plan->Validate()); - //ASSERT_OK(t_plan->StartProducing()); + ASSERT_OK_AND_ASSIGN(des_response_table, + arrow::Table::FromRecordBatchReader(des_sink_reader.get())); + ASSERT_TRUE(response_table->Equals(*des_response_table, true)); } } // namespace engine From 47fad43cf7f1354009fb51f8ee515cefb968a4e7 Mon Sep 17 00:00:00 2001 From: Vibhatha Abeykoon Date: Mon, 20 Jun 2022 06:25:55 +0530 Subject: [PATCH 14/26] format and cleaned up --- cpp/src/arrow/engine/substrait/relation_internal.cc | 2 -- 1 file changed, 2 deletions(-) diff --git a/cpp/src/arrow/engine/substrait/relation_internal.cc b/cpp/src/arrow/engine/substrait/relation_internal.cc index 3a6db084517..4db1009cb41 100644 --- a/cpp/src/arrow/engine/substrait/relation_internal.cc +++ b/cpp/src/arrow/engine/substrait/relation_internal.cc @@ -341,7 +341,6 @@ struct ExtractRelation { const std::string& rel_name = declaration.factory_name; switch (enum_map.find(rel_name)->second) { case ArrowRelationType::SCAN: - std::cout << "Scan Relation" << std::endl; return AddReadRelation(declaration); default: return Status::Invalid("Unsupported factory name :", rel_name); @@ -396,7 +395,6 @@ struct ExtractRelation { Result> ToProto(const compute::Declaration& declaration, ExtensionSet* ext_set) { - std::cout << ">>>>> ToProto[Rel] >>>> " << std::endl; auto out = internal::make_unique(); RETURN_NOT_OK(ExtractRelation(out.get(), ext_set)(declaration)); return std::move(out); From 7b5defdf5ee763f2a86fb1557620bd39f711b9ef Mon Sep 17 00:00:00 2001 From: Vibhatha Abeykoon Date: Mon, 20 Jun 2022 06:58:59 +0530 Subject: [PATCH 15/26] removed unnecessary commit from an example --- cpp/examples/arrow/compute_register_example.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/cpp/examples/arrow/compute_register_example.cc b/cpp/examples/arrow/compute_register_example.cc index 87395c5b8b5..13d80b29631 100644 --- a/cpp/examples/arrow/compute_register_example.cc +++ b/cpp/examples/arrow/compute_register_example.cc @@ -169,5 +169,6 @@ int main(int argc, char** argv) { }) .AddToPlan(plan.get()) .status()); + return EXIT_SUCCESS; } From 0eb567f3167711c249feda79fefab10c817f1c6b Mon Sep 17 00:00:00 2001 From: Vibhatha Abeykoon Date: Mon, 20 Jun 2022 07:00:32 +0530 Subject: [PATCH 16/26] remove unncessary components in exec_plan --- cpp/src/arrow/compute/exec/exec_plan.cc | 4 ---- cpp/src/arrow/compute/exec/exec_plan.h | 3 --- 2 files changed, 7 deletions(-) diff --git a/cpp/src/arrow/compute/exec/exec_plan.cc b/cpp/src/arrow/compute/exec/exec_plan.cc index ebcf0cf0a55..468d0accead 100644 --- a/cpp/src/arrow/compute/exec/exec_plan.cc +++ b/cpp/src/arrow/compute/exec/exec_plan.cc @@ -283,10 +283,6 @@ const ExecPlan::NodeVector& ExecPlan::sources() const { const ExecPlan::NodeVector& ExecPlan::sinks() const { return ToDerived(this)->sinks_; } -const ExecPlan::NodeVector& ExecPlan::nodes() const { - return ToDerived(this)->sorted_nodes_; -} - Status ExecPlan::Validate() { return ToDerived(this)->Validate(); } Status ExecPlan::StartProducing() { return ToDerived(this)->StartProducing(); } diff --git a/cpp/src/arrow/compute/exec/exec_plan.h b/cpp/src/arrow/compute/exec/exec_plan.h index f1bb7bbf48f..dcf271bd360 100644 --- a/cpp/src/arrow/compute/exec/exec_plan.h +++ b/cpp/src/arrow/compute/exec/exec_plan.h @@ -67,9 +67,6 @@ class ARROW_EXPORT ExecPlan : public std::enable_shared_from_this { /// The final outputs const NodeVector& sinks() const; - /// Nodes in the Plan - const NodeVector& nodes() const; - Status Validate(); /// \brief Start producing on all nodes From c19ebe6aafb9ac87e29a18c16574e68380beb99a Mon Sep 17 00:00:00 2001 From: Vibhatha Abeykoon Date: Mon, 20 Jun 2022 07:06:42 +0530 Subject: [PATCH 17/26] removing unncessary components from plan --- cpp/src/arrow/engine/substrait/plan_internal.cc | 8 -------- cpp/src/arrow/engine/substrait/plan_internal.h | 4 ---- 2 files changed, 12 deletions(-) diff --git a/cpp/src/arrow/engine/substrait/plan_internal.cc b/cpp/src/arrow/engine/substrait/plan_internal.cc index d1d2765ca6f..fcee0b2188f 100644 --- a/cpp/src/arrow/engine/substrait/plan_internal.cc +++ b/cpp/src/arrow/engine/substrait/plan_internal.cc @@ -133,13 +133,5 @@ Result GetExtensionSetFromPlan(const substrait::Plan& plan, registry); } -Result> ToProto(const compute::ExecPlan& exec_plan, - ExtensionSet* ext_set) { - std::cout << "ToProto[ExecPlan]" << std::endl; - auto plan = internal::make_unique(); - - return plan; -} - } // namespace engine } // namespace arrow diff --git a/cpp/src/arrow/engine/substrait/plan_internal.h b/cpp/src/arrow/engine/substrait/plan_internal.h index 699d1073266..dce23cdceba 100644 --- a/cpp/src/arrow/engine/substrait/plan_internal.h +++ b/cpp/src/arrow/engine/substrait/plan_internal.h @@ -19,7 +19,6 @@ #pragma once -#include "arrow/compute/exec/exec_plan.h" #include "arrow/engine/substrait/extension_set.h" #include "arrow/engine/substrait/visibility.h" #include "arrow/type_fwd.h" @@ -52,8 +51,5 @@ Result GetExtensionSetFromPlan( const substrait::Plan& plan, const ExtensionIdRegistry* registry = default_extension_id_registry()); -ARROW_ENGINE_EXPORT Result> ToProto( - const compute::ExecPlan&, ExtensionSet*); - } // namespace engine } // namespace arrow From 0a0fe66ab31fbe96cadaa2e1fa98c346e9dadecb Mon Sep 17 00:00:00 2001 From: Vibhatha Abeykoon Date: Mon, 20 Jun 2022 07:08:56 +0530 Subject: [PATCH 18/26] remove unncessary components from serde --- cpp/src/arrow/engine/substrait/serde.cc | 7 ------- cpp/src/arrow/engine/substrait/serde.h | 4 ---- 2 files changed, 11 deletions(-) diff --git a/cpp/src/arrow/engine/substrait/serde.cc b/cpp/src/arrow/engine/substrait/serde.cc index 4ea3f2f2f2b..89792bb459d 100644 --- a/cpp/src/arrow/engine/substrait/serde.cc +++ b/cpp/src/arrow/engine/substrait/serde.cc @@ -112,13 +112,6 @@ Result DeserializePlan(const Buffer& buf, } } -Result> SerializePlan(const compute::ExecPlan& exec_plan, - ExtensionSet* ext_set) { - ARROW_ASSIGN_OR_RAISE(auto plan, ToProto(exec_plan, ext_set)); - std::string serialized = plan->SerializeAsString(); - return Buffer::FromString(std::move(serialized)); -} - Result> DeserializeSchema(const Buffer& buf, const ExtensionSet& ext_set) { ARROW_ASSIGN_OR_RAISE(auto named_struct, ParseFromBuffer(buf)); diff --git a/cpp/src/arrow/engine/substrait/serde.h b/cpp/src/arrow/engine/substrait/serde.h index 49bdb447c7d..456a02a7d5a 100644 --- a/cpp/src/arrow/engine/substrait/serde.h +++ b/cpp/src/arrow/engine/substrait/serde.h @@ -56,10 +56,6 @@ ARROW_ENGINE_EXPORT Result DeserializePlan( const Buffer& buf, const ConsumerFactory& consumer_factory, ExtensionSet* ext_set_out = NULLPTR); -/// TODO: add docstring -ARROW_ENGINE_EXPORT Result> SerializePlan( - const compute::ExecPlan& exec_plan, ExtensionSet* ext_set); - /// \brief Deserializes a Substrait Type message to the corresponding Arrow type /// /// \param[in] buf a buffer containing the protobuf serialization of a Substrait Type From fe7cdc248fd83cf84cb6680c94a76d19d420dde9 Mon Sep 17 00:00:00 2001 From: Vibhatha Abeykoon Date: Mon, 20 Jun 2022 07:14:57 +0530 Subject: [PATCH 19/26] adding docstring --- cpp/src/arrow/engine/substrait/serde.h | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/cpp/src/arrow/engine/substrait/serde.h b/cpp/src/arrow/engine/substrait/serde.h index 456a02a7d5a..c5d0eaa51be 100644 --- a/cpp/src/arrow/engine/substrait/serde.h +++ b/cpp/src/arrow/engine/substrait/serde.h @@ -122,10 +122,15 @@ ARROW_ENGINE_EXPORT Result> SerializeExpression(const compute::Expression& expr, ExtensionSet* ext_set); -/// TODO: add docstring - -Result> SerializeRelation(const compute::Declaration& declaration, - ExtensionSet* ext_set); +/// \brief Serializes an Arrow compute Declaration to a Substrait Relation message +/// +/// \param[in] declaration the Arrow compute declaration to serialize +/// \param[in,out] ext_set the extension mapping to use; may be updated to add +/// mappings for the components in the used declaration +/// \return a buffer containing the protobuf serialization of the corresponding Substrait +/// Relation message +ARROW_ENGINE_EXPORT Result> SerializeRelation( + const compute::Declaration& declaration, ExtensionSet* ext_set); /// \brief Deserializes a Substrait Rel (relation) message to an ExecNode declaration /// From 6548ca697b050e392a3d790298352b05db68ba3b Mon Sep 17 00:00:00 2001 From: Vibhatha Abeykoon Date: Mon, 20 Jun 2022 07:38:15 +0530 Subject: [PATCH 20/26] remove unncessary comment --- cpp/src/arrow/engine/substrait/serde_test.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/engine/substrait/serde_test.cc b/cpp/src/arrow/engine/substrait/serde_test.cc index 4c162a172fc..e2994a2aa7c 100644 --- a/cpp/src/arrow/engine/substrait/serde_test.cc +++ b/cpp/src/arrow/engine/substrait/serde_test.cc @@ -1200,7 +1200,7 @@ TEST(Substrait, SerializeRelation) { ASSERT_OK_AND_ASSIGN(auto dataset, ds_factory->Finish(dummy_schema)); auto options = std::make_shared(); - options->projection = compute::project({}, {}); // create empty projection + options->projection = compute::project({}, {}); auto scan_node_options = dataset::ScanNodeOptions{dataset, options}; From f941ed2868626e612ed8a1aadf054a9d91096e95 Mon Sep 17 00:00:00 2001 From: Vibhatha Abeykoon Date: Mon, 20 Jun 2022 09:03:10 +0530 Subject: [PATCH 21/26] adding a check for windows test --- cpp/src/arrow/engine/substrait/serde_test.cc | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/engine/substrait/serde_test.cc b/cpp/src/arrow/engine/substrait/serde_test.cc index e2994a2aa7c..a5ef1c37bd1 100644 --- a/cpp/src/arrow/engine/substrait/serde_test.cc +++ b/cpp/src/arrow/engine/substrait/serde_test.cc @@ -1176,6 +1176,9 @@ TEST(Substrait, JoinPlanInvalidKeys) { } TEST(Substrait, SerializeRelation) { +#ifdef _WIN32 + GTEST_SKIP() << "ARROW-16392: Substrait File URI not supported for Windows"; +#else ExtensionSet ext_set; compute::ExecContext exec_context; @@ -1204,7 +1207,7 @@ TEST(Substrait, SerializeRelation) { auto scan_node_options = dataset::ScanNodeOptions{dataset, options}; - arrow::AsyncGenerator> sink_gen; + arrow::AsyncGenerator > sink_gen; auto sink_node_options = compute::SinkNodeOptions{&sink_gen}; @@ -1235,7 +1238,7 @@ TEST(Substrait, SerializeRelation) { ASSERT_OK_AND_ASSIGN(auto deserialized_decl, DeserializeRelation(*serialized_rel, ext_set)); - arrow::AsyncGenerator> des_sink_gen; + arrow::AsyncGenerator > des_sink_gen; auto des_sink_node_options = compute::SinkNodeOptions{&des_sink_gen}; auto des_sink_declaration = compute::Declaration({"sink", des_sink_node_options}); @@ -1261,6 +1264,7 @@ TEST(Substrait, SerializeRelation) { arrow::Table::FromRecordBatchReader(des_sink_reader.get())); ASSERT_TRUE(response_table->Equals(*des_response_table, true)); +#endif } } // namespace engine From 01e37c92a2b4fe705fff6cfceddb81b947873b02 Mon Sep 17 00:00:00 2001 From: Vibhatha Abeykoon Date: Mon, 20 Jun 2022 11:14:18 +0530 Subject: [PATCH 22/26] extending test cases for scan toproto --- cpp/src/arrow/engine/substrait/serde_test.cc | 71 ++++++++++++++++++++ 1 file changed, 71 insertions(+) diff --git a/cpp/src/arrow/engine/substrait/serde_test.cc b/cpp/src/arrow/engine/substrait/serde_test.cc index a5ef1c37bd1..e91be522d34 100644 --- a/cpp/src/arrow/engine/substrait/serde_test.cc +++ b/cpp/src/arrow/engine/substrait/serde_test.cc @@ -1176,6 +1176,77 @@ TEST(Substrait, JoinPlanInvalidKeys) { } TEST(Substrait, SerializeRelation) { +#ifdef _WIN32 + GTEST_SKIP() << "ARROW-16392: Substrait File URI not supported for Windows"; +#else + ExtensionSet ext_set; + auto dummy_schema = schema({field("f1", int32()), field("f2", int32())}); + // creating a dummy dataset using a dummy table + auto format = std::make_shared(); + auto filesystem = std::make_shared(); + + std::vector files; + const std::vector f_paths = {"/tmp/data1.parquet", "/tmp/data2.parquet"}; + + for (const auto& f_path : f_paths) { + ASSERT_OK_AND_ASSIGN(auto f_file, filesystem->GetFileInfo(f_path)); + files.push_back(std::move(f_file)); + } + + ASSERT_OK_AND_ASSIGN(auto ds_factory, dataset::FileSystemDatasetFactory::Make( + std::move(filesystem), std::move(files), + std::move(format), {})); + ASSERT_OK_AND_ASSIGN(auto dataset, ds_factory->Finish(dummy_schema)); + + auto options = std::make_shared(); + options->projection = compute::project({}, {}); + auto scan_node_options = dataset::ScanNodeOptions{dataset, options}; + + auto scan_declaration = compute::Declaration({"scan", scan_node_options}); + + ASSERT_OK_AND_ASSIGN(auto serialized_rel, + SerializeRelation(scan_declaration, &ext_set)); + ASSERT_OK_AND_ASSIGN(auto deserialized_decl, + DeserializeRelation(*serialized_rel, ext_set)); + + auto dataset_comparator = [](std::shared_ptr ds_lhs, + std::shared_ptr ds_rhs) -> bool { + const auto& fds_lhs = checked_cast(*ds_lhs); + const auto& fds_rhs = checked_cast(*ds_lhs); + const auto& files_lhs = fds_lhs.files(); + const auto& files_rhs = fds_rhs.files(); + + bool cmp_fsize = files_lhs.size() == files_rhs.size(); + uint64_t fidx = 0; + for (const auto& l_file : files_lhs) { + if (l_file != files_rhs[fidx]) { + return false; + } + fidx++; + } + bool cmp_file_format = fds_lhs.format()->Equals(*fds_lhs.format()); + bool cmp_file_system = fds_lhs.filesystem()->Equals(fds_rhs.filesystem()); + return cmp_fsize && cmp_file_format && cmp_file_system; + }; + + auto scan_option_comparator = [dataset_comparator]( + const dataset::ScanNodeOptions& lhs, + const dataset::ScanNodeOptions& rhs) -> bool { + bool cmp_rso = lhs.require_sequenced_output == rhs.require_sequenced_output; + bool cmp_ds = dataset_comparator(lhs.dataset, rhs.dataset); + return cmp_rso && cmp_ds; + }; + + EXPECT_EQ(deserialized_decl.factory_name, scan_declaration.factory_name); + const auto& lhs = + checked_cast(*deserialized_decl.options); + const auto& rhs = + checked_cast(*scan_declaration.options); + ASSERT_TRUE(scan_option_comparator(lhs, rhs)); +#endif +} + +TEST(Substrait, SerializeRelationEndToEnd) { #ifdef _WIN32 GTEST_SKIP() << "ARROW-16392: Substrait File URI not supported for Windows"; #else From 10047f16a427359d1466270cdbd4a356fba6ed6b Mon Sep 17 00:00:00 2001 From: Vibhatha Abeykoon Date: Mon, 20 Jun 2022 11:44:48 +0530 Subject: [PATCH 23/26] updated toproto relation test case --- cpp/src/arrow/engine/substrait/serde_test.cc | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/engine/substrait/serde_test.cc b/cpp/src/arrow/engine/substrait/serde_test.cc index e91be522d34..789c21ebf8d 100644 --- a/cpp/src/arrow/engine/substrait/serde_test.cc +++ b/cpp/src/arrow/engine/substrait/serde_test.cc @@ -1180,13 +1180,18 @@ TEST(Substrait, SerializeRelation) { GTEST_SKIP() << "ARROW-16392: Substrait File URI not supported for Windows"; #else ExtensionSet ext_set; - auto dummy_schema = schema({field("f1", int32()), field("f2", int32())}); + auto dummy_schema = schema({field("foo", binary())}); // creating a dummy dataset using a dummy table auto format = std::make_shared(); auto filesystem = std::make_shared(); + ASSERT_OK_AND_ASSIGN(std::string dir_string, + arrow::internal::GetEnvVar("PARQUET_TEST_DATA")); + auto file_name = + arrow::internal::PlatformFilename::FromString(dir_string)->Join("binary.parquet"); + std::vector files; - const std::vector f_paths = {"/tmp/data1.parquet", "/tmp/data2.parquet"}; + const std::vector f_paths = {file_name->ToString()}; for (const auto& f_path : f_paths) { ASSERT_OK_AND_ASSIGN(auto f_file, filesystem->GetFileInfo(f_path)); From dadc9fa007d1090f6967fdd1bdaada2666beb417 Mon Sep 17 00:00:00 2001 From: Vibhatha Abeykoon Date: Mon, 20 Jun 2022 15:13:22 +0530 Subject: [PATCH 24/26] update test case --- .../engine/substrait/relation_internal.cc | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/engine/substrait/relation_internal.cc b/cpp/src/arrow/engine/substrait/relation_internal.cc index 4db1009cb41..191f299b932 100644 --- a/cpp/src/arrow/engine/substrait/relation_internal.cc +++ b/cpp/src/arrow/engine/substrait/relation_internal.cc @@ -324,13 +324,19 @@ Result FromProto(const substrait::Rel& rel, } namespace { - +// TODO: add other types enum ArrowRelationType : uint8_t { SCAN, + FILTER, + PROJECT, + JOIN, + AGGREGATE, }; const std::map enum_map{ - {"scan", ArrowRelationType::SCAN}, + {"scan", ArrowRelationType::SCAN}, {"filter", ArrowRelationType::FILTER}, + {"project", ArrowRelationType::PROJECT}, {"join", ArrowRelationType::JOIN}, + {"aggregate", ArrowRelationType::AGGREGATE}, }; struct ExtractRelation { @@ -342,6 +348,14 @@ struct ExtractRelation { switch (enum_map.find(rel_name)->second) { case ArrowRelationType::SCAN: return AddReadRelation(declaration); + case ArrowRelationType::FILTER: + return Status::NotImplemented("Filter operator not supported."); + case ArrowRelationType::PROJECT: + return Status::NotImplemented("Project operator not supported."); + case ArrowRelationType::JOIN: + return Status::NotImplemented("Join operator not supported."); + case ArrowRelationType::AGGREGATE: + return Status::NotImplemented("Aggregate operator not supported."); default: return Status::Invalid("Unsupported factory name :", rel_name); } @@ -351,6 +365,7 @@ struct ExtractRelation { auto read_rel = internal::make_unique(); const auto& scan_node_options = internal::checked_cast(*declaration.options); + const auto& fds = internal::checked_cast( *scan_node_options.dataset); From fac7d45fd3098baea62a1407efba8c6be87bfd04 Mon Sep 17 00:00:00 2001 From: Vibhatha Abeykoon Date: Mon, 27 Jun 2022 21:07:19 +0530 Subject: [PATCH 25/26] uupdate code on reviews --- .../engine/substrait/relation_internal.cc | 19 ++++++++++++------- cpp/src/arrow/engine/substrait/serde_test.cc | 18 ++++++++++-------- 2 files changed, 22 insertions(+), 15 deletions(-) diff --git a/cpp/src/arrow/engine/substrait/relation_internal.cc b/cpp/src/arrow/engine/substrait/relation_internal.cc index 191f299b932..e9020490e24 100644 --- a/cpp/src/arrow/engine/substrait/relation_internal.cc +++ b/cpp/src/arrow/engine/substrait/relation_internal.cc @@ -357,7 +357,7 @@ struct ExtractRelation { case ArrowRelationType::AGGREGATE: return Status::NotImplemented("Aggregate operator not supported."); default: - return Status::Invalid("Unsupported factory name :", rel_name); + return Status::Invalid("Unsupported exec node factory name :", rel_name); } } @@ -366,22 +366,27 @@ struct ExtractRelation { const auto& scan_node_options = internal::checked_cast(*declaration.options); - const auto& fds = internal::checked_cast( - *scan_node_options.dataset); - + auto dataset = + dynamic_cast(scan_node_options.dataset.get()); + if (dataset == nullptr) { + return Status::Invalid( + "Can only convert file system datasets to a Substrait plan."); + } // set schema - ARROW_ASSIGN_OR_RAISE(auto named_struct, ToProto(*fds.schema(), ext_set_)); + ARROW_ASSIGN_OR_RAISE(auto named_struct, ToProto(*dataset->schema(), ext_set_)); read_rel->set_allocated_base_schema(named_struct.release()); // set local files auto read_rel_lfs = internal::make_unique(); - for (const auto& file : fds.files()) { + for (const auto& file : dataset->files()) { auto read_rel_lfs_ffs = internal::make_unique(); read_rel_lfs_ffs->set_uri_path("file://" + file); // set file format - auto format_type_name = fds.format()->type_name(); + // arrow and feather are temporarily handled via the Parquet format until + // upgraded to the latest Substrait version. + auto format_type_name = dataset->format()->type_name(); if (format_type_name == "parquet" || format_type_name == "arrow" || format_type_name == "feather") { read_rel_lfs_ffs->set_format( diff --git a/cpp/src/arrow/engine/substrait/serde_test.cc b/cpp/src/arrow/engine/substrait/serde_test.cc index 789c21ebf8d..5e5ee9ced7a 100644 --- a/cpp/src/arrow/engine/substrait/serde_test.cc +++ b/cpp/src/arrow/engine/substrait/serde_test.cc @@ -1216,12 +1216,14 @@ TEST(Substrait, SerializeRelation) { auto dataset_comparator = [](std::shared_ptr ds_lhs, std::shared_ptr ds_rhs) -> bool { - const auto& fds_lhs = checked_cast(*ds_lhs); - const auto& fds_rhs = checked_cast(*ds_lhs); - const auto& files_lhs = fds_lhs.files(); - const auto& files_rhs = fds_rhs.files(); + const auto& fsd_lhs = checked_cast(*ds_lhs); + const auto& fsd_rhs = checked_cast(*ds_lhs); + const auto& files_lhs = fsd_lhs.files(); + const auto& files_rhs = fsd_rhs.files(); - bool cmp_fsize = files_lhs.size() == files_rhs.size(); + if (files_lhs.size() != files_rhs.size()) { + return false; + } uint64_t fidx = 0; for (const auto& l_file : files_lhs) { if (l_file != files_rhs[fidx]) { @@ -1229,9 +1231,9 @@ TEST(Substrait, SerializeRelation) { } fidx++; } - bool cmp_file_format = fds_lhs.format()->Equals(*fds_lhs.format()); - bool cmp_file_system = fds_lhs.filesystem()->Equals(fds_rhs.filesystem()); - return cmp_fsize && cmp_file_format && cmp_file_system; + bool cmp_file_format = fsd_lhs.format()->Equals(*fsd_lhs.format()); + bool cmp_file_system = fsd_lhs.filesystem()->Equals(fsd_rhs.filesystem()); + return cmp_file_format && cmp_file_system; }; auto scan_option_comparator = [dataset_comparator]( From b1804383270c0faa05bb856f4a1f87ea9b821f68 Mon Sep 17 00:00:00 2001 From: Vibhatha Abeykoon Date: Tue, 28 Jun 2022 20:42:14 +0530 Subject: [PATCH 26/26] adding initial filter ToProto --- .../engine/substrait/relation_internal.cc | 35 +++++- cpp/src/arrow/engine/substrait/serde_test.cc | 118 ++++++++++++++++++ 2 files changed, 151 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/engine/substrait/relation_internal.cc b/cpp/src/arrow/engine/substrait/relation_internal.cc index e9020490e24..1ea03b08b5a 100644 --- a/cpp/src/arrow/engine/substrait/relation_internal.cc +++ b/cpp/src/arrow/engine/substrait/relation_internal.cc @@ -349,7 +349,7 @@ struct ExtractRelation { case ArrowRelationType::SCAN: return AddReadRelation(declaration); case ArrowRelationType::FILTER: - return Status::NotImplemented("Filter operator not supported."); + return AddFilterRelation(declaration); case ArrowRelationType::PROJECT: return Status::NotImplemented("Project operator not supported."); case ArrowRelationType::JOIN: @@ -384,7 +384,7 @@ struct ExtractRelation { read_rel_lfs_ffs->set_uri_path("file://" + file); // set file format - // arrow and feather are temporarily handled via the Parquet format until + // arrow and feather are temporarily handled via the Parquet format until // upgraded to the latest Substrait version. auto format_type_name = dataset->format()->type_name(); if (format_type_name == "parquet" || format_type_name == "arrow" || @@ -402,11 +402,42 @@ struct ExtractRelation { return Status::OK(); } + Status AddFilterRelation(const compute::Declaration& declaration) { + auto filter_rel = internal::make_unique(); + const auto& filter_node_options = + internal::checked_cast(*declaration.options); + + if (declaration.inputs.size() == 0) { + return Status::Invalid("Filter node doesn't have an input."); + } + + auto input_rel = GetRelationFromDeclaration(declaration, ext_set_); + + filter_rel->set_allocated_input(input_rel->release()); + + ARROW_ASSIGN_OR_RAISE(auto subs_expr, + ToProto(filter_node_options.filter_expression, ext_set_)); + *filter_rel->mutable_condition() = *subs_expr.get(); + + rel_->set_allocated_filter(filter_rel.release()); + + return Status::OK(); + } + Status operator()(const compute::Declaration& declaration) { return AddRelation(declaration); } private: + Result> GetRelationFromDeclaration( + const compute::Declaration declaration, ExtensionSet* ext_set) { + auto declr_input = declaration.inputs[0]; + // TODO: figure out a better way + if (util::get_if(&declr_input)) { + return Status::NotImplemented("Only support Plans written in Declaration format."); + } + return ToProto(util::get(declr_input), ext_set); + } substrait::Rel* rel_; ExtensionSet* ext_set_; }; diff --git a/cpp/src/arrow/engine/substrait/serde_test.cc b/cpp/src/arrow/engine/substrait/serde_test.cc index 5e5ee9ced7a..2d5da991567 100644 --- a/cpp/src/arrow/engine/substrait/serde_test.cc +++ b/cpp/src/arrow/engine/substrait/serde_test.cc @@ -1345,5 +1345,123 @@ TEST(Substrait, SerializeRelationEndToEnd) { #endif } +TEST(Substrait, SerializeFilterRelation) { +#ifdef _WIN32 + GTEST_SKIP() << "ARROW-16392: Substrait File URI not supported for Windows"; +#else + ExtensionSet ext_set; + compute::ExecContext exec_context; + + ASSERT_OK_AND_ASSIGN(std::string dir_string, + arrow::internal::GetEnvVar("PARQUET_TEST_DATA")); + auto file_name = arrow::internal::PlatformFilename::FromString(dir_string) + ->Join("alltypes_plain.parquet"); + + // Note: left the timestamp field since it is not supported. + // Add it back once it is added. + auto dummy_schema = schema({ + field("id", int32()), + field("bool_col", boolean()), + field("tinyint_col", int32()), + field("smallint_col", int32()), + field("int_col", int32()), + field("bigint_col", int64()), + field("float_col", float32()), + field("double_col", float64()), + field("date_string_col", binary()), + field("string_col", binary()), + }); + auto format = std::make_shared(); + auto filesystem = std::make_shared(); + + std::vector files; + const std::string f_path = file_name->ToString(); + ASSERT_OK_AND_ASSIGN(auto f_file, filesystem->GetFileInfo(f_path)); + files.push_back(std::move(f_file)); + + ASSERT_OK_AND_ASSIGN(auto ds_factory, dataset::FileSystemDatasetFactory::Make( + std::move(filesystem), std::move(files), + std::move(format), {})); + + ASSERT_OK_AND_ASSIGN(auto dataset, ds_factory->Finish(dummy_schema)); + + auto options = std::make_shared(); + + options->projection = compute::project({}, {}); + + auto scan_node_options = dataset::ScanNodeOptions{dataset, options}; + + arrow::AsyncGenerator > sink_gen; + + auto sink_node_options = compute::SinkNodeOptions{&sink_gen}; + + compute::Expression filter_expr = + compute::equal(compute::field_ref("bigint_col"), compute::literal(10)); + // TODO: evaluate this + const std::shared_ptr kBoringSchema = schema({field("bigint_col", int32())}); + ASSERT_OK_AND_ASSIGN(filter_expr, filter_expr.Bind(*kBoringSchema)); + auto filter_node_options = compute::FilterNodeOptions{{filter_expr}}; + + auto scan_declaration = compute::Declaration({"scan", scan_node_options}); + auto filter_declaration = compute::Declaration({"filter", filter_node_options}); + auto sink_declaration = compute::Declaration({"sink", sink_node_options}); + + auto declarations = compute::Declaration::Sequence( + {scan_declaration, filter_declaration, sink_declaration}); + + ASSERT_OK_AND_ASSIGN(auto plan, compute::ExecPlan::Make(&exec_context)); + ASSERT_OK_AND_ASSIGN(auto decl, declarations.AddToPlan(plan.get())); + + ASSERT_OK(decl->Validate()); + + auto out_schema = schema({field("bigint_col", int64())}); + std::shared_ptr sink_reader = compute::MakeGeneratorReader( + out_schema, std::move(sink_gen), exec_context.memory_pool()); + + ASSERT_OK(plan->Validate()); + ASSERT_OK(plan->StartProducing()); + + std::shared_ptr response_table; + + ASSERT_OK_AND_ASSIGN(response_table, + arrow::Table::FromRecordBatchReader(sink_reader.get())); + + auto scan_filter_declr = + compute::Declaration::Sequence({scan_declaration, filter_declaration}); + + ASSERT_OK_AND_ASSIGN(auto serialized_filter_rel, + SerializeRelation(scan_filter_declr, &ext_set)); + ASSERT_OK_AND_ASSIGN(auto deserialized_filter_decl, + DeserializeRelation(*serialized_filter_rel, ext_set)); + + arrow::AsyncGenerator > des_sink_gen; + auto des_sink_node_options = compute::SinkNodeOptions{&des_sink_gen}; + + auto des_sink_declaration = compute::Declaration({"sink", des_sink_node_options}); + + auto t_decls = + compute::Declaration::Sequence({deserialized_filter_decl, des_sink_declaration}); + + ASSERT_OK_AND_ASSIGN(auto t_plan, compute::ExecPlan::Make()); + ASSERT_OK_AND_ASSIGN(auto t_decl, t_decls.AddToPlan(t_plan.get())); + + ASSERT_OK(t_decl->Validate()); + + std::shared_ptr des_sink_reader = + compute::MakeGeneratorReader(out_schema, std::move(des_sink_gen), + exec_context.memory_pool()); + + ASSERT_OK(t_plan->Validate()); + ASSERT_OK(t_plan->StartProducing()); + + std::shared_ptr des_response_table; + + ASSERT_OK_AND_ASSIGN(des_response_table, + arrow::Table::FromRecordBatchReader(des_sink_reader.get())); + + ASSERT_TRUE(response_table->Equals(*des_response_table, true)); +#endif +} + } // namespace engine } // namespace arrow