From 84a279fcd2bcfd0ee6444345fb5fb73ba09e3ae2 Mon Sep 17 00:00:00 2001 From: evgenyslab Date: Mon, 19 Feb 2024 14:42:35 -0500 Subject: [PATCH] updated logic to allow retries to be effective --- .../providers/amazon/aws/hooks/batch_client.py | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/airflow/providers/amazon/aws/hooks/batch_client.py b/airflow/providers/amazon/aws/hooks/batch_client.py index af9d79c1aeab5..e76ce1712b546 100644 --- a/airflow/providers/amazon/aws/hooks/batch_client.py +++ b/airflow/providers/amazon/aws/hooks/batch_client.py @@ -395,7 +395,13 @@ def get_job_description(self, job_id: str) -> dict: try: response = self.get_conn().describe_jobs(jobs=[job_id]) - return self.parse_job_description(job_id, response) + job_description = self.parse_job_description(job_id, response) + # allow us to retry getting the job description in case + # we called it before AWS could register the job + if job_description: + return job_description + else: + continue except botocore.exceptions.ClientError as err: # Allow it to retry in case of exceeded quota limit of requests to AWS API if err.response.get("Error", {}).get("Code") != "TooManyRequestsException": @@ -413,7 +419,7 @@ def get_job_description(self, job_id: str) -> dict: ) @staticmethod - def parse_job_description(job_id: str, response: dict) -> dict: + def parse_job_description(job_id: str, response: dict) -> dict | None: """ Parse job description to extract description for job_id. @@ -421,14 +427,14 @@ def parse_job_description(job_id: str, response: dict) -> dict: :param response: an API response for describe jobs - :return: an API response to describe job_id + :return: an API response to describe job_id or None if job_id not found in response - :raises: AirflowException + :raises: """ jobs = response.get("jobs", []) matching_jobs = [job for job in jobs if job.get("jobId") == job_id] if len(matching_jobs) != 1: - raise AirflowException(f"AWS Batch job ({job_id}) description error: response: {response}") + return None return matching_jobs[0]