From cab96ad506946e81a6f422f18b381c988b1a3e4e Mon Sep 17 00:00:00 2001 From: Samuel Farrens Date: Wed, 7 Jul 2021 18:50:29 +0200 Subject: [PATCH 1/9] saving progress --- shapepipe/pipeline/file_handler.py | 43 +++++++++++++++++++++++------- 1 file changed, 33 insertions(+), 10 deletions(-) diff --git a/shapepipe/pipeline/file_handler.py b/shapepipe/pipeline/file_handler.py index d642d7a64..2606a7d6f 100644 --- a/shapepipe/pipeline/file_handler.py +++ b/shapepipe/pipeline/file_handler.py @@ -505,13 +505,22 @@ def _set_module_property(self, module, property, get_type): """ + m_run_name = self._module_dict[module]['run_name'] + + print(f'M_RUN_NAME: {m_run_name}') + # 1) Check for parameter value in module section of config file - if self._config.has_option(module.upper(), property.upper()): + if self._config.has_option(m_run_name.upper(), property.upper()): if get_type == 'str': - prop_val = self._config.get(module.upper(), property.upper()) + prop_val = self._config.get( + m_run_name.upper(), + property.upper(), + ) elif get_type == 'list': - prop_val = self._config.getlist(module.upper(), - property.upper()) + prop_val = self._config.getlist( + m_run_name.upper(), + property.upper(), + ) else: raise ValueError('{} is not a valid get type'.format(get_type)) @@ -529,8 +538,8 @@ def _set_module_property(self, module, property, get_type): # Look for additional module properties for list objects if (isinstance(prop_val, list) and - self.get_add_module_property(module, property)): - prop_val += self.get_add_module_property(module, property) + self.get_add_module_property(m_run_name, property)): + prop_val += self.get_add_module_property(m_run_name, property) self._module_dict[module][property] = prop_val @@ -573,7 +582,7 @@ def _set_module_properties(self, module): ''.format(len(self._module_dict[module]['file_ext']), len(self._module_dict[module]['file_pattern']))) - def _create_module_run_dirs(self, module): + def _create_module_run_dirs(self, module, call_num=None): """ Create Module Run Directories This method creates the module output directories for a given run. @@ -585,8 +594,13 @@ def _create_module_run_dirs(self, module): """ + if not isinstance(call_num, type(None)): + run_name = f'{module}_run_{call_num}' + else: + run_name = module + self._module_dict[module]['run_dir'] = \ - (self.format(self._run_dir, module)) + (self.format(self._run_dir, run_name)) self._module_dict[module]['log_dir'] = \ (self.format(self._module_dict[module]['run_dir'], 'logs')) self._module_dict[module]['output_dir'] = \ @@ -1052,9 +1066,18 @@ def set_up_module(self, module): """ - self._module_dict[module] = {} + if module in self._module_dict.keys(): + self._module_dict[module]['run_count'] += 1 + call_num = self._module_dict[module]['run_count'] + self._module_dict[module]['run_name'] = f'{module}/run_{call_num}' + else: + self._module_dict[module] = {} + self._module_dict[module]['run_count'] = 1 + call_num = None + self._module_dict[module]['run_name'] = module + self._set_module_properties(module) - self._create_module_run_dirs(module) + self._create_module_run_dirs(module, call_num) self._set_module_input_dir(module) self._get_module_input_files(module) From ce3ce154707ec293d219464cae919ecb8647f6b6 Mon Sep 17 00:00:00 2001 From: Samuel Farrens Date: Thu, 8 Jul 2021 18:07:04 +0200 Subject: [PATCH 2/9] save progress --- example/config.ini | 14 ++- shapepipe/modules/execute_example.py | 29 ----- shapepipe/modules/execute_example_runner.py | 42 +++++++ shapepipe/modules/python_example.py | 86 --------------- .../python_example_package/__init__.py | 11 ++ .../python_example_package/python_example.py | 104 ++++++++++++++++++ shapepipe/modules/python_example_runner.py | 54 +++++++++ ...al_example.py => serial_example_runner.py} | 29 +++-- shapepipe/modules/vignetmaker_runner.py | 23 ++-- shapepipe/pipeline/file_handler.py | 70 ++++++++---- shapepipe/pipeline/job_handler.py | 37 ++++--- shapepipe/pipeline/worker_handler.py | 49 ++++++--- shapepipe_run.py | 90 +++++++++------ 13 files changed, 418 insertions(+), 220 deletions(-) delete mode 100644 shapepipe/modules/execute_example.py create mode 100644 shapepipe/modules/execute_example_runner.py delete mode 100644 shapepipe/modules/python_example.py create mode 100644 shapepipe/modules/python_example_package/__init__.py create mode 100644 shapepipe/modules/python_example_package/python_example.py create mode 100644 shapepipe/modules/python_example_runner.py rename shapepipe/modules/{serial_example.py => serial_example_runner.py} (67%) diff --git a/example/config.ini b/example/config.ini index b6dc05e3d..0d1433e90 100644 --- a/example/config.ini +++ b/example/config.ini @@ -12,7 +12,7 @@ ## ShapePipe execution options [EXECUTION] # MODULE (required) must be a valid module runner name (or a comma separated list of names) -MODULE = python_example, serial_example, execute_example +MODULE = python_example_runner, serial_example_runner, execute_example_runner, python_example_runner # MODE (optional) options are smp or mpi, default is smp ; MODE = mpi @@ -54,8 +54,16 @@ TIMEOUT = 00:01:35 ; PROCESS_PRINT_LIMIT = 100 ## Module options -[PYTHON_EXAMPLE] +[PYTHON_EXAMPLE_RUNNER] MESSAGE = The obtained value is: -[SERIAL_EXAMPLE] +[SERIAL_EXAMPLE_RUNNER] ADD_INPUT_DIR = ./example/data/numbers, ./example/data/letters + +[PYTHON_EXAMPLE_RUNNER/RUN_2] + +INPUT_MODULE = python_example_runner, execute_example_runner +FILE_PATTERN = pyex_output, head_output +FILE_EXT = cat, txt + +MESSAGE = The new obtained value is: diff --git a/shapepipe/modules/execute_example.py b/shapepipe/modules/execute_example.py deleted file mode 100644 index 3a4c3fe0a..000000000 --- a/shapepipe/modules/execute_example.py +++ /dev/null @@ -1,29 +0,0 @@ -# -*- coding: utf-8 -*- - -"""EXECUTE MODULE EXAMPLE - -This module defines methods for an example command line execution module. - -:Author: Samuel Farrens - -""" - -from shapepipe.pipeline.execute import execute -from shapepipe.modules.module_decorator import module_runner - - -@module_runner(input_module='python_example', version='1.0', - file_pattern='pyex_output', file_ext='.cat', executes='head', - run_method='parallel') -def execute_example(input_file_list, run_dirs, file_number_string, *args): - - command_line = 'head {}'.format(input_file_list[0]) - output_file_name = '{}/head_output{}.txt'.format(run_dirs['output'], - file_number_string) - - stdout, stderr = execute(command_line) - - text_file = open(output_file_name, 'w') - text_file.write(stdout) - - return stdout, stderr diff --git a/shapepipe/modules/execute_example_runner.py b/shapepipe/modules/execute_example_runner.py new file mode 100644 index 000000000..ad7ae580a --- /dev/null +++ b/shapepipe/modules/execute_example_runner.py @@ -0,0 +1,42 @@ +# -*- coding: utf-8 -*- + +"""EXECUTE MODULE EXAMPLE + +This module defines methods for an example command line execution module. + +:Author: Samuel Farrens + +""" + +from shapepipe.pipeline.execute import execute +from shapepipe.modules.module_decorator import module_runner + + +@module_runner( + input_module='python_example_runner', + version='1.0', + file_pattern='pyex_output', + file_ext='.cat', + executes='head', + run_method='parallel', +) +def execute_example_runner( + input_file_list, + run_dirs, + file_number_string, + config, + module_config_sec, + w_log, +): + + command_line = f'head {input_file_list[0]}' + output_file_name = ( + f"{run_dirs['output']}/head_output{file_number_string}.txt" + ) + + stdout, stderr = execute(command_line) + + text_file = open(output_file_name, 'w') + text_file.write(stdout) + + return stdout, stderr diff --git a/shapepipe/modules/python_example.py b/shapepipe/modules/python_example.py deleted file mode 100644 index 956e933ec..000000000 --- a/shapepipe/modules/python_example.py +++ /dev/null @@ -1,86 +0,0 @@ -# -*- coding: utf-8 -*- - -"""PYTHON MODULE EXAMPLE - -This module defines methods for an example Python module. - -:Author: Samuel Farrens - -""" - -import time -from numpy.random import randint -from shapepipe.modules.module_decorator import module_runner - - -class Dummy(object): - - def __init__(self, sleep_time=None): - - if not isinstance(sleep_time, type(None)): - self.sleep_time = sleep_time - - else: - self.sleep_time = randint(1, 10) - - def _wait(self): - - time.sleep(self.sleep_time) - - def _read_file(self, file_name): - - with open(file_name) as data_file: - content = data_file.read().replace('\n', '') - - return content - - def read_files(self, file_name1, file_name2): - - self._wait() - content1 = self._read_file(file_name1) - content2 = self._read_file(file_name2) - - self.content = '{} and {}'.format(content1, content2) - - def write_file(self, file_name, message): - - new_content = message + str(self.content) - - text_file = open(file_name, 'w') - text_file.write(new_content) - text_file.close() - - -@module_runner( - version='1.0', - file_pattern=['numbers', 'letters'], - file_ext='.txt', - depends=[ - 'numpy', - 'astropy', - 'galsim', - 'joblib', - 'mccd', - 'ngmix', - 'pandas', - 'pysap', - 'scipy', - 'sf_tools', - 'sip_tpv', - 'sqlitedict', - 'treecorr', - ], - run_method='parallel' -) -def python_example(input_file_list, run_dirs, file_number_string, - config, w_log): - - output_file_name = ('{}/pyex_output{}.cat'.format(run_dirs['output'], - file_number_string)) - message = config.get('PYTHON_EXAMPLE', 'MESSAGE') - - inst = Dummy() - inst.read_files(*input_file_list) - inst.write_file(output_file_name, message) - - return inst.content, None diff --git a/shapepipe/modules/python_example_package/__init__.py b/shapepipe/modules/python_example_package/__init__.py new file mode 100644 index 000000000..f691ad040 --- /dev/null +++ b/shapepipe/modules/python_example_package/__init__.py @@ -0,0 +1,11 @@ +# -*- coding: utf-8 -*- + +"""PYTHON EXAMPLE PACKAGE + +This package contains the module(s) for ``python_example``. + +:Author: Samuel Farrens + +""" + +__all__ = ['python_example'] diff --git a/shapepipe/modules/python_example_package/python_example.py b/shapepipe/modules/python_example_package/python_example.py new file mode 100644 index 000000000..7d8509c3b --- /dev/null +++ b/shapepipe/modules/python_example_package/python_example.py @@ -0,0 +1,104 @@ +# -*- coding: utf-8 -*- + +"""PYTHON EXAMPLE + +This module contains an example Python class. + +:Author: Samuel Farrens + +""" + +import time +from numpy.random import randint + + +class PythonExample: + """Python Example + + An example Python class. + + Parameters + ---------- + sleep_time : int + Sleep time in seconds + + """ + + def __init__(self, sleep_time=None): + + if not isinstance(sleep_time, type(None)): + self.sleep_time = sleep_time + + else: + self.sleep_time = randint(1, 10) + + def _wait(self): + """Wait + + Wait for n seconds. + + """ + + time.sleep(self.sleep_time) + + def _read_file(self, file_name): + """Read File + + Read input file content. + + Parameters + ---------- + file_name : str + Name of file to read + + Returns + ------- + str + Content of file + + """ + + with open(file_name) as data_file: + content = data_file.read().replace('\n', '') + + return content + + def read_files(self, file_name1, file_name2): + """Read Files + + Read two input files. + + Parameters + ---------- + file_name1 : str + Name of first file + file_name2 : str + Name of second file + + """ + + self._wait() + content1 = self._read_file(file_name1) + content2 = self._read_file(file_name2) + + self.content = '{} and {}'.format(content1, content2) + + def write_file(self, file_name, message): + """Write File + + Write content to file. + + Parameters + ---------- + file_name : str + Name of output file + Message : str + Content to write to file + + """ + + new_content = message + str(self.content) + + text_file = open(file_name, 'w') + text_file.write(new_content) + text_file.close() diff --git a/shapepipe/modules/python_example_runner.py b/shapepipe/modules/python_example_runner.py new file mode 100644 index 000000000..0571d9e43 --- /dev/null +++ b/shapepipe/modules/python_example_runner.py @@ -0,0 +1,54 @@ +# -*- coding: utf-8 -*- + +"""PYTHON MODULE EXAMPLE + +This module defines methods for an example Python module. + +:Author: Samuel Farrens + +""" + +from shapepipe.modules.module_decorator import module_runner +from shapepipe.modules.python_example_package import python_example as pe + + +@module_runner( + version='1.1', + file_pattern=['numbers', 'letters'], + file_ext='.txt', + depends=[ + 'numpy', + 'astropy', + 'galsim', + 'joblib', + 'mccd', + 'ngmix', + 'pandas', + 'pysap', + 'scipy', + 'sf_tools', + 'sip_tpv', + 'sqlitedict', + 'treecorr', + ], + run_method='parallel' +) +def python_example_runner( + input_file_list, + run_dirs, + file_number_string, + config, + module_config_sec, + w_log, +): + + output_file_name = ( + f"{run_dirs['output']}/pyex_output{file_number_string}.cat" + ) + message = config.get(module_config_sec, 'MESSAGE') + + inst = pe.PythonExample(0) + inst.read_files(*input_file_list) + inst.write_file(output_file_name, message) + + return inst.content, None diff --git a/shapepipe/modules/serial_example.py b/shapepipe/modules/serial_example_runner.py similarity index 67% rename from shapepipe/modules/serial_example.py rename to shapepipe/modules/serial_example_runner.py index f6c4609ef..9a02abdf9 100644 --- a/shapepipe/modules/serial_example.py +++ b/shapepipe/modules/serial_example_runner.py @@ -39,15 +39,26 @@ def write_file(self, file_name): text_file.close() -@module_runner(input_module='python_example', version='1.0', - file_pattern=['numbers', 'letters', 'pyex_output'], - file_ext=['.txt', '.txt', '.cat'], depends='numpy', - run_method='serial') -def serial_example(input_file_list, run_dirs, file_number_string, - config, w_log): - - output_file_name = ('{}/serial_output{}.cat'.format(run_dirs['output'], - file_number_string)) +@module_runner( + input_module='python_example_runner', + version='1.0', + file_pattern=['numbers', 'letters', 'pyex_output'], + file_ext=['.txt', '.txt', '.cat'], + depends='numpy', + run_method='serial', +) +def serial_example_runner( + input_file_list, + run_dirs, + file_number_string, + config, + module_config_sec, + w_log, +): + + output_file_name = ( + f"{run_dirs['output']}/serial_outputfile_number_string.cat" + ) inst = Dummy() inst.read_files(input_file_list) diff --git a/shapepipe/modules/vignetmaker_runner.py b/shapepipe/modules/vignetmaker_runner.py index 2fd097987..f006d8066 100644 --- a/shapepipe/modules/vignetmaker_runner.py +++ b/shapepipe/modules/vignetmaker_runner.py @@ -24,7 +24,8 @@ def vignetmaker_runner( run_dirs, file_number_string, config, - w_log + module_config_sec, + w_log, ): # Get path to galaxy catalogue @@ -32,9 +33,9 @@ def vignetmaker_runner( # Check if masking should be performed # With masking - if config.getboolean('VIGNETMAKER_RUNNER', 'MASKING'): + if config.getboolean(module_config_sec, 'MASKING'): # Fetch the mask value - mask_value = config.getfloat('VIGNETMAKER_RUNNER', 'MASK_VALUE') + mask_value = config.getfloat(module_config_sec, 'MASK_VALUE') # Make a mask vignet = vm.make_mask(galcat_path=galcat_path, mask_value=mask_value) # Save the vignet @@ -50,7 +51,7 @@ def vignetmaker_runner( else: # Fetch stamp size - stamp_size = config.getint('VIGNETMAKER_RUNNER', 'STAMP_SIZE') - 1 + stamp_size = config.getint(module_config_sec, 'STAMP_SIZE') - 1 # Check stamp size if stamp_size % 2 != 0: raise ValueError('The STAMP_SIZE must be odd') @@ -58,10 +59,10 @@ def vignetmaker_runner( radius = int(stamp_size / 2) # Fetch position type and values - pos_type = config.get('VIGNETMAKER_RUNNER', 'COORD') - pos_params = config.getlist('VIGNETMAKER_RUNNER', 'POSITION_PARAMS') + pos_type = config.get(module_config_sec, 'COORD') + pos_params = config.getlist(module_config_sec, 'POSITION_PARAMS') # Fetch vignet run mode - mode = config.get('VIGNETMAKER_RUNNER', 'MODE') + mode = config.get(module_config_sec, 'MODE') # Create instance of VignetMaker vm_inst = vm.VignetMaker( @@ -75,7 +76,7 @@ def vignetmaker_runner( # Run in CLASSIC mode if mode == 'CLASSIC': # Fetch suffix - suffix = config.getlist('VIGNETMAKER_RUNNER', 'SUFFIX') + suffix = config.getlist(module_config_sec, 'SUFFIX') # Check suffix if len(suffix) != len(input_file_list[1:]): raise ValueError( @@ -90,13 +91,13 @@ def vignetmaker_runner( # Run in MULTI-EPOCH mode elif mode == 'MULTI-EPOCH': # Fetch image directory and patterns - image_dir = config.getlist('VIGNETMAKER_RUNNER', 'ME_IMAGE_DIR') + image_dir = config.getlist(module_config_sec, 'ME_IMAGE_DIR') image_pattern = config.getlist( - 'VIGNETMAKER_RUNNER', + module_config_sec, 'ME_IMAGE_PATTERN', ) # Fetch WCS log path - f_wcs_path = config.getexpanded('VIGNETMAKER_RUNNER', 'ME_LOG_WCS') + f_wcs_path = config.getexpanded(module_config_sec, 'ME_LOG_WCS') # Process inputs vm_inst.process_me(image_dir, image_pattern, f_wcs_path, radius) diff --git a/shapepipe/pipeline/file_handler.py b/shapepipe/pipeline/file_handler.py index 2606a7d6f..8f04269f5 100644 --- a/shapepipe/pipeline/file_handler.py +++ b/shapepipe/pipeline/file_handler.py @@ -448,6 +448,25 @@ def _copy_config_to_log(self): copyfile(self._config.file_name, '{}/{}'.format(self._log_dir, config_file_name)) + def get_module_config_sec(self, module): + """Set Module Configuration Section + + Set the name of section name in the configuration file for the module. + + Parameters + ---------- + module : str + Module name + + Returns + ------- + str + Configuration file section name + + """ + + return self._module_dict[module]['run_name'].upper() + def get_add_module_property(self, module, property): """ Get Additional Module Properties @@ -467,11 +486,17 @@ def get_add_module_property(self, module, property): """ - if (self._config.has_option(module.upper(), 'ADD_{}'.format( - property.upper()))): + module_config_sec = self.get_module_config_sec(module) + + if (self._config.has_option( + module_config_sec, + f'ADD_{property.upper()}', + )): - return self._config.getlist(module.upper(), 'ADD_{}'.format( - property.upper())) + return self._config.getlist( + module_config_sec, + f'ADD_{property.upper()}', + ) def _set_module_property(self, module, property, get_type): """ Set Module Property @@ -505,24 +530,22 @@ def _set_module_property(self, module, property, get_type): """ - m_run_name = self._module_dict[module]['run_name'] - - print(f'M_RUN_NAME: {m_run_name}') + module_config_sec = self.get_module_config_sec(module) # 1) Check for parameter value in module section of config file - if self._config.has_option(m_run_name.upper(), property.upper()): + if self._config.has_option(module_config_sec, property.upper()): if get_type == 'str': prop_val = self._config.get( - m_run_name.upper(), + module_config_sec, property.upper(), ) elif get_type == 'list': prop_val = self._config.getlist( - m_run_name.upper(), + module_config_sec, property.upper(), ) else: - raise ValueError('{} is not a valid get type'.format(get_type)) + raise ValueError(f'{get_type} is not a valid get type') # 2) Check for default parameter values in file handler elif hasattr(self, '_{}'.format(property)): @@ -533,13 +556,16 @@ def _set_module_property(self, module, property, get_type): prop_val = getattr(self.module_runners[module], property) else: - raise ValueError('No value for {} in {} could be found.' - ''.format(property, module)) + raise ValueError( + f'No value for {property} in {module} could be found.' + ) # Look for additional module properties for list objects - if (isinstance(prop_val, list) and - self.get_add_module_property(m_run_name, property)): - prop_val += self.get_add_module_property(m_run_name, property) + if ( + isinstance(prop_val, list) + and self.get_add_module_property(module, property) + ): + prop_val += self.get_add_module_property(module, property) self._module_dict[module][property] = prop_val @@ -788,6 +814,8 @@ def _save_num_patterns(self, dir_list, re_pattern, pattern, ext, file_list = find_files(path, pattern, ext) + print('XXXX', file_list) + if file_list: true_file_list = file_list true_path = path @@ -795,16 +823,16 @@ def _save_num_patterns(self, dir_list, re_pattern, pattern, ext, break if not true_file_list: - raise RuntimeError('No files found matching "{}" and "{}" in the ' - ' directories {}.' - ''.format(pattern, ext, dir_list)) + raise RuntimeError( + f'No files found matching "{pattern}" and "{ext}" in the ' + + f'directories {dir_list}.' + ) # Correct the extension if necessary new_ext = '.' + ext if not ext.startswith('.') else ext if new_ext != ext: - print('Updating file extension from "{}" to "{}".' - ''.format(ext, new_ext)) + print(f'Updating file extension from "{ext}" to "{new_ext}".') print() # Select files matching the numbering scheme diff --git a/shapepipe/pipeline/job_handler.py b/shapepipe/pipeline/job_handler.py index 6024b103c..276d92b0e 100644 --- a/shapepipe/pipeline/job_handler.py +++ b/shapepipe/pipeline/job_handler.py @@ -359,14 +359,21 @@ def _distribute_smp_jobs(self): """ - result = (Parallel(n_jobs=self.batch_size, backend=self.backend) - (delayed(WorkerHandler(verbose=self._verbose).worker) - (process[1:], process[0], - self.filehd.get_worker_log_name(self._module, - process[0]), - self.filehd.module_run_dirs, self.config, self.timeout, - self._module_runner) - for process in self.filehd.process_list)) + result = ( + Parallel(n_jobs=self.batch_size, backend=self.backend)( + delayed(WorkerHandler(verbose=self._verbose).worker)( + process[1:], + process[0], + self.filehd.get_worker_log_name(self._module, process[0]), + self.filehd.module_run_dirs, + self.config, + self.filehd.get_module_config_sec(self._module), + self.timeout, + self._module_runner + ) + for process in self.filehd.process_list + ) + ) self.worker_dicts = result @@ -380,11 +387,15 @@ def submit_serial_job(self): wh = WorkerHandler(verbose=self._verbose) process = self.filehd.process_list - result = wh.worker(process, '', - self.filehd.get_worker_log_name(self._module, - '_serial'), - self.filehd.module_run_dirs, self.config, - self.timeout, self._module_runner) + result = wh.worker( + process, + '', + self.filehd.get_worker_log_name(self._module, '_serial'), + self.filehd.module_run_dirs, self.config, + self.filehd.get_module_config_sec(self._module), + self.timeout, + self._module_runner, + ) self.worker_dicts = [result] diff --git a/shapepipe/pipeline/worker_handler.py b/shapepipe/pipeline/worker_handler.py index ee2c022d9..bdd80742f 100644 --- a/shapepipe/pipeline/worker_handler.py +++ b/shapepipe/pipeline/worker_handler.py @@ -30,8 +30,17 @@ def __init__(self, verbose=True): self._stderr = None self._verbose = verbose - def worker(self, process, job_name, w_log_name, run_dirs, config, - timeout, module_runner): + def worker( + self, + process, + job_name, + w_log_name, + run_dirs, + config, + module_config_sec, + timeout, + module_runner + ): """ Worker This method defines a worker. @@ -48,6 +57,8 @@ def worker(self, process, job_name, w_log_name, run_dirs, config, Run directories config : CustomParser Configuaration parser instance + module_config_sec : str + Configuration file section name timeout : int Timeout limit in seconds @@ -61,6 +72,7 @@ def worker(self, process, job_name, w_log_name, run_dirs, config, self._w_log_name = w_log_name self._run_dirs = run_dirs self._config = config + self._module_config_sec = module_config_sec self._module_runner = module_runner self._prepare_worker(process, job_name, timeout, module_runner.__name__) @@ -168,8 +180,9 @@ def _run_worker(self): """ try: - with_timeout(self.worker_dict['timeout'], - self.w_log.name)(self._worker_execution)() + with_timeout(self.worker_dict['timeout'], self.w_log.name)( + self._worker_execution + )() except Exception as err: catch_error(err, self.w_log) @@ -197,17 +210,21 @@ def _run_module(self): """ - self.w_log.info(' - Running module: {}'.format( - self.worker_dict['module'])) + self.w_log.info( + f" - Running module: {self.worker_dict['module']}" + ) file_number_string = self.worker_dict['file_number_string'] input_file_list = self.worker_dict['process'] - self._stdout, self._stderr = self._module_runner(input_file_list, - self._run_dirs, - file_number_string, - self._config, - self.w_log) + self._stdout, self._stderr = self._module_runner( + input_file_list, + self._run_dirs, + file_number_string, + self._config, + self._module_config_sec, + self.w_log, + ) def _log_stdout(self): """ Log STDOUT @@ -216,10 +233,12 @@ def _log_stdout(self): """ - self.w_log.info('Process produced the following output: {}'.format( - self._stdout)) + self.w_log.info( + f'Process produced the following output: {self._stdout}' + ) if self._stderr: - self.w_log.info('Process produced the following error(s): {}' - ''.format(self._stderr)) + self.w_log.info( + f'Process produced the following error(s): {self._stderr}' + ) self.worker_dict['stderr'] = True diff --git a/shapepipe_run.py b/shapepipe_run.py index db6174f2f..55fad582a 100755 --- a/shapepipe_run.py +++ b/shapepipe_run.py @@ -30,7 +30,7 @@ class ShapePipe(): - """ ShapePipe + """ShapePipe ShapePipe runner class. @@ -41,7 +41,7 @@ def __init__(self): self.log = None def set_up(self): - """ Set Up + """Set Up Set up ShapePipe properties. @@ -58,7 +58,7 @@ def set_up(self): self._prep_run() def _set_run_name(self): - """ Set Run Name + """Set Run Name Set the name of the current pipeline run. @@ -70,7 +70,7 @@ def _set_run_name(self): self._run_name += datetime.now().strftime('_%Y-%m-%d_%H-%M-%S') def _create_pipeline_log(self): - """ Create Pipeline Log + """Create Pipeline Log Create a general logging instance for the pipeline run. @@ -78,7 +78,7 @@ def _create_pipeline_log(self): self.log = set_up_log(self.filehd.log_name, verbose=False) - start_text = 'Starting ShapePipe Run: {}'.format(self._run_name) + start_text = f'Starting ShapePipe Run: {self._run_name}' self.log.info(shapepipe_logo()) self.log.info(start_text) @@ -90,7 +90,7 @@ def _create_pipeline_log(self): print('') def close_pipeline_log(self): - """ Close Pipeline Log + """Close Pipeline Log Close general logging instance for the pipeline run. @@ -105,8 +105,9 @@ def close_pipeline_log(self): plur = ' was' else: plur = 's were' - final_error_count = ('A total of {} error{} recorded.'.format( - self.error_count, plur)) + final_error_count = ( + f'A total of {self.error_count} error{plur} recorded.' + ) end_text = 'Finishing ShapePipe Run' self.log.info(final_error_count) @@ -123,7 +124,7 @@ def close_pipeline_log(self): raise RuntimeError(final_error_count) def _get_module_depends(self, property): - """ Get Module Dependencies + """Get Module Dependencies List the Python packages and executables needed to run the modules. @@ -146,14 +147,16 @@ def _get_module_depends(self, property): else: prop_list += getattr(module_runners[module], property) - if self.filehd.get_add_module_property(module, property): - prop_list += self.filehd.get_add_module_property(module, - property) + # if self.filehd.get_add_module_property(module, property): + # prop_list += self.filehd.get_add_module_property( + # module, + # property, + # ) return prop_list def _check_dependencies(self): - """ Check Dependencies + """Check Dependencies Check that all pipeline dependencies have been installed. @@ -200,7 +203,7 @@ def _check_dependencies(self): print('') def _check_module_versions(self): - """ Check Module Version + """Check Module Version Check versions of the modules. @@ -214,9 +217,9 @@ def _check_module_versions(self): for module in self.modules: - module_txt = (' - {} {}'.format( - module, - self.filehd.module_runners[module].version)) + module_txt = ( + f' - {module} {self.filehd.module_runners[module].version}' + ) self.log.info(module_txt) if self.verbose: @@ -227,7 +230,7 @@ def _check_module_versions(self): print('') def _get_module_run_methods(self): - """ Get Module Run Method + """Get Module Run Method Create a dictionary of modules with corresponding run methods. @@ -237,11 +240,12 @@ def _get_module_run_methods(self): for module in self.modules: - self.run_method[module] = \ + self.run_method[module] = ( self.filehd.module_runners[module].run_method + ) def _prep_run(self): - """ Run + """Run Run the pipeline. @@ -263,13 +267,13 @@ def _prep_run(self): self._get_module_run_methods() def record_mode(self): - """ Record Mode + """Record Mode Log mode in which ShapePipe is running. """ - mode_text = 'Running ShapePipe using {}'.format(self.mode) + mode_text = f'Running ShapePipe using {self.mode}' self.log.info(mode_text) self.log.info('') @@ -279,7 +283,7 @@ def record_mode(self): def run_smp(pipe): - """ Run SMP + """Run SMP Run ShapePipe using SMP. @@ -293,9 +297,14 @@ def run_smp(pipe): for module in pipe.modules: # Create a job handler for the current module - jh = JobHandler(module, filehd=pipe.filehd, config=pipe.config, - log=pipe.log, job_type=pipe.run_method[module], - verbose=pipe.verbose) + jh = JobHandler( + module, + filehd=pipe.filehd, + config=pipe.config, + log=pipe.log, + job_type=pipe.run_method[module], + verbose=pipe.verbose, + ) # Submit jobs jh.submit_jobs() @@ -311,7 +320,7 @@ def run_smp(pipe): def run_mpi(pipe, comm): - """ Run MPI + """Run MPI Run ShapePipe using MPI. @@ -345,9 +354,15 @@ def run_mpi(pipe, comm): # Run set up on master if master: # Create a job handler for the current module - jh = JobHandler(module, filehd=pipe.filehd, config=config, - log=pipe.log, job_type=pipe.run_method[module], - parallel_mode='mpi', verbose=verbose) + jh = JobHandler( + module, + filehd=pipe.filehd, + config=config, + log=pipe.log, + job_type=pipe.run_method[module], + parallel_mode='mpi', + verbose=verbose, + ) # Get job type job_type = jh.job_type @@ -387,9 +402,18 @@ def run_mpi(pipe, comm): jobs = comm.scatter(jobs, root=0) # Submit the MPI jobs and gather results - results = comm.gather(submit_mpi_jobs(jobs, config, timeout, - run_dirs, module_runner, worker_log, - verbose), root=0) + results = comm.gather( + submit_mpi_jobs( + jobs, + config, + timeout, + run_dirs, + module_runner, + worker_log, + verbose + ), + root=0, + ) # Delete broadcast objects del module_runner, worker_log, timeout, jobs From e8ffa13a844a633378a0d988c4f42338ec922699 Mon Sep 17 00:00:00 2001 From: Samuel Farrens Date: Mon, 12 Jul 2021 18:12:05 +0200 Subject: [PATCH 3/9] save progress --- example/config.ini | 2 +- shapepipe/pipeline/file_handler.py | 492 +++++++++++++++++------------ 2 files changed, 298 insertions(+), 196 deletions(-) diff --git a/example/config.ini b/example/config.ini index 0d1433e90..6d1838a40 100644 --- a/example/config.ini +++ b/example/config.ini @@ -12,7 +12,7 @@ ## ShapePipe execution options [EXECUTION] # MODULE (required) must be a valid module runner name (or a comma separated list of names) -MODULE = python_example_runner, serial_example_runner, execute_example_runner, python_example_runner +MODULE = python_example_runner, serial_example_runner, execute_example_runner # MODE (optional) options are smp or mpi, default is smp ; MODE = mpi diff --git a/shapepipe/pipeline/file_handler.py b/shapepipe/pipeline/file_handler.py index 8f04269f5..8c5384063 100644 --- a/shapepipe/pipeline/file_handler.py +++ b/shapepipe/pipeline/file_handler.py @@ -20,7 +20,7 @@ def find_files(path, pattern='*', ext='*'): - """ Find Files + """Find Files This method recursively retrieves file names from a given path that match a given pattern and/or have a given extension. @@ -60,18 +60,18 @@ def find_files(path, pattern='*', ext='*'): raise ValueError('Do not include "*" in extension.') if (not ext.startswith(dot) and dot in ext) or (ext.count(dot) > 1): - raise ValueError('Invalid extension format: "{}".'.format(ext)) + raise ValueError(f'Invalid extension format: "{ext}".') if ext != star and not ext.startswith(dot): ext = dot + ext - search_string = '{}/**/*{}*{}'.format(path, pattern, ext) + search_string = f'{path}/**/*{pattern}*{ext}' return glob(search_string, recursive=True) def check_duplicate(input_list): - """ Check Duplicate. + """Check Duplicate. Check whether input list contains at least one duplicate. @@ -98,7 +98,7 @@ def check_duplicate(input_list): class FileHandler(object): - """ File Handler + """File Handler This class manages the files used and produced during a pipeline run. @@ -126,11 +126,15 @@ def __init__(self, run_name, modules, config): self._input_list = config.getlist('FILE', 'INPUT_DIR') self._output_dir = config.getexpanded('FILE', 'OUTPUT_DIR') self._log_name = config.get('FILE', 'LOG_NAME') - self._correct_pattern = config.getboolean('FILE', - 'CORRECT_FILE_PATTERN') - self._run_log_file = self.format(self._output_dir, - config.get('FILE', 'RUN_LOG_NAME'), - '.txt') + self._correct_pattern = config.getboolean( + 'FILE', + 'CORRECT_FILE_PATTERN', + ) + self._run_log_file = self.format( + self._output_dir, + config.get('FILE', 'RUN_LOG_NAME'), + '.txt', + ) self._module_dict = {} if config.has_option('FILE', 'FILE_PATTERN'): @@ -143,8 +147,9 @@ def __init__(self, run_name, modules, config): self._numbering_scheme = r'RE:\_\d+' if config.has_option('FILE', 'NUMBER_LIST'): if os.path.isfile(config.get('FILE', 'NUMBER_LIST')): - self._number_list = (self.read_number_list(config.get('FILE', - 'NUMBER_LIST'))) + self._number_list = ( + self.read_number_list(config.get('FILE', 'NUMBER_LIST')) + ) else: self._number_list = config.getlist('FILE', 'NUMBER_LIST') else: @@ -152,7 +157,7 @@ def __init__(self, run_name, modules, config): @property def run_dir(self): - """ Run Directory + """Run Directory This method defines the run directory. @@ -167,7 +172,7 @@ def run_dir(self, value): @property def _input_dir(self): - """ Input Directory + """Input Directory This method defines the input directories. @@ -182,7 +187,7 @@ def _input_dir(self, value): @property def _output_dir(self): - """ Output Directory + """Output Directory This method defines the output directory. @@ -197,7 +202,7 @@ def _output_dir(self, value): @staticmethod def read_number_list(file_name): - """ Read Number List + """Read Number List Extract number strings to be processed from a file. @@ -215,7 +220,7 @@ def read_number_list(file_name): @classmethod def check_dir(cls, dir_name, check_exists=False): - """ Check Directory + """Check Directory Raise error if directory exists. @@ -232,13 +237,13 @@ def check_dir(cls, dir_name, check_exists=False): """ if check_exists and os.path.isdir(dir_name): - raise OSError('Directory {} already exists.'.format(dir_name)) + raise OSError('Directory {dir_name} already exists.') return cls.strip_slash(dir_name) @classmethod def check_dirs(cls, dir_list): - """ Check Directories + """Check Directories Check directories in list @@ -253,7 +258,7 @@ def check_dirs(cls, dir_list): @classmethod def mkdir(cls, dir_name): - """ Make Directory + """Make Directory This method creates a directory at the specified path. @@ -269,7 +274,7 @@ def mkdir(cls, dir_name): @staticmethod def format(path, name, ext=''): - """ Format Path Name + """Format Path Name This method appends the file/directory name to the input path. @@ -289,11 +294,11 @@ def format(path, name, ext=''): """ - return '{}/{}{}'.format(path, name, ext) + return f'{path}/{name}{ext}' @staticmethod def strip_slash(path): - """ Strip Slash + """Strip Slash This method removes the trailing slash from a path. @@ -313,7 +318,7 @@ def strip_slash(path): @classmethod def strip_slash_list(cls, path_list): - """ Strip Slash List + """Strip Slash List This method removes the trailing slash from a list of paths. @@ -333,7 +338,7 @@ def strip_slash_list(cls, path_list): @staticmethod def flatten_list(input_list): - """ Flatten List + """Flatten List Flatten a list of lists. @@ -352,7 +357,7 @@ def flatten_list(input_list): return [item for sublist in input_list for item in sublist] def _check_input_dir_list(self, dir_list): - """ Check Input Directory List + """Check Input Directory List Check an input list to see if the directories exist or if the the run log should be serarched for an appropriate output directory. @@ -379,35 +384,42 @@ def _check_input_dir_list(self, dir_list): elif 'last' in dir.lower(): module = dir.lower().split(':')[1] last_module = self._run_log.get_last(module) - input_dir.append(self.format(self.format( - last_module, - module), 'output')) + input_dir.append( + self.format(self.format(last_module, module), 'output') + ) elif 'all' in dir.lower(): module = dir.lower().split(':')[1] all_runs = self._run_log.get_all(module) - input_dir.extend([self.format(self.format( - run.split(' ')[0], - module), 'output') - for run in all_runs]) + input_dir.extend([ + self.format( + self.format(run.split(' ')[0], module), + 'output' + ) + for run in all_runs + ]) elif ':' in dir.lower(): string, module = dir.split(':') module = module.lower() - input_dir.append(self.format(self.format( - self._run_log.get_run(string), - module), - 'output')) + input_dir.append( + self.format( + self.format(self._run_log.get_run(string), module), + 'output' + ) + ) else: - raise ValueError('Invalid INPUT_DIR ({}). Make sure the paths ' - 'provided are valid directories or use the ' - 'allowed special keys.'.format(dir)) + raise ValueError( + f'Invalid INPUT_DIR ({dir}). Make sure the paths ' + + 'provided are valid directories or use the ' + + 'allowed special keys.' + ) return input_dir def _get_input_dir(self): - """ Get Input Directory + """Get Input Directory This method sets the module input directory @@ -416,7 +428,7 @@ def _get_input_dir(self): self._input_dir = self._check_input_dir_list(self._input_list) def create_global_run_dirs(self): - """ Create Global Run Directories + """Create Global Run Directories This method creates the pipeline output directories for a given run. @@ -426,8 +438,11 @@ def create_global_run_dirs(self): self._log_dir = self.format(self.run_dir, 'logs') self._tmp_dir = self.format(self.run_dir, 'tmp') self.log_name = self.format(self._log_dir, self._log_name) - self._run_log = RunLog(self._run_log_file, self._module_list, - self.run_dir) + self._run_log = RunLog( + self._run_log_file, + self._module_list, + self.run_dir, + ) self.mkdir(self.run_dir) self.mkdir(self._log_dir) @@ -437,7 +452,7 @@ def create_global_run_dirs(self): self._copy_config_to_log() def _copy_config_to_log(self): - """ Copy Config to Log + """Copy Config to Log Copy configuration file to run log directory. @@ -445,8 +460,10 @@ def _copy_config_to_log(self): config_file_name = os.path.basename(self._config.file_name) - copyfile(self._config.file_name, '{}/{}'.format(self._log_dir, - config_file_name)) + copyfile( + self._config.file_name, + f'{self._log_dir}/{config_file_name}', + ) def get_module_config_sec(self, module): """Set Module Configuration Section @@ -465,10 +482,11 @@ def get_module_config_sec(self, module): """ - return self._module_dict[module]['run_name'].upper() + # return self._module_dict[module]['run_name'].upper() + return self._module_dict[module]['latest'].upper() - def get_add_module_property(self, module, property): - """ Get Additional Module Properties + def get_add_module_property(self, module, run_name, property): + """Get Additional Module Properties Get a list of additional module property values. @@ -485,21 +503,18 @@ def get_add_module_property(self, module, property): Additional module property values """ - - module_config_sec = self.get_module_config_sec(module) - if (self._config.has_option( - module_config_sec, + run_name.upper(), f'ADD_{property.upper()}', )): return self._config.getlist( - module_config_sec, + run_name.upper(), f'ADD_{property.upper()}', ) - def _set_module_property(self, module, property, get_type): - """ Set Module Property + def _set_module_property(self, module, run_name, property, get_type): + """Set Module Property Set a module property from either the configuration file or the module runner. @@ -529,27 +544,24 @@ def _set_module_property(self, module, property, get_type): definitions, but only for the module in question. """ - - module_config_sec = self.get_module_config_sec(module) - # 1) Check for parameter value in module section of config file - if self._config.has_option(module_config_sec, property.upper()): + if self._config.has_option(run_name.upper(), property.upper()): if get_type == 'str': prop_val = self._config.get( - module_config_sec, + run_name.upper(), property.upper(), ) elif get_type == 'list': prop_val = self._config.getlist( - module_config_sec, + run_name.upper(), property.upper(), ) else: raise ValueError(f'{get_type} is not a valid get type') # 2) Check for default parameter values in file handler - elif hasattr(self, '_{}'.format(property)): - prop_val = getattr(self, '_{}'.format(property)) + elif hasattr(self, f'_{property}'): + prop_val = getattr(self, f'_{property}') # 3) Check for default parameter values in module runner elif hasattr(self.module_runners[module], property): @@ -563,14 +575,18 @@ def _set_module_property(self, module, property, get_type): # Look for additional module properties for list objects if ( isinstance(prop_val, list) - and self.get_add_module_property(module, property) + and self.get_add_module_property(module, run_name, property) ): - prop_val += self.get_add_module_property(module, property) + prop_val += self.get_add_module_property( + module, + run_name, + property, + ) - self._module_dict[module][property] = prop_val + self._module_dict[module][run_name][property] = prop_val - def _set_module_properties(self, module): - """ Get Module Properties + def _set_module_properties(self, module, run_name): + """Get Module Properties Get module properties defined in module runner wrapper. @@ -591,25 +607,32 @@ def _set_module_properties(self, module): 'executes': 'list' } - [self._set_module_property(module, property, get_type) - for property, get_type in module_props.items()] + for property, get_type in module_props.items(): + self._set_module_property(module, run_name, property, get_type) # Make sure the number of patterns and extensions match - if ((len(self._module_dict[module]['file_ext']) == 1) and - (len(self._module_dict[module]['file_pattern']) > 1)): - self._module_dict[module]['file_ext'] = \ - [self._module_dict[module]['file_ext'][0] for i in - self._module_dict[module]['file_pattern']] - - if (len(self._module_dict[module]['file_ext']) != - len(self._module_dict[module]['file_pattern'])): - raise ValueError('The number of file_ext values ({}) does not match ' - 'the number of file_pattern values ({}).' - ''.format(len(self._module_dict[module]['file_ext']), - len(self._module_dict[module]['file_pattern']))) - - def _create_module_run_dirs(self, module, call_num=None): - """ Create Module Run Directories + if ( + (len(self._module_dict[module][run_name]['file_ext']) == 1) + and (len(self._module_dict[module][run_name]['file_pattern']) > 1) + ): + self._module_dict[module][run_name]['file_ext'] = [ + self._module_dict[module][run_name]['file_ext'][0] + for i in self._module_dict[module][run_name]['file_pattern'] + ] + + if ( + len(self._module_dict[module][run_name]['file_ext']) + != len(self._module_dict[module][run_name]['file_pattern']) + ): + n_fext = len(self._module_dict[module][run_name]['file_ext']) + n_fpat = len(self._module_dict[module][run_name]['file_pattern']) + raise ValueError( + f'The number of file_ext values ({n_fext}) does not match the ' + + f'number of file_pattern values ({n_fpat}).' + ) + + def _create_module_run_dirs(self, module, run_name): + """Create Module Run Directories This method creates the module output directories for a given run. @@ -620,30 +643,39 @@ def _create_module_run_dirs(self, module, call_num=None): """ - if not isinstance(call_num, type(None)): - run_name = f'{module}_run_{call_num}' - else: - run_name = module - - self._module_dict[module]['run_dir'] = \ - (self.format(self._run_dir, run_name)) - self._module_dict[module]['log_dir'] = \ - (self.format(self._module_dict[module]['run_dir'], 'logs')) - self._module_dict[module]['output_dir'] = \ - (self.format(self._module_dict[module]['run_dir'], 'output')) + # if not isinstance(call_num, type(None)): + # run_name = f'{module}_run_{call_num}' + # else: + # run_name = module + + self._module_dict[module][run_name]['run_dir'] = ( + self.format(self._run_dir, run_name) + ) + self._module_dict[module][run_name]['log_dir'] = ( + self.format(self._module_dict[module][run_name]['run_dir'], 'logs') + ) + self._module_dict[module][run_name]['output_dir'] = ( + self.format( + self._module_dict[module][run_name]['run_dir'], + 'output', + ) + ) - self.mkdir(self._module_dict[module]['run_dir']) - self.mkdir(self._module_dict[module]['log_dir']) - self.mkdir(self._module_dict[module]['output_dir']) + self.mkdir(self._module_dict[module][run_name]['run_dir']) + self.mkdir(self._module_dict[module][run_name]['log_dir']) + self.mkdir(self._module_dict[module][run_name]['output_dir']) # Set current output directory to module output directory - self.output_dir = self._module_dict[module]['output_dir'] - self.module_run_dirs = {'run': self.run_dir, 'log': self._log_dir, - 'tmp': self._tmp_dir, - 'output': self.output_dir} + self.output_dir = self._module_dict[module][run_name]['output_dir'] + self.module_run_dirs = { + 'run': self.run_dir, + 'log': self._log_dir, + 'tmp': self._tmp_dir, + 'output': self.output_dir + } - def _set_module_input_dir(self, module): - """ Set Module Input Directory + def _set_module_input_dir(self, module, run_name): + """Set Module Input Directory Set the module input directory. If the module specified is the first module in the pipeline or does not have any input modules then @@ -660,34 +692,49 @@ def _set_module_input_dir(self, module): """ - if (isinstance(self._module_dict[module]['input_module'], type(None)) - or len(self._module_dict) == 1): + if ( + isinstance( + self._module_dict[module][run_name]['input_module'], + type(None), + ) + or len(self._module_dict) == 1 + ): input_dir = self._input_dir else: - input_dir = [self._module_dict[input_mod]['output_dir'] - for input_mod in - self._module_dict[module]['input_module'] - if input_mod in self._module_dict] + input_dir = [ + self._module_dict[input_mod][run_name]['output_dir'] + for input_mod in + self._module_dict[module][run_name]['input_module'] + if input_mod in self._module_dict + ] - if self._config.has_option(module.upper(), 'INPUT_DIR'): - input_dir = (self._check_input_dir_list( - self._config.getlist(module.upper(), - 'INPUT_DIR'))) + if self._config.has_option(run_name.upper(), 'INPUT_DIR'): + input_dir = self._check_input_dir_list( + self._config.getlist(run_name.upper(), 'INPUT_DIR') + ) - if self.get_add_module_property(module, 'input_dir'): - input_dir += self.get_add_module_property(module, 'input_dir') + if self.get_add_module_property(module, run_name, 'input_dir'): + input_dir += self.get_add_module_property( + module, + run_name, + 'input_dir', + ) if not input_dir: - raise RuntimeError('Could not find appropriate input directory ' - 'for module {}.'.format(module)) + raise RuntimeError( + 'Could not find appropriate input directory ' + + f'for module {run_name}.' + ) - self._module_dict[module]['input_dir'] = self.check_dirs(input_dir) + self._module_dict[module][run_name]['input_dir'] = ( + self.check_dirs(input_dir) + ) @staticmethod def _generate_re_pattern(match_pattern): - """ Generate Regular Expression Pattern + """Generate Regular Expression Pattern Generate a regular expression pattern from an input string. @@ -713,17 +760,20 @@ def _generate_re_pattern(match_pattern): chars = [char for char in match_pattern if not char.isalnum()] split_pattern = '|'.join(chars).replace('.', r'\.') - chars = ['\\{}'.format(char) for char in chars] + [''] - num_length = ['\\d{{{}}}'.format(len(digits)) for digits in - re.split(split_pattern, match_pattern)] - re_pattern = r''.join([a for b in zip(num_length, chars) - for a in b]).replace('{1}', '+') + chars = [f'\\{char}' for char in chars] + [''] + num_length = [ + f'\\d{{{len(digits)}}}' + for digits in re.split(split_pattern, match_pattern) + ] + re_pattern = r''.join( + [a for b in zip(num_length, chars) for a in b] + ).replace('{1}', '+') return re.compile(re_pattern) @staticmethod def _strip_dir_from_file(file_name, dir_list): - """ Strip Directory from File Name + """Strip Directory from File Name Remove the directory string from the file name. @@ -741,12 +791,14 @@ def _strip_dir_from_file(file_name, dir_list): """ - return [file_name.replace(_dir + '/', '') for _dir in dir_list - if _dir in file_name][0] + return [ + file_name.replace(_dir + '/', '') + for _dir in dir_list if _dir in file_name + ][0] @classmethod def _get_re(cls, num_scheme): - """ Get Regular Expression + """Get Regular Expression Return the regular expression corresponding to the numbering scheme. @@ -783,9 +835,15 @@ def _get_re(cls, num_scheme): return re_pattern - def _save_num_patterns(self, dir_list, re_pattern, pattern, ext, - output_file): - """ Save Number Patterns + def _save_num_patterns( + self, + dir_list, + re_pattern, + pattern, + ext, + output_file + ): + """Save Number Patterns Save file number patterns to numpy binary, update file patterns and get correct file paths. @@ -814,8 +872,6 @@ def _save_num_patterns(self, dir_list, re_pattern, pattern, ext, file_list = find_files(path, pattern, ext) - print('XXXX', file_list) - if file_list: true_file_list = file_list true_path = path @@ -860,24 +916,28 @@ def _save_num_patterns(self, dir_list, re_pattern, pattern, ext, new_pattern = new_pattern.replace(substring, '') if new_pattern != pattern: - print('Updating pattern from "{}" to "{}".' - ''.format(pattern, new_pattern)) + print( + f'Updating pattern from "{pattern}" to ' + + f'"{new_pattern}".' + ) print() correct_pattern = False if not found_match: - raise RuntimeError('Could not match numbering scheme "{}" to any ' - 'of the input files matching "{}" and "{}" in ' - 'the directories {}.' - ''.format(self._numbering_scheme, pattern, ext, - dir_list)) + raise RuntimeError( + f'Could not match numbering scheme "{self._numbering_scheme}" ' + + f'to any of the input files matching "{pattern}" and ' + + f'"{ext}" in the directories {dir_list}.' + ) if check_duplicate(final_file_list): - raise RuntimeError('Input file list contains at least two elements that match ' - 'file pattern and numbering scheme, leading to identical ' - 'input files. Make sure that the correct input ' - 'directory is used.') + raise RuntimeError( + 'Input file list contains at least two elements that match ' + + 'file pattern and numbering scheme, leading to identical ' + + 'input files. Make sure that the correct input ' + + 'directory is used.' + ) # Save file list np.save(output_file, np.array(final_file_list)) @@ -888,7 +948,7 @@ def _save_num_patterns(self, dir_list, re_pattern, pattern, ext, @staticmethod def _save_match_patterns(output_file, mmap_list): - """ Save Match Patterns + """Save Match Patterns Save matching number patterns to numpy binary. @@ -903,14 +963,19 @@ def _save_match_patterns(output_file, mmap_list): num_pattern_list = [np.load(mmap, mmap_mode='r') for mmap in mmap_list] - np.save(output_file, reduce(partial(np.intersect1d, - assume_unique=True), num_pattern_list)) + np.save( + output_file, + reduce( + partial(np.intersect1d, assume_unique=True), + num_pattern_list + ) + ) del num_pattern_list @staticmethod def _get_file_name(path, pattern, number, ext): - """ Get File Name + """Get File Name Get file name corresponding to the path, file pattern, number pattern and file extension. @@ -933,11 +998,11 @@ def _get_file_name(path, pattern, number, ext): """ - return '{}/{}{}{}'.format(path, pattern, number, ext) + return f'{path}/{pattern}{number}{ext}' @staticmethod def _remove_mmaps(mmap_list): - """ Remove Memory Maps + """Remove Memory Maps Remove memory map files in input list. @@ -954,9 +1019,15 @@ def _remove_mmaps(mmap_list): for mmap in set(mmap_list): os.remove(mmap) - def _format_process_list(self, patterns, memory_map, re_pattern, - num_scheme, run_method): - """ Format Process List + def _format_process_list( + self, + patterns, + memory_map, + re_pattern, + num_scheme, + run_method, + ): + """Format Process List Format the list of files to be processed. @@ -992,24 +1063,32 @@ def _format_process_list(self, patterns, memory_map, re_pattern, for number in number_list: if not re.search(re_pattern, number): - raise ValueError('The string "{}" does not match the ' - 'numbering scheme "{}"' - ''.format(number, num_scheme)) + raise ValueError( + f'The string "{number}" does not match the ' + + f'numbering scheme "{num_scheme}".' + ) if run_method == 'serial': process_items = [] else: process_items = [number] - process_items.extend([self._get_file_name(path, fp, number, ext) - for path, fp, ext in - zip(path_list, pattern_list, ext_list)]) + process_items.extend([ + self._get_file_name(path, fp, number, ext) + for path, fp, ext in zip(path_list, pattern_list, ext_list) + ]) process_list.append(process_items) return process_list - def _save_process_list(self, dir_list, pattern_list, ext_list, num_scheme, - run_method): - """ Save Process List + def _save_process_list( + self, + dir_list, + pattern_list, + ext_list, + num_scheme, + run_method, + ): + """Save Process List Save list of processes to a numpy binary. @@ -1026,22 +1105,36 @@ def _save_process_list(self, dir_list, pattern_list, ext_list, num_scheme, """ - np_mmap_list = [self.format(self._tmp_dir, - 'nums_{}_{}.npy'.format(pattern, ext)) - for pattern, ext in zip(pattern_list, ext_list)] + np_mmap_list = [ + self.format(self._tmp_dir, f'nums_{pattern}_{ext}.npy') + for pattern, ext in zip(pattern_list, ext_list) + ] match_mmap = self.format(self._tmp_dir, 'matching_num_patterns.npy') self.process_mmap = self.format(self._tmp_dir, 'process_list.npy') re_pattern = self._get_re(num_scheme) - temp = [self._save_num_patterns(dir_list, re_pattern, pattern, ext, - np_mmap) for pattern, ext, np_mmap in - zip(pattern_list, ext_list, np_mmap_list)] + temp = [ + self._save_num_patterns( + dir_list, + re_pattern, + pattern, + ext, + np_mmap + ) + for pattern, ext, np_mmap in + zip(pattern_list, ext_list, np_mmap_list) + ] self._save_match_patterns(match_mmap, np_mmap_list) - process_list = self._format_process_list(temp, match_mmap, re_pattern, - num_scheme, run_method) + process_list = self._format_process_list( + temp, + match_mmap, + re_pattern, + num_scheme, + run_method, + ) np.save(self.process_mmap, np.array(process_list)) del process_list @@ -1053,7 +1146,7 @@ def _save_process_list(self, dir_list, pattern_list, ext_list, num_scheme, self._remove_mmaps(np_mmap_list + [match_mmap]) def remove_process_mmap(self): - """ Remove Process MMAP + """Remove Process MMAP Remove process list memory map. @@ -1061,8 +1154,8 @@ def remove_process_mmap(self): self._remove_mmaps([self.process_mmap]) - def _get_module_input_files(self, module): - """ Get Module Input Files + def _get_module_input_files(self, module, run_name): + """Get Module Input Files Retrieve the module input files names from the input directory. @@ -1073,17 +1166,22 @@ def _get_module_input_files(self, module): """ - dir_list = self._module_dict[module]['input_dir'] - pattern_list = self._module_dict[module]['file_pattern'] - ext_list = self._module_dict[module]['file_ext'] - num_scheme = self._module_dict[module]['numbering_scheme'] - run_method = self._module_dict[module]['run_method'] + dir_list = self._module_dict[module][run_name]['input_dir'] + pattern_list = self._module_dict[module][run_name]['file_pattern'] + ext_list = self._module_dict[module][run_name]['file_ext'] + num_scheme = self._module_dict[module][run_name]['numbering_scheme'] + run_method = self._module_dict[module][run_name]['run_method'] - self._save_process_list(dir_list, pattern_list, ext_list, num_scheme, - run_method) + self._save_process_list( + dir_list, + pattern_list, + ext_list, + num_scheme, + run_method, + ) def set_up_module(self, module): - """ Set Up Module + """Set Up Module Set up module parameters for file handler. @@ -1097,20 +1195,22 @@ def set_up_module(self, module): if module in self._module_dict.keys(): self._module_dict[module]['run_count'] += 1 call_num = self._module_dict[module]['run_count'] - self._module_dict[module]['run_name'] = f'{module}/run_{call_num}' + run_name = f'{module}/run_{call_num}' else: self._module_dict[module] = {} self._module_dict[module]['run_count'] = 1 - call_num = None - self._module_dict[module]['run_name'] = module + run_name = module - self._set_module_properties(module) - self._create_module_run_dirs(module, call_num) - self._set_module_input_dir(module) - self._get_module_input_files(module) + self._module_dict[module]['latest'] = run_name + self._module_dict[module][run_name] = {} + + self._set_module_properties(module, run_name) + self._create_module_run_dirs(module, run_name) + self._set_module_input_dir(module, run_name) + self._get_module_input_files(module, run_name) def get_worker_log_name(self, module, file_number_string): - """ Get Worker Log Name + """Get Worker Log Name This method generates a worker log name. @@ -1128,5 +1228,7 @@ def get_worker_log_name(self, module, file_number_string): """ - return '{}/process{}'.format(self._module_dict[module]['log_dir'], - file_number_string) + run_name = self._module_dict[module]['latest'] + log_dir = self._module_dict[module][run_name]['log_dir'] + + return f'{log_dir}/process{file_number_string}' From b4473856dcbe3b667e98a31ddd6e6ef4f934f702 Mon Sep 17 00:00:00 2001 From: Samuel Farrens Date: Wed, 14 Jul 2021 16:08:52 +0200 Subject: [PATCH 4/9] working implementation --- example/config.ini | 6 +- shapepipe/pipeline/file_handler.py | 102 +++++++++++++++++++---------- shapepipe/pipeline/job_handler.py | 17 +++-- shapepipe/pipeline/run_log.py | 26 ++++---- shapepipe_run.py | 20 +++--- 5 files changed, 109 insertions(+), 62 deletions(-) diff --git a/example/config.ini b/example/config.ini index 6d1838a40..fab6ee3ac 100644 --- a/example/config.ini +++ b/example/config.ini @@ -12,7 +12,7 @@ ## ShapePipe execution options [EXECUTION] # MODULE (required) must be a valid module runner name (or a comma separated list of names) -MODULE = python_example_runner, serial_example_runner, execute_example_runner +MODULE = python_example_runner, serial_example_runner, execute_example_runner, python_example_runner, execute_example_runner # MODE (optional) options are smp or mpi, default is smp ; MODE = mpi @@ -67,3 +67,7 @@ FILE_PATTERN = pyex_output, head_output FILE_EXT = cat, txt MESSAGE = The new obtained value is: + +[EXECUTE_EXAMPLE_RUNNER/RUN_2] + +INPUT_MODULE = python_example_runner/run_2 diff --git a/shapepipe/pipeline/file_handler.py b/shapepipe/pipeline/file_handler.py index 8c5384063..ef37da171 100644 --- a/shapepipe/pipeline/file_handler.py +++ b/shapepipe/pipeline/file_handler.py @@ -356,6 +356,30 @@ def flatten_list(input_list): return [item for sublist in input_list for item in sublist] + @staticmethod + def _get_module_run_name(dir): + """Get Module Run Name + + Retrieve module run name, module name and search string from input + string. + + Parameters + ---------- + dir : str + Input directory string + + Returns + ------- + tuple + Module run name, module name, search string + + """ + + string, module_run = dir.split(':') + module = module_run.split('/')[0] + + return module_run.lower(), module.lower(), string + def _check_input_dir_list(self, dir_list): """Check Input Directory List @@ -373,7 +397,6 @@ def _check_input_dir_list(self, dir_list): For invalid input directory value """ - input_dir = [] for dir in dir_list: @@ -382,29 +405,28 @@ def _check_input_dir_list(self, dir_list): input_dir.append(dir) elif 'last' in dir.lower(): - module = dir.lower().split(':')[1] + module_run, module, _ = self._get_module_run_name(dir) last_module = self._run_log.get_last(module) input_dir.append( - self.format(self.format(last_module, module), 'output') + self.format(self.format(last_module, module_run), 'output') ) elif 'all' in dir.lower(): - module = dir.lower().split(':')[1] + module_run, module, _ = self._get_module_run_name(dir) all_runs = self._run_log.get_all(module) input_dir.extend([ self.format( - self.format(run.split(' ')[0], module), + self.format(run.split(' ')[0], module_run), 'output' ) for run in all_runs ]) elif ':' in dir.lower(): - string, module = dir.split(':') - module = module.lower() + module_run, _, string = self._get_module_run_name(dir) input_dir.append( self.format( - self.format(self._run_log.get_run(string), module), + self.format(self._run_log.get_run(string), module_run), 'output' ) ) @@ -465,10 +487,29 @@ def _copy_config_to_log(self): f'{self._log_dir}/{config_file_name}', ) + def get_module_current_run(self, module): + """Get Module Current Run + + Get the run current run count for the module. + + Parameters + ---------- + module : str + Module name + + Returns + ------- + str + Module run count + + """ + + return str(self._module_dict[module]['run_count']) + def get_module_config_sec(self, module): - """Set Module Configuration Section + """Get Module Configuration Section - Set the name of section name in the configuration file for the module. + Get the name of section name in the configuration file for the module. Parameters ---------- @@ -482,18 +523,17 @@ def get_module_config_sec(self, module): """ - # return self._module_dict[module]['run_name'].upper() return self._module_dict[module]['latest'].upper() - def get_add_module_property(self, module, run_name, property): + def get_add_module_property(self, run_name, property): """Get Additional Module Properties Get a list of additional module property values. Parameters ---------- - module : str - Module name + run_name : str + Module run name property : str Property name @@ -575,13 +615,9 @@ def _set_module_property(self, module, run_name, property, get_type): # Look for additional module properties for list objects if ( isinstance(prop_val, list) - and self.get_add_module_property(module, run_name, property) + and self.get_add_module_property(run_name, property) ): - prop_val += self.get_add_module_property( - module, - run_name, - property, - ) + prop_val += self.get_add_module_property(run_name, property) self._module_dict[module][run_name][property] = prop_val @@ -643,11 +679,6 @@ def _create_module_run_dirs(self, module, run_name): """ - # if not isinstance(call_num, type(None)): - # run_name = f'{module}_run_{call_num}' - # else: - # run_name = module - self._module_dict[module][run_name]['run_dir'] = ( self.format(self._run_dir, run_name) ) @@ -703,24 +734,23 @@ def _set_module_input_dir(self, module, run_name): else: - input_dir = [ - self._module_dict[input_mod][run_name]['output_dir'] - for input_mod in + input_dir = [] + for in_mod_run in ( self._module_dict[module][run_name]['input_module'] - if input_mod in self._module_dict - ] + ): + input_mod = in_mod_run.split('/')[0] + if input_mod in self._module_dict: + input_dir.append( + self._module_dict[input_mod][in_mod_run]['output_dir'] + ) if self._config.has_option(run_name.upper(), 'INPUT_DIR'): input_dir = self._check_input_dir_list( self._config.getlist(run_name.upper(), 'INPUT_DIR') ) - if self.get_add_module_property(module, run_name, 'input_dir'): - input_dir += self.get_add_module_property( - module, - run_name, - 'input_dir', - ) + if self.get_add_module_property(run_name, 'input_dir'): + input_dir += self.get_add_module_property(run_name, 'input_dir') if not input_dir: raise RuntimeError( diff --git a/shapepipe/pipeline/job_handler.py b/shapepipe/pipeline/job_handler.py index 276d92b0e..f4e0260d4 100644 --- a/shapepipe/pipeline/job_handler.py +++ b/shapepipe/pipeline/job_handler.py @@ -324,17 +324,21 @@ def _log_job_parameters(self): """ + module_run_num = self.filehd.get_module_current_run(self._module) + text = 'Starting job handler with:' - module_info = ' - Module: {}'.format(self._module) - cpu_info = ' - Number of available CPUs: {}'.format(cpu_count()) - proc_info = ' - Total number of processes: {}'.format(self._n_procs) - job_type = ' - Job Type: {}'.format(self.job_type) - batch_info = ' - Batch size: {}'.format(self.batch_size) - time_info = ' - Timeout Limit: {}s'.format(self.timeout) + module_info = f' - Module: {self._module}' + module_run = f' -- Run: {module_run_num}' + cpu_info = f' - Number of available CPUs: {cpu_count()}' + proc_info = f' - Total number of processes: {self._n_procs}' + job_type = f' - Job Type: {self.job_type}' + batch_info = f' - Batch size: {self.batch_size}' + time_info = f' - Timeout Limit: {self.timeout}s' if self._verbose: print('Starting job handler with:') print(module_info) + print(module_run) print(cpu_info) print(proc_info) print(job_type) @@ -345,6 +349,7 @@ def _log_job_parameters(self): # Log process properties self.log.info(text) self.log.info(module_info) + self.log.info(module_run) self.log.info(cpu_info) self.log.info(proc_info) self.log.info(job_type) diff --git a/shapepipe/pipeline/run_log.py b/shapepipe/pipeline/run_log.py index 8c3814b0d..9de27a371 100644 --- a/shapepipe/pipeline/run_log.py +++ b/shapepipe/pipeline/run_log.py @@ -43,8 +43,7 @@ def _write(self): """ with open(self.run_log_file, 'a') as run_log: - run_log.write('{} {}\n'.format(self.current_run, - self._module_list)) + run_log.write(f'{self.current_run} {self._module_list}\n') def _get_list(self): """ Get List @@ -75,11 +74,14 @@ def get_all(self, module): """ - all_runs = [run for run in self._runs if module in - run.split()[1].split(',')] + all_runs = [ + run for run in self._runs + if module in run.split()[1].split(',') + ] if len(all_runs) == 0: - raise RuntimeError('No previous run of module \'{}\' ' - 'found'.format(module)) + raise RuntimeError( + f'No previous run of module \'{module}\' found' + ) all_runs = all_runs[::-1] @@ -134,12 +136,14 @@ def get_run(self, search_string): runs = [run for run in self._runs if search_string in run] if len(runs) < 1: - raise RuntimeError('No runs found matching search string \'{}\'.' - ''.format(search_string)) + raise RuntimeError( + f'No runs found matching search string \'{search_string}\'.' + ) elif len(runs) > 1: - raise RuntimeError('More than one run found matching search ' - 'string \'{}\'' - ''.format(search_string)) + raise RuntimeError( + 'More than one run found matching search string ' + + f'\'{search_string}\'' + ) return runs[0].split(' ')[0] diff --git a/shapepipe_run.py b/shapepipe_run.py index 55fad582a..287e23278 100755 --- a/shapepipe_run.py +++ b/shapepipe_run.py @@ -142,16 +142,20 @@ def _get_module_depends(self, property): for module in module_runners.keys(): if self.config.has_option(module.upper(), property.upper()): - prop_list += self.config.getlist(module.upper(), - property.upper()) + prop_list += self.config.getlist( + module.upper(), + property.upper(), + ) else: prop_list += getattr(module_runners[module], property) - # if self.filehd.get_add_module_property(module, property): - # prop_list += self.filehd.get_add_module_property( - # module, - # property, - # ) + if self.filehd.get_add_module_property(module, property): + prop_list += self.filehd.get_add_module_property( + module, + property, + ) + + print(prop_list) return prop_list @@ -215,7 +219,7 @@ def _check_module_versions(self): if self.verbose: print(ver_text) - for module in self.modules: + for module in set(self.modules): module_txt = ( f' - {module} {self.filehd.module_runners[module].version}' From dd953112c0347f012d584b7a2803f1a1dd11b83c Mon Sep 17 00:00:00 2001 From: Samuel Farrens Date: Thu, 15 Jul 2021 14:36:44 +0200 Subject: [PATCH 5/9] code cleaned up --- shapepipe/info.py | 30 ++++--- shapepipe/modules/__init__.py | 78 ++++++++-------- shapepipe/modules/module_decorator.py | 30 ++++--- shapepipe/modules/module_runners.py | 15 +++- shapepipe/pipeline/__init__.py | 12 ++- shapepipe/pipeline/args.py | 69 ++++++++------ shapepipe/pipeline/config.py | 46 +++++----- shapepipe/pipeline/dependency_handler.py | 110 +++++++++++------------ shapepipe/pipeline/execute.py | 16 ++-- shapepipe/pipeline/file_handler.py | 81 ++++++----------- shapepipe/pipeline/job_handler.py | 105 ++++++++++------------ shapepipe/pipeline/mpi_run.py | 28 ++++-- shapepipe/pipeline/run_log.py | 17 ++-- shapepipe/pipeline/timeout.py | 8 +- shapepipe/pipeline/worker_handler.py | 64 ++++++------- shapepipe/utilities/canfar.py | 25 +++--- shapepipe/utilities/file_system.py | 18 ++-- shapepipe_run.py | 16 +--- 18 files changed, 381 insertions(+), 387 deletions(-) diff --git a/shapepipe/info.py b/shapepipe/info.py index b4479d555..ab1f1c02e 100644 --- a/shapepipe/info.py +++ b/shapepipe/info.py @@ -17,31 +17,36 @@ # Package Info -version_info = (0, 0, 2) +version_info = (0, 0, 3) __version__ = '.'.join(str(c) for c in version_info) __name__ = 'shapepipe' __author__ = 'Samuel Farrens' __email__ = 'samuel.farrens@cea.fr' -__about__ = ('ShapePipe is a shape measurement pipeline developed with the' - 'CosmoStat lab at CEA Paris-Saclay.') +__about__ = ( + 'ShapePipe is a shape measurement pipeline developed with the' + + 'CosmoStat lab at CEA Paris-Saclay.' +) __setups__ = ['pytest-runner'] -__installs__ = ['joblib>=0.13', - 'modopt>=1.2', - 'numpy>=1.14'] -__tests__ = ['pytest', - 'pytest-cov', - 'pytest-pycodestyle'] +__installs__ = [ + 'joblib>=0.13', + 'modopt>=1.2', + 'numpy>=1.14' +] +__tests__ = [ + 'pytest', + 'pytest-cov', + 'pytest-pycodestyle' +] def shapepipe_logo(colour=False): - """ ShapePipe Logo + """ShapePipe Logo Returns ------- str logo string """ - shape = r''' _______ __ __ _______ _______ _______ _______ ___ _______ _______ | || | | || _ || || || || | | || | @@ -79,14 +84,13 @@ def shapepipe_logo(colour=False): def line(): - """ Line + """Line Returns ------- str a horizontal line """ - line = r''' ------------------------------------------------------------------------------- ''' diff --git a/shapepipe/modules/__init__.py b/shapepipe/modules/__init__.py index 3a2762ddd..f51aa5645 100644 --- a/shapepipe/modules/__init__.py +++ b/shapepipe/modules/__init__.py @@ -8,38 +8,46 @@ """ -__all__ = ['module_decorator', - 'module_runners', - 'mask_package', - 'PSFExInterpolation_package', - 'SETools_package' - ] -__module_list__ = ['execute_example', - 'erase_output_runner', - 'find_exposures_runner', - 'galsim_shapes_runner', - 'galsim_shapes_v2_runner', - 'get_images_runner', - 'get_images_runner2', - 'make_catalog_runner', - 'mask_runner', - 'mask_runner_exp', - 'match_external_runner', - 'merge_sep_cats_runner', - 'merge_headers_runner', - 'merge_star_cat_runner', - 'ngmix_runner', - 'paste_cat_runner', - 'psfex_runner', - 'psfex_interp_runner', - 'python_example', - 'serial_example', - 'setools_runner', - 'sextractor_runner', - 'sextractor_runner_exp', - 'split_exp_runner', - 'spread_model_runner', - 'swarp_runner', - 'vignetmaker_runner', - 'vignetmaker_runner2', - 'uncompress_fits_image_runner'] +__all__ = [ + 'find_exposures_package', + 'mask_package', + 'MCCD_package', + 'module_decorator', + 'module_runners', + 'mask_package', + 'PasteCat_package' + 'PSFExInterpolation_package', + 'python_example_package', + 'SETools_package', + 'vignetmaker_package', +] +__module_list__ = [ + 'execute_example', + 'erase_output_runner', + 'find_exposures_runner', + 'galsim_shapes_runner', + 'galsim_shapes_v2_runner', + 'get_images_runner', + 'get_images_runner2', + 'make_catalog_runner', + 'mask_runner', + 'mask_runner_exp', + 'match_external_runner', + 'merge_sep_cats_runner', + 'merge_headers_runner', + 'merge_star_cat_runner', + 'ngmix_runner', + 'paste_cat_runner', + 'psfex_runner', + 'psfex_interp_runner', + 'python_example', + 'serial_example', + 'setools_runner', + 'sextractor_runner', + 'sextractor_runner_exp', + 'split_exp_runner', + 'spread_model_runner', + 'swarp_runner', + 'vignetmaker_runner', + 'uncompress_fits_image_runner', +] diff --git a/shapepipe/modules/module_decorator.py b/shapepipe/modules/module_decorator.py index d3ecbe432..2b284824b 100644 --- a/shapepipe/modules/module_decorator.py +++ b/shapepipe/modules/module_decorator.py @@ -7,10 +7,17 @@ """ -def module_runner(input_module=None, version='0.0', file_pattern='', - file_ext='', depends=[], executes=[], numbering_scheme=None, - run_method='parallel'): - """ Module Runner Wrapper +def module_runner( + input_module=None, + version='0.0', + file_pattern='', + file_ext='', + depends=[], + executes=[], + numbering_scheme=None, + run_method='parallel', +): + """Module Runner Wrapper This method adds properties to module runners. @@ -32,7 +39,6 @@ def module_runner(input_module=None, version='0.0', file_pattern='', Module numbering scheme, default is None """ - if isinstance(input_module, str): input_module = [input_module] elif not isinstance(input_module, (list, type(None))): @@ -62,17 +68,19 @@ def module_runner(input_module=None, version='0.0', file_pattern='', raise TypeError('Executables must be a string or a list of strings') if not isinstance(numbering_scheme, (str, type(None))): - raise TypeError('Numbering scheme must be a string, found \'{}\'.' - ''.format(numbering_scheme)) + raise TypeError( + f'Numbering scheme must be a string, found \'{numbering_scheme}\'.' + ) if (len(file_ext) == 1) and (len(file_pattern) > 1): file_ext = [file_ext[0] for i in file_pattern] if len(file_ext) != len(file_pattern): - raise ValueError('The number of file_ext values ({}) does not match ' - 'the number of file_pattern values ({}) in the ' - 'module decorator.' - ''.format(len(file_ext), len(file_pattern))) + raise ValueError( + f'The number of file_ext values ({len(file_ext)}) does not match ' + + f'the number of file_pattern values ({len(file_pattern)}) in ' + + 'the module decorator.' + ) def decorator(func): diff --git a/shapepipe/modules/module_runners.py b/shapepipe/modules/module_runners.py index 16d29fc99..d524480f3 100644 --- a/shapepipe/modules/module_runners.py +++ b/shapepipe/modules/module_runners.py @@ -12,7 +12,7 @@ def get_module_runners(modules): - """ Get Module Runners + """Get Module Runners Import the specified module runners. @@ -30,8 +30,15 @@ def get_module_runners(modules): package = 'shapepipe.modules' - module_runners = dict([(module, getattr(import_module('.{}'.format(module), - package=package), module)) - for module in modules]) + module_runners = dict([ + ( + module, + getattr( + import_module('.{}'.format(module), package=package), + module, + ) + ) + for module in modules + ]) return module_runners diff --git a/shapepipe/pipeline/__init__.py b/shapepipe/pipeline/__init__.py index 163ef0756..60d34bc22 100644 --- a/shapepipe/pipeline/__init__.py +++ b/shapepipe/pipeline/__init__.py @@ -8,5 +8,13 @@ """ -__all__ = ['args', 'config', 'execute', 'file_handler', 'job_handler', - 'timeout', 'worker_handler', 'file_io'] +__all__ = [ + 'args', + 'config', + 'execute', + 'file_handler', + 'job_handler', + 'timeout', + 'worker_handler', + 'file_io' +] diff --git a/shapepipe/pipeline/args.py b/shapepipe/pipeline/args.py index 9a9ebaf90..c08dc2190 100644 --- a/shapepipe/pipeline/args.py +++ b/shapepipe/pipeline/args.py @@ -13,20 +13,21 @@ from shapepipe.info import shapepipe_logo, __version__ -class cutomFormatter(ap.ArgumentDefaultsHelpFormatter, - ap.RawDescriptionHelpFormatter): - """ Custom Formatter +class cutomFormatter( + ap.ArgumentDefaultsHelpFormatter, + ap.RawDescriptionHelpFormatter, +): + """Custom Formatter This class combines the argparse ``ArgumentDefaultsHelpFormatter`` and ``RawDescriptionHelpFormatter`` formatters. """ - pass def print_message(message): - """ Print Message + """Print Message This method returns a custom argparse action for printing a message. @@ -38,10 +39,9 @@ def print_message(message): Returns ------- customAction - Custom action class instance + Custom action class object """ - class customAction(ap.Action): def __init__(self, option_strings, version=None, dest=ap.SUPPRESS, @@ -61,7 +61,7 @@ def __call__(self, parser, args, values, option_string=None): def module_str(): - """ Module String + """Module String Format the list of modules as a single string. @@ -71,7 +71,6 @@ def module_str(): Formatted string of module names """ - string = '' for module in __module_list__: @@ -81,7 +80,7 @@ def module_str(): def create_arg_parser(): - """ Create Argument Parser + """Create Argument Parser This method returns an argument parser. @@ -91,27 +90,43 @@ def create_arg_parser(): Argument parser """ - # Create parser - parser = ap.ArgumentParser(add_help=False, description=shapepipe_logo(), - formatter_class=cutomFormatter) + parser = ap.ArgumentParser( + add_help=False, description=shapepipe_logo(), + formatter_class=cutomFormatter, + ) optional = parser.add_argument_group('Optional Arguments') # Add arguments - optional.add_argument('-h', '--help', action='help', - help='show this help message and exit') - - optional.add_argument('-v', '--version', action='version', - version='%(prog)s v{}'.format(__version__)) - - optional.add_argument('-l', '--list_modules', - action=print_message('ShapePipe modules currently ' - 'available:\n' - '{}'.format(module_str())), - help='list modules currently available and exit') - - optional.add_argument('-c', '--config', default='config.ini', - help='configuration file name') + optional.add_argument( + '-h', + '--help', + action='help', + help='show this help message and exit', + ) + + optional.add_argument( + '-v', + '--version', + action='version', + version=f'%(prog)s v{__version__}' + ) + + optional.add_argument( + '-l', + '--list_modules', + action=print_message( + f'ShapePipe modules currently available:\n{module_str()}' + ), + help='list modules currently available and exit', + ) + + optional.add_argument( + '-c', + '--config', + default='config.ini', + help='configuration file name', + ) # Return parser return parser.parse_args() diff --git a/shapepipe/pipeline/config.py b/shapepipe/pipeline/config.py index 7747b3634..ae91347af 100644 --- a/shapepipe/pipeline/config.py +++ b/shapepipe/pipeline/config.py @@ -13,14 +13,14 @@ class CustomParser(ConfigParser): - """ Custom Parser + """Custom Parser This class adds functionality to the ``ConfigParser`` class. """ def getexpanded(self, section, option): - """ Get Expanded + """Get Expanded This method expands enviroment varibles obtaiened using the get method. @@ -37,11 +37,10 @@ def getexpanded(self, section, option): Expanded enviroment variables """ - return self._get(section, os.path.expandvars, option) def getlist(self, section, option, delimiter=','): - """ Get List + """Get List This method retrieves a list of strings separated by a given delimiter. @@ -61,13 +60,14 @@ def getlist(self, section, option, delimiter=','): List of strings """ - - return [opt.strip() for opt in self.getexpanded(section, - option).split(delimiter)] + return [ + opt.strip() + for opt in self.getexpanded(section, option).split(delimiter) + ] class SetUpParser(object): - """ Set Up Parser + """Set Up Parser This class sets up an instance of ``CustomParser`` and checks the pipeline related parameters. @@ -86,7 +86,7 @@ def __init__(self, file_name): @property def file_name(self): - """ File name + """File name This sets the configuration file name. @@ -96,25 +96,22 @@ def file_name(self): For non existent configuration file """ - return self._file_name @file_name.setter def file_name(self, value): if not os.path.exists(value): - raise IOError('Configuration file {} does not exist.'.format( - value)) + raise IOError(f'Configuration file {value} does not exist.') self._file_name = value def _set_defaults(self): - """ Set Defaults + """Set Defaults Set default configuration options. """ - if not self.config.has_option('DEFAULT', 'RUN_NAME'): self.config.set('DEFAULT', 'RUN_NAME', 'shapepipe_run') @@ -125,7 +122,7 @@ def _set_defaults(self): self.config.set('DEFAULT', 'VERBOSE', 'True') def _set_execution_options(self): - """ Set Execution Options + """Set Execution Options This method checks the execution options in the configuration file. @@ -137,7 +134,6 @@ def _set_execution_options(self): For non-existent module runner """ - if not self.config.has_option('EXECUTION', 'MODULE'): raise RuntimeError('No module(s) specified') @@ -145,7 +141,7 @@ def _set_execution_options(self): self.config.set('EXECUTION', 'MODE', 'smp') def _set_file_options(self): - """ Set File Options + """Set File Options This module checks the file options in the configuration file. @@ -161,7 +157,6 @@ def _set_file_options(self): For non-existent output directory """ - if not self.config.has_option('FILE', 'LOG_NAME'): self.config.set('FILE', 'LOG_NAME', 'shapepipe') @@ -175,19 +170,20 @@ def _set_file_options(self): raise RuntimeError('Not output directory specified') elif not os.path.isdir(self.config.getexpanded('FILE', 'OUTPUT_DIR')): - raise OSError('Directory {} not found.'.format( - self.config.getexpanded('FILE', 'OUTPUT_DIR'))) + raise OSError( + f'Directory {self.config.getexpanded("FILE", "OUTPUT_DIR")} ' + + 'not found.' + ) if not self.config.has_option('FILE', 'CORRECT_FILE_PATTERN'): self.config.set('FILE', 'CORRECT_FILE_PATTERN', 'True') def _set_worker_options(self): - """ Set Worker Options + """Set Worker Options This module checks the worker options in the configuration file. """ - if not self.config.has_section('WORKER'): self.config.add_section('WORKER') @@ -195,7 +191,7 @@ def _set_worker_options(self): self.config.set('WORKER', 'PROCESS_PRINT_LIMIT', '200') def get_parser(self): - """ Get Parser + """Get Parser Return a configuration file parser instance. @@ -205,7 +201,6 @@ def get_parser(self): Custom configuration file parser """ - self.config.read(self.file_name) self._set_defaults() self._set_execution_options() @@ -216,7 +211,7 @@ def get_parser(self): def create_config_parser(file_name): - """ Create Configuration Parser + """Create Configuration Parser This method creates a configuration file parser instance. @@ -231,7 +226,6 @@ def create_config_parser(file_name): Custom configuration file parser """ - parser = SetUpParser(file_name).get_parser() parser.file_name = file_name diff --git a/shapepipe/pipeline/dependency_handler.py b/shapepipe/pipeline/dependency_handler.py index df303c848..5f9f0c522 100644 --- a/shapepipe/pipeline/dependency_handler.py +++ b/shapepipe/pipeline/dependency_handler.py @@ -15,7 +15,7 @@ class DependencyHandler(object): - """ Dependency Handler + """Dependency Handler This class manages the required Python packages and system executables required to run the pipeline. @@ -46,9 +46,7 @@ def __init__(self, dependencies=[], executables=[]): @property def depend(self): - """ Input Dependency List - """ - + """Input Dependency List""" return self._depend @depend.setter @@ -64,9 +62,7 @@ def depend(self, value): @property def execute(self): - """ Input Executable List - """ - + """Input Executable List""" return self._execute @execute.setter @@ -82,7 +78,7 @@ def execute(self, value): @staticmethod def _convert_to_float(string): - """ Convert String to Float + """Convert String to Float This method converts numerical strings to floats. @@ -97,7 +93,6 @@ def _convert_to_float(string): Converted value """ - try: val = float(string) except Exception: @@ -107,7 +102,7 @@ def _convert_to_float(string): @staticmethod def _slice_1d(array, indices): - """ Slice 1D + """Slice 1D Slice 1D list by indices. @@ -124,12 +119,11 @@ def _slice_1d(array, indices): Sliced list """ - return [array[index] for index in indices] @classmethod def _slice_2d(cls, array, indices): - """ Slice 2D + """Slice 2D Slice a list of lists by indices. @@ -146,12 +140,11 @@ def _slice_2d(cls, array, indices): Sliced list """ - return [cls._slice_1d(sublist, indices) for sublist in array] @staticmethod def _get_indices(array, value): - """ Get Indices + """Get Indices Get indices of array elements equal to input value. @@ -168,13 +161,14 @@ def _get_indices(array, value): List of indices """ - - return [index for index, element in enumerate(array) if - element == value] + return [ + index for index, element in enumerate(array) + if element == value + ] @classmethod def _slice_col_val(cls, array, col, value): - """ Slice by Column and Value + """Slice by Column and Value Slice a list of lists by elements in a given column equal to a given value. @@ -194,7 +188,6 @@ def _slice_col_val(cls, array, col, value): Slices list """ - return cls._slice_2d(array, cls._get_indices(array[col], value)) @staticmethod @@ -218,32 +211,31 @@ def _check_executable(exe_name): For invalid input type """ - if not isinstance(exe_name, str): - raise TypeError('Executable name must be a string.') def is_exe(fpath): - return os.path.isfile(fpath) and os.access(fpath, os.X_OK) fpath, fname = os.path.split(exe_name) if not fpath: - - res = any([is_exe(os.path.join(path, exe_name)) for path in - os.environ["PATH"].split(os.pathsep)]) + res = any([ + is_exe(os.path.join(path, exe_name)) + for path in os.environ['PATH'].split(os.pathsep) + ]) else: - res = is_exe(exe_name) if not res: - raise IOError('{} does not appear to be a valid executable on ' - 'this system.'.format(exe_name)) + raise IOError( + f'{exe_name} does not appear to be a valid executable on ' + + 'this system.' + ) def _split_string(self, string): - """ Split String + """Split String This method splits the version number from the input module string. @@ -258,15 +250,14 @@ def _split_string(self, string): Array of string components """ - if self._greq in string: - val = re.split('({})'.format(self._greq), string) + val = re.split(f'({self._greq})', string) elif self._equal in string: - val = re.split('({})'.format(self._equal), string) + val = re.split(f'({self._equal})', string) elif self._great in string: - val = re.split('({})'.format(self._great), string) + val = re.split(f'({self._great})', string) elif self._less in string: raise ValueError('"<" not permitted in package version string.') @@ -277,43 +268,45 @@ def _split_string(self, string): return val def _split_strings(self): - """ Split Strings + """Split Strings This method splits the input dependency modules strings. """ - - self._depend_arr = list(map(list, zip(*[self._split_string(string) - for string in self.depend]))) + self._depend_arr = list(map( + list, + zip(*[self._split_string(string) for string in self.depend]) + )) self._dependency_set = set(self._depend_arr[0]) def _unique_dependencies(self): - """ Unique Dependencies + """Unique Dependencies This method creates a unique list of depencies. """ - for package_name in self._dependency_set: subset = self._slice_col_val(self._depend_arr, 0, package_name) if any(self._equal in element for element in subset): - subset = self._slice_col_val(subset, 1, self._equal) if any([ver != '' for ver in subset[2]]): - - subset = (self._slice_col_val(subset, 2, - str(max([self._convert_to_float(ver) - for ver in subset[2]])))) + subset = (self._slice_col_val( + subset, + 2, + str(max( + [self._convert_to_float(ver) for ver in subset[2]] + )) + )) subset = [element[0] for element in self._slice_2d(subset, [0])] self.dependency_list.append(''.join(subset)) def check_dependencies(self): - """ Check Dependencies + """Check Dependencies This method checks that the required dependencies are installed. @@ -323,7 +316,6 @@ def check_dependencies(self): List of depenecies with versions and paths """ - dependency_status_list = [] for dependency in self._dependency_set: @@ -331,8 +323,9 @@ def check_dependencies(self): try: package = importlib.import_module(dependency) except Exception: - raise ImportError('Could not import pipeline dependency ' - '{}'.format(dependency)) + raise ImportError( + f'Could not import pipeline dependency {dependency}' + ) if hasattr(package, '__version__'): version = package.__version__ @@ -346,14 +339,14 @@ def check_dependencies(self): else: path = 'N/A' - dependency_status_list.append(' - {} {} {}'.format( - package.__name__, - version, path)) + dependency_status_list.append( + f' - {package.__name__} {version} {path}' + ) return dependency_status_list def check_executables(self): - """ Check Executables + """Check Executables This method checks that the required executables are installed. @@ -363,19 +356,20 @@ def check_executables(self): List of executables with paths """ - executable_status_list = [] for executable in self.executable_list: self._check_executable(executable) - exe_path, err = (subprocess.Popen('which {0}'.format(executable), - shell=True, stdout=subprocess.PIPE, - stderr=subprocess.PIPE).communicate()) + exe_path, err = subprocess.Popen( + f'which {executable}', + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ).communicate() - string = ' - {} {}'.format(executable, - exe_path.rstrip().decode('utf-8')) + string = f' - {executable} {exe_path.rstrip().decode("utf-8")}' executable_status_list.append(string) diff --git a/shapepipe/pipeline/execute.py b/shapepipe/pipeline/execute.py index d5157c139..ed472ecb9 100644 --- a/shapepipe/pipeline/execute.py +++ b/shapepipe/pipeline/execute.py @@ -13,7 +13,7 @@ def execute(command_line): - """ Execute + """Execute This method executes a given command line. @@ -33,7 +33,6 @@ def execute(command_line): For invalid input type """ - if not isinstance(command_line, str): raise TypeError('Command line must be a string.') @@ -64,7 +63,6 @@ def check_executable(exe_name): For non-existent executable """ - if not isinstance(exe_name, str): raise TypeError('Executable name must be a string.') @@ -74,12 +72,16 @@ def is_exe(fpath): fpath, fname = os.path.split(exe_name) if not fpath: - res = any([is_exe(os.path.join(path, exe_name)) for path in - os.environ["PATH"].split(os.pathsep)]) + res = any([ + is_exe(os.path.join(path, exe_name)) + for path in os.environ['PATH'].split(os.pathsep) + ]) else: res = is_exe(exe_name) if not res: - raise OSError('{} does not appear to be a valid executable on this ' - 'system.'.format(exe_name)) + raise OSError( + f'{exe_name} does not appear to be a valid executable on this ' + + 'system.' + ) diff --git a/shapepipe/pipeline/file_handler.py b/shapepipe/pipeline/file_handler.py index ef37da171..0b78be80d 100644 --- a/shapepipe/pipeline/file_handler.py +++ b/shapepipe/pipeline/file_handler.py @@ -49,7 +49,6 @@ def find_files(path, pattern='*', ext='*'): For invalid extension format """ - dot = '.' star = '*' @@ -84,8 +83,8 @@ def check_duplicate(input_list): ------- ok : bool True (False) if does (does not) contain at least one duplicate - """ + """ input_set = set() for elem in input_list: @@ -130,7 +129,7 @@ def __init__(self, run_name, modules, config): 'FILE', 'CORRECT_FILE_PATTERN', ) - self._run_log_file = self.format( + self._run_log_file = self.setpath( self._output_dir, config.get('FILE', 'RUN_LOG_NAME'), '.txt', @@ -162,7 +161,6 @@ def run_dir(self): This method defines the run directory. """ - return self._run_dir @run_dir.setter @@ -177,7 +175,6 @@ def _input_dir(self): This method defines the input directories. """ - return self.__input_dir @_input_dir.setter @@ -192,7 +189,6 @@ def _output_dir(self): This method defines the output directory. """ - return self.__output_dir @_output_dir.setter @@ -212,7 +208,6 @@ def read_number_list(file_name): Number list file name """ - with open(file_name) as data_file: number_list = data_file.readlines() @@ -235,7 +230,6 @@ def check_dir(cls, dir_name, check_exists=False): If directory already exists """ - if check_exists and os.path.isdir(dir_name): raise OSError('Directory {dir_name} already exists.') @@ -253,7 +247,6 @@ def check_dirs(cls, dir_list): Directory list """ - return [cls.check_dir(dir) for dir in dir_list] @classmethod @@ -268,13 +261,12 @@ def mkdir(cls, dir_name): Directory name with full path """ - cls.check_dir(dir_name, check_exists=True) mkdir(dir_name) @staticmethod - def format(path, name, ext=''): - """Format Path Name + def setpath(path, name, ext=''): + """Set Path Name This method appends the file/directory name to the input path. @@ -293,7 +285,6 @@ def format(path, name, ext=''): Formated path """ - return f'{path}/{name}{ext}' @staticmethod @@ -313,7 +304,6 @@ def strip_slash(path): Updated path """ - return path.rstrip('/') @classmethod @@ -333,7 +323,6 @@ def strip_slash_list(cls, path_list): Updated paths """ - return [cls.strip_slash(path) for path in path_list] @staticmethod @@ -353,7 +342,6 @@ def flatten_list(input_list): Flattened list """ - return [item for sublist in input_list for item in sublist] @staticmethod @@ -374,7 +362,6 @@ def _get_module_run_name(dir): Module run name, module name, search string """ - string, module_run = dir.split(':') module = module_run.split('/')[0] @@ -408,15 +395,18 @@ def _check_input_dir_list(self, dir_list): module_run, module, _ = self._get_module_run_name(dir) last_module = self._run_log.get_last(module) input_dir.append( - self.format(self.format(last_module, module_run), 'output') + self.setpath( + self.setpath(last_module, module_run), + 'output', + ) ) elif 'all' in dir.lower(): module_run, module, _ = self._get_module_run_name(dir) all_runs = self._run_log.get_all(module) input_dir.extend([ - self.format( - self.format(run.split(' ')[0], module_run), + self.setpath( + self.setpath(run.split(' ')[0], module_run), 'output' ) for run in all_runs @@ -425,9 +415,12 @@ def _check_input_dir_list(self, dir_list): elif ':' in dir.lower(): module_run, _, string = self._get_module_run_name(dir) input_dir.append( - self.format( - self.format(self._run_log.get_run(string), module_run), - 'output' + self.setpath( + self.setpath( + self._run_log.get_run(string), + module_run, + ), + 'output', ) ) @@ -446,7 +439,6 @@ def _get_input_dir(self): This method sets the module input directory """ - self._input_dir = self._check_input_dir_list(self._input_list) def create_global_run_dirs(self): @@ -455,11 +447,10 @@ def create_global_run_dirs(self): This method creates the pipeline output directories for a given run. """ - - self.run_dir = self.format(self._output_dir, self._run_name) - self._log_dir = self.format(self.run_dir, 'logs') - self._tmp_dir = self.format(self.run_dir, 'tmp') - self.log_name = self.format(self._log_dir, self._log_name) + self.run_dir = self.setpath(self._output_dir, self._run_name) + self._log_dir = self.setpath(self.run_dir, 'logs') + self._tmp_dir = self.setpath(self.run_dir, 'tmp') + self.log_name = self.setpath(self._log_dir, self._log_name) self._run_log = RunLog( self._run_log_file, self._module_list, @@ -479,7 +470,6 @@ def _copy_config_to_log(self): Copy configuration file to run log directory. """ - config_file_name = os.path.basename(self._config.file_name) copyfile( @@ -503,7 +493,6 @@ def get_module_current_run(self, module): Module run count """ - return str(self._module_dict[module]['run_count']) def get_module_config_sec(self, module): @@ -522,7 +511,6 @@ def get_module_config_sec(self, module): Configuration file section name """ - return self._module_dict[module]['latest'].upper() def get_add_module_property(self, run_name, property): @@ -632,7 +620,6 @@ def _set_module_properties(self, module, run_name): Module name """ - module_props = { 'numbering_scheme': 'str', 'run_method': 'str', @@ -678,15 +665,17 @@ def _create_module_run_dirs(self, module, run_name): Module name """ - self._module_dict[module][run_name]['run_dir'] = ( - self.format(self._run_dir, run_name) + self.setpath(self._run_dir, run_name) ) self._module_dict[module][run_name]['log_dir'] = ( - self.format(self._module_dict[module][run_name]['run_dir'], 'logs') + self.setpath( + self._module_dict[module][run_name]['run_dir'], + 'logs', + ) ) self._module_dict[module][run_name]['output_dir'] = ( - self.format( + self.setpath( self._module_dict[module][run_name]['run_dir'], 'output', ) @@ -722,7 +711,6 @@ def _set_module_input_dir(self, module, run_name): Module name """ - if ( isinstance( self._module_dict[module][run_name]['input_module'], @@ -784,7 +772,6 @@ def _generate_re_pattern(match_pattern): For invalid input type """ - if not isinstance(match_pattern, str): TypeError('Match pattern must be a string.') @@ -820,7 +807,6 @@ def _strip_dir_from_file(file_name, dir_list): File name """ - return [ file_name.replace(_dir + '/', '') for _dir in dir_list if _dir in file_name @@ -848,7 +834,6 @@ def _get_re(cls, num_scheme): Regular Expression """ - # Raise an error if num_scheme is None, # but not if num_scheme=='' if not num_scheme and num_scheme != '': @@ -892,7 +877,6 @@ def _save_num_patterns( Output file name """ - # Find all files matching the input pattern and extension from the # available input directories and identify the correct path true_file_list = None @@ -990,7 +974,6 @@ def _save_match_patterns(output_file, mmap_list): List of memory maps """ - num_pattern_list = [np.load(mmap, mmap_mode='r') for mmap in mmap_list] np.save( @@ -1080,7 +1063,6 @@ def _format_process_list( List of processes """ - pattern_list, ext_list, path_list = list(zip(*patterns)) if isinstance(self._number_list, type(None)): @@ -1134,13 +1116,12 @@ def _save_process_list( Numbering scheme """ - np_mmap_list = [ - self.format(self._tmp_dir, f'nums_{pattern}_{ext}.npy') + self.setpath(self._tmp_dir, f'nums_{pattern}_{ext}.npy') for pattern, ext in zip(pattern_list, ext_list) ] - match_mmap = self.format(self._tmp_dir, 'matching_num_patterns.npy') - self.process_mmap = self.format(self._tmp_dir, 'process_list.npy') + match_mmap = self.setpath(self._tmp_dir, 'matching_num_patterns.npy') + self.process_mmap = self.setpath(self._tmp_dir, 'process_list.npy') re_pattern = self._get_re(num_scheme) @@ -1181,7 +1162,6 @@ def remove_process_mmap(self): Remove process list memory map. """ - self._remove_mmaps([self.process_mmap]) def _get_module_input_files(self, module, run_name): @@ -1195,7 +1175,6 @@ def _get_module_input_files(self, module, run_name): Module name """ - dir_list = self._module_dict[module][run_name]['input_dir'] pattern_list = self._module_dict[module][run_name]['file_pattern'] ext_list = self._module_dict[module][run_name]['file_ext'] @@ -1221,7 +1200,6 @@ def set_up_module(self, module): Module name """ - if module in self._module_dict.keys(): self._module_dict[module]['run_count'] += 1 call_num = self._module_dict[module]['run_count'] @@ -1257,7 +1235,6 @@ def get_worker_log_name(self, module, file_number_string): Worker log file name """ - run_name = self._module_dict[module]['latest'] log_dir = self._module_dict[module][run_name]['log_dir'] diff --git a/shapepipe/pipeline/job_handler.py b/shapepipe/pipeline/job_handler.py index f4e0260d4..a14221d4a 100644 --- a/shapepipe/pipeline/job_handler.py +++ b/shapepipe/pipeline/job_handler.py @@ -17,7 +17,7 @@ class JobHandler(object): - """ Job Handler + """Job Handler This class handles the submition of jobs to workers distributed among a specified number of CPUs. @@ -74,7 +74,7 @@ def __init__(self, module, filehd, config, log, job_type='parallel', @property def config(self): - """ Config + """Config This method defines the configuation parser instance @@ -84,21 +84,21 @@ def config(self): For incorrect input type """ - return self._config @config.setter def config(self, value): if not isinstance(value, ConfigParser): - raise TypeError('config must be an instane of ' - 'configparser.ConfigParser') + raise TypeError( + 'config must be an instane of configparser.ConfigParser' + ) self._config = value @property def log(self): - """ Log + """Log This method defines the logging instance @@ -108,7 +108,6 @@ def log(self): For incorrect input type """ - return self._log @log.setter @@ -121,7 +120,7 @@ def log(self, value): @property def job_type(self): - """ Job Type + """Job Type This method defines the job type @@ -131,20 +130,19 @@ def job_type(self): For incorrect input type """ - return self._job_type @job_type.setter def job_type(self, value): if value not in ('serial', 'parallel'): - raise TypeError('{} is not a valid job type.'.format(value)) + raise TypeError(f'{value} is not a valid job type.') self._job_type = value @property def parallel_mode(self): - """ Parallel Mode + """Parallel Mode This method defines the mode of parallelisation. @@ -154,20 +152,19 @@ def parallel_mode(self): For incorrect input type """ - return self._parallel_mode @parallel_mode.setter def parallel_mode(self, value): if value not in ('smp', 'mpi'): - raise TypeError('{} is not a valid parallel mode.'.format(value)) + raise TypeError(f'{value} is not a valid parallel mode.') self._parallel_mode = value @property def batch_size(self): - """ Batch Size + """Batch Size This method defines the job batch size. @@ -177,14 +174,15 @@ def batch_size(self): For invalid batch size value """ - return self._batch_size @batch_size.setter def batch_size(self, value): - if (isinstance(value, type(None)) and - self.config.has_option('JOB', 'SMP_BATCH_SIZE')): + if ( + isinstance(value, type(None)) + and self.config.has_option('JOB', 'SMP_BATCH_SIZE') + ): value = self.config.getint('JOB', 'SMP_BATCH_SIZE') elif isinstance(value, type(None)): @@ -200,7 +198,7 @@ def batch_size(self, value): @property def backend(self): - """ Backend + """Backend This method defines the joblib backend. The default is 'loky'. @@ -210,26 +208,27 @@ def backend(self): For invalid backend value """ - return self._backend @backend.setter def backend(self, value): - if (isinstance(value, type(None)) and - self.config.has_option('JOB', 'SMP_BACKEND')): + if ( + isinstance(value, type(None)) + and self.config.has_option('JOB', 'SMP_BACKEND') + ): value = self.config.get('JOB', 'SMP_BACKEND').lower() elif isinstance(value, type(None)): value = 'loky' if value not in ('loky', 'multiprocessing', 'threading'): - raise ValueError('{} is not a valid joblib backend.'.format(value)) + raise ValueError(f'{value} is not a valid joblib backend.') self._backend = value @property def timeout(self): - """ Timeout Limit + """Timeout Limit This method defines the timeout limit for all jobs. @@ -241,14 +240,15 @@ def timeout(self): For invalid timeout limit value """ - return self._timeout @timeout.setter def timeout(self, value): - if (isinstance(value, type(None)) and - self.config.has_option('JOB', 'TIMEOUT')): + if ( + isinstance(value, type(None)) + and self.config.has_option('JOB', 'TIMEOUT') + ): value = self.config.get('JOB', 'TIMEOUT') value = self.hms2sec(value) if ':' in value else int(value) @@ -258,12 +258,11 @@ def timeout(self, value): self._timeout = value def finish_up(self): - """ Finish Up + """Finish Up Finish up JobHandler session. """ - self._check_for_errors() self._check_missed_processes() self.log.info('All processes complete') @@ -278,12 +277,11 @@ def finish_up(self): self.clean_up() def submit_jobs(self): - """ Submit Jobs + """Submit Jobs Submit jobs in serial or parallel. """ - if self.job_type == 'serial': self.submit_serial_job() else: @@ -293,7 +291,7 @@ def submit_jobs(self): @staticmethod def hms2sec(time_str): - """ HMS to Seconds + """HMS to Seconds Convert a string from hours, minutes and seconds to seconds. @@ -312,18 +310,16 @@ def hms2sec(time_str): Time strings should take the form 'HH:MM:SS'. """ - h, m, s = time_str.split(':') return int(h) * 3600 + int(m) * 60 + int(s) def _log_job_parameters(self): - """ Log Job Parameters + """Log Job Parameters This method logs the class instance parameters. """ - module_run_num = self.filehd.get_module_current_run(self._module) text = 'Starting job handler with:' @@ -358,12 +354,11 @@ def _log_job_parameters(self): self.log.info(time_info) def _distribute_smp_jobs(self): - """ Distribute SMP Jobs + """Distribute SMP Jobs This method distributes the jobs to the workers using SMP. """ - result = ( Parallel(n_jobs=self.batch_size, backend=self.backend)( delayed(WorkerHandler(verbose=self._verbose).worker)( @@ -383,12 +378,11 @@ def _distribute_smp_jobs(self): self.worker_dicts = result def submit_serial_job(self): - """ Submit Serial Job + """Submit Serial Job Submit a single serial job with access to all processes. """ - wh = WorkerHandler(verbose=self._verbose) process = self.filehd.process_list @@ -405,69 +399,68 @@ def submit_serial_job(self): self.worker_dicts = [result] def _check_for_errors(self): - """ Check for Errors + """Check for Errors This method checks the worker dictionaries for errors and exceptions. """ - # Check worker dictionaries for errors self._check_exception_status() self._check_stderr_status() def _check_exception_status(self): - """ Check Exception Status + """Check Exception Status This method checks the worker dictionaries for exceptions raised by Python and logs the instances. """ - for worker_dict in self.worker_dicts: if worker_dict['exception']: - self.log.info('ERROR: {} recorded in: {}'.format( - worker_dict['exception'], worker_dict['log'])) + self.log.info( + f'ERROR: {worker_dict["exception"]} recorded ' + + f'in: {worker_dict["log"]}' + ) self.error_count += 1 def _check_stderr_status(self): - """ Check STDERR Status + """Check STDERR Status This method checks the worker dictionaries for errors raised by stderr and logs the instances. """ - for worker_dict in self.worker_dicts: if worker_dict['stderr']: - self.log.info('ERROR: stderr recorded in: {}'.format( - worker_dict['log'])) + self.log.info( + f'ERROR: stderr recorded in: {worker_dict["log"]}' + ) self.error_count += 1 def _check_missed_processes(self): - """ Check Missed Processes + """Check Missed Processes This method checks the file handler for processes that were not submitted. """ - - missed_txt = (' - The following processes were not submitted to ' - 'workers:') + missed_txt = ( + ' - The following processes were not submitted to workers:' + ) if self.filehd.missed: self.log.info(missed_txt) - self.log.info(' - {}'.format(self.filehd.missed)) + self.log.info(f' - {self.filehd.missed}') if self._verbose: print(missed_txt) - print(' - {}'.format(self.filehd.missed)) + print(f' - {self.filehd.missed}') def clean_up(self): - """ Finish + """Finish Finish job handler instance. """ - self.filehd.remove_process_mmap() diff --git a/shapepipe/pipeline/mpi_run.py b/shapepipe/pipeline/mpi_run.py index 739f1c1eb..ec8f322b8 100644 --- a/shapepipe/pipeline/mpi_run.py +++ b/shapepipe/pipeline/mpi_run.py @@ -4,7 +4,7 @@ def split_mpi_jobs(jobs, batch_size): - """ Split MPI Jobs + """Split MPI Jobs Split the number of MPI jobs over the number of processes. @@ -21,18 +21,23 @@ def split_mpi_jobs(jobs, batch_size): Split list of jobs """ - return [jobs[_i::batch_size] for _i in range(batch_size)] -def submit_mpi_jobs(jobs, config, timeout, run_dirs, module_runner, - worker_log, verbose): - """ Submit MPI Jobs +def submit_mpi_jobs( + jobs, + config, + timeout, + run_dirs, + module_runner, + worker_log, + verbose, +): + """Submit MPI Jobs This method distributes the jobs to the workers using MPI. """ - result = [] for process in jobs: @@ -40,7 +45,14 @@ def submit_mpi_jobs(jobs, config, timeout, run_dirs, module_runner, w_log_name = worker_log(module_runner.__name__, process[0]) wh = WorkerHandler(verbose=verbose) - result.append(wh.worker(process[1:], process[0], w_log_name, - run_dirs, config, timeout, module_runner)) + result.append(wh.worker( + process[1:], + process[0], + w_log_name, + run_dirs, + config, + timeout, + module_runner + )) return result diff --git a/shapepipe/pipeline/run_log.py b/shapepipe/pipeline/run_log.py index 9de27a371..2b45414b3 100644 --- a/shapepipe/pipeline/run_log.py +++ b/shapepipe/pipeline/run_log.py @@ -12,7 +12,7 @@ class RunLog(object): - """ Run Log Class + """Run Log Class This class manages the run log for ShapePipe. @@ -36,29 +36,27 @@ def __init__(self, run_log_file, module_list, current_run): self._get_list() def _write(self): - """ Write + """Write Write current run to the run log. """ - with open(self.run_log_file, 'a') as run_log: run_log.write(f'{self.current_run} {self._module_list}\n') def _get_list(self): - """ Get List + """Get List Get a list of all runs in the run log. """ - with open(self.run_log_file, 'r') as run_log: lines = run_log.readlines() self._runs = [line.rstrip() for line in lines] def get_all(self, module): - """ Get All + """Get All Get all previous pipeline runs of a given model. @@ -73,7 +71,6 @@ def get_all(self, module): All run paths for a given module """ - all_runs = [ run for run in self._runs if module in run.split()[1].split(',') @@ -88,7 +85,7 @@ def get_all(self, module): return all_runs def get_last(self, module): - """ Get Last + """Get Last Get the last run of the pipeline for a given module. @@ -103,14 +100,13 @@ def get_last(self, module): The last run for a given module """ - all_runs = self.get_all(module) last_run = all_runs[0] return last_run.split(' ')[0] def get_run(self, search_string): - """ Get Run + """Get Run Get a specific run that matches the input search string. @@ -132,7 +128,6 @@ def get_run(self, search_string): If more than one run is found matches the search string """ - runs = [run for run in self._runs if search_string in run] if len(runs) < 1: diff --git a/shapepipe/pipeline/timeout.py b/shapepipe/pipeline/timeout.py index a1c4230f4..421eed52b 100644 --- a/shapepipe/pipeline/timeout.py +++ b/shapepipe/pipeline/timeout.py @@ -13,7 +13,7 @@ def with_timeout(timeout, log_file): - """ Timeout Limit Decorator + """Timeout Limit Decorator This method provides a timeout decorator for a given input function. @@ -30,10 +30,10 @@ def with_timeout(timeout, log_file): For process exceeding timeout limit """ - def handler(signum, frame): - raise TimeoutError('The process time exceeded {}s in ' - '{}'.format(timeout, log_file)) + raise TimeoutError( + f'The process time exceeded {timeout}s in {log_file}' + ) def decorator(decorated): diff --git a/shapepipe/pipeline/worker_handler.py b/shapepipe/pipeline/worker_handler.py index bdd80742f..852f33ba7 100644 --- a/shapepipe/pipeline/worker_handler.py +++ b/shapepipe/pipeline/worker_handler.py @@ -17,7 +17,7 @@ class WorkerHandler(object): - """ Worker Handler + """Worker Handler This class defines the worker to process a given job. @@ -41,7 +41,7 @@ def worker( timeout, module_runner ): - """ Worker + """Worker This method defines a worker. @@ -68,7 +68,6 @@ def worker( Worker dictionary """ - self._w_log_name = w_log_name self._run_dirs = run_dirs self._config = config @@ -84,7 +83,7 @@ def worker( @staticmethod def _set_job_name(num): - """ Set Job Name + """Set Job Name This method creates a job name for a given process number. @@ -99,8 +98,7 @@ def _set_job_name(num): Job name """ - - return 'process{}'.format(num) + return f'process{num}' def _prepare_worker(self, process, job_name, timeout, module): """ Prepare Worker @@ -119,7 +117,6 @@ def _prepare_worker(self, process, job_name, timeout, module): Module runner name """ - self.worker_dict['pid'] = getpid() self.worker_dict['threads'] = active_count() self.worker_dict['node'] = platform.node() @@ -140,45 +137,45 @@ def _create_worker_log(self): worker parameters. """ - process_size = len(str(self.worker_dict['process'])) if self._verbose: - print(' - {} PID: {} '.format( - self.worker_dict['job_name'], - self.worker_dict['pid']), end='') + job_name = self.worker_dict['job_name'] + pid = self.worker_dict['pid'] + + print(f' - {job_name} PID: {pid} ', end='') - if (process_size < - self._config.getint('WORKER', 'PROCESS_PRINT_LIMIT')): - print('processing {} {}'.format( - self.worker_dict['file_number_string'], - self.worker_dict['process'])) + if ( + process_size + < self._config.getint('WORKER', 'PROCESS_PRINT_LIMIT') + ): + print( + f'processing {self.worker_dict["file_number_string"]} ' + + f'{self.worker_dict["process"]}' + ) else: print() self.w_log = set_up_log(self._w_log_name, verbose=False) self.worker_dict['log'] = self.w_log.name self.w_log.info('Worker process running with:') - self.w_log.info(' - Job Name: {}'.format( - self.worker_dict['job_name'])) - self.w_log.info(' - PID: {}'.format(self.worker_dict['pid'])) - self.w_log.info(' - Threads: {}'.format(self.worker_dict['threads'])) - self.w_log.info(' - Node: {}'.format(self.worker_dict['node'])) - self.w_log.info(' - System: {}'.format(self.worker_dict['system'])) - self.w_log.info(' - Machine: {}'.format(self.worker_dict['machine'])) - self.w_log.info(' - Timeout Limit: {}'.format( - self.worker_dict['timeout'])) - self.w_log.info(' - Process: {}'.format(self.worker_dict['process'])) + self.w_log.info(f' - Job Name: {self.worker_dict["job_name"]}') + self.w_log.info(f' - PID: {self.worker_dict["pid"]}') + self.w_log.info(f' - Threads: {self.worker_dict["threads"]}') + self.w_log.info(f' - Node: {self.worker_dict["node"]}') + self.w_log.info(f' - System: {self.worker_dict["system"]}') + self.w_log.info(f' - Machine: {self.worker_dict["machine"]}') + self.w_log.info(f' - Timeout Limit: {self.worker_dict["timeout"]}') + self.w_log.info(f' - Process: {self.worker_dict["process"]}') def _run_worker(self): - """ Run Worker + """Run Worker This method runs the worker with a given timeout limit and catches the corresponding errors. """ - try: with_timeout(self.worker_dict['timeout'], self.w_log.name)( self._worker_execution @@ -189,17 +186,16 @@ def _run_worker(self): self.worker_dict['exception'] = type(err).__name__ def _worker_execution(self): - """ Worker Execution + """Worker Execution This method executes a worker job and logs the results. """ - self._run_module() self._log_stdout() def _run_module(self): - """ Run Module + """Run Module This method runs a module script. @@ -209,9 +205,8 @@ def _run_module(self): For non-existent module runner """ - self.w_log.info( - f" - Running module: {self.worker_dict['module']}" + f' - Running module: {self.worker_dict["module"]}' ) file_number_string = self.worker_dict['file_number_string'] @@ -227,12 +222,11 @@ def _run_module(self): ) def _log_stdout(self): - """ Log STDOUT + """Log STDOUT This method logs the stdout and stderr output of the job. """ - self.w_log.info( f'Process produced the following output: {self._stdout}' ) diff --git a/shapepipe/utilities/canfar.py b/shapepipe/utilities/canfar.py index a5fca1849..6e6c7f7b4 100644 --- a/shapepipe/utilities/canfar.py +++ b/shapepipe/utilities/canfar.py @@ -20,13 +20,12 @@ class vosError(Exception): - """ VOS Error + """VOS Error - Generic error that is raised by the vosHandler. + Generic error that is raised by the vosHandler. - """ - - pass + """ + pass class vosHandler: @@ -54,10 +53,11 @@ def _check_vos_install(): Check if VOS is correctly installed. """ - if import_fail: - raise ImportError('vos package not found, re-install ShapePipe ' - 'with \'./install_shapepipe --vos\'') + raise ImportError( + 'vos package not found, re-install ShapePipe ' + + 'with \'./install_shapepipe --vos\'' + ) @property def command(self): @@ -66,7 +66,6 @@ def command(self): This method sets the VOS command property. """ - return self._command @command.setter @@ -84,13 +83,11 @@ def __call__(self, *args, **kwargs): This method allows class instances to be called as functions. """ - try: self._command() - except: - raise vosError('Error in VOs command: {}' - ''.format(self._command.__name__)) + except Exception: + raise vosError(f'Error in VOs command: {self._command.__name__}') def download(source, target, verbose=False): @@ -109,8 +106,8 @@ def download(source, target, verbose=False): ------- status : bool status, True/False or success/failure - """ + """ if not os.path.exists(target): sys.argv = ['vcp', source, target] if verbose: diff --git a/shapepipe/utilities/file_system.py b/shapepipe/utilities/file_system.py index d2b802c09..9ea8f452c 100644 --- a/shapepipe/utilities/file_system.py +++ b/shapepipe/utilities/file_system.py @@ -12,13 +12,12 @@ class FileSystemError(Exception): - """ File System Error + """File System Error - Generic error that is raised by the file system. + Generic error that is raised by the file system. - """ - - pass + """ + pass def check_dir(dir_name): @@ -32,12 +31,11 @@ def check_dir(dir_name): Directory name """ - return os.path.isdir(dir_name) def mkdir(dir_name, check_created=True, exist_ok=True): - """ Make Directory + """Make Directory This method creates a directory in the specified path. @@ -60,9 +58,9 @@ def mkdir(dir_name, check_created=True, exist_ok=True): If directory not properly created. """ - os.makedirs(dir_name, exist_ok=exist_ok) if check_created and not check_dir(dir_name): - raise FileSystemError('Directory \"{}\" not found after mkdir command.' - ''.format(dir_name)) + raise FileSystemError( + f'Directory \"{dir_name}\" not found after mkdir command.' + ) diff --git a/shapepipe_run.py b/shapepipe_run.py index 287e23278..9920a861d 100755 --- a/shapepipe_run.py +++ b/shapepipe_run.py @@ -46,7 +46,6 @@ def set_up(self): Set up ShapePipe properties. """ - self._args = create_arg_parser() self.config = create_config_parser(self._args.config) self._set_run_name() @@ -63,7 +62,6 @@ def _set_run_name(self): Set the name of the current pipeline run. """ - self._run_name = self.config.get('DEFAULT', 'RUN_NAME') if self.config.getboolean('DEFAULT', 'RUN_DATETIME'): @@ -75,7 +73,6 @@ def _create_pipeline_log(self): Create a general logging instance for the pipeline run. """ - self.log = set_up_log(self.filehd.log_name, verbose=False) start_text = f'Starting ShapePipe Run: {self._run_name}' @@ -100,7 +97,6 @@ def close_pipeline_log(self): if error occurs during pipeline run """ - if self.error_count == 1: plur = ' was' else: @@ -134,7 +130,6 @@ def _get_module_depends(self, property): List of python dependencies, list of system executables """ - prop_list = [] module_runners = self.filehd.module_runners @@ -155,8 +150,6 @@ def _get_module_depends(self, property): property, ) - print(prop_list) - return prop_list def _check_dependencies(self): @@ -165,7 +158,6 @@ def _check_dependencies(self): Check that all pipeline dependencies have been installed. """ - module_dep = self._get_module_depends('depends') + __installs__ module_exe = self._get_module_depends('executes') @@ -212,7 +204,6 @@ def _check_module_versions(self): Check versions of the modules. """ - ver_text = 'Checking Module Versions:' self.log.info(ver_text) @@ -239,7 +230,6 @@ def _get_module_run_methods(self): Create a dictionary of modules with corresponding run methods. """ - self.run_method = {} for module in self.modules: @@ -254,7 +244,6 @@ def _prep_run(self): Run the pipeline. """ - # Make output directories for the pipeline run self.filehd.create_global_run_dirs() @@ -276,7 +265,6 @@ def record_mode(self): Log mode in which ShapePipe is running. """ - mode_text = f'Running ShapePipe using {self.mode}' self.log.info(mode_text) @@ -295,8 +283,8 @@ def run_smp(pipe): ---------- pipe : ShapePipe ShapePipe instance - """ + """ # Loop through modules to be run for module in pipe.modules: @@ -334,8 +322,8 @@ def run_mpi(pipe, comm): ShapePipe instance comm : MPI.COMM_WORLD MPI common world instance - """ + """ # Assign master node master = comm.rank == 0 From 2846c1b58c30f13f34b2254affe7877dd78ef641 Mon Sep 17 00:00:00 2001 From: Samuel Farrens Date: Thu, 15 Jul 2021 15:07:24 +0200 Subject: [PATCH 6/9] additional clean up --- README.rst | 128 +++++++++++++------- shapepipe/info.py | 2 +- shapepipe/modules/execute_example_runner.py | 2 +- shapepipe/modules/python_example_runner.py | 2 +- 4 files changed, 89 insertions(+), 45 deletions(-) diff --git a/README.rst b/README.rst index b9332a7ba..a785ce184 100644 --- a/README.rst +++ b/README.rst @@ -9,9 +9,9 @@ ShapePipe .. |python38| image:: https://img.shields.io/badge/python-3.8-green.svg :target: https://www.python.org/ -:Version: 0.0.3 +:Version: 0.0.4 -:Date: 19/05/2020 +:Date: 15/07/2021 ShapePipe is a galaxy shape measurement pipeline developed within the CosmoStat lab at CEA Paris-Saclay. @@ -161,14 +161,14 @@ The pipeline can be run with SMP as follows: .. code-block:: bash - $ ./shapepipe_run + $ shapepipe_run A list of command line arguments can be displayed using the ``--help`` option: .. code-block:: bash - $ ./shapepipe_run --help + $ shapepipe_run --help Running the Pipeline with MPI ----------------------------- @@ -177,7 +177,7 @@ The pipeline can be run with MPI as follows: .. code-block:: bash - $ mpiexec -n ./shapepipe_run + $ mpiexec -n shapepipe_run where ```` is the number of cores to allocate to the run. @@ -213,13 +213,18 @@ The configuration parameters for the pipeline are: ``shapepipe_runs``. 3. ``INPUT_DIR`` : (``str`` or ``list``) A valid directory containing input files for the first module or a comma separated list of directories. This - parameter also recognizes the following special strings: + parameter also recognises the following special strings: a. ``last:MODULE`` : This will point to the output directory of the last run of the specified module. - b. ``PATTERN:MODULE`` : This will point to the output directory of a + b. ``all:MODULE`` : This will point to all the output directories in which + the specified module was run. + c. ``PATTERN:MODULE`` : This will point to the output directory of a specified module from a run matching the specified pattern. + In all cases the module name can be succeded by the run number (*e.g.* + ``MODULE/run_2``) + 4. ``OUTPUT_DIR`` : (``str``) A valid directory to write the pipeline output files. 5. ``FILE_PATTERN`` : (``str`` or ``list``) A list of string patterns to @@ -256,6 +261,17 @@ additional values to list properties as follows: [MODULE_NAME] ADD_PARAMETER = PARAMETER VALUE +If a given module is run more than once, run specific parameter values can be +specified as follows: + +.. code-block:: bash + + [MODULE_NAME/RUN_X] + PARAMETER = PARAMETER VALUE + +Where ``X`` is an integer greater than ``1``. + + Development =========== @@ -271,7 +287,7 @@ with the ``module_runner`` wrapper that outputs the module ``stdout`` and .. code-block:: python - @module_runner() + @module_runner(version=0.1) def example_module(*args) # DO SOMETHING @@ -282,7 +298,9 @@ The module runner decorator takes the following keyword arguments: 1. ``input_module`` : (``str`` or ``list``) The name of a preceding module(s) whose output provide(s) the input to this module. Default value is ``None``. -2. ``version`` : (``str``) The module version. Default value is ``'0.0'``. +2. ``version`` : (``str``) The module version. Default value is ``'0.0'``. The + module version should always be explicitly declared and be greater than the + default value. 3. ``file_pattern`` : (``str`` or ``list``) The input file pattern(s) to look for. Default value is ``''``. 4. ``file_ext`` : (``str`` or ``list``) The input file extensions(s) to look @@ -297,7 +315,8 @@ The module runner decorator takes the following keyword arguments: The arguments passed to the module runner are the following: 1. ``input_file_list`` : The list of input files. -2. ``output_dir`` : The directory for the module output files. +2. ``run_dirs`` : The dictionary containing module run paths (*e.g.* the output + path for a given module run is ``run_dirs['output']``). 3. ``file_number_string`` : The number pattern corresponding to the current process. 4. ``config`` : The config parser instance, which provides access to the @@ -308,7 +327,9 @@ The arguments passed to the module runner are the following: parameter_value = config.get('MODULE_NAME', 'PARAMETER') -5. ``w_log`` : The worker log instance, which can be used to record additional +5 ``module_config_sec`` : A string specifying the configuration file section + to be read. +6. ``w_log`` : The worker log instance, which can be used to record additional messages in the module output logs using the following structure: .. code-block:: python @@ -322,11 +343,10 @@ The following example module runners are provided in ``shapepipe.modules``. **Python Example** -In this example a Python script using a ``Dummy()`` class is implemented. This -module does not read inputs from any preceding module, but looks for files +In this example a Python script using a ``PythonExample()`` class is implemented. +This module does not read inputs from any preceding module, but looks for files in the ``INPUT_DIR`` that match the file patterns ``'numbers'`` and -``'letters'`` with file extension ``'.txt'``. This module depends on -``numpy``. +``'letters'`` with file extension ``'.txt'``. This module depends on ``numpy``. As this module does not implement any system executable, it is not necessary to return a ``stderr``. Instead any output content that should be @@ -335,20 +355,31 @@ return ``None, None``. .. code-block:: python - @module_runner(version='1.0', file_pattern=['numbers', 'letters'], - file_ext='.txt', depends='numpy') - def python_example(input_file_list, output_dir, file_number_string, - config, w_log): - - output_file_name = ('{}/pyex_output{}.cat'.format(output_dir, - file_number_string)) - message = config.get('PYTHON_EXAMPLE', 'MESSAGE') - - inst = Dummy() - inst.read_files(*input_file_list) - inst.write_file(output_file_name, message) - - return inst.content, None + @module_runner( + version='1.1', + file_pattern=['numbers', 'letters'], + file_ext='.txt', + depends='numpy', + ) + def python_example_runner( + input_file_list, + run_dirs, + file_number_string, + config, + module_config_sec, + w_log, + ): + + output_file_name = ( + f'{run_dirs["output"]}/pyex_output{file_number_string}.cat' + ) + message = config.get(module_config_sec, 'MESSAGE') + + inst = pe.PythonExample(0) + inst.read_files(*input_file_list) + inst.write_file(output_file_name, message) + + return inst.content, None **Executable Example** @@ -358,17 +389,30 @@ the file pattern ``'process'`` with file extension ``'.cat'``. .. code-block:: python - @module_runner(input_module='python_example', version='1.0', - file_pattern='pyex_output', file_ext='.cat', executes='head') - def execute_example(input_file_list, output_dir, file_number_string, *args): - - command_line = 'head {}'.format(input_file_list[0]) - output_file_name = '{}/head_output{}.txt'.format(output_dir, - file_number_string) + @module_runner( + input_module='python_example', + version='1.0', + file_pattern='pyex_output', + file_ext='.cat', + executes='head', + ) + def execute_example_runner( + input_file_list, + run_dirs, + file_number_string, + config, + module_config_sec, + w_log, + ): + + command_line = f'head {input_file_list[0]}' + output_file_name = ( + f'{run_dirs["output"]}/head_output{file_number_string}.txt' + ) + + stdout, stderr = execute(command_line) + + text_file = open(output_file_name, 'w') + text_file.write(stdout) - stdout, stderr = execute(command_line) - - text_file = open(output_file_name, 'w') - text_file.write(stdout) - - return stdout, stderr + return stdout, stderr diff --git a/shapepipe/info.py b/shapepipe/info.py index ab1f1c02e..2203dd16a 100644 --- a/shapepipe/info.py +++ b/shapepipe/info.py @@ -17,7 +17,7 @@ # Package Info -version_info = (0, 0, 3) +version_info = (0, 0, 4) __version__ = '.'.join(str(c) for c in version_info) __name__ = 'shapepipe' __author__ = 'Samuel Farrens' diff --git a/shapepipe/modules/execute_example_runner.py b/shapepipe/modules/execute_example_runner.py index ad7ae580a..42e3cb7b6 100644 --- a/shapepipe/modules/execute_example_runner.py +++ b/shapepipe/modules/execute_example_runner.py @@ -31,7 +31,7 @@ def execute_example_runner( command_line = f'head {input_file_list[0]}' output_file_name = ( - f"{run_dirs['output']}/head_output{file_number_string}.txt" + f'{run_dirs["output"]}/head_output{file_number_string}.txt' ) stdout, stderr = execute(command_line) diff --git a/shapepipe/modules/python_example_runner.py b/shapepipe/modules/python_example_runner.py index 0571d9e43..fc1995277 100644 --- a/shapepipe/modules/python_example_runner.py +++ b/shapepipe/modules/python_example_runner.py @@ -43,7 +43,7 @@ def python_example_runner( ): output_file_name = ( - f"{run_dirs['output']}/pyex_output{file_number_string}.cat" + f'{run_dirs["output"]}/pyex_output{file_number_string}.cat' ) message = config.get(module_config_sec, 'MESSAGE') From b906a4a8a73f1fd224f488e113106d45a396715c Mon Sep 17 00:00:00 2001 From: Samuel Farrens Date: Fri, 23 Jul 2021 11:33:17 +0200 Subject: [PATCH 7/9] update multiple runs of a module to produce independent run directories --- example/config.ini | 12 +++++------- shapepipe/pipeline/file_handler.py | 27 +++++++++++++++++++++------ 2 files changed, 26 insertions(+), 13 deletions(-) diff --git a/example/config.ini b/example/config.ini index fab6ee3ac..49541f944 100644 --- a/example/config.ini +++ b/example/config.ini @@ -54,20 +54,18 @@ TIMEOUT = 00:01:35 ; PROCESS_PRINT_LIMIT = 100 ## Module options -[PYTHON_EXAMPLE_RUNNER] +[PYTHON_EXAMPLE_RUNNER_RUN_1] MESSAGE = The obtained value is: [SERIAL_EXAMPLE_RUNNER] ADD_INPUT_DIR = ./example/data/numbers, ./example/data/letters -[PYTHON_EXAMPLE_RUNNER/RUN_2] - -INPUT_MODULE = python_example_runner, execute_example_runner +[PYTHON_EXAMPLE_RUNNER_RUN_2] +INPUT_MODULE = python_example_runner_run_1, execute_example_runner FILE_PATTERN = pyex_output, head_output FILE_EXT = cat, txt MESSAGE = The new obtained value is: -[EXECUTE_EXAMPLE_RUNNER/RUN_2] - -INPUT_MODULE = python_example_runner/run_2 +[EXECUTE_EXAMPLE_RUNNER_RUN_2] +INPUT_MODULE = python_example_runner_run_2 diff --git a/shapepipe/pipeline/file_handler.py b/shapepipe/pipeline/file_handler.py index 0b78be80d..a5fea95ac 100644 --- a/shapepipe/pipeline/file_handler.py +++ b/shapepipe/pipeline/file_handler.py @@ -722,14 +722,23 @@ def _set_module_input_dir(self, module, run_name): else: + print(self._module_dict) + input_dir = [] - for in_mod_run in ( + for input_module in ( self._module_dict[module][run_name]['input_module'] ): - input_mod = in_mod_run.split('/')[0] - if input_mod in self._module_dict: + run_split = '_run_' + if run_split in input_module: + in_mod_run = input_module + input_module = input_module.split('_run_')[0] + else: + in_mod_run = self._module_dict[input_module]['latest'] + if input_module in self._module_dict: input_dir.append( - self._module_dict[input_mod][in_mod_run]['output_dir'] + self._module_dict[input_module][in_mod_run][ + 'output_dir' + ] ) if self._config.has_option(run_name.upper(), 'INPUT_DIR'): @@ -1200,13 +1209,19 @@ def set_up_module(self, module): Module name """ + + multi_call = self._module_list.count(module) > 1 + if module in self._module_dict.keys(): self._module_dict[module]['run_count'] += 1 - call_num = self._module_dict[module]['run_count'] - run_name = f'{module}/run_{call_num}' else: self._module_dict[module] = {} self._module_dict[module]['run_count'] = 1 + + if multi_call: + call_num = self._module_dict[module]['run_count'] + run_name = f'{module}_run_{call_num}' + else: run_name = module self._module_dict[module]['latest'] = run_name From 9188e34efe5911655848516f1c8b67e3b1f7f6c8 Mon Sep 17 00:00:00 2001 From: Samuel Farrens Date: Fri, 23 Jul 2021 12:10:17 +0200 Subject: [PATCH 8/9] updated handling of previous runs --- README.rst | 4 +-- shapepipe/pipeline/file_handler.py | 8 ++---- shapepipe/pipeline/run_log.py | 3 ++ shapepipe/pipeline/shared.py | 46 ++++++++++++++++++++++++++++++ 4 files changed, 54 insertions(+), 7 deletions(-) create mode 100644 shapepipe/pipeline/shared.py diff --git a/README.rst b/README.rst index a785ce184..af23732ea 100644 --- a/README.rst +++ b/README.rst @@ -223,7 +223,7 @@ The configuration parameters for the pipeline are: specified module from a run matching the specified pattern. In all cases the module name can be succeded by the run number (*e.g.* - ``MODULE/run_2``) + ``MODULE_run_2``) 4. ``OUTPUT_DIR`` : (``str``) A valid directory to write the pipeline output files. @@ -266,7 +266,7 @@ specified as follows: .. code-block:: bash - [MODULE_NAME/RUN_X] + [MODULE_NAME_RUN_X] PARAMETER = PARAMETER VALUE Where ``X`` is an integer greater than ``1``. diff --git a/shapepipe/pipeline/file_handler.py b/shapepipe/pipeline/file_handler.py index a5fea95ac..186baea1f 100644 --- a/shapepipe/pipeline/file_handler.py +++ b/shapepipe/pipeline/file_handler.py @@ -15,6 +15,7 @@ from glob import glob from functools import reduce, partial from shapepipe.pipeline.run_log import RunLog +from shapepipe.pipeline.shared import split_module_run from shapepipe.modules.module_runners import get_module_runners from shapepipe.utilities.file_system import mkdir @@ -728,11 +729,8 @@ def _set_module_input_dir(self, module, run_name): for input_module in ( self._module_dict[module][run_name]['input_module'] ): - run_split = '_run_' - if run_split in input_module: - in_mod_run = input_module - input_module = input_module.split('_run_')[0] - else: + input_module, in_mod_run = split_module_run(input_module) + if in_mod_run == input_module: in_mod_run = self._module_dict[input_module]['latest'] if input_module in self._module_dict: input_dir.append( diff --git a/shapepipe/pipeline/run_log.py b/shapepipe/pipeline/run_log.py index 2b45414b3..82694ae72 100644 --- a/shapepipe/pipeline/run_log.py +++ b/shapepipe/pipeline/run_log.py @@ -9,6 +9,7 @@ """ import numpy as np +from shapepipe.pipeline.shared import split_module_run class RunLog(object): @@ -71,6 +72,8 @@ def get_all(self, module): All run paths for a given module """ + module, _ = split_module_run(module) + all_runs = [ run for run in self._runs if module in run.split()[1].split(',') diff --git a/shapepipe/pipeline/shared.py b/shapepipe/pipeline/shared.py new file mode 100644 index 000000000..6275a1ab9 --- /dev/null +++ b/shapepipe/pipeline/shared.py @@ -0,0 +1,46 @@ +# -*- coding: utf-8 -*- + +"""SHARED + +The module defines functions that can be shared between pipeline modules. + +:Author: Samuel Farrens + +""" + + +def split_module_run(module_str): + """Split Module Run + + Extract module name and run from input string. + + Parameters + ---------- + module_str : str + Module name or run string + + Returns + ------- + tuple + Module name and module run string + + Raises + ------ + TypeError + If input is not a string + + """ + if not isinstance(module_str, str): + raise TypeError( + f'Input module_str must be a string not {type(module_str)}.' + ) + + run_split = '_run_' + module_run = module_str + + if run_split in module_str: + module_name = module_str.split(run_split)[0] + else: + module_name = module_str + + return module_name, module_run From 889e0ef068c476c6c6e72043359ec425244b5194 Mon Sep 17 00:00:00 2001 From: Samuel Farrens Date: Thu, 29 Jul 2021 17:13:29 +0200 Subject: [PATCH 9/9] fixed typo in readme --- README.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.rst b/README.rst index af23732ea..c73845e6a 100644 --- a/README.rst +++ b/README.rst @@ -269,7 +269,7 @@ specified as follows: [MODULE_NAME_RUN_X] PARAMETER = PARAMETER VALUE -Where ``X`` is an integer greater than ``1``. +Where ``X`` is an integer greater than or equal to ``1``. Development