diff --git a/.Rbuildignore b/.Rbuildignore index 18aee3d..30fb57b 100644 --- a/.Rbuildignore +++ b/.Rbuildignore @@ -28,3 +28,4 @@ ^CRAN-RELEASE$ ^doc$ ^Meta$ +^delayed_dump.*$ \ No newline at end of file diff --git a/.gitignore b/.gitignore index 1e871e0..cd764e9 100644 --- a/.gitignore +++ b/.gitignore @@ -6,3 +6,4 @@ inst/doc doc Meta +delayed_dump* \ No newline at end of file diff --git a/DESCRIPTION b/DESCRIPTION index fdc013e..8c13554 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -1,6 +1,6 @@ Package: delayed Title: A Framework for Parallelizing Dependent Tasks -Version: 0.4.0 +Version: 0.5.0 Authors@R: c( person("Jeremy", "Coyle", email = "jeremyrcoyle@gmail.com", role = c("aut", "cre", "cph"), @@ -28,7 +28,10 @@ Imports: BBmisc, progress, R.utils, - R.oo + R.oo, + futile.logger, + tryCatchLog, + fs Suggests: testthat, knitr, diff --git a/NAMESPACE b/NAMESPACE index fbcdbd6..d294144 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -8,10 +8,12 @@ export(SequentialJob) export(bundle_delayed) export(delayed) export(delayed_fun) +export(delayed_traceback) export(eval_delayed) export(find_delayed_error) export(plot_delayed_shiny) import(R6) +import(futile.logger) import(rlang) import(rstackdeque) import(visNetwork) diff --git a/R/Delayed.R b/R/Delayed.R index eb54f29..aeb32e6 100644 --- a/R/Delayed.R +++ b/R/Delayed.R @@ -136,7 +136,6 @@ Delayed <- R6Class( value <- scheduler$compute() return(value) } - ), active = list( @@ -195,13 +194,17 @@ Delayed <- R6Class( runtime_self = function() { return(private$.job$runtime) }, - runtime = function(){ - if(is.null(private$.runtime_total)){ - sub_times <- sapply(self$delayed_dependencies,`[[`,"runtime") - - private$.runtime_total <- sum(unlist(sub_times))+self$runtime_self + runtime = function() { + if (is.null(private$.runtime_total)) { + all_runtimes <- get_all_runtimes(self) + suppressWarnings({ + dt <- rbindlist(all_runtimes) + }) + sub_time <- dt[!duplicated(dt), sum(runtime)] + + private$.runtime_total <- sub_time + self$runtime_self } - + return(private$.runtime_total) }, state = function() { diff --git a/R/Job.R b/R/Job.R index 513c5d0..b4c8139 100644 --- a/R/Job.R +++ b/R/Job.R @@ -1,23 +1,28 @@ #' Helper Function to Evaluate Delayed #' @param to_eval a list as generated from Delayed$prepare_eval() #' @param timeout a timeout indicating when to terminate the job +#' @param name the name of the delayed object (used for logging output) #' @export #' @importFrom R.utils withTimeout TimeoutException #' @importFrom R.oo throw -eval_delayed <- function(to_eval, timeout = Inf) { +eval_delayed <- function(to_eval, timeout = Inf, name = "") { if (timeout < 0) { R.oo::throw(R.utils::TimeoutException("time exhausted in other steps")) } - result <- R.utils::withTimeout( - { - rlang::eval_bare( - expr = to_eval$expr, - env = to_eval$env - ) - }, - timeout = timeout - ) + if (is.finite(timeout)) { + setTimeLimit(timeout, timeout, transient = TRUE) + on.exit({ + setTimeLimit(cpu = Inf, elapsed = Inf, transient = FALSE) + }) + } + + result <- try_with_logs({ + result <- rlang::eval_bare( + expr = to_eval$expr, + env = to_eval$env + ) + }, context = name) return(result) } @@ -92,14 +97,19 @@ SequentialJob <- R6Class( public = list( initialize = function(delayed_object) { to_eval <- delayed_object$prepare_eval() - start_time <-proc.time() - - private$.result <- try({ - set.seed(delayed_object$seed) - eval_delayed(to_eval, delayed_object$timeout) - }) + start_time <- proc.time() + + set.seed(delayed_object$seed) + private$.result <- try( + { + eval_delayed(to_eval, + delayed_object$timeout, + delayed_object$name) + }, + silent = TRUE + ) - private$.runtime <- (proc.time()-start_time)[[3]] + private$.runtime <- (proc.time() - start_time)[[3]] super$initialize(delayed_object) } ), @@ -148,12 +158,13 @@ FutureJob <- R6Class( env <- list( eval_delayed = eval_delayed, to_eval = delayed_object$prepare_eval(), - timeout = delayed_object$timeout + timeout = delayed_object$timeout, + name = delayed_object$name ) private$.start_time <- proc.time() private$.future <- future( - expr = quote(eval_delayed(to_eval, timeout)), + expr = quote(eval_delayed(to_eval, timeout, name)), # expr = to_eval$expr, # env = to_eval$env, substitute = FALSE, diff --git a/R/Scheduler.R b/R/Scheduler.R index a4cd5db..289c661 100644 --- a/R/Scheduler.R +++ b/R/Scheduler.R @@ -21,7 +21,7 @@ Scheduler <- R6Class( initialize = function(delayed_object, job_type = FutureJob, nworkers = NULL, - verbose = FALSE, + verbose = getOption("delayed.verbose"), progress = FALSE, ...) { private$.delayed_object <- delayed_object private$.task_lists <- list( @@ -64,7 +64,7 @@ Scheduler <- R6Class( dependent_uuid = NULL) { state <- delayed_object$update_state uuid <- delayed_object$uuid - delayed_object$seed <- runif(1,0,1e6) + delayed_object$seed <- runif(1, 0, 1e6) private$.n_tasks <- private$.n_tasks + 1 delayed_object$task_order <- private$.n_tasks assign(uuid, delayed_object, envir = private$.task_lists[[state]]) @@ -128,12 +128,12 @@ Scheduler <- R6Class( job_type <- private$.job_type if (current_task$sequential) { - SequentialJob$new(current_task) self$update_task(current_task, "ready", "running") + SequentialJob$new(current_task) } else { + self$update_task(current_task, "ready", "running") current_task$timeout <- self$time_left job <- job_type$new(current_task) - self$update_task(current_task, "ready", "running") } updated_tasks <- c(current_task) diff --git a/R/debug_helpers.R b/R/debug_helpers.R deleted file mode 100644 index bde7a49..0000000 --- a/R/debug_helpers.R +++ /dev/null @@ -1,8 +0,0 @@ -.delayed_traceback <- NULL - -delayed_traceback <- function() { - traceback(.delayed_traceback) -} - -delayed_log_traceback <- function() { -} diff --git a/R/logging_helpers.R b/R/logging_helpers.R new file mode 100644 index 0000000..dc3f27e --- /dev/null +++ b/R/logging_helpers.R @@ -0,0 +1,91 @@ + +#' Get traceback for last delayed error +#' @param call_stack a call stack argument (defaults to last delayed error) +#' @param cat if TRUE, uses cat to print the traceback +#' @export +delayed_traceback <- function(call_stack=NULL, cat = TRUE){ + if(is.null(call_stack)){ + call_stack <- get0("traceback", envir=.delayed_env) + } + # TODO: maybe suppress the delayed boilerplate in here + if(length(call_stack)==0){ + message("no traceback available") + } + + stacktrace <- tryCatchLog::limitedLabelsCompact(call_stack, FALSE) + stacktrace <- paste(" ", + seq_along(stacktrace), + stacktrace, + collapse = "\n") + + + if(cat){ + cat(stacktrace) + } + + invisible(stacktrace) +} + +#' @import futile.logger +default_condition_handler <- function(c){ + condition_class <- class(c)[1] + message <- sprintf("%s %s %s", condition_class, context, c$message) + + + + + if (inherits(c,"error")){ + log_fun <- flog.error + + # TODO: add option to also do this for warnings + call_stack <- sys.calls() + assign("traceback", call_stack, envir=.delayed_env) + + if(getOption("delayed.dumpfile")){ + context <- fs::path_sanitize(context,"-") + filename <- sprintf("delayed_dump_%s_%s.rdata", + context, + strftime(Sys.time(),"%Y%m%d%H%M%S")) + utils::dump.frames() + save.image(file = filename) + + message <- paste(message, + sprintf("\tframes dumped to %s", filename), + sep = "\n") + + } + + if(getOption("delayed.stacktrace")){ + stacktrace <- delayed_traceback(call_stack, FALSE) + message <- paste(message, stacktrace, sep = "\n") + } + + + + } else if(inherits(c,"warning")){ + log_fun <- flog.warn + } else { + log_fun <- flog.info + + } + + + + + log_fun(message) + + +} + +try_with_logs <- function(expr, condition_handler=default_condition_handler, context=NULL){ + # make a copy of the handler and add context + ch_instance <- condition_handler + environment(ch_instance) <- new.env(parent=environment(condition_handler)) + assign("context",context, envir=environment(ch_instance)) + + tryCatch( + withCallingHandlers(expr, + condition = ch_instance + ) + ) +} diff --git a/R/utils.R b/R/utils.R index 57a1213..94e82c9 100644 --- a/R/utils.R +++ b/R/utils.R @@ -35,3 +35,11 @@ find_delayed_error <- function(delayed_object) { bundle_args <- function(...) { return(list(...)) } + +get_all_runtimes <- function(delayed_obj) { + dd_runtimes <- lapply(delayed_obj$delayed_dependencies, get_all_runtimes) + this_runtime <- list(uuid = delayed_obj$uuid, runtime = delayed_obj$runtime_self) + + result <- c(unlist(dd_runtimes, recursive = FALSE), list(this_runtime)) + return(result) +} diff --git a/R/zzz.R b/R/zzz.R index 658fedf..c619ce5 100644 --- a/R/zzz.R +++ b/R/zzz.R @@ -5,3 +5,12 @@ utils::packageDescription("delayed")$Title )) } + +.onLoad <- function(...){ + options(delayed.stacktrace = FALSE) + options(delayed.dumpfile = FALSE) + options(delayed.verbose = FALSE) +} + +.delayed_env <- new.env() + diff --git a/README.html b/README.html index 35669a9..e11ca55 100644 --- a/README.html +++ b/README.html @@ -603,7 +603,7 @@
delayed@@ -614,47 +614,47 @@A framework for parallelizing dependent tasks
delayed?For standard use, we recommend installing the package from CRAN via
- +install.packages("delayed")Install the most recent stable release from GitHub via devtools:
devtools::install_github("tlverse/delayed")If you encounter any bugs or have any specific feature requests, please file an issue.
This minimal example shows how to use delayed to handle dependent computations via chaining of tasks:
library(delayed)
-#> delayed v0.3.0: Framework for Parallelizing Dependent Tasks
-
-# delay a function that does a bit of math
-mapfun <- function(x, y) {(x + y) / (x - y)}
-delayed_mapfun <- delayed_fun(mapfun)
-
-set.seed(14765)
-library(future)
-plan(multicore, workers = 2)
-const <- 7
-
-# re-define the delayed object from above
-delayed_norm <- delayed(rnorm(n = const))
-delayed_pois <- delayed(rpois(n = const, lambda = const))
-chained_norm_pois <- delayed_mapfun(delayed_norm, delayed_pois)
-
-# compute it using the future plan (multicore with 2 cores)
-chained_norm_pois$compute(nworkers = 2, verbose = TRUE)
-#> run:0 ready:2 workers:2
-#> updating rnorm(n = const) from ready to running
-#> run:1 ready:1 workers:2
-#> updating rpois(n = const, lambda = const) from ready to running
-#> run:2 ready:0 workers:2
-#> updating rnorm(n = const) from running to resolved
-#> updating rpois(n = const, lambda = const) from running to resolved
-#> updating mapfun(x = delayed_norm, y = delayed_pois) from waiting to ready
-#> run:0 ready:1 workers:2
-#> updating mapfun(x = delayed_norm, y = delayed_pois) from ready to running
-#> run:1 ready:0 workers:2
-#> updating mapfun(x = delayed_norm, y = delayed_pois) from running to resolved
-#> [1] -1.1601934 -0.4678799 -1.2152393 -0.8963905 -1.0718538 -1.0619060 -0.8325901library(delayed)
+#> delayed v0.4.0: A Framework for Parallelizing Dependent Tasks
+
+# delay a function that does a bit of math
+mapfun <- function(x, y) {(x + y) / (x - y)}
+delayed_mapfun <- delayed_fun(mapfun)
+
+set.seed(14765)
+library(future)
+plan(multicore, workers = 2)
+const <- 7
+
+# re-define the delayed object from above
+delayed_norm <- delayed(rnorm(n = const))
+delayed_pois <- delayed(rpois(n = const, lambda = const))
+chained_norm_pois <- delayed_mapfun(delayed_norm, delayed_pois)
+
+# compute it using the future plan (multicore with 2 cores)
+chained_norm_pois$compute(nworkers = 2, verbose = TRUE)
+#> run:0 ready:2 workers:2
+#> updating rnorm(n = const) from ready to running
+#> run:1 ready:1 workers:2
+#> updating rpois(n = const, lambda = const) from ready to running
+#> run:2 ready:0 workers:2
+#> updating rnorm(n = const) from running to resolved
+#> updating rpois(n = const, lambda = const) from running to resolved
+#> updating mapfun(x = delayed_norm, y = delayed_pois) from waiting to ready
+#> run:0 ready:1 workers:2
+#> updating mapfun(x = delayed_norm, y = delayed_pois) from ready to running
+#> run:1 ready:0 workers:2
+#> updating mapfun(x = delayed_norm, y = delayed_pois) from running to resolved
+#> [1] -0.6688907 -1.2691496 -1.1808899 -1.7605806 -0.5992127 -0.6838026 -1.4086257Remark: In the above, the delayed computation is carried out in parallel using the framework offered by the excellent future package and its associated ecosystem.
rlang updates.R/Delayed.R
DelayedClass.Rd+#> [1] TRUEd$compute()#> [1] 7
#> [1] TRUEd$compute() +#> [1] 7
R/Scheduler.R
Scheduler.Rd+#> [1] 7
#> [1] 7
Delayed objects.
bundle_delayed(delayed_list) +bundle_delayed(delayed_list) -bundle_args(...)+bundle_args(...)