Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
69 commits
Select commit Hold shift + click to select a range
33ae2ef
Add background tasks and rename ongoing_coroutines to ongoing_comm_ha…
hendrikmakait May 24, 2022
e680be9
Replace add_callback and call_later
hendrikmakait May 25, 2022
0af0741
Move close out of background tasks since it would cancel itself
hendrikmakait May 25, 2022
d9ce34f
Fix issue with non-running IO loop
hendrikmakait May 25, 2022
89f70db
Remove deprecated asyncio.coroutine
hendrikmakait May 25, 2022
76edf23
Add delay decorator to delay async function evaluation
hendrikmakait May 27, 2022
e415d97
Replace add_callback in nanny
hendrikmakait May 27, 2022
3d688ad
Add docstring and rename to create_background_task
hendrikmakait May 27, 2022
d91a4cf
Factor functionality out into TaskGroup and adjust interface to avoid…
hendrikmakait May 27, 2022
8312041
Rename
hendrikmakait May 27, 2022
488b111
Revert changes to log_event
hendrikmakait May 27, 2022
9155235
Revert changes to lifetime callback
hendrikmakait May 30, 2022
62c9383
Fix test
hendrikmakait May 30, 2022
96c6346
Ignore cancelled error when awaiting finished()
hendrikmakait May 30, 2022
0248587
Fix test
hendrikmakait May 30, 2022
3a5695a
Fix test for adaptive scaling by adjusting wait condition
hendrikmakait May 30, 2022
aaa3cd4
Enable tmate for remote debugging of flaking tests
hendrikmakait May 30, 2022
b271783
Re-raise exception unless cancelled
hendrikmakait May 30, 2022
5459964
Re-raise exception unless cancelled
hendrikmakait May 30, 2022
773188a
Fix '_GatheringFuture exception was never retrieved'
hendrikmakait May 30, 2022
734e893
Catch cancellederror on cancelled gather
hendrikmakait May 30, 2022
bc45f99
Revert changes to tests.yaml
hendrikmakait May 30, 2022
a932b79
Replace _ongoing_comm_handlers with TaskGroup
hendrikmakait May 31, 2022
bb9d71b
Add docstrings
hendrikmakait May 31, 2022
eda6b73
Improved docs
hendrikmakait May 31, 2022
79970d7
Improve typing
hendrikmakait May 31, 2022
bdb1b70
Fix typing of delayed
hendrikmakait May 31, 2022
d3f04a5
Adjust stop()
hendrikmakait May 31, 2022
0532a21
Adjust stop()
hendrikmakait May 31, 2022
ac53b94
Make call_soon and call_later public
hendrikmakait May 31, 2022
2f7d90c
Make TypeVar private
hendrikmakait May 31, 2022
4f739b2
Rename TaskGroup to AsyncTaskGroup
hendrikmakait May 31, 2022
f6348ee
Minor
hendrikmakait May 31, 2022
b66fae6
Fix stop() without running tasks
hendrikmakait May 31, 2022
b064ba1
Wrap gather in async def
hendrikmakait Jun 1, 2022
52061d9
Add __len__ to AsyncTaskGroup
hendrikmakait Jun 1, 2022
1e41e20
Fix comm handling and delayed typing
hendrikmakait Jun 1, 2022
688aaa1
Fix test
hendrikmakait Jun 1, 2022
f3d9fa9
Add unit tests for AsyncTaskGroup
hendrikmakait Jun 1, 2022
ad70db2
Merge branch 'main' into background-tasks
hendrikmakait Jun 1, 2022
1b28c3b
Merge branch 'main' into background-tasks
hendrikmakait Jun 1, 2022
683d837
Replace more add_callback's
hendrikmakait Jun 1, 2022
350c2b4
Add hopefully unreachable error
hendrikmakait Jun 1, 2022
43c7b3c
Rollback to add_callback in fail_hard
hendrikmakait Jun 2, 2022
fddd34c
Merge branch 'main' into background-tasks
hendrikmakait Jun 2, 2022
936710a
Retrigger CI to check for flake
hendrikmakait Jun 2, 2022
f46bc59
Clean up closing logic
hendrikmakait Jun 2, 2022
06749bb
Add locking
hendrikmakait Jun 2, 2022
9ca5bea
Fix invalid worker states by not tracking handle_scheduler
hendrikmakait Jun 2, 2022
daa845a
Fix call_later test by using monotonic clock and subtracting clock re…
hendrikmakait Jun 2, 2022
be622fd
Merge branch 'main' into background-tasks
hendrikmakait Jun 3, 2022
075a539
Drop lock and ensure AsyncTaskGroup is called from the correct thread
hendrikmakait Jun 7, 2022
8df26a5
Raise exception if coro cannot be scheduled
hendrikmakait Jun 8, 2022
a77c61e
AsyncTaskGroupClosedError
hendrikmakait Jun 8, 2022
50f5851
Add comment
hendrikmakait Jun 8, 2022
22d0952
Use ParamSpec
hendrikmakait Jun 8, 2022
d5b99ef
Merge branch 'main' into background-tasks
hendrikmakait Jun 8, 2022
cd04ab2
Fix ParamSpec
hendrikmakait Jun 8, 2022
2daae46
Fix tests
hendrikmakait Jun 8, 2022
55bcdd3
Drop wrapper methods to highlight which group is being used
hendrikmakait Jun 8, 2022
6c4536c
Fix tests errors after cleanup
hendrikmakait Jun 8, 2022
53643cd
Fix inproc cancel handling and simplify pc startup
hendrikmakait Jun 8, 2022
2494e40
Drop AsyncTaskGroup.schedule()
hendrikmakait Jun 8, 2022
2751c52
Add comment
hendrikmakait Jun 8, 2022
93ff2ba
Fix f-string
hendrikmakait Jun 8, 2022
e76489c
Abort comm if we cannot handle it
hendrikmakait Jun 8, 2022
09a3b2c
Fix listener.stop
hendrikmakait Jun 8, 2022
a7df081
Add comment
hendrikmakait Jun 8, 2022
8873ccb
Test idempotency
hendrikmakait Jun 8, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions distributed/comm/inproc.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,15 +103,18 @@ def get_nowait(self):
raise QueueEmpty
return q.popleft()

def get(self):
async def get(self):
assert not self._read_future, "Only one reader allowed"
fut = Future()
q = self._q
if q:
fut.set_result(q.popleft())
else:
self._read_future = fut
return fut
try:
return await fut
finally:
self._read_future = None

def put_nowait(self, value):
q = self._q
Expand Down
225 changes: 181 additions & 44 deletions distributed/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,10 @@
from contextlib import suppress
from enum import Enum
from functools import partial
from typing import Callable, ClassVar, TypedDict, TypeVar
from typing import Callable, ClassVar, Coroutine, TypedDict, TypeVar

import tblib
from tlz import merge
from tornado import gen
from tornado.ioloop import IOLoop, PeriodicCallback

import dask
Expand All @@ -38,9 +37,11 @@
from distributed.metrics import time
from distributed.system_monitor import SystemMonitor
from distributed.utils import (
NoOpAwaitable,
delayed,
get_traceback,
has_keyword,
is_coroutine_function,
iscoroutinefunction,
recursive_to_dict,
truncate_exception,
)
Expand Down Expand Up @@ -109,6 +110,146 @@ def _expects_comm(func: Callable) -> bool:
return False


class _LoopBoundMixin:
    """Bind an object to the first event loop it is used from.

    Backport of the private asyncio.mixins._LoopBoundMixin from 3.11
    """

    # Class-level lock guarding the one-time assignment of ``_loop``.
    _global_lock = threading.Lock()

    # Event loop this object is bound to; ``None`` until ``_get_loop`` is
    # called for the first time.
    _loop = None

    def _get_loop(self):
        """Return the running event loop, binding to it on first use.

        Raises
        ------
        RuntimeError
            If the running loop is not the loop this object is bound to.
            ``asyncio.get_running_loop`` itself also raises ``RuntimeError``
            when no loop is running.
        """
        running = asyncio.get_running_loop()

        if self._loop is None:
            with self._global_lock:
                # Re-check once the lock is held: the attribute may have been
                # assigned between the first check and acquiring the lock.
                if self._loop is None:
                    self._loop = running

        if self._loop is running:
            return running
        raise RuntimeError(f"{self!r} is bound to a different event loop")


class AsyncTaskGroupClosedError(RuntimeError):
    """Raised when a coroutine function is scheduled on a closed task group."""


class AsyncTaskGroup(_LoopBoundMixin):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't intend to delay merging this PR with this at all. But I'm a little surprised to see something so similar to, but not quite the same as, the asyncio TaskGroup added in 3.11. Maybe once this gets in, we'd want to refactor to something consistent with the new asyncio API?

I also feel like AsyncTaskGroup should maybe be its own file?

Also, this looks interesting: https://github.com/Tinche/quattro

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the task group of 3.11 cannot be easily copy-pasted, it relies on some 3.11 features, e.g. Task.uncancel

I'm hoping to see some backports about the 3.11 async features, though.

Either way, I'm very open to refactoring this and aligning with stdlib. I'm also fine with moving this to its own file.

"""Collection tracking all currently running asynchronous tasks within a group"""

#: If True, the group is closed and does not allow adding new tasks.
closed: bool

def __init__(self) -> None:
    """Create an open group that is not tracking any tasks yet."""
    # Tasks scheduled through this group that have not finished.
    self._ongoing_tasks: set[asyncio.Task] = set()
    self.closed = False

def call_soon(
self, afunc: Callable[..., Coroutine], *args, **kwargs
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does ParamSpec work on these?

def call_soon(self, async_fn: Callable[P, Coroutine[Any, Any, T]], /, *args: P.args, **kwargs: P.kwargs) -> asyncio.Task[T]:
call_later(self, delay: float, async_fn: Callable[P, Coroutine[Any, Any, T]], /, *args: P.args, **kwargs: P.kwargs) -> asyncio.Task[T]:

) -> asyncio.Task:
"""Schedule a coroutine function to be executed as an `asyncio.Task`.

The coroutine function `afunc` is scheduled with `args` arguments and `kwargs` keyword arguments
as an `asyncio.Task`.

Parameters
----------
afunc
Coroutine function to schedule.
*args
Arguments to be passed to `afunc`.
**kwargs
Keyword arguments to be passed to `afunc`

Returns
-------
The scheduled Task object.

Raises
------
AsyncTaskGroupClosedError
If the task group is closed.
"""
if self.closed: # Avoid creating a coroutine
raise AsyncTaskGroupClosedError(
"Cannot schedule a new coroutine function as the group is already closed."
)
task = self._get_loop().create_task(afunc(*args, **kwargs))
task.add_done_callback(self._ongoing_tasks.remove)
self._ongoing_tasks.add(task)
return task

def call_later(
self, delay: float, afunc: Callable[..., Coroutine], *args, **kwargs
) -> asyncio.Task:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we use the return value of this task anywhere?

if we don't return it we can avoid cases where people call await tg.call_later(...) and get a CancelledError when the scheduled task is cancelled rather than only when the parent task is cancelled

"""Schedule a coroutine function to be executed after `delay` seconds as an `asyncio.Task`.

The coroutine function `afunc` is scheduled with `args` arguments and `kwargs` keyword arguments
as an `asyncio.Task` that is executed after `delay` seconds.

Parameters
----------
delay
Delay in seconds.
afunc
Coroutine function to schedule.
*args
Arguments to be passed to `afunc`.
**kwargs
Keyword arguments to be passed to `afunc`

Returns
-------
The scheduled Task object.

Raises
------
AsyncTaskGroupClosedError
If the task group is closed.
"""
return self.call_soon(delayed(afunc, delay), *args, **kwargs)

def close(self) -> None:
    """Mark the task group as closed so that it accepts no new tasks.

    Tasks that are already running are not touched by this call.
    """
    self.closed = True

async def stop(self, timeout=1) -> None:
"""Close the group and stop all currently running tasks.

Closes the task group and waits `timeout` seconds for all tasks to gracefully finish.
After the timeout, all remaining tasks are cancelled.
"""
self.close()

current_task = asyncio.current_task(self._get_loop())
tasks_to_stop = [t for t in self._ongoing_tasks if t is not current_task]

if tasks_to_stop:
# Wrap gather in task to avoid Python3.8 issue,
# see https://github.com/dask/distributed/pull/6478#discussion_r885696827
async def gather():
return await asyncio.gather(*tasks_to_stop, return_exceptions=True)

try:
await asyncio.wait_for(
gather(),
timeout,
)
except asyncio.TimeoutError:
# The timeout on gather has cancelled the tasks, so this will not hang indefinitely
await asyncio.gather(*tasks_to_stop, return_exceptions=True)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be good to comment that the timeout on gather will cancel all the tasks, so this shouldn't hang forever; that's not very intuitive.


if [t for t in self._ongoing_tasks if t is not current_task]:
raise RuntimeError(
f"Expected all ongoing tasks to be cancelled and removed, found {self._ongoing_tasks}."
)

def __len__(self):
    """Return the number of tasks currently tracked by the group."""
    return len(self._ongoing_tasks)


class Server:
"""Dask Distributed Server

Expand Down Expand Up @@ -194,7 +335,8 @@ def __init__(
self.monitor = SystemMonitor()
self.counters = None
self.digests = None
self._ongoing_coroutines = set()
self._ongoing_background_tasks = AsyncTaskGroup()
self._ongoing_comm_handlers = AsyncTaskGroup()
self._event_finished = asyncio.Event()

self.listeners = []
Expand Down Expand Up @@ -227,7 +369,7 @@ def stop() -> bool:

self.counters = defaultdict(partial(Counter, loop=self.io_loop))

self.periodic_callbacks = dict()
self.periodic_callbacks = {}

pc = PeriodicCallback(
self.monitor.update,
Expand Down Expand Up @@ -348,29 +490,30 @@ def start_periodic_callbacks(self):
"""Start Periodic Callbacks consistently

This starts all PeriodicCallbacks stored in self.periodic_callbacks if
they are not yet running. It does this safely on the IOLoop.
they are not yet running. It does this safely by checking that it is using the
correct event loop.
"""
self._last_tick = time()

def start_pcs():
for pc in self.periodic_callbacks.values():
if not pc.is_running():
pc.start()
if self.io_loop.asyncio_loop is not asyncio.get_running_loop():
raise RuntimeError(f"{self!r} is bound to a different event loop")

self.io_loop.add_callback(start_pcs)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this was originally written so that start_periodic_callbacks can be run from any thread

Copy link
Member

@graingert graingert Jun 8, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we're making it so start_periodic_callbacks can only be called from the event loop thread we can simplify this to be:

if self.io_loop.asyncio_loop is not asyncio.get_running_loop():
    raise RuntimeError(f"{self!r} is bound to a different event loop")

self._last_tick = time()
for pc in self.periodic_callbacks.values():
    if not pc.is_running():  # we probably want to issue a warning here - and also check `pc.io_loop is self.io_loop`
        pc.start()

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wasn't aware of that. Is there a need to keep this as-is or can we safely simplify this? cc @fjetter?

Copy link
Member

@fjetter fjetter Jun 8, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is always called as part of Server.start_unsafe which is basically whenever we're awaiting a Server object (Scheduler, Nanny, Worker). (__await__ -> start -> start_unsafe)
If this is happening in a synchronous world during init, we ensure that it is run in the event loop thread (using sync).
I'm not sure about the async world but I think with deprecations like #6473 we are assuming that people only create/start these instances in the event loop thread, are we not?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes that's correct, currently all our calls of start_periodic_callbacks are called from the running loop

That's why I think it's better to use an assertion that we're on the correct loop than support running from any thread

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alright, I'll adjust according to the outline above then.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's address checking pc.io_loop is self.io_loop in a separate PR.

self._last_tick = time()
for pc in self.periodic_callbacks.values():
if not pc.is_running():
logger.info(f"Starting periodic callback {pc!r}")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this might be causing the failures in test_worker_cli_nprocs_renamed_to_nworkers see also https://github.com/dask/distributed/pulls/

Suggested change
logger.info(f"Starting periodic callback {pc!r}")

pc.start()

def stop(self):
if not self.__stopped:
self.__stopped = True

for listener in self.listeners:
# Delay closing the server socket until the next IO loop tick.
# Otherwise race conditions can appear if an event handler
# for an accept() call is already scheduled by the IO loop,
# raising EBADF.
# The demonstrator for this is Worker.terminate(), which
# closes the server socket in response to an incoming message.
# See https://github.com/tornadoweb/tornado/issues/2069
self.io_loop.add_callback(listener.stop)

async def stop_listener(listener):
v = listener.stop()
if inspect.isawaitable(v):
await v

self._ongoing_background_tasks.call_soon(stop_listener, listener)

@property
def listener(self):
Expand Down Expand Up @@ -495,7 +638,15 @@ async def listen(self, port_or_addr=None, allow_offload=True, **kwargs):
)
self.listeners.append(listener)

async def handle_comm(self, comm):
def handle_comm(self, comm):
    """Start a background task that dispatches ``comm`` via ``self._handle_comm``.

    If the background task group is already closed, the task cannot be
    scheduled and ``comm`` is aborted instead.

    Returns
    -------
    A ``NoOpAwaitable`` instance, whether or not the task was scheduled.
    """
    background_tasks = self._ongoing_background_tasks
    try:
        background_tasks.call_soon(self._handle_comm, comm)
    except AsyncTaskGroupClosedError:
        # No handler task was created for this comm, so abort it.
        comm.abort()
    return NoOpAwaitable()

async def _handle_comm(self, comm):
"""Dispatch new communications to coroutine-handlers

Handlers is a dictionary mapping operation names to functions or
Expand Down Expand Up @@ -589,11 +740,6 @@ async def handle_comm(self, comm):
else:
result = handler(**msg)
if inspect.iscoroutine(result):
result = asyncio.create_task(
result, name=f"handle-comm-{address}-{op}"
)
self._ongoing_coroutines.add(result)
result.add_done_callback(self._ongoing_coroutines.remove)
result = await result
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this can result in a TypeError: object NoneType can't be used in 'await' expression

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dealt with as specified above.

elif inspect.isawaitable(result):
raise RuntimeError(
Expand Down Expand Up @@ -660,9 +806,11 @@ async def handle_stream(self, comm, extra=None):
closed = True
break
handler = self.stream_handlers[op]
if is_coroutine_function(handler):
self.loop.add_callback(handler, **merge(extra, msg))
await gen.sleep(0)
if iscoroutinefunction(handler):
self._ongoing_background_tasks.call_soon(
handler, **merge(extra, msg)
)
await asyncio.sleep(0)
else:
handler(**merge(extra, msg))
else:
Expand Down Expand Up @@ -695,20 +843,11 @@ async def close(self, timeout=None):
_stops.add(future)
await asyncio.gather(*_stops)

def _ongoing_tasks():
return (
t for t in self._ongoing_coroutines if t is not asyncio.current_task()
)
# TODO: Deal with exceptions
await self._ongoing_background_tasks.stop(timeout=1)

# TODO: Deal with exceptions
try:
# Give the handlers a bit of time to finish gracefully
await asyncio.wait_for(
asyncio.gather(*_ongoing_tasks(), return_exceptions=True), 1
)
except asyncio.TimeoutError:
# the timeout on gather should've cancelled all the tasks
await asyncio.gather(*_ongoing_tasks(), return_exceptions=True)
await self._ongoing_comm_handlers.stop(timeout=1)

await self.rpc.close()
await asyncio.gather(*[comm.close() for comm in list(self._comms)])
Expand Down Expand Up @@ -878,12 +1017,10 @@ async def _close_comm(comm):
tasks = []
for comm in list(self.comms):
if comm and not comm.closed():
# IOLoop.current().add_callback(_close_comm, comm)
task = asyncio.ensure_future(_close_comm(comm))
tasks.append(task)
for comm in list(self._created):
if comm and not comm.closed():
# IOLoop.current().add_callback(_close_comm, comm)
task = asyncio.ensure_future(_close_comm(comm))
tasks.append(task)

Expand Down
6 changes: 5 additions & 1 deletion distributed/deploy/tests/test_adaptive.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,11 @@ async def test_adapt_quickly():

await cluster

while len(cluster.scheduler.workers) > 1 or len(cluster.worker_spec) > 1:
while (
len(cluster.scheduler.workers) > 1
or len(cluster.worker_spec) > 1
or len(cluster.workers) > 1
):
await asyncio.sleep(0.01)

# Don't scale up for large sequential computations
Expand Down
4 changes: 2 additions & 2 deletions distributed/nanny.py
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ def run(self, comm, *args, **kwargs):
return run(self, comm, *args, **kwargs)

def _on_exit_sync(self, exitcode):
self.loop.add_callback(self._on_exit, exitcode)
self._ongoing_background_tasks.call_soon(self._on_exit, exitcode)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was this method relying on the fact that loop.add_callback is threadsafe, and can be called from outside the event loop? The sync part of the name makes me think that. I'm not sure if our new call_soon is also threadsafe.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, _on_exit_sync is run in a loop.add_callback():

on_exit=self._on_exit_sync,

self.process.set_exit_callback(self._on_exit)

def set_exit_callback(self, func):
    """
    Register *func* to be called by the event loop when the process exits.

    *func* is called with the AsyncProcess as its only argument and may be
    a coroutine function.  It has to be registered before the process is
    started.
    """
    # XXX should this be a property instead?
    assert callable(func), "exit callback should be callable"
    assert self._state.pid is None, (
        "cannot set exit callback when process already started"
    )
    self._exit_callback = func

_loop_add_callback(self._loop, self._on_exit, exitcode)

def _on_exit(self, exitcode):
    """Clear the process handle, run the exit callback and resolve the exit future.

    The exit callback, if one is set, is called with this object as its only
    argument before ``exitcode`` is stored on ``self._exit_future``.
    """
    # Called from the event loop when the child process exited
    self._process = None
    callback = self._exit_callback
    if callback is not None:
        callback(self)
    self._exit_future.set_result(exitcode)


@log_errors
async def _on_exit(self, exitcode):
Expand Down Expand Up @@ -595,7 +595,7 @@ async def _log_event(self, topic, msg):
)

def log_event(self, topic, msg):
self.loop.add_callback(self._log_event, topic, msg)
self._ongoing_background_tasks.call_soon(self._log_event, topic, msg)


class WorkerProcess:
Expand Down
Loading