diff --git a/airflow/api_connexion/schemas/task_instance_schema.py b/airflow/api_connexion/schemas/task_instance_schema.py
index 360ecdf277e76..b3aa88d96f589 100644
--- a/airflow/api_connexion/schemas/task_instance_schema.py
+++ b/airflow/api_connexion/schemas/task_instance_schema.py
@@ -60,6 +60,7 @@ class Meta:
priority_weight = auto_field()
operator = auto_field()
queued_dttm = auto_field(data_key="queued_when")
+ scheduled_dttm = auto_field(data_key="scheduled_when")
pid = auto_field()
executor = auto_field()
executor_config = auto_field()
@@ -102,6 +103,7 @@ class Meta:
priority_weight = auto_field()
operator = auto_field()
queued_dttm = auto_field(data_key="queued_when")
+ scheduled_dttm = auto_field(data_key="scheduled_when")
pid = auto_field()
executor = auto_field()
executor_config = auto_field()
diff --git a/airflow/api_fastapi/core_api/datamodels/task_instances.py b/airflow/api_fastapi/core_api/datamodels/task_instances.py
index d9c87b972ba9c..eaebe589613f8 100644
--- a/airflow/api_fastapi/core_api/datamodels/task_instances.py
+++ b/airflow/api_fastapi/core_api/datamodels/task_instances.py
@@ -64,6 +64,7 @@ class TaskInstanceResponse(BaseModel):
priority_weight: int | None
operator: str | None
queued_dttm: datetime | None = Field(alias="queued_when")
+ scheduled_dttm: datetime | None = Field(alias="scheduled_when")
pid: int | None
executor: str | None
executor_config: Annotated[str, BeforeValidator(str)]
@@ -147,6 +148,7 @@ class TaskInstanceHistoryResponse(BaseModel):
priority_weight: int | None
operator: str | None
queued_dttm: datetime | None = Field(alias="queued_when")
+ scheduled_dttm: datetime | None = Field(alias="scheduled_when")
pid: int | None
executor: str | None
executor_config: Annotated[str, BeforeValidator(str)]
diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
index 55f738fb1d408..e3bef11d279e3 100644
--- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
+++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
@@ -9635,6 +9635,12 @@ components:
format: date-time
- type: 'null'
title: Queued When
+ scheduled_when:
+ anyOf:
+ - type: string
+ format: date-time
+ - type: 'null'
+ title: Scheduled When
pid:
anyOf:
- type: integer
@@ -9669,6 +9675,7 @@ components:
- priority_weight
- operator
- queued_when
+ - scheduled_when
- pid
- executor
- executor_config
@@ -9762,6 +9769,12 @@ components:
format: date-time
- type: 'null'
title: Queued When
+ scheduled_when:
+ anyOf:
+ - type: string
+ format: date-time
+ - type: 'null'
+ title: Scheduled When
pid:
anyOf:
- type: integer
@@ -9820,6 +9833,7 @@ components:
- priority_weight
- operator
- queued_when
+ - scheduled_when
- pid
- executor
- executor_config
diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py
index 92b7c2b0010ed..898bfe9c95ea4 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -775,7 +775,7 @@ def process_executor_events(
"TaskInstance Finished: dag_id=%s, task_id=%s, run_id=%s, map_index=%s, "
"run_start_date=%s, run_end_date=%s, "
"run_duration=%s, state=%s, executor=%s, executor_state=%s, try_number=%s, max_tries=%s, "
- "pool=%s, queue=%s, priority_weight=%d, operator=%s, queued_dttm=%s, "
+ "pool=%s, queue=%s, priority_weight=%d, operator=%s, queued_dttm=%s, scheduled_dttm=%s,"
"queued_by_job_id=%s, pid=%s"
)
cls.logger().info(
@@ -797,6 +797,7 @@ def process_executor_events(
ti.priority_weight,
ti.operator,
ti.queued_dttm,
+ ti.scheduled_dttm,
ti.queued_by_job_id,
ti.pid,
)
@@ -1808,6 +1809,7 @@ def _reschedule_stuck_task(self, ti: TaskInstance, session: Session):
.values(
state=TaskInstanceState.SCHEDULED,
queued_dttm=None,
+ scheduled_dttm=timezone.utcnow(),
)
.execution_options(synchronize_session=False)
)
@@ -1962,6 +1964,7 @@ def check_trigger_timeouts(
state=TaskInstanceState.SCHEDULED,
next_method=TRIGGER_FAIL_REPR,
next_kwargs={"error": TriggerFailureReason.TRIGGER_TIMEOUT},
+ scheduled_dttm=timezone.utcnow(),
trigger_id=None,
)
).rowcount
diff --git a/airflow/migrations/versions/0057_3_0_0_add_new_task_instance_field_scheduled_.py b/airflow/migrations/versions/0057_3_0_0_add_new_task_instance_field_scheduled_.py
new file mode 100644
index 0000000000000..3f464e45d7c60
--- /dev/null
+++ b/airflow/migrations/versions/0057_3_0_0_add_new_task_instance_field_scheduled_.py
@@ -0,0 +1,64 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+add new task_instance field scheduled_dttm.
+
+Revision ID: 33b04e4bfa19
+Revises: 8ea135928435
+Create Date: 2025-01-22 11:22:01.272681
+
+"""
+
+from __future__ import annotations
+
+import sqlalchemy as sa
+from alembic import op
+
+from airflow.utils.sqlalchemy import UtcDateTime
+
+# revision identifiers, used by Alembic.
+revision = "33b04e4bfa19"
+down_revision = "8ea135928435"
+branch_labels = None
+depends_on = None
+airflow_version = "3.0.0"
+
+
+def upgrade():
+ """Apply add new task_instance field scheduled_dttm."""
+ # ### commands auto generated by Alembic - please adjust! ###
+ with op.batch_alter_table("task_instance", schema=None) as batch_op:
+ batch_op.add_column(sa.Column("scheduled_dttm", UtcDateTime(timezone=True), nullable=True))
+
+ with op.batch_alter_table("task_instance_history", schema=None) as batch_op:
+ batch_op.add_column(sa.Column("scheduled_dttm", UtcDateTime(timezone=True), nullable=True))
+
+ # ### end Alembic commands ###
+
+
+def downgrade():
+ """Unapply add new task_instance field scheduled_dttm."""
+ # ### commands auto generated by Alembic - please adjust! ###
+ with op.batch_alter_table("task_instance_history", schema=None) as batch_op:
+ batch_op.drop_column("scheduled_dttm")
+
+ with op.batch_alter_table("task_instance", schema=None) as batch_op:
+ batch_op.drop_column("scheduled_dttm")
+
+ # ### end Alembic commands ###
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 964efab4b848b..a22452a748cec 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -1666,6 +1666,7 @@ def add_logger_if_needed(ti: TaskInstance):
if s.state != TaskInstanceState.UP_FOR_RESCHEDULE:
s.try_number += 1
s.state = TaskInstanceState.SCHEDULED
+ s.scheduled_dttm = timezone.utcnow()
session.commit()
# triggerer may mark tasks scheduled so we read from DB
all_tis = set(dr.get_task_instances(session=session))
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 35d8af4322c49..727746b9b0333 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -1662,6 +1662,7 @@ def schedule_tis(
)
.values(
state=TaskInstanceState.SCHEDULED,
+ scheduled_dttm=timezone.utcnow(),
try_number=case(
(
or_(TI.state.is_(None), TI.state != TaskInstanceState.UP_FOR_RESCHEDULE),
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 093b96c2d7699..04c5e693e2489 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -824,6 +824,7 @@ def _set_ti_attrs(target, source, include_dag_run=False):
target.operator = source.operator
target.custom_operator_name = source.custom_operator_name
target.queued_dttm = source.queued_dttm
+ target.scheduled_dttm = source.scheduled_dttm
target.queued_by_job_id = source.queued_by_job_id
target.last_heartbeat_at = source.last_heartbeat_at
target.pid = source.pid
@@ -1712,6 +1713,7 @@ class TaskInstance(Base, LoggingMixin):
operator = Column(String(1000))
custom_operator_name = Column(String(1000))
queued_dttm = Column(UtcDateTime)
+ scheduled_dttm = Column(UtcDateTime)
queued_by_job_id = Column(Integer)
last_heartbeat_at = Column(UtcDateTime)
@@ -2705,23 +2707,24 @@ def emit_state_change_metric(self, new_state: TaskInstanceState) -> None:
timing = timezone.utcnow() - self.queued_dttm
elif new_state == TaskInstanceState.QUEUED:
metric_name = "scheduled_duration"
- if self.start_date is None:
- # This check does not work correctly before fields like `scheduled_dttm` are implemented.
- # TODO: Change the level to WARNING once it's viable.
- # see #30612 #34493 and #34771 for more details
- self.log.debug(
+ if self.scheduled_dttm is None:
+ self.log.warning(
"cannot record %s for task %s because previous state change time has not been saved",
metric_name,
self.task_id,
)
return
- timing = timezone.utcnow() - self.start_date
+ timing = timezone.utcnow() - self.scheduled_dttm
else:
raise NotImplementedError("no metric emission setup for state %s", new_state)
# send metric twice, once (legacy) with tags in the name and once with tags as tags
Stats.timing(f"dag.{self.dag_id}.{self.task_id}.{metric_name}", timing)
- Stats.timing(f"task.{metric_name}", timing, tags={"task_id": self.task_id, "dag_id": self.dag_id})
+ Stats.timing(
+ f"task.{metric_name}",
+ timing,
+ tags={"task_id": self.task_id, "dag_id": self.dag_id, "queue": self.queue},
+ )
def clear_next_method_args(self) -> None:
"""Ensure we unset next_method and next_kwargs to ensure that any retries don't reuse them."""
diff --git a/airflow/models/taskinstancehistory.py b/airflow/models/taskinstancehistory.py
index 9ac11cad7dba5..e97e6de22ec9a 100644
--- a/airflow/models/taskinstancehistory.py
+++ b/airflow/models/taskinstancehistory.py
@@ -77,6 +77,7 @@ class TaskInstanceHistory(Base):
operator = Column(String(1000))
custom_operator_name = Column(String(1000))
queued_dttm = Column(UtcDateTime)
+ scheduled_dttm = Column(UtcDateTime)
queued_by_job_id = Column(Integer)
pid = Column(Integer)
executor = Column(String(1000))
diff --git a/airflow/models/trigger.py b/airflow/models/trigger.py
index 2e0fe9f7f2bbe..ce139c3134135 100644
--- a/airflow/models/trigger.py
+++ b/airflow/models/trigger.py
@@ -281,6 +281,7 @@ def submit_failure(cls, trigger_id, exc=None, session: Session = NEW_SESSION) ->
task_instance.trigger_id = None
# Finally, mark it as scheduled so it gets re-queued
task_instance.state = TaskInstanceState.SCHEDULED
+ task_instance.scheduled_dttm = timezone.utcnow()
@classmethod
@provide_session
diff --git a/airflow/triggers/base.py b/airflow/triggers/base.py
index cf71f1a426ddd..4e88465d533a7 100644
--- a/airflow/triggers/base.py
+++ b/airflow/triggers/base.py
@@ -25,6 +25,7 @@
from airflow.callbacks.callback_requests import TaskCallbackRequest
from airflow.callbacks.database_callback_sink import DatabaseCallbackSink
+from airflow.utils import timezone
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.state import TaskInstanceState
@@ -172,6 +173,7 @@ def handle_submit(self, *, task_instance: TaskInstance, session: Session = NEW_S
# Set the state of the task instance to scheduled
task_instance.state = TaskInstanceState.SCHEDULED
+ task_instance.scheduled_dttm = timezone.utcnow()
class BaseTaskEndEvent(TriggerEvent):
diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts
index 8b30ecc9c1ec0..d30aacd6be03e 100644
--- a/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -4667,6 +4667,18 @@ export const $TaskInstanceHistoryResponse = {
],
title: "Queued When",
},
+ scheduled_when: {
+ anyOf: [
+ {
+ type: "string",
+ format: "date-time",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "Scheduled When",
+ },
pid: {
anyOf: [
{
@@ -4715,6 +4727,7 @@ export const $TaskInstanceHistoryResponse = {
"priority_weight",
"operator",
"queued_when",
+ "scheduled_when",
"pid",
"executor",
"executor_config",
@@ -4882,6 +4895,18 @@ export const $TaskInstanceResponse = {
],
title: "Queued When",
},
+ scheduled_when: {
+ anyOf: [
+ {
+ type: "string",
+ format: "date-time",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "Scheduled When",
+ },
pid: {
anyOf: [
{
@@ -4979,6 +5004,7 @@ export const $TaskInstanceResponse = {
"priority_weight",
"operator",
"queued_when",
+ "scheduled_when",
"pid",
"executor",
"executor_config",
diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts
index 050ad299cf2ba..c8bf27701b10e 100644
--- a/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -1223,6 +1223,7 @@ export type TaskInstanceHistoryResponse = {
priority_weight: number | null;
operator: string | null;
queued_when: string | null;
+ scheduled_when: string | null;
pid: number | null;
executor: string | null;
executor_config: string;
@@ -1253,6 +1254,7 @@ export type TaskInstanceResponse = {
priority_weight: number | null;
operator: string | null;
queued_when: string | null;
+ scheduled_when: string | null;
pid: number | null;
executor: string | null;
executor_config: string;
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index 9e21885eb3a99..0dc4dbb1b61f4 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -94,7 +94,7 @@ class MappedClassProtocol(Protocol):
"2.9.2": "686269002441",
"2.10.0": "22ed7efa9da2",
"2.10.3": "5f2621c13b39",
- "3.0.0": "8ea135928435",
+ "3.0.0": "33b04e4bfa19",
}
diff --git a/docs/apache-airflow/img/airflow_erd.sha256 b/docs/apache-airflow/img/airflow_erd.sha256
index 5b584764eb69e..e3f420e62c1d6 100644
--- a/docs/apache-airflow/img/airflow_erd.sha256
+++ b/docs/apache-airflow/img/airflow_erd.sha256
@@ -1 +1 @@
-ff7265e5bc09d6b46d8e95f0c247b3dc5b1262451ab128c711888ffafa21c9db
\ No newline at end of file
+829be35e333798f7c33c5fe0130ed12fad481c92145abc398ae23b815dd7b6ed
\ No newline at end of file
diff --git a/docs/apache-airflow/img/airflow_erd.svg b/docs/apache-airflow/img/airflow_erd.svg
index 378cfdc0f259c..d20fa37b8ea66 100644
--- a/docs/apache-airflow/img/airflow_erd.svg
+++ b/docs/apache-airflow/img/airflow_erd.svg
@@ -649,24 +649,24 @@
dagrun_asset_event
-
-dagrun_asset_event
-
-dag_run_id
-
- [INTEGER]
- NOT NULL
-
-event_id
-
- [INTEGER]
- NOT NULL
+
+dagrun_asset_event
+
+dag_run_id
+
+ [INTEGER]
+ NOT NULL
+
+event_id
+
+ [INTEGER]
+ NOT NULL
asset_event--dagrun_asset_event
-
-0..N
+
+0..N
1
@@ -709,687 +709,695 @@
task_instance
-
-task_instance
-
-id
-
- [UUID]
- NOT NULL
-
-custom_operator_name
-
- [VARCHAR(1000)]
-
-dag_id
-
- [VARCHAR(250)]
- NOT NULL
-
-dag_version_id
-
- [UUID]
-
-duration
-
- [DOUBLE_PRECISION]
-
-end_date
-
- [TIMESTAMP]
-
-executor
-
- [VARCHAR(1000)]
-
-executor_config
-
- [BYTEA]
-
-external_executor_id
-
- [VARCHAR(250)]
-
-hostname
-
- [VARCHAR(1000)]
-
-last_heartbeat_at
-
- [TIMESTAMP]
-
-map_index
-
- [INTEGER]
- NOT NULL
-
-max_tries
-
- [INTEGER]
-
-next_kwargs
-
- [JSON]
-
-next_method
-
- [VARCHAR(1000)]
-
-operator
-
- [VARCHAR(1000)]
-
-pid
-
- [INTEGER]
-
-pool
-
- [VARCHAR(256)]
- NOT NULL
-
-pool_slots
-
- [INTEGER]
- NOT NULL
-
-priority_weight
-
- [INTEGER]
-
-queue
-
- [VARCHAR(256)]
-
-queued_by_job_id
-
- [INTEGER]
-
-queued_dttm
-
- [TIMESTAMP]
-
-rendered_map_index
-
- [VARCHAR(250)]
-
-run_id
-
- [VARCHAR(250)]
- NOT NULL
-
-start_date
-
- [TIMESTAMP]
-
-state
-
- [VARCHAR(20)]
-
-task_display_name
-
- [VARCHAR(2000)]
-
-task_id
-
- [VARCHAR(250)]
- NOT NULL
-
-trigger_id
-
- [INTEGER]
-
-trigger_timeout
-
- [TIMESTAMP]
-
-try_number
-
- [INTEGER]
-
-unixname
-
- [VARCHAR(1000)]
-
-updated_at
-
- [TIMESTAMP]
+
+task_instance
+
+id
+
+ [UUID]
+ NOT NULL
+
+custom_operator_name
+
+ [VARCHAR(1000)]
+
+dag_id
+
+ [VARCHAR(250)]
+ NOT NULL
+
+dag_version_id
+
+ [UUID]
+
+duration
+
+ [DOUBLE_PRECISION]
+
+end_date
+
+ [TIMESTAMP]
+
+executor
+
+ [VARCHAR(1000)]
+
+executor_config
+
+ [BYTEA]
+
+external_executor_id
+
+ [VARCHAR(250)]
+
+hostname
+
+ [VARCHAR(1000)]
+
+last_heartbeat_at
+
+ [TIMESTAMP]
+
+map_index
+
+ [INTEGER]
+ NOT NULL
+
+max_tries
+
+ [INTEGER]
+
+next_kwargs
+
+ [JSON]
+
+next_method
+
+ [VARCHAR(1000)]
+
+operator
+
+ [VARCHAR(1000)]
+
+pid
+
+ [INTEGER]
+
+pool
+
+ [VARCHAR(256)]
+ NOT NULL
+
+pool_slots
+
+ [INTEGER]
+ NOT NULL
+
+priority_weight
+
+ [INTEGER]
+
+queue
+
+ [VARCHAR(256)]
+
+queued_by_job_id
+
+ [INTEGER]
+
+queued_dttm
+
+ [TIMESTAMP]
+
+rendered_map_index
+
+ [VARCHAR(250)]
+
+run_id
+
+ [VARCHAR(250)]
+ NOT NULL
+
+scheduled_dttm
+
+ [TIMESTAMP]
+
+start_date
+
+ [TIMESTAMP]
+
+state
+
+ [VARCHAR(20)]
+
+task_display_name
+
+ [VARCHAR(2000)]
+
+task_id
+
+ [VARCHAR(250)]
+ NOT NULL
+
+trigger_id
+
+ [INTEGER]
+
+trigger_timeout
+
+ [TIMESTAMP]
+
+try_number
+
+ [INTEGER]
+
+unixname
+
+ [VARCHAR(1000)]
+
+updated_at
+
+ [TIMESTAMP]
trigger--task_instance
-
-0..N
+
+0..N
{0,1}
task_reschedule
-
-task_reschedule
-
-id
-
- [INTEGER]
- NOT NULL
-
-dag_id
-
- [VARCHAR(250)]
- NOT NULL
-
-duration
-
- [INTEGER]
- NOT NULL
-
-end_date
-
- [TIMESTAMP]
- NOT NULL
-
-map_index
-
- [INTEGER]
- NOT NULL
-
-reschedule_date
-
- [TIMESTAMP]
- NOT NULL
-
-run_id
-
- [VARCHAR(250)]
- NOT NULL
-
-start_date
-
- [TIMESTAMP]
- NOT NULL
-
-task_id
-
- [VARCHAR(250)]
- NOT NULL
-
-try_number
-
- [INTEGER]
- NOT NULL
+
+task_reschedule
+
+id
+
+ [INTEGER]
+ NOT NULL
+
+dag_id
+
+ [VARCHAR(250)]
+ NOT NULL
+
+duration
+
+ [INTEGER]
+ NOT NULL
+
+end_date
+
+ [TIMESTAMP]
+ NOT NULL
+
+map_index
+
+ [INTEGER]
+ NOT NULL
+
+reschedule_date
+
+ [TIMESTAMP]
+ NOT NULL
+
+run_id
+
+ [VARCHAR(250)]
+ NOT NULL
+
+start_date
+
+ [TIMESTAMP]
+ NOT NULL
+
+task_id
+
+ [VARCHAR(250)]
+ NOT NULL
+
+try_number
+
+ [INTEGER]
+ NOT NULL
task_instance--task_reschedule
-
-0..N
-1
+
+0..N
+1
task_instance--task_reschedule
-
-0..N
-1
+
+0..N
+1
task_instance--task_reschedule
-
-0..N
-1
+
+0..N
+1
task_instance--task_reschedule
-
-0..N
-1
+
+0..N
+1
rendered_task_instance_fields
-
-rendered_task_instance_fields
-
-dag_id
-
- [VARCHAR(250)]
- NOT NULL
-
-map_index
-
- [INTEGER]
- NOT NULL
-
-run_id
-
- [VARCHAR(250)]
- NOT NULL
-
-task_id
-
- [VARCHAR(250)]
- NOT NULL
-
-k8s_pod_yaml
-
- [JSON]
-
-rendered_fields
-
- [JSON]
- NOT NULL
+
+rendered_task_instance_fields
+
+dag_id
+
+ [VARCHAR(250)]
+ NOT NULL
+
+map_index
+
+ [INTEGER]
+ NOT NULL
+
+run_id
+
+ [VARCHAR(250)]
+ NOT NULL
+
+task_id
+
+ [VARCHAR(250)]
+ NOT NULL
+
+k8s_pod_yaml
+
+ [JSON]
+
+rendered_fields
+
+ [JSON]
+ NOT NULL
task_instance--rendered_task_instance_fields
-
-0..N
-1
+
+0..N
+1
task_instance--rendered_task_instance_fields
-
-0..N
-1
+
+0..N
+1
task_instance--rendered_task_instance_fields
-
-0..N
-1
+
+0..N
+1
task_instance--rendered_task_instance_fields
-
-0..N
-1
+
+0..N
+1
task_map
-
-task_map
-
-dag_id
-
- [VARCHAR(250)]
- NOT NULL
-
-map_index
-
- [INTEGER]
- NOT NULL
-
-run_id
-
- [VARCHAR(250)]
- NOT NULL
-
-task_id
-
- [VARCHAR(250)]
- NOT NULL
-
-keys
-
- [JSON]
-
-length
-
- [INTEGER]
- NOT NULL
+
+task_map
+
+dag_id
+
+ [VARCHAR(250)]
+ NOT NULL
+
+map_index
+
+ [INTEGER]
+ NOT NULL
+
+run_id
+
+ [VARCHAR(250)]
+ NOT NULL
+
+task_id
+
+ [VARCHAR(250)]
+ NOT NULL
+
+keys
+
+ [JSON]
+
+length
+
+ [INTEGER]
+ NOT NULL
task_instance--task_map
-
-0..N
-1
+
+0..N
+1
task_instance--task_map
-
-0..N
-1
+
+0..N
+1
task_instance--task_map
-
-0..N
-1
+
+0..N
+1
task_instance--task_map
-
-0..N
-1
+
+0..N
+1
xcom
-
-xcom
-
-dag_run_id
-
- [INTEGER]
- NOT NULL
-
-key
-
- [VARCHAR(512)]
- NOT NULL
-
-map_index
-
- [INTEGER]
- NOT NULL
-
-task_id
-
- [VARCHAR(250)]
- NOT NULL
-
-dag_id
-
- [VARCHAR(250)]
- NOT NULL
-
-run_id
-
- [VARCHAR(250)]
- NOT NULL
-
-timestamp
-
- [TIMESTAMP]
- NOT NULL
-
-value
-
- [JSONB]
+
+xcom
+
+dag_run_id
+
+ [INTEGER]
+ NOT NULL
+
+key
+
+ [VARCHAR(512)]
+ NOT NULL
+
+map_index
+
+ [INTEGER]
+ NOT NULL
+
+task_id
+
+ [VARCHAR(250)]
+ NOT NULL
+
+dag_id
+
+ [VARCHAR(250)]
+ NOT NULL
+
+run_id
+
+ [VARCHAR(250)]
+ NOT NULL
+
+timestamp
+
+ [TIMESTAMP]
+ NOT NULL
+
+value
+
+ [JSONB]
task_instance--xcom
-
-0..N
-1
+
+0..N
+1
task_instance--xcom
-
-0..N
-1
+
+0..N
+1
task_instance--xcom
-
-0..N
-1
+
+0..N
+1
task_instance--xcom
-
-0..N
-1
+
+0..N
+1
task_instance_note
-
-task_instance_note
-
-dag_id
-
- [VARCHAR(250)]
- NOT NULL
-
-map_index
-
- [INTEGER]
- NOT NULL
-
-run_id
-
- [VARCHAR(250)]
- NOT NULL
-
-task_id
-
- [VARCHAR(250)]
- NOT NULL
-
-content
-
- [VARCHAR(1000)]
-
-created_at
-
- [TIMESTAMP]
- NOT NULL
-
-updated_at
-
- [TIMESTAMP]
- NOT NULL
-
-user_id
-
- [VARCHAR(128)]
+
+task_instance_note
+
+dag_id
+
+ [VARCHAR(250)]
+ NOT NULL
+
+map_index
+
+ [INTEGER]
+ NOT NULL
+
+run_id
+
+ [VARCHAR(250)]
+ NOT NULL
+
+task_id
+
+ [VARCHAR(250)]
+ NOT NULL
+
+content
+
+ [VARCHAR(1000)]
+
+created_at
+
+ [TIMESTAMP]
+ NOT NULL
+
+updated_at
+
+ [TIMESTAMP]
+ NOT NULL
+
+user_id
+
+ [VARCHAR(128)]
task_instance--task_instance_note
-
-0..N
-1
+
+0..N
+1
task_instance--task_instance_note
-
-0..N
-1
+
+0..N
+1
task_instance--task_instance_note
-
-0..N
-1
+
+0..N
+1
task_instance--task_instance_note
-
-0..N
-1
+
+0..N
+1
task_instance_history
-
-task_instance_history
-
-id
-
- [INTEGER]
- NOT NULL
-
-custom_operator_name
-
- [VARCHAR(1000)]
-
-dag_id
-
- [VARCHAR(250)]
- NOT NULL
-
-dag_version_id
-
- [UUID]
-
-duration
-
- [DOUBLE_PRECISION]
-
-end_date
-
- [TIMESTAMP]
-
-executor
-
- [VARCHAR(1000)]
-
-executor_config
-
- [BYTEA]
-
-external_executor_id
-
- [VARCHAR(250)]
-
-hostname
-
- [VARCHAR(1000)]
-
-map_index
-
- [INTEGER]
- NOT NULL
-
-max_tries
-
- [INTEGER]
-
-next_kwargs
-
- [JSON]
-
-next_method
-
- [VARCHAR(1000)]
-
-operator
-
- [VARCHAR(1000)]
-
-pid
-
- [INTEGER]
-
-pool
-
- [VARCHAR(256)]
- NOT NULL
-
-pool_slots
-
- [INTEGER]
- NOT NULL
-
-priority_weight
-
- [INTEGER]
-
-queue
-
- [VARCHAR(256)]
-
-queued_by_job_id
-
- [INTEGER]
-
-queued_dttm
-
- [TIMESTAMP]
-
-rendered_map_index
-
- [VARCHAR(250)]
-
-run_id
-
- [VARCHAR(250)]
- NOT NULL
-
-start_date
-
- [TIMESTAMP]
-
-state
-
- [VARCHAR(20)]
-
-task_display_name
-
- [VARCHAR(2000)]
-
-task_id
-
- [VARCHAR(250)]
- NOT NULL
-
-trigger_id
-
- [INTEGER]
-
-trigger_timeout
-
- [TIMESTAMP]
-
-try_number
-
- [INTEGER]
- NOT NULL
-
-unixname
-
- [VARCHAR(1000)]
-
-updated_at
-
- [TIMESTAMP]
+
+task_instance_history
+
+id
+
+ [INTEGER]
+ NOT NULL
+
+custom_operator_name
+
+ [VARCHAR(1000)]
+
+dag_id
+
+ [VARCHAR(250)]
+ NOT NULL
+
+dag_version_id
+
+ [UUID]
+
+duration
+
+ [DOUBLE_PRECISION]
+
+end_date
+
+ [TIMESTAMP]
+
+executor
+
+ [VARCHAR(1000)]
+
+executor_config
+
+ [BYTEA]
+
+external_executor_id
+
+ [VARCHAR(250)]
+
+hostname
+
+ [VARCHAR(1000)]
+
+map_index
+
+ [INTEGER]
+ NOT NULL
+
+max_tries
+
+ [INTEGER]
+
+next_kwargs
+
+ [JSON]
+
+next_method
+
+ [VARCHAR(1000)]
+
+operator
+
+ [VARCHAR(1000)]
+
+pid
+
+ [INTEGER]
+
+pool
+
+ [VARCHAR(256)]
+ NOT NULL
+
+pool_slots
+
+ [INTEGER]
+ NOT NULL
+
+priority_weight
+
+ [INTEGER]
+
+queue
+
+ [VARCHAR(256)]
+
+queued_by_job_id
+
+ [INTEGER]
+
+queued_dttm
+
+ [TIMESTAMP]
+
+rendered_map_index
+
+ [VARCHAR(250)]
+
+run_id
+
+ [VARCHAR(250)]
+ NOT NULL
+
+scheduled_dttm
+
+ [TIMESTAMP]
+
+start_date
+
+ [TIMESTAMP]
+
+state
+
+ [VARCHAR(20)]
+
+task_display_name
+
+ [VARCHAR(2000)]
+
+task_id
+
+ [VARCHAR(250)]
+ NOT NULL
+
+trigger_id
+
+ [INTEGER]
+
+trigger_timeout
+
+ [TIMESTAMP]
+
+try_number
+
+ [INTEGER]
+ NOT NULL
+
+unixname
+
+ [VARCHAR(1000)]
+
+updated_at
+
+ [TIMESTAMP]
task_instance--task_instance_history
-
-0..N
-1
+
+0..N
+1
task_instance--task_instance_history
-
-0..N
-1
+
+0..N
+1
task_instance--task_instance_history
-
-0..N
-1
+
+0..N
+1
task_instance--task_instance_history
-
-0..N
-1
+
+0..N
+1
@@ -1744,155 +1752,155 @@
deadline
-
-deadline
-
-id
-
- [UUID]
- NOT NULL
-
-callback
-
- [VARCHAR(500)]
- NOT NULL
-
-callback_kwargs
-
- [JSON]
-
-dag_id
-
- [VARCHAR(250)]
-
-dagrun_id
-
- [INTEGER]
-
-deadline
-
- [TIMESTAMP]
- NOT NULL
+
+deadline
+
+id
+
+ [UUID]
+ NOT NULL
+
+callback
+
+ [VARCHAR(500)]
+ NOT NULL
+
+callback_kwargs
+
+ [JSON]
+
+dag_id
+
+ [VARCHAR(250)]
+
+dagrun_id
+
+ [INTEGER]
+
+deadline
+
+ [TIMESTAMP]
+ NOT NULL
dag--deadline
-
-0..N
+
+0..N
{0,1}
dag_version--task_instance
-
-0..N
+
+0..N
{0,1}
dag_run
-
-dag_run
-
-id
-
- [INTEGER]
- NOT NULL
-
-backfill_id
-
- [INTEGER]
-
-bundle_version
-
- [VARCHAR(250)]
-
-clear_number
-
- [INTEGER]
- NOT NULL
-
-conf
-
- [JSONB]
-
-creating_job_id
-
- [INTEGER]
-
-dag_id
-
- [VARCHAR(250)]
- NOT NULL
-
-dag_version_id
-
- [UUID]
-
-data_interval_end
-
- [TIMESTAMP]
-
-data_interval_start
-
- [TIMESTAMP]
-
-end_date
-
- [TIMESTAMP]
-
-external_trigger
-
- [BOOLEAN]
-
-last_scheduling_decision
-
- [TIMESTAMP]
-
-log_template_id
-
- [INTEGER]
-
-logical_date
-
- [TIMESTAMP]
- NOT NULL
-
-queued_at
-
- [TIMESTAMP]
-
-run_id
-
- [VARCHAR(250)]
- NOT NULL
-
-run_type
-
- [VARCHAR(50)]
- NOT NULL
-
-start_date
-
- [TIMESTAMP]
-
-state
-
- [VARCHAR(50)]
-
-triggered_by
-
- [VARCHAR(50)]
-
-updated_at
-
- [TIMESTAMP]
+
+dag_run
+
+id
+
+ [INTEGER]
+ NOT NULL
+
+backfill_id
+
+ [INTEGER]
+
+bundle_version
+
+ [VARCHAR(250)]
+
+clear_number
+
+ [INTEGER]
+ NOT NULL
+
+conf
+
+ [JSONB]
+
+creating_job_id
+
+ [INTEGER]
+
+dag_id
+
+ [VARCHAR(250)]
+ NOT NULL
+
+dag_version_id
+
+ [UUID]
+
+data_interval_end
+
+ [TIMESTAMP]
+
+data_interval_start
+
+ [TIMESTAMP]
+
+end_date
+
+ [TIMESTAMP]
+
+external_trigger
+
+ [BOOLEAN]
+
+last_scheduling_decision
+
+ [TIMESTAMP]
+
+log_template_id
+
+ [INTEGER]
+
+logical_date
+
+ [TIMESTAMP]
+ NOT NULL
+
+queued_at
+
+ [TIMESTAMP]
+
+run_id
+
+ [VARCHAR(250)]
+ NOT NULL
+
+run_type
+
+ [VARCHAR(50)]
+ NOT NULL
+
+start_date
+
+ [TIMESTAMP]
+
+state
+
+ [VARCHAR(50)]
+
+triggered_by
+
+ [VARCHAR(50)]
+
+updated_at
+
+ [TIMESTAMP]
dag_version--dag_run
-
-0..N
+
+0..N
{0,1}
@@ -1992,121 +2000,121 @@
dag_run--dagrun_asset_event
-
-0..N
-1
+
+0..N
+1
dag_run--task_instance
-
-0..N
-1
+
+0..N
+1
dag_run--task_instance
-
-0..N
-1
+
+0..N
+1
dag_run--deadline
-
-0..N
-{0,1}
+
+0..N
+{0,1}
backfill_dag_run
-
-backfill_dag_run
-
-id
-
- [INTEGER]
- NOT NULL
-
-backfill_id
-
- [INTEGER]
- NOT NULL
-
-dag_run_id
-
- [INTEGER]
-
-exception_reason
-
- [VARCHAR(250)]
-
-logical_date
-
- [TIMESTAMP]
- NOT NULL
-
-sort_ordinal
-
- [INTEGER]
- NOT NULL
+
+backfill_dag_run
+
+id
+
+ [INTEGER]
+ NOT NULL
+
+backfill_id
+
+ [INTEGER]
+ NOT NULL
+
+dag_run_id
+
+ [INTEGER]
+
+exception_reason
+
+ [VARCHAR(250)]
+
+logical_date
+
+ [TIMESTAMP]
+ NOT NULL
+
+sort_ordinal
+
+ [INTEGER]
+ NOT NULL
dag_run--backfill_dag_run
-
-0..N
-{0,1}
+
+0..N
+{0,1}
dag_run_note
-
-dag_run_note
-
-dag_run_id
-
- [INTEGER]
- NOT NULL
-
-content
-
- [VARCHAR(1000)]
-
-created_at
-
- [TIMESTAMP]
- NOT NULL
-
-updated_at
-
- [TIMESTAMP]
- NOT NULL
-
-user_id
-
- [VARCHAR(128)]
+
+dag_run_note
+
+dag_run_id
+
+ [INTEGER]
+ NOT NULL
+
+content
+
+ [VARCHAR(1000)]
+
+created_at
+
+ [TIMESTAMP]
+ NOT NULL
+
+updated_at
+
+ [TIMESTAMP]
+ NOT NULL
+
+user_id
+
+ [VARCHAR(128)]
dag_run--dag_run_note
-
-1
-1
+
+1
+1
dag_run--task_reschedule
-
-0..N
-1
+
+0..N
+1
dag_run--task_reschedule
-
-0..N
-1
+
+0..N
+1
@@ -2137,9 +2145,9 @@
log_template--dag_run
-
-0..N
-{0,1}
+
+0..N
+{0,1}
@@ -2203,16 +2211,16 @@
backfill--dag_run
-
-0..N
-{0,1}
+
+0..N
+{0,1}
backfill--backfill_dag_run
-
-0..N
-1
+
+0..N
+1
diff --git a/docs/apache-airflow/migrations-ref.rst b/docs/apache-airflow/migrations-ref.rst
index b145e71d11ca3..633c2338f4064 100644
--- a/docs/apache-airflow/migrations-ref.rst
+++ b/docs/apache-airflow/migrations-ref.rst
@@ -39,7 +39,9 @@ Here's the list of all the Database Migrations that are executed via when you ru
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| Revision ID | Revises ID | Airflow Version | Description |
+=========================+==================+===================+==============================================================+
-| ``8ea135928435`` (head) | ``e39a26ac59f6`` | ``3.0.0`` | Add relative fileloc column. |
+| ``33b04e4bfa19`` (head) | ``8ea135928435`` | ``3.0.0`` | add new task_instance field scheduled_dttm. |
++-------------------------+------------------+-------------------+--------------------------------------------------------------+
+| ``8ea135928435`` | ``e39a26ac59f6`` | ``3.0.0`` | Add relative fileloc column. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| ``e39a26ac59f6`` | ``38770795785f`` | ``3.0.0`` | remove pickled data from dagrun table. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
diff --git a/tests/api_connexion/endpoints/test_mapped_task_instance_endpoint.py b/tests/api_connexion/endpoints/test_mapped_task_instance_endpoint.py
index 79dcde6cbd2a5..68ba4395cfc0a 100644
--- a/tests/api_connexion/endpoints/test_mapped_task_instance_endpoint.py
+++ b/tests/api_connexion/endpoints/test_mapped_task_instance_endpoint.py
@@ -226,6 +226,7 @@ def test_mapped_task_instances(self, one_task_with_mapped_tis, session):
"priority_weight": 1,
"queue": "default",
"queued_when": None,
+ "scheduled_when": None,
"rendered_fields": {},
"rendered_map_index": None,
"start_date": "2020-01-01T00:00:00+00:00",
diff --git a/tests/api_connexion/endpoints/test_task_instance_endpoint.py b/tests/api_connexion/endpoints/test_task_instance_endpoint.py
index b5079c47aa17e..c51116c368ded 100644
--- a/tests/api_connexion/endpoints/test_task_instance_endpoint.py
+++ b/tests/api_connexion/endpoints/test_task_instance_endpoint.py
@@ -196,6 +196,7 @@ def test_should_respond_200(self, session):
"priority_weight": 9,
"queue": "default_queue",
"queued_when": None,
+ "scheduled_when": None,
"start_date": "2020-01-02T00:00:00+00:00",
"state": "running",
"task_id": "print_the_context",
@@ -254,6 +255,7 @@ def test_should_respond_200_with_task_state_in_deferred(self, session):
"priority_weight": 9,
"queue": "default_queue",
"queued_when": None,
+ "scheduled_when": None,
"start_date": "2020-01-02T00:00:00+00:00",
"state": "deferred",
"task_id": "print_the_context",
@@ -301,6 +303,7 @@ def test_should_respond_200_with_task_state_in_removed(self, session):
"priority_weight": 9,
"queue": "default_queue",
"queued_when": None,
+ "scheduled_when": None,
"start_date": "2020-01-02T00:00:00+00:00",
"state": "removed",
"task_id": "print_the_context",
@@ -344,6 +347,7 @@ def test_should_respond_200_task_instance_with_rendered(self, session):
"priority_weight": 9,
"queue": "default_queue",
"queued_when": None,
+ "scheduled_when": None,
"start_date": "2020-01-02T00:00:00+00:00",
"state": "running",
"task_id": "print_the_context",
@@ -396,6 +400,7 @@ def test_should_respond_200_mapped_task_instance_with_rtif(self, session):
"priority_weight": 9,
"queue": "default_queue",
"queued_when": None,
+ "scheduled_when": None,
"start_date": "2020-01-02T00:00:00+00:00",
"state": "running",
"task_id": "print_the_context",
@@ -2305,6 +2310,7 @@ def test_should_respond_200(self, session):
"priority_weight": 9,
"queue": "default_queue",
"queued_when": None,
+ "scheduled_when": None,
"start_date": "2020-01-02T00:00:00+00:00",
"state": "running",
"task_id": "print_the_context",
@@ -2364,6 +2370,7 @@ def test_should_respond_200_mapped_task_instance_with_rtif(self, session):
"priority_weight": 9,
"queue": "default_queue",
"queued_when": None,
+ "scheduled_when": None,
"start_date": "2020-01-02T00:00:00+00:00",
"state": "running",
"task_id": "print_the_context",
@@ -2586,6 +2593,7 @@ def test_should_respond_200(self, session):
"priority_weight": 9,
"queue": "default_queue",
"queued_when": None,
+ "scheduled_when": None,
"start_date": "2020-01-02T00:00:00+00:00",
"state": "success",
"task_id": "print_the_context",
@@ -2621,6 +2629,7 @@ def test_should_respond_200_with_different_try_numbers(self, try_number, session
"priority_weight": 9,
"queue": "default_queue",
"queued_when": None,
+ "scheduled_when": None,
"start_date": "2020-01-02T00:00:00+00:00",
"state": "success" if try_number == 1 else None,
"task_id": "print_the_context",
@@ -2683,6 +2692,7 @@ def test_should_respond_200_with_mapped_task_at_different_try_numbers(self, try_
"priority_weight": 9,
"queue": "default_queue",
"queued_when": None,
+ "scheduled_when": None,
"start_date": "2020-01-02T00:00:00+00:00",
"state": "failed" if try_number == 1 else None,
"task_id": "print_the_context",
@@ -2745,6 +2755,7 @@ def test_should_respond_200_with_task_state_in_deferred(self, session):
"priority_weight": 9,
"queue": "default_queue",
"queued_when": None,
+ "scheduled_when": None,
"start_date": "2020-01-02T00:00:00+00:00",
"state": "failed",
"task_id": "print_the_context",
@@ -2780,6 +2791,7 @@ def test_should_respond_200_with_task_state_in_removed(self, session):
"priority_weight": 9,
"queue": "default_queue",
"queued_when": None,
+ "scheduled_when": None,
"start_date": "2020-01-02T00:00:00+00:00",
"state": "removed",
"task_id": "print_the_context",
diff --git a/tests/api_connexion/schemas/test_task_instance_schema.py b/tests/api_connexion/schemas/test_task_instance_schema.py
index 3f130517dc3ed..a14cd7dbbd1ec 100644
--- a/tests/api_connexion/schemas/test_task_instance_schema.py
+++ b/tests/api_connexion/schemas/test_task_instance_schema.py
@@ -87,6 +87,7 @@ def test_task_instance_schema_without_rendered(self, session):
"priority_weight": 1,
"queue": "default_queue",
"queued_when": None,
+ "scheduled_when": None,
"start_date": "2020-01-02T00:00:00+00:00",
"state": "running",
"task_id": "TEST_TASK_ID",
diff --git a/tests/api_fastapi/core_api/routes/public/test_task_instances.py b/tests/api_fastapi/core_api/routes/public/test_task_instances.py
index 80cf61dc684cf..857ca1ffa65db 100644
--- a/tests/api_fastapi/core_api/routes/public/test_task_instances.py
+++ b/tests/api_fastapi/core_api/routes/public/test_task_instances.py
@@ -189,6 +189,7 @@ def test_should_respond_200(self, test_client, session):
"priority_weight": 9,
"queue": "default_queue",
"queued_when": None,
+ "scheduled_when": None,
"start_date": "2020-01-02T00:00:00Z",
"state": "running",
"task_id": "print_the_context",
@@ -247,6 +248,7 @@ def test_should_respond_200_with_task_state_in_deferred(self, test_client, sessi
"priority_weight": 9,
"queue": "default_queue",
"queued_when": None,
+ "scheduled_when": None,
"start_date": "2020-01-02T00:00:00Z",
"state": "deferred",
"task_id": "print_the_context",
@@ -294,6 +296,7 @@ def test_should_respond_200_with_task_state_in_removed(self, test_client, sessio
"priority_weight": 9,
"queue": "default_queue",
"queued_when": None,
+ "scheduled_when": None,
"start_date": "2020-01-02T00:00:00Z",
"state": "removed",
"task_id": "print_the_context",
@@ -337,6 +340,7 @@ def test_should_respond_200_task_instance_with_rendered(self, test_client, sessi
"priority_weight": 9,
"queue": "default_queue",
"queued_when": None,
+ "scheduled_when": None,
"start_date": "2020-01-02T00:00:00Z",
"state": "running",
"task_id": "print_the_context",
@@ -437,6 +441,7 @@ def test_should_respond_200_mapped_task_instance_with_rtif(self, test_client, se
"priority_weight": 9,
"queue": "default_queue",
"queued_when": None,
+ "scheduled_when": None,
"start_date": "2020-01-02T00:00:00Z",
"state": "running",
"task_id": "print_the_context",
@@ -1509,6 +1514,7 @@ def test_should_respond_200(self, test_client, session):
"priority_weight": 9,
"queue": "default_queue",
"queued_when": None,
+ "scheduled_when": None,
"start_date": "2020-01-02T00:00:00Z",
"state": "success",
"task_id": "print_the_context",
@@ -1542,6 +1548,7 @@ def test_should_respond_200_with_different_try_numbers(self, test_client, try_nu
"priority_weight": 9,
"queue": "default_queue",
"queued_when": None,
+ "scheduled_when": None,
"start_date": "2020-01-02T00:00:00Z",
"state": "success" if try_number == 1 else None,
"task_id": "print_the_context",
@@ -1604,6 +1611,7 @@ def test_should_respond_200_with_mapped_task_at_different_try_numbers(
"priority_weight": 9,
"queue": "default_queue",
"queued_when": None,
+ "scheduled_when": None,
"start_date": "2020-01-02T00:00:00Z",
"state": "failed" if try_number == 1 else None,
"task_id": "print_the_context",
@@ -1664,6 +1672,7 @@ def test_should_respond_200_with_task_state_in_deferred(self, test_client, sessi
"priority_weight": 9,
"queue": "default_queue",
"queued_when": None,
+ "scheduled_when": None,
"start_date": "2020-01-02T00:00:00Z",
"state": "failed",
"task_id": "print_the_context",
@@ -1698,6 +1707,7 @@ def test_should_respond_200_with_task_state_in_removed(self, test_client, sessio
"priority_weight": 9,
"queue": "default_queue",
"queued_when": None,
+ "scheduled_when": None,
"start_date": "2020-01-02T00:00:00Z",
"state": "removed",
"task_id": "print_the_context",
@@ -2146,6 +2156,7 @@ def test_should_respond_200_with_dag_run_id(self, test_client, session):
"priority_weight": 9,
"queue": "default_queue",
"queued_when": None,
+ "scheduled_when": None,
"rendered_fields": {},
"rendered_map_index": None,
"start_date": "2020-01-02T00:00:00Z",
@@ -2485,6 +2496,7 @@ def test_should_respond_200(self, test_client, session):
"priority_weight": 9,
"queue": "default_queue",
"queued_when": None,
+ "scheduled_when": None,
"start_date": "2020-01-02T00:00:00Z",
"state": "success",
"task_id": "print_the_context",
@@ -2509,6 +2521,7 @@ def test_should_respond_200(self, test_client, session):
"priority_weight": 9,
"queue": "default_queue",
"queued_when": None,
+ "scheduled_when": None,
"start_date": "2020-01-02T00:00:00Z",
"state": None,
"task_id": "print_the_context",
@@ -2554,6 +2567,7 @@ def test_ti_in_retry_state_not_returned(self, test_client, session):
"priority_weight": 9,
"queue": "default_queue",
"queued_when": None,
+ "scheduled_when": None,
"start_date": "2020-01-02T00:00:00Z",
"state": "success",
"task_id": "print_the_context",
@@ -2620,6 +2634,7 @@ def test_mapped_task_should_respond_200(self, test_client, session):
"priority_weight": 9,
"queue": "default_queue",
"queued_when": None,
+ "scheduled_when": None,
"start_date": "2020-01-02T00:00:00Z",
"state": "failed",
"task_id": "print_the_context",
@@ -2644,6 +2659,7 @@ def test_mapped_task_should_respond_200(self, test_client, session):
"priority_weight": 9,
"queue": "default_queue",
"queued_when": None,
+ "scheduled_when": None,
"start_date": "2020-01-02T00:00:00Z",
"state": None,
"task_id": "print_the_context",
@@ -2718,6 +2734,7 @@ def test_should_call_mocked_api(self, mock_set_ti_state, test_client, session):
"priority_weight": 9,
"queue": "default_queue",
"queued_when": None,
+ "scheduled_when": None,
"start_date": "2020-01-02T00:00:00Z",
"state": "running",
"task_display_name": self.TASK_ID,
@@ -2912,6 +2929,7 @@ def test_should_raise_422_for_invalid_task_instance_state(self, payload, expecte
"priority_weight": 9,
"queue": "default_queue",
"queued_when": None,
+ "scheduled_when": None,
"start_date": "2020-01-02T00:00:00Z",
"state": "running",
"task_display_name": "print_the_context",
@@ -3009,6 +3027,7 @@ def test_update_mask_set_note_should_respond_200(self, test_client, session, new
"priority_weight": 9,
"queue": "default_queue",
"queued_when": None,
+ "scheduled_when": None,
"start_date": "2020-01-02T00:00:00Z",
"state": "running",
"task_id": self.TASK_ID,
@@ -3049,6 +3068,7 @@ def test_set_note_should_respond_200(self, test_client, session):
"priority_weight": 9,
"queue": "default_queue",
"queued_when": None,
+ "scheduled_when": None,
"start_date": "2020-01-02T00:00:00Z",
"state": "running",
"task_id": self.TASK_ID,
@@ -3103,6 +3123,7 @@ def test_set_note_should_respond_200_mapped_task_instance_with_rtif(self, test_c
"priority_weight": 9,
"queue": "default_queue",
"queued_when": None,
+ "scheduled_when": None,
"start_date": "2020-01-02T00:00:00Z",
"state": "running",
"task_id": self.TASK_ID,
@@ -3202,6 +3223,7 @@ def test_should_call_mocked_api(self, mock_set_ti_state, test_client, session):
"priority_weight": 9,
"queue": "default_queue",
"queued_when": None,
+ "scheduled_when": None,
"start_date": "2020-01-02T00:00:00Z",
"state": "running",
"task_display_name": self.TASK_ID,
@@ -3423,6 +3445,7 @@ def test_should_raise_422_for_invalid_task_instance_state(self, payload, expecte
"priority_weight": 9,
"queue": "default_queue",
"queued_when": None,
+ "scheduled_when": None,
"start_date": "2020-01-02T00:00:00Z",
"state": "running",
"task_display_name": "print_the_context",
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index f819784b3e1d1..3e4cbbd7c3cf8 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -4007,6 +4007,7 @@ def test_refresh_from_db(self, create_task_instance):
"operator": "some_custom_operator",
"custom_operator_name": "some_custom_operator",
"queued_dttm": run_date + datetime.timedelta(hours=1),
+ "scheduled_dttm": run_date + datetime.timedelta(hours=1),
"rendered_map_index": None,
"queued_by_job_id": 321,
"pid": 123,
diff --git a/tests/www/views/test_views_tasks.py b/tests/www/views/test_views_tasks.py
index 44c4316058171..3efb0a13189cc 100644
--- a/tests/www/views/test_views_tasks.py
+++ b/tests/www/views/test_views_tasks.py
@@ -1094,6 +1094,7 @@ def test_task_instances(admin_client):
"queue": "default",
"queued_by_job_id": None,
"queued_dttm": None,
+ "scheduled_dttm": None,
"rendered_map_index": None,
"run_id": "TEST_DAGRUN",
"start_date": None,
@@ -1131,6 +1132,7 @@ def test_task_instances(admin_client):
"queue": "default",
"queued_by_job_id": None,
"queued_dttm": None,
+ "scheduled_dttm": None,
"rendered_map_index": None,
"run_id": "TEST_DAGRUN",
"start_date": None,
@@ -1168,6 +1170,7 @@ def test_task_instances(admin_client):
"queue": "default",
"queued_by_job_id": None,
"queued_dttm": None,
+ "scheduled_dttm": None,
"rendered_map_index": None,
"run_id": "TEST_DAGRUN",
"start_date": None,
@@ -1205,6 +1208,7 @@ def test_task_instances(admin_client):
"queue": "default",
"queued_by_job_id": None,
"queued_dttm": None,
+ "scheduled_dttm": None,
"rendered_map_index": None,
"run_id": "TEST_DAGRUN",
"start_date": None,
@@ -1242,6 +1246,7 @@ def test_task_instances(admin_client):
"queue": "default",
"queued_by_job_id": None,
"queued_dttm": None,
+ "scheduled_dttm": None,
"rendered_map_index": None,
"run_id": "TEST_DAGRUN",
"start_date": None,
@@ -1279,6 +1284,7 @@ def test_task_instances(admin_client):
"queue": "default",
"queued_by_job_id": None,
"queued_dttm": None,
+ "scheduled_dttm": None,
"rendered_map_index": None,
"run_id": "TEST_DAGRUN",
"start_date": None,
@@ -1316,6 +1322,7 @@ def test_task_instances(admin_client):
"queue": "default",
"queued_by_job_id": None,
"queued_dttm": None,
+ "scheduled_dttm": None,
"rendered_map_index": None,
"run_id": "TEST_DAGRUN",
"start_date": None,