Skip to content

Comm objects do not handle cancellation correctly #6548

@graingert

Description

@graingert
import logging
import asyncio
import sys

from distributed.comm import (
    connect,
    listen
)

# Module-level logger; amain() uses it to log failed checks with their traceback.
logger = logging.getLogger(__name__)

async def get_comm_pair(listen_addr, listen_args=None, connect_args=None, **kwargs):
    """Open a connected pair of comms on ``listen_addr``.

    A listener is started on ``listen_addr``, a client connects to the
    listener's contact address, and the comm handed to the listener's
    handler is collected through a queue.

    Parameters
    ----------
    listen_addr
        Address passed to ``listen``.
    listen_args : dict, optional
        Extra keyword arguments passed to ``listen`` only.
    connect_args : dict, optional
        Extra keyword arguments passed to ``connect`` only.
    **kwargs
        Keyword arguments forwarded to both ``listen`` and ``connect``.

    Returns
    -------
    tuple
        ``(comm, serv_comm)``: the comm returned by ``connect`` and the comm
        that the listener passed to its handler.
    """
    # ``None`` defaults replace the original ``{}`` defaults: a mutable default
    # is created once at definition time and shared by every call.
    listen_args = {} if listen_args is None else listen_args
    connect_args = {} if connect_args is None else connect_args

    q = asyncio.Queue()

    async def handle_comm(comm):
        # Hand the listener-side comm back to the enclosing coroutine.
        await q.put(comm)

    async with listen(listen_addr, handle_comm, **listen_args, **kwargs) as listener:
        comm = await connect(listener.contact_address, **connect_args, **kwargs)

    # The listener context has been exited by the time the queue is read.
    serv_comm = await q.get()
    return (comm, serv_comm)


async def check_comm_cancel(addr):
    """Read from a comm again after an earlier read was cut off by a timeout.

    Opens a comm pair on ``addr`` via ``get_comm_pair``, starts ``a.read()``
    under a 0.05 second ``asyncio.wait_for`` timeout, then issues a second
    ``a.read()`` on the same comm. Both comms are closed on the way out.

    Parameters
    ----------
    addr
        Address passed to ``get_comm_pair`` (this script uses ``"tcp://"``
        and ``"inproc://"``).

    Raises
    ------
    Exception
        If the time-limited read completes instead of timing out.
    """
    a, b = await get_comm_pair(addr)
    try:
        try:
            # Nothing is written to ``b`` in this function, so this read is
            # expected to time out; wait_for cancels the read when it does.
            await asyncio.wait_for(a.read(), 0.05)
        except asyncio.TimeoutError:
            pass
        else:
            raise Exception("did not raise")
        # Second read on the same comm, issued after the first was cancelled.
        # In the recorded output this is the call that raises AssertionError.
        await a.read()
    finally:
        # NOTE(review): if a.close() raises, b.close() is never reached.
        await a.close()
        await b.close()

async def amain():
    """Run ``check_comm_cancel`` for the tcp and then the inproc address.

    An ``AssertionError`` raised by either check is logged with its traceback
    and suppressed. Any other exception propagates to the caller; if that
    happens during the tcp check, the inproc check is not run.
    """
    try:
        await check_comm_cancel("tcp://")
    except AssertionError:
        # logger.exception records the message together with the traceback.
        logger.exception("tcp failed")
    try:
        await check_comm_cancel("inproc://")
    except AssertionError:
        logger.exception("inproc failed")

def main():
    """Run ``amain`` to completion with ``asyncio.run`` and return its result."""
    return asyncio.run(amain())

if __name__ == "__main__":
    # amain() has no return statement, so main() returns None when no
    # exception escapes, and sys.exit(None) exits with status 0.
    sys.exit(main())

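Output of running the script above. The tcp check logs "tcp failed" with an `AssertionError`; in the inproc check, `a.close()` raises `InvalidStateError` while the `AssertionError` is propagating, which `amain` does not catch, so the script crashes: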

tcp failed
Traceback (most recent call last):
  File "/home/graingert/projects/distributed/demo.py", line 41, in amain
    await check_comm_cancel("tcp://")
  File "/home/graingert/projects/distributed/demo.py", line 34, in check_comm_cancel
    await a.read()
  File "/home/graingert/projects/distributed/distributed/comm/tcp.py", line 226, in read
    frames_nbytes = await stream.read_bytes(fmt_size)
  File "/home/graingert/miniconda3/envs/dask-distributed/lib/python3.10/site-packages/tornado/iostream.py", line 421, in read_bytes
    future = self._start_read()
  File "/home/graingert/miniconda3/envs/dask-distributed/lib/python3.10/site-packages/tornado/iostream.py", line 809, in _start_read
    assert self._read_future is None, "Already reading"
AssertionError: Already reading
Traceback (most recent call last):
  File "/home/graingert/projects/distributed/demo.py", line 34, in check_comm_cancel
    await a.read()
  File "/home/graingert/projects/distributed/distributed/comm/inproc.py", line 196, in read
    msg = await self._read_q.get()
  File "/home/graingert/projects/distributed/distributed/comm/inproc.py", line 107, in get
    assert not self._read_future, "Only one reader allowed"
AssertionError: Only one reader allowed

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/graingert/projects/distributed/demo.py", line 53, in <module>
    sys.exit(main())
  File "/home/graingert/projects/distributed/demo.py", line 50, in main
    return asyncio.run(amain())
  File "/home/graingert/miniconda3/envs/dask-distributed/lib/python3.10/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/home/graingert/miniconda3/envs/dask-distributed/lib/python3.10/asyncio/base_events.py", line 646, in run_until_complete
    return future.result()
  File "/home/graingert/projects/distributed/demo.py", line 45, in amain
    await check_comm_cancel("inproc://")
  File "/home/graingert/projects/distributed/demo.py", line 36, in check_comm_cancel
    await a.close()
  File "/home/graingert/projects/distributed/distributed/comm/inproc.py", line 216, in close
    self.abort()
  File "/home/graingert/projects/distributed/distributed/comm/inproc.py", line 222, in abort
    self._read_q.put_nowait(_EOF)
  File "/home/graingert/projects/distributed/distributed/comm/inproc.py", line 122, in put_nowait
    fut.set_result(value)
asyncio.exceptions.InvalidStateError: invalid state

Metadata

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions