Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions newsfragments/1109.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
It turns out that creating a subprocess can block the parent process
for a surprisingly long time. So `trio.open_process` now uses a worker
thread to avoid blocking the event loop.
3 changes: 3 additions & 0 deletions newsfragments/1109.removal.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
If you want to create a `trio.Process` object, you now have to call
`trio.open_process`; calling ``trio.Process()`` directly was
deprecated in v0.12.0 and has now been removed.
166 changes: 75 additions & 91 deletions trio/_subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import subprocess
import sys
from typing import Optional
from functools import partial

from ._abc import AsyncResource, SendStream, ReceiveStream
from ._highlevel_generic import StapledStream
Expand All @@ -10,6 +11,7 @@
wait_child_exiting, create_pipe_to_child_stdin,
create_pipe_from_child_output
)
from ._util import NoPublicConstructor
import trio

# Linux-specific, but has complex lifetime management stuff so we hard-code it
Expand Down Expand Up @@ -46,7 +48,7 @@ def pidfd_open(fd, flags):
can_try_pidfd_open = False


class Process(AsyncResource):
class Process(AsyncResource, metaclass=NoPublicConstructor):
r"""A child process. Like :class:`subprocess.Popen`, but async.

This class has no public constructor. To create a child process, use
Expand Down Expand Up @@ -100,91 +102,18 @@ class Process(AsyncResource):
# arbitrarily many threads if wait() keeps getting cancelled.
_wait_for_exit_data = None

# After the deprecation period:
# - delete __init__ and _create
# - add metaclass=NoPublicConstructor
# - rename _init to __init__
# - move most of the code into open_process()
# - put the subprocess.Popen(...) call into a thread
def __init__(self, *args, **kwargs):
trio._deprecate.warn_deprecated(
"directly constructing Process objects",
"0.12.0",
issue=1109,
instead="trio.open_process"
)
self._init(*args, **kwargs)

@classmethod
def _create(cls, *args, **kwargs):
self = cls.__new__(cls)
self._init(*args, **kwargs)
return self

def _init(
self, command, *, stdin=None, stdout=None, stderr=None, **options
):
for key in (
'universal_newlines', 'text', 'encoding', 'errors', 'bufsize'
):
if options.get(key):
raise TypeError(
"trio.Process only supports communicating over "
"unbuffered byte streams; the '{}' option is not supported"
.format(key)
)

self.stdin = None # type: Optional[SendStream]
self.stdout = None # type: Optional[ReceiveStream]
self.stderr = None # type: Optional[ReceiveStream]
self.stdio = None # type: Optional[StapledStream]
def __init__(self, popen, stdin, stdout, stderr):
self._proc = popen
self.stdin = stdin # type: Optional[SendStream]
self.stdout = stdout # type: Optional[ReceiveStream]
self.stderr = stderr # type: Optional[ReceiveStream]

if os.name == "posix":
if isinstance(command, str) and not options.get("shell"):
raise TypeError(
"command must be a sequence (not a string) if shell=False "
"on UNIX systems"
)
if not isinstance(command, str) and options.get("shell"):
raise TypeError(
"command must be a string (not a sequence) if shell=True "
"on UNIX systems"
)
self.stdio = None # type: Optional[StapledStream]
if self.stdin is not None and self.stdout is not None:
self.stdio = StapledStream(self.stdin, self.stdout)

self._wait_lock = Lock()

if stdin == subprocess.PIPE:
self.stdin, stdin = create_pipe_to_child_stdin()
if stdout == subprocess.PIPE:
self.stdout, stdout = create_pipe_from_child_output()
if stderr == subprocess.STDOUT:
# If we created a pipe for stdout, pass the same pipe for
# stderr. If stdout was some non-pipe thing (DEVNULL or a
# given FD), pass the same thing. If stdout was passed as
# None, keep stderr as STDOUT to allow subprocess to dup
# our stdout. Regardless of which of these is applicable,
# don't create a new Trio stream for stderr -- if stdout
# is piped, stderr will be intermixed on the stdout stream.
if stdout is not None:
stderr = stdout
elif stderr == subprocess.PIPE:
self.stderr, stderr = create_pipe_from_child_output()

try:
self._proc = subprocess.Popen(
command, stdin=stdin, stdout=stdout, stderr=stderr, **options
)
finally:
# Close the parent's handle for each child side of a pipe;
# we want the child to have the only copy, so that when
# it exits we can read EOF on our side.
if self.stdin is not None:
os.close(stdin)
if self.stdout is not None:
os.close(stdout)
if self.stderr is not None:
os.close(stderr)

self._pidfd = None
if can_try_pidfd_open:
try:
Expand All @@ -200,9 +129,6 @@ def _init(
# make sure it'll get closed.
self._pidfd = open(fd)

if self.stdin is not None and self.stdout is not None:
self.stdio = StapledStream(self.stdin, self.stdout)

self.args = self._proc.args
self.pid = self._proc.pid

Expand Down Expand Up @@ -378,12 +304,70 @@ async def open_process(
specified command could not be found.

"""
# XX FIXME: move the process creation into a thread as soon as we're done
# deprecating Process(...)
await trio.lowlevel.checkpoint()
return Process._create(
command, stdin=stdin, stdout=stdout, stderr=stderr, **options
)
for key in ('universal_newlines', 'text', 'encoding', 'errors', 'bufsize'):
if options.get(key):
raise TypeError(
"trio.Process only supports communicating over "
"unbuffered byte streams; the '{}' option is not supported"
.format(key)
)

if os.name == "posix":
if isinstance(command, str) and not options.get("shell"):
raise TypeError(
"command must be a sequence (not a string) if shell=False "
"on UNIX systems"
)
if not isinstance(command, str) and options.get("shell"):
raise TypeError(
"command must be a string (not a sequence) if shell=True "
"on UNIX systems"
)

trio_stdin = None # type: Optional[SendStream]
trio_stdout = None # type: Optional[ReceiveStream]
trio_stderr = None # type: Optional[ReceiveStream]

if stdin == subprocess.PIPE:
trio_stdin, stdin = create_pipe_to_child_stdin()
if stdout == subprocess.PIPE:
trio_stdout, stdout = create_pipe_from_child_output()
if stderr == subprocess.STDOUT:
# If we created a pipe for stdout, pass the same pipe for
# stderr. If stdout was some non-pipe thing (DEVNULL or a
# given FD), pass the same thing. If stdout was passed as
# None, keep stderr as STDOUT to allow subprocess to dup
# our stdout. Regardless of which of these is applicable,
# don't create a new Trio stream for stderr -- if stdout
# is piped, stderr will be intermixed on the stdout stream.
if stdout is not None:
stderr = stdout
elif stderr == subprocess.PIPE:
trio_stderr, stderr = create_pipe_from_child_output()

try:
popen = await trio.to_thread.run_sync(
partial(
subprocess.Popen,
command,
stdin=stdin,
stdout=stdout,
stderr=stderr,
**options
)
)
finally:
# Close the parent's handle for each child side of a pipe;
# we want the child to have the only copy, so that when
# it exits we can read EOF on our side.
if trio_stdin is not None:
os.close(stdin)
if trio_stdout is not None:
os.close(stdout)
if trio_stderr is not None:
os.close(stderr)

return Process._create(popen, trio_stdin, trio_stdout, trio_stderr)


async def run_process(
Expand Down
8 changes: 0 additions & 8 deletions trio/tests/test_subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,6 @@ async def test_basic():
)


# Delete this test when we remove direct Process construction
async def test_deprecated_Process_init():
with pytest.warns(TrioDeprecationWarning):
async with Process(EXIT_TRUE) as proc:
assert isinstance(proc, Process)
assert proc.returncode == 0


async def test_multi_wait():
async with await open_process(SLEEP(10)) as proc:
# Check that wait (including multi-wait) tolerates being cancelled
Expand Down