From ddf778c2d5a279c374a8c2f77361eea18545ef9b Mon Sep 17 00:00:00 2001 From: John Belmonte Date: Thu, 27 Jun 2019 21:20:59 +0900 Subject: [PATCH 01/13] add hazmat.fd_open_receive_stream() / fd_open_send_stream() --- trio/_subprocess_platform/__init__.py | 6 +++--- trio/_unix_pipes.py | 31 ++++++++++++++++++++++++--- trio/hazmat.py | 8 +++++++ 3 files changed, 39 insertions(+), 6 deletions(-) diff --git a/trio/_subprocess_platform/__init__.py b/trio/_subprocess_platform/__init__.py index 1507ec1246..6b687dc13b 100644 --- a/trio/_subprocess_platform/__init__.py +++ b/trio/_subprocess_platform/__init__.py @@ -67,15 +67,15 @@ def create_pipe_from_child_output() -> Tuple[ReceiveStream, int]: try: if os.name == "posix": - from .._unix_pipes import PipeSendStream, PipeReceiveStream + from ..hazmat import fd_open_send_stream, fd_open_receive_stream def create_pipe_to_child_stdin(): # noqa: F811 rfd, wfd = os.pipe() - return PipeSendStream(wfd), rfd + return fd_open_send_stream(wfd), rfd def create_pipe_from_child_output(): # noqa: F811 rfd, wfd = os.pipe() - return PipeReceiveStream(rfd), wfd + return fd_open_receive_stream(rfd), wfd elif os.name == "nt": from .._windows_pipes import PipeSendStream, PipeReceiveStream diff --git a/trio/_unix_pipes.py b/trio/_unix_pipes.py index 7cd205e93f..830e39d82b 100644 --- a/trio/_unix_pipes.py +++ b/trio/_unix_pipes.py @@ -33,9 +33,11 @@ def __init__(self, fd: int): if not isinstance(fd, int): raise TypeError("file descriptor must be an int") self.fd = fd - # Flip the fd to non-blocking mode - flags = fcntl.fcntl(self.fd, fcntl.F_GETFL) - fcntl.fcntl(self.fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) + # Store original state, and ensure non-blocking mode is enabled + self._original_flags = fcntl.fcntl(self.fd, fcntl.F_GETFL) + fcntl.fcntl( + self.fd, fcntl.F_SETFL, self._original_flags | os.O_NONBLOCK + ) @property def closed(self): @@ -53,6 +55,7 @@ def _raw_close(self): return fd = self.fd self.fd = -1 + fcntl.fcntl(fd, fcntl.F_SETFL, self._original_flags) os.close(fd) def __del__(self): @@ -157,3 +160,25 @@ async def aclose(self): def fileno(self): return self._fd_holder.fd + + +def fd_open_send_stream(fd: int): + """Returns a send stream over an os.pipe. + + The pipe must support non-blocking mode. The stream takes ownership + of the fd. + + :rtype: SendStream + """ + return PipeSendStream(fd) + + +def fd_open_receive_stream(fd: int): + """Returns a receive stream over an os.pipe. + + The pipe must support non-blocking mode. The stream takes ownership + of the fd. + + :rtype: ReceiveStream + """ + return PipeReceiveStream(fd) diff --git a/trio/hazmat.py b/trio/hazmat.py index 69070a7968..5f72890777 100644 --- a/trio/hazmat.py +++ b/trio/hazmat.py @@ -3,6 +3,7 @@ but useful for extending Trio's functionality. """ +import os import sys # This is the union of a subset of trio/_core/ and some things from trio/*.py. @@ -22,6 +23,13 @@ spawn_system_task, wait_readable, wait_writable, notify_closing ) +# Unix-specific symbols +if os.name == "posix": + from ._unix_pipes import ( + fd_open_receive_stream, + fd_open_send_stream, + ) + # Kqueue-specific symbols try: from ._core import ( From 1818e884ed7e8da419f717797070fb6b891142cb Mon Sep 17 00:00:00 2001 From: John Belmonte Date: Fri, 5 Jul 2019 23:32:59 +0900 Subject: [PATCH 02/13] introduce FdStream --- trio/_subprocess.py | 1 + trio/_subprocess_platform/__init__.py | 6 +- trio/_unix_pipes.py | 137 +++++++++++++++++++++++--- trio/hazmat.py | 5 +- 4 files changed, 129 insertions(+), 20 deletions(-) diff --git a/trio/_subprocess.py b/trio/_subprocess.py index ae530e3ae8..221f11499d 100644 --- a/trio/_subprocess.py +++ b/trio/_subprocess.py @@ -152,6 +152,7 @@ def _init( os.close(stderr) if self.stdin is not None and self.stdout is not None: + # TODO: fix me self.stdio = StapledStream(self.stdin, self.stdout) else: self.stdio = None diff --git a/trio/_subprocess_platform/__init__.py b/trio/_subprocess_platform/__init__.py index 6b687dc13b..90a053c2e8 100644 --- a/trio/_subprocess_platform/__init__.py +++ b/trio/_subprocess_platform/__init__.py @@ -67,15 +67,15 @@ def create_pipe_from_child_output() -> Tuple[ReceiveStream, int]: try: if os.name == "posix": - from ..hazmat import fd_open_send_stream, fd_open_receive_stream + from ..hazmat import FdStream def create_pipe_to_child_stdin(): # noqa: F811 rfd, wfd = os.pipe() - return fd_open_send_stream(wfd), rfd + return FdStream(send_fd=wfd), rfd def create_pipe_from_child_output(): # noqa: F811 rfd, wfd = os.pipe() - return fd_open_receive_stream(rfd), wfd + return FdStream(receive_fd=rfd), wfd elif os.name == "nt": from .._windows_pipes import PipeSendStream, PipeReceiveStream diff --git a/trio/_unix_pipes.py b/trio/_unix_pipes.py index 830e39d82b..aa248717e4 100644 --- a/trio/_unix_pipes.py +++ b/trio/_unix_pipes.py @@ -1,6 +1,7 @@ import fcntl import os import errno +from typing import Optional, List from ._abc import SendStream, ReceiveStream from ._util import ConflictDetector @@ -68,6 +69,7 @@ async def aclose(self): await trio.hazmat.checkpoint() +# TODO: remove class PipeSendStream(SendStream): """Represents a send stream over an os.pipe object.""" @@ -162,23 +164,132 @@ def fileno(self): return self._fd_holder.fd -def fd_open_send_stream(fd: int): - """Returns a send stream over an os.pipe. +class FdStream(SendStream, ReceiveStream): + """ + Represents a send and/or receive stream given file descriptor(s) to + a pipe, TTY, etc. + + *send_fd*/*receive_fd* must refer to a file that is open for + reading/writing and supports non-blocking I/O (pipes and TTYs will work, + on-disk files probably not). The returned stream takes ownership of the + fd(s), so closing the stream will close the fd(s) too. As with + `os.fdopen`, you should not directly use an fd after you have wrapped it in + a stream using thisfunction. + + Where an fd supports both read and write, pass the same fd to *send_fd* and + *receive_fd*. + + Args: + send_fd (int on None): The send stream fd. + receive_fd (int or None): The receive stream fd. - The pipe must support non-blocking mode. The stream takes ownership - of the fd. + Returns: + A new `FdStream` object. - :rtype: SendStream + Raises: + ValueError: if both *send_fd* and *receive_fd* are None. """ - return PipeSendStream(fd) + def __init__( + self, send_fd: Optional[int] = None, receive_fd: Optional[int] = None + ): + if (send_fd, receive_fd) == (None, None): + raise ValueError('Expected send_fd or receive_fd') + self._send_conflict_detector = ConflictDetector( + "another task is using this stream for send" + ) + self._receive_conflict_detector = ConflictDetector( + "another task is using this stream for receive" + ) + # NOTE: if send is supported, it's always represented by first + # fd holder in the list. + self._fd_holders = [] # type: List[_FdHolder] + if send_fd == receive_fd: + self._fd_holders.append(_FdHolder(send_fd)) + else: + if send_fd is not None: + self._fd_holders.append(_FdHolder(send_fd)) + if receive_fd is not None: + self._fd_holders.append(_FdHolder(receive_fd)) + self._send_fd = send_fd + self._receive_fd = receive_fd + + async def send_all(self, data: bytes): + if self._send_fd is None: + raise RuntimeError('stream does not support send') + with self._send_conflict_detector: + # have to check up front, because send_all(b"") on a closed pipe + # should raise + if self._fd_holders[0].closed: + raise trio.ClosedResourceError("send file was already closed") + await trio.hazmat.checkpoint() + length = len(data) + # adapted from the SocketStream code + with memoryview(data) as view: + sent = 0 + while sent < length: + with view[sent:] as remaining: + try: + sent += os.write(self._send_fd, remaining) + except BlockingIOError: + await trio.hazmat.wait_writable(self._send_fd) + except OSError as e: + if e.errno == errno.EBADF: + raise trio.ClosedResourceError( + "send file was closed" + ) from None + else: + raise trio.BrokenResourceError from e + + async def wait_send_all_might_not_block(self) -> None: + if self._send_fd is None: + raise RuntimeError('stream does not support send') + with self._send_conflict_detector: + if self._fd_holders[0].closed: + raise trio.ClosedResourceError("send file was already closed") + try: + await trio.hazmat.wait_writable(self._send_fd) + except BrokenPipeError as e: + # kqueue: raises EPIPE on wait_writable instead + # of sending, which is annoying + raise trio.BrokenResourceError from e + + async def receive_some(self, max_bytes: int) -> bytes: + with self._receive_conflict_detector: + if not isinstance(max_bytes, int): + raise TypeError("max_bytes must be integer >= 1") -def fd_open_receive_stream(fd: int): - """Returns a receive stream over an os.pipe. + if max_bytes < 1: + raise ValueError("max_bytes must be integer >= 1") - The pipe must support non-blocking mode. The stream takes ownership - of the fd. + await trio.hazmat.checkpoint() + while True: + try: + data = os.read(self._receive_fd, max_bytes) + except BlockingIOError: + await trio.hazmat.wait_readable(self._receive_fd) + except OSError as e: + if e.errno == errno.EBADF: + raise trio.ClosedResourceError( + "receive file was closed" + ) from None + else: + raise trio.BrokenResourceError from e + else: + break - :rtype: ReceiveStream - """ - return PipeReceiveStream(fd) + return data + + async def aclose(self): + for fd_holder in self._fd_holders: + await fd_holder.aclose() + + def send_fileno(self): + if self._send_fd is None: + raise RuntimeError('stream does not support send') + return self._send_fd + + def receive_fileno(self): + if self._receive_fd is None: + raise RuntimeError('stream does not support receive') + return self._receive_fd diff --git a/trio/hazmat.py b/trio/hazmat.py index 5f72890777..c182704164 100644 --- a/trio/hazmat.py +++ b/trio/hazmat.py @@ -25,10 +25,7 @@ # Unix-specific symbols if os.name == "posix": - from ._unix_pipes import ( - fd_open_receive_stream, - fd_open_send_stream, - ) + from ._unix_pipes import FdStream # Kqueue-specific symbols try: From 711595d3304d030cc32a23ab0164c4f564f61453 Mon Sep 17 00:00:00 2001 From: John Belmonte Date: Sat, 6 Jul 2019 07:14:50 +0900 Subject: [PATCH 03/13] type hints for Process.stdin and friends Process.stdio documented as Stream rather than StapledStream --- trio/_subprocess.py | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/trio/_subprocess.py b/trio/_subprocess.py index 221f11499d..a109d38ad5 100644 --- a/trio/_subprocess.py +++ b/trio/_subprocess.py @@ -1,9 +1,8 @@ import os -import select import subprocess -from functools import partial +from typing import Optional -from ._abc import AsyncResource +from ._abc import AsyncResource, SendStream, ReceiveStream, Stream from ._highlevel_generic import StapledStream from ._sync import Lock from ._subprocess_platform import ( @@ -51,7 +50,7 @@ class Process(AsyncResource): standard error, the written bytes become available for you to read here. Only available if the :class:`Process` was constructed using ``stderr=PIPE``; otherwise this will be None. - stdio (trio.StapledStream or None): A stream that sends data to + stdio (trio.Stream or None): A stream that sends data to the child's standard input and receives from the child's standard output. Only available if both :attr:`stdin` and :attr:`stdout` are available; otherwise this will be None. @@ -101,9 +100,10 @@ def _init( .format(key) ) - self.stdin = None - self.stdout = None - self.stderr = None + self.stdin = None # type: Optional[SendStream] + self.stdout = None # type: Optional[ReceiveStream] + self.stderr = None # type: Optional[ReceiveStream] + self.stdio = None # type: Optional[Stream] if os.name == "posix": if isinstance(command, str) and not options.get("shell"): @@ -154,8 +154,6 @@ def _init( if self.stdin is not None and self.stdout is not None: # TODO: fix me self.stdio = StapledStream(self.stdin, self.stdout) - else: - self.stdio = None self.args = self._proc.args self.pid = self._proc.pid From 8388c5b6758831ca2525999610c973b1b9a17e1b Mon Sep 17 00:00:00 2001 From: John Belmonte Date: Sat, 6 Jul 2019 14:26:11 +0900 Subject: [PATCH 04/13] FdStream wraps only one fd --- trio/_subprocess.py | 7 +-- trio/_subprocess_platform/__init__.py | 4 +- trio/_unix_pipes.py | 81 ++++++++------------------- 3 files changed, 27 insertions(+), 65 deletions(-) diff --git a/trio/_subprocess.py b/trio/_subprocess.py index a109d38ad5..f9bc06477f 100644 --- a/trio/_subprocess.py +++ b/trio/_subprocess.py @@ -2,7 +2,7 @@ import subprocess from typing import Optional -from ._abc import AsyncResource, SendStream, ReceiveStream, Stream +from ._abc import AsyncResource, SendStream, ReceiveStream from ._highlevel_generic import StapledStream from ._sync import Lock from ._subprocess_platform import ( @@ -50,7 +50,7 @@ class Process(AsyncResource): standard error, the written bytes become available for you to read here. Only available if the :class:`Process` was constructed using ``stderr=PIPE``; otherwise this will be None. - stdio (trio.Stream or None): A stream that sends data to + stdio (trio.StapledStream or None): A stream that sends data to the child's standard input and receives from the child's standard output. Only available if both :attr:`stdin` and :attr:`stdout` are available; otherwise this will be None. @@ -103,7 +103,7 @@ def _init( self.stdin = None # type: Optional[SendStream] self.stdout = None # type: Optional[ReceiveStream] self.stderr = None # type: Optional[ReceiveStream] - self.stdio = None # type: Optional[Stream] + self.stdio = None # type: Optional[StapledStream] if os.name == "posix": if isinstance(command, str) and not options.get("shell"): @@ -152,7 +152,6 @@ def _init( os.close(stderr) if self.stdin is not None and self.stdout is not None: - # TODO: fix me self.stdio = StapledStream(self.stdin, self.stdout) self.args = self._proc.args diff --git a/trio/_subprocess_platform/__init__.py b/trio/_subprocess_platform/__init__.py index 90a053c2e8..b1db8499c6 100644 --- a/trio/_subprocess_platform/__init__.py +++ b/trio/_subprocess_platform/__init__.py @@ -71,11 +71,11 @@ def create_pipe_from_child_output() -> Tuple[ReceiveStream, int]: def create_pipe_to_child_stdin(): # noqa: F811 rfd, wfd = os.pipe() - return FdStream(send_fd=wfd), rfd + return FdStream(wfd), rfd def create_pipe_from_child_output(): # noqa: F811 rfd, wfd = os.pipe() - return FdStream(receive_fd=rfd), wfd + return FdStream(rfd), wfd elif os.name == "nt": from .._windows_pipes import PipeSendStream, PipeReceiveStream diff --git a/trio/_unix_pipes.py b/trio/_unix_pipes.py index aa248717e4..e542c0ad80 100644 --- a/trio/_unix_pipes.py +++ b/trio/_unix_pipes.py @@ -1,9 +1,8 @@ import fcntl import os import errno -from typing import Optional, List -from ._abc import SendStream, ReceiveStream +from ._abc import SendStream, ReceiveStream, Stream from ._util import ConflictDetector import trio @@ -164,63 +163,37 @@ def fileno(self): return self._fd_holder.fd -class FdStream(SendStream, ReceiveStream): +class FdStream(Stream): """ - Represents a send and/or receive stream given file descriptor(s) to - a pipe, TTY, etc. + Represents a stream given the file descriptor to a pipe, TTY, etc. - *send_fd*/*receive_fd* must refer to a file that is open for - reading/writing and supports non-blocking I/O (pipes and TTYs will work, - on-disk files probably not). The returned stream takes ownership of the - fd(s), so closing the stream will close the fd(s) too. As with - `os.fdopen`, you should not directly use an fd after you have wrapped it in - a stream using thisfunction. - - Where an fd supports both read and write, pass the same fd to *send_fd* and - *receive_fd*. + *fd* must refer to a file that is open for reading and/or writing and + supports non-blocking I/O (pipes and TTYs will work, on-disk files probably + not). The returned stream takes ownership of the fd, so closing the stream + will close the fd(s) too. As with `os.fdopen`, you should not directly use + an fd after you have wrapped it in a stream using this function. Args: - send_fd (int on None): The send stream fd. - receive_fd (int or None): The receive stream fd. + fd (int): The fd to be wrapped. Returns: A new `FdStream` object. - - Raises: - ValueError: if both *send_fd* and *receive_fd* are None. """ - def __init__( - self, send_fd: Optional[int] = None, receive_fd: Optional[int] = None - ): - if (send_fd, receive_fd) == (None, None): - raise ValueError('Expected send_fd or receive_fd') + def __init__(self, fd: int): + self._fd_holder = _FdHolder(fd) self._send_conflict_detector = ConflictDetector( "another task is using this stream for send" ) self._receive_conflict_detector = ConflictDetector( "another task is using this stream for receive" ) - # NOTE: if send is supported, it's always represented by first - # fd holder in the list. - self._fd_holders = [] # type: List[_FdHolder] - if send_fd == receive_fd: - self._fd_holders.append(_FdHolder(send_fd)) - else: - if send_fd is not None: - self._fd_holders.append(_FdHolder(send_fd)) - if receive_fd is not None: - self._fd_holders.append(_FdHolder(receive_fd)) - self._send_fd = send_fd - self._receive_fd = receive_fd async def send_all(self, data: bytes): - if self._send_fd is None: - raise RuntimeError('stream does not support send') with self._send_conflict_detector: # have to check up front, because send_all(b"") on a closed pipe # should raise - if self._fd_holders[0].closed: + if self._fd_holder.closed: raise trio.ClosedResourceError("send file was already closed") await trio.hazmat.checkpoint() length = len(data) @@ -230,9 +203,9 @@ async def send_all(self, data: bytes): while sent < length: with view[sent:] as remaining: try: - sent += os.write(self._send_fd, remaining) + sent += os.write(self._fd_holder.fd, remaining) except BlockingIOError: - await trio.hazmat.wait_writable(self._send_fd) + await trio.hazmat.wait_writable(self._fd_holder.fd) except OSError as e: if e.errno == errno.EBADF: raise trio.ClosedResourceError( @@ -242,13 +215,11 @@ async def send_all(self, data: bytes): raise trio.BrokenResourceError from e async def wait_send_all_might_not_block(self) -> None: - if self._send_fd is None: - raise RuntimeError('stream does not support send') with self._send_conflict_detector: - if self._fd_holders[0].closed: + if self._fd_holder.closed: raise trio.ClosedResourceError("send file was already closed") try: - await trio.hazmat.wait_writable(self._send_fd) + await trio.hazmat.wait_writable(self._fd_holder.fd) except BrokenPipeError as e: # kqueue: raises EPIPE on wait_writable instead # of sending, which is annoying @@ -265,9 +236,9 @@ async def receive_some(self, max_bytes: int) -> bytes: await trio.hazmat.checkpoint() while True: try: - data = os.read(self._receive_fd, max_bytes) + data = os.read(self._fd_holder.fd, max_bytes) except BlockingIOError: - await trio.hazmat.wait_readable(self._receive_fd) + await trio.hazmat.wait_readable(self._fd_holder.fd) except OSError as e: if e.errno == errno.EBADF: raise trio.ClosedResourceError( @@ -281,15 +252,7 @@ async def receive_some(self, max_bytes: int) -> bytes: return data async def aclose(self): - for fd_holder in self._fd_holders: - await fd_holder.aclose() - - def send_fileno(self): - if self._send_fd is None: - raise RuntimeError('stream does not support send') - return self._send_fd - - def receive_fileno(self): - if self._receive_fd is None: - raise RuntimeError('stream does not support receive') - return self._receive_fd + await self._fd_holder.aclose() + + def fileno(self): + return self._fd_holder.fd From 1c3bc05e73a5adaf7d75171c588370bb6b43b06e Mon Sep 17 00:00:00 2001 From: John Belmonte Date: Sat, 6 Jul 2019 14:43:41 +0900 Subject: [PATCH 05/13] remove PipeSend/ReceiveStream on Unix; FdStream tests --- trio/_unix_pipes.py | 97 +---------------------------------- trio/tests/test_unix_pipes.py | 16 +++--- 2 files changed, 9 insertions(+), 104 deletions(-) diff --git a/trio/_unix_pipes.py b/trio/_unix_pipes.py index e542c0ad80..02d5c72e76 100644 --- a/trio/_unix_pipes.py +++ b/trio/_unix_pipes.py @@ -68,101 +68,6 @@ async def aclose(self): await trio.hazmat.checkpoint() -# TODO: remove -class PipeSendStream(SendStream): - """Represents a send stream over an os.pipe object.""" - - def __init__(self, fd: int): - self._fd_holder = _FdHolder(fd) - self._conflict_detector = ConflictDetector( - "another task is using this pipe" - ) - - async def send_all(self, data: bytes): - with self._conflict_detector: - # have to check up front, because send_all(b"") on a closed pipe - # should raise - if self._fd_holder.closed: - raise trio.ClosedResourceError("this pipe was already closed") - await trio.hazmat.checkpoint() - length = len(data) - # adapted from the SocketStream code - with memoryview(data) as view: - sent = 0 - while sent < length: - with view[sent:] as remaining: - try: - sent += os.write(self._fd_holder.fd, remaining) - except BlockingIOError: - await trio.hazmat.wait_writable(self._fd_holder.fd) - except OSError as e: - if e.errno == errno.EBADF: - raise trio.ClosedResourceError( - "this pipe was closed" - ) from None - else: - raise trio.BrokenResourceError from e - - async def wait_send_all_might_not_block(self) -> None: - with self._conflict_detector: - if self._fd_holder.closed: - raise trio.ClosedResourceError("this pipe was already closed") - try: - await trio.hazmat.wait_writable(self._fd_holder.fd) - except BrokenPipeError as e: - # kqueue: raises EPIPE on wait_writable instead - # of sending, which is annoying - raise trio.BrokenResourceError from e - - async def aclose(self): - await self._fd_holder.aclose() - - def fileno(self): - return self._fd_holder.fd - - -class PipeReceiveStream(ReceiveStream): - """Represents a receive stream over an os.pipe object.""" - - def __init__(self, fd: int): - self._fd_holder = _FdHolder(fd) - self._conflict_detector = ConflictDetector( - "another task is using this pipe" - ) - - async def receive_some(self, max_bytes: int) -> bytes: - with self._conflict_detector: - if not isinstance(max_bytes, int): - raise TypeError("max_bytes must be integer >= 1") - - if max_bytes < 1: - raise ValueError("max_bytes must be integer >= 1") - - await trio.hazmat.checkpoint() - while True: - try: - data = os.read(self._fd_holder.fd, max_bytes) - except BlockingIOError: - await trio.hazmat.wait_readable(self._fd_holder.fd) - except OSError as e: - if e.errno == errno.EBADF: - raise trio.ClosedResourceError( - "this pipe was closed" - ) from None - else: - raise trio.BrokenResourceError from e - else: - break - - return data - - async def aclose(self): - await self._fd_holder.aclose() - - def fileno(self): - return self._fd_holder.fd - - class FdStream(Stream): """ Represents a stream given the file descriptor to a pipe, TTY, etc. @@ -170,7 +75,7 @@ class FdStream(Stream): *fd* must refer to a file that is open for reading and/or writing and supports non-blocking I/O (pipes and TTYs will work, on-disk files probably not). The returned stream takes ownership of the fd, so closing the stream - will close the fd(s) too. As with `os.fdopen`, you should not directly use + will close the fd too. As with `os.fdopen`, you should not directly use an fd after you have wrapped it in a stream using this function. Args: diff --git a/trio/tests/test_unix_pipes.py b/trio/tests/test_unix_pipes.py index 7728a57fae..eab32d1718 100644 --- a/trio/tests/test_unix_pipes.py +++ b/trio/tests/test_unix_pipes.py @@ -11,14 +11,14 @@ posix = os.name == "posix" pytestmark = pytest.mark.skipif(not posix, reason="posix only") if posix: - from .._unix_pipes import PipeSendStream, PipeReceiveStream + from .._unix_pipes import FdStream # Have to use quoted types so import doesn't crash on windows -async def make_pipe() -> "Tuple[PipeSendStream, PipeReceiveStream]": +async def make_pipe() -> "Tuple[FdStream, FdStream]": """Makes a new pair of pipes.""" (r, w) = os.pipe() - return PipeSendStream(w), PipeReceiveStream(r) + return FdStream(w), FdStream(r) async def make_clogged_pipe(): @@ -49,7 +49,7 @@ async def make_clogged_pipe(): async def test_send_pipe(): r, w = os.pipe() - async with PipeSendStream(w) as send: + async with FdStream(w) as send: assert send.fileno() == w await send.send_all(b"123") assert (os.read(r, 8)) == b"123" @@ -59,7 +59,7 @@ async def test_send_pipe(): async def test_receive_pipe(): r, w = os.pipe() - async with PipeReceiveStream(r) as recv: + async with FdStream(r) as recv: assert (recv.fileno()) == r os.write(w, b"123") assert (await recv.receive_some(8)) == b"123" @@ -93,10 +93,10 @@ async def reader(): async def test_pipe_errors(): with pytest.raises(TypeError): - PipeReceiveStream(None) + FdStream(None) with pytest.raises(ValueError): - await PipeReceiveStream(0).receive_some(0) + await FdStream(0).receive_some(0) async def test_del(): @@ -146,7 +146,7 @@ async def test_misdirected_aclose_regression(): if r2_fd != old_r_fd: # pragma: no cover os.dup2(r2_fd, old_r_fd) os.close(r2_fd) - async with PipeReceiveStream(old_r_fd) as r2: + async with FdStream(old_r_fd) as r2: assert r2.fileno() == old_r_fd # And now set up a background task that's working on the new receive From 9e5c20c7d57a7a57fa60522179d99a228c3bf064 Mon Sep 17 00:00:00 2001 From: John Belmonte Date: Sat, 6 Jul 2019 15:47:15 +0900 Subject: [PATCH 06/13] FdStream documentation --- docs/source/reference-hazmat.rst | 34 ++++++++++++++++++++++++++++++++ newsfragments/829.feature.rst | 1 + 2 files changed, 35 insertions(+) create mode 100644 newsfragments/829.feature.rst diff --git a/docs/source/reference-hazmat.rst b/docs/source/reference-hazmat.rst index 3cf9bc2890..0201025e71 100644 --- a/docs/source/reference-hazmat.rst +++ b/docs/source/reference-hazmat.rst @@ -174,6 +174,40 @@ All environments provide the following functions: yourself afterwards. +Unix-specific API +----------------- + +`FdStream` supports wrapping Unix files (such as a pipe or TTY) as +a stream. + +To be used as a Trio stream, an open file must be placed in non-blocking +mode. Unfortunately, this impacts all I/O that goes through the +underlying open file description, including I/O that uses a different +file descriptor than the one that was passed to Trio. If other threads +or processes are using file descriptors that are related through `os.dup` +or inheritance across `os.fork` to the one that Trio is using, they are +unlikely to be prepared to have non-blocking I/O semantics suddenly +thrust upon them. For example, you can use ``FdStream(os.dup(0))`` to +obtain a stream for reading from standard input, but it is only safe to +do so with heavy caveats: your stdin must not be shared by any other +processes and you must not make any calls to synchronous methods of +`sys.stdin` until the stream returned by `FdStream` is closed. See +`issue #174 `__ for a +discussion of the challenges involved in relaxing this restriction. + +To obtain a bidirectional `~trio.abc.Stream` for sending and receiving on +different file descriptors, use `trio.StapledStream`:: + + bidirectional_stream = trio.StapledStream( + trio.hazmat.FdStream(write_fd), + trio.hazmat.FdStream(read_fd) + ) + +.. autoclass:: FdStream + :show-inheritance: + :members: + + Kqueue-specific API ------------------- diff --git a/newsfragments/829.feature.rst b/newsfragments/829.feature.rst new file mode 100644 index 0000000000..30d211af02 --- /dev/null +++ b/newsfragments/829.feature.rst @@ -0,0 +1 @@ +Add :class:`trio.hazmat.FdStream` for wrapping a Unix file as a Stream. From c6d4a3fe0b8991d4e416edd7af19742f340495e3 Mon Sep 17 00:00:00 2001 From: John Belmonte Date: Fri, 19 Jul 2019 21:56:01 -0500 Subject: [PATCH 07/13] use os.get/set_blocking() --- trio/_unix_pipes.py | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/trio/_unix_pipes.py b/trio/_unix_pipes.py index 02d5c72e76..a5b63c8097 100644 --- a/trio/_unix_pipes.py +++ b/trio/_unix_pipes.py @@ -1,8 +1,7 @@ -import fcntl import os import errno -from ._abc import SendStream, ReceiveStream, Stream +from ._abc import Stream from ._util import ConflictDetector import trio @@ -34,10 +33,8 @@ def __init__(self, fd: int): raise TypeError("file descriptor must be an int") self.fd = fd # Store original state, and ensure non-blocking mode is enabled - self._original_flags = fcntl.fcntl(self.fd, fcntl.F_GETFL) - fcntl.fcntl( - self.fd, fcntl.F_SETFL, self._original_flags | os.O_NONBLOCK - ) + self._original_is_blocking = os.get_blocking(fd) + os.set_blocking(fd, False) @property def closed(self): @@ -55,7 +52,7 @@ def _raw_close(self): return fd = self.fd self.fd = -1 - fcntl.fcntl(fd, fcntl.F_SETFL, self._original_flags) + os.set_blocking(fd, self._original_is_blocking) os.close(fd) def __del__(self): From ee02a253a6ce2cd10d320a78dc3c44ff37efbc03 Mon Sep 17 00:00:00 2001 From: John Belmonte Date: Fri, 19 Jul 2019 22:08:14 -0500 Subject: [PATCH 08/13] move docs to FdStream class --- docs/source/reference-hazmat.rst | 15 --------------- trio/_unix_pipes.py | 15 +++++++++++++++ 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/docs/source/reference-hazmat.rst b/docs/source/reference-hazmat.rst index 0201025e71..ce3ff4e9cf 100644 --- a/docs/source/reference-hazmat.rst +++ b/docs/source/reference-hazmat.rst @@ -180,21 +180,6 @@ Unix-specific API `FdStream` supports wrapping Unix files (such as a pipe or TTY) as a stream. -To be used as a Trio stream, an open file must be placed in non-blocking -mode. Unfortunately, this impacts all I/O that goes through the -underlying open file description, including I/O that uses a different -file descriptor than the one that was passed to Trio. If other threads -or processes are using file descriptors that are related through `os.dup` -or inheritance across `os.fork` to the one that Trio is using, they are -unlikely to be prepared to have non-blocking I/O semantics suddenly -thrust upon them. For example, you can use ``FdStream(os.dup(0))`` to -obtain a stream for reading from standard input, but it is only safe to -do so with heavy caveats: your stdin must not be shared by any other -processes and you must not make any calls to synchronous methods of -`sys.stdin` until the stream returned by `FdStream` is closed. See -`issue #174 `__ for a -discussion of the challenges involved in relaxing this restriction. - To obtain a bidirectional `~trio.abc.Stream` for sending and receiving on different file descriptors, use `trio.StapledStream`:: diff --git a/trio/_unix_pipes.py b/trio/_unix_pipes.py index a5b63c8097..49f8fcd1ee 100644 --- a/trio/_unix_pipes.py +++ b/trio/_unix_pipes.py @@ -75,6 +75,21 @@ class FdStream(Stream): will close the fd too. As with `os.fdopen`, you should not directly use an fd after you have wrapped it in a stream using this function. + To be used as a Trio stream, an open file must be placed in non-blocking + mode. Unfortunately, this impacts all I/O that goes through the + underlying open file, including I/O that uses a different + file descriptor than the one that was passed to Trio. If other threads + or processes are using file descriptors that are related through `os.dup` + or inheritance across `os.fork` to the one that Trio is using, they are + unlikely to be prepared to have non-blocking I/O semantics suddenly + thrust upon them. For example, you can use ``FdStream(os.dup(0))`` to + obtain a stream for reading from standard input, but it is only safe to + do so with heavy caveats: your stdin must not be shared by any other + processes and you must not make any calls to synchronous methods of + `sys.stdin` until the stream returned by `FdStream` is closed. See + `issue #174 `__ for a + discussion of the challenges involved in relaxing this restriction. + Args: fd (int): The fd to be wrapped. From ee61d20810077b81f83647d9eaa4ca097c509ca3 Mon Sep 17 00:00:00 2001 From: John Belmonte Date: Fri, 19 Jul 2019 22:34:05 -0500 Subject: [PATCH 09/13] fix jedi visibility --- trio/hazmat.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/trio/hazmat.py b/trio/hazmat.py index c182704164..5fe32c03d9 100644 --- a/trio/hazmat.py +++ b/trio/hazmat.py @@ -24,8 +24,10 @@ ) # Unix-specific symbols -if os.name == "posix": +try: from ._unix_pipes import FdStream +except ImportError: + pass # Kqueue-specific symbols try: From 2876c7b65bbf7a762ebc5c3bb5b4d8036c0254ed Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Sun, 21 Jul 2019 00:22:27 -0700 Subject: [PATCH 10/13] Tweak phrasing in docs --- docs/source/reference-hazmat.rst | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/docs/source/reference-hazmat.rst b/docs/source/reference-hazmat.rst index ce3ff4e9cf..eaaf9e7bab 100644 --- a/docs/source/reference-hazmat.rst +++ b/docs/source/reference-hazmat.rst @@ -180,8 +180,9 @@ Unix-specific API `FdStream` supports wrapping Unix files (such as a pipe or TTY) as a stream. -To obtain a bidirectional `~trio.abc.Stream` for sending and receiving on -different file descriptors, use `trio.StapledStream`:: +If you have two different file descriptors for sending and receiving, +and want to bundle them together into a single bidirectional +`~trio.abc.Stream`, then use `trio.StapledStream`:: bidirectional_stream = trio.StapledStream( trio.hazmat.FdStream(write_fd), From 52b020e74c1e6c0c98709cf3c37f52614cf44ed3 Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Sun, 21 Jul 2019 00:23:16 -0700 Subject: [PATCH 11/13] Tweak phrasing in newsfragment --- newsfragments/829.feature.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/newsfragments/829.feature.rst b/newsfragments/829.feature.rst index 30d211af02..340908571c 100644 --- a/newsfragments/829.feature.rst +++ b/newsfragments/829.feature.rst @@ -1 +1 @@ -Add :class:`trio.hazmat.FdStream` for wrapping a Unix file as a Stream. +Add `trio.hazmat.FdStream` for wrapping a Unix file descriptor as a `~trio.abc.Stream`. From 1228ca3270d76670187d613b8faae6866594001f Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Sun, 21 Jul 2019 00:24:39 -0700 Subject: [PATCH 12/13] Make closed-fd error messages more consistent --- trio/_unix_pipes.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/trio/_unix_pipes.py b/trio/_unix_pipes.py index 49f8fcd1ee..2fba4df415 100644 --- a/trio/_unix_pipes.py +++ b/trio/_unix_pipes.py @@ -111,7 +111,7 @@ async def send_all(self, data: bytes): # have to check up front, because send_all(b"") on a closed pipe # should raise if self._fd_holder.closed: - raise trio.ClosedResourceError("send file was already closed") + raise trio.ClosedResourceError("file was already closed") await trio.hazmat.checkpoint() length = len(data) # adapted from the SocketStream code @@ -126,7 +126,7 @@ async def send_all(self, data: bytes): except OSError as e: if e.errno == errno.EBADF: raise trio.ClosedResourceError( - "send file was closed" + "file was already closed" ) from None else: raise trio.BrokenResourceError from e @@ -134,7 +134,7 @@ async def send_all(self, data: bytes): async def wait_send_all_might_not_block(self) -> None: with self._send_conflict_detector: if self._fd_holder.closed: - raise trio.ClosedResourceError("send file was already closed") + raise trio.ClosedResourceError("file was already closed") try: await trio.hazmat.wait_writable(self._fd_holder.fd) except BrokenPipeError as e: @@ -159,7 +159,7 @@ async def receive_some(self, max_bytes: int) -> bytes: except OSError as e: if e.errno == errno.EBADF: raise trio.ClosedResourceError( - "receive file was closed" + "file was already closed" ) from None else: raise trio.BrokenResourceError from e From fd44db568726f92c1ddc8d0bf286aac3de458a01 Mon Sep 17 00:00:00 2001 From: John Belmonte Date: Sun, 21 Jul 2019 17:45:55 -0500 Subject: [PATCH 13/13] ensure _unix_pipes.py import error on non-posix --- trio/_unix_pipes.py | 5 +++++ trio/tests/test_unix_pipes.py | 3 +++ 2 files changed, 8 insertions(+) diff --git a/trio/_unix_pipes.py b/trio/_unix_pipes.py index 2fba4df415..b212b2a961 100644 --- a/trio/_unix_pipes.py +++ b/trio/_unix_pipes.py @@ -6,6 +6,11 @@ import trio +if os.name != "posix": + # We raise an error here rather than gating the import in hazmat.py + # in order to keep jedi static analysis happy. + raise ImportError + class _FdHolder: # This class holds onto a raw file descriptor, in non-blocking mode, and diff --git a/trio/tests/test_unix_pipes.py b/trio/tests/test_unix_pipes.py index eab32d1718..46f28d5471 100644 --- a/trio/tests/test_unix_pipes.py +++ b/trio/tests/test_unix_pipes.py @@ -12,6 +12,9 @@ pytestmark = pytest.mark.skipif(not posix, reason="posix only") if posix: from .._unix_pipes import FdStream +else: + with pytest.raises(ImportError): + from .._unix_pipes import FdStream # Have to use quoted types so import doesn't crash on windows