diff --git a/scripts/perf/scheduler_dag_execution_timing.py b/scripts/perf/scheduler_dag_execution_timing.py index 1b7a36f9db52c..c3c4fc61c35b9 100755 --- a/scripts/perf/scheduler_dag_execution_timing.py +++ b/scripts/perf/scheduler_dag_execution_timing.py @@ -131,11 +131,18 @@ def create_dag_runs(dag, num_runs, session): from airflow.utils import timezone from airflow.utils.state import State + try: + from airflow.utils.types import DagRunType + ID_PREFIX = f'{DagRunType.SCHEDULED.value}__' + except ImportError: + from airflow.models.dagrun import DagRun + ID_PREFIX = DagRun.ID_PREFIX + next_run_date = dag.normalize_schedule(dag.start_date or min(t.start_date for t in dag.tasks)) for _ in range(num_runs): dag.create_dagrun( - run_id="scheduled__" + next_run_date.isoformat(), + run_id=ID_PREFIX + next_run_date.isoformat(), execution_date=next_run_date, start_date=timezone.utcnow(), state=State.RUNNING,