diff --git a/distributed/comm/inproc.py b/distributed/comm/inproc.py index de262bcae09..1d735597746 100644 --- a/distributed/comm/inproc.py +++ b/distributed/comm/inproc.py @@ -184,7 +184,9 @@ def __init__( # type: ignore[no-untyped-def] self._initialized = True def _get_finalizer(self): - def finalize(write_q=self._write_q, write_loop=self._write_loop, r=repr(self)): + r = repr(self) + + def finalize(write_q=self._write_q, write_loop=self._write_loop, r=r): logger.warning(f"Closing dangling queue in {r}") write_loop.add_callback(write_q.put_nowait, _EOF) diff --git a/distributed/comm/tcp.py b/distributed/comm/tcp.py index e9acb7086ee..1ec5eadfb04 100644 --- a/distributed/comm/tcp.py +++ b/distributed/comm/tcp.py @@ -194,7 +194,9 @@ def _read_extra(self): pass def _get_finalizer(self): - def finalize(stream=self.stream, r=repr(self)): + r = repr(self) + + def finalize(stream=self.stream, r=r): # stream is None if a StreamClosedError is raised during interpreter # shutdown if stream is not None and not stream.closed(): diff --git a/distributed/comm/ws.py b/distributed/comm/ws.py index 48da18862f6..8e4aebff45e 100644 --- a/distributed/comm/ws.py +++ b/distributed/comm/ws.py @@ -205,7 +205,9 @@ def __init__( self._read_extra() def _get_finalizer(self): - def finalize(sock=self.sock, r=repr(self)): + r = repr(self) + + def finalize(sock=self.sock, r=r): if not sock.close_code: logger.info("Closing dangling websocket in %s", r) sock.close()