diff --git a/airflow/api_connexion/endpoints/task_instance_endpoint.py b/airflow/api_connexion/endpoints/task_instance_endpoint.py index 23e5f5a6d16f4..7980012488e18 100644 --- a/airflow/api_connexion/endpoints/task_instance_endpoint.py +++ b/airflow/api_connexion/endpoints/task_instance_endpoint.py @@ -48,7 +48,6 @@ from airflow.api_connexion.security import get_readable_dags from airflow.auth.managers.models.resource_details import DagAccessEntity, DagDetails from airflow.exceptions import TaskNotFound -from airflow.models import SlaMiss from airflow.models.dagrun import DagRun as DR from airflow.models.taskinstance import TaskInstance as TI, clear_task_instances from airflow.models.taskinstancehistory import TaskInstanceHistory as TIH @@ -84,15 +83,6 @@ def get_task_instance( select(TI) .where(TI.dag_id == dag_id, TI.run_id == dag_run_id, TI.task_id == task_id) .join(TI.dag_run) - .outerjoin( - SlaMiss, - and_( - SlaMiss.dag_id == TI.dag_id, - SlaMiss.execution_date == DR.execution_date, - SlaMiss.task_id == TI.task_id, - ), - ) - .add_columns(SlaMiss) .options(joinedload(TI.rendered_task_instance_fields)) ) @@ -127,15 +117,6 @@ def get_mapped_task_instance( select(TI) .where(TI.dag_id == dag_id, TI.run_id == dag_run_id, TI.task_id == task_id, TI.map_index == map_index) .join(TI.dag_run) - .outerjoin( - SlaMiss, - and_( - SlaMiss.dag_id == TI.dag_id, - SlaMiss.execution_date == DR.execution_date, - SlaMiss.task_id == TI.task_id, - ), - ) - .add_columns(SlaMiss) .options(joinedload(TI.rendered_task_instance_fields)) ) task_instance = session.execute(query).one_or_none() @@ -232,27 +213,12 @@ def get_mapped_task_instances( # Count elements before joining extra columns total_entries = get_query_count(base_query, session=session) - # Add SLA miss - entry_query = ( - base_query.outerjoin( - SlaMiss, - and_( - SlaMiss.dag_id == TI.dag_id, - SlaMiss.task_id == TI.task_id, - SlaMiss.execution_date == DR.execution_date, - ), - ) - .add_columns(SlaMiss) - 
.options(joinedload(TI.rendered_task_instance_fields)) - ) - try: order_by_params = _get_order_by_params(order_by) - entry_query = entry_query.order_by(*order_by_params) + entry_query = base_query.order_by(*order_by_params) except _UnsupportedOrderBy as e: raise BadRequest(detail=f"Ordering with {e.order_by!r} is not supported") - # using execute because we want the SlaMiss entity. Scalars don't return None for missing entities task_instances = session.execute(entry_query.offset(offset).limit(limit)).all() return task_instance_collection_schema.dump( TaskInstanceCollection(task_instances=task_instances, total_entries=total_entries) @@ -384,27 +350,12 @@ def get_task_instances( # Count elements before joining extra columns total_entries = get_query_count(base_query, session=session) - # Add join - entry_query = ( - base_query.outerjoin( - SlaMiss, - and_( - SlaMiss.dag_id == TI.dag_id, - SlaMiss.task_id == TI.task_id, - SlaMiss.execution_date == DR.execution_date, - ), - ) - .add_columns(SlaMiss) - .options(joinedload(TI.rendered_task_instance_fields)) - ) - try: order_by_params = _get_order_by_params(order_by) - entry_query = entry_query.order_by(*order_by_params) + entry_query = base_query.order_by(*order_by_params) except _UnsupportedOrderBy as e: raise BadRequest(detail=f"Ordering with {e.order_by!r} is not supported") - # using execute because we want the SlaMiss entity. 
Scalars don't return None for missing entities task_instances = session.execute(entry_query.offset(offset).limit(limit)).all() return task_instance_collection_schema.dump( TaskInstanceCollection(task_instances=task_instances, total_entries=total_entries) @@ -463,16 +414,7 @@ def get_task_instances_batch(session: Session = NEW_SESSION) -> APIResponse: # Count elements before joining extra columns total_entries = get_query_count(base_query, session=session) - # Add join - base_query = base_query.join( - SlaMiss, - and_( - SlaMiss.dag_id == TI.dag_id, - SlaMiss.task_id == TI.task_id, - SlaMiss.execution_date == DR.execution_date, - ), - isouter=True, - ).add_columns(SlaMiss) + ti_query = base_query.options( joinedload(TI.rendered_task_instance_fields), joinedload(TI.task_instance_note) ) @@ -483,7 +425,6 @@ def get_task_instances_batch(session: Session = NEW_SESSION) -> APIResponse: except _UnsupportedOrderBy as e: raise BadRequest(detail=f"Ordering with {e.order_by!r} is not supported") - # using execute because we want the SlaMiss entity. 
Scalars don't return None for missing entities task_instances = session.execute(ti_query).all() return task_instance_collection_schema.dump( @@ -690,15 +631,6 @@ def set_task_instance_note( select(TI) .where(TI.dag_id == dag_id, TI.run_id == dag_run_id, TI.task_id == task_id) .join(TI.dag_run) - .outerjoin( - SlaMiss, - and_( - SlaMiss.dag_id == TI.dag_id, - SlaMiss.execution_date == DR.execution_date, - SlaMiss.task_id == TI.task_id, - ), - ) - .add_columns(SlaMiss) .options(joinedload(TI.rendered_task_instance_fields)) ) if map_index == -1: @@ -716,7 +648,7 @@ def set_task_instance_note( error_message = f"Task Instance not found for dag_id={dag_id}, run_id={dag_run_id}, task_id={task_id}" raise NotFound(error_message) - ti, sla_miss = result + ti = result[0] current_user_id = get_auth_manager().get_user_id() if ti.task_instance_note is None: @@ -725,7 +657,7 @@ def set_task_instance_note( ti.task_instance_note.content = new_note ti.task_instance_note.user_id = current_user_id session.commit() - return task_instance_schema.dump((ti, sla_miss)) + return task_instance_schema.dump((ti,)) @security.requires_access_dag("GET", DagAccessEntity.TASK_INSTANCE) diff --git a/airflow/api_connexion/schemas/sla_miss_schema.py b/airflow/api_connexion/schemas/sla_miss_schema.py deleted file mode 100644 index 97a462e186d59..0000000000000 --- a/airflow/api_connexion/schemas/sla_miss_schema.py +++ /dev/null @@ -1,38 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -from __future__ import annotations - -from marshmallow_sqlalchemy import SQLAlchemySchema, auto_field - -from airflow.models import SlaMiss - - -class SlaMissSchema(SQLAlchemySchema): - """Sla Miss Schema.""" - - class Meta: - """Meta.""" - - model = SlaMiss - - task_id = auto_field(dump_only=True) - dag_id = auto_field(dump_only=True) - execution_date = auto_field(dump_only=True) - email_sent = auto_field(dump_only=True) - timestamp = auto_field(dump_only=True) - description = auto_field(dump_only=True) - notification_sent = auto_field(dump_only=True) diff --git a/airflow/api_connexion/schemas/task_instance_schema.py b/airflow/api_connexion/schemas/task_instance_schema.py index 0c1daf6ce2c7d..08b7cfdbac9cb 100644 --- a/airflow/api_connexion/schemas/task_instance_schema.py +++ b/airflow/api_connexion/schemas/task_instance_schema.py @@ -26,7 +26,6 @@ from airflow.api_connexion.schemas.common_schema import JsonObjectField from airflow.api_connexion.schemas.enum_schemas import TaskInstanceStateField from airflow.api_connexion.schemas.job_schema import JobSchema -from airflow.api_connexion.schemas.sla_miss_schema import SlaMissSchema from airflow.api_connexion.schemas.trigger_schema import TriggerSchema from airflow.models import TaskInstance from airflow.models.taskinstancehistory import TaskInstanceHistory @@ -69,7 +68,6 @@ class Meta: executor = auto_field() executor_config = auto_field() note = auto_field() - sla_miss = fields.Nested(SlaMissSchema, dump_default=None) rendered_map_index = auto_field() rendered_fields = JsonObjectField(dump_default={}) 
trigger = fields.Nested(TriggerSchema) diff --git a/airflow/example_dags/tutorial.py b/airflow/example_dags/tutorial.py index 0e31775c7a9a7..43edd27788121 100644 --- a/airflow/example_dags/tutorial.py +++ b/airflow/example_dags/tutorial.py @@ -55,7 +55,6 @@ # 'priority_weight': 10, # 'end_date': datetime(2016, 1, 1), # 'wait_for_downstream': False, - # 'sla': timedelta(hours=2), # 'execution_timeout': timedelta(seconds=300), # 'on_failure_callback': some_function, # or list of functions # 'on_success_callback': some_other_function, # or list of functions diff --git a/airflow/models/__init__.py b/airflow/models/__init__.py index 375761bc20f52..1a998d60c5c92 100644 --- a/airflow/models/__init__.py +++ b/airflow/models/__init__.py @@ -41,7 +41,6 @@ "Pool", "RenderedTaskInstanceFields", "SkipMixin", - "SlaMiss", "TaskFail", "TaskInstance", "TaskReschedule", @@ -104,7 +103,6 @@ def __getattr__(name): "Pool": "airflow.models.pool", "RenderedTaskInstanceFields": "airflow.models.renderedtifields", "SkipMixin": "airflow.models.skipmixin", - "SlaMiss": "airflow.models.slamiss", "TaskFail": "airflow.models.taskfail", "TaskInstance": "airflow.models.taskinstance", "TaskReschedule": "airflow.models.taskreschedule", @@ -134,7 +132,6 @@ def __getattr__(name): from airflow.models.pool import Pool from airflow.models.renderedtifields import RenderedTaskInstanceFields from airflow.models.skipmixin import SkipMixin - from airflow.models.slamiss import SlaMiss from airflow.models.taskfail import TaskFail from airflow.models.taskinstance import TaskInstance, clear_task_instances from airflow.models.taskinstancehistory import TaskInstanceHistory diff --git a/airflow/models/slamiss.py b/airflow/models/slamiss.py deleted file mode 100644 index 4fb7e53a17bab..0000000000000 --- a/airflow/models/slamiss.py +++ /dev/null @@ -1,46 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. 
See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -from __future__ import annotations - -from sqlalchemy import Boolean, Column, Index, String, Text - -from airflow.models.base import COLLATION_ARGS, ID_LEN, Base -from airflow.utils.sqlalchemy import UtcDateTime - - -class SlaMiss(Base): - """ - Model that stores a history of the SLA that have been missed. - - It is used to keep track of SLA failures over time and to avoid double triggering alert emails. 
- """ - - __tablename__ = "sla_miss" - - task_id = Column(String(ID_LEN, **COLLATION_ARGS), primary_key=True) - dag_id = Column(String(ID_LEN, **COLLATION_ARGS), primary_key=True) - execution_date = Column(UtcDateTime, primary_key=True) - email_sent = Column(Boolean, default=False) - timestamp = Column(UtcDateTime) - description = Column(Text) - notification_sent = Column(Boolean, default=False) - - __table_args__ = (Index("sm_dag", dag_id, unique=False),) - - def __repr__(self): - return str((self.dag_id, self.task_id, self.execution_date.isoformat())) diff --git a/airflow/notifications/basenotifier.py b/airflow/notifications/basenotifier.py index 91fef87167ccb..dc3510604b6a4 100644 --- a/airflow/notifications/basenotifier.py +++ b/airflow/notifications/basenotifier.py @@ -20,6 +20,7 @@ from abc import abstractmethod from typing import TYPE_CHECKING, Sequence +from airflow.exceptions import AirflowException from airflow.template.templater import Templater from airflow.utils.context import context_merge @@ -87,20 +88,10 @@ def __call__(self, *args) -> None: :param context: The airflow context """ - # Currently, there are two ways a callback is invoked - # 1. callback(context) - for on_*_callbacks - # 2. 
callback(dag, task_list, blocking_task_list, slas, blocking_tis) - for sla_miss_callback -        # we have to distinguish between the two calls so that we can prepare the correct context, -        if len(args) == 1: -            context = args[0] -        else: -            context = { -                "dag": args[0], -                "task_list": args[1], -                "blocking_task_list": args[2], -                "slas": args[3], -                "blocking_tis": args[4], -            } +        if len(args) != 1: +            raise AirflowException("Notifier should be called with a single 'context' argument.") + +        context = args[0] self._update_context(context) self.render_template_fields(context) try: diff --git a/airflow/www/auth.py b/airflow/www/auth.py index 74f31d135c1aa..d4b8ad619e6e5 100644 --- a/airflow/www/auth.py +++ b/airflow/www/auth.py @@ -49,7 +49,7 @@ IsAuthorizedPoolRequest, IsAuthorizedVariableRequest, ) -    from airflow.models import DagRun, Pool, SlaMiss, TaskInstance, Variable +    from airflow.models import DagRun, Pool, TaskInstance, Variable from airflow.models.connection import Connection from airflow.models.xcom import BaseXCom @@ -239,7 +239,7 @@ def has_access_dag_entities(method: ResourceMethod, access_entity: DagAccessEnti def has_access_decorator(func: T): @wraps(func) def decorated(*args, **kwargs): -            items: set[SlaMiss | BaseXCom | DagRun | TaskInstance] = set(args[1]) +            items: set[BaseXCom | DagRun | TaskInstance] = set(args[1]) requests: Sequence[IsAuthorizedDagRequest] = [ { "method": method, diff --git a/airflow/www/extensions/init_views.py b/airflow/www/extensions/init_views.py index a04116a7c7d4d..1b0eddf55420a 100644 --- a/airflow/www/extensions/init_views.py +++ b/airflow/www/extensions/init_views.py @@ -103,9 +103,6 @@ def init_appbuilder_views(app): appbuilder.add_view( views.ConnectionModelView, permissions.RESOURCE_CONNECTION, category=permissions.RESOURCE_ADMIN_MENU ) -    appbuilder.add_view( -        views.SlaMissModelView, permissions.RESOURCE_SLA_MISS, category=permissions.RESOURCE_BROWSE_MENU -    ) appbuilder.add_view( views.PluginView, permissions.RESOURCE_PLUGIN,
category=permissions.RESOURCE_ADMIN_MENU ) diff --git a/airflow/www/views.py b/airflow/www/views.py index 47c548d5e7667..2f1f397e51e28 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -103,7 +103,7 @@ from airflow.jobs.job import Job from airflow.jobs.scheduler_job_runner import SchedulerJobRunner from airflow.jobs.triggerer_job_runner import TriggererJobRunner -from airflow.models import Connection, DagModel, DagTag, Log, SlaMiss, Trigger, XCom +from airflow.models import Connection, DagModel, DagTag, Log, Trigger, XCom from airflow.models.asset import AssetDagRunQueue, AssetEvent, AssetModel, DagScheduleAssetReference from airflow.models.dag import get_asset_triggered_next_run_info from airflow.models.dagrun import RUN_ID_REGEX, DagRun, DagRunType @@ -3876,119 +3876,6 @@ def action_post(self): return action.func(items) -class SlaMissModelView(AirflowModelView): - """View to show SlaMiss table.""" - - route_base = "/slamiss" - - datamodel = AirflowModelView.CustomSQLAInterface(SlaMiss) # type: ignore - - class_permission_name = permissions.RESOURCE_SLA_MISS - method_permission_name = { - "list": "read", - "action_muldelete": "delete", - "action_mulnotificationsent": "edit", - "action_mulnotificationsentfalse": "edit", - "action_mulemailsent": "edit", - "action_mulemailsentfalse": "edit", - } - - base_permissions = [ - permissions.ACTION_CAN_READ, - permissions.ACTION_CAN_ACCESS_MENU, - ] - - list_columns = ["dag_id", "task_id", "execution_date", "email_sent", "notification_sent", "timestamp"] - - label_columns = { - "execution_date": "Logical Date", - } - - add_columns = ["dag_id", "task_id", "execution_date", "email_sent", "notification_sent", "timestamp"] - edit_columns = ["dag_id", "task_id", "execution_date", "email_sent", "notification_sent", "timestamp"] - search_columns = ["dag_id", "task_id", "email_sent", "notification_sent", "timestamp", "execution_date"] - base_order = ("execution_date", "desc") - base_filters = [["dag_id", DagFilter, 
list]] - - formatters_columns = { - "task_id": wwwutils.task_instance_link, - "execution_date": wwwutils.datetime_f("execution_date"), - "timestamp": wwwutils.datetime_f("timestamp"), - "dag_id": wwwutils.dag_link, - "map_index": wwwutils.format_map_index, - } - - @action("muldelete", "Delete", "Are you sure you want to delete selected records?", single=False) - @auth.has_access_dag_entities("DELETE", DagAccessEntity.SLA_MISS) - def action_muldelete(self, items): - """Multiple delete action.""" - self.datamodel.delete_all(items) - self.update_redirect() - return redirect(self.get_redirect()) - - @action( - "mulnotificationsent", - "Set notification sent to true", - "Are you sure you want to set all these notifications to sent?", - single=False, - ) - @auth.has_access_dag_entities("PUT", DagAccessEntity.SLA_MISS) - def action_mulnotificationsent(self, items: list[SlaMiss]): - return self._set_notification_property(items, "notification_sent", True) - - @action( - "mulnotificationsentfalse", - "Set notification sent to false", - "Are you sure you want to mark these SLA alerts as notification not sent yet?", - single=False, - ) - @auth.has_access_dag_entities("PUT", DagAccessEntity.SLA_MISS) - def action_mulnotificationsentfalse(self, items: list[SlaMiss]): - return self._set_notification_property(items, "notification_sent", False) - - @action( - "mulemailsent", - "Set email sent to true", - "Are you sure you want to mark these SLA alerts as emails were sent?", - single=False, - ) - @auth.has_access_dag_entities("PUT", DagAccessEntity.SLA_MISS) - def action_mulemailsent(self, items: list[SlaMiss]): - return self._set_notification_property(items, "email_sent", True) - - @action( - "mulemailsentfalse", - "Set email sent to false", - "Are you sure you want to mark these SLA alerts as emails not sent yet?", - single=False, - ) - @auth.has_access_dag_entities("PUT", DagAccessEntity.SLA_MISS) - def action_mulemailsentfalse(self, items: list[SlaMiss]): - return 
self._set_notification_property(items, "email_sent", False) - - @provide_session - def _set_notification_property( - self, - items: list[SlaMiss], - attr: str, - new_value: bool, - session: Session = NEW_SESSION, - ): - try: - count = 0 - for sla in items: - count += 1 - setattr(sla, attr, new_value) - session.merge(sla) - session.commit() - flash(f"{count} SLAMisses had {attr} set to {new_value}.") - except Exception as ex: - flash(str(ex), "error") - flash("Failed to set state", "error") - self.update_redirect() - return redirect(self.get_default_url()) - - class XComModelView(AirflowModelView): """View to show records from XCom table.""" diff --git a/tests/api_connexion/endpoints/test_mapped_task_instance_endpoint.py b/tests/api_connexion/endpoints/test_mapped_task_instance_endpoint.py index fc53b8952f4aa..3615c1b2a5b70 100644 --- a/tests/api_connexion/endpoints/test_mapped_task_instance_endpoint.py +++ b/tests/api_connexion/endpoints/test_mapped_task_instance_endpoint.py @@ -33,7 +33,7 @@ from airflow.utils.state import State, TaskInstanceState from airflow.utils.timezone import datetime from tests.test_utils.api_connexion_utils import assert_401, create_user, delete_user -from tests.test_utils.db import clear_db_runs, clear_db_sla_miss, clear_rendered_ti_fields +from tests.test_utils.db import clear_db_runs, clear_rendered_ti_fields from tests.test_utils.mock_operators import MockOperator pytestmark = [pytest.mark.db_test, pytest.mark.skip_if_database_isolation_mode] @@ -81,7 +81,6 @@ def setup_attrs(self, configured_app) -> None: self.app = configured_app self.client = self.app.test_client() # type:ignore clear_db_runs() - clear_db_sla_miss() clear_rendered_ti_fields() def create_dag_runs_with_mapped_tasks(self, dag_maker, session, dags=None): @@ -227,7 +226,6 @@ def test_mapped_task_instances(self, one_task_with_mapped_tis, session): "queued_when": None, "rendered_fields": {}, "rendered_map_index": None, - "sla_miss": None, "start_date": 
"2020-01-01T00:00:00+00:00", "state": "success", "task_id": "task_2", diff --git a/tests/api_connexion/endpoints/test_task_instance_endpoint.py b/tests/api_connexion/endpoints/test_task_instance_endpoint.py index b5b3163e988d0..be92aa9a0dba1 100644 --- a/tests/api_connexion/endpoints/test_task_instance_endpoint.py +++ b/tests/api_connexion/endpoints/test_task_instance_endpoint.py @@ -27,7 +27,7 @@ from airflow.jobs.job import Job from airflow.jobs.triggerer_job_runner import TriggererJobRunner -from airflow.models import DagRun, SlaMiss, TaskInstance, Trigger +from airflow.models import DagRun, TaskInstance, Trigger from airflow.models.renderedtifields import RenderedTaskInstanceFields as RTIF from airflow.models.taskinstancehistory import TaskInstanceHistory from airflow.utils.platform import getuser @@ -36,7 +36,7 @@ from airflow.utils.timezone import datetime from airflow.utils.types import DagRunType from tests.test_utils.api_connexion_utils import assert_401, create_user, delete_user -from tests.test_utils.db import clear_db_runs, clear_db_sla_miss, clear_rendered_ti_fields +from tests.test_utils.db import clear_db_runs, clear_rendered_ti_fields from tests.test_utils.www import _check_last_log pytestmark = [pytest.mark.db_test, pytest.mark.skip_if_database_isolation_mode] @@ -85,7 +85,6 @@ def setup_attrs(self, configured_app, dagbag) -> None: self.app = configured_app self.client = self.app.test_client() # type:ignore clear_db_runs() - clear_db_sla_miss() clear_rendered_ti_fields() self.dagbag = dagbag @@ -197,7 +196,6 @@ def test_should_respond_200(self, session): "priority_weight": 9, "queue": "default_queue", "queued_when": None, - "sla_miss": None, "start_date": "2020-01-02T00:00:00+00:00", "state": "running", "task_id": "print_the_context", @@ -256,7 +254,6 @@ def test_should_respond_200_with_task_state_in_deferred(self, session): "priority_weight": 9, "queue": "default_queue", "queued_when": None, - "sla_miss": None, "start_date": 
"2020-01-02T00:00:00+00:00", "state": "deferred", "task_id": "print_the_context", @@ -304,7 +301,6 @@ def test_should_respond_200_with_task_state_in_removed(self, session): "priority_weight": 9, "queue": "default_queue", "queued_when": None, - "sla_miss": None, "start_date": "2020-01-02T00:00:00+00:00", "state": "removed", "task_id": "print_the_context", @@ -318,16 +314,9 @@ def test_should_respond_200_with_task_state_in_removed(self, session): "triggerer_job": None, } - def test_should_respond_200_task_instance_with_sla_and_rendered(self, session): + def test_should_respond_200_task_instance_with_rendered(self, session): tis = self.create_task_instances(session) session.query() - sla_miss = SlaMiss( - task_id="print_the_context", - dag_id="example_python_operator", - execution_date=self.default_time, - timestamp=self.default_time, - ) - session.add(sla_miss) rendered_fields = RTIF(tis[0], render_templates=False) session.add(rendered_fields) session.commit() @@ -355,15 +344,6 @@ def test_should_respond_200_task_instance_with_sla_and_rendered(self, session): "priority_weight": 9, "queue": "default_queue", "queued_when": None, - "sla_miss": { - "dag_id": "example_python_operator", - "description": None, - "email_sent": False, - "execution_date": "2020-01-01T00:00:00+00:00", - "notification_sent": False, - "task_id": "print_the_context", - "timestamp": "2020-01-01T00:00:00+00:00", - }, "start_date": "2020-01-02T00:00:00+00:00", "state": "running", "task_id": "print_the_context", @@ -416,7 +396,6 @@ def test_should_respond_200_mapped_task_instance_with_rtif(self, session): "priority_weight": 9, "queue": "default_queue", "queued_when": None, - "sla_miss": None, "start_date": "2020-01-02T00:00:00+00:00", "state": "running", "task_id": "print_the_context", @@ -2320,7 +2299,6 @@ def test_should_respond_200(self, session): "priority_weight": 9, "queue": "default_queue", "queued_when": None, - "sla_miss": None, "start_date": "2020-01-02T00:00:00+00:00", "state": "running", 
"task_id": "print_the_context", @@ -2380,7 +2358,6 @@ def test_should_respond_200_mapped_task_instance_with_rtif(self, session): "priority_weight": 9, "queue": "default_queue", "queued_when": None, - "sla_miss": None, "start_date": "2020-01-02T00:00:00+00:00", "state": "running", "task_id": "print_the_context", diff --git a/tests/api_connexion/schemas/test_task_instance_schema.py b/tests/api_connexion/schemas/test_task_instance_schema.py index 181e8351d2f75..4af699981b30b 100644 --- a/tests/api_connexion/schemas/test_task_instance_schema.py +++ b/tests/api_connexion/schemas/test_task_instance_schema.py @@ -26,7 +26,7 @@ set_task_instance_state_form, task_instance_schema, ) -from airflow.models import RenderedTaskInstanceFields as RTIF, SlaMiss, TaskInstance as TI +from airflow.models import RenderedTaskInstanceFields as RTIF, TaskInstance as TI from airflow.operators.empty import EmptyOperator from airflow.utils.platform import getuser from airflow.utils.state import State @@ -64,7 +64,7 @@ def set_attrs(self, session, dag_maker): session.rollback() - def test_task_instance_schema_without_sla_and_rendered(self, session): + def test_task_instance_schema_without_rendered(self, session): ti = TI(task=self.task, **self.default_ti_init) session.add(ti) for key, value in self.default_ti_extras.items(): @@ -88,7 +88,6 @@ def test_task_instance_schema_without_sla_and_rendered(self, session): "priority_weight": 1, "queue": "default_queue", "queued_when": None, - "sla_miss": None, "start_date": "2020-01-02T00:00:00+00:00", "state": "running", "task_id": "TEST_TASK_ID", @@ -103,63 +102,6 @@ def test_task_instance_schema_without_sla_and_rendered(self, session): } assert serialized_ti == expected_json - def test_task_instance_schema_with_sla_and_rendered(self, session): - sla_miss = SlaMiss( - task_id="TEST_TASK_ID", - dag_id="TEST_DAG_ID", - execution_date=self.default_time, - ) - session.add(sla_miss) - session.flush() - ti = TI(task=self.task, **self.default_ti_init) - 
session.add(ti) - for key, value in self.default_ti_extras.items(): - setattr(ti, key, value) - self.task.template_fields = ["partitions"] - setattr(self.task, "partitions", "data/ds=2022-02-17") - ti.rendered_task_instance_fields = RTIF(ti, render_templates=False) - serialized_ti = task_instance_schema.dump((ti, sla_miss)) - expected_json = { - "dag_id": "TEST_DAG_ID", - "duration": 10000.0, - "end_date": "2020-01-03T00:00:00+00:00", - "execution_date": "2020-01-01T00:00:00+00:00", - "executor": None, - "executor_config": "{}", - "hostname": "", - "map_index": -1, - "max_tries": 0, - "note": "added some notes", - "operator": "EmptyOperator", - "pid": 100, - "pool": "default_pool", - "pool_slots": 1, - "priority_weight": 1, - "queue": "default_queue", - "queued_when": None, - "sla_miss": { - "dag_id": "TEST_DAG_ID", - "description": None, - "email_sent": False, - "execution_date": "2020-01-01T00:00:00+00:00", - "notification_sent": False, - "task_id": "TEST_TASK_ID", - "timestamp": None, - }, - "start_date": "2020-01-02T00:00:00+00:00", - "state": "running", - "task_id": "TEST_TASK_ID", - "task_display_name": "TEST_TASK_ID", - "try_number": 0, - "unixname": getuser(), - "dag_run_id": None, - "rendered_fields": {"partitions": "data/ds=2022-02-17"}, - "rendered_map_index": None, - "trigger": None, - "triggerer_job": None, - } - assert serialized_ti == expected_json - class TestClearTaskInstanceFormSchema: @pytest.mark.parametrize( diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index b4e4c10cff456..a4138e017e143 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -84,7 +84,6 @@ clear_db_pools, clear_db_runs, clear_db_serialized_dags, - clear_db_sla_miss, set_default_pool_slots, ) from tests.test_utils.mock_executor import MockExecutor @@ -142,7 +141,6 @@ def clean_db(): clear_db_backfills() clear_db_pools() clear_db_dags() - clear_db_sla_miss() clear_db_import_errors() clear_db_jobs() 
clear_db_assets() @@ -6321,7 +6319,6 @@ def clean_db(): clear_db_pools() clear_db_backfills() clear_db_dags() - clear_db_sla_miss() clear_db_import_errors() clear_db_jobs() clear_db_serialized_dags() diff --git a/tests/listeners/test_dag_import_error_listener.py b/tests/listeners/test_dag_import_error_listener.py index 57fd4f79dd526..70c4e1e08ac91 100644 --- a/tests/listeners/test_dag_import_error_listener.py +++ b/tests/listeners/test_dag_import_error_listener.py @@ -40,7 +40,6 @@ clear_db_pools, clear_db_runs, clear_db_serialized_dags, - clear_db_sla_miss, ) from tests.test_utils.mock_executor import MockExecutor @@ -75,7 +74,6 @@ def clean_db(): clear_db_runs() clear_db_pools() clear_db_dags() - clear_db_sla_miss() clear_db_import_errors() clear_db_jobs() clear_db_serialized_dags() diff --git a/tests/providers/fab/auth_manager/api_endpoints/test_task_instance_endpoint.py b/tests/providers/fab/auth_manager/api_endpoints/test_task_instance_endpoint.py index 69b3c221eae93..72667bd343c26 100644 --- a/tests/providers/fab/auth_manager/api_endpoints/test_task_instance_endpoint.py +++ b/tests/providers/fab/auth_manager/api_endpoints/test_task_instance_endpoint.py @@ -34,7 +34,7 @@ delete_user, ) from tests.test_utils.compat import AIRFLOW_V_3_0_PLUS -from tests.test_utils.db import clear_db_runs, clear_db_sla_miss, clear_rendered_ti_fields +from tests.test_utils.db import clear_db_runs, clear_rendered_ti_fields pytestmark = [ pytest.mark.db_test, @@ -123,7 +123,6 @@ def setup_attrs(self, configured_app, dagbag) -> None: self.app = configured_app self.client = self.app.test_client() # type:ignore clear_db_runs() - clear_db_sla_miss() clear_rendered_ti_fields() self.dagbag = dagbag diff --git a/tests/providers/smtp/notifications/test_smtp.py b/tests/providers/smtp/notifications/test_smtp.py index 39b51e8e02ce7..351a24c449dc7 100644 --- a/tests/providers/smtp/notifications/test_smtp.py +++ b/tests/providers/smtp/notifications/test_smtp.py @@ -23,7 +23,6 @@ import pytest 
from airflow.configuration import conf -from airflow.models import SlaMiss from airflow.operators.empty import EmptyOperator from airflow.providers.smtp.hooks.smtp import SmtpHook from airflow.providers.smtp.notifications.smtp import ( @@ -145,38 +144,6 @@ def test_notifier_with_defaults(self, mock_smtphook_hook, create_task_instance): content = mock_smtphook_hook.return_value.__enter__().send_email_smtp.call_args.kwargs["html_content"] assert f"{NUM_TRY} of 1" in content - @mock.patch("airflow.providers.smtp.notifications.smtp.SmtpHook") - def test_notifier_with_defaults_sla(self, mock_smtphook_hook, dag_maker): - with dag_maker("test_notifier") as dag: - EmptyOperator(task_id="task1") - context = { - "dag": dag, - "slas": [SlaMiss(task_id="op", dag_id=dag.dag_id, execution_date=timezone.datetime(2018, 1, 1))], - "task_list": [], - "blocking_task_list": [], - "blocking_tis": [], - } - notifier = SmtpNotifier( - from_email=conf.get("smtp", "smtp_mail_from"), - to="test_reciver@test.com", - ) - notifier(context) - mock_smtphook_hook.return_value.__enter__().send_email_smtp.assert_called_once_with( - from_email=conf.get("smtp", "smtp_mail_from"), - to="test_reciver@test.com", - subject="SLA Missed for DAG test_notifier - Task op", - html_content=mock.ANY, - smtp_conn_id="smtp_default", - files=None, - cc=None, - bcc=None, - mime_subtype="mixed", - mime_charset="utf-8", - custom_headers=None, - ) - content = mock_smtphook_hook.return_value.__enter__().send_email_smtp.call_args.kwargs["html_content"] - assert "Task List:" in content - @mock.patch("airflow.providers.smtp.notifications.smtp.SmtpHook") def test_notifier_with_nondefault_conf_vars(self, mock_smtphook_hook, create_task_instance): ti = create_task_instance(dag_id="dag", task_id="op", execution_date=timezone.datetime(2018, 1, 1)) diff --git a/tests/test_utils/db.py b/tests/test_utils/db.py index a5dd94e2d009d..f4992620210ec 100644 --- a/tests/test_utils/db.py +++ b/tests/test_utils/db.py @@ -27,7 +27,6 @@ Log, 
Pool, RenderedTaskInstanceFields, - SlaMiss, TaskFail, TaskInstance, TaskReschedule, @@ -108,11 +107,6 @@ def clear_db_serialized_dags(): session.query(SerializedDagModel).delete() -def clear_db_sla_miss(): - with create_session() as session: - session.query(SlaMiss).delete() - - def clear_db_pools(): with create_session() as session: session.query(Pool).delete() @@ -235,7 +229,6 @@ def clear_all(): clear_db_assets() clear_db_dags() clear_db_serialized_dags() - clear_db_sla_miss() clear_db_dag_code() clear_db_callbacks() clear_rendered_ti_fields()