Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
ae22b3e
updating submodule
vibhatha Mar 4, 2022
dcc0c4e
temp commit to remove files in submodule
vibhatha Mar 19, 2022
3ea6720
adding submodule
vibhatha Mar 19, 2022
f8480ad
updating testing submodule
vibhatha Mar 20, 2022
2261cea
revert to uupstream version
vibhatha Mar 20, 2022
d0a1ade
tmp test code:
vibhatha May 30, 2022
3edf966
tmp fixes on toproto plan
vibhatha May 30, 2022
3c04059
temp
vibhatha Jun 1, 2022
c0dc100
intermediate commit for releation plan
vibhatha Jun 15, 2022
6d3ebd9
adding initial functional Relation deserialization
vibhatha Jun 16, 2022
a8cff44
end-to-end initial test on scan node ToProto
vibhatha Jun 16, 2022
3ec4d32
updating test case (imd)
vibhatha Jun 16, 2022
780e32f
add an end-to-end test case
vibhatha Jun 18, 2022
47fad43
format and cleaned up
vibhatha Jun 20, 2022
7b5defd
removed unnecessary commit from an example
vibhatha Jun 20, 2022
0eb567f
remove unncessary components in exec_plan
vibhatha Jun 20, 2022
c19ebe6
removing unncessary components from plan
vibhatha Jun 20, 2022
0a0fe66
remove unncessary components from serde
vibhatha Jun 20, 2022
fe7cdc2
adding docstring
vibhatha Jun 20, 2022
6548ca6
remove unncessary comment
vibhatha Jun 20, 2022
f941ed2
adding a check for windows test
vibhatha Jun 20, 2022
01e37c9
extending test cases for scan toproto
vibhatha Jun 20, 2022
10047f1
updated toproto relation test case
vibhatha Jun 20, 2022
dadc9fa
update test case
vibhatha Jun 20, 2022
fac7d45
uupdate code on reviews
vibhatha Jun 27, 2022
b180438
adding initial filter ToProto
vibhatha Jun 28, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 135 additions & 0 deletions cpp/src/arrow/engine/substrait/relation_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,17 @@
#include "arrow/engine/substrait/type_internal.h"
#include "arrow/filesystem/localfs.h"
#include "arrow/filesystem/util_internal.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/make_unique.h"

namespace arrow {
namespace engine {

namespace internal {
using ::arrow::internal::checked_cast;
using ::arrow::internal::make_unique;
} // namespace internal

template <typename RelMessage>
Status CheckRelCommon(const RelMessage& rel) {
if (rel.has_common()) {
Expand Down Expand Up @@ -316,5 +323,133 @@ Result<compute::Declaration> FromProto(const substrait::Rel& rel,
rel.DebugString());
}

namespace {
// TODO: add other types
enum ArrowRelationType : uint8_t {
SCAN,
FILTER,
PROJECT,
JOIN,
AGGREGATE,
};

const std::map<std::string, ArrowRelationType> enum_map{
{"scan", ArrowRelationType::SCAN}, {"filter", ArrowRelationType::FILTER},
{"project", ArrowRelationType::PROJECT}, {"join", ArrowRelationType::JOIN},
{"aggregate", ArrowRelationType::AGGREGATE},
};

struct ExtractRelation {
explicit ExtractRelation(substrait::Rel* rel, ExtensionSet* ext_set)
: rel_(rel), ext_set_(ext_set) {}

Status AddRelation(const compute::Declaration& declaration) {
const std::string& rel_name = declaration.factory_name;
switch (enum_map.find(rel_name)->second) {
case ArrowRelationType::SCAN:
return AddReadRelation(declaration);
case ArrowRelationType::FILTER:
return AddFilterRelation(declaration);
case ArrowRelationType::PROJECT:
return Status::NotImplemented("Project operator not supported.");
case ArrowRelationType::JOIN:
return Status::NotImplemented("Join operator not supported.");
case ArrowRelationType::AGGREGATE:
return Status::NotImplemented("Aggregate operator not supported.");
default:
return Status::Invalid("Unsupported exec node factory name :", rel_name);
}
}

Status AddReadRelation(const compute::Declaration& declaration) {
auto read_rel = internal::make_unique<substrait::ReadRel>();
const auto& scan_node_options =
internal::checked_cast<const dataset::ScanNodeOptions&>(*declaration.options);

auto dataset =
dynamic_cast<dataset::FileSystemDataset*>(scan_node_options.dataset.get());
if (dataset == nullptr) {
return Status::Invalid(
"Can only convert file system datasets to a Substrait plan.");
}
// set schema
ARROW_ASSIGN_OR_RAISE(auto named_struct, ToProto(*dataset->schema(), ext_set_));
read_rel->set_allocated_base_schema(named_struct.release());

// set local files
auto read_rel_lfs = internal::make_unique<substrait::ReadRel_LocalFiles>();
for (const auto& file : dataset->files()) {
auto read_rel_lfs_ffs =
internal::make_unique<substrait::ReadRel_LocalFiles_FileOrFiles>();
read_rel_lfs_ffs->set_uri_path("file://" + file);

// set file format
// arrow and feather are temporarily handled via the Parquet format until
// upgraded to the latest Substrait version.
auto format_type_name = dataset->format()->type_name();
if (format_type_name == "parquet" || format_type_name == "arrow" ||
format_type_name == "feather") {
read_rel_lfs_ffs->set_format(
substrait::ReadRel::LocalFiles::FileOrFiles::FILE_FORMAT_PARQUET);
} else {
return Status::Invalid("Unsupported file type : ", format_type_name);
}
read_rel_lfs->mutable_items()->AddAllocated(read_rel_lfs_ffs.release());
}
*read_rel->mutable_local_files() = *read_rel_lfs.get();

rel_->set_allocated_read(read_rel.release());
return Status::OK();
}

Status AddFilterRelation(const compute::Declaration& declaration) {
auto filter_rel = internal::make_unique<substrait::FilterRel>();
const auto& filter_node_options =
internal::checked_cast<const compute::FilterNodeOptions&>(*declaration.options);

if (declaration.inputs.size() == 0) {
return Status::Invalid("Filter node doesn't have an input.");
}

auto input_rel = GetRelationFromDeclaration(declaration, ext_set_);

filter_rel->set_allocated_input(input_rel->release());

ARROW_ASSIGN_OR_RAISE(auto subs_expr,
ToProto(filter_node_options.filter_expression, ext_set_));
*filter_rel->mutable_condition() = *subs_expr.get();

rel_->set_allocated_filter(filter_rel.release());

return Status::OK();
}

Status operator()(const compute::Declaration& declaration) {
return AddRelation(declaration);
}

private:
Result<std::unique_ptr<substrait::Rel>> GetRelationFromDeclaration(
const compute::Declaration declaration, ExtensionSet* ext_set) {
auto declr_input = declaration.inputs[0];
// TODO: figure out a better way
if (util::get_if<compute::ExecNode*>(&declr_input)) {
return Status::NotImplemented("Only support Plans written in Declaration format.");
}
return ToProto(util::get<compute::Declaration>(declr_input), ext_set);
}
substrait::Rel* rel_;
ExtensionSet* ext_set_;
};

} // namespace

Result<std::unique_ptr<substrait::Rel>> ToProto(const compute::Declaration& declaration,
ExtensionSet* ext_set) {
auto out = internal::make_unique<substrait::Rel>();
RETURN_NOT_OK(ExtractRelation(out.get(), ext_set)(declaration));
return std::move(out);
}

} // namespace engine
} // namespace arrow
4 changes: 4 additions & 0 deletions cpp/src/arrow/engine/substrait/relation_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,9 @@ namespace engine {
ARROW_ENGINE_EXPORT
Result<compute::Declaration> FromProto(const substrait::Rel&, const ExtensionSet&);

ARROW_ENGINE_EXPORT
Result<std::unique_ptr<substrait::Rel>> ToProto(const compute::Declaration&,
ExtensionSet*);

} // namespace engine
} // namespace arrow
7 changes: 7 additions & 0 deletions cpp/src/arrow/engine/substrait/serde.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,13 @@ Result<Message> ParseFromBuffer(const Buffer& buf) {
return message;
}

Result<std::shared_ptr<Buffer>> SerializeRelation(const compute::Declaration& declaration,
ExtensionSet* ext_set) {
ARROW_ASSIGN_OR_RAISE(auto relation, ToProto(declaration, ext_set));
std::string serialized = relation->SerializeAsString();
return Buffer::FromString(std::move(serialized));
}

Result<compute::Declaration> DeserializeRelation(const Buffer& buf,
const ExtensionSet& ext_set) {
ARROW_ASSIGN_OR_RAISE(auto rel, ParseFromBuffer<substrait::Rel>(buf));
Expand Down
16 changes: 13 additions & 3 deletions cpp/src/arrow/engine/substrait/serde.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ ARROW_ENGINE_EXPORT Result<std::vector<compute::Declaration>> DeserializePlans(
const Buffer& buf, const ConsumerFactory& consumer_factory,
ExtensionSet* ext_set_out = NULLPTR);

Result<compute::ExecPlan> DeserializePlan(const Buffer& buf,
const ConsumerFactory& consumer_factory,
ExtensionSet* ext_set_out = NULLPTR);
ARROW_ENGINE_EXPORT Result<compute::ExecPlan> DeserializePlan(
const Buffer& buf, const ConsumerFactory& consumer_factory,
ExtensionSet* ext_set_out = NULLPTR);

/// \brief Deserializes a Substrait Type message to the corresponding Arrow type
///
Expand Down Expand Up @@ -122,6 +122,16 @@ ARROW_ENGINE_EXPORT
Result<std::shared_ptr<Buffer>> SerializeExpression(const compute::Expression& expr,
ExtensionSet* ext_set);

/// \brief Serializes an Arrow compute Declaration to a Substrait Relation message
///
/// \param[in] declaration the Arrow compute declaration to serialize
/// \param[in,out] ext_set the extension mapping to use; may be updated to add
/// mappings for the components in the used declaration
/// \return a buffer containing the protobuf serialization of the corresponding Substrait
/// Relation message
ARROW_ENGINE_EXPORT Result<std::shared_ptr<Buffer>> SerializeRelation(
const compute::Declaration& declaration, ExtensionSet* ext_set);

/// \brief Deserializes a Substrait Rel (relation) message to an ExecNode declaration
///
/// \param[in] buf a buffer containing the protobuf serialization of a Substrait
Expand Down
Loading