From 05afdfdb14e5caff739e7fad42baaa6924eb04d0 Mon Sep 17 00:00:00 2001
From: Dewey Dunnington
Date: Wed, 26 Oct 2022 11:58:23 -0300
Subject: [PATCH] make map_batches lazy by default

---
 r/R/dataset-scan.R                    | 9 +--------
 r/man/map_batches.Rd                  | 6 +-----
 r/tests/testthat/test-dataset-write.R | 4 ++++
 r/tests/testthat/test-dataset.R       | 4 +++-
 r/tests/testthat/test-dplyr-slice.R   | 2 ++
 r/tests/testthat/test-query-engine.R  | 2 ++
 6 files changed, 13 insertions(+), 14 deletions(-)

diff --git a/r/R/dataset-scan.R b/r/R/dataset-scan.R
index 17ebb56c05f..b210edbf960 100644
--- a/r/R/dataset-scan.R
+++ b/r/R/dataset-scan.R
@@ -202,10 +202,6 @@ tail_from_batches <- function(batches, n) {
 #' `map_batches()` in a dplyr pipeline and do additional dplyr methods on the
 #' stream of data in Arrow after it.
 #'
-#' Note that, unlike the core dplyr methods that are implemented in the Arrow
-#' query engine, `map_batches()` is not lazy: it starts evaluating on the data
-#' when you call it, even if you send its result to another pipeline function.
-#'
 #' This is experimental and not recommended for production use. It is also
 #' single-threaded and runs in R not C++, so it won't be as fast as core
 #' Arrow methods.
@@ -224,7 +220,7 @@ tail_from_batches <- function(batches, n) {
 #' @param .data.frame Deprecated argument, ignored
 #' @return An `arrow_dplyr_query`.
 #' @export
-map_batches <- function(X, FUN, ..., .schema = NULL, .lazy = FALSE, .data.frame = NULL) {
+map_batches <- function(X, FUN, ..., .schema = NULL, .lazy = TRUE, .data.frame = NULL) {
   if (!is.null(.data.frame)) {
     warning(
       "The .data.frame argument is deprecated. ",
@@ -277,9 +273,6 @@ map_batches <- function(X, FUN, ..., .schema = NULL, .lazy = FALSE, .data.frame
   }
 
   reader_out <- as_record_batch_reader(fun, schema = .schema)
-
-  # TODO(ARROW-17178) because there are some restrictions on evaluating
-  # reader_out in some ExecPlans, the default .lazy is FALSE for now.
   if (!.lazy) {
     reader_out <- RecordBatchReader$create(
       batches = reader_out$batches(),
diff --git a/r/man/map_batches.Rd b/r/man/map_batches.Rd
index 0e4d48e024d..a147e268a96 100644
--- a/r/man/map_batches.Rd
+++ b/r/man/map_batches.Rd
@@ -4,7 +4,7 @@
 \alias{map_batches}
 \title{Apply a function to a stream of RecordBatches}
 \usage{
-map_batches(X, FUN, ..., .schema = NULL, .lazy = FALSE, .data.frame = NULL)
+map_batches(X, FUN, ..., .schema = NULL, .lazy = TRUE, .data.frame = NULL)
 }
 \arguments{
 \item{X}{A \code{Dataset} or \code{arrow_dplyr_query} object, as returned by the
@@ -37,10 +37,6 @@ without having to hold the entire Dataset in memory at once. You can include
 stream of data in Arrow after it.
 }
 \details{
-Note that, unlike the core dplyr methods that are implemented in the Arrow
-query engine, \code{map_batches()} is not lazy: it starts evaluating on the data
-when you call it, even if you send its result to another pipeline function.
-
 This is experimental and not recommended for production use. It is also
 single-threaded and runs in R not C++, so it won't be as fast as core
 Arrow methods.
diff --git a/r/tests/testthat/test-dataset-write.R b/r/tests/testthat/test-dataset-write.R
index 468d11676bb..b13660be7c9 100644
--- a/r/tests/testthat/test-dataset-write.R
+++ b/r/tests/testthat/test-dataset-write.R
@@ -708,7 +708,9 @@ test_that("Dataset write max rows per files", {
 })
 
 test_that("Dataset min_rows_per_group", {
+  skip_if_not(CanRunWithCapturedR())
   skip_if_not_available("parquet")
+
   rb1 <- record_batch(
     c1 = c(1, 2, 3, 4),
     c2 = c("a", "b", "e", "a")
@@ -757,7 +759,9 @@ test_that("Dataset min_rows_per_group", {
 })
 
 test_that("Dataset write max rows per group", {
+  skip_if_not(CanRunWithCapturedR())
   skip_if_not_available("parquet")
+
   num_of_records <- 30
   max_rows_per_group <- 18
   df <- tibble::tibble(
diff --git a/r/tests/testthat/test-dataset.R b/r/tests/testthat/test-dataset.R
index 5e28a3ddd5b..72c1dd7ca9c 100644
--- a/r/tests/testthat/test-dataset.R
+++ b/r/tests/testthat/test-dataset.R
@@ -637,7 +637,9 @@ test_that("scalar aggregates with many batches (ARROW-16904)", {
   )
 })
 
-test_that("map_batches", {
+test_that("streaming map_batches into an ExecPlan", {
+  skip_if_not(CanRunWithCapturedR())
+
   ds <- open_dataset(dataset_dir, partitioning = "part")
 
   # summarize returns arrow_dplyr_query, which gets collected into a tibble
diff --git a/r/tests/testthat/test-dplyr-slice.R b/r/tests/testthat/test-dplyr-slice.R
index a1c71e22229..0db31c732f1 100644
--- a/r/tests/testthat/test-dplyr-slice.R
+++ b/r/tests/testthat/test-dplyr-slice.R
@@ -89,6 +89,8 @@ test_that("slice_min/max, ungrouped", {
 })
 
 test_that("slice_sample, ungrouped", {
+  skip_if_not(CanRunWithCapturedR())
+
   tab <- arrow_table(tbl)
   expect_error(
     tab %>% slice_sample(replace = TRUE),
diff --git a/r/tests/testthat/test-query-engine.R b/r/tests/testthat/test-query-engine.R
index f2190eb6684..86d89898083 100644
--- a/r/tests/testthat/test-query-engine.R
+++ b/r/tests/testthat/test-query-engine.R
@@ -18,6 +18,8 @@
 library(dplyr, warn.conflicts = FALSE)
 
 test_that("ExecPlanReader does not start evaluating a query", {
+  skip_if_not(CanRunWithCapturedR())
+
   rbr <- as_record_batch_reader(
     function(x) stop("This query will error if started"),
     schema = schema(a = int32())
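For reviewers, a minimal sketch of what the new default means from the caller's side. The dataset path and the pass-through per-batch function below are hypothetical placeholders; only open_dataset(), map_batches(), and collect() are the API touched by this patch. With .lazy = TRUE now the default, map_batches() returns an arrow_dplyr_query and the R function is not applied until the query is actually evaluated.

library(arrow)
library(dplyr)

# Hypothetical partitioned dataset; any Dataset or arrow_dplyr_query works as X.
ds <- open_dataset("path/to/dataset")

# With the new default (.lazy = TRUE) this only builds a lazy query:
# the per-batch function is not called yet, even though the result is
# handed on to the next step of the pipeline.
query <- ds %>%
  map_batches(function(batch) {
    # Per-batch work runs in R, one RecordBatch at a time; here the
    # batch is simply passed through unchanged.
    batch
  })

# Evaluation happens only here. Passing .lazy = FALSE to map_batches()
# restores the previous eager behaviour.
collect(query)

As the retained if (!.lazy) branch in dataset-scan.R shows, the eager path (materializing the reader's batches up front) still exists; only the default changes.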