12 changes: 12 additions & 0 deletions airflow/api_fastapi/core_api/datamodels/assets.py
@@ -21,6 +21,8 @@

from pydantic import BaseModel, Field, field_validator

from airflow.utils.log.secrets_masker import redact


class DagScheduleAssetReference(BaseModel):
    """DAG schedule reference serializer for assets."""
@@ -58,6 +60,11 @@ class AssetResponse(BaseModel):
    producing_tasks: list[TaskOutletAssetReference]
    aliases: list[AssetAliasSchema]

    @field_validator("extra", mode="after")
    @classmethod
    def redact_extra(cls, v: dict):
        return redact(v)


class AssetCollectionResponse(BaseModel):
    """Asset collection response."""
@@ -93,6 +100,11 @@ class AssetEventResponse(BaseModel):
    created_dagruns: list[DagRunAssetReference]
    timestamp: datetime

    @field_validator("extra", mode="after")
    @classmethod
    def redact_extra(cls, v: dict):
        return redact(v)


class AssetEventCollectionResponse(BaseModel):
    """Asset event collection response."""
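Both response models apply the same pattern: an after-mode Pydantic field_validator passes the extra dict through redact from airflow.utils.log.secrets_masker, so sensitive keys are masked when the model is serialized. Below is a minimal standalone sketch of that pattern, assuming an Airflow installation with the default secrets-masker settings (where keys such as "password" are treated as sensitive); ExampleExtraResponse is a hypothetical model, not part of this change.

from pydantic import BaseModel, field_validator

from airflow.utils.log.secrets_masker import redact


class ExampleExtraResponse(BaseModel):
    """Hypothetical response model used only to illustrate the redaction pattern."""

    extra: dict

    @field_validator("extra", mode="after")
    @classmethod
    def redact_extra(cls, v: dict):
        # redact() walks the dict and replaces values of sensitive keys with "***".
        return redact(v)


if __name__ == "__main__":
    model = ExampleExtraResponse(extra={"password": "bar", "from_rest_api": True})
    # Expected with default config: {'extra': {'password': '***', 'from_rest_api': True}}
    print(model.model_dump())

Because the validator lives on the response models, only what the API returns is redacted; the value stored in the metadata database is left untouched.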
145 changes: 145 additions & 0 deletions tests/api_fastapi/core_api/routes/public/test_assets.py
@@ -59,6 +59,22 @@ def _create_assets(session, num: int = 2) -> None:
    session.commit()


def _create_assets_with_sensitive_extra(session, num: int = 2) -> None:
    default_time = "2020-06-11T18:00:00+00:00"
    assets = [
        AssetModel(
            id=i,
            uri=f"s3://bucket/key/{i}",
            extra={"password": "bar"},
            created_at=timezone.parse(default_time),
            updated_at=timezone.parse(default_time),
        )
        for i in range(1, 1 + num)
    ]
    session.add_all(assets)
    session.commit()


def _create_provided_asset(session, asset: AssetModel) -> None:
    session.add(asset)
    session.commit()
@@ -82,6 +98,24 @@ def _create_assets_events(session, num: int = 2) -> None:
    session.commit()


def _create_assets_events_with_sensitive_extra(session, num: int = 2) -> None:
    default_time = "2020-06-11T18:00:00+00:00"
    assets_events = [
        AssetEvent(
            id=i,
            asset_id=i,
            extra={"password": "bar"},
            source_task_id="source_task_id",
            source_dag_id="source_dag_id",
            source_run_id=f"source_run_id_{i}",
            timestamp=timezone.parse(default_time),
        )
        for i in range(1, 1 + num)
    ]
    session.add_all(assets_events)
    session.commit()


def _create_provided_asset_event(session, asset_event: AssetEvent) -> None:
    session.add(asset_event)
    session.commit()
@@ -142,6 +176,10 @@ def teardown_method(self) -> None:
    def create_assets(self, session, num: int = 2):
        _create_assets(session=session, num=num)

    @provide_session
    def create_assets_with_sensitive_extra(self, session, num: int = 2):
        _create_assets_with_sensitive_extra(session=session, num=num)

    @provide_session
    def create_provided_asset(self, session, asset: AssetModel):
        _create_provided_asset(session=session, asset=asset)
@@ -150,6 +188,10 @@ def create_provided_asset(self, session, asset: AssetModel):
    def create_assets_events(self, session, num: int = 2):
        _create_assets_events(session=session, num=num)

    @provide_session
    def create_assets_events_with_sensitive_extra(self, session, num: int = 2):
        _create_assets_events_with_sensitive_extra(session=session, num=num)

    @provide_session
    def create_provided_asset_event(self, session, asset_event: AssetEvent):
        _create_provided_asset_event(session=session, asset_event=asset_event)
@@ -439,6 +481,68 @@ def test_limit_and_offset(self, test_client, params, expected_asset_uris):
        asset_uris = [asset["uri"] for asset in response.json()["asset_events"]]
        assert asset_uris == expected_asset_uris

    @pytest.mark.usefixtures("time_freezer")
    @pytest.mark.enable_redact
    def test_should_mask_sensitive_extra(self, test_client, session):
        self.create_assets_with_sensitive_extra()
        self.create_assets_events_with_sensitive_extra()
        self.create_dag_run()
        self.create_asset_dag_run()
        response = test_client.get("/public/assets/events")
        assert response.status_code == 200
        response_data = response.json()
        assert response_data == {
            "asset_events": [
                {
                    "id": 1,
                    "asset_id": 1,
                    "uri": "s3://bucket/key/1",
                    "extra": {"password": "***"},
                    "source_task_id": "source_task_id",
                    "source_dag_id": "source_dag_id",
                    "source_run_id": "source_run_id_1",
                    "source_map_index": -1,
                    "created_dagruns": [
                        {
                            "run_id": "source_run_id_1",
                            "dag_id": "source_dag_id",
                            "logical_date": "2020-06-11T18:00:00Z",
                            "start_date": "2020-06-11T18:00:00Z",
                            "end_date": "2020-06-11T18:00:00Z",
                            "state": "success",
                            "data_interval_start": "2020-06-11T18:00:00Z",
                            "data_interval_end": "2020-06-11T18:00:00Z",
                        }
                    ],
                    "timestamp": "2020-06-11T18:00:00Z",
                },
                {
                    "id": 2,
                    "asset_id": 2,
                    "uri": "s3://bucket/key/2",
                    "extra": {"password": "***"},
                    "source_task_id": "source_task_id",
                    "source_dag_id": "source_dag_id",
                    "source_run_id": "source_run_id_2",
                    "source_map_index": -1,
                    "created_dagruns": [
                        {
                            "run_id": "source_run_id_2",
                            "dag_id": "source_dag_id",
                            "logical_date": "2020-06-11T18:00:00Z",
                            "start_date": "2020-06-11T18:00:00Z",
                            "end_date": "2020-06-11T18:00:00Z",
                            "state": "success",
                            "data_interval_start": "2020-06-11T18:00:00Z",
                            "data_interval_end": "2020-06-11T18:00:00Z",
                        }
                    ],
                    "timestamp": "2020-06-11T18:00:00Z",
                },
            ],
            "total_entries": 2,
        }


class TestGetAssetEndpoint(TestAssets):
    @pytest.mark.parametrize(
@@ -478,6 +582,27 @@ def test_should_respond_404(self, test_client):
        assert response.status_code == 404
        assert response.json()["detail"] == "The Asset with uri: `s3://bucket/key` was not found"

    @pytest.mark.usefixtures("time_freezer")
    @pytest.mark.enable_redact
    def test_should_mask_sensitive_extra(self, test_client, session):
        self.create_assets_with_sensitive_extra()
        tz_datetime_format = self.default_time.replace("+00:00", "Z")
        uri = "s3://bucket/key/1"
        response = test_client.get(
            f"/public/assets/{uri}",
        )
        assert response.status_code == 200
        assert response.json() == {
            "id": 1,
            "uri": "s3://bucket/key/1",
            "extra": {"password": "***"},
            "created_at": tz_datetime_format,
            "updated_at": tz_datetime_format,
            "consuming_dags": [],
            "producing_tasks": [],
            "aliases": [],
        }


class TestQueuedEventEndpoint(TestAssets):
    def _create_asset_dag_run_queues(self, dag_id, asset_id, session):
@@ -549,3 +674,23 @@ def test_invalid_attr_not_allowed(self, test_client, session):
        response = test_client.post("/public/assets/events", json=event_invalid_payload)

        assert response.status_code == 422

    @pytest.mark.usefixtures("time_freezer")
    @pytest.mark.enable_redact
    def test_should_mask_sensitive_extra(self, test_client, session):
        self.create_assets()
        event_payload = {"uri": "s3://bucket/key/1", "extra": {"password": "bar"}}
        response = test_client.post("/public/assets/events", json=event_payload)
        assert response.status_code == 200
        assert response.json() == {
            "id": mock.ANY,
            "asset_id": 1,
            "uri": "s3://bucket/key/1",
            "extra": {"password": "***", "from_rest_api": True},
            "source_task_id": None,
            "source_dag_id": None,
            "source_run_id": None,
            "source_map_index": -1,
            "created_dagruns": [],
            "timestamp": self.default_time.replace("+00:00", "Z"),
        }
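The last test also shows that the create-event endpoint merges {"from_rest_api": True} into the event's extra while the response masks the sensitive key. A hedged client-side sketch of the same call, assuming a locally running Airflow API server at http://localhost:8080 that serves the /public/assets/events route without extra authentication (both are assumptions for illustration, not something this diff establishes):

import requests

# Hypothetical base URL; adjust host, port, and authentication for a real deployment.
BASE_URL = "http://localhost:8080"

payload = {"uri": "s3://bucket/key/1", "extra": {"password": "bar"}}
response = requests.post(f"{BASE_URL}/public/assets/events", json=payload)
response.raise_for_status()

event = response.json()
# The API response masks the sensitive key before it reaches the client.
assert event["extra"]["password"] == "***"
assert event["extra"]["from_rest_api"] is True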