Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/GHA_qmap_tester.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ permissions:
contents: read

jobs:
build:
lint_and_test:

runs-on: ubuntu-latest

Expand All @@ -34,7 +34,7 @@ jobs:
- name: Lint with flake8
run: |
# stop the build if there are Python syntax errors or undefined names
flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
flake8 . --count --select=E9,F63,F7,F82 --extend-ignore=F824 --show-source --statistics
# exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide
flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics
- name: Test with pytest
Expand Down
3 changes: 1 addition & 2 deletions qmap/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
__version__ = '0.4'

__version__ = '0.4.1'
2 changes: 1 addition & 1 deletion qmap/executor/dummy.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,5 +75,5 @@ def get_usage():

@staticmethod
def run(cmd, parameters, quiet=False):
    """
    Pretend to run a command.

    Nothing is executed: the call just blocks for a random whole number of
    seconds between 0 and 100 (both included). All arguments are ignored.
    """
    time.sleep(random.randint(0, 100))
2 changes: 1 addition & 1 deletion qmap/executor/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ def create_script(file, commands, default_params_file, specific_params_file):
"""
Create the script to execute from a list of commands

Args:
Args:
file (str): path to the file (without extension)
commands (list): list of commands to execute for the job
default_params_file (path): path to the env file with default jobs parameters
Expand Down
89 changes: 53 additions & 36 deletions qmap/executor/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@
from qmap.job.status import Status
from qmap.utils import execute_command


# Comma-separated list of the columns requested from `sacct --format`.
# '<placeholder>' is not a real column: it is swapped for 'reserved' or
# 'planned' (depending on the SLURM major version) with str.replace right
# before the sacct command is built, so it must appear exactly once.
STATUS_FORMAT = ",".join([
    'jobid', 'state',
    'avecpu', 'cputime', 'elapsed', 'start', 'end',
    'timelimit',
    'maxdiskread', 'maxdiskwrite',
    'maxvmsize',
    'reqcpus', 'reqmem',
    '<placeholder>',
    'nodelist', 'exitcode',
])

SLURM_STATUS_CONVERSION = {Status.DONE: ['COMPLETED', 'CD'],
Expand All @@ -41,7 +41,7 @@

USAGE_FORMAT = ','.join(['nodelist', 'cpusstate', 'memory', 'allocmem', 'statecompact'])

CMD_INFO = "sinfo -N -O {} --noheader".format(USAGE_FORMAT)
CMD_INFO = f"sinfo -N -O {USAGE_FORMAT} --noheader"

CMD_SQUEUE = 'squeue -u ${USER} -t R -o "%C %m %N" --noheader'

Expand Down Expand Up @@ -98,15 +98,15 @@ def convert_time(time_):
t = time_[:-1]
units = time_[-1]
if units == 'd':
return '{}-0'.format(t)
return f'{t}-0'
elif units == 'h':
return '0-{}'.format(t)
return f'0-{t}'
elif units == 'm':
return '{}'.format(t)
return f'{t}'
elif units == 's':
return '0:{}'.format(t)
return f'0:{t}'
else:
raise ExecutorError('Invalid units for time: {}'.format(units))
raise ExecutorError(f'Invalid units for time: {units}')
else:
return time_

Expand All @@ -115,41 +115,54 @@ def parse_parameters(parameters):
"""Parse job parameters into SLURM command options"""
options = []
if 'nodes' in parameters:
options.append('-N {}'.format(parameters['nodes'])) # Number of nodes -N=1 -> One node (all cores in same machine)
options.append(f'-N {parameters["nodes"]}') # Number of nodes -N=1 -> One node (all cores in same machine)
if 'tasks' in parameters:
options.append('-n {}'.format(parameters['tasks'])) # Number of cores
options.append(f'-n {parameters["tasks"]}') # Number of cores
if 'cores' in parameters:
options.append('-c {}'.format(parameters['cores'])) # Cores per task
options.append(f'-c {parameters["cores"]}') # Cores per task
if 'memory' in parameters:
options.append('--mem {}'.format(parameters['memory'])) # Memory pool for all cores (see also --mem-per-cpu)
options.append(f'--mem {parameters["memory"]}') # Memory pool for all cores (see also --mem-per-cpu)
if 'queue' in parameters:
options.append('-p {}'.format(parameters['queue'])) # Partition(s) to submit to
options.append(f'-p {parameters["queue"]}') # Partition(s) to submit to
if 'time' in parameters:
wall_time = convert_time(parameters['time'])
options.append('-t {}'.format(wall_time)) # Runtime
wall_time = convert_time(parameters["time"])
options.append(f'-t {wall_time}') # Runtime
if 'working_directory' in parameters:
options.append('-D {}'.format(parameters['working_directory']))
options.append(f'-D {parameters["working_directory"]}')
if 'name' in parameters:
options.append('-J {}'.format(parameters['name']))
options.append(f'-J {parameters["name"]}')
if 'extra' in parameters:
options.append('{}'.format(parameters['extra']))
options.append(f'{parameters["extra"]}')
return options


class Executor(IExecutor):

@staticmethod
def run_job(f_script, parameters, out=None, err=None):
options = parse_parameters(parameters)
def _get_slurm_version_major() -> int:
    """
    Return the major version number of the SLURM installation.

    Runs ``scontrol --version`` and parses its output, which is expected to
    look like ``slurm x.y.z``.

    Returns:
        int: the first dot-separated component of the reported version

    Raises:
        ExecutorError: if ``scontrol`` cannot be launched, exits with a
            non-zero status, or prints output that cannot be parsed
    """
    try:
        result = subprocess.run(['scontrol', '--version'], capture_output=True, text=True, check=True)
    except (OSError, subprocess.CalledProcessError) as e:
        # OSError (e.g. FileNotFoundError) is what subprocess.run raises when
        # the binary is missing; there is no exit status in that case
        raise ExecutorError(f"Failed to retrieve SLURM version: {e}") from e

    version_line = result.stdout.strip()
    try:
        version = version_line.split()[1]  # Extract version from "slurm x.x.x"
        return int(version.split('.')[0])
    except (IndexError, ValueError) as e:
        # Empty/one-word output or a non-numeric major component
        raise ExecutorError(f"Failed to retrieve SLURM version: unexpected output {version_line!r}") from e

@staticmethod
def run_job(f_script, job_parameters, out=None, err=None):
    """
    Submit a job script to SLURM using ``sbatch --parsable``.

    Args:
        f_script: path to the script without its extension
            (SCRIPT_FILE_EXTENSION is appended here)
        job_parameters: job parameters, turned into sbatch options by
            parse_parameters
        out: file to which STDOUT will be written (option skipped if None)
        err: file to which STDERR will be written (option skipped if None)

    Returns:
        tuple: the stripped output of the sbatch command and the command
        line that was executed

    Raises:
        ExecutorError: if executing the command raises a QMapError
    """
    options = parse_parameters(job_parameters)
    if out is not None:
        options.append(f'-o {out}')  # File to which STDOUT will be written
    if err is not None:
        options.append(f'-e {err}')  # File to which STDERR will be written
    cmd = f"sbatch --parsable {' '.join(options)} {f_script}.{SCRIPT_FILE_EXTENSION}"
    try:
        # kept in its own name so the `out` argument is not overwritten
        submission_output = execute_command(cmd)
    except QMapError as e:
        raise ExecutorError(f'Job cannot be submitted to slurm. Command: {cmd}') from e
    return submission_output.strip(), cmd

@staticmethod
Expand All @@ -158,8 +171,12 @@ def generate_jobs_status(job_ids, retries=3):
For each job ID,
we assume we have a single step (.0 for run and .batch for batch submissions).
"""
major_version = Executor._get_slurm_version_major()
node_state_column = 'reserved' if major_version < 21 else 'planned'

status_fmt = STATUS_FORMAT.replace('<placeholder>', node_state_column)

cmd = "sacct --parsable2 --format {} --jobs {}".format(STATUS_FORMAT, ",".join(job_ids))
cmd = f"sacct --parsable2 --format {status_fmt} --jobs {','.join(job_ids)}"
try:
out = execute_command(cmd)
except QMapError as e:
Expand Down Expand Up @@ -190,28 +207,28 @@ def generate_jobs_status(job_ids, retries=3):

@staticmethod
def terminate_jobs(job_ids):
    """
    Cancel the given jobs with ``scancel -f``.

    Args:
        job_ids: sequence of job IDs (strings) to cancel

    Returns:
        tuple: the stripped output of the command ('' when there are no
        jobs to cancel) and the command line

    Raises:
        ExecutorError: if executing the command raises a QMapError
    """
    cmd = f"scancel -f {' '.join(job_ids)}"
    if not job_ids:
        # Nothing to cancel: do not call scancel, but still report the command
        return '', cmd
    try:
        out = execute_command(cmd)
    except QMapError as e:
        raise ExecutorError(e) from e
    return out.strip(), cmd

@staticmethod
def create_script(file, commands, default_params_file, specific_params_file):
file = '{}.{}'.format(file, SCRIPT_FILE_EXTENSION)
file = f'{file}.{SCRIPT_FILE_EXTENSION}'
with open(file, "wt") as fd:
fd.writelines([
"#!/bin/bash\n",
'#SBATCH --no-requeue\n'
'set -e\n',
"\n",
'source "{}"\n'.format(default_params_file),
'if [ -f "{}" ]; then\n'.format(specific_params_file),
'\tsource "{}"\n'.format(specific_params_file),
f'source "{default_params_file}"\n',
f'if [ -f "{specific_params_file}" ]; then\n',
f'\tsource "{specific_params_file}"\n',
'fi\n',
"\n",
"{}\n".format('\n'.join(commands)),
Expand All @@ -231,12 +248,12 @@ def get_usage():
try:
out = execute_command(CMD_INFO)
except QMapError as e:
raise ExecutorError(e)
raise ExecutorError(e) from e
else:
lines = out.splitlines()
for line in lines:
values = line.strip().split()
node_id = values[0]
_node_id = values[0]
all_cores = values[1].split('/')
cores_total += int(all_cores[3])
cores_alloc += int(all_cores[0])
Expand All @@ -253,7 +270,7 @@ def get_usage():
try:
out = execute_command(CMD_SQUEUE)
except QMapError as e:
raise ExecutorError(e)
raise ExecutorError(e) from e
else:
lines = out.splitlines()
for line in lines:
Expand Down Expand Up @@ -283,7 +300,7 @@ def run(cmd, parameters, quiet=False):
# Skip error lines due to --pty option
pass
else:
if quiet and line in ('salloc: Granted job allocation {}\n'.format(job_id), 'salloc: Relinquishing job allocation {}\n'.format(job_id)):
if quiet and line in (f'salloc: Granted job allocation {job_id}\n', f'salloc: Relinquishing job allocation {job_id}\n'):
pass
else:
print(line, end='')
Expand Down
2 changes: 1 addition & 1 deletion qmap/file/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""
The file module contains the utilities related to the files that
are generated during the qmap execution and also the input jobs file.
"""
"""
20 changes: 10 additions & 10 deletions qmap/template.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,12 @@ def read_substitutions_file(file):
"""


glob_wildcards_regex = re.compile('\*|\*\*')
glob_wildcards_regex = re.compile(r'\*|\*\*')
"""
Glob wildcards: *, **
"""

glob_user_wildcards_regex = re.compile('{{(?:\?(?P<name>.+?):)(?P<wildcard>\*|\*\*)}}')
glob_user_wildcards_regex = re.compile(r'{{(?:\?(?P<name>.+?):)(?P<wildcard>\*|\*\*)}}')
"""
Glob wildcards within a user named group
e.g. {{?name:*}}
Expand All @@ -80,7 +80,7 @@ def check_command(cmd):
"""
# check for groups with same name
group_names = []
for match in re.finditer('{{\?(?P<name>(?!=).+?):(?P<value>.*?)}}', cmd): # user named groups: {{?name:value}}
for match in re.finditer(r'{{\?(?P<name>(?!=).+?):(?P<value>.*?)}}', cmd): # user named groups: {{?name:value}}
value = match.group('value')
if value is None:
raise TemplateError('Empty wildcard value {}'.format(value))
Expand All @@ -93,7 +93,7 @@ def check_command(cmd):
raise TemplateError('Repeated name {} in command'.format(name))
else:
group_names.append(name)
for match in re.finditer('{{\?=(?P<name>.+?)}}', cmd): # user groups replacements: {{?=name}}
for match in re.finditer(r'{{\?=(?P<name>.+?)}}', cmd): # user groups replacements: {{?=name}}
name = match.group('name')
if name not in group_names:
raise TemplateError('Missing group {} in command'.format(name))
Expand All @@ -118,7 +118,7 @@ def expand_substitutions(cmd):

expanded_cmd = cmd[:]
expansion_dict = {}
for group_count, user_group in enumerate(re.finditer('{{(\?(?P<name>[^}]+?):)?(?P<value>.+?)}}', cmd)):
for group_count, user_group in enumerate(re.finditer(r'{{(\?(?P<name>[^}]+?):)?(?P<value>.+?)}}', cmd)):
# Loop through each user defined groups (named and unnamed): {{user group}}

group_str = user_group.group(0)
Expand Down Expand Up @@ -166,7 +166,7 @@ def expand_wildcards(cmd):

"""
# Replace all glob wildcards between {{...}} that are unnamed
command = re.sub('{{(?P<wildcard>\*|\*\*)}}', '\g<wildcard>', cmd) # treat user unamed glob wildcards (e.g. {{*}}) as normal globl wildcards (e.g. /home/{{*}}.txt == /home/*.txt)
command = re.sub(r'{{(?P<wildcard>\*|\*\*)}}', r'\g<wildcard>', cmd) # treat user unamed glob wildcards (e.g. {{*}}) as normal globl wildcards (e.g. /home/{{*}}.txt == /home/*.txt)
cmd_str_list = []
expansion_dict = {}
for s in split_regex.split(command): # glob wildcards are expanded as we split the command
Expand Down Expand Up @@ -205,16 +205,16 @@ def expand_glob_magic(command):
# Expand next (look at the break) part of the command with a user named group glob wildcard
for index, cmd in enumerate(stripped_cmd):
if glob_user_wildcards_regex.search(cmd): # first item with the named wildcard
glob_search_str = glob_user_wildcards_regex.sub('\g<wildcard>', cmd)
glob_search_str = glob_user_wildcards_regex.sub(r'\g<wildcard>', cmd)

escaped_cmd = cmd.replace('.', '\.') # TODO escape rest of metacharacters
escaped_cmd = cmd.replace('.', r'\.') # TODO escape rest of metacharacters
# replace all glob wildcards that do not belong to a named group. They cannot be preceded by '\', '{{' or followed by '}}'

regex_str = re.sub(r'(?<!\\)(?<!{{)(?P<wildcard>\*)(?!\*|}})', '.*?', escaped_cmd)
regex_str = re.sub(r'(?<!\\)(?<!{{)(?P<wildcard>\*\*/?)(?!}})', '(.*?)?', regex_str)
# give a name to named wildcards
regex_str = re.sub('{{(?:\?(?P<name>.+?):)(?P<regex>\*\*)}}/?', '(?P<\g<name>>.*?)?', regex_str)
regex_str = re.sub('{{(?:\?(?P<name>.+?):)(?P<regex>\*)}}', '(?P<\g<name>>.*?)', regex_str)
regex_str = re.sub(r'{{(?:\?(?P<name>.+?):)(?P<regex>\*\*)}}/?', r'(?P<\g<name>>.*?)?', regex_str)
regex_str = re.sub(r'{{(?:\?(?P<name>.+?):)(?P<regex>\*)}}', r'(?P<\g<name>>.*?)', regex_str)
if regex_str.endswith('.*?)'):
regex_str = regex_str[:-2] + '$)'
elif regex_str.endswith('.*?'):
Expand Down