From d38781fcba634b9bc631b10ff52a7032f2660244 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Tue, 2 Aug 2022 10:47:34 -0300 Subject: [PATCH 1/9] wait for thread pools to be idle on unload --- r/R/arrow-package.R | 4 ++++ r/R/arrowExports.R | 4 ++++ r/src/arrowExports.cpp | 9 +++++++++ r/src/threadpool.cpp | 14 ++++++++++++++ 4 files changed, 31 insertions(+) diff --git a/r/R/arrow-package.R b/r/R/arrow-package.R index f3e0b817d5f..e2e790b6d45 100644 --- a/r/R/arrow-package.R +++ b/r/R/arrow-package.R @@ -81,6 +81,10 @@ invisible() } +.onUnload <- function(...) { + WaitForIdleThreadPool() +} + configure_tzdb <- function() { # This is needed on Windows to support timezone-aware calculations if (requireNamespace("tzdb", quietly = TRUE)) { diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R index ab3358d6664..c747c02d68a 100644 --- a/r/R/arrowExports.R +++ b/r/R/arrowExports.R @@ -2028,6 +2028,10 @@ SetIOThreadPoolCapacity <- function(threads) { invisible(.Call(`_arrow_SetIOThreadPoolCapacity`, threads)) } +WaitForIdleThreadPool <- function() { + invisible(.Call(`_arrow_WaitForIdleThreadPool`)) +} + Array__infer_type <- function(x) { .Call(`_arrow_Array__infer_type`, x) } diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index adb6636e9ee..4a4978f9785 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -5116,6 +5116,14 @@ BEGIN_CPP11 return R_NilValue; END_CPP11 } +// threadpool.cpp +void WaitForIdleThreadPool(); +extern "C" SEXP _arrow_WaitForIdleThreadPool(){ +BEGIN_CPP11 + WaitForIdleThreadPool(); + return R_NilValue; +END_CPP11 +} // type_infer.cpp std::shared_ptr Array__infer_type(SEXP x); extern "C" SEXP _arrow_Array__infer_type(SEXP x_sexp){ @@ -5692,6 +5700,7 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_SetCpuThreadPoolCapacity", (DL_FUNC) &_arrow_SetCpuThreadPoolCapacity, 1}, { "_arrow_GetIOThreadPoolCapacity", (DL_FUNC) &_arrow_GetIOThreadPoolCapacity, 0}, { "_arrow_SetIOThreadPoolCapacity", (DL_FUNC) &_arrow_SetIOThreadPoolCapacity, 1}, + { "_arrow_WaitForIdleThreadPool", (DL_FUNC) &_arrow_WaitForIdleThreadPool, 0}, { "_arrow_Array__infer_type", (DL_FUNC) &_arrow_Array__infer_type, 1}, {NULL, NULL, 0} }; diff --git a/r/src/threadpool.cpp b/r/src/threadpool.cpp index 83e7a7ecfe6..9630046a487 100644 --- a/r/src/threadpool.cpp +++ b/r/src/threadpool.cpp @@ -54,3 +54,17 @@ int GetIOThreadPoolCapacity() { return arrow::io::GetIOThreadPoolCapacity(); } void SetIOThreadPoolCapacity(int threads) { StopIfNotOk(arrow::io::SetIOThreadPoolCapacity(threads)); } + +namespace arrow { +namespace io { +namespace internal { +arrow::internal::ThreadPool* GetIOThreadPool(); +} +} // namespace io +} // namespace arrow + +// [[arrow::export]] +void WaitForIdleThreadPool() { + arrow::internal::GetCpuThreadPool()->WaitForIdle(); + arrow::io::internal::GetIOThreadPool()->WaitForIdle(); +} From aa429abd79ccfa17b0ad3a087993bd45f64680cc Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Tue, 2 Aug 2022 16:28:42 -0300 Subject: [PATCH 2/9] don't wait for thread pools unless on valgrind --- r/R/arrow-package.R | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/r/R/arrow-package.R b/r/R/arrow-package.R index e2e790b6d45..f0a7cd0adf4 100644 --- a/r/R/arrow-package.R +++ b/r/R/arrow-package.R @@ -82,7 +82,11 @@ } .onUnload <- function(...) { - WaitForIdleThreadPool() + # When running valgrind we need to wait for the thread pools to finish + # running background tasks or else we may get spurious memory leaks reported. + if (on_linux_dev()) { + WaitForIdleThreadPool() + } } configure_tzdb <- function() { From ca0e250719e0b46e36e24e9a1e78a622b908b1fc Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Wed, 3 Aug 2022 09:07:47 -0300 Subject: [PATCH 3/9] try only waiting for the cpu thread pool --- r/src/threadpool.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/r/src/threadpool.cpp b/r/src/threadpool.cpp index 9630046a487..0dcab00af51 100644 --- a/r/src/threadpool.cpp +++ b/r/src/threadpool.cpp @@ -66,5 +66,5 @@ arrow::internal::ThreadPool* GetIOThreadPool(); // [[arrow::export]] void WaitForIdleThreadPool() { arrow::internal::GetCpuThreadPool()->WaitForIdle(); - arrow::io::internal::GetIOThreadPool()->WaitForIdle(); + // testing without arrow::io::internal::GetIOThreadPool()->WaitForIdle(); } From fcbd8625a8ba1b85bcb97a956ef240a07335e808 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Wed, 3 Aug 2022 21:23:03 -0300 Subject: [PATCH 4/9] skip io thread pool stuff --- r/src/threadpool.cpp | 9 --------- 1 file changed, 9 deletions(-) diff --git a/r/src/threadpool.cpp b/r/src/threadpool.cpp index 0dcab00af51..0841462df23 100644 --- a/r/src/threadpool.cpp +++ b/r/src/threadpool.cpp @@ -55,16 +55,7 @@ void SetIOThreadPoolCapacity(int threads) { StopIfNotOk(arrow::io::SetIOThreadPoolCapacity(threads)); } -namespace arrow { -namespace io { -namespace internal { -arrow::internal::ThreadPool* GetIOThreadPool(); -} -} // namespace io -} // namespace arrow - // [[arrow::export]] void WaitForIdleThreadPool() { arrow::internal::GetCpuThreadPool()->WaitForIdle(); - // testing without arrow::io::internal::GetIOThreadPool()->WaitForIdle(); } From 6d48ad52e60d77ea453c7490bf821a3a22d8fc79 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Thu, 4 Aug 2022 09:44:34 -0300 Subject: [PATCH 5/9] try literally just waiting --- r/R/arrow-package.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/r/R/arrow-package.R b/r/R/arrow-package.R index f0a7cd0adf4..f4a93b26195 100644 --- a/r/R/arrow-package.R +++ b/r/R/arrow-package.R @@ -85,7 +85,7 @@ # When running valgrind we need to wait for the thread pools to finish # running background tasks or else we may get spurious memory leaks reported. if (on_linux_dev()) { - WaitForIdleThreadPool() + Sys.sleep(1) } } From ac58c2d32dea3d9c277717e4446851f354d97470 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Thu, 4 Aug 2022 21:55:16 -0300 Subject: [PATCH 6/9] nudge --- r/R/arrowExports.R | 4 ---- r/src/arrowExports.cpp | 9 --------- r/src/threadpool.cpp | 5 ----- 3 files changed, 18 deletions(-) diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R index c747c02d68a..ab3358d6664 100644 --- a/r/R/arrowExports.R +++ b/r/R/arrowExports.R @@ -2028,10 +2028,6 @@ SetIOThreadPoolCapacity <- function(threads) { invisible(.Call(`_arrow_SetIOThreadPoolCapacity`, threads)) } -WaitForIdleThreadPool <- function() { - invisible(.Call(`_arrow_WaitForIdleThreadPool`)) -} - Array__infer_type <- function(x) { .Call(`_arrow_Array__infer_type`, x) } diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index 4a4978f9785..adb6636e9ee 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -5116,14 +5116,6 @@ BEGIN_CPP11 return R_NilValue; END_CPP11 } -// threadpool.cpp -void WaitForIdleThreadPool(); -extern "C" SEXP _arrow_WaitForIdleThreadPool(){ -BEGIN_CPP11 - WaitForIdleThreadPool(); - return R_NilValue; -END_CPP11 -} // type_infer.cpp std::shared_ptr Array__infer_type(SEXP x); extern "C" SEXP _arrow_Array__infer_type(SEXP x_sexp){ @@ -5700,7 +5692,6 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_SetCpuThreadPoolCapacity", (DL_FUNC) &_arrow_SetCpuThreadPoolCapacity, 1}, { "_arrow_GetIOThreadPoolCapacity", (DL_FUNC) &_arrow_GetIOThreadPoolCapacity, 0}, { "_arrow_SetIOThreadPoolCapacity", (DL_FUNC) &_arrow_SetIOThreadPoolCapacity, 1}, - { "_arrow_WaitForIdleThreadPool", (DL_FUNC) &_arrow_WaitForIdleThreadPool, 0}, { "_arrow_Array__infer_type", (DL_FUNC) &_arrow_Array__infer_type, 1}, {NULL, NULL, 0} }; diff --git a/r/src/threadpool.cpp b/r/src/threadpool.cpp index 0841462df23..83e7a7ecfe6 100644 --- a/r/src/threadpool.cpp +++ b/r/src/threadpool.cpp @@ -54,8 +54,3 @@ int GetIOThreadPoolCapacity() { return arrow::io::GetIOThreadPoolCapacity(); } void SetIOThreadPoolCapacity(int threads) { StopIfNotOk(arrow::io::SetIOThreadPoolCapacity(threads)); } - -// [[arrow::export]] -void WaitForIdleThreadPool() { - arrow::internal::GetCpuThreadPool()->WaitForIdle(); -} From e38cb2c650c97d1deadb387b310b5b89fa40fc4a Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Sat, 6 Aug 2022 15:59:01 -0300 Subject: [PATCH 7/9] really just a bump so I can run crossbow again --- r/R/arrow-package.R | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/r/R/arrow-package.R b/r/R/arrow-package.R index f4a93b26195..21bbfd7ca51 100644 --- a/r/R/arrow-package.R +++ b/r/R/arrow-package.R @@ -82,8 +82,9 @@ } .onUnload <- function(...) { - # When running valgrind we need to wait for the thread pools to finish - # running background tasks or else we may get spurious memory leaks reported. + # When running valgrind this helps other threads run any tasks that + # may be completing, reducing the risk of valgrind reporting a spurious + # memory leak. if (on_linux_dev()) { Sys.sleep(1) } From 77176163e936526e8c1f58a98176d2b7e12b66cb Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Sat, 6 Aug 2022 22:12:03 -0300 Subject: [PATCH 8/9] back to io thread pool approach --- r/R/arrow-package.R | 7 +++---- r/R/arrowExports.R | 4 ++++ r/src/arrowExports.cpp | 9 +++++++++ r/src/threadpool.cpp | 14 ++++++++++++++ 4 files changed, 30 insertions(+), 4 deletions(-) diff --git a/r/R/arrow-package.R b/r/R/arrow-package.R index 21bbfd7ca51..f0a7cd0adf4 100644 --- a/r/R/arrow-package.R +++ b/r/R/arrow-package.R @@ -82,11 +82,10 @@ } .onUnload <- function(...) { - # When running valgrind this helps other threads run any tasks that - # may be completing, reducing the risk of valgrind reporting a spurious - # memory leak. + # When running valgrind we need to wait for the thread pools to finish + # running background tasks or else we may get spurious memory leaks reported. if (on_linux_dev()) { - Sys.sleep(1) + WaitForIdleThreadPool() } } diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R index ab3358d6664..c747c02d68a 100644 --- a/r/R/arrowExports.R +++ b/r/R/arrowExports.R @@ -2028,6 +2028,10 @@ SetIOThreadPoolCapacity <- function(threads) { invisible(.Call(`_arrow_SetIOThreadPoolCapacity`, threads)) } +WaitForIdleThreadPool <- function() { + invisible(.Call(`_arrow_WaitForIdleThreadPool`)) +} + Array__infer_type <- function(x) { .Call(`_arrow_Array__infer_type`, x) } diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index adb6636e9ee..4a4978f9785 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -5116,6 +5116,14 @@ BEGIN_CPP11 return R_NilValue; END_CPP11 } +// threadpool.cpp +void WaitForIdleThreadPool(); +extern "C" SEXP _arrow_WaitForIdleThreadPool(){ +BEGIN_CPP11 + WaitForIdleThreadPool(); + return R_NilValue; +END_CPP11 +} // type_infer.cpp std::shared_ptr Array__infer_type(SEXP x); extern "C" SEXP _arrow_Array__infer_type(SEXP x_sexp){ @@ -5692,6 +5700,7 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_SetCpuThreadPoolCapacity", (DL_FUNC) &_arrow_SetCpuThreadPoolCapacity, 1}, { "_arrow_GetIOThreadPoolCapacity", (DL_FUNC) &_arrow_GetIOThreadPoolCapacity, 0}, { "_arrow_SetIOThreadPoolCapacity", (DL_FUNC) &_arrow_SetIOThreadPoolCapacity, 1}, + { "_arrow_WaitForIdleThreadPool", (DL_FUNC) &_arrow_WaitForIdleThreadPool, 0}, { "_arrow_Array__infer_type", (DL_FUNC) &_arrow_Array__infer_type, 1}, {NULL, NULL, 0} }; diff --git a/r/src/threadpool.cpp b/r/src/threadpool.cpp index 83e7a7ecfe6..9630046a487 100644 --- a/r/src/threadpool.cpp +++ b/r/src/threadpool.cpp @@ -54,3 +54,17 @@ int GetIOThreadPoolCapacity() { return arrow::io::GetIOThreadPoolCapacity(); } void SetIOThreadPoolCapacity(int threads) { StopIfNotOk(arrow::io::SetIOThreadPoolCapacity(threads)); } + +namespace arrow { +namespace io { +namespace internal { +arrow::internal::ThreadPool* GetIOThreadPool(); +} +} // namespace io +} // namespace arrow + +// [[arrow::export]] +void WaitForIdleThreadPool() { + arrow::internal::GetCpuThreadPool()->WaitForIdle(); + arrow::io::internal::GetIOThreadPool()->WaitForIdle(); +} From 123d8bdcc2ca61e06829e4e95efacec2cc6375b1 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Sun, 7 Aug 2022 13:19:32 -0300 Subject: [PATCH 9/9] fix comment --- r/R/arrow-package.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/r/R/arrow-package.R b/r/R/arrow-package.R index f0a7cd0adf4..4d9e05dd836 100644 --- a/r/R/arrow-package.R +++ b/r/R/arrow-package.R @@ -83,7 +83,7 @@ .onUnload <- function(...) { # When running valgrind we need to wait for the thread pools to finish - # running background tasks or else we may get spurious memory leaks reported. + # running background tasks or else spurious memory leaks may be reported. if (on_linux_dev()) { WaitForIdleThreadPool() }