diff --git a/trio/_subprocess/__init__.py b/trio/_subprocess/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/trio/_subprocess/unix_pipes.py b/trio/_subprocess/unix_pipes.py new file mode 100644 index 0000000000..f62fa8f590 --- /dev/null +++ b/trio/_subprocess/unix_pipes.py @@ -0,0 +1,120 @@ +import fcntl +import os +from typing import Tuple + +from .. import _core, BrokenStreamError +from .._abc import SendStream, ReceiveStream + +__all__ = ["PipeSendStream", "PipeReceiveStream", "make_pipe"] + + +class _PipeMixin: + def __init__(self, pipefd: int): + if not isinstance(pipefd, int): + raise TypeError( + "{0.__class__.__name__} needs a pipe fd".format(self) + ) + + self._pipe = pipefd + self._closed = False + + flags = fcntl.fcntl(self._pipe, fcntl.F_GETFL) + fcntl.fcntl(self._pipe, fcntl.F_SETFL, flags | os.O_NONBLOCK) + + def _close(self): + if self._closed: + return + + self._closed = True + os.close(self._pipe) + + async def aclose(self): + # XX: This would be in _close, but this can only be used from an + # async context. + _core.notify_fd_close(self._pipe) + self._close() + await _core.checkpoint() + + def fileno(self) -> int: + """Gets the file descriptor for this pipe.""" + return self._pipe + + def __del__(self): + self._close() + + +class PipeSendStream(_PipeMixin, SendStream): + """Represents a send stream over an os.pipe object.""" + + async def send_all(self, data: bytes): + # we have to do this no matter what + await _core.checkpoint() + if self._closed: + raise _core.ClosedResourceError("this pipe is already closed") + + if not data: + return + + length = len(data) + # adapted from the SocketStream code + with memoryview(data) as view: + total_sent = 0 + while total_sent < length: + with view[total_sent:] as remaining: + try: + total_sent += os.write(self._pipe, remaining) + except BrokenPipeError as e: + await _core.checkpoint() + raise BrokenStreamError from e + except BlockingIOError: + await self.wait_send_all_might_not_block() + + async def wait_send_all_might_not_block(self) -> None: + if self._closed: + await _core.checkpoint() + raise _core.ClosedResourceError("This pipe is already closed") + + try: + await _core.wait_writable(self._pipe) + except BrokenPipeError as e: + # kqueue: raises EPIPE on wait_writable instead + # of sending, which is annoying + # also doesn't checkpoint so we have to do that + # ourselves here too + await _core.checkpoint() + raise BrokenStreamError from e + + +class PipeReceiveStream(_PipeMixin, ReceiveStream): + """Represents a receive stream over an os.pipe object.""" + + async def receive_some(self, max_bytes: int) -> bytes: + if self._closed: + await _core.checkpoint() + raise _core.ClosedResourceError("this pipe is already closed") + + if not isinstance(max_bytes, int): + await _core.checkpoint() + raise TypeError("max_bytes must be integer >= 1") + + if max_bytes < 1: + await _core.checkpoint() + raise ValueError("max_bytes must be integer >= 1") + + while True: + try: + await _core.checkpoint_if_cancelled() + data = os.read(self._pipe, max_bytes) + except BlockingIOError: + await _core.wait_readable(self._pipe) + else: + await _core.cancel_shielded_checkpoint() + break + + return data + + +async def make_pipe() -> Tuple[PipeSendStream, PipeReceiveStream]: + """Makes a new pair of pipes.""" + (r, w) = os.pipe() + return PipeSendStream(w), PipeReceiveStream(r) diff --git a/trio/tests/subprocess/__init__.py b/trio/tests/subprocess/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/trio/tests/subprocess/test_unix_pipes.py b/trio/tests/subprocess/test_unix_pipes.py new file mode 100644 index 0000000000..b8cc2d496a --- /dev/null +++ b/trio/tests/subprocess/test_unix_pipes.py @@ -0,0 +1,148 @@ +import errno +import select + +import os +import pytest + +from trio._core.tests.tutil import gc_collect_harder +from ... import _core +from ...testing import (wait_all_tasks_blocked, check_one_way_stream) + +posix = os.name == "posix" + +pytestmark = pytest.mark.skipif( + not posix, reason="pipes are only supported on posix" +) + +if posix: + from ..._subprocess.unix_pipes import ( + PipeSendStream, PipeReceiveStream, make_pipe + ) + + +async def test_send_pipe(): + r, w = os.pipe() + send = PipeSendStream(w) + assert send.fileno() == w + await send.send_all(b"123") + assert (os.read(r, 8)) == b"123" + + os.close(r) + os.close(w) + send._closed = True + + +async def test_receive_pipe(): + r, w = os.pipe() + recv = PipeReceiveStream(r) + assert (recv.fileno()) == r + os.write(w, b"123") + assert (await recv.receive_some(8)) == b"123" + + os.close(r) + os.close(w) + recv._closed = True + + +async def test_pipes_combined(): + write, read = await make_pipe() + count = 2**20 + + async def sender(): + big = bytearray(count) + await write.send_all(big) + + async def reader(): + await wait_all_tasks_blocked() + received = 0 + while received < count: + received += len(await read.receive_some(4096)) + + assert received == count + + async with _core.open_nursery() as n: + n.start_soon(sender) + n.start_soon(reader) + + await read.aclose() + await write.aclose() + + +async def test_send_on_closed_pipe(): + write, read = await make_pipe() + await write.aclose() + + with pytest.raises(_core.ClosedResourceError): + await write.send_all(b"123") + + await read.aclose() + + +async def test_pipe_errors(): + with pytest.raises(TypeError): + PipeReceiveStream(None) + + with pytest.raises(ValueError): + await PipeReceiveStream(0).receive_some(0) + + +async def test_del(): + w, r = await make_pipe() + f1, f2 = w.fileno(), r.fileno() + del w, r + gc_collect_harder() + + with pytest.raises(OSError) as excinfo: + os.close(f1) + assert excinfo.value.errno == errno.EBADF + + with pytest.raises(OSError) as excinfo: + os.close(f2) + assert excinfo.value.errno == errno.EBADF + + +async def test_async_with(): + w, r = await make_pipe() + async with w, r: + pass + + assert w._closed + assert r._closed + + with pytest.raises(OSError) as excinfo: + os.close(w.fileno()) + assert excinfo.value.errno == errno.EBADF + + with pytest.raises(OSError) as excinfo: + os.close(r.fileno()) + assert excinfo.value.errno == errno.EBADF + + +async def make_clogged_pipe(): + s, r = await make_pipe() + try: + while True: + # We want to totally fill up the pipe buffer. + # This requires working around a weird feature that POSIX pipes + # have. + # If you do a write of <= PIPE_BUF bytes, then it's guaranteed + # to either complete entirely, or not at all. So if we tried to + # write PIPE_BUF bytes, and the buffer's free space is only + # PIPE_BUF/2, then the write will raise BlockingIOError... even + # though a smaller write could still succeed! To avoid this, + # make sure to write >PIPE_BUF bytes each time, which disables + # the special behavior. + # For details, search for PIPE_BUF here: + # http://pubs.opengroup.org/onlinepubs/9699919799/functions/write.html + + # for the getattr: + # https://bitbucket.org/pypy/pypy/issues/2876/selectpipe_buf-is-missing-on-pypy3 + buf_size = getattr(select, "PIPE_BUF", 8192) + os.write(s.fileno(), b"x" * buf_size * 2) + except BlockingIOError: + pass + return s, r + + +async def test_pipe_fully(): + await check_one_way_stream(make_pipe, make_clogged_pipe)