-
Notifications
You must be signed in to change notification settings - Fork 16.4k
Description
Apache Airflow version
Other Airflow 2 version (please specify below)
If "Other Airflow 2 version" selected, which one?
2.7.2
What happened?
The trigger instance handled by triggerer-2 was canceled due to reassignment to triggerer-1.
As a result, triggerer-2 cleaned up the local trigger instance, which terminated the remote job using its ID.
The trigger instance in triggerer-2 keep polling the status of remote job with the same id, but detected that the remote job was killed, so completed its run, and exited.
Consequently, the remote job failed due to the reassignment.
Production log summarized by time sequence
Time: 2024-07-11, 00:00:12 -07
Event: Start to run on worker
Runtime: worker
Log: {task_command.py:416} INFO - Running <TaskInstance: f67fb155-6ff2-4fde-bbb2-6f0ef16af05e.91fade50-a3da-4966-a0ff-44c24d59303a scheduled__2024-07-11T06:00:00+00:00 [running]> on host worker-host-3
----------------------------------
Time: 2024-07-11, 00:00:14 -07
Event: Defer to trigger
Runtime: worker
Log: Pausing task as DEFERRED. dag_id=f67fb155-6ff2-4fde-bbb2-6f0ef16af05e, task_id=91fade50-a3da-4966-a0ff-44c24d59303a, execution_date=20240711T060000, start_date=20240711T070011
----------------------------------
Time: 2024-07-11T00:00:54.788-0700
Event: Trigger instance-1 started
Runtime: triggerer-2
Log: {triggerer_job_runner.py:595} INFO - trigger f67fb155-6ff2-4fde-bbb2-6f0ef16af05e/scheduled__2024-07-11T06:00:00+00:00/91fade50-a3da-4966-a0ff-44c24d59303a/-1/1 (ID 78297) starting
----------------------------------
Time: 2024-07-11T00:00:54.788-0700
Event: Trigger instance-1 check job status
Runtime: triggerer-2
Log: {datax_trigger.py:26} INFO - Check if job finished.
----------------------------------
Time: 2024-07-11T00:00:54.823-0700
Event: Trigger instance-1 get job running
Runtime: triggerer-2
Log: {datax_trigger.py:32} INFO - {"code":"0000","message":"success","result":{"dagRunId":"manual__2024-07-11T07:00:43.473585+00:00","startTime":"2024-07-11T00:00:14+00:00","endTime":null,"status":"RUNNING","extraInfo":{"dagId":"f67fb155-6ff2-4fde-bbb2-6f0ef16af05e-61566","status":"<strong style=\"color:blue;\">RUNNING</strong>"}}}
----------------------------------
Time: 2024-07-11T00:01:00.503-0700
Event: Triggerer-2 cancel trigger due to find the trigger has been occupied by another triggerer process
Runetime: triggerer-2
Log: {triggerer_job_runner.py:607} ERROR - Trigger cancelled; message=
----------------------------------
Time: 2024-07-11, 00:01:00 -07
Event: Kyuubi Trigger cleanup called
Runtime: triggerer-2
Log: {datax_trigger.py:51} INFO - Trigger is cancelled, clean up now.
----------------------------------
Time: 2024-07-11T00:01:00.977-0700
Event: Trigger instance-1 cleanup
Runeimte: triggerer-2
Log: {triggerer_job_runner.py:621} INFO - trigger f67fb155-6ff2-4fde-bbb2-6f0ef16af05e/scheduled__2024-07-11T06:00:00+00:00/91fade50-a3da-4966-a0ff-44c24d59303a/-1/1 (ID 78297) completed
----------------------------------
Time: 2024-07-11, 00:02:00 -07
Event: Trigger check if job success
Runtime: triggerer-1
Log: {datax_trigger.py:26} INFO - Check if job finished.
----------------------------------
Time: 2024-07-11, 00:02:00 -07
Event: check job status
Runtime: triggerer-1
Log: {datax_trigger.py:32} INFO - {"code":"0000","message":"success","result":{"dagRunId":"manual__2024-07-11T07:00:43.473585+00:00","startTime":"2024-07-11T00:00:14+00:00","endTime":null,"status":"FAILED","extraInfo":{"executionLogAddress":"<a target=\"_blank\" href=\"https://host/api/v1/logs?JobId=79028\">Execution Log</a>","dagId":"f67fb155-6ff2-4fde-bbb2-6f0ef16af05e-61566","status":"<strong style=\"color:red;\">FAILED</strong>"}}}
----------------------------------
What you think should happen instead?
Triggers will be canceled and cleaned up by the current triggerer process if the current triggerer finds that those running trigger instances have been reassigned to other triggerers.
However, if the cleanup is called prematurely while the trigger is still in an active state, it could impact the same trigger when reassigned to another triggerer process.
Therefore, introducing the capability to disable this cleanup behavior in such scenarios.
How to reproduce
- Set the triggerer_health_check_threshold to a small number like 3(s).
- Make triggerer instances at least 2.
- Use blocking io like http request in the trigger's run method.
- Trigger multiple deferred tasks(for our cases: 100+ trigger jobs) in the same time.
Operating System
Ubuntu
Versions of Apache Airflow Providers
No response
Deployment
Other
Deployment details
No response
Anything else?
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct