diff --git a/mapillary_tools/uploader.py b/mapillary_tools/uploader.py index c81c91c4..71a37b6a 100644 --- a/mapillary_tools/uploader.py +++ b/mapillary_tools/uploader.py @@ -2,6 +2,8 @@ import concurrent.futures import dataclasses +import datetime +import email.utils import hashlib import io import json @@ -946,27 +948,33 @@ def _handle_upload_exception( begin_offset = progress.get("begin_offset") offset = progress.get("offset") - if retries <= constants.MAX_UPLOAD_RETRIES and _is_retriable_exception(ex): - self.emitter.emit("upload_retrying", progress) + LOG.warning( + f"Error uploading {self._upload_name(progress)} at {offset=} since {begin_offset=}: {ex.__class__.__name__}: {ex}" + ) - LOG.warning( - f"Error uploading {self._upload_name(progress)} at {offset=} since {begin_offset=}: {ex.__class__.__name__}: {ex}" - ) + if retries <= constants.MAX_UPLOAD_RETRIES: + retriable, retry_after_sec = _is_retriable_exception(ex) + if retriable: + self.emitter.emit("upload_retrying", progress) - # Keep things immutable here. Will increment retries in the caller - retries += 1 - if _is_immediate_retriable_exception(ex): - sleep_for = 0 - else: - sleep_for = min(2**retries, 16) - LOG.info( - f"Retrying in {sleep_for} seconds ({retries}/{constants.MAX_UPLOAD_RETRIES})" - ) - if sleep_for: - time.sleep(sleep_for) - else: - self.emitter.emit("upload_failed", progress) - raise ex + # Keep things immutable here. Will increment retries in the caller + retries += 1 + if _is_immediate_retriable_exception(ex): + sleep_for = 0 + else: + sleep_for = min(2**retries, 16) + sleep_for += retry_after_sec + + LOG.info( + f"Retrying in {sleep_for} seconds ({retries}/{constants.MAX_UPLOAD_RETRIES})" + ) + if sleep_for: + time.sleep(sleep_for) + + return + + self.emitter.emit("upload_failed", progress) + raise ex @classmethod def _upload_name(cls, progress: UploaderProgress): @@ -1083,23 +1091,188 @@ def _is_immediate_retriable_exception(ex: BaseException) -> bool: return False -def _is_retriable_exception(ex: BaseException) -> bool: +def _is_retriable_exception(ex: BaseException) -> tuple[bool, int]: + """ + Determine if an exception should be retried and how long to wait. + + Args: + ex: Exception to check for retryability + + Returns: + Tuple of (retriable, retry_after_sec) where: + - retriable: True if the exception should be retried + - retry_after_sec: Seconds to wait before retry (>= 0) + + Examples: + >>> resp = requests.Response() + >>> resp._content = b"foo" + >>> resp.status_code = 400 + >>> ex = requests.HTTPError("error", response=resp) + >>> _is_retriable_exception(ex) + (False, 0) + >>> resp._content = b'{"backoff": 13000, "debug_info": {"retriable": false, "type": "RequestRateLimitedError", "message": "Request rate limit has been exceeded"}}' + >>> resp.status_code = 400 + >>> ex = requests.HTTPError("error", response=resp) + >>> _is_retriable_exception(ex) + (True, 13) + >>> resp._content = b'{"backoff": "foo", "debug_info": {"retriable": false, "type": "RequestRateLimitedError", "message": "Request rate limit has been exceeded"}}' + >>> resp.status_code = 400 + >>> ex = requests.HTTPError("error", response=resp) + >>> _is_retriable_exception(ex) + (True, 10) + >>> resp._content = b'{"debug_info": {"retriable": false, "type": "RequestRateLimitedError", "message": "Request rate limit has been exceeded"}}' + >>> resp.status_code = 400 + >>> ex = requests.HTTPError("error", response=resp) + >>> _is_retriable_exception(ex) + (True, 10) + >>> resp._content = b"foo" + >>> resp.status_code = 429 + >>> ex = requests.HTTPError("error", response=resp) + >>> _is_retriable_exception(ex) + (True, 10) + >>> resp._content = b"foo" + >>> resp.status_code = 429 + >>> ex = requests.HTTPError("error", response=resp) + >>> _is_retriable_exception(ex) + (True, 10) + >>> resp._content = b'{"backoff": 12000, "debug_info": {"retriable": false, "type": "RequestRateLimitedError", "message": "Request rate limit has been exceeded"}}' + >>> resp.status_code = 429 + >>> ex = requests.HTTPError("error", response=resp) + >>> _is_retriable_exception(ex) + (True, 12) + >>> resp._content = b'{"backoff": 12000, "debug_info": {"retriable": false, "type": "RequestRateLimitedError", "message": "Request rate limit has been exceeded"}}' + >>> resp.headers = {"Retry-After": "1"} + >>> resp.status_code = 503 + >>> ex = requests.HTTPError("error", response=resp) + >>> _is_retriable_exception(ex) + (True, 1) + """ + + DEFAULT_RETRY_AFTER_RATE_LIMIT_SEC = 10 + if isinstance(ex, (requests.ConnectionError, requests.Timeout)): - return True + return True, 0 if isinstance(ex, requests.HTTPError) and isinstance( ex.response, requests.Response ): - if 400 <= ex.response.status_code < 500: + status_code = ex.response.status_code + + # Always retry with some delay + if status_code == 429: + retry_after_sec = ( + _parse_retry_after_from_header(ex.response) + or DEFAULT_RETRY_AFTER_RATE_LIMIT_SEC + ) + try: - resp = ex.response.json() - except json.JSONDecodeError: - return False - return resp.get("debug_info", {}).get("retriable", False) - else: - return True + data = ex.response.json() + except requests.JSONDecodeError: + return True, retry_after_sec - return False + backoff_ms = _parse_backoff(data.get("backoff")) + if backoff_ms is None: + return True, retry_after_sec + else: + return True, max(0, int(int(backoff_ms) / 1000)) + + if 400 <= status_code < 500: + try: + data = ex.response.json() + except requests.JSONDecodeError: + return False, (_parse_retry_after_from_header(ex.response) or 0) + + debug_info = data.get("debug_info", {}) + + if isinstance(debug_info, dict): + error_type = debug_info.get("type") + else: + error_type = None + + # The server may respond 429 RequestRateLimitedError but with retryable=False + # We should retry for this case regardless + # e.g. HTTP 429 {"backoff": 10000, "debug_info": {"retriable": false, "type": "RequestRateLimitedError", "message": "Request rate limit has been exceeded"}} + if error_type == "RequestRateLimitedError": + backoff_ms = _parse_backoff(data.get("backoff")) + if backoff_ms is None: + return True, ( + _parse_retry_after_from_header(ex.response) + or DEFAULT_RETRY_AFTER_RATE_LIMIT_SEC + ) + else: + return True, max(0, int(int(backoff_ms) / 1000)) + + return debug_info.get("retriable", False), 0 + + if 500 <= status_code < 600: + return True, (_parse_retry_after_from_header(ex.response) or 0) + + return False, 0 + + +def _parse_backoff(backoff: T.Any) -> int | None: + if backoff is not None: + try: + backoff_ms = int(backoff) + except (ValueError, TypeError): + backoff_ms = None + else: + backoff_ms = None + return backoff_ms + + +def _parse_retry_after_from_header(resp: requests.Response) -> int | None: + """ + Parse Retry-After header from HTTP response. + See See https://developer.mozilla.org/en-US/docs/Web/HTTP/Reference/Headers/Retry-After + + Args: + resp: HTTP response object with headers + + Returns: + Number of seconds to wait (>= 0) or None if header missing/invalid. + + Examples: + >>> resp = requests.Response() + >>> resp.headers = {"Retry-After": "1"} + >>> _parse_retry_after_from_header(resp) + 1 + >>> resp.headers = {"Retry-After": "-1"} + >>> _parse_retry_after_from_header(resp) + 0 + >>> resp.headers = {"Retry-After": "Wed, 21 Oct 2015 07:28:00 GMT"} + >>> _parse_retry_after_from_header(resp) + 0 + >>> resp.headers = {"Retry-After": "Wed, 21 Oct 2315 07:28:00"} + >>> _parse_retry_after_from_header(resp) + """ + + value = resp.headers.get("Retry-After") + if value is None: + return None + + try: + return max(0, int(value)) + except (ValueError, TypeError): + pass + + # e.g. "Wed, 21 Oct 2015 07:28:00 GMT" + try: + dt = email.utils.parsedate_to_datetime(value) + except (ValueError, TypeError): + dt = None + + if dt is None: + LOG.warning(f"Error parsing Retry-After: {value}") + return None + + try: + delta = dt - datetime.datetime.now(datetime.timezone.utc) + except (TypeError, ValueError): + # e.g. TypeError: can't subtract offset-naive and offset-aware datetimes + return None + + return max(0, int(delta.total_seconds())) _SUFFIX_MAP: dict[api_v4.ClusterFileType | types.FileType, str] = {