diff --git a/r/NAMESPACE b/r/NAMESPACE index ae06e8e03aa..bfb1056dff7 100644 --- a/r/NAMESPACE +++ b/r/NAMESPACE @@ -200,6 +200,7 @@ export(arrow_available) export(arrow_info) export(arrow_table) export(arrow_with_dataset) +export(arrow_with_engine) export(arrow_with_json) export(arrow_with_parquet) export(arrow_with_s3) diff --git a/r/R/arrow-package.R b/r/R/arrow-package.R index 9424fc9228d..17748848397 100644 --- a/r/R/arrow-package.R +++ b/r/R/arrow-package.R @@ -136,6 +136,14 @@ arrow_with_dataset <- function() { }) } +#' @rdname arrow_available +#' @export +arrow_with_engine <- function() { + tryCatch(.Call(`_engine_available`), error = function(e) { + return(FALSE) + }) +} + on_old_windows <- function() { is_32bit <- .Machine$sizeof.pointer < 8 is_old_r <- getRversion() < "4.0.0" @@ -203,6 +211,7 @@ arrow_info <- function() { out <- c(out, list( capabilities = c( dataset = arrow_with_dataset(), + engine = arrow_with_engine(), parquet = arrow_with_parquet(), json = arrow_with_json(), s3 = arrow_with_s3(), diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R index 145f2e4e285..77f23657f5f 100644 --- a/r/R/arrowExports.R +++ b/r/R/arrowExports.R @@ -444,6 +444,18 @@ ExecNode_TableSourceNode <- function(plan, table) { .Call(`_arrow_ExecNode_TableSourceNode`, plan, table) } +engine__internal__SubstraitToJSON <- function(serialized_plan) { + .Call(`_arrow_engine__internal__SubstraitToJSON`, serialized_plan) +} + +engine__internal__SubstraitFromJSON <- function(substrait_json) { + .Call(`_arrow_engine__internal__SubstraitFromJSON`, substrait_json) +} + +ExecPlan_run_substrait <- function(plan, serialized_plan, out_names) { + .Call(`_arrow_ExecPlan_run_substrait`, plan, serialized_plan, out_names) +} + RecordBatch__cast <- function(batch, schema, options) { .Call(`_arrow_RecordBatch__cast`, batch, schema, options) } diff --git a/r/R/query-engine.R b/r/R/query-engine.R index ee7a26b2cec..429ee2f50e9 100644 --- a/r/R/query-engine.R +++ b/r/R/query-engine.R @@ -304,3 
+304,16 @@ ExecNode <- R6Class("ExecNode", schema = function() ExecNode_output_schema(self) ) ) + +do_exec_plan_substrait <- function(substrait_plan, output_names) { + if (is.string(substrait_plan)) { + substrait_plan <- engine__internal__SubstraitFromJSON(substrait_plan) + } else if (is.raw(substrait_plan)) { + substrait_plan <- buffer(substrait_plan) + } else { + abort("`substrait_plan` must be a JSON string or raw() vector") + } + + plan <- ExecPlan$create() + ExecPlan_run_substrait(plan, substrait_plan, output_names) +} diff --git a/r/configure b/r/configure index 18e3339fdd2..817bdb456d9 100755 --- a/r/configure +++ b/r/configure @@ -244,6 +244,14 @@ if [ $? -eq 0 ]; then # NOTE: arrow-dataset is assumed to have the same -L flag as arrow # so there is no need to add its location to PKG_DIRS fi + # Check for Arrow Engine subcomponent + grep -i 'set(ARROW_ENGINE "ON")' $ARROW_OPTS_CMAKE >/dev/null 2>&1 + if [ $? -eq 0 ]; then + PKG_CFLAGS="$PKG_CFLAGS -DARROW_R_WITH_ENGINE" + PKG_LIBS="-larrow_engine $PKG_LIBS" + # NOTE: arrow-engine is assumed to have the same -L flag as arrow + # so there is no need to add its location to PKG_DIRS + fi # Check for S3 grep -i 'set(ARROW_S3 "ON")' $ARROW_OPTS_CMAKE >/dev/null 2>&1 if [ $? 
-eq 0 ]; then diff --git a/r/configure.win b/r/configure.win index 6c31be4521b..45e04613810 100644 --- a/r/configure.win +++ b/r/configure.win @@ -78,7 +78,7 @@ function configure_release() { # Returns 1 if CMAKE options is set "ON", otherwise 0 function cmake_option() { ARROW_OPTS_CMAKE="$ARROW_HOME/lib/cmake/arrow/ArrowOptions.cmake" - grep -cm1 "set($1 \"ON\")" $ARROW_OPTS_CMAKE + grep -cm1 "set($1 \"ON\")" $ARROW_OPTS_CMAKE } function configure_dev() { @@ -90,7 +90,7 @@ function configure_dev() { PKG_CFLAGS="-DARROW_R_WITH_ARROW" - if [ $(cmake_option ARROW_PARQUET) -eq 1 ]; then + if [ $(cmake_option ARROW_PARQUET) -eq 1 ]; then PKG_CFLAGS="$PKG_CFLAGS -DARROW_R_WITH_PARQUET" PKG_CONFIG_PACKAGES="$PKG_CONFIG_PACKAGES parquet" fi @@ -108,6 +108,11 @@ function configure_dev() { PKG_CFLAGS="$PKG_CFLAGS -DARROW_R_WITH_JSON" fi + if [ $(cmake_option ARROW_ENGINE) -eq 1 ]; then + PKG_CFLAGS="$PKG_CFLAGS -DARROW_R_WITH_ENGINE" + PKG_CONFIG_PACKAGES="$PKG_CONFIG_PACKAGES arrow-engine" + fi + PKG_CFLAGS="$(pkg-config --cflags $PKG_CONFIG_PACKAGES) $PKG_CFLAGS" PKG_LIBS=$(pkg-config --libs $PKG_CONFIG_PACKAGES) } diff --git a/r/data-raw/codegen.R b/r/data-raw/codegen.R index febb20318dc..4b0e31ca8f0 100644 --- a/r/data-raw/codegen.R +++ b/r/data-raw/codegen.R @@ -44,7 +44,7 @@ # Ensure that all machines are sorting the same way invisible(Sys.setlocale("LC_COLLATE", "C")) -features <- c("arrow", "dataset", "parquet", "s3", "json") +features <- c("arrow", "dataset", "engine", "parquet", "s3", "json") suppressPackageStartupMessages({ library(decor) diff --git a/r/man/arrow_available.Rd b/r/man/arrow_available.Rd index 3061d10dc9c..3ff4fcd26eb 100644 --- a/r/man/arrow_available.Rd +++ b/r/man/arrow_available.Rd @@ -3,6 +3,7 @@ \name{arrow_available} \alias{arrow_available} \alias{arrow_with_dataset} +\alias{arrow_with_engine} \alias{arrow_with_parquet} \alias{arrow_with_s3} \alias{arrow_with_json} @@ -12,6 +13,8 @@ arrow_available() arrow_with_dataset() +arrow_with_engine() 
+ arrow_with_parquet() arrow_with_s3() diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index 549c232600a..920a52ea6ea 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -1697,7 +1697,7 @@ extern "C" SEXP _arrow_ExecNode_Aggregate(SEXP input_sexp, SEXP options_sexp, SE // compute-exec.cpp #if defined(ARROW_R_WITH_ARROW) std::shared_ptr ExecNode_Join(const std::shared_ptr& input, int type, const std::shared_ptr& right_data, std::vector left_keys, std::vector right_keys, std::vector left_output, std::vector right_output, std::string output_suffix_for_left, std::string output_suffix_for_right); - extern "C" SEXP _arrow_ExecNode_Join(SEXP input_sexp, SEXP type_sexp, SEXP right_data_sexp, SEXP left_keys_sexp, SEXP right_keys_sexp, SEXP left_output_sexp, SEXP right_output_sexp, SEXP output_suffix_for_left_sexp, SEXP output_suffix_for_right_sexp){ +extern "C" SEXP _arrow_ExecNode_Join(SEXP input_sexp, SEXP type_sexp, SEXP right_data_sexp, SEXP left_keys_sexp, SEXP right_keys_sexp, SEXP left_output_sexp, SEXP right_output_sexp, SEXP output_suffix_for_left_sexp, SEXP output_suffix_for_right_sexp){ BEGIN_CPP11 arrow::r::Input&>::type input(input_sexp); arrow::r::Input::type type(type_sexp); @@ -1749,6 +1749,53 @@ extern "C" SEXP _arrow_ExecNode_TableSourceNode(SEXP plan_sexp, SEXP table_sexp) } #endif +// compute-exec.cpp +#if defined(ARROW_R_WITH_ENGINE) +std::string engine__internal__SubstraitToJSON(const std::shared_ptr& serialized_plan); +extern "C" SEXP _arrow_engine__internal__SubstraitToJSON(SEXP serialized_plan_sexp){ +BEGIN_CPP11 + arrow::r::Input&>::type serialized_plan(serialized_plan_sexp); + return cpp11::as_sexp(engine__internal__SubstraitToJSON(serialized_plan)); +END_CPP11 +} +#else +extern "C" SEXP _arrow_engine__internal__SubstraitToJSON(SEXP serialized_plan_sexp){ + Rf_error("Cannot call engine__internal__SubstraitToJSON(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. 
"); +} +#endif + +// compute-exec.cpp +#if defined(ARROW_R_WITH_ENGINE) +std::shared_ptr engine__internal__SubstraitFromJSON(std::string substrait_json); +extern "C" SEXP _arrow_engine__internal__SubstraitFromJSON(SEXP substrait_json_sexp){ +BEGIN_CPP11 + arrow::r::Input::type substrait_json(substrait_json_sexp); + return cpp11::as_sexp(engine__internal__SubstraitFromJSON(substrait_json)); +END_CPP11 +} +#else +extern "C" SEXP _arrow_engine__internal__SubstraitFromJSON(SEXP substrait_json_sexp){ + Rf_error("Cannot call engine__internal__SubstraitFromJSON(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. "); +} +#endif + +// compute-exec.cpp +#if defined(ARROW_R_WITH_ENGINE) +std::shared_ptr ExecPlan_run_substrait(const std::shared_ptr& plan, const std::shared_ptr& serialized_plan, cpp11::strings out_names); +extern "C" SEXP _arrow_ExecPlan_run_substrait(SEXP plan_sexp, SEXP serialized_plan_sexp, SEXP out_names_sexp){ +BEGIN_CPP11 + arrow::r::Input&>::type plan(plan_sexp); + arrow::r::Input&>::type serialized_plan(serialized_plan_sexp); + arrow::r::Input::type out_names(out_names_sexp); + return cpp11::as_sexp(ExecPlan_run_substrait(plan, serialized_plan, out_names)); +END_CPP11 +} +#else +extern "C" SEXP _arrow_ExecPlan_run_substrait(SEXP plan_sexp, SEXP serialized_plan_sexp, SEXP out_names_sexp){ + Rf_error("Cannot call ExecPlan_run_substrait(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. 
"); +} +#endif + // compute.cpp #if defined(ARROW_R_WITH_ARROW) std::shared_ptr RecordBatch__cast(const std::shared_ptr& batch, const std::shared_ptr& schema, cpp11::list options); @@ -7608,6 +7655,15 @@ return Rf_ScalarLogical( #endif ); } +extern "C" SEXP _engine_available() { +return Rf_ScalarLogical( +#if defined(ARROW_R_WITH_ENGINE) + TRUE +#else + FALSE +#endif +); +} extern "C" SEXP _parquet_available() { return Rf_ScalarLogical( #if defined(ARROW_R_WITH_PARQUET) @@ -7638,6 +7694,7 @@ return Rf_ScalarLogical( static const R_CallMethodDef CallEntries[] = { { "_arrow_available", (DL_FUNC)& _arrow_available, 0 }, { "_dataset_available", (DL_FUNC)& _dataset_available, 0 }, + { "_engine_available", (DL_FUNC)& _engine_available, 0 }, { "_parquet_available", (DL_FUNC)& _parquet_available, 0 }, { "_s3_available", (DL_FUNC)& _s3_available, 0 }, { "_json_available", (DL_FUNC)& _json_available, 0 }, @@ -7752,6 +7809,9 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_ExecNode_Join", (DL_FUNC) &_arrow_ExecNode_Join, 9}, { "_arrow_ExecNode_SourceNode", (DL_FUNC) &_arrow_ExecNode_SourceNode, 2}, { "_arrow_ExecNode_TableSourceNode", (DL_FUNC) &_arrow_ExecNode_TableSourceNode, 2}, + { "_arrow_engine__internal__SubstraitToJSON", (DL_FUNC) &_arrow_engine__internal__SubstraitToJSON, 1}, + { "_arrow_engine__internal__SubstraitFromJSON", (DL_FUNC) &_arrow_engine__internal__SubstraitFromJSON, 1}, + { "_arrow_ExecPlan_run_substrait", (DL_FUNC) &_arrow_ExecPlan_run_substrait, 3}, { "_arrow_RecordBatch__cast", (DL_FUNC) &_arrow_RecordBatch__cast, 3}, { "_arrow_Table__cast", (DL_FUNC) &_arrow_Table__cast, 3}, { "_arrow_compute__CallFunction", (DL_FUNC) &_arrow_compute__CallFunction, 3}, diff --git a/r/src/compute-exec.cpp b/r/src/compute-exec.cpp index f46c3cefb36..0e52c0a3981 100644 --- a/r/src/compute-exec.cpp +++ b/r/src/compute-exec.cpp @@ -291,4 +291,102 @@ std::shared_ptr ExecNode_TableSourceNode( return MakeExecNodeOrStop("table_source", plan.get(), {}, options); } 
+#if defined(ARROW_R_WITH_ENGINE) + +#include + +// Just for example usage until a C++ method is available that implements +// a RecordBatchReader output (ARROW-15849) +class AccumulatingConsumer : public compute::SinkNodeConsumer { + public: + explicit AccumulatingConsumer(const std::vector& schema_names) + : schema_names_(schema_names) {} + + const std::vector>& batches() { return batches_; } + + arrow::Status Consume(compute::ExecBatch batch) override { + arrow::SchemaBuilder builder; + auto descriptors = batch.GetDescriptors(); + for (int64_t i = 0; i < schema_names_.size(); i++) { + if (i == (descriptors.size() - 1)) { + break; + } + + RETURN_NOT_OK(builder.AddField( + std::make_shared(schema_names_[i], descriptors[i].type))); + } + + auto schema = builder.Finish(); + RETURN_NOT_OK(schema); + + auto record_batch = batch.ToRecordBatch(schema.ValueUnsafe()); + ARROW_RETURN_NOT_OK(record_batch); + batches_.push_back(record_batch.ValueUnsafe()); + + return arrow::Status::OK(); + } + + arrow::Future<> Finish() override { return arrow::Future<>::MakeFinished(); } + + private: + std::vector schema_names_; + std::vector> batches_; +}; + +// Expose these so that it's easier to write tests + +// [[engine::export]] +std::string engine__internal__SubstraitToJSON( + const std::shared_ptr& serialized_plan) { + return ValueOrStop(arrow::engine::internal::SubstraitToJSON("Plan", *serialized_plan)); +} + +// [[engine::export]] +std::shared_ptr engine__internal__SubstraitFromJSON( + std::string substrait_json) { + return ValueOrStop(arrow::engine::internal::SubstraitFromJSON("Plan", substrait_json)); +} + +// [[engine::export]] +std::shared_ptr ExecPlan_run_substrait( + const std::shared_ptr& plan, + const std::shared_ptr& serialized_plan, cpp11::strings out_names) { + std::vector> consumers; + std::vector out_names_string; + for (const auto& item : out_names) { + out_names_string.push_back(item); + } + + std::function()> consumer_factory = [&] { + consumers.emplace_back(new 
AccumulatingConsumer(out_names_string)); + return consumers.back(); + }; + + arrow::Result> maybe_decls = + arrow::engine::DeserializePlan(*serialized_plan, consumer_factory); + std::vector decls = std::move(ValueOrStop(maybe_decls)); + + // For now, the Substrait plan must include a 'read' that points to + // a Parquet file (instead of using a source node created in Arrow) + for (const compute::Declaration& decl : decls) { + auto node = decl.AddToPlan(plan.get()); + StopIfNotOk(node.status()); + } + + StopIfNotOk(plan->Validate()); + StopIfNotOk(plan->StartProducing()); + StopIfNotOk(plan->finished().status()); + + std::vector> all_batches; + for (const auto& consumer : consumers) { + for (const auto& batch : consumer->batches()) { + all_batches.push_back(batch); + } + } + + return ValueOrStop(arrow::Table::FromRecordBatches(std::move(all_batches))); +} + +#endif + #endif diff --git a/r/tests/testthat/test-query-engine.R b/r/tests/testthat/test-query-engine.R new file mode 100644 index 00000000000..8293267d7cc --- /dev/null +++ b/r/tests/testthat/test-query-engine.R @@ -0,0 +1,63 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License.
+ +test_that("do_exec_plan_substrait can evaluate a simple plan", { + skip_if_not_available("engine") + + df <- data.frame(i = 1:5, b = rep_len(c(TRUE, FALSE), 5)) + table <- arrow_table(df, schema = schema(i = int64(), b = bool())) + + tf <- tempfile() + on.exit(unlink(tf)) + write_parquet(table, tf) + + substrait_json <- sprintf('{ + "relations": [ + {"rel": { + "read": { + "base_schema": { + "struct": { + "types": [ {"i64": {}}, {"bool": {}} ] + }, + "names": ["i", "b"] + }, + "local_files": { + "items": [ + { + "uri_file": "file://%s", + "format": "FILE_FORMAT_PARQUET" + } + ] + } + } + }} + ] + }', tf) + + substrait_buffer <- engine__internal__SubstraitFromJSON(substrait_json) + expect_r6_class(substrait_buffer, "Buffer") + substrait_raw <- as.raw(substrait_buffer) + + substrait_json_roundtrip <- engine__internal__SubstraitToJSON(substrait_buffer) + expect_match(substrait_json_roundtrip, tf, fixed = TRUE) + + result <- do_exec_plan_substrait(substrait_json, names(df)) + expect_identical( + tibble::as_tibble(result), + tibble::as_tibble(df) + ) +})