diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py index ee5dfa9db735d..130d37b546c3e 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py @@ -368,12 +368,23 @@ def sync(self) -> None: ) self.fail(task[0], e) except ApiException as e: - body = json.loads(e.body) + try: + if e.body: + body = json.loads(e.body) + else: + # If no body content, use reason as the message + body = {"message": e.reason} + except (json.JSONDecodeError, ValueError, TypeError): + # If the body is a string (e.g., in a 429 error), it can't be parsed as JSON. + # Use the body directly as the message instead. + body = {"message": e.body} + retries = self.task_publish_retries[key] # In case of exceeded quota or conflict errors, requeue the task as per the task_publish_max_retries + message = body.get("message", "") if ( - (str(e.status) == "403" and "exceeded quota" in body["message"]) - or (str(e.status) == "409" and "object has been modified" in body["message"]) + (str(e.status) == "403" and "exceeded quota" in message) + or (str(e.status) == "409" and "object has been modified" in message) ) and (self.task_publish_max_retries == -1 or retries < self.task_publish_max_retries): self.log.warning( "[Try %s of %s] Kube ApiException for Task: (%s). Reason: %r. Message: %s", @@ -381,7 +392,7 @@ def sync(self) -> None: self.task_publish_max_retries, key, e.reason, - body["message"], + message, ) self.task_queue.put(task) self.task_publish_retries[key] = retries + 1 diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/kueue.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/kueue.py index e0c7c549e9076..718f93428cfe0 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/kueue.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/kueue.py @@ -65,12 +65,27 @@ def execute(self, context): try: self.hook.apply_from_yaml_file(yaml_objects=yaml_objects) except FailToCreateError as ex: - error_bodies = [json.loads(e.body) for e in ex.api_exceptions] + error_bodies = [] + for e in ex.api_exceptions: + try: + if e.body: + error_bodies.append(json.loads(e.body)) + else: + # If no body content, use reason as the message + reason = getattr(e, "reason", "Unknown") + error_bodies.append({"message": reason, "reason": reason}) + except (json.JSONDecodeError, ValueError, TypeError): + # If the body is a string (e.g., in a 429 error), it can't be parsed as JSON. + # Use the body directly as the message instead. + error_bodies.append({"message": e.body, "reason": getattr(e, "reason", "Unknown")}) if next((e for e in error_bodies if e.get("reason") == "AlreadyExists"), None): self.log.info("Kueue is already enabled for the cluster") if errors := [e for e in error_bodies if e.get("reason") != "AlreadyExists"]: - error_message = "\n".join(e.get("body") for e in errors) + error_message = "\n".join( + e.get("message") or e.get("body") or f"Unknown error: {e.get('reason', 'Unknown')}" + for e in errors + ) raise AirflowException(error_message) return diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py index 3d6bb97167f42..7fb6f790d8792 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py @@ -386,6 +386,27 @@ def setup_method(self) -> None: State.SUCCESS, id="409 conflict", ), + pytest.param( + HTTPResponse(body="Too many requests, please try again later.", status=429), + 0, + False, + State.FAILED, + id="429 Too Many Requests (non-JSON body)", + ), + pytest.param( + HTTPResponse(body="Too many requests, please try again later.", status=429), + 1, + False, + State.FAILED, + id="429 Too Many Requests (non-JSON body) (task_publish_max_retries=1)", + ), + pytest.param( + HTTPResponse(body="", status=429), + 0, + False, + State.FAILED, + id="429 Too Many Requests (empty body)", + ), ], ) @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher") diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_kueue.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_kueue.py index 8d43db4889f7d..595234df0897c 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_kueue.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_kueue.py @@ -115,6 +115,29 @@ def test_execute_error(self, mock_hook, mock_log): mock_hook.return_value.check_kueue_deployment_running.assert_not_called() mock_log.info.assert_called_once_with("Kueue is already enabled for the cluster") + @mock.patch(KUEUE_OPERATORS_PATH.format("KubernetesInstallKueueOperator.log")) + @mock.patch(KUEUE_OPERATORS_PATH.format("KubernetesHook")) + def test_execute_non_json_response(self, mock_hook, mock_log): + """Test handling of non-JSON API response bodies (e.g., 429 errors).""" + mock_get_yaml_content_from_file = mock_hook.return_value.get_yaml_content_from_file + mock_yaml_objects = mock_get_yaml_content_from_file.return_value + mock_apply_from_yaml_file = mock_hook.return_value.apply_from_yaml_file + + # Create mock exceptions with non-JSON bodies (simulating 429 errors) + api_exceptions = [ + mock.MagicMock(body="Too many requests, please try again later.", reason="TooManyRequests"), + mock.MagicMock(body="", reason="RateLimited"), # Empty body case + ] + mock_apply_from_yaml_file.side_effect = FailToCreateError(api_exceptions) + expected_error_message = "Too many requests, please try again later.\nRateLimited" + + with pytest.raises(AirflowException, match=expected_error_message): + self.operator.execute(context=mock.MagicMock()) + + mock_get_yaml_content_from_file.assert_called_once_with(kueue_yaml_url=KUEUE_YAML_URL) + mock_apply_from_yaml_file.assert_called_once_with(yaml_objects=mock_yaml_objects) + mock_hook.return_value.check_kueue_deployment_running.assert_not_called() + class TestKubernetesStartKueueJobOperator: def test_template_fields(self):