From fc1d6574e644cd4897cbff91ef8ef20503a8bcec Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Tue, 21 Aug 2018 23:37:42 -0700 Subject: [PATCH 1/4] New signal API: trio.open_signal_receiver Fixes gh-354 Other changes: - deprecate trio.catch_signal - fix a few small edge-cases I noticed along the way --- docs/source/reference-io.rst | 4 +- newsfragments/354.feature.rst | 1 + newsfragments/354.removal.rst | 8 ++ trio/_signals.py | 94 ++++++++++++++-------- trio/tests/test_signals.py | 146 +++++++++++++++++++++++++--------- 5 files changed, 178 insertions(+), 75 deletions(-) create mode 100644 newsfragments/354.feature.rst create mode 100644 newsfragments/354.removal.rst diff --git a/docs/source/reference-io.rst b/docs/source/reference-io.rst index 40cea23e90..a6f09150e7 100644 --- a/docs/source/reference-io.rst +++ b/docs/source/reference-io.rst @@ -638,5 +638,5 @@ Signals .. currentmodule:: trio -.. autofunction:: catch_signals - :with: batched_signal_aiter +.. autofunction:: open_signal_receiver + :with: signal_aiter diff --git a/newsfragments/354.feature.rst b/newsfragments/354.feature.rst new file mode 100644 index 0000000000..c47c9e210d --- /dev/null +++ b/newsfragments/354.feature.rst @@ -0,0 +1 @@ +New and improved signal catching API: :func:`open_signal_receiver`. diff --git a/newsfragments/354.removal.rst b/newsfragments/354.removal.rst new file mode 100644 index 0000000000..ca0c9c2ab8 --- /dev/null +++ b/newsfragments/354.removal.rst @@ -0,0 +1,8 @@ +``trio.signal_catcher`` has been deprecated in favor of +:func:`open_signal_receiver`. The main differences are: + +- it takes \*-args now to specify the list of signals (so + ``open_signal_receiver(SIGINT)`` instead of + ``signal_catcher({SIGINT})``) +- the async iterator now yields individual signals, instead of + "batches" diff --git a/trio/_signals.py b/trio/_signals.py index b15dc3308a..5e5a6b00d1 100644 --- a/trio/_signals.py +++ b/trio/_signals.py @@ -1,11 +1,15 @@ import signal from contextlib import contextmanager +from collections import OrderedDict from . import _core -from ._sync import Semaphore -from ._util import signal_raise, aiter_compat, is_main_thread +from ._sync import Event +from ._util import ( + signal_raise, aiter_compat, is_main_thread, ConflictDetector +) +from ._deprecate import deprecated -__all__ = ["catch_signals"] +__all__ = ["open_signal_receiver", "catch_signals"] # Discussion of signal handling strategies: # @@ -46,40 +50,43 @@ @contextmanager def _signal_handler(signals, handler): original_handlers = {} - for signum in signals: - original_handlers[signum] = signal.signal(signum, handler) try: + for signum in set(signals): + original_handlers[signum] = signal.signal(signum, handler) yield finally: for signum, original_handler in original_handlers.items(): signal.signal(signum, original_handler) -class SignalQueue: +class SignalReceiver: def __init__(self): - self._semaphore = Semaphore(0, max_value=1) - self._pending = set() + # {signal num: None} + self._pending = OrderedDict() + self._have_pending = Event() + self._conflict_detector = ConflictDetector( + "only one task can iterate on a signal receiver at a time" + ) self._closed = False def _add(self, signum): if self._closed: signal_raise(signum) else: - if not self._pending: - self._semaphore.release() - self._pending.add(signum) + self._pending[signum] = None + self._have_pending.set() def _redeliver_remaining(self): # First make sure that any signals still in the delivery pipeline will # get redelivered self._closed = True - # And then redeliver any that are sitting in pending. This is doen + # And then redeliver any that are sitting in pending. This is done # using a weird recursive construct to make sure we process everything # even if some of the handlers raise exceptions. def deliver_next(): if self._pending: - signum = self._pending.pop() + signum, _ = self._pending.popitem(last=False) try: signal_raise(signum) finally: @@ -93,24 +100,26 @@ def __aiter__(self): async def __anext__(self): if self._closed: - raise RuntimeError("catch_signals block exited") - await self._semaphore.acquire() - assert self._pending - pending = set(self._pending) - self._pending.clear() - return pending + raise RuntimeError("open_signal_receiver block already exited") + # In principle it would be possible to support multiple concurrent + # calls to __anext__, but doing it without race conditions is quite + # tricky, and there doesn't seem to be any point in trying. + with self._conflict_detector.sync: + await self._have_pending.wait() + signum, _ = self._pending.popitem(last=False) + if not self._pending: + self._have_pending.clear() + return signum @contextmanager -def catch_signals(signals): +def open_signal_receiver(*signals): """A context manager for catching signals. Entering this context manager starts listening for the given signals and returns an async iterator; exiting the context manager stops listening. - The async iterator blocks until at least one signal has arrived, and then - yields a :class:`set` containing all of the signals that were received - since the last iteration. + The async iterator blocks until a signal arrives, and then yields it. Note that if you leave the ``with`` block while the iterator has unextracted signals still pending inside it, then they will be @@ -119,7 +128,7 @@ def catch_signals(signals): block. Args: - signals: a set of signals to listen for. + signals: the signals to listen for. Raises: RuntimeError: if you try to use this anywhere except Python's main @@ -129,25 +138,21 @@ def catch_signals(signals): A common convention for Unix daemons is that they should reload their configuration when they receive a ``SIGHUP``. Here's a sketch of what - that might look like using :func:`catch_signals`:: + that might look like using :func:`open_signal_receiver`:: - with trio.catch_signals({signal.SIGHUP}) as batched_signal_aiter: - async for batch in batched_signal_aiter: - # We're only listening for one signal, so the batch is always - # {signal.SIGHUP}, but if we were listening to more signals - # then it could vary. - for signum in batch: - assert signum == signal.SIGHUP - reload_configuration() + with trio.open_signal_receiver(signal.SIGHUP) as signal_aiter: + async for signum in signal_aiter: + assert signum == signal.SIGHUP + reload_configuration() """ if not is_main_thread(): raise RuntimeError( - "Sorry, catch_signals is only possible when running in the " + "Sorry, open_signal_receiver is only possible when running in " "Python interpreter's main thread" ) token = _core.current_trio_token() - queue = SignalQueue() + queue = SignalReceiver() def handler(signum, _): token.run_sync_soon(queue._add, signum, idempotent=True) @@ -157,3 +162,22 @@ def handler(signum, _): yield queue finally: queue._redeliver_remaining() + + +class CompatSignalQueue: + def __init__(self, signal_queue): + self._signal_queue = signal_queue + + @aiter_compat + def __aiter__(self): + return self + + async def __anext__(self): + return { await self._signal_queue.__anext__()} + + +@deprecated("0.7.0", issue=354, instead=open_signal_receiver) +@contextmanager +def catch_signals(signals): + with open_signal_receiver(*signals) as signal_queue: + yield CompatSignalQueue(signal_queue) diff --git a/trio/tests/test_signals.py b/trio/tests/test_signals.py index d9b1db2d56..4b7b09f4df 100644 --- a/trio/tests/test_signals.py +++ b/trio/tests/test_signals.py @@ -1,17 +1,16 @@ import os import signal -import threading -import queue as stdlib_queue import pytest +import trio from .. import _core from .._util import signal_raise -from .._signals import catch_signals, _signal_handler -from .._sync import Event +from .._signals import catch_signals, open_signal_receiver, _signal_handler -async def test_catch_signals(): +# Delete when catch_signals is removed +async def test_catch_signals(recwarn): print = lambda *args: None orig = signal.getsignal(signal.SIGILL) print(orig) @@ -35,23 +34,98 @@ async def test_catch_signals(): assert signal.getsignal(signal.SIGILL) is orig -def test_catch_signals_wrong_thread(): - threadqueue = stdlib_queue.Queue() +async def test_open_signal_receiver(): + orig = signal.getsignal(signal.SIGILL) + with open_signal_receiver(signal.SIGILL) as receiver: + # Raise it a few times, to exercise signal coalescing, both at the + # call_soon level and at the SignalQueue level + signal_raise(signal.SIGILL) + signal_raise(signal.SIGILL) + await _core.wait_all_tasks_blocked() + signal_raise(signal.SIGILL) + await _core.wait_all_tasks_blocked() + async for signum in receiver: # pragma: no branch + assert signum == signal.SIGILL + break + signal_raise(signal.SIGILL) + async for signum in receiver: # pragma: no branch + assert signum == signal.SIGILL + break + with pytest.raises(RuntimeError): + await receiver.__anext__() + assert signal.getsignal(signal.SIGILL) is orig + + +async def test_open_signal_receiver_restore_handler_after_one_bad_signal(): + orig = signal.getsignal(signal.SIGILL) + with pytest.raises(ValueError): + with open_signal_receiver(signal.SIGILL, 1234567): + pass # pragma: no cover + # Still restored even if we errored out + assert signal.getsignal(signal.SIGILL) is orig + +async def test_open_signal_receiver_restore_handler_after_duplicate_signal(): + orig = signal.getsignal(signal.SIGILL) + with open_signal_receiver(signal.SIGILL, signal.SIGILL): + pass + # Still restored correctly + assert signal.getsignal(signal.SIGILL) is orig + + +async def test_catch_signals_wrong_thread(): async def naughty(): - try: - with catch_signals([signal.SIGINT]) as _: - pass # pragma: no cover - except Exception as exc: - threadqueue.put(exc) - else: # pragma: no cover - threadqueue.put(None) + with open_signal_receiver(signal.SIGINT): + pass # pragma: no cover + + with pytest.raises(RuntimeError): + await trio.run_sync_in_worker_thread(trio.run, naughty) + - thread = threading.Thread(target=_core.run, args=(naughty,)) - thread.start() - thread.join() - exc = threadqueue.get_nowait() - assert type(exc) is RuntimeError +async def test_open_signal_receiver_conflict(): + with pytest.raises(trio.ResourceBusyError): + with open_signal_receiver(signal.SIGILL) as receiver: + async with trio.open_nursery() as nursery: + nursery.start_soon(receiver.__anext__) + nursery.start_soon(receiver.__anext__) + + +# Blocks until all previous calls to run_sync_soon(idempotent=True) have been +# processed. +async def wait_run_sync_soon_idempotent_queue_barrier(): + ev = trio.Event() + token = _core.current_trio_token() + token.run_sync_soon(ev.set, idempotent=True) + await ev.wait() + + +async def test_open_signal_receiver_no_starvation(): + # Set up a situation where there are always 2 pending signals available to + # report, and make sure that instead of getting the same signal reported + # over and over, it alternates between reporting both of them. + with open_signal_receiver(signal.SIGILL, signal.SIGFPE) as receiver: + try: + print(signal.getsignal(signal.SIGILL)) + previous = None + for _ in range(10): + signal_raise(signal.SIGILL) + signal_raise(signal.SIGFPE) + await wait_run_sync_soon_idempotent_queue_barrier() + if previous is None: + previous = await receiver.__anext__() + else: + got = await receiver.__anext__() + assert got in [signal.SIGILL, signal.SIGFPE] + assert got != previous + previous = got + # Clear out the last signal so it doesn't get redelivered + await receiver.__anext__() + except: # pragma: no cover + # If there's an unhandled exception above, then exiting the + # open_signal_receiver block might cause the signal to be + # redelivered and give us a core dump instead of a traceback... + import traceback + traceback.print_exc() async def test_catch_signals_race_condition_on_exit(): @@ -60,20 +134,14 @@ async def test_catch_signals_race_condition_on_exit(): def direct_handler(signo, frame): delivered_directly.add(signo) - async def wait_call_soon_idempotent_queue_barrier(): - ev = Event() - token = _core.current_trio_token() - token.run_sync_soon(ev.set, idempotent=True) - await ev.wait() - print(1) # Test the version where the call_soon *doesn't* have a chance to run # before we exit the with block: with _signal_handler({signal.SIGILL, signal.SIGFPE}, direct_handler): - with catch_signals({signal.SIGILL, signal.SIGFPE}) as queue: + with open_signal_receiver(signal.SIGILL, signal.SIGFPE) as receiver: signal_raise(signal.SIGILL) signal_raise(signal.SIGFPE) - await wait_call_soon_idempotent_queue_barrier() + await wait_run_sync_soon_idempotent_queue_barrier() assert delivered_directly == {signal.SIGILL, signal.SIGFPE} delivered_directly.clear() @@ -81,11 +149,11 @@ async def wait_call_soon_idempotent_queue_barrier(): # Test the version where the call_soon *does* have a chance to run before # we exit the with block: with _signal_handler({signal.SIGILL, signal.SIGFPE}, direct_handler): - with catch_signals({signal.SIGILL, signal.SIGFPE}) as queue: + with open_signal_receiver(signal.SIGILL, signal.SIGFPE) as receiver: signal_raise(signal.SIGILL) signal_raise(signal.SIGFPE) - await wait_call_soon_idempotent_queue_barrier() - assert len(queue._pending) == 2 + await wait_run_sync_soon_idempotent_queue_barrier() + assert len(receiver._pending) == 2 assert delivered_directly == {signal.SIGILL, signal.SIGFPE} delivered_directly.clear() @@ -93,17 +161,17 @@ async def wait_call_soon_idempotent_queue_barrier(): print(3) with _signal_handler({signal.SIGILL}, signal.SIG_IGN): - with catch_signals({signal.SIGILL}) as queue: + with open_signal_receiver(signal.SIGILL) as receiver: signal_raise(signal.SIGILL) - await wait_call_soon_idempotent_queue_barrier() + await wait_run_sync_soon_idempotent_queue_barrier() # test passes if the process reaches this point without dying print(4) with _signal_handler({signal.SIGILL}, signal.SIG_IGN): - with catch_signals({signal.SIGILL}) as queue: + with open_signal_receiver(signal.SIGILL) as receiver: signal_raise(signal.SIGILL) - await wait_call_soon_idempotent_queue_barrier() - assert len(queue._pending) == 1 + await wait_run_sync_soon_idempotent_queue_barrier() + assert len(receiver._pending) == 1 # test passes if the process reaches this point without dying # Check exception chaining if there are multiple exception-raising @@ -113,11 +181,13 @@ def raise_handler(signum, _): with _signal_handler({signal.SIGILL, signal.SIGFPE}, raise_handler): with pytest.raises(RuntimeError) as excinfo: - with catch_signals({signal.SIGILL, signal.SIGFPE}) as queue: + with open_signal_receiver( + signal.SIGILL, signal.SIGFPE + ) as receiver: signal_raise(signal.SIGILL) signal_raise(signal.SIGFPE) - await wait_call_soon_idempotent_queue_barrier() - assert len(queue._pending) == 2 + await wait_run_sync_soon_idempotent_queue_barrier() + assert len(receiver._pending) == 2 exc = excinfo.value signums = {exc.args[0]} assert isinstance(exc.__context__, RuntimeError) From af71cdbceca7a28782187c0dda77c43c00027f5f Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Wed, 22 Aug 2018 00:36:16 -0700 Subject: [PATCH 2/4] Fix doc xrefs --- trio/_core/_entry_queue.py | 2 +- trio/_core/_run.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/trio/_core/_entry_queue.py b/trio/_core/_entry_queue.py index 42996b6130..c562de4474 100644 --- a/trio/_core/_entry_queue.py +++ b/trio/_core/_entry_queue.py @@ -133,7 +133,7 @@ class TrioToken: 1. It lets you re-enter the Trio run loop from external threads or signal handlers. This is the low-level primitive that :func:`trio.run_sync_in_worker_thread` uses to receive results from - worker threads, that :func:`trio.catch_signals` uses to receive + worker threads, that :func:`trio.open_signal_receiver` uses to receive notifications about signals, and so forth. 2. Each call to :func:`trio.run` has exactly one associated diff --git a/trio/_core/_run.py b/trio/_core/_run.py index 6c8651ceb8..d86e449406 100644 --- a/trio/_core/_run.py +++ b/trio/_core/_run.py @@ -1213,7 +1213,7 @@ def run( This setting has no effect if your program has registered a custom SIGINT handler, or if :func:`run` is called from anywhere but the main thread (this is a Python limitation), or if you use - :func:`catch_signals` to catch SIGINT. + :func:`open_signal_receiver` to catch SIGINT. Returns: Whatever ``async_fn`` returns. From b3991b592b45714613fd8207610919d5d7243b3e Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Wed, 22 Aug 2018 19:54:26 -0700 Subject: [PATCH 3/4] Slightly cleaner tests of open_signal_receiver coalescing - Add a private test helper to check how many signals are pending - Use this to explicitly test that coalescing has worked - Use this instead of poking at the object's guts directly in existing tests --- trio/_signals.py | 4 ++++ trio/tests/test_signals.py | 11 +++++++---- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/trio/_signals.py b/trio/_signals.py index 5e5a6b00d1..8b1e6bea80 100644 --- a/trio/_signals.py +++ b/trio/_signals.py @@ -94,6 +94,10 @@ def deliver_next(): deliver_next() + # Helper for tests, not public or otherwise used + def _pending_signal_count(self): + return len(self._pending) + @aiter_compat def __aiter__(self): return self diff --git a/trio/tests/test_signals.py b/trio/tests/test_signals.py index 4b7b09f4df..148fd234d0 100644 --- a/trio/tests/test_signals.py +++ b/trio/tests/test_signals.py @@ -47,10 +47,12 @@ async def test_open_signal_receiver(): async for signum in receiver: # pragma: no branch assert signum == signal.SIGILL break + assert receiver._pending_signal_count() == 0 signal_raise(signal.SIGILL) async for signum in receiver: # pragma: no branch assert signum == signal.SIGILL break + assert receiver._pending_signal_count() == 0 with pytest.raises(RuntimeError): await receiver.__anext__() assert signal.getsignal(signal.SIGILL) is orig @@ -119,7 +121,8 @@ async def test_open_signal_receiver_no_starvation(): assert got != previous previous = got # Clear out the last signal so it doesn't get redelivered - await receiver.__anext__() + while receiver._pending_signal_count() != 0: + await receiver.__anext__() except: # pragma: no cover # If there's an unhandled exception above, then exiting the # open_signal_receiver block might cause the signal to be @@ -153,7 +156,7 @@ def direct_handler(signo, frame): signal_raise(signal.SIGILL) signal_raise(signal.SIGFPE) await wait_run_sync_soon_idempotent_queue_barrier() - assert len(receiver._pending) == 2 + assert receiver._pending_signal_count() == 2 assert delivered_directly == {signal.SIGILL, signal.SIGFPE} delivered_directly.clear() @@ -171,7 +174,7 @@ def direct_handler(signo, frame): with open_signal_receiver(signal.SIGILL) as receiver: signal_raise(signal.SIGILL) await wait_run_sync_soon_idempotent_queue_barrier() - assert len(receiver._pending) == 1 + assert receiver._pending_signal_count() == 1 # test passes if the process reaches this point without dying # Check exception chaining if there are multiple exception-raising @@ -187,7 +190,7 @@ def raise_handler(signum, _): signal_raise(signal.SIGILL) signal_raise(signal.SIGFPE) await wait_run_sync_soon_idempotent_queue_barrier() - assert len(receiver._pending) == 2 + assert receiver._pending_signal_count() == 2 exc = excinfo.value signums = {exc.args[0]} assert isinstance(exc.__context__, RuntimeError) From fac80e16c89fa5e014c0048f90e416b2a029da6a Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Wed, 22 Aug 2018 23:24:20 -0700 Subject: [PATCH 4/4] Call the removed thing by the right name --- newsfragments/354.removal.rst | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/newsfragments/354.removal.rst b/newsfragments/354.removal.rst index ca0c9c2ab8..c325f365b4 100644 --- a/newsfragments/354.removal.rst +++ b/newsfragments/354.removal.rst @@ -1,8 +1,8 @@ -``trio.signal_catcher`` has been deprecated in favor of +``trio.catch_signals`` has been deprecated in favor of :func:`open_signal_receiver`. The main differences are: - it takes \*-args now to specify the list of signals (so ``open_signal_receiver(SIGINT)`` instead of - ``signal_catcher({SIGINT})``) + ``catch_signals({SIGINT})``) - the async iterator now yields individual signals, instead of "batches"