diff --git a/r/NAMESPACE b/r/NAMESPACE index 0a120dc97a6..a8c8a974d69 100644 --- a/r/NAMESPACE +++ b/r/NAMESPACE @@ -352,6 +352,7 @@ export(s3_bucket) export(schema) export(set_cpu_count) export(set_io_thread_count) +export(show_exec_plan) export(starts_with) export(string) export(struct) diff --git a/r/R/arrow-package.R b/r/R/arrow-package.R index 6246f7e2ecf..f3e0b817d5f 100644 --- a/r/R/arrow-package.R +++ b/r/R/arrow-package.R @@ -41,7 +41,8 @@ "group_vars", "group_by_drop_default", "ungroup", "mutate", "transmute", "arrange", "rename", "pull", "relocate", "compute", "collapse", "distinct", "left_join", "right_join", "inner_join", "full_join", - "semi_join", "anti_join", "count", "tally", "rename_with", "union", "union_all", "glimpse" + "semi_join", "anti_join", "count", "tally", "rename_with", "union", + "union_all", "glimpse", "show_query", "explain" ) ) for (cl in c("Dataset", "ArrowTabular", "RecordBatchReader", "arrow_dplyr_query")) { diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R index dfe0db614ad..7cd2c5dbfc2 100644 --- a/r/R/arrowExports.R +++ b/r/R/arrowExports.R @@ -420,6 +420,10 @@ ExecNode_output_schema <- function(node) { .Call(`_arrow_ExecNode_output_schema`, node) } +ExecPlan_BuildAndShow <- function(plan, final_node, sort_options, head) { + .Call(`_arrow_ExecPlan_BuildAndShow`, plan, final_node, sort_options, head) +} + ExecNode_Scan <- function(plan, dataset, filter, materialized_field_names) { .Call(`_arrow_ExecNode_Scan`, plan, dataset, filter, materialized_field_names) } diff --git a/r/R/dplyr.R b/r/R/dplyr.R index 1296e603846..c1fb4fef2b8 100644 --- a/r/R/dplyr.R +++ b/r/R/dplyr.R @@ -219,6 +219,53 @@ tail.arrow_dplyr_query <- function(x, n = 6L, ...) { x } +#' Show the details of an Arrow Execution Plan +#' +#' This is a function which gives more details about the logical query plan +#' that will be executed when evaluating an `arrow_dplyr_query` object. +#' It calls the C++ `ExecPlan` object's print method. +#' Functionally, it is similar to `dplyr::explain()`. This function is used as +#' the `dplyr::explain()` and `dplyr::show_query()` methods. +#' +#' @param x an `arrow_dplyr_query` to print the `ExecPlan` for. +#' +#' @return `x`, invisibly. +#' @export +#' +#' @examplesIf arrow_with_dataset() && requireNamespace("dplyr", quietly = TRUE) +#' library(dplyr) +#' mtcars %>% +#' arrow_table() %>% +#' filter(mpg > 20) %>% +#' mutate(x = gear/carb) %>% +#' show_exec_plan() +show_exec_plan <- function(x) { + adq <- as_adq(x) + plan <- ExecPlan$create() + # do not show the plan if we have a nested query (as this will force the + # evaluation of the inner query/queries) + # TODO see if we can remove after ARROW-16628 + if (is_collapsed(x) && has_head_tail(x$.data)) { + warn("The `ExecPlan` cannot be printed for a nested query.") + return(invisible(x)) + } + final_node <- plan$Build(adq) + cat(plan$BuildAndShow(final_node)) + invisible(x) +} + +show_query.arrow_dplyr_query <- function(x, ...) { + show_exec_plan(x) +} + +show_query.Dataset <- show_query.ArrowTabular <- show_query.RecordBatchReader <- show_query.arrow_dplyr_query + +explain.arrow_dplyr_query <- function(x, ...) { + show_exec_plan(x) +} + +explain.Dataset <- explain.ArrowTabular <- explain.RecordBatchReader <- explain.arrow_dplyr_query + ensure_group_vars <- function(x) { if (inherits(x, "arrow_dplyr_query")) { # Before pulling data from Arrow, make sure all group vars are in the projection diff --git a/r/R/query-engine.R b/r/R/query-engine.R index e63fa75ebf1..84360490fdb 100644 --- a/r/R/query-engine.R +++ b/r/R/query-engine.R @@ -193,6 +193,8 @@ ExecPlan <- R6Class("ExecPlan", node }, Run = function(node, as_table = FALSE) { + # a section of this code is used by `BuildAndShow()` too - the 2 need to be in sync + # Start of chunk used in `BuildAndShow()` assert_is(node, "ExecNode") # Sorting and head/tail (if sorted) are handled in the SinkNode, @@ -210,6 +212,8 @@ ExecPlan <- R6Class("ExecPlan", sorting$orders <- as.integer(sorting$orders) } + # End of chunk used in `BuildAndShow()` + # If we are going to return a Table anyway, we do this in one step and # entirely in one C++ call to ensure that we can execute user-defined # functions from the worker threads spawned by the ExecPlan. If not, we @@ -273,6 +277,39 @@ ExecPlan <- R6Class("ExecPlan", ... ) }, + # SinkNodes (involved in arrange and/or head/tail operations) are created in + # ExecPlan_run and are not captured by the regulat print method. We take a + # similar approach to expose them before calling the print method. + BuildAndShow = function(node) { + # a section of this code is copied from `Run()` - the 2 need to be in sync + # Start of chunk copied from `Run()` + + assert_is(node, "ExecNode") + + # Sorting and head/tail (if sorted) are handled in the SinkNode, + # created in ExecPlan_run + sorting <- node$extras$sort %||% list() + select_k <- node$extras$head %||% -1L + has_sorting <- length(sorting) > 0 + if (has_sorting) { + if (!is.null(node$extras$tail)) { + # Reverse the sort order and take the top K, then after we'll reverse + # the resulting rows so that it is ordered as expected + sorting$orders <- !sorting$orders + select_k <- node$extras$tail + } + sorting$orders <- as.integer(sorting$orders) + } + + # End of chunk copied from `Run()` + + ExecPlan_BuildAndShow( + self, + node, + sorting, + select_k + ) + }, Stop = function() ExecPlan_StopProducing(self) ) ) diff --git a/r/_pkgdown.yml b/r/_pkgdown.yml index b04cab8195e..8865421c0b2 100644 --- a/r/_pkgdown.yml +++ b/r/_pkgdown.yml @@ -220,6 +220,7 @@ reference: - value_counts - list_compute_functions - register_scalar_function + - show_exec_plan - title: Connections to other systems contents: - to_arrow diff --git a/r/man/show_exec_plan.Rd b/r/man/show_exec_plan.Rd new file mode 100644 index 00000000000..c020838b2ed --- /dev/null +++ b/r/man/show_exec_plan.Rd @@ -0,0 +1,31 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/dplyr.R +\name{show_exec_plan} +\alias{show_exec_plan} +\title{Show the details of an Arrow Execution Plan} +\usage{ +show_exec_plan(x) +} +\arguments{ +\item{x}{an \code{arrow_dplyr_query} to print the \code{ExecPlan} for.} +} +\value{ +\code{x}, invisibly. +} +\description{ +This is a function which gives more details about the logical query plan +that will be executed when evaluating an \code{arrow_dplyr_query} object. +It calls the C++ \code{ExecPlan} object's print method. +Functionally, it is similar to \code{dplyr::explain()}. This function is used as +the \code{dplyr::explain()} and \code{dplyr::show_query()} methods. +} +\examples{ +\dontshow{if (arrow_with_dataset() && requireNamespace("dplyr", quietly = TRUE)) (if (getRversion() >= "3.4") withAutoprint else force)(\{ # examplesIf} +library(dplyr) +mtcars \%>\% + arrow_table() \%>\% + filter(mpg > 20) \%>\% + mutate(x = gear/carb) \%>\% + show_exec_plan() +\dontshow{\}) # examplesIf} +} diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index fd9f92e5d1a..dc96af41d4f 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -910,6 +910,17 @@ BEGIN_CPP11 END_CPP11 } // compute-exec.cpp +std::string ExecPlan_BuildAndShow(const std::shared_ptr& plan, const std::shared_ptr& final_node, cpp11::list sort_options, int64_t head); +extern "C" SEXP _arrow_ExecPlan_BuildAndShow(SEXP plan_sexp, SEXP final_node_sexp, SEXP sort_options_sexp, SEXP head_sexp){ +BEGIN_CPP11 + arrow::r::Input&>::type plan(plan_sexp); + arrow::r::Input&>::type final_node(final_node_sexp); + arrow::r::Input::type sort_options(sort_options_sexp); + arrow::r::Input::type head(head_sexp); + return cpp11::as_sexp(ExecPlan_BuildAndShow(plan, final_node, sort_options, head)); +END_CPP11 +} +// compute-exec.cpp #if defined(ARROW_R_WITH_DATASET) std::shared_ptr ExecNode_Scan(const std::shared_ptr& plan, const std::shared_ptr& dataset, const std::shared_ptr& filter, std::vector materialized_field_names); extern "C" SEXP _arrow_ExecNode_Scan(SEXP plan_sexp, SEXP dataset_sexp, SEXP filter_sexp, SEXP materialized_field_names_sexp){ @@ -5270,6 +5281,7 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_ExecPlan_read_table", (DL_FUNC) &_arrow_ExecPlan_read_table, 5}, { "_arrow_ExecPlan_StopProducing", (DL_FUNC) &_arrow_ExecPlan_StopProducing, 1}, { "_arrow_ExecNode_output_schema", (DL_FUNC) &_arrow_ExecNode_output_schema, 1}, + { "_arrow_ExecPlan_BuildAndShow", (DL_FUNC) &_arrow_ExecPlan_BuildAndShow, 4}, { "_arrow_ExecNode_Scan", (DL_FUNC) &_arrow_ExecNode_Scan, 4}, { "_arrow_ExecPlan_Write", (DL_FUNC) &_arrow_ExecPlan_Write, 14}, { "_arrow_ExecNode_Filter", (DL_FUNC) &_arrow_ExecNode_Filter, 2}, diff --git a/r/src/compute-exec.cpp b/r/src/compute-exec.cpp index e348675fc17..91d646f0a3c 100644 --- a/r/src/compute-exec.cpp +++ b/r/src/compute-exec.cpp @@ -60,6 +60,10 @@ std::pair, std::shared_ptr& plan, const std::shared_ptr& final_node, cpp11::list sort_options, cpp11::strings metadata, int64_t head = -1) { + // a section of this code is copied and used in ExecPlan_BuildAndShow - the 2 need + // to be in sync + // Start of chunk used in ExecPlan_BuildAndShow + // For now, don't require R to construct SinkNodes. // Instead, just pass the node we should collect as an argument. arrow::AsyncGenerator> sink_gen; @@ -88,6 +92,8 @@ ExecPlan_prepare(const std::shared_ptr& plan, compute::SinkNodeOptions{&sink_gen}); } + // End of chunk used in ExecPlan_BuildAndShow + StopIfNotOk(plan->Validate()); // If the generator is destroyed before being completely drained, inform plan @@ -155,6 +161,46 @@ std::shared_ptr ExecNode_output_schema( return node->output_schema(); } +// [[arrow::export]] +std::string ExecPlan_BuildAndShow(const std::shared_ptr& plan, + const std::shared_ptr& final_node, + cpp11::list sort_options, int64_t head = -1) { + // a section of this code is copied from ExecPlan_prepare - the 2 need to be in sync + // Start of chunk copied from ExecPlan_prepare + + // For now, don't require R to construct SinkNodes. + // Instead, just pass the node we should collect as an argument. + arrow::AsyncGenerator> sink_gen; + + // Sorting uses a different sink node; there is no general sort yet + if (sort_options.size() > 0) { + if (head >= 0) { + // Use the SelectK node to take only what we need + MakeExecNodeOrStop( + "select_k_sink", plan.get(), {final_node.get()}, + compute::SelectKSinkNodeOptions{ + arrow::compute::SelectKOptions( + head, std::dynamic_pointer_cast( + make_compute_options("sort_indices", sort_options)) + ->sort_keys), + &sink_gen}); + } else { + MakeExecNodeOrStop("order_by_sink", plan.get(), {final_node.get()}, + compute::OrderBySinkNodeOptions{ + *std::dynamic_pointer_cast( + make_compute_options("sort_indices", sort_options)), + &sink_gen}); + } + } else { + MakeExecNodeOrStop("sink", plan.get(), {final_node.get()}, + compute::SinkNodeOptions{&sink_gen}); + } + + // End of chunk copied from ExecPlan_prepare + + return plan->ToString(); +} + #if defined(ARROW_R_WITH_DATASET) #include diff --git a/r/tests/testthat/test-dataset-dplyr.R b/r/tests/testthat/test-dataset-dplyr.R index fecda56c6c2..b6982939ee3 100644 --- a/r/tests/testthat/test-dataset-dplyr.R +++ b/r/tests/testthat/test-dataset-dplyr.R @@ -340,3 +340,80 @@ test_that("dplyr method not implemented messages", { fixed = TRUE ) }) + +test_that("show_exec_plan(), show_query() and explain() with datasets", { + # show_query() and explain() are wrappers around show_exec_plan() and are not + # tested separately + + ds <- open_dataset(dataset_dir, partitioning = schema(part = uint8())) + + # minimal test + expect_output( + ds %>% + show_exec_plan(), + regexp = paste0( + "ExecPlan with .* nodes:.*", # boiler plate for ExecPlan + "ProjectNode.*", # output columns + "SourceNode" # entry point + ) + ) + + # filter and select + expect_output( + ds %>% + select(string = chr, integer = int, part) %>% + filter(integer > 6L & part == 1) %>% + show_exec_plan(), + regexp = paste0( + "ExecPlan with .* nodes:.*", # boiler plate for ExecPlan + "ProjectNode.*", # output columns + "FilterNode.*", # filter node + "int > 6.*cast.*", # filtering expressions + auto-casting of part + "SourceNode" # entry point + ) + ) + + # group_by and summarise + expect_output( + ds %>% + group_by(part) %>% + summarise(avg = mean(int)) %>% + show_exec_plan(), + regexp = paste0( + "ExecPlan with .* nodes:.*", # boiler plate for ExecPlan + "ProjectNode.*", # output columns + "GroupByNode.*", # group by node + "keys=.*part.*", # key for aggregations + "aggregates=.*hash_mean.*", # aggregations + "ProjectNode.*", # input columns + "SourceNode" # entry point + ) + ) + + # arrange and head + expect_output( + ds %>% + filter(lgl) %>% + arrange(chr) %>% + show_exec_plan(), + regexp = paste0( + "ExecPlan with .* nodes:.*", # boiler plate for ExecPlan + "OrderBySinkNode.*chr.*ASC.*", # arrange goes via the OrderBy sink node + "ProjectNode.*", # output columns + "FilterNode.*", # filter node + "filter=lgl.*", # filtering expression + "SourceNode" # entry point + ) + ) + + # printing the ExecPlan for a nested query would currently force the + # evaluation of the inner one(s), which we want to avoid => no output + expect_warning( + ds %>% + filter(lgl) %>% + arrange(chr) %>% + head() %>% + show_exec_plan(), + "The `ExecPlan` cannot be printed for a nested query." + ) +}) diff --git a/r/tests/testthat/test-dplyr-query.R b/r/tests/testthat/test-dplyr-query.R index d988d1d7529..37ab178cbb4 100644 --- a/r/tests/testthat/test-dplyr-query.R +++ b/r/tests/testthat/test-dplyr-query.R @@ -436,3 +436,126 @@ test_that("query_can_stream()", { query_can_stream() ) }) + +test_that("show_exec_plan(), show_query() and explain()", { + # show_query() and explain() are wrappers around show_exec_plan() and are not + # tested separately + + # minimal test - this fails if we don't coerce the input to `show_exec_plan()` + # to be an `arrow_dplyr_query` + expect_output( + mtcars %>% + arrow_table() %>% + show_exec_plan(), + regexp = paste0( + "ExecPlan with .* nodes:.*", # boiler plate for ExecPlan + "ProjectNode.*", # output columns + "TableSourceNode" # entry point + ) + ) + + # arrow_table and mutate + expect_output( + tbl %>% + arrow_table() %>% + filter(dbl > 2, chr != "e") %>% + select(chr, int, lgl) %>% + mutate(int_plus_ten = int + 10) %>% + show_exec_plan(), + regexp = paste0( + "ExecPlan with .* nodes:.*", # boiler plate for ExecPlan + "chr, int, lgl, \"int_plus_ten\".*", # selected columns + "FilterNode.*", # filter node + "(dbl > 2).*", # filter expressions + "chr != \"e\".*", + "TableSourceNode" # entry point + ) + ) + + # record_batch and mutate + expect_output( + tbl %>% + record_batch() %>% + filter(dbl > 2, chr != "e") %>% + select(chr, int, lgl) %>% + mutate(int_plus_ten = int + 10) %>% + show_exec_plan(), + regexp = paste0( + "ExecPlan with .* nodes:.*", # boiler plate for ExecPlan + "chr, int, lgl, \"int_plus_ten\".*", # selected columns + "(dbl > 2).*", # the filter expressions + "chr != \"e\".*", + "TableSourceNode" # the entry point" + ) + ) + + # test with group_by and summarise + expect_output( + tbl %>% + arrow_table() %>% + group_by(lgl) %>% + summarise(avg = mean(dbl, na.rm = TRUE)) %>% + show_exec_plan(), + regexp = paste0( + "ExecPlan with .* nodes:.*", # boiler plate for ExecPlan + "ProjectNode.*", # output columns + "GroupByNode.*", # the group_by statement + "keys=.*lgl.*", # the key for the aggregations + "aggregates=.*hash_mean.*avg.*", # the aggregations + "ProjectNode.*", # the input columns + "TableSourceNode" # the entry point + ) + ) + + # test with join + expect_output( + tbl %>% + arrow_table() %>% + left_join( + example_data %>% + arrow_table() %>% + mutate(doubled_dbl = dbl * 2) %>% + select(int, doubled_dbl), + by = "int" + ) %>% + select(int, verses, doubled_dbl) %>% + show_exec_plan(), + regexp = paste0( + "ExecPlan with .* nodes:.*", # boiler plate for ExecPlan + "ProjectNode.*", # output columns + "HashJoinNode.*", # the join + "ProjectNode.*", # input columns for the second table + "\"doubled_dbl\"\\: multiply_checked\\(dbl, 2\\).*", # mutate + "TableSourceNode.*", # second table + "ProjectNode.*", # input columns for the first table + "TableSourceNode" # first table + ) + ) + + expect_output( + mtcars %>% + arrow_table() %>% + filter(mpg > 20) %>% + arrange(desc(wt)) %>% + show_exec_plan(), + regexp = paste0( + "ExecPlan with .* nodes:.*", # boiler plate for ExecPlan + "OrderBySinkNode.*wt.*DESC.*", # arrange goes via the OrderBy sink node + "ProjectNode.*", # output columns + "FilterNode.*", # filter node + "TableSourceNode.*" # entry point + ) + ) + + # printing the ExecPlan for a nested query would currently force the + # evaluation of the inner one(s), which we want to avoid => no output + expect_warning( + mtcars %>% + arrow_table() %>% + filter(mpg > 20) %>% + arrange(desc(wt)) %>% + head(3) %>% + show_exec_plan(), + "The `ExecPlan` cannot be printed for a nested query." + ) +})