From b06bf322d8d714598f63cd6f0cd9d76491b5e7ba Mon Sep 17 00:00:00 2001 From: crusaderky Date: Wed, 11 May 2022 15:41:52 +0100 Subject: [PATCH 1/4] pyupgrade_mypy clean scheduler.py --- .pre-commit-config.yaml | 4 +- distributed/client.py | 2 +- distributed/comm/registry.py | 2 +- distributed/comm/ucx.py | 2 +- distributed/dashboard/components/scheduler.py | 4 +- distributed/http/utils.py | 2 +- distributed/scheduler.py | 42 +++++++++---------- distributed/shuffle/shuffle_extension.py | 2 +- distributed/spill.py | 2 +- distributed/versions.py | 6 +-- distributed/worker.py | 18 ++++---- docs/source/conf.py | 2 +- setup.cfg | 4 ++ 13 files changed, 45 insertions(+), 47 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index a797b730ff1..37d465c9227 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -10,9 +10,7 @@ repos: - id: isort language_version: python3 - repo: https://github.com/asottile/pyupgrade - # Do not upgrade: there's a bug in Cython that causes sum(... for ...) to fail; - # it needs sum([... for ...]) - rev: v2.13.0 + rev: v2.32.0 hooks: - id: pyupgrade args: diff --git a/distributed/client.py b/distributed/client.py index 240695cc73c..24c4b271d12 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -52,7 +52,7 @@ from tornado.ioloop import PeriodicCallback from distributed import cluster_dump, preloading -from distributed import versions as version_module # type: ignore +from distributed import versions as version_module from distributed.batched import BatchedSend from distributed.cfexecutor import ClientExecutor from distributed.core import ( diff --git a/distributed/comm/registry.py b/distributed/comm/registry.py index 47ba730a7d9..bcdc7aa3614 100644 --- a/distributed/comm/registry.py +++ b/distributed/comm/registry.py @@ -15,7 +15,7 @@ def __call__(self, **kwargs: str) -> Iterable[importlib.metadata.EntryPoint]: if sys.version_info >= (3, 10): # py3.10 importlib.metadata type annotations are not in mypy yet # https://github.com/python/typeshed/pull/7331 - _entry_points: _EntryPoints = importlib.metadata.entry_points # type: ignore[assignment] + _entry_points: _EntryPoints = importlib.metadata.entry_points else: def _entry_points( diff --git a/distributed/comm/ucx.py b/distributed/comm/ucx.py index 343dfab6140..739cfe39e74 100644 --- a/distributed/comm/ucx.py +++ b/distributed/comm/ucx.py @@ -40,7 +40,7 @@ except ImportError: pass else: - ucp = None # type: ignore + ucp = None device_array = None pre_existing_cuda_context = False diff --git a/distributed/dashboard/components/scheduler.py b/distributed/dashboard/components/scheduler.py index c4b0452a33d..54597ac2d1c 100644 --- a/distributed/dashboard/components/scheduler.py +++ b/distributed/dashboard/components/scheduler.py @@ -2794,7 +2794,7 @@ def _get_timeseries(self, restrict_to_existing=False): back = None # Remove any periods of zero compute at the front or back of the timeseries if len(self.plugin.compute): - agg = sum([np.array(v[front:]) for v in self.plugin.compute.values()]) + agg = sum(np.array(v[front:]) for v in self.plugin.compute.values()) front2 = len(agg) - len(np.trim_zeros(agg, trim="f")) front += front2 back = len(np.trim_zeros(agg, trim="b")) - len(agg) or None @@ -3192,7 +3192,7 @@ def update(self): "names": ["Scheduler", "Workers"], "values": [ s._tick_interval_observed, - sum([w.metrics["event_loop_interval"] for w in s.workers.values()]) + sum(w.metrics["event_loop_interval"] for w in s.workers.values()) / (len(s.workers) or 1), ], } diff --git a/distributed/http/utils.py b/distributed/http/utils.py index e52b3c1ed7d..7a28cd91402 100644 --- a/distributed/http/utils.py +++ b/distributed/http/utils.py @@ -38,7 +38,7 @@ def get_handlers(server, modules: list[str], prefix="/"): _routes = [] for module_name in modules: module = importlib.import_module(module_name) - _routes.extend(module.routes) # type: ignore + _routes.extend(module.routes) routes = [] diff --git a/distributed/scheduler.py b/distributed/scheduler.py index e1f1420c6f9..2ab7501446b 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -743,14 +743,14 @@ def __repr__(self) -> str: @property def nbytes_total(self) -> int: - return sum([tg.nbytes_total for tg in self.groups]) + return sum(tg.nbytes_total for tg in self.groups) def __len__(self) -> int: return sum(map(len, self.groups)) @property def duration(self) -> float: - return sum([tg.duration for tg in self.groups]) + return sum(tg.duration for tg in self.groups) @property def types(self) -> set[str]: @@ -1470,13 +1470,13 @@ def _transition(self, key, finish: str, stimulus_id: str, *args, **kwargs): recommendations.update(a_recs) for c, new_msgs in a_cmsgs.items(): - msgs = client_msgs.get(c) # type: ignore + msgs = client_msgs.get(c) if msgs is not None: msgs.extend(new_msgs) else: client_msgs[c] = new_msgs for w, new_msgs in a_wmsgs.items(): - msgs = worker_msgs.get(w) # type: ignore + msgs = worker_msgs.get(w) if msgs is not None: msgs.extend(new_msgs) else: @@ -1484,13 +1484,13 @@ def _transition(self, key, finish: str, stimulus_id: str, *args, **kwargs): recommendations.update(b_recs) for c, new_msgs in b_cmsgs.items(): - msgs = client_msgs.get(c) # type: ignore + msgs = client_msgs.get(c) if msgs is not None: msgs.extend(new_msgs) else: client_msgs[c] = new_msgs for w, new_msgs in b_wmsgs.items(): - msgs = worker_msgs.get(w) # type: ignore + msgs = worker_msgs.get(w) if msgs is not None: msgs.extend(new_msgs) else: @@ -1985,7 +1985,7 @@ def transition_processing_memory( assert not ts.exception_blame assert ts.state == "processing" - ws = self.workers.get(worker) # type: ignore + ws = self.workers.get(worker) if ws is None: recommendations[key] = "released" return recommendations, client_msgs, worker_msgs @@ -2312,7 +2312,7 @@ def transition_processing_erred( traceback=None, exception_text: str = None, traceback_text: str = None, - worker: str = None, # type: ignore + worker: str = None, **kwargs, ): ws: WorkerState @@ -3434,7 +3434,7 @@ def heartbeat_worker( ) -> dict[str, Any]: address = self.coerce_address(address, resolve_address) address = normalize_address(address) - ws: WorkerState = self.workers.get(address) # type: ignore + ws = self.workers.get(address) if ws is None: return {"status": "missing"} @@ -4755,7 +4755,7 @@ def handle_long_running(self, key=None, worker=None, compute_duration=None): def handle_worker_status_change( self, status: str, worker: str, stimulus_id: str ) -> None: - ws: WorkerState = self.workers.get(worker) # type: ignore + ws = self.workers.get(worker) if not ws: return prev_status = ws.status @@ -5267,9 +5267,9 @@ async def gather_on_worker( ) return set(who_has) - ws: WorkerState = self.workers.get(worker_address) # type: ignore + ws = self.workers.get(worker_address) - if ws is None: + if not ws: logger.warning(f"Worker {worker_address} lost during replication") return set(who_has) elif result["status"] == "OK": @@ -5321,8 +5321,8 @@ async def delete_worker_data( ) return - ws: WorkerState = self.workers.get(worker_address) # type: ignore - if ws is None: + ws = self.workers.get(worker_address) + if not ws: return for key in keys: @@ -5894,9 +5894,9 @@ def workers_to_close( groups = groupby(key, self.workers.values()) limit_bytes = { - k: sum([ws.memory_limit for ws in v]) for k, v in groups.items() + k: sum(ws.memory_limit for ws in v) for k, v in groups.items() } - group_bytes = {k: sum([ws.nbytes for ws in v]) for k, v in groups.items()} + group_bytes = {k: sum(ws.nbytes for ws in v) for k, v in groups.items()} limit = sum(limit_bytes.values()) total = sum(group_bytes.values()) @@ -6848,8 +6848,8 @@ def profile_to_figure(state): tasks_timings=tasks_timings, address=self.address, nworkers=len(self.workers), - threads=sum([ws.nthreads for ws in self.workers.values()]), - memory=format_bytes(sum([ws.memory_limit for ws in self.workers.values()])), + threads=sum(ws.nthreads for ws in self.workers.values()), + memory=format_bytes(sum(ws.memory_limit for ws in self.workers.values())), code=code, dask_version=dask.__version__, distributed_version=distributed.__version__, @@ -7083,8 +7083,8 @@ def adaptive_target(self, target_duration=None): cpu = max(1, cpu) # add more workers if more than 60% of memory is used - limit = sum([ws.memory_limit for ws in self.workers.values()]) - used = sum([ws.nbytes for ws in self.workers.values()]) + limit = sum(ws.memory_limit for ws in self.workers.values()) + used = sum(ws.nbytes for ws in self.workers.values()) memory = 0 if used > 0.6 * limit and limit > 0: memory = 2 * len(self.workers) @@ -7496,7 +7496,7 @@ def validate_task_state(ts: TaskState) -> None: if ts.actor: if ts.state == "memory": - assert sum([ts in ws.actors for ws in ts.who_has]) == 1 + assert sum(ts in ws.actors for ws in ts.who_has) == 1 if ts.state == "processing": assert ts.processing_on assert ts in ts.processing_on.actors diff --git a/distributed/shuffle/shuffle_extension.py b/distributed/shuffle/shuffle_extension.py index d2cc4dadf05..dbf69460019 100644 --- a/distributed/shuffle/shuffle_extension.py +++ b/distributed/shuffle/shuffle_extension.py @@ -197,7 +197,7 @@ def get_output_partition(self, i: int) -> pd.DataFrame: self.output_partitions_left > 0 ), f"No outputs remaining, but requested output partition {i} on {self.worker.address}." - sync(self.worker.loop, self.multi_file.flush) # type: ignore + sync(self.worker.loop, self.multi_file.flush) try: df = self.multi_file.read(i) with self.time("cpu"): diff --git a/distributed/spill.py b/distributed/spill.py index 86a43af5e00..1ba929f5207 100644 --- a/distributed/spill.py +++ b/distributed/spill.py @@ -31,7 +31,7 @@ class SpilledSize(NamedTuple): def __add__(self, other: SpilledSize) -> SpilledSize: # type: ignore return SpilledSize(self.memory + other.memory, self.disk + other.disk) - def __sub__(self, other: SpilledSize) -> SpilledSize: # type: ignore + def __sub__(self, other: SpilledSize) -> SpilledSize: return SpilledSize(self.memory - other.memory, self.disk - other.disk) diff --git a/distributed/versions.py b/distributed/versions.py index 8cd09b6a1e0..6247133cc10 100644 --- a/distributed/versions.py +++ b/distributed/versions.py @@ -73,11 +73,11 @@ def version_of_package(pkg: ModuleType) -> str | None: from contextlib import suppress with suppress(AttributeError): - return pkg.__version__ # type: ignore + return pkg.__version__ with suppress(AttributeError): - return str(pkg.version) # type: ignore + return str(pkg.version) with suppress(AttributeError): - return ".".join(map(str, pkg.version_info)) # type: ignore + return ".".join(map(str, pkg.version_info)) return None diff --git a/distributed/worker.py b/distributed/worker.py index 601b053cadb..596db0e85e1 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -3154,7 +3154,7 @@ async def gather_dep( total_nbytes : int Total number of bytes for all the dependencies in to_gather combined """ - if self.status not in WORKER_ANY_RUNNING: # type: ignore + if self.status not in WORKER_ANY_RUNNING: return recommendations: Recs = {} @@ -3383,7 +3383,7 @@ def handle_worker_status_change(self, status: str, stimulus_id: str) -> None: if ( new_status == Status.closing_gracefully - and self._status not in WORKER_ANY_RUNNING # type: ignore + and self._status not in WORKER_ANY_RUNNING ): logger.error( "Invalid Worker.status transition: %s -> %s", self._status, new_status @@ -3819,7 +3819,7 @@ def _(self, ev: AlreadyCancelledEvent) -> RecsInstrs: """Task is already cancelled by the time execute() runs""" # key *must* be still in tasks. Releasing it directly is forbidden # without going through cancelled - ts = self.tasks.get(ev.key) # type: ignore + ts = self.tasks.get(ev.key) assert ts, self.story(ev.key) ts.done = True return {ts: "released"}, [] @@ -3829,7 +3829,7 @@ def _(self, ev: ExecuteSuccessEvent) -> RecsInstrs: """Task completed successfully""" # key *must* be still in tasks. Releasing it directly is forbidden # without going through cancelled - ts = self.tasks.get(ev.key) # type: ignore + ts = self.tasks.get(ev.key) assert ts, self.story(ev.key) ts.done = True @@ -3843,7 +3843,7 @@ def _(self, ev: ExecuteFailureEvent) -> RecsInstrs: """Task execution failed""" # key *must* be still in tasks. Releasing it directly is forbidden # without going through cancelled - ts = self.tasks.get(ev.key) # type: ignore + ts = self.tasks.get(ev.key) assert ts, self.story(ev.key) ts.done = True @@ -3867,7 +3867,7 @@ def _(self, ev: RescheduleEvent) -> RecsInstrs: """Task raised Reschedule exception while it was running""" # key *must* be still in tasks. Releasing it directly is forbidden # without going through cancelled - ts = self.tasks.get(ev.key) # type: ignore + ts = self.tasks.get(ev.key) assert ts, self.story(ev.key) return {ts: "rescheduled"}, [] @@ -4342,11 +4342,7 @@ def get_worker() -> Worker: return thread_state.execution_state["worker"] except AttributeError: try: - return first( - w - for w in Worker._instances - if w.status in WORKER_ANY_RUNNING # type: ignore - ) + return first(w for w in Worker._instances if w.status in WORKER_ANY_RUNNING) except StopIteration: raise ValueError("No workers found") diff --git a/docs/source/conf.py b/docs/source/conf.py index 5ceb04863c1..3d315a7a9ea 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -426,7 +426,7 @@ def copy_legacy_redirects(app, docname): f.write(page) -from docutils.parsers.rst import directives # type: ignore +from docutils.parsers.rst import directives # -- Configuration to keep autosummary in sync with autoclass::members ---------------------------------------------- # Fixes issues/3693 diff --git a/setup.cfg b/setup.cfg index 70fc09aa4da..82ec3f58af9 100644 --- a/setup.cfg +++ b/setup.cfg @@ -98,3 +98,7 @@ timeout = 300 # Silence errors about Python 3.9-style delayed type annotations on Python 3.8 python_version = 3.9 ignore_missing_imports = true +# Can't set this permanently to True without breaking use of mypy on Windows +# See https://github.com/python/mypy/issues/12768 +warn_unused_ignores = False +warn_redundant_casts = True From d65460473f076bf0e13faf5e07905e3f20697311 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Wed, 11 May 2022 17:54:30 +0100 Subject: [PATCH 2/4] Windows support --- distributed/comm/asyncio_tcp.py | 13 +++++++++---- distributed/compatibility.py | 2 +- distributed/pytest_resourceleaks.py | 6 +++--- distributed/system.py | 17 +++++++++-------- distributed/tests/test_utils_test.py | 10 +++++++++- setup.cfg | 8 +++++--- 6 files changed, 36 insertions(+), 20 deletions(-) diff --git a/distributed/comm/asyncio_tcp.py b/distributed/comm/asyncio_tcp.py index 1dbac5fef6f..1f61b1527fe 100644 --- a/distributed/comm/asyncio_tcp.py +++ b/distributed/comm/asyncio_tcp.py @@ -6,6 +6,7 @@ import os import socket import struct +import sys import weakref from itertools import islice from typing import Any @@ -776,10 +777,14 @@ class _ZeroCopyWriter: # (which would be very large), and set a limit on the number of buffers to # pass to sendmsg. if hasattr(socket.socket, "sendmsg"): - try: - SENDMSG_MAX_COUNT = os.sysconf("SC_IOV_MAX") # type: ignore - except Exception: - SENDMSG_MAX_COUNT = 16 # Should be supported on all systems + # Note: can't use WINDOWS constant as it upsets mypy + if sys.platform == "win32": + SENDMSG_MAX_COUNT = 16 # No os.sysconf available + else: + try: + SENDMSG_MAX_COUNT = os.sysconf("SC_IOV_MAX") + except Exception: + SENDMSG_MAX_COUNT = 16 # Should be supported on all systems else: SENDMSG_MAX_COUNT = 1 # sendmsg not supported, use send instead diff --git a/distributed/compatibility.py b/distributed/compatibility.py index ca49f7b0b27..997236106fd 100644 --- a/distributed/compatibility.py +++ b/distributed/compatibility.py @@ -9,7 +9,7 @@ LINUX = sys.platform == "linux" MACOS = sys.platform == "darwin" -WINDOWS = sys.platform.startswith("win") +WINDOWS = sys.platform == "win32" if sys.version_info >= (3, 9): diff --git a/distributed/pytest_resourceleaks.py b/distributed/pytest_resourceleaks.py index b8db97a0bcb..3cec270a7b5 100644 --- a/distributed/pytest_resourceleaks.py +++ b/distributed/pytest_resourceleaks.py @@ -55,7 +55,6 @@ def test1(): import psutil import pytest -from distributed.compatibility import WINDOWS from distributed.metrics import time @@ -155,10 +154,11 @@ def format(self, before: int, after: int) -> str: class FDChecker(ResourceChecker, name="fds"): def measure(self) -> int: - if WINDOWS: + # Note: can't use WINDOWS constant as it upsets mypy + if sys.platform == "win32": # Don't use num_handles(); you'll get tens of thousands of reported leaks return 0 - return psutil.Process().num_fds() # type: ignore + return psutil.Process().num_fds() def has_leak(self, before: int, after: int) -> bool: return after > before diff --git a/distributed/system.py b/distributed/system.py index 27e896b89a2..8832148cd6b 100644 --- a/distributed/system.py +++ b/distributed/system.py @@ -1,3 +1,4 @@ +import resource import sys import psutil @@ -17,6 +18,7 @@ def memory_limit() -> int: limit = psutil.virtual_memory().total # Check cgroups if available + # Note: can't use LINUX and WINDOWS constants as they upset mypy if sys.platform == "linux": try: with open("/sys/fs/cgroup/memory/memory.limit_in_bytes") as f: @@ -27,14 +29,13 @@ def memory_limit() -> int: pass # Check rlimit if available - try: - import resource - - hard_limit = resource.getrlimit(resource.RLIMIT_RSS)[1] # type: ignore - if hard_limit > 0: - limit = min(limit, hard_limit) - except (ImportError, OSError): - pass + if sys.platform != "win32": + try: + hard_limit = resource.getrlimit(resource.RLIMIT_RSS)[1] + if hard_limit > 0: + limit = min(limit, hard_limit) + except OSError: + pass return limit diff --git a/distributed/tests/test_utils_test.py b/distributed/tests/test_utils_test.py index d28a5fe4025..c99e05532b8 100755 --- a/distributed/tests/test_utils_test.py +++ b/distributed/tests/test_utils_test.py @@ -3,6 +3,7 @@ import pathlib import signal import socket +import sys import threading from contextlib import contextmanager from time import sleep @@ -563,9 +564,16 @@ async def test_dump_cluster_unresponsive_remote_worker(c, s, a, b, tmpdir): clog_fut.cancel() +# Note: can't use WINDOWS constant as it upsets mypy +if sys.platform == "win32": + TERM_SIGNALS = (signal.SIGTERM, signal.SIGINT) +else: + TERM_SIGNALS = (signal.SIGTERM, signal.SIGHUP, signal.SIGINT) + + def garbage_process(barrier, ignore_sigterm: bool = False, t: float = 3600) -> None: if ignore_sigterm: - for signum in (signal.SIGTERM, signal.SIGHUP, signal.SIGINT): # type: ignore + for signum in TERM_SIGNALS: signal.signal(signum, signal.SIG_IGN) barrier.wait() sleep(t) diff --git a/setup.cfg b/setup.cfg index 82ec3f58af9..d55d03bea5b 100644 --- a/setup.cfg +++ b/setup.cfg @@ -97,8 +97,10 @@ timeout = 300 [mypy] # Silence errors about Python 3.9-style delayed type annotations on Python 3.8 python_version = 3.9 +# See https://github.com/python/mypy/issues/12286 for automatic multi-platform support +platform = linux +# platform = win32 +# platform = darwin ignore_missing_imports = true -# Can't set this permanently to True without breaking use of mypy on Windows -# See https://github.com/python/mypy/issues/12768 -warn_unused_ignores = False +warn_unused_ignores = True warn_redundant_casts = True From bd7d5d48750dea2cc94c3c5450de196146855301 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Fri, 13 May 2022 01:03:19 +0100 Subject: [PATCH 3/4] fix regression on windows --- distributed/system.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/distributed/system.py b/distributed/system.py index 8832148cd6b..7d403e044e6 100644 --- a/distributed/system.py +++ b/distributed/system.py @@ -1,4 +1,3 @@ -import resource import sys import psutil @@ -31,10 +30,12 @@ def memory_limit() -> int: # Check rlimit if available if sys.platform != "win32": try: + import resource + hard_limit = resource.getrlimit(resource.RLIMIT_RSS)[1] if hard_limit > 0: limit = min(limit, hard_limit) - except OSError: + except (ImportError, OSError): pass return limit From 8404096a789605414730840438da00900dfcc1c8 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Fri, 13 May 2022 14:59:44 +0100 Subject: [PATCH 4/4] lint --- distributed/scheduler.py | 26 +++++++++++--------------- 1 file changed, 11 insertions(+), 15 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index aebce51fc12..16e927294b5 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -1400,7 +1400,9 @@ def new_task( # State Transitions # ##################### - def _transition(self, key, finish: str, stimulus_id: str, *args, **kwargs): + def _transition( + self, key: str, finish: str, stimulus_id: str, *args, **kwargs + ) -> tuple[dict, dict, dict]: """Transition a key from its current state to the finish state Examples @@ -1432,9 +1434,9 @@ def _transition(self, key, finish: str, stimulus_id: str, *args, **kwargs): if self.transition_counter_max: assert self.transition_counter < self.transition_counter_max - recommendations = {} - worker_msgs = {} - client_msgs = {} + recommendations: dict = {} + worker_msgs: dict = {} + client_msgs: dict = {} if self.plugins: dependents = set(ts.dependents) @@ -1444,23 +1446,17 @@ def _transition(self, key, finish: str, stimulus_id: str, *args, **kwargs): if func is not None: recommendations, client_msgs, worker_msgs = func( self, key, stimulus_id, *args, **kwargs - ) # type: ignore + ) elif "released" not in (start, finish): assert not args and not kwargs, (args, kwargs, start, finish) - a_recs: dict - a_cmsgs: dict - a_wmsgs: dict - a: tuple = self._transition(key, "released", stimulus_id) - a_recs, a_cmsgs, a_wmsgs = a + a_recs, a_cmsgs, a_wmsgs = self._transition( + key, "released", stimulus_id + ) v = a_recs.get(key, finish) func = self._TRANSITIONS_TABLE["released", v] - b_recs: dict - b_cmsgs: dict - b_wmsgs: dict - b: tuple = func(self, key, stimulus_id) # type: ignore - b_recs, b_cmsgs, b_wmsgs = b + b_recs, b_cmsgs, b_wmsgs = func(self, key, stimulus_id) recommendations.update(a_recs) for c, new_msgs in a_cmsgs.items():