diff --git a/CHANGELOG.md b/CHANGELOG.md index d2abd040..f85d4e45 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). ## unreleased - The networking backend interface has [been added to the public API](https://www.encode.io/httpcore/network-backends). Some classes which were previously private implementation detail are now part of the top-level public API. (#699) +- Fix response cleanup after cancellation. (#704) ## 0.17.2 (May 23th, 2023) diff --git a/httpcore/_async/connection_pool.py b/httpcore/_async/connection_pool.py index 935f34db..6d7842fc 100644 --- a/httpcore/_async/connection_pool.py +++ b/httpcore/_async/connection_pool.py @@ -1,5 +1,6 @@ import ssl import sys +import typing from types import TracebackType from typing import AsyncIterable, AsyncIterator, Iterable, List, Optional, Type @@ -11,6 +12,8 @@ from .connection import AsyncHTTPConnection from .interfaces import AsyncConnectionInterface, AsyncRequestInterface +GRACEFUL_CLOSE_TIMEOUT = 2 + class RequestStatus: def __init__(self, request: Request): @@ -342,13 +345,33 @@ def __init__( self._pool = pool self._status = status + @AutoBackend.shield_cancellation + async def clean_up(self) -> None: + # Close the underlying "connection" if the natural closing was canceled + # or an exception was raised. This happens when a request is cancelled + # in the middle of a cycle, and async backend cancellations close all + # active tasks in that scope, including the clean up function, so we need + # to run those functions again in the cancellation isolated environment. + + await self._pool.response_closed(self._status) + async def __aiter__(self) -> AsyncIterator[bytes]: async for part in self._stream: yield part async def aclose(self) -> None: + cancelled: typing.Optional[bool] = None try: if hasattr(self._stream, "aclose"): + cancelled = True await self._stream.aclose() + cancelled = False finally: - await self._pool.response_closed(self._status) + try: + if cancelled and hasattr( + self._stream, "_connection" + ): # pragma: no cover + with AutoBackend.fail_after(GRACEFUL_CLOSE_TIMEOUT): + await self._stream._connection.aclose() + finally: + await self.clean_up() diff --git a/httpcore/_backends/auto.py b/httpcore/_backends/auto.py index b612ba07..f28ff9b6 100644 --- a/httpcore/_backends/auto.py +++ b/httpcore/_backends/auto.py @@ -1,12 +1,18 @@ import typing +from functools import wraps from typing import Optional +import anyio import sniffio from .base import SOCKET_OPTION, AsyncNetworkBackend, AsyncNetworkStream +_R = typing.TypeVar("_R") + class AutoBackend(AsyncNetworkBackend): + fail_after = anyio.fail_after + async def _init_backend(self) -> None: if not (hasattr(self, "_backend")): backend = sniffio.current_async_library() @@ -50,3 +56,16 @@ async def connect_unix_socket( async def sleep(self, seconds: float) -> None: # pragma: nocover await self._init_backend() return await self._backend.sleep(seconds) + + @staticmethod + def shield_cancellation( + fnc: typing.Callable[..., typing.Awaitable[_R]] + ) -> typing.Callable[..., typing.Awaitable[_R]]: + # Makes an async function that runs in a cancellation-isolated environment. + + @wraps(fnc) + async def inner(*args: typing.Any, **kwargs: typing.Any) -> _R: + with anyio.CancelScope(shield=True): + return await fnc(*args, **kwargs) + + return inner diff --git a/httpcore/_backends/sync.py b/httpcore/_backends/sync.py index a4c85f04..b427d694 100644 --- a/httpcore/_backends/sync.py +++ b/httpcore/_backends/sync.py @@ -1,3 +1,4 @@ +import contextlib import socket import ssl import sys @@ -16,6 +17,8 @@ from .._utils import is_socket_readable from .base import SOCKET_OPTION, NetworkBackend, NetworkStream +_R = typing.TypeVar("_R") + class SyncStream(NetworkStream): def __init__(self, sock: socket.socket) -> None: @@ -77,6 +80,11 @@ def get_extra_info(self, info: str) -> typing.Any: class SyncBackend(NetworkBackend): + @staticmethod + @contextlib.contextmanager + def fail_after(delay: typing.Optional[float], shield: bool = False) -> typing.Any: + yield # pragma: no cover + def connect_tcp( self, host: str, @@ -131,3 +139,7 @@ def connect_unix_socket( sock.settimeout(timeout) sock.connect(path) return SyncStream(sock) + + @staticmethod + def shield_cancellation(fnc: _R) -> _R: + return fnc diff --git a/httpcore/_sync/connection_pool.py b/httpcore/_sync/connection_pool.py index f64334af..2f689052 100644 --- a/httpcore/_sync/connection_pool.py +++ b/httpcore/_sync/connection_pool.py @@ -1,5 +1,6 @@ import ssl import sys +import typing from types import TracebackType from typing import Iterable, Iterator, Iterable, List, Optional, Type @@ -11,6 +12,8 @@ from .connection import HTTPConnection from .interfaces import ConnectionInterface, RequestInterface +GRACEFUL_CLOSE_TIMEOUT = 2 + class RequestStatus: def __init__(self, request: Request): @@ -342,13 +345,33 @@ def __init__( self._pool = pool self._status = status + @SyncBackend.shield_cancellation + def clean_up(self) -> None: + # Close the underlying "connection" if the natural closing was canceled + # or an exception was raised. This happens when a request is cancelled + # in the middle of a cycle, and async backend cancellations close all + # active tasks in that scope, including the clean up function, so we need + # to run those functions again in the cancellation isolated environment. + + self._pool.response_closed(self._status) + def __iter__(self) -> Iterator[bytes]: for part in self._stream: yield part def close(self) -> None: + cancelled: typing.Optional[bool] = None try: if hasattr(self._stream, "close"): + cancelled = True self._stream.close() + cancelled = False finally: - self._pool.response_closed(self._status) + try: + if cancelled and hasattr( + self._stream, "_connection" + ): # pragma: no cover + with SyncBackend.fail_after(GRACEFUL_CLOSE_TIMEOUT): + self._stream._connection.close() + finally: + self.clean_up()