Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

from datetime import datetime

from pydantic import AliasPath, ConfigDict, Field, NonNegativeInt, field_validator
from pydantic import AliasPath, ConfigDict, Field, JsonValue, NonNegativeInt, field_validator

from airflow._shared.secrets_masker import redact
from airflow.api_fastapi.core_api.base import BaseModel, StrictBaseModel
Expand Down Expand Up @@ -65,7 +65,7 @@ class AssetResponse(BaseModel):
name: str
uri: str
group: str
extra: dict | None = None
extra: dict[str, JsonValue] | None = None
created_at: datetime
updated_at: datetime
scheduled_dags: list[DagScheduleAssetReference]
Expand Down Expand Up @@ -123,7 +123,7 @@ class AssetEventResponse(BaseModel):
uri: str | None = Field(alias="uri", default=None)
name: str | None = Field(alias="name", default=None)
group: str | None = Field(alias="group", default=None)
extra: dict | None = None
extra: dict[str, JsonValue] | None = None
source_task_id: str | None = None
source_dag_id: str | None = None
source_run_id: str | None = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8666,7 +8666,8 @@ components:
title: Group
extra:
anyOf:
- additionalProperties: true
- additionalProperties:
$ref: '#/components/schemas/JsonValue'
type: object
- type: 'null'
title: Extra
Expand Down Expand Up @@ -8722,7 +8723,8 @@ components:
title: Group
extra:
anyOf:
- additionalProperties: true
- additionalProperties:
$ref: '#/components/schemas/JsonValue'
type: object
- type: 'null'
title: Extra
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

from __future__ import annotations

from pydantic.types import JsonValue

from airflow.api_fastapi.core_api.base import BaseModel, StrictBaseModel


Expand All @@ -26,7 +28,7 @@ class AssetResponse(BaseModel):
name: str
uri: str
group: str
extra: dict | None = None
extra: dict[str, JsonValue] | None = None


class AssetAliasResponse(BaseModel):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

from datetime import datetime

from pydantic.types import JsonValue

from airflow.api_fastapi.core_api.base import BaseModel, StrictBaseModel
from airflow.api_fastapi.execution_api.datamodels.asset import AssetResponse

Expand All @@ -41,7 +43,7 @@ class AssetEventResponse(BaseModel):

id: int
timestamp: datetime
extra: dict | None = None
extra: dict[str, JsonValue] | None = None

asset: AssetResponse
created_dagruns: list[DagRunAssetReference]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
AwareDatetime,
Discriminator,
Field,
JsonValue,
Tag,
TypeAdapter,
WithJsonSchema,
Expand Down Expand Up @@ -258,7 +259,7 @@ class AssetReferenceAssetEventDagRun(StrictBaseModel):

name: str
uri: str
extra: dict
extra: dict[str, JsonValue]


class AssetAliasReferenceAssetEventDagRun(StrictBaseModel):
Expand All @@ -271,7 +272,7 @@ class AssetEventDagRunReference(StrictBaseModel):
"""Schema for AssetEvent model used in DagRun."""

asset: AssetReferenceAssetEventDagRun
extra: dict
extra: dict[str, JsonValue]
source_task_id: str | None
source_dag_id: str | None
source_run_id: str | None
Expand Down
8 changes: 5 additions & 3 deletions airflow-core/src/airflow/lineage/hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
from airflow.utils.log.logging_mixin import LoggingMixin

if TYPE_CHECKING:
from pydantic.types import JsonValue

from airflow.sdk import BaseHook, ObjectStoragePath

# Store context what sent lineage.
Expand Down Expand Up @@ -107,7 +109,7 @@ def create_asset(
name: str | None = None,
group: str | None = None,
asset_kwargs: dict | None = None,
asset_extra: dict | None = None,
asset_extra: dict[str, JsonValue] | None = None,
) -> Asset | None:
"""
Create an asset instance using the provided parameters.
Expand Down Expand Up @@ -161,7 +163,7 @@ def add_input_asset(
name: str | None = None,
group: str | None = None,
asset_kwargs: dict | None = None,
asset_extra: dict | None = None,
asset_extra: dict[str, JsonValue] | None = None,
):
"""Add the input asset and its corresponding hook execution context to the collector."""
if len(self._inputs) >= MAX_COLLECTED_ASSETS:
Expand All @@ -186,7 +188,7 @@ def add_output_asset(
name: str | None = None,
group: str | None = None,
asset_kwargs: dict | None = None,
asset_extra: dict | None = None,
asset_extra: dict[str, JsonValue] | None = None,
):
"""Add the output asset and its corresponding hook execution context to the collector."""
if len(self._outputs) >= MAX_COLLECTED_ASSETS:
Expand Down
14 changes: 8 additions & 6 deletions airflow-core/src/airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import contextlib
import hashlib
import itertools
import json
import logging
import math
import uuid
Expand Down Expand Up @@ -1380,7 +1381,7 @@ def register_asset_changes_in_db(
session=session,
)

def _asset_event_extras_from_aliases() -> dict[tuple[AssetUniqueKey, frozenset], set[str]]:
def _asset_event_extras_from_aliases() -> dict[tuple[AssetUniqueKey, str], set[str]]:
d = defaultdict(set)
for event in outlet_events:
try:
Expand All @@ -1390,19 +1391,20 @@ def _asset_event_extras_from_aliases() -> dict[tuple[AssetUniqueKey, frozenset],
if alias_name not in outlet_alias_names:
continue
asset_key = AssetUniqueKey(**event["dest_asset_key"])
extra_key = frozenset(event["extra"].items())
d[asset_key, extra_key].add(alias_name)
extra_json = json.dumps(event["extra"], sort_keys=True)
d[asset_key, extra_json].add(alias_name)
return d

outlet_alias_names = {o.name for o in task_outlets if o.type == AssetAlias.__name__ and o.name}
if outlet_alias_names and (event_extras_from_aliases := _asset_event_extras_from_aliases()):
for (asset_key, extra_key), event_aliase_names in event_extras_from_aliases.items():
for (asset_key, extra_json), event_aliase_names in event_extras_from_aliases.items():
extra = json.loads(extra_json)
ti.log.debug("register event for asset %s with aliases %s", asset_key, event_aliase_names)
event = asset_manager.register_asset_change(
task_instance=ti,
asset=asset_key,
source_alias_names=event_aliase_names,
extra=dict(extra_key),
extra=extra,
session=session,
)
if event is None:
Expand All @@ -1413,7 +1415,7 @@ def _asset_event_extras_from_aliases() -> dict[tuple[AssetUniqueKey, frozenset],
task_instance=ti,
asset=asset_key,
source_alias_names=event_aliase_names,
extra=dict(extra_key),
extra=extra,
session=session,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,9 @@ export const $AssetEventResponse = {
extra: {
anyOf: [
{
additionalProperties: true,
additionalProperties: {
'$ref': '#/components/schemas/JsonValue'
},
type: 'object'
},
{
Expand Down Expand Up @@ -295,7 +297,9 @@ export const $AssetResponse = {
extra: {
anyOf: [
{
additionalProperties: true,
additionalProperties: {
'$ref': '#/components/schemas/JsonValue'
},
type: 'object'
},
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ export type AssetEventResponse = {
name?: string | null;
group?: string | null;
extra?: {
[key: string]: unknown;
[key: string]: JsonValue;
} | null;
source_task_id?: string | null;
source_dag_id?: string | null;
Expand All @@ -83,7 +83,7 @@ export type AssetResponse = {
uri: string;
group: string;
extra?: {
[key: string]: unknown;
[key: string]: JsonValue;
} | null;
created_at: string;
updated_at: string;
Expand Down
20 changes: 15 additions & 5 deletions airflow-core/tests/unit/models/test_taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import operator
import os
import pathlib
from typing import cast
from typing import TYPE_CHECKING, cast
from unittest import mock
from unittest.mock import patch

Expand Down Expand Up @@ -88,6 +88,11 @@
from tests_common.test_utils.mock_operators import MockOperator
from unit.models import DEFAULT_DATE

if TYPE_CHECKING:
from sqlalchemy.orm import Session

from tests_common.pytest_plugin import DagMaker

pytestmark = [pytest.mark.db_test, pytest.mark.need_serialized_dag, pytest.mark.want_activate_assets]


Expand Down Expand Up @@ -1602,14 +1607,19 @@ def test_set_duration_empty_dates(self):
ti.set_duration()
assert ti.duration is None

def test_outlet_asset_extra(self, dag_maker, session):
def test_outlet_asset_extra(self, dag_maker: DagMaker, session: Session):
from airflow.sdk.definitions.asset import Asset

with dag_maker(schedule=None, serialized=True, session=session):

@task(outlets=Asset("test_outlet_asset_extra_1"))
def write1(*, outlet_events):
outlet_events[Asset("test_outlet_asset_extra_1")].extra = {"foo": "bar"}
def write1(*, outlet_events=None):
if TYPE_CHECKING:
assert isinstance(outlet_events, dict)
outlet_events[Asset("test_outlet_asset_extra_1")].extra = {
"foo": "bar",
"this": {"is": "nested", "value": 1},
}

write1()

Expand All @@ -1634,7 +1644,7 @@ def _write2_post_execute(context, _):
assert events["write1"].source_run_id == dr.run_id
assert events["write1"].source_task_id == "write1"
assert events["write1"].asset.uri == "test_outlet_asset_extra_1"
assert events["write1"].extra == {"foo": "bar"}
assert events["write1"].extra == {"foo": "bar", "this": {"is": "nested", "value": 1}}

assert events["write2"].source_dag_id == dr.dag_id
assert events["write2"].source_run_id == dr.run_id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import sys
from collections.abc import Iterator
from datetime import datetime, timedelta
from typing import TYPE_CHECKING

import pendulum
import pytest
Expand Down Expand Up @@ -85,6 +86,9 @@

from unit.models import DEFAULT_DATE

if TYPE_CHECKING:
from pydantic.types import JsonValue

DAG_ID = "dag_id_1"

TEST_CALLBACK_PATH = f"{__name__}.empty_callback_for_deadline"
Expand Down Expand Up @@ -227,7 +231,7 @@ def validate(self, obj):


def create_outlet_event_accessors(
key: Asset | AssetAlias, extra: dict, asset_alias_events: list[AssetAliasEvent]
key: Asset | AssetAlias, extra: dict[str, JsonValue], asset_alias_events: list[AssetAliasEvent]
) -> OutletEventAccessors:
o = OutletEventAccessors()
o[key].extra = extra
Expand Down
4 changes: 2 additions & 2 deletions airflow-ctl/src/airflowctl/api/datamodels/generated.py
Original file line number Diff line number Diff line change
Expand Up @@ -1028,7 +1028,7 @@ class AssetEventResponse(BaseModel):
uri: Annotated[str | None, Field(title="Uri")] = None
name: Annotated[str | None, Field(title="Name")] = None
group: Annotated[str | None, Field(title="Group")] = None
extra: Annotated[dict[str, Any] | None, Field(title="Extra")] = None
extra: Annotated[dict[str, JsonValue] | None, Field(title="Extra")] = None
source_task_id: Annotated[str | None, Field(title="Source Task Id")] = None
source_dag_id: Annotated[str | None, Field(title="Source Dag Id")] = None
source_run_id: Annotated[str | None, Field(title="Source Run Id")] = None
Expand All @@ -1046,7 +1046,7 @@ class AssetResponse(BaseModel):
name: Annotated[str, Field(title="Name")]
uri: Annotated[str, Field(title="Uri")]
group: Annotated[str, Field(title="Group")]
extra: Annotated[dict[str, Any] | None, Field(title="Extra")] = None
extra: Annotated[dict[str, JsonValue] | None, Field(title="Extra")] = None
created_at: Annotated[datetime, Field(title="Created At")]
updated_at: Annotated[datetime, Field(title="Updated At")]
scheduled_dags: Annotated[list[DagScheduleAssetReference], Field(title="Scheduled Dags")]
Expand Down
2 changes: 1 addition & 1 deletion airflow-ctl/tests/airflow_ctl/api/test_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ class TestAssetsOperations:
id=asset_id,
name="asset",
uri="asset_uri",
extra={"extra": "extra"},
extra={"extra": "extra"}, # type: ignore[dict-item]
created_at=datetime.datetime(2024, 12, 31, 23, 59, 59),
updated_at=datetime.datetime(2025, 1, 1, 0, 0, 0),
scheduled_dags=[],
Expand Down
Loading