From adfc91519ac1f258f7c1ef4ac7419fc159900c4e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=A1bor=20Cs=C3=A1rdi?= Date: Wed, 5 Feb 2020 09:49:32 +0000 Subject: [PATCH 1/2] Use cleancall code to clean up fd leak in wait() Closes #141. --- NEWS.md | 3 + R/cleancall.R | 4 + R/errors.R | 20 +++++ R/process.R | 7 +- src/Makevars | 2 +- src/Makevars.win | 2 +- src/cleancall.c | 164 +++++++++++++++++++++++++++++++++++++ src/cleancall.h | 41 ++++++++++ src/init.c | 3 + src/unix/processx.c | 21 ++++- tests/testthat/test-wait.R | 31 +++++++ 11 files changed, 292 insertions(+), 6 deletions(-) create mode 100644 R/cleancall.R create mode 100644 src/cleancall.c create mode 100644 src/cleancall.h diff --git a/NEWS.md b/NEWS.md index caca53a7..ab92f160 100644 --- a/NEWS.md +++ b/NEWS.md @@ -21,6 +21,9 @@ * processx now does no block SIGCHLD by default in the subprocess, blocking potentially causes zombie sub-subprocesses (#240). +* The `process$wait()` method now does not leak file descriptors on + Unix when interrupted (#141). + # processx 3.4.1 * Now `run()` does not create an `ok` variable in the global environment. diff --git a/R/cleancall.R b/R/cleancall.R new file mode 100644 index 00000000..8023d6dc --- /dev/null +++ b/R/cleancall.R @@ -0,0 +1,4 @@ + +call_with_cleanup <- function(ptr, ...) { + .Call(c_cleancall_call, pairlist(ptr, ...), parent.frame()) +} diff --git a/R/errors.R b/R/errors.R index 31e8030c..9d7c4a6f 100644 --- a/R/errors.R +++ b/R/errors.R @@ -363,6 +363,25 @@ err <- local({ ) } + package_env <- topenv() + + rethrow_call_with_cleanup <- function(.NAME, ...) 
{ + call <- sys.call() + nframe <- sys.nframe() + withCallingHandlers( + package_env$call_with_cleanup(.NAME, ...), + error = function(e) { + e$`_nframe` <- nframe + e$call <- call + if (inherits(e, "simpleError")) { + class(e) <- c("c_error", "rlib_error", "error", "condition") + } + e$`_ignore` <- list(c(nframe + 1L, sys.nframe() + 1L)) + throw(e) + } + ) + } + # -- create traceback ------------------------------------------------- #' Create a traceback @@ -723,3 +742,4 @@ new_error <- err$new_error throw <- err$throw rethrow <- err$rethrow rethrow_call <- err$rethrow_call +rethrow_call_with_cleanup <- err$.internal$rethrow_call_with_cleanup diff --git a/R/process.R b/R/process.R index 6af0e965..1b961b31 100644 --- a/R/process.R +++ b/R/process.R @@ -649,8 +649,11 @@ process <- R6::R6Class( process_wait <- function(self, private, timeout) { "!DEBUG process_wait `private$get_short_name()`" - rethrow_call(c_processx_wait, private$status, as.integer(timeout), - private$get_short_name()) + rethrow_call_with_cleanup( + c_processx_wait, private$status, + as.integer(timeout), + private$get_short_name() + ) invisible(self) } diff --git a/src/Makevars b/src/Makevars index 6b181c46..5c9bf04e 100644 --- a/src/Makevars +++ b/src/Makevars @@ -4,7 +4,7 @@ OBJECTS = init.o poll.o errors.o processx-connection.o \ processx-vector.o create-time.o base64.o \ unix/childlist.o unix/connection.o \ unix/processx.o unix/sigchld.o unix/utils.o \ - unix/named_pipe.o + unix/named_pipe.o cleancall.o .PHONY: all clean diff --git a/src/Makevars.win b/src/Makevars.win index 8eced0f4..28d6aa49 100644 --- a/src/Makevars.win +++ b/src/Makevars.win @@ -3,7 +3,7 @@ OBJECTS = init.o poll.o errors.o processx-connection.o \ processx-vector.o create-time.o base64.o \ win/processx.o win/stdio.o win/named_pipe.o \ - win/utils.o win/thread.o + win/utils.o win/thread.o cleancall.o .PHONY: all clean diff --git a/src/cleancall.c b/src/cleancall.c new file mode 100644 index 00000000..223c0389 --- /dev/null 
+++ b/src/cleancall.c @@ -0,0 +1,164 @@ +#define R_NO_REMAP +#include + +#include "cleancall.h" + + +#if (defined(R_VERSION) && R_VERSION < R_Version(3, 4, 0)) + SEXP R_MakeExternalPtrFn(DL_FUNC p, SEXP tag, SEXP prot) { + fn_ptr ptr; + ptr.fn = p; + return R_MakeExternalPtr(ptr.p, tag, prot); + } + DL_FUNC R_ExternalPtrAddrFn(SEXP s) { + fn_ptr ptr; + ptr.p = EXTPTR_PTR(s); + return ptr.fn; + } +#endif + +// The R API does not have a setter for function pointers + +SEXP cleancall_MakeExternalPtrFn(DL_FUNC p, SEXP tag, SEXP prot) { + fn_ptr tmp; + tmp.fn = p; + return R_MakeExternalPtr(tmp.p, tag, prot); +} + +void cleancall_SetExternalPtrAddrFn(SEXP s, DL_FUNC p) { + fn_ptr ptr; + ptr.fn = p; + R_SetExternalPtrAddr(s, ptr.p); +} + + +// Initialised at load time with the `.Call` primitive +SEXP cleancall_fns_dot_call = NULL; + +void cleancall_init() { + cleancall_fns_dot_call = Rf_findVar(Rf_install(".Call"), R_BaseEnv); +} + +struct eval_args { + SEXP call; + SEXP env; +}; + +static SEXP eval_wrap(void* data) { + struct eval_args* args = (struct eval_args*) data; + return Rf_eval(args->call, args->env); +} + + +SEXP cleancall_call(SEXP args, SEXP env) { + SEXP call = PROTECT(Rf_lcons(cleancall_fns_dot_call, args)); + struct eval_args data = { call, env }; + + SEXP out = r_with_cleanup_context(&eval_wrap, &data); + + UNPROTECT(1); + return out; +} + + +static SEXP callbacks = NULL; + +// Preallocate a callback +static void push_callback(SEXP stack) { + SEXP top = CDR(stack); + + SEXP early_handler = PROTECT(Rf_allocVector(LGLSXP, 1)); + SEXP fn_extptr = PROTECT(cleancall_MakeExternalPtrFn(NULL, R_NilValue, + R_NilValue)); + SEXP data_extptr = PROTECT(R_MakeExternalPtr(NULL, early_handler, + R_NilValue)); + SEXP cb = Rf_cons(Rf_cons(fn_extptr, data_extptr), top); + + SETCDR(stack, cb); + + UNPROTECT(3); +} + +struct data_wrapper { + SEXP (*fn)(void* data); + void *data; + SEXP callbacks; + int success; +}; + +static void call_exits(void* data) { + // Remove 
protecting node. Don't remove the preallocated callback on + // the top as it might contain a handler when something went wrong. + SEXP top = CDR(callbacks); + + // Restore old stack + struct data_wrapper* state = data; + callbacks = (SEXP) state->callbacks; + + // Handlers should not jump + while (top != R_NilValue) { + SEXP cb = CAR(top); + top = CDR(top); + + void (*fn)(void*) = (void (*)(void*)) R_ExternalPtrAddrFn(CAR(cb)); + void *data = (void*) EXTPTR_PTR(CDR(cb)); + int early_handler = LOGICAL(EXTPTR_TAG(CDR(cb)))[0]; + + // Check for empty pointer in preallocated callbacks + if (fn) { + if (!early_handler || !state->success) fn(data); + } + } +} + +static SEXP with_cleanup_context_wrap(void *data) { + struct data_wrapper* cdata = data; + SEXP ret = cdata->fn(cdata->data); + cdata->success = 1; + return ret; +} + +SEXP r_with_cleanup_context(SEXP (*fn)(void* data), void* data) { + // Preallocate new stack before changing `callbacks` to avoid + // leaving the global variable in a bad state if alloc fails + SEXP new = PROTECT(Rf_cons(R_NilValue, R_NilValue)); + push_callback(new); + + SEXP old = callbacks; + callbacks = new; + + struct data_wrapper state = { fn, data, old, 0 }; + + SEXP out = R_ExecWithCleanup(with_cleanup_context_wrap, &state, + &call_exits, &state); + + UNPROTECT(1); + return out; +} + +static void call_save_handler(void (*fn)(void *data), void* data, + int early) { + if (!callbacks) { + fn(data); + Rf_error("Internal error: Exit handler pushed outside " + "of an exit context"); + } + + SEXP cb = CADR(callbacks); + + // Update pointers + cleancall_SetExternalPtrAddrFn(CAR(cb), (DL_FUNC) fn); + R_SetExternalPtrAddr(CDR(cb), data); + LOGICAL(EXTPTR_TAG(CDR(cb)))[0] = early; + + // Preallocate the next callback in case the allocator jumps + push_callback(callbacks); +} + +void r_call_on_exit(void (*fn)(void* data), void* data) { + call_save_handler(fn, data, /* early = */ 0); +} + +void r_call_on_early_exit(void (*fn)(void* data), void* data) 
{ + call_save_handler(fn, data, /* early = */ 1); +} diff --git a/src/cleancall.h b/src/cleancall.h new file mode 100644 index 00000000..f396c868 --- /dev/null +++ b/src/cleancall.h @@ -0,0 +1,41 @@ +#ifndef CLEANCALL_H +#define CLEANCALL_H + +#include +#include + +// -------------------------------------------------------------------- +// Internals +// -------------------------------------------------------------------- + +typedef union {void* p; DL_FUNC fn;} fn_ptr; + +#if (defined(R_VERSION) && R_VERSION < R_Version(3, 4, 0)) + SEXP R_MakeExternalPtrFn(DL_FUNC p, SEXP tag, SEXP prot); + DL_FUNC R_ExternalPtrAddrFn(SEXP s); +#endif + +// -------------------------------------------------------------------- +// API for packages that embed cleancall +// -------------------------------------------------------------------- + +// The R API does not have a setter for external function pointers +SEXP cleancall_MakeExternalPtrFn(DL_FUNC p, SEXP tag, SEXP prot); +void cleancall_SetExternalPtrAddrFn(SEXP s, DL_FUNC p); + +#define CLEANCALL_METHOD_RECORD \ + {"cleancall_call", (DL_FUNC) &cleancall_call, 2} + +SEXP cleancall_call(SEXP args, SEXP env); +extern SEXP cleancall_fns_dot_call; +void cleancall_init(); + +// -------------------------------------------------------------------- +// Public API +// -------------------------------------------------------------------- + +SEXP r_with_cleanup_context(SEXP (*fn)(void* data), void* data); +void r_call_on_exit(void (*fn)(void* data), void* data); +void r_call_on_early_exit(void (*fn)(void* data), void* data); + +#endif diff --git a/src/init.c b/src/init.c index 7a1c2cf4..ce1bc433 100644 --- a/src/init.c +++ b/src/init.c @@ -1,5 +1,6 @@ #include "processx.h" +#include "cleancall.h" #include #include @@ -12,6 +13,7 @@ SEXP processx__echo_on(); SEXP processx__echo_off(); static const R_CallMethodDef callMethods[] = { + CLEANCALL_METHOD_RECORD, { "processx_exec", (DL_FUNC) &processx_exec, 16 }, { "processx_wait", (DL_FUNC) 
&processx_wait, 3 }, { "processx_is_alive", (DL_FUNC) &processx_is_alive, 2 }, @@ -62,6 +64,7 @@ void R_init_processx(DllInfo *dll) { R_registerRoutines(dll, NULL, callMethods, NULL, NULL); R_useDynamicSymbols(dll, FALSE); R_forceSymbols(dll, TRUE); + cleancall_fns_dot_call = Rf_findVar(Rf_install(".Call"), R_BaseEnv); #ifdef _WIN32 R_init_processx_win(); #else diff --git a/src/unix/processx.c b/src/unix/processx.c index 5f27e2a7..a9e2309f 100644 --- a/src/unix/processx.c +++ b/src/unix/processx.c @@ -10,6 +10,7 @@ #include #include "../processx.h" +#include "../cleancall.h" /* Internals */ @@ -617,6 +618,14 @@ void processx__collect_exit_status(SEXP status, int retval, int wstat) { handle->collected = 1; } +static void processx__wait_cleanup(void *ptr) { + int *fds = (int*) ptr; + if (!fds) return; + if (fds[0] >= 0) close(fds[0]); + if (fds[1] >= 0) close(fds[1]); + free(fds); +} + /* In general we need to worry about three asynchronous processes here: * 1. The main code, i.e. the code in this function. * 2. The finalizer, that can be triggered by any R function. 
@@ -650,6 +659,11 @@ SEXP processx_wait(SEXP status, SEXP timeout, SEXP name) { int ret = 0; pid_t pid; + int *fds = malloc(sizeof(int) * 2); + if (!fds) R_THROW_SYSTEM_ERROR("Allocating memory when waiting"); + fds[0] = fds[1] = -1; + r_call_on_exit(processx__wait_cleanup, fds); + processx__block_sigchld(); if (!handle) { @@ -674,6 +688,8 @@ SEXP processx_wait(SEXP status, SEXP timeout, SEXP name) { processx__unblock_sigchld(); R_THROW_SYSTEM_ERROR("processx error when waiting for '%s'", cname); } + fds[0] = handle->waitpipe[0]; + fds[1] = handle->waitpipe[1]; processx__nonblock_fcntl(handle->waitpipe[0], 1); processx__nonblock_fcntl(handle->waitpipe[1], 1); @@ -684,6 +700,8 @@ SEXP processx_wait(SEXP status, SEXP timeout, SEXP name) { processx__unblock_sigchld(); + + while (ctimeout < 0 || timeleft > PROCESSX_INTERRUPT_INTERVAL) { do { ret = poll(&fd, 1, PROCESSX_INTERRUPT_INTERVAL); @@ -720,8 +738,7 @@ SEXP processx_wait(SEXP status, SEXP timeout, SEXP name) { } cleanup: - if (handle->waitpipe[0] >= 0) close(handle->waitpipe[0]); - if (handle->waitpipe[1] >= 0) close(handle->waitpipe[1]); + /* pipe is closed in the on_exit handler */ handle->waitpipe[0] = -1; handle->waitpipe[1] = -1; diff --git a/tests/testthat/test-wait.R b/tests/testthat/test-wait.R index c6db8e81..7c44d43a 100644 --- a/tests/testthat/test-wait.R +++ b/tests/testthat/test-wait.R @@ -45,3 +45,34 @@ test_that("wait after process already exited", { expect_true(system.time(p$wait())[["elapsed"]] < 1) expect_true(system.time(p$wait(3000))[["elapsed"]] < 1) }) + +test_that("no fd leak on unix", { + skip_on_cran() + skip_on_os("solaris") + if (is_windows()) return(expect_true(TRUE)) + + # We run this test in a subprocess, so we can send an interrupt to it + # We start a subprocess (within the subprocess) and wait on it. + # Then the main process, after waiting a second so that everything is + # set up in the subprocess, sends an interrupt. 
The subprocess catches + # this interrupt and copies everything back to the main process. + + rs <- callr::r_session$new() + on.exit(rs$close(), add = TRUE) + + rs$call(function() { + fd1 <- ps::ps_num_fds(ps::ps_handle()) + p <- processx::process$new("sleep", "3", poll_connection = FALSE) + err <- tryCatch(ret <- p$wait(), interrupt = function(e) e) + fd2 <- ps::ps_num_fds(ps::ps_handle()) + list(fd1 = fd1, fd2 = fd2, err = err) + }) + + Sys.sleep(1) + rs$interrupt() + rs$poll_io(1000) + res <- rs$read() + + expect_equal(res$result$fd1, res$result$fd2) + expect_s3_class(res$result$err, "interrupt") +}) From d328de2a13c03e42be922d4b01c867446930681c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=A1bor=20Cs=C3=A1rdi?= Date: Fri, 7 Feb 2020 09:43:07 +0000 Subject: [PATCH 2/2] Document rethrow_call_with_cleanup in errors.R --- R/errors.R | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/R/errors.R b/R/errors.R index 9d7c4a6f..5bcc28ed 100644 --- a/R/errors.R +++ b/R/errors.R @@ -78,6 +78,8 @@ # * Update wording of error printout to be less intimidating, avoid jargon # * Use default printing in interactive mode, so RStudio can detect the # error and highlight it. +# * Add the rethrow_call_with_cleanup function, to work with embedded +# cleancall. err <- local({ @@ -365,6 +367,18 @@ err <- local({ package_env <- topenv() + #' Version of rethrow_call that supports cleancall + #' + #' This function is the same as [rethrow_call()], except that it + #' uses cleancall's [.Call()] wrapper, to enable resource cleanup. + #' See https://github.com/r-lib/cleancall#readme for more about + #' resource cleanup. + #' + #' @noRd + #' @param .NAME Compiled function to call, see [.Call()]. + #' @param ... Function arguments, see [.Call()]. + #' @return Result of the call. + rethrow_call_with_cleanup <- function(.NAME, ...) { call <- sys.call() nframe <- sys.nframe()