From 86520e7b4a1f4b1d02cd8e573fb500c8ef176aa6 Mon Sep 17 00:00:00 2001 From: Bouwe Andela Date: Thu, 6 Jun 2019 23:30:49 +0200 Subject: [PATCH 1/9] Prioritize task execution by recipe order --- esmvalcore/_recipe.py | 5 +++++ esmvalcore/_task.py | 6 ++++-- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/esmvalcore/_recipe.py b/esmvalcore/_recipe.py index 6ab0236f55..c6bdb1fc34 100644 --- a/esmvalcore/_recipe.py +++ b/esmvalcore/_recipe.py @@ -1016,6 +1016,7 @@ def initialize_tasks(self): logger.info("Creating tasks from recipe") tasks = set() + priority = 0 for diagnostic_name, diagnostic in self.diagnostics.items(): logger.info("Creating tasks for diagnostic %s", diagnostic_name) @@ -1029,7 +1030,9 @@ def initialize_tasks(self): profiles=self._preprocessors, config_user=self._cfg, task_name=task_name) + task.priority = priority tasks.add(task) + priority += 1 # Create diagnostic tasks for script_name, script_cfg in diagnostic['scripts'].items(): @@ -1040,7 +1043,9 @@ def initialize_tasks(self): output_dir=script_cfg['output_dir'], settings=script_cfg['settings'], name=task_name) + task.priority = priority tasks.add(task) + priority += 1 check.tasks_valid(tasks) diff --git a/esmvalcore/_task.py b/esmvalcore/_task.py index 8ab7cb68d8..926b061111 100644 --- a/esmvalcore/_task.py +++ b/esmvalcore/_task.py @@ -199,6 +199,7 @@ def __init__(self, ancestors=None, name=''): self.output_files = None self.name = name self.activity = None + self.priority = 0 def initialize_provenance(self, recipe_entity): """Initialize task provenance activity.""" @@ -601,7 +602,8 @@ def _run_tasks_sequential(tasks): n_tasks = len(get_flattened_tasks(tasks)) logger.info("Running %s tasks sequentially", n_tasks) - for task in get_independent_tasks(tasks): + tasks = get_independent_tasks(tasks) + for task in sorted(tasks, key=lambda t: t.priority): task.run() @@ -626,7 +628,7 @@ def done(task): while scheduled or running: # Submit new tasks to pool just_scheduled = [] - for task 
in scheduled: + for task in sorted(scheduled, key=lambda t: t.priority): if not task.ancestors or all(done(t) for t in task.ancestors): result = pool.apply_async(_run_task, [task]) results.append(result) From 0f90584c1f088680a1477ef5d5d56c9ac829a214 Mon Sep 17 00:00:00 2001 From: Bouwe Andela Date: Fri, 28 Jun 2019 11:53:52 +0200 Subject: [PATCH 2/9] Fix task priority and add test --- esmvalcore/_recipe.py | 3 +- esmvalcore/_task.py | 78 +++++++++++++++-------------- esmvalcore/preprocessor/__init__.py | 3 +- tests/integration/test_task.py | 42 ++++++++++++++++ 4 files changed, 85 insertions(+), 41 deletions(-) create mode 100644 tests/integration/test_task.py diff --git a/esmvalcore/_recipe.py b/esmvalcore/_recipe.py index c6bdb1fc34..70bf003af7 100644 --- a/esmvalcore/_recipe.py +++ b/esmvalcore/_recipe.py @@ -1030,7 +1030,8 @@ def initialize_tasks(self): profiles=self._preprocessors, config_user=self._cfg, task_name=task_name) - task.priority = priority + for task0 in task.flatten(): + task0.priority = priority tasks.add(task) priority += 1 diff --git a/esmvalcore/_task.py b/esmvalcore/_task.py index 926b061111..826d2f6e7e 100644 --- a/esmvalcore/_task.py +++ b/esmvalcore/_task.py @@ -10,12 +10,12 @@ import threading import time from copy import deepcopy -from multiprocessing import Pool, cpu_count +from multiprocessing import Pool import psutil import yaml -from ._config import TAGS, replace_tags, DIAGNOSTICS_PATH +from ._config import DIAGNOSTICS_PATH, TAGS, replace_tags from ._provenance import TrackedFile, get_task_provenance logger = logging.getLogger(__name__) @@ -193,9 +193,10 @@ def _ncl_type(value): class BaseTask: """Base class for defining task classes.""" - def __init__(self, ancestors=None, name=''): + def __init__(self, ancestors=None, name='', products=None): """Initialize task.""" self.ancestors = [] if ancestors is None else ancestors + self.products = set() if products is None else set(products) self.output_files = None self.name = name 
self.activity = None @@ -255,10 +256,9 @@ class DiagnosticTask(BaseTask): def __init__(self, script, settings, output_dir, ancestors=None, name=''): """Create a diagnostic task.""" - super(DiagnosticTask, self).__init__(ancestors=ancestors, name=name) + super().__init__(ancestors=ancestors, name=name) self.script = script self.settings = settings - self.products = set() self.output_dir = output_dir self.cmd = self._initialize_cmd(script) self.log = os.path.join(settings['run_dir'], 'log.txt') @@ -267,8 +267,7 @@ def __init__(self, script, settings, output_dir, ancestors=None, name=''): def _initialize_cmd(self, script): """Create a an executable command from script.""" - diagnostics_root = os.path.join( - DIAGNOSTICS_PATH, 'diag_scripts') + diagnostics_root = os.path.join(DIAGNOSTICS_PATH, 'diag_scripts') script_file = os.path.abspath(os.path.join(diagnostics_root, script)) if not os.path.isfile(script_file): @@ -457,8 +456,8 @@ def _run(self, input_files): env['MPLBACKEND'] = 'Agg' else: # Make diag_scripts path available to diagostics scripts - env['diag_scripts'] = os.path.join( - DIAGNOSTICS_PATH, 'diag_scripts') + env['diag_scripts'] = os.path.join(DIAGNOSTICS_PATH, + 'diag_scripts') cmd = list(self.cmd) settings_file = self.write_settings() @@ -610,46 +609,38 @@ def _run_tasks_sequential(tasks): def _run_tasks_parallel(tasks, max_parallel_tasks=None): """Run tasks in parallel.""" scheduled = get_flattened_tasks(tasks) - running = [] - results = [] + running = {} n_scheduled, n_running = len(scheduled), len(running) n_tasks = n_scheduled - pool = Pool(processes=max_parallel_tasks) - - logger.info("Running %s tasks using at most %s processes", n_tasks, - max_parallel_tasks or cpu_count()) + if max_parallel_tasks is None: + max_parallel_tasks = os.cpu_count() + if max_parallel_tasks > n_tasks: + max_parallel_tasks = n_tasks + logger.info("Running %s tasks using %s processes", n_tasks, + max_parallel_tasks) def done(task): """Assume a task is done if it not 
scheduled or running.""" return not (task in scheduled or task in running) + pool = Pool(processes=max_parallel_tasks) while scheduled or running: # Submit new tasks to pool - just_scheduled = [] for task in sorted(scheduled, key=lambda t: t.priority): - if not task.ancestors or all(done(t) for t in task.ancestors): - result = pool.apply_async(_run_task, [task]) - results.append(result) - running.append(task) - just_scheduled.append(task) - for task in just_scheduled: - scheduled.remove(task) + if len(running) >= max_parallel_tasks: + break + if all(done(t) for t in task.ancestors): + future = pool.apply_async(_run_task, [task]) + running[task] = future + scheduled.remove(task) # Handle completed tasks - for task, result in zip(running, results): - if result.ready(): - task.output_files, updated_products = result.get() - for updated in updated_products: - for original in task.products: - if original.filename == updated.filename: - updated.copy_provenance(target=original) - break - else: - task.products.add(updated) - running.remove(task) - results.remove(result) + ready = {t for t in running if running[t].ready()} + for task in ready: + _copy_results(task, future) + running.pop(task) # Wait if there are still tasks running if running: @@ -660,14 +651,25 @@ def done(task): n_scheduled, n_running = len(scheduled), len(running) n_done = n_tasks - n_scheduled - n_running logger.info( - "Progress: %s tasks running or queued, %s tasks waiting for " - "ancestors, %s/%s done", n_running, n_scheduled, n_done, - n_tasks) + "Progress: %s tasks running, %s tasks waiting for ancestors, " + "%s/%s done", n_running, n_scheduled, n_done, n_tasks) pool.close() pool.join() +def _copy_results(task, future): + """Update task with the results from the remote process.""" + task.output_files, updated_products = future.get() + for updated in updated_products: + for original in task.products: + if original.filename == updated.filename: + updated.copy_provenance(target=original) + break + 
else: + task.products.add(updated) + + def _run_task(task): """Run task and return the result.""" output_files = task.run() diff --git a/esmvalcore/preprocessor/__init__.py b/esmvalcore/preprocessor/__init__.py index dabf231b14..6cb3a8dc66 100644 --- a/esmvalcore/preprocessor/__init__.py +++ b/esmvalcore/preprocessor/__init__.py @@ -349,9 +349,8 @@ def __init__( write_ncl_interface=False, ): """Initialize""" - super(PreprocessingTask, self).__init__(ancestors=ancestors, name=name) _check_multi_model_settings(products) - self.products = set(products) + super().__init__(ancestors=ancestors, name=name, products=products) self.order = list(order) self.debug = debug self.write_ncl_interface = write_ncl_interface diff --git a/tests/integration/test_task.py b/tests/integration/test_task.py new file mode 100644 index 0000000000..645860290c --- /dev/null +++ b/tests/integration/test_task.py @@ -0,0 +1,42 @@ +from functools import partial +from multiprocessing.pool import ThreadPool + +import pytest + +import esmvalcore +from esmvalcore._task import (BaseTask, _run_tasks_parallel, + _run_tasks_sequential) + + +@pytest.mark.parametrize('run_tasks', [ + _run_tasks_sequential, + partial(_run_tasks_parallel, max_parallel_tasks=1), +]) +def test_tasks_run_(monkeypatch, run_tasks): + + order = [] + + def _run(self, input_files): + print(f'running task {self.name} with priority {self.priority}') + order.append(self.priority) + return [f'{self.name}_test.nc'] + + monkeypatch.setattr(BaseTask, '_run', _run) + monkeypatch.setattr(esmvalcore._task, 'Pool', ThreadPool) + + tasks = set() + for i in range(3): + task = BaseTask( + name=f'task{i}', + ancestors=[ + BaseTask(name=f'task{i}-ancestor{j}') for j in range(3) + ], + ) + for task0 in task.flatten(): + task0.priority = i + tasks.add(task) + + run_tasks(tasks) + print(order) + assert len(order) == 12 + assert order == sorted(order) From a1c497e101cce8b412fc5dd5282f3ed112884652 Mon Sep 17 00:00:00 2001 From: Bouwe Andela Date: Fri, 2 
Aug 2019 15:45:30 +0200 Subject: [PATCH 3/9] Fix typo --- esmvalcore/_task.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/esmvalcore/_task.py b/esmvalcore/_task.py index 826d2f6e7e..19284a73b6 100644 --- a/esmvalcore/_task.py +++ b/esmvalcore/_task.py @@ -639,7 +639,7 @@ def done(task): # Handle completed tasks ready = {t for t in running if running[t].ready()} for task in ready: - _copy_results(task, future) + _copy_results(task, running[task]) running.pop(task) # Wait if there are still tasks running From 8f2c65fd3ea0330c181cf861157165299e8f9172 Mon Sep 17 00:00:00 2001 From: Bouwe Andela Date: Fri, 2 Aug 2019 15:46:15 +0200 Subject: [PATCH 4/9] Add more tests --- tests/integration/test_task.py | 74 ++++++++++++++++++++++++++-------- 1 file changed, 57 insertions(+), 17 deletions(-) diff --git a/tests/integration/test_task.py b/tests/integration/test_task.py index 645860290c..4f0f9e854d 100644 --- a/tests/integration/test_task.py +++ b/tests/integration/test_task.py @@ -1,3 +1,4 @@ +import os from functools import partial from multiprocessing.pool import ThreadPool @@ -5,15 +6,66 @@ import esmvalcore from esmvalcore._task import (BaseTask, _run_tasks_parallel, - _run_tasks_sequential) + _run_tasks_sequential, run_tasks) -@pytest.mark.parametrize('run_tasks', [ +@pytest.fixture +def example_tasks(): + """Example tasks for testing the task runners.""" + tasks = set() + for i in range(3): + task = BaseTask( + name=f'task{i}', + ancestors=[ + BaseTask(name=f'task{i}-ancestor{j}') for j in range(3) + ], + ) + for task0 in task.flatten(): + task0.priority = i + tasks.add(task) + + return tasks + + +@pytest.mark.parametrize('max_parallel_tasks', [1, 2, 3, 4, 16, None]) +def test_run_tasks(monkeypatch, tmp_path, max_parallel_tasks, example_tasks): + """Check that tasks are run correctly.""" + def _run(self, input_files): + output_file = tmp_path / self.name + + msg = ('running {} in thread {}, using input {}, generating {}'.format( + self.name, 
os.getpid(), input_files, output_file)) + print(msg) + + # Check that the output is created just once + assert not output_file.exists() + output_file.write_text(msg) + output_file = str(output_file) + + # Check that ancestor results are provided correctly + assert len(self.ancestors) == len(input_files) + for ancestor in self.ancestors: + assert len(ancestor.output_files) == 1 + assert ancestor.output_files[0].startswith(output_file) + assert str(tmp_path / ancestor.name) in input_files + + return [output_file] + + monkeypatch.setattr(BaseTask, '_run', _run) + + run_tasks(example_tasks, max_parallel_tasks) + + for task in example_tasks: + print(task.name, task.output_files) + assert task.output_files + + +@pytest.mark.parametrize('runner', [ _run_tasks_sequential, partial(_run_tasks_parallel, max_parallel_tasks=1), ]) -def test_tasks_run_(monkeypatch, run_tasks): - +def test_runner_uses_priority(monkeypatch, runner, example_tasks): + """Check that the runner tries to respect task priority.""" order = [] def _run(self, input_files): @@ -24,19 +76,7 @@ def _run(self, input_files): monkeypatch.setattr(BaseTask, '_run', _run) monkeypatch.setattr(esmvalcore._task, 'Pool', ThreadPool) - tasks = set() - for i in range(3): - task = BaseTask( - name=f'task{i}', - ancestors=[ - BaseTask(name=f'task{i}-ancestor{j}') for j in range(3) - ], - ) - for task0 in task.flatten(): - task0.priority = i - tasks.add(task) - - run_tasks(tasks) + runner(example_tasks) print(order) assert len(order) == 12 assert order == sorted(order) From 8abd7e81fd7d09ba72c3a1bdc0e793bed26b0f42 Mon Sep 17 00:00:00 2001 From: Mattia Righi Date: Wed, 18 Sep 2019 10:36:36 +0200 Subject: [PATCH 5/9] Fix forgotten conflict --- esmvalcore/_recipe.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/esmvalcore/_recipe.py b/esmvalcore/_recipe.py index 18f91b76ee..ee0acc2a0f 100644 --- a/esmvalcore/_recipe.py +++ b/esmvalcore/_recipe.py @@ -1183,13 +1183,9 @@ def initialize_tasks(self): 
script=script_cfg['script'], output_dir=script_cfg['output_dir'], settings=script_cfg['settings'], -<<<<<<< HEAD - name=task_name) - task.priority = priority -======= name=task_name, ) ->>>>>>> development + task.priority = priority tasks.add(task) priority += 1 From 698532d704d8a6f4763b8d76fc6d4bc174b63fc9 Mon Sep 17 00:00:00 2001 From: Bouwe Andela Date: Wed, 18 Sep 2019 15:40:53 +0200 Subject: [PATCH 6/9] Add time information per task --- esmvalcore/_task.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/esmvalcore/_task.py b/esmvalcore/_task.py index 19284a73b6..6b09c7abe2 100644 --- a/esmvalcore/_task.py +++ b/esmvalcore/_task.py @@ -226,8 +226,10 @@ def run(self, input_files=None): input_files.extend(task.run()) logger.info("Starting task %s in process [%s]", self.name, os.getpid()) + start = datetime.datetime.now() self.output_files = self._run(input_files) - logger.info("Successfully completed task %s", self.name) + logger.info("Successfully completed task %s in %s", + self.name, datetime.datetime.now() - start) return self.output_files From 35d7c9309005743851dcd88c2352d93a45774fc5 Mon Sep 17 00:00:00 2001 From: Bouwe Andela Date: Wed, 18 Sep 2019 15:51:38 +0200 Subject: [PATCH 7/9] Also mention priority --- esmvalcore/_task.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/esmvalcore/_task.py b/esmvalcore/_task.py index 6b09c7abe2..1351921783 100644 --- a/esmvalcore/_task.py +++ b/esmvalcore/_task.py @@ -228,8 +228,9 @@ def run(self, input_files=None): os.getpid()) start = datetime.datetime.now() self.output_files = self._run(input_files) - logger.info("Successfully completed task %s in %s", - self.name, datetime.datetime.now() - start) + runtime = datetime.datetime.now() - start + logger.info("Successfully completed task %s (priority %s) in %s", + self.name, self.priority, runtime) return self.output_files From 820fe2448ddf03b4595f7cafb97edf248f7c1b8a Mon Sep 17 00:00:00 2001 From: Bouwe Andela Date: Fri, 20 
Sep 2019 16:59:52 +0200 Subject: [PATCH 8/9] Add documentation introducing the task concept --- doc/esmvalcore/recipe.rst | 144 ++++++++++++++++++++++++++++++++------ 1 file changed, 123 insertions(+), 21 deletions(-) diff --git a/doc/esmvalcore/recipe.rst b/doc/esmvalcore/recipe.rst index c1396a4163..d7abac519c 100644 --- a/doc/esmvalcore/recipe.rst +++ b/doc/esmvalcore/recipe.rst @@ -147,7 +147,7 @@ arguments): operator: mean multi_model_statistics: span: overlap - statistics: [mean ] + statistics: [mean] .. note:: @@ -164,12 +164,127 @@ Recipe section: ``diagnostics`` The diagnostics section includes one or more diagnostics. Each diagnostics will include: -- a list of which variables to load; -- a description of the variables (optional); -- the preprocessor to be applied to each variable; -- the script to be run; +- the variables to preprocess, including the preprocessor to be applied to each variable; +- the diagnostic script(s) to be run; +- a description of the diagnostic and lists of themes and realms that it applies to; - an optional ``additional_datasets`` section. +The diagnostic section defines tasks +------------------------------------ +The diagnostic section(s) define the tasks that will be run when running the recipe. +For each variable a preprocesing task will be defined and for each diagnostic script a +diagnostic task will be defined. If variables need to be derived +from other variables, a preprocessing task for each of the variables +needed to derive that variable will be defined as well. These tasks can be viewed +in the main_log_debug.txt file that is produced every run. Each task has a unique +name that defines the directory where the results of that task are stored. Task +names start with the name of the diagnostic section followed by a ``/`` and then +the name of the variable section for a preprocessing task or the name of the diagnostic +script section for a diagnostic task. 
+ +A (simplified) example diagnostics section could look like + +.. code-block:: yaml + + diagnostics: + diagnostic_name: + description: Air temperature tutorial diagnostic. + themes: + - phys + realms: + - atmos + variables: + variable_name: + short_name: ta + preprocessor: preprocessor_name + mip: Amon + scripts: + script_name: + script: examples/diagnostic.py + + +Note that the example recipe above contains a single diagnostic section +called ``diagnostic_name`` and will result in two tasks: + +- a preprocessing task called ``diagnostic_name/variable_name`` that will preprocess + air temperature data for each dataset in the Datasets_ section of the recipe (not shown). +- a diagnostic task called ``diagnostic_name/script_name`` + +The path to the script provided in the `script` option should be +either the absolute path to the script, or the path relative to the +``esmvaltool/diag_scripts`` directory. + +Ancestor tasks +-------------- +Some tasks require the result of other tasks to be ready before they can start, +e.g. a diagnostic script needs the preprocessed variable data to start. Thus +each tasks has zero or more ancestor tasks. By the default, each diagnostic task +in a diagnostic section has all variable preprocessing tasks in that same section +as ancestors. However, this can be changed using the ``ancestors`` keyword. Note +that wildcard expansion can be used to define ancestors. + +.. 
code-block:: yaml + + diagnostics: + diagnostic_1: + variables: + airtemp: + short_name: ta + preprocessor: preprocessor_name + mip: Amon + scripts: + script_a: + script: diagnostic_a.py + diagnostic_2: + variables: + precip: + short_name: pr + preprocessor: preprocessor_name + mip: Amon + scripts: + script_b: + script: diagnostic_b.py + ancestors: [diagnostic_1/script_a, precip] + + +The example recipe above will result in four tasks: + +- a preprocessing task called ``diagnostic_1/airtemp`` +- a diagnostic task called ``diagnostic_1/script_a`` +- a preprocessing task called ``diagnostic_2/precip`` +- a diagnostic task called ``diagnostic_2/script_b`` + +the preprocessing tasks do not have any ancestors, while the diagnostic_a.py +script will receive the preprocessed air temperature data +(has ancestor ``diagnostic_1/airtemp``) and the diagnostic_b.py +script will receive the results of diagnostic_a.py and the preprocessed precipitation +data (has ancestors ``diagnostic_1/script_a`` and ``diagnostic_2/precip``). + +Task priority +------------- +Tasks are assigned a priority, with tasks appearing earlier on in the recipe +getting higher priority. The tasks will be executed sequentially or in parellel, +depending on the setting of `max_parallel_tasks` in the :ref:`user configuration file`. +When choosing the next task to execute, the task with the highest priority that +is not waiting for ancestor tasks will be chosen. This feature makes it possible to +reduce the processing time of recipes with many tasks, by placing tasks that +take relatively long at the top of the recipe. Of course this only works when +settings ``max_parallel_tasks`` to a value larger than 1. The current priority +and run time of individual tasks can be seen in the log messages shown when running the tool. 
+ +Variable and dataset definitions +-------------------------------- +To define a variable/dataset combination that corresponds to an actual +variable from a dataset, the keys in each of the diagnostic's variable section +are combined with the keys of each dataset definition. If two versions of the same +key are provided, then the key in the datasets section will take precedence +over the keys in variables section. For many recipes it makes more sense to +define the ``start_year`` and ``end_year`` items in the variable section, +because the diagnostic script assumes that all the data has the same time +range. + +Diagnostic and variable specific datasets +----------------------------------------- The ``additional_datasets`` can add datasets beyond those listed in the the Datasets_ section. This is useful if specific datasets need to be used only by a specific diagnostic. The ``additional_datasets`` can also be used to add @@ -199,22 +314,9 @@ map scipt, ``ocean/diagnostic_maps.py``. Global_Ocean_Surface_regrid_map: script: ocean/diagnostic_maps.py -To define a variable/dataset combination, the keys in the diagnostic section -are combined with the keys from datasets section. If two versions of the same -key are provided, then the key in the datasets section will take precedence -over the keys in variables section. For many recipes it makes more sense to -define the ``start_year`` and ``end_year`` items in the variable section, -because the diagnostic script assumes that all the data has the same time -range. - -Note that the path to the script provided in the `script` option should be -either the absolute path to the script, or the path relative to the -``esmvaltool/diag_scripts`` directory. 
- - -Passing arguments to a diagnostic ---------------------------------- -The ``diagnostics`` section may include a lot of arguments that can be used by +Passing arguments to a diagnostic script +---------------------------------------- +The diagnostic script section(s) may include custom arguments that can be used by the diagnostic script; these arguments are stored at runtime in a dictionary that is then made available to the diagnostic script via the interface link, independent of the language the diagnostic script is written in. Here is an From 0815a920309df375b95666dfd79d7237bbdd0d62 Mon Sep 17 00:00:00 2001 From: Bouwe Andela Date: Mon, 23 Sep 2019 11:05:34 +0200 Subject: [PATCH 9/9] Address review comments --- doc/esmvalcore/recipe.rst | 45 +++++++++++++++++++++------------------ 1 file changed, 24 insertions(+), 21 deletions(-) diff --git a/doc/esmvalcore/recipe.rst b/doc/esmvalcore/recipe.rst index d7abac519c..6ba46e1848 100644 --- a/doc/esmvalcore/recipe.rst +++ b/doc/esmvalcore/recipe.rst @@ -161,24 +161,24 @@ arguments): Recipe section: ``diagnostics`` =============================== -The diagnostics section includes one or more diagnostics. Each diagnostics will -include: +The diagnostics section includes one or more diagnostics. Each diagnostic +section will include: -- the variables to preprocess, including the preprocessor to be applied to each variable; +- the variable(s) to preprocess, including the preprocessor to be applied to each variable; - the diagnostic script(s) to be run; - a description of the diagnostic and lists of themes and realms that it applies to; - an optional ``additional_datasets`` section. -The diagnostic section defines tasks ------------------------------------- -The diagnostic section(s) define the tasks that will be run when running the recipe. +The diagnostics section defines tasks +------------------------------------- +The diagnostic section(s) define the tasks that will be executed when running the recipe. 
For each variable a preprocesing task will be defined and for each diagnostic script a diagnostic task will be defined. If variables need to be derived from other variables, a preprocessing task for each of the variables needed to derive that variable will be defined as well. These tasks can be viewed in the main_log_debug.txt file that is produced every run. Each task has a unique -name that defines the directory where the results of that task are stored. Task -names start with the name of the diagnostic section followed by a ``/`` and then +name that defines the subdirectory where the results of that task are stored. Task +names start with the name of the diagnostic section followed by a '/' and then the name of the variable section for a preprocessing task or the name of the diagnostic script section for a diagnostic task. @@ -210,7 +210,7 @@ called ``diagnostic_name`` and will result in two tasks: air temperature data for each dataset in the Datasets_ section of the recipe (not shown). - a diagnostic task called ``diagnostic_name/script_name`` -The path to the script provided in the `script` option should be +The path to the script provided in the ``script`` option should be either the absolute path to the script, or the path relative to the ``esmvaltool/diag_scripts`` directory. @@ -218,7 +218,7 @@ Ancestor tasks -------------- Some tasks require the result of other tasks to be ready before they can start, e.g. a diagnostic script needs the preprocessed variable data to start. Thus -each tasks has zero or more ancestor tasks. By the default, each diagnostic task +each task has zero or more ancestor tasks. By default, each diagnostic task in a diagnostic section has all variable preprocessing tasks in that same section as ancestors. However, this can be changed using the ``ancestors`` keyword. Note that wildcard expansion can be used to define ancestors.
@@ -264,18 +264,20 @@ Task priority ------------- Tasks are assigned a priority, with tasks appearing earlier on in the recipe getting higher priority. The tasks will be executed sequentially or in parellel, -depending on the setting of `max_parallel_tasks` in the :ref:`user configuration file`. -When choosing the next task to execute, the task with the highest priority that -is not waiting for ancestor tasks will be chosen. This feature makes it possible to +depending on the setting of ``max_parallel_tasks`` in the :ref:`user configuration file`. +When there are fewer than ``max_parallel_tasks`` running, tasks will be started +according to their priority. For obvious reasons, only tasks that are not waiting for +ancestor tasks can be started. This feature makes it possible to reduce the processing time of recipes with many tasks, by placing tasks that -take relatively long at the top of the recipe. Of course this only works when +take relatively long near the top of the recipe. Of course this only works when settings ``max_parallel_tasks`` to a value larger than 1. The current priority -and run time of individual tasks can be seen in the log messages shown when running the tool. +and run time of individual tasks can be seen in the log messages shown when +running the tool (a lower number means higher priority). Variable and dataset definitions -------------------------------- To define a variable/dataset combination that corresponds to an actual -variable from a dataset, the keys in each of the diagnostic's variable section +variable from a dataset, the keys in each variable section are combined with the keys of each dataset definition. If two versions of the same key are provided, then the key in the datasets section will take precedence over the keys in variables section. For many recipes it makes more sense to @@ -285,11 +287,12 @@ range. 
Diagnostic and variable specific datasets ----------------------------------------- -The ``additional_datasets`` can add datasets beyond those listed in the the -Datasets_ section. This is useful if specific datasets need to be used only by -a specific diagnostic. The ``additional_datasets`` can also be used to add -variable specific datasets. This is also a good way to add observational -datasets, which are usually variable-specific. +The ``additional_datasets`` option can be used to add datasets beyond those +listed in the Datasets_ section. This is useful if specific datasets need to +be used only by a specific diagnostic or variable, i.e. it can be added both +at diagnostic level, where it will apply to all variables in that diagnostic +section, and at individual variable level. For example, this can be a good way +to add observational datasets, which are usually variable-specific. Running a simple diagnostic ---------------------------