From 51e9c6db3cb9c370e38989ba2fea75d0857752d5 Mon Sep 17 00:00:00 2001 From: "Laura F. D" Date: Wed, 22 Aug 2018 19:06:06 +0100 Subject: [PATCH 01/15] Add support for pipes. --- newsfragments/621.feature.rst | 1 + trio/__init__.py | 3 ++ trio/_highlevel_pipes.py | 63 +++++++++++++++++++++++++ trio/tests/test_highlevel_pipes.py | 74 ++++++++++++++++++++++++++++++ 4 files changed, 141 insertions(+) create mode 100644 newsfragments/621.feature.rst create mode 100644 trio/_highlevel_pipes.py create mode 100644 trio/tests/test_highlevel_pipes.py diff --git a/newsfragments/621.feature.rst b/newsfragments/621.feature.rst new file mode 100644 index 0000000000..4badc1d031 --- /dev/null +++ b/newsfragments/621.feature.rst @@ -0,0 +1 @@ +Add support for pipes (os.pipe). \ No newline at end of file diff --git a/trio/__init__.py b/trio/__init__.py index 5f84226570..22e427fc9a 100644 --- a/trio/__init__.py +++ b/trio/__init__.py @@ -56,6 +56,9 @@ from ._highlevel_open_unix_stream import * __all__ += _highlevel_open_unix_stream.__all__ +from ._highlevel_pipes import * +__all__ += _highlevel_pipes.__all__ + from ._highlevel_ssl_helpers import * __all__ += _highlevel_ssl_helpers.__all__ diff --git a/trio/_highlevel_pipes.py b/trio/_highlevel_pipes.py new file mode 100644 index 0000000000..2e6c49831a --- /dev/null +++ b/trio/_highlevel_pipes.py @@ -0,0 +1,63 @@ +import os +from typing import Tuple + +from . import _core +from ._abc import SendStream, ReceiveStream +from ._highlevel_socket import _translate_socket_errors_to_stream_errors + +__all__ = ["PipeSendStream", "PipeReceiveStream", "make_pipe"] + + +class _PipeMixin: + def __init__(self, pipefd: int): + if not isinstance(pipefd, int): + raise TypeError("PipeSendStream needs a pipe fd") + + self._pipe = pipefd + + async def aclose(self): + os.close(self._pipe) + + def fileno(self) -> int: + """Gets the file descriptor for this pipe.""" + return self._pipe + + +class PipeSendStream(_PipeMixin, SendStream): + """Represents a send stream over an os.pipe object.""" + + async def send_all(self, data: bytes): + length = len(data) + # adapted from the SocketStream code + with memoryview(data) as view, \ + _translate_socket_errors_to_stream_errors(): + total_sent = 0 + while total_sent < length: + with view[total_sent:] as remaining: + total_sent += os.write(self._pipe, remaining) + + await self.wait_send_all_might_not_block() + + async def wait_send_all_might_not_block(self) -> None: + await _core.wait_socket_writable(self._pipe) + + +class PipeReceiveStream(_PipeMixin, ReceiveStream): + """Represents a receive stream over an os.pipe object.""" + + async def receive_some(self, max_bytes: int) -> bytes: + if max_bytes < 1: + await _core.checkpoint() + raise ValueError("max_bytes must be >= 1") + + with _translate_socket_errors_to_stream_errors(): + await _core.wait_socket_readable(self._pipe) + data = os.read(self._pipe, max_bytes) + + return data + + +def make_pipe() -> Tuple[PipeReceiveStream, PipeSendStream]: + """Makes a new pair of pipes.""" + (r, w) = os.pipe2(os.O_NONBLOCK) + return PipeReceiveStream(r), PipeSendStream(w) diff --git a/trio/tests/test_highlevel_pipes.py b/trio/tests/test_highlevel_pipes.py new file mode 100644 index 0000000000..2520fe15c1 --- /dev/null +++ b/trio/tests/test_highlevel_pipes.py @@ -0,0 +1,74 @@ +import os +import pytest + +from .. import _core +from .._highlevel_pipes import PipeSendStream, PipeReceiveStream, make_pipe +from ..testing import wait_all_tasks_blocked + +pytestmark = pytest.mark.skipif( + not hasattr(os, "pipe2"), reason="pipes require os.pipe2()" +) + + +async def test_send_pipe(): + r, w = os.pipe2(os.O_NONBLOCK) + send = PipeSendStream(w) + assert send.fileno() == w + await send.send_all(b"123") + assert (os.read(r, 8)) == b"123" + + os.close(r) + os.close(w) + + +async def test_receive_pipe(): + r, w = os.pipe2(os.O_NONBLOCK) + recv = PipeReceiveStream(r) + assert (recv.fileno()) == r + os.write(w, b"123") + assert (await recv.receive_some(8)) == b"123" + + os.close(r) + os.close(w) + + +async def test_pipes_combined(): + read, write = make_pipe() + count = 2**20 + + async def sender(): + big = bytearray(count) + await write.send_all(big) + + async def reader(): + await wait_all_tasks_blocked() + received = 0 + while received < count: + received += len(await read.receive_some(4096)) + + assert received == count + + async with _core.open_nursery() as n: + n.start_soon(sender) + n.start_soon(reader) + + await read.aclose() + await write.aclose() + + +async def test_send_on_closed_pipe(): + read, write = make_pipe() + await write.aclose() + + with pytest.raises(_core.ClosedResourceError): + await write.send_all(b"123") + + await read.aclose() + + +async def test_pipe_errors(): + with pytest.raises(TypeError): + PipeReceiveStream(None) + + with pytest.raises(ValueError): + await PipeReceiveStream(0).receive_some(0) From 905158f659a8640bdb2547260a445daa91fdf1e7 Mon Sep 17 00:00:00 2001 From: "Laura F. D" Date: Thu, 23 Aug 2018 16:02:01 +0100 Subject: [PATCH 02/15] Add `__del__` method for GC'd pipes. --- trio/_highlevel_pipes.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/trio/_highlevel_pipes.py b/trio/_highlevel_pipes.py index 2e6c49831a..dec19665e4 100644 --- a/trio/_highlevel_pipes.py +++ b/trio/_highlevel_pipes.py @@ -22,6 +22,13 @@ def fileno(self) -> int: """Gets the file descriptor for this pipe.""" return self._pipe + def __del__(self): + try: + os.close(self._pipe) + except AttributeError: + # probably in interpreter shut down + pass + class PipeSendStream(_PipeMixin, SendStream): """Represents a send stream over an os.pipe object.""" From 4f4910fbab35e73612cf582a3c8a9b93ace2267d Mon Sep 17 00:00:00 2001 From: "Laura F. D" Date: Fri, 24 Aug 2018 14:38:32 +0100 Subject: [PATCH 03/15] Change a lotta stuff --- trio/__init__.py | 3 - trio/_highlevel_pipes.py | 70 ----------- trio/_subprocess/__init__.py | 0 trio/_subprocess/tests/__init__.py | 0 trio/_subprocess/tests/conftest.py | 25 ++++ .../tests/test_highlevel_pipes.py | 16 ++- trio/_subprocess/unix_pipes.py | 114 ++++++++++++++++++ 7 files changed, 150 insertions(+), 78 deletions(-) delete mode 100644 trio/_highlevel_pipes.py create mode 100644 trio/_subprocess/__init__.py create mode 100644 trio/_subprocess/tests/__init__.py create mode 100644 trio/_subprocess/tests/conftest.py rename trio/{ => _subprocess}/tests/test_highlevel_pipes.py (81%) create mode 100644 trio/_subprocess/unix_pipes.py diff --git a/trio/__init__.py b/trio/__init__.py index 22e427fc9a..5f84226570 100644 --- a/trio/__init__.py +++ b/trio/__init__.py @@ -56,9 +56,6 @@ from ._highlevel_open_unix_stream import * __all__ += _highlevel_open_unix_stream.__all__ -from ._highlevel_pipes import * -__all__ += _highlevel_pipes.__all__ - from ._highlevel_ssl_helpers import * __all__ += _highlevel_ssl_helpers.__all__ diff --git a/trio/_highlevel_pipes.py b/trio/_highlevel_pipes.py deleted file mode 100644 index dec19665e4..0000000000 --- a/trio/_highlevel_pipes.py +++ /dev/null @@ -1,70 +0,0 @@ -import os -from typing import Tuple - -from . import _core -from ._abc import SendStream, ReceiveStream -from ._highlevel_socket import _translate_socket_errors_to_stream_errors - -__all__ = ["PipeSendStream", "PipeReceiveStream", "make_pipe"] - - -class _PipeMixin: - def __init__(self, pipefd: int): - if not isinstance(pipefd, int): - raise TypeError("PipeSendStream needs a pipe fd") - - self._pipe = pipefd - - async def aclose(self): - os.close(self._pipe) - - def fileno(self) -> int: - """Gets the file descriptor for this pipe.""" - return self._pipe - - def __del__(self): - try: - os.close(self._pipe) - except AttributeError: - # probably in interpreter shut down - pass - - -class PipeSendStream(_PipeMixin, SendStream): - """Represents a send stream over an os.pipe object.""" - - async def send_all(self, data: bytes): - length = len(data) - # adapted from the SocketStream code - with memoryview(data) as view, \ - _translate_socket_errors_to_stream_errors(): - total_sent = 0 - while total_sent < length: - with view[total_sent:] as remaining: - total_sent += os.write(self._pipe, remaining) - - await self.wait_send_all_might_not_block() - - async def wait_send_all_might_not_block(self) -> None: - await _core.wait_socket_writable(self._pipe) - - -class PipeReceiveStream(_PipeMixin, ReceiveStream): - """Represents a receive stream over an os.pipe object.""" - - async def receive_some(self, max_bytes: int) -> bytes: - if max_bytes < 1: - await _core.checkpoint() - raise ValueError("max_bytes must be >= 1") - - with _translate_socket_errors_to_stream_errors(): - await _core.wait_socket_readable(self._pipe) - data = os.read(self._pipe, max_bytes) - - return data - - -def make_pipe() -> Tuple[PipeReceiveStream, PipeSendStream]: - """Makes a new pair of pipes.""" - (r, w) = os.pipe2(os.O_NONBLOCK) - return PipeReceiveStream(r), PipeSendStream(w) diff --git a/trio/_subprocess/__init__.py b/trio/_subprocess/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/trio/_subprocess/tests/__init__.py b/trio/_subprocess/tests/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/trio/_subprocess/tests/conftest.py b/trio/_subprocess/tests/conftest.py new file mode 100644 index 0000000000..a7f4d612d5 --- /dev/null +++ b/trio/_subprocess/tests/conftest.py @@ -0,0 +1,25 @@ +import pytest +import inspect + +# XX this should move into a global something +from ...testing import MockClock, trio_test + + +@pytest.fixture +def mock_clock(): + return MockClock() + + +@pytest.fixture +def autojump_clock(): + return MockClock(autojump_threshold=0) + + +# FIXME: split off into a package (or just make part of trio's public +# interface?), with config file to enable? and I guess a mark option too; I +# guess it's useful with the class- and file-level marking machinery (where +# the raw @trio_test decorator isn't enough). +@pytest.hookimpl(tryfirst=True) +def pytest_pyfunc_call(pyfuncitem): + if inspect.iscoroutinefunction(pyfuncitem.obj): + pyfuncitem.obj = trio_test(pyfuncitem.obj) \ No newline at end of file diff --git a/trio/tests/test_highlevel_pipes.py b/trio/_subprocess/tests/test_highlevel_pipes.py similarity index 81% rename from trio/tests/test_highlevel_pipes.py rename to trio/_subprocess/tests/test_highlevel_pipes.py index 2520fe15c1..dbc060c8c6 100644 --- a/trio/tests/test_highlevel_pipes.py +++ b/trio/_subprocess/tests/test_highlevel_pipes.py @@ -1,9 +1,11 @@ import os import pytest -from .. import _core -from .._highlevel_pipes import PipeSendStream, PipeReceiveStream, make_pipe -from ..testing import wait_all_tasks_blocked +from ... import _core +from ..unix_pipes import PipeSendStream, PipeReceiveStream, make_pipe +from ...testing import ( + wait_all_tasks_blocked, check_one_way_stream +) pytestmark = pytest.mark.skipif( not hasattr(os, "pipe2"), reason="pipes require os.pipe2()" @@ -33,7 +35,7 @@ async def test_receive_pipe(): async def test_pipes_combined(): - read, write = make_pipe() + write, read = await make_pipe() count = 2**20 async def sender(): @@ -57,7 +59,7 @@ async def reader(): async def test_send_on_closed_pipe(): - read, write = make_pipe() + write, read = await make_pipe() await write.aclose() with pytest.raises(_core.ClosedResourceError): @@ -72,3 +74,7 @@ async def test_pipe_errors(): with pytest.raises(ValueError): await PipeReceiveStream(0).receive_some(0) + + +#async def test_pipe_fully(): +# await check_one_way_stream(make_pipe, None) diff --git a/trio/_subprocess/unix_pipes.py b/trio/_subprocess/unix_pipes.py new file mode 100644 index 0000000000..32ccada279 --- /dev/null +++ b/trio/_subprocess/unix_pipes.py @@ -0,0 +1,114 @@ +import os +from typing import Tuple + +from .. import _core, BrokenStreamError +from .._abc import SendStream, ReceiveStream + +__all__ = ["PipeSendStream", "PipeReceiveStream", "make_pipe"] + + +class _PipeMixin: + def __init__(self, pipefd: int): + if not isinstance(pipefd, int): + raise TypeError( + "{0.__class__.__name__} needs a pipe fd".format(self) + ) + + self._pipe = pipefd + self._closed = False + + async def aclose(self): + if self._closed: + return + + os.close(self._pipe) + _core.notify_fd_close(self._pipe) + self._closed = True + await _core.checkpoint() + + def fileno(self) -> int: + """Gets the file descriptor for this pipe.""" + return self._pipe + + def __del__(self): + if self._closed: + return + + try: + os.close(self._pipe) + except AttributeError: + # probably in interpreter shut down + pass + except OSError as e: + # already closed from somewhere else + if e.errno != 9: + raise e from None + + +class PipeSendStream(_PipeMixin, SendStream): + """Represents a send stream over an os.pipe object.""" + + async def send_all(self, data: bytes): + if self._closed: + await _core.checkpoint() + raise _core.ClosedResourceError("this pipe is already closed") + + if not data: + await _core.checkpoint() + return + + length = len(data) + # adapted from the SocketStream code + with memoryview(data) as view: + total_sent = 0 + while total_sent < length: + with view[total_sent:] as remaining: + try: + total_sent += os.write(self._pipe, remaining) + except BrokenPipeError as e: + await _core.checkpoint() + raise BrokenStreamError from e + except BlockingIOError: + pass + + await self.wait_send_all_might_not_block() + + async def wait_send_all_might_not_block(self) -> None: + if self._closed: + await _core.checkpoint() + raise _core.ClosedResourceError("This pipe is already closed") + + await _core.wait_writable(self._pipe) + + +class PipeReceiveStream(_PipeMixin, ReceiveStream): + """Represents a receive stream over an os.pipe object.""" + + async def receive_some(self, max_bytes: int) -> bytes: + if self._closed: + await _core.checkpoint() + raise _core.ClosedResourceError("this pipe is already closed") + + if not isinstance(max_bytes, int): + await _core.checkpoint() + raise TypeError("max_bytes must be integer >= 1") + + if max_bytes < 1: + await _core.checkpoint() + raise ValueError("max_bytes must be integer >= 1") + + while True: + try: + data = os.read(self._pipe, max_bytes) + except BlockingIOError: + await _core.wait_readable(self._pipe) + else: + break + + return data + + +async def make_pipe() -> Tuple[PipeSendStream, PipeReceiveStream]: + """Makes a new pair of pipes.""" + (r, w) = os.pipe2(os.O_NONBLOCK) + return PipeSendStream(w), PipeReceiveStream(r) From b0354d1f3e2c639f613144b15fabd29fc2f222c7 Mon Sep 17 00:00:00 2001 From: "Laura F. D" Date: Fri, 24 Aug 2018 14:41:52 +0100 Subject: [PATCH 04/15] Pass blocking pipes by default, and fcntl them to non-block. --- trio/_subprocess/tests/test_highlevel_pipes.py | 4 +--- trio/_subprocess/unix_pipes.py | 9 +++++++-- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/trio/_subprocess/tests/test_highlevel_pipes.py b/trio/_subprocess/tests/test_highlevel_pipes.py index dbc060c8c6..26d9880300 100644 --- a/trio/_subprocess/tests/test_highlevel_pipes.py +++ b/trio/_subprocess/tests/test_highlevel_pipes.py @@ -3,9 +3,7 @@ from ... import _core from ..unix_pipes import PipeSendStream, PipeReceiveStream, make_pipe -from ...testing import ( - wait_all_tasks_blocked, check_one_way_stream -) +from ...testing import (wait_all_tasks_blocked, check_one_way_stream) pytestmark = pytest.mark.skipif( not hasattr(os, "pipe2"), reason="pipes require os.pipe2()" diff --git a/trio/_subprocess/unix_pipes.py b/trio/_subprocess/unix_pipes.py index 32ccada279..d3c50e1132 100644 --- a/trio/_subprocess/unix_pipes.py +++ b/trio/_subprocess/unix_pipes.py @@ -8,7 +8,7 @@ class _PipeMixin: - def __init__(self, pipefd: int): + def __init__(self, pipefd: int, *, set_non_blocking: bool = True): if not isinstance(pipefd, int): raise TypeError( "{0.__class__.__name__} needs a pipe fd".format(self) @@ -17,6 +17,11 @@ def __init__(self, pipefd: int): self._pipe = pipefd self._closed = False + if set_non_blocking: + import fcntl + flags = fcntl.fcntl(self._pipe, fcntl.F_GETFL) + fcntl.fcntl(self._pipe, fcntl.F_SETFL, flags | os.O_NONBLOCK) + async def aclose(self): if self._closed: return @@ -110,5 +115,5 @@ async def receive_some(self, max_bytes: int) -> bytes: async def make_pipe() -> Tuple[PipeSendStream, PipeReceiveStream]: """Makes a new pair of pipes.""" - (r, w) = os.pipe2(os.O_NONBLOCK) + (r, w) = os.pipe() return PipeSendStream(w), PipeReceiveStream(r) From b6f0a86a11287a7b9fc5effa91f47a01a8b22977 Mon Sep 17 00:00:00 2001 From: "Laura F. D" Date: Sat, 25 Aug 2018 12:18:49 +0100 Subject: [PATCH 05/15] Make requested changes --- newsfragments/621.feature.rst | 1 - trio/_subprocess/tests/conftest.py | 25 ------------- trio/_subprocess/unix_pipes.py | 36 +++++++------------ .../tests => tests/subprocess}/__init__.py | 0 .../subprocess/test_unix_pipes.py} | 10 +++--- 5 files changed, 18 insertions(+), 54 deletions(-) delete mode 100644 newsfragments/621.feature.rst delete mode 100644 trio/_subprocess/tests/conftest.py rename trio/{_subprocess/tests => tests/subprocess}/__init__.py (100%) rename trio/{_subprocess/tests/test_highlevel_pipes.py => tests/subprocess/test_unix_pipes.py} (86%) diff --git a/newsfragments/621.feature.rst b/newsfragments/621.feature.rst deleted file mode 100644 index 4badc1d031..0000000000 --- a/newsfragments/621.feature.rst +++ /dev/null @@ -1 +0,0 @@ -Add support for pipes (os.pipe). \ No newline at end of file diff --git a/trio/_subprocess/tests/conftest.py b/trio/_subprocess/tests/conftest.py deleted file mode 100644 index a7f4d612d5..0000000000 --- a/trio/_subprocess/tests/conftest.py +++ /dev/null @@ -1,25 +0,0 @@ -import pytest -import inspect - -# XX this should move into a global something -from ...testing import MockClock, trio_test - - -@pytest.fixture -def mock_clock(): - return MockClock() - - -@pytest.fixture -def autojump_clock(): - return MockClock(autojump_threshold=0) - - -# FIXME: split off into a package (or just make part of trio's public -# interface?), with config file to enable? and I guess a mark option too; I -# guess it's useful with the class- and file-level marking machinery (where -# the raw @trio_test decorator isn't enough). -@pytest.hookimpl(tryfirst=True) -def pytest_pyfunc_call(pyfuncitem): - if inspect.iscoroutinefunction(pyfuncitem.obj): - pyfuncitem.obj = trio_test(pyfuncitem.obj) \ No newline at end of file diff --git a/trio/_subprocess/unix_pipes.py b/trio/_subprocess/unix_pipes.py index d3c50e1132..04f95cb21c 100644 --- a/trio/_subprocess/unix_pipes.py +++ b/trio/_subprocess/unix_pipes.py @@ -8,7 +8,7 @@ class _PipeMixin: - def __init__(self, pipefd: int, *, set_non_blocking: bool = True): + def __init__(self, pipefd: int): if not isinstance(pipefd, int): raise TypeError( "{0.__class__.__name__} needs a pipe fd".format(self) @@ -17,38 +17,22 @@ def __init__(self, pipefd: int, *, set_non_blocking: bool = True): self._pipe = pipefd self._closed = False - if set_non_blocking: - import fcntl - flags = fcntl.fcntl(self._pipe, fcntl.F_GETFL) - fcntl.fcntl(self._pipe, fcntl.F_SETFL, flags | os.O_NONBLOCK) + import fcntl + flags = fcntl.fcntl(self._pipe, fcntl.F_GETFL) + fcntl.fcntl(self._pipe, fcntl.F_SETFL, flags | os.O_NONBLOCK) async def aclose(self): - if self._closed: - return + if not self._closed: + os.close(self._pipe) + _core.notify_fd_close(self._pipe) + self._closed = True - os.close(self._pipe) - _core.notify_fd_close(self._pipe) - self._closed = True await _core.checkpoint() def fileno(self) -> int: """Gets the file descriptor for this pipe.""" return self._pipe - def __del__(self): - if self._closed: - return - - try: - os.close(self._pipe) - except AttributeError: - # probably in interpreter shut down - pass - except OSError as e: - # already closed from somewhere else - if e.errno != 9: - raise e from None - class PipeSendStream(_PipeMixin, SendStream): """Represents a send stream over an os.pipe object.""" @@ -105,9 +89,13 @@ async def receive_some(self, max_bytes: int) -> bytes: while True: try: data = os.read(self._pipe, max_bytes) + if data == b'': + await self.aclose() + return data except BlockingIOError: await _core.wait_readable(self._pipe) else: + await _core.checkpoint() break return data diff --git a/trio/_subprocess/tests/__init__.py b/trio/tests/subprocess/__init__.py similarity index 100% rename from trio/_subprocess/tests/__init__.py rename to trio/tests/subprocess/__init__.py diff --git a/trio/_subprocess/tests/test_highlevel_pipes.py b/trio/tests/subprocess/test_unix_pipes.py similarity index 86% rename from trio/_subprocess/tests/test_highlevel_pipes.py rename to trio/tests/subprocess/test_unix_pipes.py index 26d9880300..d321ea01e7 100644 --- a/trio/_subprocess/tests/test_highlevel_pipes.py +++ b/trio/tests/subprocess/test_unix_pipes.py @@ -2,11 +2,13 @@ import pytest from ... import _core -from ..unix_pipes import PipeSendStream, PipeReceiveStream, make_pipe +from ..._subprocess.unix_pipes import ( + PipeSendStream, PipeReceiveStream, make_pipe +) from ...testing import (wait_all_tasks_blocked, check_one_way_stream) pytestmark = pytest.mark.skipif( - not hasattr(os, "pipe2"), reason="pipes require os.pipe2()" + os.name != "posix", reason="pipes are only supported on posix" ) @@ -74,5 +76,5 @@ async def test_pipe_errors(): await PipeReceiveStream(0).receive_some(0) -#async def test_pipe_fully(): -# await check_one_way_stream(make_pipe, None) +async def test_pipe_fully(): + await check_one_way_stream(make_pipe, None) From 8ff9d4e4656a616aee34dcb7970a2abeaa317aa4 Mon Sep 17 00:00:00 2001 From: "Laura F. D" Date: Sat, 25 Aug 2018 12:20:29 +0100 Subject: [PATCH 06/15] Change order of these operations --- trio/_subprocess/unix_pipes.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/trio/_subprocess/unix_pipes.py b/trio/_subprocess/unix_pipes.py index 04f95cb21c..3dbe9484e7 100644 --- a/trio/_subprocess/unix_pipes.py +++ b/trio/_subprocess/unix_pipes.py @@ -23,9 +23,9 @@ def __init__(self, pipefd: int): async def aclose(self): if not self._closed: - os.close(self._pipe) _core.notify_fd_close(self._pipe) self._closed = True + os.close(self._pipe) await _core.checkpoint() From cf5de2d1332a6cb8e701a36a73e463ee23acb67f Mon Sep 17 00:00:00 2001 From: "Laura F. D" Date: Sat, 25 Aug 2018 12:27:35 +0100 Subject: [PATCH 07/15] Manually make these pipes non-blocking --- trio/tests/subprocess/test_unix_pipes.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/trio/tests/subprocess/test_unix_pipes.py b/trio/tests/subprocess/test_unix_pipes.py index d321ea01e7..e3653f60f3 100644 --- a/trio/tests/subprocess/test_unix_pipes.py +++ b/trio/tests/subprocess/test_unix_pipes.py @@ -12,8 +12,15 @@ ) +def nonblock_pipe(p: int): + import fcntl + flags = fcntl.fcntl(p, fcntl.F_GETFL) + fcntl.fcntl(p, fcntl.F_SETFL, flags | os.O_NONBLOCK) + + async def test_send_pipe(): - r, w = os.pipe2(os.O_NONBLOCK) + r, w = os.pipe() + nonblock_pipe(w) send = PipeSendStream(w) assert send.fileno() == w await send.send_all(b"123") @@ -24,7 +31,8 @@ async def test_send_pipe(): async def test_receive_pipe(): - r, w = os.pipe2(os.O_NONBLOCK) + r, w = os.pipe() + nonblock_pipe(r) recv = PipeReceiveStream(r) assert (recv.fileno()) == r os.write(w, b"123") From f6e1ff18aac2fe4bac68d3d4873e54fd1ebfe1f5 Mon Sep 17 00:00:00 2001 From: "Laura F. D" Date: Sun, 26 Aug 2018 01:19:49 +0100 Subject: [PATCH 08/15] Perform more tests on the pipe stream. --- trio/tests/subprocess/test_unix_pipes.py | 36 +++++++++++++++++------- 1 file changed, 26 insertions(+), 10 deletions(-) diff --git a/trio/tests/subprocess/test_unix_pipes.py b/trio/tests/subprocess/test_unix_pipes.py index e3653f60f3..20db872376 100644 --- a/trio/tests/subprocess/test_unix_pipes.py +++ b/trio/tests/subprocess/test_unix_pipes.py @@ -1,3 +1,5 @@ +import select + import os import pytest @@ -12,15 +14,8 @@ ) -def nonblock_pipe(p: int): - import fcntl - flags = fcntl.fcntl(p, fcntl.F_GETFL) - fcntl.fcntl(p, fcntl.F_SETFL, flags | os.O_NONBLOCK) - - async def test_send_pipe(): r, w = os.pipe() - nonblock_pipe(w) send = PipeSendStream(w) assert send.fileno() == w await send.send_all(b"123") @@ -32,7 +27,6 @@ async def test_send_pipe(): async def test_receive_pipe(): r, w = os.pipe() - nonblock_pipe(r) recv = PipeReceiveStream(r) assert (recv.fileno()) == r os.write(w, b"123") @@ -44,7 +38,7 @@ async def test_receive_pipe(): async def test_pipes_combined(): write, read = await make_pipe() - count = 2**20 + count = 2 ** 20 async def sender(): big = bytearray(count) @@ -84,5 +78,27 @@ async def test_pipe_errors(): await PipeReceiveStream(0).receive_some(0) +async def make_clogged_pipe(): + s, r = await make_pipe() + try: + while True: + # We want to totally fill up the pipe buffer. + # This requires working around a weird feature that POSIX pipes + # have. + # If you do a write of <= PIPE_BUF bytes, then it's guaranteed + # to either complete entirely, or not at all. So if we tried to + # write PIPE_BUF bytes, and the buffer's free space is only + # PIPE_BUF/2, then the write will raise BlockingIOError... even + # though a smaller write could still succeed! To avoid this, + # make sure to write >PIPE_BUF bytes each time, which disables + # the special behavior. + # For details, search for PIPE_BUF here: + # http://pubs.opengroup.org/onlinepubs/9699919799/functions/write.html + os.write(s.fileno(), b"x" * select.PIPE_BUF * 2) + except BlockingIOError: + pass + return s, r + + async def test_pipe_fully(): - await check_one_way_stream(make_pipe, None) + await check_one_way_stream(make_pipe, make_clogged_pipe) From 6f52ce4b7296aefafa7620f1538b7bd3e1a5e037 Mon Sep 17 00:00:00 2001 From: "Laura F. D" Date: Sun, 26 Aug 2018 01:20:04 +0100 Subject: [PATCH 09/15] Do proper checkpoints for the receive_some call. --- trio/_subprocess/unix_pipes.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/trio/_subprocess/unix_pipes.py b/trio/_subprocess/unix_pipes.py index 3dbe9484e7..5b4fdfb053 100644 --- a/trio/_subprocess/unix_pipes.py +++ b/trio/_subprocess/unix_pipes.py @@ -88,14 +88,12 @@ async def receive_some(self, max_bytes: int) -> bytes: while True: try: + await _core.checkpoint_if_cancelled() data = os.read(self._pipe, max_bytes) - if data == b'': - await self.aclose() - return data except BlockingIOError: await _core.wait_readable(self._pipe) else: - await _core.checkpoint() + await _core.cancel_shielded_checkpoint() break return data From ad644fbfa04f4b33363814ec04a29670b1bf45d7 Mon Sep 17 00:00:00 2001 From: "Laura F. D" Date: Sun, 26 Aug 2018 01:25:13 +0100 Subject: [PATCH 10/15] Change send_all slightly --- trio/_subprocess/unix_pipes.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/trio/_subprocess/unix_pipes.py b/trio/_subprocess/unix_pipes.py index 5b4fdfb053..91d3bcaa34 100644 --- a/trio/_subprocess/unix_pipes.py +++ b/trio/_subprocess/unix_pipes.py @@ -38,12 +38,12 @@ class PipeSendStream(_PipeMixin, SendStream): """Represents a send stream over an os.pipe object.""" async def send_all(self, data: bytes): + # we have to do this no matter what + await _core.checkpoint() if self._closed: - await _core.checkpoint() raise _core.ClosedResourceError("this pipe is already closed") if not data: - await _core.checkpoint() return length = len(data) @@ -58,9 +58,7 @@ async def send_all(self, data: bytes): await _core.checkpoint() raise BrokenStreamError from e except BlockingIOError: - pass - - await self.wait_send_all_might_not_block() + await self.wait_send_all_might_not_block() async def wait_send_all_might_not_block(self) -> None: if self._closed: From 2a9d4535ef0bb19e704177eddb1c5c2350d32fad Mon Sep 17 00:00:00 2001 From: "Laura F. D" Date: Sun, 26 Aug 2018 01:53:57 +0100 Subject: [PATCH 11/15] kqueue returns EPIPE immediately, rather than on write --- trio/_subprocess/unix_pipes.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/trio/_subprocess/unix_pipes.py b/trio/_subprocess/unix_pipes.py index 91d3bcaa34..996e61974c 100644 --- a/trio/_subprocess/unix_pipes.py +++ b/trio/_subprocess/unix_pipes.py @@ -58,7 +58,15 @@ async def send_all(self, data: bytes): await _core.checkpoint() raise BrokenStreamError from e except BlockingIOError: - await self.wait_send_all_might_not_block() + try: + await self.wait_send_all_might_not_block() + except BrokenPipeError as e: + # kqueue: raises EPIPE on wait_writable instead + # of sending, which is annoying + # also doesn't checkpoint so we have to do that + # ourselves here too + await _core.checkpoint() + raise BrokenStreamError from e async def wait_send_all_might_not_block(self) -> None: if self._closed: From 8c4792b8810c874ed3726228a9a3824bffbc7bb8 Mon Sep 17 00:00:00 2001 From: "Laura F. D" Date: Sun, 26 Aug 2018 01:55:05 +0100 Subject: [PATCH 12/15] select.BUF_SIZE doesn't exist on PyPy yet https://bitbucket.org/pypy/pypy/issues/2876 --- trio/tests/subprocess/test_unix_pipes.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/trio/tests/subprocess/test_unix_pipes.py b/trio/tests/subprocess/test_unix_pipes.py index 20db872376..e1239cfb71 100644 --- a/trio/tests/subprocess/test_unix_pipes.py +++ b/trio/tests/subprocess/test_unix_pipes.py @@ -94,7 +94,8 @@ async def make_clogged_pipe(): # the special behavior. # For details, search for PIPE_BUF here: # http://pubs.opengroup.org/onlinepubs/9699919799/functions/write.html - os.write(s.fileno(), b"x" * select.PIPE_BUF * 2) + buf_size = getattr(select, "PIPE_BUF", 8192) + os.write(s.fileno(), b"x" * buf_size * 2) except BlockingIOError: pass return s, r From 61f2262405d47039b9052f889f3bdc6ac2bb62d6 Mon Sep 17 00:00:00 2001 From: "Laura F. D" Date: Sun, 26 Aug 2018 02:00:34 +0100 Subject: [PATCH 13/15] Rearrange logic --- trio/_subprocess/unix_pipes.py | 20 ++++++++++---------- trio/tests/subprocess/test_unix_pipes.py | 2 +- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/trio/_subprocess/unix_pipes.py b/trio/_subprocess/unix_pipes.py index 996e61974c..cecb8dc25c 100644 --- a/trio/_subprocess/unix_pipes.py +++ b/trio/_subprocess/unix_pipes.py @@ -58,22 +58,22 @@ async def send_all(self, data: bytes): await _core.checkpoint() raise BrokenStreamError from e except BlockingIOError: - try: - await self.wait_send_all_might_not_block() - except BrokenPipeError as e: - # kqueue: raises EPIPE on wait_writable instead - # of sending, which is annoying - # also doesn't checkpoint so we have to do that - # ourselves here too - await _core.checkpoint() - raise BrokenStreamError from e + await self.wait_send_all_might_not_block() async def wait_send_all_might_not_block(self) -> None: if self._closed: await _core.checkpoint() raise _core.ClosedResourceError("This pipe is already closed") - await _core.wait_writable(self._pipe) + try: + await _core.wait_writable(self._pipe) + except BrokenPipeError as e: + # kqueue: raises EPIPE on wait_writable instead + # of sending, which is annoying + # also doesn't checkpoint so we have to do that + # ourselves here too + await _core.checkpoint() + raise BrokenStreamError from e class PipeReceiveStream(_PipeMixin, ReceiveStream): diff --git a/trio/tests/subprocess/test_unix_pipes.py b/trio/tests/subprocess/test_unix_pipes.py index e1239cfb71..3fd16b34bd 100644 --- a/trio/tests/subprocess/test_unix_pipes.py +++ b/trio/tests/subprocess/test_unix_pipes.py @@ -38,7 +38,7 @@ async def test_receive_pipe(): async def test_pipes_combined(): write, read = await make_pipe() - count = 2 ** 20 + count = 2**20 async def sender(): big = bytearray(count) From 2e1a1e506f2a20ba720cae3efd1f1bbd5485918f Mon Sep 17 00:00:00 2001 From: "Laura F. D" Date: Sun, 26 Aug 2018 03:34:52 +0100 Subject: [PATCH 14/15] Make some last changes --- trio/_subprocess/unix_pipes.py | 2 +- trio/tests/subprocess/test_unix_pipes.py | 15 +++++++++++---- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/trio/_subprocess/unix_pipes.py b/trio/_subprocess/unix_pipes.py index cecb8dc25c..0ae3e9c55a 100644 --- a/trio/_subprocess/unix_pipes.py +++ b/trio/_subprocess/unix_pipes.py @@ -1,3 +1,4 @@ +import fcntl import os from typing import Tuple @@ -17,7 +18,6 @@ def __init__(self, pipefd: int): self._pipe = pipefd self._closed = False - import fcntl flags = fcntl.fcntl(self._pipe, fcntl.F_GETFL) fcntl.fcntl(self._pipe, fcntl.F_SETFL, flags | os.O_NONBLOCK) diff --git a/trio/tests/subprocess/test_unix_pipes.py b/trio/tests/subprocess/test_unix_pipes.py index 3fd16b34bd..e33e8e2962 100644 --- a/trio/tests/subprocess/test_unix_pipes.py +++ b/trio/tests/subprocess/test_unix_pipes.py @@ -4,15 +4,19 @@ import pytest from ... import _core -from ..._subprocess.unix_pipes import ( - PipeSendStream, PipeReceiveStream, make_pipe -) from ...testing import (wait_all_tasks_blocked, check_one_way_stream) +posix = os.name == "posix" + pytestmark = pytest.mark.skipif( - os.name != "posix", reason="pipes are only supported on posix" + not posix, reason="pipes are only supported on posix" ) +if posix: + from ..._subprocess.unix_pipes import ( + PipeSendStream, PipeReceiveStream, make_pipe + ) + async def test_send_pipe(): r, w = os.pipe() @@ -94,6 +98,9 @@ async def make_clogged_pipe(): # the special behavior. # For details, search for PIPE_BUF here: # http://pubs.opengroup.org/onlinepubs/9699919799/functions/write.html + + # for the getattr: + # https://bitbucket.org/pypy/pypy/issues/2876/selectpipe_buf-is-missing-on-pypy3 buf_size = getattr(select, "PIPE_BUF", 8192) os.write(s.fileno(), b"x" * buf_size * 2) except BlockingIOError: From aaad5b570e9593ae5a48c557111e119872420c9e Mon Sep 17 00:00:00 2001 From: "Laura F. D" Date: Tue, 28 Aug 2018 15:53:42 +0100 Subject: [PATCH 15/15] Add `__del__` to pipe objects. --- trio/_subprocess/unix_pipes.py | 19 +++++++++---- trio/tests/subprocess/test_unix_pipes.py | 36 ++++++++++++++++++++++++ 2 files changed, 50 insertions(+), 5 deletions(-) diff --git a/trio/_subprocess/unix_pipes.py b/trio/_subprocess/unix_pipes.py index 0ae3e9c55a..f62fa8f590 100644 --- a/trio/_subprocess/unix_pipes.py +++ b/trio/_subprocess/unix_pipes.py @@ -21,18 +21,27 @@ def __init__(self, pipefd: int): flags = fcntl.fcntl(self._pipe, fcntl.F_GETFL) fcntl.fcntl(self._pipe, fcntl.F_SETFL, flags | os.O_NONBLOCK) - async def aclose(self): - if not self._closed: - _core.notify_fd_close(self._pipe) - self._closed = True - os.close(self._pipe) + def _close(self): + if self._closed: + return + self._closed = True + os.close(self._pipe) + + async def aclose(self): + # XX: This would be in _close, but this can only be used from an + # async context. + _core.notify_fd_close(self._pipe) + self._close() await _core.checkpoint() def fileno(self) -> int: """Gets the file descriptor for this pipe.""" return self._pipe + def __del__(self): + self._close() + class PipeSendStream(_PipeMixin, SendStream): """Represents a send stream over an os.pipe object.""" diff --git a/trio/tests/subprocess/test_unix_pipes.py b/trio/tests/subprocess/test_unix_pipes.py index e33e8e2962..b8cc2d496a 100644 --- a/trio/tests/subprocess/test_unix_pipes.py +++ b/trio/tests/subprocess/test_unix_pipes.py @@ -1,8 +1,10 @@ +import errno import select import os import pytest +from trio._core.tests.tutil import gc_collect_harder from ... import _core from ...testing import (wait_all_tasks_blocked, check_one_way_stream) @@ -27,6 +29,7 @@ async def test_send_pipe(): os.close(r) os.close(w) + send._closed = True async def test_receive_pipe(): @@ -38,6 +41,7 @@ async def test_receive_pipe(): os.close(r) os.close(w) + recv._closed = True async def test_pipes_combined(): @@ -82,6 +86,38 @@ async def test_pipe_errors(): await PipeReceiveStream(0).receive_some(0) +async def test_del(): + w, r = await make_pipe() + f1, f2 = w.fileno(), r.fileno() + del w, r + gc_collect_harder() + + with pytest.raises(OSError) as excinfo: + os.close(f1) + assert excinfo.value.errno == errno.EBADF + + with pytest.raises(OSError) as excinfo: + os.close(f2) + assert excinfo.value.errno == errno.EBADF + + +async def test_async_with(): + w, r = await make_pipe() + async with w, r: + pass + + assert w._closed + assert r._closed + + with pytest.raises(OSError) as excinfo: + os.close(w.fileno()) + assert excinfo.value.errno == errno.EBADF + + with pytest.raises(OSError) as excinfo: + os.close(r.fileno()) + assert excinfo.value.errno == errno.EBADF + + async def make_clogged_pipe(): s, r = await make_pipe() try: