Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/source/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
("py:class", "types.FrameType"),
("py:class", "P.args"),
("py:class", "P.kwargs"),
("py:class", "RetT"),
# TODO: figure out if you can link this to SSL
("py:class", "Context"),
# TODO: temporary type
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ module = [
"trio._core._entry_queue",
"trio._core._local",
"trio._core._unbounded_queue",
"trio._core._thread_cache",
"trio._deprecate",
"trio._dtls",
"trio._file_io",
Expand Down
52 changes: 36 additions & 16 deletions trio/_core/_thread_cache.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
from __future__ import annotations

import ctypes
import ctypes.util
import sys
import traceback
from functools import partial
from itertools import count
from threading import Lock, Thread
from typing import Callable, Optional, Tuple
from typing import Any, Callable, Generic, TypeVar

import outcome

RetT = TypeVar("RetT")


def _to_os_thread_name(name: str) -> bytes:
# ctypes handles the trailing \00
Expand All @@ -17,18 +21,20 @@ def _to_os_thread_name(name: str) -> bytes:

# used to construct the method used to set os thread name, or None, depending on platform.
# called once on import
def get_os_thread_name_func() -> Optional[Callable[[Optional[int], str], None]]:
def namefunc(setname: Callable[[int, bytes], int], ident: Optional[int], name: str):
def get_os_thread_name_func() -> Callable[[int | None, str], None] | None:
def namefunc(
setname: Callable[[int, bytes], int], ident: int | None, name: str
) -> None:
# Thread.ident is None "if it has not been started". Unclear if that can happen
# with current usage.
if ident is not None: # pragma: no cover
setname(ident, _to_os_thread_name(name))

# namefunc on mac also takes an ident, even if pthread_setname_np doesn't/can't use it
# namefunc on Mac also takes an ident, even if pthread_setname_np doesn't/can't use it
# so the caller don't need to care about platform.
def darwin_namefunc(
setname: Callable[[bytes], int], ident: Optional[int], name: str
):
setname: Callable[[bytes], int], ident: int | None, name: str
) -> None:
# I don't know if Mac can rename threads that hasn't been started, but default
# to no to be on the safe side.
if ident is not None: # pragma: no cover
Expand Down Expand Up @@ -110,9 +116,13 @@ def darwin_namefunc(
name_counter = count()


class WorkerThread:
def __init__(self, thread_cache):
self._job: Optional[Tuple[Callable, Callable, str]] = None
class WorkerThread(Generic[RetT]):
def __init__(self, thread_cache: ThreadCache) -> None:
self._job: tuple[ # type: ignore[no-any-unimported]
Callable[[], RetT],
Callable[[outcome.Outcome[RetT]], object],
str | None,
] | None = None
self._thread_cache = thread_cache
# This Lock is used in an unconventional way.
#
Expand All @@ -130,7 +140,7 @@ def __init__(self, thread_cache):
set_os_thread_name(self._thread.ident, self._default_name)
self._thread.start()

def _handle_job(self):
def _handle_job(self) -> None:
# Handle job in a separate method to ensure user-created
# objects are cleaned up in a consistent manner.
assert self._job is not None
Expand Down Expand Up @@ -161,7 +171,7 @@ def _handle_job(self):
print("Exception while delivering result of thread", file=sys.stderr)
traceback.print_exception(type(e), e, e.__traceback__)

def _work(self):
def _work(self) -> None:
while True:
if self._worker_lock.acquire(timeout=IDLE_TIMEOUT):
# We got a job
Expand All @@ -185,10 +195,16 @@ def _work(self):


class ThreadCache:
def __init__(self):
self._idle_workers = {}

def start_thread_soon(self, fn, deliver, name: Optional[str] = None):
def __init__(self) -> None:
self._idle_workers: dict[WorkerThread[Any], None] = {}

def start_thread_soon( # type: ignore[no-any-unimported]
self,
fn: Callable[[], RetT],
deliver: Callable[[outcome.Outcome[RetT]], object],
name: str | None = None,
) -> None:
worker: WorkerThread[RetT]
try:
worker, _ = self._idle_workers.popitem()
except KeyError:
Expand All @@ -200,7 +216,11 @@ def start_thread_soon(self, fn, deliver, name: Optional[str] = None):
THREAD_CACHE = ThreadCache()


def start_thread_soon(fn, deliver, name: Optional[str] = None):
def start_thread_soon( # type: ignore[no-any-unimported]
fn: Callable[[], RetT],
deliver: Callable[[outcome.Outcome[RetT]], object],
name: str | None = None,
) -> None:
"""Runs ``deliver(outcome.capture(fn))`` in a worker thread.

Generally ``fn`` does some blocking work, and ``deliver`` delivers the
Expand Down
7 changes: 3 additions & 4 deletions trio/_tests/verify_types.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@
"warningCount": 0
},
"typeCompleteness": {
"completenessScore": 0.9186602870813397,
"completenessScore": 0.9202551834130781,
"exportedSymbolCounts": {
"withAmbiguousType": 0,
"withKnownType": 576,
"withUnknownType": 51
"withKnownType": 577,
"withUnknownType": 50
},
"ignoreUnknownTypesFromImports": true,
"missingClassDocStringCount": 1,
Expand Down Expand Up @@ -101,7 +101,6 @@
"trio.lowlevel.reschedule",
"trio.lowlevel.spawn_system_task",
"trio.lowlevel.start_guest_run",
"trio.lowlevel.start_thread_soon",
"trio.lowlevel.temporarily_detach_coroutine_object",
"trio.lowlevel.wait_readable",
"trio.lowlevel.wait_writable",
Expand Down