3 changes: 2 additions & 1 deletion airflow/cli/commands/task_command.py
@@ -29,7 +29,8 @@
from airflow import DAG, AirflowException, conf, jobs, settings
from airflow.executors.executor_loader import ExecutorLoader
from airflow.models import DagPickle, TaskInstance
from airflow.ti_deps.dep_context import SCHEDULER_QUEUED_DEPS, DepContext
from airflow.ti_deps.dep_context import DepContext
from airflow.ti_deps.dependencies import SCHEDULER_QUEUED_DEPS
from airflow.utils import cli as cli_utils
from airflow.utils.cli import get_dag, get_dag_by_file_location, get_dag_by_pickle, get_dags
from airflow.utils.log.logging_mixin import StreamLogWriter
3 changes: 2 additions & 1 deletion airflow/jobs/backfill_job.py
@@ -35,7 +35,8 @@
from airflow.jobs.base_job import BaseJob
from airflow.models import DAG, DagPickle, DagRun
from airflow.models.taskinstance import TaskInstance, TaskInstanceKeyType
from airflow.ti_deps.dep_context import BACKFILL_QUEUED_DEPS, DepContext
from airflow.ti_deps.dep_context import DepContext
from airflow.ti_deps.dependencies import BACKFILL_QUEUED_DEPS
from airflow.utils import timezone
from airflow.utils.configuration import tmp_configuration_copy
from airflow.utils.session import provide_session
3 changes: 2 additions & 1 deletion airflow/jobs/scheduler_job.py
@@ -42,7 +42,8 @@
from airflow.models import DAG, DagRun, SlaMiss, errors
from airflow.models.taskinstance import SimpleTaskInstance
from airflow.stats import Stats
from airflow.ti_deps.dep_context import SCHEDULED_DEPS, DepContext
from airflow.ti_deps.dep_context import DepContext
from airflow.ti_deps.dependencies import SCHEDULED_DEPS
from airflow.ti_deps.deps.pool_slots_available_dep import STATES_TO_COUNT_AS_RUNNING
from airflow.utils import asciiart, helpers, timezone
from airflow.utils.dag_processing import (
3 changes: 2 additions & 1 deletion airflow/models/dagrun.py
@@ -28,7 +28,8 @@
from airflow.models.base import ID_LEN, Base
from airflow.models.taskinstance import TaskInstance as TI
from airflow.stats import Stats
from airflow.ti_deps.dep_context import SCHEDULEABLE_STATES, DepContext
from airflow.ti_deps.dep_context import DepContext
from airflow.ti_deps.dependencies import SCHEDULEABLE_STATES
from airflow.utils import timezone
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.session import provide_session
3 changes: 2 additions & 1 deletion airflow/models/taskinstance.py
@@ -50,7 +50,8 @@
from airflow.models.xcom import XCOM_RETURN_KEY, XCom
from airflow.sentry import Sentry
from airflow.stats import Stats
from airflow.ti_deps.dep_context import REQUEUEABLE_DEPS, RUNNING_DEPS, DepContext
from airflow.ti_deps.dep_context import DepContext
from airflow.ti_deps.dependencies import REQUEUEABLE_DEPS, RUNNING_DEPS
from airflow.utils import timezone
from airflow.utils.email import send_email
from airflow.utils.helpers import is_container
10 changes: 5 additions & 5 deletions airflow/task/task_runner/standard_task_runner.py
@@ -20,7 +20,7 @@
import os

import psutil
from setproctitle import setproctitle
from setproctitle import setproctitle # pylint: disable=no-name-in-module

from airflow.task.task_runner.base_task_runner import BaseTaskRunner
from airflow.utils.helpers import reap_process_group
@@ -47,7 +47,7 @@ def _start_by_exec(self):
subprocess = self.run_command()
return psutil.Process(subprocess.pid)

def _start_by_fork(self):
def _start_by_fork(self): # pylint: disable=inconsistent-return-statements
pid = os.fork()
if pid:
self.log.info("Started process %d to run task", pid)
@@ -79,9 +79,9 @@ def _start_by_fork(self):

try:
args.func(args, dag=self.dag)
os._exit(0)
except Exception:
os._exit(1)
os._exit(0) # pylint: disable=protected-access
except Exception: # pylint: disable=broad-except
os._exit(1) # pylint: disable=protected-access

def return_code(self, timeout=0):
# We call this multiple times, but we can only wait on the process once
99 changes: 0 additions & 99 deletions airflow/ti_deps/dep_context.py
@@ -19,16 +19,6 @@
import pendulum
from sqlalchemy.orm.session import Session

from airflow.ti_deps.deps.dag_ti_slots_available_dep import DagTISlotsAvailableDep
from airflow.ti_deps.deps.dag_unpaused_dep import DagUnpausedDep
from airflow.ti_deps.deps.dagrun_exists_dep import DagrunRunningDep
from airflow.ti_deps.deps.dagrun_id_dep import DagrunIdDep
from airflow.ti_deps.deps.exec_date_after_start_date_dep import ExecDateAfterStartDateDep
from airflow.ti_deps.deps.pool_slots_available_dep import PoolSlotsAvailableDep
from airflow.ti_deps.deps.runnable_exec_date_dep import RunnableExecDateDep
from airflow.ti_deps.deps.task_concurrency_dep import TaskConcurrencyDep
from airflow.ti_deps.deps.task_not_running_dep import TaskNotRunningDep
from airflow.ti_deps.deps.valid_state_dep import ValidStateDep
from airflow.utils.state import State


@@ -113,92 +103,3 @@ def ensure_finished_tasks(self, dag, execution_date: pendulum.datetime, session:
session=session,
)
return self.finished_tasks


# In order to be able to get queued a task must have one of these states
SCHEDULEABLE_STATES = {
State.NONE,
State.UP_FOR_RETRY,
State.UP_FOR_RESCHEDULE,
}

RUNNABLE_STATES = {
# For cases like unit tests and run manually
State.NONE,
State.UP_FOR_RETRY,
State.UP_FOR_RESCHEDULE,
# For normal scheduler/backfill cases
State.QUEUED,
}

QUEUEABLE_STATES = {
State.SCHEDULED,
}

BACKFILL_QUEUEABLE_STATES = {
# For cases like unit tests and run manually
State.NONE,
State.UP_FOR_RESCHEDULE,
State.UP_FOR_RETRY,
# For normal backfill cases
State.SCHEDULED,
}

# Context to get the dependencies that need to be met in order for a task instance to be
# set to 'scheduled' state.
SCHEDULED_DEPS = {
RunnableExecDateDep(),
ValidStateDep(SCHEDULEABLE_STATES),
TaskNotRunningDep(),
}

# Dependencies that if met, task instance should be re-queued.
REQUEUEABLE_DEPS = {
DagTISlotsAvailableDep(),
TaskConcurrencyDep(),
PoolSlotsAvailableDep(),
}

# Dependencies that need to be met for a given task instance to be set to 'RUNNING' state.
RUNNING_DEPS = {
RunnableExecDateDep(),
ValidStateDep(RUNNABLE_STATES),
DagTISlotsAvailableDep(),
TaskConcurrencyDep(),
PoolSlotsAvailableDep(),
TaskNotRunningDep(),
}

BACKFILL_QUEUED_DEPS = {
RunnableExecDateDep(),
ValidStateDep(BACKFILL_QUEUEABLE_STATES),
DagrunRunningDep(),
TaskNotRunningDep(),
}

# TODO(aoen): SCHEDULER_QUEUED_DEPS is not coupled to actual scheduling/execution
# in any way and could easily be modified or removed from the scheduler causing
# this dependency to become outdated and incorrect. This coupling should be created
# (e.g. via a dag_deps analog of ti_deps that will be used in the scheduler code,
# or allow batch deps checks) to ensure that the logic here is equivalent to the logic
# in the scheduler.
# Right now there's one discrepancy between this context and how scheduler schedule tasks:
# Scheduler will check if the executor has the task instance--it is not possible
# to check the executor outside scheduler main process.

# Dependencies that need to be met for a given task instance to be set to 'queued' state
# by the scheduler.
# This context has more DEPs than RUNNING_DEPS, as we can have task triggered by
# components other than scheduler, e.g. webserver.
SCHEDULER_QUEUED_DEPS = {
RunnableExecDateDep(),
ValidStateDep(QUEUEABLE_STATES),
DagTISlotsAvailableDep(),
TaskConcurrencyDep(),
PoolSlotsAvailableDep(),
DagrunRunningDep(),
DagrunIdDep(),
DagUnpausedDep(),
ExecDateAfterStartDateDep(),
TaskNotRunningDep(),
}
116 changes: 116 additions & 0 deletions airflow/ti_deps/dependencies.py
@@ -0,0 +1,116 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from airflow.ti_deps.deps.dag_ti_slots_available_dep import DagTISlotsAvailableDep
from airflow.ti_deps.deps.dag_unpaused_dep import DagUnpausedDep
from airflow.ti_deps.deps.dagrun_exists_dep import DagrunRunningDep
from airflow.ti_deps.deps.dagrun_id_dep import DagrunIdDep
from airflow.ti_deps.deps.exec_date_after_start_date_dep import ExecDateAfterStartDateDep
from airflow.ti_deps.deps.pool_slots_available_dep import PoolSlotsAvailableDep
from airflow.ti_deps.deps.runnable_exec_date_dep import RunnableExecDateDep
from airflow.ti_deps.deps.task_concurrency_dep import TaskConcurrencyDep
from airflow.ti_deps.deps.task_not_running_dep import TaskNotRunningDep
from airflow.ti_deps.deps.valid_state_dep import ValidStateDep
from airflow.utils.state import State

# In order to be able to get queued a task must have one of these states
SCHEDULEABLE_STATES = {
State.NONE,
State.UP_FOR_RETRY,
State.UP_FOR_RESCHEDULE,
}

RUNNABLE_STATES = {
# For cases like unit tests and run manually
State.NONE,
State.UP_FOR_RETRY,
State.UP_FOR_RESCHEDULE,
# For normal scheduler/backfill cases
State.QUEUED,
}

QUEUEABLE_STATES = {
State.SCHEDULED,
}

BACKFILL_QUEUEABLE_STATES = {
# For cases like unit tests and run manually
State.NONE,
State.UP_FOR_RESCHEDULE,
State.UP_FOR_RETRY,
# For normal backfill cases
State.SCHEDULED,
}

# Context to get the dependencies that need to be met in order for a task instance to be
# set to 'scheduled' state.
SCHEDULED_DEPS = {
RunnableExecDateDep(),
ValidStateDep(SCHEDULEABLE_STATES),
TaskNotRunningDep(),
}

# Dependencies that if met, task instance should be re-queued.
REQUEUEABLE_DEPS = {
DagTISlotsAvailableDep(),
TaskConcurrencyDep(),
PoolSlotsAvailableDep(),
}

# Dependencies that need to be met for a given task instance to be set to 'RUNNING' state.
RUNNING_DEPS = {
RunnableExecDateDep(),
ValidStateDep(RUNNABLE_STATES),
DagTISlotsAvailableDep(),
TaskConcurrencyDep(),
PoolSlotsAvailableDep(),
TaskNotRunningDep(),
}

BACKFILL_QUEUED_DEPS = {
RunnableExecDateDep(),
ValidStateDep(BACKFILL_QUEUEABLE_STATES),
DagrunRunningDep(),
TaskNotRunningDep(),
}

# TODO(aoen): SCHEDULER_QUEUED_DEPS is not coupled to actual scheduling/execution
# in any way and could easily be modified or removed from the scheduler causing
# this dependency to become outdated and incorrect. This coupling should be created
# (e.g. via a dag_deps analog of ti_deps that will be used in the scheduler code,
# or allow batch deps checks) to ensure that the logic here is equivalent to the logic
# in the scheduler.
# Right now there's one discrepancy between this context and how scheduler schedule tasks:
# Scheduler will check if the executor has the task instance--it is not possible
# to check the executor outside scheduler main process.

# Dependencies that need to be met for a given task instance to be set to 'queued' state
# by the scheduler.
# This context has more DEPs than RUNNING_DEPS, as we can have task triggered by
# components other than scheduler, e.g. webserver.
SCHEDULER_QUEUED_DEPS = {
RunnableExecDateDep(),
ValidStateDep(QUEUEABLE_STATES),
DagTISlotsAvailableDep(),
TaskConcurrencyDep(),
PoolSlotsAvailableDep(),
DagrunRunningDep(),
DagrunIdDep(),
DagUnpausedDep(),
ExecDateAfterStartDateDep(),
TaskNotRunningDep(),
}
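
The relocated names are plain module-level constants, so call sites only change their import path. Below is a minimal usage sketch (not part of this PR), assuming the DepContext(deps=...) constructor and TaskInstance.get_failed_dep_statuses() keep the signatures they have elsewhere in Airflow:

# Hypothetical helper illustrating how the relocated sets are consumed;
# DepContext(deps=...) and get_failed_dep_statuses() are assumed here,
# they are not shown in this diff.
from airflow.ti_deps.dep_context import DepContext
from airflow.ti_deps.dependencies import RUNNING_DEPS


def blocking_running_deps(ti, session):
    """Return the dep statuses that currently block `ti` from entering RUNNING."""
    dep_context = DepContext(deps=RUNNING_DEPS)
    return list(ti.get_failed_dep_statuses(dep_context=dep_context, session=session))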
8 changes: 3 additions & 5 deletions airflow/ti_deps/deps/base_ti_dep.py
@@ -18,6 +18,7 @@

from typing import NamedTuple

from airflow.ti_deps.dep_context import DepContext
from airflow.utils.session import provide_session


@@ -40,7 +41,7 @@ def __init__(self):
pass

def __eq__(self, other):
return type(self) == type(other)
return isinstance(self, type(other))

def __hash__(self):
return hash(type(self))
@@ -56,7 +57,7 @@ def name(self):
"""
return getattr(self, 'NAME', self.__class__.__name__)

def _get_dep_statuses(self, ti, session, dep_context=None):
def _get_dep_statuses(self, ti, session, dep_context):
"""
Abstract method that returns an iterable of TIDepStatus objects that describe
whether the given task instance has this dependency met.
@@ -86,9 +87,6 @@ def get_dep_statuses(self, ti, session, dep_context=None):
:param dep_context: the context for which this dependency should be evaluated for
:type dep_context: DepContext
"""
# this avoids a circular dependency
from airflow.ti_deps.dep_context import DepContext
Review comment from the PR author:
Yes, we had a cyclic import...
Cyclic import (airflow.ti_deps.dep_context -> airflow.ti_deps.deps.dag_unpaused_dep -> airflow.ti_deps.deps.base_ti_dep) (cyclic-import)


if dep_context is None:
dep_context = DepContext()

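To make the cycle mentioned in the review comment above concrete, here is a short sketch reconstructed from this diff; the module paths are real, and the comments summarize the change as shown in the hunks:

# Before: dep_context.py imported every concrete dep class so it could define
# sets such as SCHEDULER_QUEUED_DEPS, producing pylint's reported cycle
#   dep_context -> deps.dag_unpaused_dep -> deps.base_ti_dep -> dep_context
# which forced base_ti_dep.py to import DepContext lazily inside get_dep_statuses().
#
# After: the sets live in the leaf module airflow/ti_deps/dependencies.py, so
# base_ti_dep.py imports DepContext at module level and callers now do:
from airflow.ti_deps.dep_context import DepContext
from airflow.ti_deps.dependencies import SCHEDULER_QUEUED_DEPS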
3 changes: 3 additions & 0 deletions airflow/ti_deps/deps/dag_ti_slots_available_dep.py
@@ -21,6 +21,9 @@


class DagTISlotsAvailableDep(BaseTIDep):
"""
Determines whether a DAG maximum number of running tasks has been reached.
"""
NAME = "Task Instance Slots Available"
IGNOREABLE = True

3 changes: 3 additions & 0 deletions airflow/ti_deps/deps/dag_unpaused_dep.py
@@ -21,6 +21,9 @@


class DagUnpausedDep(BaseTIDep):
"""
Determines whether a task's DAG is not paused.
"""
NAME = "Dag Not Paused"
IGNOREABLE = True

3 changes: 3 additions & 0 deletions airflow/ti_deps/deps/dagrun_exists_dep.py
@@ -22,6 +22,9 @@


class DagrunRunningDep(BaseTIDep):
"""
Determines whether a task's DagRun is in valid state.
"""
NAME = "Dagrun Running"
IGNOREABLE = True

3 changes: 3 additions & 0 deletions airflow/ti_deps/deps/exec_date_after_start_date_dep.py
@@ -21,6 +21,9 @@


class ExecDateAfterStartDateDep(BaseTIDep):
"""
Determines whether a task's execution date is after start date.
"""
NAME = "Execution Date"
IGNOREABLE = True
