Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions airflow/api_connexion/endpoints/xcom_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from airflow.auth.managers.models.resource_details import DagAccessEntity
from airflow.models import DagRun as DR, XCom
from airflow.settings import conf
from airflow.utils.api_migration import mark_fastapi_migration_done
from airflow.utils.db import get_query_count
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.www.extensions.init_auth_manager import get_auth_manager
Expand Down Expand Up @@ -83,6 +84,7 @@ def get_xcom_entries(
return xcom_collection_schema.dump(XComCollection(xcom_entries=query, total_entries=total_entries))


@mark_fastapi_migration_done
@security.requires_access_dag("GET", DagAccessEntity.XCOM)
@provide_session
def get_xcom_entry(
Expand Down
49 changes: 49 additions & 0 deletions airflow/api_fastapi/core_api/datamodels/xcom.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from datetime import datetime
from typing import Any

from pydantic import BaseModel, field_validator


class XComResponse(BaseModel):
"""Serializer for a xcom item."""

key: str
timestamp: datetime
execution_date: datetime
map_index: int
task_id: str
dag_id: str


class XComResponseNative(XComResponse):
"""XCom response serializer with native return type."""

value: Any


class XComResponseString(XComResponse):
"""XCom response serializer with string return type."""

value: Any

@field_validator("value")
def value_to_string(cls, v):
return str(v) if v is not None else None
163 changes: 163 additions & 0 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3104,6 +3104,99 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries/{xcom_key}:
get:
tags:
- XCom
summary: Get Xcom Entry
description: Get an XCom entry.
operationId: get_xcom_entry
parameters:
- name: dag_id
in: path
required: true
schema:
type: string
title: Dag Id
- name: task_id
in: path
required: true
schema:
type: string
title: Task Id
- name: dag_run_id
in: path
required: true
schema:
type: string
title: Dag Run Id
- name: xcom_key
in: path
required: true
schema:
type: string
title: Xcom Key
- name: map_index
in: query
required: false
schema:
type: integer
default: -1
title: Map Index
- name: deserialize
in: query
required: false
schema:
type: boolean
default: false
title: Deserialize
- name: stringify
in: query
required: false
schema:
type: boolean
default: true
title: Stringify
responses:
'200':
description: Successful Response
content:
application/json:
schema:
anyOf:
- $ref: '#/components/schemas/XComResponseNative'
- $ref: '#/components/schemas/XComResponseString'
title: Response Get Xcom Entry
'400':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Bad Request
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'404':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Not Found
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
components:
schemas:
AppBuilderMenuItemResponse:
Expand Down Expand Up @@ -5134,3 +5227,73 @@ components:
- git_version
title: VersionInfo
description: Version information serializer for responses.
XComResponseNative:
properties:
key:
type: string
title: Key
timestamp:
type: string
format: date-time
title: Timestamp
execution_date:
type: string
format: date-time
title: Execution Date
map_index:
type: integer
title: Map Index
task_id:
type: string
title: Task Id
dag_id:
type: string
title: Dag Id
value:
title: Value
type: object
required:
- key
- timestamp
- execution_date
- map_index
- task_id
- dag_id
- value
title: XComResponseNative
description: XCom response serializer with native return type.
XComResponseString:
properties:
key:
type: string
title: Key
timestamp:
type: string
format: date-time
title: Timestamp
execution_date:
type: string
format: date-time
title: Execution Date
map_index:
type: integer
title: Map Index
task_id:
type: string
title: Task Id
dag_id:
type: string
title: Dag Id
value:
title: Value
type: object
required:
- key
- timestamp
- execution_date
- map_index
- task_id
- dag_id
- value
title: XComResponseString
description: XCom response serializer with string return type.
2 changes: 2 additions & 0 deletions airflow/api_fastapi/core_api/routes/public/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from airflow.api_fastapi.core_api.routes.public.task_instances import task_instances_router
from airflow.api_fastapi.core_api.routes.public.variables import variables_router
from airflow.api_fastapi.core_api.routes.public.version import version_router
from airflow.api_fastapi.core_api.routes.public.xcom import xcom_router

public_router = AirflowRouter(prefix="/public")

Expand All @@ -56,3 +57,4 @@
public_router.include_router(variables_router)
public_router.include_router(version_router)
public_router.include_router(dag_stats_router)
public_router.include_router(xcom_router)
88 changes: 88 additions & 0 deletions airflow/api_fastapi/core_api/routes/public/xcom.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import copy

from fastapi import Depends, HTTPException
from sqlalchemy import and_, select
from sqlalchemy.orm import Session
from typing_extensions import Annotated

from airflow.api_fastapi.common.db.common import get_session
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.datamodels.xcom import (
XComResponseNative,
XComResponseString,
)
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.models import DagRun as DR, XCom
from airflow.settings import conf

xcom_router = AirflowRouter(
tags=["XCom"], prefix="/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries"
)


@xcom_router.get(
"/{xcom_key}",
responses=create_openapi_http_exception_doc([400, 401, 403, 404]),
)
def get_xcom_entry(
dag_id: str,
task_id: str,
dag_run_id: str,
xcom_key: str,
session: Annotated[Session, Depends(get_session)],
map_index: int = -1,
deserialize: bool = False,
stringify: bool = True,
) -> XComResponseNative | XComResponseString:
"""Get an XCom entry."""
if deserialize:
if not conf.getboolean("api", "enable_xcom_deserialize_support", fallback=False):
raise HTTPException(400, "XCom deserialization is disabled in configuration.")
query = select(XCom, XCom.value)
else:
query = select(XCom)
print()

query = query.where(
XCom.dag_id == dag_id, XCom.task_id == task_id, XCom.key == xcom_key, XCom.map_index == map_index
)
query = query.join(DR, and_(XCom.dag_id == DR.dag_id, XCom.run_id == DR.run_id))
query = query.where(DR.run_id == dag_run_id)

if deserialize:
item = session.execute(query).one_or_none()
else:
item = session.scalars(query).one_or_none()

if item is None:
raise HTTPException(404, f"XCom entry with key: `{xcom_key}` not found")

if deserialize:
xcom, value = item
xcom_stub = copy.copy(xcom)
xcom_stub.value = value
xcom_stub.value = XCom.deserialize_value(xcom_stub)
item = xcom_stub

if stringify or conf.getboolean("core", "enable_xcom_pickling"):
return XComResponseString.model_validate(item, from_attributes=True)

return XComResponseNative.model_validate(item, from_attributes=True)
Loading