diff --git a/trio/_tests/test_threads.py b/trio/_tests/test_threads.py index 84409ae83c..4595b5867a 100644 --- a/trio/_tests/test_threads.py +++ b/trio/_tests/test_threads.py @@ -13,9 +13,17 @@ import pytest import sniffio -from .. import CapacityLimiter, Event, _core, fail_after, sleep, sleep_forever +from .. import ( + CapacityLimiter, + Event, + _core, + fail_after, + move_on_after, + sleep, + sleep_forever, +) from .._core._tests.test_ki import ki_self -from .._core._tests.tutil import buggy_pypy_asyncgens +from .._core._tests.tutil import buggy_pypy_asyncgens, slow from .._threads import ( current_default_thread_limiter, from_thread_check_cancelled, @@ -1015,3 +1023,19 @@ async def test_from_thread_check_cancelled_raises_in_foreign_threads(): _core.start_thread_soon(from_thread_check_cancelled, lambda _: q.put(_)) with pytest.raises(RuntimeError): q.get(timeout=1).unwrap() + + +@slow +async def test_reentry_doesnt_deadlock(): + # Regression test for issue noticed in GH-2827 + # The failure mode is to hang the whole test suite, unfortunately. + # XXX consider running this in a subprocess with a timeout, if it comes up again! + + async def child() -> None: + while True: + await to_thread_run_sync(from_thread_run, sleep, 0, cancellable=False) + + with move_on_after(2): + async with _core.open_nursery() as nursery: + for _ in range(4): + nursery.start_soon(child) diff --git a/trio/_threads.py b/trio/_threads.py index d44e4b68f8..023b1c198e 100644 --- a/trio/_threads.py +++ b/trio/_threads.py @@ -93,13 +93,11 @@ async def run(self) -> None: task = trio.lowlevel.current_task() old_context = task.context task.context = self.context.copy() - try: - await trio.lowlevel.cancel_shielded_checkpoint() - result = await outcome.acapture(self.unprotected_afn) - self.queue.put_nowait(result) - finally: - task.context = old_context - await trio.lowlevel.cancel_shielded_checkpoint() + await trio.lowlevel.cancel_shielded_checkpoint() + result = await outcome.acapture(self.unprotected_afn) + task.context = old_context + await trio.lowlevel.cancel_shielded_checkpoint() + self.queue.put_nowait(result) async def run_system(self) -> None: result = await outcome.acapture(self.unprotected_afn)