
Allowing futures to time out #169

@rcannood

First off, thanks for this great package. It's insanely useful.

I've been trying to get a future expression killed after a given timeout, but I'm not sure whether the code below is the best way to go about this. Do you have any suggestions?

Thanks in advance!

Example:

future_timeout(
  expr = {
    Sys.sleep(100) # a really boring function
    data.frame(result = "finished", time = 100)
  },
  wait_time = 10,
  cancel_output_fun = function(t) data.frame(result = "killed", time = t),
  check_interval = 1,
  verbose = TRUE
)
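The example passes an unevaluated braced block as `expr`. This works because `future_timeout()` captures the expression with `substitute()` and the calling environment with `parent.frame()`, then hands both to `future()` with `substitute = FALSE`. A minimal base-R sketch of that capture-now, evaluate-later idiom, using a hypothetical `capture_expr()` helper (not part of future):

```r
# Hypothetical helper: capture an argument unevaluated, together with the
# environment it should later be evaluated in.
capture_expr <- function(expr) {
  list(
    expr = substitute(expr),  # the unevaluated call, e.g. quote(x * 21)
    envir = parent.frame()    # the caller's environment, which defines `x`
  )
}

x <- 2
cap <- capture_expr(x * 21)  # nothing is evaluated here
result <- eval(cap$expr, cap$envir)
print(result)  # [1] 42

# Side effects are deferred too: this line does not raise an error.
deferred <- capture_expr(stop("not evaluated yet"))
```

This is why the expression in the example is not run when `future_timeout()` is called; it is only evaluated later, inside the future.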

Attempt:

#' Kill a future after a certain period of time
#'
#' @param expr An R \code{\link[base]{expression}} to be evaluated.
#' @param wait_time How long to wait, in seconds, before killing the future.
#' @param cancel_output_fun A function \code{function(time_elapsed){...}} that is called when the total time
#'   has exceeded \code{wait_time}; its return value is returned instead of the future's value.
#' @param check_interval Number of seconds to wait between checks of whether the future has finished.
#' @param verbose Whether or not to print waiting times.
#' @param ... extra arguments to be passed to the \code{\link[future]{future}} call.
#'
#' @importFrom future future plan resolved value
#' @export
#'
#' @examples
#' future_timeout(
#'   expr = {
#'     Sys.sleep(100) # a really boring function
#'     data.frame(result = "finished", time = 100)
#'   },
#'   wait_time = 10,
#'   cancel_output_fun = function(t) data.frame(result = "killed", time = t),
#'   check_interval = 1,
#'   verbose = TRUE
#' )
future_timeout <- function(expr, wait_time, cancel_output_fun, check_interval = 1, verbose = FALSE, ...) {
  expr <- substitute(expr)
  envir <- parent.frame()

  requireNamespace("future")
  start_time <- Sys.time()

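  # plan() changes the global plan and returns the *previous* plan, so
  # `evaluator = plan("multisession")` evaluated the future with the old
  # plan (sequential on the very first call). Set the plan explicitly
  # instead, and restore the caller's plan when this function exits.
  old_plan <- plan("multisession")
  on.exit(plan(old_plan), add = TRUE)

  future_handle <- future(
    expr,
    substitute = FALSE,
    envir = envir,
    ...
  )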

  time_waited <- 0
  while (!resolved(future_handle) && time_waited < wait_time) {
    Sys.sleep(check_interval)
    time_waited <- as.numeric(difftime(Sys.time(), start_time, units = "secs"))
    if (verbose) cat("Time waited: ", time_waited, "\n", sep = "")
  }

  if (resolved(future_handle)) {
    value(future_handle)
  } else {
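    # ClusterRegistry() is internal to future (hence `:::`) and may change
    # without notice; "stop" shuts down the registered cluster of workers,
    # not only the one evaluating this future.
    future:::ClusterRegistry("stop")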
    cancel_output_fun(time_waited)
  }
}
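The core of the attempt is a polling loop: check `resolved()` every `check_interval` seconds until either the future is done or `wait_time` has passed. Below is a backend-agnostic sketch of that loop in base R. `poll_with_timeout()` and its arguments are hypothetical stand-ins: `is_done` for `resolved(f)`, `get_value` for `value(f)`, and `on_timeout` for the stop-the-workers-and-call-`cancel_output_fun()` branch. One difference from the attempt: it never sleeps past the deadline, so a large `check_interval` cannot make the timeout overshoot by a whole interval.

```r
# Hypothetical, backend-agnostic version of the polling loop in
# future_timeout(). With a future `f`, is_done would be
# function() resolved(f) and get_value would be function() value(f).
poll_with_timeout <- function(is_done, get_value, on_timeout,
                              wait_time, check_interval = 1) {
  start_time <- Sys.time()
  elapsed <- function() {
    as.numeric(difftime(Sys.time(), start_time, units = "secs"))
  }
  while (!is_done()) {
    remaining <- wait_time - elapsed()
    if (remaining <= 0) {
      # Deadline passed: clean up and build the fallback result.
      return(on_timeout(elapsed()))
    }
    # Sleep at most until the deadline.
    Sys.sleep(min(check_interval, remaining))
  }
  get_value()
}

# Usage with stand-in functions: the job "finishes" on the third check.
checks <- 0
res <- poll_with_timeout(
  is_done = function() { checks <<- checks + 1; checks >= 3 },
  get_value = function() "finished",
  on_timeout = function(t) "killed",
  wait_time = 5,
  check_interval = 0.05
)
print(res)  # [1] "finished"
```

Keeping the backend behind plain functions makes the timing logic testable without starting any workers; only the `on_timeout` callback needs to know how to stop a running future.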

Test:

context("Future helper")

test_that("Future helper works correctly", {
  test_fun <- function(sleeper_time, wait_time) {
    dry_run <- future_timeout({1}, wait_time = 5, function(x) x, 1)
    future_timeout(
      expr = {
        Sys.sleep(sleeper_time)
        dplyr::tibble(result = "finished", time = sleeper_time)
      },
      wait_time = wait_time,
      cancel_output_fun = function(t) dplyr::tibble(result = "killed", time = t),
      check_interval = 1
    )
  }

  out1 <- test_fun(10, 5)
  expect_equal(out1$result, "killed")
  expect_gte(out1$time, 5)

  out2 <- test_fun(5, 10)
  expect_equal(out2$result, "finished")
  expect_gte(out2$time, 5)

  # Also in parallel settings
  sleeper_times <- sample(c(runif(10, .5, 3), runif(10, 7, 10)))
  wait_time <- 4

  parallelMap::parallelStartMulticore(cpus = 2, show.info = TRUE)
  outp <- dplyr::bind_rows(parallelMap::parallelMap(test_fun, sleeper_times, more.args = list(wait_time = wait_time)))
  parallelMap::parallelStop()

  expected_result <- ifelse(sleeper_times < wait_time, "finished", "killed")
  expect_gte( mean(outp$result == expected_result), .9 ) # at least 90% must match; allow a few to fail
  expect_true( all(outp$time >= pmin(sleeper_times, wait_time)) )
})
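The final checks compare each job's outcome with the one predicted from its sleep time, and a few mismatches are tolerated (presumably because worker start-up and polling overhead can push a job onto the other side of `wait_time`). For that tolerance to make sense, the agreement rate must be required to be at least 0.9 (`testthat::expect_gte()`), not at most 0.9. A minimal sketch of the computation as a hypothetical standalone helper:

```r
# Hypothetical helper: share of jobs whose observed result ("finished" or
# "killed") matches the result predicted from its sleep time.
outcome_agreement <- function(observed, sleeper_times, wait_time) {
  expected <- ifelse(sleeper_times < wait_time, "finished", "killed")
  mean(observed == expected)
}

# Three jobs with a 4-second timeout; the 3-second job was unexpectedly killed.
agreement <- outcome_agreement(
  observed = c("finished", "killed", "killed"),
  sleeper_times = c(1, 8, 3),
  wait_time = 4
)
print(agreement)  # 2 of 3 jobs match
agreement >= 0.9  # FALSE, so expect_gte(agreement, 0.9) would fail this run
```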
