r/R/dataset-scan.R (9 changes: 1 addition & 8 deletions)

@@ -202,10 +202,6 @@ tail_from_batches <- function(batches, n) {
 #' `map_batches()` in a dplyr pipeline and do additional dplyr methods on the
 #' stream of data in Arrow after it.
 #'
-#' Note that, unlike the core dplyr methods that are implemented in the Arrow
-#' query engine, `map_batches()` is not lazy: it starts evaluating on the data
-#' when you call it, even if you send its result to another pipeline function.
-#'
 #' This is experimental and not recommended for production use. It is also
 #' single-threaded and runs in R not C++, so it won't be as fast as core
 #' Arrow methods.
@@ -224,7 +220,7 @@ tail_from_batches <- function(batches, n) {
 #' @param .data.frame Deprecated argument, ignored
 #' @return An `arrow_dplyr_query`.
 #' @export
-map_batches <- function(X, FUN, ..., .schema = NULL, .lazy = FALSE, .data.frame = NULL) {
+map_batches <- function(X, FUN, ..., .schema = NULL, .lazy = TRUE, .data.frame = NULL) {
   if (!is.null(.data.frame)) {
     warning(
       "The .data.frame argument is deprecated. ",
Expand Down Expand Up @@ -277,9 +273,6 @@ map_batches <- function(X, FUN, ..., .schema = NULL, .lazy = FALSE, .data.frame
   }
 
   reader_out <- as_record_batch_reader(fun, schema = .schema)
-
-  # TODO(ARROW-17178) because there are some restrictions on evaluating
-  # reader_out in some ExecPlans, the default .lazy is FALSE for now.
   if (!.lazy) {
     reader_out <- RecordBatchReader$create(
       batches = reader_out$batches(),
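A minimal sketch (not part of the diff) of what flipping the default to `.lazy = TRUE` means in practice; the dataset path and the pass-through function below are illustrative assumptions:

library(arrow)
library(dplyr)

ds <- open_dataset("path/to/dataset")  # hypothetical dataset

# New default (.lazy = TRUE): map_batches() returns an arrow_dplyr_query
# and does not apply FUN yet; evaluation is deferred to the consumer.
q <- ds %>%
  map_batches(function(batch) batch)  # RecordBatch in, RecordBatch out

collect(q)  # FUN runs here, as the batches stream through

# Opting back into the old eager behavior: batches are pulled through FUN
# as soon as map_batches() returns.
eager <- ds %>% map_batches(function(batch) batch, .lazy = FALSE)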
r/man/map_batches.Rd (6 changes: 1 addition & 5 deletions)

Some generated files are not rendered by default.

r/tests/testthat/test-dataset-write.R (4 changes: 4 additions & 0 deletions)

@@ -708,7 +708,9 @@ test_that("Dataset write max rows per files", {
 })
 
 test_that("Dataset min_rows_per_group", {
+  skip_if_not(CanRunWithCapturedR())
   skip_if_not_available("parquet")
+
   rb1 <- record_batch(
     c1 = c(1, 2, 3, 4),
     c2 = c("a", "b", "e", "a")
@@ -757,7 +759,9 @@ test_that("Dataset min_rows_per_group", {
 })
 
 test_that("Dataset write max rows per group", {
+  skip_if_not(CanRunWithCapturedR())
   skip_if_not_available("parquet")
+
   num_of_records <- 30
   max_rows_per_group <- 18
   df <- tibble::tibble(
r/tests/testthat/test-dataset.R (4 changes: 3 additions & 1 deletion)

@@ -637,7 +637,9 @@ test_that("scalar aggregates with many batches (ARROW-16904)", {
   )
 })
 
-test_that("map_batches", {
+test_that("streaming map_batches into an ExecPlan", {
+  skip_if_not(CanRunWithCapturedR())
+
   ds <- open_dataset(dataset_dir, partitioning = "part")
 
   # summarize returns arrow_dplyr_query, which gets collected into a tibble
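For reference, a hedged sketch of the shape this renamed test covers (the pipeline below is illustrative, not the test's actual body): the lazy reader produced by map_batches() can now be consumed by downstream dplyr verbs running in an ExecPlan.

ds <- open_dataset(dataset_dir, partitioning = "part")

ds %>%
  map_batches(function(batch) batch) %>%  # streams batches into the plan lazily
  summarize(n = n()) %>%                  # summarize returns an arrow_dplyr_query
  collect()                               # which gets collected into a tibble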
r/tests/testthat/test-dplyr-slice.R (2 changes: 2 additions & 0 deletions)

@@ -89,6 +89,8 @@ test_that("slice_min/max, ungrouped", {
 })
 
 test_that("slice_sample, ungrouped", {
+  skip_if_not(CanRunWithCapturedR())
+
   tab <- arrow_table(tbl)
   expect_error(
     tab %>% slice_sample(replace = TRUE),
r/tests/testthat/test-query-engine.R (2 changes: 2 additions & 0 deletions)

@@ -18,6 +18,8 @@
 library(dplyr, warn.conflicts = FALSE)
 
 test_that("ExecPlanReader does not start evaluating a query", {
+  skip_if_not(CanRunWithCapturedR())
+
   rbr <- as_record_batch_reader(
     function(x) stop("This query will error if started"),
     schema = schema(a = int32())
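For context, a sketch of the laziness property this test guards; the commented read_next_batch() call is an assumption about one way to force evaluation, not part of the test:

rbr <- as_record_batch_reader(
  function(x) stop("This query will error if started"),
  schema = schema(a = int32())
)
# Constructing the reader must not invoke the function; the error should
# surface only once a consumer pulls a batch, e.g.:
# rbr$read_next_batch()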