Merged
30 commits
5b691f8
rebase: short history 16855
vibhatha Jul 27, 2022
623ef61
fix(registry): initial version of registry def
vibhatha Jul 28, 2022
49647c7
adding registry end-to-end WIP
vibhatha Jul 28, 2022
57347db
fix(scan): include initial version of registry test WIP
vibhatha Aug 1, 2022
00549f7
adding initial testing on registry-based roundtrip tests for plan/rel…
vibhatha Aug 1, 2022
87c5328
feat(test): adding end-to-end testing
vibhatha Aug 2, 2022
53e2245
feat(test-end-to-end): added an end-to-end test case
vibhatha Aug 2, 2022
6907639
fix(refactor): refactor the relation and plan impl
vibhatha Aug 2, 2022
5b5cc83
fix(format): reformat code
vibhatha Aug 2, 2022
a451043
fix(export): updating export declaration for registry
vibhatha Aug 3, 2022
05c2d7a
fix(format): formatting the code and fixing unresolved minor issues
vibhatha Aug 3, 2022
3ce1c18
fix(format): included docstrings and clode clean up
vibhatha Aug 3, 2022
1a179a1
fix(review): addressing a previous review comment
vibhatha Aug 3, 2022
630524a
fix(review): addressing review comment
vibhatha Aug 3, 2022
ce13740
fix(code): missed move op added
vibhatha Aug 3, 2022
e6abfc9
fix(path): using ToNative instead of ToString
vibhatha Aug 4, 2022
f07de57
fix(docs): added conversion_options to docstring
vibhatha Aug 4, 2022
ea878ea
fix(rebase): rebasing with Substrait changes
vibhatha Aug 22, 2022
1daecba
fix(address_review): refactor
vibhatha Aug 22, 2022
ea8c557
fix(registry): cleaning up registry
vibhatha Aug 24, 2022
ef407b0
fix(reviews): uri fix, remove SetRelation, simplify code
vibhatha Aug 29, 2022
6571de2
fix(cleanup): raddressing reviews
vibhatha Aug 29, 2022
3841bfb
fix(reviews): updated input handling
vibhatha Aug 29, 2022
b9d6f07
fix(native): updated the file_path method to check CI failure
vibhatha Aug 29, 2022
0479dac
fix(ipc): adding ipc write replacing parquet
vibhatha Aug 31, 2022
c1de2b8
fix(file_path_issue): temp commit
vibhatha Sep 6, 2022
3156fd2
fix(temp): testing a fix for additional slash in file handling
vibhatha Sep 6, 2022
491a985
fix(path-issue): fixed the path issue and updated the test cases
vibhatha Sep 7, 2022
33d7753
fix(path): windows issue fixing
vibhatha Sep 7, 2022
616d6e5
fix(reviews): address reviews
vibhatha Sep 8, 2022
19 changes: 19 additions & 0 deletions cpp/src/arrow/engine/substrait/extension_set.cc
@@ -698,6 +698,20 @@ ExtensionIdRegistry::ArrowToSubstraitCall EncodeOptionlessOverflowableArithmetic
  };
}

ExtensionIdRegistry::ArrowToSubstraitCall EncodeOptionlessComparison(Id substrait_fn_id) {
  return
      [substrait_fn_id](const compute::Expression::Call& call) -> Result<SubstraitCall> {
        // nullable=true isn't quite correct but we don't know the nullability of
        // the inputs
        SubstraitCall substrait_call(substrait_fn_id, call.type.GetSharedPtr(),
                                     /*nullable=*/true);
        for (std::size_t i = 0; i < call.arguments.size(); i++) {
          substrait_call.SetValueArg(static_cast<uint32_t>(i), call.arguments[i]);
        }
        return std::move(substrait_call);
      };
}

ExtensionIdRegistry::SubstraitCallToArrow DecodeOptionlessBasicMapping(
    const std::string& function_name, uint32_t max_args) {
  return [function_name,
@@ -873,6 +887,11 @@ struct DefaultExtensionIdRegistry : ExtensionIdRegistryImpl {
          AddArrowToSubstraitCall(std::string(fn_name) + "_checked",
                                  EncodeOptionlessOverflowableArithmetic<true>(fn_id)));
    }
    // Comparison operators
    for (const auto& fn_name : {"equal", "is_not_distinct_from"}) {
      Id fn_id{kSubstraitComparisonFunctionsUri, fn_name};
      DCHECK_OK(AddArrowToSubstraitCall(fn_name, EncodeOptionlessComparison(fn_id)));
    }
  }
};

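The hunks above register one encoder per comparison function: a factory returns a lambda that captures the target function id, marks the result nullable, and copies each argument to the same index. A minimal self-contained sketch of that pattern follows. `FunctionId`, `EncodedCall`, `Encoder`, `MakeComparisonEncoder`, and `BuildComparisonRegistry` are hypothetical stand-ins introduced for illustration; they are not the Arrow types or functions.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <initializer_list>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-ins; these are NOT the Arrow/Substrait types.
struct FunctionId {
  std::string uri;
  std::string name;
};

struct EncodedCall {
  FunctionId id;
  bool nullable;
  std::vector<std::string> args;  // argument i is stored at index i
};

using Encoder = std::function<EncodedCall(const std::vector<std::string>&)>;

// Returns an encoder that captures the target function id by value.
Encoder MakeComparisonEncoder(FunctionId id) {
  return [id = std::move(id)](const std::vector<std::string>& arguments) {
    // The nullability of the inputs is unknown here, so assume nullable.
    EncodedCall call{id, /*nullable=*/true, {}};
    call.args.reserve(arguments.size());
    for (std::size_t i = 0; i < arguments.size(); ++i) {
      call.args.push_back(arguments[i]);
    }
    return call;
  };
}

// Registers one encoder per comparison function name under the given URI.
std::map<std::string, Encoder> BuildComparisonRegistry(const std::string& uri) {
  std::map<std::string, Encoder> registry;
  for (const char* fn_name : {"equal", "is_not_distinct_from"}) {
    bool inserted =
        registry.emplace(fn_name, MakeComparisonEncoder(FunctionId{uri, fn_name}))
            .second;
    assert(inserted);  // each name is registered exactly once
    (void)inserted;
  }
  return registry;
}
```

Capturing the id by value keeps each encoder independent of the loop variable that created it, which is why the same factory can be reused for every function name.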
16 changes: 16 additions & 0 deletions cpp/src/arrow/engine/substrait/plan_internal.cc
@@ -17,6 +17,8 @@

#include "arrow/engine/substrait/plan_internal.h"

#include "arrow/dataset/plan.h"
#include "arrow/engine/substrait/relation_internal.h"
#include "arrow/result.h"
#include "arrow/util/hashing.h"
#include "arrow/util/logging.h"
@@ -133,5 +135,19 @@ Result<ExtensionSet> GetExtensionSetFromPlan(const substrait::Plan& plan,
registry);
}

Result<std::unique_ptr<substrait::Plan>> PlanToProto(
Member

I like the name PlanToProto better but, for consistency, I think this should be named ToProto right?

Contributor Author

The issue is that a function with the same signature already exists in relation_internal.h. There, we output the serialized substrait::Rel corresponding to a Declaration. Since the Substrait relation model entities are final classes that don't extend a generic relation interface, we create a substrait::Rel and fill in the corresponding part. This is considered a partial plan. For instance, if the passed Declaration is a scan, we populate the read field of the substrait::Rel object. Then, in plan_internal.cc, we extract that read component and bind it to the substrait::Rel, which is considered the full plan. In plan_internal.cc we pass the sink to PlanToProto to get the substrait::Rel; the conversion is called recursively until the whole plan is serialized.

So that's the reason for having this function signature: to make the intent clear and avoid compiler errors. I wanted to expose both interfaces to the user so that each can be used as appropriate.

Member

That makes sense. So the root problem is that a relation and a plan are both represented in Acero by compute::Declaration and so there is ambiguity. I think the name PlanToProto is fine.
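
The ambiguity discussed in this thread can be illustrated with a small sketch: two functions that take the same parameters cannot share a name if they differ only in return type, so the plan-level function needs its own name. The `Declaration`, `Rel`, and `Plan` structs below are simplified, hypothetical stand-ins, not the Acero or Substrait classes.

```cpp
#include <cassert>
#include <string>

// Hypothetical, simplified stand-ins for illustration only.
struct Declaration {
  std::string factory_name;
};
struct Rel {
  std::string kind;
};
struct Plan {
  Rel root;
};

// Relation-level conversion: one Declaration becomes one Rel.
Rel ToProto(const Declaration& declaration) {
  assert(!declaration.factory_name.empty());
  return Rel{declaration.factory_name};
}

// Declaring `Plan ToProto(const Declaration&);` here would not compile:
// C++ overloads cannot differ only in their return type. A distinct name
// avoids the clash and states the intent at the call site.
Plan PlanToProto(const Declaration& declaration) {
  return Plan{ToProto(declaration)};
}
```

With distinct names, a caller chooses explicitly between the relation-level and the plan-level result.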

    const compute::Declaration& declr, ExtensionSet* ext_set,
    const ConversionOptions& conversion_options) {
  auto subs_plan = internal::make_unique<substrait::Plan>();
  auto plan_rel = internal::make_unique<substrait::PlanRel>();
  auto rel_root = internal::make_unique<substrait::RelRoot>();
  ARROW_ASSIGN_OR_RAISE(auto rel, ToProto(declr, ext_set, conversion_options));
  rel_root->set_allocated_input(rel.release());
  plan_rel->set_allocated_root(rel_root.release());
  subs_plan->mutable_relations()->AddAllocated(plan_rel.release());
  RETURN_NOT_OK(AddExtensionSetToPlan(*ext_set, subs_plan.get()));
  return std::move(subs_plan);
}

} // namespace engine
} // namespace arrow
14 changes: 14 additions & 0 deletions cpp/src/arrow/engine/substrait/plan_internal.h
@@ -19,7 +19,9 @@

#pragma once

#include "arrow/compute/exec/exec_plan.h"
#include "arrow/engine/substrait/extension_set.h"
#include "arrow/engine/substrait/options.h"
#include "arrow/engine/substrait/visibility.h"
#include "arrow/type_fwd.h"

@@ -51,5 +53,17 @@ Result<ExtensionSet> GetExtensionSetFromPlan(
    const substrait::Plan& plan,
    const ExtensionIdRegistry* registry = default_extension_id_registry());

/// \brief Serialize a declaration into a substrait::Plan.
///
/// Note that this is part of a roundtripping test API and is not
/// designed for use in production.
/// \param[in] declr the sequence of declarations to be serialized
/// \param[in, out] ext_set the extension set to be updated
/// \param[in] conversion_options options to control serialization behavior
/// \return the serialized plan
ARROW_ENGINE_EXPORT Result<std::unique_ptr<substrait::Plan>> PlanToProto(
    const compute::Declaration& declr, ExtensionSet* ext_set,
    const ConversionOptions& conversion_options = {});

} // namespace engine
} // namespace arrow
201 changes: 177 additions & 24 deletions cpp/src/arrow/engine/substrait/relation_internal.cc
@@ -29,8 +29,16 @@
#include "arrow/filesystem/localfs.h"
#include "arrow/filesystem/path_util.h"
#include "arrow/filesystem/util_internal.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/make_unique.h"
#include "arrow/util/uri.h"

namespace arrow {

using ::arrow::internal::UriFromAbsolutePath;
using internal::checked_cast;
using internal::make_unique;

namespace engine {

template <typename RelMessage>
@@ -162,36 +170,45 @@ Result<DeclarationInfo> FromProto(const substrait::Rel& rel, const ExtensionSet&
}

path = path.substr(7);
if (item.path_type_case() ==
substrait::ReadRel_LocalFiles_FileOrFiles::kUriPath) {
ARROW_ASSIGN_OR_RAISE(auto file, filesystem->GetFileInfo(path));
if (file.type() == fs::FileType::File) {
files.push_back(std::move(file));
} else if (file.type() == fs::FileType::Directory) {
switch (item.path_type_case()) {
case substrait::ReadRel_LocalFiles_FileOrFiles::kUriPath: {
ARROW_ASSIGN_OR_RAISE(auto file, filesystem->GetFileInfo(path));
if (file.type() == fs::FileType::File) {
files.push_back(std::move(file));
} else if (file.type() == fs::FileType::Directory) {
fs::FileSelector selector;
selector.base_dir = path;
selector.recursive = true;
ARROW_ASSIGN_OR_RAISE(auto discovered_files,
filesystem->GetFileInfo(selector));
std::move(files.begin(), files.end(), std::back_inserter(discovered_files));
}
break;
}
case substrait::ReadRel_LocalFiles_FileOrFiles::kUriFile: {
files.emplace_back(path, fs::FileType::File);
break;
}
case substrait::ReadRel_LocalFiles_FileOrFiles::kUriFolder: {
fs::FileSelector selector;
selector.base_dir = path;
selector.recursive = true;
ARROW_ASSIGN_OR_RAISE(auto discovered_files,
filesystem->GetFileInfo(selector));
std::move(files.begin(), files.end(), std::back_inserter(discovered_files));
std::move(discovered_files.begin(), discovered_files.end(),
std::back_inserter(files));
break;
}
case substrait::ReadRel_LocalFiles_FileOrFiles::kUriPathGlob: {
ARROW_ASSIGN_OR_RAISE(auto discovered_files,
fs::internal::GlobFiles(filesystem, path));
std::move(discovered_files.begin(), discovered_files.end(),
std::back_inserter(files));
break;
}
default: {
return Status::Invalid("Unrecognized file type in LocalFiles");
}
}
if (item.path_type_case() ==
substrait::ReadRel_LocalFiles_FileOrFiles::kUriFile) {
files.emplace_back(path, fs::FileType::File);
} else if (item.path_type_case() ==
substrait::ReadRel_LocalFiles_FileOrFiles::kUriFolder) {
fs::FileSelector selector;
selector.base_dir = path;
selector.recursive = true;
ARROW_ASSIGN_OR_RAISE(auto discovered_files, filesystem->GetFileInfo(selector));
std::move(discovered_files.begin(), discovered_files.end(),
std::back_inserter(files));
} else {
ARROW_ASSIGN_OR_RAISE(auto discovered_files,
fs::internal::GlobFiles(filesystem, path));
std::move(discovered_files.begin(), discovered_files.end(),
std::back_inserter(files));
}
}
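
In the hunk above, the directory branch of the `kUriPath` case is shown moving from `files` into `discovered_files`, while the `kUriFolder` and glob cases move from `discovered_files` into `files`. Assuming the intent in every branch is to accumulate results into `files`, the append direction can be sketched as follows. `AppendDiscovered` is a hypothetical helper, not an Arrow function, and plain strings stand in for `fs::FileInfo`.

```cpp
#include <algorithm>
#include <cassert>
#include <iterator>
#include <string>
#include <utility>
#include <vector>

// Hypothetical helper, not part of Arrow: appends every element of
// `discovered` to `*files`, moving the elements rather than copying them.
void AppendDiscovered(std::vector<std::string> discovered,
                      std::vector<std::string>* files) {
  assert(files != nullptr);
  files->reserve(files->size() + discovered.size());
  // Source range first, destination inserter last.
  std::move(discovered.begin(), discovered.end(), std::back_inserter(*files));
}
```

The three-argument `std::move` from `<algorithm>` takes the source range first and the output iterator last, so swapping the two containers compiles cleanly but appends to the temporary instead of the accumulator.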

@@ -421,5 +438,141 @@ Result<DeclarationInfo> FromProto(const substrait::Rel& rel, const ExtensionSet&
rel.DebugString());
}

namespace {

Result<std::shared_ptr<Schema>> ExtractSchemaToBind(const compute::Declaration& declr) {
  std::shared_ptr<Schema> bind_schema;
  if (declr.factory_name == "scan") {
    const auto& opts = checked_cast<const dataset::ScanNodeOptions&>(*(declr.options));
    bind_schema = opts.dataset->schema();
  } else if (declr.factory_name == "filter") {
    auto input_declr = util::get<compute::Declaration>(declr.inputs[0]);
    ARROW_ASSIGN_OR_RAISE(bind_schema, ExtractSchemaToBind(input_declr));
  } else if (declr.factory_name == "sink") {
    // Note that the sink has no output_schema
    return bind_schema;
  } else {
    return Status::Invalid("Schema extraction failed, unsupported factory ",
                           declr.factory_name);
  }
  return bind_schema;
}

Result<std::unique_ptr<substrait::ReadRel>> ScanRelationConverter(
    const std::shared_ptr<Schema>& schema, const compute::Declaration& declaration,
    ExtensionSet* ext_set, const ConversionOptions& conversion_options) {
  auto read_rel = make_unique<substrait::ReadRel>();
  const auto& scan_node_options =
      checked_cast<const dataset::ScanNodeOptions&>(*declaration.options);
  auto dataset =
      dynamic_cast<dataset::FileSystemDataset*>(scan_node_options.dataset.get());
  if (dataset == nullptr) {
    return Status::Invalid(
        "Can only convert scan node with FileSystemDataset to a Substrait plan.");
  }
  // set schema
  ARROW_ASSIGN_OR_RAISE(auto named_struct,
                        ToProto(*dataset->schema(), ext_set, conversion_options));
  read_rel->set_allocated_base_schema(named_struct.release());

  // set local files
  auto read_rel_lfs = make_unique<substrait::ReadRel_LocalFiles>();
  for (const auto& file : dataset->files()) {
    auto read_rel_lfs_ffs = make_unique<substrait::ReadRel_LocalFiles_FileOrFiles>();
    read_rel_lfs_ffs->set_uri_path(UriFromAbsolutePath(file));
    // set file format
    auto format_type_name = dataset->format()->type_name();
    if (format_type_name == "parquet") {
      read_rel_lfs_ffs->set_allocated_parquet(
          new substrait::ReadRel::LocalFiles::FileOrFiles::ParquetReadOptions());
    } else if (format_type_name == "ipc") {
      read_rel_lfs_ffs->set_allocated_arrow(
          new substrait::ReadRel::LocalFiles::FileOrFiles::ArrowReadOptions());
    } else if (format_type_name == "orc") {
      read_rel_lfs_ffs->set_allocated_orc(
          new substrait::ReadRel::LocalFiles::FileOrFiles::OrcReadOptions());
    } else {
      return Status::NotImplemented("Unsupported file type: ", format_type_name);
    }
    read_rel_lfs->mutable_items()->AddAllocated(read_rel_lfs_ffs.release());
  }
  read_rel->set_allocated_local_files(read_rel_lfs.release());
Member

Can we have a follow-up JIRA to add support for scan options projection & filter? I don't think it should be done as part of this JIRA since it is changing.

  return std::move(read_rel);
}

Result<std::unique_ptr<substrait::FilterRel>> FilterRelationConverter(
    const std::shared_ptr<Schema>& schema, const compute::Declaration& declaration,
    ExtensionSet* ext_set, const ConversionOptions& conversion_options) {
  auto filter_rel = make_unique<substrait::FilterRel>();
  const auto& filter_node_options =
      checked_cast<const compute::FilterNodeOptions&>(*(declaration.options));

  auto filter_expr = filter_node_options.filter_expression;
  compute::Expression bound_expression;
  if (!filter_expr.IsBound()) {
    ARROW_ASSIGN_OR_RAISE(bound_expression, filter_expr.Bind(*schema));
  }

  if (declaration.inputs.size() == 0) {
    return Status::Invalid("Filter node doesn't have an input.");
  }

  // handling input
  auto declr_input = declaration.inputs[0];
  ARROW_ASSIGN_OR_RAISE(
      auto input_rel,
      ToProto(util::get<compute::Declaration>(declr_input), ext_set, conversion_options));
  filter_rel->set_allocated_input(input_rel.release());

  ARROW_ASSIGN_OR_RAISE(auto subs_expr,
                        ToProto(bound_expression, ext_set, conversion_options));
  filter_rel->set_allocated_condition(subs_expr.release());
  return std::move(filter_rel);
}

} // namespace
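
As shown above, `FilterRelationConverter` assigns `bound_expression` only when the filter expression is not yet bound, so an expression that arrives already bound would leave `bound_expression` default-constructed. A minimal sketch of one way to cover both cases follows. The `Expression` struct and `EnsureBound` are hypothetical stand-ins, not `compute::Expression` or any Arrow API, and a string stands in for the schema.

```cpp
#include <cassert>
#include <string>

// Hypothetical stand-in for an expression that may or may not be bound.
struct Expression {
  std::string text;
  bool bound = false;

  bool IsBound() const { return bound; }

  // Returns a bound copy; binding is mocked by tagging the text.
  Expression Bind(const std::string& schema) const {
    return Expression{text + "@" + schema, true};
  }
};

// Hypothetical helper: returns the expression unchanged if it is already
// bound, otherwise binds it against the given schema.
Expression EnsureBound(const Expression& expr, const std::string& schema) {
  Expression result = expr.IsBound() ? expr : expr.Bind(schema);
  assert(result.IsBound());
  return result;
}
```

Starting from the original expression and rebinding only when needed means the value passed on to serialization is never left empty.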

Status SerializeAndCombineRelations(const compute::Declaration& declaration,
                                    ExtensionSet* ext_set,
                                    std::unique_ptr<substrait::Rel>* rel,
                                    const ConversionOptions& conversion_options) {
  const auto& factory_name = declaration.factory_name;
  ARROW_ASSIGN_OR_RAISE(auto schema, ExtractSchemaToBind(declaration));
  // Note that the sink declaration factory doesn't exist for serialization as
  // Substrait doesn't deal with a sink node definition

  if (factory_name == "scan") {
    ARROW_ASSIGN_OR_RAISE(
        auto read_rel,
        ScanRelationConverter(schema, declaration, ext_set, conversion_options));
    (*rel)->set_allocated_read(read_rel.release());
  } else if (factory_name == "filter") {
    ARROW_ASSIGN_OR_RAISE(
        auto filter_rel,
        FilterRelationConverter(schema, declaration, ext_set, conversion_options));
    (*rel)->set_allocated_filter(filter_rel.release());
  } else if (factory_name == "sink") {
    // Generally when a plan is deserialized the declaration will be a sink declaration.
    // Since there is no Sink relation in substrait, this function would be recursively
    // called on the input of the Sink declaration.
    auto sink_input_decl = util::get<compute::Declaration>(declaration.inputs[0]);
    RETURN_NOT_OK(
        SerializeAndCombineRelations(sink_input_decl, ext_set, rel, conversion_options));
  } else {
    return Status::NotImplemented("Factory ", factory_name,
                                  " not implemented for roundtripping.");
  }

  return Status::OK();
}

Result<std::unique_ptr<substrait::Rel>> ToProto(
    const compute::Declaration& declr, ExtensionSet* ext_set,
    const ConversionOptions& conversion_options) {
  auto rel = make_unique<substrait::Rel>();
  RETURN_NOT_OK(SerializeAndCombineRelations(declr, ext_set, &rel, conversion_options));
  return std::move(rel);
}

} // namespace engine
} // namespace arrow
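
The recursion in `SerializeAndCombineRelations` can be summarized with a self-contained sketch: a scan becomes a read relation, a filter wraps the relation of its input, and a sink contributes nothing of its own and simply forwards to its input. The `Declaration` and `Rel` structs and the `ToRel` function below are hypothetical stand-ins, not the Acero or Substrait classes, and exceptions stand in for `Status` results.

```cpp
#include <cassert>
#include <memory>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-ins for compute::Declaration and substrait::Rel.
struct Declaration {
  std::string factory_name;
  std::vector<Declaration> inputs;
};

struct Rel {
  std::string kind;            // "read" or "filter"
  std::unique_ptr<Rel> input;  // populated only for "filter"
};

std::unique_ptr<Rel> ToRel(const Declaration& declaration) {
  const std::string& factory_name = declaration.factory_name;
  if (factory_name == "scan") {
    auto rel = std::make_unique<Rel>();
    rel->kind = "read";
    return rel;
  }
  if (factory_name == "filter" || factory_name == "sink") {
    if (declaration.inputs.empty()) {
      throw std::invalid_argument(factory_name + " node doesn't have an input.");
    }
    auto input_rel = ToRel(declaration.inputs[0]);
    // A sink has no relation of its own: its input is the result.
    if (factory_name == "sink") return input_rel;
    auto rel = std::make_unique<Rel>();
    rel->kind = "filter";
    rel->input = std::move(input_rel);
    assert(rel->input != nullptr);
    return rel;
  }
  throw std::invalid_argument("Factory " + factory_name +
                              " not implemented for roundtripping.");
}
```

Because the sink is skipped rather than serialized, a sink-rooted declaration and the same declaration without the sink produce the same relation tree in this sketch.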
10 changes: 10 additions & 0 deletions cpp/src/arrow/engine/substrait/relation_internal.h
@@ -40,9 +40,19 @@ struct DeclarationInfo {
  int num_columns;
};

/// \brief Convert a Substrait Rel object to an Acero declaration
ARROW_ENGINE_EXPORT
Result<DeclarationInfo> FromProto(const substrait::Rel&, const ExtensionSet&,
                                  const ConversionOptions&);

/// \brief Convert an Acero Declaration to a Substrait Rel
///
/// Note that, in order to provide a generic interface for ToProto,
/// the ExecNode or ExecPlan are not used in this context as Declaration
/// is preferred in the Substrait space rather than internal components of
/// Acero execution engine.
ARROW_ENGINE_EXPORT Result<std::unique_ptr<substrait::Rel>> ToProto(
    const compute::Declaration&, ExtensionSet*, const ConversionOptions&);

} // namespace engine
} // namespace arrow
17 changes: 17 additions & 0 deletions cpp/src/arrow/engine/substrait/serde.cc
@@ -52,6 +52,23 @@ Result<Message> ParseFromBuffer(const Buffer& buf) {
  return message;
}

Result<std::shared_ptr<Buffer>> SerializePlan(
    const compute::Declaration& declaration, ExtensionSet* ext_set,
    const ConversionOptions& conversion_options) {
  ARROW_ASSIGN_OR_RAISE(auto subs_plan,
                        PlanToProto(declaration, ext_set, conversion_options));
  std::string serialized = subs_plan->SerializeAsString();
  return Buffer::FromString(std::move(serialized));
}

Result<std::shared_ptr<Buffer>> SerializeRelation(
    const compute::Declaration& declaration, ExtensionSet* ext_set,
    const ConversionOptions& conversion_options) {
  ARROW_ASSIGN_OR_RAISE(auto relation, ToProto(declaration, ext_set, conversion_options));
  std::string serialized = relation->SerializeAsString();
  return Buffer::FromString(std::move(serialized));
}

Result<compute::Declaration> DeserializeRelation(
    const Buffer& buf, const ExtensionSet& ext_set,
    const ConversionOptions& conversion_options) {