From cb306e6b48ddf667380e72d74a405413a7e07e73 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Fri, 29 Jul 2022 08:26:34 -0400 Subject: [PATCH 1/8] try more explicit capture list to see if that helps the valgrind thing --- r/src/compute-exec.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/r/src/compute-exec.cpp b/r/src/compute-exec.cpp index 91d646f0a3c..9fadfa3ec58 100644 --- a/r/src/compute-exec.cpp +++ b/r/src/compute-exec.cpp @@ -142,7 +142,7 @@ std::shared_ptr ExecPlan_read_table( auto prepared_plan = ExecPlan_prepare(plan, final_node, sort_options, metadata, head); auto result = RunWithCapturedRIfPossible>( - [&]() -> arrow::Result> { + [prepared_plan]() -> arrow::Result> { ARROW_RETURN_NOT_OK(prepared_plan.first->StartProducing()); return prepared_plan.second->ToTable(); }); @@ -273,7 +273,7 @@ void ExecPlan_Write( StopIfNotOk(plan->Validate()); - arrow::Status result = RunWithCapturedRIfPossibleVoid([&]() { + arrow::Status result = RunWithCapturedRIfPossibleVoid([&plan]() { RETURN_NOT_OK(plan->StartProducing()); RETURN_NOT_OK(plan->finished().status()); return arrow::Status::OK(); From 0e7fdbde03e8e721d9c1ca178eb69d382c093cf4 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Sat, 30 Jul 2022 13:53:43 -0300 Subject: [PATCH 2/8] turn off runwithcaptured R --- r/src/safe-call-into-r-impl.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/r/src/safe-call-into-r-impl.cpp b/r/src/safe-call-into-r-impl.cpp index 4eec3a85df8..1342c04a836 100644 --- a/r/src/safe-call-into-r-impl.cpp +++ b/r/src/safe-call-into-r-impl.cpp @@ -38,7 +38,7 @@ bool CanRunWithCapturedR() { on_old_windows = on_old_windows_fun(); } - return !on_old_windows && GetMainRThread().Executor() == nullptr; + return false && !on_old_windows && GetMainRThread().Executor() == nullptr; #else return false; #endif From b72ac2cff1466c932d1c704514a793e96b87f25a Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Sat, 30 Jul 2022 14:35:55 -0300 Subject: [PATCH 3/8] don't run examples that require runwithcapturedr --- r/R/compute.R | 2 +- r/man/register_scalar_function.Rd | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/r/R/compute.R b/r/R/compute.R index 0985e73a5f2..111edd78a79 100644 --- a/r/R/compute.R +++ b/r/R/compute.R @@ -344,7 +344,7 @@ cast_options <- function(safe = TRUE, ...) { #' @return `NULL`, invisibly #' @export #' -#' @examplesIf arrow_with_dataset() +#' @examplesIf FALSE && arrow_with_dataset() #' library(dplyr, warn.conflicts = FALSE) #' #' some_model <- lm(mpg ~ disp + cyl, data = mtcars) diff --git a/r/man/register_scalar_function.Rd b/r/man/register_scalar_function.Rd index 4da8f54f645..95ffcbbafe4 100644 --- a/r/man/register_scalar_function.Rd +++ b/r/man/register_scalar_function.Rd @@ -48,7 +48,7 @@ stateless and return output with the same shape (i.e., the same number of rows) as the input. } \examples{ -\dontshow{if (arrow_with_dataset()) (if (getRversion() >= "3.4") withAutoprint else force)(\{ # examplesIf} +\dontshow{if (FALSE && arrow_with_dataset()) (if (getRversion() >= "3.4") withAutoprint else force)(\{ # examplesIf} library(dplyr, warn.conflicts = FALSE) some_model <- lm(mpg ~ disp + cyl, data = mtcars) From 5ba62bcdf02a99988acf10e3180e02a20ef86bfa Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Sun, 31 Jul 2022 21:20:19 -0300 Subject: [PATCH 4/8] don't copy std::functions, restrict capture lists --- r/src/safe-call-into-r.h | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/r/src/safe-call-into-r.h b/r/src/safe-call-into-r.h index 08e8a8c11b6..5ca0a90b25b 100644 --- a/r/src/safe-call-into-r.h +++ b/r/src/safe-call-into-r.h @@ -110,7 +110,7 @@ arrow::Future SafeCallIntoRAsync(std::function(void)> fun, // use it to run the task on the main R thread. We can't throw // a cpp11::unwind_exception here, so we need to propagate it back // to RunWithCapturedR through the MainRThread singleton. - return DeferNotOk(main_r_thread.Executor()->Submit([fun, reason]() { + return DeferNotOk(main_r_thread.Executor()->Submit([&fun, reason]() { // This occurs when some other R code that was previously scheduled to run // has errored, in which case we skip execution and let the original // error surface. @@ -174,7 +174,7 @@ arrow::Result RunWithCapturedR(std::function()> make_arrow_c GetMainRThread().ResetError(); arrow::Result result = arrow::internal::SerialExecutor::RunInSerialExecutor( - [make_arrow_call](arrow::internal::Executor* executor) { + [&make_arrow_call](arrow::internal::Executor* executor) { GetMainRThread().Executor() = executor; return make_arrow_call(); }); @@ -198,7 +198,7 @@ arrow::Result RunWithCapturedRIfPossible( // Note that the use of the io_context here is arbitrary (i.e. we could use // any construct that launches a background thread). const auto& io_context = arrow::io::default_io_context(); - return RunWithCapturedR([&]() { + return RunWithCapturedR([&make_arrow_call]() { return DeferNotOk(io_context.executor()->Submit(std::move(make_arrow_call))); }); } else { @@ -210,10 +210,11 @@ arrow::Result RunWithCapturedRIfPossible( // a Result. static inline arrow::Status RunWithCapturedRIfPossibleVoid( std::function make_arrow_call) { - auto result = RunWithCapturedRIfPossible([&]() -> arrow::Result { - ARROW_RETURN_NOT_OK(make_arrow_call()); - return true; - }); + auto result = + RunWithCapturedRIfPossible([&make_arrow_call]() -> arrow::Result { + ARROW_RETURN_NOT_OK(make_arrow_call()); + return true; + }); ARROW_RETURN_NOT_OK(result); return arrow::Status::OK(); } From 069afe91cbf0b01c4d677536d73d895beb98c368 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Sun, 31 Jul 2022 21:43:08 -0300 Subject: [PATCH 5/8] see if completely avoiding runwithcapturedr in compute-exec makes the valgrind problems go away --- r/R/compute.R | 2 +- r/man/register_scalar_function.Rd | 2 +- r/src/compute-exec.cpp | 120 +++++++++++++++++++++++++++--- r/src/safe-call-into-r-impl.cpp | 2 +- r/src/safe-call-into-r.h | 4 +- r/tests/testthat/test-compute.R | 4 + 6 files changed, 119 insertions(+), 15 deletions(-) diff --git a/r/R/compute.R b/r/R/compute.R index 111edd78a79..0985e73a5f2 100644 --- a/r/R/compute.R +++ b/r/R/compute.R @@ -344,7 +344,7 @@ cast_options <- function(safe = TRUE, ...) { #' @return `NULL`, invisibly #' @export #' -#' @examplesIf FALSE && arrow_with_dataset() +#' @examplesIf arrow_with_dataset() #' library(dplyr, warn.conflicts = FALSE) #' #' some_model <- lm(mpg ~ disp + cyl, data = mtcars) diff --git a/r/man/register_scalar_function.Rd b/r/man/register_scalar_function.Rd index 95ffcbbafe4..4da8f54f645 100644 --- a/r/man/register_scalar_function.Rd +++ b/r/man/register_scalar_function.Rd @@ -48,7 +48,7 @@ stateless and return output with the same shape (i.e., the same number of rows) as the input. } \examples{ -\dontshow{if (FALSE && arrow_with_dataset()) (if (getRversion() >= "3.4") withAutoprint else force)(\{ # examplesIf} +\dontshow{if (arrow_with_dataset()) (if (getRversion() >= "3.4") withAutoprint else force)(\{ # examplesIf} library(dplyr, warn.conflicts = FALSE) some_model <- lm(mpg ~ disp + cyl, data = mtcars) diff --git a/r/src/compute-exec.cpp b/r/src/compute-exec.cpp index 9fadfa3ec58..dc86dc488cc 100644 --- a/r/src/compute-exec.cpp +++ b/r/src/compute-exec.cpp @@ -129,9 +129,61 @@ std::shared_ptr ExecPlan_run( const std::shared_ptr& plan, const std::shared_ptr& final_node, cpp11::list sort_options, cpp11::strings metadata, int64_t head = -1) { - auto prepared_plan = ExecPlan_prepare(plan, final_node, sort_options, metadata, head); - StopIfNotOk(prepared_plan.first->StartProducing()); - return prepared_plan.second; + // For now, don't require R to construct SinkNodes. + // Instead, just pass the node we should collect as an argument. + arrow::AsyncGenerator> sink_gen; + + // Sorting uses a different sink node; there is no general sort yet + if (sort_options.size() > 0) { + if (head >= 0) { + // Use the SelectK node to take only what we need + MakeExecNodeOrStop( + "select_k_sink", plan.get(), {final_node.get()}, + compute::SelectKSinkNodeOptions{ + arrow::compute::SelectKOptions( + head, std::dynamic_pointer_cast( + make_compute_options("sort_indices", sort_options)) + ->sort_keys), + &sink_gen}); + } else { + MakeExecNodeOrStop("order_by_sink", plan.get(), {final_node.get()}, + compute::OrderBySinkNodeOptions{ + *std::dynamic_pointer_cast( + make_compute_options("sort_indices", sort_options)), + &sink_gen}); + } + } else { + MakeExecNodeOrStop("sink", plan.get(), {final_node.get()}, + compute::SinkNodeOptions{&sink_gen}); + } + + // End of chunk used in ExecPlan_BuildAndShow + + StopIfNotOk(plan->Validate()); + + // If the generator is destroyed before being completely drained, inform plan + std::shared_ptr stop_producing{nullptr, [plan](...) { + bool not_finished_yet = + plan->finished().TryAddCallback([&plan] { + return [plan](const arrow::Status&) {}; + }); + + if (not_finished_yet) { + plan->StopProducing(); + } + }}; + + // Attach metadata to the schema + auto out_schema = final_node->output_schema(); + if (metadata.size() > 0) { + auto kv = strings_to_kvm(metadata); + out_schema = out_schema->WithMetadata(kv); + } + + StopIfNotOk(plan->StartProducing()); + return compute::MakeGeneratorReader( + out_schema, [stop_producing, plan, sink_gen] { return sink_gen(); }, + gc_memory_pool()); } // [[arrow::export]] @@ -139,15 +191,63 @@ std::shared_ptr ExecPlan_read_table( const std::shared_ptr& plan, const std::shared_ptr& final_node, cpp11::list sort_options, cpp11::strings metadata, int64_t head = -1) { - auto prepared_plan = ExecPlan_prepare(plan, final_node, sort_options, metadata, head); + // For now, don't require R to construct SinkNodes. + // Instead, just pass the node we should collect as an argument. + arrow::AsyncGenerator> sink_gen; - auto result = RunWithCapturedRIfPossible>( - [prepared_plan]() -> arrow::Result> { - ARROW_RETURN_NOT_OK(prepared_plan.first->StartProducing()); - return prepared_plan.second->ToTable(); - }); + // Sorting uses a different sink node; there is no general sort yet + if (sort_options.size() > 0) { + if (head >= 0) { + // Use the SelectK node to take only what we need + MakeExecNodeOrStop( + "select_k_sink", plan.get(), {final_node.get()}, + compute::SelectKSinkNodeOptions{ + arrow::compute::SelectKOptions( + head, std::dynamic_pointer_cast( + make_compute_options("sort_indices", sort_options)) + ->sort_keys), + &sink_gen}); + } else { + MakeExecNodeOrStop("order_by_sink", plan.get(), {final_node.get()}, + compute::OrderBySinkNodeOptions{ + *std::dynamic_pointer_cast( + make_compute_options("sort_indices", sort_options)), + &sink_gen}); + } + } else { + MakeExecNodeOrStop("sink", plan.get(), {final_node.get()}, + compute::SinkNodeOptions{&sink_gen}); + } + + // End of chunk used in ExecPlan_BuildAndShow + + StopIfNotOk(plan->Validate()); + + // If the generator is destroyed before being completely drained, inform plan + std::shared_ptr stop_producing{nullptr, [plan](...) { + bool not_finished_yet = + plan->finished().TryAddCallback([&plan] { + return [plan](const arrow::Status&) {}; + }); + + if (not_finished_yet) { + plan->StopProducing(); + } + }}; + + // Attach metadata to the schema + auto out_schema = final_node->output_schema(); + if (metadata.size() > 0) { + auto kv = strings_to_kvm(metadata); + out_schema = out_schema->WithMetadata(kv); + } + + StopIfNotOk(plan->StartProducing()); + auto reader = compute::MakeGeneratorReader( + out_schema, [stop_producing, plan, sink_gen] { return sink_gen(); }, + gc_memory_pool()); - return ValueOrStop(result); + return ValueOrStop(reader->ToTable()); } // [[arrow::export]] diff --git a/r/src/safe-call-into-r-impl.cpp b/r/src/safe-call-into-r-impl.cpp index 1342c04a836..4eec3a85df8 100644 --- a/r/src/safe-call-into-r-impl.cpp +++ b/r/src/safe-call-into-r-impl.cpp @@ -38,7 +38,7 @@ bool CanRunWithCapturedR() { on_old_windows = on_old_windows_fun(); } - return false && !on_old_windows && GetMainRThread().Executor() == nullptr; + return !on_old_windows && GetMainRThread().Executor() == nullptr; #else return false; #endif diff --git a/r/src/safe-call-into-r.h b/r/src/safe-call-into-r.h index 5ca0a90b25b..98322af4316 100644 --- a/r/src/safe-call-into-r.h +++ b/r/src/safe-call-into-r.h @@ -110,7 +110,7 @@ arrow::Future SafeCallIntoRAsync(std::function(void)> fun, // use it to run the task on the main R thread. We can't throw // a cpp11::unwind_exception here, so we need to propagate it back // to RunWithCapturedR through the MainRThread singleton. - return DeferNotOk(main_r_thread.Executor()->Submit([&fun, reason]() { + return DeferNotOk(main_r_thread.Executor()->Submit([fun, reason]() { // This occurs when some other R code that was previously scheduled to run // has errored, in which case we skip execution and let the original // error surface. @@ -198,7 +198,7 @@ arrow::Result RunWithCapturedRIfPossible( // Note that the use of the io_context here is arbitrary (i.e. we could use // any construct that launches a background thread). const auto& io_context = arrow::io::default_io_context(); - return RunWithCapturedR([&make_arrow_call]() { + return RunWithCapturedR([&make_arrow_call, io_context]() { return DeferNotOk(io_context.executor()->Submit(std::move(make_arrow_call))); }); } else { diff --git a/r/tests/testthat/test-compute.R b/r/tests/testthat/test-compute.R index 9e487169f4b..a3bbd787962 100644 --- a/r/tests/testthat/test-compute.R +++ b/r/tests/testthat/test-compute.R @@ -103,6 +103,8 @@ test_that("register_scalar_function() adds a compute function to the registry", Scalar$create(32L, float64()) ) + skip("while testing valgrind errors") + expect_identical( record_batch(a = 1L) %>% dplyr::mutate(b = times_32(a)) %>% @@ -206,6 +208,7 @@ test_that("register_user_defined_function() errors for unsupported specification }) test_that("user-defined functions work during multi-threaded execution", { + skip("while testing valgrind errors") skip_if_not(CanRunWithCapturedR()) skip_if_not_available("dataset") # Snappy has a UBSan issue: https://github.com/google/snappy/pull/148 @@ -258,6 +261,7 @@ test_that("user-defined functions work during multi-threaded execution", { }) test_that("user-defined error when called from an unsupported context", { + skip("while testing valgrind errors") skip_if_not_available("dataset") skip_if_not(CanRunWithCapturedR()) From a5a501191fec087251de005396a202c6d7dc1df7 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Sun, 31 Jul 2022 22:17:40 -0300 Subject: [PATCH 6/8] don't run UDF example --- r/R/compute.R | 2 +- r/man/register_scalar_function.Rd | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/r/R/compute.R b/r/R/compute.R index 0985e73a5f2..111edd78a79 100644 --- a/r/R/compute.R +++ b/r/R/compute.R @@ -344,7 +344,7 @@ cast_options <- function(safe = TRUE, ...) { #' @return `NULL`, invisibly #' @export #' -#' @examplesIf arrow_with_dataset() +#' @examplesIf FALSE && arrow_with_dataset() #' library(dplyr, warn.conflicts = FALSE) #' #' some_model <- lm(mpg ~ disp + cyl, data = mtcars) diff --git a/r/man/register_scalar_function.Rd b/r/man/register_scalar_function.Rd index 4da8f54f645..95ffcbbafe4 100644 --- a/r/man/register_scalar_function.Rd +++ b/r/man/register_scalar_function.Rd @@ -48,7 +48,7 @@ stateless and return output with the same shape (i.e., the same number of rows) as the input. } \examples{ -\dontshow{if (arrow_with_dataset()) (if (getRversion() >= "3.4") withAutoprint else force)(\{ # examplesIf} +\dontshow{if (FALSE && arrow_with_dataset()) (if (getRversion() >= "3.4") withAutoprint else force)(\{ # examplesIf} library(dplyr, warn.conflicts = FALSE) some_model <- lm(mpg ~ disp + cyl, data = mtcars) From db12bff986d21997463b58be55d176356334f98c Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Mon, 1 Aug 2022 09:25:20 -0300 Subject: [PATCH 7/8] remove one more runwithcapturedr --- r/src/compute-exec.cpp | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/r/src/compute-exec.cpp b/r/src/compute-exec.cpp index dc86dc488cc..8d7abcc668d 100644 --- a/r/src/compute-exec.cpp +++ b/r/src/compute-exec.cpp @@ -372,14 +372,8 @@ void ExecPlan_Write( ds::WriteNodeOptions{std::move(opts), std::move(kv)}); StopIfNotOk(plan->Validate()); - - arrow::Status result = RunWithCapturedRIfPossibleVoid([&plan]() { - RETURN_NOT_OK(plan->StartProducing()); - RETURN_NOT_OK(plan->finished().status()); - return arrow::Status::OK(); - }); - - StopIfNotOk(result); + StopIfNotOk(plan->StartProducing()); + StopIfNotOk(plan->finished().status()); } #endif From 690a9a028a56b1bde969256154b9f7043b52666e Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Mon, 1 Aug 2022 17:02:32 -0300 Subject: [PATCH 8/8] avoid ExecPlan_read_table alltogether --- r/R/query-engine.R | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/r/R/query-engine.R b/r/R/query-engine.R index 84360490fdb..4fefaaba6cc 100644 --- a/r/R/query-engine.R +++ b/r/R/query-engine.R @@ -193,6 +193,9 @@ ExecPlan <- R6Class("ExecPlan", node }, Run = function(node, as_table = FALSE) { + table_at_end <- as_table + as_table <- FALSE + # a section of this code is used by `BuildAndShow()` too - the 2 need to be in sync # Start of chunk used in `BuildAndShow()` assert_is(node, "ExecNode") @@ -266,7 +269,11 @@ ExecPlan <- R6Class("ExecPlan", } } - out + if (table_at_end && !inherits(out, "Table")) { + out$read_table() + } else { + out + } }, Write = function(node, ...) { # TODO(ARROW-16200): take FileSystemDatasetWriteOptions not ...