diff --git a/CHANGELOG.md b/CHANGELOG.md index f4e0d63bd..6b1b741a5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -34,6 +34,7 @@ The types of changes are: * Adds a new Timescale connector [#1327](https://github.com/ethyca/fidesops/pull/1327) * Allow querying the non-default schema with the Postgres Connector [#1375](https://github.com/ethyca/fidesops/pull/1375) * Frontend - ability for users to manually enter PII to an IN PROGRESS subject request [#1016](https://github.com/ethyca/fidesops/pull/1377) +* Enable retries on saas connectors for failures at the http request level [#1376](https://github.com/ethyca/fidesops/pull/1376) ### Removed diff --git a/src/fidesops/ops/service/connectors/saas/authenticated_client.py b/src/fidesops/ops/service/connectors/saas/authenticated_client.py index 5ef6c5aa4..9dec542e2 100644 --- a/src/fidesops/ops/service/connectors/saas/authenticated_client.py +++ b/src/fidesops/ops/service/connectors/saas/authenticated_client.py @@ -1,13 +1,19 @@ from __future__ import annotations +import email import logging -from typing import TYPE_CHECKING, Optional +import re +import time +from functools import wraps +from time import sleep +from typing import TYPE_CHECKING, Any, Callable, List, Optional, Union from requests import PreparedRequest, Request, Response, Session from fidesops.ops.common_exceptions import ( ClientUnsuccessfulException, ConnectionException, + FidesopsException, ) from fidesops.ops.core.config import config @@ -75,6 +81,69 @@ def get_authenticated_request( # otherwise just return the prepared request return req + def retry_send( # type: ignore + retry_count: int, + backoff_factor: float, + retry_status_codes: List[int] = [429, 502, 503, 504], + ) -> Callable: + """ + Retry decorator for http requests, backing off exponentially or listening to server retry-after header + + Exponential backoff factor uses the following formula: + backoff_factor * (2 ** (retry_attempt)) + For an backoff_factor of 1 it will sleep for 2,4,8 seconds + + General exceptions are not retried. RequestFailureResponseException exceptions are only retried + if the status code is in retry_status_codes. + """ + + def decorator(func: Callable) -> Callable: + @wraps(func) + def result(*args: Any, **kwargs: Any) -> Response: + self = args[0] + last_exception: Optional[Union[BaseException, Exception]] = None + + for attempt in range(retry_count + 1): + sleep_time = backoff_factor * (2 ** (attempt + 1)) + try: + return func(*args, **kwargs) + except RequestFailureResponseException as exc: # pylint: disable=W0703 + response: Response = exc.response + status_code: int = response.status_code + last_exception = ClientUnsuccessfulException( + status_code=status_code + ) + + if status_code not in retry_status_codes: + break + + # override sleep time if retry after header is found + retry_after_time = get_retry_after(response) + sleep_time = ( + retry_after_time if retry_after_time else sleep_time + ) + except Exception as exc: # pylint: disable=W0703 + dev_mode_log = f" with error: {exc}" if config.dev_mode else "" + last_exception = ConnectionException( + f"Operational Error connecting to '{self.key}'{dev_mode_log}" + ) + # requests library can raise ConnectionError, Timeout or TooManyRedirects + # we will not retry these as they don't usually point to intermittent issues + break + + if attempt < retry_count: + logger.warning( + "Retrying http request in %s seconds", sleep_time + ) + sleep(sleep_time) + + raise last_exception # type: ignore + + return result + + return decorator + + @retry_send(retry_count=3, backoff_factor=1.0) # pylint: disable=E1124 def send( self, request_params: SaaSRequestParams, ignore_errors: Optional[bool] = False ) -> Response: @@ -82,20 +151,10 @@ def send( Builds and executes an authenticated request. Optionally ignores non-200 responses if ignore_errors is set to True """ - try: - prepared_request: PreparedRequest = self.get_authenticated_request( - request_params - ) - response = self.session.send(prepared_request) - except Exception as exc: # pylint: disable=W0703 - if config.dev_mode: # pylint: disable=R1720 - raise ConnectionException( - f"Operational Error connecting to '{self.key}' with error: {exc}" - ) - else: - raise ConnectionException( - f"Operational Error connecting to '{self.key}'." - ) + prepared_request: PreparedRequest = self.get_authenticated_request( + request_params + ) + response = self.session.send(prepared_request) log_request_and_response_for_debugging( prepared_request, response @@ -108,10 +167,18 @@ def send( response.status_code, ) return response + raise RequestFailureResponseException(response=response) + return response - raise ClientUnsuccessfulException(status_code=response.status_code) - return response +class RequestFailureResponseException(FidesopsException): + """Exception class which preserves http response""" + + response: Response + + def __init__(self, response: Response): + super().__init__("Received failure response from server") + self.response = response def log_request_and_response_for_debugging( @@ -131,3 +198,27 @@ def log_request_and_response_for_debugging( prepared_request.body, response._content, # pylint: disable=W0212 ) + + +def get_retry_after(response: Response, max_retry_after: int = 300) -> Optional[float]: + """Given a Response object, parses Retry-After header and calculates how long we should sleep for""" + retry_after = response.headers.get("Retry-After", None) + + if retry_after is None: + return None + + seconds: float + # if a number value is provided the server is telling us to sleep for X seconds + if re.match(r"^\s*[0-9]+\s*$", retry_after): + seconds = int(retry_after) + # else we will attempt to parse a timestamp and diff with current time + else: + retry_date_tuple = email.utils.parsedate_tz(retry_after) + if retry_date_tuple is None: + return None + + retry_date = email.utils.mktime_tz(retry_date_tuple) + seconds = retry_date - time.time() + + seconds = max(seconds, 0) + return min(seconds, max_retry_after) diff --git a/tests/ops/integration_tests/saas/test_zendesk_task.py b/tests/ops/integration_tests/saas/test_zendesk_task.py index f07f11748..7e05e2adf 100644 --- a/tests/ops/integration_tests/saas/test_zendesk_task.py +++ b/tests/ops/integration_tests/saas/test_zendesk_task.py @@ -201,8 +201,6 @@ async def test_zendesk_erasure_request_task( merged_graph = zendesk_dataset_config.get_graph() graph = DatasetGraph(merged_graph) - # Since we sometimes get response: b'Number of allowed API requests per minute exceeded' so adding this line to avoid it - time.sleep(60) v = await graph_task.run_access_request( privacy_request, policy, diff --git a/tests/ops/service/connectors/saas/test_authenticated_client.py b/tests/ops/service/connectors/saas/test_authenticated_client.py new file mode 100644 index 000000000..fb11d801f --- /dev/null +++ b/tests/ops/service/connectors/saas/test_authenticated_client.py @@ -0,0 +1,122 @@ +import time +import unittest.mock as mock +from email.utils import formatdate +from typing import Any, Dict + +import pytest +from requests import ConnectionError, Response, Session + +from fidesops.ops.common_exceptions import ( + ClientUnsuccessfulException, + ConnectionException, +) +from fidesops.ops.models.connectionconfig import ConnectionConfig, ConnectionType +from fidesops.ops.schemas.saas.shared_schemas import HTTPMethod, SaaSRequestParams +from fidesops.ops.service.connectors.saas.authenticated_client import ( + AuthenticatedClient, + get_retry_after, +) +from fidesops.ops.util.saas_util import load_config_with_replacement + + +@pytest.fixture +def test_saas_config() -> Dict[str, Any]: + return load_config_with_replacement( + "data/saas/config/segment_config.yml", + "", + "test_config", + ) + + +@pytest.fixture +def test_connection_config(test_saas_config) -> ConnectionConfig: + return ConnectionConfig( + key="test_config", + connection_type=ConnectionType.saas, + saas_config=test_saas_config, + secrets={"access_token": "test_token"}, + ) + + +@pytest.fixture +def test_saas_request() -> SaaSRequestParams: + return SaaSRequestParams( + method=HTTPMethod.GET, + path="test_path", + query_params={}, + ) + + +@pytest.fixture +def test_authenticated_client(test_connection_config) -> AuthenticatedClient: + return AuthenticatedClient("https://test_uri", test_connection_config) + + +@pytest.mark.unit_saas +@mock.patch.object(Session, "send") +class TestAuthenticatedClient: + def test_client_returns_ok_response( + self, send, test_authenticated_client, test_saas_request + ): + test_response = Response() + test_response.status_code = 200 + send.return_value = test_response + returned_response = test_authenticated_client.send(test_saas_request) + assert returned_response == test_response + + def test_client_retries_429_and_throws( + self, send, test_authenticated_client, test_saas_request + ): + test_response = Response() + test_response.status_code = 429 + send.return_value = test_response + with pytest.raises(ClientUnsuccessfulException): + test_authenticated_client.send(test_saas_request) + assert send.call_count == 4 + + def test_client_retries_429_with_success( + self, send, test_authenticated_client, test_saas_request + ): + test_response_1 = Response() + test_response_1.status_code = 429 + test_response_2 = Response() + test_response_2.status_code = 200 + send.side_effect = [test_response_1, test_response_2] + returned_response = test_authenticated_client.send(test_saas_request) + returned_response == test_response_2 + assert send.call_count == 2 + + def test_client_does_not_retry_connection_error( + self, send, test_authenticated_client, test_saas_request + ): + test_side_effect_1 = ConnectionError() + send.side_effect = [test_side_effect_1] + with pytest.raises(ConnectionException): + test_authenticated_client.send(test_saas_request) + assert send.call_count == 1 + + +@pytest.mark.unit_saas +class TestRetryAfterHeaderParsing: + def test_retry_after_parses_seconds_response(self): + test_response = Response() + test_response.status_code = 429 + test_response.headers = {"Retry-After": "30"} + retry_after_sleep = get_retry_after(test_response) + assert retry_after_sleep == 30 + + def test_retry_after_parses_timestamp_in_future(self): + test_response = Response() + test_response.status_code = 429 + time_in_future = time.time() + 30 + test_response.headers = {"Retry-After": formatdate(timeval=time_in_future)} + retry_after_sleep = get_retry_after(test_response) + assert retry_after_sleep > 20 + + def test_retry_after_parses_timestamp_in_past(self): + test_response = Response() + test_response.status_code = 429 + time_in_past = time.time() - 30 + test_response.headers = {"Retry-After": formatdate(timeval=time_in_past)} + retry_after_sleep = get_retry_after(test_response) + assert retry_after_sleep == 0