1 change: 1 addition & 0 deletions r/NAMESPACE
@@ -200,6 +200,7 @@ export(arrow_available)
export(arrow_info)
export(arrow_table)
export(arrow_with_dataset)
export(arrow_with_engine)
export(arrow_with_json)
export(arrow_with_parquet)
export(arrow_with_s3)
9 changes: 9 additions & 0 deletions r/R/arrow-package.R
@@ -136,6 +136,14 @@ arrow_with_dataset <- function() {
  })
}

#' @rdname arrow_available
#' @export
arrow_with_engine <- function() {
  tryCatch(.Call(`_engine_available`), error = function(e) {
    return(FALSE)
  })
}

on_old_windows <- function() {
  is_32bit <- .Machine$sizeof.pointer < 8
  is_old_r <- getRversion() < "4.0.0"
@@ -203,6 +211,7 @@ arrow_info <- function() {
  out <- c(out, list(
    capabilities = c(
      dataset = arrow_with_dataset(),
      engine = arrow_with_engine(),
      parquet = arrow_with_parquet(),
      json = arrow_with_json(),
      s3 = arrow_with_s3(),
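A minimal sketch of how the new capability surfaces at the R level, assuming a libarrow build with the engine subcomponent enabled:

  # Both should report TRUE when ARROW_ENGINE was "ON" at build time
  arrow::arrow_with_engine()
  arrow::arrow_info()$capabilities[["engine"]]

On builds without the subcomponent, arrow_with_engine() returns FALSE rather than erroring, since the tryCatch() above swallows the missing-symbol error.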
12 changes: 12 additions & 0 deletions r/R/arrowExports.R

Some generated files are not rendered by default.

13 changes: 13 additions & 0 deletions r/R/query-engine.R
@@ -304,3 +304,16 @@ ExecNode <- R6Class("ExecNode",
    schema = function() ExecNode_output_schema(self)
  )
)

do_exec_plan_substrait <- function(substrait_plan, output_names) {
  if (is.string(substrait_plan)) {
    substrait_plan <- engine__internal__SubstraitFromJSON(substrait_plan)
  } else if (is.raw(substrait_plan)) {
    substrait_plan <- buffer(substrait_plan)
  } else {
    abort("`substrait_plan` must be a JSON string or raw() vector")
  }

  plan <- ExecPlan$create()
  ExecPlan_run_substrait(plan, substrait_plan, output_names)
}
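A hedged usage sketch of this internal helper (it is not exported, and the plan below is a placeholder rather than a valid Substrait plan; per the note in compute-exec.cpp later in this diff, a real plan must contain a 'read' that points at a Parquet file):

  plan_json <- '{"relations": []}'  # hypothetical stand-in
  tbl <- arrow:::do_exec_plan_substrait(
    substrait_plan = plan_json,
    output_names = c("col1", "col2")
  )

A JSON string is converted with engine__internal__SubstraitFromJSON(), while a raw() vector is assumed to be an already-serialized plan and is only wrapped in a buffer().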
8 changes: 8 additions & 0 deletions r/configure
@@ -244,6 +244,14 @@ if [ $? -eq 0 ]; then
  # NOTE: arrow-dataset is assumed to have the same -L flag as arrow
  # so there is no need to add its location to PKG_DIRS
fi
# Check for Arrow Engine subcomponent
grep -i 'set(ARROW_ENGINE "ON")' $ARROW_OPTS_CMAKE >/dev/null 2>&1
if [ $? -eq 0 ]; then
  PKG_CFLAGS="$PKG_CFLAGS -DARROW_R_WITH_ENGINE"
  PKG_LIBS="-larrow_engine $PKG_LIBS"
  # NOTE: arrow-engine is assumed to have the same -L flag as arrow
  # so there is no need to add its location to PKG_DIRS
fi
Comment on lines +248 to +254
Member:

In configure.win, could we add the following to configure_dev()?

  if [ $(cmake_option ARROW_ENGINE) -eq 1 ]; then
    PKG_CFLAGS="$PKG_CFLAGS -DARROW_R_WITH_ENGINE"
    PKG_CONFIG_PACKAGES="$PKG_CONFIG_PACKAGES arrow-engine"
  fi

(I wouldn't worry about configure_release, since there are corresponding CI changes that would need to be handled there).

Member Author:

Done! I really need to get a Windows environment up and running so that I notice these kinds of things.

# Check for S3
grep -i 'set(ARROW_S3 "ON")' $ARROW_OPTS_CMAKE >/dev/null 2>&1
if [ $? -eq 0 ]; then
9 changes: 7 additions & 2 deletions r/configure.win
@@ -78,7 +78,7 @@ function configure_release() {
# Returns 1 if the CMake option is set to "ON", otherwise 0
function cmake_option() {
  ARROW_OPTS_CMAKE="$ARROW_HOME/lib/cmake/arrow/ArrowOptions.cmake"
  grep -cm1 "set($1 \"ON\")" $ARROW_OPTS_CMAKE
}

function configure_dev() {
@@ -90,7 +90,7 @@ function configure_dev() {

  PKG_CFLAGS="-DARROW_R_WITH_ARROW"

  if [ $(cmake_option ARROW_PARQUET) -eq 1 ]; then
    PKG_CFLAGS="$PKG_CFLAGS -DARROW_R_WITH_PARQUET"
    PKG_CONFIG_PACKAGES="$PKG_CONFIG_PACKAGES parquet"
  fi
@@ -108,6 +108,11 @@ function configure_dev() {
    PKG_CFLAGS="$PKG_CFLAGS -DARROW_R_WITH_JSON"
  fi

  if [ $(cmake_option ARROW_ENGINE) -eq 1 ]; then
    PKG_CFLAGS="$PKG_CFLAGS -DARROW_R_WITH_ENGINE"
    PKG_CONFIG_PACKAGES="$PKG_CONFIG_PACKAGES arrow-engine"
  fi

  PKG_CFLAGS="$(pkg-config --cflags $PKG_CONFIG_PACKAGES) $PKG_CFLAGS"
  PKG_LIBS=$(pkg-config --libs $PKG_CONFIG_PACKAGES)
}
2 changes: 1 addition & 1 deletion r/data-raw/codegen.R
@@ -44,7 +44,7 @@
# Ensure that all machines are sorting the same way
invisible(Sys.setlocale("LC_COLLATE", "C"))

features <- c("arrow", "dataset", "parquet", "s3", "json")
features <- c("arrow", "dataset", "engine", "parquet", "s3", "json")

suppressPackageStartupMessages({
  library(decor)
3 changes: 3 additions & 0 deletions r/man/arrow_available.Rd

Some generated files are not rendered by default.

62 changes: 61 additions & 1 deletion r/src/arrowExports.cpp

Some generated files are not rendered by default.

98 changes: 98 additions & 0 deletions r/src/compute-exec.cpp
@@ -291,4 +291,102 @@ std::shared_ptr<compute::ExecNode> ExecNode_TableSourceNode(
return MakeExecNodeOrStop("table_source", plan.get(), {}, options);
}

#if defined(ARROW_R_WITH_ENGINE)

#include <arrow/engine/api.h>

// Just for example usage until a C++ method is available that implements
// a RecordBatchReader output (ARROW-15849)
class AccumulatingConsumer : public compute::SinkNodeConsumer {
 public:
  explicit AccumulatingConsumer(const std::vector<std::string>& schema_names)
      : schema_names_(schema_names) {}

  const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches() { return batches_; }

  arrow::Status Consume(compute::ExecBatch batch) override {
    // Build an output schema by pairing the expected names with the
    // batch's value descriptors
    arrow::SchemaBuilder builder;
    auto descriptors = batch.GetDescriptors();
    for (size_t i = 0; i < schema_names_.size(); i++) {
      if (i == (descriptors.size() - 1)) {
        break;
      }

      RETURN_NOT_OK(builder.AddField(
          std::make_shared<arrow::Field>(schema_names_[i], descriptors[i].type)));
    }

    auto schema = builder.Finish();
    RETURN_NOT_OK(schema);

    auto record_batch = batch.ToRecordBatch(schema.ValueUnsafe());
    ARROW_RETURN_NOT_OK(record_batch);
    batches_.push_back(record_batch.ValueUnsafe());

    return arrow::Status::OK();
  }

  arrow::Future<> Finish() override { return arrow::Future<>::MakeFinished(); }

 private:
  std::vector<std::string> schema_names_;
  std::vector<std::shared_ptr<arrow::RecordBatch>> batches_;
};

// Expose these so that it's easier to write tests

// [[engine::export]]
std::string engine__internal__SubstraitToJSON(
    const std::shared_ptr<arrow::Buffer>& serialized_plan) {
  return ValueOrStop(arrow::engine::internal::SubstraitToJSON("Plan", *serialized_plan));
}

// [[engine::export]]
std::shared_ptr<arrow::Buffer> engine__internal__SubstraitFromJSON(
    std::string substrait_json) {
  return ValueOrStop(arrow::engine::internal::SubstraitFromJSON("Plan", substrait_json));
}

// [[engine::export]]
std::shared_ptr<arrow::Table> ExecPlan_run_substrait(
    const std::shared_ptr<compute::ExecPlan>& plan,
    const std::shared_ptr<arrow::Buffer>& serialized_plan, cpp11::strings out_names) {
  std::vector<std::shared_ptr<AccumulatingConsumer>> consumers;
  std::vector<std::string> out_names_string;
  for (const auto& item : out_names) {
    out_names_string.push_back(item);
  }

  // Each sink in the deserialized plan gets its own consumer that
  // accumulates batches in memory
  std::function<std::shared_ptr<compute::SinkNodeConsumer>()> consumer_factory = [&] {
    consumers.emplace_back(new AccumulatingConsumer(out_names_string));
    return consumers.back();
  };

  std::vector<compute::Declaration> decls =
      ValueOrStop(arrow::engine::DeserializePlan(*serialized_plan, consumer_factory));

  // For now, the Substrait plan must include a 'read' that points to
  // a Parquet file (instead of using a source node created in Arrow)
  for (const compute::Declaration& decl : decls) {
    auto node = decl.AddToPlan(plan.get());
    StopIfNotOk(node.status());
  }

  StopIfNotOk(plan->Validate());
  StopIfNotOk(plan->StartProducing());
  StopIfNotOk(plan->finished().status());

  std::vector<std::shared_ptr<arrow::RecordBatch>> all_batches;
  for (const auto& consumer : consumers) {
    for (const auto& batch : consumer->batches()) {
      all_batches.push_back(batch);
    }
  }

  return ValueOrStop(arrow::Table::FromRecordBatches(std::move(all_batches)));
}

#endif

#endif
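Since these bindings are exposed mainly to make tests easier to write, one quick way to exercise them from R is a JSON round-trip through the internal helpers; the plan here is again a placeholder:

  buf <- arrow:::engine__internal__SubstraitFromJSON('{"relations": []}')
  arrow:::engine__internal__SubstraitToJSON(buf)  # prints the plan back as JSON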