Ensure exceptions in handlers are handled equally for sync and async #4734
Conversation
Test failure in …
```python
for func in every_cycle:
    if is_coroutine_function(func):
        self.loop.add_callback(func)
```
I tracked down the dangling task: it was the `ensure_computing` scheduled here, in particular the code path which uses the async interface to offload deserialisation. This was introduced in #4307, just before the infamous 2020.12.0 release.
This situation is a bit awkward and I see two options.

Note: this problem pops up in many places in the worker, so solving 2.) once might benefit other situations as well. @mrocklin Any thoughts on this? Do we have any asyncio/tornado experts we can loop into this discussion?

Note: The PR grew again in scope. I ignored the warning in the gen.coroutine comment and figured it was easy to rewrite. It turns out this introduces race conditions during closing, since gen.coroutine and plain async coroutines differ in the way they are cancelled (or not). Will break this off into another PR. (At least I hope this is the explanation and the changes can be decoupled :/)
I'm not sure that this is true. There is a handle_stream coroutine for every comm. Comms are most often used like this:

```python
# caller side
await worker.some_handler(arg=foo)
```

The caller side generally waits until the call to the handler completes before moving on, so the backup won't be inside the socket on the receiver side, it will be in application code on the caller side, which seems safe. Previously this call would have completed very quickly and now it will take longer. That's probably ok.

In summary, if things aren't breaking here I think that we're ok. In principle this change feels safe to me. I wouldn't be surprised if it did break some things, but even if it did, those things were probably broken beforehand and relying on bad logic in the worker. For example, if the caller side was doing the following:

```python
await worker.some_handler(arg=1)
await worker.some_handler(arg=2)
await worker.some_handler(arg=3)
await worker.some_handler(arg=4)
```

then with async handlers this would have previously run much more quickly than it should have. In a situation like the following:

```python
await asyncio.gather(
    worker.some_handler(arg=1),
    worker.some_handler(arg=2),
    ...
)
```

the caller side will actually open many comms to the server to handle the requests in parallel, and so we'll get many …

(Usual disclaimer of "I'm a bit rusty on all of this, so please don't believe me")
I'm mostly concerned about the primary link between scheduler and worker, which is based on a single comm. Most of the communication between the two is done via this single comm using a BatchedSend. However, with the exception of …
Yeah, anything sent by a BatchedSend is intended to run immediately. If you wanted to be careful though you could add a read into the …
```python
logger.debug("Connection from %r to %s", address, type(self).__name__)
self._comms[comm] = op
await self
```
This `await self`, without the guard I introduced in `__await__`, would allow an incoming comm to restart a scheduler that is currently shutting down. There is a time window while the scheduler is handling its affairs, before the actual server closes, in which it still accepts incoming comms. If a new comm is opened during this time, the scheduler is restarted although it is trying to shut down.
distributed/distributed/scheduler.py, lines 3759 to 3808 in d9bc3c6:
```python
self.status = Status.closing
logger.info("Scheduler closing...")
setproctitle("dask-scheduler [closing]")

for preload in self.preloads:
    await preload.teardown()

if close_workers:
    await self.broadcast(msg={"op": "close_gracefully"}, nanny=True)
    for worker in parent._workers_dv:
        self.worker_send(worker, {"op": "close"})
    for i in range(20):  # wait a second for send signals to clear
        if parent._workers_dv:
            await asyncio.sleep(0.05)
        else:
            break

await asyncio.gather(*[plugin.close() for plugin in self.plugins])

for pc in self.periodic_callbacks.values():
    pc.stop()
self.periodic_callbacks.clear()

self.stop_services()

for ext in parent._extensions.values():
    with suppress(AttributeError):
        ext.teardown()
logger.info("Scheduler closing all comms")

futures = []
for w, comm in list(self.stream_comms.items()):
    if not comm.closed():
        comm.send({"op": "close", "report": False})
        comm.send({"op": "close-stream"})
    with suppress(AttributeError):
        futures.append(comm.close())

for future in futures:  # TODO: do all at once
    await future

for comm in self.client_comms.values():
    comm.abort()

await self.rpc.close()

self.status = Status.closed
self.stop()
await super().close()
```
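A minimal, self-contained sketch of the kind of guard described above (the `GuardedServer` and `Status` classes here are hypothetical stand-ins, not the exact code from this PR):

```python
import asyncio
from enum import Enum


class Status(Enum):
    undefined = "undefined"
    running = "running"
    closing = "closing"
    closed = "closed"


class GuardedServer:
    def __init__(self):
        self.status = Status.undefined

    async def start(self):
        self.status = Status.running
        return self

    def __await__(self):
        async def _start_guarded():
            # Refuse to (re)start while shutting down: an incoming comm
            # that awaits the server must not revive a closing scheduler.
            if self.status in (Status.closing, Status.closed):
                return self
            return await self.start()

        return _start_guarded().__await__()


async def main():
    server = GuardedServer()
    server.status = Status.closing
    await server  # returns immediately; does not restart the server
    assert server.status is Status.closing


asyncio.run(main())
```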
I didn't add a dedicated test for this since there are actually many failing tests once the close coroutines are shielded.
```python
def shielded(func):
    """
    Shield the decorated method or function from cancellation. Note that
    the decorated coroutine will immediately be scheduled as a task when
    the decorated function is invoked.

    See also https://docs.python.org/3/library/asyncio-task.html#asyncio.shield
    """

    @wraps(func)
    def _(*args, **kwargs):
        return asyncio.shield(func(*args, **kwargs))

    return _
```
This is not absolutely required, but with this decorator we can define methods which are always shielded (e.g. `close`) instead of having to remember to shield them on every invocation.
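A self-contained illustration of the behaviour (the `Example` class is hypothetical; only `shielded` comes from the diff above):

```python
import asyncio
from functools import wraps


def shielded(func):
    @wraps(func)
    def _(*args, **kwargs):
        return asyncio.shield(func(*args, **kwargs))

    return _


class Example:
    @shielded
    async def close(self):
        # Even if the caller cancels, the inner task keeps running because
        # asyncio.shield only cancels the outer future.
        await asyncio.sleep(0.1)
        print("closed cleanly")


async def main():
    fut = Example().close()   # already scheduled as a task by the shield
    fut.cancel()              # cancels the outer (shield) future only
    await asyncio.sleep(0.2)  # the inner close() still completes


asyncio.run(main())
```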
mrocklin left a comment:
I'm curious about the reason for a few of the changes here. Also, do we know why tests aren't happy yet?
distributed/deploy/spec.py
```python
tasks = [self.workers[w].close() for w in to_close if w in self.workers]
```
Can we call asyncio.wait here and avoid the tasks variable entirely?
sure
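For reference, the suggested call would look something like this (a sketch reusing the `self.workers` and `to_close` names from the diff above; note that `asyncio.wait` raises `ValueError` if the set of awaitables is empty):

```python
await asyncio.wait(
    [self.workers[w].close() for w in to_close if w in self.workers]
)
```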
```python
except Exception:
    pass
self.process = None
await self.rpc.close()
```
Why this change?
Same argument as above about cleaning up.
```python
for preload in self.preloads:
    await preload.teardown()

self.stop()
```
Why this change?
It is redundant since we do the same in Server.close, and I felt removing it would reduce complexity. If there is some purpose to stopping earlier, I can revert this logic. This is only my sense of cleaning up.
```python
await self.rpc.close()

self.status = Status.closed
self.stop()
```
Why this change?
This is performed by super().close(). Since there are no other things happening in between, letting the server close itself felt like the right thing to do to reduce complexity.
```python
*[
    self.remove_worker(address=w, safe=True, close=False)
    for w in worker_keys
]
```
Why this change?
This change adds `close=False` (the default is `True`). With the default, most situations would issue a close_worker call twice, since there is a dedicated close_worker call above.

It might be more correct to change this to `close=not close_workers` to avoid the duplication.
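The suggested variant would then look roughly like this (a sketch; the surrounding `asyncio.gather` call is an assumption based on the `*[...]` unpacking in the diff):

```python
await asyncio.gather(
    *[
        self.remove_worker(address=w, safe=True, close=not close_workers)
        for w in worker_keys
    ]
)
```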
I'm hitting a lot of known "flaky" tests here (cannot reproduce locally, yet).
```python
def __await__(self):
    return self.start().__await__()
```
This MultiWorker is a bit strange since it isn't a Server, although Workers are always Servers.

Unless the task is entirely forgotten by the worker, it is confusing behaviour if a dependent or a task is removed once it has finished executing. If this information is required, a dedicated dynamic attribute should be used, like waiters on the scheduler side.

If not awaited, the server might serve handlers before it is properly initialized. For instance, plugins and preload modules might not be fully loaded before that happens.
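As a minimal, self-contained sketch of why awaiting matters here (a hypothetical `MiniServer`, not distributed's actual class):

```python
import asyncio


class MiniServer:
    def __init__(self):
        self.started = False

    async def start(self):
        # Stand-in for loading plugins and preload modules.
        await asyncio.sleep(0)
        self.started = True
        return self

    def __await__(self):
        # Awaiting the instance runs start(), so callers only get the
        # server back once initialization has completed.
        return self.start().__await__()


async def main():
    server = await MiniServer()
    assert server.started  # safe to serve handlers now


asyncio.run(main())
```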
I do not have a strong opinion at the moment about what should happen if a handler raises an exception, but regardless, I believe an async and a sync handler should show the same behaviour. Anything else is pretty hard to distinguish for any developer.

One easy way to achieve this is to simply await the handler during handle_stream. I am not sure if it was implemented the old way in the hope that messages would be worked off faster, or what the motivation behind it was.

If we want to stick to the old `add_callback`, we need to implement exception handling on the returned future/task.
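A sketch of that second option, attaching exception handling to the scheduled task (the `schedule_handler` helper and its arguments are illustrative, not part of this PR):

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


def _on_handler_done(task: asyncio.Task) -> None:
    # Surface exceptions from fire-and-forget handler tasks so async
    # handlers fail as loudly as sync ones.
    if task.cancelled():
        return
    exc = task.exception()
    if exc is not None:
        logger.error("Handler raised %r", exc, exc_info=exc)


def schedule_handler(handler, **msg) -> asyncio.Task:
    # Replacement for self.loop.add_callback(handler, **msg): schedule the
    # coroutine and attach a callback so exceptions are not silently lost.
    task = asyncio.ensure_future(handler(**msg))
    task.add_done_callback(_on_handler_done)
    return task
```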