diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index a797b730ff1..37d465c9227 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -10,9 +10,7 @@ repos:
       - id: isort
         language_version: python3
   - repo: https://github.com/asottile/pyupgrade
-    # Do not upgrade: there's a bug in Cython that causes sum(... for ...) to fail;
-    # it needs sum([... for ...])
-    rev: v2.13.0
+    rev: v2.32.0
     hooks:
       - id: pyupgrade
         args:
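The pin removed above existed only so pyupgrade would not rewrite `sum([... for ...])` into `sum(... for ...)`, which older Cython miscompiled. With that constraint gone, the bump to v2.32.0 lets the rewrite through, which is exactly the churn visible in the `sum()` call sites later in this diff. A minimal sketch of the rewrite's effect (plain Python, illustrative names):

```python
# pyupgrade rewrites the list-comprehension form into a generator
# expression; both compute the same value, but the generator form
# avoids materializing an intermediate list.
nums = [1, 2, 3, 4]

with_list = sum([n * n for n in nums])   # old style: builds a temporary list
with_genexpr = sum(n * n for n in nums)  # new style: feeds sum() lazily

assert with_list == with_genexpr == 30
```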
diff --git a/distributed/client.py b/distributed/client.py
index 240695cc73c..24c4b271d12 100644
--- a/distributed/client.py
+++ b/distributed/client.py
@@ -52,7 +52,7 @@
 from tornado.ioloop import PeriodicCallback
 
 from distributed import cluster_dump, preloading
-from distributed import versions as version_module  # type: ignore
+from distributed import versions as version_module
 from distributed.batched import BatchedSend
 from distributed.cfexecutor import ClientExecutor
 from distributed.core import (
diff --git a/distributed/comm/asyncio_tcp.py b/distributed/comm/asyncio_tcp.py
index 1dbac5fef6f..1f61b1527fe 100644
--- a/distributed/comm/asyncio_tcp.py
+++ b/distributed/comm/asyncio_tcp.py
@@ -6,6 +6,7 @@
 import os
 import socket
 import struct
+import sys
 import weakref
 from itertools import islice
 from typing import Any
@@ -776,10 +777,14 @@ class _ZeroCopyWriter:
     # (which would be very large), and set a limit on the number of buffers to
     # pass to sendmsg.
     if hasattr(socket.socket, "sendmsg"):
-        try:
-            SENDMSG_MAX_COUNT = os.sysconf("SC_IOV_MAX")  # type: ignore
-        except Exception:
-            SENDMSG_MAX_COUNT = 16  # Should be supported on all systems
+        # Note: can't use WINDOWS constant as it upsets mypy
+        if sys.platform == "win32":
+            SENDMSG_MAX_COUNT = 16  # No os.sysconf available
+        else:
+            try:
+                SENDMSG_MAX_COUNT = os.sysconf("SC_IOV_MAX")
+            except Exception:
+                SENDMSG_MAX_COUNT = 16  # Should be supported on all systems
     else:
         SENDMSG_MAX_COUNT = 1  # sendmsg not supported, use send instead
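Context for `SENDMSG_MAX_COUNT`: POSIX caps the number of scatter-gather buffers a single `sendmsg()` accepts at `IOV_MAX`, which `os.sysconf("SC_IOV_MAX")` reports where available; Windows has no `os.sysconf` at all, hence the hard-coded fallback behind the `sys.platform` check. Roughly, a writer respects the cap as below — an illustrative sketch using the `islice` import already present in this module, not the actual `_ZeroCopyWriter` code, and a real writer must also handle partial sends:

```python
import socket
from itertools import islice

SENDMSG_MAX_COUNT = 16  # placeholder; queried via os.sysconf("SC_IOV_MAX") on POSIX

def send_buffers(sock: socket.socket, buffers: list[bytes]) -> None:
    """Send buffers via sendmsg(), at most SENDMSG_MAX_COUNT per call."""
    it = iter(buffers)
    while True:
        batch = list(islice(it, SENDMSG_MAX_COUNT))
        if not batch:
            break
        sock.sendmsg(batch)  # scatter-gather send of one bounded batch
```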
diff --git a/distributed/comm/registry.py b/distributed/comm/registry.py
index 47ba730a7d9..bcdc7aa3614 100644
--- a/distributed/comm/registry.py
+++ b/distributed/comm/registry.py
@@ -15,7 +15,7 @@ def __call__(self, **kwargs: str) -> Iterable[importlib.metadata.EntryPoint]:
 if sys.version_info >= (3, 10):
     # py3.10 importlib.metadata type annotations are not in mypy yet
     # https://github.com/python/typeshed/pull/7331
-    _entry_points: _EntryPoints = importlib.metadata.entry_points  # type: ignore[assignment]
+    _entry_points: _EntryPoints = importlib.metadata.entry_points
 else:
 
     def _entry_points(
diff --git a/distributed/comm/ucx.py b/distributed/comm/ucx.py
index 343dfab6140..739cfe39e74 100644
--- a/distributed/comm/ucx.py
+++ b/distributed/comm/ucx.py
@@ -40,7 +40,7 @@
 except ImportError:
     pass
 else:
-    ucp = None  # type: ignore
+    ucp = None
     device_array = None
 
 pre_existing_cuda_context = False
diff --git a/distributed/compatibility.py b/distributed/compatibility.py
index ca49f7b0b27..997236106fd 100644
--- a/distributed/compatibility.py
+++ b/distributed/compatibility.py
@@ -9,7 +9,7 @@
 
 LINUX = sys.platform == "linux"
 MACOS = sys.platform == "darwin"
-WINDOWS = sys.platform.startswith("win")
+WINDOWS = sys.platform == "win32"
 
 
 if sys.version_info >= (3, 9):
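On the `registry.py` hunk above: Python 3.10 taught `importlib.metadata.entry_points()` to take selection keywords, while 3.8/3.9 return a plain `{group: [EntryPoint, ...]}` mapping, which is why the version-gated `_EntryPoints` shim exists at all; the typeshed PR linked in its comment is what made the `# type: ignore[assignment]` obsolete. The same compatibility pattern in isolation (a hedged sketch with a hypothetical helper name):

```python
import importlib.metadata
import sys
from collections.abc import Iterable

def iter_entry_points(group: str) -> Iterable[importlib.metadata.EntryPoint]:
    """Return entry points for *group* on any supported Python version."""
    if sys.version_info >= (3, 10):
        # 3.10+: entry_points() accepts selection parameters directly
        return importlib.metadata.entry_points(group=group)
    # 3.8/3.9: entry_points() returns a {group: [EntryPoint, ...]} mapping
    return importlib.metadata.entry_points().get(group, [])
```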
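The `WINDOWS = sys.platform == "win32"` change, and the recurring "can't use WINDOWS constant as it upsets mypy" notes, both stem from how mypy narrows platforms: it special-cases comparisons made directly against `sys.platform`, marking the dead branch unreachable for the configured platform, but it cannot see through a boolean constant imported from another module. A minimal illustration, assuming mypy is run with `platform = win32`:

```python
import signal
import sys

# Direct check: mypy treats the body as unreachable under platform = win32,
# so the missing-on-Windows signal.SIGHUP is never flagged.
if sys.platform != "win32":
    hangup = signal.SIGHUP

# Indirect check: mypy cannot narrow through a constant, so under
# platform = win32 the attribute access below is flagged
# (Module has no attribute "SIGHUP").
WINDOWS = sys.platform == "win32"
if not WINDOWS:
    hangup = signal.SIGHUP
```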
diff --git a/distributed/dashboard/components/scheduler.py b/distributed/dashboard/components/scheduler.py
index c4b0452a33d..54597ac2d1c 100644
--- a/distributed/dashboard/components/scheduler.py
+++ b/distributed/dashboard/components/scheduler.py
@@ -2794,7 +2794,7 @@ def _get_timeseries(self, restrict_to_existing=False):
         back = None
         # Remove any periods of zero compute at the front or back of the timeseries
         if len(self.plugin.compute):
-            agg = sum([np.array(v[front:]) for v in self.plugin.compute.values()])
+            agg = sum(np.array(v[front:]) for v in self.plugin.compute.values())
             front2 = len(agg) - len(np.trim_zeros(agg, trim="f"))
             front += front2
             back = len(np.trim_zeros(agg, trim="b")) - len(agg) or None
@@ -3192,7 +3192,7 @@ def update(self):
             "names": ["Scheduler", "Workers"],
             "values": [
                 s._tick_interval_observed,
-                sum([w.metrics["event_loop_interval"] for w in s.workers.values()])
+                sum(w.metrics["event_loop_interval"] for w in s.workers.values())
                 / (len(s.workers) or 1),
             ],
         }
diff --git a/distributed/http/utils.py b/distributed/http/utils.py
index e52b3c1ed7d..7a28cd91402 100644
--- a/distributed/http/utils.py
+++ b/distributed/http/utils.py
@@ -38,7 +38,7 @@ def get_handlers(server, modules: list[str], prefix="/"):
     _routes = []
     for module_name in modules:
         module = importlib.import_module(module_name)
-        _routes.extend(module.routes)  # type: ignore
+        _routes.extend(module.routes)
 
     routes = []
diff --git a/distributed/pytest_resourceleaks.py b/distributed/pytest_resourceleaks.py
index b8db97a0bcb..3cec270a7b5 100644
--- a/distributed/pytest_resourceleaks.py
+++ b/distributed/pytest_resourceleaks.py
@@ -55,7 +55,6 @@ def test1():
 import psutil
 import pytest
 
-from distributed.compatibility import WINDOWS
 from distributed.metrics import time
 
 
@@ -155,10 +154,11 @@ def format(self, before: int, after: int) -> str:
 
 class FDChecker(ResourceChecker, name="fds"):
     def measure(self) -> int:
-        if WINDOWS:
+        # Note: can't use WINDOWS constant as it upsets mypy
+        if sys.platform == "win32":
             # Don't use num_handles(); you'll get tens of thousands of reported leaks
             return 0
-        return psutil.Process().num_fds()  # type: ignore
+        return psutil.Process().num_fds()
 
     def has_leak(self, before: int, after: int) -> bool:
         return after > before
diff --git a/distributed/scheduler.py b/distributed/scheduler.py
index 8cf312feb2e..16e927294b5 100644
--- a/distributed/scheduler.py
+++ b/distributed/scheduler.py
@@ -744,14 +744,14 @@ def __repr__(self) -> str:
 
     @property
     def nbytes_total(self) -> int:
-        return sum([tg.nbytes_total for tg in self.groups])
+        return sum(tg.nbytes_total for tg in self.groups)
 
     def __len__(self) -> int:
         return sum(map(len, self.groups))
 
     @property
     def duration(self) -> float:
-        return sum([tg.duration for tg in self.groups])
+        return sum(tg.duration for tg in self.groups)
 
     @property
     def types(self) -> set[str]:
@@ -1400,7 +1400,9 @@ def new_task(
     # State Transitions #
     #####################
 
-    def _transition(self, key, finish: str, stimulus_id: str, *args, **kwargs):
+    def _transition(
+        self, key: str, finish: str, stimulus_id: str, *args, **kwargs
+    ) -> tuple[dict, dict, dict]:
         """Transition a key from its current state to the finish state
 
         Examples
@@ -1432,9 +1434,9 @@ def _transition(self, key, finish: str, stimulus_id: str, *args, **kwargs):
             if self.transition_counter_max:
                 assert self.transition_counter < self.transition_counter_max
 
-            recommendations = {}  # type: ignore
-            worker_msgs = {}  # type: ignore
-            client_msgs = {}  # type: ignore
+            recommendations: dict = {}
+            worker_msgs: dict = {}
+            client_msgs: dict = {}
 
             if self.plugins:
                 dependents = set(ts.dependents)
@@ -1444,33 +1446,27 @@
             if func is not None:
                 recommendations, client_msgs, worker_msgs = func(
                     self, key, stimulus_id, *args, **kwargs
-                )  # type: ignore
+                )
 
             elif "released" not in (start, finish):
                 assert not args and not kwargs, (args, kwargs, start, finish)
 
-                a_recs: dict
-                a_cmsgs: dict
-                a_wmsgs: dict
-                a: tuple = self._transition(key, "released", stimulus_id)
-                a_recs, a_cmsgs, a_wmsgs = a
+                a_recs, a_cmsgs, a_wmsgs = self._transition(
+                    key, "released", stimulus_id
+                )
 
                 v = a_recs.get(key, finish)
                 func = self._TRANSITIONS_TABLE["released", v]
-                b_recs: dict
-                b_cmsgs: dict
-                b_wmsgs: dict
-                b: tuple = func(self, key, stimulus_id)  # type: ignore
-                b_recs, b_cmsgs, b_wmsgs = b
+                b_recs, b_cmsgs, b_wmsgs = func(self, key, stimulus_id)
 
                 recommendations.update(a_recs)
                 for c, new_msgs in a_cmsgs.items():
-                    msgs = client_msgs.get(c)  # type: ignore
+                    msgs = client_msgs.get(c)
                     if msgs is not None:
                         msgs.extend(new_msgs)
                     else:
                         client_msgs[c] = new_msgs
                 for w, new_msgs in a_wmsgs.items():
-                    msgs = worker_msgs.get(w)  # type: ignore
+                    msgs = worker_msgs.get(w)
                     if msgs is not None:
                         msgs.extend(new_msgs)
                     else:
@@ -1478,13 +1474,13 @@ def _transition(self, key, finish: str, stimulus_id: str, *args, **kwargs):
 
                 recommendations.update(b_recs)
                 for c, new_msgs in b_cmsgs.items():
-                    msgs = client_msgs.get(c)  # type: ignore
+                    msgs = client_msgs.get(c)
                     if msgs is not None:
                         msgs.extend(new_msgs)
                     else:
                         client_msgs[c] = new_msgs
                 for w, new_msgs in b_wmsgs.items():
-                    msgs = worker_msgs.get(w)  # type: ignore
+                    msgs = worker_msgs.get(w)
                     if msgs is not None:
                         msgs.extend(new_msgs)
                     else:
@@ -1953,7 +1949,7 @@ def transition_processing_memory(
             assert not ts.exception_blame
             assert ts.state == "processing"
 
-        ws = self.workers.get(worker)  # type: ignore
+        ws = self.workers.get(worker)
         if ws is None:
             recommendations[key] = "released"
             return recommendations, client_msgs, worker_msgs
@@ -2280,7 +2276,7 @@ def transition_processing_erred(
         traceback=None,
         exception_text: str = None,
         traceback_text: str = None,
-        worker: str = None,  # type: ignore
+        worker: str = None,
         **kwargs,
     ):
         ws: WorkerState
@@ -3455,7 +3451,7 @@ def heartbeat_worker(
     ) -> dict[str, Any]:
         address = self.coerce_address(address, resolve_address)
         address = normalize_address(address)
-        ws: WorkerState = self.workers.get(address)  # type: ignore
+        ws = self.workers.get(address)
         if ws is None:
             return {"status": "missing"}
 
@@ -4778,7 +4774,7 @@ def handle_long_running(self, key=None, worker=None, compute_duration=None):
     def handle_worker_status_change(
         self, status: str, worker: str, stimulus_id: str
     ) -> None:
-        ws: WorkerState = self.workers.get(worker)  # type: ignore
+        ws = self.workers.get(worker)
         if not ws:
             return
         prev_status = ws.status
@@ -5290,9 +5286,9 @@ async def gather_on_worker(
             )
             return set(who_has)
 
-        ws: WorkerState = self.workers.get(worker_address)  # type: ignore
+        ws = self.workers.get(worker_address)
 
-        if ws is None:
+        if not ws:
             logger.warning(f"Worker {worker_address} lost during replication")
             return set(who_has)
         elif result["status"] == "OK":
@@ -5344,8 +5340,8 @@ async def delete_worker_data(
             )
             return
 
-        ws: WorkerState = self.workers.get(worker_address)  # type: ignore
-        if ws is None:
+        ws = self.workers.get(worker_address)
+        if not ws:
             return
 
         for key in keys:
@@ -5917,9 +5913,9 @@ def workers_to_close(
             groups = groupby(key, self.workers.values())
 
             limit_bytes = {
-                k: sum([ws.memory_limit for ws in v]) for k, v in groups.items()
+                k: sum(ws.memory_limit for ws in v) for k, v in groups.items()
             }
-            group_bytes = {k: sum([ws.nbytes for ws in v]) for k, v in groups.items()}
+            group_bytes = {k: sum(ws.nbytes for ws in v) for k, v in groups.items()}
 
             limit = sum(limit_bytes.values())
             total = sum(group_bytes.values())
@@ -6871,8 +6867,8 @@ def profile_to_figure(state):
             tasks_timings=tasks_timings,
             address=self.address,
             nworkers=len(self.workers),
-            threads=sum([ws.nthreads for ws in self.workers.values()]),
-            memory=format_bytes(sum([ws.memory_limit for ws in self.workers.values()])),
+            threads=sum(ws.nthreads for ws in self.workers.values()),
+            memory=format_bytes(sum(ws.memory_limit for ws in self.workers.values())),
             code=code,
             dask_version=dask.__version__,
             distributed_version=distributed.__version__,
@@ -7106,8 +7102,8 @@ def adaptive_target(self, target_duration=None):
         cpu = max(1, cpu)
 
         # add more workers if more than 60% of memory is used
-        limit = sum([ws.memory_limit for ws in self.workers.values()])
-        used = sum([ws.nbytes for ws in self.workers.values()])
+        limit = sum(ws.memory_limit for ws in self.workers.values())
+        used = sum(ws.nbytes for ws in self.workers.values())
         memory = 0
         if used > 0.6 * limit and limit > 0:
             memory = 2 * len(self.workers)
@@ -7519,7 +7515,7 @@ def validate_task_state(ts: TaskState) -> None:
 
     if ts.actor:
         if ts.state == "memory":
-            assert sum([ts in ws.actors for ws in ts.who_has]) == 1
+            assert sum(ts in ws.actors for ws in ts.who_has) == 1
         if ts.state == "processing":
             assert ts.processing_on
             assert ts in ts.processing_on.actors
diff --git a/distributed/shuffle/shuffle_extension.py b/distributed/shuffle/shuffle_extension.py
index d2cc4dadf05..dbf69460019 100644
--- a/distributed/shuffle/shuffle_extension.py
+++ b/distributed/shuffle/shuffle_extension.py
@@ -197,7 +197,7 @@ def get_output_partition(self, i: int) -> pd.DataFrame:
             self.output_partitions_left > 0
         ), f"No outputs remaining, but requested output partition {i} on {self.worker.address}."
 
-        sync(self.worker.loop, self.multi_file.flush)  # type: ignore
+        sync(self.worker.loop, self.multi_file.flush)
         try:
             df = self.multi_file.read(i)
             with self.time("cpu"):
diff --git a/distributed/spill.py b/distributed/spill.py
index 86a43af5e00..1ba929f5207 100644
--- a/distributed/spill.py
+++ b/distributed/spill.py
@@ -31,7 +31,7 @@ class SpilledSize(NamedTuple):
     def __add__(self, other: SpilledSize) -> SpilledSize:  # type: ignore
         return SpilledSize(self.memory + other.memory, self.disk + other.disk)
 
-    def __sub__(self, other: SpilledSize) -> SpilledSize:  # type: ignore
+    def __sub__(self, other: SpilledSize) -> SpilledSize:
         return SpilledSize(self.memory - other.memory, self.disk - other.disk)
diff --git a/distributed/system.py b/distributed/system.py
index 27e896b89a2..7d403e044e6 100644
--- a/distributed/system.py
+++ b/distributed/system.py
@@ -17,6 +17,7 @@ def memory_limit() -> int:
     limit = psutil.virtual_memory().total
 
     # Check cgroups if available
+    # Note: can't use LINUX and WINDOWS constants as they upset mypy
    if sys.platform == "linux":
         try:
             with open("/sys/fs/cgroup/memory/memory.limit_in_bytes") as f:
@@ -27,14 +28,15 @@ def memory_limit() -> int:
             pass
 
     # Check rlimit if available
-    try:
-        import resource
-
-        hard_limit = resource.getrlimit(resource.RLIMIT_RSS)[1]  # type: ignore
-        if hard_limit > 0:
-            limit = min(limit, hard_limit)
-    except (ImportError, OSError):
-        pass
+    if sys.platform != "win32":
+        try:
+            import resource
+
+            hard_limit = resource.getrlimit(resource.RLIMIT_RSS)[1]
+            if hard_limit > 0:
+                limit = min(limit, hard_limit)
+        except (ImportError, OSError):
+            pass
 
     return limit
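The `system.py` hunk swaps EAFP for an explicit platform gate because, under `platform = linux` or `darwin`, mypy can type-check `resource`, while under `win32` the module does not exist in typeshed. For reference, `resource.getrlimit()` returns a `(soft, hard)` pair and reports an unlimited cap as `resource.RLIM_INFINITY` (-1 on Linux), which is why only positive values are trusted. A small POSIX-only usage sketch:

```python
import sys

if sys.platform != "win32":
    import resource

    # getrlimit() returns (soft, hard); RLIM_INFINITY means "no limit"
    soft, hard = resource.getrlimit(resource.RLIMIT_RSS)
    if hard > 0:  # skip RLIM_INFINITY, reported as -1 on Linux
        print(f"hard RSS cap: {hard} bytes")
```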
diff --git a/distributed/tests/test_utils_test.py b/distributed/tests/test_utils_test.py
index d28a5fe4025..c99e05532b8 100755
--- a/distributed/tests/test_utils_test.py
+++ b/distributed/tests/test_utils_test.py
@@ -3,6 +3,7 @@
 import pathlib
 import signal
 import socket
+import sys
 import threading
 from contextlib import contextmanager
 from time import sleep
@@ -563,9 +564,16 @@ async def test_dump_cluster_unresponsive_remote_worker(c, s, a, b, tmpdir):
         clog_fut.cancel()
 
 
+# Note: can't use WINDOWS constant as it upsets mypy
+if sys.platform == "win32":
+    TERM_SIGNALS = (signal.SIGTERM, signal.SIGINT)
+else:
+    TERM_SIGNALS = (signal.SIGTERM, signal.SIGHUP, signal.SIGINT)
+
+
 def garbage_process(barrier, ignore_sigterm: bool = False, t: float = 3600) -> None:
     if ignore_sigterm:
-        for signum in (signal.SIGTERM, signal.SIGHUP, signal.SIGINT):  # type: ignore
+        for signum in TERM_SIGNALS:
             signal.signal(signum, signal.SIG_IGN)
     barrier.wait()
     sleep(t)
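`signal.SIGHUP` simply does not exist on Windows, so the module-level `TERM_SIGNALS` tuple fixes the test helper at runtime and, because it uses a direct `sys.platform` check, gives mypy a branch it can narrow. A quick POSIX-only demonstration of what installing `SIG_IGN` buys the garbage process (illustrative sketch):

```python
import os
import signal
import sys

if sys.platform != "win32":
    # After SIG_IGN is installed, SIGTERM no longer kills the process.
    signal.signal(signal.SIGTERM, signal.SIG_IGN)
    os.kill(os.getpid(), signal.SIGTERM)  # delivered but ignored
    print("still alive")
```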
diff --git a/distributed/versions.py b/distributed/versions.py
index 8cd09b6a1e0..6247133cc10 100644
--- a/distributed/versions.py
+++ b/distributed/versions.py
@@ -73,11 +73,11 @@ def version_of_package(pkg: ModuleType) -> str | None:
     from contextlib import suppress
 
     with suppress(AttributeError):
-        return pkg.__version__  # type: ignore
+        return pkg.__version__
     with suppress(AttributeError):
-        return str(pkg.version)  # type: ignore
+        return str(pkg.version)
     with suppress(AttributeError):
-        return ".".join(map(str, pkg.version_info))  # type: ignore
+        return ".".join(map(str, pkg.version_info))
     return None
diff --git a/distributed/worker.py b/distributed/worker.py
index 3c3ec2ae747..ecbfe5c1261 100644
--- a/distributed/worker.py
+++ b/distributed/worker.py
@@ -3270,7 +3270,7 @@ async def gather_dep(
         total_nbytes : int
             Total number of bytes for all the dependencies in to_gather combined
         """
-        if self.status not in WORKER_ANY_RUNNING:  # type: ignore
+        if self.status not in WORKER_ANY_RUNNING:
             return None
 
         recommendations: Recs = {}
@@ -3506,7 +3506,7 @@ def handle_worker_status_change(self, status: str, stimulus_id: str) -> None:
         if (
             new_status == Status.closing_gracefully
-            and self._status not in WORKER_ANY_RUNNING  # type: ignore
+            and self._status not in WORKER_ANY_RUNNING
         ):
             logger.error(
                 "Invalid Worker.status transition: %s -> %s", self._status, new_status
@@ -3949,7 +3949,7 @@ def _(self, ev: AlreadyCancelledEvent) -> RecsInstrs:
         """Task is already cancelled by the time execute() runs"""
         # key *must* be still in tasks. Releasing it directly is forbidden
         # without going through cancelled
-        ts = self.tasks.get(ev.key)  # type: ignore
+        ts = self.tasks.get(ev.key)
         assert ts, self.story(ev.key)
         ts.done = True
         return {ts: "released"}, []
@@ -3959,7 +3959,7 @@ def _(self, ev: ExecuteSuccessEvent) -> RecsInstrs:
         """Task completed successfully"""
         # key *must* be still in tasks. Releasing it directly is forbidden
         # without going through cancelled
-        ts = self.tasks.get(ev.key)  # type: ignore
+        ts = self.tasks.get(ev.key)
         assert ts, self.story(ev.key)
         ts.done = True
 
@@ -3973,7 +3973,7 @@ def _(self, ev: ExecuteFailureEvent) -> RecsInstrs:
         """Task execution failed"""
         # key *must* be still in tasks. Releasing it directly is forbidden
         # without going through cancelled
-        ts = self.tasks.get(ev.key)  # type: ignore
+        ts = self.tasks.get(ev.key)
         assert ts, self.story(ev.key)
         ts.done = True
 
@@ -3997,7 +3997,7 @@ def _(self, ev: RescheduleEvent) -> RecsInstrs:
         """Task raised Reschedule exception while it was running"""
         # key *must* be still in tasks. Releasing it directly is forbidden
         # without going through cancelled
-        ts = self.tasks.get(ev.key)  # type: ignore
+        ts = self.tasks.get(ev.key)
         assert ts, self.story(ev.key)
         return {ts: "rescheduled"}, []
 
@@ -4476,11 +4476,7 @@ def get_worker() -> Worker:
         return thread_state.execution_state["worker"]
     except AttributeError:
         try:
-            return first(
-                w
-                for w in Worker._instances
-                if w.status in WORKER_ANY_RUNNING  # type: ignore
-            )
+            return first(w for w in Worker._instances if w.status in WORKER_ANY_RUNNING)
         except StopIteration:
             raise ValueError("No workers found")
diff --git a/docs/source/conf.py b/docs/source/conf.py
index 5ceb04863c1..3d315a7a9ea 100644
--- a/docs/source/conf.py
+++ b/docs/source/conf.py
@@ -426,7 +426,7 @@ def copy_legacy_redirects(app, docname):
             f.write(page)
 
 
-from docutils.parsers.rst import directives  # type: ignore
+from docutils.parsers.rst import directives
 
 # -- Configuration to keep autosummary in sync with autoclass::members ----------------------------------------------
 # Fixes issues/3693
diff --git a/setup.cfg b/setup.cfg
index 70fc09aa4da..d55d03bea5b 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -97,4 +97,10 @@ timeout = 300
 [mypy]
 # Silence errors about Python 3.9-style delayed type annotations on Python 3.8
 python_version = 3.9
+# See https://github.com/python/mypy/issues/12286 for automatic multi-platform support
+platform = linux
+# platform = win32
+# platform = darwin
 ignore_missing_imports = true
+warn_unused_ignores = True
+warn_redundant_casts = True
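The new `setup.cfg` keys drive most of this diff: `warn_unused_ignores = True` makes mypy error on any `# type: ignore` that no longer suppresses a real error (hence the mass removals above), `warn_redundant_casts` does the same for `cast()` calls, and pinning `platform = linux` keeps those warnings deterministic until mypy#12286 adds first-class multi-platform runs. The other platforms can still be checked one-off via mypy's `--platform` flag, e.g. `mypy --platform win32 distributed`. What the new flag reports, on a hypothetical file:

```python
# demo_unused_ignore.py
# With warn_unused_ignores enabled, mypy reports:
#   demo_unused_ignore.py:4: error: Unused "type: ignore" comment
x: int = 1  # type: ignore
```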