diff --git a/docs/source/reference-hazmat.rst b/docs/source/reference-hazmat.rst index 3cf9bc2890..eaaf9e7bab 100644 --- a/docs/source/reference-hazmat.rst +++ b/docs/source/reference-hazmat.rst @@ -174,6 +174,26 @@ All environments provide the following functions: yourself afterwards. +Unix-specific API +----------------- + +`FdStream` supports wrapping Unix files (such as a pipe or TTY) as +a stream. + +If you have two different file descriptors for sending and receiving, +and want to bundle them together into a single bidirectional +`~trio.abc.Stream`, then use `trio.StapledStream`:: + + bidirectional_stream = trio.StapledStream( + trio.hazmat.FdStream(write_fd), + trio.hazmat.FdStream(read_fd) + ) + +.. autoclass:: FdStream + :show-inheritance: + :members: + + Kqueue-specific API ------------------- diff --git a/newsfragments/829.feature.rst b/newsfragments/829.feature.rst new file mode 100644 index 0000000000..340908571c --- /dev/null +++ b/newsfragments/829.feature.rst @@ -0,0 +1 @@ +Add `trio.hazmat.FdStream` for wrapping a Unix file descriptor as a `~trio.abc.Stream`. diff --git a/trio/_subprocess.py b/trio/_subprocess.py index ae530e3ae8..f9bc06477f 100644 --- a/trio/_subprocess.py +++ b/trio/_subprocess.py @@ -1,9 +1,8 @@ import os -import select import subprocess -from functools import partial +from typing import Optional -from ._abc import AsyncResource +from ._abc import AsyncResource, SendStream, ReceiveStream from ._highlevel_generic import StapledStream from ._sync import Lock from ._subprocess_platform import ( @@ -101,9 +100,10 @@ def _init( .format(key) ) - self.stdin = None - self.stdout = None - self.stderr = None + self.stdin = None # type: Optional[SendStream] + self.stdout = None # type: Optional[ReceiveStream] + self.stderr = None # type: Optional[ReceiveStream] + self.stdio = None # type: Optional[StapledStream] if os.name == "posix": if isinstance(command, str) and not options.get("shell"): @@ -153,8 +153,6 @@ def _init( if self.stdin is not None and self.stdout is not None: self.stdio = StapledStream(self.stdin, self.stdout) - else: - self.stdio = None self.args = self._proc.args self.pid = self._proc.pid diff --git a/trio/_subprocess_platform/__init__.py b/trio/_subprocess_platform/__init__.py index 1507ec1246..b1db8499c6 100644 --- a/trio/_subprocess_platform/__init__.py +++ b/trio/_subprocess_platform/__init__.py @@ -67,15 +67,15 @@ def create_pipe_from_child_output() -> Tuple[ReceiveStream, int]: try: if os.name == "posix": - from .._unix_pipes import PipeSendStream, PipeReceiveStream + from ..hazmat import FdStream def create_pipe_to_child_stdin(): # noqa: F811 rfd, wfd = os.pipe() - return PipeSendStream(wfd), rfd + return FdStream(wfd), rfd def create_pipe_from_child_output(): # noqa: F811 rfd, wfd = os.pipe() - return PipeReceiveStream(rfd), wfd + return FdStream(rfd), wfd elif os.name == "nt": from .._windows_pipes import PipeSendStream, PipeReceiveStream diff --git a/trio/_unix_pipes.py b/trio/_unix_pipes.py index 7cd205e93f..b212b2a961 100644 --- a/trio/_unix_pipes.py +++ b/trio/_unix_pipes.py @@ -1,12 +1,16 @@ -import fcntl import os import errno -from ._abc import SendStream, ReceiveStream +from ._abc import Stream from ._util import ConflictDetector import trio +if os.name != "posix": + # We raise an error here rather than gating the import in hazmat.py + # in order to keep jedi static analysis happy. + raise ImportError + class _FdHolder: # This class holds onto a raw file descriptor, in non-blocking mode, and @@ -33,9 +37,9 @@ def __init__(self, fd: int): if not isinstance(fd, int): raise TypeError("file descriptor must be an int") self.fd = fd - # Flip the fd to non-blocking mode - flags = fcntl.fcntl(self.fd, fcntl.F_GETFL) - fcntl.fcntl(self.fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) + # Store original state, and ensure non-blocking mode is enabled + self._original_is_blocking = os.get_blocking(fd) + os.set_blocking(fd, False) @property def closed(self): @@ -53,6 +57,7 @@ def _raw_close(self): return fd = self.fd self.fd = -1 + os.set_blocking(fd, self._original_is_blocking) os.close(fd) def __del__(self): @@ -65,21 +70,53 @@ async def aclose(self): await trio.hazmat.checkpoint() -class PipeSendStream(SendStream): - """Represents a send stream over an os.pipe object.""" +class FdStream(Stream): + """ + Represents a stream given the file descriptor to a pipe, TTY, etc. + + *fd* must refer to a file that is open for reading and/or writing and + supports non-blocking I/O (pipes and TTYs will work, on-disk files probably + not). The returned stream takes ownership of the fd, so closing the stream + will close the fd too. As with `os.fdopen`, you should not directly use + an fd after you have wrapped it in a stream using this function. + + To be used as a Trio stream, an open file must be placed in non-blocking + mode. Unfortunately, this impacts all I/O that goes through the + underlying open file, including I/O that uses a different + file descriptor than the one that was passed to Trio. If other threads + or processes are using file descriptors that are related through `os.dup` + or inheritance across `os.fork` to the one that Trio is using, they are + unlikely to be prepared to have non-blocking I/O semantics suddenly + thrust upon them. For example, you can use ``FdStream(os.dup(0))`` to + obtain a stream for reading from standard input, but it is only safe to + do so with heavy caveats: your stdin must not be shared by any other + processes and you must not make any calls to synchronous methods of + `sys.stdin` until the stream returned by `FdStream` is closed. See + `issue #174 `__ for a + discussion of the challenges involved in relaxing this restriction. + + Args: + fd (int): The fd to be wrapped. + + Returns: + A new `FdStream` object. + """ def __init__(self, fd: int): self._fd_holder = _FdHolder(fd) - self._conflict_detector = ConflictDetector( - "another task is using this pipe" + self._send_conflict_detector = ConflictDetector( + "another task is using this stream for send" + ) + self._receive_conflict_detector = ConflictDetector( + "another task is using this stream for receive" ) async def send_all(self, data: bytes): - with self._conflict_detector: + with self._send_conflict_detector: # have to check up front, because send_all(b"") on a closed pipe # should raise if self._fd_holder.closed: - raise trio.ClosedResourceError("this pipe was already closed") + raise trio.ClosedResourceError("file was already closed") await trio.hazmat.checkpoint() length = len(data) # adapted from the SocketStream code @@ -94,15 +131,15 @@ async def send_all(self, data: bytes): except OSError as e: if e.errno == errno.EBADF: raise trio.ClosedResourceError( - "this pipe was closed" + "file was already closed" ) from None else: raise trio.BrokenResourceError from e async def wait_send_all_might_not_block(self) -> None: - with self._conflict_detector: + with self._send_conflict_detector: if self._fd_holder.closed: - raise trio.ClosedResourceError("this pipe was already closed") + raise trio.ClosedResourceError("file was already closed") try: await trio.hazmat.wait_writable(self._fd_holder.fd) except BrokenPipeError as e: @@ -110,24 +147,8 @@ async def wait_send_all_might_not_block(self) -> None: # of sending, which is annoying raise trio.BrokenResourceError from e - async def aclose(self): - await self._fd_holder.aclose() - - def fileno(self): - return self._fd_holder.fd - - -class PipeReceiveStream(ReceiveStream): - """Represents a receive stream over an os.pipe object.""" - - def __init__(self, fd: int): - self._fd_holder = _FdHolder(fd) - self._conflict_detector = ConflictDetector( - "another task is using this pipe" - ) - async def receive_some(self, max_bytes: int) -> bytes: - with self._conflict_detector: + with self._receive_conflict_detector: if not isinstance(max_bytes, int): raise TypeError("max_bytes must be integer >= 1") @@ -143,7 +164,7 @@ async def receive_some(self, max_bytes: int) -> bytes: except OSError as e: if e.errno == errno.EBADF: raise trio.ClosedResourceError( - "this pipe was closed" + "file was already closed" ) from None else: raise trio.BrokenResourceError from e diff --git a/trio/hazmat.py b/trio/hazmat.py index 69070a7968..5fe32c03d9 100644 --- a/trio/hazmat.py +++ b/trio/hazmat.py @@ -3,6 +3,7 @@ but useful for extending Trio's functionality. """ +import os import sys # This is the union of a subset of trio/_core/ and some things from trio/*.py. @@ -22,6 +23,12 @@ spawn_system_task, wait_readable, wait_writable, notify_closing ) +# Unix-specific symbols +try: + from ._unix_pipes import FdStream +except ImportError: + pass + # Kqueue-specific symbols try: from ._core import ( diff --git a/trio/tests/test_unix_pipes.py b/trio/tests/test_unix_pipes.py index 7728a57fae..46f28d5471 100644 --- a/trio/tests/test_unix_pipes.py +++ b/trio/tests/test_unix_pipes.py @@ -11,14 +11,17 @@ posix = os.name == "posix" pytestmark = pytest.mark.skipif(not posix, reason="posix only") if posix: - from .._unix_pipes import PipeSendStream, PipeReceiveStream + from .._unix_pipes import FdStream +else: + with pytest.raises(ImportError): + from .._unix_pipes import FdStream # Have to use quoted types so import doesn't crash on windows -async def make_pipe() -> "Tuple[PipeSendStream, PipeReceiveStream]": +async def make_pipe() -> "Tuple[FdStream, FdStream]": """Makes a new pair of pipes.""" (r, w) = os.pipe() - return PipeSendStream(w), PipeReceiveStream(r) + return FdStream(w), FdStream(r) async def make_clogged_pipe(): @@ -49,7 +52,7 @@ async def make_clogged_pipe(): async def test_send_pipe(): r, w = os.pipe() - async with PipeSendStream(w) as send: + async with FdStream(w) as send: assert send.fileno() == w await send.send_all(b"123") assert (os.read(r, 8)) == b"123" @@ -59,7 +62,7 @@ async def test_send_pipe(): async def test_receive_pipe(): r, w = os.pipe() - async with PipeReceiveStream(r) as recv: + async with FdStream(r) as recv: assert (recv.fileno()) == r os.write(w, b"123") assert (await recv.receive_some(8)) == b"123" @@ -93,10 +96,10 @@ async def reader(): async def test_pipe_errors(): with pytest.raises(TypeError): - PipeReceiveStream(None) + FdStream(None) with pytest.raises(ValueError): - await PipeReceiveStream(0).receive_some(0) + await FdStream(0).receive_some(0) async def test_del(): @@ -146,7 +149,7 @@ async def test_misdirected_aclose_regression(): if r2_fd != old_r_fd: # pragma: no cover os.dup2(r2_fd, old_r_fd) os.close(r2_fd) - async with PipeReceiveStream(old_r_fd) as r2: + async with FdStream(old_r_fd) as r2: assert r2.fileno() == old_r_fd # And now set up a background task that's working on the new receive