Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .Rbuildignore
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,4 @@
^CRAN-RELEASE$
^doc$
^Meta$
^delayed_dump.*$
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ inst/doc

doc
Meta
delayed_dump*
7 changes: 5 additions & 2 deletions DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Package: delayed
Title: A Framework for Parallelizing Dependent Tasks
Version: 0.4.0
Version: 0.5.0
Authors@R: c(
person("Jeremy", "Coyle", email = "jeremyrcoyle@gmail.com",
role = c("aut", "cre", "cph"),
Expand Down Expand Up @@ -28,7 +28,10 @@ Imports:
BBmisc,
progress,
R.utils,
R.oo
R.oo,
futile.logger,
tryCatchLog,
fs
Suggests:
testthat,
knitr,
Expand Down
2 changes: 2 additions & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ export(SequentialJob)
export(bundle_delayed)
export(delayed)
export(delayed_fun)
export(delayed_traceback)
export(eval_delayed)
export(find_delayed_error)
export(plot_delayed_shiny)
import(R6)
import(futile.logger)
import(rlang)
import(rstackdeque)
import(visNetwork)
Expand Down
17 changes: 10 additions & 7 deletions R/Delayed.R
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ Delayed <- R6Class(
value <- scheduler$compute()
return(value)
}

),

active = list(
Expand Down Expand Up @@ -195,13 +194,17 @@ Delayed <- R6Class(
runtime_self = function() {
return(private$.job$runtime)
},
runtime = function(){
if(is.null(private$.runtime_total)){
sub_times <- sapply(self$delayed_dependencies,`[[`,"runtime")

private$.runtime_total <- sum(unlist(sub_times))+self$runtime_self
runtime = function() {
if (is.null(private$.runtime_total)) {
all_runtimes <- get_all_runtimes(self)
suppressWarnings({
dt <- rbindlist(all_runtimes)
})
sub_time <- dt[!duplicated(dt), sum(runtime)]

private$.runtime_total <- sub_time + self$runtime_self
}

return(private$.runtime_total)
},
state = function() {
Expand Down
49 changes: 30 additions & 19 deletions R/Job.R
Original file line number Diff line number Diff line change
@@ -1,23 +1,28 @@
#' Helper Function to Evaluate Delayed
#' @param to_eval a list as generated from Delayed$prepare_eval()
#' @param timeout a timeout indicating when to terminate the job
#' @param name the name of the delayed object (used for logging output)
#' @export
#' @importFrom R.utils withTimeout TimeoutException
#' @importFrom R.oo throw
eval_delayed <- function(to_eval, timeout = Inf) {
eval_delayed <- function(to_eval, timeout = Inf, name = "") {
if (timeout < 0) {
R.oo::throw(R.utils::TimeoutException("time exhausted in other steps"))
}

result <- R.utils::withTimeout(
{
rlang::eval_bare(
expr = to_eval$expr,
env = to_eval$env
)
},
timeout = timeout
)
if (is.finite(timeout)) {
setTimeLimit(timeout, timeout, transient = TRUE)
on.exit({
setTimeLimit(cpu = Inf, elapsed = Inf, transient = FALSE)
})
}

result <- try_with_logs({
result <- rlang::eval_bare(
expr = to_eval$expr,
env = to_eval$env
)
}, context = name)
return(result)
}

Expand Down Expand Up @@ -92,14 +97,19 @@ SequentialJob <- R6Class(
public = list(
initialize = function(delayed_object) {
to_eval <- delayed_object$prepare_eval()
start_time <-proc.time()

private$.result <- try({
set.seed(delayed_object$seed)
eval_delayed(to_eval, delayed_object$timeout)
})
start_time <- proc.time()

set.seed(delayed_object$seed)
private$.result <- try(
{
eval_delayed(to_eval,
delayed_object$timeout,
delayed_object$name)
},
silent = TRUE
)

private$.runtime <- (proc.time()-start_time)[[3]]
private$.runtime <- (proc.time() - start_time)[[3]]
super$initialize(delayed_object)
}
),
Expand Down Expand Up @@ -148,12 +158,13 @@ FutureJob <- R6Class(
env <- list(
eval_delayed = eval_delayed,
to_eval = delayed_object$prepare_eval(),
timeout = delayed_object$timeout
timeout = delayed_object$timeout,
name = delayed_object$name
)

private$.start_time <- proc.time()
private$.future <- future(
expr = quote(eval_delayed(to_eval, timeout)),
expr = quote(eval_delayed(to_eval, timeout, name)),
# expr = to_eval$expr,
# env = to_eval$env,
substitute = FALSE,
Expand Down
8 changes: 4 additions & 4 deletions R/Scheduler.R
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ Scheduler <- R6Class(
initialize = function(delayed_object,
job_type = FutureJob,
nworkers = NULL,
verbose = FALSE,
verbose = getOption("delayed.verbose"),
progress = FALSE, ...) {
private$.delayed_object <- delayed_object
private$.task_lists <- list(
Expand Down Expand Up @@ -64,7 +64,7 @@ Scheduler <- R6Class(
dependent_uuid = NULL) {
state <- delayed_object$update_state
uuid <- delayed_object$uuid
delayed_object$seed <- runif(1,0,1e6)
delayed_object$seed <- runif(1, 0, 1e6)
private$.n_tasks <- private$.n_tasks + 1
delayed_object$task_order <- private$.n_tasks
assign(uuid, delayed_object, envir = private$.task_lists[[state]])
Expand Down Expand Up @@ -128,12 +128,12 @@ Scheduler <- R6Class(
job_type <- private$.job_type

if (current_task$sequential) {
SequentialJob$new(current_task)
self$update_task(current_task, "ready", "running")
SequentialJob$new(current_task)
} else {
self$update_task(current_task, "ready", "running")
current_task$timeout <- self$time_left
job <- job_type$new(current_task)
self$update_task(current_task, "ready", "running")
}

updated_tasks <- c(current_task)
Expand Down
8 changes: 0 additions & 8 deletions R/debug_helpers.R

This file was deleted.

91 changes: 91 additions & 0 deletions R/logging_helpers.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@

#' Get traceback for last delayed error
#' @param call_stack a call stack argument (defaults to last delayed error)
#' @param cat if TRUE, uses cat to print the traceback
#' @export
delayed_traceback <- function(call_stack=NULL, cat = TRUE){
if(is.null(call_stack)){
call_stack <- get0("traceback", envir=.delayed_env)
}
# TODO: maybe suppress the delayed boilerplate in here
if(length(call_stack)==0){
message("no traceback available")
}

stacktrace <- tryCatchLog::limitedLabelsCompact(call_stack, FALSE)
stacktrace <- paste(" ",
seq_along(stacktrace),
stacktrace,
collapse = "\n")


if(cat){
cat(stacktrace)
}

invisible(stacktrace)
}

#' @import futile.logger
default_condition_handler <- function(c){
condition_class <- class(c)[1]
message <- sprintf("%s %s %s", condition_class, context, c$message)




if (inherits(c,"error")){
log_fun <- flog.error

# TODO: add option to also do this for warnings
call_stack <- sys.calls()
assign("traceback", call_stack, envir=.delayed_env)

if(getOption("delayed.dumpfile")){
context <- fs::path_sanitize(context,"-")
filename <- sprintf("delayed_dump_%s_%s.rdata",
context,
strftime(Sys.time(),"%Y%m%d%H%M%S"))
utils::dump.frames()
save.image(file = filename)

message <- paste(message,
sprintf("\tframes dumped to %s", filename),
sep = "\n")

}

if(getOption("delayed.stacktrace")){
stacktrace <- delayed_traceback(call_stack, FALSE)
message <- paste(message, stacktrace, sep = "\n")
}



} else if(inherits(c,"warning")){
log_fun <- flog.warn
} else {
log_fun <- flog.info

}




log_fun(message)


}

try_with_logs <- function(expr, condition_handler=default_condition_handler, context=NULL){
# make a copy of the handler and add context
ch_instance <- condition_handler
environment(ch_instance) <- new.env(parent=environment(condition_handler))
assign("context",context, envir=environment(ch_instance))

tryCatch(
withCallingHandlers(expr,
condition = ch_instance
)
)
}
8 changes: 8 additions & 0 deletions R/utils.R
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,11 @@ find_delayed_error <- function(delayed_object) {
bundle_args <- function(...) {
return(list(...))
}

get_all_runtimes <- function(delayed_obj) {
dd_runtimes <- lapply(delayed_obj$delayed_dependencies, get_all_runtimes)
this_runtime <- list(uuid = delayed_obj$uuid, runtime = delayed_obj$runtime_self)

result <- c(unlist(dd_runtimes, recursive = FALSE), list(this_runtime))
return(result)
}
9 changes: 9 additions & 0 deletions R/zzz.R
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,12 @@
utils::packageDescription("delayed")$Title
))
}

.onLoad <- function(...){
options(delayed.stacktrace = FALSE)
options(delayed.dumpfile = FALSE)
options(delayed.verbose = FALSE)
}

.delayed_env <- new.env()

70 changes: 35 additions & 35 deletions README.html

Large diffs are not rendered by default.

14 changes: 7 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ v3](https://img.shields.io/badge/License-GPL%20v3-blue.svg)](http://www.gnu.org/

**Author:** [Jeremy Coyle](https://github.com/jeremyrcoyle)

-----
------------------------------------------------------------------------

## What’s `delayed`?

Expand All @@ -35,7 +35,7 @@ use `delayed`, please consult the package
[vignette](https://nhejazi.github.io/delayed/articles/delayed.html)
online, or do so from within [R](https://www.r-project.org/).

-----
------------------------------------------------------------------------

## Installation

Expand All @@ -53,14 +53,14 @@ Install the most recent *stable release* from GitHub via
devtools::install_github("tlverse/delayed")
```

-----
------------------------------------------------------------------------

## Issues

If you encounter any bugs or have any specific feature requests, please
[file an issue](https://github.com/tlverse/delayed/issues).

-----
------------------------------------------------------------------------

## Example

Expand All @@ -69,7 +69,7 @@ computations via chaining of tasks:

``` r
library(delayed)
#> delayed v0.3.0: Framework for Parallelizing Dependent Tasks
#> delayed v0.4.0: A Framework for Parallelizing Dependent Tasks

# delay a function that does a bit of math
mapfun <- function(x, y) {(x + y) / (x - y)}
Expand Down Expand Up @@ -99,15 +99,15 @@ chained_norm_pois$compute(nworkers = 2, verbose = TRUE)
#> updating mapfun(x = delayed_norm, y = delayed_pois) from ready to running
#> run:1 ready:0 workers:2
#> updating mapfun(x = delayed_norm, y = delayed_pois) from running to resolved
#> [1] -1.1601934 -0.4678799 -1.2152393 -0.8963905 -1.0718538 -1.0619060 -0.8325901
#> [1] -0.6688907 -1.2691496 -1.1808899 -1.7605806 -0.5992127 -0.6838026 -1.4086257
```

*Remark:* In the above, the delayed computation is carried out in
parallel using the framework offered by the excellent [`future`
package](https://github.com/HenrikBengtsson/future) and its associated
ecosystem.

-----
------------------------------------------------------------------------

## License

Expand Down
Loading