Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
e448431
[WIP] subprocess support
oremanj Nov 28, 2018
7a8f918
flake8 doesn't like redefinitions so move the OS-specific method docs…
oremanj Nov 28, 2018
581554b
don't assume executing 'python' invokes python 2
oremanj Nov 28, 2018
6d2bc09
fix pypy issues
oremanj Nov 28, 2018
1885487
fix pypy harder
oremanj Nov 28, 2018
3807c64
add links to pypy issues
oremanj Nov 28, 2018
8d5f5a1
add call(), check_call(), check_output(), fix some coverage gaps
oremanj Nov 28, 2018
701655b
add a comment about the dangerous-looking siginfo_t layout assumption
oremanj Nov 28, 2018
51d6a50
yapfify
oremanj Nov 28, 2018
756e55b
Explode _subprocess package into its constituent parts
oremanj Dec 13, 2018
3720fe7
split off wait_reapable into a separate module
oremanj Dec 13, 2018
6be26eb
use AsyncResource, make test less silly
oremanj Dec 13, 2018
77ec5ee
close stdout/stderr when closing child
oremanj Dec 13, 2018
f2db971
Rename _platform to _subprocess_platform, move more stuff there, misc…
oremanj Dec 15, 2018
ce78d8d
yapfify
oremanj Dec 15, 2018
a1d70ca
less aggressive timeout in the hope of reducing test flakiness
oremanj Dec 15, 2018
0ed693b
placate flake8
oremanj Dec 15, 2018
ccefeac
win32 support
oremanj Dec 15, 2018
33f6db9
Merge remote-tracking branch 'origin/master' into subprocess
oremanj Dec 15, 2018
ab43b2b
merge with new layout of hazmat
oremanj Dec 15, 2018
b21f308
yapfify
oremanj Dec 15, 2018
94b8416
fix accidental change to make_clogged_pipe
oremanj Dec 15, 2018
e35f11c
add missing file
oremanj Dec 15, 2018
fc6e60a
fix unused variable warning
oremanj Dec 15, 2018
869e42b
attempt to fix remaining coverage issues
oremanj Dec 15, 2018
054951d
more small coverage tweaks
oremanj Dec 15, 2018
a79eca7
see if this will fix pypy+mac build
oremanj Dec 15, 2018
09c0f43
try again
oremanj Dec 15, 2018
b3b2bb0
Stop trying to run a PyPy on Travis OS X -- couldn't get the installa…
oremanj Dec 16, 2018
6380aea
hopefully fix windows flapping; no-cover the line of pypy+OSX only code
oremanj Dec 16, 2018
0eb001c
fix careless error in Windows test
oremanj Dec 16, 2018
ad0bc67
updates after code review
oremanj Dec 20, 2018
a87a87c
streamline waitid EINTR test in the hopes that it starts working on C…
oremanj Dec 20, 2018
82253ca
fix sense of check
oremanj Dec 20, 2018
4be2609
one more coverage nit
oremanj Dec 20, 2018
5722d4b
add newsfragment
oremanj Dec 20, 2018
9aea79b
Add docs and capture_output parameter
oremanj Dec 20, 2018
3bde9a4
yapf
oremanj Dec 20, 2018
3386ed2
fix newsfragment formatting
oremanj Dec 20, 2018
709eabf
more doc nits
oremanj Dec 20, 2018
bd15874
wait_child_exiting takes a trio process, not a stdlib subprocess
oremanj Dec 21, 2018
8bdc217
missed a spot
oremanj Dec 21, 2018
32494e6
updates after code review
oremanj Dec 21, 2018
b175e62
fix forgot-to-register test
oremanj Dec 21, 2018
5477ac1
wait for the I/O to probably complete before we try to cancel it
oremanj Dec 21, 2018
3bc7756
fix test_forgot_to_register_with_iocp, make test_too_late_to_cancel @…
oremanj Dec 23, 2018
14c6991
yapf
oremanj Dec 23, 2018
ee77992
Use gc_collect_harder instead of trying to fool Trio into not warning
oremanj Dec 23, 2018
300741d
Merge remote-tracking branch 'origin/master' into subprocess
oremanj Dec 25, 2018
879b3b8
Remove run()
oremanj Dec 26, 2018
8ec3a6c
fix newsfragment reference
oremanj Dec 26, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 163 additions & 7 deletions docs/source/reference-io.rst
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ create complex transport configurations. Here's some examples:
speak SSL over the network is to wrap an
:class:`~trio.ssl.SSLStream` around a :class:`~trio.SocketStream`.

* If you spawn a subprocess then you can get a
* If you spawn a :ref:`subprocess`, you can get a
:class:`~trio.abc.SendStream` that lets you write to its stdin, and
a :class:`~trio.abc.ReceiveStream` that lets you read from its
stdout. If for some reason you wanted to speak SSL to a subprocess,
Expand All @@ -36,9 +36,6 @@ create complex transport configurations. Here's some examples:
ssl_context.check_hostname = False
s = SSLStream(StapledStream(process.stdin, process.stdout), ssl_context)

[Note: subprocess support is not implemented yet, but that's the
plan. Unless it is implemented, and I forgot to remove this note.]

* It sometimes happens that you want to connect to an HTTPS server,
but you have to go through a web proxy... and the proxy also uses
HTTPS. So you end up having to do `SSL-on-top-of-SSL
Expand Down Expand Up @@ -641,10 +638,169 @@ Asynchronous file objects
The underlying synchronous file object.


Subprocesses
------------
.. module:: trio.subprocess
.. _subprocess:

Spawning subprocesses with :mod:`trio.subprocess`
-------------------------------------------------

The :mod:`trio.subprocess` module provides support for spawning
other programs, communicating with them via pipes, sending them signals,
and waiting for them to exit. Its interface is based on the
:mod:`subprocess` module in the standard library; differences
are noted below.

The constants and exceptions from the standard :mod:`subprocess`
module are re-exported by :mod:`trio.subprocess` unchanged.
So, if you like, you can say ``from trio import subprocess``
and continue referring to ``subprocess.PIPE``,
:exc:`subprocess.CalledProcessError`, and so on, in the same
way you would in synchronous code.


.. _subprocess-options:

Options for starting subprocesses
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

The standard :mod:`subprocess` module supports a dizzying array
of `options <https://docs.python.org/3/library/subprocess.html#popen-constructor>`__
for controlling the environment in which a process starts and the
mechanisms used for communicating with it. (If you find that list
overwhelming, you're not alone; you might prefer to start with
just the `frequently used ones
<https://docs.python.org/3/library/subprocess.html#frequently-used-arguments>`__.)

Trio makes use of the :mod:`subprocess` module's logic for spawning processes,
so almost all of these options can be used with their same semantics when
starting subprocesses under Trio. The exceptions are ``encoding``, ``errors``,
``universal_newlines`` (and its 3.7+ alias ``text``), and ``bufsize``;
Trio always uses unbuffered byte streams for communicating with a process,
so these options don't make sense. Text I/O should use a layer
on top of the raw byte streams, just as it does with sockets.
[This layer does not yet exist, but is in the works.]


Running a process and waiting for it to finish
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

We're `working on <https://github.com/python-trio/trio/pull/791>`
figuring out the best API for common higher-level subprocess operations.
In the meantime, you can implement something like the standard library
:func:`subprocess.run` in terms of :class:`trio.subprocess.Process`
as follows::

async def run(
command, *, input=None, capture_output=False, **options
):
if input is not None:
options['stdin'] = subprocess.PIPE
if capture_output:
options['stdout'] = options['stderr'] = subprocess.PIPE

stdout_chunks = []
stderr_chunks = []

async with trio.subprocess.Process(command, **options) as proc:

async def feed_input():
async with proc.stdin:
if input:
try:
await proc.stdin.send_all(input)
except trio.BrokenResourceError:
pass

async def read_output(stream, chunks):
async with stream:
while True:
chunk = await stream.receive_some(32768)
if not chunk:
break
chunks.append(chunk)

async with trio.open_nursery() as nursery:
if proc.stdin is not None:
nursery.start_soon(feed_input)
if proc.stdout is not None:
nursery.start_soon(read_output, proc.stdout, stdout_chunks)
if proc.stderr is not None:
nursery.start_soon(read_output, proc.stderr, stderr_chunks)
await proc.wait()

stdout = b"".join(stdout_chunks) if proc.stdout is not None else None
stderr = b"".join(stderr_chunks) if proc.stderr is not None else None

if proc.returncode:
raise subprocess.CalledProcessError(
proc.returncode, proc.args, output=stdout, stderr=stderr
)
else:
return subprocess.CompletedProcess(
proc.args, proc.returncode, stdout, stderr
)


Interacting with a process as it runs
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

You can spawn a subprocess by creating an instance of
:class:`trio.subprocess.Process` and then interact with it using its
:attr:`~trio.subprocess.Process.stdin`,
:attr:`~trio.subprocess.Process.stdout`, and/or
:attr:`~trio.subprocess.Process.stderr` streams.

.. autoclass:: trio.subprocess.Process
:members:


`Not implemented yet! <https://github.com/python-trio/trio/issues/4>`__
Differences from the standard library
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

* All arguments to the constructor of
:class:`~trio.subprocess.Process`, except the command to run, must be
passed using keywords.

* :func:`~subprocess.call`, :func:`~subprocess.check_call`, and
:func:`~subprocess.check_output` are not provided.

* :meth:`~subprocess.Popen.communicate` is not provided as a method on
:class:`~trio.subprocess.Process` objects; use a higher-level
function instead, or write the loop yourself if
you have unusual needs. :meth:`~subprocess.Popen.communicate` has
quite unusual cancellation behavior in the standard library (on some
platforms it spawns a background thread which continues to read from
the child process even after the timeout has expired) and we wanted
to provide an interface with fewer surprises.

* :meth:`~trio.subprocess.Process.wait` is an async function that does
not take a ``timeout`` argument; combine it with
:func:`~trio.fail_after` if you want a timeout.

* Text I/O is not supported: you may not use the
:class:`~trio.subprocess.Process` constructor arguments
``universal_newlines`` (or its 3.7+ alias ``text``), ``encoding``,
or ``errors``.

* :attr:`~trio.subprocess.Process.stdin` is a :class:`~trio.abc.SendStream` and
:attr:`~trio.subprocess.Process.stdout` and :attr:`~trio.subprocess.Process.stderr`
are :class:`~trio.abc.ReceiveStream`\s, rather than file objects. The
:class:`~trio.subprocess.Process` constructor argument ``bufsize`` is
not supported since there would be no file object to pass it to.

* :meth:`~trio.subprocess.Process.aclose` (and thus also
``__aexit__``) behave like the standard :class:`~subprocess.Popen`
context manager exit (close pipes to the process, then wait for it
to exit), but add additional behavior if cancelled: kill the process
and wait for it to finish terminating. This is useful for scoping
the lifetime of a simple subprocess that doesn't spawn any children
of its own. (For subprocesses that do in turn spawn their own
subprocesses, there is not currently any way to clean up the whole
tree; moreover, using the :class:`Process` context manager in such
cases is likely to be counterproductive as killing the top-level
subprocess leaves it no chance to do any cleanup of its children
that might be desired. You'll probably want to write your own
supervision logic in that case.)


Signals
Expand Down
6 changes: 6 additions & 0 deletions newsfragments/4.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
Initial :ref:`subprocess support <subprocess>`.
Add :class:`trio.subprocess.Process`, an async wrapper around the stdlib
:class:`subprocess.Popen` class, which permits spawning subprocesses
and communicating with them over standard Trio streams.
:mod:`trio.subprocess` also reexports all the stdlib :mod:`subprocess`
exceptions and constants for convenience.
73 changes: 73 additions & 0 deletions notes-to-self/subprocess-notes.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# subprocesses are a huge hassle
# on Linux there is simply no way to async wait for a child to exit except by
# messing with SIGCHLD and that is ... *such* a mess. Not really
# tenable. We're better off trying os.waitpid(..., os.WNOHANG), and if that
# says the process is still going then spawn a thread to sit in waitpid.
# ......though that waitpid is non-cancellable so ugh. this is a problem,
# becaues it's also mutating -- you only get to waitpid() once, and you have
# to do it, because zombies. I guess we could make sure the waitpid thread is
# daemonic and either it gets back to us eventually (even if our first call to
# 'await wait()' is cancelled, maybe another one won't be), or else we go away
# and don't care anymore.
# I guess simplest is just to spawn a thread at the same time as we spawn the
# process, with more reasonable notification semantics.
# or we can poll every 100 ms or something, sigh.

# on Mac/*BSD then kqueue works, go them. (maybe have WNOHANG after turning it
# on to avoid a race condition I guess)

# on Windows, you can either do the thread thing, or something involving
# WaitForMultipleObjects, or the Job Object API:
# https://stackoverflow.com/questions/17724859/detecting-exit-failure-of-child-processes-using-iocp-c-windows
# (see also the comments here about using the Job Object API:
# https://stackoverflow.com/questions/23434842/python-how-to-kill-child-processes-when-parent-dies/23587108#23587108)
# however the docs say:
# "Note that, with the exception of limits set with the
# JobObjectNotificationLimitInformation information class, delivery of
# messages to the completion port is not guaranteed; failure of a message to
# arrive does not necessarily mean that the event did not occur"
#
# oh windows wtf

# We'll probably want to mess with the job API anyway for worker processes
# (b/c that's the reliable way to make sure we never leave residual worker
# processes around after exiting, see that stackoverflow question again), so
# maybe this isn't too big a hassle? waitpid is probably easiest for the
# first-pass implementation though.

# the handle version has the same issues as waitpid on Linux, except I guess
# that on windows the waitpid equivalent doesn't consume the handle.
# -- wait no, the windows equivalent takes a timeout! and we know our
# cancellation deadline going in, so that's actually okay. (Still need to use
# a thread but whatever.)

# asyncio does RegisterWaitForSingleObject with a callback that does
# PostQueuedCompletionStatus.
# this is just a thread pool in disguise (and in principle could have weird
# problems if you have enough children and run out of threads)
# it's possible we could do something with a thread that just sits in
# an alertable state and handle callbacks...? though hmm, maybe the set of
# events that can notify via callbacks is equivalent to the set that can
# notify via IOCP.
# there's WaitForMultipleObjects to let multiple waits share a thread I
# guess.
# you can wake up a WaitForMultipleObjectsEx on-demand by using QueueUserAPC
# to send a no-op APC to its thread.
# this is also a way to cancel a WaitForSingleObjectEx, actually. So it
# actually is possible to cancel the equivalent of a waitpid on Windows.

# Potentially useful observation: you *can* use a socket as the
# stdin/stdout/stderr for a child, iff you create that socket *without*
# WSA_FLAG_OVERLAPPED:
# http://stackoverflow.com/a/5725609
# Here's ncm's Windows implementation of socketpair, which has a flag to
# control whether one of the sockets has WSA_FLAG_OVERLAPPED set:
# https://github.com/ncm/selectable-socketpair/blob/master/socketpair.c
# (it also uses listen(1) so it's robust against someone intercepting things,
# unlike the version in socket.py... not sure anyone really cares, but
# hey. OTOH it only supports AF_INET, while socket.py supports AF_INET6,
# fancy.)
# (or it would be trivial to (re)implement in python, using either
# socket.socketpair or ncm's version as a model, given a cffi function to
# create the non-overlapped socket in the first place then just pass it into
# the socket.socket constructor (avoiding the dup() that fromfd does).)
1 change: 1 addition & 0 deletions trio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
from . import socket
from . import abc
from . import ssl
from . import subprocess
# Not imported by default: testing
if False:
from . import testing
Expand Down
Loading