Conversation

Member

@hendrikmakait hendrikmakait commented May 31, 2022

Closes #6359

This PR introduces a TaskGroup class that encapsulates scheduling behavior behind call_soon and call_later methods and allows gracefully shutting down all its tasks at once with a timeout.

Outside of scope:

  • Replacing loop.add_callback and loop.call_later if the call is performed to another loop or the loop is not running.
  • Avoiding multiple concurrent calls to Server.close(); the fastest one wins as before, now it just cancels the others.
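
For illustration, here is a rough sketch of the interface described above; the names, defaults, and error handling are assumptions rather than the actual implementation in this PR:

from __future__ import annotations

import asyncio
from typing import Any, Callable, Coroutine


class TaskGroupSketch:
    """Rough sketch of the described interface; not the code in this PR."""

    def __init__(self) -> None:
        self._closed = False
        self._tasks: set[asyncio.Task] = set()

    def call_soon(self, afunc: Callable[..., Coroutine], *args: Any, **kwargs: Any) -> None:
        # Schedule the coroutine function as a background task right away.
        if self._closed:
            raise RuntimeError("group is already closed")  # the PR uses a dedicated error type
        task = asyncio.create_task(afunc(*args, **kwargs))
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)

    def call_later(self, delay: float, afunc: Callable[..., Coroutine], *args: Any, **kwargs: Any) -> None:
        # Schedule the coroutine function to start after `delay` seconds.
        async def delayed() -> None:
            await asyncio.sleep(delay)
            await afunc(*args, **kwargs)

        self.call_soon(delayed)

    async def stop(self, timeout: float = 1) -> None:
        # Refuse new tasks, give running ones `timeout` seconds, then cancel the rest.
        self._closed = True
        tasks = [t for t in self._tasks if t is not asyncio.current_task()]
        try:
            await asyncio.wait_for(asyncio.gather(*tasks, return_exceptions=True), timeout)
        except asyncio.TimeoutError:
            # wait_for already cancelled the gather (and thus the tasks); await them
            # again so the cancellations are retrieved (see the discussion below).
            await asyncio.gather(*tasks, return_exceptions=True)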

except asyncio.TimeoutError:
try:
await gather
except asyncio.CancelledError:
Member

because return_exceptions=True was passed to gather, that would mean this CancelledError was raised because someone cancelled the call to TaskGroup.stop()?

Member Author

In Python 3.8, we otherwise receive a _GatheringFuture exception was never retrieved warning, which broke a bunch of things. This seems to be solved in 3.9 and higher. See the snippet below for a small reproducible example.

import asyncio

async def foo():
    await asyncio.sleep(10)

async def bar():
    coro = asyncio.gather(foo())
    try:
        await asyncio.wait_for(coro, timeout=1)
    except asyncio.TimeoutError:  # wait_for timed out after 1 second
        try:
            await coro  # Await to retrieve the exception from the future
        except asyncio.CancelledError:
            pass

asyncio.run(bar())

Member

seems related to python/cpython#73618

for me, wrapping the call to gather in an async def also swallows the _GatheringFuture exception was never retrieved warning:

import asyncio

async def foo():
    await asyncio.sleep(10)

async def bar():
    async def async_fn():
        return await asyncio.gather(foo())

    try:
        await asyncio.wait_for(async_fn(), timeout=1)
    except asyncio.TimeoutError:  # wait_for timed out after 1 second
        pass

asyncio.run(bar())

Member Author
@hendrikmakait hendrikmakait May 31, 2022

Yes, I came across that issue yesterday as well. Let's go with the async def then; it looks less convoluted. I'll link to this discussion in a comment.

Member
@graingert graingert May 31, 2022

the fix was introduced here: https://github.com/python/cpython/pull/20054/files#diff-429f4ed1e0f89ea2c92e2a8e8548ea8ae1a3d528979554fbfa4c38329e951529R503. This calls _GatheringFuture.result, which marks the future's exception as retrieved.

wrapping the _GatheringFuture in a task also causes _GatheringFuture.result to get called: https://github.com/python/cpython/blob/372afb7a9b1fff2fcc724f999744b6126f0f6e07/Lib/asyncio/tasks.py#L304

Member Author

TIL: Wrapping the _GatheringFuture in asyncio.create_task() does not work after all; it throws a TypeError: a coroutine was expected, got <_GatheringFuture pending>.

import asyncio

async def foo():
    await asyncio.sleep(10)

async def bar():
    try:
        task = asyncio.create_task(asyncio.gather(foo()))
        await asyncio.wait_for(task, timeout=1)
    except asyncio.TimeoutError:  # wait_for timed out after 1 second
        try:
            await task
        except asyncio.CancelledError:
            pass

asyncio.run(bar())
Traceback (most recent call last):
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/script.py", line 19, in <module>
    asyncio.run(bar())
  File "/opt/homebrew/Caskroom/mambaforge/base/envs/dask-distributed/lib/python3.9/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/opt/homebrew/Caskroom/mambaforge/base/envs/dask-distributed/lib/python3.9/asyncio/base_events.py", line 647, in run_until_complete
    return future.result()
  File "/Users/hendrikmakait/projects/dask/distributed/script.py", line 10, in bar
    task = asyncio.create_task(asyncio.gather(foo()))
  File "/opt/homebrew/Caskroom/mambaforge/base/envs/dask-distributed/lib/python3.9/asyncio/tasks.py", line 361, in create_task
    task = loop.create_task(coro)
  File "/opt/homebrew/Caskroom/mambaforge/base/envs/dask-distributed/lib/python3.9/asyncio/base_events.py", line 438, in create_task
    task = tasks.Task(coro, loop=self, name=name)
TypeError: a coroutine was expected, got <_GatheringFuture pending>


def _send_worker_status_change(self, stimulus_id: str) -> None:
if (
async def _send_worker_status_change(self, stimulus_id: str) -> None:
Member Author

Refactored to an async function with a while loop instead of a trampoline.
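
For context, the general shape of that kind of refactor, as a generic sketch (poll_trampoline, poll_loop, and do_some_work are made-up names, not the _send_worker_status_change code):

import asyncio

def do_some_work() -> None:
    ...  # placeholder for the periodic work

# Trampoline style: a plain callback that reschedules itself on the event loop.
def poll_trampoline(loop: asyncio.AbstractEventLoop) -> None:
    do_some_work()
    loop.call_later(0.1, poll_trampoline, loop)

# Refactored style: a single long-lived coroutine with an explicit while loop,
# which a task group can track and cancel cleanly on shutdown.
async def poll_loop() -> None:
    while True:
        do_some_work()
        await asyncio.sleep(0.1)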


def reevaluate_occupancy(self, worker_index: int = 0):
async def reevaluate_occupancy(self, worker_index: int = 0):
Member Author

Refactored to an async function with a while loop instead of a trampoline.


@@ -370,7 +534,10 @@ def stop(self):
# The demonstrator for this is Worker.terminate(), which
# closes the server socket in response to an incoming message.
# See https://github.com/tornadoweb/tornado/issues/2069
Member

tornadoweb/tornado#2075 is now fixed and released in v5, so we should be able to call listener.stop()

Member Author

Done

Contributor

github-actions bot commented Jun 8, 2022

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

   15 files  ±0      15 suites  ±0    9h 24m 49s ⏱️ +2h 55m 38s
 2 861 tests  +8    2 776 ✔️  +7      82 💤 ±0    3 failed +1
21 193 runs  +56   20 240 ✔️ +52    949 💤 +2    4 failed +2

For more details on these failures, see this check.

Results for commit 8873ccb. ± Comparison against base commit 2252287.

♻️ This comment has been updated with latest results.

@hendrikmakait
Member Author

Test results:
Known flakes:

New flakes:

  • distributed/tests/test_scheduler.py::test_missing_data_errant_worker
  • distributed/tests/test_active_memory_manager.py::test_RetireWorker_all_replicas_are_being_retired
  • distributed/tests/test_steal.py::test_cleanup_repeated_tasks

It generally feels like some tests have become a lot slower (e.g. distributed/tests/test_scheduler.py::test_missing_data_errant_worker).

@graingert graingert closed this Jun 8, 2022
@graingert graingert reopened this Jun 8, 2022
@graingert
Member

pytest distributed/tests/test_scheduler.py::test_missing_data_errant_worker -s
================================================================================================================================== test session starts ===================================================================================================================================
platform linux -- Python 3.10.4, pytest-7.1.2, pluggy-1.0.0 -- /home/graingert/miniconda3/envs/dask-distributed/bin/python
cachedir: .pytest_cache
rootdir: /home/graingert/projects/distributed, configfile: setup.cfg
plugins: cov-3.0.0, repeat-0.9.1, rerunfailures-10.2, timeout-2.1.0
timeout: 300.0s
timeout method: thread
timeout func_only: False
collected 1 item                                                                                                                                                                                                                                                                         

distributed/tests/test_scheduler.py::test_missing_data_errant_worker 2022-06-08 17:22:24,675 - distributed.http.proxy - INFO - To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
2022-06-08 17:22:24,677 - distributed.scheduler - INFO - State start
2022-06-08 17:22:24,679 - distributed.scheduler - INFO - Clear task state
2022-06-08 17:22:24,679 - distributed.scheduler - INFO -   Scheduler at:     tcp://127.0.0.1:35523
2022-06-08 17:22:24,679 - distributed.scheduler - INFO -   dashboard at:           127.0.0.1:45737
2022-06-08 17:22:24,679 - distributed.core - INFO - Starting periodic callback <tornado.ioloop.PeriodicCallback object at 0x7f1c98fcc340>
2022-06-08 17:22:24,680 - distributed.core - INFO - Starting periodic callback <tornado.ioloop.PeriodicCallback object at 0x7f1c98fcc3a0>
2022-06-08 17:22:24,680 - distributed.core - INFO - Starting periodic callback <tornado.ioloop.PeriodicCallback object at 0x7f1c98fcc4f0>
2022-06-08 17:22:24,680 - distributed.core - INFO - Starting periodic callback <tornado.ioloop.PeriodicCallback object at 0x7f1c98fcc0d0>
2022-06-08 17:22:24,689 - distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:36619
2022-06-08 17:22:24,689 - distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:36619
2022-06-08 17:22:24,690 - distributed.worker - INFO -          dashboard at:            127.0.0.1:41439
2022-06-08 17:22:24,690 - distributed.worker - INFO - Waiting to connect to:      tcp://127.0.0.1:35523
2022-06-08 17:22:24,690 - distributed.worker - INFO - -------------------------------------------------
2022-06-08 17:22:24,690 - distributed.worker - INFO -               Threads:                          1
2022-06-08 17:22:24,690 - distributed.worker - INFO -                Memory:                  15.36 GiB
2022-06-08 17:22:24,690 - distributed.worker - INFO -       Local Directory: /tmp/tmp8g6uqdam/dask-worker-space/worker-kms39afd
2022-06-08 17:22:24,690 - distributed.worker - INFO - -------------------------------------------------
2022-06-08 17:22:24,691 - distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:45445
2022-06-08 17:22:24,691 - distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:45445
2022-06-08 17:22:24,691 - distributed.worker - INFO -          dashboard at:            127.0.0.1:32859
2022-06-08 17:22:24,691 - distributed.worker - INFO - Waiting to connect to:      tcp://127.0.0.1:35523
2022-06-08 17:22:24,691 - distributed.worker - INFO - -------------------------------------------------
2022-06-08 17:22:24,691 - distributed.worker - INFO -               Threads:                          1
2022-06-08 17:22:24,691 - distributed.worker - INFO -                Memory:                  15.36 GiB
2022-06-08 17:22:24,691 - distributed.worker - INFO -       Local Directory: /tmp/tmp8g6uqdam/dask-worker-space/worker-6xeuprr5
2022-06-08 17:22:24,691 - distributed.worker - INFO - -------------------------------------------------
2022-06-08 17:22:24,692 - distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:40227
2022-06-08 17:22:24,692 - distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:40227
2022-06-08 17:22:24,692 - distributed.worker - INFO -          dashboard at:            127.0.0.1:39453
2022-06-08 17:22:24,692 - distributed.worker - INFO - Waiting to connect to:      tcp://127.0.0.1:35523
2022-06-08 17:22:24,692 - distributed.worker - INFO - -------------------------------------------------
2022-06-08 17:22:24,692 - distributed.worker - INFO -               Threads:                          1
2022-06-08 17:22:24,692 - distributed.worker - INFO -                Memory:                  15.36 GiB
2022-06-08 17:22:24,692 - distributed.worker - INFO -       Local Directory: /tmp/tmp8g6uqdam/dask-worker-space/worker-wwr48qe4
2022-06-08 17:22:24,692 - distributed.worker - INFO - -------------------------------------------------
2022-06-08 17:22:24,957 - distributed.scheduler - INFO - Register worker <WorkerState 'tcp://127.0.0.1:36619', name: 0, status: init, memory: 0, processing: 0>
2022-06-08 17:22:24,957 - distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:36619
2022-06-08 17:22:24,957 - distributed.core - INFO - Starting established connection
2022-06-08 17:22:24,957 - distributed.scheduler - INFO - Register worker <WorkerState 'tcp://127.0.0.1:45445', name: 1, status: init, memory: 0, processing: 0>
2022-06-08 17:22:24,958 - distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:45445
2022-06-08 17:22:24,958 - distributed.core - INFO - Starting established connection
2022-06-08 17:22:24,958 - distributed.scheduler - INFO - Register worker <WorkerState 'tcp://127.0.0.1:40227', name: 2, status: init, memory: 0, processing: 0>
2022-06-08 17:22:24,958 - distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:40227
2022-06-08 17:22:24,958 - distributed.core - INFO - Starting established connection
2022-06-08 17:22:24,959 - distributed.worker - INFO -         Registered to:      tcp://127.0.0.1:35523
2022-06-08 17:22:24,959 - distributed.worker - INFO - -------------------------------------------------
2022-06-08 17:22:24,959 - distributed.core - INFO - Starting periodic callback <tornado.ioloop.PeriodicCallback object at 0x7f1c98fcde10>
2022-06-08 17:22:24,959 - distributed.core - INFO - Starting periodic callback <tornado.ioloop.PeriodicCallback object at 0x7f1c98fcdc90>
2022-06-08 17:22:24,959 - distributed.core - INFO - Starting periodic callback <tornado.ioloop.PeriodicCallback object at 0x7f1c98fcdde0>
2022-06-08 17:22:24,959 - distributed.core - INFO - Starting periodic callback <tornado.ioloop.PeriodicCallback object at 0x7f1c98fcdfc0>
2022-06-08 17:22:24,959 - distributed.core - INFO - Starting periodic callback <tornado.ioloop.PeriodicCallback object at 0x7f1c98fce290>
2022-06-08 17:22:24,959 - distributed.worker - INFO -         Registered to:      tcp://127.0.0.1:35523
2022-06-08 17:22:24,959 - distributed.worker - INFO - -------------------------------------------------
2022-06-08 17:22:24,959 - distributed.core - INFO - Starting periodic callback <tornado.ioloop.PeriodicCallback object at 0x7f1c98fcf1c0>
2022-06-08 17:22:24,959 - distributed.core - INFO - Starting periodic callback <tornado.ioloop.PeriodicCallback object at 0x7f1c98fcf040>
2022-06-08 17:22:24,960 - distributed.core - INFO - Starting periodic callback <tornado.ioloop.PeriodicCallback object at 0x7f1c98fcf190>
2022-06-08 17:22:24,960 - distributed.core - INFO - Starting periodic callback <tornado.ioloop.PeriodicCallback object at 0x7f1c98fcf370>
2022-06-08 17:22:24,960 - distributed.core - INFO - Starting periodic callback <tornado.ioloop.PeriodicCallback object at 0x7f1c98fcf7f0>
2022-06-08 17:22:24,960 - distributed.worker - INFO -         Registered to:      tcp://127.0.0.1:35523
2022-06-08 17:22:24,960 - distributed.worker - INFO - -------------------------------------------------
2022-06-08 17:22:24,960 - distributed.core - INFO - Starting periodic callback <tornado.ioloop.PeriodicCallback object at 0x7f1c990145e0>
2022-06-08 17:22:24,960 - distributed.core - INFO - Starting periodic callback <tornado.ioloop.PeriodicCallback object at 0x7f1c99014400>
2022-06-08 17:22:24,960 - distributed.core - INFO - Starting periodic callback <tornado.ioloop.PeriodicCallback object at 0x7f1c99014490>
2022-06-08 17:22:24,960 - distributed.core - INFO - Starting periodic callback <tornado.ioloop.PeriodicCallback object at 0x7f1c99014790>
2022-06-08 17:22:24,960 - distributed.core - INFO - Starting periodic callback <tornado.ioloop.PeriodicCallback object at 0x7f1c99014c10>
2022-06-08 17:22:24,960 - distributed.core - INFO - Starting established connection
2022-06-08 17:22:24,960 - distributed.core - INFO - Starting established connection
2022-06-08 17:22:24,960 - distributed.core - INFO - Starting established connection
2022-06-08 17:22:25,018 - distributed.scheduler - INFO - Receive client connection: Client-2e7a6da6-e747-11ec-945e-9cb6d08f15e1
2022-06-08 17:22:25,018 - distributed.core - INFO - Starting established connection
2022-06-08 17:22:25,215 - distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:36619
2022-06-08 17:22:25,249 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://127.0.0.1:36619', name: 0, status: closing, memory: 1, processing: 0>
2022-06-08 17:22:25,249 - distributed.core - INFO - Removing comms to tcp://127.0.0.1:36619
2022-06-08 17:22:25,250 - distributed.worker - INFO - Connection to scheduler broken. Closing without reporting. ID: Worker-5cdbf681-328a-46ab-a320-7d58beea813e Address tcp://127.0.0.1:36619 Status: Status.closing
2022-06-08 17:22:25,262 - tornado.application - ERROR - Exception in callback functools.partial(<function TCPServer._handle_connection.<locals>.<lambda> at 0x7f1c8555cd30>, <Task finished name='Task-99' coro=<BaseTCPListener._handle_stream() done, defined at /home/graingert/projects/distributed/distributed/comm/tcp.py:543> exception=AsyncTaskGroupClosedError('Cannot schedule a new coroutine function as the group is already closed.')>)
Traceback (most recent call last):
  File "/home/graingert/miniconda3/envs/dask-distributed/lib/python3.10/site-packages/tornado/ioloop.py", line 741, in _run_callback
    ret = callback()
  File "/home/graingert/miniconda3/envs/dask-distributed/lib/python3.10/site-packages/tornado/tcpserver.py", line 331, in <lambda>
    gen.convert_yielded(future), lambda f: f.result()
  File "/home/graingert/projects/distributed/distributed/comm/tcp.py", line 560, in _handle_stream
    await self.comm_handler(comm)
  File "/home/graingert/projects/distributed/distributed/core.py", line 643, in handle_comm
    self._ongoing_background_tasks.call_soon(self._handle_comm, comm)
  File "/home/graingert/projects/distributed/distributed/core.py", line 173, in call_soon
    raise AsyncTaskGroupClosedError(
distributed.core.AsyncTaskGroupClosedError: Cannot schedule a new coroutine function as the group is already closed.
2022-06-08 17:22:53,471 - distributed.comm.tcp - WARNING - Closing dangling stream in <TCP  local=tcp://127.0.0.1:36619 remote=tcp://127.0.0.1:59748>
2022-06-08 17:22:53,477 - distributed.worker - ERROR - Worker stream died during communication: tcp://127.0.0.1:36619
Traceback (most recent call last):
  File "/home/graingert/projects/distributed/distributed/comm/tcp.py", line 226, in read
    frames_nbytes = await stream.read_bytes(fmt_size)
tornado.iostream.StreamClosedError: Stream is closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/graingert/projects/distributed/distributed/worker.py", line 3290, in gather_dep
    response = await get_data_from_worker(
  File "/home/graingert/projects/distributed/distributed/worker.py", line 4612, in get_data_from_worker
    return await retry_operation(_get_data, operation="get_data_from_worker")
  File "/home/graingert/projects/distributed/distributed/utils_comm.py", line 381, in retry_operation
    return await retry(
  File "/home/graingert/projects/distributed/distributed/utils_comm.py", line 366, in retry
    return await coro()
  File "/home/graingert/projects/distributed/distributed/worker.py", line 4592, in _get_data
    response = await send_recv(
  File "/home/graingert/projects/distributed/distributed/core.py", line 884, in send_recv
    response = await comm.read(deserializers=deserializers)
  File "/home/graingert/projects/distributed/distributed/comm/tcp.py", line 242, in read
    convert_stream_closed_error(self, e)
  File "/home/graingert/projects/distributed/distributed/comm/tcp.py", line 150, in convert_stream_closed_error
    raise CommClosedError(f"in {obj}: {exc}") from exc
distributed.comm.core.CommClosedError: in <TCP (closed) Ephemeral Worker->Worker for gather local=tcp://127.0.0.1:59748 remote=tcp://127.0.0.1:36619>: Stream is closed
2022-06-08 17:22:53,546 - distributed.scheduler - INFO - Remove client Client-2e7a6da6-e747-11ec-945e-9cb6d08f15e1
2022-06-08 17:22:53,546 - distributed.scheduler - INFO - Remove client Client-2e7a6da6-e747-11ec-945e-9cb6d08f15e1
2022-06-08 17:22:53,547 - distributed.scheduler - INFO - Close client connection: Client-2e7a6da6-e747-11ec-945e-9cb6d08f15e1
2022-06-08 17:22:53,547 - distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:45445
2022-06-08 17:22:53,548 - distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:40227
2022-06-08 17:22:53,549 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://127.0.0.1:45445', name: 1, status: closing, memory: 0, processing: 0>
2022-06-08 17:22:53,549 - distributed.core - INFO - Removing comms to tcp://127.0.0.1:45445
2022-06-08 17:22:53,549 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://127.0.0.1:40227', name: 2, status: closing, memory: 0, processing: 0>
2022-06-08 17:22:53,549 - distributed.core - INFO - Removing comms to tcp://127.0.0.1:40227
2022-06-08 17:22:53,549 - distributed.scheduler - INFO - Lost all workers
2022-06-08 17:22:53,550 - distributed.worker - INFO - Connection to scheduler broken. Closing without reporting. ID: Worker-d6dbf5ef-e477-440b-9419-e3a258a3ef8e Address tcp://127.0.0.1:45445 Status: Status.closing
2022-06-08 17:22:53,550 - distributed.worker - INFO - Connection to scheduler broken. Closing without reporting. ID: Worker-cc3b375f-d4b2-4ab6-9ac8-16e9a056d777 Address tcp://127.0.0.1:40227 Status: Status.closing
2022-06-08 17:22:53,552 - distributed.scheduler - INFO - Scheduler closing...
2022-06-08 17:22:53,552 - distributed.scheduler - INFO - Scheduler closing all comms
PASSED

================================================================================================================================== slowest 20 durations ==================================================================================================================================
29.94s call     distributed/tests/test_scheduler.py::test_missing_data_errant_worker

(2 durations < 0.005s hidden.  Use -vv to show these durations.)
=================================================================================================================================== 1 passed in 30.00s ===================================================================================================================================

Comment on lines 641 to 644
def handle_comm(self, comm):
    """Start a background task that dispatches new communications to coroutine-handlers"""
    self._ongoing_background_tasks.call_soon(self._handle_comm, comm)
    return NoOpAwaitable()
Member

I think this might help with new connections that are accepted after the worker starts closing

Suggested change
def handle_comm(self, comm):
    """Start a background task that dispatches new communications to coroutine-handlers"""
    self._ongoing_background_tasks.call_soon(self._handle_comm, comm)
    return NoOpAwaitable()

def handle_comm(self, comm):
    """Start a background task that dispatches new communications to coroutine-handlers"""
    try:
        self._ongoing_background_tasks.call_soon(self._handle_comm, comm)
    except AsyncTaskGroupClosedError:
        comm.abort()
    return NoOpAwaitable()

Member

results in:

================================================================================================================================== test session starts ===================================================================================================================================
platform linux -- Python 3.10.4, pytest-7.1.2, pluggy-1.0.0 -- /home/graingert/miniconda3/envs/dask-distributed/bin/python
cachedir: .pytest_cache
rootdir: /home/graingert/projects/distributed, configfile: setup.cfg
plugins: cov-3.0.0, repeat-0.9.1, rerunfailures-10.2, timeout-2.1.0
timeout: 300.0s
timeout method: thread
timeout func_only: False
collected 1 item                                                                                                                                                                                                                                                                         

distributed/tests/test_scheduler.py::test_missing_data_errant_worker 2022-06-08 17:28:04,831 - distributed.http.proxy - INFO - To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
2022-06-08 17:28:04,833 - distributed.scheduler - INFO - State start
2022-06-08 17:28:04,836 - distributed.scheduler - INFO - Clear task state
2022-06-08 17:28:04,836 - distributed.scheduler - INFO -   Scheduler at:     tcp://127.0.0.1:34829
2022-06-08 17:28:04,836 - distributed.scheduler - INFO -   dashboard at:           127.0.0.1:34739
2022-06-08 17:28:04,836 - distributed.core - INFO - Starting periodic callback <tornado.ioloop.PeriodicCallback object at 0x7f84368dc280>
2022-06-08 17:28:04,836 - distributed.core - INFO - Starting periodic callback <tornado.ioloop.PeriodicCallback object at 0x7f84368dc2e0>
2022-06-08 17:28:04,836 - distributed.core - INFO - Starting periodic callback <tornado.ioloop.PeriodicCallback object at 0x7f84368dc430>
2022-06-08 17:28:04,836 - distributed.core - INFO - Starting periodic callback <tornado.ioloop.PeriodicCallback object at 0x7f84368abfd0>
2022-06-08 17:28:04,846 - distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:36549
2022-06-08 17:28:04,846 - distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:36549
2022-06-08 17:28:04,847 - distributed.worker - INFO -          dashboard at:            127.0.0.1:38441
2022-06-08 17:28:04,847 - distributed.worker - INFO - Waiting to connect to:      tcp://127.0.0.1:34829
2022-06-08 17:28:04,847 - distributed.worker - INFO - -------------------------------------------------
2022-06-08 17:28:04,847 - distributed.worker - INFO -               Threads:                          1
2022-06-08 17:28:04,847 - distributed.worker - INFO -                Memory:                  15.36 GiB
2022-06-08 17:28:04,847 - distributed.worker - INFO -       Local Directory: /tmp/tmpv1oh7_1e/dask-worker-space/worker-69nya6ow
2022-06-08 17:28:04,847 - distributed.worker - INFO - -------------------------------------------------
2022-06-08 17:28:04,848 - distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:44517
2022-06-08 17:28:04,848 - distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:44517
2022-06-08 17:28:04,848 - distributed.worker - INFO -          dashboard at:            127.0.0.1:40143
2022-06-08 17:28:04,848 - distributed.worker - INFO - Waiting to connect to:      tcp://127.0.0.1:34829
2022-06-08 17:28:04,848 - distributed.worker - INFO - -------------------------------------------------
2022-06-08 17:28:04,848 - distributed.worker - INFO -               Threads:                          1
2022-06-08 17:28:04,848 - distributed.worker - INFO -                Memory:                  15.36 GiB
2022-06-08 17:28:04,848 - distributed.worker - INFO -       Local Directory: /tmp/tmpv1oh7_1e/dask-worker-space/worker-xphv4q5t
2022-06-08 17:28:04,848 - distributed.worker - INFO - -------------------------------------------------
2022-06-08 17:28:04,849 - distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:39575
2022-06-08 17:28:04,849 - distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:39575
2022-06-08 17:28:04,849 - distributed.worker - INFO -          dashboard at:            127.0.0.1:40919
2022-06-08 17:28:04,849 - distributed.worker - INFO - Waiting to connect to:      tcp://127.0.0.1:34829
2022-06-08 17:28:04,849 - distributed.worker - INFO - -------------------------------------------------
2022-06-08 17:28:04,849 - distributed.worker - INFO -               Threads:                          1
2022-06-08 17:28:04,849 - distributed.worker - INFO -                Memory:                  15.36 GiB
2022-06-08 17:28:04,849 - distributed.worker - INFO -       Local Directory: /tmp/tmpv1oh7_1e/dask-worker-space/worker-zhhd7q2x
2022-06-08 17:28:04,849 - distributed.worker - INFO - -------------------------------------------------
2022-06-08 17:28:05,076 - distributed.scheduler - INFO - Register worker <WorkerState 'tcp://127.0.0.1:36549', name: 0, status: init, memory: 0, processing: 0>
2022-06-08 17:28:05,076 - distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:36549
2022-06-08 17:28:05,077 - distributed.core - INFO - Starting established connection
2022-06-08 17:28:05,077 - distributed.scheduler - INFO - Register worker <WorkerState 'tcp://127.0.0.1:44517', name: 1, status: init, memory: 0, processing: 0>
2022-06-08 17:28:05,077 - distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:44517
2022-06-08 17:28:05,077 - distributed.core - INFO - Starting established connection
2022-06-08 17:28:05,078 - distributed.scheduler - INFO - Register worker <WorkerState 'tcp://127.0.0.1:39575', name: 2, status: init, memory: 0, processing: 0>
2022-06-08 17:28:05,078 - distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:39575
2022-06-08 17:28:05,078 - distributed.core - INFO - Starting established connection
2022-06-08 17:28:05,078 - distributed.worker - INFO -         Registered to:      tcp://127.0.0.1:34829
2022-06-08 17:28:05,078 - distributed.worker - INFO - -------------------------------------------------
2022-06-08 17:28:05,079 - distributed.core - INFO - Starting periodic callback <tornado.ioloop.PeriodicCallback object at 0x7f84368ddd50>
2022-06-08 17:28:05,079 - distributed.core - INFO - Starting periodic callback <tornado.ioloop.PeriodicCallback object at 0x7f84368ddbd0>
2022-06-08 17:28:05,079 - distributed.core - INFO - Starting periodic callback <tornado.ioloop.PeriodicCallback object at 0x7f84368ddd20>
2022-06-08 17:28:05,079 - distributed.core - INFO - Starting periodic callback <tornado.ioloop.PeriodicCallback object at 0x7f84368ddf00>
2022-06-08 17:28:05,079 - distributed.core - INFO - Starting periodic callback <tornado.ioloop.PeriodicCallback object at 0x7f84368de1d0>
2022-06-08 17:28:05,079 - distributed.worker - INFO -         Registered to:      tcp://127.0.0.1:34829
2022-06-08 17:28:05,079 - distributed.worker - INFO - -------------------------------------------------
2022-06-08 17:28:05,079 - distributed.core - INFO - Starting periodic callback <tornado.ioloop.PeriodicCallback object at 0x7f84368df100>
2022-06-08 17:28:05,079 - distributed.core - INFO - Starting periodic callback <tornado.ioloop.PeriodicCallback object at 0x7f84368def80>
2022-06-08 17:28:05,079 - distributed.core - INFO - Starting periodic callback <tornado.ioloop.PeriodicCallback object at 0x7f84368df0d0>
2022-06-08 17:28:05,079 - distributed.core - INFO - Starting periodic callback <tornado.ioloop.PeriodicCallback object at 0x7f84368df2b0>
2022-06-08 17:28:05,079 - distributed.core - INFO - Starting periodic callback <tornado.ioloop.PeriodicCallback object at 0x7f84368df730>
2022-06-08 17:28:05,079 - distributed.worker - INFO -         Registered to:      tcp://127.0.0.1:34829
2022-06-08 17:28:05,079 - distributed.worker - INFO - -------------------------------------------------
2022-06-08 17:28:05,079 - distributed.core - INFO - Starting periodic callback <tornado.ioloop.PeriodicCallback object at 0x7f8436924520>
2022-06-08 17:28:05,079 - distributed.core - INFO - Starting periodic callback <tornado.ioloop.PeriodicCallback object at 0x7f8436924340>
2022-06-08 17:28:05,079 - distributed.core - INFO - Starting periodic callback <tornado.ioloop.PeriodicCallback object at 0x7f84369243d0>
2022-06-08 17:28:05,079 - distributed.core - INFO - Starting periodic callback <tornado.ioloop.PeriodicCallback object at 0x7f84369246d0>
2022-06-08 17:28:05,080 - distributed.core - INFO - Starting periodic callback <tornado.ioloop.PeriodicCallback object at 0x7f8436924b50>
2022-06-08 17:28:05,080 - distributed.core - INFO - Starting established connection
2022-06-08 17:28:05,080 - distributed.core - INFO - Starting established connection
2022-06-08 17:28:05,080 - distributed.core - INFO - Starting established connection
2022-06-08 17:28:05,138 - distributed.scheduler - INFO - Receive client connection: Client-f934aaaf-e747-11ec-9586-9cb6d08f15e1
2022-06-08 17:28:05,138 - distributed.core - INFO - Starting established connection
2022-06-08 17:28:05,320 - distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:36549
2022-06-08 17:28:05,322 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://127.0.0.1:36549', name: 0, status: closing, memory: 1, processing: 0>
2022-06-08 17:28:05,322 - distributed.core - INFO - Removing comms to tcp://127.0.0.1:36549
2022-06-08 17:28:05,322 - distributed.worker - INFO - Connection to scheduler broken. Closing without reporting. ID: Worker-9ccd4644-8b3c-4eee-874c-f932ddcdf737 Address tcp://127.0.0.1:36549 Status: Status.closing
2022-06-08 17:28:05,324 - distributed.worker - ERROR - Worker stream died during communication: tcp://127.0.0.1:36549
Traceback (most recent call last):
  File "/home/graingert/miniconda3/envs/dask-distributed/lib/python3.10/site-packages/tornado/iostream.py", line 867, in _read_to_buffer
    bytes_read = self.read_from_fd(buf)
  File "/home/graingert/miniconda3/envs/dask-distributed/lib/python3.10/site-packages/tornado/iostream.py", line 1140, in read_from_fd
    return self.socket.recv_into(buf, len(buf))
ConnectionResetError: [Errno 104] Connection reset by peer

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/graingert/projects/distributed/distributed/worker.py", line 3290, in gather_dep
    response = await get_data_from_worker(
  File "/home/graingert/projects/distributed/distributed/worker.py", line 4612, in get_data_from_worker
    return await retry_operation(_get_data, operation="get_data_from_worker")
  File "/home/graingert/projects/distributed/distributed/utils_comm.py", line 381, in retry_operation
    return await retry(
  File "/home/graingert/projects/distributed/distributed/utils_comm.py", line 366, in retry
    return await coro()
  File "/home/graingert/projects/distributed/distributed/worker.py", line 4592, in _get_data
    response = await send_recv(
  File "/home/graingert/projects/distributed/distributed/core.py", line 887, in send_recv
    response = await comm.read(deserializers=deserializers)
  File "/home/graingert/projects/distributed/distributed/comm/tcp.py", line 242, in read
    convert_stream_closed_error(self, e)
  File "/home/graingert/projects/distributed/distributed/comm/tcp.py", line 148, in convert_stream_closed_error
    raise CommClosedError(f"in {obj}: {exc.__class__.__name__}: {exc}") from exc
distributed.comm.core.CommClosedError: in <TCP (closed) Ephemeral Worker->Worker for gather local=tcp://127.0.0.1:59712 remote=tcp://127.0.0.1:36549>: ConnectionResetError: [Errno 104] Connection reset by peer
2022-06-08 17:28:06,347 - distributed.scheduler - INFO - Remove client Client-f934aaaf-e747-11ec-9586-9cb6d08f15e1
2022-06-08 17:28:06,347 - distributed.scheduler - INFO - Remove client Client-f934aaaf-e747-11ec-9586-9cb6d08f15e1
2022-06-08 17:28:06,348 - distributed.scheduler - INFO - Close client connection: Client-f934aaaf-e747-11ec-9586-9cb6d08f15e1
2022-06-08 17:28:06,349 - distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:44517
2022-06-08 17:28:06,350 - distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:39575
2022-06-08 17:28:06,351 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://127.0.0.1:44517', name: 1, status: closing, memory: 0, processing: 0>
2022-06-08 17:28:06,351 - distributed.core - INFO - Removing comms to tcp://127.0.0.1:44517
2022-06-08 17:28:06,352 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://127.0.0.1:39575', name: 2, status: closing, memory: 0, processing: 0>
2022-06-08 17:28:06,352 - distributed.core - INFO - Removing comms to tcp://127.0.0.1:39575
2022-06-08 17:28:06,352 - distributed.scheduler - INFO - Lost all workers
2022-06-08 17:28:06,352 - distributed.worker - INFO - Connection to scheduler broken. Closing without reporting. ID: Worker-3b0793d7-3826-4dd9-8737-37212f30ccf5 Address tcp://127.0.0.1:44517 Status: Status.closing
2022-06-08 17:28:06,352 - distributed.worker - INFO - Connection to scheduler broken. Closing without reporting. ID: Worker-6e66ab61-a339-4674-8115-e07ff206fca7 Address tcp://127.0.0.1:39575 Status: Status.closing
2022-06-08 17:28:06,355 - distributed.scheduler - INFO - Scheduler closing...
2022-06-08 17:28:06,356 - distributed.scheduler - INFO - Scheduler closing all comms
PASSED

================================================================================================================================== slowest 20 durations ==================================================================================================================================
2.57s call     distributed/tests/test_scheduler.py::test_missing_data_errant_worker

(2 durations < 0.005s hidden.  Use -vv to show these durations.)
=================================================================================================================================== 1 passed in 2.64s ====================================================================================================================================

Member

which is the same as on main:

 graingert@superjacent  ~/projects/distributed   main  pytest distributed/tests/test_scheduler.py::test_missing_data_errant_worker -s
================================================================================================================================== test session starts ===================================================================================================================================
platform linux -- Python 3.10.4, pytest-7.1.2, pluggy-1.0.0 -- /home/graingert/miniconda3/envs/dask-distributed/bin/python
cachedir: .pytest_cache
rootdir: /home/graingert/projects/distributed, configfile: setup.cfg
plugins: cov-3.0.0, repeat-0.9.1, rerunfailures-10.2, timeout-2.1.0
timeout: 300.0s
timeout method: thread
timeout func_only: False
collected 1 item                                                                                                                                                                                                                                                                         

distributed/tests/test_scheduler.py::test_missing_data_errant_worker 2022-06-08 17:32:30,332 - distributed.http.proxy - INFO - To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
2022-06-08 17:32:30,336 - distributed.scheduler - INFO - State start
2022-06-08 17:32:30,342 - distributed.scheduler - INFO - Clear task state
2022-06-08 17:32:30,343 - distributed.scheduler - INFO -   Scheduler at:     tcp://127.0.0.1:33183
2022-06-08 17:32:30,343 - distributed.scheduler - INFO -   dashboard at:           127.0.0.1:32993
2022-06-08 17:32:30,361 - distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:43121
2022-06-08 17:32:30,361 - distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:43121
2022-06-08 17:32:30,361 - distributed.worker - INFO -          dashboard at:            127.0.0.1:45323
2022-06-08 17:32:30,361 - distributed.worker - INFO - Waiting to connect to:      tcp://127.0.0.1:33183
2022-06-08 17:32:30,362 - distributed.worker - INFO - -------------------------------------------------
2022-06-08 17:32:30,362 - distributed.worker - INFO -               Threads:                          1
2022-06-08 17:32:30,362 - distributed.worker - INFO -                Memory:                  15.36 GiB
2022-06-08 17:32:30,362 - distributed.worker - INFO -       Local Directory: /tmp/tmprhlds1rj/dask-worker-space/worker-qprmb83h
2022-06-08 17:32:30,362 - distributed.worker - INFO - -------------------------------------------------
2022-06-08 17:32:30,363 - distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:39179
2022-06-08 17:32:30,363 - distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:39179
2022-06-08 17:32:30,363 - distributed.worker - INFO -          dashboard at:            127.0.0.1:46715
2022-06-08 17:32:30,363 - distributed.worker - INFO - Waiting to connect to:      tcp://127.0.0.1:33183
2022-06-08 17:32:30,363 - distributed.worker - INFO - -------------------------------------------------
2022-06-08 17:32:30,363 - distributed.worker - INFO -               Threads:                          1
2022-06-08 17:32:30,364 - distributed.worker - INFO -                Memory:                  15.36 GiB
2022-06-08 17:32:30,364 - distributed.worker - INFO -       Local Directory: /tmp/tmprhlds1rj/dask-worker-space/worker-2dmt431u
2022-06-08 17:32:30,364 - distributed.worker - INFO - -------------------------------------------------
2022-06-08 17:32:30,367 - distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:36953
2022-06-08 17:32:30,367 - distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:36953
2022-06-08 17:32:30,367 - distributed.worker - INFO -          dashboard at:            127.0.0.1:37997
2022-06-08 17:32:30,367 - distributed.worker - INFO - Waiting to connect to:      tcp://127.0.0.1:33183
2022-06-08 17:32:30,367 - distributed.worker - INFO - -------------------------------------------------
2022-06-08 17:32:30,367 - distributed.worker - INFO -               Threads:                          1
2022-06-08 17:32:30,367 - distributed.worker - INFO -                Memory:                  15.36 GiB
2022-06-08 17:32:30,367 - distributed.worker - INFO -       Local Directory: /tmp/tmprhlds1rj/dask-worker-space/worker-5vvbjqd7
2022-06-08 17:32:30,367 - distributed.worker - INFO - -------------------------------------------------
2022-06-08 17:32:31,052 - distributed.scheduler - INFO - Register worker <WorkerState 'tcp://127.0.0.1:43121', name: 0, status: init, memory: 0, processing: 0>
2022-06-08 17:32:31,053 - distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:43121
2022-06-08 17:32:31,054 - distributed.core - INFO - Starting established connection
2022-06-08 17:32:31,054 - distributed.scheduler - INFO - Register worker <WorkerState 'tcp://127.0.0.1:39179', name: 1, status: init, memory: 0, processing: 0>
2022-06-08 17:32:31,055 - distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:39179
2022-06-08 17:32:31,055 - distributed.core - INFO - Starting established connection
2022-06-08 17:32:31,056 - distributed.scheduler - INFO - Register worker <WorkerState 'tcp://127.0.0.1:36953', name: 2, status: init, memory: 0, processing: 0>
2022-06-08 17:32:31,057 - distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:36953
2022-06-08 17:32:31,057 - distributed.core - INFO - Starting established connection
2022-06-08 17:32:31,057 - distributed.worker - INFO -         Registered to:      tcp://127.0.0.1:33183
2022-06-08 17:32:31,057 - distributed.worker - INFO - -------------------------------------------------
2022-06-08 17:32:31,058 - distributed.worker - INFO -         Registered to:      tcp://127.0.0.1:33183
2022-06-08 17:32:31,058 - distributed.worker - INFO - -------------------------------------------------
2022-06-08 17:32:31,059 - distributed.worker - INFO -         Registered to:      tcp://127.0.0.1:33183
2022-06-08 17:32:31,059 - distributed.worker - INFO - -------------------------------------------------
2022-06-08 17:32:31,059 - distributed.core - INFO - Starting established connection
2022-06-08 17:32:31,060 - distributed.core - INFO - Starting established connection
2022-06-08 17:32:31,060 - distributed.core - INFO - Starting established connection
2022-06-08 17:32:31,098 - distributed.scheduler - INFO - Receive client connection: Client-97ba5f2e-e748-11ec-96e8-9cb6d08f15e1
2022-06-08 17:32:31,099 - distributed.core - INFO - Starting established connection
2022-06-08 17:32:31,587 - distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:43121
2022-06-08 17:32:31,605 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://127.0.0.1:43121', name: 0, status: closing, memory: 1, processing: 0>
2022-06-08 17:32:31,605 - distributed.core - INFO - Removing comms to tcp://127.0.0.1:43121
2022-06-08 17:32:31,606 - distributed.worker - INFO - Connection to scheduler broken. Closing without reporting. ID: Worker-b33593a2-312a-4bcb-a4aa-8e33d80554c8 Address tcp://127.0.0.1:43121 Status: Status.closing
2022-06-08 17:32:31,609 - distributed.worker - ERROR - Worker stream died during communication: tcp://127.0.0.1:43121
Traceback (most recent call last):
  File "/home/graingert/miniconda3/envs/dask-distributed/lib/python3.10/site-packages/tornado/iostream.py", line 867, in _read_to_buffer
    bytes_read = self.read_from_fd(buf)
  File "/home/graingert/miniconda3/envs/dask-distributed/lib/python3.10/site-packages/tornado/iostream.py", line 1140, in read_from_fd
    return self.socket.recv_into(buf, len(buf))
ConnectionResetError: [Errno 104] Connection reset by peer

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/graingert/projects/distributed/distributed/worker.py", line 3290, in gather_dep
    response = await get_data_from_worker(
  File "/home/graingert/projects/distributed/distributed/worker.py", line 4611, in get_data_from_worker
    return await retry_operation(_get_data, operation="get_data_from_worker")
  File "/home/graingert/projects/distributed/distributed/utils_comm.py", line 381, in retry_operation
    return await retry(
  File "/home/graingert/projects/distributed/distributed/utils_comm.py", line 366, in retry
    return await coro()
  File "/home/graingert/projects/distributed/distributed/worker.py", line 4591, in _get_data
    response = await send_recv(
  File "/home/graingert/projects/distributed/distributed/core.py", line 748, in send_recv
    response = await comm.read(deserializers=deserializers)
  File "/home/graingert/projects/distributed/distributed/comm/tcp.py", line 242, in read
    convert_stream_closed_error(self, e)
  File "/home/graingert/projects/distributed/distributed/comm/tcp.py", line 148, in convert_stream_closed_error
    raise CommClosedError(f"in {obj}: {exc.__class__.__name__}: {exc}") from exc
distributed.comm.core.CommClosedError: in <TCP (closed) Ephemeral Worker->Worker for gather local=tcp://127.0.0.1:53490 remote=tcp://127.0.0.1:43121>: ConnectionResetError: [Errno 104] Connection reset by peer
2022-06-08 17:32:31,775 - distributed.scheduler - INFO - Remove client Client-97ba5f2e-e748-11ec-96e8-9cb6d08f15e1
2022-06-08 17:32:31,776 - distributed.scheduler - INFO - Remove client Client-97ba5f2e-e748-11ec-96e8-9cb6d08f15e1
2022-06-08 17:32:31,776 - distributed.scheduler - INFO - Close client connection: Client-97ba5f2e-e748-11ec-96e8-9cb6d08f15e1
2022-06-08 17:32:31,777 - distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:39179
2022-06-08 17:32:31,778 - distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:36953
2022-06-08 17:32:31,780 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://127.0.0.1:39179', name: 1, status: closing, memory: 0, processing: 0>
2022-06-08 17:32:31,780 - distributed.core - INFO - Removing comms to tcp://127.0.0.1:39179
2022-06-08 17:32:31,781 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://127.0.0.1:36953', name: 2, status: closing, memory: 0, processing: 0>
2022-06-08 17:32:31,781 - distributed.core - INFO - Removing comms to tcp://127.0.0.1:36953
2022-06-08 17:32:31,781 - distributed.scheduler - INFO - Lost all workers
2022-06-08 17:32:31,781 - distributed.worker - INFO - Connection to scheduler broken. Closing without reporting. ID: Worker-600a21d1-a19a-4f79-9db6-322ea9c286bb Address tcp://127.0.0.1:39179 Status: Status.closing
2022-06-08 17:32:31,781 - distributed.worker - INFO - Connection to scheduler broken. Closing without reporting. ID: Worker-5d99f3f3-c0ed-4531-b2c0-e271e50746b4 Address tcp://127.0.0.1:36953 Status: Status.closing
2022-06-08 17:32:31,789 - distributed.scheduler - INFO - Scheduler closing...
2022-06-08 17:32:31,789 - distributed.scheduler - INFO - Scheduler closing all comms
PASSED

================================================================================================================================== slowest 20 durations ==================================================================================================================================
1.55s call     distributed/tests/test_scheduler.py::test_missing_data_errant_worker

(2 durations < 0.005s hidden.  Use -vv to show these durations.)
=================================================================================================================================== 1 passed in 1.73s ====================================================================================================================================

Member Author

Done

return _python_shutting_down


def delayed(
Collaborator

nit: could we call this delay instead? Because delayed is a term in Dask (https://docs.dask.org/en/stable/delayed.html), this could be a little confusing.

Member

let's just make this private for now

pass


class AsyncTaskGroup(_LoopBoundMixin):
Collaborator

I don't intend to delay merging this PR with this at all. But I'm a little surprised to see something so similar to, but not quite the same as, the asyncio TaskGroup added in 3.11. Maybe once this gets in, we'd want to refactor to something consistent with the new asyncio API?

I also feel like AsyncTaskGroup should maybe live in its own file?

Also, this looks interesting: https://github.com/Tinche/quattro
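
For reference, the Python 3.11 stdlib API looks roughly like this (requires 3.11, so it is not directly usable here yet):

import asyncio

async def main() -> None:
    # Tasks are created via the group; leaving the `async with` block waits for
    # all of them, and an error in one cancels the remaining tasks.
    async with asyncio.TaskGroup() as tg:
        tg.create_task(asyncio.sleep(0.1))
        tg.create_task(asyncio.sleep(0.2))

asyncio.run(main())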

Member

the task group in 3.11 cannot easily be copy-pasted; it relies on some 3.11 features, e.g. Task.uncancel

I'm hoping to see some backports of the 3.11 async features, though.

Either way, I'm very open to refactoring this to align with the stdlib. I'm also fine with moving it to its own file

timeout,
)
except asyncio.TimeoutError:
await asyncio.gather(*tasks_to_stop, return_exceptions=True)
Collaborator

Might be good to comment that the timeout on gather will cancel all the tasks, so this shouldn't hang forever; that's not very intuitive.
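
A small standalone illustration of that behaviour (made-up names, not the PR code):

import asyncio

async def sleeper() -> None:
    await asyncio.sleep(10)

async def main() -> None:
    tasks = [asyncio.create_task(sleeper()) for _ in range(3)]
    try:
        await asyncio.wait_for(asyncio.gather(*tasks, return_exceptions=True), timeout=0.1)
    except asyncio.TimeoutError:
        # wait_for cancelled the gather, which cancelled the tasks, so this second
        # gather returns promptly with CancelledError results instead of hanging.
        print(await asyncio.gather(*tasks, return_exceptions=True))

asyncio.run(main())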


def _on_exit_sync(self, exitcode):
self.loop.add_callback(self._on_exit, exitcode)
self._ongoing_background_tasks.call_soon(self._on_exit, exitcode)
Collaborator

Was this method relying on the fact that loop.add_callback is threadsafe, and can be called from outside the event loop? The sync part of the name makes me think that. I'm not sure if our new call_soon is also threadsafe.

Member

no, _on_exit_sync is run in a loop.add_callback()

on_exit=self._on_exit_sync,

self.process.set_exit_callback(self._on_exit)

def set_exit_callback(self, func):
    """
    Set a function to be called by the event loop when the process exits.
    The function is called with the AsyncProcess as sole argument.
    The function may be a coroutine function.
    """
    # XXX should this be a property instead?
    assert callable(func), "exit callback should be callable"
    assert (
        self._state.pid is None
    ), "cannot set exit callback when process already started"
    self._exit_callback = func

_loop_add_callback(self._loop, self._on_exit, exitcode)

def _on_exit(self, exitcode):
    # Called from the event loop when the child process exited
    self._process = None
    if self._exit_callback is not None:
        self._exit_callback(self)
    self._exit_future.set_result(exitcode)

if duration > 0.005: # 5ms since last release
next_time = timedelta(seconds=duration * 5) # 25ms gap
break
while self.status != Status.closed:
Collaborator

I feel like the try should be inside the while, but I guess what you have is consistent with the current behavior

assert end - start > 1 - timemod.get_clock_info("monotonic").resolution


def test_async_task_group_close_closes():
Collaborator

Perhaps also test that it's idempotent? (double-close doesn't cause an error)
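
Something along these lines, perhaps; this sketch assumes the close()/closed API exercised by test_async_task_group_close_closes and that AsyncTaskGroup lives in distributed.core, as the tracebacks above suggest:

from distributed.core import AsyncTaskGroup  # assumed import path

def test_async_task_group_close_is_idempotent():
    group = AsyncTaskGroup()
    group.close()
    assert group.closed
    group.close()  # a second close() should be a no-op rather than an error
    assert group.closed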


bcomm.start(comm)

self.loop.add_callback(batched_send_connect)
Collaborator

I think there are a few other instances of loop.add_callback you didn't refactor in worker.py, was that intentional?

Member Author

Some instances of this didn't handle cancellation in the prescribed order well. This has been left for future work.

Collaborator

Maybe it would be good to add TODO comments or open issues for those? It's nice when reading the code to be aware that something is an antipattern (and why)

self._last_tick = time()
for pc in self.periodic_callbacks.values():
if not pc.is_running():
logger.info(f"Starting periodic callback {pc!r}")
Member

I think this might be causing the failures in test_worker_cli_nprocs_renamed_to_nworkers; see also https://github.com/dask/distributed/pulls/

Suggested change
logger.info(f"Starting periodic callback {pc!r}")


def call_later(
self, delay: float, afunc: Callable[..., Coroutine], *args, **kwargs
) -> asyncio.Task:
Member

do we use the return value of this task anywhere?

if we don't return it we can avoid cases where people call await tg.call_later(...) and get a CancelledError when the scheduled task is cancelled rather than only when the parent task is cancelled

self._ongoing_tasks: set[asyncio.Task] = set()

def call_soon(
self, afunc: Callable[..., Coroutine], *args, **kwargs
Member

does ParamSpec work on these?

def call_soon(self, async_fn: Callable[P, Coroutine[Any, Any, T]], /, *args: P.args, **kwargs: P.kwargs) -> asyncio.Task[T]:
def call_later(self, delay: float, async_fn: Callable[P, Coroutine[Any, Any, T]], /, *args: P.args, **kwargs: P.kwargs) -> asyncio.Task[T]:
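
A self-contained sketch of how those signatures could look; typing_extensions is assumed here for Python versions below 3.10:

from __future__ import annotations

import asyncio
from typing import Any, Callable, Coroutine, TypeVar

from typing_extensions import ParamSpec  # typing.ParamSpec needs Python 3.10+

P = ParamSpec("P")
T = TypeVar("T")


class _TaskGroupTypingSketch:
    """Typing illustration only; not the PR's AsyncTaskGroup."""

    def call_soon(
        self,
        async_fn: Callable[P, Coroutine[Any, Any, T]],
        /,
        *args: P.args,
        **kwargs: P.kwargs,
    ) -> asyncio.Task[T]:
        # P ties *args/**kwargs to async_fn's signature, so type checkers flag
        # wrong arguments; T propagates the coroutine's return type to the Task.
        return asyncio.ensure_future(async_fn(*args, **kwargs))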

@graingert
Member

this got merged in #6603!

@graingert graingert closed this Jun 24, 2022

Development

Successfully merging this pull request may close these issues.

Replace loop.call_later and loop.add_callback with background tasks
