Do not catch CancelledError in CommPool #6005
Conversation
Unit Test Results: 18 files (+7), 18 suites (+7), 9h 29m 0s ⏱️ (+3h 48m 4s). Results for commit 88ca221; comparison against base commit ed48736. ♻️ This comment has been updated with latest results.
It looks like there is one genuine test failure in
```python
        ) as logger:
            await s.close()
            while c.status != "closed":
                await c._update_scheduler_info()
```
I assume these should ensure that, while the scheduler is closing, this particular call is silent. I don't think this is how we should test it.
These tests failed because `_update_scheduler_info` is basically an RPC which is cancelled since the client is closing. This cancellation then propagates to the test itself and lets it fail as what appears to be a TimeoutError.
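A minimal sketch of that propagation in plain asyncio (the names here are illustrative, not from distributed): when a closing client cancels an in-flight task, the `CancelledError` surfaces in whoever is awaiting it, i.e. the test.

```python
import asyncio

async def rpc_call():
    # stand-in for an in-flight RPC such as _update_scheduler_info
    await asyncio.sleep(10)

async def caller():
    task = asyncio.create_task(rpc_call())
    await asyncio.sleep(0)  # let the RPC start
    task.cancel()           # the closing client cancels the RPC
    try:
        await task          # the cancellation surfaces here, in the awaiter
    except asyncio.CancelledError:
        return "cancelled"

print(asyncio.run(caller()))  # → cancelled
```

This is why a test that merely loops on `_update_scheduler_info` can fail in a way that looks like a timeout: the cancellation escapes into the test body instead of being handled at the RPC layer.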
cc @graingert I would appreciate a review
distributed/core.py (Outdated)

```diff
-        fut = asyncio.create_task(
+        task = asyncio.create_task(
```
Why are you tracking the pending and connecting tasks separately?
Can it be simplified to this?
```python
async def _connect():
    async with self.semaphore:
        if self.status != Status.running:
            raise CommClosedError(
                f"ConnectionPool not running. Status: {self.status}"
            )
        comm = await connect(
            addr,
            timeout=timeout or self.timeout,
            deserialize=self.deserialize,
            **self.connection_args,
        )
        comm.name = "ConnectionPool"
        comm._pool = weakref.ref(self)
        comm.allow_offload = self.allow_offload
        self._created.add(comm)
        occupied.add(comm)
        return comm

task = asyncio.create_task(_connect())
self._connecting.add(task)
task.add_done_callback(self._connecting.discard)
return await task
```
This would only limit the concurrency of simultaneous connection attempts. The implementation tries to ensure that the number of connection attempts plus the number of open connections is bounded.
See #3005
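A toy model of that invariant (illustrative names, not the actual distributed API): the semaphore slot is held for the whole lifetime of the connection, so it bounds pending attempts and open comms together, and is released only when the comm is closed, which an `async with self.semaphore` around the connect alone would not give.

```python
import asyncio

class TinyPool:
    """Toy pool: `limit` bounds pending connects + open comms combined."""

    def __init__(self, limit: int):
        self.semaphore = asyncio.Semaphore(limit)
        self.open = set()

    async def connect(self):
        await self.semaphore.acquire()
        try:
            comm = object()  # stand-in for an established connection
            self.open.add(comm)
            return comm      # note: the semaphore is NOT released here
        except BaseException:
            self.semaphore.release()
            raise

    def close(self, comm):
        self.open.discard(comm)
        self.semaphore.release()  # the slot is freed only once the comm closes
```

With `limit=2`, a third `connect()` blocks until one of the two open comms is closed, whereas an `async with` block would free the slot as soon as `connect()` returned.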
I think we can merge the two sets, though
oh I see so more like this?
```python
async def _connect():
    await self.semaphore.acquire()
    try:
        if self.status != Status.running:
            raise CommClosedError(
                f"ConnectionPool not running. Status: {self.status}"
            )
        comm = await connect(
            addr,
            timeout=timeout or self.timeout,
            deserialize=self.deserialize,
            **self.connection_args,
        )
        comm.name = "ConnectionPool"
        comm._pool = weakref.ref(self)
        comm.allow_offload = self.allow_offload
        self._created.add(comm)
        occupied.add(comm)
        return comm
    except BaseException:
        self.semaphore.release()
        raise

task = asyncio.create_task(_connect())
self._connecting.add(task)
task.add_done_callback(self._connecting.discard)
return await task
```
Actually, I was wrong and we need to split these two. There was a bug in there before
The question is what should happen if there are pending tasks (i.e. tasks still waiting to acquire the semaphore) and the pool is closing. We should ensure that the behaviour for an outside caller is the same regardless of whether their task is pending or connecting.
Previously, the behaviour was to raise a CommClosedError. Now we are cancelling the connect. What would you expect to happen in this situation?
If we want to go for a CommClosedError or similar, like before, that's probably possible but will require a bit more work.
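One possible shape for this (a sketch under assumed names, not the distributed implementation): race the connect attempt against a "pool closing" event, and translate the shutdown case into CommClosedError for the outside caller, while an external cancellation of the caller itself still propagates as CancelledError through `asyncio.wait`.

```python
import asyncio

class CommClosedError(Exception):
    pass

async def connect_or_closed(closing: asyncio.Event, delay: float = 10.0):
    # `asyncio.sleep(delay, result="comm")` stands in for the real connect.
    connect = asyncio.create_task(asyncio.sleep(delay, result="comm"))
    closed = asyncio.create_task(closing.wait())
    try:
        done, _pending = await asyncio.wait(
            {connect, closed}, return_when=asyncio.FIRST_COMPLETED
        )
    finally:
        # On any exit (including external cancellation) clean up both tasks;
        # cancel() is a no-op on a task that already finished.
        connect.cancel()
        closed.cancel()
    if closed in done:
        # Pool shutdown surfaces as CommClosedError, not CancelledError.
        raise CommClosedError("ConnectionPool closed while connecting")
    return connect.result()
```

The key property is that the caller sees the same exception whether their attempt was still pending or already connecting when the pool shut down.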
Co-authored-by: Thomas Grainger <tagrain@gmail.com>
Force-pushed from 0a5cf4e to 41b3f67.
I had a chat with @graingert and we believe the previous behaviour of raising a CommClosedError in case of a pool shutdown is the preferred behaviour. We'll need to make sure that it does not intercept a CancelledError, though.
```diff
         self._ongoing_coroutines.add(result)
         result = await result
-    except (CommClosedError, asyncio.CancelledError):
+    except CommClosedError:
```
I'm not entirely sure what's supposed to happen here, tbh.
This is another problem that I encountered in #5910.
Depending on whether or not a connection attempt happens while the server is shutting down, this can cause failures, e.g. getting stuck because the CancelledError is not properly propagated.
The behaviour this PR suggests is to cancel all connection attempts once the CommPool is closed. Ideally, all tasks would already be stopped by the time this happens, but that is by no means guaranteed.
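The close-time cancellation can be sketched like this (illustrative, assuming the pool tracks its attempts in a `self._connecting` set as in the snippets above):

```python
import asyncio

class SketchPool:
    def __init__(self):
        self._connecting: set[asyncio.Task] = set()

    def start_attempt(self, delay: float = 10.0) -> asyncio.Task:
        # asyncio.sleep stands in for a real connection attempt.
        task = asyncio.create_task(asyncio.sleep(delay))
        self._connecting.add(task)
        task.add_done_callback(self._connecting.discard)
        return task

    async def close(self):
        # Cancel every in-flight attempt and wait for all of them to finish.
        # return_exceptions=True keeps the resulting CancelledErrors from
        # propagating into (and aborting) close() itself.
        tasks = list(self._connecting)
        for t in tasks:
            t.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
```

Because cancellation is only a request, `close()` must still await the tasks; a task that has not been scheduled yet is exactly the "pending" case discussed above.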