From 3f1eb55353fbb0f5aeda65b15cc9549d8066ab07 Mon Sep 17 00:00:00 2001
From: Tao Peng
Date: Wed, 27 Aug 2025 16:59:05 -0700
Subject: [PATCH 1/4] handle HTTP 429 as retriable error

---
 mapillary_tools/uploader.py | 104 ++++++++++++++++++++++++++----------
 1 file changed, 75 insertions(+), 29 deletions(-)

diff --git a/mapillary_tools/uploader.py b/mapillary_tools/uploader.py
index c81c91c4..a279c343 100644
--- a/mapillary_tools/uploader.py
+++ b/mapillary_tools/uploader.py
@@ -946,27 +946,33 @@ def _handle_upload_exception(
         begin_offset = progress.get("begin_offset")
         offset = progress.get("offset")
 
-        if retries <= constants.MAX_UPLOAD_RETRIES and _is_retriable_exception(ex):
-            self.emitter.emit("upload_retrying", progress)
+        if retries <= constants.MAX_UPLOAD_RETRIES:
+            retriable, retry_after_sec = _is_retriable_exception(ex)
+            if retriable:
+                self.emitter.emit("upload_retrying", progress)
 
-            LOG.warning(
-                f"Error uploading {self._upload_name(progress)} at {offset=} since {begin_offset=}: {ex.__class__.__name__}: {ex}"
-            )
+                LOG.warning(
+                    f"Error uploading {self._upload_name(progress)} at {offset=} since {begin_offset=}: {ex.__class__.__name__}: {ex}"
+                )
 
-            # Keep things immutable here. Will increment retries in the caller
-            retries += 1
-            if _is_immediate_retriable_exception(ex):
-                sleep_for = 0
-            else:
-                sleep_for = min(2**retries, 16)
-            LOG.info(
-                f"Retrying in {sleep_for} seconds ({retries}/{constants.MAX_UPLOAD_RETRIES})"
-            )
-            if sleep_for:
-                time.sleep(sleep_for)
-        else:
-            self.emitter.emit("upload_failed", progress)
-            raise ex
+                # Keep things immutable here. Will increment retries in the caller
+                retries += 1
+                if _is_immediate_retriable_exception(ex):
+                    sleep_for = 0
+                else:
+                    sleep_for = min(2**retries, 16)
+                    sleep_for += retry_after_sec
+
+                LOG.info(
+                    f"Retrying in {sleep_for} seconds ({retries}/{constants.MAX_UPLOAD_RETRIES})"
+                )
+                if sleep_for:
+                    time.sleep(sleep_for)
+
+                return
+
+        self.emitter.emit("upload_failed", progress)
+        raise ex
 
     @classmethod
     def _upload_name(cls, progress: UploaderProgress):
@@ -1083,23 +1089,63 @@ def _is_immediate_retriable_exception(ex: BaseException) -> bool:
     return False
 
 
-def _is_retriable_exception(ex: BaseException) -> bool:
+def _is_retriable_exception(ex: BaseException) -> tuple[bool, int]:
+    """Return tuple(retriable, retry_after_sec) where retry_after_sec is what the server recommends to wait for retrying in seconds."""
+
+    DEFAULT_RETRY_AFTER_SEC = 10
+
     if isinstance(ex, (requests.ConnectionError, requests.Timeout)):
-        return True
+        return True, 0
 
     if isinstance(ex, requests.HTTPError) and isinstance(
         ex.response, requests.Response
     ):
-        if 400 <= ex.response.status_code < 500:
+        status_code = ex.response.status_code
+
+        if status_code == 429:
+            retry_after_sec = ex.response.headers.get("Retry-After")
+
             try:
-                resp = ex.response.json()
-            except json.JSONDecodeError:
-                return False
-            return resp.get("debug_info", {}).get("retriable", False)
-        else:
-            return True
+                data = ex.response.json()
+            except requests.JSONDecodeError:
+                return True, int(retry_after_sec or DEFAULT_RETRY_AFTER_SEC)
 
-    return False
+            backoff_ms = data.get("backoff")
+            if backoff_ms is not None:
+                return True, int(int(backoff_ms) / 1000)
+            else:
+                return True, int(retry_after_sec or DEFAULT_RETRY_AFTER_SEC)
+
+        if 400 <= status_code < 500:
+            try:
+                data = ex.response.json()
+            except requests.JSONDecodeError:
+                return False, 0
+
+            debug_info = data.get("debug_info", {})
+
+            if isinstance(debug_info, dict):
+                error_type = debug_info.get("type")
+            else:
+                error_type = None
+
+            # The server may respond 429 RequestRateLimitedError but with retryable=False
+            # We should retry for this case regardless
+            # For example: HTTP 429
+            # {"backoff": 10000, "debug_info": {"retriable": false, "type": "RequestRateLimitedError", "message": "Request rate limit has been exceeded"}}
+            if error_type == "RequestRateLimitedError":
+                backoff_ms = data.get("backoff")
+                if backoff_ms is not None:
+                    return True, int(int(backoff_ms) / 1000)
+                else:
+                    return True, DEFAULT_RETRY_AFTER_SEC
+
+            return debug_info.get("retriable", False), 0
+
+        if status_code >= 500:
+            return True, 0
+
+    return False, 0
 
 
 _SUFFIX_MAP: dict[api_v4.ClusterFileType | types.FileType, str] = {

From fb62bc55f2b883292e75dc778b8f60a6ad208d74 Mon Sep 17 00:00:00 2001
From: Tao Peng
Date: Wed, 27 Aug 2025 19:46:39 -0700
Subject: [PATCH 2/4] add doctests

---
 mapillary_tools/uploader.py | 147 +++++++++++++++++++++++++++++++-----
 1 file changed, 130 insertions(+), 17 deletions(-)

diff --git a/mapillary_tools/uploader.py b/mapillary_tools/uploader.py
index a279c343..5de83ad1 100644
--- a/mapillary_tools/uploader.py
+++ b/mapillary_tools/uploader.py
@@ -2,6 +2,8 @@
 
 import concurrent.futures
 import dataclasses
+import datetime
+import email.utils
 import hashlib
 import io
 import json
@@ -946,15 +948,16 @@ def _handle_upload_exception(
         begin_offset = progress.get("begin_offset")
         offset = progress.get("offset")
 
+        if not isinstance(ex, KeyboardInterrupt):
+            LOG.warning(
+                f"Error uploading {self._upload_name(progress)} at {offset=} since {begin_offset=}: {ex.__class__.__name__}: {ex}"
+            )
+
         if retries <= constants.MAX_UPLOAD_RETRIES:
             retriable, retry_after_sec = _is_retriable_exception(ex)
             if retriable:
                 self.emitter.emit("upload_retrying", progress)
 
-                LOG.warning(
-                    f"Error uploading {self._upload_name(progress)} at {offset=} since {begin_offset=}: {ex.__class__.__name__}: {ex}"
-                )
-
                 # Keep things immutable here. Will increment retries in the caller
                 retries += 1
                 if _is_immediate_retriable_exception(ex):
@@ -1090,9 +1093,58 @@ def _is_immediate_retriable_exception(ex: BaseException) -> bool:
 
 
 def _is_retriable_exception(ex: BaseException) -> tuple[bool, int]:
-    """Return tuple(retriable, retry_after_sec) where retry_after_sec is what the server recommends to wait for retrying in seconds."""
+    """
+    Determine if an exception should be retried and how long to wait.
+
+    Args:
+        ex: Exception to check for retryability
+
+    Returns:
+        Tuple of (retriable, retry_after_sec) where:
+        - retriable: True if the exception should be retried
+        - retry_after_sec: Seconds to wait before retry (>= 0)
+
+    Examples:
+        >>> resp = requests.Response()
+        >>> resp._content = b"foo"
+        >>> resp.status_code = 400
+        >>> ex = requests.HTTPError("error", response=resp)
+        >>> _is_retriable_exception(ex)
+        (False, 0)
+        >>> resp._content = b'{"backoff": 13000, "debug_info": {"retriable": false, "type": "RequestRateLimitedError", "message": "Request rate limit has been exceeded"}}'
+        >>> resp.status_code = 400
+        >>> ex = requests.HTTPError("error", response=resp)
+        >>> _is_retriable_exception(ex)
+        (True, 13)
+        >>> resp._content = b'{"debug_info": {"retriable": false, "type": "RequestRateLimitedError", "message": "Request rate limit has been exceeded"}}'
+        >>> resp.status_code = 400
+        >>> ex = requests.HTTPError("error", response=resp)
+        >>> _is_retriable_exception(ex)
+        (True, 10)
+        >>> resp._content = b"foo"
+        >>> resp.status_code = 429
+        >>> ex = requests.HTTPError("error", response=resp)
+        >>> _is_retriable_exception(ex)
+        (True, 10)
+        >>> resp._content = b"foo"
+        >>> resp.status_code = 429
+        >>> ex = requests.HTTPError("error", response=resp)
+        >>> _is_retriable_exception(ex)
+        (True, 10)
+        >>> resp._content = b'{"backoff": 12000, "debug_info": {"retriable": false, "type": "RequestRateLimitedError", "message": "Request rate limit has been exceeded"}}'
+        >>> resp.status_code = 429
+        >>> ex = requests.HTTPError("error", response=resp)
+        >>> _is_retriable_exception(ex)
+        (True, 12)
+        >>> resp._content = b'{"backoff": 12000, "debug_info": {"retriable": false, "type": "RequestRateLimitedError", "message": "Request rate limit has been exceeded"}}'
+        >>> resp.headers = {"Retry-After": "1"}
+        >>> resp.status_code = 503
+        >>> ex = requests.HTTPError("error", response=resp)
+        >>> _is_retriable_exception(ex)
+        (True, 1)
+    """
 
-    DEFAULT_RETRY_AFTER_SEC = 10
+    DEFAULT_RETRY_AFTER_RATE_LIMIT_SEC = 10
 
     if isinstance(ex, (requests.ConnectionError, requests.Timeout)):
         return True, 0
@@ -1102,25 +1154,29 @@ def _is_retriable_exception(ex: BaseException) -> tuple[bool, int]:
     ):
         status_code = ex.response.status_code
 
+        # Always retry with some delay
         if status_code == 429:
-            retry_after_sec = ex.response.headers.get("Retry-After")
+            retry_after_sec = (
+                _parse_retry_after_from_header(ex.response)
+                or DEFAULT_RETRY_AFTER_RATE_LIMIT_SEC
+            )
 
             try:
                 data = ex.response.json()
             except requests.JSONDecodeError:
-                return True, int(retry_after_sec or DEFAULT_RETRY_AFTER_SEC)
+                return True, retry_after_sec
 
             backoff_ms = data.get("backoff")
-            if backoff_ms is not None:
-                return True, int(int(backoff_ms) / 1000)
+            if backoff_ms is None:
+                return True, retry_after_sec
             else:
-                return True, int(retry_after_sec or DEFAULT_RETRY_AFTER_SEC)
+                return True, max(0, int(int(backoff_ms) / 1000))
 
         if 400 <= status_code < 500:
             try:
                 data = ex.response.json()
             except requests.JSONDecodeError:
-                return False, 0
+                return False, (_parse_retry_after_from_header(ex.response) or 0)
 
             debug_info = data.get("debug_info", {})
 
@@ -1135,19 +1191,76 @@ def _is_retriable_exception(ex: BaseException) -> tuple[bool, int]:
             # {"backoff": 10000, "debug_info": {"retriable": false, "type": "RequestRateLimitedError", "message": "Request rate limit has been exceeded"}}
             if error_type == "RequestRateLimitedError":
                 backoff_ms = data.get("backoff")
-                if backoff_ms is not None:
-                    return True, int(int(backoff_ms) / 1000)
+                if backoff_ms is None:
+                    return True, (
+                        _parse_retry_after_from_header(ex.response)
+                        or DEFAULT_RETRY_AFTER_RATE_LIMIT_SEC
+                    )
                 else:
-                    return True, DEFAULT_RETRY_AFTER_SEC
+                    return True, max(0, int(int(backoff_ms) / 1000))
 
             return debug_info.get("retriable", False), 0
 
-        if status_code >= 500:
-            return True, 0
+        if 500 <= status_code < 600:
+            return True, (_parse_retry_after_from_header(ex.response) or 0)
 
     return False, 0
 
 
+def _parse_retry_after_from_header(resp: requests.Response) -> int | None:
+    """
+    Parse Retry-After header from HTTP response.
+    See https://developer.mozilla.org/en-US/docs/Web/HTTP/Reference/Headers/Retry-After
+
+    Args:
+        resp: HTTP response object with headers
+
+    Returns:
+        Number of seconds to wait (>= 0) or None if header missing/invalid.
+
+    Examples:
+        >>> resp = requests.Response()
+        >>> resp.headers = {"Retry-After": "1"}
+        >>> _parse_retry_after_from_header(resp)
+        1
+        >>> resp.headers = {"Retry-After": "-1"}
+        >>> _parse_retry_after_from_header(resp)
+        0
+        >>> resp.headers = {"Retry-After": "Wed, 21 Oct 2015 07:28:00 GMT"}
+        >>> _parse_retry_after_from_header(resp)
+        0
+        >>> resp.headers = {"Retry-After": "Wed, 21 Oct 2315 07:28:00"}
+        >>> _parse_retry_after_from_header(resp)
+    """
+
+    value = resp.headers.get("Retry-After")
+    if value is None:
+        return None
+
+    try:
+        return max(0, int(value))
+    except (ValueError, TypeError):
+        pass
+
+    # e.g. "Wed, 21 Oct 2015 07:28:00 GMT"
+    try:
+        dt = email.utils.parsedate_to_datetime(value)
+    except (ValueError, TypeError):
+        dt = None
+
+    if dt is None:
+        LOG.warning(f"Error parsing Retry-After: {value}")
+        return None
+
+    try:
+        delta = dt - datetime.datetime.now(datetime.UTC)
+    except (TypeError, ValueError):
+        # e.g. TypeError: can't subtract offset-naive and offset-aware datetimes
+        return None
+
+    return max(0, int(delta.total_seconds()))
+
+
 _SUFFIX_MAP: dict[api_v4.ClusterFileType | types.FileType, str] = {
     api_v4.ClusterFileType.ZIP: ".zip",
     api_v4.ClusterFileType.CAMM: ".mp4",

From 14529f78c4b8b03fca8fbe3e12fce01fb882cbcc Mon Sep 17 00:00:00 2001
From: Tao Peng
Date: Wed, 27 Aug 2025 20:07:22 -0700
Subject: [PATCH 3/4] fix

---
 mapillary_tools/uploader.py | 25 ++++++++++++++++++++-----
 1 file changed, 20 insertions(+), 5 deletions(-)

diff --git a/mapillary_tools/uploader.py b/mapillary_tools/uploader.py
index 5de83ad1..17068e8d 100644
--- a/mapillary_tools/uploader.py
+++ b/mapillary_tools/uploader.py
@@ -1116,6 +1116,11 @@ def _is_retriable_exception(ex: BaseException) -> tuple[bool, int]:
         >>> ex = requests.HTTPError("error", response=resp)
         >>> _is_retriable_exception(ex)
         (True, 13)
+        >>> resp._content = b'{"backoff": "foo", "debug_info": {"retriable": false, "type": "RequestRateLimitedError", "message": "Request rate limit has been exceeded"}}'
+        >>> resp.status_code = 400
+        >>> ex = requests.HTTPError("error", response=resp)
+        >>> _is_retriable_exception(ex)
+        (True, 10)
         >>> resp._content = b'{"debug_info": {"retriable": false, "type": "RequestRateLimitedError", "message": "Request rate limit has been exceeded"}}'
         >>> resp.status_code = 400
         >>> ex = requests.HTTPError("error", response=resp)
@@ -1166,7 +1171,7 @@ def _is_retriable_exception(ex: BaseException) -> tuple[bool, int]:
             except requests.JSONDecodeError:
                 return True, retry_after_sec
 
-            backoff_ms = data.get("backoff")
+            backoff_ms = _parse_backoff(data.get("backoff"))
             if backoff_ms is None:
                 return True, retry_after_sec
             else:
@@ -1187,10 +1192,9 @@ def _is_retriable_exception(ex: BaseException) -> tuple[bool, int]:
             # The server may respond 429 RequestRateLimitedError but with retryable=False
             # We should retry for this case regardless
-            # For example: HTTP 429
-            # {"backoff": 10000, "debug_info": {"retriable": false, "type": "RequestRateLimitedError", "message": "Request rate limit has been exceeded"}}
+            # e.g. HTTP 429 {"backoff": 10000, "debug_info": {"retriable": false, "type": "RequestRateLimitedError", "message": "Request rate limit has been exceeded"}}
             if error_type == "RequestRateLimitedError":
-                backoff_ms = data.get("backoff")
+                backoff_ms = _parse_backoff(data.get("backoff"))
                 if backoff_ms is None:
                     return True, (
                         _parse_retry_after_from_header(ex.response)
                         or DEFAULT_RETRY_AFTER_RATE_LIMIT_SEC
                     )
@@ -1207,6 +1211,17 @@ def _is_retriable_exception(ex: BaseException) -> tuple[bool, int]:
     return False, 0
 
 
+def _parse_backoff(backoff: T.Any) -> int | None:
+    if backoff is not None:
+        try:
+            backoff_ms = int(backoff)
+        except (ValueError, TypeError):
+            backoff_ms = None
+    else:
+        backoff_ms = None
+    return backoff_ms
+
+
 def _parse_retry_after_from_header(resp: requests.Response) -> int | None:
     """
     Parse Retry-After header from HTTP response.
@@ -1253,7 +1268,7 @@ def _parse_retry_after_from_header(resp: requests.Response) -> int | None:
         return None
 
     try:
-        delta = dt - datetime.datetime.now(datetime.UTC)
+        delta = dt - datetime.datetime.now(datetime.timezone.utc)
     except (TypeError, ValueError):
         # e.g. TypeError: can't subtract offset-naive and offset-aware datetimes
         return None

From 9d3aa7c2a822f4db101060d399147a3edb8aff04 Mon Sep 17 00:00:00 2001
From: Tao Peng
Date: Wed, 27 Aug 2025 20:13:51 -0700
Subject: [PATCH 4/4] warning all exceptions

---
 mapillary_tools/uploader.py | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/mapillary_tools/uploader.py b/mapillary_tools/uploader.py
index 17068e8d..71a37b6a 100644
--- a/mapillary_tools/uploader.py
+++ b/mapillary_tools/uploader.py
@@ -948,10 +948,9 @@ def _handle_upload_exception(
         begin_offset = progress.get("begin_offset")
         offset = progress.get("offset")
 
-        if not isinstance(ex, KeyboardInterrupt):
-            LOG.warning(
-                f"Error uploading {self._upload_name(progress)} at {offset=} since {begin_offset=}: {ex.__class__.__name__}: {ex}"
-            )
+        LOG.warning(
+            f"Error uploading {self._upload_name(progress)} at {offset=} since {begin_offset=}: {ex.__class__.__name__}: {ex}"
+        )
 
         if retries <= constants.MAX_UPLOAD_RETRIES:
             retriable, retry_after_sec = _is_retriable_exception(ex)