Description
Apache Airflow version
2.7.1
What happened
We run Airflow with about 100 DAGs in our company, and we recently updated to version 2.7.1.
Since the update, we get a lot of WARNING logs saying `cannot record scheduled_duration for task %s because previous state change time has not been saved`.
After investigation, we found the following:
- This log comes from `airflow.models.taskinstance.TaskInstance.emit_state_change_metric`.
- The log is emitted when `self.start_date` is None.
- The branch that produces this log is entered when the task instance's state transitions to `TaskInstanceState.QUEUED`.
- The only invocation of this pattern is `ti.emit_state_change_metric(TaskInstanceState.QUEUED)` in `airflow.jobs.scheduler_job_runner.SchedulerJobRunner._executable_task_instances_to_queued`.
- The `ti` passed to this call comes from a query in the same method, filtered by `TI.state == TaskInstanceState.SCHEDULED`.
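For context, the branch in question behaves roughly like the following self-contained sketch (simplified and stubbed out; this is not the exact Airflow source, which also records the metric via `Stats`):

```python
import datetime
import logging

logging.basicConfig(level=logging.WARNING)
log = logging.getLogger(__name__)


class TISketch:
    """Simplified stand-in for airflow.models.taskinstance.TaskInstance."""

    def __init__(self, task_id, start_date=None):
        self.task_id = task_id
        self.start_date = start_date

    def emit_state_change_metric(self, new_state):
        """Return scheduled_duration in seconds, or None if it cannot be recorded."""
        if new_state == "queued":
            if self.start_date is None:
                # The warning reported in this issue: it fires whenever the
                # scheduler queues a ti whose start_date was never filled in.
                log.warning(
                    "cannot record scheduled_duration for task %s because "
                    "previous state change time has not been saved",
                    self.task_id,
                )
                return None
            return (datetime.datetime.utcnow() - self.start_date).total_seconds()
        return None


# A ti picked up straight from the SCHEDULED-state query may have no start_date:
assert TISketch("task_1").emit_state_change_metric("queued") is None
```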
When the state of a `ti` is `TaskInstanceState.SCHEDULED`, it is possible for `start_date` to be empty even in normal operation, which is why this warning appears.
For example, backfill uses `ti.set_state` to transition to SCHEDULED, in which case `start_date` is always set.
On the other hand, if the transition is performed by `models/dagrun.py` via something like `update(TI).values(state=TaskInstanceState.SCHEDULED)`, `start_date` may not be set.
Therefore, the WARNING log level does not seem appropriate, as this log fires under many normal conditions.
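The effect of such a bulk UPDATE can be shown with a minimal sketch (plain SQLite with a simplified schema, not Airflow's real table definition):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_instance (task_id TEXT PRIMARY KEY, state TEXT, start_date TEXT)"
)
conn.executemany("INSERT INTO task_instance VALUES (?, NULL, NULL)", [("a",), ("b",)])

# set_state-style path (e.g. backfill): state and start_date written together.
conn.execute(
    "UPDATE task_instance SET state = 'scheduled', start_date = datetime('now') "
    "WHERE task_id = 'a'"
)

# schedule_tis-style path, equivalent to
# update(TI).values(state=TaskInstanceState.SCHEDULED): only state is written.
conn.execute("UPDATE task_instance SET state = 'scheduled' WHERE task_id = 'b'")

rows = {t: (s, d) for t, s, d in conn.execute("SELECT * FROM task_instance")}
assert rows["a"][0] == "scheduled" and rows["a"][1] is not None
assert rows["b"] == ("scheduled", None)  # SCHEDULED, but start_date is empty
```

Any ti that reaches SCHEDULED via the second path will then trip the warning the first time the scheduler queues it.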
What you think should happen instead
The simplest immediate solution would be to remove this log, or lower it to a level such as DEBUG.
This log was introduced in the PR below. Judging from the conversation there, a more fundamental solution may be to make the metric accurately reflect its intent.
To do so, it would be necessary to ensure that `start_date` is set at the point of transition to SCHEDULED, for example by one of the following:
- In `__init__`, set `start_date` if the state is `SCHEDULED` or a later state.
- Make `state` a property, and whenever the state is changed, run a process equivalent to `set_state`.

However, this would have a wide-ranging impact, and I am not sure it is realistic from my point of view.
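As a purely hypothetical illustration of the second idea (the names and logic here are invented for this sketch, not Airflow's implementation):

```python
import datetime


class TaskInstanceSketch:
    """Hypothetical sketch: state as a property that keeps start_date in sync."""

    SCHEDULED_OR_LATER = {"scheduled", "queued", "running"}

    def __init__(self, task_id, state=None):
        self.task_id = task_id
        self.start_date = None
        self._state = None
        # Idea 1: a ti created directly in SCHEDULED gets a start_date too,
        # because assignment goes through the property setter below.
        self.state = state

    @property
    def state(self):
        return self._state

    @state.setter
    def state(self, value):
        # Idea 2: run set_state-equivalent bookkeeping on every assignment.
        self._state = value
        if value in self.SCHEDULED_OR_LATER and self.start_date is None:
            self.start_date = datetime.datetime.utcnow()


ti = TaskInstanceSketch("task_1")
assert ti.start_date is None
ti.state = "scheduled"
assert ti.start_date is not None
```

Note that a Python-level property would still not cover the bulk `update(TI)` path, which writes to the database without going through the ORM instance.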
How to reproduce
- Procedure to reproduce the warning logs:
  - Simply check out and start Airflow on the main or 2.7.1 branch and run any DAG with a schedule, such as `example_bash_operator`, and you will get this warning.
- Procedure to confirm that `start_date` is not filled when transitioning to `TaskInstanceState.SCHEDULED`:
  - Add the following code to `test_dagrun.py`:
```python
# (these imports may already exist in test_dagrun.py)
from airflow.models.baseoperator import BaseOperator
from airflow.models.dagrun import DagRun
from airflow.models.taskinstance import TaskInstance as TI
from airflow.utils.state import TaskInstanceState
from airflow.utils.types import DagRunType


def test_schedule_tis_state_transition(dag_maker, session):
    with dag_maker(session=session, dag_id="test"):
        task = BaseOperator(task_id="task_1")
    dr = DagRun(dag_id="test", run_id="test", run_type=DagRunType.MANUAL)
    ti = TI(task=task, run_id=dr.run_id, map_index=0, state=None)
    session.add_all((dr, ti))
    session.flush()

    assert ti.state is None
    assert ti.start_date is None

    # schedule_tis() sets state to SCHEDULED via a bulk UPDATE,
    # without touching start_date
    assert dr.schedule_tis((ti,), session=session) == 1
    session.refresh(ti)
    assert ti.state == TaskInstanceState.SCHEDULED
    assert ti.start_date is None
```

Operating System
Linux and MacOS
Versions of Apache Airflow Providers
No response
Deployment
Other Docker-based deployment
Deployment details
No response
Anything else
I can create a PR if the log level change is judged sufficient to solve the problem!
However, a fundamental solution is beyond my experience with this repository and should be left to someone more experienced.
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct