diff --git a/airflow/providers/amazon/aws/triggers/redshift_cluster.py b/airflow/providers/amazon/aws/triggers/redshift_cluster.py index eebbfce380b80..37ca6b6d9fb12 100644 --- a/airflow/providers/amazon/aws/triggers/redshift_cluster.py +++ b/airflow/providers/amazon/aws/triggers/redshift_cluster.py @@ -311,6 +311,7 @@ async def run(self) -> AsyncIterator[TriggerEvent]: "status" ] == "error": yield TriggerEvent(res) + return await asyncio.sleep(self.poke_interval) except Exception as e: yield TriggerEvent({"status": "error", "message": str(e)}) diff --git a/airflow/providers/amazon/aws/triggers/s3.py b/airflow/providers/amazon/aws/triggers/s3.py index fbded51d2eaf5..adecde9aa1bbc 100644 --- a/airflow/providers/amazon/aws/triggers/s3.py +++ b/airflow/providers/amazon/aws/triggers/s3.py @@ -98,8 +98,9 @@ async def run(self) -> AsyncIterator[TriggerEvent]: ) await asyncio.sleep(self.poke_interval) yield TriggerEvent({"status": "running", "files": s3_objects}) - - yield TriggerEvent({"status": "success"}) + else: + yield TriggerEvent({"status": "success"}) + return self.log.info("Sleeping for %s seconds", self.poke_interval) await asyncio.sleep(self.poke_interval) @@ -204,6 +205,7 @@ async def run(self) -> AsyncIterator[TriggerEvent]: ) if result.get("status") in ("success", "error"): yield TriggerEvent(result) + return elif result.get("status") == "pending": self.previous_objects = result.get("previous_objects", set()) self.last_activity_time = result.get("last_activity_time") diff --git a/airflow/providers/amazon/aws/triggers/sagemaker.py b/airflow/providers/amazon/aws/triggers/sagemaker.py index b67759bf8967c..8f104187634bd 100644 --- a/airflow/providers/amazon/aws/triggers/sagemaker.py +++ b/airflow/providers/amazon/aws/triggers/sagemaker.py @@ -245,8 +245,8 @@ async def run(self) -> AsyncIterator[TriggerEvent]: job_already_completed = status not in self.hook.non_terminal_states state = LogState.COMPLETE if job_already_completed else LogState.TAILING last_describe_job_call = time.time() - while True: - try: + try: + while True: ( state, last_description, @@ -267,6 +267,7 @@ async def run(self) -> AsyncIterator[TriggerEvent]: reason = last_description.get("FailureReason", "(No reason provided)") error_message = f"SageMaker job failed because {reason}" yield TriggerEvent({"status": "error", "message": error_message}) + return else: billable_seconds = SageMakerHook.count_billable_seconds( training_start_time=last_description["TrainingStartTime"], @@ -275,5 +276,6 @@ async def run(self) -> AsyncIterator[TriggerEvent]: ) self.log.info("Billable seconds: %d", billable_seconds) yield TriggerEvent({"status": "success", "message": last_description}) - except Exception as e: - yield TriggerEvent({"status": "error", "message": str(e)}) + return + except Exception as e: + yield TriggerEvent({"status": "error", "message": str(e)})