From 63c60d0811356e9b6c620d801959da241cbeb145 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 28 Jan 2021 16:52:53 -0800 Subject: [PATCH 01/22] Annotate `keys` & `new` --- distributed/scheduler.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index bbd8f01aeda..07185ea2041 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -5925,8 +5925,9 @@ def transitions(self, recommendations: dict): reach a steady state """ parent: SchedulerState = cast(SchedulerState, self) - keys = set() + keys: set = set() recommendations = recommendations.copy() + new: dict while recommendations: key, finish = recommendations.popitem() keys.add(key) From a9648a0f7c0b49e2c3fcc9eef4cf6c18ee0615ef Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 28 Jan 2021 16:52:54 -0800 Subject: [PATCH 02/22] Combine `recommendations` annotation with others --- distributed/scheduler.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 07185ea2041..4bd4ce164ce 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -5832,6 +5832,7 @@ def transition(self, key, finish, *args, **kwargs): """ parent: SchedulerState = cast(SchedulerState, self) ts: TaskState + recommendations: dict worker_msgs: dict client_msgs: dict try: @@ -5847,7 +5848,7 @@ def transition(self, key, finish, *args, **kwargs): dependents = set(ts._dependents) dependencies = set(ts._dependencies) - recommendations: dict = {} + recommendations = {} worker_msgs = {} client_msgs = {} if (start, finish) in self._transitions: From ead596fec8a581bcd79324a7de2de525e49b94b1 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 28 Jan 2021 16:52:55 -0800 Subject: [PATCH 03/22] Annotate `dependents` & `dependencies` --- distributed/scheduler.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 4bd4ce164ce..f2cf10e1a02 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -5835,6 +5835,8 @@ def transition(self, key, finish, *args, **kwargs): recommendations: dict worker_msgs: dict client_msgs: dict + dependents: set + dependencies: set try: try: ts = parent._tasks[key] From 6e5bcab7dc3c683f3227c8576c707036139fc4d8 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 28 Jan 2021 16:52:55 -0800 Subject: [PATCH 04/22] Annotate `start` & `finish` --- distributed/scheduler.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index f2cf10e1a02..0c33936bb85 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -5814,7 +5814,7 @@ async def register_worker_plugin(self, comm, plugin, name=None): # State Transitions # ##################### - def transition(self, key, finish, *args, **kwargs): + def transition(self, key, finish: str, *args, **kwargs): """Transition a key from its current state to the finish state Examples @@ -5832,6 +5832,7 @@ def transition(self, key, finish, *args, **kwargs): """ parent: SchedulerState = cast(SchedulerState, self) ts: TaskState + start: str recommendations: dict worker_msgs: dict client_msgs: dict From 23b34baeb7a07b6401f0cdf1d3fb4d8c26e29024 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 28 Jan 2021 16:52:56 -0800 Subject: [PATCH 05/22] Create empty `dict` for `recommendations` once Instead of letting Cython generate empty `dict`s for each of these cases, just create an empty `dict` once and assign it to `recommendations`. That way we can just `return` it simply and avoid the C boilerplate that would otherwise be needed. --- distributed/scheduler.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 0c33936bb85..9d744c35d61 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -5839,19 +5839,19 @@ def transition(self, key, finish: str, *args, **kwargs): dependents: set dependencies: set try: + recommendations = {} try: ts = parent._tasks[key] except KeyError: - return {} + return recommendations start = ts._state if start == finish: - return {} + return recommendations if self.plugins: dependents = set(ts._dependents) dependencies = set(ts._dependencies) - recommendations = {} worker_msgs = {} client_msgs = {} if (start, finish) in self._transitions: From 20e0dfb576ae2c03498581cd17835114eef2b0ea Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 28 Jan 2021 16:52:56 -0800 Subject: [PATCH 06/22] Use `.get(...)` to retrieve `TaskState` --- distributed/scheduler.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 9d744c35d61..19665c21aac 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -5840,9 +5840,8 @@ def transition(self, key, finish: str, *args, **kwargs): dependencies: set try: recommendations = {} - try: - ts = parent._tasks[key] - except KeyError: + ts = parent._tasks.get(key) + if ts is None: return recommendations start = ts._state if start == finish: From dbeb0cd901360ab280adefbedebdd2f8b3d851a3 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 28 Jan 2021 16:52:56 -0800 Subject: [PATCH 07/22] Assign `start, finish` to a variable --- distributed/scheduler.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 19665c21aac..0fa310e18cf 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -5833,6 +5833,7 @@ def transition(self, key, finish: str, *args, **kwargs): parent: SchedulerState = cast(SchedulerState, self) ts: TaskState start: str + start_finish: tuple recommendations: dict worker_msgs: dict client_msgs: dict @@ -5851,12 +5852,13 @@ def transition(self, key, finish: str, *args, **kwargs): dependents = set(ts._dependents) dependencies = set(ts._dependencies) + start_finish = (start, finish) worker_msgs = {} client_msgs = {} - if (start, finish) in self._transitions: - func = self._transitions[start, finish] + if start_finish in self._transitions: + func = self._transitions[start_finish] recommendations, worker_msgs, client_msgs = func(key, *args, **kwargs) - elif "released" not in (start, finish): + elif "released" not in start_finish: func = self._transitions["released", finish] assert not args and not kwargs a = self.transition(key, "released") @@ -5868,9 +5870,7 @@ def transition(self, key, finish: str, *args, **kwargs): recommendations = a start = "released" else: - raise RuntimeError( - "Impossible transition from %r to %r" % (start, finish) - ) + raise RuntimeError("Impossible transition from %r to %r" % start_finish) for worker, msg in worker_msgs.items(): self.worker_send(worker, msg) From 9c8b8207da0b0f92ab17ad454e17fdcf335ca66e Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 28 Jan 2021 16:52:57 -0800 Subject: [PATCH 08/22] Just use `.get(...)` to retrieve transition func Avoids checking for the presence of the key and then retrieving the function corresponding the key by simply trying to get the function in the first place or `None` if it is absent. As it is pretty quick to check if something is `None` both in Python and Cython, this should speed up the check and function retrieval time. --- distributed/scheduler.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 0fa310e18cf..4ac9d6283e3 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -5855,8 +5855,8 @@ def transition(self, key, finish: str, *args, **kwargs): start_finish = (start, finish) worker_msgs = {} client_msgs = {} - if start_finish in self._transitions: - func = self._transitions[start_finish] + func = self._transitions.get(start_finish) + if func is not None: recommendations, worker_msgs, client_msgs = func(key, *args, **kwargs) elif "released" not in start_finish: func = self._transitions["released", finish] From 2907628fd2c5cf9200b9269357eb01378e6c1c64 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 28 Jan 2021 16:52:57 -0800 Subject: [PATCH 09/22] Annotate `a` & `b` --- distributed/scheduler.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 4ac9d6283e3..a8ac18d04bc 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -5861,9 +5861,10 @@ def transition(self, key, finish: str, *args, **kwargs): elif "released" not in start_finish: func = self._transitions["released", finish] assert not args and not kwargs - a = self.transition(key, "released") + a: dict = self.transition(key, "released") if key in a: func = self._transitions["released", a[key]] + b: dict b, worker_msgs, client_msgs = func(key) a = a.copy() a.update(b) From 5d7bd6e3651e26d8e9ed17fe6fc9a1d085b82e39 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 28 Jan 2021 16:52:58 -0800 Subject: [PATCH 10/22] Use `.get(...)` to get `key` from `a` Avoids looking up `key` twice. Once to see if it is there and a second time to grab it. This way we just grab the value corresponding to `key` or `None` if it is missing. The following `None` check is quite fast in both Python and Cython. --- distributed/scheduler.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index a8ac18d04bc..3c5902e313e 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -5862,8 +5862,9 @@ def transition(self, key, finish: str, *args, **kwargs): func = self._transitions["released", finish] assert not args and not kwargs a: dict = self.transition(key, "released") - if key in a: - func = self._transitions["released", a[key]] + v = a.get(key) + if v is not None: + func = self._transitions["released", v] b: dict b, worker_msgs, client_msgs = func(key) a = a.copy() From b26e527da0eb421aed89f74a9b7bbaf3e23572df Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 28 Jan 2021 16:52:58 -0800 Subject: [PATCH 11/22] Just `update` `recommendations` with `a` & `b` --- distributed/scheduler.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 3c5902e313e..8c996b9f4aa 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -5867,9 +5867,8 @@ def transition(self, key, finish: str, *args, **kwargs): func = self._transitions["released", v] b: dict b, worker_msgs, client_msgs = func(key) - a = a.copy() - a.update(b) - recommendations = a + recommendations.update(a) + recommendations.update(b) start = "released" else: raise RuntimeError("Impossible transition from %r to %r" % start_finish) From b8e09fe6b76411c8b1cc0e514ac4340a7cd24be5 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 28 Jan 2021 16:52:59 -0800 Subject: [PATCH 12/22] Drop unneeded `KeyError` handling Neither of these statements should raise a `KeyError`. So just drop this `try...except...`. --- distributed/scheduler.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 8c996b9f4aa..56c15f718ff 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -5892,11 +5892,8 @@ def transition(self, key, finish: str, *args, **kwargs): if self.plugins: # Temporarily put back forgotten key for plugin to retrieve it if ts._state == "forgotten": - try: - ts._dependents = dependents - ts._dependencies = dependencies - except KeyError: - pass + ts._dependents = dependents + ts._dependencies = dependencies parent._tasks[ts._key] = ts for plugin in list(self.plugins): try: From 43a09bf91b65a07f9520b56bf3db5af6dfed7b20 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 28 Jan 2021 16:52:59 -0800 Subject: [PATCH 13/22] Annotate `finish2` --- distributed/scheduler.py | 1 + 1 file changed, 1 insertion(+) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 56c15f718ff..bbb0aacbda0 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -5834,6 +5834,7 @@ def transition(self, key, finish: str, *args, **kwargs): ts: TaskState start: str start_finish: tuple + finish2: str recommendations: dict worker_msgs: dict client_msgs: dict From f47b9b6f772d11b80d46a8fd9a42f1e9caf95973 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 28 Jan 2021 16:53:00 -0800 Subject: [PATCH 14/22] Replace generator with simple `for`-loop This avoids building a `list`, which makes it easier for Cython to optimize. --- distributed/scheduler.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index bbb0aacbda0..6a17e8b624f 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -5907,7 +5907,12 @@ def transition(self, key, finish: str, *args, **kwargs): tg: TaskGroup = ts._group if ts._state == "forgotten" and tg._name in parent._task_groups: # Remove TaskGroup if all tasks are in the forgotten state - if not any([tg._states.get(s) for s in ALL_TASK_STATES]): + all_forgotten: bint = True + for s in ALL_TASK_STATES: + if tg._states.get(s): + all_forgotten = False + break + if all_forgotten: ts._prefix._groups.remove(tg) del parent._task_groups[tg._name] From a8282ddc8c02614990cc6fa7a92b3d48eebb1164 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 28 Jan 2021 16:53:00 -0800 Subject: [PATCH 15/22] Bind `tuple` results to typed variable This should simplify the C code generated by Cython to unpack the `tuple` as it no longer needs to check if it is a `list` or some other sequence that needs to be unpacked and can simply use the `tuple` unpacking logic. --- distributed/scheduler.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 6a17e8b624f..3a16ab4a3f9 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -5858,7 +5858,8 @@ def transition(self, key, finish: str, *args, **kwargs): client_msgs = {} func = self._transitions.get(start_finish) if func is not None: - recommendations, worker_msgs, client_msgs = func(key, *args, **kwargs) + t: tuple = func(key, *args, **kwargs) + recommendations, worker_msgs, client_msgs = t elif "released" not in start_finish: func = self._transitions["released", finish] assert not args and not kwargs @@ -5867,7 +5868,8 @@ def transition(self, key, finish: str, *args, **kwargs): if v is not None: func = self._transitions["released", v] b: dict - b, worker_msgs, client_msgs = func(key) + t: tuple = func(key) + b, worker_msgs, client_msgs = t recommendations.update(a) recommendations.update(b) start = "released" From 3d7ad9c378f4ac6d928b075c1457f8853eea5e8a Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 28 Jan 2021 16:53:01 -0800 Subject: [PATCH 16/22] Collect `list` of messages for clients and workers --- distributed/scheduler.py | 41 ++++++++++++++++++++++------------------ 1 file changed, 23 insertions(+), 18 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 3a16ab4a3f9..db2fa02bd89 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -1961,7 +1961,7 @@ def transition_waiting_processing(self, key): # logger.debug("Send job to worker: %s, %s", worker, key) - worker_msgs[worker] = _task_to_msg(self, ts) + worker_msgs[worker] = [_task_to_msg(self, ts)] return {}, worker_msgs, client_msgs except Exception as e: @@ -2168,11 +2168,13 @@ def transition_memory_released(self, key, safe: bint = False): ws._has_what.remove(ts) ws._nbytes -= ts.get_nbytes() ts._group._nbytes_in_memory -= ts.get_nbytes() - worker_msgs[ws._address] = { - "op": "delete-data", - "keys": [key], - "report": False, - } + worker_msgs[ws._address] = [ + { + "op": "delete-data", + "keys": [key], + "report": False, + } + ] ts._who_has.clear() @@ -2181,7 +2183,7 @@ def transition_memory_released(self, key, safe: bint = False): report_msg = {"op": "lost-data", "key": key} cs: ClientState for cs in ts._who_wants: - client_msgs[cs._client_key] = report_msg + client_msgs[cs._client_key] = [report_msg] if not ts._run_spec: # pure data recommendations[key] = "forgotten" @@ -2234,7 +2236,7 @@ def transition_released_erred(self, key): } cs: ClientState for cs in ts._who_wants: - client_msgs[cs._client_key] = report_msg + client_msgs[cs._client_key] = [report_msg] ts.state = "erred" @@ -2276,7 +2278,7 @@ def transition_erred_released(self, key): report_msg = {"op": "task-retried", "key": key} cs: ClientState for cs in ts._who_wants: - client_msgs[cs._client_key] = report_msg + client_msgs[cs._client_key] = [report_msg] ts.state = "released" @@ -2343,7 +2345,7 @@ def transition_processing_released(self, key): w: str = _remove_from_processing(self, ts) if w: - worker_msgs[w] = {"op": "release-task", "key": key} + worker_msgs[w] = [{"op": "release-task", "key": key}] ts.state = "released" @@ -2432,7 +2434,7 @@ def transition_processing_erred( } cs: ClientState for cs in ts._who_wants: - client_msgs[cs._client_key] = report_msg + client_msgs[cs._client_key] = [report_msg] cs = self._clients["fire-and-forget"] if ts in cs._wants_what: @@ -5838,6 +5840,7 @@ def transition(self, key, finish: str, *args, **kwargs): recommendations: dict worker_msgs: dict client_msgs: dict + msgs: list dependents: set dependencies: set try: @@ -5876,10 +5879,12 @@ def transition(self, key, finish: str, *args, **kwargs): else: raise RuntimeError("Impossible transition from %r to %r" % start_finish) - for worker, msg in worker_msgs.items(): - self.worker_send(worker, msg) - for client, msg in client_msgs.items(): - self.client_send(client, msg) + for worker, msgs in worker_msgs.items(): + for msg in msgs: + self.worker_send(worker, msg) + for client, msgs in client_msgs.items(): + for msg in msgs: + self.client_send(client, msg) finish2 = ts._state self.transition_log.append((key, start, finish2, recommendations, time())) @@ -6523,7 +6528,7 @@ def _add_to_memory( report_msg["type"] = type for cs in ts._who_wants: - client_msgs[cs._client_key] = report_msg + client_msgs[cs._client_key] = [report_msg] ts.state = "memory" ts._type = typename @@ -6577,7 +6582,7 @@ def _propagate_forgotten( ws._nbytes -= ts.get_nbytes() w: str = ws._address if w in state._workers_dv: # in case worker has died - worker_msgs[w] = {"op": "delete-data", "keys": [key], "report": False} + worker_msgs[w] = [{"op": "delete-data", "keys": [key], "report": False}] ts._who_has.clear() @@ -6684,7 +6689,7 @@ def _task_to_client_msgs(state: SchedulerState, ts: TaskState) -> dict: client_msgs: dict = {} for k in client_keys: - client_msgs[k] = report_msg + client_msgs[k] = [report_msg] return client_msgs From 6b62ef17e7a0d09f1be5351902c495b262bf06c2 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 28 Jan 2021 16:53:01 -0800 Subject: [PATCH 17/22] Extend `BatchedSend`'s `send` to take many msgs --- distributed/batched.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/distributed/batched.py b/distributed/batched.py index 313aab67b56..89e99719e9c 100644 --- a/distributed/batched.py +++ b/distributed/batched.py @@ -127,7 +127,7 @@ def _background_send(self): self.stopped.set() self.abort() - def send(self, msg): + def send(self, *msgs): """Schedule a message for sending to the other side This completes quickly and synchronously @@ -135,8 +135,8 @@ def send(self, msg): if self.comm is not None and self.comm.closed(): raise CommClosedError - self.message_count += 1 - self.buffer.append(msg) + self.message_count += len(msgs) + self.buffer.extend(msgs) # Avoid spurious wakeups if possible if self.next_deadline is None: self.waker.set() From 6f46e7293d4709cb56ea61451cf184f9a847a169 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 28 Jan 2021 16:53:02 -0800 Subject: [PATCH 18/22] Add `send_all` method and use in `transition` This allows us to batch all worker and client sends into a single function. --- distributed/scheduler.py | 32 ++++++++++++++++++++++++++------ 1 file changed, 26 insertions(+), 6 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index db2fa02bd89..2d6409af240 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -4708,6 +4708,31 @@ def client_send(self, client, msg): if self.status == Status.running: logger.critical("Tried writing to closed comm: %s", msg) + def send_all(self, client_msgs: dict, worker_msgs: dict): + """Send messages to client and workers""" + stream_comms: dict = self.stream_comms + client_comms: dict = self.client_comms + msgs: list + + for worker, msgs in worker_msgs.items(): + try: + w = stream_comms[worker] + for msg in msgs: + w.send(msg) + except (CommClosedError, AttributeError): + self.loop.add_callback(self.remove_worker, address=worker) + + for client, msgs in client_msgs.items(): + c = client_comms.get(client) + if c is None: + continue + for msg in msgs: + try: + c.send(msg) + except CommClosedError: + if self.status == Status.running: + logger.critical("Tried writing to closed comm: %s", msg) + ############################ # Less common interactions # ############################ @@ -5879,12 +5904,7 @@ def transition(self, key, finish: str, *args, **kwargs): else: raise RuntimeError("Impossible transition from %r to %r" % start_finish) - for worker, msgs in worker_msgs.items(): - for msg in msgs: - self.worker_send(worker, msg) - for client, msgs in client_msgs.items(): - for msg in msgs: - self.client_send(client, msg) + self.send_all(client_msgs, worker_msgs) finish2 = ts._state self.transition_log.append((key, start, finish2, recommendations, time())) From e2c849fff5e06fec75f460743a1cdc2c674a5924 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 28 Jan 2021 16:53:03 -0800 Subject: [PATCH 19/22] Deliver all messages to batched send --- distributed/scheduler.py | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 2d6409af240..a4f2c654ada 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -4717,8 +4717,7 @@ def send_all(self, client_msgs: dict, worker_msgs: dict): for worker, msgs in worker_msgs.items(): try: w = stream_comms[worker] - for msg in msgs: - w.send(msg) + w.send(*msgs) except (CommClosedError, AttributeError): self.loop.add_callback(self.remove_worker, address=worker) @@ -4726,12 +4725,11 @@ def send_all(self, client_msgs: dict, worker_msgs: dict): c = client_comms.get(client) if c is None: continue - for msg in msgs: - try: - c.send(msg) - except CommClosedError: - if self.status == Status.running: - logger.critical("Tried writing to closed comm: %s", msg) + try: + c.send(*msgs) + except CommClosedError: + if self.status == Status.running: + logger.critical("Tried writing to closed comm: %s", msgs) ############################ # Less common interactions # From 8a39818cbb96d42ed4689cb955e82f0924fc9c9b Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 28 Jan 2021 16:53:03 -0800 Subject: [PATCH 20/22] Refactor out private `_transition` function --- distributed/scheduler.py | 26 ++++++++++++++++++++++---- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index a4f2c654ada..2f63c02c9f8 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -5839,12 +5839,12 @@ async def register_worker_plugin(self, comm, plugin, name=None): # State Transitions # ##################### - def transition(self, key, finish: str, *args, **kwargs): + def _transition(self, key, finish: str, *args, **kwargs): """Transition a key from its current state to the finish state Examples -------- - >>> self.transition('x', 'waiting') + >>> self._transition('x', 'waiting') {'x': 'processing'} Returns @@ -5889,7 +5889,7 @@ def transition(self, key, finish: str, *args, **kwargs): elif "released" not in start_finish: func = self._transitions["released", finish] assert not args and not kwargs - a: dict = self.transition(key, "released") + a: dict = self._transition(key, "released") v = a.get(key) if v is not None: func = self._transitions["released", v] @@ -5950,6 +5950,24 @@ def transition(self, key, finish: str, *args, **kwargs): pdb.set_trace() raise + def transition(self, key, finish: str, *args, **kwargs): + """Transition a key from its current state to the finish state + + Examples + -------- + >>> self.transition('x', 'waiting') + {'x': 'processing'} + + Returns + ------- + Dictionary of recommendations for future transitions + + See Also + -------- + Scheduler.transitions: transitive version of this function + """ + return self._transition(key, finish, *args, **kwargs) + def transitions(self, recommendations: dict): """Process transitions until none are left @@ -5963,7 +5981,7 @@ def transitions(self, recommendations: dict): while recommendations: key, finish = recommendations.popitem() keys.add(key) - new = self.transition(key, finish) + new = self._transition(key, finish) recommendations.update(new) if parent._validate: From 6da1d4797fdb6ad9b449b693e4118345c1103ee1 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 28 Jan 2021 16:53:04 -0800 Subject: [PATCH 21/22] Send all messages after processing all transitions --- distributed/scheduler.py | 101 +++++++++++++++++++++++++++++++-------- 1 file changed, 82 insertions(+), 19 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 2f63c02c9f8..cfa598b5ab3 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -5864,46 +5864,79 @@ def _transition(self, key, finish: str, *args, **kwargs): worker_msgs: dict client_msgs: dict msgs: list + new_msgs: list dependents: set dependencies: set try: recommendations = {} + worker_msgs = {} + client_msgs = {} + ts = parent._tasks.get(key) if ts is None: - return recommendations + return recommendations, worker_msgs, client_msgs start = ts._state if start == finish: - return recommendations + return recommendations, worker_msgs, client_msgs if self.plugins: dependents = set(ts._dependents) dependencies = set(ts._dependencies) start_finish = (start, finish) - worker_msgs = {} - client_msgs = {} func = self._transitions.get(start_finish) if func is not None: - t: tuple = func(key, *args, **kwargs) - recommendations, worker_msgs, client_msgs = t + a: tuple = func(key, *args, **kwargs) + recommendations, worker_msgs, client_msgs = a elif "released" not in start_finish: func = self._transitions["released", finish] assert not args and not kwargs - a: dict = self._transition(key, "released") - v = a.get(key) + a_recs: dict + a_wmsgs: dict + a_cmsgs: dict + a: tuple = self._transition(key, "released") + a_recs, a_wmsgs, a_cmsgs = a + v = a_recs.get(key) if v is not None: func = self._transitions["released", v] - b: dict - t: tuple = func(key) - b, worker_msgs, client_msgs = t - recommendations.update(a) - recommendations.update(b) + b_recs: dict + b_wmsgs: dict + b_cmsgs: dict + b: tuple = func(key) + b_recs, b_wmsgs, b_cmsgs = b + + recommendations.update(a_recs) + for w, new_msgs in a_wmsgs.items(): + msgs = worker_msgs.get(w) + if msgs is not None: + msgs.extend(new_msgs) + else: + worker_msgs[w] = new_msgs + for c, new_msgs in a_cmsgs.items(): + msgs = client_msgs.get(c) + if msgs is not None: + msgs.extend(new_msgs) + else: + client_msgs[c] = new_msgs + + recommendations.update(b_recs) + for w, new_msgs in b_wmsgs.items(): + msgs = worker_msgs.get(w) + if msgs is not None: + msgs.extend(new_msgs) + else: + worker_msgs[w] = new_msgs + for c, new_msgs in b_cmsgs.items(): + msgs = client_msgs.get(c) + if msgs is not None: + msgs.extend(new_msgs) + else: + client_msgs[c] = new_msgs + start = "released" else: raise RuntimeError("Impossible transition from %r to %r" % start_finish) - self.send_all(client_msgs, worker_msgs) - finish2 = ts._state self.transition_log.append((key, start, finish2, recommendations, time())) if parent._validate: @@ -5941,7 +5974,7 @@ def _transition(self, key, finish: str, *args, **kwargs): ts._prefix._groups.remove(tg) del parent._task_groups[tg._name] - return recommendations + return recommendations, worker_msgs, client_msgs except Exception as e: logger.exception("Error transitioning %r from %r to %r", key, start, finish) if LOG_PDB: @@ -5966,7 +5999,13 @@ def transition(self, key, finish: str, *args, **kwargs): -------- Scheduler.transitions: transitive version of this function """ - return self._transition(key, finish, *args, **kwargs) + recommendations: dict + worker_msgs: dict + client_msgs: dict + a: tuple = self._transition(key, finish, *args, **kwargs) + recommendations, worker_msgs, client_msgs = a + self.send_all(client_msgs, worker_msgs) + return recommendations def transitions(self, recommendations: dict): """Process transitions until none are left @@ -5977,12 +6016,36 @@ def transitions(self, recommendations: dict): parent: SchedulerState = cast(SchedulerState, self) keys: set = set() recommendations = recommendations.copy() - new: dict + worker_msgs: dict = {} + client_msgs: dict = {} + msgs: list + new_msgs: list + new: tuple + new_recs: dict + new_wmsgs: dict + new_cmsgs: dict while recommendations: key, finish = recommendations.popitem() keys.add(key) + new = self._transition(key, finish) - recommendations.update(new) + new_recs, new_wmsgs, new_cmsgs = new + + recommendations.update(new_recs) + for w, new_msgs in new_wmsgs.items(): + msgs = worker_msgs.get(w) + if msgs is not None: + msgs.extend(new_msgs) + else: + worker_msgs[w] = new_msgs + for c, new_msgs in new_cmsgs.items(): + msgs = client_msgs.get(c) + if msgs is not None: + msgs.extend(new_msgs) + else: + client_msgs[c] = new_msgs + + self.send_all(client_msgs, worker_msgs) if parent._validate: for key in keys: From cdcfcf285ee0249c77017156008faae881d3b63a Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 28 Jan 2021 16:53:05 -0800 Subject: [PATCH 22/22] `declare` `ALL_TASK_STATES` a `set` --- distributed/scheduler.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index cfa598b5ab3..c9d17a7cbaa 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -182,7 +182,10 @@ def nogil(func): EventExtension, ] -ALL_TASK_STATES = {"released", "waiting", "no-worker", "processing", "erred", "memory"} +ALL_TASK_STATES = declare( + set, {"released", "waiting", "no-worker", "processing", "erred", "memory"} +) +globals()["ALL_TASK_STATES"] = ALL_TASK_STATES @final