diff --git a/.github/workflows/GHA_qmap_tester.yml b/.github/workflows/GHA_qmap_tester.yml index 778bebd..c04f746 100644 --- a/.github/workflows/GHA_qmap_tester.yml +++ b/.github/workflows/GHA_qmap_tester.yml @@ -15,7 +15,7 @@ permissions: contents: read jobs: - build: + lint_and_test: runs-on: ubuntu-latest @@ -34,7 +34,7 @@ jobs: - name: Lint with flake8 run: | # stop the build if there are Python syntax errors or undefined names - flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics + flake8 . --count --select=E9,F63,F7,F82 --extend-ignore=F824 --show-source --statistics # exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics - name: Test with pytest diff --git a/qmap/__init__.py b/qmap/__init__.py index 10e4225..f0ede3d 100644 --- a/qmap/__init__.py +++ b/qmap/__init__.py @@ -1,2 +1 @@ -__version__ = '0.4' - +__version__ = '0.4.1' diff --git a/qmap/executor/dummy.py b/qmap/executor/dummy.py index 5ed109a..5db6ec2 100644 --- a/qmap/executor/dummy.py +++ b/qmap/executor/dummy.py @@ -75,5 +75,5 @@ def get_usage(): @staticmethod def run(cmd, parameters, quiet=False): - time.sleep(random.randint(0,100)) + time.sleep(random.randint(0, 100)) pass diff --git a/qmap/executor/executor.py b/qmap/executor/executor.py index 1dea57c..ba1f501 100644 --- a/qmap/executor/executor.py +++ b/qmap/executor/executor.py @@ -101,7 +101,7 @@ def create_script(file, commands, default_params_file, specific_params_file): """ Create the script to execute from a list of commands - Args: + Args: file (str): path to the file (without extension) commands (list): list of commands to execute for the job default_params_file (path): path to the env file with default jobs parameters diff --git a/qmap/executor/slurm.py b/qmap/executor/slurm.py index 0e947f9..54bbb8c 100644 --- a/qmap/executor/slurm.py +++ b/qmap/executor/slurm.py @@ -15,13 +15,13 @@ from qmap.job.status import Status from qmap.utils import execute_command - STATUS_FORMAT = ",".join(['jobid', 'state', 'avecpu', 'cputime', 'elapsed', 'start', 'end', 'timelimit', 'maxdiskread', 'maxdiskwrite', 'maxvmsize', - 'reqcpus', 'reqmem', 'reserved', + 'reqcpus', 'reqmem', + '', 'nodelist', 'exitcode']) SLURM_STATUS_CONVERSION = {Status.DONE: ['COMPLETED', 'CD'], @@ -41,7 +41,7 @@ USAGE_FORMAT = ','.join(['nodelist', 'cpusstate', 'memory', 'allocmem', 'statecompact']) -CMD_INFO = "sinfo -N -O {} --noheader".format(USAGE_FORMAT) +CMD_INFO = f"sinfo -N -O {USAGE_FORMAT} --noheader" CMD_SQUEUE = 'squeue -u ${USER} -t R -o "%C %m %N" --noheader' @@ -98,15 +98,15 @@ def convert_time(time_): t = time_[:-1] units = time_[-1] if units == 'd': - return '{}-0'.format(t) + return f'{t}-0' elif units == 'h': - return '0-{}'.format(t) + return f'0-{t}' elif units == 'm': - return '{}'.format(t) + return f'{t}' elif units == 's': - return '0:{}'.format(t) + return f'0:{t}' else: - raise ExecutorError('Invalid units for time: {}'.format(units)) + raise ExecutorError(f'Invalid units for time: {units}') else: return time_ @@ -115,41 +115,54 @@ def parse_parameters(parameters): """Parse job parameters into SLURM command options""" options = [] if 'nodes' in parameters: - options.append('-N {}'.format(parameters['nodes'])) # Number of nodes -N=1 -> One node (all cores in same machine) + options.append(f'-N {parameters["nodes"]}') # Number of nodes -N=1 -> One node (all cores in same machine) if 'tasks' in parameters: - options.append('-n {}'.format(parameters['tasks'])) # Number of cores + options.append(f'-n {parameters["tasks"]}') # Number of cores if 'cores' in parameters: - options.append('-c {}'.format(parameters['cores'])) # Cores per task + options.append(f'-c {parameters["cores"]}') # Cores per task if 'memory' in parameters: - options.append('--mem {}'.format(parameters['memory'])) # Memory pool for all cores (see also --mem-per-cpu) + options.append(f'--mem {parameters["memory"]}') # Memory pool for all cores (see also --mem-per-cpu) if 'queue' in parameters: - options.append('-p {}'.format(parameters['queue'])) # Partition(s) to submit to + options.append(f'-p {parameters["queue"]}') # Partition(s) to submit to if 'time' in parameters: - wall_time = convert_time(parameters['time']) - options.append('-t {}'.format(wall_time)) # Runtime + wall_time = convert_time(parameters["time"]) + options.append(f'-t {wall_time}') # Runtime if 'working_directory' in parameters: - options.append('-D {}'.format(parameters['working_directory'])) + options.append(f'-D {parameters["working_directory"]}') if 'name' in parameters: - options.append('-J {}'.format(parameters['name'])) + options.append(f'-J {parameters["name"]}') if 'extra' in parameters: - options.append('{}'.format(parameters['extra'])) + options.append(f'{parameters["extra"]}') return options class Executor(IExecutor): @staticmethod - def run_job(f_script, parameters, out=None, err=None): - options = parse_parameters(parameters) + def _get_slurm_version_major() -> int: + """ + Fetch the SLURM version from the user's infrastructure. + """ + try: + result = subprocess.run(['scontrol', '--version'], capture_output=True, text=True, check=True) + version_line = result.stdout.strip() + version = version_line.split()[1] # Extract version from "slurm x.x.x" + return int(version.split('.')[0]) + except subprocess.CalledProcessError as e: + raise ExecutorError(f"Failed to retrieve SLURM version: {e}") from e + + @staticmethod + def run_job(f_script, job_parameters, out=None, err=None): + options = parse_parameters(job_parameters) if out is not None: - options.append('-o {}'.format(out)) # File to which STDOUT will be written + options.append(f'-o {out}') # File to which STDOUT will be written if err is not None: - options.append('-e {}'.format(err)) # File to which STDERR will be written - cmd = "sbatch --parsable {} {}.{}".format(' '.join(options), f_script, SCRIPT_FILE_EXTENSION) + options.append(f'-e {err}') # File to which STDERR will be written + cmd = f"sbatch --parsable {' '.join(options)} {f_script}.{SCRIPT_FILE_EXTENSION}" try: out = execute_command(cmd) - except QMapError: - raise ExecutorError('Job cannot be submitted to slurm. Command: {}'.format(cmd)) + except QMapError as e: + raise ExecutorError(f'Job cannot be submitted to slurm. Command: {cmd}') from e return out.strip(), cmd @staticmethod @@ -158,8 +171,12 @@ def generate_jobs_status(job_ids, retries=3): For each job ID, we assume we have a single step (.0 for run and .batch for batch submissions). """ + major_version = Executor._get_slurm_version_major() + node_state_column = 'reserved' if major_version < 21 else 'planned' + + status_fmt = STATUS_FORMAT.replace('', node_state_column) - cmd = "sacct --parsable2 --format {} --jobs {}".format(STATUS_FORMAT, ",".join(job_ids)) + cmd = f"sacct --parsable2 --format {status_fmt} --jobs {','.join(job_ids)}" try: out = execute_command(cmd) except QMapError as e: @@ -190,28 +207,28 @@ def generate_jobs_status(job_ids, retries=3): @staticmethod def terminate_jobs(job_ids): - cmd = "scancel -f {}".format(" ".join(job_ids)) + cmd = f"scancel -f {' '.join(job_ids)}" if len(job_ids) == 0: return '', cmd try: out = execute_command(cmd) except QMapError as e: - raise ExecutorError(e) + raise ExecutorError(e) from e else: return out.strip(), cmd @staticmethod def create_script(file, commands, default_params_file, specific_params_file): - file = '{}.{}'.format(file, SCRIPT_FILE_EXTENSION) + file = f'{file}.{SCRIPT_FILE_EXTENSION}' with open(file, "wt") as fd: fd.writelines([ "#!/bin/bash\n", '#SBATCH --no-requeue\n' 'set -e\n', "\n", - 'source "{}"\n'.format(default_params_file), - 'if [ -f "{}" ]; then\n'.format(specific_params_file), - '\tsource "{}"\n'.format(specific_params_file), + f'source "{default_params_file}"\n', + f'if [ -f "{specific_params_file}" ]; then\n', + f'\tsource "{specific_params_file}"\n', 'fi\n', "\n", "{}\n".format('\n'.join(commands)), @@ -231,12 +248,12 @@ def get_usage(): try: out = execute_command(CMD_INFO) except QMapError as e: - raise ExecutorError(e) + raise ExecutorError(e) from e else: lines = out.splitlines() for line in lines: values = line.strip().split() - node_id = values[0] + _node_id = values[0] all_cores = values[1].split('/') cores_total += int(all_cores[3]) cores_alloc += int(all_cores[0]) @@ -253,7 +270,7 @@ def get_usage(): try: out = execute_command(CMD_SQUEUE) except QMapError as e: - raise ExecutorError(e) + raise ExecutorError(e) from e else: lines = out.splitlines() for line in lines: @@ -283,7 +300,7 @@ def run(cmd, parameters, quiet=False): # Skip error lines due to --pty option pass else: - if quiet and line in ('salloc: Granted job allocation {}\n'.format(job_id), 'salloc: Relinquishing job allocation {}\n'.format(job_id)): + if quiet and line in (f'salloc: Granted job allocation {job_id}\n', f'salloc: Relinquishing job allocation {job_id}\n'): pass else: print(line, end='') diff --git a/qmap/file/__init__.py b/qmap/file/__init__.py index 1843327..15c83b8 100644 --- a/qmap/file/__init__.py +++ b/qmap/file/__init__.py @@ -1,4 +1,4 @@ """ The file module contains the utilities related to the files that are generated during the qmap execution and also the input jobs file. -""" \ No newline at end of file +""" diff --git a/qmap/template.py b/qmap/template.py index 2f8b586..c3ed645 100644 --- a/qmap/template.py +++ b/qmap/template.py @@ -49,12 +49,12 @@ def read_substitutions_file(file): """ -glob_wildcards_regex = re.compile('\*|\*\*') +glob_wildcards_regex = re.compile(r'\*|\*\*') """ Glob wildcards: *, ** """ -glob_user_wildcards_regex = re.compile('{{(?:\?(?P.+?):)(?P\*|\*\*)}}') +glob_user_wildcards_regex = re.compile(r'{{(?:\?(?P.+?):)(?P\*|\*\*)}}') """ Glob wildcards within a user named group e.g. {{?name:*}} @@ -80,7 +80,7 @@ def check_command(cmd): """ # check for groups with same name group_names = [] - for match in re.finditer('{{\?(?P(?!=).+?):(?P.*?)}}', cmd): # user named groups: {{?name:value}} + for match in re.finditer(r'{{\?(?P(?!=).+?):(?P.*?)}}', cmd): # user named groups: {{?name:value}} value = match.group('value') if value is None: raise TemplateError('Empty wildcard value {}'.format(value)) @@ -93,7 +93,7 @@ def check_command(cmd): raise TemplateError('Repeated name {} in command'.format(name)) else: group_names.append(name) - for match in re.finditer('{{\?=(?P.+?)}}', cmd): # user groups replacements: {{?=name}} + for match in re.finditer(r'{{\?=(?P.+?)}}', cmd): # user groups replacements: {{?=name}} name = match.group('name') if name not in group_names: raise TemplateError('Missing group {} in command'.format(name)) @@ -118,7 +118,7 @@ def expand_substitutions(cmd): expanded_cmd = cmd[:] expansion_dict = {} - for group_count, user_group in enumerate(re.finditer('{{(\?(?P[^}]+?):)?(?P.+?)}}', cmd)): + for group_count, user_group in enumerate(re.finditer(r'{{(\?(?P[^}]+?):)?(?P.+?)}}', cmd)): # Loop through each user defined groups (named and unnamed): {{user group}} group_str = user_group.group(0) @@ -166,7 +166,7 @@ def expand_wildcards(cmd): """ # Replace all glob wildcards between {{...}} that are unnamed - command = re.sub('{{(?P\*|\*\*)}}', '\g', cmd) # treat user unamed glob wildcards (e.g. {{*}}) as normal globl wildcards (e.g. /home/{{*}}.txt == /home/*.txt) + command = re.sub(r'{{(?P\*|\*\*)}}', r'\g', cmd) # treat user unamed glob wildcards (e.g. {{*}}) as normal globl wildcards (e.g. /home/{{*}}.txt == /home/*.txt) cmd_str_list = [] expansion_dict = {} for s in split_regex.split(command): # glob wildcards are expanded as we split the command @@ -205,16 +205,16 @@ def expand_glob_magic(command): # Expand next (look at the break) part of the command with a user named group glob wildcard for index, cmd in enumerate(stripped_cmd): if glob_user_wildcards_regex.search(cmd): # first item with the named wildcard - glob_search_str = glob_user_wildcards_regex.sub('\g', cmd) + glob_search_str = glob_user_wildcards_regex.sub(r'\g', cmd) - escaped_cmd = cmd.replace('.', '\.') # TODO escape rest of metacharacters + escaped_cmd = cmd.replace('.', r'\.') # TODO escape rest of metacharacters # replace all glob wildcards that do not belong to a named group. They cannot be preceded by '\', '{{' or followed by '}}' regex_str = re.sub(r'(?\*)(?!\*|}})', '.*?', escaped_cmd) regex_str = re.sub(r'(?\*\*/?)(?!}})', '(.*?)?', regex_str) # give a name to named wildcards - regex_str = re.sub('{{(?:\?(?P.+?):)(?P\*\*)}}/?', '(?P<\g>.*?)?', regex_str) - regex_str = re.sub('{{(?:\?(?P.+?):)(?P\*)}}', '(?P<\g>.*?)', regex_str) + regex_str = re.sub(r'{{(?:\?(?P.+?):)(?P\*\*)}}/?', r'(?P<\g>.*?)?', regex_str) + regex_str = re.sub(r'{{(?:\?(?P.+?):)(?P\*)}}', r'(?P<\g>.*?)', regex_str) if regex_str.endswith('.*?)'): regex_str = regex_str[:-2] + '$)' elif regex_str.endswith('.*?'):