Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airflow/providers/amazon/aws/triggers/redshift_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
"status"
] == "error":
yield TriggerEvent(res)
return
await asyncio.sleep(self.poke_interval)
except Exception as e:
yield TriggerEvent({"status": "error", "message": str(e)})
6 changes: 4 additions & 2 deletions airflow/providers/amazon/aws/triggers/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,9 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
)
await asyncio.sleep(self.poke_interval)
yield TriggerEvent({"status": "running", "files": s3_objects})

yield TriggerEvent({"status": "success"})
else:
yield TriggerEvent({"status": "success"})
return

self.log.info("Sleeping for %s seconds", self.poke_interval)
await asyncio.sleep(self.poke_interval)
Expand Down Expand Up @@ -204,6 +205,7 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
)
if result.get("status") in ("success", "error"):
yield TriggerEvent(result)
return
elif result.get("status") == "pending":
self.previous_objects = result.get("previous_objects", set())
self.last_activity_time = result.get("last_activity_time")
Expand Down
10 changes: 6 additions & 4 deletions airflow/providers/amazon/aws/triggers/sagemaker.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,8 +245,8 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
job_already_completed = status not in self.hook.non_terminal_states
state = LogState.COMPLETE if job_already_completed else LogState.TAILING
last_describe_job_call = time.time()
while True:
try:
try:
while True:
(
state,
last_description,
Expand All @@ -267,6 +267,7 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
reason = last_description.get("FailureReason", "(No reason provided)")
error_message = f"SageMaker job failed because {reason}"
yield TriggerEvent({"status": "error", "message": error_message})
return
else:
billable_seconds = SageMakerHook.count_billable_seconds(
training_start_time=last_description["TrainingStartTime"],
Expand All @@ -275,5 +276,6 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
)
self.log.info("Billable seconds: %d", billable_seconds)
yield TriggerEvent({"status": "success", "message": last_description})
except Exception as e:
yield TriggerEvent({"status": "error", "message": str(e)})
return
except Exception as e:
yield TriggerEvent({"status": "error", "message": str(e)})