From 73e68de6ec6ddd5b463e7d917492d83160781d16 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Mon, 1 Aug 2022 22:12:26 -0300 Subject: [PATCH 1/8] don't collect with an event loop unless requested --- r/R/table.R | 9 ++++++- r/tests/testthat/test-compute.R | 44 ++++++++++++++++++--------------- 2 files changed, 32 insertions(+), 21 deletions(-) diff --git a/r/R/table.R b/r/R/table.R index 5579c676d51..d7e276415c5 100644 --- a/r/R/table.R +++ b/r/R/table.R @@ -331,5 +331,12 @@ as_arrow_table.arrow_dplyr_query <- function(x, ...) { # See query-engine.R for ExecPlan/Nodes plan <- ExecPlan$create() final_node <- plan$Build(x) - plan$Run(final_node, as_table = TRUE) + + run_with_event_loop <- identical( + Sys.getenv("R_ARROW_COLLECT_WITH_UDF", ""), + "true" + ) + + result <- plan$Run(final_node, as_table = run_with_event_loop) + as_arrow_table(result) } diff --git a/r/tests/testthat/test-compute.R b/r/tests/testthat/test-compute.R index 9e487169f4b..62d00b95f4a 100644 --- a/r/tests/testthat/test-compute.R +++ b/r/tests/testthat/test-compute.R @@ -103,12 +103,14 @@ test_that("register_scalar_function() adds a compute function to the registry", Scalar$create(32L, float64()) ) - expect_identical( - record_batch(a = 1L) %>% - dplyr::mutate(b = times_32(a)) %>% - dplyr::collect(), - tibble::tibble(a = 1L, b = 32.0) - ) + withr::with_envvar(list(R_ARROW_COLLECT_WITH_UDF = "true"), { + expect_identical( + record_batch(a = 1L) %>% + dplyr::mutate(b = times_32(a)) %>% + dplyr::collect(), + tibble::tibble(a = 1L, b = 32.0) + ) + }) }) test_that("arrow_scalar_function() with bad return type errors", { @@ -237,24 +239,26 @@ test_that("user-defined functions work during multi-threaded execution", { ) on.exit(unregister_binding("times_32", update_cache = TRUE)) - # check a regular collect() - result <- open_dataset(tf_dataset) %>% - dplyr::mutate(fun_result = times_32(value)) %>% - dplyr::collect() %>% - dplyr::arrange(row_num) + withr::with_envvar(list(R_ARROW_COLLECT_WITH_UDF = "true"), { + # check a regular collect() + result <- open_dataset(tf_dataset) %>% + dplyr::mutate(fun_result = times_32(value)) %>% + dplyr::collect() %>% + dplyr::arrange(row_num) - expect_identical(result$fun_result, example_df$value * 32) + expect_identical(result$fun_result, example_df$value * 32) - # check a write_dataset() - open_dataset(tf_dataset) %>% - dplyr::mutate(fun_result = times_32(value)) %>% - write_dataset(tf_dest) + # check a write_dataset() + open_dataset(tf_dataset) %>% + dplyr::mutate(fun_result = times_32(value)) %>% + write_dataset(tf_dest) - result2 <- dplyr::collect(open_dataset(tf_dest)) %>% - dplyr::arrange(row_num) %>% - dplyr::collect() + result2 <- dplyr::collect(open_dataset(tf_dest)) %>% + dplyr::arrange(row_num) %>% + dplyr::collect() - expect_identical(result2$fun_result, example_df$value * 32) + expect_identical(result2$fun_result, example_df$value * 32) + }) }) test_that("user-defined error when called from an unsupported context", { From 3647aa3d6474604a7ed615621f9503f634e92efc Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Mon, 1 Aug 2022 22:34:18 -0300 Subject: [PATCH 2/8] modify example that needs special handling, skip a test on linux devel --- r/R/compute.R | 7 ++++++- r/man/register_scalar_function.Rd | 7 ++++++- r/tests/testthat/test-compute.R | 2 ++ 3 files changed, 14 insertions(+), 2 deletions(-) diff --git a/r/R/compute.R b/r/R/compute.R index 0985e73a5f2..cc9434918f2 100644 --- a/r/R/compute.R +++ b/r/R/compute.R @@ -344,7 +344,7 @@ cast_options <- function(safe = TRUE, ...) { #' @return `NULL`, invisibly #' @export #' -#' @examplesIf arrow_with_dataset() +#' @examplesIf arrow_with_dataset() && identical(Sys.getenv("R_ARROW_COLLECT_WITH_UDF"), "true") #' library(dplyr, warn.conflicts = FALSE) #' #' some_model <- lm(mpg ~ disp + cyl, data = mtcars) @@ -358,10 +358,15 @@ cast_options <- function(safe = TRUE, ...) { #' auto_convert = TRUE #' ) #' +#' # User-defined functions require some special handling +#' # in the query engine which currently require an opt-in using +#' # the R_ARROW_COLLECT_WITH_UDF environment variable. +#' Sys.setenv(R_ARROW_COLLECT_WITH_UDF = "true") #' as_arrow_table(mtcars) %>% #' transmute(mpg, mpg_predicted = mtcars_predict_mpg(disp, cyl)) %>% #' collect() %>% #' head() +#' Sys.unsetenv("R_ARROW_COLLECT_WITH_UDF") #' register_scalar_function <- function(name, fun, in_type, out_type, auto_convert = FALSE) { diff --git a/r/man/register_scalar_function.Rd b/r/man/register_scalar_function.Rd index 4da8f54f645..63ac43c8e50 100644 --- a/r/man/register_scalar_function.Rd +++ b/r/man/register_scalar_function.Rd @@ -48,7 +48,7 @@ stateless and return output with the same shape (i.e., the same number of rows) as the input. } \examples{ -\dontshow{if (arrow_with_dataset()) (if (getRversion() >= "3.4") withAutoprint else force)(\{ # examplesIf} +\dontshow{if (arrow_with_dataset() && identical(Sys.getenv("R_ARROW_COLLECT_WITH_UDF"), "true")) (if (getRversion() >= "3.4") withAutoprint else force)(\{ # examplesIf} library(dplyr, warn.conflicts = FALSE) some_model <- lm(mpg ~ disp + cyl, data = mtcars) @@ -62,9 +62,14 @@ register_scalar_function( auto_convert = TRUE ) +# User-defined functions require some special handling +# in the query engine which currently require an opt-in using +# the R_ARROW_COLLECT_WITH_UDF environment variable. +Sys.setenv(R_ARROW_COLLECT_WITH_UDF = "true") as_arrow_table(mtcars) \%>\% transmute(mpg, mpg_predicted = mtcars_predict_mpg(disp, cyl)) \%>\% collect() \%>\% head() +Sys.unsetenv("R_ARROW_COLLECT_WITH_UDF") \dontshow{\}) # examplesIf} } diff --git a/r/tests/testthat/test-compute.R b/r/tests/testthat/test-compute.R index 62d00b95f4a..45284a895f5 100644 --- a/r/tests/testthat/test-compute.R +++ b/r/tests/testthat/test-compute.R @@ -81,6 +81,8 @@ test_that("arrow_scalar_function() works with auto_convert = TRUE", { test_that("register_scalar_function() adds a compute function to the registry", { skip_if_not(CanRunWithCapturedR()) + # until R_ARROW_COLLECT_WITH_UDF is no longer needed to slience valgrind + skip_on_linux_devel() register_scalar_function( "times_32", From 46d7ef3104adec13afd15eda2c7557c909e27eb6 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Tue, 2 Aug 2022 09:50:58 -0300 Subject: [PATCH 3/8] auto opt-in to user-defined function-friendly collect --- r/R/compute.R | 12 +++++++----- r/man/register_scalar_function.Rd | 5 ----- r/tests/testthat/test-compute.R | 3 ++- 3 files changed, 9 insertions(+), 11 deletions(-) diff --git a/r/R/compute.R b/r/R/compute.R index cc9434918f2..455086f2834 100644 --- a/r/R/compute.R +++ b/r/R/compute.R @@ -358,15 +358,10 @@ cast_options <- function(safe = TRUE, ...) { #' auto_convert = TRUE #' ) #' -#' # User-defined functions require some special handling -#' # in the query engine which currently require an opt-in using -#' # the R_ARROW_COLLECT_WITH_UDF environment variable. -#' Sys.setenv(R_ARROW_COLLECT_WITH_UDF = "true") #' as_arrow_table(mtcars) %>% #' transmute(mpg, mpg_predicted = mtcars_predict_mpg(disp, cyl)) %>% #' collect() %>% #' head() -#' Sys.unsetenv("R_ARROW_COLLECT_WITH_UDF") #' register_scalar_function <- function(name, fun, in_type, out_type, auto_convert = FALSE) { @@ -390,6 +385,13 @@ register_scalar_function <- function(name, fun, in_type, out_type, update_cache = TRUE ) + # User-defined functions require some special handling + # in the query engine which currently require an opt-in using + # the R_ARROW_COLLECT_WITH_UDF environment variable while this + # behaviour is stabilized. + # TODO(ARROW-17178) remove the need for this! + Sys.setenv(R_ARROW_COLLECT_WITH_UDF = "true") + invisible(NULL) } diff --git a/r/man/register_scalar_function.Rd b/r/man/register_scalar_function.Rd index 63ac43c8e50..5af80daf6dc 100644 --- a/r/man/register_scalar_function.Rd +++ b/r/man/register_scalar_function.Rd @@ -62,14 +62,9 @@ register_scalar_function( auto_convert = TRUE ) -# User-defined functions require some special handling -# in the query engine which currently require an opt-in using -# the R_ARROW_COLLECT_WITH_UDF environment variable. -Sys.setenv(R_ARROW_COLLECT_WITH_UDF = "true") as_arrow_table(mtcars) \%>\% transmute(mpg, mpg_predicted = mtcars_predict_mpg(disp, cyl)) \%>\% collect() \%>\% head() -Sys.unsetenv("R_ARROW_COLLECT_WITH_UDF") \dontshow{\}) # examplesIf} } diff --git a/r/tests/testthat/test-compute.R b/r/tests/testthat/test-compute.R index 45284a895f5..2ba4e312ab3 100644 --- a/r/tests/testthat/test-compute.R +++ b/r/tests/testthat/test-compute.R @@ -81,7 +81,7 @@ test_that("arrow_scalar_function() works with auto_convert = TRUE", { test_that("register_scalar_function() adds a compute function to the registry", { skip_if_not(CanRunWithCapturedR()) - # until R_ARROW_COLLECT_WITH_UDF is no longer needed to slience valgrind + # TODO(ARROW-17178): remove when user-defined function execution is stabilized skip_on_linux_devel() register_scalar_function( @@ -213,6 +213,7 @@ test_that("user-defined functions work during multi-threaded execution", { skip_if_not(CanRunWithCapturedR()) skip_if_not_available("dataset") # Snappy has a UBSan issue: https://github.com/google/snappy/pull/148 + # TODO(ARROW-17178): remove when user-defined function execution is stabilized skip_on_linux_devel() n_rows <- 10000 From f9f9e85f49e778ac5c81188ee6d3900d416a9678 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Tue, 2 Aug 2022 11:07:34 -0300 Subject: [PATCH 4/8] better documentation of test skipping --- r/tests/testthat/test-compute.R | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/r/tests/testthat/test-compute.R b/r/tests/testthat/test-compute.R index 2ba4e312ab3..3141e6fd4fe 100644 --- a/r/tests/testthat/test-compute.R +++ b/r/tests/testthat/test-compute.R @@ -81,7 +81,8 @@ test_that("arrow_scalar_function() works with auto_convert = TRUE", { test_that("register_scalar_function() adds a compute function to the registry", { skip_if_not(CanRunWithCapturedR()) - # TODO(ARROW-17178): remove when user-defined function execution is stabilized + # TODO(ARROW-17178): User-defined function-friendly ExecPlan execution has + # occasional valgrind errors skip_on_linux_devel() register_scalar_function( @@ -212,8 +213,10 @@ test_that("register_user_defined_function() errors for unsupported specification test_that("user-defined functions work during multi-threaded execution", { skip_if_not(CanRunWithCapturedR()) skip_if_not_available("dataset") - # Snappy has a UBSan issue: https://github.com/google/snappy/pull/148 - # TODO(ARROW-17178): remove when user-defined function execution is stabilized + # Skip on linux devel because: + # TODO(ARROW-17283): Snappy has a UBSan issue that is fixed in the dev version + # TODO(ARROW-17178): User-defined function-friendly ExecPlan execution has + # occasional valgrind errors skip_on_linux_devel() n_rows <- 10000 From 10a981510762473f481ac2b71e7ce5001bb0da22 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Tue, 2 Aug 2022 14:40:16 -0300 Subject: [PATCH 5/8] Update r/R/compute.R Co-authored-by: Neal Richardson --- r/R/compute.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/r/R/compute.R b/r/R/compute.R index 455086f2834..636c9146ca3 100644 --- a/r/R/compute.R +++ b/r/R/compute.R @@ -344,7 +344,7 @@ cast_options <- function(safe = TRUE, ...) { #' @return `NULL`, invisibly #' @export #' -#' @examplesIf arrow_with_dataset() && identical(Sys.getenv("R_ARROW_COLLECT_WITH_UDF"), "true") +#' @examplesIf arrow_with_dataset() && identical(Sys.getenv("NOT_CRAN"), "true") #' library(dplyr, warn.conflicts = FALSE) #' #' some_model <- lm(mpg ~ disp + cyl, data = mtcars) From 3b718689ca0b3f09adbf0457ef4a606be221295c Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Tue, 2 Aug 2022 14:40:46 -0300 Subject: [PATCH 6/8] document --- r/man/register_scalar_function.Rd | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/r/man/register_scalar_function.Rd b/r/man/register_scalar_function.Rd index 5af80daf6dc..324dd5fad1f 100644 --- a/r/man/register_scalar_function.Rd +++ b/r/man/register_scalar_function.Rd @@ -48,7 +48,7 @@ stateless and return output with the same shape (i.e., the same number of rows) as the input. } \examples{ -\dontshow{if (arrow_with_dataset() && identical(Sys.getenv("R_ARROW_COLLECT_WITH_UDF"), "true")) (if (getRversion() >= "3.4") withAutoprint else force)(\{ # examplesIf} +\dontshow{if (arrow_with_dataset() && identical(Sys.getenv("NOT_CRAN"), "true")) (if (getRversion() >= "3.4") withAutoprint else force)(\{ # examplesIf} library(dplyr, warn.conflicts = FALSE) some_model <- lm(mpg ~ disp + cyl, data = mtcars) From 2efade77b060312dc0ad533506e2c90389a343e6 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Sat, 6 Aug 2022 22:00:45 -0300 Subject: [PATCH 7/8] more explicit and more correct cleaning up of env var --- r/tests/testthat/test-compute.R | 58 +++++++++++++++++++-------------- 1 file changed, 34 insertions(+), 24 deletions(-) diff --git a/r/tests/testthat/test-compute.R b/r/tests/testthat/test-compute.R index 3141e6fd4fe..99b3bc05336 100644 --- a/r/tests/testthat/test-compute.R +++ b/r/tests/testthat/test-compute.R @@ -106,14 +106,14 @@ test_that("register_scalar_function() adds a compute function to the registry", Scalar$create(32L, float64()) ) - withr::with_envvar(list(R_ARROW_COLLECT_WITH_UDF = "true"), { - expect_identical( - record_batch(a = 1L) %>% - dplyr::mutate(b = times_32(a)) %>% - dplyr::collect(), - tibble::tibble(a = 1L, b = 32.0) - ) - }) + expect_identical( + record_batch(a = 1L) %>% + dplyr::mutate(b = times_32(a)) %>% + dplyr::collect(), + tibble::tibble(a = 1L, b = 32.0) + ) + + Sys.unsetenv("R_ARROW_COLLECT_WITH_UDF") }) test_that("arrow_scalar_function() with bad return type errors", { @@ -148,6 +148,9 @@ test_that("arrow_scalar_function() with bad return type errors", { call_function("times_32_bad_return_type_scalar", Array$create(1L)), "Expected return Array or Scalar with type 'double'" ) + + # TODO(ARROW-17178) remove the need for this! + Sys.unsetenv("R_ARROW_COLLECT_WITH_UDF") }) test_that("register_user_defined_function() can register multiple kernels", { @@ -208,6 +211,9 @@ test_that("register_user_defined_function() errors for unsupported specification ), "Kernels for user-defined function must accept the same number of arguments" ) + + # TODO(ARROW-17178) remove the need for this! + Sys.unsetenv("R_ARROW_COLLECT_WITH_UDF") }) test_that("user-defined functions work during multi-threaded execution", { @@ -245,26 +251,27 @@ test_that("user-defined functions work during multi-threaded execution", { ) on.exit(unregister_binding("times_32", update_cache = TRUE)) - withr::with_envvar(list(R_ARROW_COLLECT_WITH_UDF = "true"), { - # check a regular collect() - result <- open_dataset(tf_dataset) %>% - dplyr::mutate(fun_result = times_32(value)) %>% - dplyr::collect() %>% - dplyr::arrange(row_num) + # check a regular collect() + result <- open_dataset(tf_dataset) %>% + dplyr::mutate(fun_result = times_32(value)) %>% + dplyr::collect() %>% + dplyr::arrange(row_num) - expect_identical(result$fun_result, example_df$value * 32) + expect_identical(result$fun_result, example_df$value * 32) - # check a write_dataset() - open_dataset(tf_dataset) %>% - dplyr::mutate(fun_result = times_32(value)) %>% - write_dataset(tf_dest) + # check a write_dataset() + open_dataset(tf_dataset) %>% + dplyr::mutate(fun_result = times_32(value)) %>% + write_dataset(tf_dest) - result2 <- dplyr::collect(open_dataset(tf_dest)) %>% - dplyr::arrange(row_num) %>% - dplyr::collect() + result2 <- dplyr::collect(open_dataset(tf_dest)) %>% + dplyr::arrange(row_num) %>% + dplyr::collect() - expect_identical(result2$fun_result, example_df$value * 32) - }) + expect_identical(result2$fun_result, example_df$value * 32) + + # TODO(ARROW-17178) remove the need for this! + Sys.unsetenv("R_ARROW_COLLECT_WITH_UDF") }) test_that("user-defined error when called from an unsupported context", { @@ -314,4 +321,7 @@ test_that("user-defined error when called from an unsupported context", { "Call to R \\(.*?\\) from a non-R thread from an unsupported context" ) } + + # TODO(ARROW-17178) remove the need for this! + Sys.unsetenv("R_ARROW_COLLECT_WITH_UDF") }) From 37e6e2e3d415f47a8e3eb6825b03980dd4edfe82 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Sun, 7 Aug 2022 13:28:41 -0300 Subject: [PATCH 8/8] more reliable unsetting of environment variable, remove references to previous function name --- r/tests/testthat/test-compute.R | 57 +++++++++++++++++++-------------- 1 file changed, 33 insertions(+), 24 deletions(-) diff --git a/r/tests/testthat/test-compute.R b/r/tests/testthat/test-compute.R index 99b3bc05336..5821c0fa2df 100644 --- a/r/tests/testthat/test-compute.R +++ b/r/tests/testthat/test-compute.R @@ -91,7 +91,11 @@ test_that("register_scalar_function() adds a compute function to the registry", int32(), float64(), auto_convert = TRUE ) - on.exit(unregister_binding("times_32", update_cache = TRUE)) + on.exit({ + unregister_binding("times_32", update_cache = TRUE) + # TODO(ARROW-17178) remove the need for this! + Sys.unsetenv("R_ARROW_COLLECT_WITH_UDF") + }) expect_true("times_32" %in% names(asNamespace("arrow")$.cache$functions)) expect_true("times_32" %in% list_compute_functions()) @@ -112,8 +116,6 @@ test_that("register_scalar_function() adds a compute function to the registry", dplyr::collect(), tibble::tibble(a = 1L, b = 32.0) ) - - Sys.unsetenv("R_ARROW_COLLECT_WITH_UDF") }) test_that("arrow_scalar_function() with bad return type errors", { @@ -125,9 +127,11 @@ test_that("arrow_scalar_function() with bad return type errors", { int32(), float64() ) - on.exit( + on.exit({ unregister_binding("times_32_bad_return_type_array", update_cache = TRUE) - ) + # TODO(ARROW-17178) remove the need for this! + Sys.unsetenv("R_ARROW_COLLECT_WITH_UDF") + }) expect_error( call_function("times_32_bad_return_type_array", Array$create(1L)), @@ -140,20 +144,19 @@ test_that("arrow_scalar_function() with bad return type errors", { int32(), float64() ) - on.exit( + on.exit({ unregister_binding("times_32_bad_return_type_scalar", update_cache = TRUE) - ) + # TODO(ARROW-17178) remove the need for this! + Sys.unsetenv("R_ARROW_COLLECT_WITH_UDF") + }) expect_error( call_function("times_32_bad_return_type_scalar", Array$create(1L)), "Expected return Array or Scalar with type 'double'" ) - - # TODO(ARROW-17178) remove the need for this! - Sys.unsetenv("R_ARROW_COLLECT_WITH_UDF") }) -test_that("register_user_defined_function() can register multiple kernels", { +test_that("register_scalar_function() can register multiple kernels", { skip_if_not(CanRunWithCapturedR()) register_scalar_function( @@ -163,7 +166,11 @@ test_that("register_user_defined_function() can register multiple kernels", { out_type = function(in_types) in_types[[1]], auto_convert = TRUE ) - on.exit(unregister_binding("times_32", update_cache = TRUE)) + on.exit({ + unregister_binding("times_32", update_cache = TRUE) + # TODO(ARROW-17178) remove the need for this! + Sys.unsetenv("R_ARROW_COLLECT_WITH_UDF") + }) expect_equal( call_function("times_32", Scalar$create(1L, int32())), @@ -181,7 +188,10 @@ test_that("register_user_defined_function() can register multiple kernels", { ) }) -test_that("register_user_defined_function() errors for unsupported specifications", { +test_that("register_scalar_function() errors for unsupported specifications", { + # TODO(ARROW-17178) remove the need for this! + on.exit(Sys.unsetenv("R_ARROW_COLLECT_WITH_UDF")) + expect_error( register_scalar_function( "no_kernels", @@ -211,9 +221,6 @@ test_that("register_user_defined_function() errors for unsupported specification ), "Kernels for user-defined function must accept the same number of arguments" ) - - # TODO(ARROW-17178) remove the need for this! - Sys.unsetenv("R_ARROW_COLLECT_WITH_UDF") }) test_that("user-defined functions work during multi-threaded execution", { @@ -249,7 +256,11 @@ test_that("user-defined functions work during multi-threaded execution", { float64(), auto_convert = TRUE ) - on.exit(unregister_binding("times_32", update_cache = TRUE)) + on.exit({ + unregister_binding("times_32", update_cache = TRUE) + # TODO(ARROW-17178) remove the need for this! + Sys.unsetenv("R_ARROW_COLLECT_WITH_UDF") + }) # check a regular collect() result <- open_dataset(tf_dataset) %>% @@ -269,9 +280,6 @@ test_that("user-defined functions work during multi-threaded execution", { dplyr::collect() expect_identical(result2$fun_result, example_df$value * 32) - - # TODO(ARROW-17178) remove the need for this! - Sys.unsetenv("R_ARROW_COLLECT_WITH_UDF") }) test_that("user-defined error when called from an unsupported context", { @@ -285,7 +293,11 @@ test_that("user-defined error when called from an unsupported context", { float64(), auto_convert = TRUE ) - on.exit(unregister_binding("times_32", update_cache = TRUE)) + on.exit({ + unregister_binding("times_32", update_cache = TRUE) + # TODO(ARROW-17178) remove the need for this! + Sys.unsetenv("R_ARROW_COLLECT_WITH_UDF") + }) stream_plan_with_udf <- function() { record_batch(a = 1:1000) %>% @@ -321,7 +333,4 @@ test_that("user-defined error when called from an unsupported context", { "Call to R \\(.*?\\) from a non-R thread from an unsupported context" ) } - - # TODO(ARROW-17178) remove the need for this! - Sys.unsetenv("R_ARROW_COLLECT_WITH_UDF") })