From 1682f08263a848d039a5130b815ff09510f32374 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Tue, 7 Dec 2021 15:01:50 -0800 Subject: [PATCH 01/12] Get existing test to pass --- r/R/dataset-scan.R | 22 +++++++++++++--------- r/tests/testthat/test-dataset.R | 1 - 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/r/R/dataset-scan.R b/r/R/dataset-scan.R index b7f58bfa4bd..baed2a73dca 100644 --- a/r/R/dataset-scan.R +++ b/r/R/dataset-scan.R @@ -185,17 +185,21 @@ ScanTask <- R6Class("ScanTask", #' `data.frame`? Default `TRUE` #' @export map_batches <- function(X, FUN, ..., .data.frame = TRUE) { - if (.data.frame) { - lapply <- map_dfr - } scanner <- Scanner$create(ensure_group_vars(X)) FUN <- as_mapper(FUN) - lapply(scanner$ScanBatches(), function(batch) { - # TODO: wrap batch in arrow_dplyr_query with X$selected_columns, - # X$temp_columns, and X$group_by_vars - # if X is arrow_dplyr_query, if some other arg (.dplyr?) == TRUE - FUN(batch, ...) - }) + + # # TODO: wrap batch in arrow_dplyr_query with X$selected_columns, + # # X$temp_columns, and X$group_by_vars + # # if X is arrow_dplyr_query, if some other arg (.dplyr?) == TRUE + batches <- scanner$ScanBatches() + .f <- as_mapper(FUN, ...) + res <- map(batches, FUN, ...) + + if (.data.frame) { + res <- dplyr::bind_rows(map(res, collect)) + } + + res } #' @usage NULL diff --git a/r/tests/testthat/test-dataset.R b/r/tests/testthat/test-dataset.R index 8465e93bed5..608979340a3 100644 --- a/r/tests/testthat/test-dataset.R +++ b/r/tests/testthat/test-dataset.R @@ -453,7 +453,6 @@ test_that("Creating UnionDataset", { }) test_that("map_batches", { - skip("map_batches() is broken (ARROW-14029)") ds <- open_dataset(dataset_dir, partitioning = "part") expect_equal( ds %>% From df5ad1078466cd106cf1dd63410efd55dc256710 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Tue, 7 Dec 2021 15:27:22 -0800 Subject: [PATCH 02/12] Test a few more cases --- r/R/dataset-scan.R | 6 ++++-- r/tests/testthat/test-dataset.R | 20 ++++++++++++++++++++ 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/r/R/dataset-scan.R b/r/R/dataset-scan.R index baed2a73dca..7d29bbc8b22 100644 --- a/r/R/dataset-scan.R +++ b/r/R/dataset-scan.R @@ -195,9 +195,11 @@ map_batches <- function(X, FUN, ..., .data.frame = TRUE) { .f <- as_mapper(FUN, ...) res <- map(batches, FUN, ...) - if (.data.frame) { + if (.data.frame & inherits(res[[1]], "arrow_dplyr_query")) { res <- dplyr::bind_rows(map(res, collect)) - } + } else if (.data.frame & inherits(res[[1]], "RecordBatch")) { + res <- dplyr::bind_rows(map(res, as.data.frame)) + } res } diff --git a/r/tests/testthat/test-dataset.R b/r/tests/testthat/test-dataset.R index 608979340a3..8f5c7000d3e 100644 --- a/r/tests/testthat/test-dataset.R +++ b/r/tests/testthat/test-dataset.R @@ -454,6 +454,8 @@ test_that("Creating UnionDataset", { test_that("map_batches", { ds <- open_dataset(dataset_dir, partitioning = "part") + + # summarise returns arrow_dplyr_query expect_equal( ds %>% filter(int > 5) %>% @@ -461,6 +463,24 @@ test_that("map_batches", { map_batches(~ summarize(., min_int = min(int))), tibble(min_int = c(6L, 101L)) ) + + # $num_rows returns integer vector + expect_equal( + ds %>% + filter(int > 5) %>% + select(int, lgl) %>% + map_batches(~ .$num_rows), + list(5, 10) + ) + + # $Take returns RecordBatch + expect_equal( + ds %>% + filter(int > 5) %>% + select(int, lgl) %>% + map_batches(~ .$Take(0)), + tibble(int=c(6, 101), lgl=c(TRUE, TRUE)) + ) }) test_that("partitioning = NULL to ignore partition information (but why?)", { From d0d7740bb41591c709170fa9924b34727853e2e5 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Wed, 8 Dec 2021 14:27:09 -0800 Subject: [PATCH 03/12] Simplify logic --- r/R/dataset-scan.R | 4 +--- r/tests/testthat/test-dataset.R | 2 +- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/r/R/dataset-scan.R b/r/R/dataset-scan.R index 7d29bbc8b22..94665bc173f 100644 --- a/r/R/dataset-scan.R +++ b/r/R/dataset-scan.R @@ -174,8 +174,6 @@ ScanTask <- R6Class("ScanTask", #' a `data.frame` for further aggregation, even if you couldn't fit the whole #' `Dataset` result in memory. #' -#' This is experimental and not recommended for production use. -#' #' @param X A `Dataset` or `arrow_dplyr_query` object, as returned by the #' `dplyr` methods on `Dataset`. #' @param FUN A function or `purrr`-style lambda expression to apply to each @@ -197,7 +195,7 @@ map_batches <- function(X, FUN, ..., .data.frame = TRUE) { if (.data.frame & inherits(res[[1]], "arrow_dplyr_query")) { res <- dplyr::bind_rows(map(res, collect)) - } else if (.data.frame & inherits(res[[1]], "RecordBatch")) { + } else if (.data.frame) { res <- dplyr::bind_rows(map(res, as.data.frame)) } diff --git a/r/tests/testthat/test-dataset.R b/r/tests/testthat/test-dataset.R index 8f5c7000d3e..e2db4383397 100644 --- a/r/tests/testthat/test-dataset.R +++ b/r/tests/testthat/test-dataset.R @@ -469,7 +469,7 @@ test_that("map_batches", { ds %>% filter(int > 5) %>% select(int, lgl) %>% - map_batches(~ .$num_rows), + map_batches(~ .$num_rows, .data.frame = FALSE), list(5, 10) ) From bf520d0dd1926367cdc26dd4faf8323fd21efef7 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Wed, 8 Dec 2021 16:52:24 -0800 Subject: [PATCH 04/12] GImplement with RecordBatchReader --- r/R/dataset-scan.R | 10 +++++++--- r/tests/testthat/test-dataset.R | 2 +- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/r/R/dataset-scan.R b/r/R/dataset-scan.R index 94665bc173f..84781145893 100644 --- a/r/R/dataset-scan.R +++ b/r/R/dataset-scan.R @@ -189,9 +189,13 @@ map_batches <- function(X, FUN, ..., .data.frame = TRUE) { # # TODO: wrap batch in arrow_dplyr_query with X$selected_columns, # # X$temp_columns, and X$group_by_vars # # if X is arrow_dplyr_query, if some other arg (.dplyr?) == TRUE - batches <- scanner$ScanBatches() - .f <- as_mapper(FUN, ...) - res <- map(batches, FUN, ...) + reader <- scanner$ToRecordBatchReader() + batch <- reader$read_next_batch() + res <- list() + while (!is.null(batch)) { + res <- append(res, list(FUN(batch, ...))) + batch <- reader$read_next_batch() + } if (.data.frame & inherits(res[[1]], "arrow_dplyr_query")) { res <- dplyr::bind_rows(map(res, collect)) diff --git a/r/tests/testthat/test-dataset.R b/r/tests/testthat/test-dataset.R index e2db4383397..47121e4408e 100644 --- a/r/tests/testthat/test-dataset.R +++ b/r/tests/testthat/test-dataset.R @@ -455,7 +455,7 @@ test_that("Creating UnionDataset", { test_that("map_batches", { ds <- open_dataset(dataset_dir, partitioning = "part") - # summarise returns arrow_dplyr_query + # summarize returns arrow_dplyr_query expect_equal( ds %>% filter(int > 5) %>% From 93e090de12dd8fb8e91ea6d753d2af9376c44182 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Wed, 8 Dec 2021 17:00:36 -0800 Subject: [PATCH 05/12] Update to reading plan --- r/R/dataset-scan.R | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/r/R/dataset-scan.R b/r/R/dataset-scan.R index 84781145893..fcd31e4d0d5 100644 --- a/r/R/dataset-scan.R +++ b/r/R/dataset-scan.R @@ -183,13 +183,15 @@ ScanTask <- R6Class("ScanTask", #' `data.frame`? Default `TRUE` #' @export map_batches <- function(X, FUN, ..., .data.frame = TRUE) { - scanner <- Scanner$create(ensure_group_vars(X)) + # TODO: possibly refactor do_exec_plan to return a RecordBatchReader + plan <- ExecPlan$create() + final_node <- plan$Build(X) + reader <- plan$Run(final_node) FUN <- as_mapper(FUN) # # TODO: wrap batch in arrow_dplyr_query with X$selected_columns, # # X$temp_columns, and X$group_by_vars # # if X is arrow_dplyr_query, if some other arg (.dplyr?) == TRUE - reader <- scanner$ToRecordBatchReader() batch <- reader$read_next_batch() res <- list() while (!is.null(batch)) { From d5f78a19ba58cd2a50dd5ef8d1cd67c5243681bc Mon Sep 17 00:00:00 2001 From: Will Jones Date: Thu, 9 Dec 2021 10:38:18 -0800 Subject: [PATCH 06/12] Refactor do_exec_plan to always return RBR --- r/R/dataset-scan.R | 6 ++-- r/R/dplyr-collect.R | 2 +- r/R/query-engine.R | 60 +++++++++++++++++---------------- r/tests/testthat/test-dataset.R | 8 +++-- 4 files changed, 39 insertions(+), 37 deletions(-) diff --git a/r/R/dataset-scan.R b/r/R/dataset-scan.R index fcd31e4d0d5..8c81ee8d065 100644 --- a/r/R/dataset-scan.R +++ b/r/R/dataset-scan.R @@ -183,10 +183,7 @@ ScanTask <- R6Class("ScanTask", #' `data.frame`? Default `TRUE` #' @export map_batches <- function(X, FUN, ..., .data.frame = TRUE) { - # TODO: possibly refactor do_exec_plan to return a RecordBatchReader - plan <- ExecPlan$create() - final_node <- plan$Build(X) - reader <- plan$Run(final_node) + reader <- do_exec_plan(X) FUN <- as_mapper(FUN) # # TODO: wrap batch in arrow_dplyr_query with X$selected_columns, @@ -195,6 +192,7 @@ map_batches <- function(X, FUN, ..., .data.frame = TRUE) { batch <- reader$read_next_batch() res <- list() while (!is.null(batch)) { + batch_query <- arrow_dplyr_query(batch) res <- append(res, list(FUN(batch, ...))) batch <- reader$read_next_batch() } diff --git a/r/R/dplyr-collect.R b/r/R/dplyr-collect.R index c62f2559310..b2ad69c039b 100644 --- a/r/R/dplyr-collect.R +++ b/r/R/dplyr-collect.R @@ -27,7 +27,7 @@ collect.arrow_dplyr_query <- function(x, as_data_frame = TRUE, ...) { } # See query-engine.R for ExecPlan/Nodes - tab <- do_exec_plan(x) + tab <- do_exec_plan(x)$read_table() if (as_data_frame) { df <- as.data.frame(tab) tab$invalidate() diff --git a/r/R/query-engine.R b/r/R/query-engine.R index 234aaf56975..00b3307f96b 100644 --- a/r/R/query-engine.R +++ b/r/R/query-engine.R @@ -18,43 +18,45 @@ do_exec_plan <- function(.data) { plan <- ExecPlan$create() final_node <- plan$Build(.data) - tab <- plan$Run(final_node) + reader <- plan$Run(final_node) # TODO (ARROW-14289): make the head/tail methods return RBR not Table - if (inherits(tab, "RecordBatchReader")) { - tab <- tab$read_table() - } + if (inherits(reader, "Table")) { + tab <- reader - # If arrange() created $temp_columns, make sure to omit them from the result - # We can't currently handle this in the ExecPlan itself because sorting - # happens in the end (SinkNode) so nothing comes after it. - if (length(final_node$sort$temp_columns) > 0) { - tab <- tab[, setdiff(names(tab), final_node$sort$temp_columns), drop = FALSE] - } + # If arrange() created $temp_columns, make sure to omit them from the result + # We can't currently handle this in the ExecPlan itself because sorting + # happens in the end (SinkNode) so nothing comes after it. + if (length(final_node$sort$temp_columns) > 0) { + tab <- tab[, setdiff(names(tab), final_node$sort$temp_columns), drop = FALSE] + } - if (ncol(tab)) { - # Apply any column metadata from the original schema, where appropriate - original_schema <- source_data(.data)$schema - # TODO: do we care about other (non-R) metadata preservation? - # How would we know if it were meaningful? - r_meta <- original_schema$r_metadata - if (!is.null(r_meta)) { - # Filter r_metadata$columns on columns with name _and_ type match - new_schema <- tab$schema - common_names <- intersect(names(r_meta$columns), names(tab)) - keep <- common_names[ - map_lgl(common_names, ~ original_schema[[.]] == new_schema[[.]]) - ] - r_meta$columns <- r_meta$columns[keep] - if (has_aggregation(.data)) { - # dplyr drops top-level attributes if you do summarize - r_meta$attributes <- NULL + if (ncol(tab)) { + # Apply any column metadata from the original schema, where appropriate + original_schema <- source_data(.data)$schema + # TODO: do we care about other (non-R) metadata preservation? + # How would we know if it were meaningful? + r_meta <- original_schema$r_metadata + if (!is.null(r_meta)) { + # Filter r_metadata$columns on columns with name _and_ type match + new_schema <- tab$schema + common_names <- intersect(names(r_meta$columns), names(tab)) + keep <- common_names[ + map_lgl(common_names, ~ original_schema[[.]] == new_schema[[.]]) + ] + r_meta$columns <- r_meta$columns[keep] + if (has_aggregation(.data)) { + # dplyr drops top-level attributes if you do summarize + r_meta$attributes <- NULL + } + tab$r_metadata <- r_meta } - tab$r_metadata <- r_meta } + + reader <- Scanner$create(tab)$ToRecordBatchReader() } - tab + reader } ExecPlan <- R6Class("ExecPlan", diff --git a/r/tests/testthat/test-dataset.R b/r/tests/testthat/test-dataset.R index 47121e4408e..213f88529be 100644 --- a/r/tests/testthat/test-dataset.R +++ b/r/tests/testthat/test-dataset.R @@ -460,7 +460,8 @@ test_that("map_batches", { ds %>% filter(int > 5) %>% select(int, lgl) %>% - map_batches(~ summarize(., min_int = min(int))), + map_batches(~ summarize(., min_int = min(int))) %>% + arrange(min_int), tibble(min_int = c(6L, 101L)) ) @@ -470,7 +471,7 @@ test_that("map_batches", { filter(int > 5) %>% select(int, lgl) %>% map_batches(~ .$num_rows, .data.frame = FALSE), - list(5, 10) + list(5, 10) # TODO: Don't test order ) # $Take returns RecordBatch @@ -478,7 +479,8 @@ test_that("map_batches", { ds %>% filter(int > 5) %>% select(int, lgl) %>% - map_batches(~ .$Take(0)), + map_batches(~ .$Take(0)) %>% + arrange(int), tibble(int=c(6, 101), lgl=c(TRUE, TRUE)) ) }) From d1950bff5833f5c78c1facfde84db2d983e94726 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Thu, 9 Dec 2021 11:18:17 -0800 Subject: [PATCH 07/12] Revert "Refactor do_exec_plan to always return RBR" This reverts commit ff1dab7e3ab835e23ef963aa5bf78d61832a1d78. --- r/R/dataset-scan.R | 6 ++-- r/R/dplyr-collect.R | 2 +- r/R/query-engine.R | 60 ++++++++++++++++----------------- r/tests/testthat/test-dataset.R | 8 ++--- 4 files changed, 37 insertions(+), 39 deletions(-) diff --git a/r/R/dataset-scan.R b/r/R/dataset-scan.R index 8c81ee8d065..fcd31e4d0d5 100644 --- a/r/R/dataset-scan.R +++ b/r/R/dataset-scan.R @@ -183,7 +183,10 @@ ScanTask <- R6Class("ScanTask", #' `data.frame`? Default `TRUE` #' @export map_batches <- function(X, FUN, ..., .data.frame = TRUE) { - reader <- do_exec_plan(X) + # TODO: possibly refactor do_exec_plan to return a RecordBatchReader + plan <- ExecPlan$create() + final_node <- plan$Build(X) + reader <- plan$Run(final_node) FUN <- as_mapper(FUN) # # TODO: wrap batch in arrow_dplyr_query with X$selected_columns, @@ -192,7 +195,6 @@ map_batches <- function(X, FUN, ..., .data.frame = TRUE) { batch <- reader$read_next_batch() res <- list() while (!is.null(batch)) { - batch_query <- arrow_dplyr_query(batch) res <- append(res, list(FUN(batch, ...))) batch <- reader$read_next_batch() } diff --git a/r/R/dplyr-collect.R b/r/R/dplyr-collect.R index b2ad69c039b..c62f2559310 100644 --- a/r/R/dplyr-collect.R +++ b/r/R/dplyr-collect.R @@ -27,7 +27,7 @@ collect.arrow_dplyr_query <- function(x, as_data_frame = TRUE, ...) { } # See query-engine.R for ExecPlan/Nodes - tab <- do_exec_plan(x)$read_table() + tab <- do_exec_plan(x) if (as_data_frame) { df <- as.data.frame(tab) tab$invalidate() diff --git a/r/R/query-engine.R b/r/R/query-engine.R index 00b3307f96b..234aaf56975 100644 --- a/r/R/query-engine.R +++ b/r/R/query-engine.R @@ -18,45 +18,43 @@ do_exec_plan <- function(.data) { plan <- ExecPlan$create() final_node <- plan$Build(.data) - reader <- plan$Run(final_node) + tab <- plan$Run(final_node) # TODO (ARROW-14289): make the head/tail methods return RBR not Table - if (inherits(reader, "Table")) { - tab <- reader + if (inherits(tab, "RecordBatchReader")) { + tab <- tab$read_table() + } - # If arrange() created $temp_columns, make sure to omit them from the result - # We can't currently handle this in the ExecPlan itself because sorting - # happens in the end (SinkNode) so nothing comes after it. - if (length(final_node$sort$temp_columns) > 0) { - tab <- tab[, setdiff(names(tab), final_node$sort$temp_columns), drop = FALSE] - } + # If arrange() created $temp_columns, make sure to omit them from the result + # We can't currently handle this in the ExecPlan itself because sorting + # happens in the end (SinkNode) so nothing comes after it. + if (length(final_node$sort$temp_columns) > 0) { + tab <- tab[, setdiff(names(tab), final_node$sort$temp_columns), drop = FALSE] + } - if (ncol(tab)) { - # Apply any column metadata from the original schema, where appropriate - original_schema <- source_data(.data)$schema - # TODO: do we care about other (non-R) metadata preservation? - # How would we know if it were meaningful? - r_meta <- original_schema$r_metadata - if (!is.null(r_meta)) { - # Filter r_metadata$columns on columns with name _and_ type match - new_schema <- tab$schema - common_names <- intersect(names(r_meta$columns), names(tab)) - keep <- common_names[ - map_lgl(common_names, ~ original_schema[[.]] == new_schema[[.]]) - ] - r_meta$columns <- r_meta$columns[keep] - if (has_aggregation(.data)) { - # dplyr drops top-level attributes if you do summarize - r_meta$attributes <- NULL - } - tab$r_metadata <- r_meta + if (ncol(tab)) { + # Apply any column metadata from the original schema, where appropriate + original_schema <- source_data(.data)$schema + # TODO: do we care about other (non-R) metadata preservation? + # How would we know if it were meaningful? + r_meta <- original_schema$r_metadata + if (!is.null(r_meta)) { + # Filter r_metadata$columns on columns with name _and_ type match + new_schema <- tab$schema + common_names <- intersect(names(r_meta$columns), names(tab)) + keep <- common_names[ + map_lgl(common_names, ~ original_schema[[.]] == new_schema[[.]]) + ] + r_meta$columns <- r_meta$columns[keep] + if (has_aggregation(.data)) { + # dplyr drops top-level attributes if you do summarize + r_meta$attributes <- NULL } + tab$r_metadata <- r_meta } - - reader <- Scanner$create(tab)$ToRecordBatchReader() } - reader + tab } ExecPlan <- R6Class("ExecPlan", diff --git a/r/tests/testthat/test-dataset.R b/r/tests/testthat/test-dataset.R index 213f88529be..47121e4408e 100644 --- a/r/tests/testthat/test-dataset.R +++ b/r/tests/testthat/test-dataset.R @@ -460,8 +460,7 @@ test_that("map_batches", { ds %>% filter(int > 5) %>% select(int, lgl) %>% - map_batches(~ summarize(., min_int = min(int))) %>% - arrange(min_int), + map_batches(~ summarize(., min_int = min(int))), tibble(min_int = c(6L, 101L)) ) @@ -471,7 +470,7 @@ test_that("map_batches", { filter(int > 5) %>% select(int, lgl) %>% map_batches(~ .$num_rows, .data.frame = FALSE), - list(5, 10) # TODO: Don't test order + list(5, 10) ) # $Take returns RecordBatch @@ -479,8 +478,7 @@ test_that("map_batches", { ds %>% filter(int > 5) %>% select(int, lgl) %>% - map_batches(~ .$Take(0)) %>% - arrange(int), + map_batches(~ .$Take(0)), tibble(int=c(6, 101), lgl=c(TRUE, TRUE)) ) }) From 2d91584867f0b0d74f33c83aee2b3131fde32f7c Mon Sep 17 00:00:00 2001 From: Will Jones Date: Thu, 9 Dec 2021 11:30:16 -0800 Subject: [PATCH 08/12] Fix tests and linting --- r/R/dataset-scan.R | 6 +++--- r/tests/testthat/test-dataset.R | 14 +++++++++----- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/r/R/dataset-scan.R b/r/R/dataset-scan.R index fcd31e4d0d5..84d3f02319f 100644 --- a/r/R/dataset-scan.R +++ b/r/R/dataset-scan.R @@ -189,9 +189,9 @@ map_batches <- function(X, FUN, ..., .data.frame = TRUE) { reader <- plan$Run(final_node) FUN <- as_mapper(FUN) - # # TODO: wrap batch in arrow_dplyr_query with X$selected_columns, - # # X$temp_columns, and X$group_by_vars - # # if X is arrow_dplyr_query, if some other arg (.dplyr?) == TRUE + # TODO: wrap batch in arrow_dplyr_query with X$selected_columns, + # X$temp_columns, and X$group_by_vars + # if X is arrow_dplyr_query, if some other arg (.dplyr?) == TRUE batch <- reader$read_next_batch() res <- list() while (!is.null(batch)) { diff --git a/r/tests/testthat/test-dataset.R b/r/tests/testthat/test-dataset.R index 47121e4408e..e672ac42e80 100644 --- a/r/tests/testthat/test-dataset.R +++ b/r/tests/testthat/test-dataset.R @@ -460,7 +460,8 @@ test_that("map_batches", { ds %>% filter(int > 5) %>% select(int, lgl) %>% - map_batches(~ summarize(., min_int = min(int))), + map_batches(~ summarize(., min_int = min(int))) %>% + arrange(min_int), tibble(min_int = c(6L, 101L)) ) @@ -469,8 +470,10 @@ test_that("map_batches", { ds %>% filter(int > 5) %>% select(int, lgl) %>% - map_batches(~ .$num_rows, .data.frame = FALSE), - list(5, 10) + map_batches(~ .$num_rows, .data.frame = FALSE) %>% + as.numeric() %>% + sort(), + c(5, 10) ) # $Take returns RecordBatch @@ -478,8 +481,9 @@ test_that("map_batches", { ds %>% filter(int > 5) %>% select(int, lgl) %>% - map_batches(~ .$Take(0)), - tibble(int=c(6, 101), lgl=c(TRUE, TRUE)) + map_batches(~ .$Take(0)) %>% + arrange(int), + tibble(int = c(6, 101), lgl = c(TRUE, TRUE)) ) }) From 6811bdcc7f341fb9441e9282ba562a14ec61f680 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Thu, 9 Dec 2021 13:21:26 -0800 Subject: [PATCH 09/12] Document map_batches in vignette --- r/man/map_batches.Rd | 3 --- r/vignettes/dataset.Rmd | 58 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 58 insertions(+), 3 deletions(-) diff --git a/r/man/map_batches.Rd b/r/man/map_batches.Rd index 08e7b86c057..f5921029863 100644 --- a/r/man/map_batches.Rd +++ b/r/man/map_batches.Rd @@ -25,6 +25,3 @@ This lets you aggregate on each chunk and pull the intermediate results into a \code{data.frame} for further aggregation, even if you couldn't fit the whole \code{Dataset} result in memory. } -\details{ -This is experimental and not recommended for production use. -} diff --git a/r/vignettes/dataset.Rmd b/r/vignettes/dataset.Rmd index a7e8b8050b4..ecb01818f15 100644 --- a/r/vignettes/dataset.Rmd +++ b/r/vignettes/dataset.Rmd @@ -290,6 +290,64 @@ rows match the filter. Relatedly, since Parquet files contain row groups with statistics on the data within, there may be entire chunks of data you can avoid scanning because they have no rows where `total_amount > 100`. +### Processing data in batches + +Sometimes you want to run R code on the entire dataset, but that dataset is much +larger than memory. You can use `map_batches` on a dataset query to process +it batch-by-batch. + +As an example, to randomly sample a dataset, use `map_batches` to sample a +percentage of rows from each batch: + +```{r, eval = file.exists("nyc-taxi")} +sampled_data <- ds %>% + filter(year == 2015) %>% + select(tip_amount, total_amount, passenger_count) %>% + map_batches(~ sample_frac(as.data.frame(.), 1e-4)) %>% + mutate(tip_pct = tip_amount / total_amount) + +str(sampled_data) +``` + +```{r, echo = FALSE, eval = !file.exists("nyc-taxi")} +cat(" +'data.frame': 15603 obs. of 4 variables: + $ tip_amount : num 0 0 1.55 1.45 5.2 ... + $ total_amount : num 5.8 16.3 7.85 8.75 26 ... + $ passenger_count: int 1 1 1 1 1 6 5 1 2 1 ... + $ tip_pct : num 0 0 0.197 0.166 0.2 ... +") +``` + +This function can also be used to aggregate summary statistics over a dataset by +computing partial results for each batch and then aggregating those partial +results. Extending the example above, you could fit a model to the sample data +and then use `map_batches` to compute the MSE on the full dataset. + +```{r, eval = file.exists("nyc-taxi")} +model <- lm(tip_pct ~ total_amount + passenger_count, data = sampled_data) + +ds %>% + filter(year == 2015) %>% + select(tip_amount, total_amount, passenger_count) %>% + mutate(tip_pct = tip_amount / total_amount) %>% + map_batches(function(batch) { + batch %>% + as.data.frame() %>% + mutate(pred_tip_pct = predict(model, newdata = .)) %>% + filter(!is.nan(tip_pct)) %>% + summarize(sse_partial = sum((pred_tip_pct - tip_pct)^2), n_partial = n()) + }) %>% + summarize(mse = sum(sse_partial) / sum(n_partial)) %>% + pull(mse) +``` + +```{r, echo = FALSE, eval = !file.exists("nyc-taxi")} +cat(" +[1] 0.1304284 +") +``` + ## More dataset options There are a few ways you can control the Dataset creation to adapt to special use cases. From bc3e39b2b8054b686cbab626bffeeadf1a536833 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Tue, 21 Dec 2021 08:27:40 -0800 Subject: [PATCH 10/12] Efficiently allocate the list --- r/R/dataset-scan.R | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/r/R/dataset-scan.R b/r/R/dataset-scan.R index 84d3f02319f..61db56fcda6 100644 --- a/r/R/dataset-scan.R +++ b/r/R/dataset-scan.R @@ -193,12 +193,19 @@ map_batches <- function(X, FUN, ..., .data.frame = TRUE) { # X$temp_columns, and X$group_by_vars # if X is arrow_dplyr_query, if some other arg (.dplyr?) == TRUE batch <- reader$read_next_batch() - res <- list() + res <- vector("list", 1024) + i <- 0L while (!is.null(batch)) { - res <- append(res, list(FUN(batch, ...))) + i <- i + 1L + res[[i]] <- FUN(batch, ...) batch <- reader$read_next_batch() } + # Trim list back + if (i < length(res)) { + res <- res[seq_len(i)] + } + if (.data.frame & inherits(res[[1]], "arrow_dplyr_query")) { res <- dplyr::bind_rows(map(res, collect)) } else if (.data.frame) { From 7457c9d84f4024ca49eaf83534f1529ca029897c Mon Sep 17 00:00:00 2001 From: Will Jones Date: Thu, 6 Jan 2022 10:08:32 -0800 Subject: [PATCH 11/12] PR feedback from Jon --- r/R/dataset-scan.R | 4 +++- r/man/map_batches.Rd | 3 +++ r/tests/testthat/test-dataset.R | 6 +++--- r/vignettes/dataset.Rmd | 2 ++ 4 files changed, 11 insertions(+), 4 deletions(-) diff --git a/r/R/dataset-scan.R b/r/R/dataset-scan.R index 61db56fcda6..05646b5d843 100644 --- a/r/R/dataset-scan.R +++ b/r/R/dataset-scan.R @@ -174,6 +174,8 @@ ScanTask <- R6Class("ScanTask", #' a `data.frame` for further aggregation, even if you couldn't fit the whole #' `Dataset` result in memory. #' +#' This is experimental and not recommended for production use. +#' #' @param X A `Dataset` or `arrow_dplyr_query` object, as returned by the #' `dplyr` methods on `Dataset`. #' @param FUN A function or `purrr`-style lambda expression to apply to each @@ -183,7 +185,7 @@ ScanTask <- R6Class("ScanTask", #' `data.frame`? Default `TRUE` #' @export map_batches <- function(X, FUN, ..., .data.frame = TRUE) { - # TODO: possibly refactor do_exec_plan to return a RecordBatchReader + # TODO(ARROW-15271): possibly refactor do_exec_plan to return a RecordBatchReader plan <- ExecPlan$create() final_node <- plan$Build(X) reader <- plan$Run(final_node) diff --git a/r/man/map_batches.Rd b/r/man/map_batches.Rd index f5921029863..08e7b86c057 100644 --- a/r/man/map_batches.Rd +++ b/r/man/map_batches.Rd @@ -25,3 +25,6 @@ This lets you aggregate on each chunk and pull the intermediate results into a \code{data.frame} for further aggregation, even if you couldn't fit the whole \code{Dataset} result in memory. } +\details{ +This is experimental and not recommended for production use. +} diff --git a/r/tests/testthat/test-dataset.R b/r/tests/testthat/test-dataset.R index e672ac42e80..58e7458098e 100644 --- a/r/tests/testthat/test-dataset.R +++ b/r/tests/testthat/test-dataset.R @@ -455,7 +455,7 @@ test_that("Creating UnionDataset", { test_that("map_batches", { ds <- open_dataset(dataset_dir, partitioning = "part") - # summarize returns arrow_dplyr_query + # summarize returns arrow_dplyr_query, which gets collected into a tibble expect_equal( ds %>% filter(int > 5) %>% @@ -471,12 +471,12 @@ test_that("map_batches", { filter(int > 5) %>% select(int, lgl) %>% map_batches(~ .$num_rows, .data.frame = FALSE) %>% - as.numeric() %>% + unlist() %>% # Returns list because .data.frame is FALSE sort(), c(5, 10) ) - # $Take returns RecordBatch + # $Take returns RecordBatch, which gets binded into a tibble expect_equal( ds %>% filter(int > 5) %>% diff --git a/r/vignettes/dataset.Rmd b/r/vignettes/dataset.Rmd index ecb01818f15..f09185589e1 100644 --- a/r/vignettes/dataset.Rmd +++ b/r/vignettes/dataset.Rmd @@ -296,6 +296,8 @@ Sometimes you want to run R code on the entire dataset, but that dataset is much larger than memory. You can use `map_batches` on a dataset query to process it batch-by-batch. +**Note**: `map_batches` is experimental and not recommended for production use. + As an example, to randomly sample a dataset, use `map_batches` to sample a percentage of rows from each batch: From fcb5515d417e594d6b3664a080e6b59f9194cfc3 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Thu, 6 Jan 2022 15:33:21 -0800 Subject: [PATCH 12/12] Update r/R/dataset-scan.R Co-authored-by: Jonathan Keane --- r/R/dataset-scan.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/r/R/dataset-scan.R b/r/R/dataset-scan.R index 05646b5d843..21a5056f7e1 100644 --- a/r/R/dataset-scan.R +++ b/r/R/dataset-scan.R @@ -185,7 +185,7 @@ ScanTask <- R6Class("ScanTask", #' `data.frame`? Default `TRUE` #' @export map_batches <- function(X, FUN, ..., .data.frame = TRUE) { - # TODO(ARROW-15271): possibly refactor do_exec_plan to return a RecordBatchReader + # TODO: ARROW-15271 possibly refactor do_exec_plan to return a RecordBatchReader plan <- ExecPlan$create() final_node <- plan$Build(X) reader <- plan$Run(final_node)