We should have some kind of better closure tracking for Queues #497

@njsmith


Right now, if you want to do a clean shutdown of a Queue, you have to do something messy: pass in a sentinel value and have the other side watch for it. This is inherited from Python's queue.Queue, but... queue.Queue sometimes feels like a neat v1 that never got fully reviewed or polished. (We already got rid of some of its unwise decisions; see #322.)
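
For reference, the sentinel pattern described above looks roughly like this. It is a sketch that uses the standard library's asyncio.Queue as a stand-in (it follows the same queue.Queue design) rather than trio; _DONE, producer, and consumer are illustrative names. With several producers, the consumer also has to count sentinels, which is exactly the kind of bookkeeping better closure tracking would remove.

```python
import asyncio

# Illustrative sentinel. A fresh object() can't collide with a real item,
# unlike None.
_DONE = object()


async def producer(queue, items):
    for item in items:
        await queue.put(item)
    await queue.put(_DONE)  # every producer has to remember this...


async def consumer(queue, num_producers):
    received = []
    producers_left = num_producers  # ...and the consumer has to count them
    while producers_left:
        item = await queue.get()
        if item is _DONE:
            producers_left -= 1
        else:
            received.append(item)
    return received


async def main():
    queue = asyncio.Queue()
    producers = [
        asyncio.create_task(producer(queue, [1, 2])),
        asyncio.create_task(producer(queue, [3])),
    ]
    received = await consumer(queue, num_producers=len(producers))
    await asyncio.gather(*producers)
    return sorted(received)  # sorted: interleaving order isn't the point here


print(asyncio.run(main()))  # [1, 2, 3]
```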

For streams, there's a nice, clear, well-defined way to do this: the sender closes the stream, the receiver gets a notification (a special sentinel value in that case), and any further attempts to send on the stream raise an error. Or, the receiver closes their end of the stream, and then any further attempts to send or receive raise an error.

For Queue it's a little more complicated, because the sending and receiving sides are merged into a single object. The most important case is a sender notifying the receiver that they're done. So the simplest thing would be a close method that makes further calls to get/put raise an error and makes __anext__ raise StopAsyncIteration, plus documentation saying that it should generally be the sender that calls it.

(What if there are multiple tasks putting to the same queue? We could have close cause any blocked puts to immediately wake up (compare #460), or have an aclose that waits for any current put calls to finish.)
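
A minimal sketch of the simplest option, using asyncio so it runs with only the standard library; ClosableQueue and ClosedQueueError are hypothetical names, not trio APIs. It assumes get() should still hand out items that were buffered before close() (borrowing stream semantics, where data sent before closing is still delivered), and of the two choices for blocked puts it implements the first: close() wakes them immediately and they raise. The aclose() variant that waits for in-flight puts is not shown.

```python
import asyncio
import collections


class ClosedQueueError(Exception):
    """Hypothetical: put() after close(), or get() on a closed, drained queue."""


class ClosableQueue:
    """Hypothetical sketch (not a trio API): a bounded queue with close()."""

    def __init__(self, maxsize):
        self._maxsize = maxsize
        self._items = collections.deque()
        self._closed = False
        # Set on every state change; waiters re-check the state after waking.
        self._changed = asyncio.Event()

    async def _wait_for_change(self):
        self._changed.clear()
        await self._changed.wait()

    def close(self):
        # Wakes every blocked put() and get() so they can re-check the state.
        self._closed = True
        self._changed.set()

    async def put(self, item):
        while True:
            if self._closed:
                raise ClosedQueueError("put() on a closed queue")
            if len(self._items) < self._maxsize:
                self._items.append(item)
                self._changed.set()
                return
            await self._wait_for_change()

    async def get(self):
        # Items buffered before close() are still delivered.
        while not self._items:
            if self._closed:
                raise ClosedQueueError("get() on a closed, empty queue")
            await self._wait_for_change()
        item = self._items.popleft()
        self._changed.set()  # a slot was freed; wake any blocked put()
        return item

    def __aiter__(self):
        return self

    async def __anext__(self):
        try:
            return await self.get()
        except ClosedQueueError:
            raise StopAsyncIteration from None


async def main():
    queue = ClosableQueue(maxsize=1)

    async def sender():
        for i in range(3):
            await queue.put(i)
        queue.close()  # the sender says "done"; no sentinel needed

    task = asyncio.create_task(sender())
    received = [item async for item in queue]
    await task

    # A put() blocked on a full queue is woken by close() and raises.
    full = ClosableQueue(maxsize=1)
    await full.put("a")
    blocked = asyncio.create_task(full.put("b"))
    await asyncio.sleep(0)  # let the second put() start and block
    full.close()
    try:
        await blocked
        woke_with_error = False
    except ClosedQueueError:
        woke_with_error = True
    return received, woke_with_error


print(asyncio.run(main()))  # ([0, 1, 2], True)
```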

Here's another option, that's more disruptive but might ultimately be better: have the put and get sides be two different objects, so you'd do something like put_queue, get_queue = trio.open_queue(). put_queue would have the put methods, get_queue would have the get methods and async iteration protocol, and they could be closed independently, with the same detailed semantics as the stream APIs.
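
Here is one way the split-endpoint version could be sketched, again on asyncio rather than trio. open_queue, PutQueue, GetQueue, and the three exception classes are hypothetical; the exception names only echo stream-style error vocabulary. Closing the put side lets the receiver drain the buffer and then stop iterating; closing the get side makes later put() calls fail.

```python
import asyncio
import collections


class ClosedResourceError(Exception):
    """Hypothetical: this endpoint was already closed."""


class BrokenResourceError(Exception):
    """Hypothetical: the other endpoint was closed, so put() can't succeed."""


class EndOfQueue(Exception):
    """Hypothetical: the put side is closed and the buffer is empty."""


class _SharedState:
    def __init__(self):
        self.items = collections.deque()
        self.put_closed = False
        self.get_closed = False
        self.changed = asyncio.Event()  # set whenever the state changes


class PutQueue:
    def __init__(self, state):
        self._state = state

    async def put(self, item):
        if self._state.put_closed:
            raise ClosedResourceError("put() after close()")
        if self._state.get_closed:
            raise BrokenResourceError("the get side is closed")
        self._state.items.append(item)
        self._state.changed.set()

    def close(self):
        self._state.put_closed = True
        self._state.changed.set()  # wake a receiver waiting for more items


class GetQueue:
    def __init__(self, state):
        self._state = state

    async def get(self):
        state = self._state
        while True:
            if state.get_closed:
                raise ClosedResourceError("get() after close()")
            if state.items:
                return state.items.popleft()
            if state.put_closed:
                raise EndOfQueue
            state.changed.clear()
            await state.changed.wait()

    def close(self):
        self._state.get_closed = True
        self._state.items.clear()  # nobody can receive these any more
        self._state.changed.set()

    def __aiter__(self):
        return self

    async def __anext__(self):
        try:
            return await self.get()
        except EndOfQueue:
            raise StopAsyncIteration from None


def open_queue():
    # Hypothetical: both endpoints share one buffer but close independently.
    state = _SharedState()
    return PutQueue(state), GetQueue(state)


async def main():
    put_queue, get_queue = open_queue()

    async def producer():
        for word in ["a", "b"]:
            await put_queue.put(word)
        put_queue.close()

    task = asyncio.create_task(producer())
    received = [word async for word in get_queue]
    await task

    # Closing the get side makes later put() calls fail.
    put2, get2 = open_queue()
    get2.close()
    try:
        await put2.put("x")
        outcome = "sent"
    except BrokenResourceError:
        outcome = "broken"
    return received, outcome


print(asyncio.run(main()))  # (['a', 'b'], 'broken')
```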

Maybe we'd even want to make an ABC for this, like ObjectSendStream and ObjectReceiveStream (names to be bikeshedded later); then people could make versions that use pickle to implement the interface on top of a Stream, etc. If doing this we might want to also change the names to better match the Stream conventions, like have the two main methods be send and receive instead of put and get.
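
As a rough illustration of the ABC idea, the sketch below defines hypothetical ObjectSendStream/ObjectReceiveStream base classes (names taken from the text, still up for bikeshedding) and a pickle-based implementation layered on a byte stream. It uses asyncio streams over socket.socketpair() in place of a trio Stream, and frames each object with a 4-byte length prefix; that framing is an assumption of the sketch, not something the text specifies. Since pickle.loads can execute arbitrary code, this only makes sense between trusted peers.

```python
import abc
import asyncio
import pickle
import socket
import struct


class EndOfObjects(Exception):
    """Hypothetical: the sending side closed and every object was received."""


class ObjectSendStream(abc.ABC):
    @abc.abstractmethod
    async def send(self, obj): ...

    @abc.abstractmethod
    async def aclose(self): ...


class ObjectReceiveStream(abc.ABC):
    @abc.abstractmethod
    async def receive(self): ...

    def __aiter__(self):
        return self

    async def __anext__(self):
        try:
            return await self.receive()
        except EndOfObjects:
            raise StopAsyncIteration from None


# Assumed framing: a 4-byte big-endian length prefix, then the pickled object.
_HEADER = struct.Struct(">I")


class PickleSendStream(ObjectSendStream):
    def __init__(self, writer):
        self._writer = writer  # an asyncio.StreamWriter (the byte stream)

    async def send(self, obj):
        data = pickle.dumps(obj)
        self._writer.write(_HEADER.pack(len(data)) + data)
        await self._writer.drain()

    async def aclose(self):
        self._writer.close()  # the peer sees EOF after the last frame
        await self._writer.wait_closed()


class PickleReceiveStream(ObjectReceiveStream):
    def __init__(self, reader):
        self._reader = reader  # an asyncio.StreamReader

    async def receive(self):
        try:
            header = await self._reader.readexactly(_HEADER.size)
        except asyncio.IncompleteReadError as exc:
            if exc.partial:
                raise  # EOF in the middle of a frame is a real error
            raise EndOfObjects from None  # clean EOF between frames
        (length,) = _HEADER.unpack(header)
        data = await self._reader.readexactly(length)
        # Only unpickle data from a trusted peer: pickle can run arbitrary code.
        return pickle.loads(data)


async def main():
    left, right = socket.socketpair()  # a connected pair of byte streams
    _, left_writer = await asyncio.open_connection(sock=left)
    right_reader, right_writer = await asyncio.open_connection(sock=right)

    sender = PickleSendStream(left_writer)
    receiver = PickleReceiveStream(right_reader)
    for obj in [{"id": 1}, [2, 3], None]:
        await sender.send(obj)
    await sender.aclose()

    received = [obj async for obj in receiver]
    right_writer.close()
    await right_writer.wait_closed()
    return received


print(asyncio.run(main()))  # [{'id': 1}, [2, 3], None]
```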

But we shouldn't take this analogy too far... regular in-memory Queues are often used in ways that byte streams aren't. Are there any cases where what you really want is a bidirectional Queue? If so, that would be a strong argument for splitting the two endpoints into two different objects... but I'm not sure this really happens? Another way Queues differ from streams is that fan-in and fan-out are common. For fan-in/fan-out, maybe we could do even better than having two objects: maybe we want lots of objects, one for every sender and every receiver! For example, this might look like:

import trio

# Proposed API: trio.ObjectStreamBuffer and its object_send_stream() /
# object_receive_stream() methods are hypothetical, not part of trio.

async def sender(object_send_stream):
    # Each sender owns its endpoint and closes it when the block exits.
    async with object_send_stream:
        await object_send_stream.send(None)

async def receiver(object_receive_stream):
    async with object_receive_stream:
        # Iteration ends once every send endpoint has been closed.
        async for obj in object_receive_stream:
            print("got", obj)

async def main():
    object_stream_buffer = trio.ObjectStreamBuffer()
    async with trio.open_nursery() as nursery:
        nursery.start_soon(sender, object_stream_buffer.object_send_stream())
        nursery.start_soon(sender, object_stream_buffer.object_send_stream())
        nursery.start_soon(sender, object_stream_buffer.object_send_stream())
        nursery.start_soon(receiver, object_stream_buffer.object_receive_stream())

trio.run(main)

(Expected output: prints got None three times, and then exits. Notice that the three calls to sender each get their own object_send_stream, and each of them closes their version when done.)

The idea here is that by having multiple send/receive endpoints that you can attach to the same underlying buffer, you can track the closure state for each endpoint separately. So for a fan-in case, you can have a bunch of producers that each have their own copy of the send endpoint, send stuff for a while, then close their endpoint, and then once all the endpoints are closed the receiver gets notified. (This could even replace the task_left bookkeeping in this example.)
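
A minimal asyncio sketch of the reference-counting idea, mirroring the fan-in example above. ObjectStreamBuffer and its object_send_stream()/object_receive_stream() methods are the hypothetical names from the proposal, SendEndpoint and ReceiveEndpoint are invented here, and only send-side closure is tracked. The buffer counts open send endpoints, and the receiver's iteration stops once that count reaches zero and the buffer is empty.

```python
import asyncio
import collections


class ObjectStreamBuffer:
    """Hypothetical: one shared buffer, many endpoints attached to it."""

    def __init__(self):
        self.items = collections.deque()
        self.open_senders = 0
        self.changed = asyncio.Event()  # set on every new item or closure

    def object_send_stream(self):
        self.open_senders += 1
        return SendEndpoint(self)

    def object_receive_stream(self):
        return ReceiveEndpoint(self)


class SendEndpoint:
    def __init__(self, buffer):
        self._buffer = buffer
        self._closed = False

    async def send(self, obj):
        if self._closed:
            raise RuntimeError("send() on a closed endpoint")
        self._buffer.items.append(obj)
        self._buffer.changed.set()

    async def aclose(self):
        if not self._closed:  # closing twice must not decrement twice
            self._closed = True
            self._buffer.open_senders -= 1
            self._buffer.changed.set()  # maybe the last sender just left

    async def __aenter__(self):
        return self

    async def __aexit__(self, *exc_info):
        await self.aclose()


class ReceiveEndpoint:
    def __init__(self, buffer):
        self._buffer = buffer

    async def __aenter__(self):
        return self

    async def __aexit__(self, *exc_info):
        pass  # receive-side closure isn't tracked in this sketch

    def __aiter__(self):
        return self

    async def __anext__(self):
        buffer = self._buffer
        while not buffer.items:
            if buffer.open_senders == 0:
                # Every send endpoint is closed and nothing is left buffered.
                raise StopAsyncIteration
            buffer.changed.clear()
            await buffer.changed.wait()
        return buffer.items.popleft()


async def sender(object_send_stream):
    async with object_send_stream:
        await object_send_stream.send(None)


async def receiver(object_receive_stream, log):
    async with object_receive_stream:
        async for obj in object_receive_stream:
            log.append(("got", obj))


async def main():
    log = []
    buffer = ObjectStreamBuffer()
    # As in the trio version, every endpoint is created before any task runs.
    tasks = [asyncio.create_task(sender(buffer.object_send_stream()))
             for _ in range(3)]
    tasks.append(asyncio.create_task(
        receiver(buffer.object_receive_stream(), log)))
    await asyncio.gather(*tasks)
    return log


for entry in asyncio.run(main()):
    print(*entry)  # prints "got None" three times
```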

I guess another way to do this would be to keep the put_queue, get_queue = trio.open_queue() style API, but add a .clone() method to the endpoints. This would enforce that you can't create a new endpoint unless there's already at least one open endpoint, which might help avoid race conditions like: send_stream1 = buffer.object_send_stream(); send_stream1.close(); send_stream2 = buffer.object_send_stream(); send_stream2.send(...) (this fails because after the first call to close there were zero endpoints, so the underlying buffer got closed). Or else I guess we could make "closed" a toggleable state, so that it starts off closed until you create the first endpoint? But that has a different race condition: if the receiver starts first, it might see the stream as closed and exit before the first producer wakes up and starts sending stuff.
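
The .clone() variant can be sketched by having every new send endpoint come from an existing open one, so the count of open senders can never climb back up from zero. This is a synchronous sketch that ignores blocking and waking entirely, to keep the focus on the endpoint bookkeeping; open_queue, SendEndpoint, ReceiveEndpoint, send_nowait, receive_nowait, and finished are all hypothetical names.

```python
import collections


class ClosedResourceError(Exception):
    """Hypothetical: the endpoint (or the one being cloned) is closed."""


class _Shared:
    def __init__(self):
        self.items = collections.deque()
        self.open_senders = 0


class SendEndpoint:
    def __init__(self, shared):
        self._shared = shared
        self._closed = False
        shared.open_senders += 1

    def clone(self):
        # New endpoints can only be derived from an open one, so the number
        # of open senders can never climb back up after reaching zero.
        if self._closed:
            raise ClosedResourceError("cannot clone a closed endpoint")
        return SendEndpoint(self._shared)

    def send_nowait(self, obj):
        if self._closed:
            raise ClosedResourceError("send on a closed endpoint")
        self._shared.items.append(obj)

    def close(self):
        if not self._closed:
            self._closed = True
            self._shared.open_senders -= 1


class ReceiveEndpoint:
    def __init__(self, shared):
        self._shared = shared

    def finished(self):
        # The receiver may stop once all senders are gone and nothing is buffered.
        return self._shared.open_senders == 0 and not self._shared.items

    def receive_nowait(self):
        return self._shared.items.popleft()  # IndexError if empty (no blocking)


def open_queue():
    shared = _Shared()
    return SendEndpoint(shared), ReceiveEndpoint(shared)


send1, receive = open_queue()
send2 = send1.clone()  # must happen while send1 is still open
send1.close()
send2.send_nowait("still works")
send2.close()
print(receive.receive_nowait(), receive.finished())  # still works True

# The problematic ordering from the text becomes an immediate error:
send3, _ = open_queue()
send3.close()
try:
    send3.clone()
except ClosedResourceError as exc:
    print("error:", exc)  # error: cannot clone a closed endpoint
```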

Hmm.

CC: @oremanj since they reminded me of this today.
