diff --git a/doc/esmvalcore/recipe.rst b/doc/esmvalcore/recipe.rst index c1396a4163..6ba46e1848 100644 --- a/doc/esmvalcore/recipe.rst +++ b/doc/esmvalcore/recipe.rst @@ -147,7 +147,7 @@ arguments): operator: mean multi_model_statistics: span: overlap - statistics: [mean ] + statistics: [mean] .. note:: @@ -161,20 +161,138 @@ arguments): Recipe section: ``diagnostics`` =============================== -The diagnostics section includes one or more diagnostics. Each diagnostics will -include: +The diagnostics section includes one or more diagnostics. Each diagnostic +section will include: -- a list of which variables to load; -- a description of the variables (optional); -- the preprocessor to be applied to each variable; -- the script to be run; +- the variable(s) to preprocess, including the preprocessor to be applied to each variable; +- the diagnostic script(s) to be run; +- a description of the diagnostic and lists of themes and realms that it applies to; - an optional ``additional_datasets`` section. -The ``additional_datasets`` can add datasets beyond those listed in the the -Datasets_ section. This is useful if specific datasets need to be used only by -a specific diagnostic. The ``additional_datasets`` can also be used to add -variable specific datasets. This is also a good way to add observational -datasets, which are usually variable-specific. +The diagnostics section defines tasks +------------------------------------- +The diagnostic section(s) define the tasks that will be executed when running the recipe. +For each variable a preprocessing task will be defined and for each diagnostic script a +diagnostic task will be defined. If variables need to be derived +from other variables, a preprocessing task for each of the variables +needed to derive that variable will be defined as well. These tasks can be viewed +in the main_log_debug.txt file that is produced every run. 
Each task has a unique +name that defines the subdirectory where the results of that task are stored. Task +names start with the name of the diagnostic section followed by a '/' and then +the name of the variable section for a preprocessing task or the name of the diagnostic +script section for a diagnostic task. + +A (simplified) example diagnostics section could look like + +.. code-block:: yaml + + diagnostics: + diagnostic_name: + description: Air temperature tutorial diagnostic. + themes: + - phys + realms: + - atmos + variables: + variable_name: + short_name: ta + preprocessor: preprocessor_name + mip: Amon + scripts: + script_name: + script: examples/diagnostic.py + + +Note that the example recipe above contains a single diagnostic section +called ``diagnostic_name`` and will result in two tasks: + +- a preprocessing task called ``diagnostic_name/variable_name`` that will preprocess + air temperature data for each dataset in the Datasets_ section of the recipe (not shown). +- a diagnostic task called ``diagnostic_name/script_name`` + +The path to the script provided in the ``script`` option should be +either the absolute path to the script, or the path relative to the +``esmvaltool/diag_scripts`` directory. + +Ancestor tasks +-------------- +Some tasks require the result of other tasks to be ready before they can start, +e.g. a diagnostic script needs the preprocessed variable data to start. Thus +each task has zero or more ancestor tasks. By default, each diagnostic task +in a diagnostic section has all variable preprocessing tasks in that same section +as ancestors. However, this can be changed using the ``ancestors`` keyword. Note +that wildcard expansion can be used to define ancestors. + +.. 
code-block:: yaml + + diagnostics: + diagnostic_1: + variables: + airtemp: + short_name: ta + preprocessor: preprocessor_name + mip: Amon + scripts: + script_a: + script: diagnostic_a.py + diagnostic_2: + variables: + precip: + short_name: pr + preprocessor: preprocessor_name + mip: Amon + scripts: + script_b: + script: diagnostic_b.py + ancestors: [diagnostic_1/script_a, precip] + + +The example recipe above will result in four tasks: + +- a preprocessing task called ``diagnostic_1/airtemp`` +- a diagnostic task called ``diagnostic_1/script_a`` +- a preprocessing task called ``diagnostic_2/precip`` +- a diagnostic task called ``diagnostic_2/script_b`` + +The preprocessing tasks do not have any ancestors, while the diagnostic_a.py +script will receive the preprocessed air temperature data +(has ancestor ``diagnostic_1/airtemp``) and the diagnostic_b.py +script will receive the results of diagnostic_a.py and the preprocessed precipitation +data (has ancestors ``diagnostic_1/script_a`` and ``diagnostic_2/precip``). + +Task priority +------------- +Tasks are assigned a priority, with tasks appearing earlier on in the recipe +getting higher priority. The tasks will be executed sequentially or in parallel, +depending on the setting of ``max_parallel_tasks`` in the :ref:`user configuration file`. +When there are fewer than ``max_parallel_tasks`` running, tasks will be started +according to their priority. For obvious reasons, only tasks that are not waiting for +ancestor tasks can be started. This feature makes it possible to +reduce the processing time of recipes with many tasks, by placing tasks that +take relatively long near the top of the recipe. Of course this only works when +setting ``max_parallel_tasks`` to a value larger than 1. The current priority +and run time of individual tasks can be seen in the log messages shown when +running the tool (a lower number means higher priority). 
+ +Variable and dataset definitions +-------------------------------- +To define a variable/dataset combination that corresponds to an actual +variable from a dataset, the keys in each variable section +are combined with the keys of each dataset definition. If two versions of the same +key are provided, then the key in the datasets section will take precedence +over the keys in variables section. For many recipes it makes more sense to +define the ``start_year`` and ``end_year`` items in the variable section, +because the diagnostic script assumes that all the data has the same time +range. + +Diagnostic and variable specific datasets +----------------------------------------- +The ``additional_datasets`` option can be used to add datasets beyond those +listed in the Datasets_ section. This is useful if specific datasets need to +be used only by a specific diagnostic or variable, i.e. it can be added both +at diagnostic level, where it will apply to all variables in that diagnostic +section or at individual variable level. For example, this can be a good way +to add observational datasets, which are usually variable-specific. Running a simple diagnostic --------------------------- @@ -199,22 +317,9 @@ map scipt, ``ocean/diagnostic_maps.py``. Global_Ocean_Surface_regrid_map: script: ocean/diagnostic_maps.py -To define a variable/dataset combination, the keys in the diagnostic section -are combined with the keys from datasets section. If two versions of the same -key are provided, then the key in the datasets section will take precedence -over the keys in variables section. For many recipes it makes more sense to -define the ``start_year`` and ``end_year`` items in the variable section, -because the diagnostic script assumes that all the data has the same time -range. - -Note that the path to the script provided in the `script` option should be -either the absolute path to the script, or the path relative to the -``esmvaltool/diag_scripts`` directory. 
- - -Passing arguments to a diagnostic ---------------------------------- -The ``diagnostics`` section may include a lot of arguments that can be used by +Passing arguments to a diagnostic script +---------------------------------------- +The diagnostic script section(s) may include custom arguments that can be used by the diagnostic script; these arguments are stored at runtime in a dictionary that is then made available to the diagnostic script via the interface link, independent of the language the diagnostic script is written in. Here is an diff --git a/esmvalcore/_recipe.py b/esmvalcore/_recipe.py index 684dfa07b5..ee0acc2a0f 100644 --- a/esmvalcore/_recipe.py +++ b/esmvalcore/_recipe.py @@ -1155,6 +1155,7 @@ def initialize_tasks(self): logger.info("Creating tasks from recipe") tasks = set() + priority = 0 for diagnostic_name, diagnostic in self.diagnostics.items(): logger.info("Creating tasks for diagnostic %s", diagnostic_name) @@ -1169,7 +1170,10 @@ def initialize_tasks(self): config_user=self._cfg, task_name=task_name, ) + for task0 in task.flatten(): + task0.priority = priority tasks.add(task) + priority += 1 # Create diagnostic tasks for script_name, script_cfg in diagnostic['scripts'].items(): @@ -1181,7 +1185,9 @@ def initialize_tasks(self): settings=script_cfg['settings'], name=task_name, ) + task.priority = priority tasks.add(task) + priority += 1 check.tasks_valid(tasks) diff --git a/esmvalcore/_task.py b/esmvalcore/_task.py index 8ab7cb68d8..1351921783 100644 --- a/esmvalcore/_task.py +++ b/esmvalcore/_task.py @@ -10,12 +10,12 @@ import threading import time from copy import deepcopy -from multiprocessing import Pool, cpu_count +from multiprocessing import Pool import psutil import yaml -from ._config import TAGS, replace_tags, DIAGNOSTICS_PATH +from ._config import DIAGNOSTICS_PATH, TAGS, replace_tags from ._provenance import TrackedFile, get_task_provenance logger = logging.getLogger(__name__) @@ -193,12 +193,14 @@ def _ncl_type(value): class 
BaseTask: """Base class for defining task classes.""" - def __init__(self, ancestors=None, name=''): + def __init__(self, ancestors=None, name='', products=None): """Initialize task.""" self.ancestors = [] if ancestors is None else ancestors + self.products = set() if products is None else set(products) self.output_files = None self.name = name self.activity = None + self.priority = 0 def initialize_provenance(self, recipe_entity): """Initialize task provenance activity.""" @@ -224,8 +226,11 @@ def run(self, input_files=None): input_files.extend(task.run()) logger.info("Starting task %s in process [%s]", self.name, os.getpid()) + start = datetime.datetime.now() self.output_files = self._run(input_files) - logger.info("Successfully completed task %s", self.name) + runtime = datetime.datetime.now() - start + logger.info("Successfully completed task %s (priority %s) in %s", + self.name, self.priority, runtime) return self.output_files @@ -254,10 +259,9 @@ class DiagnosticTask(BaseTask): def __init__(self, script, settings, output_dir, ancestors=None, name=''): """Create a diagnostic task.""" - super(DiagnosticTask, self).__init__(ancestors=ancestors, name=name) + super().__init__(ancestors=ancestors, name=name) self.script = script self.settings = settings - self.products = set() self.output_dir = output_dir self.cmd = self._initialize_cmd(script) self.log = os.path.join(settings['run_dir'], 'log.txt') @@ -266,8 +270,7 @@ def __init__(self, script, settings, output_dir, ancestors=None, name=''): def _initialize_cmd(self, script): """Create a an executable command from script.""" - diagnostics_root = os.path.join( - DIAGNOSTICS_PATH, 'diag_scripts') + diagnostics_root = os.path.join(DIAGNOSTICS_PATH, 'diag_scripts') script_file = os.path.abspath(os.path.join(diagnostics_root, script)) if not os.path.isfile(script_file): @@ -456,8 +459,8 @@ def _run(self, input_files): env['MPLBACKEND'] = 'Agg' else: # Make diag_scripts path available to diagostics scripts - 
env['diag_scripts'] = os.path.join( - DIAGNOSTICS_PATH, 'diag_scripts') + env['diag_scripts'] = os.path.join(DIAGNOSTICS_PATH, + 'diag_scripts') cmd = list(self.cmd) settings_file = self.write_settings() @@ -601,53 +604,46 @@ def _run_tasks_sequential(tasks): n_tasks = len(get_flattened_tasks(tasks)) logger.info("Running %s tasks sequentially", n_tasks) - for task in get_independent_tasks(tasks): + tasks = get_independent_tasks(tasks) + for task in sorted(tasks, key=lambda t: t.priority): task.run() def _run_tasks_parallel(tasks, max_parallel_tasks=None): """Run tasks in parallel.""" scheduled = get_flattened_tasks(tasks) - running = [] - results = [] + running = {} n_scheduled, n_running = len(scheduled), len(running) n_tasks = n_scheduled - pool = Pool(processes=max_parallel_tasks) - - logger.info("Running %s tasks using at most %s processes", n_tasks, - max_parallel_tasks or cpu_count()) + if max_parallel_tasks is None: + max_parallel_tasks = os.cpu_count() + if max_parallel_tasks > n_tasks: + max_parallel_tasks = n_tasks + logger.info("Running %s tasks using %s processes", n_tasks, + max_parallel_tasks) def done(task): """Assume a task is done if it not scheduled or running.""" return not (task in scheduled or task in running) + pool = Pool(processes=max_parallel_tasks) while scheduled or running: # Submit new tasks to pool - just_scheduled = [] - for task in scheduled: - if not task.ancestors or all(done(t) for t in task.ancestors): - result = pool.apply_async(_run_task, [task]) - results.append(result) - running.append(task) - just_scheduled.append(task) - for task in just_scheduled: - scheduled.remove(task) + for task in sorted(scheduled, key=lambda t: t.priority): + if len(running) >= max_parallel_tasks: + break + if all(done(t) for t in task.ancestors): + future = pool.apply_async(_run_task, [task]) + running[task] = future + scheduled.remove(task) # Handle completed tasks - for task, result in zip(running, results): - if result.ready(): - 
task.output_files, updated_products = result.get() - for updated in updated_products: - for original in task.products: - if original.filename == updated.filename: - updated.copy_provenance(target=original) - break - else: - task.products.add(updated) - running.remove(task) - results.remove(result) + ready = {t for t in running if running[t].ready()} + for task in ready: + _copy_results(task, running[task]) + running.pop(task) # Wait if there are still tasks running if running: @@ -658,14 +654,25 @@ def done(task): n_scheduled, n_running = len(scheduled), len(running) n_done = n_tasks - n_scheduled - n_running logger.info( - "Progress: %s tasks running or queued, %s tasks waiting for " - "ancestors, %s/%s done", n_running, n_scheduled, n_done, - n_tasks) + "Progress: %s tasks running, %s tasks waiting for ancestors, " + "%s/%s done", n_running, n_scheduled, n_done, n_tasks) pool.close() pool.join() +def _copy_results(task, future): + """Update task with the results from the remote process.""" + task.output_files, updated_products = future.get() + for updated in updated_products: + for original in task.products: + if original.filename == updated.filename: + updated.copy_provenance(target=original) + break + else: + task.products.add(updated) + + def _run_task(task): """Run task and return the result.""" output_files = task.run() diff --git a/esmvalcore/preprocessor/__init__.py b/esmvalcore/preprocessor/__init__.py index eac02111a8..6793d2c3e8 100644 --- a/esmvalcore/preprocessor/__init__.py +++ b/esmvalcore/preprocessor/__init__.py @@ -360,9 +360,8 @@ def __init__( write_ncl_interface=False, ): """Initialize""" - super(PreprocessingTask, self).__init__(ancestors=ancestors, name=name) _check_multi_model_settings(products) - self.products = set(products) + super().__init__(ancestors=ancestors, name=name, products=products) self.order = list(order) self.debug = debug self.write_ncl_interface = write_ncl_interface diff --git a/tests/integration/test_task.py 
b/tests/integration/test_task.py new file mode 100644 index 0000000000..4f0f9e854d --- /dev/null +++ b/tests/integration/test_task.py @@ -0,0 +1,82 @@ +import os +from functools import partial +from multiprocessing.pool import ThreadPool + +import pytest + +import esmvalcore +from esmvalcore._task import (BaseTask, _run_tasks_parallel, + _run_tasks_sequential, run_tasks) + + +@pytest.fixture +def example_tasks(): + """Example tasks for testing the task runners.""" + tasks = set() + for i in range(3): + task = BaseTask( + name=f'task{i}', + ancestors=[ + BaseTask(name=f'task{i}-ancestor{j}') for j in range(3) + ], + ) + for task0 in task.flatten(): + task0.priority = i + tasks.add(task) + + return tasks + + +@pytest.mark.parametrize('max_parallel_tasks', [1, 2, 3, 4, 16, None]) +def test_run_tasks(monkeypatch, tmp_path, max_parallel_tasks, example_tasks): + """Check that tasks are run correctly.""" + def _run(self, input_files): + output_file = tmp_path / self.name + + msg = ('running {} in thread {}, using input {}, generating {}'.format( + self.name, os.getpid(), input_files, output_file)) + print(msg) + + # Check that the output is created just once + assert not output_file.exists() + output_file.write_text(msg) + output_file = str(output_file) + + # Check that ancestor results are provided correctly + assert len(self.ancestors) == len(input_files) + for ancestor in self.ancestors: + assert len(ancestor.output_files) == 1 + assert ancestor.output_files[0].startswith(output_file) + assert str(tmp_path / ancestor.name) in input_files + + return [output_file] + + monkeypatch.setattr(BaseTask, '_run', _run) + + run_tasks(example_tasks, max_parallel_tasks) + + for task in example_tasks: + print(task.name, task.output_files) + assert task.output_files + + +@pytest.mark.parametrize('runner', [ + _run_tasks_sequential, + partial(_run_tasks_parallel, max_parallel_tasks=1), +]) +def test_runner_uses_priority(monkeypatch, runner, example_tasks): + """Check that the runner 
tries to respect task priority.""" + order = [] + + def _run(self, input_files): + print(f'running task {self.name} with priority {self.priority}') + order.append(self.priority) + return [f'{self.name}_test.nc'] + + monkeypatch.setattr(BaseTask, '_run', _run) + monkeypatch.setattr(esmvalcore._task, 'Pool', ThreadPool) + + runner(example_tasks) + print(order) + assert len(order) == 12 + assert order == sorted(order)