diff --git a/r/R/arrow-package.R b/r/R/arrow-package.R index f3e0b817d5f..4d9e05dd836 100644 --- a/r/R/arrow-package.R +++ b/r/R/arrow-package.R @@ -81,6 +81,14 @@ invisible() } +.onUnload <- function(...) { + # When running valgrind we need to wait for the thread pools to finish + # running background tasks or else spurious memory leaks may be reported. + if (on_linux_dev()) { + WaitForIdleThreadPool() + } +} + configure_tzdb <- function() { # This is needed on Windows to support timezone-aware calculations if (requireNamespace("tzdb", quietly = TRUE)) { diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R index ab3358d6664..c747c02d68a 100644 --- a/r/R/arrowExports.R +++ b/r/R/arrowExports.R @@ -2028,6 +2028,10 @@ SetIOThreadPoolCapacity <- function(threads) { invisible(.Call(`_arrow_SetIOThreadPoolCapacity`, threads)) } +WaitForIdleThreadPool <- function() { + invisible(.Call(`_arrow_WaitForIdleThreadPool`)) +} + Array__infer_type <- function(x) { .Call(`_arrow_Array__infer_type`, x) } diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index adb6636e9ee..4a4978f9785 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -5116,6 +5116,14 @@ BEGIN_CPP11 return R_NilValue; END_CPP11 } +// threadpool.cpp +void WaitForIdleThreadPool(); +extern "C" SEXP _arrow_WaitForIdleThreadPool(){ +BEGIN_CPP11 + WaitForIdleThreadPool(); + return R_NilValue; +END_CPP11 +} // type_infer.cpp std::shared_ptr Array__infer_type(SEXP x); extern "C" SEXP _arrow_Array__infer_type(SEXP x_sexp){ @@ -5692,6 +5700,7 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_SetCpuThreadPoolCapacity", (DL_FUNC) &_arrow_SetCpuThreadPoolCapacity, 1}, { "_arrow_GetIOThreadPoolCapacity", (DL_FUNC) &_arrow_GetIOThreadPoolCapacity, 0}, { "_arrow_SetIOThreadPoolCapacity", (DL_FUNC) &_arrow_SetIOThreadPoolCapacity, 1}, + { "_arrow_WaitForIdleThreadPool", (DL_FUNC) &_arrow_WaitForIdleThreadPool, 0}, { "_arrow_Array__infer_type", (DL_FUNC) &_arrow_Array__infer_type, 1}, {NULL, NULL, 0} }; diff --git a/r/src/threadpool.cpp b/r/src/threadpool.cpp index 83e7a7ecfe6..9630046a487 100644 --- a/r/src/threadpool.cpp +++ b/r/src/threadpool.cpp @@ -54,3 +54,17 @@ int GetIOThreadPoolCapacity() { return arrow::io::GetIOThreadPoolCapacity(); } void SetIOThreadPoolCapacity(int threads) { StopIfNotOk(arrow::io::SetIOThreadPoolCapacity(threads)); } + +namespace arrow { +namespace io { +namespace internal { +arrow::internal::ThreadPool* GetIOThreadPool(); +} +} // namespace io +} // namespace arrow + +// [[arrow::export]] +void WaitForIdleThreadPool() { + arrow::internal::GetCpuThreadPool()->WaitForIdle(); + arrow::io::internal::GetIOThreadPool()->WaitForIdle(); +}