1 change: 1 addition & 0 deletions .gitignore
@@ -18,3 +18,4 @@ src/Makevars
windows
/doc/
/Meta/
.idea/
2 changes: 1 addition & 1 deletion DESCRIPTION
@@ -1,5 +1,5 @@
Package: clustermq
Title: Evaluate Function Calls on HPC Schedulers (LSF, SGE, SLURM, PBS/Torque)
Title: Evaluate Function Calls on HPC Schedulers (LSF, SGE, GCS, OCS, SLURM, PBS/Torque)
Version: 0.9.9
Authors@R: c(
person('Michael', 'Schubert', email='mschu.dev@gmail.com',
2 changes: 1 addition & 1 deletion R/clustermq-package.r
@@ -1,4 +1,4 @@
#' Evaluate Function Calls on HPC Schedulers (LSF, SGE, SLURM)
#' Evaluate Function Calls on HPC Schedulers (LSF, SGE, GCS, OCS, SLURM)
#'
#' Provides the \code{Q} function to send arbitrary function calls to
#' workers on HPC schedulers without relying on network-mounted storage.
73 changes: 73 additions & 0 deletions R/qsys_sge.r
@@ -57,6 +57,79 @@ SGE = R6::R6Class("SGE",
cloneable = FALSE
)

#' Class for Open Cluster Scheduler (OCS)
OCS = R6::R6Class("OCS",
inherit = QSys,

public = list(
[Review thread on lines +61 to +64]
mschubert (Owner), Mar 8, 2026:
Why is the SGE initializer not reused here? Job names are guaranteed to be unique within clustermq; but if IDs are better, we should use them in SGE as well.

Author:
I have no access to SGE and do not know if job names were unique back then with the old Sun Microsystems release.

initialize = function(addr, n_jobs, master, ..., template=getOption("clustermq.template", class(self)[1]),
log_worker=FALSE, log_file=NULL, verbose=TRUE) {
super$initialize(addr=addr, master=master, template=template)

# fill the template with options and required fields
[Review thread — mschubert (Owner)]
Please only add comments/whitespace where they add value. If the function is called fill_template, a comment saying "fill the template" is superfluous.

Author:
ok

opts = private$fill_options(n_jobs=n_jobs, ...)
filled = fill_template(private$template, opts, required=c("master", "n_jobs"))

private$job_name = opts$job_name
if (verbose)
message("Submitting ", n_jobs, " worker jobs to ", class(self)[1], " as ", sQuote(private$job_name), " ...")

# submit the job with qsub (stdin-based) and capture the output
# on success the output will contain the job id, on failure the error message
private$qsub_stdout = system2("qsub", input=filled, stdout=TRUE)

# check the return code and stop on error
status = attr(private$qsub_stdout, "status")
if (!is.null(status) && status != 0)
private$template_error(class(self)[1], status, filled)

        # try to read the job ID from stdout; stop on error
private$job_id <- regmatches(private$qsub_stdout, regexpr("^[0-9]+", private$qsub_stdout))
[Review thread — mschubert (Owner)]
Please be consistent with assignments; we use = by convention in this project.

Author:
ok

if (length(private$job_id) == 0)
private$template_error(class(self)[1], private$qsub_stdout, filled)

# if we got here, we have a job ID and can proceed
if (verbose)
message("Submitted job has ID ", private$job_id)

private$master$add_pending_workers(n_jobs)
private$is_cleaned_up = FALSE
},

cleanup = function(success, timeout) {
# first call finalize to send qdel ...
private$finalize()
[Review thread on lines +100 to +101 — mschubert (Owner)]
qdel should only be called if there are still running jobs, not otherwise. I believe the cleanup implementation in SGE is correct.

Author:
Ok. This did not work for me when I had the same code for OCS/GCS: qdel was not triggered although the worker tasks were processed and there were still pending jobs...


# ... then set the cleaned up flag to avoid sending qdel again
private$is_cleaned_up = success
}
),

private = list(
qsub_stdout = NULL,
job_name = NULL,
job_id = NULL,

finalize = function(quiet = TRUE) {
if (!private$is_cleaned_up) {
system(paste("qdel", private$job_id), ignore.stdout=quiet, ignore.stderr=quiet, wait=FALSE)
}
private$is_cleaned_up = TRUE
}
),

cloneable = FALSE
)

#' Class for Gridware Cluster Scheduler (GCS)
GCS = R6::R6Class("GCS",
inherit = OCS,
cloneable = FALSE

# no changes needed, but we want to have a separate class for GCS to allow for GCS-specific
# templates and enterprise edition options
)

PBS = R6::R6Class("PBS",
inherit = SGE,

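For context on the job-ID parsing in `OCS$initialize` above: with `-terse`, `qsub` prints only the job ID on success (for array jobs, something like `3000114.1-4:1`), and the regex `^[0-9]+` keeps the leading numeric part. A minimal shell sketch of that extraction — the job ID shown is made up:

```shell
# Simulated qsub -terse output for an array job (hypothetical job ID).
qsub_stdout="3000114.1-4:1"

# Extract the leading digits, mirroring regmatches(..., regexpr("^[0-9]+", ...)).
job_id=$(printf '%s\n' "$qsub_stdout" | grep -oE '^[0-9]+')
echo "$job_id"   # 3000114
```

If the output contains no leading digits (e.g. an error message), `job_id` is empty, which is exactly the case the R code turns into a `template_error`.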
9 changes: 4 additions & 5 deletions R/zzz.r
@@ -10,9 +10,9 @@
qsys_default = toupper(getOption('clustermq.scheduler'))

if (length(qsys_default) == 0) {
qname = c("SLURM", "LSF", "SGE", "LOCAL")
exec = Sys.which(c("sbatch", "bsub", "qsub"))
select = c(which(nchar(exec) > 0), 4)[1]
qname = c("SLURM", "LSF", "SGE", "GCS", "OCS", "LOCAL")
exec = Sys.which(c("sbatch", "bsub", "qsub", "qsub", "qsub"))
select = c(which(nchar(exec) > 0), 6)[1]
[Review thread on lines +13 to +15 — mschubert (Owner)]
This will not work: we're checking available shell commands to guess the scheduler. If qsub is available, this is SGE (by assumption). We cannot distinguish between PBS, Torque, GCS, and OCS, so it makes no sense to check these here.

Author:
What is your recommendation here?

mschubert (Owner):
I think it's best to leave the code as it was before.

qsys_default = qname[select]
}

@@ -26,8 +26,7 @@
#' @keywords internal
.onAttach = function(libname, pkgname) {
if (is.null(getOption("clustermq.scheduler"))) {
packageStartupMessage("* Option 'clustermq.scheduler' not set, ",
"defaulting to ", sQuote(qsys_default))
packageStartupMessage("* Option 'clustermq.scheduler' not set, ", "defaulting to ", sQuote(qsys_default))
packageStartupMessage("--- see: https://mschubert.github.io/clustermq/articles/userguide.html#configuration")
}
if (!libzmq_has_draft()) {
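The autodetection in zzz.r checks which submit command is on `$PATH` and maps the first hit to a scheduler name; as the reviewer notes, `qsub` alone cannot distinguish SGE from GCS, OCS, PBS, or Torque. A rough shell analogue of that logic (the function name is illustrative, not part of clustermq):

```shell
# Map the first recognized submit command to a scheduler name (sketch).
# qsub is ambiguous: SGE, GCS, OCS, PBS, and Torque all provide it.
pick_scheduler() {
    for cmd in "$@"; do
        case "$cmd" in
            sbatch) echo SLURM; return ;;
            bsub)   echo LSF;   return ;;
            qsub)   echo SGE;   return ;;  # assumed SGE; could be GCS/OCS/PBS/Torque
        esac
    done
    echo LOCAL   # no scheduler found
}

pick_scheduler qsub    # prints SGE
pick_scheduler         # prints LOCAL
```

This is why the non-SGE qsub-based schedulers need `options(clustermq.scheduler=...)` set explicitly.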
2 changes: 2 additions & 0 deletions README.md
@@ -51,6 +51,8 @@ schedulers](https://mschubert.github.io/clustermq/articles/userguide.html#config
* [SLURM](https://mschubert.github.io/clustermq/articles/userguide.html#slurm) - *should work without setup*
* [LSF](https://mschubert.github.io/clustermq/articles/userguide.html#lsf) - *should work without setup*
* [SGE](https://mschubert.github.io/clustermq/articles/userguide.html#sge) - *may require configuration*
* [GCS](https://mschubert.github.io/clustermq/articles/userguide.html#gcs) - *works without setup*
* [OCS](https://mschubert.github.io/clustermq/articles/userguide.html#ocs) - *works without setup*
[Review thread on lines +54 to +55 — mschubert (Owner)]
This is incorrect; it needs clustermq.scheduler to be set (see PBS/Torque).

Author:
Ok. Now I understand. From the scheduler end there is no change required; a default OCS/GCS installation is sufficient.

* [PBS](https://mschubert.github.io/clustermq/articles/userguide.html#pbs)/[Torque](https://mschubert.github.io/clustermq/articles/userguide.html#torque) - *needs* `options(clustermq.scheduler="PBS"/"Torque")`
* via [SSH](https://mschubert.github.io/clustermq/articles/userguide.html#ssh-connector) -
*needs* `options(clustermq.scheduler="ssh", clustermq.ssh.host=<yourhost>)`
49 changes: 49 additions & 0 deletions inst/GCS.tmpl
@@ -0,0 +1,49 @@
# Submit client should only show the job ID of the new job on success
#$ -terse

# Name of the job visible in GCS
#$ -N {{ job_name }}

# Join error and output file.
#$ -j y

# Location of the output file
#$ -o {{ log_file | /dev/null }}

# Start R job in the working directory
#$ -cwd

# Export the full environment to the R job (e.g. if *LD_LIBRARY_PATH* is required).
# Depending on security settings might require a cluster manager to set
# ENABLE_SUBMIT_LIB_PATH=1 as *qmaster_param*
#$ -V

# Submit the workload as an array job (one job with multiple tasks)
#$ -t 1-{{ n_jobs }}

# Each array task will allocate one slot in the cluster, unless otherwise specified.
#$ -pe mytestpe {{ cores | 1 }}

# Per slot the job will get one power core (C), assuming R code is single-threaded, unless otherwise specified.
#$ -bunit C
#$ -bamount {{ threads | 1 }}

# Cores on a host are packed (cores on a die or chiplet sharing same NUMA node and caches if possible)
#$ -bstrategy packed
#$ -btype host

# The scheduler will do the binding via *HWLOC*.
# Change to *env* if scheduler should make binding decision but not do the binding itself.
#$ -binstance set

# Set resource requests such as memory (default 1 GB, in bytes),
# runtime limits (default 1 hour, in seconds),
# or scheduler resource selection (the job will run in the all.q queue)
#$ -l mem_free={{ memory | 1073741824 }},h_rt={{ walltime | 3600 }},q=all.q

# Tag the job so that it can be identified later on (e.g. in a JSV script
# before submission so the job can be adapted, or for filtering afterwards)
#$ -ac application=clustermq

ulimit -v $(( 1024 * {{ memory | 4096 }} ))
CMQ_AUTH={{ auth }} R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")'
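The `{{ key | default }}` placeholders in the template are resolved by clustermq's `fill_template()`; when no value is supplied, the text after `|` is used. A hypothetical `sed`-based illustration of that default-substitution behavior (not the actual implementation):

```shell
# One resource line from the template above.
line='#$ -l mem_free={{ memory | 1073741824 }},h_rt={{ walltime | 3600 }},q=all.q'

# Replace each {{ key | default }} with its default value.
filled=$(printf '%s\n' "$line" \
    | sed -E 's/[{][{] *[a-z_]+ *[|] *([^} ]+) *[}][}]/\1/g')
echo "$filled"   # #$ -l mem_free=1073741824,h_rt=3600,q=all.q
```

Supplying `memory` or `walltime` at submission time (e.g. via `Q(..., template=list(memory=...))`) overrides the defaults instead.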
37 changes: 37 additions & 0 deletions inst/OCS.tmpl
@@ -0,0 +1,37 @@
# Submit client should only show the job ID of the new job on success
#$ -terse

# Name of the job visible in OCS
#$ -N {{ job_name }}

# Join error and output file
#$ -j y

# Location of the output file
#$ -o {{ log_file | /dev/null }}

# Start R job in the working directory
#$ -cwd

# Export the full environment to the R job (e.g. if *LD_LIBRARY_PATH* is required)
# depending on security settings might require a cluster manager to set
# ENABLE_SUBMIT_LIB_PATH=1 as *qmaster_param*
#$ -V

# Submit the workload as an array job (one job with multiple tasks)
#$ -t 1-{{ n_jobs }}

# Each array task will allocate one slot in the cluster, unless otherwise specified.
#$ -pe mytestpe {{ cores | 1 }}

# Per slot the job will get one power core (C), assuming R code is single-threaded, unless otherwise specified.
#$ -bunit C
#$ -bamount {{ threads | 1 }}

# Set resource requests such as memory (default 1 GB, in bytes),
# runtime limits (default 1 hour, in seconds),
# or scheduler resource selection (the job will run in all.q)
#$ -l mem_free={{ memory | 1073741824 }},h_rt={{ runtime | 3600 }},q=all.q

ulimit -v $(( 1024 * {{ memory | 4096 }} ))
CMQ_AUTH={{ auth }} R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")'
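The final `ulimit -v` line in both templates caps the worker's virtual memory. `ulimit -v` takes kilobytes, so the memory value (interpreted here as MB, with a default of 4096) is multiplied by 1024. A quick check of that arithmetic:

```shell
# Default memory request in MB, as in {{ memory | 4096 }}.
memory=4096

# ulimit -v expects KB, so convert MB -> KB.
limit_kb=$(( 1024 * memory ))
echo "$limit_kb"   # 4194304 KB = 4 GB
```

Note the unit assumption: this line treats `memory` as megabytes, while the `-l mem_free` request above takes a value in bytes.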
4 changes: 2 additions & 2 deletions vignettes/faq.Rmd
@@ -71,7 +71,7 @@ Running 1 calculations (5 objs/19.4 Kb common; 1 calls/chunk) ...

You will see this every time your jobs are queued but not yet started.
Depending on how busy your HPC is, this may take a long time. You can check the
queueing status of your jobs in the terminal with _e.g._ `qstat` (SGE), `bjobs`
queueing status of your jobs in the terminal with _e.g._ `qstat` (SGE, GCS or OCS), `bjobs`
(LSF), or `sinfo` (SLURM).

If your jobs are already finished, this likely means that the `clustermq`
@@ -201,7 +201,7 @@ Alternatively, you can create a script that uses SSH to execute the scheduler
on the login node. For this, you will need an SSH client in the container,
[keys set up for password-less login](https://www.digitalocean.com/community/tutorials/how-to-configure-ssh-key-based-authentication-on-a-linux-server),
and a script that calls the scheduler on the login node via SSH (e.g.
`~/bin/qsub` for SGE/PBS/Torque, `bsub` for LSF and `sbatch` for Slurm):
`~/bin/qsub` for SGE/GCS/OCS/PBS/Torque, `bsub` for LSF and `sbatch` for Slurm):

```{sh eval=FALSE}
#!/bin/bash
```
2 changes: 2 additions & 0 deletions vignettes/technicaldocs.Rmd
@@ -46,6 +46,8 @@ functions for submitting and cleaning up jobs:
|- Multicore
|- LSF
+ SGE
|- GCS
|- OCS
|- PBS
|- Torque
|- etc.