Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 5 additions & 73 deletions airflow/api_connexion/endpoints/task_instance_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
from airflow.api_connexion.security import get_readable_dags
from airflow.auth.managers.models.resource_details import DagAccessEntity, DagDetails
from airflow.exceptions import TaskNotFound
from airflow.models import SlaMiss
from airflow.models.dagrun import DagRun as DR
from airflow.models.taskinstance import TaskInstance as TI, clear_task_instances
from airflow.models.taskinstancehistory import TaskInstanceHistory as TIH
Expand Down Expand Up @@ -84,15 +83,6 @@ def get_task_instance(
select(TI)
.where(TI.dag_id == dag_id, TI.run_id == dag_run_id, TI.task_id == task_id)
.join(TI.dag_run)
.outerjoin(
SlaMiss,
and_(
SlaMiss.dag_id == TI.dag_id,
SlaMiss.execution_date == DR.execution_date,
SlaMiss.task_id == TI.task_id,
),
)
.add_columns(SlaMiss)
.options(joinedload(TI.rendered_task_instance_fields))
)

Expand Down Expand Up @@ -127,15 +117,6 @@ def get_mapped_task_instance(
select(TI)
.where(TI.dag_id == dag_id, TI.run_id == dag_run_id, TI.task_id == task_id, TI.map_index == map_index)
.join(TI.dag_run)
.outerjoin(
SlaMiss,
and_(
SlaMiss.dag_id == TI.dag_id,
SlaMiss.execution_date == DR.execution_date,
SlaMiss.task_id == TI.task_id,
),
)
.add_columns(SlaMiss)
.options(joinedload(TI.rendered_task_instance_fields))
)
task_instance = session.execute(query).one_or_none()
Expand Down Expand Up @@ -232,27 +213,12 @@ def get_mapped_task_instances(
# Count elements before joining extra columns
total_entries = get_query_count(base_query, session=session)

# Add SLA miss
entry_query = (
base_query.outerjoin(
SlaMiss,
and_(
SlaMiss.dag_id == TI.dag_id,
SlaMiss.task_id == TI.task_id,
SlaMiss.execution_date == DR.execution_date,
),
)
.add_columns(SlaMiss)
.options(joinedload(TI.rendered_task_instance_fields))
)

try:
order_by_params = _get_order_by_params(order_by)
entry_query = entry_query.order_by(*order_by_params)
entry_query = base_query.order_by(*order_by_params)
except _UnsupportedOrderBy as e:
raise BadRequest(detail=f"Ordering with {e.order_by!r} is not supported")

# using execute because we want the SlaMiss entity. Scalars don't return None for missing entities
task_instances = session.execute(entry_query.offset(offset).limit(limit)).all()
return task_instance_collection_schema.dump(
TaskInstanceCollection(task_instances=task_instances, total_entries=total_entries)
Expand Down Expand Up @@ -384,27 +350,12 @@ def get_task_instances(
# Count elements before joining extra columns
total_entries = get_query_count(base_query, session=session)

# Add join
entry_query = (
base_query.outerjoin(
SlaMiss,
and_(
SlaMiss.dag_id == TI.dag_id,
SlaMiss.task_id == TI.task_id,
SlaMiss.execution_date == DR.execution_date,
),
)
.add_columns(SlaMiss)
.options(joinedload(TI.rendered_task_instance_fields))
)

try:
order_by_params = _get_order_by_params(order_by)
entry_query = entry_query.order_by(*order_by_params)
entry_query = base_query.order_by(*order_by_params)
except _UnsupportedOrderBy as e:
raise BadRequest(detail=f"Ordering with {e.order_by!r} is not supported")

# using execute because we want the SlaMiss entity. Scalars don't return None for missing entities
task_instances = session.execute(entry_query.offset(offset).limit(limit)).all()
return task_instance_collection_schema.dump(
TaskInstanceCollection(task_instances=task_instances, total_entries=total_entries)
Expand Down Expand Up @@ -463,16 +414,7 @@ def get_task_instances_batch(session: Session = NEW_SESSION) -> APIResponse:

# Count elements before joining extra columns
total_entries = get_query_count(base_query, session=session)
# Add join
base_query = base_query.join(
SlaMiss,
and_(
SlaMiss.dag_id == TI.dag_id,
SlaMiss.task_id == TI.task_id,
SlaMiss.execution_date == DR.execution_date,
),
isouter=True,
).add_columns(SlaMiss)

ti_query = base_query.options(
joinedload(TI.rendered_task_instance_fields), joinedload(TI.task_instance_note)
)
Expand All @@ -483,7 +425,6 @@ def get_task_instances_batch(session: Session = NEW_SESSION) -> APIResponse:
except _UnsupportedOrderBy as e:
raise BadRequest(detail=f"Ordering with {e.order_by!r} is not supported")

# using execute because we want the SlaMiss entity. Scalars don't return None for missing entities
task_instances = session.execute(ti_query).all()

return task_instance_collection_schema.dump(
Expand Down Expand Up @@ -690,15 +631,6 @@ def set_task_instance_note(
select(TI)
.where(TI.dag_id == dag_id, TI.run_id == dag_run_id, TI.task_id == task_id)
.join(TI.dag_run)
.outerjoin(
SlaMiss,
and_(
SlaMiss.dag_id == TI.dag_id,
SlaMiss.execution_date == DR.execution_date,
SlaMiss.task_id == TI.task_id,
),
)
.add_columns(SlaMiss)
.options(joinedload(TI.rendered_task_instance_fields))
)
if map_index == -1:
Expand All @@ -716,7 +648,7 @@ def set_task_instance_note(
error_message = f"Task Instance not found for dag_id={dag_id}, run_id={dag_run_id}, task_id={task_id}"
raise NotFound(error_message)

ti, sla_miss = result
ti = result[0]

current_user_id = get_auth_manager().get_user_id()
if ti.task_instance_note is None:
Expand All @@ -725,7 +657,7 @@ def set_task_instance_note(
ti.task_instance_note.content = new_note
ti.task_instance_note.user_id = current_user_id
session.commit()
return task_instance_schema.dump((ti, sla_miss))
return task_instance_schema.dump((ti,))


@security.requires_access_dag("GET", DagAccessEntity.TASK_INSTANCE)
Expand Down
38 changes: 0 additions & 38 deletions airflow/api_connexion/schemas/sla_miss_schema.py

This file was deleted.

2 changes: 0 additions & 2 deletions airflow/api_connexion/schemas/task_instance_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
from airflow.api_connexion.schemas.common_schema import JsonObjectField
from airflow.api_connexion.schemas.enum_schemas import TaskInstanceStateField
from airflow.api_connexion.schemas.job_schema import JobSchema
from airflow.api_connexion.schemas.sla_miss_schema import SlaMissSchema
from airflow.api_connexion.schemas.trigger_schema import TriggerSchema
from airflow.models import TaskInstance
from airflow.models.taskinstancehistory import TaskInstanceHistory
Expand Down Expand Up @@ -69,7 +68,6 @@ class Meta:
executor = auto_field()
executor_config = auto_field()
note = auto_field()
sla_miss = fields.Nested(SlaMissSchema, dump_default=None)
rendered_map_index = auto_field()
rendered_fields = JsonObjectField(dump_default={})
trigger = fields.Nested(TriggerSchema)
Expand Down
1 change: 0 additions & 1 deletion airflow/example_dags/tutorial.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
# 'wait_for_downstream': False,
# 'sla': timedelta(hours=2),
# 'execution_timeout': timedelta(seconds=300),
# 'on_failure_callback': some_function, # or list of functions
# 'on_success_callback': some_other_function, # or list of functions
Expand Down
3 changes: 0 additions & 3 deletions airflow/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
"Pool",
"RenderedTaskInstanceFields",
"SkipMixin",
"SlaMiss",
"TaskFail",
"TaskInstance",
"TaskReschedule",
Expand Down Expand Up @@ -104,7 +103,6 @@ def __getattr__(name):
"Pool": "airflow.models.pool",
"RenderedTaskInstanceFields": "airflow.models.renderedtifields",
"SkipMixin": "airflow.models.skipmixin",
"SlaMiss": "airflow.models.slamiss",
"TaskFail": "airflow.models.taskfail",
"TaskInstance": "airflow.models.taskinstance",
"TaskReschedule": "airflow.models.taskreschedule",
Expand Down Expand Up @@ -134,7 +132,6 @@ def __getattr__(name):
from airflow.models.pool import Pool
from airflow.models.renderedtifields import RenderedTaskInstanceFields
from airflow.models.skipmixin import SkipMixin
from airflow.models.slamiss import SlaMiss
from airflow.models.taskfail import TaskFail
from airflow.models.taskinstance import TaskInstance, clear_task_instances
from airflow.models.taskinstancehistory import TaskInstanceHistory
Expand Down
46 changes: 0 additions & 46 deletions airflow/models/slamiss.py

This file was deleted.

19 changes: 5 additions & 14 deletions airflow/notifications/basenotifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from abc import abstractmethod
from typing import TYPE_CHECKING, Sequence

from airflow.exceptions import AirflowException
from airflow.template.templater import Templater
from airflow.utils.context import context_merge

Expand Down Expand Up @@ -87,20 +88,10 @@ def __call__(self, *args) -> None:

:param context: The airflow context
"""
# Currently, there are two ways a callback is invoked
# 1. callback(context) - for on_*_callbacks
# 2. callback(dag, task_list, blocking_task_list, slas, blocking_tis) - for sla_miss_callback
# we have to distinguish between the two calls so that we can prepare the correct context,
if len(args) == 1:
context = args[0]
else:
context = {
"dag": args[0],
"task_list": args[1],
"blocking_task_list": args[2],
"slas": args[3],
"blocking_tis": args[4],
}
if len(args) != 1:
self.log.exception("Too many args provided to notifier.")

context = args[0]
self._update_context(context)
self.render_template_fields(context)
try:
Expand Down
4 changes: 2 additions & 2 deletions airflow/www/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
IsAuthorizedPoolRequest,
IsAuthorizedVariableRequest,
)
from airflow.models import DagRun, Pool, SlaMiss, TaskInstance, Variable
from airflow.models import DagRun, Pool, TaskInstance, Variable
from airflow.models.connection import Connection
from airflow.models.xcom import BaseXCom

Expand Down Expand Up @@ -239,7 +239,7 @@ def has_access_dag_entities(method: ResourceMethod, access_entity: DagAccessEnti
def has_access_decorator(func: T):
@wraps(func)
def decorated(*args, **kwargs):
items: set[SlaMiss | BaseXCom | DagRun | TaskInstance] = set(args[1])
items: set[BaseXCom | DagRun | TaskInstance] = set(args[1])
requests: Sequence[IsAuthorizedDagRequest] = [
{
"method": method,
Expand Down
3 changes: 0 additions & 3 deletions airflow/www/extensions/init_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,6 @@ def init_appbuilder_views(app):
appbuilder.add_view(
views.ConnectionModelView, permissions.RESOURCE_CONNECTION, category=permissions.RESOURCE_ADMIN_MENU
)
appbuilder.add_view(
views.SlaMissModelView, permissions.RESOURCE_SLA_MISS, category=permissions.RESOURCE_BROWSE_MENU
)
appbuilder.add_view(
views.PluginView, permissions.RESOURCE_PLUGIN, category=permissions.RESOURCE_ADMIN_MENU
)
Expand Down
Loading