diff --git a/docs/source/conf.py b/docs/source/conf.py index e5c1e28d40..9d5890b491 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -35,10 +35,7 @@ ("py:class", "trio.hazmat.RunLocal"), # trio.abc is documented at random places scattered throughout the docs ("py:mod", "trio.abc"), - # contextvars is added in 3.7, but the docs point to 3.6 - # these two entries can be removed after 3.7 is released - ("py:mod", "contextvars"), - ("py:class", "contextvars.Context"), + ("py:class", "math.inf"), ] autodoc_inherit_docstrings = False @@ -70,7 +67,7 @@ def setup(app): intersphinx_mapping = { "python": ('https://docs.python.org/3', None), - "outcome": ('https://outcome.readthedocs.org/en/latest/', None), + "outcome": ('https://outcome.readthedocs.io/en/latest/', None), } autodoc_member_order = "bysource" diff --git a/docs/source/design.rst b/docs/source/design.rst index 699aa16107..07ea830753 100644 --- a/docs/source/design.rst +++ b/docs/source/design.rst @@ -413,11 +413,6 @@ Specific style guidelines and the ``nowait`` version raises :exc:`trio.WouldBlock` if it would block. -* The word ``monitor`` is used for APIs that involve an - :class:`trio.hazmat.UnboundedQueue` receiving some kind of events. - (Examples: nursery ``.monitor`` attribute, some of the low-level I/O - functions in :mod:`trio.hazmat`.) - * ...we should, but currently don't, have a solid convention to distinguish between functions that take an async callable and those that take a sync callable. See `issue #68 @@ -447,7 +442,7 @@ strategy is to make sure that it's possible for independent packages to add new features on top of trio. Enforcing the ``trio`` vs ``trio._core`` split is a way of `eating our own dogfood `__: basic -functionality like :class:`trio.Queue` and :mod:`trio.socket` is +functionality like :class:`trio.Lock` and :mod:`trio.socket` is actually implemented solely in terms of public APIs. 
And the hope is that by doing this, we increase the chances that someone who comes up with a better kind of queue or wants to add some new functionality diff --git a/docs/source/history.rst b/docs/source/history.rst index 32a6d18536..2c87e50dbe 100644 --- a/docs/source/history.rst +++ b/docs/source/history.rst @@ -139,7 +139,7 @@ Features the creator's :mod:`contextvars` context, instead using one created at :func:`~trio.run`. (`#289 `__) -- Add support for :class:`trio.Queue` with `capacity=0`. Queue's implementation +- Add support for ``trio.Queue`` with ``capacity=0``. Queue's implementation is also faster now. (`#473 `__) - Switch to using standalone `Outcome @@ -397,7 +397,7 @@ Upcoming breaking changes with warnings (i.e., stuff that in 0.2.0 See `#68 `__ for details. -* :class:`trio.Queue`\'s ``join`` and ``task_done`` methods are +* ``trio.Queue``\'s ``join`` and ``task_done`` methods are deprecated without replacement (`#321 `__) @@ -424,7 +424,7 @@ Upcoming breaking changes with warnings (i.e., stuff that in 0.2.0 * ``trio.Result`` → ``trio.hazmat.Result`` * ``trio.Value`` → ``trio.hazmat.Value`` * ``trio.Error`` → ``trio.hazmat.Error`` - * ``trio.UnboundedQueue`` → :class:`trio.hazmat.UnboundedQueue` + * ``trio.UnboundedQueue`` → ``trio.hazmat.UnboundedQueue`` In addition, several introspection attributes are being renamed: diff --git a/docs/source/reference-core.rst b/docs/source/reference-core.rst index ae6bbedc32..5b2dc82741 100644 --- a/docs/source/reference-core.rst +++ b/docs/source/reference-core.rst @@ -830,23 +830,23 @@ finishes first:: if not async_fns: raise ValueError("must pass at least one argument") - q = trio.Queue(1) + send_channel, receive_channel = trio.open_memory_channel(0) async def jockey(async_fn): - await q.put(await async_fn()) + await send_channel.send(await async_fn()) async with trio.open_nursery() as nursery: for async_fn in async_fns: nursery.start_soon(jockey, async_fn) - winner = await q.get() + winner = await 
receive_channel.receive() nursery.cancel_scope.cancel() return winner This works by starting a set of tasks which each try to run their function, and then report back the value it returns. The main task -uses ``q.get()`` to wait for one to finish; as soon as the first task -crosses the finish line, it cancels the rest, and then returns the -winning value. +uses ``receive_channel.receive`` to wait for one to finish; as soon as +the first task crosses the finish line, it cancels the rest, and then +returns the winning value. Here if one or more of the racing functions raises an unhandled exception then Trio's normal handling kicks in: it cancels the others @@ -1191,11 +1191,11 @@ In trio, we standardize on the following conventions: :mod:`threading`.) We like this approach because it allows us to make the blocking version async and the non-blocking version sync. -* When a non-blocking method cannot succeed (the queue is empty, the - lock is already held, etc.), then it raises - :exc:`trio.WouldBlock`. There's no equivalent to the - :exc:`queue.Empty` versus :exc:`queue.Full` distinction – we just - have the one exception that we use consistently. +* When a non-blocking method cannot succeed (the channel is empty, the + lock is already held, etc.), then it raises :exc:`trio.WouldBlock`. + There's no equivalent to the :exc:`queue.Empty` versus + :exc:`queue.Full` distinction – we just have the one exception that + we use consistently. Fairness @@ -1245,64 +1245,326 @@ Broadcasting an event with :class:`Event` :members: -.. _queue: +.. _channels: -Passing messages with :class:`Queue` -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +Using channels to pass values between tasks +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -You can use :class:`Queue` objects to safely pass objects between -tasks. Trio :class:`Queue` objects always have a bounded size. Here's -a toy example to demonstrate why this is important. 
Suppose we have a -queue with two producers and one consumer:: +*Channels* allow you to safely and conveniently send objects between +different tasks. They're particularly useful for implementing +producer/consumer patterns. - async def producer(queue): - while True: - await queue.put(1) +The channel API is defined by the abstract base classes +:class:`trio.abc.SendChannel` and :class:`trio.abc.ReceiveChannel`. +You can use these to implement your own custom channels, that do +things like pass objects between processes or over the network. But in +many cases, you just want to pass objects between different tasks +inside a single process, and for that you can use +:func:`trio.open_memory_channel`: - async def consumer(queue): - while True: - print(await queue.get()) +.. autofunction:: open_memory_channel - async def main(): - # This example won't work with Trio's actual Queue class, so - # imagine we have some sort of platonic ideal of an unbounded - # queue here: - queue = trio.HypotheticalQueue() - async with trio.open_nursery() as nursery: - # Two producers - nursery.start_soon(producer, queue) - nursery.start_soon(producer, queue) - # One consumer - nursery.start_soon(consumer, queue) +.. note:: If you've used the :mod:`threading` or :mod:`asyncio` + modules, you may be familiar with :class:`queue.Queue` or + :class:`asyncio.Queue`. In Trio, :func:`open_memory_channel` is + what you use when you're looking for a queue. The main difference + is that Trio splits the classic queue interface up into two + objects. The advantage of this is that it makes it possible to put + the two ends in different processes, and that we can close the two + sides separately. - trio.run(main) -If we naively cycle between these three tasks in round-robin style, -then we put an item, then put an item, then get an item, then put an -item, then put an item, then get an item, ... 
and since on each cycle -we add two items to the queue but only remove one, then over time the -queue size grows arbitrarily large, our latency is terrible, we run -out of memory, it's just generally bad news all around. - -By placing an upper bound on our queue's size, we avoid this problem. -If the queue gets too big, then it applies *backpressure*: ``put`` -blocks and forces the producers to slow down and wait until the -consumer calls ``get``. - -You can also create a :class:`Queue` with size 0. In that case any -task that calls ``put`` on the queue will wait until another task -calls ``get`` on the same queue, and vice versa. This is similar to -the behavior of `channels as described in the CSP model -`__. - -.. autoclass:: Queue - :members: +A simple channel example +++++++++++++++++++++++++ + +Here's a simple example of how to use channels: + +.. literalinclude:: reference-core/channels-simple.py + +If you run this, it prints: + +.. code-block:: none + + got value "message 0" + got value "message 1" + got value "message 2" + +And then it hangs forever. (Use control-C to quit.) + + +.. _channel-shutdown: + +Clean shutdown with channels +++++++++++++++++++++++++++++ + +Of course we don't generally like it when programs hang. What +happened? The problem is that the producer sent 3 messages and then +exited, but the consumer has no way to tell that the producer is gone: +for all it knows, another message might be coming along any moment. So +it hangs forever waiting for the 4th message. + +Here's a new version that fixes this: it produces the same output as +the previous version, and then exits cleanly. The only change is the +addition of ``async with`` blocks inside the producer and consumer: + +.. literalinclude:: reference-core/channels-shutdown.py + :emphasize-lines: 10,15 + +The really important thing here is the producer's ``async with`` . 
+ +When the producer exits, this closes the ``send_channel``, and that +tells the consumer that no more messages are coming, so it can cleanly +exit its ``async for`` loop. Then the program shuts down because both +tasks have exited. + +We also added an ``async with`` to the consumer. This isn't as +important, but it can help us catch mistakes or other problems. For +example, suppose that the consumer exited early for some reason – +maybe because of a bug. Then the producer would be sending messages +into the void, and might get stuck indefinitely. But, if the consumer +closes its ``receive_channel``, then the producer will get a +:exc:`BrokenResourceError` to alert it that it should stop sending +messages because no-one is listening. + +If you want to see the effect of the consumer exiting early, try +adding a ``break`` statement to the ``async for`` loop – you should +see a :exc:`BrokenResourceError` from the producer. + + +.. _channel-mpmc: + +Managing multiple producers and/or multiple consumers ++++++++++++++++++++++++++++++++++++++++++++++++++++++ + +You can also have multiple producers, and multiple consumers, all +sharing the same channel. However, this makes shutdown a little more +complicated. + +For example, consider this naive extension of our previous example, +now with two producers and two consumers: + +.. literalinclude:: reference-core/channels-mpmc-broken.py + +The two producers, A and B, send 3 messages apiece. These are then +randomly distributed between the two consumers, X and Y. So we're +hoping to see some output like: + +.. 
code-block:: none + + consumer Y got value '0 from producer B' + consumer X got value '0 from producer A' + consumer Y got value '1 from producer A' + consumer Y got value '1 from producer B' + consumer X got value '2 from producer B' + consumer X got value '2 from producer A' + +However, on most runs, that's not what happens – the first part of the +output is OK, and then when we get to the end the program crashes with +:exc:`ClosedResourceError`. If you run the program a few times, you'll +see that sometimes the traceback shows ``send`` crashing, and other +times it shows ``receive`` crashing, and you might even find that on +some runs it doesn't crash at all. + +Here's what's happening: suppose that producer A finishes first. It +exits, and its ``async with`` block closes the ``send_channel``. But +wait! Producer B was still using that ``send_channel``... so the next +time B calls ``send``, it gets a :exc:`ClosedResourceError`. + +Sometimes, though if we're lucky, the two producers might finish at +the same time (or close enough), so they both make their last ``send`` +before either of them closes the ``send_channel``. + +But, even if that happens, we're not out of the woods yet! After the +producers exit, the two consumers race to be the first to notice that +the ``send_channel`` has closed. Suppose that X wins the race. It +exits its ``async for`` loop, then exits the ``async with`` block... +and closes the ``receive_channel``, while Y is still using it. Again, +this causes a crash. + +We could avoid this by using some complicated bookkeeping to make sure +that only the *last* producer and the *last* consumer close their +channel endpoints... but that would be tiresome and fragile. +Fortunately, there's a better way! Here's a fixed version of our +program above: + +.. 
literalinclude:: reference-core/channels-mpmc-fixed.py + :emphasize-lines: 7, 9, 10, 12, 13 + +This example demonstrates using the :meth:`SendChannel.clone +` and :meth:`ReceiveChannel.clone +` methods. What these do is create +copies of our endpoints, that act just like the original – except that +they can be closed independently. And the underlying channel is only +closed after *all* the clones have been closed. So this completely +solves our problem with shutdown, and if you run this program, you'll +see it print its six lines of output and then exits cleanly. + +Notice a small trick we use: the code in ``main`` creates clone +objects to pass into all the child tasks, and then closes the original +objects using ``async with``. Another option is to pass clones into +all-but-one of the child tasks, and then pass the original object into +the last task, like:: + + # Also works, but is more finicky: + send_channel, receive_channel = trio.open_memory_channel(0) + nursery.start_soon(producer, "A", send_channel.clone()) + nursery.start_soon(producer, "B", send_channel) + nursery.start_soon(consumer, "X", receive_channel.clone()) + nursery.start_soon(consumer, "Y", receive_channel) + +But this is more error-prone, especially if you use a loop to spawn +the producers/consumers. + +Just make sure that you don't write:: + + # Broken, will cause program to hang: + send_channel, receive_channel = trio.open_memory_channel(0) + nursery.start_soon(producer, "A", send_channel.clone()) + nursery.start_soon(producer, "B", send_channel.clone()) + nursery.start_soon(consumer, "X", receive_channel.clone()) + nursery.start_soon(consumer, "Y", receive_channel.clone()) + +Here we pass clones into the tasks, but never close the original +objects. That means we have 3 send channel objects (the original + two +clones), but we only close 2 of them, so the consumers will hang +around forever waiting for that last one to be closed. + + +.. 
_channel-buffering: + +Buffering in channels ++++++++++++++++++++++ + +When you call :func:`open_memory_channel`, you have to specify how +many values can be buffered internally in the channel. If the buffer +is full, then any task that calls :meth:`~trio.abc.SendChannel.send` +will stop and wait for another task to call +:meth:`~trio.abc.ReceiveChannel.receive`. This is useful because it +produces *backpressure*: if the channel producers are running faster +than the consumers, then it forces the producers to slow down. + +You can disable buffering entirely, by doing +``open_memory_channel(0)``. In that case any task that calls +:meth:`~trio.abc.SendChannel.send` will wait until another task calls +:meth:`~trio.abc.ReceiveChannel.receive`, and vice versa. This is similar to +how channels work in the `classic Communicating Sequential Processes +model `__, and is +a reasonable default if you aren't sure what size buffer to use. +(That's why we used it in the examples above.) + +At the other extreme, you can make the buffer unbounded by using +``open_memory_channel(math.inf)``. In this case, +:meth:`~trio.abc.SendChannel.send` *always* returns immediately. +Normally, this is a bad idea. To see why, consider a program where the +producer runs more quickly than the consumer: + +.. literalinclude:: reference-core/channels-backpressure.py + +If you run this program, you'll see output like: + +.. code-block:: none + + Sent message: 0 + Received message: 0 + Sent message: 1 + Sent message: 2 + Sent message: 3 + Sent message: 4 + Sent message: 5 + Sent message: 6 + Sent message: 7 + Sent message: 8 + Sent message: 9 + Received message: 1 + Sent message: 10 + Sent message: 11 + Sent message: 12 + ... + +On average, the producer sends ten messages per second, but the +consumer only calls ``receive`` once per second. That means that each +second, the channel's internal buffer has to grow to hold an extra +nine items. 
After a minute, the buffer will have ~540 items in it; +after an hour, that grows to ~32,400. Eventually, the program will run +out of memory. And well before we run out of memory, our latency on +handling individual messages will become abysmal. For example, at the +one minute mark, the producer is sending message ~600, but the +consumer is still processing message ~60. Message 600 will have to sit +in the channel for ~9 minutes before the consumer catches up and +processes it. + +Now try replacing ``open_memory_channel(math.inf)`` with +``open_memory_channel(0)``, and run it again. We get output like: + +.. code-block:: none + + Sent message: 0 + Received message: 0 + Received message: 1 + Sent message: 1 + Received message: 2 + Sent message: 2 + Sent message: 3 + Received message: 3 + ... + +Now the ``send`` calls wait for the ``receive`` calls to finish, which +forces the producer to slow down to match the consumer's speed. (It +might look strange that some values are reported as "Received" before +they're reported as "Sent"; this happens because the actual +send/receive happen at the same time, so which line gets printed first +is random.) + +Now, let's try setting a small but nonzero buffer size, like +``open_memory_channel(3)``. What do you think will happen? + +I get: + +.. code-block:: none + + Sent message: 0 + Received message: 0 + Sent message: 1 + Sent message: 2 + Sent message: 3 + Received message: 1 + Sent message: 4 + Received message: 2 + Sent message: 5 + ... + +So you can see that the producer runs ahead by 3 messages, and then +stops to wait: when the consumer reads message 1, it sends message 4, +then when the consumer reads message 2, it sends message 5, and so on. +Once it reaches the steady state, this version acts just like our +previous version where we set the buffer size to 0, except that it +uses a bit more memory and each message sits in the buffer for a bit +longer before being processed (i.e., the message latency is higher). 
+ +Of course real producers and consumers are usually more complicated +than this, and in some situations, a modest amount of buffering might +improve throughput. But too much buffering wastes memory and increases +latency, so if you want to tune your application you should experiment +to see what value works best for you. + +**Why do we even support unbounded buffers then?** Good question! +Despite everything we saw above, there are times when you actually do +need an unbounded buffer. For example, consider a web crawler that +uses a channel to keep track of all the URLs it still wants to crawl. +Each crawler runs a loop where it takes a URL from the channel, +fetches it, checks the HTML for outgoing links, and then adds the new +URLs to the channel. This creates a *circular flow*, where each +consumer is also a producer. In this case, if your channel buffer gets +full, then the crawlers will block when they try to add new URLs to +the channel, and if all the crawlers got blocked, then they aren't +taking any URLs out of the channel, so they're stuck forever in a +deadlock. Using an unbounded channel avoids this, because it means +that :meth:`~trio.abc.SendChannel.send` never blocks. Lower-level synchronization primitives ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -Personally, I find that events and queues are usually enough to +Personally, I find that events and channels are usually enough to implement most things I care about, and lead to easier to read code than the lower-level primitives discussed in this section. But if you need them, they're here. (If you find yourself reaching for these @@ -1490,49 +1752,10 @@ Getting back into the trio thread from another thread :members: This will probably be clearer with an example. 
Here we demonstrate how -to spawn a child thread, and then use a :class:`trio.Queue` to send -messages between the thread and a trio task:: +to spawn a child thread, and then use a :ref:`memory channel +` to send messages between the thread and a trio task:: - import trio - import threading - - def thread_fn(portal, request_queue, response_queue): - while True: - # Since we're in a thread, we can't call trio.Queue methods - # directly -- so we use our portal to call them. - request = portal.run(request_queue.get) - # We use 'None' as a request to quit - if request is not None: - response = request + 1 - portal.run(response_queue.put, response) - else: - # acknowledge that we're shutting down, and then do it - portal.run(response_queue.put, None) - return - - async def main(): - portal = trio.BlockingTrioPortal() - request_queue = trio.Queue(1) - response_queue = trio.Queue(1) - thread = threading.Thread( - target=thread_fn, - args=(portal, request_queue, response_queue)) - thread.start() - - # prints "1" - await request_queue.put(0) - print(await response_queue.get()) - - # prints "2" - await request_queue.put(1) - print(await response_queue.get()) - - # prints "None" - await request_queue.put(None) - print(await response_queue.get()) - thread.join() - - trio.run(main) +.. literalinclude:: reference-core/blocking-trio-portal-example.py Exceptions and warnings @@ -1544,6 +1767,8 @@ Exceptions and warnings .. autoexception:: WouldBlock +.. autoexception:: EndOfChannel + .. autoexception:: BusyResourceError .. 
autoexception:: ClosedResourceError diff --git a/docs/source/reference-core/blocking-trio-portal-example.py b/docs/source/reference-core/blocking-trio-portal-example.py new file mode 100644 index 0000000000..998fec9bd2 --- /dev/null +++ b/docs/source/reference-core/blocking-trio-portal-example.py @@ -0,0 +1,44 @@ +import trio +import threading + +def thread_fn(portal, receive_from_trio, send_to_trio): + while True: + # Since we're in a thread, we can't call methods on Trio + # objects directly -- so we use our portal to call them. + try: + request = portal.run(receive_from_trio.receive) + except trio.EndOfChannel: + portal.run(send_to_trio.aclose) + return + else: + response = request + 1 + portal.run(send_to_trio.send, response) + +async def main(): + portal = trio.BlockingTrioPortal() + send_to_thread, receive_from_trio = trio.open_memory_channel(0) + send_to_trio, receive_from_thread = trio.open_memory_channel(0) + + async with trio.open_nursery() as nursery: + # In a background thread, run: + # thread_fn(portal, receive_from_trio, send_to_trio) + nursery.start_soon( + trio.run_sync_in_worker_thread, + thread_fn, portal, receive_from_trio, send_to_trio + ) + + # prints "1" + await send_to_thread.send(0) + print(await receive_from_thread.receive()) + + # prints "2" + await send_to_thread.send(1) + print(await receive_from_thread.receive()) + + # When we close the channel, it signals the thread to exit. + await send_to_thread.aclose() + + # When we exit the nursery, it waits for the background thread to + # exit. + +trio.run(main) diff --git a/docs/source/reference-core/channels-backpressure.py b/docs/source/reference-core/channels-backpressure.py new file mode 100644 index 0000000000..50ac67f20a --- /dev/null +++ b/docs/source/reference-core/channels-backpressure.py @@ -0,0 +1,30 @@ +# Simulate a producer that generates values 10x faster than the +# consumer can handle them. 
+ +import trio +import math + +async def producer(send_channel): + count = 0 + while True: + # Pretend that we have to do some work to create this message, and it + # takes 0.1 seconds: + await trio.sleep(0.1) + await send_channel.send(count) + print("Sent message:", count) + count += 1 + +async def consumer(receive_channel): + async for value in receive_channel: + print("Received message:", value) + # Pretend that we have to do some work to handle this message, and it + # takes 1 second + await trio.sleep(1) + +async def main(): + send_channel, receive_channel = trio.open_memory_channel(math.inf) + async with trio.open_nursery() as nursery: + nursery.start_soon(producer, send_channel) + nursery.start_soon(consumer, receive_channel) + +trio.run(main) diff --git a/docs/source/reference-core/channels-mpmc-broken.py b/docs/source/reference-core/channels-mpmc-broken.py new file mode 100644 index 0000000000..2a755acba3 --- /dev/null +++ b/docs/source/reference-core/channels-mpmc-broken.py @@ -0,0 +1,30 @@ +# This example usually crashes! 
+ +import trio +import random + +async def main(): + async with trio.open_nursery() as nursery: + send_channel, receive_channel = trio.open_memory_channel(0) + # Start two producers + nursery.start_soon(producer, "A", send_channel) + nursery.start_soon(producer, "B", send_channel) + # And two consumers + nursery.start_soon(consumer, "X", receive_channel) + nursery.start_soon(consumer, "Y", receive_channel) + +async def producer(name, send_channel): + async with send_channel: + for i in range(3): + await send_channel.send("{} from producer {}".format(i, name)) + # Random sleeps help trigger the problem more reliably + await trio.sleep(random.random()) + +async def consumer(name, receive_channel): + async with receive_channel: + async for value in receive_channel: + print("consumer {} got value {!r}".format(name, value)) + # Random sleeps help trigger the problem more reliably + await trio.sleep(random.random()) + +trio.run(main) diff --git a/docs/source/reference-core/channels-mpmc-fixed.py b/docs/source/reference-core/channels-mpmc-fixed.py new file mode 100644 index 0000000000..a3e7044fe7 --- /dev/null +++ b/docs/source/reference-core/channels-mpmc-fixed.py @@ -0,0 +1,29 @@ +import trio +import random + +async def main(): + async with trio.open_nursery() as nursery: + send_channel, receive_channel = trio.open_memory_channel(0) + async with send_channel, receive_channel: + # Start two producers, giving each its own private clone + nursery.start_soon(producer, "A", send_channel.clone()) + nursery.start_soon(producer, "B", send_channel.clone()) + # And two consumers, giving each its own private clone + nursery.start_soon(consumer, "X", receive_channel.clone()) + nursery.start_soon(consumer, "Y", receive_channel.clone()) + +async def producer(name, send_channel): + async with send_channel: + for i in range(3): + await send_channel.send("{} from producer {}".format(i, name)) + # Random sleeps help trigger the problem more reliably + await trio.sleep(random.random()) + 
+async def consumer(name, receive_channel): + async with receive_channel: + async for value in receive_channel: + print("consumer {} got value {!r}".format(name, value)) + # Random sleeps help trigger the problem more reliably + await trio.sleep(random.random()) + +trio.run(main) diff --git a/docs/source/reference-core/channels-shutdown.py b/docs/source/reference-core/channels-shutdown.py new file mode 100644 index 0000000000..dcd35767ae --- /dev/null +++ b/docs/source/reference-core/channels-shutdown.py @@ -0,0 +1,19 @@ +import trio + +async def main(): + async with trio.open_nursery() as nursery: + send_channel, receive_channel = trio.open_memory_channel(0) + nursery.start_soon(producer, send_channel) + nursery.start_soon(consumer, receive_channel) + +async def producer(send_channel): + async with send_channel: + for i in range(3): + await send_channel.send("message {}".format(i)) + +async def consumer(receive_channel): + async with receive_channel: + async for value in receive_channel: + print("got value {!r}".format(value)) + +trio.run(main) diff --git a/docs/source/reference-core/channels-simple.py b/docs/source/reference-core/channels-simple.py new file mode 100644 index 0000000000..d04ebd722c --- /dev/null +++ b/docs/source/reference-core/channels-simple.py @@ -0,0 +1,23 @@ +import trio + +async def main(): + async with trio.open_nursery() as nursery: + # Open a channel: + send_channel, receive_channel = trio.open_memory_channel(0) + # Start a producer and a consumer, passing one end of the channel to + # each of them: + nursery.start_soon(producer, send_channel) + nursery.start_soon(consumer, receive_channel) + +async def producer(send_channel): + # Producer sends 3 messages + for i in range(3): + # The producer sends using 'await send_channel.send(...)' + await send_channel.send("message {}".format(i)) + +async def consumer(receive_channel): + # The consumer uses an 'async for' loop to receive the values: + async for value in receive_channel: + print("got 
value {!r}".format(value)) + +trio.run(main) diff --git a/docs/source/reference-hazmat.rst b/docs/source/reference-hazmat.rst index c3156617e1..65545253ec 100644 --- a/docs/source/reference-hazmat.rst +++ b/docs/source/reference-hazmat.rst @@ -46,7 +46,7 @@ Debugging and instrumentation Trio tries hard to provide useful hooks for debugging and instrumentation. Some are documented above (the nursery introspection -attributes, :meth:`trio.Queue.statistics`, etc.). Here are some more. +attributes, :meth:`trio.Lock.statistics`, etc.). Here are some more. Global statistics @@ -281,36 +281,6 @@ anything real. See `#26 :with: queue -Unbounded queues -================ - -In the section :ref:`queue`, we showed an example with two producers -and one consumer using the same queue, where the queue size would grow -without bound to produce unbounded latency and memory usage. -:class:`trio.Queue` avoids this by placing an upper bound on how big -the queue can get before ``put`` starts blocking. But what if you're -in a situation where ``put`` can't block? - -There is another option: the queue consumer could get greedy. Each -time it runs, it could eagerly consume all of the pending items before -allowing another task to run. (In some other systems, this would -happen automatically because their queue's ``get`` method doesn't -invoke the scheduler unless it has to block. But :ref:`in trio, get is -always a checkpoint `.) This works, but it's a bit -risky: basically instead of applying backpressure to specifically the -producer tasks, we're applying it to *all* the tasks in our system. -The danger here is that if enough items have built up in the queue, -then "stopping the world" to process them all may cause unacceptable -latency spikes in unrelated tasks. Nonetheless, this is still the -right choice in situations where it's impossible to apply backpressure -more precisely. So this is the strategy implemented by -:class:`UnboundedQueue`. 
The main time you should use this is when -working with low-level APIs like :func:`monitor_kevent`. - -.. autoclass:: UnboundedQueue - :members: - - Global state: system tasks and run-local variables ================================================== diff --git a/docs/source/reference-io.rst b/docs/source/reference-io.rst index 178b552e39..1af80212d1 100644 --- a/docs/source/reference-io.rst +++ b/docs/source/reference-io.rst @@ -118,6 +118,16 @@ Abstract base classes - :meth:`~Listener.accept` - - :class:`~trio.SocketListener`, :class:`~trio.ssl.SSLListener` + * - :class:`SendChannel` + - :class:`AsyncResource` + - :meth:`~SendChannel.send`, :meth:`~SendChannel.send_nowait` + - + - :func:`~trio.open_memory_channel` + * - :class:`ReceiveChannel` + - :class:`AsyncResource` + - :meth:`~ReceiveChannel.receive`, :meth:`~ReceiveChannel.receive_nowait` + - ``__aiter__``, ``__anext__`` + - :func:`~trio.open_memory_channel` .. autoclass:: trio.abc.AsyncResource :members: @@ -150,6 +160,14 @@ Abstract base classes :members: :show-inheritance: +.. autoclass:: trio.abc.SendChannel + :members: + :show-inheritance: + +.. autoclass:: trio.abc.ReceiveChannel + :members: + :show-inheritance: + .. currentmodule:: trio diff --git a/newsfragments/497.feature.rst b/newsfragments/497.feature.rst new file mode 100644 index 0000000000..2f3e21b4b1 --- /dev/null +++ b/newsfragments/497.feature.rst @@ -0,0 +1,12 @@ +New and improved APIs for inter-task communication: +:class:`trio.abc.SendChannel`, :class:`trio.abc.ReceiveChannel`, and +:func:`trio.open_memory_channel` (which replaces ``trio.Queue``). This +interface uses separate "send" and "receive" methods, for consistency +with other communication interfaces like :class:`~trio.abc.Stream`. +Also, the two objects can now be closed individually, making it much +easier to gracefully shut down a channel. Also, check out the nifty +``clone`` API to make it easy to manage fan-in scenarios. 
Also, the +API has been written to allow for future channel-like objects that +send objects across process boundaries. Also, it supports unbounded +buffering if you really need it. Also, help I can't stop writing also. +See :ref:`channels` for more details. diff --git a/newsfragments/497.removal.rst b/newsfragments/497.removal.rst new file mode 100644 index 0000000000..aa838ad7b6 --- /dev/null +++ b/newsfragments/497.removal.rst @@ -0,0 +1,2 @@ +``trio.Queue`` and ``trio.hazmat.UnboundedQueue`` have been +deprecated, in favor of :func:`trio.open_memory_channel`. diff --git a/setup.py b/setup.py index cc6728c093..73ac00ab8e 100644 --- a/setup.py +++ b/setup.py @@ -74,7 +74,7 @@ license="MIT -or- Apache License 2.0", packages=find_packages(), install_requires=[ - "attrs", + "attrs >= 18.1.0", # for attr.ib(factory=...) "sortedcontainers", "async_generator >= 1.9", "idna", diff --git a/trio/__init__.py b/trio/__init__.py index adf8221f2a..d77360a3a4 100644 --- a/trio/__init__.py +++ b/trio/__init__.py @@ -19,7 +19,7 @@ TrioInternalError, RunFinishedError, WouldBlock, Cancelled, BusyResourceError, ClosedResourceError, MultiError, run, open_nursery, open_cancel_scope, current_effective_deadline, TASK_STATUS_IGNORED, - current_time, BrokenResourceError + current_time, BrokenResourceError, EndOfChannel ) from ._timeouts import ( @@ -38,6 +38,8 @@ from ._highlevel_generic import aclose_forcefully, StapledStream +from ._channel import open_memory_channel + from ._signals import catch_signals, open_signal_receiver from ._highlevel_socket import SocketStream, SocketListener diff --git a/trio/_abc.py b/trio/_abc.py index e49062a2f1..5c41f79e70 100644 --- a/trio/_abc.py +++ b/trio/_abc.py @@ -1,4 +1,5 @@ from abc import ABCMeta, abstractmethod +from ._util import aiter_compat from . 
import _core __all__ = [ @@ -12,6 +13,8 @@ "SocketFactory", "HostnameResolver", "Listener", + "SendChannel", + "ReceiveChannel", ] @@ -282,8 +285,12 @@ class SendStream(AsyncResource): bidirectional, then you probably want to also implement :class:`ReceiveStream`, which makes your object a :class:`Stream`. - Every :class:`SendStream` also implements the :class:`AsyncResource` - interface. + :class:`SendStream` objects also implement the :class:`AsyncResource` + interface, so they can be closed by calling :meth:`~AsyncResource.aclose` + or using an ``async with`` block. + + If you want to send Python objects rather than raw bytes, see + :class:`SendChannel`. """ __slots__ = () @@ -378,8 +385,12 @@ class ReceiveStream(AsyncResource): bidirectional, then you probably want to also implement :class:`SendStream`, which makes your object a :class:`Stream`. - Every :class:`ReceiveStream` also implements the :class:`AsyncResource` - interface. + :class:`ReceiveStream` objects also implement the :class:`AsyncResource` + interface, so they can be closed by calling :meth:`~AsyncResource.aclose` + or using an ``async with`` block. + + If you want to receive Python objects rather than raw bytes, see + :class:`ReceiveChannel`. """ __slots__ = () @@ -490,6 +501,10 @@ async def send_eof(self): class Listener(AsyncResource): """A standard interface for listening for incoming connections. + :class:`Listener` objects also implement the :class:`AsyncResource` + interface, so they can be closed by calling :meth:`~AsyncResource.aclose` + or using an ``async with`` block. + """ __slots__ = () @@ -499,9 +514,9 @@ async def accept(self): Returns: AsyncResource: An object representing the incoming connection. In - practice this is almost always some variety of :class:`Stream`, - though in principle you could also use this interface with, say, - SOCK_SEQPACKET sockets or similar. 
+ practice this is generally some kind of :class:`Stream`, + but in principle you could also define a :class:`Listener` that + returned, say, channel objects. Raises: trio.BusyResourceError: if two tasks attempt to call @@ -510,11 +525,187 @@ async def accept(self): object, or if another task closes this listener object while :meth:`accept` is running. - Note that there is no ``BrokenListenerError``, because for listeners - there is no general condition of "the network/remote peer broke the - connection" that can be handled in a generic way, like there is for - streams. Other errors *can* occur and be raised from :meth:`accept` – - for example, if you run out of file descriptors then you might get an - :class:`OSError` with its errno set to ``EMFILE``. + Listeners don't generally raise :exc:`~trio.BrokenResourceError`, + because for listeners there is no general condition of "the + network/remote peer broke the connection" that can be handled in a + generic way, like there is for streams. Other errors *can* occur and + be raised from :meth:`accept` – for example, if you run out of file + descriptors then you might get an :class:`OSError` with its errno set + to ``EMFILE``. + + """ + + +class SendChannel(AsyncResource): + """A standard interface for sending Python objects to some receiver. + + :class:`SendChannel` objects also implement the :class:`AsyncResource` + interface, so they can be closed by calling :meth:`~AsyncResource.aclose` + or using an ``async with`` block. + + If you want to send raw bytes rather than Python objects, see + :class:`SendStream`. + + """ + __slots__ = () + + @abstractmethod + def send_nowait(self, value): + """Attempt to send an object through the channel, without blocking. + + Args: + value (object): The object to send. + + Raises: + trio.WouldBlock: if the operation cannot be completed immediately + (for example, because the channel's internal buffer is full).
+ trio.BrokenResourceError: if something has gone wrong, and the + channel is broken. For example, you may get this if the receiver + has already been closed. + trio.ClosedResourceError: if you previously closed this + :class:`SendChannel` object. + + """ + + @abstractmethod + async def send(self, value): + """Attempt to send an object through the channel, blocking if necessary. + + Args: + value (object): The object to send. + + Raises: + trio.BrokenResourceError: if something has gone wrong, and the + channel is broken. For example, you may get this if the receiver + has already been closed. + trio.ClosedResourceError: if you previously closed this + :class:`SendChannel` object, or if another task closes it while + :meth:`send` is running. """ + + @abstractmethod + def clone(self): + """Clone this send channel object. + + This returns a new :class:`SendChannel` object, which acts as a + duplicate of the original: sending on the new object does exactly the + same thing as sending on the old object. + + However, closing one of the objects does not close the other, and + receivers don't get :exc:`~trio.EndOfChannel` until *all* clones have + been closed. + + This is useful for communication patterns that involve multiple + producers all sending objects to the same destination. If you give + each producer its own clone of the :class:`SendChannel`, and then make + sure to close each :class:`SendChannel` when it's finished, receivers + will automatically get notified when all producers are finished. See + :ref:`channel-mpmc` for examples. + + Raises: + trio.ClosedResourceError: if you already closed this + :class:`SendChannel` object. + + """ + + +class ReceiveChannel(AsyncResource): + """A standard interface for receiving Python objects from some sender. + + You can iterate over a :class:`ReceiveChannel` using an ``async for`` + loop:: + + async for value in receive_channel: + ... + + This is equivalent to calling :meth:`receive` repeatedly. 
The loop exits + without error when :meth:`receive` raises :exc:`~trio.EndOfChannel`. + + :class:`ReceiveChannel` objects also implement the :class:`AsyncResource` + interface, so they can be closed by calling :meth:`~AsyncResource.aclose` + or using an ``async with`` block. + + If you want to receive raw bytes rather than Python objects, see + :class:`ReceiveStream`. + + """ + __slots__ = () + + @abstractmethod + def receive_nowait(self): + """Attempt to receive an incoming object, without blocking. + + Returns: + object: Whatever object was received. + + Raises: + trio.WouldBlock: if the operation cannot be completed immediately + (for example, because no object has been sent yet). + trio.EndOfChannel: if the sender has been closed cleanly, and no + more objects are coming. This is not an error condition. + trio.ClosedResourceError: if you previously closed this + :class:`ReceiveChannel` object. + trio.BrokenResourceError: if something has gone wrong, and the + channel is broken. + + """ + + @abstractmethod + async def receive(self): + """Attempt to receive an incoming object, blocking if necessary. + + It's legal for multiple tasks to call :meth:`receive` at the same + time. If this happens, then one task receives the first value sent, + another task receives the next value sent, and so on. + + Returns: + object: Whatever object was received. + + Raises: + trio.EndOfChannel: if the sender has been closed cleanly, and no + more objects are coming. This is not an error condition. + trio.ClosedResourceError: if you previously closed this + :class:`ReceiveChannel` object. + trio.BrokenResourceError: if something has gone wrong, and the + channel is broken. + + """ + + @abstractmethod + def clone(self): + """Clone this receive channel object. + + This returns a new :class:`ReceiveChannel` object, which acts as a + duplicate of the original: receiving on the new object does exactly + the same thing as receiving on the old object. 
+ + However, closing one of the objects does not close the other, and the + underlying channel is not closed until all clones are closed. + + This is useful for communication patterns involving multiple consumers + all receiving objects from the same underlying channel. See + :ref:`channel-mpmc` for examples. + + .. warning:: The clones all share the same underlying channel. + Whenever a clone :meth:`receive`\s a value, it is removed from the + channel and the other clones do *not* receive that value. If you + want to send multiple copies of the same stream of values to + multiple destinations, like :func:`itertools.tee`, then you need to + find some other solution; this method does *not* do that. + + Raises: + trio.ClosedResourceError: if you already closed this + :class:`ReceiveChannel` object. + + """ + + @aiter_compat + def __aiter__(self): + return self + + async def __anext__(self): + try: + return await self.receive() + except _core.EndOfChannel: + raise StopAsyncIteration diff --git a/trio/_channel.py b/trio/_channel.py new file mode 100644 index 0000000000..7188e0a054 --- /dev/null +++ b/trio/_channel.py @@ -0,0 +1,265 @@ +from collections import deque, OrderedDict +from math import inf + +import attr +from outcome import Error, Value + +from . import _core +from .abc import SendChannel, ReceiveChannel + + +def open_memory_channel(max_buffer_size): + """Open a channel for passing objects between tasks within a process. + + This channel is lightweight and entirely in-memory; it doesn't involve any + operating-system resources. + + The channel objects are only closed if you explicitly call + :meth:`~trio.abc.AsyncResource.aclose` or use ``async with``. In + particular, they are *not* automatically closed when garbage collected. + Closing in-memory channel objects is not mandatory, but it's generally a + good idea, because it helps avoid situations where tasks get stuck + waiting on a channel when there's no-one on the other side.
+ + Args: + max_buffer_size (int or math.inf): The maximum number of items that can + be buffered in the channel before :meth:`~trio.abc.SendChannel.send` + blocks. Choosing a sensible value here is important to ensure that + backpressure is communicated promptly and avoid unnecessary latency; + see :ref:`channel-buffering` for more details. If in doubt, use 0. + + Returns: + A pair ``(send_channel, receive_channel)``. If you have + trouble remembering which order these go in, remember: data + flows from left → right. + + In addition to the regular channel interfaces, all memory channel + endpoints provide a ``statistics()`` method, which returns an object with + the following fields: + + * ``current_buffer_used``: The number of items currently stored in the + channel buffer. + * ``max_buffer_size``: The maximum number of items allowed in the buffer, + as passed to :func:`open_memory_channel`. + * ``open_send_channels``: The number of open + :class:`~trio.abc.SendChannel` endpoints pointing to this channel. + Initially 1, but can be increased by + :meth:`~trio.abc.SendChannel.clone`. + * ``open_receive_channels``: Likewise, but for open + :class:`~trio.abc.ReceiveChannel` endpoints. + * ``tasks_waiting_send``: The number of tasks blocked in ``send`` on this + channel (summing over all clones). + * ``tasks_waiting_receive``: The number of tasks blocked in ``receive`` on + this channel (summing over all clones). 
+ + """ + if max_buffer_size != inf and not isinstance(max_buffer_size, int): + raise TypeError("max_buffer_size must be an integer or math.inf") + if max_buffer_size < 0: + raise ValueError("max_buffer_size must be >= 0") + state = MemoryChannelState(max_buffer_size) + return MemorySendChannel(state), MemoryReceiveChannel(state) + + +@attr.s(frozen=True) +class ChannelStats: + current_buffer_used = attr.ib() + max_buffer_size = attr.ib() + open_send_channels = attr.ib() + open_receive_channels = attr.ib() + tasks_waiting_send = attr.ib() + tasks_waiting_receive = attr.ib() + + +@attr.s +class MemoryChannelState: + max_buffer_size = attr.ib() + data = attr.ib(factory=deque) + # Counts of open endpoints using this state + open_send_channels = attr.ib(default=0) + open_receive_channels = attr.ib(default=0) + # {task: value} + send_tasks = attr.ib(factory=OrderedDict) + # {task: None} + receive_tasks = attr.ib(factory=OrderedDict) + + def statistics(self): + return ChannelStats( + current_buffer_used=len(self.data), + max_buffer_size=self.max_buffer_size, + open_send_channels=self.open_send_channels, + open_receive_channels=self.open_receive_channels, + tasks_waiting_send=len(self.send_tasks), + tasks_waiting_receive=len(self.receive_tasks), + ) + + +@attr.s(cmp=False, repr=False) +class MemorySendChannel(SendChannel): + _state = attr.ib() + _closed = attr.ib(default=False) + # This is just the tasks waiting on *this* object. As compared to + # self._state.send_tasks, which includes tasks from this object and + # all clones. + _tasks = attr.ib(factory=set) + + def __attrs_post_init__(self): + self._state.open_send_channels += 1 + + def __repr__(self): + return ( + "".format( + id(self), id(self._state) + ) + ) + + def statistics(self): + # XX should we also report statistics specific to this object? 
+ return self._state.statistics() + + @_core.enable_ki_protection + def send_nowait(self, value): + if self._closed: + raise _core.ClosedResourceError + if self._state.open_receive_channels == 0: + raise _core.BrokenResourceError + if self._state.receive_tasks: + assert not self._state.data + task, _ = self._state.receive_tasks.popitem(last=False) + task.custom_sleep_data._tasks.remove(task) + _core.reschedule(task, Value(value)) + elif len(self._state.data) < self._state.max_buffer_size: + self._state.data.append(value) + else: + raise _core.WouldBlock + + @_core.enable_ki_protection + async def send(self, value): + await _core.checkpoint_if_cancelled() + try: + self.send_nowait(value) + except _core.WouldBlock: + pass + else: + await _core.cancel_shielded_checkpoint() + return + + task = _core.current_task() + self._tasks.add(task) + self._state.send_tasks[task] = value + task.custom_sleep_data = self + + def abort_fn(_): + self._tasks.remove(task) + del self._state.send_tasks[task] + return _core.Abort.SUCCEEDED + + await _core.wait_task_rescheduled(abort_fn) + + @_core.enable_ki_protection + def clone(self): + if self._closed: + raise _core.ClosedResourceError + return MemorySendChannel(self._state) + + @_core.enable_ki_protection + async def aclose(self): + if self._closed: + await _core.checkpoint() + return + self._closed = True + for task in self._tasks: + _core.reschedule(task, Error(_core.ClosedResourceError())) + del self._state.send_tasks[task] + self._tasks.clear() + self._state.open_send_channels -= 1 + if self._state.open_send_channels == 0: + assert not self._state.send_tasks + for task in self._state.receive_tasks: + task.custom_sleep_data._tasks.remove(task) + _core.reschedule(task, Error(_core.EndOfChannel())) + self._state.receive_tasks.clear() + await _core.checkpoint() + + +@attr.s(cmp=False, repr=False) +class MemoryReceiveChannel(ReceiveChannel): + _state = attr.ib() + _closed = attr.ib(default=False) + _tasks = attr.ib(factory=set) + + def 
__attrs_post_init__(self): + self._state.open_receive_channels += 1 + + def statistics(self): + return self._state.statistics() + + def __repr__(self): + return "".format( + id(self), id(self._state) + ) + + @_core.enable_ki_protection + def receive_nowait(self): + if self._closed: + raise _core.ClosedResourceError + if self._state.send_tasks: + task, value = self._state.send_tasks.popitem(last=False) + task.custom_sleep_data._tasks.remove(task) + _core.reschedule(task) + self._state.data.append(value) + # Fall through + if self._state.data: + return self._state.data.popleft() + if not self._state.open_send_channels: + raise _core.EndOfChannel + raise _core.WouldBlock + + @_core.enable_ki_protection + async def receive(self): + await _core.checkpoint_if_cancelled() + try: + value = self.receive_nowait() + except _core.WouldBlock: + pass + else: + await _core.cancel_shielded_checkpoint() + return value + + task = _core.current_task() + self._tasks.add(task) + self._state.receive_tasks[task] = None + task.custom_sleep_data = self + + def abort_fn(_): + self._tasks.remove(task) + del self._state.receive_tasks[task] + return _core.Abort.SUCCEEDED + + return await _core.wait_task_rescheduled(abort_fn) + + @_core.enable_ki_protection + def clone(self): + if self._closed: + raise _core.ClosedResourceError + return MemoryReceiveChannel(self._state) + + @_core.enable_ki_protection + async def aclose(self): + if self._closed: + await _core.checkpoint() + return + self._closed = True + for task in self._tasks: + _core.reschedule(task, Error(_core.ClosedResourceError())) + del self._state.receive_tasks[task] + self._tasks.clear() + self._state.open_receive_channels -= 1 + if self._state.open_receive_channels == 0: + assert not self._state.receive_tasks + for task in self._state.send_tasks: + task.custom_sleep_data._tasks.remove(task) + _core.reschedule(task, Error(_core.BrokenResourceError())) + self._state.send_tasks.clear() + self._state.data.clear() + await 
_core.checkpoint() diff --git a/trio/_core/__init__.py b/trio/_core/__init__.py index 7853ce896c..49899f716d 100644 --- a/trio/_core/__init__.py +++ b/trio/_core/__init__.py @@ -16,7 +16,7 @@ def _public(fn): from ._exceptions import ( TrioInternalError, RunFinishedError, WouldBlock, Cancelled, - BusyResourceError, ClosedResourceError, BrokenResourceError + BusyResourceError, ClosedResourceError, BrokenResourceError, EndOfChannel ) from ._multierror import MultiError diff --git a/trio/_core/_exceptions.py b/trio/_core/_exceptions.py index 09397f2b31..fd20406ae0 100644 --- a/trio/_core/_exceptions.py +++ b/trio/_core/_exceptions.py @@ -123,3 +123,12 @@ class BrokenResourceError(Exception): information about the underlying error. """ + + +class EndOfChannel(Exception): + """Raised when trying to receive from a :class:`trio.abc.ReceiveChannel` + that has no more data to receive. + + This is analogous to an "end-of-file" condition, but for channels. + + """ diff --git a/trio/_core/_unbounded_queue.py b/trio/_core/_unbounded_queue.py index 9109b45a9c..57ea47d5ed 100644 --- a/trio/_core/_unbounded_queue.py +++ b/trio/_core/_unbounded_queue.py @@ -3,6 +3,7 @@ from .. import _core from .._util import aiter_compat +from .._deprecate import deprecated __all__ = ["UnboundedQueue"] @@ -43,6 +44,12 @@ class UnboundedQueue: """ + @deprecated( + "0.9.0", + issue=497, + thing="trio.hazmat.UnboundedQueue", + instead="trio.open_memory_channel(math.inf)" + ) def __init__(self): self._lot = _core.ParkingLot() self._data = [] diff --git a/trio/_core/tests/test_unbounded_queue.py b/trio/_core/tests/test_unbounded_queue.py index d8d4dd7cf0..801c34ce46 100644 --- a/trio/_core/tests/test_unbounded_queue.py +++ b/trio/_core/tests/test_unbounded_queue.py @@ -5,6 +5,10 @@ from ... 
import _core from ...testing import assert_checkpoints, wait_all_tasks_blocked +pytestmark = pytest.mark.filterwarnings( + "ignore:.*UnboundedQueue:trio.TrioDeprecationWarning" +) + async def test_UnboundedQueue_basic(): q = _core.UnboundedQueue() diff --git a/trio/_core/tests/test_windows.py b/trio/_core/tests/test_windows.py index c0ec5cce64..18436c7704 100644 --- a/trio/_core/tests/test_windows.py +++ b/trio/_core/tests/test_windows.py @@ -11,6 +11,12 @@ from .._windows_cffi import ffi, kernel32 +# The undocumented API that this is testing should be changed to stop using +# UnboundedQueue (or just removed until we have time to redo it), but until +# then we filter out the warning. +@pytest.mark.filterwarnings( + "ignore:.*UnboundedQueue:trio.TrioDeprecationWarning" +) async def test_completion_key_listen(): async def post(key): iocp = ffi.cast("HANDLE", _core.current_iocp()) diff --git a/trio/_sync.py b/trio/_sync.py index f9a22b49d9..edb85ce1bc 100644 --- a/trio/_sync.py +++ b/trio/_sync.py @@ -6,6 +6,7 @@ from . 
import _core from ._util import aiter_compat +from ._deprecate import deprecated __all__ = [ "Event", @@ -843,6 +844,12 @@ class Queue: """ + @deprecated( + "0.9.0", + issue=497, + thing="trio.Queue", + instead="trio.open_memory_channel" + ) def __init__(self, capacity): if not isinstance(capacity, int): raise TypeError("capacity must be an integer") diff --git a/trio/tests/test_channel.py b/trio/tests/test_channel.py new file mode 100644 index 0000000000..b43466dd7d --- /dev/null +++ b/trio/tests/test_channel.py @@ -0,0 +1,352 @@ +import pytest + +from ..testing import wait_all_tasks_blocked, assert_checkpoints +import trio +from trio import open_memory_channel, EndOfChannel + + +async def test_channel(): + with pytest.raises(TypeError): + open_memory_channel(1.0) + with pytest.raises(ValueError): + open_memory_channel(-1) + + s, r = open_memory_channel(2) + repr(s) # smoke test + repr(r) # smoke test + + s.send_nowait(1) + with assert_checkpoints(): + await s.send(2) + with pytest.raises(trio.WouldBlock): + s.send_nowait(None) + + with assert_checkpoints(): + assert await r.receive() == 1 + assert r.receive_nowait() == 2 + with pytest.raises(trio.WouldBlock): + r.receive_nowait() + + s.send_nowait("last") + await s.aclose() + with pytest.raises(trio.ClosedResourceError): + await s.send("too late") + with pytest.raises(trio.ClosedResourceError): + s.send_nowait("too late") + with pytest.raises(trio.ClosedResourceError): + s.clone() + await s.aclose() + + assert r.receive_nowait() == "last" + with pytest.raises(EndOfChannel): + await r.receive() + await r.aclose() + with pytest.raises(trio.ClosedResourceError): + await r.receive() + with pytest.raises(trio.ClosedResourceError): + await r.receive_nowait() + await r.aclose() + + +async def test_553(autojump_clock): + s, r = open_memory_channel(1) + with trio.move_on_after(10) as timeout_scope: + await r.receive() + assert timeout_scope.cancelled_caught + await s.send("Test for PR #553") + + +async def 
test_channel_multiple_producers(): + async def producer(send_channel, i): + # We close our handle when we're done with it + async with send_channel: + for j in range(3 * i, 3 * (i + 1)): + await send_channel.send(j) + + send_channel, receive_channel = open_memory_channel(0) + async with trio.open_nursery() as nursery: + # We hand out clones to all the new producers, and then close the + # original. + async with send_channel: + for i in range(10): + nursery.start_soon(producer, send_channel.clone(), i) + + got = [] + async for value in receive_channel: + got.append(value) + + got.sort() + assert got == list(range(30)) + + +async def test_channel_multiple_consumers(): + successful_receivers = set() + received = [] + + async def consumer(receive_channel, i): + async for value in receive_channel: + successful_receivers.add(i) + received.append(value) + + async with trio.open_nursery() as nursery: + send_channel, receive_channel = trio.open_memory_channel(1) + async with send_channel: + for i in range(5): + nursery.start_soon(consumer, receive_channel, i) + await wait_all_tasks_blocked() + for i in range(10): + await send_channel.send(i) + + assert successful_receivers == set(range(5)) + assert len(received) == 10 + assert set(received) == set(range(10)) + + +async def test_close_basics(): + async def send_block(s, expect): + with pytest.raises(expect): + await s.send(None) + + # closing send -> other send gets ClosedResourceError + s, r = open_memory_channel(0) + async with trio.open_nursery() as nursery: + nursery.start_soon(send_block, s, trio.ClosedResourceError) + await wait_all_tasks_blocked() + await s.aclose() + + # and it's persistent + with pytest.raises(trio.ClosedResourceError): + s.send_nowait(None) + with pytest.raises(trio.ClosedResourceError): + await s.send(None) + + # and receive gets EndOfChannel + with pytest.raises(EndOfChannel): + r.receive_nowait() + with pytest.raises(EndOfChannel): + await r.receive() + + # closing receive -> send gets 
BrokenResourceError + s, r = open_memory_channel(0) + async with trio.open_nursery() as nursery: + nursery.start_soon(send_block, s, trio.BrokenResourceError) + await wait_all_tasks_blocked() + await r.aclose() + + # and it's persistent + with pytest.raises(trio.BrokenResourceError): + s.send_nowait(None) + with pytest.raises(trio.BrokenResourceError): + await s.send(None) + + # closing receive -> other receive gets ClosedResourceError + async def receive_block(r): + with pytest.raises(trio.ClosedResourceError): + await r.receive() + + s, r = open_memory_channel(0) + async with trio.open_nursery() as nursery: + nursery.start_soon(receive_block, r) + await wait_all_tasks_blocked() + await r.aclose() + + # and it's persistent + with pytest.raises(trio.ClosedResourceError): + r.receive_nowait() + with pytest.raises(trio.ClosedResourceError): + await r.receive() + + +async def test_receive_channel_clone_and_close(): + s, r = open_memory_channel(10) + + r2 = r.clone() + r3 = r.clone() + + s.send_nowait(None) + await r.aclose() + async with r2: + pass + + with pytest.raises(trio.ClosedResourceError): + r.clone() + + with pytest.raises(trio.ClosedResourceError): + r2.clone() + + # Can still send, r3 is still open + s.send_nowait(None) + + await r3.aclose() + + # But now the receiver is really closed + with pytest.raises(trio.BrokenResourceError): + s.send_nowait(None) + + +async def test_close_multiple_send_handles(): + # With multiple send handles, closing one handle only wakes senders on + # that handle, but others can continue just fine + s1, r = open_memory_channel(0) + s2 = s1.clone() + + async def send_will_close(): + with pytest.raises(trio.ClosedResourceError): + await s1.send("nope") + + async def send_will_succeed(): + await s2.send("ok") + + async with trio.open_nursery() as nursery: + nursery.start_soon(send_will_close) + nursery.start_soon(send_will_succeed) + await wait_all_tasks_blocked() + await s1.aclose() + assert await r.receive() == "ok" + + +async def 
test_close_multiple_receive_handles(): + # With multiple receive handles, closing one handle only wakes receivers on + # that handle, but others can continue just fine + s, r1 = open_memory_channel(0) + r2 = r1.clone() + + async def receive_will_close(): + with pytest.raises(trio.ClosedResourceError): + await r1.receive() + + async def receive_will_succeed(): + assert await r2.receive() == "ok" + + async with trio.open_nursery() as nursery: + nursery.start_soon(receive_will_close) + nursery.start_soon(receive_will_succeed) + await wait_all_tasks_blocked() + await r1.aclose() + await s.send("ok") + + +async def test_inf_capacity(): + s, r = open_memory_channel(float("inf")) + + # It's accepted, and we can send all day without blocking + async with s: + for i in range(10): + s.send_nowait(i) + + got = [] + async for i in r: + got.append(i) + assert got == list(range(10)) + + +async def test_statistics(): + s, r = open_memory_channel(2) + + assert s.statistics() == r.statistics() + stats = s.statistics() + assert stats.current_buffer_used == 0 + assert stats.max_buffer_size == 2 + assert stats.open_send_channels == 1 + assert stats.open_receive_channels == 1 + assert stats.tasks_waiting_send == 0 + assert stats.tasks_waiting_receive == 0 + + s.send_nowait(None) + assert s.statistics().current_buffer_used == 1 + + s2 = s.clone() + assert s.statistics().open_send_channels == 2 + await s.aclose() + assert s2.statistics().open_send_channels == 1 + + r2 = r.clone() + assert s2.statistics().open_receive_channels == 2 + await r2.aclose() + assert s2.statistics().open_receive_channels == 1 + + async with trio.open_nursery() as nursery: + s2.send_nowait(None) # fill up the buffer + assert s.statistics().current_buffer_used == 2 + nursery.start_soon(s2.send, None) + nursery.start_soon(s2.send, None) + await wait_all_tasks_blocked() + assert s.statistics().tasks_waiting_send == 2 + nursery.cancel_scope.cancel() + assert s.statistics().tasks_waiting_send == 0 + + # empty out the 
buffer again + try: + while True: + r.receive_nowait() + except trio.WouldBlock: + pass + + async with trio.open_nursery() as nursery: + nursery.start_soon(r.receive) + await wait_all_tasks_blocked() + assert s.statistics().tasks_waiting_receive == 1 + nursery.cancel_scope.cancel() + assert s.statistics().tasks_waiting_receive == 0 + + +async def test_channel_fairness(): + + # We can remove an item we just sent, and send an item back in after, if + # no-one else is waiting. + s, r = open_memory_channel(1) + s.send_nowait(1) + assert r.receive_nowait() == 1 + s.send_nowait(2) + assert r.receive_nowait() == 2 + + # But if someone else is waiting to receive, then they "own" the item we + # send, so we can't receive it (even though we run first): + + result = None + + async def do_receive(r): + nonlocal result + result = await r.receive() + + async with trio.open_nursery() as nursery: + nursery.start_soon(do_receive, r) + await wait_all_tasks_blocked() + s.send_nowait(2) + with pytest.raises(trio.WouldBlock): + r.receive_nowait() + assert result == 2 + + # And the analogous situation for send: if we free up a space, we can't + # immediately send something in it if someone is already waiting to do + # that + s, r = open_memory_channel(1) + s.send_nowait(1) + with pytest.raises(trio.WouldBlock): + s.send_nowait(None) + async with trio.open_nursery() as nursery: + nursery.start_soon(s.send, 2) + await wait_all_tasks_blocked() + assert r.receive_nowait() == 1 + with pytest.raises(trio.WouldBlock): + s.send_nowait(3) + assert (await r.receive()) == 2 + + +async def test_unbuffered(): + s, r = open_memory_channel(0) + with pytest.raises(trio.WouldBlock): + r.receive_nowait() + with pytest.raises(trio.WouldBlock): + s.send_nowait(1) + + async def do_send(s, v): + with assert_checkpoints(): + await s.send(v) + + async with trio.open_nursery() as nursery: + nursery.start_soon(do_send, s, 1) + with assert_checkpoints(): + assert await r.receive() == 1 + with 
pytest.raises(trio.WouldBlock): + r.receive_nowait() diff --git a/trio/tests/test_highlevel_serve_listeners.py b/trio/tests/test_highlevel_serve_listeners.py index 7d237419a6..a0fc80589d 100644 --- a/trio/tests/test_highlevel_serve_listeners.py +++ b/trio/tests/test_highlevel_serve_listeners.py @@ -13,14 +13,14 @@ @attr.s(hash=False, cmp=False) class MemoryListener(trio.abc.Listener): closed = attr.ib(default=False) - accepted_streams = attr.ib(default=attr.Factory(list)) - queued_streams = attr.ib(default=attr.Factory(lambda: trio.Queue(1))) + accepted_streams = attr.ib(factory=list) + queued_streams = attr.ib(factory=(lambda: trio.open_memory_channel(1))) accept_hook = attr.ib(default=None) async def connect(self): assert not self.closed client, server = memory_stream_pair() - await self.queued_streams.put(server) + await self.queued_streams[0].send(server) return client async def accept(self): @@ -28,7 +28,7 @@ async def accept(self): assert not self.closed if self.accept_hook is not None: await self.accept_hook() - stream = await self.queued_streams.get() + stream = await self.queued_streams[1].receive() self.accepted_streams.append(stream) return stream diff --git a/trio/tests/test_sync.py b/trio/tests/test_sync.py index c30992804f..0f39f62008 100644 --- a/trio/tests/test_sync.py +++ b/trio/tests/test_sync.py @@ -9,6 +9,10 @@ from .._timeouts import sleep_forever, move_on_after from .._sync import * +pytestmark = pytest.mark.filterwarnings( + "ignore:.*trio.Queue:trio.TrioDeprecationWarning" +) + async def test_Event(): e = Event() @@ -558,10 +562,76 @@ async def do_put(q, v): q.get_nowait() -# Two ways of implementing a Lock in terms of a Queue. Used to let us put the -# Queue through the generic lock tests. - from .._sync import async_cm +from .._channel import open_memory_channel + +# Three ways of implementing a Lock in terms of a channel. Used to let us put +# the channel through the generic lock tests. 
+ + +@async_cm +class ChannelLock1: + def __init__(self, capacity): + self.s, self.r = open_memory_channel(capacity) + for _ in range(capacity - 1): + self.s.send_nowait(None) + + def acquire_nowait(self): + self.s.send_nowait(None) + + async def acquire(self): + await self.s.send(None) + + def release(self): + self.r.receive_nowait() + + +@async_cm +class ChannelLock2: + def __init__(self): + self.s, self.r = open_memory_channel(10) + self.s.send_nowait(None) + + def acquire_nowait(self): + self.r.receive_nowait() + + async def acquire(self): + await self.r.receive() + + def release(self): + self.s.send_nowait(None) + + +@async_cm +class ChannelLock3: + def __init__(self): + self.s, self.r = open_memory_channel(0) + # self.acquired is true when one task acquires the lock and + # only becomes false when it's released and no tasks are + # waiting to acquire. + self.acquired = False + + def acquire_nowait(self): + assert not self.acquired + self.acquired = True + + async def acquire(self): + if self.acquired: + await self.s.send(None) + else: + self.acquired = True + await _core.checkpoint() + + def release(self): + try: + self.r.receive_nowait() + except _core.WouldBlock: + assert self.acquired + self.acquired = False + + +# Three ways of implementing a Lock in terms of a Queue. Used to let us put +# the Queue through the generic lock tests. @async_cm @@ -630,6 +700,10 @@ def release(self): lambda: Semaphore(1), Lock, StrictFIFOLock, + lambda: ChannelLock1(10), + lambda: ChannelLock1(1), + ChannelLock2, + ChannelLock3, lambda: QueueLock1(10), lambda: QueueLock1(1), QueueLock2, @@ -640,6 +714,10 @@ def release(self): "Semaphore(1)", "Lock", "StrictFIFOLock", + "ChannelLock1(10)", + "ChannelLock1(1)", + "ChannelLock2", + "ChannelLock3", "QueueLock1(10)", "QueueLock1(1)", "QueueLock2",