From d188b77265305869d2d10b0541c87c7de2b551d6 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Tue, 19 Jul 2022 10:54:22 -0300 Subject: [PATCH 1/7] add record batch reader from function --- r/NAMESPACE | 1 + r/R/arrowExports.R | 4 ++ r/R/record-batch-reader.R | 9 +++++ r/man/as_record_batch_reader.Rd | 6 +++ r/src/arrowExports.cpp | 10 +++++ r/src/recordbatchreader.cpp | 45 +++++++++++++++++++++ r/tests/testthat/test-record-batch-reader.R | 33 +++++++++++++++ 7 files changed, 108 insertions(+) diff --git a/r/NAMESPACE b/r/NAMESPACE index 0a120dc97a6..f8f174579a2 100644 --- a/r/NAMESPACE +++ b/r/NAMESPACE @@ -67,6 +67,7 @@ S3method(as_record_batch,arrow_dplyr_query) S3method(as_record_batch,data.frame) S3method(as_record_batch,pyarrow.lib.RecordBatch) S3method(as_record_batch,pyarrow.lib.Table) +S3method(as_record_batch_reader,"function") S3method(as_record_batch_reader,Dataset) S3method(as_record_batch_reader,RecordBatch) S3method(as_record_batch_reader,RecordBatchReader) diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R index dfe0db614ad..711a0abf2ac 100644 --- a/r/R/arrowExports.R +++ b/r/R/arrowExports.R @@ -1736,6 +1736,10 @@ RecordBatchReader__from_batches <- function(batches, schema_sxp) { .Call(`_arrow_RecordBatchReader__from_batches`, batches, schema_sxp) } +RecordBatchReader__from_function <- function(fun_sexp, schema) { + .Call(`_arrow_RecordBatchReader__from_function`, fun_sexp, schema) +} + RecordBatchReader__from_Table <- function(table) { .Call(`_arrow_RecordBatchReader__from_Table`, table) } diff --git a/r/R/record-batch-reader.R b/r/R/record-batch-reader.R index 8f6a600dfb1..3a985d8abce 100644 --- a/r/R/record-batch-reader.R +++ b/r/R/record-batch-reader.R @@ -191,6 +191,8 @@ RecordBatchFileReader$create <- function(file) { #' Convert an object to an Arrow RecordBatchReader #' #' @param x An object to convert to a [RecordBatchReader] +#' @param schema The [schema()] that must match the schema returned by each +#' call to `x` when `x` is a function. #' @param ... Passed to S3 methods #' #' @return A [RecordBatchReader] @@ -234,6 +236,13 @@ as_record_batch_reader.Dataset <- function(x, ...) { Scanner$create(x)$ToRecordBatchReader() } +#' @rdname as_record_batch_reader +#' @export +as_record_batch_reader.function <- function(x, ..., schema) { + assert_that(inherits(schema, "Schema")) + RecordBatchReader__from_function(x, schema) +} + #' @rdname as_record_batch_reader #' @export as_record_batch_reader.arrow_dplyr_query <- function(x, ...) { diff --git a/r/man/as_record_batch_reader.Rd b/r/man/as_record_batch_reader.Rd index e635c0b98bd..2ed54354760 100644 --- a/r/man/as_record_batch_reader.Rd +++ b/r/man/as_record_batch_reader.Rd @@ -7,6 +7,7 @@ \alias{as_record_batch_reader.RecordBatch} \alias{as_record_batch_reader.data.frame} \alias{as_record_batch_reader.Dataset} +\alias{as_record_batch_reader.function} \alias{as_record_batch_reader.arrow_dplyr_query} \alias{as_record_batch_reader.Scanner} \title{Convert an object to an Arrow RecordBatchReader} @@ -23,6 +24,8 @@ as_record_batch_reader(x, ...) \method{as_record_batch_reader}{Dataset}(x, ...) +\method{as_record_batch_reader}{`function`}(x, ..., schema) + \method{as_record_batch_reader}{arrow_dplyr_query}(x, ...) \method{as_record_batch_reader}{Scanner}(x, ...) @@ -31,6 +34,9 @@ as_record_batch_reader(x, ...) 
 \item{x}{An object to convert to a \link{RecordBatchReader}}
 
 \item{...}{Passed to S3 methods}
+
+\item{schema}{The \code{\link[=schema]{schema()}} that must match the schema returned by each
+call to \code{x} when \code{x} is a function.}
 }
 \value{
 A \link{RecordBatchReader}
diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp
index fd9f92e5d1a..a5a75fc983c 100644
--- a/r/src/arrowExports.cpp
+++ b/r/src/arrowExports.cpp
@@ -4480,6 +4480,15 @@ BEGIN_CPP11
 END_CPP11
 }
 // recordbatchreader.cpp
+std::shared_ptr<arrow::RecordBatchReader> RecordBatchReader__from_function(cpp11::sexp fun_sexp, const std::shared_ptr<arrow::Schema>& schema);
+extern "C" SEXP _arrow_RecordBatchReader__from_function(SEXP fun_sexp_sexp, SEXP schema_sexp){
+BEGIN_CPP11
+    arrow::r::Input<cpp11::sexp>::type fun_sexp(fun_sexp_sexp);
+    arrow::r::Input<const std::shared_ptr<arrow::Schema>&>::type schema(schema_sexp);
+    return cpp11::as_sexp(RecordBatchReader__from_function(fun_sexp, schema));
+END_CPP11
+}
+// recordbatchreader.cpp
 std::shared_ptr<arrow::RecordBatchReader> RecordBatchReader__from_Table(const std::shared_ptr<arrow::Table>& table);
 extern "C" SEXP _arrow_RecordBatchReader__from_Table(SEXP table_sexp){
 BEGIN_CPP11
@@ -5599,6 +5608,7 @@ static const R_CallMethodDef CallEntries[] = {
     { "_arrow_RecordBatchReader__ReadNext", (DL_FUNC) &_arrow_RecordBatchReader__ReadNext, 1},
     { "_arrow_RecordBatchReader__batches", (DL_FUNC) &_arrow_RecordBatchReader__batches, 1},
     { "_arrow_RecordBatchReader__from_batches", (DL_FUNC) &_arrow_RecordBatchReader__from_batches, 2},
+    { "_arrow_RecordBatchReader__from_function", (DL_FUNC) &_arrow_RecordBatchReader__from_function, 2},
     { "_arrow_RecordBatchReader__from_Table", (DL_FUNC) &_arrow_RecordBatchReader__from_Table, 1},
     { "_arrow_Table__from_RecordBatchReader", (DL_FUNC) &_arrow_Table__from_RecordBatchReader, 1},
     { "_arrow_RecordBatchReader__Head", (DL_FUNC) &_arrow_RecordBatchReader__Head, 2},
diff --git a/r/src/recordbatchreader.cpp b/r/src/recordbatchreader.cpp
index fb173825f3b..c571d282da1 100644
--- a/r/src/recordbatchreader.cpp
+++ b/r/src/recordbatchreader.cpp
@@ -16,6 +16,7 @@
 // under the License.
 
#include "./arrow_types.h" +#include "./safe-call-into-r.h" #include #include @@ -54,6 +55,50 @@ std::shared_ptr RecordBatchReader__from_batches( } } +class RFunctionRecordBatchReader : public arrow::RecordBatchReader { + public: + RFunctionRecordBatchReader(cpp11::sexp fun, + const std::shared_ptr& schema) + : fun_(fun), schema_(schema) {} + + std::shared_ptr schema() const { return schema_; } + + arrow::Status ReadNext(std::shared_ptr* batch_out) { + auto batch = SafeCallIntoR>([&]() { + cpp11::sexp result_sexp = fun_(); + if (result_sexp == R_NilValue) { + return std::shared_ptr(nullptr); + } else if (!Rf_inherits(result_sexp, "RecordBatch")) { + cpp11::stop("Expected fun() to return an arrow::RecordBatch"); + } + + return cpp11::as_cpp>(result_sexp); + }); + + RETURN_NOT_OK(batch); + + if (batch.ValueUnsafe().get() != nullptr && + !batch.ValueUnsafe()->schema()->Equals(schema_)) { + return arrow::Status::Invalid("Expected fun() to return batch with schema '", + schema_->ToString(), "' but got batch with schema '", + batch.ValueUnsafe()->schema()->ToString(), "'"); + } + + *batch_out = batch.ValueUnsafe(); + return arrow::Status::OK(); + } + + private: + cpp11::function fun_; + std::shared_ptr schema_; +}; + +// [[arrow::export]] +std::shared_ptr RecordBatchReader__from_function( + cpp11::sexp fun_sexp, const std::shared_ptr& schema) { + return std::make_shared(fun_sexp, schema); +} + // [[arrow::export]] std::shared_ptr RecordBatchReader__from_Table( const std::shared_ptr& table) { diff --git a/r/tests/testthat/test-record-batch-reader.R b/r/tests/testthat/test-record-batch-reader.R index 597187da459..3cd856de667 100644 --- a/r/tests/testthat/test-record-batch-reader.R +++ b/r/tests/testthat/test-record-batch-reader.R @@ -236,3 +236,36 @@ test_that("as_record_batch_reader() works for data.frame", { reader <- as_record_batch_reader(df) expect_equal(reader$read_next_batch(), record_batch(a = 1, b = "two")) }) + +test_that("as_record_batch_reader() works for function", { + batches <- list( + record_batch(a = 1, b = "two"), + record_batch(a = 2, b = "three") + ) + + i <- 0 + fun <- function() { + i <<- i + 1 + if (i > length(batches)) NULL else batches[[i]] + } + + reader <- as_record_batch_reader(fun, schema = batches[[1]]$schema) + expect_equal(reader$read_next_batch(), batches[[1]]) + expect_equal(reader$read_next_batch(), batches[[2]]) + expect_null(reader$read_next_batch()) + + # check invalid returns + fun_bad_type <- function() "not a record batch" + reader <- as_record_batch_reader(fun_bad_type, schema = schema()) + expect_error( + reader$read_next_batch(), + "Expected fun\\(\\) to return an arrow::RecordBatch" + ) + + fun_bad_schema <- function() record_batch(a = 1) + reader <- as_record_batch_reader(fun_bad_schema, schema = schema(a = string())) + expect_error( + reader$read_next_batch(), + "Expected fun\\(\\) to return batch with schema 'a: string'" + ) +}) From b9d64b72e7b312c6fa3da4af8dd93963a5705e30 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Tue, 19 Jul 2022 11:27:18 -0300 Subject: [PATCH 2/7] make map_batches lazy --- r/R/dataset-scan.R | 57 ++++++++++++++++++++++++--------- r/man/map_batches.Rd | 5 ++- r/tests/testthat/test-dataset.R | 5 +++ 3 files changed, 51 insertions(+), 16 deletions(-) diff --git a/r/R/dataset-scan.R b/r/R/dataset-scan.R index cca92b676fe..2d451e6faac 100644 --- a/r/R/dataset-scan.R +++ b/r/R/dataset-scan.R @@ -183,11 +183,13 @@ tail_from_batches <- function(batches, n) { #' @param FUN A function or `purrr`-style lambda expression to apply 
to each #' batch. It must return a RecordBatch or something coercible to one via #' `as_record_batch()'. +#' @param .schema An optional [schema()]. If NULL, the schema will be inferred +#' from the first batch. #' @param ... Additional arguments passed to `FUN` #' @param .data.frame Deprecated argument, ignored #' @return An `arrow_dplyr_query`. #' @export -map_batches <- function(X, FUN, ..., .data.frame = NULL) { +map_batches <- function(X, FUN, ..., .schema = NULL, .data.frame = NULL) { if (!is.null(.data.frame)) { warning( "The .data.frame argument is deprecated. ", @@ -197,25 +199,50 @@ map_batches <- function(X, FUN, ..., .data.frame = NULL) { } FUN <- as_mapper(FUN) reader <- as_record_batch_reader(X) + dots <- rlang::list2(...) - # TODO: for future consideration - # * Move eval to C++ and make it a generator so it can stream, not block - # * Accept an output schema argument: with that, we could make this lazy (via collapse) - batch <- reader$read_next_batch() - res <- vector("list", 1024) - i <- 0L - while (!is.null(batch)) { - i <- i + 1L - res[[i]] <- as_record_batch(FUN(batch, ...)) + # If no schema is supplied, we have to evaluate the first batch here + if (is.null(.schema)) { batch <- reader$read_next_batch() - } + if (is.null(batch)) { + abort("Can't infer schema from a RecordBatchReader with zero batches") + } + + first_result <- as_record_batch(do.call(FUN, c(list(batch, dots)))) + .schema <- first_result$schema + fun <- function() { + if (!is.null(first_result)) { + result <- first_result + first_result <<- NULL + result + } else { + batch <- reader$read_next_batch() + if (is.null(batch)) { + NULL + } else { + as_record_batch( + do.call(FUN, c(list(batch, dots))), + schema = .schema + ) + } + } + } + } else { + # otherwise, we can + fun <- function() { + batch <- reader$read_next_batch() + if (is.null(batch)) { + return(NULL) + } - # Trim list back - if (i < length(res)) { - res <- res[seq_len(i)] + as_record_batch( + do.call(FUN, c(list(batch, dots))), + schema = .schema + ) + } } - RecordBatchReader$create(batches = res) + as_record_batch_reader(fun, schema = .schema) } #' @usage NULL diff --git a/r/man/map_batches.Rd b/r/man/map_batches.Rd index eaeab6013a6..1666ccb12e7 100644 --- a/r/man/map_batches.Rd +++ b/r/man/map_batches.Rd @@ -4,7 +4,7 @@ \alias{map_batches} \title{Apply a function to a stream of RecordBatches} \usage{ -map_batches(X, FUN, ..., .data.frame = NULL) +map_batches(X, FUN, ..., .schema = NULL, .data.frame = NULL) } \arguments{ \item{X}{A \code{Dataset} or \code{arrow_dplyr_query} object, as returned by the @@ -16,6 +16,9 @@ batch. It must return a RecordBatch or something coercible to one via \item{...}{Additional arguments passed to \code{FUN}} +\item{.schema}{An optional \code{\link[=schema]{schema()}}. 
If NULL, the schema will be inferred +from the first batch.} + \item{.data.frame}{Deprecated argument, ignored} } \value{ diff --git a/r/tests/testthat/test-dataset.R b/r/tests/testthat/test-dataset.R index 5c826d6dffc..e8d165970fa 100644 --- a/r/tests/testthat/test-dataset.R +++ b/r/tests/testthat/test-dataset.R @@ -638,6 +638,7 @@ test_that("map_batches", { filter(int > 5) %>% select(int, lgl) %>% map_batches(~ summarize(., min_int = min(int))) %>% + (function(x) x$read_table()) %>% arrange(min_int) %>% collect(), tibble(min_int = c(6L, 101L)) @@ -649,6 +650,7 @@ test_that("map_batches", { filter(int > 5) %>% select(int, lgl) %>% map_batches(~ record_batch(nrows = .$num_rows)) %>% + (function(x) x$read_table()) %>% pull(nrows) %>% sort(), c(5, 10) @@ -658,6 +660,7 @@ test_that("map_batches", { expect_equal( ds %>% map_batches(~ count(., part)) %>% + (function(x) x$read_table()) %>% arrange(part) %>% collect(), tibble(part = c(1, 2), n = c(10, 10)) @@ -669,6 +672,7 @@ test_that("map_batches", { filter(int > 5) %>% select(int, lgl) %>% map_batches(~ .$Take(0)) %>% + (function(x) x$read_table()) %>% arrange(int) %>% collect(), tibble(int = c(6, 101), lgl = c(TRUE, TRUE)) @@ -683,6 +687,7 @@ test_that("map_batches", { # as_mapper() can't handle %>%? ~ mutate(as.data.frame(.), lets = letters[int]) ) %>% + (function(x) x$read_table()) %>% arrange(int) %>% collect(), tibble(int = 1:4, lets = letters[1:4]) From 8b4cf6de6bf3429803e161134951fdb33bf001c3 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Tue, 19 Jul 2022 12:22:03 -0300 Subject: [PATCH 3/7] fix more map_batches tests --- r/tests/testthat/test-dataset-write.R | 2 ++ 1 file changed, 2 insertions(+) diff --git a/r/tests/testthat/test-dataset-write.R b/r/tests/testthat/test-dataset-write.R index 2f4ff7e649e..eb74603fb4d 100644 --- a/r/tests/testthat/test-dataset-write.R +++ b/r/tests/testthat/test-dataset-write.R @@ -703,6 +703,7 @@ test_that("Dataset min_rows_per_group", { row_group_sizes <- ds %>% map_batches(~ record_batch(nrows = .$num_rows)) %>% + (function(x) x$read_table()) %>% pull(nrows) index <- 1 @@ -741,6 +742,7 @@ test_that("Dataset write max rows per group", { ds <- open_dataset(file_path) row_group_sizes <- ds %>% map_batches(~ record_batch(nrows = .$num_rows)) %>% + (function(x) x$read_table()) %>% pull(nrows) %>% sort() From 430193bc0049c8fcb02d1b959257b2b072e7835d Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Tue, 19 Jul 2022 15:08:10 -0300 Subject: [PATCH 4/7] actually test dots --- r/R/dataset-scan.R | 7 ++- r/tests/testthat/test-dataset.R | 86 +++++++++++++++++++++++++++++++++ 2 files changed, 89 insertions(+), 4 deletions(-) diff --git a/r/R/dataset-scan.R b/r/R/dataset-scan.R index 2d451e6faac..1f498a49319 100644 --- a/r/R/dataset-scan.R +++ b/r/R/dataset-scan.R @@ -208,7 +208,7 @@ map_batches <- function(X, FUN, ..., .schema = NULL, .data.frame = NULL) { abort("Can't infer schema from a RecordBatchReader with zero batches") } - first_result <- as_record_batch(do.call(FUN, c(list(batch, dots)))) + first_result <- as_record_batch(do.call(FUN, c(list(batch), dots))) .schema <- first_result$schema fun <- function() { if (!is.null(first_result)) { @@ -221,14 +221,13 @@ map_batches <- function(X, FUN, ..., .schema = NULL, .data.frame = NULL) { NULL } else { as_record_batch( - do.call(FUN, c(list(batch, dots))), + do.call(FUN, c(list(batch), dots)), schema = .schema ) } } } } else { - # otherwise, we can fun <- function() { batch <- reader$read_next_batch() if (is.null(batch)) { @@ -236,7 +235,7 @@ 
map_batches <- function(X, FUN, ..., .schema = NULL, .data.frame = NULL) { } as_record_batch( - do.call(FUN, c(list(batch, dots))), + do.call(FUN, c(list(batch), dots)), schema = .schema ) } diff --git a/r/tests/testthat/test-dataset.R b/r/tests/testthat/test-dataset.R index e8d165970fa..706a2dc7f57 100644 --- a/r/tests/testthat/test-dataset.R +++ b/r/tests/testthat/test-dataset.R @@ -694,6 +694,92 @@ test_that("map_batches", { ) }) +test_that("map_batches with explicit schema", { + fun_with_dots <- function(batch, first_col, first_col_val) { + record_batch( + !! first_col := first_col_val, + b = batch$a$cast(float64()) + ) + } + + empty_reader <- RecordBatchReader$create( + batches = list(), + schema = schema(a = int32()) + ) + expect_equal( + map_batches( + empty_reader, + fun_with_dots, + "first_col_name", + "first_col_value", + .schema = schema(first_col_name = string(), b = float64()) + )$read_table(), + arrow_table(first_col_name = character(), b = double()) + ) + + reader <- RecordBatchReader$create( + batches = list( + record_batch(a = 1, b = "two"), + record_batch(a = 2, b = "three") + ) + ) + expect_equal( + map_batches( + reader, + fun_with_dots, + "first_col_name", + "first_col_value", + .schema = schema(first_col_name = string(), b = float64()) + )$read_table(), + arrow_table( + first_col_name = c("first_col_value", "first_col_value"), + b = as.numeric(1:2) + ) + ) +}) + +test_that("map_batches without explicit schema", { + fun_with_dots <- function(batch, first_col, first_col_val) { + record_batch( + !! first_col := first_col_val, + b = batch$a$cast(float64()) + ) + } + + empty_reader <- RecordBatchReader$create( + batches = list(), + schema = schema(a = int32()) + ) + expect_error( + map_batches( + empty_reader, + fun_with_dots, + "first_col_name", + "first_col_value" + )$read_table(), + "Can't infer schema" + ) + + reader <- RecordBatchReader$create( + batches = list( + record_batch(a = 1, b = "two"), + record_batch(a = 2, b = "three") + ) + ) + expect_equal( + map_batches( + reader, + fun_with_dots, + "first_col_name", + "first_col_value" + )$read_table(), + arrow_table( + first_col_name = c("first_col_value", "first_col_value"), + b = as.numeric(1:2) + ) + ) +}) + test_that("head/tail", { # head/tail with no query are still deterministic order ds <- open_dataset(dataset_dir) From b6e4321c6c952b5ab5bf6fded54ba351bd183126 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Fri, 22 Jul 2022 14:26:24 -0300 Subject: [PATCH 5/7] remove kludge workarounds after map_batches --- r/src/safe-call-into-r-impl.cpp | 2 +- r/src/safe-call-into-r.h | 4 +++- r/tests/testthat/test-dataset-write.R | 2 -- r/tests/testthat/test-dataset.R | 5 ----- 4 files changed, 4 insertions(+), 9 deletions(-) diff --git a/r/src/safe-call-into-r-impl.cpp b/r/src/safe-call-into-r-impl.cpp index 7318c81bb55..4eec3a85df8 100644 --- a/r/src/safe-call-into-r-impl.cpp +++ b/r/src/safe-call-into-r-impl.cpp @@ -38,7 +38,7 @@ bool CanRunWithCapturedR() { on_old_windows = on_old_windows_fun(); } - return !on_old_windows; + return !on_old_windows && GetMainRThread().Executor() == nullptr; #else return false; #endif diff --git a/r/src/safe-call-into-r.h b/r/src/safe-call-into-r.h index 937163a05df..08e8a8c11b6 100644 --- a/r/src/safe-call-into-r.h +++ b/r/src/safe-call-into-r.h @@ -31,7 +31,9 @@ // and crash R in older versions (ARROW-16201). Crashes also occur // on 32-bit R builds on R 3.6 and lower. 
Implementation provided // in safe-call-into-r-impl.cpp so that we can skip some tests -// when this feature is not provided. +// when this feature is not provided. This also checks that there +// is not already an event loop registered (via MainRThread::Executor()), +// because only one of these can exist at any given time. bool CanRunWithCapturedR(); // The MainRThread class keeps track of the thread on which it is safe diff --git a/r/tests/testthat/test-dataset-write.R b/r/tests/testthat/test-dataset-write.R index eb74603fb4d..2f4ff7e649e 100644 --- a/r/tests/testthat/test-dataset-write.R +++ b/r/tests/testthat/test-dataset-write.R @@ -703,7 +703,6 @@ test_that("Dataset min_rows_per_group", { row_group_sizes <- ds %>% map_batches(~ record_batch(nrows = .$num_rows)) %>% - (function(x) x$read_table()) %>% pull(nrows) index <- 1 @@ -742,7 +741,6 @@ test_that("Dataset write max rows per group", { ds <- open_dataset(file_path) row_group_sizes <- ds %>% map_batches(~ record_batch(nrows = .$num_rows)) %>% - (function(x) x$read_table()) %>% pull(nrows) %>% sort() diff --git a/r/tests/testthat/test-dataset.R b/r/tests/testthat/test-dataset.R index 706a2dc7f57..3bcdd8bcde4 100644 --- a/r/tests/testthat/test-dataset.R +++ b/r/tests/testthat/test-dataset.R @@ -638,7 +638,6 @@ test_that("map_batches", { filter(int > 5) %>% select(int, lgl) %>% map_batches(~ summarize(., min_int = min(int))) %>% - (function(x) x$read_table()) %>% arrange(min_int) %>% collect(), tibble(min_int = c(6L, 101L)) @@ -650,7 +649,6 @@ test_that("map_batches", { filter(int > 5) %>% select(int, lgl) %>% map_batches(~ record_batch(nrows = .$num_rows)) %>% - (function(x) x$read_table()) %>% pull(nrows) %>% sort(), c(5, 10) @@ -660,7 +658,6 @@ test_that("map_batches", { expect_equal( ds %>% map_batches(~ count(., part)) %>% - (function(x) x$read_table()) %>% arrange(part) %>% collect(), tibble(part = c(1, 2), n = c(10, 10)) @@ -672,7 +669,6 @@ test_that("map_batches", { filter(int > 5) %>% select(int, lgl) %>% map_batches(~ .$Take(0)) %>% - (function(x) x$read_table()) %>% arrange(int) %>% collect(), tibble(int = c(6, 101), lgl = c(TRUE, TRUE)) @@ -687,7 +683,6 @@ test_that("map_batches", { # as_mapper() can't handle %>%? ~ mutate(as.data.frame(.), lets = letters[int]) ) %>% - (function(x) x$read_table()) %>% arrange(int) %>% collect(), tibble(int = 1:4, lets = letters[1:4]) From aa3ce4e06de714e3ab4c058824ac65d07958192e Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Fri, 22 Jul 2022 16:36:59 -0300 Subject: [PATCH 6/7] safer default streaming behaviour --- r/R/dataset-scan.R | 18 ++++++++++++++++-- r/man/map_batches.Rd | 6 +++++- 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/r/R/dataset-scan.R b/r/R/dataset-scan.R index 1f498a49319..53fe7078c23 100644 --- a/r/R/dataset-scan.R +++ b/r/R/dataset-scan.R @@ -185,11 +185,14 @@ tail_from_batches <- function(batches, n) { #' `as_record_batch()'. #' @param .schema An optional [schema()]. If NULL, the schema will be inferred #' from the first batch. +#' @param .lazy Use `TRUE` to evaluate `FUN` lazily as batches are read from +#' the result; use `FALSE` to evaluate `FUN` on all batches before returning +#' the reader. #' @param ... Additional arguments passed to `FUN` #' @param .data.frame Deprecated argument, ignored #' @return An `arrow_dplyr_query`. 
#' @export -map_batches <- function(X, FUN, ..., .schema = NULL, .data.frame = NULL) { +map_batches <- function(X, FUN, ..., .schema = NULL, .lazy = FALSE, .data.frame = NULL) { if (!is.null(.data.frame)) { warning( "The .data.frame argument is deprecated. ", @@ -241,7 +244,18 @@ map_batches <- function(X, FUN, ..., .schema = NULL, .data.frame = NULL) { } } - as_record_batch_reader(fun, schema = .schema) + reader_out <- as_record_batch_reader(fun, schema = .schema) + + # TODO(ARROW-17178) because there are some restrictions on evaluating + # reader_out in some ExecPlans, the default .lazy is FALSE for now. + if (!.lazy) { + reader_out <- RecordBatchReader$create( + batches = reader_out$batches(), + schema = .schema + ) + } + + reader_out } #' @usage NULL diff --git a/r/man/map_batches.Rd b/r/man/map_batches.Rd index 1666ccb12e7..0e4d48e024d 100644 --- a/r/man/map_batches.Rd +++ b/r/man/map_batches.Rd @@ -4,7 +4,7 @@ \alias{map_batches} \title{Apply a function to a stream of RecordBatches} \usage{ -map_batches(X, FUN, ..., .schema = NULL, .data.frame = NULL) +map_batches(X, FUN, ..., .schema = NULL, .lazy = FALSE, .data.frame = NULL) } \arguments{ \item{X}{A \code{Dataset} or \code{arrow_dplyr_query} object, as returned by the @@ -19,6 +19,10 @@ batch. It must return a RecordBatch or something coercible to one via \item{.schema}{An optional \code{\link[=schema]{schema()}}. If NULL, the schema will be inferred from the first batch.} +\item{.lazy}{Use \code{TRUE} to evaluate \code{FUN} lazily as batches are read from +the result; use \code{FALSE} to evaluate \code{FUN} on all batches before returning +the reader.} + \item{.data.frame}{Deprecated argument, ignored} } \value{ From bca52f873f3b3a4b8cc8dc74478a5c0e1eb520ea Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Sat, 23 Jul 2022 07:39:23 -0300 Subject: [PATCH 7/7] maybe fix R 3.6 failures --- r/tests/testthat/test-dataset-dplyr.R | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/r/tests/testthat/test-dataset-dplyr.R b/r/tests/testthat/test-dataset-dplyr.R index fecda56c6c2..229c3e7c603 100644 --- a/r/tests/testthat/test-dataset-dplyr.R +++ b/r/tests/testthat/test-dataset-dplyr.R @@ -70,6 +70,8 @@ test_that("filter() with %in%", { }) test_that("filter() on timestamp columns", { + skip_if_not_available("re2") + ds <- open_dataset(dataset_dir, partitioning = schema(part = uint8())) expect_equal( ds %>% @@ -116,6 +118,8 @@ test_that("filter() on date32 columns", { 1L ) + skip_if_not_available("re2") + # Also with timestamp scalar expect_equal( open_dataset(tmp) %>%
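
Reviewer aid, not part of the patch series: a minimal usage sketch of the two APIs this series adds, adapted from the new tests above. The `next_batch` helper, the example data, and the column names are illustrative only.

library(arrow)

# Patch 1: a function that returns RecordBatches (and NULL once the stream
# is exhausted) can be wrapped as a RecordBatchReader with a declared schema.
batches <- list(
  record_batch(a = 1, b = "two"),
  record_batch(a = 2, b = "three")
)
i <- 0
next_batch <- function() {
  i <<- i + 1
  if (i > length(batches)) NULL else batches[[i]]
}
reader <- as_record_batch_reader(next_batch, schema = batches[[1]]$schema)
reader$read_next_batch()  # first batch
reader$read_next_batch()  # second batch
reader$read_next_batch()  # NULL: the stream is exhausted

# Later patches: map_batches() now streams its result as a reader. With an
# explicit .schema it does not need to evaluate a batch up front, and
# .lazy = TRUE defers FUN until batches are pulled from the result.
result <- map_batches(
  RecordBatchReader$create(batches = batches),
  function(batch) record_batch(a = batch$a$cast(float64())),
  .schema = schema(a = float64()),
  .lazy = TRUE
)
result$read_table()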