From ab69a62b71a84b4e3dad36c4684d1d62c6e76c42 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Fri, 22 Jan 2021 17:46:13 -0800 Subject: [PATCH 1/3] Refactor `task_groups` & `task_prefixes` Moves `task_groups` and `task_prefixes` to `SchedulerState` where they are type annotated. Then uses them through `parent` within `Scheduler`. Allows Cython to recognize these are Python `dict`s and optimize calls and operations on them. --- distributed/scheduler.py | 28 ++++++++++++++++++++-------- 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 5a60eac5f92..03c8307b413 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -1585,6 +1585,8 @@ class SchedulerState: _resources: object _saturated: set _tasks: dict + _task_groups: dict + _task_prefixes: dict _task_metadata: dict _total_nthreads: Py_ssize_t _total_occupancy: double @@ -1635,6 +1637,8 @@ def __init__( self._tasks = tasks else: self._tasks = dict() + self._task_groups = dict() + self._task_prefixes = dict() self._task_metadata = dict() self._total_nthreads = 0 self._total_occupancy = 0 @@ -1691,6 +1695,14 @@ def saturated(self): def tasks(self): return self._tasks + @property + def task_groups(self): + return self._task_groups + + @property + def task_prefixes(self): + return self._task_prefixes + @property def task_metadata(self): return self._task_metadata @@ -1738,6 +1750,8 @@ def __pdict__(self): "unknown_durations": self._unknown_durations, "validate": self._validate, "tasks": self._tasks, + "task_groups": self._task_groups, + "task_prefixes": self._task_prefixes, "total_nthreads": self._total_nthreads, "total_occupancy": self._total_occupancy, "extensions": self._extensions, @@ -2926,8 +2940,6 @@ def __init__( # Task state tasks = dict() - self.task_groups = dict() - self.task_prefixes = dict() for old_attr, new_attr, wrap in [ ("priority", "priority", None), ("dependencies", "dependencies", _legacy_task_key_set), @@ 
-3920,16 +3932,16 @@ def new_task(self, key, spec, state): ts._state = state prefix_key = key_split(key) try: - tp = self.task_prefixes[prefix_key] + tp = parent._task_prefixes[prefix_key] except KeyError: - self.task_prefixes[prefix_key] = tp = TaskPrefix(prefix_key) + parent._task_prefixes[prefix_key] = tp = TaskPrefix(prefix_key) ts._prefix = tp group_key = ts._group_key try: - tg = self.task_groups[group_key] + tg = parent._task_groups[group_key] except KeyError: - self.task_groups[group_key] = tg = TaskGroup(group_key) + parent._task_groups[group_key] = tg = TaskGroup(group_key) tg._prefix = tp tp._groups.append(tg) tg.add(ts) @@ -5891,12 +5903,12 @@ def transition(self, key, finish, *args, **kwargs): if ts._state == "forgotten": del parent._tasks[ts._key] - if ts._state == "forgotten" and ts._group._name in self.task_groups: + if ts._state == "forgotten" and ts._group._name in parent._task_groups: # Remove TaskGroup if all tasks are in the forgotten state tg: TaskGroup = ts._group if not any([tg._states.get(s) for s in ALL_TASK_STATES]): ts._prefix._groups.remove(tg) - del self.task_groups[tg._name] + del parent._task_groups[tg._name] return recommendations except Exception as e: From 227e1579c4d048408d9a8c81e66d34bbc7c79624 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Fri, 22 Jan 2021 17:49:39 -0800 Subject: [PATCH 2/3] Use `.get(...)` instead of `try...except...` Instead of using `try...except...` to catch and handle `KeyError`s, just use `.get(...)`, which gets the key needed or returns `None`. This has less overhead. Also the following `None` check is a quick pointer comparison. Otherwise the code is unchanged. 
--- distributed/scheduler.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 03c8307b413..a1727a83940 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -3931,16 +3931,14 @@ def new_task(self, key, spec, state): tg: TaskGroup ts._state = state prefix_key = key_split(key) - try: - tp = parent._task_prefixes[prefix_key] - except KeyError: + tp = parent._task_prefixes.get(prefix_key) + if tp is None: parent._task_prefixes[prefix_key] = tp = TaskPrefix(prefix_key) ts._prefix = tp group_key = ts._group_key - try: - tg = parent._task_groups[group_key] - except KeyError: + tg = parent._task_groups.get(group_key) + if tg is None: parent._task_groups[group_key] = tg = TaskGroup(group_key) tg._prefix = tp tp._groups.append(tg) From d5c4b1b4e9c3499bf48bb66cb7866ccfb9d5189a Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Fri, 22 Jan 2021 17:55:42 -0800 Subject: [PATCH 3/3] Assign `TaskGroup` before `if` Since it is used in the check as well, go ahead and assign it beforehand for simplicity. --- distributed/scheduler.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index a1727a83940..f2126a80e41 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -5901,9 +5901,9 @@ def transition(self, key, finish, *args, **kwargs): if ts._state == "forgotten": del parent._tasks[ts._key] - if ts._state == "forgotten" and ts._group._name in parent._task_groups: + tg: TaskGroup = ts._group + if ts._state == "forgotten" and tg._name in parent._task_groups: # Remove TaskGroup if all tasks are in the forgotten state - tg: TaskGroup = ts._group if not any([tg._states.get(s) for s in ALL_TASK_STATES]): ts._prefix._groups.remove(tg) del parent._task_groups[tg._name]