Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
* processx now does no block SIGCHLD by default in the subprocess,
blocking potentially causes zombie sub-subprocesses (#240).

* The `process$wait()` method now does not leak file descriptors on
Unix when interrupted (#141).

# processx 3.4.1

* Now `run()` does not create an `ok` variable in the global environment.
Expand Down
4 changes: 4 additions & 0 deletions R/cleancall.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@

call_with_cleanup <- function(ptr, ...) {
.Call(c_cleancall_call, pairlist(ptr, ...), parent.frame())
}
34 changes: 34 additions & 0 deletions R/errors.R
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@
# * Update wording of error printout to be less intimidating, avoid jargon
# * Use default printing in interactive mode, so RStudio can detect the
# error and highlight it.
# * Add the rethrow_call_with_cleanup function, to work with embedded
# cleancall.

err <- local({

Expand Down Expand Up @@ -363,6 +365,37 @@ err <- local({
)
}

package_env <- topenv()

#' Version of rethrow_call that supports cleancall
#'
#' This function is the same as [rethrow_call()], except that it
#' uses cleancall's [.Call()] wrapper, to enable resource cleanup.
#' See https://github.com/r-lib/cleancall#readme for more about
#' resource cleanup.
#'
#' @noRd
#' @param .NAME Compiled function to call, see [.Call()].
#' @param ... Function arguments, see [.Call()].
#' @return Result of the call.

rethrow_call_with_cleanup <- function(.NAME, ...) {
call <- sys.call()
nframe <- sys.nframe()
withCallingHandlers(
package_env$call_with_cleanup(.NAME, ...),
error = function(e) {
e$`_nframe` <- nframe
e$call <- call
if (inherits(e, "simpleError")) {
class(e) <- c("c_error", "rlib_error", "error", "condition")
}
e$`_ignore` <- list(c(nframe + 1L, sys.nframe() + 1L))
throw(e)
}
)
}

# -- create traceback -------------------------------------------------

#' Create a traceback
Expand Down Expand Up @@ -723,3 +756,4 @@ new_error <- err$new_error
throw <- err$throw
rethrow <- err$rethrow
rethrow_call <- err$rethrow_call
rethrow_call_with_cleanup <- err$.internal$rethrow_call_with_cleanup
7 changes: 5 additions & 2 deletions R/process.R
Original file line number Diff line number Diff line change
Expand Up @@ -649,8 +649,11 @@ process <- R6::R6Class(

process_wait <- function(self, private, timeout) {
"!DEBUG process_wait `private$get_short_name()`"
rethrow_call(c_processx_wait, private$status, as.integer(timeout),
private$get_short_name())
rethrow_call_with_cleanup(
c_processx_wait, private$status,
as.integer(timeout),
private$get_short_name()
)
invisible(self)
}

Expand Down
2 changes: 1 addition & 1 deletion src/Makevars
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ OBJECTS = init.o poll.o errors.o processx-connection.o \
processx-vector.o create-time.o base64.o \
unix/childlist.o unix/connection.o \
unix/processx.o unix/sigchld.o unix/utils.o \
unix/named_pipe.o
unix/named_pipe.o cleancall.o

.PHONY: all clean

Expand Down
2 changes: 1 addition & 1 deletion src/Makevars.win
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
OBJECTS = init.o poll.o errors.o processx-connection.o \
processx-vector.o create-time.o base64.o \
win/processx.o win/stdio.o win/named_pipe.o \
win/utils.o win/thread.o
win/utils.o win/thread.o cleancall.o

.PHONY: all clean

Expand Down
164 changes: 164 additions & 0 deletions src/cleancall.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
#define R_NO_REMAP
#include <Rinternals.h>

#include "cleancall.h"


#if (defined(R_VERSION) && R_VERSION < R_Version(3, 4, 0))
SEXP R_MakeExternalPtrFn(DL_FUNC p, SEXP tag, SEXP prot) {
fn_ptr ptr;
ptr.fn = p;
return R_MakeExternalPtr(ptr.p, tag, prot);
}
DL_FUNC R_ExternalPtrAddrFn(SEXP s) {
fn_ptr ptr;
ptr.p = EXTPTR_PTR(s);
return ptr.fn;
}
#endif

// The R API does not have a setter for function pointers

SEXP cleancall_MakeExternalPtrFn(DL_FUNC p, SEXP tag, SEXP prot) {
fn_ptr tmp;
tmp.fn = p;
return R_MakeExternalPtr(tmp.p, tag, prot);
}

void cleancall_SetExternalPtrAddrFn(SEXP s, DL_FUNC p) {
fn_ptr ptr;
ptr.fn = p;
R_SetExternalPtrAddr(s, ptr.p);
}


// Initialised at load time with the `.Call` primitive
SEXP cleancall_fns_dot_call = NULL;

void cleancall_init() {
cleancall_fns_dot_call = Rf_findVar(Rf_install(".Call"), R_BaseEnv);
}

struct eval_args {
SEXP call;
SEXP env;
};

static SEXP eval_wrap(void* data) {
struct eval_args* args = (struct eval_args*) data;
return Rf_eval(args->call, args->env);
}


SEXP cleancall_call(SEXP args, SEXP env) {
SEXP call = PROTECT(Rf_lcons(cleancall_fns_dot_call, args));
struct eval_args data = { call, env };

SEXP out = r_with_cleanup_context(&eval_wrap, &data);

UNPROTECT(1);
return out;
}


static SEXP callbacks = NULL;

// Preallocate a callback
static void push_callback(SEXP stack) {
SEXP top = CDR(stack);

SEXP early_handler = PROTECT(Rf_allocVector(LGLSXP, 1));
SEXP fn_extptr = PROTECT(cleancall_MakeExternalPtrFn(NULL, R_NilValue,
R_NilValue));
SEXP data_extptr = PROTECT(R_MakeExternalPtr(NULL, early_handler,
R_NilValue));
SEXP cb = Rf_cons(Rf_cons(fn_extptr, data_extptr), top);

SETCDR(stack, cb);

UNPROTECT(3);
}

struct data_wrapper {
SEXP (*fn)(void* data);
void *data;
SEXP callbacks;
int success;
};

static void call_exits(void* data) {
// Remove protecting node. Don't remove the preallocated callback on
// the top as it might contain a handler when something went wrong.
SEXP top = CDR(callbacks);

// Restore old stack
struct data_wrapper* state = data;
callbacks = (SEXP) state->callbacks;

// Handlers should not jump
while (top != R_NilValue) {
SEXP cb = CAR(top);
top = CDR(top);

void (*fn)(void*) = (void (*)(void*)) R_ExternalPtrAddrFn(CAR(cb));
void *data = (void*) EXTPTR_PTR(CDR(cb));
int early_handler = LOGICAL(EXTPTR_TAG(CDR(cb)))[0];

// Check for empty pointer in preallocated callbacks
if (fn) {
if (!early_handler || !state->success) fn(data);
}
}
}

static SEXP with_cleanup_context_wrap(void *data) {
struct data_wrapper* cdata = data;
SEXP ret = cdata->fn(cdata->data);
cdata->success = 1;
return ret;
}

SEXP r_with_cleanup_context(SEXP (*fn)(void* data), void* data) {
// Preallocate new stack before changing `callbacks` to avoid
// leaving the global variable in a bad state if alloc fails
SEXP new = PROTECT(Rf_cons(R_NilValue, R_NilValue));
push_callback(new);

SEXP old = callbacks;
callbacks = new;

struct data_wrapper state = { fn, data, old, 0 };

SEXP out = R_ExecWithCleanup(with_cleanup_context_wrap, &state,
&call_exits, &state);

UNPROTECT(1);
return out;
}

static void call_save_handler(void (*fn)(void *data), void* data,
int early) {
if (!callbacks) {
fn(data);
Rf_error("Internal error: Exit handler pushed outside "
"of an exit context");
}

SEXP cb = CADR(callbacks);

// Update pointers
cleancall_SetExternalPtrAddrFn(CAR(cb), (DL_FUNC) fn);
R_SetExternalPtrAddr(CDR(cb), data);
LOGICAL(EXTPTR_TAG(CDR(cb)))[0] = early;

// Preallocate the next callback in case the allocator jumps
push_callback(callbacks);
}

void r_call_on_exit(void (*fn)(void* data), void* data) {
call_save_handler(fn, data, /* early = */ 0);
}

void r_call_on_early_exit(void (*fn)(void* data), void* data) {
call_save_handler(fn, data, /* early = */ 1);
}
41 changes: 41 additions & 0 deletions src/cleancall.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#ifndef CLEANCALL_H
#define CLEANCALL_H

#include <Rversion.h>
#include <R_ext/Rdynload.h>

// --------------------------------------------------------------------
// Internals
// --------------------------------------------------------------------

typedef union {void* p; DL_FUNC fn;} fn_ptr;

#if (defined(R_VERSION) && R_VERSION < R_Version(3, 4, 0))
SEXP R_MakeExternalPtrFn(DL_FUNC p, SEXP tag, SEXP prot);
DL_FUNC R_ExternalPtrAddrFn(SEXP s);
#endif

// --------------------------------------------------------------------
// API for packages that embed cleancall
// --------------------------------------------------------------------

// The R API does not have a setter for external function pointers
SEXP cleancall_MakeExternalPtrFn(DL_FUNC p, SEXP tag, SEXP prot);
void cleancall_SetExternalPtrAddrFn(SEXP s, DL_FUNC p);

#define CLEANCALL_METHOD_RECORD \
{"cleancall_call", (DL_FUNC) &cleancall_call, 2}

SEXP cleancall_call(SEXP args, SEXP env);
extern SEXP cleancall_fns_dot_call;
void cleancall_init();

// --------------------------------------------------------------------
// Public API
// --------------------------------------------------------------------

SEXP r_with_cleanup_context(SEXP (*fn)(void* data), void* data);
void r_call_on_exit(void (*fn)(void* data), void* data);
void r_call_on_early_exit(void (*fn)(void* data), void* data);

#endif
3 changes: 3 additions & 0 deletions src/init.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@

#include "processx.h"
#include "cleancall.h"

#include <R_ext/Rdynload.h>
#include <R.h>
Expand All @@ -12,6 +13,7 @@ SEXP processx__echo_on();
SEXP processx__echo_off();

static const R_CallMethodDef callMethods[] = {
CLEANCALL_METHOD_RECORD,
{ "processx_exec", (DL_FUNC) &processx_exec, 16 },
{ "processx_wait", (DL_FUNC) &processx_wait, 3 },
{ "processx_is_alive", (DL_FUNC) &processx_is_alive, 2 },
Expand Down Expand Up @@ -62,6 +64,7 @@ void R_init_processx(DllInfo *dll) {
R_registerRoutines(dll, NULL, callMethods, NULL, NULL);
R_useDynamicSymbols(dll, FALSE);
R_forceSymbols(dll, TRUE);
cleancall_fns_dot_call = Rf_findVar(Rf_install(".Call"), R_BaseEnv);
#ifdef _WIN32
R_init_processx_win();
#else
Expand Down
21 changes: 19 additions & 2 deletions src/unix/processx.c
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <stdio.h>

#include "../processx.h"
#include "../cleancall.h"

/* Internals */

Expand Down Expand Up @@ -617,6 +618,14 @@ void processx__collect_exit_status(SEXP status, int retval, int wstat) {
handle->collected = 1;
}

static void processx__wait_cleanup(void *ptr) {
int *fds = (int*) ptr;
if (!fds) return;
if (fds[0] >= 0) close(fds[0]);
if (fds[1] >= 0) close(fds[1]);
free(fds);
}

/* In general we need to worry about three asynchronous processes here:
* 1. The main code, i.e. the code in this function.
* 2. The finalizer, that can be triggered by any R function.
Expand Down Expand Up @@ -650,6 +659,11 @@ SEXP processx_wait(SEXP status, SEXP timeout, SEXP name) {
int ret = 0;
pid_t pid;

int *fds = malloc(sizeof(int) * 2);
if (!fds) R_THROW_SYSTEM_ERROR("Allocating memory when waiting");
fds[0] = fds[1] = -1;
r_call_on_exit(processx__wait_cleanup, fds);

processx__block_sigchld();

if (!handle) {
Expand All @@ -674,6 +688,8 @@ SEXP processx_wait(SEXP status, SEXP timeout, SEXP name) {
processx__unblock_sigchld();
R_THROW_SYSTEM_ERROR("processx error when waiting for '%s'", cname);
}
fds[0] = handle->waitpipe[0];
fds[1] = handle->waitpipe[1];
processx__nonblock_fcntl(handle->waitpipe[0], 1);
processx__nonblock_fcntl(handle->waitpipe[1], 1);

Expand All @@ -684,6 +700,8 @@ SEXP processx_wait(SEXP status, SEXP timeout, SEXP name) {

processx__unblock_sigchld();



while (ctimeout < 0 || timeleft > PROCESSX_INTERRUPT_INTERVAL) {
do {
ret = poll(&fd, 1, PROCESSX_INTERRUPT_INTERVAL);
Expand Down Expand Up @@ -720,8 +738,7 @@ SEXP processx_wait(SEXP status, SEXP timeout, SEXP name) {
}

cleanup:
if (handle->waitpipe[0] >= 0) close(handle->waitpipe[0]);
if (handle->waitpipe[1] >= 0) close(handle->waitpipe[1]);
/* pipe is closed in the on_exit handler */
handle->waitpipe[0] = -1;
handle->waitpipe[1] = -1;

Expand Down
Loading