Fix ConnectionPool limit handling #3005
Conversation
mrocklin left a comment
Some small comments below, but in general this looks good to me. Thank you for your efforts here, @byjott!
await self.event.wait()
...
self._n_connecting += 1
await self.semaphore.acquire()
Thoughts on using a context manager instead? Maybe something like the following:
self._n_connecting += 1
async with self.semaphore:
    pass
self._n_connecting -= 1
Or, alternatively: I wonder if _n_connecting could be a property based on self.semaphore._value?
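As a rough sketch of that alternative (hypothetical code, not from the PR; note that asyncio.Semaphore._value is a private attribute):

import asyncio

class ConnectionPool:
    def __init__(self, limit: int = 512):
        self.limit = limit
        self.semaphore = asyncio.Semaphore(limit)

    @property
    def _n_connecting(self) -> int:
        # Semaphore._value is the number of free slots, so the number of
        # slots currently held is limit - _value.  Caveat: this counts
        # every held slot, not only the connections still being set up.
        return self.limit - self.semaphore._value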
Ah! I see, you don't release the semaphore until the comm itself is closed.
Yes, I also thought a lot about keeping less state, e.g. removing the need for _n_connecting in addition to the semaphore. However, I did not come up with a working solution, at least not without breaking some externally visible behavior of the ConnectionPool: the bookkeeping is complicated by reuse and collect, and by the question of when exactly to call collect ...
Not saying there isn't a better (cleaner, simpler) solution, but I did not find one.
try:
    self.occupied[addr].remove(comm)
except KeyError:
    pass
Hrm, I'm curious about the origin of this KeyError, and if it is still necessary. Apparently tests didn't fail when you removed it? We might want to look at git blame to see if there is something in history about why it is here.
I looked for the call sites of ConnectionPool.reuse; fortunately, there were only two, and in both cases, comm refers to an object managed by the pool which should thus not lead to a KeyError here.
Note, though, that I did comment out one line of tests here:
fb30744#diff-863a91ce704c12168c86ab5e8a99ede0L622
Looking at the tests, I stand corrected: it seems to be important after all (test_gather_then_submit_after_failed_workers failed because of this). I'll prepare a fix.
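For context, a self-contained sketch of the failure mode (illustrative names; it assumes, as in the pool, that occupied is a defaultdict(set) keyed by address and that a failed worker's entry gets dropped):

from collections import defaultdict

occupied = defaultdict(set)  # address -> comms currently handed out
comm = object()              # stand-in for a real Comm
occupied["tcp://worker:1234"].add(comm)

# A failed worker leads the pool to drop everything for that address:
del occupied["tcp://worker:1234"]

# A caller that still holds the comm later hands it back via reuse();
# without the guard, set.remove() on the now-empty set raises KeyError:
occupied["tcp://worker:1234"].remove(comm)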
@gen_test()
def test_connection_pool_respects_limit():
Feel free to use async def style tests if you prefer. No pressure either way though.
Well, some slight pressure to use async def if it's easy for you. We would like to transition things over at some point.
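For illustration, the two styles side by side (a schematic sketch with a hypothetical coroutine, not the actual test):

import asyncio
from distributed.utils_test import gen_test

async def some_coroutine():  # hypothetical stand-in for real test logic
    await asyncio.sleep(0)
    return 42

# generator-based style (Tornado coroutines):
@gen_test()
def test_example_generator():
    result = yield some_coroutine()
    assert result == 42

# async/await style:
@gen_test()
async def test_example_async():
    result = await some_coroutine()
    assert result == 42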
Sure, I'll change it (the first version even used async def, but I changed it back for consistency with the surrounding code ;-) )
I now opted for converting the whole test module (test_core.py) to the async def style, so at least this one file is consistent concerning style ...
Great! Thanks! Now I hope that you submit PRs that include single tests in all of the other files, so that you fix them too :)
Force-pushed from e8cda45 to c221bae
Thanks for the quick review, @mrocklin. I believe I have addressed your comments with the latest changes I just pushed. Let me know if there is anything missing.

Thank you @byjott for both identifying this issue and for providing a fix. Also, I notice that this is your first code contribution to this repository. Welcome! Please feel free to come back and fix more bugs any time :)
This fixes #3001

Basic idea: use a semaphore to limit the number of concurrent connections. Count all connections tracked in ConnectionPool.open plus the connections about to be established, i.e. coroutines currently in ConnectionPool.connect. The latter count is kept in ConnectionPool._n_connecting.
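As a minimal, self-contained sketch of this scheme (the semaphore and _n_connecting mirror the description above; all other names are simplified stand-ins, not the actual ConnectionPool API):

import asyncio

class _FakeComm:
    """Stand-in for a real Comm object."""
    def close(self):
        pass

class LimitedPool:
    def __init__(self, limit=512):
        # One slot per connection, whether open or still being established.
        self.semaphore = asyncio.Semaphore(limit)
        # Number of coroutines currently inside connect().
        self._n_connecting = 0

    async def connect(self, addr):
        self._n_connecting += 1
        try:
            # Blocks while `limit` connections are open or in flight.
            await self.semaphore.acquire()
            try:
                return await self._open_comm(addr)
            except BaseException:
                self.semaphore.release()  # a failed attempt frees its slot
                raise
        finally:
            self._n_connecting -= 1

    def close(self, comm):
        comm.close()
        # The slot is released only when the comm itself is closed, which
        # is why the semaphore alone cannot stand in for _n_connecting.
        self.semaphore.release()

    async def _open_comm(self, addr):
        await asyncio.sleep(0)  # placeholder for the real connection logic
        return _FakeComm()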