From 648323504aa13d4903215ac9e5daf604b0561f33 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 7 Dec 2020 09:33:06 -0800 Subject: [PATCH 01/31] Use `tsp` variable name for `TaskStreamPlugin`s --- distributed/scheduler.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 46a38c1ac8f..7a8b47e6f75 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -4165,8 +4165,8 @@ def get_task_stream(self, comm=None, start=None, stop=None, count=None): from distributed.diagnostics.task_stream import TaskStreamPlugin self.add_plugin(TaskStreamPlugin, idempotent=True) - ts = [p for p in self.plugins if isinstance(p, TaskStreamPlugin)][0] - return ts.collect(start=start, stop=stop, count=count) + tsp = [p for p in self.plugins if isinstance(p, TaskStreamPlugin)][0] + return tsp.collect(start=start, stop=stop, count=count) def start_task_metadata(self, comm=None, name=None): plugin = CollectTaskMetaDataPlugin(scheduler=self, name=name) From 68081a7b915f69c13980959ac14c0c22cb3e5ef7 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 7 Dec 2020 09:33:07 -0800 Subject: [PATCH 02/31] Use `dts` for iterated `TaskState` variables --- distributed/scheduler.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 7a8b47e6f75..c869ac7d738 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2944,9 +2944,9 @@ def send_task_to_worker(self, worker, key): if deps: ws: WorkerState msg["who_has"] = { - dep.key: [ws._address for ws in dep.who_has] for dep in deps + dts.key: [ws._address for ws in dts.who_has] for dts in deps } - msg["nbytes"] = {dep.key: dep.nbytes for dep in deps} + msg["nbytes"] = {dts.key: dts.nbytes for dts in deps} if self.validate and deps: assert all(msg["who_has"].values()) @@ -4711,9 +4711,9 @@ def transition_erred_released(self, key): ts.exception_blame = None ts.traceback = None - for dep in ts.dependents: - if dep.state == "erred": - recommendations[dep.key] = "waiting" + for dts in ts.dependents: + if dts.state == "erred": + recommendations[dts.key] = "waiting" self.report({"op": "task-retried", "key": key}) ts.state = "released" From 64405513588b29b7c17d4bc902426aa8ad3a5288 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 7 Dec 2020 09:33:07 -0800 Subject: [PATCH 03/31] Create `list` from generator This ensures Cython still uses `TaskState` to annotate the variable iterated over. Otherwise it constructs a generator with its own scope where this is ignored. 
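A minimal sketch of the pattern this commit message describes, using a simplified stand-in class rather than the real scheduler `TaskState`: per the message above, Cython inlines a list comprehension into the enclosing function, so a local `dts: TaskState` declaration there types the loop variable, whereas a bare generator expression compiles to its own scope in which that declaration is ignored.

    # Simplified stand-in for distributed.scheduler.TaskState (illustration only).
    class TaskState:
        def __init__(self, key, waiters=()):
            self.key = key
            self.waiters = set(waiters)

    def has_waiter(ts: TaskState, dependencies: set) -> bool:
        dts: TaskState  # effective for Cython only if the loop runs in this scope
        # List comprehension: the loop is inlined here, so `dts` keeps the typed
        # declaration above and attribute access can become a C-level lookup.
        return any([ts in dts.waiters for dts in dependencies])
        # The generator form `any(ts in dts.waiters for dts in dependencies)` would
        # execute in a separate generator scope where the annotation does not apply.

    a, b = TaskState("a"), TaskState("b")
    b.waiters.add(a)
    assert has_waiter(a, {b})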
--- distributed/scheduler.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index c869ac7d738..4ab6b0254d7 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2685,7 +2685,7 @@ def validate_released(self, key): assert not ts.waiting_on assert not ts.who_has assert not ts.processing_on - assert not any(ts in dts.waiters for dts in ts.dependencies) + assert not any([ts in dts.waiters for dts in ts.dependencies]) assert ts not in self.unrunnable def validate_waiting(self, key): @@ -4055,7 +4055,7 @@ async def get_call_stack(self, comm=None, keys=None): key = stack.pop() ts = self.tasks[key] if ts.state == "waiting": - stack.extend(dts.key for dts in ts.dependencies) + stack.extend([dts.key for dts in ts.dependencies]) elif ts.state == "processing": processing.add(ts) @@ -4100,7 +4100,7 @@ def get_comm_cost(self, ts, ws: WorkerState): on the given worker. """ return ( - sum(dts.nbytes for dts in ts.dependencies - ws._has_what) / self.bandwidth + sum([dts.nbytes for dts in ts.dependencies - ws._has_what]) / self.bandwidth ) def get_task_duration(self, ts, default=None): @@ -4276,7 +4276,7 @@ def transition_released_waiting(self, key): assert not ts.waiting_on assert not ts.who_has assert not ts.processing_on - assert not any(dts.state == "forgotten" for dts in ts.dependencies) + assert not any([dts.state == "forgotten" for dts in ts.dependencies]) if ts.has_lost_dependencies: return {key: "forgotten"} @@ -4413,7 +4413,7 @@ def transition_waiting_processing(self, key): assert not ts.processing_on assert not ts.has_lost_dependencies assert ts not in self.unrunnable - assert all(dts.who_has for dts in ts.dependencies) + assert all([dts.who_has for dts in ts.dependencies]) ws: WorkerState = self.decide_worker(ts) if ws is None: @@ -4699,7 +4699,7 @@ def transition_erred_released(self, key): if self.validate: with log_errors(pdb=LOG_PDB): - assert all(dts.state != "erred" for dts in ts.dependencies) + assert all([dts.state != "erred" for dts in ts.dependencies]) assert ts.exception_blame assert not ts.who_has assert not ts.waiting_on @@ -5792,7 +5792,7 @@ def decide_worker(ts, all_workers, valid_workers, objective): *objective* function. """ deps = ts.dependencies - assert all(dts.who_has for dts in deps) + assert all([dts.who_has for dts in deps]) if ts.actor: candidates = set(all_workers) else: @@ -5877,7 +5877,7 @@ def validate_task_state(ts): assert bool(ts.who_has) == (ts.state == "memory"), (ts, ts.who_has) if ts.state == "processing": - assert all(dts.who_has for dts in ts.dependencies), ( + assert all([dts.who_has for dts in ts.dependencies]), ( "task processing without all deps", str(ts), str(ts.dependencies), @@ -5893,7 +5893,7 @@ def validate_task_state(ts): if ts.run_spec: # was computed assert ts.type assert isinstance(ts.type, str) - assert not any(ts in dts.waiting_on for dts in ts.dependents) + assert not any([ts in dts.waiting_on for dts in ts.dependents]) for ws in ts.who_has: assert ts in ws._has_what, ( "not in who_has' has_what", From e9e8731c2d8de8b85e4bf399c73d94cd98058ae2 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 7 Dec 2020 09:33:08 -0800 Subject: [PATCH 04/31] Use `-1` as `TaskState.nbytes` default Ideally we would want to type `nbytes` as some kind of integer for Cython optimization. However as we assign and work with `nbytes` being `None` in some cases, this is not currently possible since a C integer cannot be `None`. 
To fix that change the default value of `nbytes` to be `-1`. This way it is still a valid integer and one we would never use for `nbytes` unless it wasn't set. Thus it remains easy to check for and won't overlap. Also it is something a C integer can be assigned. --- distributed/diagnostics/progress.py | 16 ++++++++-------- distributed/scheduler.py | 12 ++++++------ 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/distributed/diagnostics/progress.py b/distributed/diagnostics/progress.py index eaad51a2747..eeb3c8a2817 100644 --- a/distributed/diagnostics/progress.py +++ b/distributed/diagnostics/progress.py @@ -250,7 +250,7 @@ def __init__(self, scheduler): prefix = ts.prefix.name self.all[prefix].add(key) self.state[ts.state][prefix].add(key) - if ts.nbytes is not None: + if ts.nbytes >= 0: self.nbytes[prefix] += ts.nbytes scheduler.add_plugin(self) @@ -264,11 +264,11 @@ def transition(self, key, start, finish, *args, **kwargs): except KeyError: # TODO: remove me once we have a new or clean state pass - if start == "memory": + if start == "memory" and ts.nbytes >= 0: # XXX why not respect DEFAULT_DATA_SIZE? - self.nbytes[prefix] -= ts.nbytes or 0 - if finish == "memory": - self.nbytes[prefix] += ts.nbytes or 0 + self.nbytes[prefix] -= ts.nbytes + if finish == "memory" and ts.nbytes >= 0: + self.nbytes[prefix] += ts.nbytes if finish != "forgotten": self.state[finish][prefix].add(key) @@ -304,7 +304,7 @@ def __init__(self, scheduler): self.create(key, k) self.keys[k].add(key) self.groups[k][ts.state] += 1 - if ts.state == "memory" and ts.nbytes is not None: + if ts.state == "memory" and ts.nbytes >= 0: self.nbytes[k] += ts.nbytes scheduler.add_plugin(self) @@ -347,9 +347,9 @@ def transition(self, key, start, finish, *args, **kwargs): for dep in self.dependencies.pop(k): self.dependents[key_split_group(dep)].remove(k) - if start == "memory" and ts.nbytes is not None: + if start == "memory" and ts.nbytes >= 0: self.nbytes[k] -= ts.nbytes - if finish == "memory" and ts.nbytes is not None: + if finish == "memory" and ts.nbytes >= 0: self.nbytes[k] += ts.nbytes def restart(self, scheduler): diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 4ab6b0254d7..8816393ff7f 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -1007,7 +1007,7 @@ def __init__(self, key, run_spec): self._state = None self.exception = self.traceback = self.exception_blame = None self.suspicious = self.retries = 0 - self.nbytes = None + self.nbytes = -1 self.priority = None self.who_wants = set() self.dependencies = set() @@ -1056,11 +1056,13 @@ def add_dependency(self, other: "TaskState"): def get_nbytes(self) -> int: nbytes = self.nbytes - return nbytes if nbytes is not None else DEFAULT_DATA_SIZE + return nbytes if nbytes >= 0 else DEFAULT_DATA_SIZE def set_nbytes(self, nbytes: int): + diff = nbytes old_nbytes = self.nbytes - diff = nbytes - (old_nbytes or 0) + if old_nbytes >= 0: + diff -= old_nbytes self.group.nbytes_total += diff self.group.nbytes_in_memory += diff ws: WorkerState @@ -4081,9 +4083,7 @@ def get_nbytes(self, comm=None, keys=None, summary=True): result = {k: self.tasks[k].nbytes for k in keys} else: result = { - k: ts.nbytes - for k, ts in self.tasks.items() - if ts.nbytes is not None + k: ts.nbytes for k, ts in self.tasks.items() if ts.nbytes >= 0 } if summary: From ec364155130713080fabbb52d271b41b2748e068 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 7 Dec 2020 09:33:09 -0800 Subject: [PATCH 05/31] Assign `TaskState` instances to variables --- 
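As an aside on the `-1` default that PATCH 04 introduces for `TaskState.nbytes`: it behaves like the old `None` sentinel while remaining a plain C integer. A rough sketch of the intended semantics, with simplified names and an assumed placeholder value for `DEFAULT_DATA_SIZE` (the real value comes from dask config):

    DEFAULT_DATA_SIZE = 1_000  # placeholder; the real default is configured elsewhere

    class NbytesSketch:
        def __init__(self):
            self.nbytes = -1  # -1 means "size not measured yet" (was None before)

        def get_nbytes(self) -> int:
            # unknown sizes fall back to the default estimate
            return self.nbytes if self.nbytes >= 0 else DEFAULT_DATA_SIZE

        def set_nbytes(self, nbytes: int) -> int:
            # delta against the previous measurement; -1 counts as "no old value",
            # mirroring the `diff = nbytes; if old_nbytes >= 0: diff -= old_nbytes`
            # logic in the PATCH 04 diff
            diff = nbytes
            if self.nbytes >= 0:
                diff -= self.nbytes
            self.nbytes = nbytes
            return diff  # the scheduler adds this to group/worker byte totals

    ts = NbytesSketch()
    assert ts.get_nbytes() == DEFAULT_DATA_SIZE  # unknown -> default estimate
    assert ts.set_nbytes(500) == 500             # first measurement counted in full
    assert ts.set_nbytes(800) == 300             # later updates add only the change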
distributed/scheduler.py | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 8816393ff7f..180f62e17f3 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2132,8 +2132,10 @@ def update_graph( # Avoid computation that is already finished already_in_memory = set() # tasks that are already done for k, v in dependencies.items(): - if v and k in self.tasks and self.tasks[k].state in ("memory", "erred"): - already_in_memory.add(k) + if v and k in self.tasks: + ts = self.tasks[k] + if ts.state in ("memory", "erred"): + already_in_memory.add(k) if already_in_memory: dependents = dask.core.reverse_dict(dependencies) @@ -2221,13 +2223,15 @@ def update_graph( for a, kv in annotations.items(): for k, v in kv.items(): - self.tasks[k].annotations[a] = v + ts = self.tasks[k] + ts.annotations[a] = v # Add actors if actors is True: actors = list(keys) for actor in actors or []: - self.tasks[actor].actor = True + ts = self.tasks[actor] + ts.actor = True priority = priority or dask.order.order( tasks @@ -2468,9 +2472,8 @@ def stimulus_retry(self, comm=None, keys=None, client=None): while stack: key = stack.pop() seen.add(key) - erred_deps = [ - dts.key for dts in self.tasks[key].dependencies if dts.state == "erred" - ] + ts = self.tasks[key] + erred_deps = [dts.key for dts in ts.dependencies if dts.state == "erred"] if erred_deps: stack.extend(erred_deps) else: From a94964118fd471a45e2565590a83e39ed1ae15c5 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 7 Dec 2020 09:33:09 -0800 Subject: [PATCH 06/31] Annotate `TaskState` for Cythonization --- distributed/scheduler.py | 34 +++++++++++++++++++++++++++++++++- 1 file changed, 33 insertions(+), 1 deletion(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 180f62e17f3..7c249ff0540 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -709,6 +709,7 @@ def __len__(self): return sum(self.states.values()) +@cclass class TaskState: """ A simple object holding information about a task. 
@@ -950,6 +951,37 @@ class TaskState: Task annotations """ + key: str + _hash: Py_hash_t + prefix: TaskPrefix + run_spec: object + priority: tuple + _state: str + dependencies: set + dependents: set + has_lost_dependencies: bool + waiting_on: set + waiters: set + who_wants: set + who_has: set + processing_on: WorkerState + retries: Py_ssize_t + nbytes: Py_ssize_t + type: str + exception: object + traceback: object + exception_blame: object + suspicious: Py_ssize_t + host_restrictions: set + worker_restrictions: set + resource_restrictions: dict + loose_restrictions: bool + metadata: dict + annotations: dict + actor: bool + group: TaskGroup + group_key: str + __slots__ = ( # === General description === "actor", @@ -1000,7 +1032,7 @@ class TaskState: "annotations", ) - def __init__(self, key, run_spec): + def __init__(self, key: str, run_spec: object): self.key = key self._hash = hash(key) self.run_spec = run_spec From 111f73b1dd6ccf6d6a9bd92d9a037f6d2438f281 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 7 Dec 2020 09:33:10 -0800 Subject: [PATCH 07/31] Annotate all `TaskState` variables --- distributed/scheduler.py | 180 +++++++++++++++++++++++++++------------ 1 file changed, 124 insertions(+), 56 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 7c249ff0540..40e6c49ec35 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -511,6 +511,7 @@ def clean(self): nanny=self._nanny, extra=self._extra, ) + ts: TaskState ws._processing = {ts.key: cost for ts, cost in self._processing.items()} return ws @@ -690,7 +691,8 @@ def __init__(self, name): self.duration = 0 self.types = set() - def add(self, ts): + def add(self, o): + ts: TaskState = o self.states[ts.state] += 1 ts.group = self @@ -1064,7 +1066,13 @@ def __hash__(self): return self._hash def __eq__(self, other): - return type(self) == type(other) and self.key == other.key + typ_self: type = type(self) + typ_other: type = type(other) + if typ_self == typ_other: + other_ts: TaskState = other + return self.key == other_ts.key + else: + return False @property def state(self) -> str: @@ -1202,6 +1210,7 @@ def _legacy_task_key_set(tasks): """ Transform a set of task states into a set of task keys. """ + ts: TaskState return {ts.key for ts in tasks} @@ -1225,6 +1234,7 @@ def _legacy_task_key_dict(task_dict): """ Transform a dict of {task state: value} into a dict of {task key: value}. 
""" + ts: TaskState return {ts.key: value for ts, value in task_dict.items()} @@ -2009,7 +2019,7 @@ async def add_worker( if nbytes: for key in nbytes: - ts = self.tasks.get(key) + ts: TaskState = self.tasks.get(key) if ts is not None and ts.state in ("processing", "waiting"): recommendations = self.transition( key, @@ -2162,6 +2172,7 @@ def update_graph( self.client_releases_keys(keys=[k], client=client) # Avoid computation that is already finished + ts: TaskState already_in_memory = set() # tasks that are already done for k, v in dependencies.items(): if v and k in self.tasks: @@ -2169,6 +2180,7 @@ def update_graph( if ts.state in ("memory", "erred"): already_in_memory.add(k) + dts: TaskState if already_in_memory: dependents = dask.core.reverse_dict(dependencies) stack = list(already_in_memory) @@ -2381,7 +2393,7 @@ def update_graph( def new_task(self, key, spec, state): """ Create a new task, and associated states """ - ts = TaskState(key, spec) + ts: TaskState = TaskState(key, spec) ts._state = state prefix_key = key_split(key) try: @@ -2405,7 +2417,7 @@ def stimulus_task_finished(self, key=None, worker=None, **kwargs): """ Mark that a task has finished execution on a particular worker """ logger.debug("Stimulus task finished %s, %s", key, worker) - ts = self.tasks.get(key) + ts: TaskState = self.tasks.get(key) if ts is None: return {} ws: WorkerState = self.workers[worker] @@ -2437,7 +2449,7 @@ def stimulus_task_erred( """ Mark that a task has erred on a particular worker """ logger.debug("Stimulus task erred %s, %s", key, worker) - ts = self.tasks.get(key) + ts: TaskState = self.tasks.get(key) if ts is None: return {} @@ -2468,10 +2480,10 @@ def stimulus_missing_data( with log_errors(): logger.debug("Stimulus missing data %s, %s", key, worker) - ts = self.tasks.get(key) + ts: TaskState = self.tasks.get(key) if ts is None or ts.state == "memory": return {} - cts = self.tasks.get(cause) + cts: TaskState = self.tasks.get(cause) recommendations = {} @@ -2501,6 +2513,8 @@ def stimulus_retry(self, comm=None, keys=None, client=None): stack = list(keys) seen = set() roots = [] + ts: TaskState + dts: TaskState while stack: key = stack.pop() seen.add(key) @@ -2574,6 +2588,7 @@ async def remove_worker(self, comm=None, address=None, safe=False, close=True): recommendations = {} + ts: TaskState for ts in list(ws._processing): k = ts.key recommendations[k] = "released" @@ -2646,7 +2661,8 @@ def stimulus_cancel(self, comm, keys=None, client=None, force=False): def cancel_key(self, key, client, retries=5, force=False): """ Cancel a particular key and all dependents """ # TODO: this should be converted to use the transition mechanism - ts = self.tasks.get(key) + ts: TaskState = self.tasks.get(key) + dts: TaskState try: cs: ClientState = self.clients[client] except KeyError: @@ -2671,6 +2687,7 @@ def client_desires_keys(self, keys=None, client=None): if cs is None: # For publish, queues etc. 
self.clients[client] = cs = ClientState(client) + ts: TaskState for k in keys: ts = self.tasks.get(k) if ts is None: @@ -2686,6 +2703,7 @@ def client_releases_keys(self, keys=None, client=None): """ Remove keys from client desired list """ logger.debug("Client %s releases keys: %s", client, keys) cs: ClientState = self.clients[client] + ts: TaskState tasks2 = set() for key in list(keys): ts = self.tasks.get(key) @@ -2716,7 +2734,8 @@ def client_heartbeat(self, client=None): ################### def validate_released(self, key): - ts = self.tasks[key] + ts: TaskState = self.tasks[key] + dts: TaskState assert ts.state == "released" assert not ts.waiters assert not ts.waiting_on @@ -2726,7 +2745,8 @@ def validate_released(self, key): assert ts not in self.unrunnable def validate_waiting(self, key): - ts = self.tasks[key] + ts: TaskState = self.tasks[key] + dts: TaskState assert ts.waiting_on assert not ts.who_has assert not ts.processing_on @@ -2737,7 +2757,8 @@ def validate_waiting(self, key): assert ts in dts.waiters # XXX even if dts.who_has? def validate_processing(self, key): - ts = self.tasks[key] + ts: TaskState = self.tasks[key] + dts: TaskState assert not ts.waiting_on ws: WorkerState = ts.processing_on assert ws @@ -2748,7 +2769,8 @@ def validate_processing(self, key): assert ts in dts.waiters def validate_memory(self, key): - ts = self.tasks[key] + ts: TaskState = self.tasks[key] + dts: TaskState assert ts.who_has assert not ts.processing_on assert not ts.waiting_on @@ -2758,7 +2780,8 @@ def validate_memory(self, key): assert ts not in dts.waiting_on def validate_no_worker(self, key): - ts = self.tasks[key] + ts: TaskState = self.tasks[key] + dts: TaskState assert ts in self.unrunnable assert not ts.waiting_on assert ts in self.unrunnable @@ -2768,11 +2791,11 @@ def validate_no_worker(self, key): assert dts.who_has def validate_erred(self, key): - ts = self.tasks[key] + ts: TaskState = self.tasks[key] assert ts.exception_blame assert not ts.who_has - def validate_key(self, key, ts=None): + def validate_key(self, key, ts: TaskState = None): try: if ts is None: ts = self.tasks.get(key) @@ -2811,6 +2834,7 @@ def validate_state(self, allow_overlap=False): assert not ws._occupancy assert ws in self.idle + ts: TaskState for k, ts in self.tasks.items(): assert isinstance(ts, TaskState), (type(ts), ts) assert ts.key == k @@ -2845,7 +2869,7 @@ def validate_state(self, allow_overlap=False): # Manage Messages # ################### - def report(self, msg, ts=None, client=None): + def report(self, msg, ts: TaskState = None, client=None): """ Publish updates to all listening Queues and Comms @@ -2940,6 +2964,7 @@ def remove_client(self, client=None): # XXX is this a legitimate condition? 
pass else: + ts: TaskState self.client_releases_keys( keys=[ts.key for ts in cs._wants_what], client=cs._client_key ) @@ -2964,7 +2989,8 @@ def remove_client_from_events(): def send_task_to_worker(self, worker, key): """ Send a single computational task to a worker """ try: - ts = self.tasks[key] + ts: TaskState = self.tasks[key] + dts: TaskState msg = { "op": "compute-task", @@ -3018,7 +3044,7 @@ def handle_task_erred(self, key=None, **msg): self.transitions(r) def handle_release_data(self, key=None, worker=None, client=None, **msg): - ts = self.tasks.get(key) + ts: TaskState = self.tasks.get(key) if ts is None: return ws: WorkerState = self.workers[worker] @@ -3031,7 +3057,7 @@ def handle_missing_data(self, key=None, errant_worker=None, **kwargs): logger.debug("handle missing data key=%s worker=%s", key, errant_worker) self.log.append(("missing", key, errant_worker)) - ts = self.tasks.get(key) + ts: TaskState = self.tasks.get(key) if ts is None or not ts.who_has: return if errant_worker in self.workers: @@ -3052,6 +3078,7 @@ def release_worker_data(self, comm=None, keys=None, worker=None): removed_tasks = tasks & ws._has_what ws._has_what -= removed_tasks + ts: TaskState recommendations = {} for ts in removed_tasks: ws._nbytes -= ts.get_nbytes() @@ -3068,7 +3095,7 @@ def handle_long_running(self, key=None, worker=None, compute_duration=None): We stop the task from being stolen in the future, and change task duration accounting as if the task has stopped. """ - ts = self.tasks[key] + ts: TaskState = self.tasks[key] if "stealing" in self.extensions: self.extensions["stealing"].remove_key_from_stealable(ts) @@ -3200,7 +3227,7 @@ async def gather(self, comm=None, keys=None, serializers=None): keys = list(keys) who_has = {} for key in keys: - ts = self.tasks.get(key) + ts: TaskState = self.tasks.get(key) if ts is not None: who_has[key] = [ws._address for ws in ts.who_has] else: @@ -3235,7 +3262,7 @@ async def gather(self, comm=None, keys=None, serializers=None): for key, workers in missing_keys.items(): # Task may already be gone if it was held by a # `missing_worker` - ts = self.tasks.get(key) + ts: TaskState = self.tasks.get(key) logger.exception( "Workers don't have promised key: %s, %s", str(workers), @@ -3269,6 +3296,7 @@ async def restart(self, client=None, timeout=3): logger.info("Send lost future signal to clients") cs: ClientState + ts: TaskState for cs in self.clients.values(): self.client_releases_keys( keys=[ts.key for ts in cs._wants_what], client=cs._client_key @@ -3402,6 +3430,7 @@ async def _delete_worker_data(self, worker_address, keys): ) ws: WorkerState = self.workers[worker_address] + ts: TaskState tasks = {self.tasks[key] for key in keys} ws._has_what -= tasks for ts in tasks: @@ -3420,6 +3449,7 @@ async def rebalance(self, comm=None, keys=None, workers=None): occupied worker until either the sender or the recipient are at the average expected load. 
""" + ts: TaskState with log_errors(): async with self._lock: if keys: @@ -3570,6 +3600,7 @@ async def replicate( """ ws: WorkerState wws: WorkerState + ts: TaskState assert branching_factor > 0 async with self._lock if lock else empty_context: @@ -3825,6 +3856,7 @@ async def retire_workers( Scheduler.workers_to_close """ ws: WorkerState + ts: TaskState with log_errors(): async with self._lock if lock else empty_context: if names is not None: @@ -3906,7 +3938,7 @@ def add_keys(self, comm=None, worker=None, keys=()): return "not found" ws: WorkerState = self.workers[worker] for key in keys: - ts = self.tasks.get(key) + ts: TaskState = self.tasks.get(key) if ts is not None and ts.state == "memory": if ts not in ws._has_what: ws._nbytes += ts.get_nbytes() @@ -3936,9 +3968,9 @@ def update_data( logger.debug("Update data %s", who_has) for key, workers in who_has.items(): - ts = self.tasks.get(key) + ts: TaskState = self.tasks.get(key) if ts is None: - ts = self.new_task(key, None, "memory") + ts: TaskState = self.new_task(key, None, "memory") ts.state = "memory" if key in nbytes: ts.set_nbytes(nbytes[key]) @@ -3955,7 +3987,7 @@ def update_data( if client: self.client_desires_keys(keys=list(who_has), client=client) - def report_on_key(self, key=None, ts=None, client=None): + def report_on_key(self, key=None, ts: TaskState = None, client=None): assert (key is None) + (ts is None) == 1, (key, ts) if ts is None: try: @@ -3970,7 +4002,7 @@ def report_on_key(self, key=None, ts=None, client=None): elif ts.state == "memory": self.report({"op": "key-in-memory", "key": key}, ts=ts, client=client) elif ts.state == "erred": - failing_ts = ts.exception_blame + failing_ts: TaskState = ts.exception_blame self.report( { "op": "task-erred", @@ -4038,6 +4070,7 @@ def subscribe_worker_status(self, comm=None): def get_processing(self, comm=None, workers=None): ws: WorkerState + ts: TaskState if workers is not None: workers = set(map(self.coerce_address, workers)) return {w: [ts.key for ts in self.workers[w].processing] for w in workers} @@ -4048,6 +4081,7 @@ def get_processing(self, comm=None, workers=None): def get_who_has(self, comm=None, keys=None): ws: WorkerState + ts: TaskState if keys is not None: return { k: [ws._address for ws in self.tasks[k].who_has] @@ -4063,6 +4097,7 @@ def get_who_has(self, comm=None, keys=None): def get_has_what(self, comm=None, workers=None): ws: WorkerState + ts: TaskState if workers is not None: workers = map(self.coerce_address, workers) return { @@ -4085,6 +4120,8 @@ def get_ncores(self, comm=None, workers=None): return {w: ws._nthreads for w, ws in self.workers.items()} async def get_call_stack(self, comm=None, keys=None): + ts: TaskState + dts: TaskState if keys is not None: stack = list(keys) processing = set() @@ -4113,6 +4150,7 @@ async def get_call_stack(self, comm=None, keys=None): return response def get_nbytes(self, comm=None, keys=None, summary=True): + ts: TaskState with log_errors(): if keys is not None: result = {k: self.tasks[k].nbytes for k in keys} @@ -4129,16 +4167,17 @@ def get_nbytes(self, comm=None, keys=None, summary=True): return result - def get_comm_cost(self, ts, ws: WorkerState): + def get_comm_cost(self, ts: TaskState, ws: WorkerState): """ Get the estimated communication cost (in s.) to compute the task on the given worker. 
""" + dts: TaskState return ( sum([dts.nbytes for dts in ts.dependencies - ws._has_what]) / self.bandwidth ) - def get_task_duration(self, ts, default=None): + def get_task_duration(self, ts: TaskState, default=None): """ Get the estimated computation cost of the given task (not including any communication cost). @@ -4237,7 +4276,7 @@ async def register_worker_plugin(self, comm, plugin, name=None): # State Transitions # ##################### - def _remove_from_processing(self, ts, send_worker_msg=None): + def _remove_from_processing(self, ts: TaskState, send_worker_msg=None): """ Remove *ts* from the set of processing tasks. """ @@ -4258,7 +4297,13 @@ def _remove_from_processing(self, ts, send_worker_msg=None): self.worker_send(w, send_worker_msg) def _add_to_memory( - self, ts, ws: WorkerState, recommendations, type=None, typename=None, **kwargs + self, + ts: TaskState, + ws: WorkerState, + recommendations, + type=None, + typename=None, + **kwargs, ): """ Add *ts* to the set of in-memory tasks. @@ -4273,6 +4318,7 @@ def _add_to_memory( deps = ts.dependents if len(deps) > 1: deps = sorted(deps, key=operator.attrgetter("priority"), reverse=True) + dts: TaskState for dts in deps: s = dts.waiting_on if ts in s: @@ -4304,7 +4350,8 @@ def _add_to_memory( def transition_released_waiting(self, key): try: - ts = self.tasks[key] + ts: TaskState = self.tasks[key] + dts: TaskState if self.validate: assert ts.run_spec @@ -4320,6 +4367,7 @@ def transition_released_waiting(self, key): recommendations = {} + dts: TaskState for dts in ts.dependencies: if dts.exception_blame: ts.exception_blame = dts.exception_blame @@ -4355,7 +4403,8 @@ def transition_released_waiting(self, key): def transition_no_worker_waiting(self, key): try: - ts = self.tasks[key] + ts: TaskState = self.tasks[key] + dts: TaskState if self.validate: assert ts in self.unrunnable @@ -4397,7 +4446,7 @@ def transition_no_worker_waiting(self, key): pdb.set_trace() raise - def decide_worker(self, ts): + def decide_worker(self, ts: TaskState): """ Decide on a worker for task *ts*. Return a WorkerState. 
""" @@ -4439,7 +4488,8 @@ def decide_worker(self, ts): def transition_waiting_processing(self, key): try: - ts = self.tasks[key] + ts: TaskState = self.tasks[key] + dts: TaskState if self.validate: assert not ts.waiting_on @@ -4486,7 +4536,7 @@ def transition_waiting_processing(self, key): def transition_waiting_memory(self, key, nbytes=None, worker=None, **kwargs): try: ws: WorkerState = self.workers[worker] - ts = self.tasks[key] + ts: TaskState = self.tasks[key] if self.validate: assert not ts.processing_on @@ -4531,7 +4581,7 @@ def transition_processing_memory( ws: WorkerState wws: WorkerState try: - ts = self.tasks[key] + ts: TaskState = self.tasks[key] assert worker assert isinstance(worker, str) @@ -4593,6 +4643,7 @@ def transition_processing_memory( ts.prefix.duration_average = avg_duration ts.group.duration += new_duration + tts: TaskState for tts in self.unknown_durations.pop(ts.prefix.name, ()): if tts.processing_on: wws = tts.processing_on @@ -4630,7 +4681,8 @@ def transition_processing_memory( def transition_memory_released(self, key, safe=False): ws: WorkerState try: - ts = self.tasks[key] + ts: TaskState = self.tasks[key] + dts: TaskState if self.validate: assert not ts.waiting_on @@ -4689,7 +4741,9 @@ def transition_memory_released(self, key, safe=False): def transition_released_erred(self, key): try: - ts = self.tasks[key] + ts: TaskState = self.tasks[key] + dts: TaskState + failing_ts: TaskState if self.validate: with log_errors(pdb=LOG_PDB): @@ -4730,7 +4784,8 @@ def transition_released_erred(self, key): def transition_erred_released(self, key): try: - ts = self.tasks[key] + ts: TaskState = self.tasks[key] + dts: TaskState if self.validate: with log_errors(pdb=LOG_PDB): @@ -4764,7 +4819,7 @@ def transition_erred_released(self, key): def transition_waiting_released(self, key): try: - ts = self.tasks[key] + ts: TaskState = self.tasks[key] if self.validate: assert not ts.who_has @@ -4772,6 +4827,7 @@ def transition_waiting_released(self, key): recommendations = {} + dts: TaskState for dts in ts.dependencies: s = dts.waiters if ts in s: @@ -4800,7 +4856,8 @@ def transition_waiting_released(self, key): def transition_processing_released(self, key): try: - ts = self.tasks[key] + ts: TaskState = self.tasks[key] + dts: TaskState if self.validate: assert ts.processing_on @@ -4847,7 +4904,9 @@ def transition_processing_erred( ): ws: WorkerState try: - ts = self.tasks[key] + ts: TaskState = self.tasks[key] + dts: TaskState + failing_ts: TaskState if self.validate: assert cause or ts.exception_blame @@ -4914,7 +4973,8 @@ def transition_processing_erred( def transition_no_worker_released(self, key): try: - ts = self.tasks[key] + ts: TaskState = self.tasks[key] + dts: TaskState if self.validate: assert self.tasks[key].state == "no-worker" @@ -4939,7 +4999,7 @@ def transition_no_worker_released(self, key): raise def remove_key(self, key): - ts = self.tasks.pop(key) + ts: TaskState = self.tasks.pop(key) assert ts.state == "forgotten" self.unrunnable.discard(ts) cs: ClientState @@ -4952,9 +5012,10 @@ def remove_key(self, key): if key in self.task_metadata: del self.task_metadata[key] - def _propagate_forgotten(self, ts, recommendations): + def _propagate_forgotten(self, ts: TaskState, recommendations): ts.state = "forgotten" key = ts.key + dts: TaskState for dts in ts.dependents: dts.has_lost_dependencies = True dts.dependencies.remove(ts) @@ -4993,7 +5054,7 @@ def _propagate_forgotten(self, ts, recommendations): def transition_memory_forgotten(self, key): ws: WorkerState try: - ts = 
self.tasks[key] + ts: TaskState = self.tasks[key] if self.validate: assert ts.state == "memory" @@ -5033,7 +5094,7 @@ def transition_memory_forgotten(self, key): def transition_released_forgotten(self, key): try: - ts = self.tasks[key] + ts: TaskState = self.tasks[key] if self.validate: assert ts.state in ("released", "erred") @@ -5083,6 +5144,7 @@ def transition(self, key, finish, *args, **kwargs): -------- Scheduler.transitions: transitive version of this function """ + ts: TaskState try: try: ts = self.tasks[key] @@ -5192,6 +5254,7 @@ def reschedule(self, key=None, worker=None): Things may have shifted and this task may now be better suited to run elsewhere """ + ts: TaskState try: ts = self.tasks[key] except KeyError: @@ -5249,7 +5312,7 @@ def check_idle_saturated(self, ws: WorkerState, occ: double = -1.0): else: saturated.discard(ws) - def valid_workers(self, ts): + def valid_workers(self, ts: TaskState): """Return set of currently valid workers for key If all workers are valid then this returns ``True``. @@ -5298,12 +5361,12 @@ def valid_workers(self, ts): else: return {self.workers[w] for w in s} - def consume_resources(self, ts, ws: WorkerState): + def consume_resources(self, ts: TaskState, ws: WorkerState): if ts.resource_restrictions: for r, required in ts.resource_restrictions.items(): ws._used_resources[r] += required - def release_resources(self, ts, ws: WorkerState): + def release_resources(self, ts: TaskState, ws: WorkerState): if ts.resource_restrictions: for r, required in ts.resource_restrictions.items(): ws._used_resources[r] -= required @@ -5389,12 +5452,13 @@ def start_ipython(self, comm=None): ) return self._ipython_kernel.get_connection_info() - def worker_objective(self, ts, ws: WorkerState): + def worker_objective(self, ts: TaskState, ws: WorkerState): """ Objective function to determine which worker should get the task Minimize expected start time. If a tie then break with data storage. """ + dts: TaskState comm_bytes = sum( [dts.get_nbytes() for dts in ts.dependencies if ws not in dts.who_has] ) @@ -5810,7 +5874,7 @@ def adaptive_target(self, comm=None, target_duration=None): return len(self.workers) - len(to_close) -def decide_worker(ts, all_workers, valid_workers, objective): +def decide_worker(ts: TaskState, all_workers, valid_workers, objective): """ Decide which worker should take task *ts*. @@ -5826,6 +5890,7 @@ def decide_worker(ts, all_workers, valid_workers, objective): of bytes sent between workers. This is determined by calling the *objective* function. """ + dts: TaskState deps = ts.dependencies assert all([dts.who_has for dts in deps]) if ts.actor: @@ -5854,11 +5919,12 @@ def decide_worker(ts, all_workers, valid_workers, objective): return min(candidates, key=objective) -def validate_task_state(ts): +def validate_task_state(ts: TaskState): """ Validate the given TaskState. """ ws: WorkerState + dts: TaskState assert ts.state in ALL_TASK_STATES or ts.state == "forgotten", ts @@ -5955,6 +6021,7 @@ def validate_task_state(ts): def validate_worker_state(ws: WorkerState): + ts: TaskState for ts in ws._has_what: assert ws in ts.who_has, ( "not in has_what' who_has", @@ -5974,6 +6041,7 @@ def validate_state(tasks, workers, clients): This performs a sequence of checks on the entire graph, running in about linear time. This raises assert errors if anything doesn't check out. 
""" + ts: TaskState for ts in tasks.values(): validate_task_state(ts) @@ -6064,7 +6132,7 @@ def update_graph(self, scheduler, dsk=None, keys=None, restrictions=None, **kwar def transition(self, key, start, finish, *args, **kwargs): if finish == "memory" or finish == "erred": - ts = self.scheduler.tasks.get(key) + ts: TaskState = self.scheduler.tasks.get(key) if ts is not None and ts.key in self.keys: self.metadata[key] = ts.metadata self.state[key] = finish From a85b85ec6f75cf58a50ada43e2f7cf49d96ef2cd Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 7 Dec 2020 09:33:10 -0800 Subject: [PATCH 08/31] Use closure to access `TaskState.priority` This way we are able to inform Cython of the type and it is able to do the work necessary to retrieve it. We will wind up reverting this later. --- distributed/scheduler.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 40e6c49ec35..3d4d9fe08e4 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2351,7 +2351,10 @@ def update_graph( # Compute recommendations recommendations = {} - for ts in sorted(runnables, key=operator.attrgetter("priority"), reverse=True): + def _priority(ets: TaskState): + return ets.priority + + for ts in sorted(runnables, key=_priority, reverse=True): if ts.state == "released" and ts.run_spec: recommendations[ts.key] = "waiting" @@ -4315,9 +4318,12 @@ def _add_to_memory( ws._has_what.add(ts) ws._nbytes += ts.get_nbytes() + def _priority(ets: TaskState): + return ets.priority + deps = ts.dependents if len(deps) > 1: - deps = sorted(deps, key=operator.attrgetter("priority"), reverse=True) + deps = sorted(deps, key=_priority, reverse=True) dts: TaskState for dts in deps: s = dts.waiting_on From 3c25b2403e0464b460f67db79e204ec5f6e5a0bf Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 7 Dec 2020 09:33:11 -0800 Subject: [PATCH 09/31] Add `_` before all `TaskState` attributes --- distributed/scheduler.py | 204 +++++++++++++++++++-------------------- 1 file changed, 102 insertions(+), 102 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 3d4d9fe08e4..23b3ae68003 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -953,114 +953,114 @@ class TaskState: Task annotations """ - key: str + _key: str _hash: Py_hash_t - prefix: TaskPrefix - run_spec: object - priority: tuple + _prefix: TaskPrefix + _run_spec: object + _priority: tuple _state: str - dependencies: set - dependents: set - has_lost_dependencies: bool - waiting_on: set - waiters: set - who_wants: set - who_has: set - processing_on: WorkerState - retries: Py_ssize_t - nbytes: Py_ssize_t - type: str - exception: object - traceback: object - exception_blame: object - suspicious: Py_ssize_t - host_restrictions: set - worker_restrictions: set - resource_restrictions: dict - loose_restrictions: bool - metadata: dict - annotations: dict - actor: bool - group: TaskGroup - group_key: str + _dependencies: set + _dependents: set + _has_lost_dependencies: bool + _waiting_on: set + _waiters: set + _who_wants: set + _who_has: set + _processing_on: WorkerState + _retries: Py_ssize_t + _nbytes: Py_ssize_t + _type: str + _exception: object + _traceback: object + _exception_blame: object + _suspicious: Py_ssize_t + _host_restrictions: set + _worker_restrictions: set + _resource_restrictions: dict + _loose_restrictions: bool + _metadata: dict + _annotations: dict + _actor: bool + _group: TaskGroup + _group_key: str __slots__ = ( # 
=== General description === - "actor", + "_actor", # Key name - "key", + "_key", # Hash of the key name "_hash", # Key prefix (see key_split()) - "prefix", + "_prefix", # How to run the task (None if pure data) - "run_spec", + "_run_spec", # Alive dependents and dependencies - "dependencies", - "dependents", + "_dependencies", + "_dependents", # Compute priority - "priority", + "_priority", # Restrictions - "host_restrictions", - "worker_restrictions", # not WorkerStates but addresses - "resource_restrictions", - "loose_restrictions", + "_host_restrictions", + "_worker_restrictions", # not WorkerStates but addresses + "_resource_restrictions", + "_loose_restrictions", # === Task state === "_state", # Whether some dependencies were forgotten - "has_lost_dependencies", + "_has_lost_dependencies", # If in 'waiting' state, which tasks need to complete # before we can run - "waiting_on", + "_waiting_on", # If in 'waiting' or 'processing' state, which tasks needs us # to complete before they can run - "waiters", + "_waiters", # In in 'processing' state, which worker we are processing on - "processing_on", + "_processing_on", # If in 'memory' state, Which workers have us - "who_has", + "_who_has", # Which clients want us - "who_wants", - "exception", - "traceback", - "exception_blame", - "suspicious", - "retries", - "nbytes", - "type", - "group_key", - "group", - "metadata", - "annotations", + "_who_wants", + "_exception", + "_traceback", + "_exception_blame", + "_suspicious", + "_retries", + "_nbytes", + "_type", + "_group_key", + "_group", + "_metadata", + "_annotations", ) def __init__(self, key: str, run_spec: object): - self.key = key + self._key = key self._hash = hash(key) - self.run_spec = run_spec + self._run_spec = run_spec self._state = None - self.exception = self.traceback = self.exception_blame = None - self.suspicious = self.retries = 0 - self.nbytes = -1 - self.priority = None - self.who_wants = set() - self.dependencies = set() - self.dependents = set() - self.waiting_on = set() - self.waiters = set() - self.who_has = set() - self.processing_on = None - self.has_lost_dependencies = False - self.host_restrictions = None - self.worker_restrictions = None - self.resource_restrictions = None - self.loose_restrictions = False - self.actor = None - self.type = None - self.group_key = key_split_group(key) - self.group = None - self.metadata = {} - self.annotations = {} + self._exception = self._traceback = self._exception_blame = None + self._suspicious = self._retries = 0 + self._nbytes = -1 + self._priority = None + self._who_wants = set() + self._dependencies = set() + self._dependents = set() + self._waiting_on = set() + self._waiters = set() + self._who_has = set() + self._processing_on = None + self._has_lost_dependencies = False + self._host_restrictions = None + self._worker_restrictions = None + self._resource_restrictions = None + self._loose_restrictions = False + self._actor = None + self._type = None + self._group_key = key_split_group(key) + self._group = None + self._metadata = {} + self._annotations = {} def __hash__(self): return self._hash @@ -1070,7 +1070,7 @@ def __eq__(self, other): typ_other: type = type(other) if typ_self == typ_other: other_ts: TaskState = other - return self.key == other_ts.key + return self._key == other_ts._key else: return False @@ -1080,49 +1080,49 @@ def state(self) -> str: @property def prefix_key(self): - return self.prefix.name + return self._prefix.name @state.setter def state(self, value: str): - self.group.states[self._state] -= 1 - 
self.group.states[value] += 1 + self._group.states[self._state] -= 1 + self._group.states[value] += 1 self._state = value def add_dependency(self, other: "TaskState"): """ Add another task as a dependency of this task """ - self.dependencies.add(other) - self.group.dependencies.add(other.group) - other.dependents.add(self) + self._dependencies.add(other) + self._group.dependencies.add(other._group) + other._dependents.add(self) def get_nbytes(self) -> int: - nbytes = self.nbytes + nbytes = self._nbytes return nbytes if nbytes >= 0 else DEFAULT_DATA_SIZE def set_nbytes(self, nbytes: int): diff = nbytes - old_nbytes = self.nbytes + old_nbytes = self._nbytes if old_nbytes >= 0: diff -= old_nbytes - self.group.nbytes_total += diff - self.group.nbytes_in_memory += diff + self._group.nbytes_total += diff + self._group.nbytes_in_memory += diff ws: WorkerState - for ws in self.who_has: + for ws in self._who_has: ws._nbytes += diff - self.nbytes = nbytes + self._nbytes = nbytes def __repr__(self): - return "" % (self.key, self.state) + return "" % (self._key, self._state) def validate(self): try: - for cs in self.who_wants: - assert isinstance(cs, ClientState), (repr(cs), self.who_wants) - for ws in self.who_has: - assert isinstance(ws, WorkerState), (repr(ws), self.who_has) - for ts in self.dependencies: - assert isinstance(ts, TaskState), (repr(ts), self.dependencies) - for ts in self.dependents: - assert isinstance(ts, TaskState), (repr(ts), self.dependents) + for cs in self._who_wants: + assert isinstance(cs, ClientState), (repr(cs), self._who_wants) + for ws in self._who_has: + assert isinstance(ws, WorkerState), (repr(ws), self._who_has) + for ts in self._dependencies: + assert isinstance(ts, TaskState), (repr(ts), self._dependencies) + for ts in self._dependents: + assert isinstance(ts, TaskState), (repr(ts), self._dependents) validate_task_state(self) except Exception as e: logger.exception(e) From 78eb88ee96dec3f36f57ba84683feb335e627b72 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 7 Dec 2020 09:33:12 -0800 Subject: [PATCH 10/31] Use `_` prefixed `TaskState` attributes throughout --- distributed/scheduler.py | 847 ++++++++++++++++++++------------------- 1 file changed, 427 insertions(+), 420 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 23b3ae68003..5e678ab8ee1 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -512,7 +512,7 @@ def clean(self): extra=self._extra, ) ts: TaskState - ws._processing = {ts.key: cost for ts, cost in self._processing.items()} + ws._processing = {ts._key: cost for ts, cost in self._processing.items()} return ws def __repr__(self): @@ -694,7 +694,7 @@ def __init__(self, name): def add(self, o): ts: TaskState = o self.states[ts.state] += 1 - ts.group = self + ts._group = self def __repr__(self): return ( @@ -1211,7 +1211,7 @@ def _legacy_task_key_set(tasks): Transform a set of task states into a set of task keys. """ ts: TaskState - return {ts.key for ts in tasks} + return {ts._key for ts in tasks} def _legacy_client_key_set(clients): @@ -1235,7 +1235,7 @@ def _legacy_task_key_dict(task_dict): Transform a dict of {task state: value} into a dict of {task key: value}. 
""" ts: TaskState - return {ts.key: value for ts, value in task_dict.items()} + return {ts._key: value for ts, value in task_dict.items()} def _task_key_or_none(task): @@ -2034,7 +2034,7 @@ async def add_worker( for ts in list(self.unrunnable): valid = self.valid_workers(ts) if valid is True or ws in valid: - recommendations[ts.key] = "waiting" + recommendations[ts._key] = "waiting" if recommendations: self.transitions(recommendations) @@ -2218,8 +2218,8 @@ def update_graph( ts = self.tasks.get(k) if ts is None: ts = self.new_task(k, tasks.get(k), "released") - elif not ts.run_spec: - ts.run_spec = tasks.get(k) + elif not ts._run_spec: + ts._run_spec = tasks.get(k) touched_keys.add(k) touched_tasks.append(ts) @@ -2230,7 +2230,7 @@ def update_graph( # Add dependencies for key, deps in dependencies.items(): ts = self.tasks.get(key) - if ts is None or ts.dependencies: + if ts is None or ts._dependencies: continue for dep in deps: dts = self.tasks[dep] @@ -2268,14 +2268,14 @@ def update_graph( for a, kv in annotations.items(): for k, v in kv.items(): ts = self.tasks[k] - ts.annotations[a] = v + ts._annotations[a] = v # Add actors if actors is True: actors = list(keys) for actor in actors or []: ts = self.tasks[actor] - ts.actor = True + ts._actor = True priority = priority or dask.order.order( tasks @@ -2284,7 +2284,7 @@ def update_graph( if submitting_task: # sub-tasks get better priority than parent tasks ts = self.tasks.get(submitting_task) if ts is not None: - generation = ts.priority[0] - 0.01 + generation = ts._priority[0] - 0.01 else: # super-task already cleaned up generation = self.generation elif self._last_time + fifo_timeout < start: @@ -2296,14 +2296,14 @@ def update_graph( for key in set(priority) & touched_keys: ts = self.tasks[key] - if ts.priority is None: - ts.priority = (-(user_priority.get(key, 0)), generation, priority[key]) + if ts._priority is None: + ts._priority = (-(user_priority.get(key, 0)), generation, priority[key]) # Ensure all runnables have a priority - runnables = [ts for ts in touched_tasks if ts.run_spec] + runnables = [ts for ts in touched_tasks if ts._run_spec] for ts in runnables: - if ts.priority is None and ts.run_spec: - ts.priority = (self.generation, 0) + if ts._priority is None and ts._run_spec: + ts._priority = (self.generation, 0) if restrictions: # *restrictions* is a dict keying task ids to lists of @@ -2314,21 +2314,21 @@ def update_graph( ts = self.tasks.get(k) if ts is None: continue - ts.host_restrictions = set() - ts.worker_restrictions = set() + ts._host_restrictions = set() + ts._worker_restrictions = set() for w in v: try: w = self.coerce_address(w) except ValueError: # Not a valid address, but perhaps it's a hostname - ts.host_restrictions.add(w) + ts._host_restrictions.add(w) else: - ts.worker_restrictions.add(w) + ts._worker_restrictions.add(w) if loose_restrictions: for k in loose_restrictions: ts = self.tasks[k] - ts.loose_restrictions = True + ts._loose_restrictions = True if resources: for k, v in resources.items(): @@ -2338,7 +2338,7 @@ def update_graph( ts = self.tasks.get(k) if ts is None: continue - ts.resource_restrictions = v + ts._resource_restrictions = v if retries: for k, v in retries.items(): @@ -2346,23 +2346,23 @@ def update_graph( ts = self.tasks.get(k) if ts is None: continue - ts.retries = v + ts._retries = v # Compute recommendations recommendations = {} def _priority(ets: TaskState): - return ets.priority + return ets._priority for ts in sorted(runnables, key=_priority, reverse=True): - if ts.state == "released" 
and ts.run_spec: - recommendations[ts.key] = "waiting" + if ts.state == "released" and ts._run_spec: + recommendations[ts._key] = "waiting" for ts in touched_tasks: - for dts in ts.dependencies: - if dts.exception_blame: - ts.exception_blame = dts.exception_blame - recommendations[ts.key] = "erred" + for dts in ts._dependencies: + if dts._exception_blame: + ts._exception_blame = dts._exception_blame + recommendations[ts._key] = "erred" break for plugin in self.plugins[:]: @@ -2386,7 +2386,7 @@ def _priority(ets: TaskState): for ts in touched_tasks: if ts.state in ("memory", "erred"): - self.report_on_key(ts.key, client=client) + self.report_on_key(ts._key, client=client) end = time() if self.digests is not None: @@ -2403,9 +2403,9 @@ def new_task(self, key, spec, state): tp = self.task_prefixes[prefix_key] except KeyError: tp = self.task_prefixes[prefix_key] = TaskPrefix(prefix_key) - ts.prefix = tp + ts._prefix = tp - group_key = ts.group_key + group_key = ts._group_key try: tg = self.task_groups[group_key] except KeyError: @@ -2424,13 +2424,13 @@ def stimulus_task_finished(self, key=None, worker=None, **kwargs): if ts is None: return {} ws: WorkerState = self.workers[worker] - ts.metadata.update(kwargs["metadata"]) + ts._metadata.update(kwargs["metadata"]) if ts.state == "processing": recommendations = self.transition(key, "memory", worker=worker, **kwargs) if ts.state == "memory": - assert ws in ts.who_has + assert ws in ts._who_has else: logger.debug( "Received already computed task, worker: %s, state: %s" @@ -2438,9 +2438,9 @@ def stimulus_task_finished(self, key=None, worker=None, **kwargs): worker, ts.state, key, - ts.who_has, + ts._who_has, ) - if ws not in ts.who_has: + if ws not in ts._who_has: self.worker_send(worker, {"op": "release-task", "key": key}) recommendations = {} @@ -2457,9 +2457,9 @@ def stimulus_task_erred( return {} if ts.state == "processing": - retries = ts.retries + retries = ts._retries if retries > 0: - ts.retries = retries - 1 + ts._retries = retries - 1 recommendations = self.transition(key, "waiting") else: recommendations = self.transition( @@ -2492,10 +2492,10 @@ def stimulus_missing_data( if cts is not None and cts.state == "memory": # couldn't find this ws: WorkerState - for ws in cts.who_has: # TODO: this behavior is extreme + for ws in cts._who_has: # TODO: this behavior is extreme ws._has_what.remove(cts) ws._nbytes -= cts.get_nbytes() - cts.who_has.clear() + cts._who_has.clear() recommendations[cause] = "released" if key: @@ -2522,7 +2522,7 @@ def stimulus_retry(self, comm=None, keys=None, client=None): key = stack.pop() seen.add(key) ts = self.tasks[key] - erred_deps = [dts.key for dts in ts.dependencies if dts.state == "erred"] + erred_deps = [dts._key for dts in ts._dependencies if dts.state == "erred"] if erred_deps: stack.extend(erred_deps) else: @@ -2593,12 +2593,12 @@ async def remove_worker(self, comm=None, address=None, safe=False, close=True): ts: TaskState for ts in list(ws._processing): - k = ts.key + k = ts._key recommendations[k] = "released" if not safe: - ts.suspicious += 1 - ts.prefix.suspicious += 1 - if ts.suspicious > self.allowed_failures: + ts._suspicious += 1 + ts._prefix.suspicious += 1 + if ts._suspicious > self.allowed_failures: del recommendations[k] e = pickle.dumps( KilledWorker(task=k, last_worker=ws.clean()), protocol=4 @@ -2608,17 +2608,17 @@ async def remove_worker(self, comm=None, address=None, safe=False, close=True): logger.info( "Task %s marked as failed because %d workers died" " while trying to run it", - 
ts.key, + ts._key, self.allowed_failures, ) for ts in ws._has_what: - ts.who_has.remove(ws) - if not ts.who_has: - if ts.run_spec: - recommendations[ts.key] = "released" + ts._who_has.remove(ws) + if not ts._who_has: + if ts._run_spec: + recommendations[ts._key] = "released" else: # pure data - recommendations[ts.key] = "forgotten" + recommendations[ts._key] = "forgotten" ws._has_what.clear() self.transitions(recommendations) @@ -2670,18 +2670,18 @@ def cancel_key(self, key, client, retries=5, force=False): cs: ClientState = self.clients[client] except KeyError: return - if ts is None or not ts.who_wants: # no key yet, lets try again in a moment + if ts is None or not ts._who_wants: # no key yet, lets try again in a moment if retries: self.loop.call_later( 0.2, lambda: self.cancel_key(key, client, retries - 1) ) return - if force or ts.who_wants == {cs}: # no one else wants this key - for dts in list(ts.dependents): - self.cancel_key(dts.key, client, force=force) + if force or ts._who_wants == {cs}: # no one else wants this key + for dts in list(ts._dependents): + self.cancel_key(dts._key, client, force=force) logger.info("Scheduler cancels key %s. Force=%s", key, force) self.report({"op": "cancelled-key", "key": key}) - clients = list(ts.who_wants) if force else [cs] + clients = list(ts._who_wants) if force else [cs] for cs in clients: self.client_releases_keys(keys=[key], client=cs._client_key) @@ -2696,7 +2696,7 @@ def client_desires_keys(self, keys=None, client=None): if ts is None: # For publish, queues etc. ts = self.new_task(k, None, "released") - ts.who_wants.add(cs) + ts._who_wants.add(cs) cs._wants_what.add(ts) if ts.state in ("memory", "erred"): @@ -2712,18 +2712,18 @@ def client_releases_keys(self, keys=None, client=None): ts = self.tasks.get(key) if ts is not None and ts in cs._wants_what: cs._wants_what.remove(ts) - s = ts.who_wants + s = ts._who_wants s.remove(cs) if not s: tasks2.add(ts) recommendations = {} for ts in tasks2: - if not ts.dependents: + if not ts._dependents: # No live dependents, can forget - recommendations[ts.key] = "forgotten" - elif ts.state != "erred" and not ts.waiters: - recommendations[ts.key] = "released" + recommendations[ts._key] = "forgotten" + elif ts.state != "erred" and not ts._waiters: + recommendations[ts._key] = "released" self.transitions(recommendations) @@ -2740,63 +2740,63 @@ def validate_released(self, key): ts: TaskState = self.tasks[key] dts: TaskState assert ts.state == "released" - assert not ts.waiters - assert not ts.waiting_on - assert not ts.who_has - assert not ts.processing_on - assert not any([ts in dts.waiters for dts in ts.dependencies]) + assert not ts._waiters + assert not ts._waiting_on + assert not ts._who_has + assert not ts._processing_on + assert not any([ts in dts._waiters for dts in ts._dependencies]) assert ts not in self.unrunnable def validate_waiting(self, key): ts: TaskState = self.tasks[key] dts: TaskState - assert ts.waiting_on - assert not ts.who_has - assert not ts.processing_on + assert ts._waiting_on + assert not ts._who_has + assert not ts._processing_on assert ts not in self.unrunnable - for dts in ts.dependencies: + for dts in ts._dependencies: # We are waiting on a dependency iff it's not stored - assert bool(dts.who_has) + (dts in ts.waiting_on) == 1 - assert ts in dts.waiters # XXX even if dts.who_has? + assert bool(dts._who_has) + (dts in ts._waiting_on) == 1 + assert ts in dts._waiters # XXX even if dts._who_has? 
def validate_processing(self, key): ts: TaskState = self.tasks[key] dts: TaskState - assert not ts.waiting_on - ws: WorkerState = ts.processing_on + assert not ts._waiting_on + ws: WorkerState = ts._processing_on assert ws assert ts in ws._processing - assert not ts.who_has - for dts in ts.dependencies: - assert dts.who_has - assert ts in dts.waiters + assert not ts._who_has + for dts in ts._dependencies: + assert dts._who_has + assert ts in dts._waiters def validate_memory(self, key): ts: TaskState = self.tasks[key] dts: TaskState - assert ts.who_has - assert not ts.processing_on - assert not ts.waiting_on + assert ts._who_has + assert not ts._processing_on + assert not ts._waiting_on assert ts not in self.unrunnable - for dts in ts.dependents: - assert (dts in ts.waiters) == (dts.state in ("waiting", "processing")) - assert ts not in dts.waiting_on + for dts in ts._dependents: + assert (dts in ts._waiters) == (dts.state in ("waiting", "processing")) + assert ts not in dts._waiting_on def validate_no_worker(self, key): ts: TaskState = self.tasks[key] dts: TaskState assert ts in self.unrunnable - assert not ts.waiting_on + assert not ts._waiting_on assert ts in self.unrunnable - assert not ts.processing_on - assert not ts.who_has - for dts in ts.dependencies: - assert dts.who_has + assert not ts._processing_on + assert not ts._who_has + for dts in ts._dependencies: + assert dts._who_has def validate_erred(self, key): ts: TaskState = self.tasks[key] - assert ts.exception_blame - assert not ts.who_has + assert ts._exception_blame + assert not ts._who_has def validate_key(self, key, ts: TaskState = None): try: @@ -2840,7 +2840,7 @@ def validate_state(self, allow_overlap=False): ts: TaskState for k, ts in self.tasks.items(): assert isinstance(ts, TaskState), (type(ts), ts) - assert ts.key == k + assert ts._key == k self.validate_key(k, ts) c: str @@ -2888,11 +2888,11 @@ def report(self, msg, ts: TaskState = None, client=None): client_keys = list(self.client_comms) elif client is None: # Notify clients interested in key - client_keys = [cs._client_key for cs in ts.who_wants] + client_keys = [cs._client_key for cs in ts._who_wants] else: # Notify clients interested in key (including `client`) client_keys = [ - cs._client_key for cs in ts.who_wants if cs._client_key != client + cs._client_key for cs in ts._who_wants if cs._client_key != client ] client_keys.append(client) @@ -2969,7 +2969,7 @@ def remove_client(self, client=None): else: ts: TaskState self.client_releases_keys( - keys=[ts.key for ts in cs._wants_what], client=cs._client_key + keys=[ts._key for ts in cs._wants_what], client=cs._client_key ) del self.clients[client] @@ -2998,26 +2998,26 @@ def send_task_to_worker(self, worker, key): msg = { "op": "compute-task", "key": key, - "priority": ts.priority, + "priority": ts._priority, "duration": self.get_task_duration(ts), } - if ts.resource_restrictions: - msg["resource_restrictions"] = ts.resource_restrictions - if ts.actor: + if ts._resource_restrictions: + msg["resource_restrictions"] = ts._resource_restrictions + if ts._actor: msg["actor"] = True - deps = ts.dependencies + deps = ts._dependencies if deps: ws: WorkerState msg["who_has"] = { - dts.key: [ws._address for ws in dts.who_has] for dts in deps + dts._key: [ws._address for ws in dts._who_has] for dts in deps } - msg["nbytes"] = {dts.key: dts.nbytes for dts in deps} + msg["nbytes"] = {dts._key: dts._nbytes for dts in deps} if self.validate and deps: assert all(msg["who_has"].values()) - task = ts.run_spec + task = ts._run_spec 
if type(task) is dict: msg.update(task) else: @@ -3051,7 +3051,7 @@ def handle_release_data(self, key=None, worker=None, client=None, **msg): if ts is None: return ws: WorkerState = self.workers[worker] - if ts.processing_on != ws: + if ts._processing_on != ws: return r = self.stimulus_missing_data(key=key, ensure=False, **msg) self.transitions(r) @@ -3061,16 +3061,16 @@ def handle_missing_data(self, key=None, errant_worker=None, **kwargs): self.log.append(("missing", key, errant_worker)) ts: TaskState = self.tasks.get(key) - if ts is None or not ts.who_has: + if ts is None or not ts._who_has: return if errant_worker in self.workers: ws: WorkerState = self.workers[errant_worker] - if ws in ts.who_has: - ts.who_has.remove(ws) + if ws in ts._who_has: + ts._who_has.remove(ws) ws._has_what.remove(ts) ws._nbytes -= ts.get_nbytes() - if not ts.who_has: - if ts.run_spec: + if not ts._who_has: + if ts._run_spec: self.transitions({key: "released"}) else: self.transitions({key: "forgotten"}) @@ -3085,10 +3085,10 @@ def release_worker_data(self, comm=None, keys=None, worker=None): recommendations = {} for ts in removed_tasks: ws._nbytes -= ts.get_nbytes() - wh = ts.who_has + wh = ts._who_has wh.remove(ws) if not wh: - recommendations[ts.key] = "released" + recommendations[ts._key] = "released" if recommendations: self.transitions(recommendations) @@ -3102,20 +3102,20 @@ def handle_long_running(self, key=None, worker=None, compute_duration=None): if "stealing" in self.extensions: self.extensions["stealing"].remove_key_from_stealable(ts) - ws: WorkerState = ts.processing_on + ws: WorkerState = ts._processing_on if ws is None: logger.debug("Received long-running signal from duplicate task. Ignoring.") return if compute_duration: - old_duration = ts.prefix.duration_average or 0 + old_duration = ts._prefix.duration_average or 0 new_duration = compute_duration if not old_duration: avg_duration = new_duration else: avg_duration = 0.5 * old_duration + 0.5 * new_duration - ts.prefix.duration_average = avg_duration + ts._prefix.duration_average = avg_duration ws._occupancy -= ws._processing[ts] self.total_occupancy -= ws._processing[ts] @@ -3232,7 +3232,7 @@ async def gather(self, comm=None, keys=None, serializers=None): for key in keys: ts: TaskState = self.tasks.get(key) if ts is not None: - who_has[key] = [ws._address for ws in ts.who_has] + who_has[key] = [ws._address for ws in ts._who_has] else: who_has[key] = [] @@ -3277,7 +3277,7 @@ async def gather(self, comm=None, keys=None, serializers=None): ws = self.workers.get(worker) if ws is not None and ts in ws._has_what: ws._has_what.remove(ts) - ts.who_has.remove(ws) + ts._who_has.remove(ws) ws._nbytes -= ts.get_nbytes() self.transitions({key: "released"}) @@ -3302,7 +3302,7 @@ async def restart(self, client=None, timeout=3): ts: TaskState for cs in self.clients.values(): self.client_releases_keys( - keys=[ts.key for ts in cs._wants_what], client=cs._client_key + keys=[ts._key for ts in cs._wants_what], client=cs._client_key ) ws: WorkerState @@ -3437,7 +3437,7 @@ async def _delete_worker_data(self, worker_address, keys): tasks = {self.tasks[key] for key in keys} ws._has_what -= tasks for ts in tasks: - ts.who_has.remove(ws) + ts._who_has.remove(ws) ws._nbytes -= ts.get_nbytes() self.log_event(ws._address, {"action": "remove-worker-data", "keys": keys}) @@ -3457,7 +3457,7 @@ async def rebalance(self, comm=None, keys=None, workers=None): async with self._lock: if keys: tasks = {self.tasks[k] for k in keys} - missing_data = [ts.key for ts in tasks if not 
ts.who_has] + missing_data = [ts._key for ts in tasks if not ts._who_has] if missing_data: return {"status": "missing-data", "keys": missing_data} else: @@ -3465,10 +3465,10 @@ async def rebalance(self, comm=None, keys=None, workers=None): if workers: workers = {self.workers[w] for w in workers} - workers_by_task = {ts: ts.who_has & workers for ts in tasks} + workers_by_task = {ts: ts._who_has & workers for ts in tasks} else: workers = set(self.workers.values()) - workers_by_task = {ts: ts.who_has for ts in tasks} + workers_by_task = {ts: ts._who_has for ts in tasks} ws: WorkerState tasks_by_worker = {ws: set() for ws in workers} @@ -3520,8 +3520,8 @@ async def rebalance(self, comm=None, keys=None, workers=None): to_recipients = defaultdict(lambda: defaultdict(list)) to_senders = defaultdict(list) for sender, recipient, ts in msgs: - to_recipients[recipient.address][ts.key].append(sender.address) - to_senders[sender.address].append(ts.key) + to_recipients[recipient.address][ts._key].append(sender.address) + to_senders[sender.address].append(ts._key) result = await asyncio.gather( *( @@ -3557,11 +3557,17 @@ async def rebalance(self, comm=None, keys=None, workers=None): for sender, recipient, ts in msgs: assert ts.state == "memory" - ts.who_has.add(recipient) + ts._who_has.add(recipient) recipient.has_what.add(ts) recipient.nbytes += ts.get_nbytes() self.log.append( - ("rebalance", ts.key, time(), sender.address, recipient.address) + ( + "rebalance", + ts._key, + time(), + sender.address, + recipient.address, + ) ) await asyncio.gather( @@ -3616,7 +3622,7 @@ async def replicate( raise ValueError("Can not use replicate to delete data") tasks = {self.tasks[k] for k in keys} - missing_data = [ts.key for ts in tasks if not ts.who_has] + missing_data = [ts._key for ts in tasks if not ts._who_has] if missing_data: return {"status": "missing-data", "keys": missing_data} @@ -3624,7 +3630,7 @@ async def replicate( if delete: del_worker_tasks = defaultdict(set) for ts in tasks: - del_candidates = ts.who_has & workers + del_candidates = ts._who_has & workers if len(del_candidates) > n: for ws in random.sample( del_candidates, len(del_candidates) - n @@ -3646,18 +3652,18 @@ async def replicate( # task is no longer needed by any client or dependant task tasks.remove(ts) continue - n_missing = n - len(ts.who_has & workers) + n_missing = n - len(ts._who_has & workers) if n_missing <= 0: # Already replicated enough tasks.remove(ts) continue - count = min(n_missing, branching_factor * len(ts.who_has)) + count = min(n_missing, branching_factor * len(ts._who_has)) assert count > 0 - for ws in random.sample(workers - ts.who_has, count): - gathers[ws._address][ts.key] = [ - wws._address for wws in ts.who_has + for ws in random.sample(workers - ts._who_has, count): + gathers[ws._address][ts._key] = [ + wws._address for wws in ts._who_has ] results = await asyncio.gather( @@ -3892,7 +3898,7 @@ async def retire_workers( # Keys orphaned by retiring those workers keys = set.union(*[w.has_what for w in workers]) - keys = {ts.key for ts in keys if ts.who_has.issubset(workers)} + keys = {ts._key for ts in keys if ts._who_has.issubset(workers)} other_workers = set(self.workers.values()) - workers if keys: @@ -3946,7 +3952,7 @@ def add_keys(self, comm=None, worker=None, keys=()): if ts not in ws._has_what: ws._nbytes += ts.get_nbytes() ws._has_what.add(ts) - ts.who_has.add(ws) + ts._who_has.add(ws) else: self.worker_send( worker, {"op": "delete-data", "keys": [key], "report": False} @@ -3982,7 +3988,7 @@ def update_data( 
if ts not in ws._has_what: ws._nbytes += ts.get_nbytes() ws._has_what.add(ts) - ts.who_has.add(ws) + ts._who_has.add(ws) self.report( {"op": "key-in-memory", "key": key, "workers": list(workers)} ) @@ -3999,19 +4005,19 @@ def report_on_key(self, key=None, ts: TaskState = None, client=None): self.report({"op": "cancelled-key", "key": key}, client=client) return else: - key = ts.key + key = ts._key if ts.state == "forgotten": self.report({"op": "cancelled-key", "key": key}, ts=ts, client=client) elif ts.state == "memory": self.report({"op": "key-in-memory", "key": key}, ts=ts, client=client) elif ts.state == "erred": - failing_ts: TaskState = ts.exception_blame + failing_ts: TaskState = ts._exception_blame self.report( { "op": "task-erred", "key": key, - "exception": failing_ts.exception, - "traceback": failing_ts.traceback, + "exception": failing_ts._exception, + "traceback": failing_ts._traceback, }, ts=ts, client=client, @@ -4076,10 +4082,10 @@ def get_processing(self, comm=None, workers=None): ts: TaskState if workers is not None: workers = set(map(self.coerce_address, workers)) - return {w: [ts.key for ts in self.workers[w].processing] for w in workers} + return {w: [ts._key for ts in self.workers[w].processing] for w in workers} else: return { - w: [ts.key for ts in ws._processing] for w, ws in self.workers.items() + w: [ts._key for ts in ws._processing] for w, ws in self.workers.items() } def get_who_has(self, comm=None, keys=None): @@ -4094,7 +4100,7 @@ def get_who_has(self, comm=None, keys=None): } else: return { - key: [ws._address for ws in ts.who_has] + key: [ws._address for ws in ts._who_has] for key, ts in self.tasks.items() } @@ -4104,14 +4110,14 @@ def get_has_what(self, comm=None, workers=None): if workers is not None: workers = map(self.coerce_address, workers) return { - w: [ts.key for ts in self.workers[w].has_what] + w: [ts._key for ts in self.workers[w].has_what] if w in self.workers else [] for w in workers } else: return { - w: [ts.key for ts in ws._has_what] for w, ws in self.workers.items() + w: [ts._key for ts in ws._has_what] for w, ws in self.workers.items() } def get_ncores(self, comm=None, workers=None): @@ -4132,14 +4138,14 @@ async def get_call_stack(self, comm=None, keys=None): key = stack.pop() ts = self.tasks[key] if ts.state == "waiting": - stack.extend([dts.key for dts in ts.dependencies]) + stack.extend([dts._key for dts in ts._dependencies]) elif ts.state == "processing": processing.add(ts) workers = defaultdict(list) for ts in processing: - if ts.processing_on: - workers[ts.processing_on.address].append(ts.key) + if ts._processing_on: + workers[ts._processing_on.address].append(ts._key) else: workers = {w: None for w in self.workers} @@ -4159,7 +4165,7 @@ def get_nbytes(self, comm=None, keys=None, summary=True): result = {k: self.tasks[k].nbytes for k in keys} else: result = { - k: ts.nbytes for k, ts in self.tasks.items() if ts.nbytes >= 0 + k: ts._nbytes for k, ts in self.tasks.items() if ts._nbytes >= 0 } if summary: @@ -4177,7 +4183,8 @@ def get_comm_cost(self, ts: TaskState, ws: WorkerState): """ dts: TaskState return ( - sum([dts.nbytes for dts in ts.dependencies - ws._has_what]) / self.bandwidth + sum([dts._nbytes for dts in ts._dependencies - ws._has_what]) + / self.bandwidth ) def get_task_duration(self, ts: TaskState, default=None): @@ -4185,9 +4192,9 @@ def get_task_duration(self, ts: TaskState, default=None): Get the estimated computation cost of the given task (not including any communication cost). 
""" - duration = ts.prefix.duration_average + duration = ts._prefix.duration_average if duration is None: - self.unknown_durations[ts.prefix.name].add(ts) + self.unknown_durations[ts._prefix.name].add(ts) if default is None: default = parse_timedelta( dask.config.get("distributed.scheduler.unknown-task-duration") @@ -4283,8 +4290,8 @@ def _remove_from_processing(self, ts: TaskState, send_worker_msg=None): """ Remove *ts* from the set of processing tasks. """ - ws: WorkerState = ts.processing_on - ts.processing_on = None + ws: WorkerState = ts._processing_on + ts._processing_on = None w = ws._address if w in self.workers: # may have been removed duration = ws._processing.pop(ts) @@ -4314,45 +4321,45 @@ def _add_to_memory( if self.validate: assert ts not in ws._has_what - ts.who_has.add(ws) + ts._who_has.add(ws) ws._has_what.add(ts) ws._nbytes += ts.get_nbytes() def _priority(ets: TaskState): - return ets.priority + return ets._priority - deps = ts.dependents + deps = ts._dependents if len(deps) > 1: deps = sorted(deps, key=_priority, reverse=True) dts: TaskState for dts in deps: - s = dts.waiting_on + s = dts._waiting_on if ts in s: s.discard(ts) if not s: # new task ready to run - recommendations[dts.key] = "processing" + recommendations[dts._key] = "processing" - for dts in ts.dependencies: - s = dts.waiters + for dts in ts._dependencies: + s = dts._waiters s.discard(ts) - if not s and not dts.who_wants: - recommendations[dts.key] = "released" + if not s and not dts._who_wants: + recommendations[dts._key] = "released" - if not ts.waiters and not ts.who_wants: - recommendations[ts.key] = "released" + if not ts._waiters and not ts._who_wants: + recommendations[ts._key] = "released" else: - msg = {"op": "key-in-memory", "key": ts.key} + msg = {"op": "key-in-memory", "key": ts._key} if type is not None: msg["type"] = type self.report(msg) ts.state = "memory" - ts.type = typename - ts.group.types.add(typename) + ts._type = typename + ts._group.types.add(typename) cs: ClientState = self.clients["fire-and-forget"] if ts in cs._wants_what: - self.client_releases_keys(client="fire-and-forget", keys=[ts.key]) + self.client_releases_keys(client="fire-and-forget", keys=[ts._key]) def transition_released_waiting(self, key): try: @@ -4360,13 +4367,13 @@ def transition_released_waiting(self, key): dts: TaskState if self.validate: - assert ts.run_spec - assert not ts.waiting_on - assert not ts.who_has - assert not ts.processing_on - assert not any([dts.state == "forgotten" for dts in ts.dependencies]) + assert ts._run_spec + assert not ts._waiting_on + assert not ts._who_has + assert not ts._processing_on + assert not any([dts.state == "forgotten" for dts in ts._dependencies]) - if ts.has_lost_dependencies: + if ts._has_lost_dependencies: return {key: "forgotten"} ts.state = "waiting" @@ -4374,24 +4381,24 @@ def transition_released_waiting(self, key): recommendations = {} dts: TaskState - for dts in ts.dependencies: - if dts.exception_blame: - ts.exception_blame = dts.exception_blame + for dts in ts._dependencies: + if dts._exception_blame: + ts._exception_blame = dts._exception_blame recommendations[key] = "erred" return recommendations - for dts in ts.dependencies: - dep = dts.key - if not dts.who_has: - ts.waiting_on.add(dts) + for dts in ts._dependencies: + dep = dts._key + if not dts._who_has: + ts._waiting_on.add(dts) if dts.state == "released": recommendations[dep] = "waiting" else: - dts.waiters.add(ts) + dts._waiters.add(ts) - ts.waiters = {dts for dts in ts.dependents if dts.state == "waiting"} 
+ ts._waiters = {dts for dts in ts._dependents if dts.state == "waiting"} - if not ts.waiting_on: + if not ts._waiting_on: if self.workers: recommendations[key] = "processing" else: @@ -4414,29 +4421,29 @@ def transition_no_worker_waiting(self, key): if self.validate: assert ts in self.unrunnable - assert not ts.waiting_on - assert not ts.who_has - assert not ts.processing_on + assert not ts._waiting_on + assert not ts._who_has + assert not ts._processing_on self.unrunnable.remove(ts) - if ts.has_lost_dependencies: + if ts._has_lost_dependencies: return {key: "forgotten"} recommendations = {} - for dts in ts.dependencies: - dep = dts.key - if not dts.who_has: - ts.waiting_on.add(dts) + for dts in ts._dependencies: + dep = dts._key + if not dts._who_has: + ts._waiting_on.add(dts) if dts.state == "released": recommendations[dep] = "waiting" else: - dts.waiters.add(ts) + dts._waiters.add(ts) ts.state = "waiting" - if not ts.waiting_on: + if not ts._waiting_on: if self.workers: recommendations[key] = "processing" else: @@ -4458,12 +4465,12 @@ def decide_worker(self, ts: TaskState): """ valid_workers = self.valid_workers(ts) - if not valid_workers and not ts.loose_restrictions and self.workers: + if not valid_workers and not ts._loose_restrictions and self.workers: self.unrunnable.add(ts) ts.state = "no-worker" return None - if ts.dependencies or valid_workers is not True: + if ts._dependencies or valid_workers is not True: worker = decide_worker( ts, self.workers.values(), @@ -4498,13 +4505,13 @@ def transition_waiting_processing(self, key): dts: TaskState if self.validate: - assert not ts.waiting_on - assert not ts.who_has - assert not ts.exception_blame - assert not ts.processing_on - assert not ts.has_lost_dependencies + assert not ts._waiting_on + assert not ts._who_has + assert not ts._exception_blame + assert not ts._processing_on + assert not ts._has_lost_dependencies assert ts not in self.unrunnable - assert all([dts.who_has for dts in ts.dependencies]) + assert all([dts._who_has for dts in ts._dependencies]) ws: WorkerState = self.decide_worker(ts) if ws is None: @@ -4515,7 +4522,7 @@ def transition_waiting_processing(self, key): comm = self.get_comm_cost(ts, ws) ws._processing[ts] = duration + comm - ts.processing_on = ws + ts._processing_on = ws ws._occupancy += duration + comm self.total_occupancy += duration + comm ts.state = "processing" @@ -4523,7 +4530,7 @@ def transition_waiting_processing(self, key): self.check_idle_saturated(ws) self.n_tasks += 1 - if ts.actor: + if ts._actor: ws._actors.add(ts) # logger.debug("Send job to worker: %s, %s", worker, key) @@ -4545,11 +4552,11 @@ def transition_waiting_memory(self, key, nbytes=None, worker=None, **kwargs): ts: TaskState = self.tasks[key] if self.validate: - assert not ts.processing_on - assert ts.waiting_on + assert not ts._processing_on + assert ts._waiting_on assert ts.state == "waiting" - ts.waiting_on.clear() + ts._waiting_on.clear() if nbytes is not None: ts.set_nbytes(nbytes) @@ -4561,9 +4568,9 @@ def transition_waiting_memory(self, key, nbytes=None, worker=None, **kwargs): self._add_to_memory(ts, ws, recommendations, **kwargs) if self.validate: - assert not ts.processing_on - assert not ts.waiting_on - assert ts.who_has + assert not ts._processing_on + assert not ts._waiting_on + assert ts._who_has return recommendations except Exception as e: @@ -4592,23 +4599,23 @@ def transition_processing_memory( assert isinstance(worker, str) if self.validate: - assert ts.processing_on - ws = ts.processing_on + assert 
ts._processing_on + ws = ts._processing_on assert ts in ws._processing - assert not ts.waiting_on - assert not ts.who_has, (ts, ts.who_has) - assert not ts.exception_blame + assert not ts._waiting_on + assert not ts._who_has, (ts, ts._who_has) + assert not ts._exception_blame assert ts.state == "processing" ws = self.workers.get(worker) if ws is None: return {key: "released"} - if ws != ts.processing_on: # someone else has this task + if ws != ts._processing_on: # someone else has this task logger.info( "Unexpected worker completed task, likely due to" " work stealing. Expected: %s, Got: %s, Key: %s", - ts.processing_on, + ts._processing_on, ws, key, ) @@ -4625,7 +4632,7 @@ def transition_processing_memory( # record timings of all actions -- a cheaper way of # getting timing info compared with get_task_stream() - ts.prefix.all_durations[action] += stop - start + ts._prefix.all_durations[action] += stop - start if len(L) > 0: compute_start, compute_stop = L[0] @@ -4639,20 +4646,20 @@ def transition_processing_memory( ############################# if compute_start and ws._processing.get(ts, True): # Update average task duration for worker - old_duration = ts.prefix.duration_average or 0 + old_duration = ts._prefix.duration_average or 0 new_duration = compute_stop - compute_start if not old_duration: avg_duration = new_duration else: avg_duration = 0.5 * old_duration + 0.5 * new_duration - ts.prefix.duration_average = avg_duration - ts.group.duration += new_duration + ts._prefix.duration_average = avg_duration + ts._group.duration += new_duration tts: TaskState - for tts in self.unknown_durations.pop(ts.prefix.name, ()): - if tts.processing_on: - wws = tts.processing_on + for tts in self.unknown_durations.pop(ts._prefix.name, ()): + if tts._processing_on: + wws = tts._processing_on old = wws._processing[tts] comm = self.get_comm_cost(tts, wws) wws._processing[tts] = avg_duration + comm @@ -4672,8 +4679,8 @@ def transition_processing_memory( self._add_to_memory(ts, ws, recommendations, type=type, typename=typename) if self.validate: - assert not ts.processing_on - assert not ts.waiting_on + assert not ts._processing_on + assert not ts._waiting_on return recommendations except Exception as e: @@ -4691,50 +4698,50 @@ def transition_memory_released(self, key, safe=False): dts: TaskState if self.validate: - assert not ts.waiting_on - assert not ts.processing_on + assert not ts._waiting_on + assert not ts._processing_on if safe: - assert not ts.waiters + assert not ts._waiters - if ts.actor: - for ws in ts.who_has: + if ts._actor: + for ws in ts._who_has: ws._actors.discard(ts) - if ts.who_wants: - ts.exception_blame = ts - ts.exception = "Worker holding Actor was lost" - return {ts.key: "erred"} # don't try to recreate + if ts._who_wants: + ts._exception_blame = ts + ts._exception = "Worker holding Actor was lost" + return {ts._key: "erred"} # don't try to recreate recommendations = {} - for dts in ts.waiters: + for dts in ts._waiters: if dts.state in ("no-worker", "processing"): - recommendations[dts.key] = "waiting" + recommendations[dts._key] = "waiting" elif dts.state == "waiting": - dts.waiting_on.add(ts) + dts._waiting_on.add(ts) # XXX factor this out? 
- for ws in ts.who_has: + for ws in ts._who_has: ws._has_what.remove(ts) ws._nbytes -= ts.get_nbytes() - ts.group.nbytes_in_memory -= ts.get_nbytes() + ts._group.nbytes_in_memory -= ts.get_nbytes() self.worker_send( ws._address, {"op": "delete-data", "keys": [key], "report": False} ) - ts.who_has.clear() + ts._who_has.clear() ts.state = "released" self.report({"op": "lost-data", "key": key}) - if not ts.run_spec: # pure data + if not ts._run_spec: # pure data recommendations[key] = "forgotten" - elif ts.has_lost_dependencies: + elif ts._has_lost_dependencies: recommendations[key] = "forgotten" - elif ts.who_wants or ts.waiters: + elif ts._who_wants or ts._waiters: recommendations[key] = "waiting" if self.validate: - assert not ts.waiting_on + assert not ts._waiting_on return recommendations except Exception as e: @@ -4753,26 +4760,26 @@ def transition_released_erred(self, key): if self.validate: with log_errors(pdb=LOG_PDB): - assert ts.exception_blame - assert not ts.who_has - assert not ts.waiting_on - assert not ts.waiters + assert ts._exception_blame + assert not ts._who_has + assert not ts._waiting_on + assert not ts._waiters recommendations = {} - failing_ts = ts.exception_blame + failing_ts = ts._exception_blame - for dts in ts.dependents: - dts.exception_blame = failing_ts - if not dts.who_has: - recommendations[dts.key] = "erred" + for dts in ts._dependents: + dts._exception_blame = failing_ts + if not dts._who_has: + recommendations[dts._key] = "erred" self.report( { "op": "task-erred", "key": key, - "exception": failing_ts.exception, - "traceback": failing_ts.traceback, + "exception": failing_ts._exception, + "traceback": failing_ts._traceback, } ) @@ -4795,21 +4802,21 @@ def transition_erred_released(self, key): if self.validate: with log_errors(pdb=LOG_PDB): - assert all([dts.state != "erred" for dts in ts.dependencies]) - assert ts.exception_blame - assert not ts.who_has - assert not ts.waiting_on - assert not ts.waiters + assert all([dts.state != "erred" for dts in ts._dependencies]) + assert ts._exception_blame + assert not ts._who_has + assert not ts._waiting_on + assert not ts._waiters recommendations = {} - ts.exception = None - ts.exception_blame = None - ts.traceback = None + ts._exception = None + ts._exception_blame = None + ts._traceback = None - for dts in ts.dependents: + for dts in ts._dependents: if dts.state == "erred": - recommendations[dts.key] = "waiting" + recommendations[dts._key] = "waiting" self.report({"op": "task-retried", "key": key}) ts.state = "released" @@ -4828,28 +4835,28 @@ def transition_waiting_released(self, key): ts: TaskState = self.tasks[key] if self.validate: - assert not ts.who_has - assert not ts.processing_on + assert not ts._who_has + assert not ts._processing_on recommendations = {} dts: TaskState - for dts in ts.dependencies: - s = dts.waiters + for dts in ts._dependencies: + s = dts._waiters if ts in s: s.discard(ts) - if not s and not dts.who_wants: - recommendations[dts.key] = "released" - ts.waiting_on.clear() + if not s and not dts._who_wants: + recommendations[dts._key] = "released" + ts._waiting_on.clear() ts.state = "released" - if ts.has_lost_dependencies: + if ts._has_lost_dependencies: recommendations[key] = "forgotten" - elif not ts.exception_blame and (ts.who_wants or ts.waiters): + elif not ts._exception_blame and (ts._who_wants or ts._waiters): recommendations[key] = "waiting" else: - ts.waiters.clear() + ts._waiters.clear() return recommendations except Exception as e: @@ -4866,9 +4873,9 @@ def 
transition_processing_released(self, key): dts: TaskState if self.validate: - assert ts.processing_on - assert not ts.who_has - assert not ts.waiting_on + assert ts._processing_on + assert not ts._who_has + assert not ts._waiting_on assert self.tasks[key].state == "processing" self._remove_from_processing( @@ -4879,22 +4886,22 @@ def transition_processing_released(self, key): recommendations = {} - if ts.has_lost_dependencies: + if ts._has_lost_dependencies: recommendations[key] = "forgotten" - elif ts.waiters or ts.who_wants: + elif ts._waiters or ts._who_wants: recommendations[key] = "waiting" if recommendations.get(key) != "waiting": - for dts in ts.dependencies: + for dts in ts._dependencies: if dts.state != "released": - s = dts.waiters + s = dts._waiters s.discard(ts) - if not s and not dts.who_wants: - recommendations[dts.key] = "released" - ts.waiters.clear() + if not s and not dts._who_wants: + recommendations[dts._key] = "released" + ts._waiters.clear() if self.validate: - assert not ts.processing_on + assert not ts._processing_on return recommendations except Exception as e: @@ -4915,40 +4922,40 @@ def transition_processing_erred( failing_ts: TaskState if self.validate: - assert cause or ts.exception_blame - assert ts.processing_on - assert not ts.who_has - assert not ts.waiting_on + assert cause or ts._exception_blame + assert ts._processing_on + assert not ts._who_has + assert not ts._waiting_on - if ts.actor: - ws = ts.processing_on + if ts._actor: + ws = ts._processing_on ws._actors.remove(ts) self._remove_from_processing(ts) if exception is not None: - ts.exception = exception + ts._exception = exception if traceback is not None: - ts.traceback = traceback + ts._traceback = traceback if cause is not None: failing_ts = self.tasks[cause] - ts.exception_blame = failing_ts + ts._exception_blame = failing_ts else: - failing_ts = ts.exception_blame + failing_ts = ts._exception_blame recommendations = {} - for dts in ts.dependents: - dts.exception_blame = failing_ts - recommendations[dts.key] = "erred" + for dts in ts._dependents: + dts._exception_blame = failing_ts + recommendations[dts._key] = "erred" - for dts in ts.dependencies: - s = dts.waiters + for dts in ts._dependencies: + s = dts._waiters s.discard(ts) - if not s and not dts.who_wants: - recommendations[dts.key] = "released" + if not s and not dts._who_wants: + recommendations[dts._key] = "released" - ts.waiters.clear() # do anything with this? + ts._waiters.clear() # do anything with this? 
ts.state = "erred" @@ -4956,8 +4963,8 @@ def transition_processing_erred( { "op": "task-erred", "key": key, - "exception": failing_ts.exception, - "traceback": failing_ts.traceback, + "exception": failing_ts._exception, + "traceback": failing_ts._traceback, } ) @@ -4966,7 +4973,7 @@ def transition_processing_erred( self.client_releases_keys(client="fire-and-forget", keys=[key]) if self.validate: - assert not ts.processing_on + assert not ts._processing_on return recommendations except Exception as e: @@ -4984,16 +4991,16 @@ def transition_no_worker_released(self, key): if self.validate: assert self.tasks[key].state == "no-worker" - assert not ts.who_has - assert not ts.waiting_on + assert not ts._who_has + assert not ts._waiting_on self.unrunnable.remove(ts) ts.state = "released" - for dts in ts.dependencies: - dts.waiters.discard(ts) + for dts in ts._dependencies: + dts._waiters.discard(ts) - ts.waiters.clear() + ts._waiters.clear() return {} except Exception as e: @@ -5009,45 +5016,45 @@ def remove_key(self, key): assert ts.state == "forgotten" self.unrunnable.discard(ts) cs: ClientState - for cs in ts.who_wants: + for cs in ts._who_wants: cs._wants_what.remove(ts) - ts.who_wants.clear() - ts.processing_on = None - ts.exception_blame = ts.exception = ts.traceback = None + ts._who_wants.clear() + ts._processing_on = None + ts._exception_blame = ts._exception = ts._traceback = None if key in self.task_metadata: del self.task_metadata[key] def _propagate_forgotten(self, ts: TaskState, recommendations): ts.state = "forgotten" - key = ts.key + key = ts._key dts: TaskState - for dts in ts.dependents: - dts.has_lost_dependencies = True - dts.dependencies.remove(ts) - dts.waiting_on.discard(ts) + for dts in ts._dependents: + dts._has_lost_dependencies = True + dts._dependencies.remove(ts) + dts._waiting_on.discard(ts) if dts.state not in ("memory", "erred"): # Cannot compute task anymore - recommendations[dts.key] = "forgotten" - ts.dependents.clear() - ts.waiters.clear() + recommendations[dts._key] = "forgotten" + ts._dependents.clear() + ts._waiters.clear() - for dts in ts.dependencies: - dts.dependents.remove(ts) - s = dts.waiters + for dts in ts._dependencies: + dts._dependents.remove(ts) + s = dts._waiters s.discard(ts) - if not dts.dependents and not dts.who_wants: + if not dts._dependents and not dts._who_wants: # Task not needed anymore assert dts is not ts - recommendations[dts.key] = "forgotten" - ts.dependencies.clear() - ts.waiting_on.clear() + recommendations[dts._key] = "forgotten" + ts._dependencies.clear() + ts._waiting_on.clear() - if ts.who_has: - ts.group.nbytes_in_memory -= ts.get_nbytes() + if ts._who_has: + ts._group.nbytes_in_memory -= ts.get_nbytes() ws: WorkerState - for ws in ts.who_has: + for ws in ts._who_has: ws._has_what.remove(ts) ws._nbytes -= ts.get_nbytes() w = ws._address @@ -5055,7 +5062,7 @@ def _propagate_forgotten(self, ts: TaskState, recommendations): self.worker_send( w, {"op": "delete-data", "keys": [key], "report": False} ) - ts.who_has.clear() + ts._who_has.clear() def transition_memory_forgotten(self, key): ws: WorkerState @@ -5064,15 +5071,15 @@ def transition_memory_forgotten(self, key): if self.validate: assert ts.state == "memory" - assert not ts.processing_on - assert not ts.waiting_on - if not ts.run_spec: + assert not ts._processing_on + assert not ts._waiting_on + if not ts._run_spec: # It's ok to forget a pure data task pass - elif ts.has_lost_dependencies: + elif ts._has_lost_dependencies: # It's ok to forget a task with forgotten 
dependencies pass - elif not ts.who_wants and not ts.waiters and not ts.dependents: + elif not ts._who_wants and not ts._waiters and not ts._dependents: # It's ok to forget a task that nobody needs pass else: @@ -5080,8 +5087,8 @@ def transition_memory_forgotten(self, key): recommendations = {} - if ts.actor: - for ws in ts.who_has: + if ts._actor: + for ws in ts._who_has: ws._actors.discard(ts) self._propagate_forgotten(ts, recommendations) @@ -5104,16 +5111,16 @@ def transition_released_forgotten(self, key): if self.validate: assert ts.state in ("released", "erred") - assert not ts.who_has - assert not ts.processing_on - assert not ts.waiting_on, (ts, ts.waiting_on) - if not ts.run_spec: + assert not ts._who_has + assert not ts._processing_on + assert not ts._waiting_on, (ts, ts._waiting_on) + if not ts._run_spec: # It's ok to forget a pure data task pass - elif ts.has_lost_dependencies: + elif ts._has_lost_dependencies: # It's ok to forget a task with forgotten dependencies pass - elif not ts.who_wants and not ts.waiters and not ts.dependents: + elif not ts._who_wants and not ts._waiters and not ts._dependents: # It's ok to forget a task that nobody needs pass else: @@ -5161,8 +5168,8 @@ def transition(self, key, finish, *args, **kwargs): return {} if self.plugins: - dependents = set(ts.dependents) - dependencies = set(ts.dependencies) + dependents = set(ts._dependents) + dependencies = set(ts._dependencies) if (start, finish) in self._transitions: func = self._transitions[start, finish] @@ -5198,24 +5205,24 @@ def transition(self, key, finish, *args, **kwargs): # Temporarily put back forgotten key for plugin to retrieve it if ts.state == "forgotten": try: - ts.dependents = dependents - ts.dependencies = dependencies + ts._dependents = dependents + ts._dependencies = dependencies except KeyError: pass - self.tasks[ts.key] = ts + self.tasks[ts._key] = ts for plugin in list(self.plugins): try: plugin.transition(key, start, finish2, *args, **kwargs) except Exception: logger.info("Plugin failed with exception", exc_info=True) if ts.state == "forgotten": - del self.tasks[ts.key] + del self.tasks[ts._key] - if ts.state == "forgotten" and ts.group.name in self.task_groups: + if ts.state == "forgotten" and ts._group.name in self.task_groups: # Remove TaskGroup if all tasks are in the forgotten state - tg = ts.group + tg = ts._group if not any(tg.states.get(s) for s in ALL_TASK_STATES): - ts.prefix.groups.remove(tg) + ts._prefix.groups.remove(tg) del self.task_groups[tg.name] return recommendations @@ -5271,7 +5278,7 @@ def reschedule(self, key=None, worker=None): return if ts.state != "processing": return - if worker and ts.processing_on.address != worker: + if worker and ts._processing_on.address != worker: return self.transitions({key: "released"}) @@ -5330,13 +5337,13 @@ def valid_workers(self, ts: TaskState): """ s = True - if ts.worker_restrictions: - s = {w for w in ts.worker_restrictions if w in self.workers} + if ts._worker_restrictions: + s = {w for w in ts._worker_restrictions if w in self.workers} - if ts.host_restrictions: + if ts._host_restrictions: # Resolve the alias here rather than early, for the worker # may not be connected when host_restrictions is populated - hr = [self.coerce_hostname(h) for h in ts.host_restrictions] + hr = [self.coerce_hostname(h) for h in ts._host_restrictions] # XXX need HostState? 
ss = [self.host_info[h]["addresses"] for h in hr if h in self.host_info] ss = set.union(*ss) if ss else set() @@ -5345,14 +5352,14 @@ def valid_workers(self, ts: TaskState): else: s |= ss - if ts.resource_restrictions: + if ts._resource_restrictions: w = { resource: { w for w, supplied in self.resources[resource].items() if supplied >= required } - for resource, required in ts.resource_restrictions.items() + for resource, required in ts._resource_restrictions.items() } ww = set.intersection(*w.values()) @@ -5368,13 +5375,13 @@ def valid_workers(self, ts: TaskState): return {self.workers[w] for w in s} def consume_resources(self, ts: TaskState, ws: WorkerState): - if ts.resource_restrictions: - for r, required in ts.resource_restrictions.items(): + if ts._resource_restrictions: + for r, required in ts._resource_restrictions.items(): ws._used_resources[r] += required def release_resources(self, ts: TaskState, ws: WorkerState): - if ts.resource_restrictions: - for r, required in ts.resource_restrictions.items(): + if ts._resource_restrictions: + for r, required in ts._resource_restrictions.items(): ws._used_resources[r] -= required ##################### @@ -5466,12 +5473,12 @@ def worker_objective(self, ts: TaskState, ws: WorkerState): """ dts: TaskState comm_bytes = sum( - [dts.get_nbytes() for dts in ts.dependencies if ws not in dts.who_has] + [dts.get_nbytes() for dts in ts._dependencies if ws not in dts._who_has] ) stack_time = ws._occupancy / ws._nthreads start_time = comm_bytes / self.bandwidth + stack_time - if ts.actor: + if ts._actor: return (len(ws._actors), start_time, ws._nbytes) else: return (start_time, ws._nbytes) @@ -5897,13 +5904,13 @@ def decide_worker(ts: TaskState, all_workers, valid_workers, objective): *objective* function. """ dts: TaskState - deps = ts.dependencies - assert all([dts.who_has for dts in deps]) - if ts.actor: + deps = ts._dependencies + assert all([dts._who_has for dts in deps]) + if ts._actor: candidates = set(all_workers) else: ws: WorkerState - candidates = {ws for dts in deps for ws in dts.who_has} + candidates = {ws for dts in deps for ws in dts._who_has} if valid_workers is True: if not candidates: candidates = set(all_workers) @@ -5912,7 +5919,7 @@ def decide_worker(ts: TaskState, all_workers, valid_workers, objective): if not candidates: candidates = valid_workers if not candidates: - if ts.loose_restrictions: + if ts._loose_restrictions: return decide_worker(ts, all_workers, True, objective) else: return None @@ -5934,74 +5941,74 @@ def validate_task_state(ts: TaskState): assert ts.state in ALL_TASK_STATES or ts.state == "forgotten", ts - if ts.waiting_on: - assert ts.waiting_on.issubset(ts.dependencies), ( + if ts._waiting_on: + assert ts._waiting_on.issubset(ts._dependencies), ( "waiting not subset of dependencies", - str(ts.waiting_on), - str(ts.dependencies), + str(ts._waiting_on), + str(ts._dependencies), ) - if ts.waiters: - assert ts.waiters.issubset(ts.dependents), ( + if ts._waiters: + assert ts._waiters.issubset(ts._dependents), ( "waiters not subset of dependents", - str(ts.waiters), - str(ts.dependents), + str(ts._waiters), + str(ts._dependents), ) - for dts in ts.waiting_on: - assert not dts.who_has, ("waiting on in-memory dep", str(ts), str(dts)) + for dts in ts._waiting_on: + assert not dts._who_has, ("waiting on in-memory dep", str(ts), str(dts)) assert dts.state != "released", ("waiting on released dep", str(ts), str(dts)) - for dts in ts.dependencies: - assert ts in dts.dependents, ( + for dts in ts._dependencies: + assert ts in 
dts._dependents, ( "not in dependency's dependents", str(ts), str(dts), - str(dts.dependents), + str(dts._dependents), ) if ts.state in ("waiting", "processing"): - assert dts in ts.waiting_on or dts.who_has, ( + assert dts in ts._waiting_on or dts._who_has, ( "dep missing", str(ts), str(dts), ) assert dts.state != "forgotten" - for dts in ts.waiters: + for dts in ts._waiters: assert dts.state in ("waiting", "processing"), ( "waiter not in play", str(ts), str(dts), ) - for dts in ts.dependents: - assert ts in dts.dependencies, ( + for dts in ts._dependents: + assert ts in dts._dependencies, ( "not in dependent's dependencies", str(ts), str(dts), - str(dts.dependencies), + str(dts._dependencies), ) assert dts.state != "forgotten" - assert (ts.processing_on is not None) == (ts.state == "processing") - assert bool(ts.who_has) == (ts.state == "memory"), (ts, ts.who_has) + assert (ts._processing_on is not None) == (ts.state == "processing") + assert bool(ts._who_has) == (ts.state == "memory"), (ts, ts._who_has) if ts.state == "processing": - assert all([dts.who_has for dts in ts.dependencies]), ( + assert all([dts._who_has for dts in ts._dependencies]), ( "task processing without all deps", str(ts), - str(ts.dependencies), + str(ts._dependencies), ) - assert not ts.waiting_on + assert not ts._waiting_on - if ts.who_has: - assert ts.waiters or ts.who_wants, ( + if ts._who_has: + assert ts._waiters or ts._who_wants, ( "unneeded task in memory", str(ts), - str(ts.who_has), + str(ts._who_has), ) - if ts.run_spec: # was computed - assert ts.type - assert isinstance(ts.type, str) - assert not any([ts in dts.waiting_on for dts in ts.dependents]) - for ws in ts.who_has: + if ts._run_spec: # was computed + assert ts._type + assert isinstance(ts._type, str) + assert not any([ts in dts._waiting_on for dts in ts._dependents]) + for ws in ts._who_has: assert ts in ws._has_what, ( "not in who_has' has_what", str(ts), @@ -6009,9 +6016,9 @@ def validate_task_state(ts: TaskState): str(ws._has_what), ) - if ts.who_wants: + if ts._who_wants: cs: ClientState - for cs in ts.who_wants: + for cs in ts._who_wants: assert ts in cs._wants_what, ( "not in who_wants' wants_what", str(ts), @@ -6019,21 +6026,21 @@ def validate_task_state(ts: TaskState): str(cs._wants_what), ) - if ts.actor: + if ts._actor: if ts.state == "memory": - assert sum([ts in ws._actors for ws in ts.who_has]) == 1 + assert sum([ts in ws._actors for ws in ts._who_has]) == 1 if ts.state == "processing": - assert ts in ts.processing_on.actors + assert ts in ts._processing_on.actors def validate_worker_state(ws: WorkerState): ts: TaskState for ts in ws._has_what: - assert ws in ts.who_has, ( + assert ws in ts._who_has, ( "not in has_what' who_has", str(ws), str(ts), - str(ts.who_has), + str(ts._who_has), ) for ts in ws._actors: @@ -6058,11 +6065,11 @@ def validate_state(tasks, workers, clients): cs: ClientState for cs in clients.values(): for ts in cs._wants_what: - assert cs in ts.who_wants, ( + assert cs in ts._who_wants, ( "not in wants_what' who_wants", str(cs), str(ts), - str(ts.who_wants), + str(ts._who_wants), ) @@ -6139,7 +6146,7 @@ def update_graph(self, scheduler, dsk=None, keys=None, restrictions=None, **kwar def transition(self, key, start, finish, *args, **kwargs): if finish == "memory" or finish == "erred": ts: TaskState = self.scheduler.tasks.get(key) - if ts is not None and ts.key in self.keys: - self.metadata[key] = ts.metadata + if ts is not None and ts._key in self.keys: + self.metadata[key] = ts._metadata self.state[key] = finish 
self.keys.discard(key) From 7f38dd6ed6e551cbe6e4dddd06c7c88c4b019fdf Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 7 Dec 2020 09:33:12 -0800 Subject: [PATCH 11/31] Add Python-level `property`s for attributes --- distributed/scheduler.py | 120 +++++++++++++++++++++++++++++++++++++-- 1 file changed, 116 insertions(+), 4 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 5e678ab8ee1..5cebd8eed68 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -1075,12 +1075,24 @@ def __eq__(self, other): return False @property - def state(self) -> str: - return self._state + def key(self): + return self._key @property - def prefix_key(self): - return self._prefix.name + def prefix(self): + return self._prefix + + @property + def run_spec(self): + return self._run_spec + + @property + def priority(self): + return self._priority + + @property + def state(self) -> str: + return self._state @state.setter def state(self, value: str): @@ -1088,6 +1100,106 @@ def state(self, value: str): self._group.states[value] += 1 self._state = value + @property + def dependencies(self): + return self._dependencies + + @property + def dependents(self): + return self._dependents + + @property + def has_lost_dependencies(self): + return self._has_lost_dependencies + + @property + def waiting_on(self): + return self._waiting_on + + @property + def waiters(self): + return self._waiters + + @property + def who_wants(self): + return self._who_wants + + @property + def who_has(self): + return self._who_has + + @property + def processing_on(self): + return self._processing_on + + @property + def retries(self): + return self._retries + + @property + def nbytes(self): + return self._nbytes + + @property + def type(self): + return self._type + + @property + def exception(self): + return self._exception + + @property + def traceback(self): + return self._traceback + + @property + def exception_blame(self): + return self._exception_blame + + @property + def suspicious(self): + return self._suspicious + + @property + def host_restrictions(self): + return self._host_restrictions + + @property + def worker_restrictions(self): + return self._worker_restrictions + + @property + def resource_restrictions(self): + return self._resource_restrictions + + @property + def loose_restrictions(self): + return self._loose_restrictions + + @property + def metadata(self): + return self._metadata + + @property + def annotations(self): + return self._annotations + + @property + def actor(self): + return self._actor + + @property + def group(self): + return self._group + + @property + def group_key(self): + return self._group_key + + @property + def prefix_key(self): + return self._prefix.name + def add_dependency(self, other: "TaskState"): """ Add another task as a dependency of this task """ self._dependencies.add(other) From 0c35af0069af15a3e96c607485f5e60692ff25b7 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 7 Dec 2020 09:33:13 -0800 Subject: [PATCH 12/31] Add some `property.setter`s Includes `setter`s for `processing_on` and `nbytes`. 
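As a rough illustration of why the setters are needed (a minimal, hypothetical `Example` class rather than the real `TaskState`): once the data lives in underscored storage behind read-only properties, any caller that still assigns the public attribute would hit an `AttributeError` unless a setter forwards the write.

    class Example:
        def __init__(self):
            self._nbytes = -1  # underscored storage, typed for Cython when compiled

        @property
        def nbytes(self):
            return self._nbytes

        @nbytes.setter
        def nbytes(self, v):
            self._nbytes = v  # lets external code keep writing `obj.nbytes = ...`

    obj = Example()
    obj.nbytes = 42  # would raise AttributeError without the setter
    assert obj.nbytes == 42
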
--- distributed/scheduler.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 5cebd8eed68..8c674a0c589 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -1132,6 +1132,10 @@ def who_has(self): def processing_on(self): return self._processing_on + @processing_on.setter + def processing_on(self, v: WorkerState): + self._processing_on = v + @property def retries(self): return self._retries @@ -1140,6 +1144,10 @@ def retries(self): def nbytes(self): return self._nbytes + @nbytes.setter + def nbytes(self, v: Py_ssize_t): + self._nbytes = v + @property def type(self): return self._type From af9461f846c14a443b9d1a400fbbaab782db253a Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 7 Dec 2020 09:33:13 -0800 Subject: [PATCH 13/31] Drop recently added `TaskState.priority` closures Now that we have properties accessible from `TaskState`, drop the closures we added previously to access the typed values internally. This should be equivalently performant and cut out a little bit of boilerplate. --- distributed/scheduler.py | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 8c674a0c589..f52ec4dcd39 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2471,10 +2471,7 @@ def update_graph( # Compute recommendations recommendations = {} - def _priority(ets: TaskState): - return ets._priority - - for ts in sorted(runnables, key=_priority, reverse=True): + for ts in sorted(runnables, key=operator.attrgetter("priority"), reverse=True): if ts.state == "released" and ts._run_spec: recommendations[ts._key] = "waiting" @@ -4445,12 +4442,9 @@ def _add_to_memory( ws._has_what.add(ts) ws._nbytes += ts.get_nbytes() - def _priority(ets: TaskState): - return ets._priority - deps = ts._dependents if len(deps) > 1: - deps = sorted(deps, key=_priority, reverse=True) + deps = sorted(deps, key=operator.attrgetter("priority"), reverse=True) dts: TaskState for dts in deps: s = dts._waiting_on From e834d88286a41802f772a78fe6b9f8b5c3b566ea Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 7 Dec 2020 09:33:14 -0800 Subject: [PATCH 14/31] Swap assignment order with `TaskPrefix` Make sure to assign to the `TaskPrefix` variable, `tp`, first before assigning to the `dict`. This should avoid the admittedly likely low overhead of looking up the result in the dictionary when we already have the value available. --- distributed/scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index f52ec4dcd39..aaee675ddbc 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2519,7 +2519,7 @@ def new_task(self, key, spec, state): try: tp = self.task_prefixes[prefix_key] except KeyError: - tp = self.task_prefixes[prefix_key] = TaskPrefix(prefix_key) + self.task_prefixes[prefix_key] = tp = TaskPrefix(prefix_key) ts._prefix = tp group_key = ts._group_key From be1957cde24a6dd74470de9458157442aa16682c Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 7 Dec 2020 09:33:15 -0800 Subject: [PATCH 15/31] Assign `dask.config` query to a variable Saves us need to fetch this twice. Also makes the code a bit more readable. Finally may allow Cython optimizations on the variable later. 
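For illustration, a small sketch of the intended shape (the dict below is a stand-in for the real config values, not what the scheduler actually reads):

    from dask.utils import parse_timedelta

    # Stand-in for dask.config.get("distributed.scheduler.default-task-durations")
    task_durations = {"rechunk-split": "1us", "split-shuffle": "1us"}

    name = "rechunk-split"
    if name in task_durations:
        # the single lookup result is reused instead of querying the config again
        duration_average = parse_timedelta(task_durations[name])
    else:
        duration_average = None
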
--- distributed/scheduler.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index aaee675ddbc..29f43e7cdaf 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -583,12 +583,9 @@ def __init__(self, name): # store timings for each prefix-action self.all_durations = defaultdict(float) - if self.name in dask.config.get("distributed.scheduler.default-task-durations"): - self.duration_average = parse_timedelta( - dask.config.get("distributed.scheduler.default-task-durations")[ - self.name - ] - ) + task_durations = dask.config.get("distributed.scheduler.default-task-durations") + if self.name in task_durations: + self.duration_average = parse_timedelta(task_durations[self.name]) else: self.duration_average = None self.suspicious = 0 From f5fdf67c78384737343df8857d7ff8409342cb4a Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 7 Dec 2020 09:33:15 -0800 Subject: [PATCH 16/31] Set `TaskPrefix.duration_average` to `-1` Instead of using `None` for `TaskPrefix.duration_average`, set it `-1`. This works better when typing `TaskPrefix.duration_average` as it can always be floating point. This also works logically with this value as it can't actually be negative unless it wasn't defined. Rework the logic around this variable to ensure it is positive semi-definite. --- distributed/scheduler.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 29f43e7cdaf..c848c792aa3 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -587,7 +587,7 @@ def __init__(self, name): if self.name in task_durations: self.duration_average = parse_timedelta(task_durations[self.name]) else: - self.duration_average = None + self.duration_average = -1 self.suspicious = 0 @property @@ -3222,9 +3222,9 @@ def handle_long_running(self, key=None, worker=None, compute_duration=None): return if compute_duration: - old_duration = ts._prefix.duration_average or 0 + old_duration = ts._prefix.duration_average new_duration = compute_duration - if not old_duration: + if old_duration < 0: avg_duration = new_duration else: avg_duration = 0.5 * old_duration + 0.5 * new_duration @@ -4307,7 +4307,7 @@ def get_task_duration(self, ts: TaskState, default=None): (not including any communication cost). 
""" duration = ts._prefix.duration_average - if duration is None: + if duration < 0: self.unknown_durations[ts._prefix.name].add(ts) if default is None: default = parse_timedelta( @@ -4757,9 +4757,9 @@ def transition_processing_memory( ############################# if compute_start and ws._processing.get(ts, True): # Update average task duration for worker - old_duration = ts._prefix.duration_average or 0 + old_duration = ts._prefix.duration_average new_duration = compute_stop - compute_start - if not old_duration: + if old_duration < 0: avg_duration = new_duration else: avg_duration = 0.5 * old_duration + 0.5 * new_duration From 3b4b76edbb72bc4150c1a9cc7c104bfe3304cc02 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 7 Dec 2020 09:33:16 -0800 Subject: [PATCH 17/31] Annotate `TaskPrefix` for Cythonization --- distributed/scheduler.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index c848c792aa3..cff2b6ea5ce 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -546,6 +546,7 @@ def ncores(self): return self._nthreads +@cclass class TaskPrefix: """Collection tracking all tasks within a group @@ -576,6 +577,12 @@ class TaskPrefix: TaskGroup """ + name: str + all_durations: object + duration_average: double + suspicious: Py_ssize_t + groups: list + def __init__(self, name): self.name = name self.groups = [] From cabe00d59edef995d528505bb7d2dd1b589d6b3c Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 7 Dec 2020 09:33:16 -0800 Subject: [PATCH 18/31] Annotate `TaskPrefix` constructor as well --- distributed/scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index cff2b6ea5ce..48b03591749 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -583,7 +583,7 @@ class TaskPrefix: suspicious: Py_ssize_t groups: list - def __init__(self, name): + def __init__(self, name: str): self.name = name self.groups = [] From 47d77b15515be7dfe3b420edcc810e17fa64f769 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 7 Dec 2020 09:33:17 -0800 Subject: [PATCH 19/31] Annotate all `TaskPrefix` variables --- distributed/scheduler.py | 1 + 1 file changed, 1 insertion(+) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 48b03591749..750c10eda38 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2518,6 +2518,7 @@ def update_graph( def new_task(self, key, spec, state): """ Create a new task, and associated states """ ts: TaskState = TaskState(key, spec) + tp: TaskPrefix ts._state = state prefix_key = key_split(key) try: From 9ff0dbbcf32973f59a73a54423d02df17a9a20de Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 7 Dec 2020 09:33:18 -0800 Subject: [PATCH 20/31] Add `_` before all `TaskPrefix` attributes --- distributed/scheduler.py | 64 ++++++++++++++++++++-------------------- 1 file changed, 32 insertions(+), 32 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 750c10eda38..ca2a2224795 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -577,35 +577,35 @@ class TaskPrefix: TaskGroup """ - name: str - all_durations: object - duration_average: double - suspicious: Py_ssize_t - groups: list + _name: str + _all_durations: object + _duration_average: double + _suspicious: Py_ssize_t + _groups: list def __init__(self, name: str): - self.name = name - self.groups = [] + self._name = name + self._groups = [] # store timings for each 
prefix-action - self.all_durations = defaultdict(float) + self._all_durations = defaultdict(float) task_durations = dask.config.get("distributed.scheduler.default-task-durations") - if self.name in task_durations: - self.duration_average = parse_timedelta(task_durations[self.name]) + if self._name in task_durations: + self._duration_average = parse_timedelta(task_durations[self._name]) else: - self.duration_average = -1 - self.suspicious = 0 + self._duration_average = -1 + self._suspicious = 0 @property def states(self): - return merge_with(sum, [g.states for g in self.groups]) + return merge_with(sum, [g.states for g in self._groups]) @property def active(self): return [ g - for g in self.groups + for g in self._groups if any(v != 0 for k, v in g.states.items() if k != "forgotten") ] @@ -616,7 +616,7 @@ def active_states(self): def __repr__(self): return ( "<" - + self.name + + self._name + ": " + ", ".join( "%s: %d" % (k, v) for (k, v) in sorted(self.states.items()) if v @@ -626,22 +626,22 @@ def __repr__(self): @property def nbytes_in_memory(self): - return sum(tg.nbytes_in_memory for tg in self.groups) + return sum(tg.nbytes_in_memory for tg in self._groups) @property def nbytes_total(self): - return sum(tg.nbytes_total for tg in self.groups) + return sum(tg.nbytes_total for tg in self._groups) def __len__(self): - return sum(map(len, self.groups)) + return sum(map(len, self._groups)) @property def duration(self): - return sum(tg.duration for tg in self.groups) + return sum(tg.duration for tg in self._groups) @property def types(self): - return set().union(*[tg.types for tg in self.groups]) + return set().union(*[tg.types for tg in self._groups]) class TaskGroup: @@ -1210,7 +1210,7 @@ def group_key(self): @property def prefix_key(self): - return self._prefix.name + return self._prefix._name def add_dependency(self, other: "TaskState"): """ Add another task as a dependency of this task """ @@ -2533,7 +2533,7 @@ def new_task(self, key, spec, state): except KeyError: tg = self.task_groups[group_key] = TaskGroup(group_key) tg.prefix = tp - tp.groups.append(tg) + tp._groups.append(tg) tg.add(ts) self.tasks[key] = ts return ts @@ -2719,7 +2719,7 @@ async def remove_worker(self, comm=None, address=None, safe=False, close=True): recommendations[k] = "released" if not safe: ts._suspicious += 1 - ts._prefix.suspicious += 1 + ts._prefix._suspicious += 1 if ts._suspicious > self.allowed_failures: del recommendations[k] e = pickle.dumps( @@ -3230,14 +3230,14 @@ def handle_long_running(self, key=None, worker=None, compute_duration=None): return if compute_duration: - old_duration = ts._prefix.duration_average + old_duration = ts._prefix._duration_average new_duration = compute_duration if old_duration < 0: avg_duration = new_duration else: avg_duration = 0.5 * old_duration + 0.5 * new_duration - ts._prefix.duration_average = avg_duration + ts._prefix._duration_average = avg_duration ws._occupancy -= ws._processing[ts] self.total_occupancy -= ws._processing[ts] @@ -4314,9 +4314,9 @@ def get_task_duration(self, ts: TaskState, default=None): Get the estimated computation cost of the given task (not including any communication cost). 
""" - duration = ts._prefix.duration_average + duration = ts._prefix._duration_average if duration < 0: - self.unknown_durations[ts._prefix.name].add(ts) + self.unknown_durations[ts._prefix._name].add(ts) if default is None: default = parse_timedelta( dask.config.get("distributed.scheduler.unknown-task-duration") @@ -4751,7 +4751,7 @@ def transition_processing_memory( # record timings of all actions -- a cheaper way of # getting timing info compared with get_task_stream() - ts._prefix.all_durations[action] += stop - start + ts._prefix._all_durations[action] += stop - start if len(L) > 0: compute_start, compute_stop = L[0] @@ -4765,18 +4765,18 @@ def transition_processing_memory( ############################# if compute_start and ws._processing.get(ts, True): # Update average task duration for worker - old_duration = ts._prefix.duration_average + old_duration = ts._prefix._duration_average new_duration = compute_stop - compute_start if old_duration < 0: avg_duration = new_duration else: avg_duration = 0.5 * old_duration + 0.5 * new_duration - ts._prefix.duration_average = avg_duration + ts._prefix._duration_average = avg_duration ts._group.duration += new_duration tts: TaskState - for tts in self.unknown_durations.pop(ts._prefix.name, ()): + for tts in self.unknown_durations.pop(ts._prefix._name, ()): if tts._processing_on: wws = tts._processing_on old = wws._processing[tts] @@ -5341,7 +5341,7 @@ def transition(self, key, finish, *args, **kwargs): # Remove TaskGroup if all tasks are in the forgotten state tg = ts._group if not any(tg.states.get(s) for s in ALL_TASK_STATES): - ts._prefix.groups.remove(tg) + ts._prefix._groups.remove(tg) del self.task_groups[tg.name] return recommendations From 9593fb006380cedb74a0084c4b42143b8dbda5c5 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 7 Dec 2020 09:33:18 -0800 Subject: [PATCH 21/31] Add Python-level `property`s for attributes --- distributed/scheduler.py | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index ca2a2224795..dcb4d4f7513 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -597,6 +597,26 @@ def __init__(self, name: str): self._duration_average = -1 self._suspicious = 0 + @property + def name(self): + return self._name + + @property + def all_durations(self): + return self._all_durations + + @property + def duration_average(self): + return self._duration_average + + @property + def suspicious(self): + return self._suspicious + + @property + def groups(self): + return self._groups + @property def states(self): return merge_with(sum, [g.states for g in self._groups]) From 5a76c8fd3a6b5f8e4fd6968c6fe87065be8ebdc7 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 7 Dec 2020 09:33:19 -0800 Subject: [PATCH 22/31] Use `Py_ssize_t` in `set_nbytes` This allows Cython to perform C-level optimizations on these variables and usages thereof. 
--- distributed/scheduler.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index dcb4d4f7513..99f9e34571d 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -1242,9 +1242,9 @@ def get_nbytes(self) -> int: nbytes = self._nbytes return nbytes if nbytes >= 0 else DEFAULT_DATA_SIZE - def set_nbytes(self, nbytes: int): - diff = nbytes - old_nbytes = self._nbytes + def set_nbytes(self, nbytes: Py_ssize_t): + diff: Py_ssize_t = nbytes + old_nbytes: Py_ssize_t = self._nbytes if old_nbytes >= 0: diff -= old_nbytes self._group.nbytes_total += diff From 5af80ca08e6e66221feab377a1969e6a1f4a9c5b Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 7 Dec 2020 09:33:19 -0800 Subject: [PATCH 23/31] Use `tg` for `TaskGroup` variables --- distributed/scheduler.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 99f9e34571d..beda2f2aef7 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -619,19 +619,19 @@ def groups(self): @property def states(self): - return merge_with(sum, [g.states for g in self._groups]) + return merge_with(sum, [tg.states for tg in self._groups]) @property def active(self): return [ - g - for g in self._groups - if any(v != 0 for k, v in g.states.items() if k != "forgotten") + tg + for tg in self._groups + if any(v != 0 for k, v in tg.states.items() if k != "forgotten") ] @property def active_states(self): - return merge_with(sum, [g.states for g in self.active]) + return merge_with(sum, [tg.states for tg in self.active]) def __repr__(self): return ( From 9802c863c3049bbcf00bb128039b12b825ea0dbf Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 7 Dec 2020 09:33:20 -0800 Subject: [PATCH 24/31] Swap assignment order with `TaskGroup` Make sure to assign to the `TaskGroup` variable, `tg`, first before assigning to the `dict`. This should avoid the admittedly likely low overhead of looking up the result in the dictionary when we already have the value available. --- distributed/scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index beda2f2aef7..daa9dd5f070 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2551,7 +2551,7 @@ def new_task(self, key, spec, state): try: tg = self.task_groups[group_key] except KeyError: - tg = self.task_groups[group_key] = TaskGroup(group_key) + self.task_groups[group_key] = tg = TaskGroup(group_key) tg.prefix = tp tp._groups.append(tg) tg.add(ts) From 53fd21e10010cd8f2f6c22611e935920c78e72f9 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 7 Dec 2020 09:33:21 -0800 Subject: [PATCH 25/31] Create `list` from generator This ensures Cython still uses `TaskGroup` to annotate the variable iterated over. Otherwise it constructs a generator with its own scope where this is ignored. 
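
To make the distinction in the commit message concrete: a generator expression compiles into its own nested scope, so a `tg: TaskGroup`-style annotation in the enclosing function does not reach the loop variable, whereas the list comprehension is handled in the enclosing scope and keeps the typed attribute access. A small sketch under that assumption, with a hypothetical `Item` type standing in for `TaskGroup`:

    import cython

    @cython.cclass
    class Item:
        size: cython.Py_ssize_t

        def __init__(self, size):
            self.size = size

    def total_size(items: list) -> cython.Py_ssize_t:
        it: Item  # annotation only applies within this function's scope
        # sum(it.size for it in items) would build a separate generator
        # closure where `it` is an untyped Python object; the list
        # comprehension keeps `it.size` as a typed attribute access.
        return sum([it.size for it in items])
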
--- distributed/scheduler.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index daa9dd5f070..f26f8253e59 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -646,18 +646,18 @@ def __repr__(self): @property def nbytes_in_memory(self): - return sum(tg.nbytes_in_memory for tg in self._groups) + return sum([tg.nbytes_in_memory for tg in self._groups]) @property def nbytes_total(self): - return sum(tg.nbytes_total for tg in self._groups) + return sum([tg.nbytes_total for tg in self._groups]) def __len__(self): return sum(map(len, self._groups)) @property def duration(self): - return sum(tg.duration for tg in self._groups) + return sum([tg.duration for tg in self._groups]) @property def types(self): From 1414ff5030d6505614d349215ec80fa5b193b4af Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 7 Dec 2020 09:33:21 -0800 Subject: [PATCH 26/31] Annotate `TaskGroup` for Cythonization --- distributed/scheduler.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index f26f8253e59..a972cac039b 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -664,6 +664,7 @@ def types(self): return set().union(*[tg.types for tg in self._groups]) +@cclass class TaskGroup: """Collection tracking all tasks within a group @@ -705,6 +706,15 @@ class TaskGroup: TaskPrefix """ + name: str + prefix: TaskPrefix + states: dict + dependencies: set + nbytes_total: Py_ssize_t + nbytes_in_memory: Py_ssize_t + duration: double + types: set + def __init__(self, name): self.name = name self.states = {state: 0 for state in ALL_TASK_STATES} From 9e2aa4fa8590502387ea1a05c6d868d6d5de2f8a Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 7 Dec 2020 09:33:22 -0800 Subject: [PATCH 27/31] Annotate `TaskGroup` constructor as well --- distributed/scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index a972cac039b..d0f3aaf8026 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -715,7 +715,7 @@ class TaskGroup: duration: double types: set - def __init__(self, name): + def __init__(self, name: str): self.name = name self.states = {state: 0 for state in ALL_TASK_STATES} self.states["forgotten"] = 0 From cc0dbb17b1ca42a89b8eb3d1ac662a0ae196cd5d Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 7 Dec 2020 09:33:22 -0800 Subject: [PATCH 28/31] Set `TaskGroup.prefix` to `None` initially --- distributed/scheduler.py | 1 + 1 file changed, 1 insertion(+) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index d0f3aaf8026..bf966f9b453 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -717,6 +717,7 @@ class TaskGroup: def __init__(self, name: str): self.name = name + self.prefix = None self.states = {state: 0 for state in ALL_TASK_STATES} self.states["forgotten"] = 0 self.dependencies = set() From 1f50eb2608de02f96e5d1cf0c6ecd2db12c2dc65 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 7 Dec 2020 09:33:23 -0800 Subject: [PATCH 29/31] Annotate all `TaskGroup` variables --- distributed/scheduler.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index bf966f9b453..87fff37afda 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -619,10 +619,12 @@ def groups(self): @property def states(self): + tg: TaskGroup return 
merge_with(sum, [tg.states for tg in self._groups]) @property def active(self): + tg: TaskGroup return [ tg for tg in self._groups @@ -646,10 +648,12 @@ def __repr__(self): @property def nbytes_in_memory(self): + tg: TaskGroup return sum([tg.nbytes_in_memory for tg in self._groups]) @property def nbytes_total(self): + tg: TaskGroup return sum([tg.nbytes_total for tg in self._groups]) def __len__(self): @@ -657,10 +661,12 @@ def __len__(self): @property def duration(self): + tg: TaskGroup return sum([tg.duration for tg in self._groups]) @property def types(self): + tg: TaskGroup return set().union(*[tg.types for tg in self._groups]) @@ -2550,6 +2556,7 @@ def new_task(self, key, spec, state): """ Create a new task, and associated states """ ts: TaskState = TaskState(key, spec) tp: TaskPrefix + tg: TaskGroup ts._state = state prefix_key = key_split(key) try: @@ -5370,7 +5377,7 @@ def transition(self, key, finish, *args, **kwargs): if ts.state == "forgotten" and ts._group.name in self.task_groups: # Remove TaskGroup if all tasks are in the forgotten state - tg = ts._group + tg: TaskGroup = ts._group if not any(tg.states.get(s) for s in ALL_TASK_STATES): ts._prefix._groups.remove(tg) del self.task_groups[tg.name] From 9336f2a52a3b51f0bc89df216c47b2181c1c7316 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 7 Dec 2020 09:33:24 -0800 Subject: [PATCH 30/31] Add `_` before all `TaskGroup` attributes --- distributed/scheduler.py | 82 ++++++++++++++++++++-------------------- 1 file changed, 41 insertions(+), 41 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 87fff37afda..7ebeae808bb 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -620,7 +620,7 @@ def groups(self): @property def states(self): tg: TaskGroup - return merge_with(sum, [tg.states for tg in self._groups]) + return merge_with(sum, [tg._states for tg in self._groups]) @property def active(self): @@ -628,12 +628,12 @@ def active(self): return [ tg for tg in self._groups - if any(v != 0 for k, v in tg.states.items() if k != "forgotten") + if any(v != 0 for k, v in tg._states.items() if k != "forgotten") ] @property def active_states(self): - return merge_with(sum, [tg.states for tg in self.active]) + return merge_with(sum, [tg._states for tg in self.active]) def __repr__(self): return ( @@ -649,12 +649,12 @@ def __repr__(self): @property def nbytes_in_memory(self): tg: TaskGroup - return sum([tg.nbytes_in_memory for tg in self._groups]) + return sum([tg._nbytes_in_memory for tg in self._groups]) @property def nbytes_total(self): tg: TaskGroup - return sum([tg.nbytes_total for tg in self._groups]) + return sum([tg._nbytes_total for tg in self._groups]) def __len__(self): return sum(map(len, self._groups)) @@ -662,12 +662,12 @@ def __len__(self): @property def duration(self): tg: TaskGroup - return sum([tg.duration for tg in self._groups]) + return sum([tg._duration for tg in self._groups]) @property def types(self): tg: TaskGroup - return set().union(*[tg.types for tg in self._groups]) + return set().union(*[tg._types for tg in self._groups]) @cclass @@ -712,44 +712,44 @@ class TaskGroup: TaskPrefix """ - name: str - prefix: TaskPrefix - states: dict - dependencies: set - nbytes_total: Py_ssize_t - nbytes_in_memory: Py_ssize_t - duration: double - types: set + _name: str + _prefix: TaskPrefix + _states: dict + _dependencies: set + _nbytes_total: Py_ssize_t + _nbytes_in_memory: Py_ssize_t + _duration: double + _types: set def __init__(self, name: str): - self.name = name - 
self.prefix = None - self.states = {state: 0 for state in ALL_TASK_STATES} - self.states["forgotten"] = 0 - self.dependencies = set() - self.nbytes_total = 0 - self.nbytes_in_memory = 0 - self.duration = 0 - self.types = set() + self._name = name + self._prefix = None + self._states = {state: 0 for state in ALL_TASK_STATES} + self._states["forgotten"] = 0 + self._dependencies = set() + self._nbytes_total = 0 + self._nbytes_in_memory = 0 + self._duration = 0 + self._types = set() def add(self, o): ts: TaskState = o - self.states[ts.state] += 1 + self._states[ts.state] += 1 ts._group = self def __repr__(self): return ( "<" - + (self.name or "no-group") + + (self._name or "no-group") + ": " + ", ".join( - "%s: %d" % (k, v) for (k, v) in sorted(self.states.items()) if v + "%s: %d" % (k, v) for (k, v) in sorted(self._states.items()) if v ) + ">" ) def __len__(self): - return sum(self.states.values()) + return sum(self._states.values()) @cclass @@ -1137,8 +1137,8 @@ def state(self) -> str: @state.setter def state(self, value: str): - self._group.states[self._state] -= 1 - self._group.states[value] += 1 + self._group._states[self._state] -= 1 + self._group._states[value] += 1 self._state = value @property @@ -1252,7 +1252,7 @@ def prefix_key(self): def add_dependency(self, other: "TaskState"): """ Add another task as a dependency of this task """ self._dependencies.add(other) - self._group.dependencies.add(other._group) + self._group._dependencies.add(other._group) other._dependents.add(self) def get_nbytes(self) -> int: @@ -1264,8 +1264,8 @@ def set_nbytes(self, nbytes: Py_ssize_t): old_nbytes: Py_ssize_t = self._nbytes if old_nbytes >= 0: diff -= old_nbytes - self._group.nbytes_total += diff - self._group.nbytes_in_memory += diff + self._group._nbytes_total += diff + self._group._nbytes_in_memory += diff ws: WorkerState for ws in self._who_has: ws._nbytes += diff @@ -2570,7 +2570,7 @@ def new_task(self, key, spec, state): tg = self.task_groups[group_key] except KeyError: self.task_groups[group_key] = tg = TaskGroup(group_key) - tg.prefix = tp + tg._prefix = tp tp._groups.append(tg) tg.add(ts) self.tasks[key] = ts @@ -4512,7 +4512,7 @@ def _add_to_memory( ts.state = "memory" ts._type = typename - ts._group.types.add(typename) + ts._group._types.add(typename) cs: ClientState = self.clients["fire-and-forget"] if ts in cs._wants_what: @@ -4811,7 +4811,7 @@ def transition_processing_memory( avg_duration = 0.5 * old_duration + 0.5 * new_duration ts._prefix._duration_average = avg_duration - ts._group.duration += new_duration + ts._group._duration += new_duration tts: TaskState for tts in self.unknown_durations.pop(ts._prefix._name, ()): @@ -4880,7 +4880,7 @@ def transition_memory_released(self, key, safe=False): for ws in ts._who_has: ws._has_what.remove(ts) ws._nbytes -= ts.get_nbytes() - ts._group.nbytes_in_memory -= ts.get_nbytes() + ts._group._nbytes_in_memory -= ts.get_nbytes() self.worker_send( ws._address, {"op": "delete-data", "keys": [key], "report": False} ) @@ -5208,7 +5208,7 @@ def _propagate_forgotten(self, ts: TaskState, recommendations): ts._waiting_on.clear() if ts._who_has: - ts._group.nbytes_in_memory -= ts.get_nbytes() + ts._group._nbytes_in_memory -= ts.get_nbytes() ws: WorkerState for ws in ts._who_has: @@ -5375,12 +5375,12 @@ def transition(self, key, finish, *args, **kwargs): if ts.state == "forgotten": del self.tasks[ts._key] - if ts.state == "forgotten" and ts._group.name in self.task_groups: + if ts.state == "forgotten" and ts._group._name in self.task_groups: # Remove 
TaskGroup if all tasks are in the forgotten state tg: TaskGroup = ts._group - if not any(tg.states.get(s) for s in ALL_TASK_STATES): + if not any(tg._states.get(s) for s in ALL_TASK_STATES): ts._prefix._groups.remove(tg) - del self.task_groups[tg.name] + del self.task_groups[tg._name] return recommendations except Exception as e: From 4aa4cbec494c9e84f2ed9d53ff5a84e5fd8ae525 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 7 Dec 2020 09:33:25 -0800 Subject: [PATCH 31/31] Add Python-level `property`s for attributes --- distributed/scheduler.py | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 7ebeae808bb..ef7adb2b1ed 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -732,6 +732,38 @@ def __init__(self, name: str): self._duration = 0 self._types = set() + @property + def name(self): + return self._name + + @property + def prefix(self): + return self._prefix + + @property + def states(self): + return self._states + + @property + def dependencies(self): + return self._dependencies + + @property + def nbytes_total(self): + return self._nbytes_total + + @property + def nbytes_in_memory(self): + return self._nbytes_in_memory + + @property + def duration(self): + return self._duration + + @property + def types(self): + return self._types + def add(self, o): ts: TaskState = o self._states[ts.state] += 1
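
Taken together, the last two patches apply to `TaskGroup` the same pattern used earlier for `TaskPrefix`: the stored attributes become leading-underscore fields on the extension type, and thin read-only properties keep the public names working for plugins, diagnostics, and tests. A minimal sketch of that pattern with a hypothetical `Stats` class:

    import cython

    @cython.cclass
    class Stats:
        _count: cython.Py_ssize_t  # fast C-level access for compiled code

        def __init__(self):
            self._count = 0

        @property
        def count(self):
            # Pure-Python callers keep the old attribute name; compiled
            # scheduler code reads and writes _count directly.
            return self._count

Internal, annotated code would then update `stats._count` directly, while external callers continue to read `stats.count` unchanged.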