From 6576061ed312273b7cf57b1f73077aa6dab874ff Mon Sep 17 00:00:00 2001 From: Neal Richardson Date: Wed, 18 Jan 2023 16:31:33 -0500 Subject: [PATCH 1/2] Pass full projection expression to ScanNode (but C++ can't handle) --- r/R/arrowExports.R | 9 +++------ r/R/query-engine.R | 10 +++------- r/src/arrowExports.cpp | 19 +++++-------------- r/src/compute-exec.cpp | 23 +++++++++++++---------- r/src/expression.cpp | 19 ------------------- 5 files changed, 24 insertions(+), 56 deletions(-) diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R index 2eeca24dbdc..5e807fbab16 100644 --- a/r/R/arrowExports.R +++ b/r/R/arrowExports.R @@ -460,8 +460,8 @@ ExecNode_output_schema <- function(node) { .Call(`_arrow_ExecNode_output_schema`, node) } -ExecNode_Scan <- function(plan, dataset, filter, materialized_field_names) { - .Call(`_arrow_ExecNode_Scan`, plan, dataset, filter, materialized_field_names) +ExecNode_Scan <- function(plan, dataset, filter, projection) { + .Call(`_arrow_ExecNode_Scan`, plan, dataset, filter, projection) } ExecPlan_Write <- function(plan, final_node, metadata, file_write_options, filesystem, base_dir, partitioning, basename_template, existing_data_behavior, max_partitions, max_open_files, max_rows_per_file, min_rows_per_group, max_rows_per_group) { @@ -1088,10 +1088,6 @@ compute___expr__is_field_ref <- function(x) { .Call(`_arrow_compute___expr__is_field_ref`, x) } -field_names_in_expression <- function(x) { - .Call(`_arrow_field_names_in_expression`, x) -} - compute___expr__get_field_ref_name <- function(x) { .Call(`_arrow_compute___expr__get_field_ref_name`, x) } @@ -2095,3 +2091,4 @@ SetIOThreadPoolCapacity <- function(threads) { Array__infer_type <- function(x) { .Call(`_arrow_Array__infer_type`, x) } + diff --git a/r/R/query-engine.R b/r/R/query-engine.R index 2f0b421faeb..7a336b7a077 100644 --- a/r/R/query-engine.R +++ b/r/R/query-engine.R @@ -34,11 +34,7 @@ ExecPlan <- R6Class("ExecPlan", if (isTRUE(filter)) { filter <- Expression$scalar(TRUE) } - # Use FieldsInExpression to find all from dataset$selected_columns - colnames <- unique(unlist(map( - dataset$selected_columns, - field_names_in_expression - ))) + projection <- dataset$selected_columns dataset <- dataset$.data assert_is(dataset, "Dataset") } else { @@ -46,10 +42,10 @@ ExecPlan <- R6Class("ExecPlan", # Just a dataset, not a query, so there's no predicates to push down # so set some defaults filter <- Expression$scalar(TRUE) - colnames <- names(dataset) + projection <- make_field_refs(colnames) } - out <- ExecNode_Scan(self, dataset, filter, colnames %||% character(0)) + out <- ExecNode_Scan(self, dataset, filter, projection) # Hold onto the source data's schema so we can preserve schema metadata # in the resulting Scan/Write out$extras$source_schema <- dataset$schema diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index e918390e269..dade7626839 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -990,18 +990,18 @@ END_CPP11 } // compute-exec.cpp #if defined(ARROW_R_WITH_DATASET) -std::shared_ptr ExecNode_Scan(const std::shared_ptr& plan, const std::shared_ptr& dataset, const std::shared_ptr& filter, std::vector materialized_field_names); -extern "C" SEXP _arrow_ExecNode_Scan(SEXP plan_sexp, SEXP dataset_sexp, SEXP filter_sexp, SEXP materialized_field_names_sexp){ +std::shared_ptr ExecNode_Scan(const std::shared_ptr& plan, const std::shared_ptr& dataset, const std::shared_ptr& filter, cpp11::list projection); +extern "C" SEXP _arrow_ExecNode_Scan(SEXP plan_sexp, SEXP dataset_sexp, SEXP filter_sexp, SEXP projection_sexp){ BEGIN_CPP11 arrow::r::Input&>::type plan(plan_sexp); arrow::r::Input&>::type dataset(dataset_sexp); arrow::r::Input&>::type filter(filter_sexp); - arrow::r::Input>::type materialized_field_names(materialized_field_names_sexp); - return cpp11::as_sexp(ExecNode_Scan(plan, dataset, filter, materialized_field_names)); + arrow::r::Input::type projection(projection_sexp); + return cpp11::as_sexp(ExecNode_Scan(plan, dataset, filter, projection)); END_CPP11 } #else -extern "C" SEXP _arrow_ExecNode_Scan(SEXP plan_sexp, SEXP dataset_sexp, SEXP filter_sexp, SEXP materialized_field_names_sexp){ +extern "C" SEXP _arrow_ExecNode_Scan(SEXP plan_sexp, SEXP dataset_sexp, SEXP filter_sexp, SEXP projection_sexp){ Rf_error("Cannot call ExecNode_Scan(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. "); } #endif @@ -2740,14 +2740,6 @@ BEGIN_CPP11 END_CPP11 } // expression.cpp -std::vector field_names_in_expression(const std::shared_ptr& x); -extern "C" SEXP _arrow_field_names_in_expression(SEXP x_sexp){ -BEGIN_CPP11 - arrow::r::Input&>::type x(x_sexp); - return cpp11::as_sexp(field_names_in_expression(x)); -END_CPP11 -} -// expression.cpp std::string compute___expr__get_field_ref_name(const std::shared_ptr& x); extern "C" SEXP _arrow_compute___expr__get_field_ref_name(SEXP x_sexp){ BEGIN_CPP11 @@ -5587,7 +5579,6 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_compute___expr__equals", (DL_FUNC) &_arrow_compute___expr__equals, 2}, { "_arrow_compute___expr__call", (DL_FUNC) &_arrow_compute___expr__call, 3}, { "_arrow_compute___expr__is_field_ref", (DL_FUNC) &_arrow_compute___expr__is_field_ref, 1}, - { "_arrow_field_names_in_expression", (DL_FUNC) &_arrow_field_names_in_expression, 1}, { "_arrow_compute___expr__get_field_ref_name", (DL_FUNC) &_arrow_compute___expr__get_field_ref_name, 1}, { "_arrow_compute___expr__field_ref", (DL_FUNC) &_arrow_compute___expr__field_ref, 1}, { "_arrow_compute___expr__nested_field_ref", (DL_FUNC) &_arrow_compute___expr__nested_field_ref, 2}, diff --git a/r/src/compute-exec.cpp b/r/src/compute-exec.cpp index 64ea6f5b5ef..2a2e509c238 100644 --- a/r/src/compute-exec.cpp +++ b/r/src/compute-exec.cpp @@ -288,28 +288,31 @@ std::shared_ptr ExecNode_output_schema( std::shared_ptr ExecNode_Scan( const std::shared_ptr& plan, const std::shared_ptr& dataset, - const std::shared_ptr& filter, - std::vector materialized_field_names) { + const std::shared_ptr& filter, cpp11::list projection) { arrow::dataset::internal::Initialize(); // TODO: pass in FragmentScanOptions auto options = std::make_shared(); options->use_threads = arrow::r::GetBoolOption("arrow.use_threads", true); - options->dataset_schema = dataset->schema(); + // This filter is only used for predicate pushdown; + // you still need to pass it to a FilterNode after to handle any other components options->filter = *filter; - // ScanNode needs to know which fields to materialize (and which are unnecessary) + // ScanNode needs to know which fields to materialize. + // It will pull them from this projection to prune the scan, + // but you still need to Project after std::vector exprs; - for (const auto& name : materialized_field_names) { - exprs.push_back(compute::field_ref(name)); + for (SEXP expr : projection) { + auto expr_ptr = cpp11::as_cpp>(expr); + exprs.push_back(*expr_ptr); } - - options->projection = - call("make_struct", std::move(exprs), - compute::MakeStructOptions{std::move(materialized_field_names)}); + cpp11::strings field_names(projection.attr(R_NamesSymbol)); + options->projection = call( + "make_struct", std::move(exprs), + compute::MakeStructOptions{cpp11::as_cpp>(field_names)}); return MakeExecNodeOrStop("scan", plan.get(), {}, ds::ScanNodeOptions{dataset, options}); diff --git a/r/src/expression.cpp b/r/src/expression.cpp index d7a511e7600..04b38619637 100644 --- a/r/src/expression.cpp +++ b/r/src/expression.cpp @@ -51,25 +51,6 @@ bool compute___expr__is_field_ref(const std::shared_ptr& x) return x->field_ref() != nullptr; } -// [[arrow::export]] -std::vector field_names_in_expression( - const std::shared_ptr& x) { - std::vector out; - std::vector nested; - - auto field_refs = FieldsInExpression(*x); - for (auto f : field_refs) { - if (f.IsNested()) { - // We keep the top-level field name. - nested = *f.nested_refs(); - out.push_back(*nested[0].name()); - } else { - out.push_back(*f.name()); - } - } - return out; -} - // [[arrow::export]] std::string compute___expr__get_field_ref_name( const std::shared_ptr& x) { From 45e33021206262fe012e261802211aa37039492a Mon Sep 17 00:00:00 2001 From: Neal Richardson Date: Thu, 19 Jan 2023 12:28:19 -0500 Subject: [PATCH 2/2] Handle nested field refs in scanner.cc (TODO: add test) --- cpp/src/arrow/dataset/scanner.cc | 23 ++++++++++++++++------- r/tests/testthat/test-dplyr-query.R | 1 + 2 files changed, 17 insertions(+), 7 deletions(-) diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc index 0587863eb3c..02590cc8781 100644 --- a/cpp/src/arrow/dataset/scanner.cc +++ b/cpp/src/arrow/dataset/scanner.cc @@ -22,6 +22,7 @@ #include #include #include +#include #include #include "arrow/array/array_primitive.h" @@ -135,6 +136,7 @@ Result> GetProjectedSchemaFromExpression( const std::shared_ptr& dataset_schema) { // process resultant dataset_schema after projection FieldVector project_fields; + std::set field_names; if (auto call = projection.call()) { if (call->function_name != "make_struct") { return Status::Invalid("Top level projection expression call must be make_struct"); @@ -142,13 +144,11 @@ Result> GetProjectedSchemaFromExpression( for (const compute::Expression& arg : call->arguments) { if (auto field_ref = arg.field_ref()) { if (field_ref->IsName()) { - auto field = dataset_schema->GetFieldByName(*field_ref->name()); - if (field) { - project_fields.push_back(std::move(field)); - } - // if the field is not present in the schema we ignore it. - // the case is if kAugmentedFields are present in the expression - // and if they are not present in the provided schema, we ignore them. + field_names.emplace(*field_ref->name()); + } else if (field_ref->IsNested()) { + // We keep the top-level field name. + auto nested_field_refs = *field_ref->nested_refs(); + field_names.emplace(*nested_field_refs[0].name()); } else { return Status::Invalid( "No projected schema was supplied and we could not infer the projected " @@ -157,6 +157,15 @@ Result> GetProjectedSchemaFromExpression( } } } + for (auto f : field_names) { + auto field = dataset_schema->GetFieldByName(f); + if (field) { + // if the field is not present in the schema we ignore it. + // the case is if kAugmentedFields are present in the expression + // and if they are not present in the provided schema, we ignore them. + project_fields.push_back(std::move(field)); + } + } return schema(project_fields); } diff --git a/r/tests/testthat/test-dplyr-query.R b/r/tests/testthat/test-dplyr-query.R index a91c0b6ccb5..469bcd10aa6 100644 --- a/r/tests/testthat/test-dplyr-query.R +++ b/r/tests/testthat/test-dplyr-query.R @@ -742,6 +742,7 @@ test_that("Can use nested field refs", { ) # Now with Dataset: make sure column pushdown in ScanNode works + skip_if_not_available("dataset") expect_equal( nested_data %>% InMemoryDataset$create() %>%