Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 6 additions & 9 deletions airflow/models/skipmixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
# under the License.

import warnings
from typing import TYPE_CHECKING, Iterable, Union
from typing import TYPE_CHECKING, Iterable, Optional, Sequence, Union

from airflow.models.taskinstance import TaskInstance
from airflow.utils import timezone
Expand All @@ -26,6 +26,7 @@
from airflow.utils.state import State

if TYPE_CHECKING:
from pendulum import DateTime
from sqlalchemy import Session

from airflow.models import DagRun
Expand Down Expand Up @@ -66,9 +67,9 @@ def _set_state_to_skipped(self, dag_run: "DagRun", tasks: "Iterable[BaseOperator
def skip(
self,
dag_run: "DagRun",
execution_date: "timezone.DateTime",
tasks: "Iterable[BaseOperator]",
session: "Session" = None,
execution_date: "DateTime",
tasks: Sequence["BaseOperator"],
session: "Session",
):
"""
Sets tasks instances to skipped from the same dag run.
Expand Down Expand Up @@ -114,11 +115,7 @@ def skip(
session.commit()

# SkipMixin may not necessarily have a task_id attribute. Only store to XCom if one is available.
try:
task_id = self.task_id
except AttributeError:
task_id = None

task_id: Optional[str] = getattr(self, "task_id", None)
if task_id is not None:
from airflow.models.xcom import XCom

Expand Down
Loading