diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R index a318c7a4f37..b3dd3a96018 100644 --- a/r/R/arrowExports.R +++ b/r/R/arrowExports.R @@ -444,8 +444,8 @@ ExecPlanReader__PlanStatus <- function(reader) { .Call(`_arrow_ExecPlanReader__PlanStatus`, reader) } -ExecPlan_run <- function(plan, final_node, sort_options, metadata, head) { - .Call(`_arrow_ExecPlan_run`, plan, final_node, sort_options, metadata, head) +ExecPlan_run <- function(plan, final_node, metadata) { + .Call(`_arrow_ExecPlan_run`, plan, final_node, metadata) } ExecPlan_ToString <- function(plan) { @@ -460,6 +460,10 @@ ExecNode_output_schema <- function(node) { .Call(`_arrow_ExecNode_output_schema`, node) } +ExecNode_has_ordered_batches <- function(node) { + .Call(`_arrow_ExecNode_has_ordered_batches`, node) +} + ExecNode_Scan <- function(plan, dataset, filter, projection) { .Call(`_arrow_ExecNode_Scan`, plan, dataset, filter, projection) } @@ -488,6 +492,14 @@ ExecNode_Union <- function(input, right_data) { .Call(`_arrow_ExecNode_Union`, input, right_data) } +ExecNode_Fetch <- function(input, offset, limit) { + .Call(`_arrow_ExecNode_Fetch`, input, offset, limit) +} + +ExecNode_OrderBy <- function(input, sort_options) { + .Call(`_arrow_ExecNode_OrderBy`, input, sort_options) +} + ExecNode_SourceNode <- function(plan, reader) { .Call(`_arrow_ExecNode_SourceNode`, plan, reader) } diff --git a/r/R/dplyr-summarize.R b/r/R/dplyr-summarize.R index 184c0aade46..5d943633a82 100644 --- a/r/R/dplyr-summarize.R +++ b/r/R/dplyr-summarize.R @@ -287,6 +287,11 @@ do_arrow_summarize <- function(.data, ..., .groups = NULL) { stop(paste("Invalid .groups argument:", .groups)) } out$drop_empty_groups <- .data$drop_empty_groups + if (getOption("arrow.summarise.sort", FALSE)) { + # Add sorting instructions for the rows to match dplyr + out$arrange_vars <- .data$selected_columns[.data$group_by_vars] + out$arrange_desc <- rep(FALSE, length(.data$group_by_vars)) + } } out } diff --git a/r/R/dplyr.R b/r/R/dplyr.R index 72e74809689..54ecc80aad1 100644 --- a/r/R/dplyr.R +++ b/r/R/dplyr.R @@ -284,17 +284,7 @@ tail.arrow_dplyr_query <- function(x, n = 6L, ...) { #' mutate(x = gear / carb) %>% #' show_exec_plan() show_exec_plan <- function(x) { - adq <- as_adq(x) - - # do not show the plan if we have a nested query (as this will force the - # evaluation of the inner query/queries) - # TODO see if we can remove after ARROW-16628 - if (is_collapsed(x) && has_head_tail(x$.data)) { - warn("The `ExecPlan` cannot be printed for a nested query.") - return(invisible(x)) - } - - result <- as_record_batch_reader(adq) + result <- as_record_batch_reader(as_adq(x)) plan <- result$Plan() on.exit({ plan$.unsafe_delete() @@ -419,6 +409,25 @@ query_can_stream <- function(x) { is_collapsed <- function(x) inherits(x$.data, "arrow_dplyr_query") -has_head_tail <- function(x) { - !is.null(x$head) || !is.null(x$tail) || (is_collapsed(x) && has_head_tail(x$.data)) +has_unordered_head <- function(x) { + if (is.null(x$head %||% x$tail)) { + # no head/tail + return(FALSE) + } + !has_order(x) +} + +has_order <- function(x) { + length(x$arrange_vars) > 0 || + has_implicit_order(x) || + (is_collapsed(x) && has_order(x$.data)) +} + +has_implicit_order <- function(x) { + # Approximate what ExecNode$has_ordered_batches() would return (w/o building ExecPlan) + # An in-memory table has an implicit order + # TODO(GH-34698): FileSystemDataset and RecordBatchReader will have implicit order + inherits(x$.data, "ArrowTabular") && + # But joins, aggregations, etc. will result in non-deterministic order + is.null(x$aggregations) && is.null(x$join) && is.null(x$union_all) } diff --git a/r/R/query-engine.R b/r/R/query-engine.R index 4b9b7ac459e..79227546dd3 100644 --- a/r/R/query-engine.R +++ b/r/R/query-engine.R @@ -74,11 +74,11 @@ ExecPlan <- R6Class("ExecPlan", if (is_collapsed(.data)) { # We have a nested query. - if (has_head_tail(.data$.data)) { - # head and tail are not ExecNodes; at best we can handle them via - # SinkNode, so if there are any steps done after head/tail, we need to - # evaluate the query up to then and then do a new query for the rest. - # as_record_batch_reader() will build and run an ExecPlan + if (has_unordered_head(.data$.data)) { + # TODO(GH-34941): FetchNode should do non-deterministic fetch + # Instead, we need to evaluate the query up to here, + # and then do a new query for the rest. + # as_record_batch_reader() will build and run an ExecPlan and do head() on it reader <- as_record_batch_reader(.data$.data) on.exit(reader$.unsafe_delete()) node <- self$SourceNode(reader) @@ -126,15 +126,6 @@ ExecPlan <- R6Class("ExecPlan", options = .data$aggregations, key_names = group_vars ) - - if (grouped && getOption("arrow.summarise.sort", FALSE)) { - # Add sorting instructions for the rows too to match dplyr - # (see below about why sorting isn't itself a Node) - node$extras$sort <- list( - names = group_vars, - orders = rep(0L, length(group_vars)) - ) - } } else { # If any columns are derived, reordered, or renamed we need to Project # If there are aggregations, the projection was already handled above. @@ -166,82 +157,81 @@ ExecPlan <- R6Class("ExecPlan", } } - # Apply sorting: this is currently not an ExecNode itself, it is a - # sink node option. - # TODO: handle some cases: - # (1) arrange > summarize > arrange - # (2) ARROW-13779: arrange then operation where order matters (e.g. cumsum) + # Apply sorting and head/tail + head_or_tail <- .data$head %||% .data$tail if (length(.data$arrange_vars)) { - node$extras$sort <- list( + if (!is.null(.data$tail)) { + # Handle tail first: Reverse sort, take head + # TODO(GH-34942): FetchNode support for tail + node <- node$OrderBy(list( + names = names(.data$arrange_vars), + orders = as.integer(!.data$arrange_desc) + )) + node <- node$Fetch(.data$tail) + } + # Apply sorting + node <- node$OrderBy(list( names = names(.data$arrange_vars), - orders = .data$arrange_desc, - temp_columns = names(.data$temp_columns) - ) - } - # This is only safe because we are going to evaluate queries that end - # with head/tail first, then evaluate any subsequent query as a new query - if (!is.null(.data$head)) { - node$extras$head <- .data$head - } - if (!is.null(.data$tail)) { - node$extras$tail <- .data$tail + orders = as.integer(.data$arrange_desc) + )) + + if (length(.data$temp_columns)) { + # If we sorted on ad-hoc derived columns, Project to drop them + temp_schema <- node$schema + cols_to_keep <- setdiff(names(temp_schema), names(.data$temp_columns)) + node <- node$Project(make_field_refs(cols_to_keep)) + } + + if (!is.null(.data$head)) { + # Take the head now + node <- node$Fetch(.data$head) + } + } else if (!is.null(head_or_tail)) { + # Unsorted head/tail + # Handle a couple of special cases here: + if (node$has_ordered_batches()) { + # Data that has order, even implicit order from an in-memory table, is supported + # in FetchNode + if (!is.null(.data$head)) { + node <- node$Fetch(.data$head) + } else { + # TODO(GH-34942): FetchNode support for tail + # FetchNode currently doesn't support tail, but it has limit + offset + # So if we know how many rows the query will result in, we can offset + data_without_tail <- .data + data_without_tail$tail <- NULL + row_count <- nrow(data_without_tail) + if (!is.na(row_count)) { + node <- node$Fetch(.data$tail, offset = row_count - .data$tail) + } else { + # Workaround: non-deterministic tail + node$extras$slice_size <- head_or_tail + } + } + } else { + # TODO(GH-34941): non-deterministic FetchNode + # Data has non-deterministic order, so head/tail means "just show me any N rows" + # FetchNode does not support non-deterministic scans, so we have to handle outside + node$extras$slice_size <- head_or_tail + } } node }, Run = function(node) { assert_is(node, "ExecNode") - - # Sorting and head/tail (if sorted) are handled in the SinkNode, - # created in ExecPlan_build - sorting <- node$extras$sort %||% list() - select_k <- node$extras$head %||% -1L - has_sorting <- length(sorting) > 0 - if (has_sorting) { - if (!is.null(node$extras$tail)) { - # Reverse the sort order and take the top K, then after we'll reverse - # the resulting rows so that it is ordered as expected - sorting$orders <- !sorting$orders - select_k <- node$extras$tail - } - sorting$orders <- as.integer(sorting$orders) - } - out <- ExecPlan_run( self, node, - sorting, - prepare_key_value_metadata(node$final_metadata()), - select_k + prepare_key_value_metadata(node$final_metadata()) ) - if (!has_sorting) { - # Since ExecPlans don't scan in deterministic order, head/tail are both + if (!is.null(node$extras$slice_size)) { + # For non-deterministic scans, head/tail are # essentially taking a random slice from somewhere in the dataset. # And since the head() implementation is way more efficient than tail(), # just use it to take the random slice - # TODO(ARROW-16628): handle limit in ExecNode - slice_size <- node$extras$head %||% node$extras$tail - if (!is.null(slice_size)) { - out <- head(out, slice_size) - } - } else if (!is.null(node$extras$tail)) { - # TODO(ARROW-16630): proper BottomK support - # Reverse the row order to get back what we expect - out <- as_arrow_table(out) - out <- out[rev(seq_len(nrow(out))), , drop = FALSE] - out <- as_record_batch_reader(out) - } - - # If arrange() created $temp_columns, make sure to omit them from the result - # We can't currently handle this in ExecPlan_run itself because sorting - # happens in the end (SinkNode) so nothing comes after it. - # TODO(ARROW-16631): move into ExecPlan - if (length(node$extras$sort$temp_columns) > 0) { - tab <- as_arrow_table(out) - tab <- tab[, setdiff(names(tab), node$extras$sort$temp_columns), drop = FALSE] - out <- as_record_batch_reader(tab) + out <- head(out, node$extras$slice_size) } - out }, Write = function(node, ...) { @@ -272,13 +262,8 @@ ExecNode <- R6Class("ExecNode", inherit = ArrowObject, public = list( extras = list( - # `sort` is a slight hack to be able to keep around arrange() params, - # which don't currently yield their own ExecNode but rather are consumed - # in the SinkNode (in ExecPlan$run()) - sort = NULL, - # Similar hacks for head and tail - head = NULL, - tail = NULL, + # Workaround for non-deterministic head/tail + slice_size = NULL, # `source_schema` is put here in Scan() so that at Run/Write, we can # extract the relevant metadata and keep it in the result source_schema = NULL @@ -295,6 +280,7 @@ ExecNode <- R6Class("ExecNode", old_meta$r <- get_r_metadata_from_old_schema(self$schema, old_schema) old_meta }, + has_ordered_batches = function() ExecNode_has_ordered_batches(self), Project = function(cols) { if (length(cols)) { assert_is_list_of(cols, "Expression") @@ -336,6 +322,16 @@ ExecNode <- R6Class("ExecNode", }, Union = function(right_node) { self$preserve_extras(ExecNode_Union(self, right_node)) + }, + Fetch = function(limit, offset = 0L) { + self$preserve_extras( + ExecNode_Fetch(self, offset, limit) + ) + }, + OrderBy = function(sorting) { + self$preserve_extras( + ExecNode_OrderBy(self, sorting) + ) } ), active = list( diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index dc4d0e9c709..65adcb764c9 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -988,19 +988,17 @@ extern "C" SEXP _arrow_ExecPlanReader__PlanStatus(SEXP reader_sexp){ // compute-exec.cpp #if defined(ARROW_R_WITH_ACERO) -std::shared_ptr ExecPlan_run(const std::shared_ptr& plan, const std::shared_ptr& final_node, cpp11::list sort_options, cpp11::strings metadata, int64_t head); -extern "C" SEXP _arrow_ExecPlan_run(SEXP plan_sexp, SEXP final_node_sexp, SEXP sort_options_sexp, SEXP metadata_sexp, SEXP head_sexp){ +std::shared_ptr ExecPlan_run(const std::shared_ptr& plan, const std::shared_ptr& final_node, cpp11::strings metadata); +extern "C" SEXP _arrow_ExecPlan_run(SEXP plan_sexp, SEXP final_node_sexp, SEXP metadata_sexp){ BEGIN_CPP11 arrow::r::Input&>::type plan(plan_sexp); arrow::r::Input&>::type final_node(final_node_sexp); - arrow::r::Input::type sort_options(sort_options_sexp); arrow::r::Input::type metadata(metadata_sexp); - arrow::r::Input::type head(head_sexp); - return cpp11::as_sexp(ExecPlan_run(plan, final_node, sort_options, metadata, head)); + return cpp11::as_sexp(ExecPlan_run(plan, final_node, metadata)); END_CPP11 } #else -extern "C" SEXP _arrow_ExecPlan_run(SEXP plan_sexp, SEXP final_node_sexp, SEXP sort_options_sexp, SEXP metadata_sexp, SEXP head_sexp){ +extern "C" SEXP _arrow_ExecPlan_run(SEXP plan_sexp, SEXP final_node_sexp, SEXP metadata_sexp){ Rf_error("Cannot call ExecPlan_run(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. "); } #endif @@ -1051,6 +1049,14 @@ extern "C" SEXP _arrow_ExecNode_output_schema(SEXP node_sexp){ } #endif +// compute-exec.cpp +bool ExecNode_has_ordered_batches(const std::shared_ptr& node); +extern "C" SEXP _arrow_ExecNode_has_ordered_batches(SEXP node_sexp){ +BEGIN_CPP11 + arrow::r::Input&>::type node(node_sexp); + return cpp11::as_sexp(ExecNode_has_ordered_batches(node)); +END_CPP11 +} // compute-exec.cpp #if defined(ARROW_R_WITH_DATASET) std::shared_ptr ExecNode_Scan(const std::shared_ptr& plan, const std::shared_ptr& dataset, const std::shared_ptr& filter, cpp11::list projection); @@ -1187,6 +1193,39 @@ extern "C" SEXP _arrow_ExecNode_Union(SEXP input_sexp, SEXP right_data_sexp){ } #endif +// compute-exec.cpp +#if defined(ARROW_R_WITH_ACERO) +std::shared_ptr ExecNode_Fetch(const std::shared_ptr& input, int64_t offset, int64_t limit); +extern "C" SEXP _arrow_ExecNode_Fetch(SEXP input_sexp, SEXP offset_sexp, SEXP limit_sexp){ +BEGIN_CPP11 + arrow::r::Input&>::type input(input_sexp); + arrow::r::Input::type offset(offset_sexp); + arrow::r::Input::type limit(limit_sexp); + return cpp11::as_sexp(ExecNode_Fetch(input, offset, limit)); +END_CPP11 +} +#else +extern "C" SEXP _arrow_ExecNode_Fetch(SEXP input_sexp, SEXP offset_sexp, SEXP limit_sexp){ + Rf_error("Cannot call ExecNode_Fetch(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. "); +} +#endif + +// compute-exec.cpp +#if defined(ARROW_R_WITH_ACERO) +std::shared_ptr ExecNode_OrderBy(const std::shared_ptr& input, cpp11::list sort_options); +extern "C" SEXP _arrow_ExecNode_OrderBy(SEXP input_sexp, SEXP sort_options_sexp){ +BEGIN_CPP11 + arrow::r::Input&>::type input(input_sexp); + arrow::r::Input::type sort_options(sort_options_sexp); + return cpp11::as_sexp(ExecNode_OrderBy(input, sort_options)); +END_CPP11 +} +#else +extern "C" SEXP _arrow_ExecNode_OrderBy(SEXP input_sexp, SEXP sort_options_sexp){ + Rf_error("Cannot call ExecNode_OrderBy(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. "); +} +#endif + // compute-exec.cpp #if defined(ARROW_R_WITH_ACERO) std::shared_ptr ExecNode_SourceNode(const std::shared_ptr& plan, const std::shared_ptr& reader); @@ -5555,10 +5594,11 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_Table__from_ExecPlanReader", (DL_FUNC) &_arrow_Table__from_ExecPlanReader, 1}, { "_arrow_ExecPlanReader__Plan", (DL_FUNC) &_arrow_ExecPlanReader__Plan, 1}, { "_arrow_ExecPlanReader__PlanStatus", (DL_FUNC) &_arrow_ExecPlanReader__PlanStatus, 1}, - { "_arrow_ExecPlan_run", (DL_FUNC) &_arrow_ExecPlan_run, 5}, + { "_arrow_ExecPlan_run", (DL_FUNC) &_arrow_ExecPlan_run, 3}, { "_arrow_ExecPlan_ToString", (DL_FUNC) &_arrow_ExecPlan_ToString, 1}, { "_arrow_ExecPlan_UnsafeDelete", (DL_FUNC) &_arrow_ExecPlan_UnsafeDelete, 1}, { "_arrow_ExecNode_output_schema", (DL_FUNC) &_arrow_ExecNode_output_schema, 1}, + { "_arrow_ExecNode_has_ordered_batches", (DL_FUNC) &_arrow_ExecNode_has_ordered_batches, 1}, { "_arrow_ExecNode_Scan", (DL_FUNC) &_arrow_ExecNode_Scan, 4}, { "_arrow_ExecPlan_Write", (DL_FUNC) &_arrow_ExecPlan_Write, 14}, { "_arrow_ExecNode_Filter", (DL_FUNC) &_arrow_ExecNode_Filter, 2}, @@ -5566,6 +5606,8 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_ExecNode_Aggregate", (DL_FUNC) &_arrow_ExecNode_Aggregate, 3}, { "_arrow_ExecNode_Join", (DL_FUNC) &_arrow_ExecNode_Join, 9}, { "_arrow_ExecNode_Union", (DL_FUNC) &_arrow_ExecNode_Union, 2}, + { "_arrow_ExecNode_Fetch", (DL_FUNC) &_arrow_ExecNode_Fetch, 3}, + { "_arrow_ExecNode_OrderBy", (DL_FUNC) &_arrow_ExecNode_OrderBy, 2}, { "_arrow_ExecNode_SourceNode", (DL_FUNC) &_arrow_ExecNode_SourceNode, 2}, { "_arrow_ExecNode_TableSourceNode", (DL_FUNC) &_arrow_ExecNode_TableSourceNode, 2}, { "_arrow_substrait__internal__SubstraitToJSON", (DL_FUNC) &_arrow_substrait__internal__SubstraitToJSON, 1}, diff --git a/r/src/compute-exec.cpp b/r/src/compute-exec.cpp index 9c7de915fa2..347d4787fd1 100644 --- a/r/src/compute-exec.cpp +++ b/r/src/compute-exec.cpp @@ -224,35 +224,13 @@ std::string ExecPlanReader__PlanStatus(const std::shared_ptr& re // [[acero::export]] std::shared_ptr ExecPlan_run( const std::shared_ptr& plan, - const std::shared_ptr& final_node, cpp11::list sort_options, - cpp11::strings metadata, int64_t head = -1) { + const std::shared_ptr& final_node, cpp11::strings metadata) { // For now, don't require R to construct SinkNodes. // Instead, just pass the node we should collect as an argument. arrow::AsyncGenerator> sink_gen; - // Sorting uses a different sink node; there is no general sort yet - if (sort_options.size() > 0) { - if (head >= 0) { - // Use the SelectK node to take only what we need - MakeExecNodeOrStop( - "select_k_sink", plan.get(), {final_node.get()}, - acero::SelectKSinkNodeOptions{ - arrow::compute::SelectKOptions( - head, std::dynamic_pointer_cast( - make_compute_options("sort_indices", sort_options)) - ->sort_keys), - &sink_gen}); - } else { - MakeExecNodeOrStop("order_by_sink", plan.get(), {final_node.get()}, - acero::OrderBySinkNodeOptions{ - *std::dynamic_pointer_cast( - make_compute_options("sort_indices", sort_options)), - &sink_gen}); - } - } else { - MakeExecNodeOrStop("sink", plan.get(), {final_node.get()}, - acero::SinkNodeOptions{&sink_gen}); - } + MakeExecNodeOrStop("sink", plan.get(), {final_node.get()}, + acero::SinkNodeOptions{&sink_gen}); StopIfNotOk(plan->Validate()); @@ -283,6 +261,11 @@ std::shared_ptr ExecNode_output_schema( return node->output_schema(); } +// [[arrow::export]] +bool ExecNode_has_ordered_batches(const std::shared_ptr& node) { + return !node->ordering().is_unordered(); +} + #if defined(ARROW_R_WITH_DATASET) #include @@ -460,6 +443,23 @@ std::shared_ptr ExecNode_Union( return MakeExecNodeOrStop("union", input->plan(), {input.get(), right_data.get()}, {}); } +// [[acero::export]] +std::shared_ptr ExecNode_Fetch( + const std::shared_ptr& input, int64_t offset, int64_t limit) { + return MakeExecNodeOrStop("fetch", input->plan(), {input.get()}, + acero::FetchNodeOptions{offset, limit}); +} + +// [[acero::export]] +std::shared_ptr ExecNode_OrderBy( + const std::shared_ptr& input, cpp11::list sort_options) { + return MakeExecNodeOrStop( + "order_by", input->plan(), {input.get()}, + acero::OrderByNodeOptions{std::dynamic_pointer_cast( + make_compute_options("sort_indices", sort_options)) + ->AsOrdering()}); +} + // [[acero::export]] std::shared_ptr ExecNode_SourceNode( const std::shared_ptr& plan, diff --git a/r/tests/testthat/test-dataset-dplyr.R b/r/tests/testthat/test-dataset-dplyr.R index e20a6262b7c..b8d93841921 100644 --- a/r/tests/testthat/test-dataset-dplyr.R +++ b/r/tests/testthat/test-dataset-dplyr.R @@ -397,22 +397,11 @@ test_that("show_exec_plan(), show_query() and explain() with datasets", { show_exec_plan(), regexp = paste0( "ExecPlan with .* nodes:.*", # boiler plate for ExecPlan - "OrderBySinkNode.*chr.*ASC.*", # arrange goes via the OrderBy sink node + "OrderByNode.*chr.*ASC.*", # arrange goes via the OrderBy node "ProjectNode.*", # output columns "FilterNode.*", # filter node "filter=lgl.*", # filtering expression "SourceNode" # entry point ) ) - - # printing the ExecPlan for a nested query would currently force the - # evaluation of the inner one(s), which we want to avoid => no output - expect_warning( - ds %>% - filter(lgl) %>% - arrange(chr) %>% - head() %>% - show_exec_plan(), - "The `ExecPlan` cannot be printed for a nested query." - ) }) diff --git a/r/tests/testthat/test-dplyr-collapse.R b/r/tests/testthat/test-dplyr-collapse.R index 198827e235b..a8aa5556f1e 100644 --- a/r/tests/testthat/test-dplyr-collapse.R +++ b/r/tests/testthat/test-dplyr-collapse.R @@ -167,6 +167,7 @@ lgl: bool total: int64 extra: int64 (multiply_checked(total, 5)) +* Sorted by lgl [asc] See $.data for the source Arrow object", fixed = TRUE ) diff --git a/r/tests/testthat/test-dplyr-query.R b/r/tests/testthat/test-dplyr-query.R index 5dbdb0e522b..e478d0e4c40 100644 --- a/r/tests/testthat/test-dplyr-query.R +++ b/r/tests/testthat/test-dplyr-query.R @@ -180,58 +180,26 @@ test_that("compute()", { }) test_that("head", { - batch <- record_batch(tbl) - - b2 <- batch %>% - select(int, chr) %>% - filter(int > 5) %>% - head(2) - expect_s3_class(b2, "arrow_dplyr_query") - expected <- tbl[tbl$int > 5 & !is.na(tbl$int), c("int", "chr")][1:2, ] - expect_equal(collect(b2), expected) - - b3 <- batch %>% - select(int, strng = chr) %>% - filter(int > 5) %>% - head(2) - expect_s3_class(b3, "arrow_dplyr_query") - expect_equal(as.data.frame(b3), set_names(expected, c("int", "strng"))) - - b4 <- batch %>% - select(int, strng = chr) %>% - filter(int > 5) %>% - group_by(int) %>% - head(2) - expect_s3_class(b4, "arrow_dplyr_query") - expect_equal( - as.data.frame(b4), - expected %>% - rename(strng = chr) %>% - group_by(int) - ) - - expect_equal( - batch %>% + compare_dplyr_binding( + .input %>% select(int, strng = chr) %>% filter(int > 5) %>% + group_by(int) %>% head(2) %>% - mutate(twice = int * 2) %>% collect(), - expected %>% - rename(strng = chr) %>% - mutate(twice = int * 2) + tbl ) # This would fail if we evaluated head() after filter() - expect_equal( - batch %>% + compare_dplyr_binding( + .input %>% select(int, strng = chr) %>% + arrange(int) %>% head(2) %>% filter(int > 5) %>% + mutate(twice = int * 2) %>% collect(), - expected %>% - rename(strng = chr) %>% - filter(FALSE) + tbl ) }) @@ -260,38 +228,25 @@ test_that("arrange then tail returns the right data", { }) test_that("tail", { - batch <- record_batch(tbl) - - b2 <- batch %>% - select(int, chr) %>% - filter(int > 5) %>% - arrange(int) %>% - tail(2) - - expect_s3_class(b2, "arrow_dplyr_query") - expected <- tail(tbl[tbl$int > 5 & !is.na(tbl$int), c("int", "chr")], 2) - expect_equal(as.data.frame(b2), expected) - - b3 <- batch %>% - select(int, strng = chr) %>% - filter(int > 5) %>% - arrange(int) %>% - tail(2) - expect_s3_class(b3, "arrow_dplyr_query") - expect_equal(as.data.frame(b3), set_names(expected, c("int", "strng"))) - - b4 <- batch %>% - select(int, strng = chr) %>% - filter(int > 5) %>% - group_by(int) %>% - arrange(int) %>% - tail(2) - expect_s3_class(b4, "arrow_dplyr_query") - expect_equal( - as.data.frame(b4), - expected %>% - rename(strng = chr) %>% - group_by(int) + # With sorting + compare_dplyr_binding( + .input %>% + select(int, chr) %>% + filter(int < 5) %>% + arrange(int) %>% + tail(2) %>% + collect(), + tbl + ) + # Without sorting: table order is implicit, and we can compute the filter + # row length, so the query can use Fetch with offset + compare_dplyr_binding( + .input %>% + select(int, chr) %>% + filter(int < 5) %>% + tail(2) %>% + collect(), + tbl ) }) @@ -550,7 +505,7 @@ test_that("show_exec_plan(), show_query() and explain()", { show_exec_plan(), regexp = paste0( "ExecPlan with .* nodes:.*", # boiler plate for ExecPlan - "OrderBySinkNode.*wt.*DESC.*", # arrange goes via the OrderBy sink node + "OrderBy.*wt.*DESC.*", # arrange goes via the OrderBy node "FilterNode.*", # filter node "TableSourceNode.*" # entry point ) @@ -558,14 +513,19 @@ test_that("show_exec_plan(), show_query() and explain()", { # printing the ExecPlan for a nested query would currently force the # evaluation of the inner one(s), which we want to avoid => no output - expect_warning( + expect_output( mtcars %>% arrow_table() %>% filter(mpg > 20) %>% - arrange(desc(wt)) %>% head(3) %>% show_exec_plan(), - "The `ExecPlan` cannot be printed for a nested query." + paste0( + "ExecPlan with 4 nodes:.*", + "3:SinkNode.*", + "2:FetchNode.offset=0 count=3.*", + "1:FilterNode.filter=.mpg > 20.*", + "0:TableSourceNode.*" + ) ) }) diff --git a/r/tests/testthat/test-dplyr-summarize.R b/r/tests/testthat/test-dplyr-summarize.R index 1b834df19ff..3eb1a6ed2b2 100644 --- a/r/tests/testthat/test-dplyr-summarize.R +++ b/r/tests/testthat/test-dplyr-summarize.R @@ -616,17 +616,21 @@ test_that("min() and max() on character strings", { collect(), tbl, ) - compare_dplyr_binding( - .input %>% - group_by(fct) %>% - summarize( - min_chr = min(chr, na.rm = TRUE), - max_chr = max(chr, na.rm = TRUE) - ) %>% - arrange(min_chr) %>% - collect(), - tbl, - ) + withr::with_options(list(arrow.summarise.sort = FALSE), { + # TODO(#29887 / ARROW-14313) sorting on dictionary columns not supported + # so turn off arrow.summarise.sort so that we don't order_by fct after summarize + compare_dplyr_binding( + .input %>% + group_by(fct) %>% + summarize( + min_chr = min(chr, na.rm = TRUE), + max_chr = max(chr, na.rm = TRUE) + ) %>% + arrange(min_chr) %>% + collect(), + tbl, + ) + }) }) test_that("summarise() with !!sym()", {