Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 18 additions & 13 deletions trio/_unix_pipes.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
from __future__ import annotations

import os
import errno
from typing_extensions import Final as FinalType

from ._abc import Stream
from ._util import ConflictDetector, Final
Expand All @@ -13,7 +16,7 @@

# XX TODO: is this a good number? who knows... it does match the default Linux
# pipe capacity though.
DEFAULT_RECEIVE_SIZE = 65536
DEFAULT_RECEIVE_SIZE: FinalType = 65536


class _FdHolder:
Expand All @@ -34,7 +37,9 @@ class _FdHolder:
# impossible to make this mistake – we'll just get an EBADF.
#
# (This trick was copied from the stdlib socket module.)
def __init__(self, fd: int):
fd: int

def __init__(self, fd: int) -> None:
# make sure self.fd is always initialized to *something*, because even
# if we error out here then __del__ will run and access it.
self.fd = -1
Expand All @@ -46,10 +51,10 @@ def __init__(self, fd: int):
os.set_blocking(fd, False)

@property
def closed(self):
def closed(self) -> bool:
return self.fd == -1

def _raw_close(self):
def _raw_close(self) -> None:
# This doesn't assume it's in a Trio context, so it can be called from
# __del__. You should never call it from Trio context, because it
# skips calling notify_fd_close. But from __del__, skipping that is
Expand All @@ -64,10 +69,10 @@ def _raw_close(self):
os.set_blocking(fd, self._original_is_blocking)
os.close(fd)

def __del__(self):
def __del__(self) -> None:
self._raw_close()

def close(self):
def close(self) -> None:
if not self.closed:
trio.lowlevel.notify_closing(self.fd)
self._raw_close()
Expand All @@ -93,7 +98,7 @@ class FdStream(Stream, metaclass=Final):
thrust upon them. For example, you can use
``FdStream(os.dup(sys.stdin.fileno()))`` to obtain a stream for reading
from standard input, but it is only safe to do so with heavy caveats: your
stdin must not be shared by any other processes and you must not make any
stdin must not be shared by any other processes, and you must not make any
calls to synchronous methods of `sys.stdin` until the stream returned by
`FdStream` is closed. See `issue #174
<https://github.com/python-trio/trio/issues/174>`__ for a discussion of the
Expand All @@ -106,7 +111,7 @@ class FdStream(Stream, metaclass=Final):
A new `FdStream` object.
"""

def __init__(self, fd: int):
def __init__(self, fd: int) -> None:
self._fd_holder = _FdHolder(fd)
self._send_conflict_detector = ConflictDetector(
"another task is using this stream for send"
Expand All @@ -115,7 +120,7 @@ def __init__(self, fd: int):
"another task is using this stream for receive"
)

async def send_all(self, data: bytes):
async def send_all(self, data: bytes) -> None:
with self._send_conflict_detector:
# have to check up front, because send_all(b"") on a closed pipe
# should raise
Expand Down Expand Up @@ -151,7 +156,7 @@ async def wait_send_all_might_not_block(self) -> None:
# of sending, which is annoying
raise trio.BrokenResourceError from e

async def receive_some(self, max_bytes=None) -> bytes:
async def receive_some(self, max_bytes: int | None = None) -> bytes:
with self._receive_conflict_detector:
if max_bytes is None:
max_bytes = DEFAULT_RECEIVE_SIZE
Expand Down Expand Up @@ -179,12 +184,12 @@ async def receive_some(self, max_bytes=None) -> bytes:

return data

def close(self):
def close(self) -> None:
self._fd_holder.close()

async def aclose(self):
async def aclose(self) -> None:
self.close()
await trio.lowlevel.checkpoint()

def fileno(self):
def fileno(self) -> int:
return self._fd_holder.fd