Skip to content
20 changes: 20 additions & 0 deletions docs/source/reference-hazmat.rst
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,26 @@ All environments provide the following functions:
yourself afterwards.


Unix-specific API
-----------------

`FdStream` supports wrapping Unix files (such as a pipe or TTY) as
a stream.

If you have two different file descriptors for sending and receiving,
and want to bundle them together into a single bidirectional
`~trio.abc.Stream`, then use `trio.StapledStream`::

bidirectional_stream = trio.StapledStream(
trio.hazmat.FdStream(write_fd),
trio.hazmat.FdStream(read_fd)
)

.. autoclass:: FdStream
:show-inheritance:
:members:


Kqueue-specific API
-------------------

Expand Down
1 change: 1 addition & 0 deletions newsfragments/829.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add `trio.hazmat.FdStream` for wrapping a Unix file descriptor as a `~trio.abc.Stream`.
14 changes: 6 additions & 8 deletions trio/_subprocess.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import os
import select
import subprocess
from functools import partial
from typing import Optional

from ._abc import AsyncResource
from ._abc import AsyncResource, SendStream, ReceiveStream
from ._highlevel_generic import StapledStream
from ._sync import Lock
from ._subprocess_platform import (
Expand Down Expand Up @@ -101,9 +100,10 @@ def _init(
.format(key)
)

self.stdin = None
self.stdout = None
self.stderr = None
self.stdin = None # type: Optional[SendStream]
self.stdout = None # type: Optional[ReceiveStream]
self.stderr = None # type: Optional[ReceiveStream]
self.stdio = None # type: Optional[StapledStream]

if os.name == "posix":
if isinstance(command, str) and not options.get("shell"):
Expand Down Expand Up @@ -153,8 +153,6 @@ def _init(

if self.stdin is not None and self.stdout is not None:
self.stdio = StapledStream(self.stdin, self.stdout)
else:
self.stdio = None

self.args = self._proc.args
self.pid = self._proc.pid
Expand Down
6 changes: 3 additions & 3 deletions trio/_subprocess_platform/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,15 @@ def create_pipe_from_child_output() -> Tuple[ReceiveStream, int]:

try:
if os.name == "posix":
from .._unix_pipes import PipeSendStream, PipeReceiveStream
from ..hazmat import FdStream

def create_pipe_to_child_stdin(): # noqa: F811
rfd, wfd = os.pipe()
return PipeSendStream(wfd), rfd
return FdStream(wfd), rfd

def create_pipe_from_child_output(): # noqa: F811
rfd, wfd = os.pipe()
return PipeReceiveStream(rfd), wfd
return FdStream(rfd), wfd

elif os.name == "nt":
from .._windows_pipes import PipeSendStream, PipeReceiveStream
Expand Down
85 changes: 53 additions & 32 deletions trio/_unix_pipes.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
import fcntl
import os
import errno

from ._abc import SendStream, ReceiveStream
from ._abc import Stream
from ._util import ConflictDetector

import trio

if os.name != "posix":
# We raise an error here rather than gating the import in hazmat.py
# in order to keep jedi static analysis happy.
raise ImportError


class _FdHolder:
# This class holds onto a raw file descriptor, in non-blocking mode, and
Expand All @@ -33,9 +37,9 @@ def __init__(self, fd: int):
if not isinstance(fd, int):
raise TypeError("file descriptor must be an int")
self.fd = fd
# Flip the fd to non-blocking mode
flags = fcntl.fcntl(self.fd, fcntl.F_GETFL)
fcntl.fcntl(self.fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
# Store original state, and ensure non-blocking mode is enabled
self._original_is_blocking = os.get_blocking(fd)
os.set_blocking(fd, False)

@property
def closed(self):
Expand All @@ -53,6 +57,7 @@ def _raw_close(self):
return
fd = self.fd
self.fd = -1
os.set_blocking(fd, self._original_is_blocking)
os.close(fd)

def __del__(self):
Expand All @@ -65,21 +70,53 @@ async def aclose(self):
await trio.hazmat.checkpoint()


class PipeSendStream(SendStream):
"""Represents a send stream over an os.pipe object."""
class FdStream(Stream):
"""
Represents a stream given the file descriptor to a pipe, TTY, etc.

*fd* must refer to a file that is open for reading and/or writing and
supports non-blocking I/O (pipes and TTYs will work, on-disk files probably
not). The returned stream takes ownership of the fd, so closing the stream
will close the fd too. As with `os.fdopen`, you should not directly use
an fd after you have wrapped it in a stream using this function.

To be used as a Trio stream, an open file must be placed in non-blocking
mode. Unfortunately, this impacts all I/O that goes through the
underlying open file, including I/O that uses a different
file descriptor than the one that was passed to Trio. If other threads
or processes are using file descriptors that are related through `os.dup`
or inheritance across `os.fork` to the one that Trio is using, they are
unlikely to be prepared to have non-blocking I/O semantics suddenly
thrust upon them. For example, you can use ``FdStream(os.dup(0))`` to
obtain a stream for reading from standard input, but it is only safe to
do so with heavy caveats: your stdin must not be shared by any other
processes and you must not make any calls to synchronous methods of
`sys.stdin` until the stream returned by `FdStream` is closed. See
`issue #174 <https://github.com/python-trio/trio/issues/174>`__ for a
discussion of the challenges involved in relaxing this restriction.

Args:
fd (int): The fd to be wrapped.

Returns:
A new `FdStream` object.
"""

def __init__(self, fd: int):
self._fd_holder = _FdHolder(fd)
self._conflict_detector = ConflictDetector(
"another task is using this pipe"
self._send_conflict_detector = ConflictDetector(
"another task is using this stream for send"
)
self._receive_conflict_detector = ConflictDetector(
"another task is using this stream for receive"
)

async def send_all(self, data: bytes):
with self._conflict_detector:
with self._send_conflict_detector:
# have to check up front, because send_all(b"") on a closed pipe
# should raise
if self._fd_holder.closed:
raise trio.ClosedResourceError("this pipe was already closed")
raise trio.ClosedResourceError("file was already closed")
await trio.hazmat.checkpoint()
length = len(data)
# adapted from the SocketStream code
Expand All @@ -94,40 +131,24 @@ async def send_all(self, data: bytes):
except OSError as e:
if e.errno == errno.EBADF:
raise trio.ClosedResourceError(
"this pipe was closed"
"file was already closed"
) from None
else:
raise trio.BrokenResourceError from e

async def wait_send_all_might_not_block(self) -> None:
with self._conflict_detector:
with self._send_conflict_detector:
if self._fd_holder.closed:
raise trio.ClosedResourceError("this pipe was already closed")
raise trio.ClosedResourceError("file was already closed")
try:
await trio.hazmat.wait_writable(self._fd_holder.fd)
except BrokenPipeError as e:
# kqueue: raises EPIPE on wait_writable instead
# of sending, which is annoying
raise trio.BrokenResourceError from e

async def aclose(self):
await self._fd_holder.aclose()

def fileno(self):
return self._fd_holder.fd


class PipeReceiveStream(ReceiveStream):
"""Represents a receive stream over an os.pipe object."""

def __init__(self, fd: int):
self._fd_holder = _FdHolder(fd)
self._conflict_detector = ConflictDetector(
"another task is using this pipe"
)

async def receive_some(self, max_bytes: int) -> bytes:
with self._conflict_detector:
with self._receive_conflict_detector:
if not isinstance(max_bytes, int):
raise TypeError("max_bytes must be integer >= 1")

Expand All @@ -143,7 +164,7 @@ async def receive_some(self, max_bytes: int) -> bytes:
except OSError as e:
if e.errno == errno.EBADF:
raise trio.ClosedResourceError(
"this pipe was closed"
"file was already closed"
) from None
else:
raise trio.BrokenResourceError from e
Expand Down
7 changes: 7 additions & 0 deletions trio/hazmat.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
but useful for extending Trio's functionality.
"""

import os
import sys

# This is the union of a subset of trio/_core/ and some things from trio/*.py.
Expand All @@ -22,6 +23,12 @@
spawn_system_task, wait_readable, wait_writable, notify_closing
)

# Unix-specific symbols
try:
from ._unix_pipes import FdStream
except ImportError:
pass

# Kqueue-specific symbols
try:
from ._core import (
Expand Down
19 changes: 11 additions & 8 deletions trio/tests/test_unix_pipes.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,17 @@
posix = os.name == "posix"
pytestmark = pytest.mark.skipif(not posix, reason="posix only")
if posix:
from .._unix_pipes import PipeSendStream, PipeReceiveStream
from .._unix_pipes import FdStream
else:
with pytest.raises(ImportError):
from .._unix_pipes import FdStream


# Have to use quoted types so import doesn't crash on windows
async def make_pipe() -> "Tuple[PipeSendStream, PipeReceiveStream]":
async def make_pipe() -> "Tuple[FdStream, FdStream]":
"""Makes a new pair of pipes."""
(r, w) = os.pipe()
return PipeSendStream(w), PipeReceiveStream(r)
return FdStream(w), FdStream(r)


async def make_clogged_pipe():
Expand Down Expand Up @@ -49,7 +52,7 @@ async def make_clogged_pipe():

async def test_send_pipe():
r, w = os.pipe()
async with PipeSendStream(w) as send:
async with FdStream(w) as send:
assert send.fileno() == w
await send.send_all(b"123")
assert (os.read(r, 8)) == b"123"
Expand All @@ -59,7 +62,7 @@ async def test_send_pipe():

async def test_receive_pipe():
r, w = os.pipe()
async with PipeReceiveStream(r) as recv:
async with FdStream(r) as recv:
assert (recv.fileno()) == r
os.write(w, b"123")
assert (await recv.receive_some(8)) == b"123"
Expand Down Expand Up @@ -93,10 +96,10 @@ async def reader():

async def test_pipe_errors():
with pytest.raises(TypeError):
PipeReceiveStream(None)
FdStream(None)

with pytest.raises(ValueError):
await PipeReceiveStream(0).receive_some(0)
await FdStream(0).receive_some(0)


async def test_del():
Expand Down Expand Up @@ -146,7 +149,7 @@ async def test_misdirected_aclose_regression():
if r2_fd != old_r_fd: # pragma: no cover
os.dup2(r2_fd, old_r_fd)
os.close(r2_fd)
async with PipeReceiveStream(old_r_fd) as r2:
async with FdStream(old_r_fd) as r2:
assert r2.fileno() == old_r_fd

# And now set up a background task that's working on the new receive
Expand Down