diff --git a/docs/source/reference-io.rst b/docs/source/reference-io.rst index b42d9d04c2..33e78ca800 100644 --- a/docs/source/reference-io.rst +++ b/docs/source/reference-io.rst @@ -173,7 +173,7 @@ Generic stream tools Trio currently provides a generic helper for writing servers that listen for connections using one or more -:class:`~trio.abc.Listener`\s, and a generic utility class for working +:class:`~trio.abc.Listener`\s, and two generic utility classes for working with streams. And if you want to test code that's written against the streams interface, you should also check out :ref:`testing-streams` in :mod:`trio.testing`. @@ -184,6 +184,9 @@ streams interface, you should also check out :ref:`testing-streams` in :members: :show-inheritance: +.. autoclass:: NullStream + :show-inheritance: + .. _high-level-networking: @@ -651,9 +654,44 @@ Spawning subprocesses Trio provides support for spawning other programs as subprocesses, communicating with them via pipes, sending them signals, and waiting -for them to exit. Currently this interface consists of the -:class:`trio.Process` class, which is modelled after :class:`subprocess.Popen` -in the standard library. +for them to exit. The interface for doing so consists of multiple layers: + +* At the bottom, the :class:`~trio.Process` class provides an interface + that mirrors the standard :class:`subprocess.Popen`, with async methods + and with Trio stream wrappers for the process's input and output, plus + a few extra features that are used by the higher layers. + :class:`~trio.Process` is the only interface that permits spawning + processes whose lifetime is not bound by a specific scope in the + parent. You should generally prefer one of the higher-level interfaces + if it suits your needs. + +* In the middle, :func:`~trio.open_process` provides a context manager + for interacting with a process. 
It wraps :class:`~trio.Process`, + adding error checking (throwing :exc:`subprocess.CalledProcessError` + if the process exits with a failure indication) and providing a + single :class:`~trio.ProcessStream` (an implementation of + :class:`~trio.abc.HalfCloseableStream`) for getting bytes into + and out of the process. This is usually what you should reach for + if you want back-and-forth communication with a subprocess, or + if you want to consume a large output in streaming fashion. + +* At the top, :func:`~trio.run_process` runs a process from start to + finish and returns a :class:`~trio.CompletedProcess` object describing its + outputs and return value. This is what you should reach for if you + want to maybe send some input, maybe receive the output (all at once), + but don't need incrementality or back-and-forth. It is modelled after + the standard :func:`subprocess.run` with some additional features + and saner defaults. + +In all of these interfaces, the command to run and its arguments are +specified as the sole positional argument (usually as a sequence of +strings, but see the discussion of :ref:`quoting <subprocess-quoting>` +below). Keyword :ref:`options <subprocess-options>` determine how the +subprocess's inputs and outputs will be set up and other aspects of +the environment in which it will run. Almost all of the keyword +arguments accepted by the standard library :class:`subprocess.Popen` +class are supported with their usual semantics, and can be passed +wherever you see ``**options`` in the API documentation. .. _subprocess-options: @@ -680,70 +718,107 @@ process, so these options don't make sense. Text I/O should use a layer on top of the raw byte streams, just as it does with sockets. [This layer does not yet exist, but is in the works.] +Trio also provides support for two Trio-specific options, +``shutdown_signal`` and ``shutdown_timeout``, which control how the +subprocess will be terminated if its surrounding scope becomes +cancelled.
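For reference, the termination sequence that ``shutdown_signal`` and ``shutdown_timeout`` configure (polite signal, grace period, then forcible kill) can be sketched with the plain :mod:`subprocess` module; this is only an illustration of the convention, not Trio's actual implementation:

```python
import subprocess
import sys

# A stand-in for a long-running child process.
proc = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(30)"])

# Ask the child to exit: SIGTERM on UNIX, TerminateProcess() on Windows.
proc.terminate()
try:
    # Grace period, analogous to shutdown_timeout.
    proc.wait(timeout=5)
except subprocess.TimeoutExpired:
    # Still running after the grace period: forcibly kill it
    # (SIGKILL on UNIX), which the child cannot catch or ignore.
    proc.kill()
    proc.wait()

print(proc.returncode)
```

Passing ``shutdown_signal``/``shutdown_timeout`` makes Trio perform the equivalent sequence automatically when the surrounding scope is cancelled.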
(The low-level :class:`~trio.Process` doesn't necessarily +have a surrounding scope per se, but the same logic applies to calls +to its :meth:`~trio.Process.aclose` and :meth:`~trio.Process.join` +methods, which guarantee that the subprocess will have exited +one way or another by the time they return.) + +* The default behavior, if you specify neither argument, is to + forcibly kill the subprocess using ``SIGKILL`` (UNIX) or + ``TerminateProcess`` (Windows). This cannot be caught by the + child process, so it works regardless of potential bugs in the + child, but doesn't give the child a chance to clean up its own + resources (such as its own subprocesses). + +* If you specify just a ``shutdown_timeout``, Trio will follow + the standard UNIX convention: send ``SIGTERM``, wait up to + ``shutdown_timeout`` seconds for the process to exit, + send ``SIGKILL`` if it's still running after the timeout + expires. If you also specify a ``shutdown_signal``, Trio will + send that signal instead of ``SIGTERM``. + +* If you specify a ``shutdown_signal`` but no ``shutdown_timeout``, + Trio will send the ``shutdown_signal`` and then wait as long + as it takes for the subprocess to exit in response. Use this + with caution, as it's easy to wind up with an uncancellable + Trio program if the subprocess doesn't respond how you're + expecting it to. + +* A ``shutdown_signal`` of zero means "close all pipes to and from the + process if you want to signal that it should exit". This is + unlikely to work unless the process is expecting it, but is + unfortunately pretty much the only "signal" we can send reliably on + Windows that's not immediately fatal. + Running a process and waiting for it to finish ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -We're `working on `__ -figuring out the best API for common higher-level subprocess operations. 
-In the meantime, you can implement something like the standard library -:func:`subprocess.run` in terms of :class:`trio.Process` -as follows:: - - async def run( - command, *, input=None, capture_output=False, **options - ): - if input is not None: - options['stdin'] = subprocess.PIPE - if capture_output: - options['stdout'] = options['stderr'] = subprocess.PIPE - - stdout_chunks = [] - stderr_chunks = [] - - async with trio.Process(command, **options) as proc: - - async def feed_input(): - async with proc.stdin: - if input: - try: - await proc.stdin.send_all(input) - except trio.BrokenResourceError: - pass - - async def read_output(stream, chunks): - async with stream: - while True: - chunk = await stream.receive_some(32768) - if not chunk: - break - chunks.append(chunk) - - async with trio.open_nursery() as nursery: - if proc.stdin is not None: - nursery.start_soon(feed_input) - if proc.stdout is not None: - nursery.start_soon(read_output, proc.stdout, stdout_chunks) - if proc.stderr is not None: - nursery.start_soon(read_output, proc.stderr, stderr_chunks) - await proc.wait() - - stdout = b"".join(stdout_chunks) if proc.stdout is not None else None - stderr = b"".join(stderr_chunks) if proc.stderr is not None else None - - if proc.returncode: - raise subprocess.CalledProcessError( - proc.returncode, proc.args, output=stdout, stderr=stderr - ) - else: - return subprocess.CompletedProcess( - proc.args, proc.returncode, stdout, stderr - ) +The basic interface for running a subprocess start-to-finish is +:func:`trio.run_process`. It always waits for the subprocess to exit +before returning, so there's no need to worry about leaving a process +running by mistake after you've gone on to do other things. 
+:func:`~trio.run_process` is similar to the standard library +:func:`subprocess.run` function, but tries to have safer defaults: +with no options, the subprocess's input is provided and its outputs +are captured by the parent Trio process rather than connecting them +to the user's terminal, and a failure in the subprocess will be propagated +as a :exc:`subprocess.CalledProcessError` exception. Of course, these +defaults can be changed where necessary. + +.. autofunction:: trio.run_process + +:func:`run_process` returns an object similar to +:class:`subprocess.CompletedProcess`, with a couple of Trio-specific +additions: + +.. autoclass:: trio.CompletedProcess + :members: + +If a subprocess's input and output are to come from an interactive user +-- on the console used to start the parent Trio process, for example -- +:func:`~trio.run_process`'s defaults can be slightly cumbersome. Trio +provides :func:`~trio.delegate_to_process` to cover this case. + +.. autofunction:: trio.delegate_to_process Interacting with a process as it runs ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +:func:`open_process` spawns a subprocess and returns an async context +manager in which the caller can interact with the subprocess's +standard input and output using a Trio stream. The lifetime of the +process is scoped within the ``async with`` block: the flow of +execution won't exit the block until the subprocess exits, and a +cancellation of the block will terminate the subprocess. The effect is +much like :func:`run_process`, but with user code inserted in the +middle. (In fact, :func:`run_process` is implemented in terms of +:func:`open_process` plus I/O logic in the middle.) + +.. autofunction:: trio.open_process + :async-with: stream + +The specific :class:`~trio.abc.HalfCloseableStream` type provided by +:func:`~trio.open_process` is also available for users that +wish to wrap a :class:`~trio.Process` object directly: + +.. 
autoclass:: trio.ProcessStream + :show-inheritance: + :members: + + +The low-level Process API +~~~~~~~~~~~~~~~~~~~~~~~~~ + +All of the high-level subprocess functions described above are implemented +in terms of the :class:`trio.Process` class, which provides an interface +similar to the standard library's :class:`subprocess.Popen`. + You can spawn a subprocess by creating an instance of :class:`trio.Process` and then interact with it using its :attr:`~trio.Process.stdin`, @@ -751,53 +826,126 @@ You can spawn a subprocess by creating an instance of :attr:`~trio.Process.stderr` streams. .. autoclass:: trio.Process - :members: + .. autoattribute:: returncode + + .. autoattribute:: failed + + .. automethod:: aclose + + .. automethod:: join + + .. automethod:: shutdown + + .. automethod:: wait + + .. automethod:: poll + + .. automethod:: kill + + .. automethod:: terminate + + .. automethod:: send_signal + + .. note:: :meth:`~subprocess.Popen.communicate` is not provided as a + method on :class:`~trio.Process` objects; use :func:`~trio.run_process` + instead, or write the loop yourself if you have unusual + needs. :meth:`~subprocess.Popen.communicate` has quite unusual + cancellation behavior in the standard library (on some platforms it + spawns a background thread which continues to read from the child + process even after the timeout has expired) and we wanted to + provide an interface with fewer surprises. + + +.. _subprocess-quoting: + +Quoting: more than you wanted to know +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -Differences from :class:`subprocess.Popen` -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -* All arguments to the constructor of - :class:`~trio.Process`, except the command to run, must be - passed using keywords. - -* :meth:`~subprocess.Popen.communicate` is not provided as a method on - :class:`~trio.Process` objects; use a higher-level - function instead, or write the loop yourself if - you have unusual needs. 
:meth:`~subprocess.Popen.communicate` has - quite unusual cancellation behavior in the standard library (on some - platforms it spawns a background thread which continues to read from - the child process even after the timeout has expired) and we wanted - to provide an interface with fewer surprises. - -* :meth:`~trio.Process.wait` is an async function that does - not take a ``timeout`` argument; combine it with - :func:`~trio.fail_after` if you want a timeout. - -* Text I/O is not supported: you may not use the - :class:`~trio.Process` constructor arguments - ``universal_newlines`` (or its 3.7+ alias ``text``), ``encoding``, - or ``errors``. - -* :attr:`~trio.Process.stdin` is a :class:`~trio.abc.SendStream` and - :attr:`~trio.Process.stdout` and :attr:`~trio.Process.stderr` - are :class:`~trio.abc.ReceiveStream`\s, rather than file objects. The - :class:`~trio.Process` constructor argument ``bufsize`` is - not supported since there would be no file object to pass it to. - -* :meth:`~trio.Process.aclose` (and thus also - ``__aexit__``) behave like the standard :class:`~subprocess.Popen` - context manager exit (close pipes to the process, then wait for it - to exit), but add additional behavior if cancelled: kill the process - and wait for it to finish terminating. This is useful for scoping - the lifetime of a simple subprocess that doesn't spawn any children - of its own. (For subprocesses that do in turn spawn their own - subprocesses, there is not currently any way to clean up the whole - tree; moreover, using the :class:`Process` context manager in such - cases is likely to be counterproductive as killing the top-level - subprocess leaves it no chance to do any cleanup of its children - that might be desired. You'll probably want to write your own - supervision logic in that case.) 
+The command to run and its arguments usually must be passed to Trio's +subprocess APIs as a sequence of strings, where the first element in +the sequence specifies the command to run and the remaining elements +specify its arguments, one argument per element. This form is used +because it avoids potential quoting pitfalls; for example, you can run +``["cp", "-f", source_file, dest_file]`` without worrying about +whether ``source_file`` or ``dest_file`` contains spaces. + +If you only run subprocesses without ``shell=True`` and on UNIX, +that's all you need to know about specifying the command. If you use +``shell=True`` or run on Windows, you probably should read the +rest of this section to be aware of potential pitfalls. + +With ``shell=True`` on UNIX, you *must* specify the command as a +single string, which will be passed to the shell as if you'd +entered it at an interactive prompt. That means any argument that +might contain spaces, quotes, or other shell metacharacters should +be wrapped in :func:`shlex.quote` or something similar. The standard +:mod:`subprocess` module supports sequences of arguments with +``shell=True`` on UNIX, but their behavior is confusing, so +Trio forbids them. (``subprocess.run(["echo $2 is $1", "weird", "this"], +shell=True)`` prints ``this is weird``.) + +On Windows, the fundamental API for process spawning (the +``CreateProcess()`` system call) takes a string, not a list, and it's +actually up to the child process to decide how it wants to split that +string into individual arguments. Since the C language specifies that +``main()`` should take a list of arguments, *most* programs you +encounter will follow the rules used by the Microsoft C/C++ runtime. +:class:`subprocess.Popen`, and thus also Trio, uses these rules +when it converts an argument sequence to a string, and they +are `documented +<https://docs.python.org/3/library/subprocess.html#converting-an-argument-sequence-to-a-string-on-windows>`__ +alongside the :mod:`subprocess` module.
There is no documented +Python standard library function that can directly perform that +conversion, so even on Windows, you almost always want to pass an +argument sequence rather than a string. But if the program you're +spawning doesn't split its command line back into individual arguments +in the standard way, you might need to pass a string to work around this. +(Or you might just be out of luck: as far as I can tell, there's simply +no way to pass an argument containing a double-quote to a Windows +batch file.) + +On Windows with ``shell=True``, things get even more chaotic. Now +there are two separate sets of quoting rules applied, one by the +Windows command shell ``CMD.EXE`` and one by the process being +spawned, and they're *different*. Most special characters interpreted +by the shell ``&<>()^|`` are not treated as special if the shell +thinks they're inside double quotes, but ``%FOO%`` environment +variable substitutions still are, and the shell doesn't provide any +way to write a double quote inside a double-quoted string. Outside +double quotes, any character (including a double quote) can be escaped +using a leading ``^``. But since a pipeline is processed by running +each command in the pipeline in a subshell, multiple layers of +escaping can be needed:: + + echo ^^^&x | find "x" | find "x" # prints: &x + +And if you combine pipelines with () grouping, you can need even more +levels of escaping:: + + (echo ^^^^^^^&x | find "x") | find "x" # prints: &x + +Since process creation takes a single arguments string, ``CMD.EXE``\'s +quoting does not influence word splitting, and double quotes are not +removed during CMD.EXE's expansion pass. Double quotes are troublesome +because CMD.EXE handles them differently from the MSVC runtime rules; in:: + + prog.exe "foo \"bar\" baz" + +the program will see one argument ``foo "bar" baz`` but CMD.EXE thinks +``bar\`` is not quoted while ``foo \`` and ``baz`` are. 
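The contrast between the two conventions can be seen concretely with a short sketch. ``subprocess.list2cmdline`` is the stdlib's internal, undocumented helper that applies the MSVC-runtime rules, so treat it here as an illustration rather than a supported API:

```python
import shlex
import subprocess

args = ["copy", "my file.txt", 'tricky "name".txt']

# POSIX shell quoting, suitable for building a shell=True command on UNIX.
posix = " ".join(shlex.quote(a) for a in args)

# The MSVC C-runtime rules that subprocess applies when handing an
# argument sequence to CreateProcess() on Windows. list2cmdline is
# internal/undocumented; shown only to illustrate the rules.
windows = subprocess.list2cmdline(args)

print(posix)    # copy 'my file.txt' 'tricky "name".txt'
print(windows)  # copy "my file.txt" "tricky \"name\".txt"
```

Note how the two conventions disagree about both the quote character and how embedded quotes are escaped, which is why passing the pre-split sequence is almost always the right choice.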
All of this +makes it a formidable task to reliably interpolate anything into a +``shell=True`` command line on Windows, and Trio falls back on the +:mod:`subprocess` behavior: If you pass a sequence with +``shell=True``, it's quoted in the same way as a sequence with +``shell=False``, and had better not contain any shell metacharacters +you weren't planning on. + +Further reading: + +* https://stackoverflow.com/questions/30620876/how-to-properly-escape-filenames-in-windows-cmd-exe + +* https://stackoverflow.com/questions/4094699/how-does-the-windows-command-interpreter-cmd-exe-parse-scripts Signals diff --git a/newsfragments/833.feature.rst b/newsfragments/833.feature.rst new file mode 100644 index 0000000000..ad54dc318a --- /dev/null +++ b/newsfragments/833.feature.rst @@ -0,0 +1,2 @@ +High-level subprocess support: :func:`trio.open_process`, :func:`trio.run_process`, +:func:`trio.delegate_to_process`. diff --git a/trio/__init__.py b/trio/__init__.py index 9aac6c1469..9b77a115a1 100644 --- a/trio/__init__.py +++ b/trio/__init__.py @@ -36,7 +36,7 @@ BlockingTrioPortal ) -from ._highlevel_generic import aclose_forcefully, StapledStream +from ._highlevel_generic import aclose_forcefully, StapledStream, NullStream from ._channel import open_memory_channel @@ -64,6 +64,15 @@ open_ssl_over_tcp_stream, open_ssl_over_tcp_listeners, serve_ssl_over_tcp ) +from ._subprocess import ( + Process, + ProcessStream, + CompletedProcess, + open_process, + run_process, + delegate_to_process, +) + from ._deprecate import TrioDeprecationWarning # Imported by default diff --git a/trio/_deprecated_subprocess_reexports.py b/trio/_deprecated_subprocess_reexports.py index b91e28784a..3c432c7524 100644 --- a/trio/_deprecated_subprocess_reexports.py +++ b/trio/_deprecated_subprocess_reexports.py @@ -1,5 +1,3 @@ -from ._subprocess import Process - # Reexport constants and exceptions from the stdlib subprocess module from subprocess import ( PIPE, STDOUT, DEVNULL, CalledProcessError, 
SubprocessError, TimeoutExpired, diff --git a/trio/_highlevel_generic.py b/trio/_highlevel_generic.py index 05a03043fa..4548a331e3 100644 --- a/trio/_highlevel_generic.py +++ b/trio/_highlevel_generic.py @@ -110,3 +110,62 @@ async def aclose(self): await self.send_stream.aclose() finally: await self.receive_stream.aclose() + + +class NullStream(HalfCloseableStream): + """A :class:`~trio.abc.HalfCloseableStream` in which any data sent is + immediately discarded and receives always return end-of-file. + + This is useful for similar reasons as ``/dev/null`` on Unix; for example, + in a function defined to return a stream of data, it reduces special-casing + if there's no data that needs to be provided. + + Trying to read after closing, or write after closing or + :meth:`~trio.abc.HalfCloseableStream.send_eof`, will raise + :exc:`ClosedResourceError`, even though those operations are + otherwise no-ops. + + A synchronous ``close()`` is provided in addition to the usual ``aclose()``. + """ + + def __init__(self) -> None: + self._read_closed = False + self._write_closed = False + + def __repr__(self) -> str: + if self._read_closed: + closed = "closed" + elif self._write_closed: + closed = "sent EOF" + else: + closed = "open" + return "<NullStream: {}>".format(closed) + + def close(self) -> None: + self._read_closed = self._write_closed = True + + async def aclose(self) -> None: + self.close() + await _core.checkpoint() + + async def receive_some(self, max_bytes: int) -> bytes: + await _core.checkpoint() + if self._read_closed: + raise _core.ClosedResourceError + return b"" + + async def send_all(self, data: bytes) -> None: + await _core.checkpoint() + if self._write_closed: + raise _core.ClosedResourceError + + async def wait_send_all_might_not_block(self) -> None: + await _core.checkpoint() + if self._write_closed: + raise _core.ClosedResourceError + + async def send_eof(self) -> None: + await _core.checkpoint() + if self._read_closed: + raise _core.ClosedResourceError +
self._write_closed = True diff --git a/trio/_subprocess.py b/trio/_subprocess.py index 41e93dcfe7..22c11b6028 100644 --- a/trio/_subprocess.py +++ b/trio/_subprocess.py @@ -1,19 +1,31 @@ import math import os import select +import shlex +import signal import subprocess import sys +import attr +from async_generator import async_generator, yield_, asynccontextmanager +from typing import List, Tuple from . import _core -from ._abc import AsyncResource -from ._sync import CapacityLimiter, Lock -from ._threads import run_sync_in_worker_thread +from ._abc import AsyncResource, HalfCloseableStream, ReceiveStream +from ._sync import Lock +from ._highlevel_generic import NullStream from ._subprocess_platform import ( wait_child_exiting, create_pipe_to_child_stdin, create_pipe_from_child_output ) -__all__ = ["Process"] +__all__ = [ + "Process", + "ProcessStream", + "CompletedProcess", + "open_process", + "run_process", + "delegate_to_process", +] class Process(AsyncResource): @@ -24,7 +36,7 @@ class Process(AsyncResource): Constructing a :class:`Process` immediately spawns the child process, or throws an :exc:`OSError` if the spawning fails (for example, if the specified command could not be found). - After construction, you can interact with the child process + After construction, you can communicate with the child process by writing data to its :attr:`stdin` stream (a :class:`~trio.abc.SendStream`), reading data from its :attr:`stdout` and/or :attr:`stderr` streams (both :class:`~trio.abc.ReceiveStream`\s), @@ -37,25 +49,41 @@ class Process(AsyncResource): ``stdin=subprocess.PIPE``, you can write to the :attr:`stdin` stream, else :attr:`stdin` will be ``None``. - :class:`Process` implements :class:`~trio.abc.AsyncResource`, - so you can use it as an async context manager or call its - :meth:`aclose` method directly. 
"Closing" a :class:`Process` - will close any pipes to the child and wait for it to exit; - if cancelled, the child will be forcibly killed and we will - ensure it has finished exiting before allowing the cancellation - to propagate. It is *strongly recommended* that process lifetime - be scoped using an ``async with`` block wherever possible, to - avoid winding up with processes hanging around longer than you - were planning on. + :class:`Process` implements :class:`~trio.abc.AsyncResource`, so + you can use it as an async context manager or call its + :meth:`aclose` method directly. "Closing" a :class:`Process` will + close any pipes to the child and wait for it to exit; if the call + to :meth:`aclose` is cancelled, the child will be terminated + and we will ensure it has finished exiting before allowing the + cancellation to propagate. It is *strongly recommended* that + process lifetime be scoped using an ``async with`` block wherever + possible, to avoid winding up with processes hanging around longer + than you were planning on. + + By default, process termination when :meth:`aclose` is cancelled + calls :meth:`kill`, which cannot be caught by the process. *UNIX + only:* If you want to allow the process to perform its own cleanup + before exiting, you can specify a ``shutdown_signal`` at + construction time; then :func:`aclose` will send the process that + signal instead of killing it outright, and will wait up to + ``shutdown_timeout`` seconds (forever if unspecified) for the + process to exit in response to that signal. If the shutdown + timeout expires after sending the shutdown signal, the process + gets forcibly killed. (This might work on Windows if you can + find a signal that Python knows how to send and the subprocess + knows how to handle, but your options there are too limited + to be of much use: only ``CTRL_BREAK_EVENT`` can be sent to + a process group (not ``CTRL_C_EVENT``), and Python at least + has no way of catching it.) 
Args: - command (str or list): The command to run. Typically this is a - list of strings such as ``['ls', '-l', 'directory with spaces']``, + command (list or str): The command to run. Typically this is a + sequence of strings such as ``['ls', '-l', 'directory with spaces']``, where the first element names the executable to invoke and the other - elements specify its arguments. If ``shell=True`` is given as - an option, ``command`` should be a single string like - ``"ls -l 'directory with spaces'"``, which will - split into words following the shell's quoting rules. + elements specify its arguments. With ``shell=True`` in the + ``**options``, or on Windows, ``command`` may alternatively + be a string, which will be parsed following platform-dependent + :ref:`quoting rules <subprocess-quoting>`. stdin: Specifies what the child process's standard input stream should connect to: output written by the parent (``subprocess.PIPE``), nothing (``subprocess.DEVNULL``), @@ -70,6 +98,21 @@ which causes the child's standard output and standard error messages to be intermixed on a single standard output stream, attached to whatever the ``stdout`` option says to attach it to. + shutdown_signal (int): If specified, the process will be sent + this signal when :meth:`aclose` or :meth:`join` is + cancelled, which might give it a chance to clean itself up + before exiting. The default shutdown signal forcibly kills + the process and cannot be caught. A value of zero will send + no signal, just close pipes to the process when we want it + to exit. Zero is the only value likely to work on Windows. + shutdown_timeout (float): If specified, and the process does not + exit within this many seconds after receiving the + ``shutdown_signal``, it will be forcibly killed anyway. The + default is to wait as long as it takes for the process to + exit. If you pass a ``shutdown_timeout`` but no + ``shutdown_signal``, ``SIGTERM`` is used as the shutdown + signal.
(On Windows this will still kill the process + immediately, so you should pass a different signal there.) **options: Other :ref:`general subprocess options <subprocess-options>` are also accepted. @@ -91,6 +134,14 @@ standard error, the written bytes become available for you to read here. Only available if the :class:`Process` was constructed using ``stderr=PIPE``; otherwise this will be None. + cancelled (bool or None): If a call to :meth:`aclose` or :meth:`join` + has completed, this is False if the process was allowed to exit + on its own and True if we helped its exit along due to a + cancellation. If no call to :meth:`aclose` or :meth:`join` + has completed, this is None. Calls to :meth:`shutdown` + work like :meth:`join` in a cancelled scope, so they set + :attr:`cancelled` unconditionally to True if the process + is still running. """ @@ -104,7 +155,15 @@ _wait_for_exit_data = None def __init__( - self, args, *, stdin=None, stdout=None, stderr=None, **options + self, + command, + *, + stdin=None, + stdout=None, + stderr=None, + shutdown_signal=None, + shutdown_timeout=None, + **options ): for key in ( 'universal_newlines', 'text', 'encoding', 'errors', 'bufsize' @@ -120,6 +179,34 @@ self.stdout = None self.stderr = None + if shutdown_timeout is not None and shutdown_signal is None: + shutdown_signal = signal.SIGTERM + self._shutdown_signal = shutdown_signal + self._shutdown_timeout = shutdown_timeout + self.cancelled = None + + self._wait_lock = Lock() + + # State needed by the logic in the property `failed`: + # - signals we've sent the process before we knew it had exited + self._signals_sent_before_exit = set() + + # - whether we closed its stdout or stderr without receiving EOF + # before we knew it had exited + self._maybe_broke_pipe_before_exit = None + + if os.name == "posix": + if isinstance(command, str) and not options.get("shell"): + raise TypeError( + "command must be a sequence (not a string) if
shell=False " + "on UNIX systems" + ) + if not isinstance(command, str) and options.get("shell"): + raise TypeError( + "command must be a string (not a sequence) if shell=True " + "on UNIX systems" + ) + if stdin == subprocess.PIPE: self.stdin, stdin = create_pipe_to_child_stdin() if stdout == subprocess.PIPE: @@ -139,7 +226,7 @@ try: self._proc = subprocess.Popen( - args, stdin=stdin, stdout=stdout, stderr=stderr, **options + command, stdin=stdin, stdout=stdout, stderr=stderr, **options ) finally: # Close the parent's handle for each child side of a pipe; @@ -155,67 +242,718 @@ self.args = self._proc.args self.pid = self._proc.pid + def __repr__(self): + if self.returncode is None: + status = "running with PID {}".format(self.pid) + else: + if self.returncode < 0: + status = "exited with signal {}".format(-self.returncode) + else: + status = "exited with status {}".format(self.returncode) + if self.returncode != 0 and not self.failed: + status += " (our fault)" + return "<trio.Process {!r}: {}>".format(self.args, status) + @property def returncode(self): - """The exit status of the process (an integer), or ``None`` if it has - not exited. + """The exit status of the process (an integer), or ``None`` if it is + not yet known to have exited. + + By convention, a return code of zero indicates success. On + UNIX, negative values indicate termination due to a signal, + e.g., -11 if terminated by signal 11 (``SIGSEGV``). On + Windows, a process that exits due to a call to + :meth:`Process.terminate` will have an exit status of 1. - Negative values indicate termination due to a signal (on UNIX only). - Like :attr:`subprocess.Popen.returncode`, this is not updated outside - of a call to :meth:`wait` or :meth:`poll`. + Accessing this attribute does not check for termination; use + :meth:`poll` or :meth:`wait` for that. """ return self._proc.returncode + @property + def failed(self): + """Whether this process should be considered to have failed.
+ + If the process hasn't exited yet, this is None. Otherwise, it is False + if the :attr:`returncode` is zero, and True otherwise, unless the + nonzero exit status appears to have been potentially caused by + something done by the parent Trio process. Specifically: + + * If we terminate a process and it later exits for any reason, + we don't consider that a failure. (On Windows it's hard to + tell if the exit was caused by the termination or not, but + termination can't be blocked so it's a good bet. On UNIX + there are plenty of processes that respond to ``SIGTERM`` by + cleaning up and then exiting with a nonzero status, and we'd + like to consider that to be caused by the ``SIGTERM`` even + though it's not reported as "exited due to ``SIGTERM``" + directly.) + + * If we close a process's standard output or error stream + before receiving EOF on it and the process later exits for + any reason, we don't consider that a failure. (We could + restrict this to only ``SIGPIPE`` exits on UNIX, but then we + wouldn't handle processes that catch ``SIGPIPE`` and raise + an ordinary error instead, like Python.) + + * On UNIX, if we send a process a signal and it later exits due + to that signal, we don't consider that a failure. (This covers + signals like ``SIGHUP``, ``SIGINT``, etc, which are exit + signals for some processes and not for others.) + + The higher-level subprocess API functions such as :func:`run_process` + use :attr:`failed` to determine whether they should throw a + :exc:`subprocess.CalledProcessError` or not. 
+ + """ + if self.returncode is None: + return None + return ( + self.returncode != 0 + and signal.SIGTERM not in self._signals_sent_before_exit + and not self._maybe_broke_pipe_before_exit + and -self.returncode not in self._signals_sent_before_exit + ) + + async def _close_pipes(self): + if self.stdin is not None: + await self.stdin.aclose() + if self.stdout is not None: + await self.stdout.aclose() + if self.stderr is not None: + await self.stderr.aclose() + async def aclose(self): """Close any pipes we have to the process (both input and output) and wait for it to exit. - If cancelled, kills the process and waits for it to finish - exiting before propagating the cancellation. + If cancelled, shuts down the process following the shutdown + parameters specified at construction time, or forcibly + terminates the process if no shutdown parameters were given. + The process is guaranteed to have exited by the time the call + to :meth:`aclose` completes, whether normally or via an + exception. """ with _core.open_cancel_scope(shield=True): - if self.stdin is not None: - await self.stdin.aclose() - if self.stdout is not None: - await self.stdout.aclose() - if self.stderr is not None: - await self.stderr.aclose() + await self._close_pipes() + await self.join() + + def _check_maybe_broke_pipe(self): + """Called when the child process exits to set + self._maybe_broke_pipe_before_exit, based on whether + we have yet closed the child's stdout or stderr before reading + EOF from it. This is an input to the logic in ``failed``. + """ + if self._maybe_broke_pipe_before_exit is not None: + return + for stream in (self.stdout, self.stderr): + if stream is not None and stream.did_close_before_receiving_eof: + self._maybe_broke_pipe_before_exit = True + break + else: + self._maybe_broke_pipe_before_exit = False + + async def wait(self): + """Block until the process exits. + + Returns: + The exit status of the process; see :attr:`returncode`. 
+ """ + if self.poll() is None: + async with self._wait_lock: + if self.poll() is None: + await wait_child_exiting(self) + self._proc.wait() + self._check_maybe_broke_pipe() + return self.returncode + await _core.checkpoint() + return self.returncode + + async def join(self): + """Block until the process exits, and terminate the process if the + call to :meth:`join` is cancelled. + + :meth:`join` behaves like :meth:`aclose` except that it + doesn't close pipes. This is primarily useful if I/O is being + handled in a concurrent task and an orderly shutdown signal + was specified at :class:`Process` construction time; it + ensures that if the call to :meth:`join` is cancelled, any + output the process writes as it is shutting down won't be + lost. If you don't have a concurrent task handling I/O, you + should probably use :meth:`aclose` instead. Otherwise, you + might get a deadlock as the process blocks waiting for I/O + while you're waiting for it to exit. + + Returns: + The exit status of the process; see :attr:`returncode`. + + """ try: await self.wait() + if self.cancelled is None: + self.cancelled = False + return self.returncode + + except (KeyboardInterrupt, _core.Cancelled): + if self.cancelled is None: + self.cancelled = True + raise + finally: - if self.returncode is None: + if self.returncode is None and self._shutdown_signal is not None: + with _core.open_cancel_scope(shield=True) as scope: + if self._shutdown_timeout is not None: + scope.deadline = ( + _core.current_time() + self._shutdown_timeout + ) + if self._shutdown_signal == 0: + await self._close_pipes() + else: + self.send_signal(self._shutdown_signal) + await self.wait() + + if self.returncode is None: # still running self.kill() with _core.open_cancel_scope(shield=True): await self.wait() - async def wait(self): - """Block until the process exits. + def poll(self): + """Check if the process has exited yet. 
Returns: - The exit status of the process (a nonnegative integer, with - zero usually indicating success). On UNIX systems, a process - that exits due to a signal will have its exit status reported - as the negative of that signal number, e.g., -11 for ``SIGSEGV``. + The exit status of the process; see :attr:`returncode`. """ - if self.poll() is None: - await wait_child_exiting(self) - self._proc.wait() - else: - await _core.checkpoint() + if self.returncode is None and self._proc.poll() is not None: + self._check_maybe_broke_pipe() return self.returncode - def poll(self): - """Forwards to :meth:`subprocess.Popen.poll`.""" - return self._proc.poll() - def send_signal(self, sig): - """Forwards to :meth:`subprocess.Popen.send_signal`.""" - self._proc.send_signal(sig) + """Send signal ``sig`` to the process. + + On UNIX, ``sig`` may be any signal defined in the + :mod:`signal` module, such as ``signal.SIGINT`` or + ``signal.SIGTERM``. On Windows, it may be anything accepted by + the standard library :meth:`subprocess.Popen.send_signal`. + """ + if self.poll() is None: + self._signals_sent_before_exit.add(sig) + self._proc.send_signal(sig) def terminate(self): - """Forwards to :meth:`subprocess.Popen.terminate`.""" - self._proc.terminate() + """Terminate the process, politely if possible. + + On UNIX, this is equivalent to + ``send_signal(signal.SIGTERM)``; by convention this requests + graceful termination, but a misbehaving or buggy process might + ignore it. On Windows, :meth:`terminate` forcibly terminates the + process in the same manner as :meth:`kill`. + """ + if self.poll() is None: + self._signals_sent_before_exit.add(signal.SIGTERM) + self._proc.terminate() def kill(self): - """Forwards to :meth:`subprocess.Popen.kill`.""" - self._proc.kill() + """Immediately terminate the process. + + On UNIX, this is equivalent to + ``send_signal(signal.SIGKILL)``. On Windows, it calls + ``TerminateProcess``. 
In both cases, the process cannot + prevent itself from being killed, but the termination will be + delivered asynchronously; use :meth:`wait` if you want to + ensure the process is actually dead before proceeding. + """ + if not hasattr(signal, "SIGKILL"): # Windows + self.terminate() + elif self.poll() is None: + self._signals_sent_before_exit.add(signal.SIGKILL) + self._proc.kill() + + async def shutdown(self): + """Shut down the process in the manner used by a cancelled + :meth:`aclose` or :meth:`join`. + + This is equivalent to (and implemented in terms of) making a + call to :meth:`join` within a cancelled scope. + + Returns: + The exit status of the process; see :attr:`returncode`. + + """ + with _core.open_cancel_scope() as scope: + scope.cancel() + return await self.join() + + +@attr.s(slots=True, cmp=False) +class ProcessStream(HalfCloseableStream): + """A stream for communicating with a subprocess that is being managed + by :func:`open_process`. + + Sending data on this stream writes it to the process's standard + input, and receiving data receives from the process's standard + output. There is also an :attr:`errors` attribute which contains + a :class:`~trio.abc.ReceiveStream` for reading from the process's + standard error stream. If a stream is being redirected elsewhere, + attempting to interact with it will raise :exc:`ClosedResourceError`. + + Closing a :class:`ProcessStream` closes all the underlying pipes, + but unlike :meth:`Process.aclose`, it does *not* wait for the process + to exit. You should use ``await stream.process.aclose()`` if you + want the :meth:`Process.aclose` behavior. + + Args: + process (:class:`Process`): The underlying process to wrap. + + Attributes: + process (:class:`Process`): The underlying process, so you can + :meth:`send it signals `, inspect its + :attr:`~Process.pid`, and so forth. 
+ errors (:class:`~trio.abc.ReceiveStream`): A stream which reads from + the underlying process's standard error if we have access to it, + or raises :exc:`ClosedResourceError` on all reads if not. + """ + + process = attr.ib(type=Process) + errors = attr.ib(type=ReceiveStream, init=False, repr=False) + + def __attrs_post_init__(self) -> None: + if self.process.stderr is None: + self.errors = NullStream() + self.errors.close() + else: + self.errors = self.process.stderr + + async def aclose(self) -> None: + with _core.open_cancel_scope(shield=True): + await self.process._close_pipes() + await _core.checkpoint_if_cancelled() + + async def receive_some(self, max_bytes: int) -> bytes: + if self.process.stdout is None: + await _core.checkpoint() + raise _core.ClosedResourceError( + "can't read from process stdout that was redirected elsewhere" + ) + return await self.process.stdout.receive_some(max_bytes) + + async def _stdin_operation(self, method, *args) -> None: + if self.process.stdin is None: + await _core.checkpoint() + raise _core.ClosedResourceError( + "can't write to process stdin that was redirected elsewhere" + ) + await getattr(self.process.stdin, method)(*args) + + async def send_all(self, data: bytes) -> None: + await self._stdin_operation("send_all", data) + + async def wait_send_all_might_not_block(self) -> None: + await self._stdin_operation("wait_send_all_might_not_block") + + async def send_eof(self) -> None: + await self._stdin_operation("aclose") + + +@asynccontextmanager +@async_generator +async def open_process(command, *, check=True, shield=False, **options): + """An async context manager that runs ``command`` in a subprocess and + evaluates to a :class:`ProcessStream` that can be used to communicate + with it. + + The context manager's ``__aenter__`` executes a checkpoint before + spawning the process but does not otherwise block. 
Its ``__aexit__`` + closes all pipes to and from the process and waits for it to + exit, like :meth:`Process.aclose` does, so that the lifetime of + the process is scoped to the ``async with`` block. If + the surrounding scope becomes cancelled or an exception propagates + out of the ``async with`` block, the process is terminated and + :func:`open_process` waits for the process to exit before + continuing to propagate the exception. If you need to allow the + process to perform an orderly shutdown instead of being forcibly + terminated, see the ``shutdown_signal`` and ``shutdown_timeout`` + arguments to :class:`Process`, which are accepted in the + ``**options`` here. + + The default behavior of :func:`open_process` is designed to isolate + the subprocess from potential impacts on the parent Trio process, and to + reduce opportunities for errors to pass silently. Specifically: + + * The subprocess's standard input, output, and error streams are + piped to the parent Trio process, so that data written to the + :class:`ProcessStream` may be read by the subprocess and vice + versa. (The subprocess's standard error output is available on a + separate :class:`~trio.abc.ReceiveStream` accessible through the + :attr:`~ProcessStream.errors` attribute of the + :class:`ProcessStream`.) + + * If the subprocess exits with a nonzero status code, indicating + failure, the body of the ``async with`` block will be cancelled + (unless it is shielded by a ``shield=True`` argument) and a + :exc:`subprocess.CalledProcessError` will be raised once it + completes. If the subprocess fails at the same time as an + exception propagates out of the ``async with`` block, the two + will be combined into a :exc:`~trio.MultiError`. + + To suppress the cancellation and + :exc:`~subprocess.CalledProcessError` on failure, pass + ``check=False``. 
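The ``check`` convention is analogous to the standard library's blocking :func:`subprocess.run` (though :func:`open_process` additionally cancels the ``async with`` body); a minimal stdlib sketch of the two behaviors:

```python
import subprocess
import sys

# A child that exits with status 2, indicating failure.
failing = [sys.executable, "-c", "import sys; sys.exit(2)"]

# With checking enabled, a nonzero exit status becomes an exception...
try:
    subprocess.run(failing, check=True)
    raise AssertionError("expected CalledProcessError")
except subprocess.CalledProcessError as exc:
    assert exc.returncode == 2

# ...while check=False leaves the exit status for the caller to inspect.
result = subprocess.run(failing, check=False)
assert result.returncode == 2
```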
To obtain different I/O behavior, use the + lower-level ``stdin``, ``stdout``, and/or ``stderr`` + :ref:`subprocess options <subprocess-options>`; to skip + redirecting a certain stream entirely, pass it as ``None``. For + example: + + * If you want the subprocess's input to come from the same + place as the parent Trio process's input, pass ``stdin=None``. + + * If you want the subprocess's standard output and standard error + to be intermixed and both be accessible through the main + ``ProcessStream.receive_some`` channel, pass + ``stderr=subprocess.STDOUT``. + + Args: + command (list or str): The command to run. Typically this is a + sequence of strings such as ``['ls', '-l', 'directory with spaces']``, + where the first element names the executable to invoke and the other + elements specify its arguments. With ``shell=True`` in the + ``**options``, or on Windows, ``command`` may alternatively + be a string, which will be parsed following platform-dependent + :ref:`quoting rules `. + check (bool): If false, don't cancel the ``async with`` block or + throw an exception if the subprocess exits unsuccessfully. + You should be sure to check the return code yourself if you + pass ``check=False``, so that errors don't pass silently. + shield (bool): If true, a cancellation from outside the ``async + with`` block will cause the process to be terminated as usual, + but will not cancel the body of the ``async with`` block. + This is appropriate if the body of the ``async with`` block + will terminate naturally as soon as the process exits, and + you want to be sure not to miss the last bit of output. + **options: :func:`open_process` also accepts any :ref:`general subprocess + options <subprocess-options>` and passes them on to the + :class:`~trio.Process` constructor.
+ + Raises: + subprocess.CalledProcessError: if ``check=False`` is not passed + and the process exits with an exit status indicating a + failure that was not caused by the parent Trio process; + see :attr:`Process.failed` + OSError: if an error is encountered starting or communicating with + the process + + """ + + await _core.checkpoint() + + options.setdefault("stdin", subprocess.PIPE) + options.setdefault("stdout", subprocess.PIPE) + options.setdefault("stderr", subprocess.PIPE) + + async def join_and_check(proc): + await proc.join() + if proc.failed and check: + raise subprocess.CalledProcessError(proc.returncode, proc.args) + + async with Process(command, **options) as proc: + async with _core.open_nursery() as nursery: + nursery.start_soon( + join_and_check, + proc, + name="".format(command) + ) + with _core.open_cancel_scope(shield=shield): + async with ProcessStream(proc) as stream: + await yield_(stream) + + +# This could be exposed publicly, but it might just clutter the API... +async def _communicate_with_process(stream, input=None): + """Communicate with a subprocess over a :class:`ProcessStream` that + has been created using :func:`open_process`. + + If the subprocess's standard input stream is connected to the + parent Trio process, :func:`communicate_with_process` sends it the + bytes provided as ``input``. Once all the ``input`` has been sent, + or if none is provided, :func:`communicate_with_process` closes + the stdin pipe so that the subprocess will receive end-of-file + when it reads from standard input. + + For each of the subprocess's standard output or error streams that + is connected to the parent Trio process, + :func:`communicate_with_process` reads from that stream until EOF + and aggregates the resulting data into a single ``bytes`` object + which will form part of the return value from this function.
If a + stream is not available for us to read from because it was + redirected elsewhere, the corresponding slot in the return value + will be None. + + Args: + stream (ProcessStream): The process to communicate with. + input (bytes): The data to send to the process. + + Returns: + Once all input has been sent and end-of-file has been received on + all outputs, returns a tuple ``(stdout, stderr)`` of the bytes + received from the subprocess on each of its output streams. + If a stream was not available for reading because it was redirected + elsewhere, its corresponding slot will be set to None instead. + If none of the subprocess's standard streams are piped to the + parent Trio process, returns ``(None, None)`` immediately. + + """ + + async def read_output(stream, chunks): + while True: + try: + chunk = await stream.receive_some(32768) + except _core.ClosedResourceError: + break + if not chunk: + break + chunks.append(chunk) + + manage_stdin = stream.process.stdin is not None + manage_stdout = stream.process.stdout is not None + manage_stderr = stream.process.stderr is not None + + stdout_chunks = [] + stderr_chunks = [] + + async with _core.open_nursery() as nursery: + if manage_stdout: + nursery.start_soon(read_output, stream, stdout_chunks) + if manage_stderr: + nursery.start_soon(read_output, stream.errors, stderr_chunks) + if manage_stdin: + try: + if input: + await stream.send_all(input) + except (_core.BrokenResourceError, _core.ClosedResourceError): + pass + finally: + await stream.send_eof() + + return ( + b"".join(stdout_chunks) if manage_stdout else None, + b"".join(stderr_chunks) if manage_stderr else None + ) + + +@attr.s(init=False) +class CompletedProcess: + """The result of a call to :func:`run_process`. + + Like :class:`subprocess.CompletedProcess`, but adds the :attr:`failed` and + :attr:`cancelled` attributes. + + Attributes: + command (list or str): The command that was run, as originally passed to + :func:`run_process`. 
+ returncode (int): The exit status of the process (zero for success). + On UNIX, if a subprocess is killed by signal number N, its + :attr:`returncode` will be -N. + failed (bool): Whether we consider the process to have failed. This + will typically be true if the :attr:`returncode` is nonzero, + unless we believe the nonzero return code was caused by + the parent Trio process, following the rules documented under + :attr:`Process.failed`. + cancelled (bool): Whether the process was terminated early due to + the :func:`run_process` call that was managing it becoming + cancelled. A cancelled process will have a :attr:`returncode` + reflecting forced termination, unless it exited normally after + catching a configured ``shutdown_signal``. (You will only actually + see a :class:`CompletedProcess` with ``cancelled=True`` get returned + from a :func:`run_process` call with ``preserve_result=True``; + otherwise you'll see a :exc:`Cancelled` exception propagating + instead.) + stdout (bytes or None): The captured standard output of the process, + or None if it was not captured. + stderr (bytes or None): The captured standard error of the process, + or None if it was not captured. + """ + + command = attr.ib() + returncode = attr.ib() + failed = attr.ib() + cancelled = attr.ib() + stdout = attr.ib() + stderr = attr.ib() + + def __init__(self, process, stdout=None, stderr=None): + self.command = process.args + self.returncode = process.returncode + self.failed = process.failed + self.cancelled = process.cancelled + self.stdout = stdout + self.stderr = stderr + + +async def run_process( + command, + *, + input=None, + check=True, + preserve_result=False, + task_status=_core.TASK_STATUS_IGNORED, + **options +): + """Run ``command`` in a subprocess, wait for it to complete, and + return a :class:`CompletedProcess` instance describing + the results.
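The default behavior (feed ``input``, capture both outputs, raise on failure) corresponds roughly to this blocking standard-library invocation; the ``cat``-like child here is illustrative:

```python
import subprocess
import sys

# A cat-like child: copy stdin to stdout unchanged.
cat = [
    sys.executable, "-c",
    "import sys; sys.stdout.buffer.write(sys.stdin.buffer.read())",
]

# Feed some input, capture both output streams, and check the result.
result = subprocess.run(
    cat, input=b"hello", stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
assert result.returncode == 0
assert result.stdout == b"hello"
assert result.stderr == b""
```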
+ + If cancelled, :func:`run_process` terminates the subprocess and + waits for it to exit before propagating the cancellation, like + :meth:`Process.aclose`. If you need to allow the + process to perform an orderly shutdown instead of being forcibly + terminated, see the ``shutdown_signal`` and ``shutdown_timeout`` + arguments to :class:`Process`, which are accepted in the + ``**options`` here. If you need to be able to tell what + partial output the process produced before a timeout, + see the ``preserve_result`` argument. + + The default behavior of :func:`run_process` is designed to isolate + the subprocess from potential impacts on the parent Trio process, and to + reduce opportunities for errors to pass silently. Specifically: + + * The subprocess's standard input stream is set up to receive the + bytes provided as ``input``. Once the given input has been + fully delivered, or if none is provided, the subprocess will + receive end-of-file when reading from its standard input. + + * The subprocess's standard output and standard error streams are + individually captured and returned as bytestrings from + :func:`run_process`. + + * If the subprocess exits with a nonzero status code, indicating failure, + :func:`run_process` raises a :exc:`subprocess.CalledProcessError` + exception rather than returning normally. The captured outputs + are still available as the ``stdout`` and ``stderr`` attributes + of that exception. + + To suppress the :exc:`~subprocess.CalledProcessError` on failure, + pass ``check=False``. To obtain different I/O behavior, use the + lower-level ``stdin``, ``stdout``, and/or ``stderr`` + :ref:`subprocess options `. It is an error to + specify ``input`` if ``stdin`` is specified. If ``stdout`` or + ``stderr`` is specified (as something other than + ``subprocess.PIPE``), the corresponding attribute of the returned + :class:`CompletedProcess` object will be ``None``. + + Args: + command (list or str): The command to run. 
Typically this is a + sequence of strings such as ``['ls', '-l', 'directory with spaces']``, + where the first element names the executable to invoke and the other + elements specify its arguments. With ``shell=True`` in the + ``**options``, or on Windows, ``command`` may alternatively + be a string, which will be parsed following platform-dependent + :ref:`quoting rules `. + input (bytes): The input to provide to the subprocess on its + standard input stream. If you want the subprocess's input + to come from something other than data specified at the time + of the :func:`run_process` call, you can specify a redirection + using the lower-level ``stdin`` option; then ``input`` must + be unspecified or None. + check (bool): If false, don't validate that the subprocess exits + successfully. You should be sure to check the + ``returncode`` attribute of the returned object if you pass + ``check=False``, so that errors don't pass silently. + preserve_result (bool): If true, return normally even if cancelled, + in order to give the caller a chance to inspect the process's + partial output before a timeout. + task_status: This function can be used with ``nursery.start``. + If it is, it returns the :class:`Process` object, so that other tasks + can send signals to the subprocess or wait for it to exit. + (They shouldn't try to send or receive on the subprocess's + input and output streams; use :func:`open_process` if you plan to do + that.) + **options: :func:`run_process` also accepts any :ref:`general subprocess + options ` and passes them on to the + :class:`~trio.Process` constructor. + + Returns: + A :class:`CompletedProcess` instance describing the + return code and outputs. 
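The list and string forms of ``command`` can be related with the standard library's :mod:`shlex` module, which models the POSIX quoting rules mentioned above:

```python
import shlex

# A command string, as a POSIX shell would split it, yields the
# equivalent argument sequence:
assert shlex.split("ls -l 'directory with spaces'") == [
    "ls", "-l", "directory with spaces"
]

# Going the other way, quote() makes a single argument safe to embed
# in a shell command string:
assert shlex.quote("directory with spaces") == "'directory with spaces'"
```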
+ + Raises: + subprocess.CalledProcessError: if ``check=False`` is not passed + and the process exits with an exit status indicating a + failure that was not caused by the parent Trio process; + see :attr:`Process.failed` + OSError: if an error is encountered starting or communicating with + the process + + .. warning:: + If you pass ``preserve_result=True``, :func:`run_process` has + no way to directly propagate a :exc:`~trio.Cancelled` + exception, so be careful when inspecting an enclosing cancel + scope's ``cancelled_caught`` attribute if there are no other + checkpoints between the end of :func:`run_process` and the end + of the cancel scope:: + + with trio.move_on_after(1) as scope: + result = await trio.run_process( + "echo -n test; sleep 10", shell=True, preserve_result=True + ) + + print(result.stdout.decode("utf-8")) # test + print(result.cancelled) # True + print(scope.cancel_called) # True + print(scope.cancelled_caught) # False + + As the above example demonstrates, you should usually look at + :attr:`CompletedProcess.cancelled` instead. + + """ + + if input is not None and "stdin" in options: + raise ValueError( + "can't provide input to a process whose stdin is redirected" + ) + + output = None + + try: + async with open_process( + command, check=check, shield=True, **options + ) as stream: + task_status.started(stream.process) + output = await _communicate_with_process(stream, input) + + except subprocess.CalledProcessError as ex: + # We can only get here with output still at None if the + # CalledProcessError was raised out of open_process + # __aenter__, which is not currently possible (it doesn't have + # any checkpoints in between spawning the monitor task and + # yielding to the context).
+ if output is not None: # pragma: no branch + ex.stdout, ex.stderr = output + raise + + except _core.Cancelled: + if not preserve_result or output is None: + raise + + return CompletedProcess(stream.process, *output) + + +async def delegate_to_process(command, **run_options): + """Run ``command`` in a subprocess, with its standard streams inherited + from the parent Trio process (no redirection), and return a + :class:`CompletedProcess` describing the results. + + This is useful, for example, if you want to spawn an interactive process + and allow the user to interact with it. It is equivalent to + ``functools.partial(run_process, stdin=None, stdout=None, stderr=None)``. + + .. note:: The child is run in the same process group as the + parent, so on UNIX a user Ctrl+C will be delivered to the + parent Trio process as well. You may wish to block signals + while the child is running, start it in a new process group, or + start it in a pseudoterminal. Trio does not currently provide + facilities for this. 
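The "new process group" workaround mentioned in the note can be achieved with the blocking standard-library API; a POSIX-only sketch:

```python
import os
import subprocess
import sys

# Run a child in its own session (and hence its own process group), so
# that a terminal Ctrl+C aimed at the parent's foreground process group
# is not delivered to it.
child = subprocess.Popen(
    [sys.executable, "-c", "import os; print(os.getpgrp())"],
    stdout=subprocess.PIPE,
    start_new_session=True,   # calls setsid() in the child
)
out, _ = child.communicate()
child_pgrp = int(out)
assert child_pgrp != os.getpgrp()   # child is in a different group
assert child_pgrp == child.pid      # setsid made it its own group leader
```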
+ + """ + run_options.setdefault("stdin", None) + run_options.setdefault("stdout", None) + run_options.setdefault("stderr", None) + return await run_process(command, **run_options) diff --git a/trio/_unix_pipes.py b/trio/_unix_pipes.py index 629d66da3a..4913028f46 100644 --- a/trio/_unix_pipes.py +++ b/trio/_unix_pipes.py @@ -88,6 +88,14 @@ async def wait_send_all_might_not_block(self) -> None: class PipeReceiveStream(_PipeMixin, ReceiveStream): """Represents a receive stream over an os.pipe object.""" + did_receive_eof = False + did_close_before_receiving_eof = False + + async def aclose(self) -> None: + if not self._closed: + self.did_close_before_receiving_eof = not self.did_receive_eof + await _PipeMixin.aclose(self) + async def receive_some(self, max_bytes: int) -> bytes: if self._closed: await _core.checkpoint() @@ -111,6 +119,8 @@ async def receive_some(self, max_bytes: int) -> bytes: await _core.cancel_shielded_checkpoint() break + if data == b"": + self.did_receive_eof = True return data diff --git a/trio/_windows_pipes.py b/trio/_windows_pipes.py index 2b57de7692..4442327cbc 100644 --- a/trio/_windows_pipes.py +++ b/trio/_windows_pipes.py @@ -78,6 +78,14 @@ async def wait_send_all_might_not_block(self) -> None: class PipeReceiveStream(_PipeMixin, ReceiveStream): """Represents a receive stream over an os.pipe object.""" + did_receive_eof = False + did_close_before_receiving_eof = False + + async def aclose(self) -> None: + if not self._closed: + self.did_close_before_receiving_eof = not self.did_receive_eof + await _PipeMixin.aclose(self) + async def receive_some(self, max_bytes: int) -> bytes: async with self._conflict_detector: if self._closed: @@ -102,6 +110,7 @@ async def receive_some(self, max_bytes: int) -> bytes: # whenever the other end closes, regardless of direction. # Convert this to the Unix behavior of returning EOF to the # reader when the writer closes. 
+ self.did_receive_eof = True return b"" else: del buffer[size:] diff --git a/trio/tests/test_highlevel_generic.py b/trio/tests/test_highlevel_generic.py index b115d3b270..ff88cc1b6d 100644 --- a/trio/tests/test_highlevel_generic.py +++ b/trio/tests/test_highlevel_generic.py @@ -2,8 +2,10 @@ import attr +from .. import _core from ..abc import SendStream, ReceiveStream -from .._highlevel_generic import StapledStream +from ..testing import assert_checkpoints +from .._highlevel_generic import StapledStream, NullStream, aclose_forcefully @attr.s @@ -92,3 +94,58 @@ async def aclose(self): assert stapled.send_stream.record == ["aclose"] assert stapled.receive_stream.record == ["aclose"] + + +async def test_NullStream(): + stream = NullStream() + assert repr(stream) == "" + + # read returns EOF + with assert_checkpoints(): + assert b"" == await stream.receive_some(32768) + + # write discards the data + with assert_checkpoints(): + await stream.send_all(b"into the bit bucket") + + # wait_send_all_might_not_block is a noop + with assert_checkpoints(): + await stream.wait_send_all_might_not_block() + + # send_eof closes the write side + with assert_checkpoints(): + await stream.send_eof() + + assert repr(stream) == "" + + # can't write after send_eof + with assert_checkpoints(), pytest.raises(_core.ClosedResourceError): + await stream.send_all(b"stuff") + with assert_checkpoints(), pytest.raises(_core.ClosedResourceError): + await stream.wait_send_all_might_not_block() + + # but can still read + with assert_checkpoints(): + assert b"" == await stream.receive_some(32768) + + # and send_eof is idempotent + with assert_checkpoints(): + await stream.send_eof() + + # close works even if cancelled + with assert_checkpoints(): + await aclose_forcefully(stream) + + assert repr(stream) == "" + + # can't read or write after close + with assert_checkpoints(), pytest.raises(_core.ClosedResourceError): + await stream.receive_some(32768) + with assert_checkpoints(), 
pytest.raises(_core.ClosedResourceError): + await stream.send_all(b"more stuff") + with assert_checkpoints(), pytest.raises(_core.ClosedResourceError): + await stream.send_eof() + + # but can still close + with assert_checkpoints(): + await stream.aclose() diff --git a/trio/tests/test_subprocess.py b/trio/tests/test_subprocess.py index 9a5880336c..33da9469b7 100644 --- a/trio/tests/test_subprocess.py +++ b/trio/tests/test_subprocess.py @@ -1,16 +1,29 @@ import math +import json import os import random +import shlex import signal import subprocess import sys +import tempfile import pytest +from functools import partial from .. import ( - _core, move_on_after, fail_after, sleep, sleep_forever, Process + _core, + move_on_after, + fail_after, + sleep, + sleep_forever, + Process, + ProcessStream, + open_process, + run_process, + delegate_to_process, ) from .._core.tests.tutil import slow -from ..testing import wait_all_tasks_blocked +from ..testing import wait_all_tasks_blocked, assert_checkpoints posix = os.name == "posix" if posix: @@ -29,6 +42,7 @@ def python(code): EXIT_TRUE = python("sys.exit(0)") EXIT_FALSE = python("sys.exit(1)") CAT = python("sys.stdout.buffer.write(sys.stdin.buffer.read())") +YES = python("\n" "while True:\n" " sys.stdout.buffer.write(b'y\\n')\n") SLEEP = lambda seconds: python("import time; time.sleep({})".format(seconds)) @@ -40,9 +54,32 @@ def got_signal(proc, sig): async def test_basic(): + repr_template = "<trio.Process {!r}: {{}}>".format(EXIT_TRUE) async with Process(EXIT_TRUE) as proc: assert proc.returncode is None + assert proc.failed is None + assert repr(proc) == repr_template.format( + "running with PID {}".format(proc.pid) + ) assert proc.returncode == 0 + assert repr(proc) == repr_template.format("exited with status 0") + + async with Process(EXIT_FALSE) as proc: + pass + assert proc.returncode == 1 + assert repr(proc) == "<trio.Process {!r}: {}>".format( + EXIT_FALSE, "exited with status 1" + ) + + +async def test_multi_wait(): + async with Process(SLEEP(10)) as proc: + async
with _core.open_nursery() as nursery: + nursery.start_soon(proc.wait) + nursery.start_soon(proc.wait) + nursery.start_soon(proc.wait) + await wait_all_tasks_blocked() + proc.kill() async def test_kill_when_context_cancelled(): @@ -54,6 +91,10 @@ async def test_kill_when_context_cancelled(): await sleep_forever() assert scope.cancelled_caught assert got_signal(proc, SIGKILL) + assert repr(proc) == "<trio.Process {!r}: {}>".format( + SLEEP(10), "exited with signal 9 (our fault)" + if posix else "exited with status 1 (our fault)" + ) COPY_STDIN_TO_STDOUT_AND_BACKWARD_TO_STDERR = python( @@ -96,29 +137,31 @@ async def check_output(stream, expected): assert 0 == await proc.wait() -async def test_interactive(): - # Test some back-and-forth with a subprocess. This one works like so: - # in: 32\n - # out: 0000...0000\n (32 zeroes) - # err: 1111...1111\n (64 ones) - # in: 10\n - # out: 2222222222\n (10 twos) - # err: 3333....3333\n (20 threes) - # in: EOF - # out: EOF - # err: EOF
Works like so: +# in: 32\n +# out: 0000...0000\n (32 zeroes) +# err: 1111...1111\n (64 ones) +# in: 10\n +# out: 2222222222\n (10 twos) +# err: 3333....3333\n (20 threes) +# in: EOF +# out: EOF +# err: EOF +INTERACTIVE_DEMO = python( + "idx = 0\n" + "while True:\n" + " line = sys.stdin.readline()\n" + " if line == '': break\n" + " request = int(line.strip())\n" + " print(str(idx * 2) * request)\n" + " print(str(idx * 2 + 1) * request * 2, file=sys.stderr)\n" + " idx += 1\n" +) + +async def test_interactive(): async with Process( - python( - "idx = 0\n" - "while True:\n" - " line = sys.stdin.readline()\n" - " if line == '': break\n" - " request = int(line.strip())\n" - " print(str(idx * 2) * request)\n" - " print(str(idx * 2 + 1) * request * 2, file=sys.stderr)\n" - " idx += 1\n" - ), + INTERACTIVE_DEMO, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, @@ -166,6 +209,82 @@ async def drain_one(stream, count, digit): assert proc.returncode == 0 +async def test_run(): + data = bytes(random.randint(0, 255) for _ in range(2**18)) + + result = await run_process(CAT, input=data) + assert result.command == CAT + assert result.returncode == 0 + assert result.stdout == data + assert result.stderr == b"" + + result = await run_process(CAT, stderr=None) + assert result.command == CAT + assert result.returncode == 0 + assert result.stdout == b"" + assert result.stderr is None + + result = await run_process( + COPY_STDIN_TO_STDOUT_AND_BACKWARD_TO_STDERR, input=data + ) + assert result.command == COPY_STDIN_TO_STDOUT_AND_BACKWARD_TO_STDERR + assert result.returncode == 0 + assert result.stdout == data + assert result.stderr == data[::-1] + + with pytest.raises(ValueError): + # can't use both input and stdin + await run_process(CAT, input=b"la di dah", stdin=subprocess.PIPE) + + +@slow +async def test_run_timeout(): + data = b"1" * 65536 + b"2" * 65536 + b"3" * 65536 + child_script = """ +import sys, time +sys.stdout.buffer.write(sys.stdin.buffer.read(32768)) 
+time.sleep(10)
+sys.stdout.buffer.write(sys.stdin.buffer.read())
+"""
+
+    with move_on_after(1):
+        result = await run_process(
+            [sys.executable, "-c", child_script],
+            input=data,
+            preserve_result=True
+        )
+    assert result.cancelled
+    assert not result.failed
+    assert got_signal(result, SIGKILL)
+    assert result.stdout == data[:32768]
+    assert result.stderr == b""
+
+
+async def test_run_check():
+    cmd = python("sys.stderr.buffer.write(b'test\\n'); sys.exit(1)")
+    with pytest.raises(subprocess.CalledProcessError) as excinfo:
+        await run_process(cmd, stdout=None)
+    assert excinfo.value.cmd == cmd
+    assert excinfo.value.returncode == 1
+    assert excinfo.value.stderr == b"test\n"
+    assert excinfo.value.stdout is None
+
+    result = await run_process(cmd, check=False)
+    assert result.command == cmd
+    assert result.stdout == b""
+    assert result.stderr == b"test\n"
+    assert result.returncode == 1
+
+
+async def test_run_with_broken_pipe():
+    result = await run_process(
+        [sys.executable, "-c", "import sys; sys.stdin.close()"],
+        input=b"x" * 131072
+    )
+    assert result.returncode == 0
+    assert result.stdout == result.stderr == b""
+
+
 async def test_stderr_stdout():
     async with Process(
         COPY_STDIN_TO_STDOUT_AND_BACKWARD_TO_STDERR,
@@ -175,27 +294,22 @@ async def test_stderr_stdout():
     ) as proc:
         assert proc.stdout is not None
         assert proc.stderr is None
-        await proc.stdin.send_all(b"1234")
-        await proc.stdin.aclose()
-
-        output = []
-        while True:
-            chunk = await proc.stdout.receive_some(16)
-            if chunk == b"":
-                break
-            output.append(chunk)
-        assert b"".join(output) == b"12344321"
-    assert proc.returncode == 0
+
+    result = await run_process(
+        COPY_STDIN_TO_STDOUT_AND_BACKWARD_TO_STDERR,
+        input=b"1234",
+        stderr=subprocess.STDOUT,
+    )
+    assert result.returncode == 0
+    assert result.stdout == b"12344321"
+    assert result.stderr is None

     # this one hits the branch where stderr=STDOUT but stdout
     # is not redirected
-    async with Process(
-        CAT, stdin=subprocess.PIPE, stderr=subprocess.STDOUT
-    ) as proc:
-        assert proc.stdout is None
-        assert proc.stderr is None
-        await proc.stdin.aclose()
-    assert proc.returncode == 0
+    result = await run_process(CAT, stdout=None, stderr=subprocess.STDOUT)
+    assert result.returncode == 0
+    assert result.stdout is None
+    assert result.stderr is None

     if posix:
         try:
@@ -225,6 +339,12 @@ async def test_errors():
     assert "unbuffered byte streams" in str(excinfo.value)
     assert "the 'encoding' option is not supported" in str(excinfo.value)

+    if posix:
+        with pytest.raises(TypeError) as excinfo:
+            Process(["ls"], shell=True)
+        with pytest.raises(TypeError) as excinfo:
+            Process("ls", shell=False)
+

 async def test_signals():
     async def test_one_signal(send_it, signum):
@@ -232,6 +352,7 @@ async def test_one_signal(send_it, signum):
         async with Process(SLEEP(3600)) as proc:
             send_it(proc)
     assert not scope.cancelled_caught
+    assert not proc.failed
     if posix:
         assert proc.returncode == -signum
     else:
@@ -243,6 +364,351 @@ async def test_one_signal(send_it, signum):
     await test_one_signal(lambda proc: proc.send_signal(SIGINT), SIGINT)


+async def test_delegate():
+    # The whole point of delegate_to_process is that its I/O doesn't go
+    # through us, so there's not that much we can test, but we can at least
+    # run the code
+
+    result = await delegate_to_process(EXIT_TRUE)
+    assert result.stdout is result.stderr is None
+
+    with pytest.raises(subprocess.CalledProcessError) as excinfo:
+        await delegate_to_process(EXIT_FALSE)
+    assert excinfo.value.stdout is excinfo.value.stderr is None
+
+    with pytest.raises(ValueError):
+        await delegate_to_process(CAT, input=b"input not allowed")
+
+
+# no sleeps but it takes over a second on my machine
+@slow
+async def test_not_a_failure_if_we_signal():
+    async def do_test(
+        which_signal,
+        send_it,
+        wait_for_signal=True,
+        exit_status=None,  # None = propagate the signal
+    ):
+        code = []
+        if wait_for_signal and exit_status is not None:
+            code.extend(
+                [
+                    "import signal",
+                    "def handle(sig, frame):",
+                    "    sys.exit({})".format(exit_status),
+                    "signal.signal({}, handle)".format(which_signal),
+                ]
+            )
+        code.extend([
+            "import sys",
+            "sys.stdout.buffer.write(b'.')",
+        ])
+        if not wait_for_signal:
+            code.append("sys.exit({})".format(exit_status))
+        else:
+            code.extend([
+                "import time",
+                "time.sleep(10)",
+            ])
+
+        with fail_after(1):
+            async with Process(
+                python("\n".join(code)), stdout=subprocess.PIPE
+            ) as proc:
+                # Wait for process to start up and install signal handler
+                # if any
+                await proc.stdout.receive_some(1)
+
+                # Secretly wait for the process to exit if we're trying to
+                # test the exited-before-we-sent-it case; otherwise the
+                # signal-sending could race with the exit. (We don't check
+                # for exit through the Process object because we don't want
+                # it to know the process exited.)
+                if not wait_for_signal:
+                    proc._proc.wait()  # synchronous Popen.wait()
+
+                send_it(proc)
+                await proc.wait()
+
+        if exit_status is not None:
+            assert proc.returncode == exit_status
+            assert proc.failed == (
+                proc.returncode != 0 and
+                (not wait_for_signal or which_signal != signal.SIGTERM)
+            )
+        else:
+            assert wait_for_signal
+            assert proc.returncode == (-which_signal if posix else 1)
+            assert not proc.failed
+
+    def sender(sig):
+        return partial(Process.send_signal, sig=sig)
+
+    # test exit on unhandled terminate/kill and exit before
+    # attempt to terminate/kill
+    from signal import SIGTERM as TERM
+    try:
+        from signal import SIGKILL as KILL
+    except ImportError:
+        KILL = TERM
+    for exit_status in (None, 0, 1, 42):
+        wait = exit_status is None
+        await do_test(TERM, sender(TERM), wait, exit_status)
+        await do_test(TERM, Process.terminate, wait, exit_status)
+        await do_test(KILL, sender(KILL), wait, exit_status)
+        await do_test(KILL, Process.kill, wait, exit_status)
+
+    if posix:
+        # test exit on handled terminate -- never counted as fail
+        for exit_status in (0, 1, 42):
+            await do_test(TERM, sender(TERM), True, exit_status)
+            await do_test(TERM, Process.terminate, True, exit_status)
+
+        # test a non-TERM signal
+        HUP = signal.SIGHUP
+        await do_test(HUP, sender(HUP))  # exit on unhandled is OK
+        for exit_status in (0, 1, 42):
+            # exit before delivered & exit on handled are both fails
+            # if the status is nonzero
+            await do_test(HUP, sender(HUP), False, exit_status)
+            await do_test(HUP, sender(HUP), True, exit_status)
+
+
+async def test_not_a_failure_if_we_break_their_pipe():
+    # Actually broke their pipe
+    async with Process(YES, stdout=subprocess.PIPE) as proc:
+        await proc.stdout.aclose()
+    assert proc.returncode != 0
+    assert not proc.failed
+
+    # Looks like we could've
+    async with Process(
+        EXIT_FALSE, stdout=subprocess.PIPE
+    ) as proc:
+        await proc.stdout.aclose()
+    assert proc.returncode == 1
+    assert not proc.failed
+
+    # ... and we noticed via poll()
+    async with Process(
+        python("sys.stdin.buffer.read(); sys.exit(1)"),
+        stdin=subprocess.PIPE,
+        stdout=subprocess.PIPE,
+    ) as proc:
+        await proc.stdout.aclose()
+        while proc.poll() is None:
+            await proc.stdin.aclose()
+    assert proc.returncode == 1
+    assert not proc.failed
+
+    # Works on stderr too
+    async with Process(
+        EXIT_FALSE, stderr=subprocess.PIPE
+    ) as proc:
+        await proc.stderr.aclose()
+    assert proc.returncode == 1
+    assert not proc.failed
+
+    # Closed after process exited --> not our fault
+    async with Process(
+        EXIT_FALSE, stdout=subprocess.PIPE
+    ) as proc:
+        await proc.wait()
+        await proc.stdout.aclose()
+    assert proc.returncode == 1
+    assert proc.failed
+
+    # Closed after process exited and we noticed via poll()
+    async with Process(
+        EXIT_FALSE, stdout=subprocess.PIPE
+    ) as proc:
+        while proc.poll() is None:
+            pass
+        await proc.stdout.aclose()
+    assert proc.returncode == 1
+    assert proc.failed
+
+
+async def test_run_in_background():
+    # Test signaling a background process
+    async with _core.open_nursery() as nursery:
+        proc = await nursery.start(
+            run_process, python("import time; time.sleep(5)")
+        )
+        assert proc.returncode is None
+        nursery.cancel_scope.cancel()
+    assert got_signal(proc, SIGKILL)
+    assert not proc.failed
+
+    # Test closing a background process's stream
+    async with _core.open_nursery() as nursery:
+        proc = await nursery.start(run_process, YES)
+        await proc.stdout.aclose()
+    assert proc.returncode == 1  # exits due to BrokenPipeError
+    assert not proc.failed  # we closed their pipe first, so it's our fault
+
+    # Test a background process failing
+    with pytest.raises(subprocess.CalledProcessError):
+        async with _core.open_nursery() as nursery:
+            await nursery.start(run_process, EXIT_FALSE)
+
+
+@slow
+async def test_shutdown():
+    # test some normal non-shutdown cases with a shutdown configured,
+    # to make sure it doesn't interfere when not used
+    with fail_after(2):
+        await run_process(EXIT_TRUE, shutdown_timeout=1)
+        with pytest.raises(subprocess.CalledProcessError):
+            await run_process(EXIT_FALSE, shutdown_timeout=1)
+        data = b"hi" * 16384
+        assert data == (
+            await run_process(CAT, input=data, shutdown_timeout=1)
+        ).stdout
+        result = await run_process(
+            COPY_STDIN_TO_STDOUT_AND_BACKWARD_TO_STDERR,
+            input=b"1234",
+            shutdown_timeout=1,
+        )
+        assert result.stdout == b"1234"
+        assert result.stderr == b"4321"
+
+    if posix:
+        # test a successful shutdown
+        shutdown_flags = dict(shutdown_signal=signal.SIGINT)
+
+        SHUTDOWN_HANDLED = python(
+            r"""import itertools, time
+try:
+    for idx in itertools.count():
+        sys.stdout.buffer.write(b"%d\n" % idx)
+        time.sleep(0.1)
+except KeyboardInterrupt:
+    sys.stdout.buffer.write(b"exiting\n")
+"""
+        )
+        with move_on_after(1):
+            result = await run_process(
+                SHUTDOWN_HANDLED,
+                preserve_result=True,
+                shutdown_timeout=1,
+                **shutdown_flags
+            )
+        assert result.cancelled
+        assert result.returncode == 0
+        output = result.stdout.split(b"\n")
+
+        # we set a timeout of 1 sec; allow anywhere between 5 and 12
+        # lines of every-0.1-sec output to account for possible slow
+        # startup
+        assert output.pop() == b""
+        assert output.pop() == b"exiting"
+        assert 5 <= len(output) <= 12
+        assert all(line == b"%d" % idx for idx, line in enumerate(output))
+
+        # test a manual shutdown
+        async with open_process(SHUTDOWN_HANDLED, **shutdown_flags) as stream:
+            async with _core.open_nursery() as nursery:
+                # must wait to receive something to ensure we're
+                # inside the try block before we interrupt
+                await stream.receive_some(1024)
+                nursery.start_soon(stream.process.shutdown)
+                output = []
+                while True:
+                    chunk = await stream.receive_some(1024)
+                    if chunk == b"":
+                        break
+                    output.append(chunk)
+        assert b"".join(output).endswith(b"exiting\n")
+
+        # test a shutdown that times out
+        SHUTDOWN_IGNORED = python(
+            r"""import time
+try:
+    sys.stdout.buffer.write(b"running\n")
+    time.sleep(10)
+except KeyboardInterrupt:
+    sys.stdout.buffer.write(b"muahahaha\n")
+    time.sleep(10)
+"""
+        )
+        with move_on_after(0.5):
+            result = await run_process(
+                SHUTDOWN_IGNORED,
+                preserve_result=True,
+                shutdown_timeout=0.5,
+                **shutdown_flags
+            )
+        assert result.cancelled
+        assert got_signal(result, SIGKILL)
+        assert result.stdout == b"running\n" + b"muahahaha\n"
+
+        # make sure cancel gets raised even if it's momentary
+        with _core.open_cancel_scope() as outer_scope:
+            with _core.open_cancel_scope() as inner_scope:
+                async with _core.open_nursery() as nursery:
+                    proc = await nursery.start(
+                        partial(
+                            run_process,
+                            SLEEP(10),
+                            shutdown_timeout=0.5,
+                            **shutdown_flags
+                        )
+                    )
+                    await wait_all_tasks_blocked()
+                    # This will immediately cancel the task waiting
+                    # in Process.join:
+                    outer_scope.cancel()
+                    # And this will prevent it from executing any
+                    # further checkpoints in a cancelled state:
+                    inner_scope.shield = True
+
+        # The process still gets killed
+        assert got_signal(proc, SIGINT)
+        # The cancel still gets propagated
+        assert outer_scope.cancelled_caught
+
+    # test a close-pipes-and-wait style of shutdown (works on Windows too)
+    with _core.open_cancel_scope() as scope:
+        async with open_process(
+            INTERACTIVE_DEMO,
+            stderr=subprocess.DEVNULL,
+            shutdown_signal=0,
+            shutdown_timeout=0.5,
+        ) as stream:
+            newline = b"\n" if posix else b"\r\n"
+            await stream.wait_send_all_might_not_block()  # for coverage
+            await stream.send_all(b"4" + newline)
+
+            response = b""
+            target = b"0000" + newline
+            while len(response) < len(target):
+                response += await stream.receive_some(
+                    len(target) - len(response)
+                )
+            assert response == target
+            scope.cancel()
+
+            # Make sure we're closing the pipes due to the shutdown,
+            # not just due to exiting the open_process ``async with`` block
+            with _core.open_cancel_scope(shield=True):
+                await stream.process.wait()
+
+    assert scope.cancelled_caught
+    assert stream.process.returncode == 0
+
+
+async def test_process_stream_unavail():
+    async with Process(EXIT_TRUE) as proc:
+        stream = ProcessStream(proc)
+        with pytest.raises(_core.ClosedResourceError):
+            with assert_checkpoints():
+                await stream.send_all(b"hi")
+        with pytest.raises(_core.ClosedResourceError):
+            with assert_checkpoints():
+                await stream.receive_some(1024)
+
+
 @pytest.mark.skipif(not posix, reason="POSIX specific")
 async def test_wait_reapable_fails():
     old_sigchld = signal.signal(signal.SIGCHLD, signal.SIG_IGN)