diff --git a/docs/source/reference-io.rst b/docs/source/reference-io.rst index 9207afb41b..e270033b46 100644 --- a/docs/source/reference-io.rst +++ b/docs/source/reference-io.rst @@ -731,7 +731,11 @@ task and interact with it while it's running: .. autofunction:: trio.run_process -.. autoclass:: trio.Process +.. autoclass:: trio._subprocess.HasFileno(Protocol) + + .. automethod:: fileno + +.. autoclass:: trio.Process() .. autoattribute:: returncode diff --git a/pyproject.toml b/pyproject.toml index 7f2a32f810..566e9ec49f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -67,6 +67,11 @@ module = [ "trio._highlevel_open_tcp_stream", "trio._ki", "trio._socket", + "trio._subprocess", + "trio._subprocess_platform", + "trio._subprocess_platform.kqueue", + "trio._subprocess_platform.waitid", + "trio._subprocess_platform.windows", "trio._sync", "trio._threads", "trio._tools.gen_exports", diff --git a/trio/_subprocess.py b/trio/_subprocess.py index 1f8d0a8253..7cf990fa53 100644 --- a/trio/_subprocess.py +++ b/trio/_subprocess.py @@ -1,15 +1,20 @@ +from __future__ import annotations + import os +import signal import subprocess import sys import warnings +from collections.abc import Awaitable, Callable, Mapping, Sequence from contextlib import ExitStack from functools import partial -from typing import TYPE_CHECKING, Optional +from io import TextIOWrapper +from typing import TYPE_CHECKING, Final, Literal, Protocol, Union, overload import trio from ._abc import AsyncResource, ReceiveStream, SendStream -from ._core import ClosedResourceError +from ._core import ClosedResourceError, TaskStatus from ._deprecate import deprecated from ._highlevel_generic import StapledStream from ._subprocess_platform import ( @@ -20,6 +25,14 @@ from ._sync import Lock from ._util import NoPublicConstructor +if TYPE_CHECKING: + from typing_extensions import TypeAlias + + +# Only subscriptable in 3.9+ +StrOrBytesPath: TypeAlias = Union[str, bytes, "os.PathLike[str]", "os.PathLike[bytes]"] + + # Linux-specific, but has complex lifetime management stuff so we hard-code it # here instead of hiding it behind the _subprocess_platform abstraction can_try_pidfd_open: bool @@ -65,6 +78,13 @@ def pidfd_open(fd: int, flags: int) -> int: can_try_pidfd_open = False +class HasFileno(Protocol): + """Represents any file-like object that has a file descriptor.""" + + def fileno(self) -> int: + ... + + class Process(AsyncResource, metaclass=NoPublicConstructor): r"""A child process. Like :class:`subprocess.Popen`, but async. @@ -107,32 +127,38 @@ class Process(AsyncResource, metaclass=NoPublicConstructor): available; otherwise this will be None. """ - - universal_newlines = False - encoding = None - errors = None + # We're always in binary mode. + universal_newlines: Final = False + encoding: Final = None + errors: Final = None # Available for the per-platform wait_child_exiting() implementations # to stash some state; waitid platforms use this to avoid spawning # arbitrarily many threads if wait() keeps getting cancelled. - _wait_for_exit_data = None - - def __init__(self, popen, stdin, stdout, stderr): + _wait_for_exit_data: object = None + + def __init__( + self, + popen: subprocess.Popen[bytes], + stdin: SendStream | None, + stdout: ReceiveStream | None, + stderr: ReceiveStream | None, + ) -> None: self._proc = popen - self.stdin: Optional[SendStream] = stdin - self.stdout: Optional[ReceiveStream] = stdout - self.stderr: Optional[ReceiveStream] = stderr + self.stdin = stdin + self.stdout = stdout + self.stderr = stderr - self.stdio: Optional[StapledStream] = None + self.stdio: StapledStream | None = None if self.stdin is not None and self.stdout is not None: self.stdio = StapledStream(self.stdin, self.stdout) - self._wait_lock = Lock() + self._wait_lock: Lock = Lock() - self._pidfd = None + self._pidfd: TextIOWrapper | None = None if can_try_pidfd_open: try: - fd = pidfd_open(self._proc.pid, 0) + fd: int = pidfd_open(self._proc.pid, 0) except OSError: # Well, we tried, but it didn't work (probably because we're # running on an older kernel, or in an older sandbox, that @@ -144,10 +170,10 @@ def __init__(self, popen, stdin, stdout, stderr): # make sure it'll get closed. self._pidfd = open(fd) - self.args = self._proc.args - self.pid = self._proc.pid + self.args: StrOrBytesPath | Sequence[StrOrBytesPath] = self._proc.args + self.pid: int = self._proc.pid - def __repr__(self): + def __repr__(self) -> str: returncode = self.returncode if returncode is None: status = f"running with PID {self.pid}" @@ -159,7 +185,7 @@ def __repr__(self): return f"" @property - def returncode(self): + def returncode(self) -> int | None: """The exit status of the process (an integer), or ``None`` if it's still running. @@ -186,13 +212,13 @@ def returncode(self): issue=1104, instead="run_process or nursery.start(run_process, ...)", ) - async def __aenter__(self): + async def __aenter__(self) -> Process: return self @deprecated( "0.20.0", issue=1104, instead="run_process or nursery.start(run_process, ...)" ) - async def aclose(self): + async def aclose(self) -> None: """Close any pipes we have to the process (both input and output) and wait for it to exit. @@ -214,13 +240,13 @@ async def aclose(self): with trio.CancelScope(shield=True): await self.wait() - def _close_pidfd(self): + def _close_pidfd(self) -> None: if self._pidfd is not None: trio.lowlevel.notify_closing(self._pidfd.fileno()) self._pidfd.close() self._pidfd = None - async def wait(self): + async def wait(self) -> int: """Block until the process exits. Returns: @@ -230,7 +256,7 @@ async def wait(self): if self.poll() is None: if self._pidfd is not None: try: - await trio.lowlevel.wait_readable(self._pidfd) + await trio.lowlevel.wait_readable(self._pidfd.fileno()) except ClosedResourceError: # something else (probably a call to poll) already closed the # pidfd @@ -248,7 +274,7 @@ async def wait(self): assert self._proc.returncode is not None return self._proc.returncode - def poll(self): + def poll(self) -> int | None: """Returns the exit status of the process (an integer), or ``None`` if it's still running. @@ -260,7 +286,7 @@ def poll(self): """ return self.returncode - def send_signal(self, sig): + def send_signal(self, sig: signal.Signals | int) -> None: """Send signal ``sig`` to the process. On UNIX, ``sig`` may be any signal defined in the @@ -270,7 +296,7 @@ def send_signal(self, sig): """ self._proc.send_signal(sig) - def terminate(self): + def terminate(self) -> None: """Terminate the process, politely if possible. On UNIX, this is equivalent to @@ -281,7 +307,7 @@ def terminate(self): """ self._proc.terminate() - def kill(self): + def kill(self) -> None: """Immediately terminate the process. On UNIX, this is equivalent to @@ -294,8 +320,13 @@ def kill(self): self._proc.kill() -async def open_process( - command, *, stdin=None, stdout=None, stderr=None, **options +async def _open_process( + command: list[str] | str, + *, + stdin: int | HasFileno | None = None, + stdout: int | HasFileno | None = None, + stderr: int | HasFileno | None = None, + **options: object, ) -> Process: r"""Execute a child program in a new process. @@ -366,9 +397,9 @@ async def open_process( "on UNIX systems" ) - trio_stdin: Optional[ClosableSendStream] = None - trio_stdout: Optional[ClosableReceiveStream] = None - trio_stderr: Optional[ClosableReceiveStream] = None + trio_stdin: ClosableSendStream | None = None + trio_stdout: ClosableReceiveStream | None = None + trio_stderr: ClosableReceiveStream | None = None # Close the parent's handle for each child side of a pipe; we want the child to # have the only copy, so that when it exits we can read EOF on our side. The # trio ends of pipes will be transferred to the Process object, which will be @@ -414,14 +445,14 @@ async def open_process( return Process._create(popen, trio_stdin, trio_stdout, trio_stderr) -async def _windows_deliver_cancel(p): +async def _windows_deliver_cancel(p: Process) -> None: try: p.terminate() except OSError as exc: warnings.warn(RuntimeWarning(f"TerminateProcess on {p!r} failed with: {exc!r}")) -async def _posix_deliver_cancel(p): +async def _posix_deliver_cancel(p: Process) -> None: try: p.terminate() await trio.sleep(5) @@ -439,17 +470,18 @@ async def _posix_deliver_cancel(p): ) -async def run_process( - command, +# Use a private name, so we can declare platform-specific stubs below. +async def _run_process( + command: StrOrBytesPath | Sequence[StrOrBytesPath], *, - stdin=b"", - capture_stdout=False, - capture_stderr=False, - check=True, - deliver_cancel=None, - task_status=trio.TASK_STATUS_IGNORED, - **options, -): + stdin: bytes | bytearray | memoryview | int | HasFileno | None = b"", + capture_stdout: bool = False, + capture_stderr: bool = False, + check: bool = True, + deliver_cancel: Callable[[Process], Awaitable[object]] | None = None, + task_status: TaskStatus[Process] = trio.TASK_STATUS_IGNORED, + **options: object, +) -> subprocess.CompletedProcess[bytes]: """Run ``command`` in a subprocess and wait for it to complete. This function can be called in two different ways. @@ -687,23 +719,28 @@ async def my_deliver_cancel(process): assert os.name == "posix" deliver_cancel = _posix_deliver_cancel - stdout_chunks = [] - stderr_chunks = [] + stdout_chunks: list[bytes | bytearray] = [] + stderr_chunks: list[bytes | bytearray] = [] - async def feed_input(stream): + async def feed_input(stream: SendStream) -> None: async with stream: try: + assert input is not None await stream.send_all(input) except trio.BrokenResourceError: pass - async def read_output(stream, chunks): + async def read_output( + stream: ReceiveStream, + chunks: list[bytes | bytearray], + ) -> None: async with stream: async for chunk in stream: chunks.append(chunk) async with trio.open_nursery() as nursery: - proc = await open_process(command, **options) + # options needs a complex TypedDict. The overload error only occurs on Unix. + proc = await open_process(command, **options) # type: ignore[arg-type, call-overload, unused-ignore] try: if input is not None: nursery.start_soon(feed_input, proc.stdin) @@ -722,7 +759,7 @@ async def read_output(stream, chunks): with trio.CancelScope(shield=True): killer_cscope = trio.CancelScope(shield=True) - async def killer(): + async def killer() -> None: with killer_cscope: await deliver_cancel(proc) @@ -739,4 +776,147 @@ async def killer(): proc.returncode, proc.args, output=stdout, stderr=stderr ) else: + assert proc.returncode is not None return subprocess.CompletedProcess(proc.args, proc.returncode, stdout, stderr) + + +# There's a lot of duplication here because type checkers don't +# have a good way to represent overloads that differ only +# slightly. A cheat sheet: +# - on Windows, command is Union[str, Sequence[str]]; +# on Unix, command is str if shell=True and Sequence[str] otherwise +# - on Windows, there are startupinfo and creationflags options; +# on Unix, there are preexec_fn, restore_signals, start_new_session, and pass_fds +# - run_process() has the signature of open_process() plus arguments +# capture_stdout, capture_stderr, check, deliver_cancel, and the ability to pass +# bytes as stdin + +if TYPE_CHECKING: + if sys.platform == "win32": + + async def open_process( + command: Union[StrOrBytesPath, Sequence[StrOrBytesPath]], + *, + stdin: int | HasFileno | None = None, + stdout: int | HasFileno | None = None, + stderr: int | HasFileno | None = None, + close_fds: bool = True, + shell: bool = False, + cwd: StrOrBytesPath | None = None, + env: Mapping[str, str] | None = None, + startupinfo: subprocess.STARTUPINFO | None = None, + creationflags: int = 0, + ) -> trio.Process: + ... + + async def run_process( + command: StrOrBytesPath | Sequence[StrOrBytesPath], + *, + task_status: TaskStatus[Process] = trio.TASK_STATUS_IGNORED, + stdin: bytes | bytearray | memoryview | int | HasFileno | None = None, + capture_stdout: bool = False, + capture_stderr: bool = False, + check: bool = True, + deliver_cancel: Callable[[Process], Awaitable[object]] | None = None, + stdout: int | HasFileno | None = None, + stderr: int | HasFileno | None = None, + close_fds: bool = True, + shell: bool = False, + cwd: StrOrBytesPath | None = None, + env: Mapping[str, str] | None = None, + startupinfo: subprocess.STARTUPINFO | None = None, + creationflags: int = 0, + ) -> subprocess.CompletedProcess[bytes]: + ... + + else: # Unix + + @overload # type: ignore[no-overload-impl] + async def open_process( + command: StrOrBytesPath, + *, + stdin: int | HasFileno | None = None, + stdout: int | HasFileno | None = None, + stderr: int | HasFileno | None = None, + close_fds: bool = True, + shell: Literal[True], + cwd: StrOrBytesPath | None = None, + env: Mapping[str, str] | None = None, + preexec_fn: Callable[[], object] | None = None, + restore_signals: bool = True, + start_new_session: bool = False, + pass_fds: Sequence[int] = (), + ) -> trio.Process: + ... + + @overload + async def open_process( + command: Sequence[StrOrBytesPath], + *, + stdin: int | HasFileno | None = None, + stdout: int | HasFileno | None = None, + stderr: int | HasFileno | None = None, + close_fds: bool = True, + shell: bool = False, + cwd: StrOrBytesPath | None = None, + env: Mapping[str, str] | None = None, + preexec_fn: Callable[[], object] | None = None, + restore_signals: bool = True, + start_new_session: bool = False, + pass_fds: Sequence[int] = (), + ) -> trio.Process: + ... + + @overload # type: ignore[no-overload-impl] + async def run_process( + command: StrOrBytesPath, + *, + task_status: TaskStatus[Process] = trio.TASK_STATUS_IGNORED, + stdin: bytes | bytearray | memoryview | int | HasFileno | None = None, + capture_stdout: bool = False, + capture_stderr: bool = False, + check: bool = True, + deliver_cancel: Callable[[Process], Awaitable[object]] | None = None, + stdout: int | HasFileno | None = None, + stderr: int | HasFileno | None = None, + close_fds: bool = True, + shell: Literal[True], + cwd: StrOrBytesPath | None = None, + env: Mapping[str, str] | None = None, + preexec_fn: Callable[[], object] | None = None, + restore_signals: bool = True, + start_new_session: bool = False, + pass_fds: Sequence[int] = (), + ) -> subprocess.CompletedProcess[bytes]: + ... + + @overload + async def run_process( + command: Sequence[StrOrBytesPath], + *, + task_status: TaskStatus[Process] = trio.TASK_STATUS_IGNORED, + stdin: bytes | bytearray | memoryview | int | HasFileno | None = None, + capture_stdout: bool = False, + capture_stderr: bool = False, + check: bool = True, + deliver_cancel: Callable[[Process], Awaitable[None]] | None = None, + stdout: int | HasFileno | None = None, + stderr: int | HasFileno | None = None, + close_fds: bool = True, + shell: bool = False, + cwd: StrOrBytesPath | None = None, + env: Mapping[str, str] | None = None, + preexec_fn: Callable[[], object] | None = None, + restore_signals: bool = True, + start_new_session: bool = False, + pass_fds: Sequence[int] = (), + ) -> subprocess.CompletedProcess[bytes]: + ... + +else: + # At runtime, use the actual implementations. + open_process = _open_process + open_process.__name__ = open_process.__qualname__ = "open_process" + + run_process = _run_process + run_process.__name__ = run_process.__qualname__ = "run_process" diff --git a/trio/_subprocess_platform/kqueue.py b/trio/_subprocess_platform/kqueue.py index 9839fd046b..efd0562fc2 100644 --- a/trio/_subprocess_platform/kqueue.py +++ b/trio/_subprocess_platform/kqueue.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import select import sys from typing import TYPE_CHECKING @@ -35,7 +37,7 @@ async def wait_child_exiting(process: "_subprocess.Process") -> None: # in Chromium it seems we should still keep the check. return - def abort(_): + def abort(_: _core.RaiseCancelT) -> _core.Abort: kqueue.control([make_event(select.KQ_EV_DELETE)], 0) return _core.Abort.SUCCEEDED diff --git a/trio/_subprocess_platform/waitid.py b/trio/_subprocess_platform/waitid.py index ad69017219..2a2ca6719d 100644 --- a/trio/_subprocess_platform/waitid.py +++ b/trio/_subprocess_platform/waitid.py @@ -2,15 +2,19 @@ import math import os import sys +from typing import TYPE_CHECKING from .. import _core, _subprocess from .._sync import CapacityLimiter, Event from .._threads import to_thread_run_sync +assert (sys.platform != "win32" and sys.platform != "darwin") or not TYPE_CHECKING + + try: from os import waitid - def sync_wait_reapable(pid): + def sync_wait_reapable(pid: int) -> None: waitid(os.P_PID, pid, os.WEXITED | os.WNOWAIT) except ImportError: @@ -39,9 +43,9 @@ def sync_wait_reapable(pid): int waitid(int idtype, int id, siginfo_t* result, int options); """ ) - waitid = waitid_ffi.dlopen(None).waitid + waitid_cffi = waitid_ffi.dlopen(None).waitid - def sync_wait_reapable(pid): + def sync_wait_reapable(pid: int) -> None: P_PID = 1 WEXITED = 0x00000004 if sys.platform == "darwin": # pragma: no cover @@ -52,7 +56,7 @@ def sync_wait_reapable(pid): else: WNOWAIT = 0x01000000 result = waitid_ffi.new("siginfo_t *") - while waitid(P_PID, pid, result, WEXITED | WNOWAIT) < 0: + while waitid_cffi(P_PID, pid, result, WEXITED | WNOWAIT) < 0: got_errno = waitid_ffi.errno if got_errno == errno.EINTR: continue @@ -101,7 +105,7 @@ async def wait_child_exiting(process: "_subprocess.Process") -> None: # process. if process._wait_for_exit_data is None: - process._wait_for_exit_data = event = Event() # type: ignore + process._wait_for_exit_data = event = Event() _core.spawn_system_task(_waitid_system_task, process.pid, event) assert isinstance(process._wait_for_exit_data, Event) await process._wait_for_exit_data.wait() diff --git a/trio/_subprocess_platform/windows.py b/trio/_subprocess_platform/windows.py index 958be8675c..1634e74fa7 100644 --- a/trio/_subprocess_platform/windows.py +++ b/trio/_subprocess_platform/windows.py @@ -3,4 +3,5 @@ async def wait_child_exiting(process: "_subprocess.Process") -> None: - await WaitForSingleObject(int(process._proc._handle)) + # _handle is not in Popen stubs, though it is present on Windows. + await WaitForSingleObject(int(process._proc._handle)) # type: ignore[attr-defined] diff --git a/trio/_tests/verify_types.json b/trio/_tests/verify_types.json index 397860bcff..d3a994933e 100644 --- a/trio/_tests/verify_types.json +++ b/trio/_tests/verify_types.json @@ -7,11 +7,11 @@ "warningCount": 0 }, "typeCompleteness": { - "completenessScore": 0.9538216560509554, + "completenessScore": 0.9585987261146497, "exportedSymbolCounts": { "withAmbiguousType": 0, - "withKnownType": 599, - "withUnknownType": 29 + "withKnownType": 602, + "withUnknownType": 26 }, "ignoreUnknownTypesFromImports": true, "missingClassDocStringCount": 1, @@ -45,9 +45,9 @@ } ], "otherSymbolCounts": { - "withAmbiguousType": 3, - "withKnownType": 627, - "withUnknownType": 50 + "withAmbiguousType": 1, + "withKnownType": 642, + "withUnknownType": 39 }, "packageName": "trio", "symbols": [ @@ -68,21 +68,8 @@ "trio._ssl.SSLStream.transport_stream", "trio._ssl.SSLStream.unwrap", "trio._ssl.SSLStream.wait_send_all_might_not_block", - "trio._subprocess.Process.__init__", - "trio._subprocess.Process.__repr__", - "trio._subprocess.Process.args", - "trio._subprocess.Process.encoding", - "trio._subprocess.Process.errors", - "trio._subprocess.Process.kill", - "trio._subprocess.Process.pid", - "trio._subprocess.Process.poll", - "trio._subprocess.Process.returncode", - "trio._subprocess.Process.send_signal", - "trio._subprocess.Process.terminate", - "trio._subprocess.Process.wait", "trio.lowlevel.cancel_shielded_checkpoint", "trio.lowlevel.notify_closing", - "trio.lowlevel.open_process", "trio.lowlevel.permanently_detach_coroutine_object", "trio.lowlevel.reattach_detached_coroutine_object", "trio.lowlevel.temporarily_detach_coroutine_object", @@ -91,7 +78,6 @@ "trio.open_ssl_over_tcp_listeners", "trio.open_ssl_over_tcp_stream", "trio.open_unix_socket", - "trio.run_process", "trio.serve_listeners", "trio.serve_ssl_over_tcp", "trio.testing._memory_streams.MemoryReceiveStream.__init__",