Skip to content

Commit 7ddb4ca

Browse files
Resolve race condition around exceptions during streaming a response. (#491)
* Use 'socks5://' for the protocol in SOCKS proxy docs. Not 'socks://' * Defensive programming around removing request status from the connection pool * Tighten up behaviour around exceptions during streaming responses * Call response_closed on async cancellations, KeyboardInterupts * Include number of requests in-flight in exception if pool is closed early * Update CHANGELOG.md
1 parent 1924b12 commit 7ddb4ca

File tree

11 files changed

+199
-32
lines changed

11 files changed

+199
-32
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
77
## Master branch
88

99
- Fix SOCKS support for `http://` URLs. (#492)
10+
- Resolve race condition around exceptions during streaming a response. (#491)
1011

1112
## 0.14.5 (January 18th, 2022)
1213

httpcore/_async/connection_pool.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,7 @@ async def handle_async_request(self, request: Request) -> Response:
239239
# status so that the request becomes queued again.
240240
status.unset_connection()
241241
await self._attempt_to_acquire_connection(status)
242-
except Exception as exc:
242+
except BaseException as exc:
243243
await self.response_closed(status)
244244
raise exc
245245
else:
@@ -267,7 +267,8 @@ async def response_closed(self, status: RequestStatus) -> None:
267267

268268
async with self._pool_lock:
269269
# Update the state of the connection pool.
270-
self._requests.remove(status)
270+
if status in self._requests:
271+
self._requests.remove(status)
271272

272273
if connection.is_closed() and connection in self._pool:
273274
self._pool.remove(connection)
@@ -291,11 +292,19 @@ async def aclose(self) -> None:
291292
Close any connections in the pool.
292293
"""
293294
async with self._pool_lock:
295+
requests_still_in_flight = len(self._requests)
296+
294297
for connection in self._pool:
295298
await connection.aclose()
296299
self._pool = []
297300
self._requests = []
298301

302+
if requests_still_in_flight:
303+
raise RuntimeError(
304+
f"The connection pool was closed while {requests_still_in_flight} "
305+
f"HTTP requests/responses were still in-flight."
306+
)
307+
299308
async def __aenter__(self) -> "AsyncConnectionPool":
300309
return self
301310

httpcore/_async/http11.py

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -279,13 +279,23 @@ class HTTP11ConnectionByteStream:
279279
def __init__(self, connection: AsyncHTTP11Connection, request: Request) -> None:
280280
self._connection = connection
281281
self._request = request
282+
self._closed = False
282283

283284
async def __aiter__(self) -> AsyncIterator[bytes]:
284285
kwargs = {"request": self._request}
285-
async with Trace("http11.receive_response_body", self._request, kwargs):
286-
async for chunk in self._connection._receive_response_body(**kwargs):
287-
yield chunk
286+
try:
287+
async with Trace("http11.receive_response_body", self._request, kwargs):
288+
async for chunk in self._connection._receive_response_body(**kwargs):
289+
yield chunk
290+
except BaseException as exc:
291+
# If we get an exception while streaming the response,
292+
# we want to close the response (and possibly the connection)
293+
# before raising that exception.
294+
await self.aclose()
295+
raise exc
288296

289297
async def aclose(self) -> None:
290-
async with Trace("http11.response_closed", self._request):
291-
await self._connection._response_closed()
298+
if not self._closed:
299+
self._closed = True
300+
async with Trace("http11.response_closed", self._request):
301+
await self._connection._response_closed()

httpcore/_async/http2.py

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -305,7 +305,6 @@ async def _response_closed(self, stream_id: int) -> None:
305305
async def aclose(self) -> None:
306306
# Note that this method unilaterally closes the connection, and does
307307
# not have any kind of locking in place around it.
308-
# For task-safe/thread-safe operations call into 'attempt_close' instead.
309308
self._h2_state.close_connection()
310309
self._state = HTTPConnectionState.CLOSED
311310
await self._network_stream.aclose()
@@ -446,16 +445,26 @@ def __init__(
446445
self._connection = connection
447446
self._request = request
448447
self._stream_id = stream_id
448+
self._closed = False
449449

450450
async def __aiter__(self) -> typing.AsyncIterator[bytes]:
451451
kwargs = {"request": self._request, "stream_id": self._stream_id}
452-
async with Trace("http2.receive_response_body", self._request, kwargs):
453-
async for chunk in self._connection._receive_response_body(
454-
request=self._request, stream_id=self._stream_id
455-
):
456-
yield chunk
452+
try:
453+
async with Trace("http2.receive_response_body", self._request, kwargs):
454+
async for chunk in self._connection._receive_response_body(
455+
request=self._request, stream_id=self._stream_id
456+
):
457+
yield chunk
458+
except BaseException as exc:
459+
# If we get an exception while streaming the response,
460+
# we want to close the response (and possibly the connection)
461+
# before raising that exception.
462+
await self.aclose()
463+
raise exc
457464

458465
async def aclose(self) -> None:
459-
kwargs = {"stream_id": self._stream_id}
460-
async with Trace("http2.response_closed", self._request, kwargs):
461-
await self._connection._response_closed(stream_id=self._stream_id)
466+
if not self._closed:
467+
self._closed = True
468+
kwargs = {"stream_id": self._stream_id}
469+
async with Trace("http2.response_closed", self._request, kwargs):
470+
await self._connection._response_closed(stream_id=self._stream_id)

httpcore/_sync/connection_pool.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,7 @@ def handle_request(self, request: Request) -> Response:
239239
# status so that the request becomes queued again.
240240
status.unset_connection()
241241
self._attempt_to_acquire_connection(status)
242-
except Exception as exc:
242+
except BaseException as exc:
243243
self.response_closed(status)
244244
raise exc
245245
else:
@@ -267,7 +267,8 @@ def response_closed(self, status: RequestStatus) -> None:
267267

268268
with self._pool_lock:
269269
# Update the state of the connection pool.
270-
self._requests.remove(status)
270+
if status in self._requests:
271+
self._requests.remove(status)
271272

272273
if connection.is_closed() and connection in self._pool:
273274
self._pool.remove(connection)
@@ -291,11 +292,19 @@ def close(self) -> None:
291292
Close any connections in the pool.
292293
"""
293294
with self._pool_lock:
295+
requests_still_in_flight = len(self._requests)
296+
294297
for connection in self._pool:
295298
connection.close()
296299
self._pool = []
297300
self._requests = []
298301

302+
if requests_still_in_flight:
303+
raise RuntimeError(
304+
f"The connection pool was closed while {requests_still_in_flight} "
305+
f"HTTP requests/responses were still in-flight."
306+
)
307+
299308
def __enter__(self) -> "ConnectionPool":
300309
return self
301310

httpcore/_sync/http11.py

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -279,13 +279,23 @@ class HTTP11ConnectionByteStream:
279279
def __init__(self, connection: HTTP11Connection, request: Request) -> None:
280280
self._connection = connection
281281
self._request = request
282+
self._closed = False
282283

283284
def __iter__(self) -> Iterator[bytes]:
284285
kwargs = {"request": self._request}
285-
with Trace("http11.receive_response_body", self._request, kwargs):
286-
for chunk in self._connection._receive_response_body(**kwargs):
287-
yield chunk
286+
try:
287+
with Trace("http11.receive_response_body", self._request, kwargs):
288+
for chunk in self._connection._receive_response_body(**kwargs):
289+
yield chunk
290+
except BaseException as exc:
291+
# If we get an exception while streaming the response,
292+
# we want to close the response (and possibly the connection)
293+
# before raising that exception.
294+
self.close()
295+
raise exc
288296

289297
def close(self) -> None:
290-
with Trace("http11.response_closed", self._request):
291-
self._connection._response_closed()
298+
if not self._closed:
299+
self._closed = True
300+
with Trace("http11.response_closed", self._request):
301+
self._connection._response_closed()

httpcore/_sync/http2.py

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -305,7 +305,6 @@ def _response_closed(self, stream_id: int) -> None:
305305
def close(self) -> None:
306306
# Note that this method unilaterally closes the connection, and does
307307
# not have any kind of locking in place around it.
308-
# For task-safe/thread-safe operations call into 'attempt_close' instead.
309308
self._h2_state.close_connection()
310309
self._state = HTTPConnectionState.CLOSED
311310
self._network_stream.close()
@@ -446,16 +445,26 @@ def __init__(
446445
self._connection = connection
447446
self._request = request
448447
self._stream_id = stream_id
448+
self._closed = False
449449

450450
def __iter__(self) -> typing.Iterator[bytes]:
451451
kwargs = {"request": self._request, "stream_id": self._stream_id}
452-
with Trace("http2.receive_response_body", self._request, kwargs):
453-
for chunk in self._connection._receive_response_body(
454-
request=self._request, stream_id=self._stream_id
455-
):
456-
yield chunk
452+
try:
453+
with Trace("http2.receive_response_body", self._request, kwargs):
454+
for chunk in self._connection._receive_response_body(
455+
request=self._request, stream_id=self._stream_id
456+
):
457+
yield chunk
458+
except BaseException as exc:
459+
# If we get an exception while streaming the response,
460+
# we want to close the response (and possibly the connection)
461+
# before raising that exception.
462+
self.close()
463+
raise exc
457464

458465
def close(self) -> None:
459-
kwargs = {"stream_id": self._stream_id}
460-
with Trace("http2.response_closed", self._request, kwargs):
461-
self._connection._response_closed(stream_id=self._stream_id)
466+
if not self._closed:
467+
self._closed = True
468+
kwargs = {"stream_id": self._stream_id}
469+
with Trace("http2.response_closed", self._request, kwargs):
470+
self._connection._response_closed(stream_id=self._stream_id)

tests/_async/test_connection_pool.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -435,3 +435,29 @@ async def test_unsupported_protocol():
435435

436436
with pytest.raises(UnsupportedProtocol):
437437
await pool.request("GET", "://www.example.com/")
438+
439+
440+
@pytest.mark.anyio
441+
async def test_connection_pool_closed_while_request_in_flight():
442+
"""
443+
Closing a connection pool while a request/response is still in-flight
444+
should raise an error.
445+
"""
446+
network_backend = AsyncMockBackend(
447+
[
448+
b"HTTP/1.1 200 OK\r\n",
449+
b"Content-Type: plain/text\r\n",
450+
b"Content-Length: 13\r\n",
451+
b"\r\n",
452+
b"Hello, world!",
453+
]
454+
)
455+
456+
async with AsyncConnectionPool(
457+
network_backend=network_backend,
458+
) as pool:
459+
# Send a request, and then close the connection pool while the
460+
# response has not yet been streamed.
461+
async with pool.stream("GET", "https://example.com/"):
462+
with pytest.raises(RuntimeError):
463+
await pool.aclose()

tests/_async/test_http11.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,35 @@ async def test_http11_connection_with_remote_protocol_error():
9191
)
9292

9393

94+
@pytest.mark.anyio
95+
async def test_http11_connection_with_incomplete_response():
96+
"""
97+
We should be gracefully handling the case where the connection ends prematurely.
98+
"""
99+
origin = Origin(b"https", b"example.com", 443)
100+
stream = AsyncMockStream(
101+
[
102+
b"HTTP/1.1 200 OK\r\n",
103+
b"Content-Type: plain/text\r\n",
104+
b"Content-Length: 13\r\n",
105+
b"\r\n",
106+
b"Hello, wor",
107+
]
108+
)
109+
async with AsyncHTTP11Connection(origin=origin, stream=stream) as conn:
110+
with pytest.raises(RemoteProtocolError):
111+
await conn.request("GET", "https://example.com/")
112+
113+
assert not conn.is_idle()
114+
assert conn.is_closed()
115+
assert not conn.is_available()
116+
assert not conn.has_expired()
117+
assert (
118+
repr(conn)
119+
== "<AsyncHTTP11Connection ['https://example.com:443', CLOSED, Request Count: 1]>"
120+
)
121+
122+
94123
@pytest.mark.anyio
95124
async def test_http11_connection_with_local_protocol_error():
96125
"""

tests/_sync/test_connection_pool.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -435,3 +435,29 @@ def test_unsupported_protocol():
435435

436436
with pytest.raises(UnsupportedProtocol):
437437
pool.request("GET", "://www.example.com/")
438+
439+
440+
441+
def test_connection_pool_closed_while_request_in_flight():
442+
"""
443+
Closing a connection pool while a request/response is still in-flight
444+
should raise an error.
445+
"""
446+
network_backend = MockBackend(
447+
[
448+
b"HTTP/1.1 200 OK\r\n",
449+
b"Content-Type: plain/text\r\n",
450+
b"Content-Length: 13\r\n",
451+
b"\r\n",
452+
b"Hello, world!",
453+
]
454+
)
455+
456+
with ConnectionPool(
457+
network_backend=network_backend,
458+
) as pool:
459+
# Send a request, and then close the connection pool while the
460+
# response has not yet been streamed.
461+
with pool.stream("GET", "https://example.com/"):
462+
with pytest.raises(RuntimeError):
463+
pool.close()

0 commit comments

Comments
 (0)