From 97ece333b845918c33a3863a230565dd0107ceb1 Mon Sep 17 00:00:00 2001 From: "Miquel L. Grau" Date: Tue, 17 Sep 2024 00:02:14 +0200 Subject: [PATCH 1/5] Update slurm.py Removing the "reserved" column, since it doesn't exist in slurm 23.02.1. (bbgcluster slurm is v16) --- qmap/executor/slurm.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/qmap/executor/slurm.py b/qmap/executor/slurm.py index 0e947f9..ace18ce 100644 --- a/qmap/executor/slurm.py +++ b/qmap/executor/slurm.py @@ -21,7 +21,7 @@ 'timelimit', 'maxdiskread', 'maxdiskwrite', 'maxvmsize', - 'reqcpus', 'reqmem', 'reserved', + 'reqcpus', 'reqmem', 'nodelist', 'exitcode']) SLURM_STATUS_CONVERSION = {Status.DONE: ['COMPLETED', 'CD'], From 5b35c7abaf829a122ed616dc7b292f8447957f70 Mon Sep 17 00:00:00 2001 From: FedericaBrando Date: Tue, 15 Apr 2025 15:01:30 +0200 Subject: [PATCH 2/5] fix: ensure compatibility of slurm executor to handle older versions Older versions of Slurm use 'reserved', while newer versions use 'planned'. --- qmap/executor/slurm.py | 21 ++++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/qmap/executor/slurm.py b/qmap/executor/slurm.py index ace18ce..5e3ab41 100644 --- a/qmap/executor/slurm.py +++ b/qmap/executor/slurm.py @@ -22,6 +22,7 @@ 'maxdiskread', 'maxdiskwrite', 'maxvmsize', 'reqcpus', 'reqmem', + '', 'nodelist', 'exitcode']) SLURM_STATUS_CONVERSION = {Status.DONE: ['COMPLETED', 'CD'], @@ -139,8 +140,18 @@ def parse_parameters(parameters): class Executor(IExecutor): @staticmethod - def run_job(f_script, parameters, out=None, err=None): - options = parse_parameters(parameters) + def _get_slurm_version_major() -> str: + """ + Fetch the SLURM version from the user's infrastructure. + """ + try: + result = subprocess.run(['scontrol', '--version'], capture_output=True, text=True, check=True) + version_line = result.stdout.strip() + version = version_line.split()[1] # Extract version from "slurm x.x.x" + return int(version.split('.')[0]) + except subprocess.CalledProcessError as e: + raise ExecutorError(f"Failed to retrieve SLURM version: {e}") from e + if out is not None: options.append('-o {}'.format(out)) # File to which STDOUT will be written if err is not None: @@ -158,8 +169,12 @@ def generate_jobs_status(job_ids, retries=3): For each job ID, we assume we have a single step (.0 for run and .batch for batch submissions). """ + major_version = Executor._get_slurm_version_major() + node_state_column = 'reserved' if major_version < 21 else 'planned' + + status_fmt = STATUS_FORMAT.replace('', node_state_column) - cmd = "sacct --parsable2 --format {} --jobs {}".format(STATUS_FORMAT, ",".join(job_ids)) + cmd = f"sacct --parsable2 --format {status_fmt} --jobs {",".join(job_ids)}" try: out = execute_command(cmd) except QMapError as e: From 3dfa7e45763e86ac62f64567284016b621263658 Mon Sep 17 00:00:00 2001 From: FedericaBrando Date: Tue, 15 Apr 2025 15:04:45 +0200 Subject: [PATCH 3/5] bump: v0.4.1 --- qmap/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/qmap/__init__.py b/qmap/__init__.py index 10e4225..1dd4e65 100644 --- a/qmap/__init__.py +++ b/qmap/__init__.py @@ -1,2 +1,2 @@ -__version__ = '0.4' +__version__ = '0.4.1' From bd1b5b6b03022a73621bdf4a4d73cfa08249661a Mon Sep 17 00:00:00 2001 From: FedericaBrando Date: Tue, 15 Apr 2025 15:12:05 +0200 Subject: [PATCH 4/5] refactor: slurm executor script linting --- qmap/executor/slurm.py | 64 ++++++++++++++++++++++-------------------- 1 file changed, 33 insertions(+), 31 deletions(-) diff --git a/qmap/executor/slurm.py b/qmap/executor/slurm.py index 5e3ab41..5fa9466 100644 --- a/qmap/executor/slurm.py +++ b/qmap/executor/slurm.py @@ -15,7 +15,6 @@ from qmap.job.status import Status from qmap.utils import execute_command - STATUS_FORMAT = ",".join(['jobid', 'state', 'avecpu', 'cputime', 'elapsed', 'start', 'end', 'timelimit', @@ -42,7 +41,7 @@ USAGE_FORMAT = ','.join(['nodelist', 'cpusstate', 'memory', 'allocmem', 'statecompact']) -CMD_INFO = "sinfo -N -O {} --noheader".format(USAGE_FORMAT) +CMD_INFO = f"sinfo -N -O {USAGE_FORMAT} --noheader" CMD_SQUEUE = 'squeue -u ${USER} -t R -o "%C %m %N" --noheader' @@ -99,15 +98,15 @@ def convert_time(time_): t = time_[:-1] units = time_[-1] if units == 'd': - return '{}-0'.format(t) + return f'{t}-0' elif units == 'h': - return '0-{}'.format(t) + return f'0-{t}' elif units == 'm': - return '{}'.format(t) + return f'{t}' elif units == 's': - return '0:{}'.format(t) + return f'0:{t}' else: - raise ExecutorError('Invalid units for time: {}'.format(units)) + raise ExecutorError(f'Invalid units for time: {units}') else: return time_ @@ -116,24 +115,24 @@ def parse_parameters(parameters): """Parse job parameters into SLURM command options""" options = [] if 'nodes' in parameters: - options.append('-N {}'.format(parameters['nodes'])) # Number of nodes -N=1 -> One node (all cores in same machine) + options.append(f'-N {parameters['nodes']}') # Number of nodes -N=1 -> One node (all cores in same machine) if 'tasks' in parameters: - options.append('-n {}'.format(parameters['tasks'])) # Number of cores + options.append(f'-n {parameters['tasks']}') # Number of cores if 'cores' in parameters: - options.append('-c {}'.format(parameters['cores'])) # Cores per task + options.append(f'-c {parameters['cores']}') # Cores per task if 'memory' in parameters: - options.append('--mem {}'.format(parameters['memory'])) # Memory pool for all cores (see also --mem-per-cpu) + options.append(f'--mem {parameters['memory']}') # Memory pool for all cores (see also --mem-per-cpu) if 'queue' in parameters: - options.append('-p {}'.format(parameters['queue'])) # Partition(s) to submit to + options.append(f'-p {parameters['queue']}') # Partition(s) to submit to if 'time' in parameters: wall_time = convert_time(parameters['time']) - options.append('-t {}'.format(wall_time)) # Runtime + options.append(f'-t {wall_time}') # Runtime if 'working_directory' in parameters: - options.append('-D {}'.format(parameters['working_directory'])) + options.append(f'-D {parameters['working_directory']}') if 'name' in parameters: - options.append('-J {}'.format(parameters['name'])) + options.append(f'-J {parameters['name']}') if 'extra' in parameters: - options.append('{}'.format(parameters['extra'])) + options.append(f'{parameters['extra']}') return options @@ -152,15 +151,18 @@ def _get_slurm_version_major() -> str: except subprocess.CalledProcessError as e: raise ExecutorError(f"Failed to retrieve SLURM version: {e}") from e + @staticmethod + def run_job(f_script, job_parameters, out=None, err=None): + options = parse_parameters(job_parameters) if out is not None: - options.append('-o {}'.format(out)) # File to which STDOUT will be written + options.append(f'-o {out}') # File to which STDOUT will be written if err is not None: - options.append('-e {}'.format(err)) # File to which STDERR will be written - cmd = "sbatch --parsable {} {}.{}".format(' '.join(options), f_script, SCRIPT_FILE_EXTENSION) + options.append(f'-e {err}') # File to which STDERR will be written + cmd = f"sbatch --parsable {' '.join(options)} {f_script}.{SCRIPT_FILE_EXTENSION}" try: out = execute_command(cmd) - except QMapError: - raise ExecutorError('Job cannot be submitted to slurm. Command: {}'.format(cmd)) + except QMapError as e: + raise ExecutorError(f'Job cannot be submitted to slurm. Command: {cmd}') from e return out.strip(), cmd @staticmethod @@ -205,28 +207,28 @@ def generate_jobs_status(job_ids, retries=3): @staticmethod def terminate_jobs(job_ids): - cmd = "scancel -f {}".format(" ".join(job_ids)) + cmd = f"scancel -f {" ".join(job_ids)}" if len(job_ids) == 0: return '', cmd try: out = execute_command(cmd) except QMapError as e: - raise ExecutorError(e) + raise ExecutorError(e) from e else: return out.strip(), cmd @staticmethod def create_script(file, commands, default_params_file, specific_params_file): - file = '{}.{}'.format(file, SCRIPT_FILE_EXTENSION) + file = f'{file}.{SCRIPT_FILE_EXTENSION}' with open(file, "wt") as fd: fd.writelines([ "#!/bin/bash\n", '#SBATCH --no-requeue\n' 'set -e\n', "\n", - 'source "{}"\n'.format(default_params_file), - 'if [ -f "{}" ]; then\n'.format(specific_params_file), - '\tsource "{}"\n'.format(specific_params_file), + f'source "{default_params_file}"\n', + f'if [ -f "{specific_params_file}" ]; then\n', + f'\tsource "{specific_params_file}"\n', 'fi\n', "\n", "{}\n".format('\n'.join(commands)), @@ -246,12 +248,12 @@ def get_usage(): try: out = execute_command(CMD_INFO) except QMapError as e: - raise ExecutorError(e) + raise ExecutorError(e) from e else: lines = out.splitlines() for line in lines: values = line.strip().split() - node_id = values[0] + _node_id = values[0] all_cores = values[1].split('/') cores_total += int(all_cores[3]) cores_alloc += int(all_cores[0]) @@ -268,7 +270,7 @@ def get_usage(): try: out = execute_command(CMD_SQUEUE) except QMapError as e: - raise ExecutorError(e) + raise ExecutorError(e) from e else: lines = out.splitlines() for line in lines: @@ -298,7 +300,7 @@ def run(cmd, parameters, quiet=False): # Skip error lines due to --pty option pass else: - if quiet and line in ('salloc: Granted job allocation {}\n'.format(job_id), 'salloc: Relinquishing job allocation {}\n'.format(job_id)): + if quiet and line in (f'salloc: Granted job allocation {job_id}\n', f'salloc: Relinquishing job allocation {job_id}\n'): pass else: print(line, end='') From 7fe640f75cb8c1c6179fef92919d20941dba1b62 Mon Sep 17 00:00:00 2001 From: FedericaBrando Date: Tue, 15 Apr 2025 15:53:18 +0200 Subject: [PATCH 5/5] fix: linting and ignore F824 in gh-action TODO: clean unusued globals --- .github/workflows/GHA_qmap_tester.yml | 4 ++-- qmap/__init__.py | 1 - qmap/executor/dummy.py | 2 +- qmap/executor/executor.py | 2 +- qmap/executor/slurm.py | 24 ++++++++++++------------ qmap/file/__init__.py | 2 +- qmap/template.py | 20 ++++++++++---------- 7 files changed, 27 insertions(+), 28 deletions(-) diff --git a/.github/workflows/GHA_qmap_tester.yml b/.github/workflows/GHA_qmap_tester.yml index 778bebd..c04f746 100644 --- a/.github/workflows/GHA_qmap_tester.yml +++ b/.github/workflows/GHA_qmap_tester.yml @@ -15,7 +15,7 @@ permissions: contents: read jobs: - build: + lint_and_test: runs-on: ubuntu-latest @@ -34,7 +34,7 @@ jobs: - name: Lint with flake8 run: | # stop the build if there are Python syntax errors or undefined names - flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics + flake8 . --count --select=E9,F63,F7,F82 --extend-ignore=F824 --show-source --statistics # exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics - name: Test with pytest diff --git a/qmap/__init__.py b/qmap/__init__.py index 1dd4e65..f0ede3d 100644 --- a/qmap/__init__.py +++ b/qmap/__init__.py @@ -1,2 +1 @@ __version__ = '0.4.1' - diff --git a/qmap/executor/dummy.py b/qmap/executor/dummy.py index 5ed109a..5db6ec2 100644 --- a/qmap/executor/dummy.py +++ b/qmap/executor/dummy.py @@ -75,5 +75,5 @@ def get_usage(): @staticmethod def run(cmd, parameters, quiet=False): - time.sleep(random.randint(0,100)) + time.sleep(random.randint(0, 100)) pass diff --git a/qmap/executor/executor.py b/qmap/executor/executor.py index 1dea57c..ba1f501 100644 --- a/qmap/executor/executor.py +++ b/qmap/executor/executor.py @@ -101,7 +101,7 @@ def create_script(file, commands, default_params_file, specific_params_file): """ Create the script to execute from a list of commands - Args: + Args: file (str): path to the file (without extension) commands (list): list of commands to execute for the job default_params_file (path): path to the env file with default jobs parameters diff --git a/qmap/executor/slurm.py b/qmap/executor/slurm.py index 5fa9466..54bbb8c 100644 --- a/qmap/executor/slurm.py +++ b/qmap/executor/slurm.py @@ -115,31 +115,31 @@ def parse_parameters(parameters): """Parse job parameters into SLURM command options""" options = [] if 'nodes' in parameters: - options.append(f'-N {parameters['nodes']}') # Number of nodes -N=1 -> One node (all cores in same machine) + options.append(f'-N {parameters["nodes"]}') # Number of nodes -N=1 -> One node (all cores in same machine) if 'tasks' in parameters: - options.append(f'-n {parameters['tasks']}') # Number of cores + options.append(f'-n {parameters["tasks"]}') # Number of cores if 'cores' in parameters: - options.append(f'-c {parameters['cores']}') # Cores per task + options.append(f'-c {parameters["cores"]}') # Cores per task if 'memory' in parameters: - options.append(f'--mem {parameters['memory']}') # Memory pool for all cores (see also --mem-per-cpu) + options.append(f'--mem {parameters["memory"]}') # Memory pool for all cores (see also --mem-per-cpu) if 'queue' in parameters: - options.append(f'-p {parameters['queue']}') # Partition(s) to submit to + options.append(f'-p {parameters["queue"]}') # Partition(s) to submit to if 'time' in parameters: - wall_time = convert_time(parameters['time']) + wall_time = convert_time(parameters["time"]) options.append(f'-t {wall_time}') # Runtime if 'working_directory' in parameters: - options.append(f'-D {parameters['working_directory']}') + options.append(f'-D {parameters["working_directory"]}') if 'name' in parameters: - options.append(f'-J {parameters['name']}') + options.append(f'-J {parameters["name"]}') if 'extra' in parameters: - options.append(f'{parameters['extra']}') + options.append(f'{parameters["extra"]}') return options class Executor(IExecutor): @staticmethod - def _get_slurm_version_major() -> str: + def _get_slurm_version_major() -> int: """ Fetch the SLURM version from the user's infrastructure. """ @@ -176,7 +176,7 @@ def generate_jobs_status(job_ids, retries=3): status_fmt = STATUS_FORMAT.replace('', node_state_column) - cmd = f"sacct --parsable2 --format {status_fmt} --jobs {",".join(job_ids)}" + cmd = f"sacct --parsable2 --format {status_fmt} --jobs {','.join(job_ids)}" try: out = execute_command(cmd) except QMapError as e: @@ -207,7 +207,7 @@ def generate_jobs_status(job_ids, retries=3): @staticmethod def terminate_jobs(job_ids): - cmd = f"scancel -f {" ".join(job_ids)}" + cmd = f"scancel -f {' '.join(job_ids)}" if len(job_ids) == 0: return '', cmd try: diff --git a/qmap/file/__init__.py b/qmap/file/__init__.py index 1843327..15c83b8 100644 --- a/qmap/file/__init__.py +++ b/qmap/file/__init__.py @@ -1,4 +1,4 @@ """ The file module contains the utilities related to the files that are generated during the qmap execution and also the input jobs file. -""" \ No newline at end of file +""" diff --git a/qmap/template.py b/qmap/template.py index 2f8b586..c3ed645 100644 --- a/qmap/template.py +++ b/qmap/template.py @@ -49,12 +49,12 @@ def read_substitutions_file(file): """ -glob_wildcards_regex = re.compile('\*|\*\*') +glob_wildcards_regex = re.compile(r'\*|\*\*') """ Glob wildcards: *, ** """ -glob_user_wildcards_regex = re.compile('{{(?:\?(?P.+?):)(?P\*|\*\*)}}') +glob_user_wildcards_regex = re.compile(r'{{(?:\?(?P.+?):)(?P\*|\*\*)}}') """ Glob wildcards within a user named group e.g. {{?name:*}} @@ -80,7 +80,7 @@ def check_command(cmd): """ # check for groups with same name group_names = [] - for match in re.finditer('{{\?(?P(?!=).+?):(?P.*?)}}', cmd): # user named groups: {{?name:value}} + for match in re.finditer(r'{{\?(?P(?!=).+?):(?P.*?)}}', cmd): # user named groups: {{?name:value}} value = match.group('value') if value is None: raise TemplateError('Empty wildcard value {}'.format(value)) @@ -93,7 +93,7 @@ def check_command(cmd): raise TemplateError('Repeated name {} in command'.format(name)) else: group_names.append(name) - for match in re.finditer('{{\?=(?P.+?)}}', cmd): # user groups replacements: {{?=name}} + for match in re.finditer(r'{{\?=(?P.+?)}}', cmd): # user groups replacements: {{?=name}} name = match.group('name') if name not in group_names: raise TemplateError('Missing group {} in command'.format(name)) @@ -118,7 +118,7 @@ def expand_substitutions(cmd): expanded_cmd = cmd[:] expansion_dict = {} - for group_count, user_group in enumerate(re.finditer('{{(\?(?P[^}]+?):)?(?P.+?)}}', cmd)): + for group_count, user_group in enumerate(re.finditer(r'{{(\?(?P[^}]+?):)?(?P.+?)}}', cmd)): # Loop through each user defined groups (named and unnamed): {{user group}} group_str = user_group.group(0) @@ -166,7 +166,7 @@ def expand_wildcards(cmd): """ # Replace all glob wildcards between {{...}} that are unnamed - command = re.sub('{{(?P\*|\*\*)}}', '\g', cmd) # treat user unamed glob wildcards (e.g. {{*}}) as normal globl wildcards (e.g. /home/{{*}}.txt == /home/*.txt) + command = re.sub(r'{{(?P\*|\*\*)}}', r'\g', cmd) # treat user unamed glob wildcards (e.g. {{*}}) as normal globl wildcards (e.g. /home/{{*}}.txt == /home/*.txt) cmd_str_list = [] expansion_dict = {} for s in split_regex.split(command): # glob wildcards are expanded as we split the command @@ -205,16 +205,16 @@ def expand_glob_magic(command): # Expand next (look at the break) part of the command with a user named group glob wildcard for index, cmd in enumerate(stripped_cmd): if glob_user_wildcards_regex.search(cmd): # first item with the named wildcard - glob_search_str = glob_user_wildcards_regex.sub('\g', cmd) + glob_search_str = glob_user_wildcards_regex.sub(r'\g', cmd) - escaped_cmd = cmd.replace('.', '\.') # TODO escape rest of metacharacters + escaped_cmd = cmd.replace('.', r'\.') # TODO escape rest of metacharacters # replace all glob wildcards that do not belong to a named group. They cannot be preceded by '\', '{{' or followed by '}}' regex_str = re.sub(r'(?\*)(?!\*|}})', '.*?', escaped_cmd) regex_str = re.sub(r'(?\*\*/?)(?!}})', '(.*?)?', regex_str) # give a name to named wildcards - regex_str = re.sub('{{(?:\?(?P.+?):)(?P\*\*)}}/?', '(?P<\g>.*?)?', regex_str) - regex_str = re.sub('{{(?:\?(?P.+?):)(?P\*)}}', '(?P<\g>.*?)', regex_str) + regex_str = re.sub(r'{{(?:\?(?P.+?):)(?P\*\*)}}/?', r'(?P<\g>.*?)?', regex_str) + regex_str = re.sub(r'{{(?:\?(?P.+?):)(?P\*)}}', r'(?P<\g>.*?)', regex_str) if regex_str.endswith('.*?)'): regex_str = regex_str[:-2] + '$)' elif regex_str.endswith('.*?'):