Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion airflow-core/src/airflow/api_fastapi/execution_api/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,11 @@ class InProcessExecutionAPI:
def app(self):
if not self._app:
from airflow.api_fastapi.execution_api.app import create_task_execution_api_app
from airflow.api_fastapi.execution_api.deps import JWTBearerDep, JWTRefresherDep
from airflow.api_fastapi.execution_api.deps import (
JWTBearerDep,
JWTBearerTIPathDep,
JWTRefresherDep,
)
from airflow.api_fastapi.execution_api.routes.connections import has_connection_access
from airflow.api_fastapi.execution_api.routes.variables import has_variable_access
from airflow.api_fastapi.execution_api.routes.xcoms import has_xcom_access
Expand All @@ -235,6 +239,7 @@ def app(self):
async def always_allow(): ...

self._app.dependency_overrides[JWTBearerDep.dependency] = always_allow
self._app.dependency_overrides[JWTBearerTIPathDep.dependency] = always_allow
self._app.dependency_overrides[JWTRefresherDep.dependency] = always_allow
self._app.dependency_overrides[has_connection_access] = always_allow
self._app.dependency_overrides[has_variable_access] = always_allow
Expand Down
3 changes: 3 additions & 0 deletions airflow-core/src/airflow/api_fastapi/execution_api/deps.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ async def __call__( # type: ignore[override]

JWTBearerDep: TIToken = Depends(JWTBearer())

# This checks that the UUID in the url matches the one in the token for us.
JWTBearerTIPathDep = Depends(JWTBearer(path_param_name="task_instance_id"))


class JWTReissuer:
"""Re-issue JWTs to requests when they are about to run out."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

import structlog
from cadwyn import VersionedAPIRouter
from fastapi import Body, Depends, HTTPException, Query, status
from fastapi import Body, HTTPException, Query, status
from pydantic import JsonValue
from sqlalchemy import func, or_, tuple_, update
from sqlalchemy.exc import NoResultFound, SQLAlchemyError
Expand All @@ -50,7 +50,7 @@
TISuccessStatePayload,
TITerminalStatePayload,
)
from airflow.api_fastapi.execution_api.deps import JWTBearer
from airflow.api_fastapi.execution_api.deps import JWTBearerTIPathDep
from airflow.models.dagbag import DagBag
from airflow.models.dagrun import DagRun as DR
from airflow.models.taskinstance import TaskInstance as TI, _stop_remaining_tasks
Expand All @@ -70,7 +70,7 @@
ti_id_router = VersionedAPIRouter(
dependencies=[
# This checks that the UUID in the url matches the one in the token for us.
Depends(JWTBearer(path_param_name="task_instance_id")),
JWTBearerTIPathDep
]
)

Expand Down
1 change: 0 additions & 1 deletion airflow-core/src/airflow/cli/commands/dag_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -644,7 +644,6 @@ def dag_test(args, dag: DAG | None = None, session: Session = NEW_SESSION) -> No
run_conf=run_conf,
use_executor=use_executor,
mark_success_pattern=mark_success_pattern,
session=session,
)
show_dagrun = args.show_dagrun
imgcat = args.imgcat_dagrun
Expand Down
3 changes: 2 additions & 1 deletion airflow-core/src/airflow/cli/commands/task_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@
from airflow.cli.utils import fetch_dag_run_from_run_id_or_logical_date_string
from airflow.exceptions import DagRunNotFound, TaskDeferred, TaskInstanceNotFound
from airflow.models import TaskInstance
from airflow.models.dag import DAG, _run_inline_trigger
from airflow.models.dag import DAG
from airflow.models.dagrun import DagRun
from airflow.sdk.definitions.dag import _run_inline_trigger
from airflow.sdk.definitions.param import ParamsDict
from airflow.sdk.execution_time.secrets_masker import RedactedIO
from airflow.ti_deps.dep_context import DepContext
Expand Down
6 changes: 5 additions & 1 deletion airflow-core/src/airflow/dag_processing/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,11 @@ def _execute_dag_callbacks(dagbag: DagBag, request: DagCallbackRequest, log: Fil

callbacks = callbacks if isinstance(callbacks, list) else [callbacks]
# TODO:We need a proper context object!
context: Context = {} # type: ignore[assignment]
context: Context = { # type: ignore[assignment]
"dag": dag,
"run_id": request.run_id,
"reason": request.msg,
}

for callback in callbacks:
log.info(
Expand Down
Loading
Loading