Description
What do you see as an issue?
There was a problem with emitting the execution-time statistics introduced in #30612.
Problem details:
- Create metrics to track Scheduled->Queued->Running task state transition times #30612 (comment)
- Suppress false warning when TI state is QUEUED and TI doesn't have a start_date #34771 (comment)
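As a rough illustration of the #34771 case, a queued-to-running metric has to cope with a TI that is still QUEUED and therefore has no start_date yet. The sketch below is not Airflow's implementation; `TIDates` and `queued_to_running_seconds` are hypothetical names. It simply returns None instead of treating the missing timestamp as an error.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass
class TIDates:
    """Hypothetical stand-in holding the TI time fields used by the metric."""

    queued_dttm: datetime | None = None
    start_date: datetime | None = None


def queued_to_running_seconds(ti: TIDates) -> float | None:
    """Return the QUEUED -> RUNNING interval in seconds, or None if unknown.

    A TI that is still QUEUED has no start_date yet, so a missing value is
    treated as "not measurable yet" rather than as an error.
    """
    if ti.queued_dttm is None or ti.start_date is None:
        return None
    return (ti.start_date - ti.queued_dttm).total_seconds()


# Usage: only the TI that has actually started yields a value.
t0 = datetime(2023, 10, 1, tzinfo=timezone.utc)
print(queued_to_running_seconds(TIDates(queued_dttm=t0)))  # None
print(queued_to_running_seconds(TIDates(queued_dttm=t0, start_date=t0 + timedelta(seconds=5))))  # 5.0
```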
When emitting such execution-time metrics, it is important to first clarify which time field corresponds to each state, so that our shared understanding and the implementation stay consistent.
As I read the code, the time fields recording the transition of a TI into each state are as follows:
- SCHEDULED: (no time field for this state currently exists)
- QUEUED: queued_dttm
- RUNNING: start_date
- SUCCESS: end_date
- FAILED: end_date
- SKIPPED: end_date
- UPSTREAM_FAILED: end_date
- REMOVED: end_date
- UP_FOR_RETRY: end_date
- UP_FOR_RESCHEDULE: (uncertain)
- RESTARTING: (uncertain)
- DEFERRED: (uncertain)
So, as I understand it, the matrix of states and TI dates should look like this (corrections welcome):
| State | queued_dttm | start_date | end_date |
|---|---|---|---|
| SCHEDULED | no | no | no |
| QUEUED | yes | no | no |
| RUNNING | yes | yes | no |
| SUCCESS | yes | yes | yes (1) |
| FAILED | yes | yes | yes (1) |
| SKIPPED | yes | yes | yes (1) |
| UPSTREAM_FAILED | yes | yes | yes (1) |
| REMOVED | yes | yes | yes (1) |
| UP_FOR_RETRY | yes | yes | yes (1) |
| UP_FOR_RESCHEDULE | yes | yes | yes? (2) |
| RESTARTING | no? | no? | no? (3) |
| DEFERRED | no? | no? | no? (4) |
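Once the matrix is agreed, it could be turned into a mechanical check usable from a test or a debug log. A minimal sketch, assuming plain lowercase strings for the states and a hypothetical `TIDates` dataclass in place of a real TaskInstance; rows that still contain a "?" are deliberately left out.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timezone

FIELDS = ("queued_dttm", "start_date", "end_date")

# Expected presence of each field per state, copied from the matrix above.
# Rows that contain a "?" (UP_FOR_RESCHEDULE, RESTARTING, DEFERRED) are left
# out until there is consensus on them.
EXPECTED: dict[str, tuple[bool, bool, bool]] = {
    "scheduled": (False, False, False),
    "queued": (True, False, False),
    "running": (True, True, False),
    **{
        state: (True, True, True)
        for state in ("success", "failed", "skipped", "upstream_failed", "removed", "up_for_retry")
    },
}


@dataclass
class TIDates:
    """Hypothetical stand-in for a TI's state and its three time fields."""

    state: str
    queued_dttm: datetime | None = None
    start_date: datetime | None = None
    end_date: datetime | None = None


def find_mismatches(ti: TIDates) -> list[str]:
    """Return the fields whose presence contradicts the matrix (empty if consistent)."""
    expected = EXPECTED.get(ti.state)
    if expected is None:
        return []  # state not covered by the agreed rows
    return [
        field
        for field, should_be_set in zip(FIELDS, expected)
        if (getattr(ti, field) is not None) != should_be_set
    ]


now = datetime.now(timezone.utc)
print(find_mismatches(TIDates("queued", queued_dttm=now)))  # []
print(find_mismatches(TIDates("queued", queued_dttm=now, start_date=now)))  # ['start_date']
```

Returning the offending field names, rather than a bare boolean, gives a debug log line something specific to report.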
(1)
airflow/airflow/models/taskinstance.py
Lines 1894 to 1896 in fce3a58
```python
if ti.state in State.finished or ti.state == TaskInstanceState.UP_FOR_RETRY:
    ti.end_date = ti.end_date or current_time
    ti.duration = (ti.end_date - ti.start_date).total_seconds()
```
and
airflow/airflow/utils/state.py
Lines 160 to 168 in 33e5d03
```python
finished: frozenset[TaskInstanceState] = frozenset(
    [
        TaskInstanceState.SUCCESS,
        TaskInstanceState.FAILED,
        TaskInstanceState.SKIPPED,
        TaskInstanceState.UPSTREAM_FAILED,
        TaskInstanceState.REMOVED,
    ]
)
```
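To make footnote (1) concrete, the sketch below replays the quoted branch on a hypothetical `FakeTI` dataclass with plain-string states (not Airflow's TaskInstance). As I read it, this branch is why the states in `State.finished`, plus UP_FOR_RETRY, get an end_date. The `start_date` guard is an addition of this sketch and is not in the quoted lines.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timedelta, timezone

# Plain-string mirror of State.finished quoted above.
FINISHED = frozenset({"success", "failed", "skipped", "upstream_failed", "removed"})


@dataclass
class FakeTI:
    """Hypothetical stand-in for a TaskInstance with only the fields used here."""

    state: str
    start_date: datetime | None = None
    end_date: datetime | None = None
    duration: float | None = None


def finalize_dates(ti: FakeTI, current_time: datetime) -> None:
    """Mimic the quoted branch: finished or UP_FOR_RETRY TIs get end_date and duration."""
    if ti.state in FINISHED or ti.state == "up_for_retry":
        # Keep an existing end_date; otherwise stamp the current time.
        ti.end_date = ti.end_date or current_time
        # Guard added for this sketch only: the quoted lines assume start_date is set.
        if ti.start_date is not None:
            ti.duration = (ti.end_date - ti.start_date).total_seconds()


t0 = datetime(2023, 10, 1, tzinfo=timezone.utc)
retry_ti = FakeTI("up_for_retry", start_date=t0)
finalize_dates(retry_ti, current_time=t0 + timedelta(seconds=30))
print(retry_ti.duration)  # 30.0

running_ti = FakeTI("running", start_date=t0)
finalize_dates(running_ti, current_time=t0)
print(running_ti.end_date)  # None
```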
(2)
airflow/airflow/models/taskinstance.py
Line 2829 in fce3a58
```python
self.end_date = timezone.utcnow()
```
Is UP_FOR_RESCHEDULE handled this way because the task is considered finished while it waits to be picked up again by the scheduler?
(3)
https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/tasks.html#task-instances
One interpretation of the diagram on that page is as follows.
If RESTARTING and UP_FOR_RETRY are interpreted as pre-scheduled states, then none of the time fields should be set.
If we instead interpret them as finished states, as with (2), then all of the time fields should be set.
We could also treat their time fields as indeterminate, but that would probably not be a good idea.
(4)
This state is undocumented, but it is commented as "Deferrable operator waiting on a trigger" on this line:
airflow/airflow/utils/state.py
Line 59 in 33e5d03
```python
DEFERRED = "deferred"  # Deferrable operator waiting on a trigger
```
And these lines imply that it is a pre-scheduled state, since a DEFERRED TI is set back to SCHEDULED once its trigger fires:
airflow/airflow/models/trigger.py
Lines 191 to 201 in db3181c
```python
        TaskInstance.trigger_id == trigger_id, TaskInstance.state == TaskInstanceState.DEFERRED
    )
):
    # Add the event's payload into the kwargs for the task
    next_kwargs = task_instance.next_kwargs or {}
    next_kwargs["event"] = event.payload
    task_instance.next_kwargs = next_kwargs
    # Remove ourselves as its trigger
    task_instance.trigger_id = None
    # Finally, mark it as scheduled so it gets re-queued
    task_instance.state = TaskInstanceState.SCHEDULED
```
Solving the problem
So it would be great to:
- Fill in the matrix above by community consensus (and add it to the official documentation if we can)
- Fix code that does not follow the matrix
- Add tests or debug logs that detect incorrect combinations of state and time fields
Is it worth working on this?
As a first step, I tried to tackle the set_state issue mentioned in #34771 (comment).
Anything else
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct