From bfdf5ab928227a188b83b189d71031615f200a9e Mon Sep 17 00:00:00 2001
From: Tomek Urbaszek
Date: Fri, 21 Feb 2020 09:50:49 +0100
Subject: [PATCH] [AIRFLOW-6863] Make airflow/task and airflow/ti_deps pylint
 compatible

---
 airflow/cli/commands/task_command.py | 3 +-
 airflow/jobs/backfill_job.py | 3 +-
 airflow/jobs/scheduler_job.py | 3 +-
 airflow/models/dagrun.py | 3 +-
 airflow/models/taskinstance.py | 3 +-
 .../task/task_runner/standard_task_runner.py | 10 +-
 airflow/ti_deps/dep_context.py | 99 -----------
 airflow/ti_deps/dependencies.py | 116 +++++++++++++
 airflow/ti_deps/deps/base_ti_dep.py | 8 +-
 .../deps/dag_ti_slots_available_dep.py | 3 +
 airflow/ti_deps/deps/dag_unpaused_dep.py | 3 +
 airflow/ti_deps/deps/dagrun_exists_dep.py | 3 +
 .../deps/exec_date_after_start_date_dep.py | 3 +
 .../ti_deps/deps/not_in_retry_period_dep.py | 6 +-
 airflow/ti_deps/deps/ready_to_reschedule.py | 3 +
 .../ti_deps/deps/runnable_exec_date_dep.py | 3 +
 airflow/ti_deps/deps/trigger_rule_dep.py | 49 +++---
 airflow/ti_deps/deps/valid_state_dep.py | 8 +-
 airflow/www/views.py | 3 +-
 scripts/ci/pylint_todo.txt | 157 ++++++++----------
 tests/models/test_taskinstance.py | 2 +-
 tests/ti_deps/deps/test_trigger_rule_dep.py | 9 +-
 tests/www/test_views.py | 2 +-
 23 files changed, 262 insertions(+), 240 deletions(-)
 create mode 100644 airflow/ti_deps/dependencies.py

diff --git a/airflow/cli/commands/task_command.py b/airflow/cli/commands/task_command.py
index d5ae41a3d4b77..a7c73b486bf52 100644
--- a/airflow/cli/commands/task_command.py
+++ b/airflow/cli/commands/task_command.py
@@ -29,7 +29,8 @@
 from airflow import DAG, AirflowException, conf, jobs, settings
 from airflow.executors.executor_loader import ExecutorLoader
 from airflow.models import DagPickle, TaskInstance
-from airflow.ti_deps.dep_context import SCHEDULER_QUEUED_DEPS, DepContext
+from airflow.ti_deps.dep_context import DepContext
+from airflow.ti_deps.dependencies import SCHEDULER_QUEUED_DEPS
 from airflow.utils import cli as cli_utils
 from airflow.utils.cli import get_dag, get_dag_by_file_location, get_dag_by_pickle, get_dags
 from airflow.utils.log.logging_mixin import StreamLogWriter
diff --git a/airflow/jobs/backfill_job.py b/airflow/jobs/backfill_job.py
index 39d6835e3f827..075b95af31f72 100644
--- a/airflow/jobs/backfill_job.py
+++ b/airflow/jobs/backfill_job.py
@@ -35,7 +35,8 @@
 from airflow.jobs.base_job import BaseJob
 from airflow.models import DAG, DagPickle, DagRun
 from airflow.models.taskinstance import TaskInstance, TaskInstanceKeyType
-from airflow.ti_deps.dep_context import BACKFILL_QUEUED_DEPS, DepContext
+from airflow.ti_deps.dep_context import DepContext
+from airflow.ti_deps.dependencies import BACKFILL_QUEUED_DEPS
 from airflow.utils import timezone
 from airflow.utils.configuration import tmp_configuration_copy
 from airflow.utils.session import provide_session
diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index f40d060c40357..5bc0438f936aa 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -42,7 +42,8 @@
 from airflow.models import DAG, DagRun, SlaMiss, errors
 from airflow.models.taskinstance import SimpleTaskInstance
 from airflow.stats import Stats
-from airflow.ti_deps.dep_context import SCHEDULED_DEPS, DepContext
+from airflow.ti_deps.dep_context import DepContext
+from airflow.ti_deps.dependencies import SCHEDULED_DEPS
 from airflow.ti_deps.deps.pool_slots_available_dep import STATES_TO_COUNT_AS_RUNNING
 from airflow.utils import asciiart, helpers, timezone
 from airflow.utils.dag_processing import (
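Note on the import split applied throughout this patch: dep_context.py keeps only the DepContext class, while the dep-set and state-set constants move to the new airflow.ti_deps.dependencies module, so every caller now imports from both. A minimal sketch of how a caller such as the scheduler consumes the split imports; the DAG, task and dates below are hypothetical stand-ins, and dep checking goes through TaskInstance.are_dependencies_met:

    from datetime import datetime

    from airflow import DAG
    from airflow.models import TaskInstance
    from airflow.operators.dummy_operator import DummyOperator
    from airflow.ti_deps.dep_context import DepContext          # class stays here
    from airflow.ti_deps.dependencies import SCHEDULED_DEPS     # dep sets move here

    with DAG(dag_id="example_dag", start_date=datetime(2020, 1, 1)) as dag:
        task = DummyOperator(task_id="noop")

    ti = TaskInstance(task=task, execution_date=datetime(2020, 1, 1))
    # Evaluate the same dep set the scheduler uses before marking a TI scheduled.
    if ti.are_dependencies_met(dep_context=DepContext(deps=SCHEDULED_DEPS)):
        print("task instance can be set to SCHEDULED")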
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index f2863bbdf4597..40ac199b574e7 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -28,7 +28,8 @@
 from airflow.models.base import ID_LEN, Base
 from airflow.models.taskinstance import TaskInstance as TI
 from airflow.stats import Stats
-from airflow.ti_deps.dep_context import SCHEDULEABLE_STATES, DepContext
+from airflow.ti_deps.dep_context import DepContext
+from airflow.ti_deps.dependencies import SCHEDULEABLE_STATES
 from airflow.utils import timezone
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.session import provide_session
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index d45b5179b96e7..6b70267f65f88 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -50,7 +50,8 @@
 from airflow.models.xcom import XCOM_RETURN_KEY, XCom
 from airflow.sentry import Sentry
 from airflow.stats import Stats
-from airflow.ti_deps.dep_context import REQUEUEABLE_DEPS, RUNNING_DEPS, DepContext
+from airflow.ti_deps.dep_context import DepContext
+from airflow.ti_deps.dependencies import REQUEUEABLE_DEPS, RUNNING_DEPS
 from airflow.utils import timezone
 from airflow.utils.email import send_email
 from airflow.utils.helpers import is_container
diff --git a/airflow/task/task_runner/standard_task_runner.py b/airflow/task/task_runner/standard_task_runner.py
index b88245692ecb0..a2dbf98e5cf4e 100644
--- a/airflow/task/task_runner/standard_task_runner.py
+++ b/airflow/task/task_runner/standard_task_runner.py
@@ -20,7 +20,7 @@
 import os
 
 import psutil
-from setproctitle import setproctitle
+from setproctitle import setproctitle  # pylint: disable=no-name-in-module
 
 from airflow.task.task_runner.base_task_runner import BaseTaskRunner
 from airflow.utils.helpers import reap_process_group
@@ -47,7 +47,7 @@ def _start_by_exec(self):
         subprocess = self.run_command()
         return psutil.Process(subprocess.pid)
 
-    def _start_by_fork(self):
+    def _start_by_fork(self):  # pylint: disable=inconsistent-return-statements
         pid = os.fork()
         if pid:
             self.log.info("Started process %d to run task", pid)
@@ -79,9 +79,9 @@ def _start_by_fork(self):
 
             try:
                 args.func(args, dag=self.dag)
-                os._exit(0)
-            except Exception:
-                os._exit(1)
+                os._exit(0)  # pylint: disable=protected-access
+            except Exception:  # pylint: disable=broad-except
+                os._exit(1)  # pylint: disable=protected-access
 
     def return_code(self, timeout=0):
         # We call this multiple times, but we can only wait on the process once
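The _start_by_fork change above only adds pylint pragmas, but the pattern is worth spelling out: os.fork() returns the child's pid in the parent (which wraps it in a psutil handle) and 0 in the child, and the child must leave through os._exit() so it never falls back into the runner's code. A self-contained sketch of that pattern, Unix-only, with run_task as a hypothetical stand-in for args.func:

    import os

    import psutil


    def run_task():
        print("child %d doing the work" % os.getpid())


    def start_by_fork():
        pid = os.fork()
        if pid:
            # Parent: return a handle the runner can poll or terminate later.
            return psutil.Process(pid)
        # Child: _exit() skips interpreter cleanup and guarantees we never
        # return into the parent's code path, mirroring the runner above.
        try:
            run_task()
            os._exit(0)  # pylint: disable=protected-access
        except Exception:  # pylint: disable=broad-except
            os._exit(1)  # pylint: disable=protected-access


    if __name__ == "__main__":
        child = start_by_fork()
        print("started %d, exit code %s" % (child.pid, child.wait()))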
diff --git a/airflow/ti_deps/dep_context.py b/airflow/ti_deps/dep_context.py
index d606f701333b7..ae5b1db0f9a38 100644
--- a/airflow/ti_deps/dep_context.py
+++ b/airflow/ti_deps/dep_context.py
@@ -19,16 +19,6 @@
 import pendulum
 from sqlalchemy.orm.session import Session
 
-from airflow.ti_deps.deps.dag_ti_slots_available_dep import DagTISlotsAvailableDep
-from airflow.ti_deps.deps.dag_unpaused_dep import DagUnpausedDep
-from airflow.ti_deps.deps.dagrun_exists_dep import DagrunRunningDep
-from airflow.ti_deps.deps.dagrun_id_dep import DagrunIdDep
-from airflow.ti_deps.deps.exec_date_after_start_date_dep import ExecDateAfterStartDateDep
-from airflow.ti_deps.deps.pool_slots_available_dep import PoolSlotsAvailableDep
-from airflow.ti_deps.deps.runnable_exec_date_dep import RunnableExecDateDep
-from airflow.ti_deps.deps.task_concurrency_dep import TaskConcurrencyDep
-from airflow.ti_deps.deps.task_not_running_dep import TaskNotRunningDep
-from airflow.ti_deps.deps.valid_state_dep import ValidStateDep
 from airflow.utils.state import State
 
 
@@ -113,92 +103,3 @@ def ensure_finished_tasks(self, dag, execution_date: pendulum.datetime, session:
             session=session,
         )
         return self.finished_tasks
-
-
-# In order to be able to get queued a task must have one of these states
-SCHEDULEABLE_STATES = {
-    State.NONE,
-    State.UP_FOR_RETRY,
-    State.UP_FOR_RESCHEDULE,
-}
-
-RUNNABLE_STATES = {
-    # For cases like unit tests and run manually
-    State.NONE,
-    State.UP_FOR_RETRY,
-    State.UP_FOR_RESCHEDULE,
-    # For normal scheduler/backfill cases
-    State.QUEUED,
-}
-
-QUEUEABLE_STATES = {
-    State.SCHEDULED,
-}
-
-BACKFILL_QUEUEABLE_STATES = {
-    # For cases like unit tests and run manually
-    State.NONE,
-    State.UP_FOR_RESCHEDULE,
-    State.UP_FOR_RETRY,
-    # For normal backfill cases
-    State.SCHEDULED,
-}
-
-# Context to get the dependencies that need to be met in order for a task instance to be
-# set to 'scheduled' state.
-SCHEDULED_DEPS = {
-    RunnableExecDateDep(),
-    ValidStateDep(SCHEDULEABLE_STATES),
-    TaskNotRunningDep(),
-}
-
-# Dependencies that if met, task instance should be re-queued.
-REQUEUEABLE_DEPS = {
-    DagTISlotsAvailableDep(),
-    TaskConcurrencyDep(),
-    PoolSlotsAvailableDep(),
-}
-
-# Dependencies that need to be met for a given task instance to be set to 'RUNNING' state.
-RUNNING_DEPS = {
-    RunnableExecDateDep(),
-    ValidStateDep(RUNNABLE_STATES),
-    DagTISlotsAvailableDep(),
-    TaskConcurrencyDep(),
-    PoolSlotsAvailableDep(),
-    TaskNotRunningDep(),
-}
-
-BACKFILL_QUEUED_DEPS = {
-    RunnableExecDateDep(),
-    ValidStateDep(BACKFILL_QUEUEABLE_STATES),
-    DagrunRunningDep(),
-    TaskNotRunningDep(),
-}
-
-# TODO(aoen): SCHEDULER_QUEUED_DEPS is not coupled to actual scheduling/execution
-# in any way and could easily be modified or removed from the scheduler causing
-# this dependency to become outdated and incorrect. This coupling should be created
-# (e.g. via a dag_deps analog of ti_deps that will be used in the scheduler code,
-# or allow batch deps checks) to ensure that the logic here is equivalent to the logic
-# in the scheduler.
-# Right now there's one discrepancy between this context and how scheduler schedule tasks:
-# Scheduler will check if the executor has the task instance--it is not possible
-# to check the executor outside scheduler main process.
-
-# Dependencies that need to be met for a given task instance to be set to 'queued' state
-# by the scheduler.
-# This context has more DEPs than RUNNING_DEPS, as we can have task triggered by
-# components other than scheduler, e.g. webserver.
-SCHEDULER_QUEUED_DEPS = {
-    RunnableExecDateDep(),
-    ValidStateDep(QUEUEABLE_STATES),
-    DagTISlotsAvailableDep(),
-    TaskConcurrencyDep(),
-    PoolSlotsAvailableDep(),
-    DagrunRunningDep(),
-    DagrunIdDep(),
-    DagUnpausedDep(),
-    ExecDateAfterStartDateDep(),
-    TaskNotRunningDep(),
-}
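The constants are deleted here rather than edited in place because keeping them in dep_context.py forced an import cycle: this module imported every dep class to build the sets, while deps/base_ti_dep.py needed DepContext and could only import it lazily inside get_dep_statuses(). With the sets in a leaf module the graph is acyclic, as the next diff shows. A toy illustration of that layering, assuming the simplified module names below (graphlib is stdlib from Python 3.9):

    from graphlib import CycleError, TopologicalSorter

    # Edges read "module: modules it imports".
    before = {
        "dep_context": {"deps.base_ti_dep"},  # built the dep sets from dep classes
        "deps.base_ti_dep": {"dep_context"},  # needed DepContext (import deferred)
    }
    after = {
        "dependencies": {"deps.base_ti_dep"},  # leaf module holding the dep sets
        "deps.base_ti_dep": {"dep_context"},   # top-level import is now safe
        "dep_context": set(),
    }

    for label, graph in (("before", before), ("after", after)):
        try:
            print(label, "import order:", list(TopologicalSorter(graph).static_order()))
        except CycleError as err:
            print(label, "has an import cycle:", err)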
diff --git a/airflow/ti_deps/dependencies.py b/airflow/ti_deps/dependencies.py
new file mode 100644
index 0000000000000..1663cc91a66d7
--- /dev/null
+++ b/airflow/ti_deps/dependencies.py
@@ -0,0 +1,116 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.ti_deps.deps.dag_ti_slots_available_dep import DagTISlotsAvailableDep
+from airflow.ti_deps.deps.dag_unpaused_dep import DagUnpausedDep
+from airflow.ti_deps.deps.dagrun_exists_dep import DagrunRunningDep
+from airflow.ti_deps.deps.dagrun_id_dep import DagrunIdDep
+from airflow.ti_deps.deps.exec_date_after_start_date_dep import ExecDateAfterStartDateDep
+from airflow.ti_deps.deps.pool_slots_available_dep import PoolSlotsAvailableDep
+from airflow.ti_deps.deps.runnable_exec_date_dep import RunnableExecDateDep
+from airflow.ti_deps.deps.task_concurrency_dep import TaskConcurrencyDep
+from airflow.ti_deps.deps.task_not_running_dep import TaskNotRunningDep
+from airflow.ti_deps.deps.valid_state_dep import ValidStateDep
+from airflow.utils.state import State
+
+# In order to be able to get queued, a task must be in one of these states
+SCHEDULEABLE_STATES = {
+    State.NONE,
+    State.UP_FOR_RETRY,
+    State.UP_FOR_RESCHEDULE,
+}
+
+RUNNABLE_STATES = {
+    # For cases like unit tests and run manually
+    State.NONE,
+    State.UP_FOR_RETRY,
+    State.UP_FOR_RESCHEDULE,
+    # For normal scheduler/backfill cases
+    State.QUEUED,
+}
+
+QUEUEABLE_STATES = {
+    State.SCHEDULED,
+}
+
+BACKFILL_QUEUEABLE_STATES = {
+    # For cases like unit tests and run manually
+    State.NONE,
+    State.UP_FOR_RESCHEDULE,
+    State.UP_FOR_RETRY,
+    # For normal backfill cases
+    State.SCHEDULED,
+}
+
+# Context to get the dependencies that need to be met in order for a task instance to be
+# set to 'scheduled' state.
+SCHEDULED_DEPS = {
+    RunnableExecDateDep(),
+    ValidStateDep(SCHEDULEABLE_STATES),
+    TaskNotRunningDep(),
+}
+
+# Dependencies that, if met, should cause the task instance to be re-queued.
+REQUEUEABLE_DEPS = {
+    DagTISlotsAvailableDep(),
+    TaskConcurrencyDep(),
+    PoolSlotsAvailableDep(),
+}
+
+# Dependencies that need to be met for a given task instance to be set to 'RUNNING' state.
+RUNNING_DEPS = {
+    RunnableExecDateDep(),
+    ValidStateDep(RUNNABLE_STATES),
+    DagTISlotsAvailableDep(),
+    TaskConcurrencyDep(),
+    PoolSlotsAvailableDep(),
+    TaskNotRunningDep(),
+}
+
+BACKFILL_QUEUED_DEPS = {
+    RunnableExecDateDep(),
+    ValidStateDep(BACKFILL_QUEUEABLE_STATES),
+    DagrunRunningDep(),
+    TaskNotRunningDep(),
+}
+
+# TODO(aoen): SCHEDULER_QUEUED_DEPS is not coupled to actual scheduling/execution
+# in any way and could easily be modified or removed from the scheduler causing
+# this dependency to become outdated and incorrect. This coupling should be created
+# (e.g. via a dag_deps analog of ti_deps that will be used in the scheduler code,
+# or allow batch deps checks) to ensure that the logic here is equivalent to the logic
+# in the scheduler.
+# Right now there's one discrepancy between this context and how the scheduler schedules
+# tasks: the scheduler will check if the executor has the task instance--it is not
+# possible to check the executor outside the scheduler's main process.
+
+# Dependencies that need to be met for a given task instance to be set to 'queued' state
+# by the scheduler.
+# This context has more DEPs than RUNNING_DEPS, as we can have tasks triggered by
+# components other than the scheduler, e.g. the webserver.
+SCHEDULER_QUEUED_DEPS = {
+    RunnableExecDateDep(),
+    ValidStateDep(QUEUEABLE_STATES),
+    DagTISlotsAvailableDep(),
+    TaskConcurrencyDep(),
+    PoolSlotsAvailableDep(),
+    DagrunRunningDep(),
+    DagrunIdDep(),
+    DagUnpausedDep(),
+    ExecDateAfterStartDateDep(),
+    TaskNotRunningDep(),
+}
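The state sets in the new module are related by construction: RUNNABLE_STATES is SCHEDULEABLE_STATES plus QUEUED, and BACKFILL_QUEUEABLE_STATES is SCHEDULEABLE_STATES plus SCHEDULED. A small sanity sketch with plain values standing in for airflow.utils.state.State members (State.NONE is literally None):

    NONE = None
    UP_FOR_RETRY, UP_FOR_RESCHEDULE = "up_for_retry", "up_for_reschedule"
    QUEUED, SCHEDULED = "queued", "scheduled"

    scheduleable = {NONE, UP_FOR_RETRY, UP_FOR_RESCHEDULE}
    runnable = scheduleable | {QUEUED}               # normal scheduler/backfill path
    backfill_queueable = scheduleable | {SCHEDULED}  # normal backfill path
    queueable = {SCHEDULED}

    assert runnable == {NONE, UP_FOR_RETRY, UP_FOR_RESCHEDULE, QUEUED}
    assert queueable < backfill_queueable            # strict subset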
diff --git a/airflow/ti_deps/deps/base_ti_dep.py b/airflow/ti_deps/deps/base_ti_dep.py
index e7491ad3c19bb..9d98b9669b619 100644
--- a/airflow/ti_deps/deps/base_ti_dep.py
+++ b/airflow/ti_deps/deps/base_ti_dep.py
@@ -18,6 +18,7 @@
 
 from typing import NamedTuple
 
+from airflow.ti_deps.dep_context import DepContext
 from airflow.utils.session import provide_session
 
 
@@ -40,7 +41,7 @@ def __init__(self):
         pass
 
     def __eq__(self, other):
-        return type(self) == type(other)
+        return isinstance(self, type(other))
 
     def __hash__(self):
         return hash(type(self))
@@ -56,7 +57,7 @@ def name(self):
         """
         return getattr(self, 'NAME', self.__class__.__name__)
 
-    def _get_dep_statuses(self, ti, session, dep_context=None):
+    def _get_dep_statuses(self, ti, session, dep_context):
         """
         Abstract method that returns an iterable of TIDepStatus objects that describe
         whether the given task instance has this dependency met.
@@ -86,9 +87,6 @@ def get_dep_statuses(self, ti, session, dep_context=None):
         :param dep_context: the context for which this dependency should be evaluated for
         :type dep_context: DepContext
         """
-        # this avoids a circular dependency
-        from airflow.ti_deps.dep_context import DepContext
-
         if dep_context is None:
             dep_context = DepContext()
 
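Swapping type(self) == type(other) for isinstance(self, type(other)) satisfies pylint without changing behaviour: deps still compare and hash by type, so each dep acts as a singleton inside set literals like the ones in dependencies.py. A toy sketch of that contract, with stand-in classes rather than the real deps:

    class BaseDep:
        def __eq__(self, other):
            return isinstance(self, type(other))

        def __hash__(self):
            return hash(type(self))


    class SlotsAvailableDep(BaseDep):
        pass


    class NotRunningDep(BaseDep):
        pass


    # Duplicate instances of the same dep type collapse inside a set.
    deps = {SlotsAvailableDep(), SlotsAvailableDep(), NotRunningDep()}
    assert len(deps) == 2
    assert SlotsAvailableDep() == SlotsAvailableDep()
    assert SlotsAvailableDep() != NotRunningDep()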
diff --git a/airflow/ti_deps/deps/dag_ti_slots_available_dep.py b/airflow/ti_deps/deps/dag_ti_slots_available_dep.py
index 68ec2c6a48c3d..be7d7c790ae9f 100644
--- a/airflow/ti_deps/deps/dag_ti_slots_available_dep.py
+++ b/airflow/ti_deps/deps/dag_ti_slots_available_dep.py
@@ -21,6 +21,9 @@
 
 
 class DagTISlotsAvailableDep(BaseTIDep):
+    """
+    Determines whether a DAG's maximum number of running tasks has been reached.
+    """
     NAME = "Task Instance Slots Available"
     IGNOREABLE = True
 
diff --git a/airflow/ti_deps/deps/dag_unpaused_dep.py b/airflow/ti_deps/deps/dag_unpaused_dep.py
index 1c2eb6b0bb8c9..343f5078ea488 100644
--- a/airflow/ti_deps/deps/dag_unpaused_dep.py
+++ b/airflow/ti_deps/deps/dag_unpaused_dep.py
@@ -21,6 +21,9 @@
 
 
 class DagUnpausedDep(BaseTIDep):
+    """
+    Determines whether a task's DAG is not paused.
+    """
     NAME = "Dag Not Paused"
     IGNOREABLE = True
 
diff --git a/airflow/ti_deps/deps/dagrun_exists_dep.py b/airflow/ti_deps/deps/dagrun_exists_dep.py
index ff2b8a1d60621..92c0ad8b3004b 100644
--- a/airflow/ti_deps/deps/dagrun_exists_dep.py
+++ b/airflow/ti_deps/deps/dagrun_exists_dep.py
@@ -22,6 +22,9 @@
 
 
 class DagrunRunningDep(BaseTIDep):
+    """
+    Determines whether a task's DagRun is in a valid state.
+    """
     NAME = "Dagrun Running"
     IGNOREABLE = True
 
diff --git a/airflow/ti_deps/deps/exec_date_after_start_date_dep.py b/airflow/ti_deps/deps/exec_date_after_start_date_dep.py
index 861f7887faea2..266a61ff5f3fe 100644
--- a/airflow/ti_deps/deps/exec_date_after_start_date_dep.py
+++ b/airflow/ti_deps/deps/exec_date_after_start_date_dep.py
@@ -21,6 +21,9 @@
 
 
 class ExecDateAfterStartDateDep(BaseTIDep):
+    """
+    Determines whether a task's execution date is after its start date.
+    """
     NAME = "Execution Date"
     IGNOREABLE = True
 
diff --git a/airflow/ti_deps/deps/not_in_retry_period_dep.py b/airflow/ti_deps/deps/not_in_retry_period_dep.py
index 7d228cf098eb1..de8d0847af2b7 100644
--- a/airflow/ti_deps/deps/not_in_retry_period_dep.py
+++ b/airflow/ti_deps/deps/not_in_retry_period_dep.py
@@ -23,6 +23,9 @@
 
 
 class NotInRetryPeriodDep(BaseTIDep):
+    """
+    Determines whether a task is not in its retry period.
+    """
     NAME = "Not In Retry Period"
     IGNOREABLE = True
     IS_TASK_DEP = True
@@ -31,8 +34,7 @@ class NotInRetryPeriodDep(BaseTIDep):
     def _get_dep_statuses(self, ti, session, dep_context):
         if dep_context.ignore_in_retry_period:
             yield self._passing_status(
-                reason="The context specified that being in a retry period was "
-                "permitted.")
+                reason="The context specified that being in a retry period was permitted.")
             return
 
         if ti.state != State.UP_FOR_RETRY:
diff --git a/airflow/ti_deps/deps/ready_to_reschedule.py b/airflow/ti_deps/deps/ready_to_reschedule.py
index 82adf8f9e98db..7438c5a976e4f 100644
--- a/airflow/ti_deps/deps/ready_to_reschedule.py
+++ b/airflow/ti_deps/deps/ready_to_reschedule.py
@@ -24,6 +24,9 @@
 
 
 class ReadyToRescheduleDep(BaseTIDep):
+    """
+    Determines whether a task is ready to be rescheduled.
+    """
     NAME = "Ready To Reschedule"
     IGNOREABLE = True
     IS_TASK_DEP = True
diff --git a/airflow/ti_deps/deps/runnable_exec_date_dep.py b/airflow/ti_deps/deps/runnable_exec_date_dep.py
index 8c04e2b5849f3..4c4c266f72a71 100644
--- a/airflow/ti_deps/deps/runnable_exec_date_dep.py
+++ b/airflow/ti_deps/deps/runnable_exec_date_dep.py
@@ -22,6 +22,9 @@
 
 
 class RunnableExecDateDep(BaseTIDep):
+    """
+    Determines whether a task's execution date is valid.
+    """
     NAME = "Execution Date"
     IGNOREABLE = True
 
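For context on what NotInRetryPeriodDep guards: a task that just failed stays blocked until its retry window has elapsed. A simplified sketch of the window arithmetic; the real dep asks ti.next_retry_datetime(), which can also apply exponential backoff, so plain end_date + retry_delay here is an assumption:

    from datetime import datetime, timedelta


    def in_retry_period(end_date, retry_delay, now):
        """True while the task must keep waiting before its next try."""
        return now < end_date + retry_delay


    failed_at = datetime(2020, 2, 21, 9, 0)
    delay = timedelta(minutes=5)
    assert in_retry_period(failed_at, delay, datetime(2020, 2, 21, 9, 3))
    assert not in_retry_period(failed_at, delay, datetime(2020, 2, 21, 9, 6))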
diff --git a/airflow/ti_deps/deps/trigger_rule_dep.py b/airflow/ti_deps/deps/trigger_rule_dep.py
index b62eb17201d11..8a89812633e09 100644
--- a/airflow/ti_deps/deps/trigger_rule_dep.py
+++ b/airflow/ti_deps/deps/trigger_rule_dep.py
@@ -34,8 +34,7 @@ class TriggerRuleDep(BaseTIDep):
     IS_TASK_DEP = True
 
     @staticmethod
-    @provide_session
-    def _get_states_count_upstream_ti(ti, finished_tasks, session):
+    def _get_states_count_upstream_ti(ti, finished_tasks):
         """
         This function returns the states of the upstream tis for a specific ti
         in order to determine whether this ti can run in this iteration
@@ -77,7 +76,7 @@ def _get_dep_statuses(self, ti, session, dep_context):
             session=session)
 
     @provide_session
-    def _evaluate_trigger_rule(
+    def _evaluate_trigger_rule(  # pylint: disable=too-many-branches
         self,
         ti,
         successes,
@@ -116,7 +115,7 @@ def _evaluate_trigger_rule(
         task = ti.task
         upstream = len(task.upstream_task_ids)
-        tr = task.trigger_rule
+        trigger_rule = task.trigger_rule
         upstream_done = done >= upstream
         upstream_tasks_state = {
             "total": upstream, "successes": successes, "skipped": skipped,
@@ -127,87 +126,87 @@ def _evaluate_trigger_rule(
         # bundled together for efficiency.
         # handling instant state assignment based on trigger rules
         if flag_upstream_failed:
-            if tr == TR.ALL_SUCCESS:
+            if trigger_rule == TR.ALL_SUCCESS:
                 if upstream_failed or failed:
                     ti.set_state(State.UPSTREAM_FAILED, session)
                 elif skipped:
                     ti.set_state(State.SKIPPED, session)
-            elif tr == TR.ALL_FAILED:
+            elif trigger_rule == TR.ALL_FAILED:
                 if successes or skipped:
                     ti.set_state(State.SKIPPED, session)
-            elif tr == TR.ONE_SUCCESS:
+            elif trigger_rule == TR.ONE_SUCCESS:
                 if upstream_done and not successes:
                     ti.set_state(State.SKIPPED, session)
-            elif tr == TR.ONE_FAILED:
+            elif trigger_rule == TR.ONE_FAILED:
                 if upstream_done and not (failed or upstream_failed):
                     ti.set_state(State.SKIPPED, session)
-            elif tr == TR.NONE_FAILED:
+            elif trigger_rule == TR.NONE_FAILED:
                 if upstream_failed or failed:
                     ti.set_state(State.UPSTREAM_FAILED, session)
                 elif skipped == upstream:
                     ti.set_state(State.SKIPPED, session)
-            elif tr == TR.NONE_SKIPPED:
+            elif trigger_rule == TR.NONE_SKIPPED:
                 if skipped:
                     ti.set_state(State.SKIPPED, session)
 
-        if tr == TR.ONE_SUCCESS:
+        if trigger_rule == TR.ONE_SUCCESS:
             if successes <= 0:
                 yield self._failing_status(
                     reason="Task's trigger rule '{0}' requires one upstream "
                     "task success, but none were found. "
                     "upstream_tasks_state={1}, upstream_task_ids={2}"
-                    .format(tr, upstream_tasks_state, task.upstream_task_ids))
-        elif tr == TR.ONE_FAILED:
+                    .format(trigger_rule, upstream_tasks_state, task.upstream_task_ids))
+        elif trigger_rule == TR.ONE_FAILED:
             if not failed and not upstream_failed:
                 yield self._failing_status(
                     reason="Task's trigger rule '{0}' requires one upstream "
                     "task failure, but none were found. "
                     "upstream_tasks_state={1}, upstream_task_ids={2}"
-                    .format(tr, upstream_tasks_state, task.upstream_task_ids))
-        elif tr == TR.ALL_SUCCESS:
+                    .format(trigger_rule, upstream_tasks_state, task.upstream_task_ids))
+        elif trigger_rule == TR.ALL_SUCCESS:
             num_failures = upstream - successes
             if num_failures > 0:
                 yield self._failing_status(
                     reason="Task's trigger rule '{0}' requires all upstream "
                     "tasks to have succeeded, but found {1} non-success(es). "
                     "upstream_tasks_state={2}, upstream_task_ids={3}"
-                    .format(tr, num_failures, upstream_tasks_state,
+                    .format(trigger_rule, num_failures, upstream_tasks_state,
                             task.upstream_task_ids))
-        elif tr == TR.ALL_FAILED:
+        elif trigger_rule == TR.ALL_FAILED:
             num_successes = upstream - failed - upstream_failed
             if num_successes > 0:
                 yield self._failing_status(
                     reason="Task's trigger rule '{0}' requires all upstream "
                     "tasks to have failed, but found {1} non-failure(s). "
                     "upstream_tasks_state={2}, upstream_task_ids={3}"
-                    .format(tr, num_successes, upstream_tasks_state,
+                    .format(trigger_rule, num_successes, upstream_tasks_state,
                             task.upstream_task_ids))
-        elif tr == TR.ALL_DONE:
+        elif trigger_rule == TR.ALL_DONE:
             if not upstream_done:
                 yield self._failing_status(
                     reason="Task's trigger rule '{0}' requires all upstream "
                     "tasks to have completed, but found {1} task(s) that "
                     "weren't done. upstream_tasks_state={2}, "
                     "upstream_task_ids={3}"
-                    .format(tr, upstream_done, upstream_tasks_state,
+                    .format(trigger_rule, upstream_done, upstream_tasks_state,
                             task.upstream_task_ids))
-        elif tr == TR.NONE_FAILED:
+        elif trigger_rule == TR.NONE_FAILED:
             num_failures = upstream - successes - skipped
             if num_failures > 0:
                 yield self._failing_status(
                     reason="Task's trigger rule '{0}' requires all upstream "
                     "tasks to have succeeded or been skipped, but found {1} non-success(es). "
                     "upstream_tasks_state={2}, upstream_task_ids={3}"
-                    .format(tr, num_failures, upstream_tasks_state,
+                    .format(trigger_rule, num_failures, upstream_tasks_state,
                             task.upstream_task_ids))
-        elif tr == TR.NONE_SKIPPED:
+        elif trigger_rule == TR.NONE_SKIPPED:
             if not upstream_done or (skipped > 0):
                 yield self._failing_status(
                     reason="Task's trigger rule '{0}' requires all upstream "
                     "tasks to not have been skipped, but found {1} task(s) skipped. "
                     "upstream_tasks_state={2}, upstream_task_ids={3}"
-                    .format(tr, skipped, upstream_tasks_state,
+                    .format(trigger_rule, skipped, upstream_tasks_state,
                             task.upstream_task_ids))
         else:
             yield self._failing_status(
-                reason="No strategy to evaluate trigger rule '{0}'.".format(tr))
+                reason="No strategy to evaluate trigger rule '{0}'.".format(trigger_rule))
diff --git a/airflow/ti_deps/deps/valid_state_dep.py b/airflow/ti_deps/deps/valid_state_dep.py
index 1de5b3f5b74ee..80d62088542dd 100644
--- a/airflow/ti_deps/deps/valid_state_dep.py
+++ b/airflow/ti_deps/deps/valid_state_dep.py
@@ -22,9 +22,6 @@
 
 
 class ValidStateDep(BaseTIDep):
-    NAME = "Task Instance State"
-    IGNOREABLE = True
-
     """
     Ensures that the task instance's state is in a given set of valid states.
 
@@ -33,6 +30,9 @@ class ValidStateDep(BaseTIDep):
     :type valid_states: set(str)
     :return: whether or not the task instance's state is valid
     """
+    NAME = "Task Instance State"
+    IGNOREABLE = True
+
     def __init__(self, valid_states):
         super().__init__()
 
@@ -42,7 +42,7 @@ def __init__(self, valid_states):
         self._valid_states = valid_states
 
     def __eq__(self, other):
-        return type(self) == type(other) and self._valid_states == other._valid_states
+        return isinstance(self, type(other)) and self._valid_states == other._valid_states
 
     def __hash__(self):
         return hash((type(self), tuple(self._valid_states)))
diff --git a/airflow/www/views.py b/airflow/www/views.py
index b445b4cec6365..e39704b66af91 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -56,7 +56,8 @@
 from airflow.executors.executor_loader import ExecutorLoader
 from airflow.models import Connection, DagModel, DagRun, DagTag, Log, SlaMiss, TaskFail, XCom, errors
 from airflow.settings import STORE_SERIALIZED_DAGS
-from airflow.ti_deps.dep_context import RUNNING_DEPS, SCHEDULER_QUEUED_DEPS, DepContext
+from airflow.ti_deps.dep_context import DepContext
+from airflow.ti_deps.dependencies import RUNNING_DEPS, SCHEDULER_QUEUED_DEPS
 from airflow.utils import timezone
 from airflow.utils.dates import infer_time_unit, scale_time_units
 from airflow.utils.helpers import alchemy_to_dict, render_log_filename
diff --git a/scripts/ci/pylint_todo.txt b/scripts/ci/pylint_todo.txt
index 54545001fa8bb..4e22f8328f1ff 100644
--- a/scripts/ci/pylint_todo.txt
+++ b/scripts/ci/pylint_todo.txt
@@ -3,72 +3,8 @@
 ./airflow/contrib/auth/backends/google_auth.py
 ./airflow/contrib/auth/backends/kerberos_auth.py
 ./airflow/contrib/auth/backends/ldap_auth.py
-./airflow/providers/microsoft/azure/hooks/azure_container_instance.py
-./airflow/providers/microsoft/azure/hooks/azure_container_volume.py
-./airflow/providers/microsoft/azure/hooks/azure_cosmos.py
-./airflow/providers/microsoft/azure/hooks/azure_data_lake.py
-./airflow/providers/microsoft/azure/hooks/azure_fileshare.py
-./airflow/providers/datadog/hooks/datadog.py
-./airflow/providers/dingding/hooks/dingding.py
-./airflow/providers/ddiscord/hooks/discord_webhook.py
-./airflow/hooks/filesystem.py
-./airflow/providers/ftp/hooks/ftp.py
-./airflow/providers/jenkins/hooks/jenkins.py
-./airflow/providers/openfass/hooks/openfaas.py
-./airflow/providers/opsgenie/hooks/opsgenie_alert.py
-./airflow/providers/apache/pinot/hooks/pinot.py
-./airflow/providers/qubole/hooks/qubole_check.py
-./airflow/providers/segment/hooks/segment.py
-./airflow/providers/slack/hooks/slack_webhook.py
-./airflow/providers/apache/spark/hooks/spark_jdbc.py
-./airflow/providers/apache/spark/hooks/spark_jdbc_script.py
-./airflow/providers/apache/spark/hooks/spark_sql.py
-./airflow/providers/apache/spark/hooks/spark_submit.py
-./airflow/providers/apache/sqoop/hooks/sqoop.py
-./airflow/providers/vertica/hooks/vertica.py
-./airflow/providers/microsoft/azure/hooks/wasb.py
-./airflow/providers/microsoft/azure/operators/adls_list.py
-./airflow/providers/microsoft/azure/operators/azure_container_instances.py
-./airflow/providers/microsoft/azure/operators/azure_cosmos.py
-./airflow/providers/dindding/operators/dingding.py
-./airflow/providers/discord/operators/discord_webhook.py
-./airflow/providers/apache/druid/operators/druid.py
-./airflow/providers/microsoft/azure/operators/file_to_wasb.py
-./airflow/providers/grpc/operators/grpc.py
-./airflow/providers/jenkins/operators/jenkins_job_trigger.py
-./airflow/providers/jira/operators/jira.py
-./airflow/providers/opsgenie/operators/opsgenie_alert.py
-./airflow/providers/microsoft/azure/operators/oracle_to_azure_data_lake_transfer.py
-./airflow/providers/oracle/operators/oracle_to_oracle_transfer.py
-./airflow/providers/qubole/operators/qubole_check.py
-./airflow/providers/redis/operators/redis_publish.py
-./airflow/providers/google/cloud/operators/s3_to_gcs.py
-./airflow/providers/segment/operators/segment_track_event.py
-./airflow/providers/slack/operators/slack_webhook.py
-./airflow/providers/apache/spark/operators/spark_jdbc.py
-./airflow/providers/apache/spark/operators/spark_sql.py
-./airflow/providers/apache/spark/operators/spark_submit.py
-./airflow/providers/apache/sqoop/operators/sqoop.py
-./airflow/providers/ssh/operators/ssh.py
-./airflow/providers/vertica/operators/vertica.py
-./airflow/providers/mysql/operators/vertica_to_mysql.py
-./airflow/providers/microsoft/azure/operators/wasb_delete_blob.py
-./airflow/providers/microsoft/winrm/operators/winrm.py
 ./airflow/hooks/dbapi_hook.py
-./airflow/providers/docker/hooks/docker.py
-./airflow/providers/apache/druid/hooks/druid.py
-./airflow/providers/apache/hive/hooks/hive.py
-./airflow/providers/http/hooks/http.py
-./airflow/providers/jdbc/hooks/jdbc.py
-./airflow/providers/microsoft/mssql/hooks/mssql.py
-./airflow/providers/mysql/hooks/mysql.py
-./airflow/providers/oracle/hooks/oracle.py
-./airflow/providers/apache/pig/hooks/pig.py
-./airflow/providers/postgres/hooks/postgres.py
-./airflow/providers/presto/hooks/presto.py
-./airflow/providers/samba/hooks/samba.py
-./airflow/providers/sqlite/hooks/sqlite.py
-./airflow/providers/zendesk/hooks/zendesk.py
+./airflow/hooks/filesystem.py
 ./airflow/jobs/backfill_job.py
 ./airflow/jobs/base_job.py
 ./airflow/jobs/local_task_job.py
@@ -97,43 +33,92 @@
 ./airflow/operators/bash.py
 ./airflow/operators/check_operator.py
 ./airflow/operators/dagrun_operator.py
-./airflow/providers/apache/druid/operators/druid_check.py
 ./airflow/operators/dummy_operator.py
-./airflow/providers/email/operators/email.py
 ./airflow/operators/generic_transfer.py
+./airflow/operators/jdbc_operator.py
+./airflow/operators/python.py
+./airflow/providers/apache/druid/hooks/druid.py
+./airflow/providers/apache/druid/operators/druid.py
+./airflow/providers/apache/druid/operators/druid_check.py
+./airflow/providers/apache/hive/hooks/hive.py
 ./airflow/providers/apache/hive/operators/hive.py
 ./airflow/providers/apache/hive/operators/hive_stats.py
+./airflow/providers/apache/pig/hooks/pig.py
+./airflow/providers/apache/pig/operators/pig.py
+./airflow/providers/apache/pinot/hooks/pinot.py
+./airflow/providers/apache/spark/hooks/spark_jdbc.py
+./airflow/providers/apache/spark/hooks/spark_jdbc_script.py
+./airflow/providers/apache/spark/hooks/spark_sql.py
+./airflow/providers/apache/spark/hooks/spark_submit.py
+./airflow/providers/apache/spark/operators/spark_jdbc.py
+./airflow/providers/apache/spark/operators/spark_sql.py
+./airflow/providers/apache/spark/operators/spark_submit.py
+./airflow/providers/apache/sqoop/hooks/sqoop.py
+./airflow/providers/apache/sqoop/operators/sqoop.py
+./airflow/providers/datadog/hooks/datadog.py
+./airflow/providers/ddiscord/hooks/discord_webhook.py
+./airflow/providers/dindding/operators/dingding.py
+./airflow/providers/dingding/hooks/dingding.py
+./airflow/providers/discord/operators/discord_webhook.py
+./airflow/providers/docker/hooks/docker.py
+./airflow/providers/email/operators/email.py
+./airflow/providers/ftp/hooks/ftp.py
+./airflow/providers/google/cloud/operators/s3_to_gcs.py
+./airflow/providers/grpc/operators/grpc.py
+./airflow/providers/http/hooks/http.py
 ./airflow/providers/http/operators/http.py
-./airflow/operators/jdbc_operator.py
+./airflow/providers/jdbc/hooks/jdbc.py
 ./airflow/providers/jdbc/operators/jdbc.py
+./airflow/providers/jenkins/hooks/jenkins.py
+./airflow/providers/jenkins/operators/jenkins_job_trigger.py
+./airflow/providers/jira/operators/jira.py
+./airflow/providers/microsoft/azure/hooks/azure_container_instance.py
+./airflow/providers/microsoft/azure/hooks/azure_container_volume.py
+./airflow/providers/microsoft/azure/hooks/azure_cosmos.py
+./airflow/providers/microsoft/azure/hooks/azure_data_lake.py
+./airflow/providers/microsoft/azure/hooks/azure_fileshare.py
+./airflow/providers/microsoft/azure/hooks/wasb.py
+./airflow/providers/microsoft/azure/operators/adls_list.py
+./airflow/providers/microsoft/azure/operators/azure_container_instances.py
+./airflow/providers/microsoft/azure/operators/azure_cosmos.py
+./airflow/providers/microsoft/azure/operators/file_to_wasb.py
+./airflow/providers/microsoft/azure/operators/oracle_to_azure_data_lake_transfer.py
+./airflow/providers/microsoft/azure/operators/wasb_delete_blob.py
+./airflow/providers/microsoft/mssql/hooks/mssql.py
 ./airflow/providers/microsoft/mssql/operators/mssql.py
+./airflow/providers/microsoft/winrm/operators/winrm.py
 ./airflow/providers/mssql/operators/mysql.py
+./airflow/providers/mysql/hooks/mysql.py
+./airflow/providers/mysql/operators/presto_to_mysql.py
+./airflow/providers/mysql/operators/vertica_to_mysql.py
+./airflow/providers/openfass/hooks/openfaas.py
+./airflow/providers/opsgenie/hooks/opsgenie_alert.py
+./airflow/providers/opsgenie/operators/opsgenie_alert.py
+./airflow/providers/oracle/hooks/oracle.py
 ./airflow/providers/oracle/operators/oracle.py
+./airflow/providers/oracle/operators/oracle_to_oracle_transfer.py
 ./airflow/providers/papermill/operators/papermill.py
-./airflow/providers/apache/pig/operators/pig.py
+./airflow/providers/postgres/hooks/postgres.py
 ./airflow/providers/postgres/operators/postgres.py
+./airflow/providers/presto/hooks/presto.py
 ./airflow/providers/presto/operators/presto_check.py
-./airflow/providers/mysql/operators/presto_to_mysql.py
-./airflow/operators/python.py
+./airflow/providers/qubole/hooks/qubole_check.py
+./airflow/providers/qubole/operators/qubole_check.py
+./airflow/providers/redis/operators/redis_publish.py
+./airflow/providers/samba/hooks/samba.py
+./airflow/providers/segment/hooks/segment.py
+./airflow/providers/segment/operators/segment_track_event.py
+./airflow/providers/slack/hooks/slack_webhook.py
 ./airflow/providers/slack/operators/slack.py
+./airflow/providers/slack/operators/slack_webhook.py
+./airflow/providers/sqlite/hooks/sqlite.py
 ./airflow/providers/sqlite/operators/sqlite.py
+./airflow/providers/ssh/operators/ssh.py
+./airflow/providers/vertica/hooks/vertica.py
+./airflow/providers/vertica/operators/vertica.py
+./airflow/providers/zendesk/hooks/zendesk.py
 ./airflow/settings.py
 ./airflow/stats.py
-./airflow/task/task_runner/base_task_runner.py
-./airflow/task/task_runner/standard_task_runner.py
-./airflow/ti_deps/dep_context.py
-./airflow/ti_deps/deps/base_ti_dep.py
-./airflow/ti_deps/deps/dag_ti_slots_available_dep.py
-./airflow/ti_deps/deps/dag_unpaused_dep.py
-./airflow/ti_deps/deps/dagrun_exists_dep.py
-./airflow/ti_deps/deps/exec_date_after_start_date_dep.py
-./airflow/ti_deps/deps/not_in_retry_period_dep.py
-./airflow/ti_deps/deps/prev_dagrun_dep.py
-./airflow/ti_deps/deps/ready_to_reschedule.py
-./airflow/ti_deps/deps/runnable_exec_date_dep.py
-./airflow/ti_deps/deps/task_concurrency_dep.py
-./airflow/ti_deps/deps/trigger_rule_dep.py
-./airflow/ti_deps/deps/valid_state_dep.py
 ./airflow/version.py
 ./airflow/www/api/experimental/endpoints.py
 ./airflow/www/app.py
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index 89b5305fde749..0e921b6f52864 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -37,7 +37,7 @@
 from airflow.operators.python import PythonOperator
 from airflow.sensors.base_sensor_operator import BaseSensorOperator
 from airflow.sensors.python import PythonSensor
-from airflow.ti_deps.dep_context import REQUEUEABLE_DEPS, RUNNABLE_STATES, RUNNING_DEPS
+from airflow.ti_deps.dependencies import REQUEUEABLE_DEPS, RUNNABLE_STATES, RUNNING_DEPS
 from airflow.ti_deps.deps.base_ti_dep import TIDepStatus
 from airflow.ti_deps.deps.trigger_rule_dep import TriggerRuleDep
 from airflow.utils import timezone
diff --git a/tests/ti_deps/deps/test_trigger_rule_dep.py b/tests/ti_deps/deps/test_trigger_rule_dep.py
index 174c4561878db..fabfae954ba2d 100644
--- a/tests/ti_deps/deps/test_trigger_rule_dep.py
+++ b/tests/ti_deps/deps/test_trigger_rule_dep.py
@@ -465,16 +465,13 @@ def test_get_states_count_upstream_ti(self):
         # check handling with cases that tasks are triggered from backfill with no finished tasks
         finished_tasks = DepContext().ensure_finished_tasks(ti_op2.task.dag, ti_op2.execution_date, session)
-        self.assertEqual(get_states_count_upstream_ti(finished_tasks=finished_tasks,
-                                                      ti=ti_op2, session=session),
+        self.assertEqual(get_states_count_upstream_ti(finished_tasks=finished_tasks, ti=ti_op2),
                          (1, 0, 0, 0, 1))
         finished_tasks = dr.get_task_instances(state=State.finished() + [State.UPSTREAM_FAILED],
                                                session=session)
-        self.assertEqual(get_states_count_upstream_ti(finished_tasks=finished_tasks, ti=ti_op4,
-                                                      session=session),
+        self.assertEqual(get_states_count_upstream_ti(finished_tasks=finished_tasks, ti=ti_op4),
                          (1, 0, 1, 0, 2))
-        self.assertEqual(get_states_count_upstream_ti(finished_tasks=finished_tasks, ti=ti_op5,
-                                                      session=session),
+        self.assertEqual(get_states_count_upstream_ti(finished_tasks=finished_tasks, ti=ti_op5),
                          (2, 0, 1, 0, 3))
 
         dr.update_state()
diff --git a/tests/www/test_views.py b/tests/www/test_views.py
index 57989b2be0334..b150aea11d045 100644
--- a/tests/www/test_views.py
+++ b/tests/www/test_views.py
@@ -47,7 +47,7 @@
 from airflow.models.baseoperator import BaseOperator, BaseOperatorLink
 from airflow.operators.dummy_operator import DummyOperator
 from airflow.settings import Session
-from airflow.ti_deps.dep_context import QUEUEABLE_STATES, RUNNABLE_STATES
+from airflow.ti_deps.dependencies import QUEUEABLE_STATES, RUNNABLE_STATES
 from airflow.utils import dates, timezone
 from airflow.utils.session import create_session
 from airflow.utils.state import State
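For downstream code and tests the practical effect of the whole patch is the new import location; behaviour is unchanged. A minimal check of the relocated constants, assuming the patch is applied:

    from airflow.ti_deps.dep_context import DepContext
    from airflow.ti_deps.dependencies import QUEUEABLE_STATES, RUNNABLE_STATES
    from airflow.utils.state import State

    assert State.QUEUED in RUNNABLE_STATES
    assert QUEUEABLE_STATES == {State.SCHEDULED}
    DepContext()  # constructed exactly as before the move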