2 changes: 0 additions & 2 deletions airflow/cli/commands/task_command.py
@@ -35,7 +35,6 @@
from sqlalchemy import select

from airflow import settings
-from airflow.api_internal.internal_api_call import internal_api_call
from airflow.cli.simple_table import AirflowConsole
from airflow.configuration import conf
from airflow.exceptions import AirflowException, DagRunNotFound, TaskDeferred, TaskInstanceNotFound
@@ -193,7 +192,6 @@ def _get_dag_run(
raise ValueError(f"unknown create_if_necessary value: {create_if_necessary!r}")


-@internal_api_call
@provide_session
def _get_ti_db_access(
dag: DAG,
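The change is the same in every file touched by this PR: the `@internal_api_call` decorator, together with its import from `airflow.api_internal.internal_api_call`, is removed from DB-access helpers, leaving only `@provide_session`. For context, the sketch below shows the stacking pattern with toy stand-ins; the decorator bodies, `FakeSession`, `USE_INTERNAL_API`, and the `_before`/`_after` helper names are simplified assumptions for illustration, not Airflow's real implementation.

```python
# Toy stand-ins for illustration only; these are NOT Airflow's real decorators,
# just an approximation of the pattern this PR removes.
from functools import wraps

NEW_SESSION = None        # sentinel: "caller did not supply a session"
USE_INTERNAL_API = False  # in real Airflow this depended on configuration


class FakeSession:
    """Minimal stand-in for a SQLAlchemy ORM session."""

    def merge(self, obj):
        print(f"merge({obj!r})")

    def commit(self):
        print("commit()")


def provide_session(func):
    """Inject a database session when the caller does not pass one."""

    @wraps(func)
    def wrapper(*args, session=NEW_SESSION, **kwargs):
        if session is NEW_SESSION:
            session = FakeSession()  # the real decorator opens an ORM session
        return func(*args, session=session, **kwargs)

    return wrapper


def internal_api_call(func):
    """Rough approximation of the removed decorator: when direct DB access was
    not allowed, serialize the call and ship it to the internal API server
    instead of executing it locally."""

    @wraps(func)
    def wrapper(*args, **kwargs):
        if USE_INTERNAL_API:
            print(f"RPC to internal API: {func.__name__}")
            return None
        return func(*args, **kwargs)

    return wrapper


# Before this PR, DB helpers such as _get_ti_db_access were stacked like this:
@internal_api_call
@provide_session
def _get_ti_db_access_before(ti, session=NEW_SESSION):
    session.merge(ti)
    session.commit()


# After this PR, only the session injection remains:
@provide_session
def _get_ti_db_access_after(ti, session=NEW_SESSION):
    session.merge(ti)
    session.commit()


_get_ti_db_access_before("ti-1")  # could have been routed over RPC
_get_ti_db_access_after("ti-2")   # always a direct, in-process DB call
```

Assuming that routing behaviour, the effect of the removal is that these helpers always do their DB work in-process; call sites are unaffected because the `@provide_session` signature stays the same.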
5 changes: 0 additions & 5 deletions airflow/dag_processing/manager.py
@@ -42,7 +42,6 @@
from tabulate import tabulate

import airflow.models
-from airflow.api_internal.internal_api_call import internal_api_call
from airflow.callbacks.callback_requests import CallbackRequest
from airflow.configuration import conf
from airflow.dag_processing.processor import DagFileProcessorProcess
@@ -499,7 +498,6 @@ def _scan_stale_dags(self):
self.last_deactivate_stale_dags_time = timezone.utcnow()

@classmethod
-@internal_api_call
@provide_session
def deactivate_stale_dags(
cls,
@@ -698,7 +696,6 @@ def _run_parsing_loop(self):
poll_time = 0.0

@classmethod
-@internal_api_call
@provide_session
def _fetch_callbacks(
cls,
@@ -765,7 +762,6 @@ def _refresh_requested_filelocs(self) -> None:
self._file_path_queue.appendleft(fileloc)

@classmethod
-@internal_api_call
@provide_session
def _get_priority_filelocs(cls, session: Session = NEW_SESSION):
"""Get filelocs from DB table."""
@@ -831,7 +827,6 @@ def _print_stat(self):
self.last_stat_print_time = time.monotonic()

@staticmethod
-@internal_api_call
@provide_session
def clear_nonexistent_import_errors(
file_paths: list[str] | None, processor_subdir: str | None, session=NEW_SESSION
7 changes: 0 additions & 7 deletions airflow/dag_processing/processor.py
@@ -32,7 +32,6 @@
from sqlalchemy import delete, event, select

from airflow import settings
-from airflow.api_internal.internal_api_call import internal_api_call
from airflow.callbacks.callback_requests import (
DagCallbackRequest,
TaskCallbackRequest,
@@ -430,7 +429,6 @@ def __init__(self, dag_ids: list[str] | None, dag_directory: str, log: logging.L
self._last_num_of_db_queries = 0

@staticmethod
-@internal_api_call
@provide_session
def update_import_errors(
file_last_changed: dict[str, datetime],
@@ -507,7 +505,6 @@ def get_pools(dag) -> dict[str, set[str]]:
return DagFileProcessor._validate_task_pools_and_update_dag_warnings(pool_dict, dag_ids)

@classmethod
-@internal_api_call
@provide_session
def _validate_task_pools_and_update_dag_warnings(
cls, pool_dict: dict[str, set[str]], dag_ids: set[str], session: Session = NEW_SESSION
@@ -545,7 +542,6 @@ def _validate_task_pools_and_update_dag_warnings(
session.commit()

@classmethod
-@internal_api_call
@provide_session
def execute_callbacks(
cls,
@@ -582,7 +578,6 @@ def execute_callbacks(
session.commit()

@classmethod
-@internal_api_call
@provide_session
def execute_callbacks_without_dag(
cls, callback_requests: list[CallbackRequest], unit_test_mode: bool, session: Session = NEW_SESSION
@@ -621,7 +616,6 @@ def _execute_dag_callbacks(cls, dagbag: DagBag, request: DagCallbackRequest, ses
DAG.execute_callback(callbacks, context, dag.dag_id)

@classmethod
-@internal_api_call
@provide_session
def _execute_task_callbacks(
cls, dagbag: DagBag | None, request: TaskCallbackRequest, unit_test_mode: bool, session: Session
@@ -779,7 +773,6 @@ def _cache_last_num_of_db_queries(self, query_counter: _QueryCounter | None = No
return self._last_num_of_db_queries

@staticmethod
-@internal_api_call
@provide_session
def save_dag_to_db(
dags: dict[str, DAG],
2 changes: 0 additions & 2 deletions airflow/models/dagwarning.py
@@ -22,7 +22,6 @@

from sqlalchemy import Column, ForeignKeyConstraint, Index, String, Text, delete, false, select

-from airflow.api_internal.internal_api_call import internal_api_call
from airflow.models.base import Base, StringID
from airflow.utils import timezone
from airflow.utils.retries import retry_db_transaction
@@ -71,7 +70,6 @@ def __hash__(self) -> int:
return hash((self.dag_id, self.warning_type))

@classmethod
-@internal_api_call
@provide_session
def purge_inactive_dag_warnings(cls, session: Session = NEW_SESSION) -> None:
"""
2 changes: 0 additions & 2 deletions airflow/models/renderedtifields.py
@@ -36,7 +36,6 @@
from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy.orm import relationship

-from airflow.api_internal.internal_api_call import internal_api_call
from airflow.configuration import conf
from airflow.models.base import StringID, TaskInstanceDependencies
from airflow.serialization.helpers import serialize_template_field
@@ -155,7 +154,6 @@ def _redact(self):
self.rendered_fields[field] = redact(rendered, field)

@classmethod
-@internal_api_call
@provide_session
def _update_runtime_evaluated_template_fields(
cls, ti: TaskInstance, session: Session = NEW_SESSION
3 changes: 0 additions & 3 deletions airflow/models/skipmixin.py
@@ -23,7 +23,6 @@

from sqlalchemy import update

-from airflow.api_internal.internal_api_call import internal_api_call
from airflow.exceptions import AirflowException
from airflow.models.taskinstance import TaskInstance
from airflow.utils import timezone
@@ -106,7 +105,6 @@ def skip(
SkipMixin._skip(dag_run=dag_run, task_id=task_id, tasks=tasks, map_index=map_index)

@staticmethod
-@internal_api_call
@provide_session
def _skip(
dag_run: DagRun | DagRunPydantic,
@@ -163,7 +161,6 @@ def skip_all_except(
SkipMixin._skip_all_except(ti=ti, branch_task_ids=branch_task_ids)

@classmethod
-@internal_api_call
@provide_session
def _skip_all_except(
cls,
18 changes: 0 additions & 18 deletions airflow/models/taskinstance.py
@@ -72,7 +72,6 @@
from sqlalchemy_utils import UUIDType

from airflow import settings
-from airflow.api_internal.internal_api_call import internal_api_call
from airflow.assets.manager import asset_manager
from airflow.configuration import conf
from airflow.exceptions import (
@@ -185,14 +184,12 @@ class TaskReturnCode(Enum):
"""When task exits with deferral to trigger."""


-@internal_api_call
@provide_session
def _merge_ti(ti, session: Session = NEW_SESSION):
session.merge(ti)
session.commit()


-@internal_api_call
@provide_session
def _add_log(
event,
@@ -215,7 +212,6 @@ def _add_log(
)


-@internal_api_call
@provide_session
def _update_ti_heartbeat(id: str, when: datetime, session: Session = NEW_SESSION):
session.execute(update(TaskInstance).where(TaskInstance.id == id).values(last_heartbeat_at=when))
@@ -546,7 +542,6 @@ def clear_task_instances(
session.flush()


-@internal_api_call
@provide_session
def _xcom_pull(
*,
@@ -920,7 +915,6 @@ def _clear_next_method_args(*, task_instance: TaskInstance | TaskInstancePydanti
task_instance.next_kwargs = None


-@internal_api_call
def _get_template_context(
*,
task_instance: TaskInstance | TaskInstancePydantic,
@@ -1095,7 +1089,6 @@ def _is_eligible_to_retry(*, task_instance: TaskInstance | TaskInstancePydantic)


@provide_session
-@internal_api_call
def _handle_failure(
*,
task_instance: TaskInstance | TaskInstancePydantic,
@@ -1208,7 +1201,6 @@ def _refresh_from_task(
task_instance_mutation_hook(task_instance)


-@internal_api_call
@provide_session
def _record_task_map_for_downstreams(
*,
@@ -1546,7 +1538,6 @@ def _get_previous_ti(
return dagrun.get_task_instance(task_instance.task_id, session=session)


-@internal_api_call
@provide_session
def _update_rtif(ti, rendered_fields, session: Session = NEW_SESSION):
from airflow.models.renderedtifields import RenderedTaskInstanceFields
@@ -1577,7 +1568,6 @@ def _coalesce_to_orm_ti(*, ti: TaskInstancePydantic | TaskInstance, session: Ses
return ti


-@internal_api_call
@provide_session
def _defer_task(
ti: TaskInstance | TaskInstancePydantic,
@@ -1652,7 +1642,6 @@ def _defer_task(
return ti


-@internal_api_call
@provide_session
def _handle_reschedule(
ti,
@@ -2142,7 +2131,6 @@ def error(self, session: Session = NEW_SESSION) -> None:
session.commit()

@classmethod
-@internal_api_call
@provide_session
def get_task_instance(
cls,
@@ -2195,7 +2183,6 @@ def refresh_from_task(self, task: Operator, pool_override: str | None = None) ->
_refresh_from_task(task_instance=self, task=task, pool_override=pool_override)

@staticmethod
-@internal_api_call
@provide_session
def _clear_xcom_data(ti: TaskInstance | TaskInstancePydantic, session: Session = NEW_SESSION) -> None:
"""
@@ -2231,7 +2218,6 @@ def key(self) -> TaskInstanceKey:
return TaskInstanceKey(self.dag_id, self.task_id, self.run_id, self.try_number, self.map_index)

@staticmethod
-@internal_api_call
def _set_state(ti: TaskInstance | TaskInstancePydantic, state, session: Session) -> bool:
if not isinstance(ti, TaskInstance):
ti = session.scalars(
@@ -2465,7 +2451,6 @@ def ready_for_retry(self) -> bool:
return self.state == TaskInstanceState.UP_FOR_RETRY and self.next_retry_datetime() < timezone.utcnow()

@staticmethod
-@internal_api_call
def _get_dagrun(dag_id, run_id, session) -> DagRun:
from airflow.models.dagrun import DagRun # Avoid circular import

@@ -2515,7 +2500,6 @@ def ensure_dag(
return task_instance.task.dag

@classmethod
-@internal_api_call
@provide_session
def _check_and_change_state_before_execution(
cls,
@@ -2791,7 +2775,6 @@ def _register_asset_changes(
TaskInstance._register_asset_changes_int(ti=self, events=events)

@staticmethod
-@internal_api_call
@provide_session
def _register_asset_changes_int(
ti: TaskInstance, *, events: OutletEventAccessors, session: Session = NEW_SESSION
@@ -3174,7 +3157,6 @@ def fetch_handle_failure_context(
}

@staticmethod
-@internal_api_call
@provide_session
def save_to_db(ti: TaskInstance | TaskInstancePydantic, session: Session = NEW_SESSION):
ti = _coalesce_to_orm_ti(ti=ti, session=session)
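taskinstance.py drops the decorator from many module-level helpers (`_merge_ti`, `_add_log`, `_update_ti_heartbeat`, `_xcom_pull`, `_defer_task`, among others), which now carry only `@provide_session`. The short sketch below, again with a toy session stand-in rather than Airflow's real one, shows the calling convention that remains under the usual `provide_session` semantics: omit `session` and the helper runs against its own injected session, or pass an existing session for the helper to operate on directly.

```python
# Illustrative only: toy session and decorator, not Airflow's real implementation.
from functools import wraps

NEW_SESSION = None  # sentinel: "caller did not supply a session"


class FakeSession:
    def merge(self, obj):
        print(f"merge({obj!r})")

    def commit(self):
        print("commit()")


def provide_session(func):
    @wraps(func)
    def wrapper(*args, session=NEW_SESSION, **kwargs):
        if session is NEW_SESSION:
            session = FakeSession()  # helper opens its own session
        return func(*args, session=session, **kwargs)

    return wrapper


@provide_session
def _merge_ti(ti, session=NEW_SESSION):
    # Same body as shown in the diff context above: a direct ORM merge + commit.
    session.merge(ti)
    session.commit()


# 1) No session argument: one is injected for the duration of the call.
_merge_ti("task-instance-a")

# 2) Explicit session: the helper works on the session the caller already holds.
shared = FakeSession()
_merge_ti("task-instance-b", session=shared)
```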
4 changes: 0 additions & 4 deletions airflow/models/variable.py
@@ -25,7 +25,6 @@
from sqlalchemy.dialects.mysql import MEDIUMTEXT
from sqlalchemy.orm import declared_attr, reconstructor, synonym

-from airflow.api_internal.internal_api_call import internal_api_call
from airflow.configuration import ensure_secrets_loaded
from airflow.models.base import ID_LEN, Base
from airflow.models.crypto import get_fernet
@@ -182,7 +181,6 @@ def set(

@staticmethod
@provide_session
-@internal_api_call
def _set(
key: str,
value: Any,
@@ -238,7 +236,6 @@ def update(

@staticmethod
@provide_session
-@internal_api_call
def _update(
key: str,
value: Any,
@@ -279,7 +276,6 @@ def delete(key: str, session: Session = None) -> int:

@staticmethod
@provide_session
-@internal_api_call
def _delete(key: str, session: Session = None) -> int:
"""
Delete an Airflow Variable for a given key.
2 changes: 0 additions & 2 deletions airflow/models/xcom_arg.py
@@ -25,7 +25,6 @@

from sqlalchemy import func, or_, select

-from airflow.api_internal.internal_api_call import internal_api_call
from airflow.exceptions import AirflowException, XComNotFound
from airflow.models import MappedOperator, TaskInstance
from airflow.models.abstractoperator import AbstractOperator
@@ -232,7 +231,6 @@ def __exit__(self, exc_type, exc_val, exc_tb):
SetupTeardownContext.set_work_task_roots_and_leaves()


-@internal_api_call
@provide_session
def _get_task_map_length(
*,
2 changes: 0 additions & 2 deletions airflow/sensors/base.py
@@ -29,7 +29,6 @@
from sqlalchemy import select

from airflow import settings
-from airflow.api_internal.internal_api_call import internal_api_call
from airflow.configuration import conf
from airflow.exceptions import (
AirflowException,
@@ -83,7 +82,6 @@ def __bool__(self) -> bool:
return self.is_done


-@internal_api_call
@provide_session
def _orig_start_date(
dag_id: str, task_id: str, run_id: str, map_index: int, try_number: int, session: Session = NEW_SESSION
2 changes: 0 additions & 2 deletions airflow/utils/cli_action_loggers.py
@@ -28,7 +28,6 @@
import logging
from typing import TYPE_CHECKING, Callable

-from airflow.api_internal.internal_api_call import internal_api_call
from airflow.utils.session import NEW_SESSION, provide_session

if TYPE_CHECKING:
@@ -121,7 +120,6 @@ def default_action_log(sub_command, user, task_id, dag_id, logical_date, host_na
)


-@internal_api_call
@provide_session
def _default_action_log_internal(
*,