Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added trio/_subprocess/__init__.py
Empty file.
120 changes: 120 additions & 0 deletions trio/_subprocess/unix_pipes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
import fcntl
import os
from typing import Tuple

from .. import _core, BrokenStreamError
from .._abc import SendStream, ReceiveStream

__all__ = ["PipeSendStream", "PipeReceiveStream", "make_pipe"]


class _PipeMixin:
def __init__(self, pipefd: int):
if not isinstance(pipefd, int):
raise TypeError(
"{0.__class__.__name__} needs a pipe fd".format(self)
)

self._pipe = pipefd
self._closed = False

flags = fcntl.fcntl(self._pipe, fcntl.F_GETFL)
fcntl.fcntl(self._pipe, fcntl.F_SETFL, flags | os.O_NONBLOCK)

def _close(self):
if self._closed:
return

self._closed = True
os.close(self._pipe)

async def aclose(self):
# XX: This would be in _close, but this can only be used from an
# async context.
_core.notify_fd_close(self._pipe)
self._close()
await _core.checkpoint()

def fileno(self) -> int:
"""Gets the file descriptor for this pipe."""
return self._pipe

def __del__(self):
self._close()


class PipeSendStream(_PipeMixin, SendStream):
"""Represents a send stream over an os.pipe object."""

async def send_all(self, data: bytes):
# we have to do this no matter what
await _core.checkpoint()
if self._closed:
raise _core.ClosedResourceError("this pipe is already closed")

if not data:
return

length = len(data)
# adapted from the SocketStream code
with memoryview(data) as view:
total_sent = 0
while total_sent < length:
with view[total_sent:] as remaining:
try:
total_sent += os.write(self._pipe, remaining)
except BrokenPipeError as e:
await _core.checkpoint()
raise BrokenStreamError from e
except BlockingIOError:
await self.wait_send_all_might_not_block()

async def wait_send_all_might_not_block(self) -> None:
if self._closed:
await _core.checkpoint()
raise _core.ClosedResourceError("This pipe is already closed")

try:
await _core.wait_writable(self._pipe)
except BrokenPipeError as e:
# kqueue: raises EPIPE on wait_writable instead
# of sending, which is annoying
# also doesn't checkpoint so we have to do that
# ourselves here too
await _core.checkpoint()
raise BrokenStreamError from e


class PipeReceiveStream(_PipeMixin, ReceiveStream):
"""Represents a receive stream over an os.pipe object."""

async def receive_some(self, max_bytes: int) -> bytes:
if self._closed:
await _core.checkpoint()
raise _core.ClosedResourceError("this pipe is already closed")

if not isinstance(max_bytes, int):
await _core.checkpoint()
raise TypeError("max_bytes must be integer >= 1")

if max_bytes < 1:
await _core.checkpoint()
raise ValueError("max_bytes must be integer >= 1")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can save a bit of repetition by making it if not isinstance(max_bytes, int) or max_bytes < 1

Copy link
Member Author

@Fuyukai Fuyukai Aug 25, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't, must be different errors for the test.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh yeah, good point :-)


while True:
try:
await _core.checkpoint_if_cancelled()
data = os.read(self._pipe, max_bytes)
except BlockingIOError:
await _core.wait_readable(self._pipe)
else:
await _core.cancel_shielded_checkpoint()
break

return data
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess technically you could move the checkpoint stuff out of the loop but in practice it doesn't really matter.



async def make_pipe() -> Tuple[PipeSendStream, PipeReceiveStream]:
"""Makes a new pair of pipes."""
(r, w) = os.pipe()
return PipeSendStream(w), PipeReceiveStream(r)
Empty file.
148 changes: 148 additions & 0 deletions trio/tests/subprocess/test_unix_pipes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
import errno
import select

import os
import pytest

from trio._core.tests.tutil import gc_collect_harder
from ... import _core
from ...testing import (wait_all_tasks_blocked, check_one_way_stream)

posix = os.name == "posix"

pytestmark = pytest.mark.skipif(
not posix, reason="pipes are only supported on posix"
)

if posix:
from ..._subprocess.unix_pipes import (
PipeSendStream, PipeReceiveStream, make_pipe
)


async def test_send_pipe():
r, w = os.pipe()
send = PipeSendStream(w)
assert send.fileno() == w
await send.send_all(b"123")
assert (os.read(r, 8)) == b"123"

os.close(r)
os.close(w)
send._closed = True


async def test_receive_pipe():
r, w = os.pipe()
recv = PipeReceiveStream(r)
assert (recv.fileno()) == r
os.write(w, b"123")
assert (await recv.receive_some(8)) == b"123"

os.close(r)
os.close(w)
recv._closed = True


async def test_pipes_combined():
write, read = await make_pipe()
count = 2**20

async def sender():
big = bytearray(count)
await write.send_all(big)

async def reader():
await wait_all_tasks_blocked()
received = 0
while received < count:
received += len(await read.receive_some(4096))

assert received == count

async with _core.open_nursery() as n:
n.start_soon(sender)
n.start_soon(reader)

await read.aclose()
await write.aclose()


async def test_send_on_closed_pipe():
write, read = await make_pipe()
await write.aclose()

with pytest.raises(_core.ClosedResourceError):
await write.send_all(b"123")

await read.aclose()


async def test_pipe_errors():
with pytest.raises(TypeError):
PipeReceiveStream(None)

with pytest.raises(ValueError):
await PipeReceiveStream(0).receive_some(0)


async def test_del():
w, r = await make_pipe()
f1, f2 = w.fileno(), r.fileno()
del w, r
gc_collect_harder()

with pytest.raises(OSError) as excinfo:
os.close(f1)
assert excinfo.value.errno == errno.EBADF

with pytest.raises(OSError) as excinfo:
os.close(f2)
assert excinfo.value.errno == errno.EBADF


async def test_async_with():
w, r = await make_pipe()
async with w, r:
pass

assert w._closed
assert r._closed

with pytest.raises(OSError) as excinfo:
os.close(w.fileno())
assert excinfo.value.errno == errno.EBADF

with pytest.raises(OSError) as excinfo:
os.close(r.fileno())
assert excinfo.value.errno == errno.EBADF


async def make_clogged_pipe():
s, r = await make_pipe()
try:
while True:
# We want to totally fill up the pipe buffer.
# This requires working around a weird feature that POSIX pipes
# have.
# If you do a write of <= PIPE_BUF bytes, then it's guaranteed
# to either complete entirely, or not at all. So if we tried to
# write PIPE_BUF bytes, and the buffer's free space is only
# PIPE_BUF/2, then the write will raise BlockingIOError... even
# though a smaller write could still succeed! To avoid this,
# make sure to write >PIPE_BUF bytes each time, which disables
# the special behavior.
# For details, search for PIPE_BUF here:
# http://pubs.opengroup.org/onlinepubs/9699919799/functions/write.html

# for the getattr:
# https://bitbucket.org/pypy/pypy/issues/2876/selectpipe_buf-is-missing-on-pypy3
buf_size = getattr(select, "PIPE_BUF", 8192)
os.write(s.fileno(), b"x" * buf_size * 2)
except BlockingIOError:
pass
return s, r


async def test_pipe_fully():
await check_one_way_stream(make_pipe, make_clogged_pipe)