Skip to content

Commit a174e66

Browse files
Adjust max streams when remote settings change (#652)
* Adjust max streams when remote settings change If the remote server sends a settings update for `max_concurrent_streams`, check and potentially update the max number of streams we will use for a connection. * Update sync version of http2 * Add aiter_stream sub to iter_stream * Add test for remote max streams update Improve testing for remote max streams update This new test checks whether a change in the max streams setting is properly handled. The max streams value is changed twice throughout the mock stream. The first update increases the value while the second update decreases it. --------- Co-authored-by: Tom Christie <tom@tomchristie.com>
1 parent 95406e6 commit a174e66

File tree

5 files changed

+177
-4
lines changed

5 files changed

+177
-4
lines changed

httpcore/_async/http2.py

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,8 +88,18 @@ async def handle_async_request(self, request: Request) -> Response:
8888
async with Trace("http2.send_connection_init", request, kwargs):
8989
await self._send_connection_init(**kwargs)
9090
self._sent_connection_init = True
91-
max_streams = self._h2_state.local_settings.max_concurrent_streams
92-
self._max_streams_semaphore = AsyncSemaphore(max_streams)
91+
92+
# Initially start with just 1 until the remote server provides
93+
# its max_concurrent_streams value
94+
self._max_streams = 1
95+
96+
local_settings_max_streams = (
97+
self._h2_state.local_settings.max_concurrent_streams
98+
)
99+
self._max_streams_semaphore = AsyncSemaphore(local_settings_max_streams)
100+
101+
for _ in range(local_settings_max_streams - self._max_streams):
102+
await self._max_streams_semaphore.acquire()
93103

94104
await self._max_streams_semaphore.acquire()
95105

@@ -280,6 +290,13 @@ async def _receive_events(
280290
if stream_id is None or not self._events.get(stream_id):
281291
events = await self._read_incoming_data(request)
282292
for event in events:
293+
if isinstance(event, h2.events.RemoteSettingsChanged):
294+
async with Trace(
295+
"http2.receive_remote_settings", request
296+
) as trace:
297+
await self._receive_remote_settings_change(event)
298+
trace.return_value = event
299+
283300
event_stream_id = getattr(event, "stream_id", 0)
284301

285302
# The ConnectionTerminatedEvent applies to the entire connection,
@@ -293,6 +310,23 @@ async def _receive_events(
293310

294311
await self._write_outgoing_data(request)
295312

313+
async def _receive_remote_settings_change(self, event: h2.events.Event) -> None:
314+
max_concurrent_streams = event.changed_settings.get(
315+
h2.settings.SettingCodes.MAX_CONCURRENT_STREAMS
316+
)
317+
if max_concurrent_streams:
318+
new_max_streams = min(
319+
max_concurrent_streams.new_value,
320+
self._h2_state.local_settings.max_concurrent_streams,
321+
)
322+
if new_max_streams and new_max_streams != self._max_streams:
323+
while new_max_streams > self._max_streams:
324+
await self._max_streams_semaphore.release()
325+
self._max_streams += 1
326+
while new_max_streams < self._max_streams:
327+
await self._max_streams_semaphore.acquire()
328+
self._max_streams -= 1
329+
296330
async def _response_closed(self, stream_id: int) -> None:
297331
await self._max_streams_semaphore.release()
298332
del self._events[stream_id]

httpcore/_sync/http2.py

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,8 +88,18 @@ def handle_request(self, request: Request) -> Response:
8888
with Trace("http2.send_connection_init", request, kwargs):
8989
self._send_connection_init(**kwargs)
9090
self._sent_connection_init = True
91-
max_streams = self._h2_state.local_settings.max_concurrent_streams
92-
self._max_streams_semaphore = Semaphore(max_streams)
91+
92+
# Initially start with just 1 until the remote server provides
93+
# its max_concurrent_streams value
94+
self._max_streams = 1
95+
96+
local_settings_max_streams = (
97+
self._h2_state.local_settings.max_concurrent_streams
98+
)
99+
self._max_streams_semaphore = Semaphore(local_settings_max_streams)
100+
101+
for _ in range(local_settings_max_streams - self._max_streams):
102+
self._max_streams_semaphore.acquire()
93103

94104
self._max_streams_semaphore.acquire()
95105

@@ -280,6 +290,13 @@ def _receive_events(
280290
if stream_id is None or not self._events.get(stream_id):
281291
events = self._read_incoming_data(request)
282292
for event in events:
293+
if isinstance(event, h2.events.RemoteSettingsChanged):
294+
with Trace(
295+
"http2.receive_remote_settings", request
296+
) as trace:
297+
self._receive_remote_settings_change(event)
298+
trace.return_value = event
299+
283300
event_stream_id = getattr(event, "stream_id", 0)
284301

285302
# The ConnectionTerminatedEvent applies to the entire connection,
@@ -293,6 +310,23 @@ def _receive_events(
293310

294311
self._write_outgoing_data(request)
295312

313+
def _receive_remote_settings_change(self, event: h2.events.Event) -> None:
314+
max_concurrent_streams = event.changed_settings.get(
315+
h2.settings.SettingCodes.MAX_CONCURRENT_STREAMS
316+
)
317+
if max_concurrent_streams:
318+
new_max_streams = min(
319+
max_concurrent_streams.new_value,
320+
self._h2_state.local_settings.max_concurrent_streams,
321+
)
322+
if new_max_streams and new_max_streams != self._max_streams:
323+
while new_max_streams > self._max_streams:
324+
self._max_streams_semaphore.release()
325+
self._max_streams += 1
326+
while new_max_streams < self._max_streams:
327+
self._max_streams_semaphore.acquire()
328+
self._max_streams -= 1
329+
296330
def _response_closed(self, stream_id: int) -> None:
297331
self._max_streams_semaphore.release()
298332
del self._events[stream_id]

tests/_async/test_http2.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,3 +294,55 @@ async def test_http2_request_to_incorrect_origin():
294294
async with AsyncHTTP2Connection(origin=origin, stream=stream) as conn:
295295
with pytest.raises(RuntimeError):
296296
await conn.request("GET", "https://other.com/")
297+
298+
299+
@pytest.mark.anyio
300+
async def test_http2_remote_max_streams_update():
301+
"""
302+
If the remote server updates the maximum concurrent streams value, we should
303+
be adjusting how many streams we will allow.
304+
"""
305+
origin = Origin(b"https", b"example.com", 443)
306+
stream = AsyncMockStream(
307+
[
308+
hyperframe.frame.SettingsFrame(
309+
settings={hyperframe.frame.SettingsFrame.MAX_CONCURRENT_STREAMS: 1000}
310+
).serialize(),
311+
hyperframe.frame.HeadersFrame(
312+
stream_id=1,
313+
data=hpack.Encoder().encode(
314+
[
315+
(b":status", b"200"),
316+
(b"content-type", b"plain/text"),
317+
]
318+
),
319+
flags=["END_HEADERS"],
320+
).serialize(),
321+
hyperframe.frame.DataFrame(stream_id=1, data=b"Hello, world!").serialize(),
322+
hyperframe.frame.SettingsFrame(
323+
settings={hyperframe.frame.SettingsFrame.MAX_CONCURRENT_STREAMS: 50}
324+
).serialize(),
325+
hyperframe.frame.DataFrame(
326+
stream_id=1, data=b"Hello, world...again!", flags=["END_STREAM"]
327+
).serialize(),
328+
]
329+
)
330+
async with AsyncHTTP2Connection(origin=origin, stream=stream) as conn:
331+
async with conn.stream("GET", "https://example.com/") as response:
332+
i = 0
333+
async for chunk in response.aiter_stream():
334+
if i == 0:
335+
assert chunk == b"Hello, world!"
336+
assert conn._h2_state.remote_settings.max_concurrent_streams == 1000
337+
assert conn._max_streams == min(
338+
conn._h2_state.remote_settings.max_concurrent_streams,
339+
conn._h2_state.local_settings.max_concurrent_streams,
340+
)
341+
elif i == 1:
342+
assert chunk == b"Hello, world...again!"
343+
assert conn._h2_state.remote_settings.max_concurrent_streams == 50
344+
assert conn._max_streams == min(
345+
conn._h2_state.remote_settings.max_concurrent_streams,
346+
conn._h2_state.local_settings.max_concurrent_streams,
347+
)
348+
i += 1

tests/_sync/test_http2.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,3 +294,55 @@ def test_http2_request_to_incorrect_origin():
294294
with HTTP2Connection(origin=origin, stream=stream) as conn:
295295
with pytest.raises(RuntimeError):
296296
conn.request("GET", "https://other.com/")
297+
298+
299+
300+
def test_http2_remote_max_streams_update():
301+
"""
302+
If the remote server updates the maximum concurrent streams value, we should
303+
be adjusting how many streams we will allow.
304+
"""
305+
origin = Origin(b"https", b"example.com", 443)
306+
stream = MockStream(
307+
[
308+
hyperframe.frame.SettingsFrame(
309+
settings={hyperframe.frame.SettingsFrame.MAX_CONCURRENT_STREAMS: 1000}
310+
).serialize(),
311+
hyperframe.frame.HeadersFrame(
312+
stream_id=1,
313+
data=hpack.Encoder().encode(
314+
[
315+
(b":status", b"200"),
316+
(b"content-type", b"plain/text"),
317+
]
318+
),
319+
flags=["END_HEADERS"],
320+
).serialize(),
321+
hyperframe.frame.DataFrame(stream_id=1, data=b"Hello, world!").serialize(),
322+
hyperframe.frame.SettingsFrame(
323+
settings={hyperframe.frame.SettingsFrame.MAX_CONCURRENT_STREAMS: 50}
324+
).serialize(),
325+
hyperframe.frame.DataFrame(
326+
stream_id=1, data=b"Hello, world...again!", flags=["END_STREAM"]
327+
).serialize(),
328+
]
329+
)
330+
with HTTP2Connection(origin=origin, stream=stream) as conn:
331+
with conn.stream("GET", "https://example.com/") as response:
332+
i = 0
333+
for chunk in response.iter_stream():
334+
if i == 0:
335+
assert chunk == b"Hello, world!"
336+
assert conn._h2_state.remote_settings.max_concurrent_streams == 1000
337+
assert conn._max_streams == min(
338+
conn._h2_state.remote_settings.max_concurrent_streams,
339+
conn._h2_state.local_settings.max_concurrent_streams,
340+
)
341+
elif i == 1:
342+
assert chunk == b"Hello, world...again!"
343+
assert conn._h2_state.remote_settings.max_concurrent_streams == 50
344+
assert conn._max_streams == min(
345+
conn._h2_state.remote_settings.max_concurrent_streams,
346+
conn._h2_state.local_settings.max_concurrent_streams,
347+
)
348+
i += 1

unasync.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
('aclose', 'close'),
1919
('aclose_func', 'close_func'),
2020
('aiterator', 'iterator'),
21+
('aiter_stream', 'iter_stream'),
2122
('aread', 'read'),
2223
('asynccontextmanager', 'contextmanager'),
2324
('__aenter__', '__enter__'),

0 commit comments

Comments
 (0)