Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions MANIFEST.in
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ include LICENSE LICENSE.MIT LICENSE.APACHE2
include README.rst
include CODE_OF_CONDUCT.md CONTRIBUTING.md
include test-requirements.txt
include trio/py.typed
recursive-include trio/tests/test_ssl_certs *.pem
recursive-include docs *
prune docs/build
26 changes: 23 additions & 3 deletions trio/_core/_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,12 @@
from contextvars import copy_context
from math import inf
from time import perf_counter
from typing import Callable, TYPE_CHECKING
from typing import Any, Awaitable, Callable, TYPE_CHECKING, TypeVar

if sys.version_info >= (3, 10):
from typing import ParamSpec
else:
from typing_extensions import ParamSpec

from sniffio import current_async_library_cvar

Expand Down Expand Up @@ -801,7 +806,7 @@ class NurseryManager:
"""

@enable_ki_protection
async def __aenter__(self):
async def __aenter__(self) -> "Nursery":
self._scope = CancelScope()
self._scope.__enter__()
self._nursery = Nursery._create(current_task(), self._scope)
Expand Down Expand Up @@ -840,7 +845,7 @@ def __exit__(self): # pragma: no cover
assert False, """Never called, but should be defined"""


def open_nursery():
def open_nursery() -> NurseryManager:
"""Returns an async context manager which must be used to create a
new `Nursery`.

Expand All @@ -851,6 +856,10 @@ def open_nursery():
return NurseryManager()


T_Retval = TypeVar("T_Retval")
T_ParamSpec = ParamSpec("T_ParamSpec")


class Nursery(metaclass=NoPublicConstructor):
"""A context which may be used to spawn (or cancel) child tasks.

Expand Down Expand Up @@ -957,6 +966,17 @@ def aborted(raise_cancel):
# (see test_nursery_cancel_doesnt_create_cyclic_garbage)
del self._pending_excs

def soonify(
self,
async_fn: Callable[T_ParamSpec, Awaitable[Any]],
name: str = None,
) -> Callable[T_ParamSpec, None]:
def wrapper(*args: T_ParamSpec.args, **kwargs: T_ParamSpec.kwargs) -> None:
partial_f = functools.partial(async_fn, *args, **kwargs)
self.start_soon(partial_f, name=name)

return wrapper

def start_soon(self, async_fn, *args, name=None):
"""Creates a child task, scheduling ``await async_fn(*args)``.

Expand Down
39 changes: 39 additions & 0 deletions trio/_threads.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,23 @@
# coding: utf-8

import functools
import threading
import queue as stdlib_queue
from itertools import count
from typing import Awaitable, TypeVar, Callable, Optional
import sys

import attr
import inspect
import outcome

import trio

if sys.version_info >= (3, 10):
from typing import ParamSpec
else:
from typing_extensions import ParamSpec

from ._sync import CapacityLimiter
from ._core import (
enable_ki_protection,
Expand Down Expand Up @@ -55,6 +63,27 @@ class ThreadPlaceholder:
name = attr.ib()


T_Retval = TypeVar("T_Retval")
T_ParamSpec = ParamSpec("T_ParamSpec")


def to_thread_asyncify(
sync_fn: Callable[T_ParamSpec, T_Retval],
*,
cancellable: bool = False,
limiter: Optional[CapacityLimiter] = None,
) -> Callable[T_ParamSpec, Awaitable[T_Retval]]:
async def wrapper(
*args: T_ParamSpec.args, **kwargs: T_ParamSpec.kwargs
) -> T_Retval:
partial_f = functools.partial(sync_fn, *args, **kwargs)
return await to_thread_run_sync(
partial_f, cancellable=cancellable, limiter=limiter
)

return wrapper


@enable_ki_protection
async def to_thread_run_sync(sync_fn, *args, cancellable=False, limiter=None):
"""Convert a blocking operation into an async operation using a thread.
Expand Down Expand Up @@ -238,6 +267,16 @@ def _run_fn_as_system_task(cb, fn, *args, trio_token=None):
return q.get().unwrap()


def from_thread_syncify(
afn: Callable[T_ParamSpec, Awaitable[T_Retval]],
) -> Callable[T_ParamSpec, T_Retval]:
def wrapper(*args: T_ParamSpec.args, **kwargs: T_ParamSpec.kwargs) -> T_Retval:
partial_f = functools.partial(afn, *args, **kwargs)
return from_thread_run(partial_f)

return wrapper


def from_thread_run(afn, *args, trio_token=None):
"""Run the given async function in the parent Trio thread, blocking until it
is complete.
Expand Down
1 change: 1 addition & 0 deletions trio/from_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@
"""

from ._threads import from_thread_run as run
from ._threads import from_thread_syncify as syncify
from ._threads import from_thread_run_sync as run_sync
Empty file added trio/py.typed
Empty file.
1 change: 1 addition & 0 deletions trio/to_thread.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
from ._threads import to_thread_run_sync as run_sync
from ._threads import to_thread_asyncify as asyncify
from ._threads import current_default_thread_limiter