From 6ba1ba9cb972303394497620bc899d86b10be14b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?L=2E=20K=C3=A4rkk=C3=A4inen?= Date: Wed, 11 Mar 2020 11:34:07 +0200 Subject: [PATCH 01/19] UNIX server sockets. --- trio/__init__.py | 2 + trio/_highlevel_open_unix_listeners.py | 232 +++++++++++++++++++++++++ 2 files changed, 234 insertions(+) create mode 100644 trio/_highlevel_open_unix_listeners.py diff --git a/trio/__init__.py b/trio/__init__.py index d517bc7f18..663475c0b6 100644 --- a/trio/__init__.py +++ b/trio/__init__.py @@ -60,6 +60,8 @@ from ._highlevel_open_unix_stream import open_unix_socket +from ._highlevel_open_unix_listeners import open_unix_listeners, serve_unix + from ._highlevel_ssl_helpers import ( open_ssl_over_tcp_stream, open_ssl_over_tcp_listeners, serve_ssl_over_tcp ) diff --git a/trio/_highlevel_open_unix_listeners.py b/trio/_highlevel_open_unix_listeners.py new file mode 100644 index 0000000000..51954b33cd --- /dev/null +++ b/trio/_highlevel_open_unix_listeners.py @@ -0,0 +1,232 @@ +import os +import secrets +import socket +import stat +import time +from contextlib import contextmanager +from math import inf + +import trio + +__all__ = ["open_unix_listeners", "serve_unix"] + + +# Default backlog size: +# +# Having the backlog too low can cause practical problems (a perfectly healthy +# service that starts failing to accept connections if they arrive in a +# burst). +# +# Having it too high doesn't really cause any problems. Like any buffer, you +# want backlog queue to be zero usually, and it won't save you if you're +# getting connection attempts faster than you can call accept() on an ongoing +# basis. But unlike other buffers, this one doesn't really provide any +# backpressure. If a connection gets stuck waiting in the backlog queue, then +# from the peer's point of view the connection succeeded but then their +# send/recv will stall until we get to it, possibly for a long time. OTOH if +# there isn't room in the backlog queue... then their connect stalls, possibly +# for a long time, which is pretty much the same thing. +# +# A large backlog can also use a bit more kernel memory, but this seems fairly +# negligible these days. +# +# So this suggests we should make the backlog as large as possible. This also +# matches what Golang does. However, they do it in a weird way, where they +# have a bunch of code to sniff out the configured upper limit for backlog on +# different operating systems. But on every system, passing in a too-large +# backlog just causes it to be silently truncated to the configured maximum, +# so this is unnecessary -- we can just pass in "infinity" and get the maximum +# that way. (Verified on Windows, Linux, macOS using +# notes-to-self/measure-listen-backlog.py) +def _compute_backlog(backlog): + if backlog is None: + backlog = inf + # Many systems (Linux, BSDs, ...) store the backlog in a uint16 and are + # missing overflow protection, so we apply our own overflow protection. + # https://github.com/golang/go/issues/5030 + return min(backlog, 0xffff) + + +class UnixSocketListener(trio.SocketListener): + @staticmethod + def _inode(filename): + """Return a (dev, inode) tuple uniquely identifying a file.""" + s = os.stat(filename) + return s.st_dev, s.st_ino + + @staticmethod + @contextmanager + def _lock(socketname): + """Protect socketname access by a lock file.""" + name = f"{socketname}.lock" + start_monotonic = time.monotonic() + # Atomically acquire the lock file + while True: + try: + os.close(os.open(name, os.O_CREAT | os.O_EXCL)) + break # Lock file created! + except FileExistsError: + pass + # Retry but avoid busy polling + time.sleep(0.01) + try: + ctime = os.stat(name).st_ctime + if ctime and abs(time.time() - ctime) > 2.0: + raise FileExistsError(f"Stale lock file {name}") + except FileNotFoundError: + pass + if time.monotonic() - start_monotonic > 1.0: + raise FileExistsError(f"Timeout acquiring {name}") + try: + yield + finally: + os.unlink(name) + + def __init__(self, sock, path, inode): + """Private contructor. Use UnixSocketListener.create instead.""" + self.path, self.inode = path, inode + super().__init__(trio.socket.from_stdlib_socket(sock)) + + @staticmethod + def _create(path, mode, backlog): + # Sanitise and pre-verify socket path + path = os.path.abspath(path) + folder = os.path.dirname(path) + if not os.path.isdir(folder): + raise FileNotFoundError(f"Socket folder does not exist: {folder}") + if os.path.exists(path) and not stat.S_ISSOCK(os.stat(path).st_mode): + raise FileExistsError(f"Existing file is not a socket: {path}") + # Create new socket with a random temporary name + tmp_path = f"{path}.{secrets.token_urlsafe()}" + sock = socket.socket(socket.AF_UNIX) + try: + # Critical section begins (filename races) + sock.bind(tmp_path) + try: + inode = UnixSocketListener._inode(tmp_path) + os.chmod(tmp_path, mode) + # Start listening before rename to avoid connection failures + sock.listen(backlog) + # Replace the requested name (atomic overwrite if necessary) + with UnixSocketListener._lock(path): + os.rename(tmp_path, path) + return UnixSocketListener(sock, path, inode) + except: # noqa: E722 + try: + os.unlink(tmp_path) + finally: + raise + except: # noqa: E722 + try: + sock.close() + finally: + raise + + @staticmethod + async def create(path, *, mode=0o666, backlog=None): + backlog = _compute_backlog(backlog) + return await trio.to_thread.run_sync( + UnixSocketListener._create, path, mode, backlog or 0xFFFF + ) + + def _close(self): + try: + # Verify that the socket hasn't been replaced by another instance + # before unlinking. Needs locking to prevent another instance of + # this program replacing it between stat and unlink. + with UnixSocketListener._lock(self.path): + if self.inode == UnixSocketListener._inode(self.path): + os.unlink(self.path) + except OSError: + pass + + async def aclose(self): + """Close the socket and remove the socket file.""" + with trio.fail_after(10) as cleanup: + cleanup.shield = True + await super().aclose() + await trio.to_thread.run_sync(self._close) + + +async def open_unix_listeners(path, *, mode=0o666, backlog=None): + """Create :class:`SocketListener` objects to listen for UNIX connections. + + Args: + + path (str): Filename of UNIX socket to create and listen on. + + Absolute or relative paths may be used. + + The socket is initially created with a random token appended to its + name, and then moved over the requested name while protected by a + separate lock file. The additional names use suffixes on the + requested name. + + mode (int): The socket file permissions. + + UNIX permissions are usually specified in octal numbers. + + The default mode 0o666 gives user, group and other read and write + permissions, allowing connections from anyone on the system. + + backlog (int or None): The listen backlog to use. If you leave this as + ``None`` then Trio will pick a good default. (Currently: whatever + your system has configured as the maximum backlog.) + + Returns: + list of :class:`SocketListener` + + """ + return [await UnixSocketListener.create(path, mode=mode, backlog=backlog)] + + +async def serve_unix( + handler, + path, + *, + backlog=None, + handler_nursery=None, + task_status=trio.TASK_STATUS_IGNORED +): + """Listen for incoming UNIX connections, and for each one start a task + running ``handler(stream)``. + + This is a thin convenience wrapper around :func:`open_unix_listeners` and + :func:`serve_listeners` – see them for full details. + + .. warning:: + + If ``handler`` raises an exception, then this function doesn't do + anything special to catch it – so by default the exception will + propagate out and crash your server. If you don't want this, then catch + exceptions inside your ``handler``, or use a ``handler_nursery`` object + that responds to exceptions in some other way. + + When used with ``nursery.start`` you get back the newly opened listeners. + + Args: + handler: The handler to start for each incoming connection. Passed to + :func:`serve_listeners`. + + path: The socket file name. + Passed to :func:`open_unix_listeners`. + + backlog: The listen backlog, or None to have a good default picked. + Passed to :func:`open_tcp_listeners`. + + handler_nursery: The nursery to start handlers in, or None to use an + internal nursery. Passed to :func:`serve_listeners`. + + task_status: This function can be used with ``nursery.start``. + + Returns: + This function only returns when cancelled. + + """ + listeners = await trio.open_unix_listeners(path, backlog=backlog) + await trio.serve_listeners( + handler, + listeners, + handler_nursery=handler_nursery, + task_status=task_status + ) From e79b402610a652b2d2f0be634a465f94aa003684 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?L=2E=20K=C3=A4rkk=C3=A4inen?= Date: Fri, 27 Mar 2020 15:21:53 +0200 Subject: [PATCH 02/19] Fail if socket target is a symlink. More robust handling of FileNotFoundError. --- trio/_highlevel_open_unix_listeners.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/trio/_highlevel_open_unix_listeners.py b/trio/_highlevel_open_unix_listeners.py index 51954b33cd..3c0bb55850 100644 --- a/trio/_highlevel_open_unix_listeners.py +++ b/trio/_highlevel_open_unix_listeners.py @@ -94,8 +94,11 @@ def _create(path, mode, backlog): folder = os.path.dirname(path) if not os.path.isdir(folder): raise FileNotFoundError(f"Socket folder does not exist: {folder}") - if os.path.exists(path) and not stat.S_ISSOCK(os.stat(path).st_mode): - raise FileExistsError(f"Existing file is not a socket: {path}") + try: + if not stat.S_ISSOCK(os.stat(path, follow_symlinks=False).st_mode): + raise FileExistsError(f"Existing file is not a socket: {path}") + except FileNotFoundError: + pass # Create new socket with a random temporary name tmp_path = f"{path}.{secrets.token_urlsafe()}" sock = socket.socket(socket.AF_UNIX) From 3aa67affe9756ae56058163cb686a627507be229 Mon Sep 17 00:00:00 2001 From: CoolCat467 <52022020+CoolCat467@users.noreply.github.com> Date: Tue, 1 Aug 2023 23:29:57 -0500 Subject: [PATCH 03/19] First round of updating to the future --- docs/source/reference-core.rst | 3 - docs/source/reference-lowlevel.rst | 2 + trio/__init__.py | 3 +- trio/_highlevel_open_unix_listeners.py | 219 ++++++++++++++++--------- trio/lowlevel.py | 1 + 5 files changed, 147 insertions(+), 81 deletions(-) diff --git a/docs/source/reference-core.rst b/docs/source/reference-core.rst index 980a3106e5..30994e4386 100644 --- a/docs/source/reference-core.rst +++ b/docs/source/reference-core.rst @@ -922,9 +922,6 @@ The nursery API See :meth:`~Nursery.start`. -.. autoclass:: TaskStatus - :members: - .. _task-local-storage: Task-local storage diff --git a/docs/source/reference-lowlevel.rst b/docs/source/reference-lowlevel.rst index bacebff5ad..faf07268cb 100644 --- a/docs/source/reference-lowlevel.rst +++ b/docs/source/reference-lowlevel.rst @@ -534,6 +534,8 @@ Task API putting a task to sleep and then waking it up again. (See :func:`wait_task_rescheduled` for details.) +.. autoclass:: TaskStatus + :members: .. _guest-mode: diff --git a/trio/__init__.py b/trio/__init__.py index 27e80f5705..49363f4ada 100644 --- a/trio/__init__.py +++ b/trio/__init__.py @@ -35,7 +35,6 @@ EndOfChannel as EndOfChannel, Nursery as Nursery, RunFinishedError as RunFinishedError, - TaskStatus as TaskStatus, TrioInternalError as TrioInternalError, WouldBlock as WouldBlock, current_effective_deadline as current_effective_deadline, @@ -63,9 +62,9 @@ serve_tcp as serve_tcp, ) from ._highlevel_open_tcp_stream import open_tcp_stream as open_tcp_stream +from ._highlevel_open_unix_listeners import open_unix_listeners, serve_unix from ._highlevel_open_unix_stream import open_unix_socket as open_unix_socket from ._highlevel_serve_listeners import serve_listeners as serve_listeners -from ._highlevel_open_unix_listeners import open_unix_listeners, serve_unix from ._highlevel_socket import ( SocketListener as SocketListener, SocketStream as SocketStream, diff --git a/trio/_highlevel_open_unix_listeners.py b/trio/_highlevel_open_unix_listeners.py index 3c0bb55850..df9efc1298 100644 --- a/trio/_highlevel_open_unix_listeners.py +++ b/trio/_highlevel_open_unix_listeners.py @@ -1,14 +1,30 @@ +from __future__ import annotations + import os import secrets -import socket import stat import time +from collections.abc import Awaitable, Callable, Generator from contextlib import contextmanager -from math import inf +from typing import TYPE_CHECKING import trio +import trio.socket as tsocket + +if TYPE_CHECKING: + from typing_extensions import Self + + from trio.lowlevel import TaskStatus + + from ._socket import _SocketType as SocketType -__all__ = ["open_unix_listeners", "serve_unix"] + +try: + from trio.socket import AF_UNIX + + HAS_UNIX = True +except ImportError: + HAS_UNIX = False # Default backlog size: @@ -24,7 +40,7 @@ # backpressure. If a connection gets stuck waiting in the backlog queue, then # from the peer's point of view the connection succeeded but then their # send/recv will stall until we get to it, possibly for a long time. OTOH if -# there isn't room in the backlog queue... then their connect stalls, possibly +# there isn't room in the backlog queue, then their connect stalls, possibly # for a long time, which is pretty much the same thing. # # A large backlog can also use a bit more kernel memory, but this seems fairly @@ -38,82 +54,111 @@ # so this is unnecessary -- we can just pass in "infinity" and get the maximum # that way. (Verified on Windows, Linux, macOS using # notes-to-self/measure-listen-backlog.py) -def _compute_backlog(backlog): - if backlog is None: - backlog = inf +def _compute_backlog(backlog: int | None) -> int: # Many systems (Linux, BSDs, ...) store the backlog in a uint16 and are # missing overflow protection, so we apply our own overflow protection. # https://github.com/golang/go/issues/5030 - return min(backlog, 0xffff) - - -class UnixSocketListener(trio.SocketListener): - @staticmethod - def _inode(filename): - """Return a (dev, inode) tuple uniquely identifying a file.""" - s = os.stat(filename) - return s.st_dev, s.st_ino - - @staticmethod - @contextmanager - def _lock(socketname): - """Protect socketname access by a lock file.""" + if backlog is None: + return 0xFFFF + return min(backlog, 0xFFFF) + + +def _inode( + filename: int | str | bytes | os.PathLike[str] | os.PathLike[bytes], +) -> tuple[int, int]: + """Return a (dev, inode) tuple uniquely identifying a file.""" + stat = os.stat(filename) + return stat.st_dev, stat.st_ino + + +@contextmanager +def _lock( + socketname: str | bytes | os.PathLike[str] | os.PathLike[bytes], +) -> Generator[None, None, None]: + """Protect socketname access by a lock file.""" + name: str | bytes + if isinstance(socketname, bytes): + name = socketname + b".lock" + else: name = f"{socketname}.lock" - start_monotonic = time.monotonic() - # Atomically acquire the lock file - while True: - try: - os.close(os.open(name, os.O_CREAT | os.O_EXCL)) - break # Lock file created! - except FileExistsError: - pass - # Retry but avoid busy polling - time.sleep(0.01) - try: - ctime = os.stat(name).st_ctime - if ctime and abs(time.time() - ctime) > 2.0: - raise FileExistsError(f"Stale lock file {name}") - except FileNotFoundError: - pass - if time.monotonic() - start_monotonic > 1.0: - raise FileExistsError(f"Timeout acquiring {name}") + start_monotonic = time.monotonic() + # Atomically acquire the lock file + while True: try: - yield - finally: - os.unlink(name) + os.close(os.open(name, os.O_CREAT | os.O_EXCL)) + break # Lock file created! + except FileExistsError: + pass + # Retry but avoid busy polling + time.sleep(0.01) + try: + ctime = os.stat(name).st_ctime + if ctime and abs(time.time() - ctime) > 2.0: + raise FileExistsError(f"Stale lock file {name!r}") + except FileNotFoundError: + pass + if time.monotonic() - start_monotonic > 1.0: + raise FileExistsError(f"Timeout acquiring {name!r}") + try: + yield + finally: + os.unlink(name) - def __init__(self, sock, path, inode): + +class UnixSocketListener(trio.SocketListener): + __slots__ = () + + def __init__( + self, + socket: SocketType, + path: str | bytes | os.PathLike[str] | os.PathLike[bytes], + inode: tuple[int, int], + ) -> None: """Private contructor. Use UnixSocketListener.create instead.""" - self.path, self.inode = path, inode - super().__init__(trio.socket.from_stdlib_socket(sock)) + if not HAS_UNIX: + raise RuntimeError("Unix sockets are not supported on this platform") + if not isinstance(socket, tsocket.SocketType): + raise TypeError("SocketStream requires a Trio socket object") + if socket.type != tsocket.SOCK_STREAM: + raise ValueError("SocketStream requires a SOCK_STREAM socket") - @staticmethod - def _create(path, mode, backlog): + self.path, self.inode = path, inode + super().__init__(socket) + + @classmethod + async def _create( + cls, + path: str | bytes | os.PathLike[str] | os.PathLike[bytes], + mode: int, + backlog: int, + ) -> Self: # Sanitise and pre-verify socket path path = os.path.abspath(path) folder = os.path.dirname(path) if not os.path.isdir(folder): - raise FileNotFoundError(f"Socket folder does not exist: {folder}") + raise FileNotFoundError(f"Socket folder does not exist: {folder!r}") try: if not stat.S_ISSOCK(os.stat(path, follow_symlinks=False).st_mode): - raise FileExistsError(f"Existing file is not a socket: {path}") + raise FileExistsError(f"Existing file is not a socket: {path!r}") except FileNotFoundError: pass # Create new socket with a random temporary name + # typecheck: str-bytes-safe error: If x = b'abc' then f"{x}" or "{}".format(x) produces "b'abc'", not "abc". If this is desired behavior, use f"{x!r}" or "{!r}".format(x). Otherwise, decode the bytes tmp_path = f"{path}.{secrets.token_urlsafe()}" - sock = socket.socket(socket.AF_UNIX) + # typecheck: ^^^^^^^^^^^^^ + sock = tsocket.socket(AF_UNIX, tsocket.SOCK_STREAM) try: # Critical section begins (filename races) - sock.bind(tmp_path) + await sock.bind(tmp_path) try: - inode = UnixSocketListener._inode(tmp_path) + inode = _inode(tmp_path) os.chmod(tmp_path, mode) # Start listening before rename to avoid connection failures sock.listen(backlog) # Replace the requested name (atomic overwrite if necessary) - with UnixSocketListener._lock(path): + with _lock(path): os.rename(tmp_path, path) - return UnixSocketListener(sock, path, inode) + return cls(sock, path, inode) except: # noqa: E722 try: os.unlink(tmp_path) @@ -125,25 +170,30 @@ def _create(path, mode, backlog): finally: raise - @staticmethod - async def create(path, *, mode=0o666, backlog=None): + @classmethod + async def create( + cls, + path: str | bytes | os.PathLike[str] | os.PathLike[bytes], + *, + mode: int = 0o666, + backlog: int | None = None, + ) -> Self: backlog = _compute_backlog(backlog) - return await trio.to_thread.run_sync( - UnixSocketListener._create, path, mode, backlog or 0xFFFF - ) + # typecheck: arg-type error: Argument 1 to "to_thread_run_sync" has incompatible type "Callable[[str | bytes | PathLike[str] | PathLike[bytes], int, int], Coroutine[Any, Any, Self]]"; expected "Callable[..., Self]" + return await cls._create(path, mode, backlog or 0xFFFF) - def _close(self): + def _close(self) -> None: try: # Verify that the socket hasn't been replaced by another instance # before unlinking. Needs locking to prevent another instance of # this program replacing it between stat and unlink. - with UnixSocketListener._lock(self.path): - if self.inode == UnixSocketListener._inode(self.path): + with _lock(self.path): + if self.inode == _inode(self.path): os.unlink(self.path) except OSError: pass - async def aclose(self): + async def aclose(self) -> None: """Close the socket and remove the socket file.""" with trio.fail_after(10) as cleanup: cleanup.shield = True @@ -151,8 +201,13 @@ async def aclose(self): await trio.to_thread.run_sync(self._close) -async def open_unix_listeners(path, *, mode=0o666, backlog=None): - """Create :class:`SocketListener` objects to listen for UNIX connections. +async def open_unix_listeners( + path: str | bytes | os.PathLike[str] | os.PathLike[bytes], + *, + mode: int = 0o666, + backlog: int | None = None, +) -> list[UnixSocketListener]: + """Create :class:`UnixSocketListener` objects to listen for UNIX connections. Args: @@ -177,20 +232,28 @@ async def open_unix_listeners(path, *, mode=0o666, backlog=None): your system has configured as the maximum backlog.) Returns: - list of :class:`SocketListener` + list of :class:`UnixSocketListener` + + Raises: + RuntimeError: If AF_UNIX sockets are not supported. """ + if not HAS_UNIX: + raise RuntimeError("Unix sockets are not supported on this platform") + return [await UnixSocketListener.create(path, mode=mode, backlog=backlog)] +# typecheck: no-untyped-def error: Function is missing a return type annotation +# typecheck: no-untyped-def error: Function is missing a type annotation for one or more arguments async def serve_unix( - handler, + handler: Callable[[trio.SocketStream], Awaitable[object]], path, *, - backlog=None, - handler_nursery=None, - task_status=trio.TASK_STATUS_IGNORED -): + backlog: int | None = None, + handler_nursery: trio.Nursery | None = None, + task_status: TaskStatus = trio.TASK_STATUS_IGNORED, # type: ignore[has-type] # Cannot determine type of "TASK_STATUS_IGNORED" +) -> None: """Listen for incoming UNIX connections, and for each one start a task running ``handler(stream)``. @@ -225,11 +288,15 @@ async def serve_unix( Returns: This function only returns when cancelled. + Raises: + RuntimeError: If AF_UNIX sockets are not supported. + """ - listeners = await trio.open_unix_listeners(path, backlog=backlog) + if not HAS_UNIX: + raise RuntimeError("Unix sockets are not supported on this platform") + + listeners = await open_unix_listeners(path, backlog=backlog) + # typecheck: no-untyped-call error: Call to untyped function "serve_listeners" in typed context await trio.serve_listeners( - handler, - listeners, - handler_nursery=handler_nursery, - task_status=task_status + handler, listeners, handler_nursery=handler_nursery, task_status=task_status ) diff --git a/trio/lowlevel.py b/trio/lowlevel.py index 54f4ef3141..0bbb0c25b3 100644 --- a/trio/lowlevel.py +++ b/trio/lowlevel.py @@ -15,6 +15,7 @@ RaiseCancelT as RaiseCancelT, RunVar as RunVar, Task as Task, + TaskStatus as TaskStatus, TrioToken as TrioToken, UnboundedQueue as UnboundedQueue, add_instrument as add_instrument, From a1a0a62457a292e319dbf6ac9e880763e625fec0 Mon Sep 17 00:00:00 2001 From: CoolCat467 <52022020+CoolCat467@users.noreply.github.com> Date: Tue, 1 Aug 2023 23:38:12 -0500 Subject: [PATCH 04/19] Import `SocketListener` directly --- trio/_highlevel_open_unix_listeners.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/trio/_highlevel_open_unix_listeners.py b/trio/_highlevel_open_unix_listeners.py index df9efc1298..bfc4c3258f 100644 --- a/trio/_highlevel_open_unix_listeners.py +++ b/trio/_highlevel_open_unix_listeners.py @@ -10,6 +10,7 @@ import trio import trio.socket as tsocket +from trio import SocketListener if TYPE_CHECKING: from typing_extensions import Self @@ -105,7 +106,7 @@ def _lock( os.unlink(name) -class UnixSocketListener(trio.SocketListener): +class UnixSocketListener(SocketListener): __slots__ = () def __init__( From 8b22b288636b589e6badc60743682c65e1926b18 Mon Sep 17 00:00:00 2001 From: CoolCat467 <52022020+CoolCat467@users.noreply.github.com> Date: Wed, 2 Aug 2023 19:07:05 -0500 Subject: [PATCH 05/19] Change SocketListener import --- trio/_highlevel_open_unix_listeners.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/trio/_highlevel_open_unix_listeners.py b/trio/_highlevel_open_unix_listeners.py index bfc4c3258f..c6d04b0511 100644 --- a/trio/_highlevel_open_unix_listeners.py +++ b/trio/_highlevel_open_unix_listeners.py @@ -10,7 +10,8 @@ import trio import trio.socket as tsocket -from trio import SocketListener + +from ._highlevel_socket import SocketListener if TYPE_CHECKING: from typing_extensions import Self From eb73e272c63bf3fab9bc4bcbd6d9202ccadfb6bd Mon Sep 17 00:00:00 2001 From: CoolCat467 <52022020+CoolCat467@users.noreply.github.com> Date: Sun, 27 Aug 2023 11:55:51 -0500 Subject: [PATCH 06/19] Fix `TaskStatus` and other type fixes --- trio/__init__.py | 1 + trio/_highlevel_open_unix_listeners.py | 22 ++++++++-------------- trio/lowlevel.py | 1 - 3 files changed, 9 insertions(+), 15 deletions(-) diff --git a/trio/__init__.py b/trio/__init__.py index 49363f4ada..fe199e58f5 100644 --- a/trio/__init__.py +++ b/trio/__init__.py @@ -35,6 +35,7 @@ EndOfChannel as EndOfChannel, Nursery as Nursery, RunFinishedError as RunFinishedError, + TaskStatus as TaskStatus, TrioInternalError as TrioInternalError, WouldBlock as WouldBlock, current_effective_deadline as current_effective_deadline, diff --git a/trio/_highlevel_open_unix_listeners.py b/trio/_highlevel_open_unix_listeners.py index c6d04b0511..5c1c599a2b 100644 --- a/trio/_highlevel_open_unix_listeners.py +++ b/trio/_highlevel_open_unix_listeners.py @@ -16,8 +16,6 @@ if TYPE_CHECKING: from typing_extensions import Self - from trio.lowlevel import TaskStatus - from ._socket import _SocketType as SocketType @@ -116,7 +114,7 @@ def __init__( path: str | bytes | os.PathLike[str] | os.PathLike[bytes], inode: tuple[int, int], ) -> None: - """Private contructor. Use UnixSocketListener.create instead.""" + """Private constructor. Use UnixSocketListener.create instead.""" if not HAS_UNIX: raise RuntimeError("Unix sockets are not supported on this platform") if not isinstance(socket, tsocket.SocketType): @@ -130,11 +128,13 @@ def __init__( @classmethod async def _create( cls, - path: str | bytes | os.PathLike[str] | os.PathLike[bytes], + path: str | bytes, mode: int, backlog: int, ) -> Self: # Sanitise and pre-verify socket path + if not isinstance(path, str): + path = path.decode("utf-8") path = os.path.abspath(path) folder = os.path.dirname(path) if not os.path.isdir(folder): @@ -145,9 +145,7 @@ async def _create( except FileNotFoundError: pass # Create new socket with a random temporary name - # typecheck: str-bytes-safe error: If x = b'abc' then f"{x}" or "{}".format(x) produces "b'abc'", not "abc". If this is desired behavior, use f"{x!r}" or "{!r}".format(x). Otherwise, decode the bytes tmp_path = f"{path}.{secrets.token_urlsafe()}" - # typecheck: ^^^^^^^^^^^^^ sock = tsocket.socket(AF_UNIX, tsocket.SOCK_STREAM) try: # Critical section begins (filename races) @@ -175,13 +173,12 @@ async def _create( @classmethod async def create( cls, - path: str | bytes | os.PathLike[str] | os.PathLike[bytes], + path: str | bytes, *, mode: int = 0o666, backlog: int | None = None, ) -> Self: backlog = _compute_backlog(backlog) - # typecheck: arg-type error: Argument 1 to "to_thread_run_sync" has incompatible type "Callable[[str | bytes | PathLike[str] | PathLike[bytes], int, int], Coroutine[Any, Any, Self]]"; expected "Callable[..., Self]" return await cls._create(path, mode, backlog or 0xFFFF) def _close(self) -> None: @@ -204,7 +201,7 @@ async def aclose(self) -> None: async def open_unix_listeners( - path: str | bytes | os.PathLike[str] | os.PathLike[bytes], + path: str | bytes, *, mode: int = 0o666, backlog: int | None = None, @@ -246,15 +243,13 @@ async def open_unix_listeners( return [await UnixSocketListener.create(path, mode=mode, backlog=backlog)] -# typecheck: no-untyped-def error: Function is missing a return type annotation -# typecheck: no-untyped-def error: Function is missing a type annotation for one or more arguments async def serve_unix( handler: Callable[[trio.SocketStream], Awaitable[object]], - path, + path: str | bytes, *, backlog: int | None = None, handler_nursery: trio.Nursery | None = None, - task_status: TaskStatus = trio.TASK_STATUS_IGNORED, # type: ignore[has-type] # Cannot determine type of "TASK_STATUS_IGNORED" + task_status: trio.TaskStatus[list[UnixSocketListener]] = trio.TASK_STATUS_IGNORED, ) -> None: """Listen for incoming UNIX connections, and for each one start a task running ``handler(stream)``. @@ -298,7 +293,6 @@ async def serve_unix( raise RuntimeError("Unix sockets are not supported on this platform") listeners = await open_unix_listeners(path, backlog=backlog) - # typecheck: no-untyped-call error: Call to untyped function "serve_listeners" in typed context await trio.serve_listeners( handler, listeners, handler_nursery=handler_nursery, task_status=task_status ) diff --git a/trio/lowlevel.py b/trio/lowlevel.py index fc55f9eabe..25e64975e2 100644 --- a/trio/lowlevel.py +++ b/trio/lowlevel.py @@ -16,7 +16,6 @@ RunStatistics as RunStatistics, RunVar as RunVar, Task as Task, - TaskStatus as TaskStatus, TrioToken as TrioToken, UnboundedQueue as UnboundedQueue, UnboundedQueueStatistics as UnboundedQueueStatistics, From 34f067bf99bd490d16c388f5e3551901cdb6e9b3 Mon Sep 17 00:00:00 2001 From: CoolCat467 <52022020+CoolCat467@users.noreply.github.com> Date: Sun, 27 Aug 2023 11:58:14 -0500 Subject: [PATCH 07/19] Revert docs changes --- docs/source/reference-core.rst | 2 ++ docs/source/reference-lowlevel.rst | 1 - 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/source/reference-core.rst b/docs/source/reference-core.rst index aaecaa0244..316a9c2ac3 100644 --- a/docs/source/reference-core.rst +++ b/docs/source/reference-core.rst @@ -927,6 +927,8 @@ The nursery API See :meth:`Nursery.start`. +.. autoclass:: TaskStatus(Protocol[StatusT]) + :members: .. _task-local-storage: diff --git a/docs/source/reference-lowlevel.rst b/docs/source/reference-lowlevel.rst index 13eabcf3e2..712a36ad04 100644 --- a/docs/source/reference-lowlevel.rst +++ b/docs/source/reference-lowlevel.rst @@ -538,7 +538,6 @@ Task API putting a task to sleep and then waking it up again. (See :func:`wait_task_rescheduled` for details.) - .. _guest-mode: Using "guest mode" to run Trio on top of other event loops From ca5b8cf863c7c5474a915da3880503d190f1fe06 Mon Sep 17 00:00:00 2001 From: CoolCat467 <52022020+CoolCat467@users.noreply.github.com> Date: Sun, 3 Sep 2023 15:42:23 -0500 Subject: [PATCH 08/19] Steal code from `SocketListener` because it's `Final` --- trio/_highlevel_open_unix_listeners.py | 82 ++++++++++++++++++++++---- 1 file changed, 72 insertions(+), 10 deletions(-) diff --git a/trio/_highlevel_open_unix_listeners.py b/trio/_highlevel_open_unix_listeners.py index 5c1c599a2b..be30fc514a 100644 --- a/trio/_highlevel_open_unix_listeners.py +++ b/trio/_highlevel_open_unix_listeners.py @@ -11,7 +11,13 @@ import trio import trio.socket as tsocket -from ._highlevel_socket import SocketListener +from ._highlevel_socket import ( + SocketStream, + _closed_stream_errnos, + _ignorable_accept_errnos, +) +from ._util import Final +from .abc import Listener if TYPE_CHECKING: from typing_extensions import Self @@ -105,25 +111,52 @@ def _lock( os.unlink(name) -class UnixSocketListener(SocketListener): - __slots__ = () +class UnixSocketListener(Listener[SocketStream], metaclass=Final): + """A :class:`~trio.abc.Listener` that uses a listening socket to accept + incoming connections as :class:`SocketStream` objects. + + Args: + socket: The Trio socket object to wrap. Must have type ``SOCK_STREAM``, + and be listening. + path: The path the unix socket is bound too. + inode: (dev, inode) tuple uniquely identifying the socket file. + + Note that the :class:`UnixSocketListener` "takes ownership" of the given + socket; closing the :class:`UnixSocketListener` will also close the socket. + + .. attribute:: socket + + The Trio socket object that this stream wraps. + + """ + + __slots__ = ("socket", "path", "inode") def __init__( self, socket: SocketType, - path: str | bytes | os.PathLike[str] | os.PathLike[bytes], + path: str, inode: tuple[int, int], ) -> None: """Private constructor. Use UnixSocketListener.create instead.""" if not HAS_UNIX: raise RuntimeError("Unix sockets are not supported on this platform") if not isinstance(socket, tsocket.SocketType): - raise TypeError("SocketStream requires a Trio socket object") + raise TypeError("UnixSocketListener requires a Trio socket object") if socket.type != tsocket.SOCK_STREAM: - raise ValueError("SocketStream requires a SOCK_STREAM socket") + raise ValueError("UnixSocketListener requires a SOCK_STREAM socket") + try: + listening = socket.getsockopt(tsocket.SOL_SOCKET, tsocket.SO_ACCEPTCONN) + except OSError: + # SO_ACCEPTCONN fails on macOS; we just have to trust the user. + pass + else: + if not listening: + raise ValueError("SocketListener requires a listening socket") - self.path, self.inode = path, inode - super().__init__(socket) + self.socket = socket + self.path = path + self.inode = inode @classmethod async def _create( @@ -192,11 +225,40 @@ def _close(self) -> None: except OSError: pass + async def accept(self) -> SocketStream: + """Accept an incoming connection. + + Returns: + :class:`SocketStream` + + Raises: + OSError: if the underlying call to ``accept`` raises an unexpected + error. + ClosedResourceError: if you already closed the socket. + + This method handles routine errors like ``ECONNABORTED``, but passes + other errors on to its caller. In particular, it does *not* make any + special effort to handle resource exhaustion errors like ``EMFILE``, + ``ENFILE``, ``ENOBUFS``, ``ENOMEM``. + + """ + while True: + try: + sock, _ = await self.socket.accept() + except OSError as exc: + if exc.errno in _closed_stream_errnos: + raise trio.ClosedResourceError + if exc.errno not in _ignorable_accept_errnos: + raise + else: + return SocketStream(sock) + async def aclose(self) -> None: - """Close the socket and remove the socket file.""" + """Close this listener and its underlying socket.""" with trio.fail_after(10) as cleanup: cleanup.shield = True - await super().aclose() + self.socket.close() + await trio.lowlevel.checkpoint() await trio.to_thread.run_sync(self._close) From 56fda2df93b8034e6ac6c79550d88a3302439fff Mon Sep 17 00:00:00 2001 From: CoolCat467 <52022020+CoolCat467@users.noreply.github.com> Date: Sun, 3 Sep 2023 16:13:45 -0500 Subject: [PATCH 09/19] Try stealing test data from open tcp listeners and highlevel sockets --- .../test_highlevel_open_unix_listeners.py | 453 ++++++++++++++++++ 1 file changed, 453 insertions(+) create mode 100644 trio/_tests/test_highlevel_open_unix_listeners.py diff --git a/trio/_tests/test_highlevel_open_unix_listeners.py b/trio/_tests/test_highlevel_open_unix_listeners.py new file mode 100644 index 0000000000..b37c31d6a6 --- /dev/null +++ b/trio/_tests/test_highlevel_open_unix_listeners.py @@ -0,0 +1,453 @@ +import errno +import socket as stdlib_socket +import sys +from math import inf + +import attr +import pytest + +import trio +from trio import UnixSocketListener, open_tcp_stream, open_unix_listeners, serve_unix +from trio.testing import open_stream_to_socket_listener + +from .. import _core, socket as tsocket +from .._core._tests.tutil import binds_ipv6 +from .._highlevel_socket import * +from ..testing import assert_checkpoints + +if sys.version_info < (3, 11): + from exceptiongroup import BaseExceptionGroup + + +async def test_UnixSocketListener() -> None: + # Not a Trio socket + with stdlib_socket.socket() as s: + s.bind(("127.0.0.1", 0)) + s.listen(10) + with pytest.raises(TypeError): + UnixSocketListener(s) + + # Not a SOCK_STREAM + with tsocket.socket(type=tsocket.SOCK_DGRAM) as s: + await s.bind(("127.0.0.1", 0)) + with pytest.raises(ValueError) as excinfo: + UnixSocketListener(s) + excinfo.match(r".*SOCK_STREAM") + + # Didn't call .listen() + # macOS has no way to check for this, so skip testing it there. + if sys.platform != "darwin": + with tsocket.socket() as s: + await s.bind(("127.0.0.1", 0)) + with pytest.raises(ValueError) as excinfo: + UnixSocketListener(s) + excinfo.match(r".*listen") + + listen_sock = tsocket.socket() + await listen_sock.bind(("127.0.0.1", 0)) + listen_sock.listen(10) + listener = UnixSocketListener(listen_sock) + + assert listener.socket is listen_sock + + client_sock = tsocket.socket() + await client_sock.connect(listen_sock.getsockname()) + with assert_checkpoints(): + server_stream = await listener.accept() + assert isinstance(server_stream, SocketStream) + assert server_stream.socket.getsockname() == listen_sock.getsockname() + assert server_stream.socket.getpeername() == client_sock.getsockname() + + with assert_checkpoints(): + await listener.aclose() + + with assert_checkpoints(): + await listener.aclose() + + with assert_checkpoints(): + with pytest.raises(_core.ClosedResourceError): + await listener.accept() + + client_sock.close() + await server_stream.aclose() + + +async def test_UnixSocketListener_socket_closed_underfoot() -> None: + listen_sock = tsocket.socket() + await listen_sock.bind(("127.0.0.1", 0)) + listen_sock.listen(10) + listener = UnixSocketListener(listen_sock) + + # Close the socket, not the listener + listen_sock.close() + + # UnixSocketListener gives correct error + with assert_checkpoints(): + with pytest.raises(_core.ClosedResourceError): + await listener.accept() + + +async def test_UnixSocketListener_accept_errors() -> None: + class FakeSocket(tsocket.SocketType): + def __init__(self, events) -> None: + self._events = iter(events) + + type = tsocket.SOCK_STREAM + + # Fool the check for SO_ACCEPTCONN in UnixSocketListener.__init__ + def getsockopt(self, level: object, opt: object) -> bool: + return True + + def setsockopt(self, level: object, opt: object, value: object) -> None: + pass + + async def accept(self): + await _core.checkpoint() + event = next(self._events) + if isinstance(event, BaseException): + raise event + else: + return event, None + + fake_server_sock = FakeSocket([]) + + fake_listen_sock = FakeSocket( + [ + OSError(errno.ECONNABORTED, "Connection aborted"), + OSError(errno.EPERM, "Permission denied"), + OSError(errno.EPROTO, "Bad protocol"), + fake_server_sock, + OSError(errno.EMFILE, "Out of file descriptors"), + OSError(errno.EFAULT, "attempt to write to read-only memory"), + OSError(errno.ENOBUFS, "out of buffers"), + fake_server_sock, + ] + ) + + l = UnixSocketListener(fake_listen_sock) + + with assert_checkpoints(): + s = await l.accept() + assert s.socket is fake_server_sock + + for code in [errno.EMFILE, errno.EFAULT, errno.ENOBUFS]: + with assert_checkpoints(): + with pytest.raises(OSError) as excinfo: + await l.accept() + assert excinfo.value.errno == code + + with assert_checkpoints(): + s = await l.accept() + assert s.socket is fake_server_sock + + +async def test_socket_stream_works_when_peer_has_already_closed() -> None: + sock_a, sock_b = tsocket.socketpair() + with sock_a, sock_b: + await sock_b.send(b"x") + sock_b.close() + stream = SocketStream(sock_a) + assert await stream.receive_some(1) == b"x" + assert await stream.receive_some(1) == b"" + + +async def test_open_unix_listeners_basic() -> None: + listeners = await open_unix_listeners(0) + assert isinstance(listeners, list) + for obj in listeners: + assert isinstance(obj, UnixSocketListener) + # Binds to wildcard address by default + assert obj.socket.family in [tsocket.AF_INET, tsocket.AF_INET6] + assert obj.socket.getsockname()[0] in ["0.0.0.0", "::"] + + listener = listeners[0] + # Make sure the backlog is at least 2 + c1 = await open_stream_to_socket_listener(listener) + c2 = await open_stream_to_socket_listener(listener) + + s1 = await listener.accept() + s2 = await listener.accept() + + # Note that we don't know which client stream is connected to which server + # stream + await s1.send_all(b"x") + await s2.send_all(b"x") + assert await c1.receive_some(1) == b"x" + assert await c2.receive_some(1) == b"x" + + for resource in [c1, c2, s1, s2] + listeners: + await resource.aclose() + + +async def test_open_unix_listeners_specific_port_specific_host() -> None: + # Pick a port + sock = tsocket.socket() + await sock.bind(("127.0.0.1", 0)) + host, port = sock.getsockname() + sock.close() + + (listener,) = await open_unix_listeners(port, host=host) + async with listener: + assert listener.socket.getsockname() == (host, port) + + +@binds_ipv6 +async def test_open_unix_listeners_ipv6_v6only() -> None: + # Check IPV6_V6ONLY is working properly + (ipv6_listener,) = await open_unix_listeners(0, host="::1") + async with ipv6_listener: + _, port, *_ = ipv6_listener.socket.getsockname() + + with pytest.raises(OSError): + await open_tcp_stream("127.0.0.1", port) + + +async def test_open_unix_listeners_rebind() -> None: + (l1,) = await open_unix_listeners(0, host="127.0.0.1") + sockaddr1 = l1.socket.getsockname() + + # Plain old rebinding while it's still there should fail, even if we have + # SO_REUSEADDR set + with stdlib_socket.socket() as probe: + probe.setsockopt(stdlib_socket.SOL_SOCKET, stdlib_socket.SO_REUSEADDR, 1) + with pytest.raises(OSError): + probe.bind(sockaddr1) + + # Now use the first listener to set up some connections in various states, + # and make sure that they don't create any obstacle to rebinding a second + # listener after the first one is closed. + c_established = await open_stream_to_socket_listener(l1) + s_established = await l1.accept() + + c_time_wait = await open_stream_to_socket_listener(l1) + s_time_wait = await l1.accept() + # Server-initiated close leaves socket in TIME_WAIT + await s_time_wait.aclose() + + await l1.aclose() + (l2,) = await open_unix_listeners(sockaddr1[1], host="127.0.0.1") + sockaddr2 = l2.socket.getsockname() + + assert sockaddr1 == sockaddr2 + assert s_established.socket.getsockname() == sockaddr2 + assert c_time_wait.socket.getpeername() == sockaddr2 + + for resource in [ + l1, + l2, + c_established, + s_established, + c_time_wait, + s_time_wait, + ]: + await resource.aclose() + + +class FakeOSError(OSError): + __slots__ = () + + +@attr.s +class FakeSocket(tsocket.SocketType): + family = attr.ib() + type = attr.ib() + proto = attr.ib() + + closed = attr.ib(default=False) + poison_listen = attr.ib(default=False) + backlog = attr.ib(default=None) + + def getsockopt(self, level: int, option: int) -> bool: + if (level, option) == (tsocket.SOL_SOCKET, tsocket.SO_ACCEPTCONN): + return True + assert False # pragma: no cover + + def setsockopt(self, level: object, option: object, value: object) -> None: + pass + + async def bind(self, sockaddr: object) -> None: + pass + + def listen(self, backlog: int) -> None: + assert self.backlog is None + assert backlog is not None + self.backlog = backlog + if self.poison_listen: + raise FakeOSError("whoops") + + def close(self) -> None: + self.closed = True + + +@attr.s +class FakeSocketFactory: + poison_after: int = attr.ib() + sockets: list[FakeSocket] = attr.ib(factory=list) + raise_on_family: dict[int, int] = attr.ib(factory=dict) # family => errno + + def socket(self, family: int, type: int, proto: int) -> FakeSocket: + if family in self.raise_on_family: + raise OSError(self.raise_on_family[family], "nope") + sock = FakeSocket(family, type, proto) + self.poison_after -= 1 + if self.poison_after == 0: + sock.poison_listen = True + self.sockets.append(sock) + return sock + + +@attr.s +class FakeHostnameResolver: + family_addr_pairs = attr.ib() + + async def getaddrinfo( + self, + host: str | bytes, + port: int, + family: int, + type: int, + proto: int, + flags: int, + ) -> list[tuple[int, int, int, str, tuple[str | bytes, int]]]: + return [ + (family, tsocket.SOCK_STREAM, 0, "", (addr, port)) + for family, addr in self.family_addr_pairs + ] + + +async def test_open_unix_listeners_multiple_host_cleanup_on_error() -> None: + # If we were trying to bind to multiple hosts and one of them failed, they + # call get cleaned up before returning + fsf = FakeSocketFactory(3) + tsocket.set_custom_socket_factory(fsf) + tsocket.set_custom_hostname_resolver( + FakeHostnameResolver( + [ + (tsocket.AF_INET, "1.1.1.1"), + (tsocket.AF_INET, "2.2.2.2"), + (tsocket.AF_INET, "3.3.3.3"), + ] + ) + ) + + with pytest.raises(FakeOSError): + await open_unix_listeners(80, host="example.org") + + assert len(fsf.sockets) == 3 + for sock in fsf.sockets: + assert sock.closed + + +async def test_open_unix_listeners_port_checking() -> None: + for host in ["127.0.0.1", None]: + with pytest.raises(TypeError): + await open_unix_listeners(None, host=host) + with pytest.raises(TypeError): + await open_unix_listeners(b"80", host=host) + with pytest.raises(TypeError): + await open_unix_listeners("http", host=host) + + +async def test_serve_tcp() -> None: + async def handler(stream) -> None: + await stream.send_all(b"x") + + async with trio.open_nursery() as nursery: + listeners = await nursery.start(serve_unix, handler, 0) + stream = await open_stream_to_socket_listener(listeners[0]) + async with stream: + await stream.receive_some(1) == b"x" + nursery.cancel_scope.cancel() + + +@pytest.mark.parametrize( + "try_families", + [{tsocket.AF_INET}, {tsocket.AF_INET6}, {tsocket.AF_INET, tsocket.AF_INET6}], +) +@pytest.mark.parametrize( + "fail_families", + [{tsocket.AF_INET}, {tsocket.AF_INET6}, {tsocket.AF_INET, tsocket.AF_INET6}], +) +async def test_open_unix_listeners_some_address_families_unavailable( + try_families, fail_families +): + fsf = FakeSocketFactory( + 10, raise_on_family={family: errno.EAFNOSUPPORT for family in fail_families} + ) + tsocket.set_custom_socket_factory(fsf) + tsocket.set_custom_hostname_resolver( + FakeHostnameResolver([(family, "foo") for family in try_families]) + ) + + should_succeed = try_families - fail_families + + if not should_succeed: + with pytest.raises(OSError) as exc_info: + await open_unix_listeners(80, host="example.org") + + assert "This system doesn't support" in str(exc_info.value) + if isinstance(exc_info.value.__cause__, BaseExceptionGroup): + for subexc in exc_info.value.__cause__.exceptions: + assert "nope" in str(subexc) + else: + assert isinstance(exc_info.value.__cause__, OSError) + assert "nope" in str(exc_info.value.__cause__) + else: + listeners = await open_unix_listeners(80) + for listener in listeners: + should_succeed.remove(listener.socket.family) + assert not should_succeed + + +async def test_open_unix_listeners_socket_fails_not_afnosupport() -> None: + fsf = FakeSocketFactory( + 10, + raise_on_family={ + tsocket.AF_INET: errno.EAFNOSUPPORT, + tsocket.AF_INET6: errno.EINVAL, + }, + ) + tsocket.set_custom_socket_factory(fsf) + tsocket.set_custom_hostname_resolver( + FakeHostnameResolver([(tsocket.AF_INET, "foo"), (tsocket.AF_INET6, "bar")]) + ) + + with pytest.raises(OSError) as exc_info: + await open_unix_listeners(80, host="example.org") + assert exc_info.value.errno == errno.EINVAL + assert exc_info.value.__cause__ is None + assert "nope" in str(exc_info.value) + + +# We used to have an elaborate test that opened a real TCP listening socket +# and then tried to measure its backlog by making connections to it. And most +# of the time, it worked. But no matter what we tried, it was always fragile, +# because it had to do things like use timeouts to guess when the listening +# queue was full, sometimes the CI hosts go into SYN-cookie mode (where there +# effectively is no backlog), sometimes the host might not be enough resources +# to give us the full requested backlog... it was a mess. So now we just check +# that the backlog argument is passed through correctly. +async def test_open_unix_listeners_backlog() -> None: + fsf = FakeSocketFactory(99) + tsocket.set_custom_socket_factory(fsf) + for given, expected in [ + (None, 0xFFFF), + (inf, 0xFFFF), + (99999999, 0xFFFF), + (10, 10), + (1, 1), + ]: + listeners = await open_unix_listeners(0, backlog=given) + assert listeners + for listener in listeners: + assert listener.socket.backlog == expected + + +async def test_open_unix_listeners_backlog_float_error() -> None: + fsf = FakeSocketFactory(99) + tsocket.set_custom_socket_factory(fsf) + for should_fail in (0.0, 2.18, 3.14, 9.75): + with pytest.raises( + ValueError, match=f"Only accepts infinity, not {should_fail!r}" + ): + await open_unix_listeners(0, backlog=should_fail) From 852e82b415ce1548c0679a628ed98cc3bd1def42 Mon Sep 17 00:00:00 2001 From: CoolCat467 <52022020+CoolCat467@users.noreply.github.com> Date: Tue, 17 Oct 2023 21:17:31 -0500 Subject: [PATCH 10/19] Revert "Try stealing test data from open tcp listeners and highlevel sockets" This reverts commit 56fda2df93b8034e6ac6c79550d88a3302439fff. --- .../test_highlevel_open_unix_listeners.py | 453 ------------------ 1 file changed, 453 deletions(-) delete mode 100644 trio/_tests/test_highlevel_open_unix_listeners.py diff --git a/trio/_tests/test_highlevel_open_unix_listeners.py b/trio/_tests/test_highlevel_open_unix_listeners.py deleted file mode 100644 index b37c31d6a6..0000000000 --- a/trio/_tests/test_highlevel_open_unix_listeners.py +++ /dev/null @@ -1,453 +0,0 @@ -import errno -import socket as stdlib_socket -import sys -from math import inf - -import attr -import pytest - -import trio -from trio import UnixSocketListener, open_tcp_stream, open_unix_listeners, serve_unix -from trio.testing import open_stream_to_socket_listener - -from .. import _core, socket as tsocket -from .._core._tests.tutil import binds_ipv6 -from .._highlevel_socket import * -from ..testing import assert_checkpoints - -if sys.version_info < (3, 11): - from exceptiongroup import BaseExceptionGroup - - -async def test_UnixSocketListener() -> None: - # Not a Trio socket - with stdlib_socket.socket() as s: - s.bind(("127.0.0.1", 0)) - s.listen(10) - with pytest.raises(TypeError): - UnixSocketListener(s) - - # Not a SOCK_STREAM - with tsocket.socket(type=tsocket.SOCK_DGRAM) as s: - await s.bind(("127.0.0.1", 0)) - with pytest.raises(ValueError) as excinfo: - UnixSocketListener(s) - excinfo.match(r".*SOCK_STREAM") - - # Didn't call .listen() - # macOS has no way to check for this, so skip testing it there. - if sys.platform != "darwin": - with tsocket.socket() as s: - await s.bind(("127.0.0.1", 0)) - with pytest.raises(ValueError) as excinfo: - UnixSocketListener(s) - excinfo.match(r".*listen") - - listen_sock = tsocket.socket() - await listen_sock.bind(("127.0.0.1", 0)) - listen_sock.listen(10) - listener = UnixSocketListener(listen_sock) - - assert listener.socket is listen_sock - - client_sock = tsocket.socket() - await client_sock.connect(listen_sock.getsockname()) - with assert_checkpoints(): - server_stream = await listener.accept() - assert isinstance(server_stream, SocketStream) - assert server_stream.socket.getsockname() == listen_sock.getsockname() - assert server_stream.socket.getpeername() == client_sock.getsockname() - - with assert_checkpoints(): - await listener.aclose() - - with assert_checkpoints(): - await listener.aclose() - - with assert_checkpoints(): - with pytest.raises(_core.ClosedResourceError): - await listener.accept() - - client_sock.close() - await server_stream.aclose() - - -async def test_UnixSocketListener_socket_closed_underfoot() -> None: - listen_sock = tsocket.socket() - await listen_sock.bind(("127.0.0.1", 0)) - listen_sock.listen(10) - listener = UnixSocketListener(listen_sock) - - # Close the socket, not the listener - listen_sock.close() - - # UnixSocketListener gives correct error - with assert_checkpoints(): - with pytest.raises(_core.ClosedResourceError): - await listener.accept() - - -async def test_UnixSocketListener_accept_errors() -> None: - class FakeSocket(tsocket.SocketType): - def __init__(self, events) -> None: - self._events = iter(events) - - type = tsocket.SOCK_STREAM - - # Fool the check for SO_ACCEPTCONN in UnixSocketListener.__init__ - def getsockopt(self, level: object, opt: object) -> bool: - return True - - def setsockopt(self, level: object, opt: object, value: object) -> None: - pass - - async def accept(self): - await _core.checkpoint() - event = next(self._events) - if isinstance(event, BaseException): - raise event - else: - return event, None - - fake_server_sock = FakeSocket([]) - - fake_listen_sock = FakeSocket( - [ - OSError(errno.ECONNABORTED, "Connection aborted"), - OSError(errno.EPERM, "Permission denied"), - OSError(errno.EPROTO, "Bad protocol"), - fake_server_sock, - OSError(errno.EMFILE, "Out of file descriptors"), - OSError(errno.EFAULT, "attempt to write to read-only memory"), - OSError(errno.ENOBUFS, "out of buffers"), - fake_server_sock, - ] - ) - - l = UnixSocketListener(fake_listen_sock) - - with assert_checkpoints(): - s = await l.accept() - assert s.socket is fake_server_sock - - for code in [errno.EMFILE, errno.EFAULT, errno.ENOBUFS]: - with assert_checkpoints(): - with pytest.raises(OSError) as excinfo: - await l.accept() - assert excinfo.value.errno == code - - with assert_checkpoints(): - s = await l.accept() - assert s.socket is fake_server_sock - - -async def test_socket_stream_works_when_peer_has_already_closed() -> None: - sock_a, sock_b = tsocket.socketpair() - with sock_a, sock_b: - await sock_b.send(b"x") - sock_b.close() - stream = SocketStream(sock_a) - assert await stream.receive_some(1) == b"x" - assert await stream.receive_some(1) == b"" - - -async def test_open_unix_listeners_basic() -> None: - listeners = await open_unix_listeners(0) - assert isinstance(listeners, list) - for obj in listeners: - assert isinstance(obj, UnixSocketListener) - # Binds to wildcard address by default - assert obj.socket.family in [tsocket.AF_INET, tsocket.AF_INET6] - assert obj.socket.getsockname()[0] in ["0.0.0.0", "::"] - - listener = listeners[0] - # Make sure the backlog is at least 2 - c1 = await open_stream_to_socket_listener(listener) - c2 = await open_stream_to_socket_listener(listener) - - s1 = await listener.accept() - s2 = await listener.accept() - - # Note that we don't know which client stream is connected to which server - # stream - await s1.send_all(b"x") - await s2.send_all(b"x") - assert await c1.receive_some(1) == b"x" - assert await c2.receive_some(1) == b"x" - - for resource in [c1, c2, s1, s2] + listeners: - await resource.aclose() - - -async def test_open_unix_listeners_specific_port_specific_host() -> None: - # Pick a port - sock = tsocket.socket() - await sock.bind(("127.0.0.1", 0)) - host, port = sock.getsockname() - sock.close() - - (listener,) = await open_unix_listeners(port, host=host) - async with listener: - assert listener.socket.getsockname() == (host, port) - - -@binds_ipv6 -async def test_open_unix_listeners_ipv6_v6only() -> None: - # Check IPV6_V6ONLY is working properly - (ipv6_listener,) = await open_unix_listeners(0, host="::1") - async with ipv6_listener: - _, port, *_ = ipv6_listener.socket.getsockname() - - with pytest.raises(OSError): - await open_tcp_stream("127.0.0.1", port) - - -async def test_open_unix_listeners_rebind() -> None: - (l1,) = await open_unix_listeners(0, host="127.0.0.1") - sockaddr1 = l1.socket.getsockname() - - # Plain old rebinding while it's still there should fail, even if we have - # SO_REUSEADDR set - with stdlib_socket.socket() as probe: - probe.setsockopt(stdlib_socket.SOL_SOCKET, stdlib_socket.SO_REUSEADDR, 1) - with pytest.raises(OSError): - probe.bind(sockaddr1) - - # Now use the first listener to set up some connections in various states, - # and make sure that they don't create any obstacle to rebinding a second - # listener after the first one is closed. - c_established = await open_stream_to_socket_listener(l1) - s_established = await l1.accept() - - c_time_wait = await open_stream_to_socket_listener(l1) - s_time_wait = await l1.accept() - # Server-initiated close leaves socket in TIME_WAIT - await s_time_wait.aclose() - - await l1.aclose() - (l2,) = await open_unix_listeners(sockaddr1[1], host="127.0.0.1") - sockaddr2 = l2.socket.getsockname() - - assert sockaddr1 == sockaddr2 - assert s_established.socket.getsockname() == sockaddr2 - assert c_time_wait.socket.getpeername() == sockaddr2 - - for resource in [ - l1, - l2, - c_established, - s_established, - c_time_wait, - s_time_wait, - ]: - await resource.aclose() - - -class FakeOSError(OSError): - __slots__ = () - - -@attr.s -class FakeSocket(tsocket.SocketType): - family = attr.ib() - type = attr.ib() - proto = attr.ib() - - closed = attr.ib(default=False) - poison_listen = attr.ib(default=False) - backlog = attr.ib(default=None) - - def getsockopt(self, level: int, option: int) -> bool: - if (level, option) == (tsocket.SOL_SOCKET, tsocket.SO_ACCEPTCONN): - return True - assert False # pragma: no cover - - def setsockopt(self, level: object, option: object, value: object) -> None: - pass - - async def bind(self, sockaddr: object) -> None: - pass - - def listen(self, backlog: int) -> None: - assert self.backlog is None - assert backlog is not None - self.backlog = backlog - if self.poison_listen: - raise FakeOSError("whoops") - - def close(self) -> None: - self.closed = True - - -@attr.s -class FakeSocketFactory: - poison_after: int = attr.ib() - sockets: list[FakeSocket] = attr.ib(factory=list) - raise_on_family: dict[int, int] = attr.ib(factory=dict) # family => errno - - def socket(self, family: int, type: int, proto: int) -> FakeSocket: - if family in self.raise_on_family: - raise OSError(self.raise_on_family[family], "nope") - sock = FakeSocket(family, type, proto) - self.poison_after -= 1 - if self.poison_after == 0: - sock.poison_listen = True - self.sockets.append(sock) - return sock - - -@attr.s -class FakeHostnameResolver: - family_addr_pairs = attr.ib() - - async def getaddrinfo( - self, - host: str | bytes, - port: int, - family: int, - type: int, - proto: int, - flags: int, - ) -> list[tuple[int, int, int, str, tuple[str | bytes, int]]]: - return [ - (family, tsocket.SOCK_STREAM, 0, "", (addr, port)) - for family, addr in self.family_addr_pairs - ] - - -async def test_open_unix_listeners_multiple_host_cleanup_on_error() -> None: - # If we were trying to bind to multiple hosts and one of them failed, they - # call get cleaned up before returning - fsf = FakeSocketFactory(3) - tsocket.set_custom_socket_factory(fsf) - tsocket.set_custom_hostname_resolver( - FakeHostnameResolver( - [ - (tsocket.AF_INET, "1.1.1.1"), - (tsocket.AF_INET, "2.2.2.2"), - (tsocket.AF_INET, "3.3.3.3"), - ] - ) - ) - - with pytest.raises(FakeOSError): - await open_unix_listeners(80, host="example.org") - - assert len(fsf.sockets) == 3 - for sock in fsf.sockets: - assert sock.closed - - -async def test_open_unix_listeners_port_checking() -> None: - for host in ["127.0.0.1", None]: - with pytest.raises(TypeError): - await open_unix_listeners(None, host=host) - with pytest.raises(TypeError): - await open_unix_listeners(b"80", host=host) - with pytest.raises(TypeError): - await open_unix_listeners("http", host=host) - - -async def test_serve_tcp() -> None: - async def handler(stream) -> None: - await stream.send_all(b"x") - - async with trio.open_nursery() as nursery: - listeners = await nursery.start(serve_unix, handler, 0) - stream = await open_stream_to_socket_listener(listeners[0]) - async with stream: - await stream.receive_some(1) == b"x" - nursery.cancel_scope.cancel() - - -@pytest.mark.parametrize( - "try_families", - [{tsocket.AF_INET}, {tsocket.AF_INET6}, {tsocket.AF_INET, tsocket.AF_INET6}], -) -@pytest.mark.parametrize( - "fail_families", - [{tsocket.AF_INET}, {tsocket.AF_INET6}, {tsocket.AF_INET, tsocket.AF_INET6}], -) -async def test_open_unix_listeners_some_address_families_unavailable( - try_families, fail_families -): - fsf = FakeSocketFactory( - 10, raise_on_family={family: errno.EAFNOSUPPORT for family in fail_families} - ) - tsocket.set_custom_socket_factory(fsf) - tsocket.set_custom_hostname_resolver( - FakeHostnameResolver([(family, "foo") for family in try_families]) - ) - - should_succeed = try_families - fail_families - - if not should_succeed: - with pytest.raises(OSError) as exc_info: - await open_unix_listeners(80, host="example.org") - - assert "This system doesn't support" in str(exc_info.value) - if isinstance(exc_info.value.__cause__, BaseExceptionGroup): - for subexc in exc_info.value.__cause__.exceptions: - assert "nope" in str(subexc) - else: - assert isinstance(exc_info.value.__cause__, OSError) - assert "nope" in str(exc_info.value.__cause__) - else: - listeners = await open_unix_listeners(80) - for listener in listeners: - should_succeed.remove(listener.socket.family) - assert not should_succeed - - -async def test_open_unix_listeners_socket_fails_not_afnosupport() -> None: - fsf = FakeSocketFactory( - 10, - raise_on_family={ - tsocket.AF_INET: errno.EAFNOSUPPORT, - tsocket.AF_INET6: errno.EINVAL, - }, - ) - tsocket.set_custom_socket_factory(fsf) - tsocket.set_custom_hostname_resolver( - FakeHostnameResolver([(tsocket.AF_INET, "foo"), (tsocket.AF_INET6, "bar")]) - ) - - with pytest.raises(OSError) as exc_info: - await open_unix_listeners(80, host="example.org") - assert exc_info.value.errno == errno.EINVAL - assert exc_info.value.__cause__ is None - assert "nope" in str(exc_info.value) - - -# We used to have an elaborate test that opened a real TCP listening socket -# and then tried to measure its backlog by making connections to it. And most -# of the time, it worked. But no matter what we tried, it was always fragile, -# because it had to do things like use timeouts to guess when the listening -# queue was full, sometimes the CI hosts go into SYN-cookie mode (where there -# effectively is no backlog), sometimes the host might not be enough resources -# to give us the full requested backlog... it was a mess. So now we just check -# that the backlog argument is passed through correctly. -async def test_open_unix_listeners_backlog() -> None: - fsf = FakeSocketFactory(99) - tsocket.set_custom_socket_factory(fsf) - for given, expected in [ - (None, 0xFFFF), - (inf, 0xFFFF), - (99999999, 0xFFFF), - (10, 10), - (1, 1), - ]: - listeners = await open_unix_listeners(0, backlog=given) - assert listeners - for listener in listeners: - assert listener.socket.backlog == expected - - -async def test_open_unix_listeners_backlog_float_error() -> None: - fsf = FakeSocketFactory(99) - tsocket.set_custom_socket_factory(fsf) - for should_fail in (0.0, 2.18, 3.14, 9.75): - with pytest.raises( - ValueError, match=f"Only accepts infinity, not {should_fail!r}" - ): - await open_unix_listeners(0, backlog=should_fail) From dac34f7570f8530c9e44ef60a87ec850d90c873a Mon Sep 17 00:00:00 2001 From: CoolCat467 <52022020+CoolCat467@users.noreply.github.com> Date: Tue, 17 Oct 2023 21:40:22 -0500 Subject: [PATCH 11/19] WIP tests for open_unix_listeners --- .../test_highlevel_open_unix_listeners.py | 43 +++++++++++++++++++ 1 file changed, 43 insertions(+) create mode 100644 trio/_tests/test_highlevel_open_unix_listeners.py diff --git a/trio/_tests/test_highlevel_open_unix_listeners.py b/trio/_tests/test_highlevel_open_unix_listeners.py new file mode 100644 index 0000000000..9cd43e30bc --- /dev/null +++ b/trio/_tests/test_highlevel_open_unix_listeners.py @@ -0,0 +1,43 @@ +from __future__ import annotations + +import sys +from typing import TYPE_CHECKING + +import trio.socket as tsocket +from trio import SocketListener +from trio._highlevel_open_unix_listeners import ( + open_unix_listeners, +) +from trio.testing import open_stream_to_socket_listener + +assert ( # Skip type checking when on Windows + sys.platform != "win32" or not TYPE_CHECKING +) + + +async def test_open_unix_listeners_basic() -> None: + listeners = await open_unix_listeners(0) + assert isinstance(listeners, list) + for obj in listeners: + assert isinstance(obj, SocketListener) + # Binds to wildcard address by default + assert obj.socket.family in [tsocket.AF_INET, tsocket.AF_INET6] + assert obj.socket.getsockname()[0] in ["0.0.0.0", "::"] + + listener = listeners[0] + # Make sure the backlog is at least 2 + c1 = await open_stream_to_socket_listener(listener) + c2 = await open_stream_to_socket_listener(listener) + + s1 = await listener.accept() + s2 = await listener.accept() + + # Note that we don't know which client stream is connected to which server + # stream + await s1.send_all(b"x") + await s2.send_all(b"x") + assert await c1.receive_some(1) == b"x" + assert await c2.receive_some(1) == b"x" + + for resource in [c1, c2, s1, s2, *listeners]: + await resource.aclose() From 9784dd1030c027ff9ea3fabadb996ffebb2a5bb4 Mon Sep 17 00:00:00 2001 From: CoolCat467 <52022020+CoolCat467@users.noreply.github.com> Date: Sun, 5 Nov 2023 19:43:16 -0600 Subject: [PATCH 12/19] Fix typing and ruff issue --- trio/_highlevel_open_unix_listeners.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/trio/_highlevel_open_unix_listeners.py b/trio/_highlevel_open_unix_listeners.py index be30fc514a..de3cf909a9 100644 --- a/trio/_highlevel_open_unix_listeners.py +++ b/trio/_highlevel_open_unix_listeners.py @@ -6,7 +6,7 @@ import time from collections.abc import Awaitable, Callable, Generator from contextlib import contextmanager -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, NoReturn import trio import trio.socket as tsocket @@ -247,7 +247,7 @@ async def accept(self) -> SocketStream: sock, _ = await self.socket.accept() except OSError as exc: if exc.errno in _closed_stream_errnos: - raise trio.ClosedResourceError + raise trio.ClosedResourceError from None if exc.errno not in _ignorable_accept_errnos: raise else: @@ -312,7 +312,7 @@ async def serve_unix( backlog: int | None = None, handler_nursery: trio.Nursery | None = None, task_status: trio.TaskStatus[list[UnixSocketListener]] = trio.TASK_STATUS_IGNORED, -) -> None: +) -> NoReturn: """Listen for incoming UNIX connections, and for each one start a task running ``handler(stream)``. From 99f17c5303cb07d22377b8c87c4e904bc6330302 Mon Sep 17 00:00:00 2001 From: CoolCat467 <52022020+CoolCat467@users.noreply.github.com> Date: Sun, 5 Nov 2023 19:47:10 -0600 Subject: [PATCH 13/19] `Final Metaclass` -> `@final` --- trio/_highlevel_open_unix_listeners.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/trio/_highlevel_open_unix_listeners.py b/trio/_highlevel_open_unix_listeners.py index de3cf909a9..18d03c3d57 100644 --- a/trio/_highlevel_open_unix_listeners.py +++ b/trio/_highlevel_open_unix_listeners.py @@ -16,7 +16,7 @@ _closed_stream_errnos, _ignorable_accept_errnos, ) -from ._util import Final +from ._util import final from .abc import Listener if TYPE_CHECKING: @@ -111,7 +111,8 @@ def _lock( os.unlink(name) -class UnixSocketListener(Listener[SocketStream], metaclass=Final): +@final +class UnixSocketListener(Listener[SocketStream]): """A :class:`~trio.abc.Listener` that uses a listening socket to accept incoming connections as :class:`SocketStream` objects. From 7cec7cb2c541f8192c5093da1294d0ece89509b8 Mon Sep 17 00:00:00 2001 From: CoolCat467 <52022020+CoolCat467@users.noreply.github.com> Date: Sun, 5 Nov 2023 19:51:06 -0600 Subject: [PATCH 14/19] Publicly expose imported functions --- trio/__init__.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/trio/__init__.py b/trio/__init__.py index 72a0004a4e..fa13fced86 100644 --- a/trio/__init__.py +++ b/trio/__init__.py @@ -63,7 +63,10 @@ serve_tcp as serve_tcp, ) from ._highlevel_open_tcp_stream import open_tcp_stream as open_tcp_stream -from ._highlevel_open_unix_listeners import open_unix_listeners, serve_unix +from ._highlevel_open_unix_listeners import ( + open_unix_listeners as open_unix_listeners, + serve_unix as serve_unix, +) from ._highlevel_open_unix_stream import open_unix_socket as open_unix_socket from ._highlevel_serve_listeners import serve_listeners as serve_listeners from ._highlevel_socket import ( From 0e950d1123ab4c026c992f602020f158d5d6178b Mon Sep 17 00:00:00 2001 From: CoolCat467 <52022020+CoolCat467@users.noreply.github.com> Date: Sun, 5 Nov 2023 20:10:05 -0600 Subject: [PATCH 15/19] Documentation and expose `UnixSocketListener` for typing purposes --- trio/__init__.py | 1 + trio/_highlevel_open_unix_listeners.py | 21 ++++++++++++++++++++- 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/trio/__init__.py b/trio/__init__.py index fa13fced86..a4dff645b9 100644 --- a/trio/__init__.py +++ b/trio/__init__.py @@ -72,6 +72,7 @@ from ._highlevel_socket import ( SocketListener as SocketListener, SocketStream as SocketStream, + UnixSocketListener as UnixSocketListener, ) from ._highlevel_ssl_helpers import ( open_ssl_over_tcp_listeners as open_ssl_over_tcp_listeners, diff --git a/trio/_highlevel_open_unix_listeners.py b/trio/_highlevel_open_unix_listeners.py index 18d03c3d57..6649baa9b6 100644 --- a/trio/_highlevel_open_unix_listeners.py +++ b/trio/_highlevel_open_unix_listeners.py @@ -212,8 +212,27 @@ async def create( mode: int = 0o666, backlog: int | None = None, ) -> Self: + """Create new unix socket listener. + + Args: + path (str, bytes): Path to folder where new socket file will be created. + + mode (int): Unix octal file mode of new socket file. + + backlog (int or None): The listen backlog to use. If you leave this as + ``None`` then Trio will pick a good default. (Currently: whatever + your system has configured as the maximum backlog.) + + Returns: + :class:`UnixSocketListener` + + Raises: + FileNotFoundError: if socket path doesn't exist + FileExistsError: if existing file is not a socket + + """ backlog = _compute_backlog(backlog) - return await cls._create(path, mode, backlog or 0xFFFF) + return await cls._create(path=path, mode=mode, backlog=backlog or 0xFFFF) def _close(self) -> None: try: From dc493ffddb490f8d429415b99bae19c2040e1c5a Mon Sep 17 00:00:00 2001 From: CoolCat467 <52022020+CoolCat467@users.noreply.github.com> Date: Sun, 5 Nov 2023 20:26:16 -0600 Subject: [PATCH 16/19] WIP add tests --- trio/_highlevel_open_unix_listeners.py | 6 +++--- trio/_tests/test_highlevel_open_unix_listeners.py | 3 ++- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/trio/_highlevel_open_unix_listeners.py b/trio/_highlevel_open_unix_listeners.py index 6649baa9b6..f6c44710be 100644 --- a/trio/_highlevel_open_unix_listeners.py +++ b/trio/_highlevel_open_unix_listeners.py @@ -153,7 +153,7 @@ def __init__( pass else: if not listening: - raise ValueError("SocketListener requires a listening socket") + raise ValueError("UnixSocketListener requires a listening socket") self.socket = socket self.path = path @@ -193,12 +193,12 @@ async def _create( with _lock(path): os.rename(tmp_path, path) return cls(sock, path, inode) - except: # noqa: E722 + except BaseException: try: os.unlink(tmp_path) finally: raise - except: # noqa: E722 + except BaseException: try: sock.close() finally: diff --git a/trio/_tests/test_highlevel_open_unix_listeners.py b/trio/_tests/test_highlevel_open_unix_listeners.py index 9cd43e30bc..912e88fae0 100644 --- a/trio/_tests/test_highlevel_open_unix_listeners.py +++ b/trio/_tests/test_highlevel_open_unix_listeners.py @@ -16,7 +16,8 @@ async def test_open_unix_listeners_basic() -> None: - listeners = await open_unix_listeners(0) + # Since we are on unix, we can use fun things like /tmp + listeners = await open_unix_listeners("/tmp/test_socket.sock", backlog=0) assert isinstance(listeners, list) for obj in listeners: assert isinstance(obj, SocketListener) From 74ca3db59979864758b1ae637e043204c960e4a7 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 14 Jan 2025 23:08:24 +0000 Subject: [PATCH 17/19] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/trio/_highlevel_open_unix_listeners.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/trio/_highlevel_open_unix_listeners.py b/src/trio/_highlevel_open_unix_listeners.py index 1854476df9..78d5e67eb3 100644 --- a/src/trio/_highlevel_open_unix_listeners.py +++ b/src/trio/_highlevel_open_unix_listeners.py @@ -132,7 +132,7 @@ class UnixSocketListener(Listener[SocketStream]): """ - __slots__ = ("socket", "path", "inode") + __slots__ = ("inode", "path", "socket") def __init__( self, From 564467f20dda794ba201663cf11d33a1f43f8ac3 Mon Sep 17 00:00:00 2001 From: CoolCat467 <52022020+CoolCat467@users.noreply.github.com> Date: Mon, 20 Jan 2025 13:21:20 -0600 Subject: [PATCH 18/19] Fix test --- src/trio/__init__.py | 2 +- .../test_highlevel_open_unix_listeners.py | 56 +++++++++++++++---- 2 files changed, 46 insertions(+), 12 deletions(-) diff --git a/src/trio/__init__.py b/src/trio/__init__.py index 7f70566825..7944c26ba9 100644 --- a/src/trio/__init__.py +++ b/src/trio/__init__.py @@ -64,6 +64,7 @@ ) from ._highlevel_open_tcp_stream import open_tcp_stream as open_tcp_stream from ._highlevel_open_unix_listeners import ( + UnixSocketListener as UnixSocketListener, open_unix_listeners as open_unix_listeners, serve_unix as serve_unix, ) @@ -72,7 +73,6 @@ from ._highlevel_socket import ( SocketListener as SocketListener, SocketStream as SocketStream, - UnixSocketListener as UnixSocketListener, ) from ._highlevel_ssl_helpers import ( open_ssl_over_tcp_listeners as open_ssl_over_tcp_listeners, diff --git a/src/trio/_tests/test_highlevel_open_unix_listeners.py b/src/trio/_tests/test_highlevel_open_unix_listeners.py index 912e88fae0..67ee2ba982 100644 --- a/src/trio/_tests/test_highlevel_open_unix_listeners.py +++ b/src/trio/_tests/test_highlevel_open_unix_listeners.py @@ -3,32 +3,66 @@ import sys from typing import TYPE_CHECKING +import pytest + import trio.socket as tsocket -from trio import SocketListener from trio._highlevel_open_unix_listeners import ( + UnixSocketListener, open_unix_listeners, ) -from trio.testing import open_stream_to_socket_listener +from trio._highlevel_socket import SocketStream + +assert not TYPE_CHECKING or sys.platform != "win32" -assert ( # Skip type checking when on Windows - sys.platform != "win32" or not TYPE_CHECKING +skip_if_not_unix = pytest.mark.skipif( + not hasattr(tsocket, "AF_UNIX"), + reason="Needs unix socket support", ) +async def open_stream_to_unix_socket_listener( + socket_listener: UnixSocketListener, + sockaddr: str, +) -> SocketStream: + """Connect to the given :class:`~trio.UnixSocketListener`. + + This is particularly useful in tests when you want to let a server pick + its own port, and then connect to it:: + + listeners = await trio.open_tcp_listeners(0) + client = await trio.testing.open_stream_to_socket_listener(listeners[0]) + + Args: + socket_listener (~trio.UnixSocketListener): The + :class:`~trio.UnixSocketListener` to connect to. + + Returns: + SocketStream: a stream connected to the given listener. + + """ + family = socket_listener.socket.family + assert family == tsocket.AF_UNIX + + sock = tsocket.socket(family=family) + await sock.connect(sockaddr) + return SocketStream(sock) + + +@skip_if_not_unix async def test_open_unix_listeners_basic() -> None: # Since we are on unix, we can use fun things like /tmp - listeners = await open_unix_listeners("/tmp/test_socket.sock", backlog=0) + path = "/tmp/test_socket.sock" + listeners = await open_unix_listeners(path, backlog=0) assert isinstance(listeners, list) for obj in listeners: - assert isinstance(obj, SocketListener) - # Binds to wildcard address by default - assert obj.socket.family in [tsocket.AF_INET, tsocket.AF_INET6] - assert obj.socket.getsockname()[0] in ["0.0.0.0", "::"] + assert obj.socket.family == tsocket.AF_UNIX + # Does not work because of atomic overwrite + # assert obj.socket.getsockname() == path listener = listeners[0] # Make sure the backlog is at least 2 - c1 = await open_stream_to_socket_listener(listener) - c2 = await open_stream_to_socket_listener(listener) + c1 = await open_stream_to_unix_socket_listener(listener, path) + c2 = await open_stream_to_unix_socket_listener(listener, path) s1 = await listener.accept() s2 = await listener.accept() From 5091177f045ef4befaea0a2182b882f0d678185c Mon Sep 17 00:00:00 2001 From: CoolCat467 <52022020+CoolCat467@users.noreply.github.com> Date: Mon, 20 Jan 2025 13:23:41 -0600 Subject: [PATCH 19/19] Add newsfragment --- newsfragments/279.feature.rst | 1 + 1 file changed, 1 insertion(+) create mode 100644 newsfragments/279.feature.rst diff --git a/newsfragments/279.feature.rst b/newsfragments/279.feature.rst new file mode 100644 index 0000000000..7c5820e11d --- /dev/null +++ b/newsfragments/279.feature.rst @@ -0,0 +1 @@ +Add ``trio.open_unix_listener``, ``trio.serve_unix``, and ``trio.UnixSocketListener`` to support ``SOCK_STREAM`` `Unix domain sockets `__