Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
04c286e
feat(api_connexion): rename dataset_endpoint module as asset_endpoint
Lee-W Sep 30, 2024
3a989d4
feat(api_connexion/openapi): rename tag Dataset as Asset
Lee-W Sep 30, 2024
45faf05
feat(api_connexion): rename create_dataset_event as create_asset_event
Lee-W Sep 30, 2024
5d9cb5c
feat(api_connexion): rename schema CreateDatasetEvent as CreateAssetE…
Lee-W Sep 30, 2024
cb76782
test(api_connexion): rename test_dataset_endpoint as test_asset_endpoint
Lee-W Sep 30, 2024
246eff5
feat(api_connexion): rename delete_dataset_queued_events as delete_as…
Lee-W Sep 30, 2024
2067636
feat(api_connexion): rename get_dataset_queued_events as get_asset_qu…
Lee-W Sep 30, 2024
9a054c1
feat(api_connexion): rename delete_dag_dataset_queued_events as delet…
Lee-W Sep 30, 2024
39e6f4a
feat(api_connexion): rename delete_dag_dataset_queued_event as delete…
Lee-W Sep 30, 2024
08afa57
feat(api_connexion): rename get_dag_dataset_queued_events as get_dag_…
Lee-W Sep 30, 2024
1c328ea
feat(api_connexion): rename get_dag_dataset_queued_event as get_dag_a…
Lee-W Sep 30, 2024
30c4b3b
refactor(api_connexion): remove unused dataset_id in _generate_queued…
Lee-W Sep 30, 2024
48b4233
feat(api_connexion): rename get_dataset_events as get_asset_events
Lee-W Sep 30, 2024
f50ea72
feat(api_connexion): rename get_datasets as get_assets
Lee-W Sep 30, 2024
428d136
feat(api_connexion): rename get_dataset as get_asset
Lee-W Sep 30, 2024
9bc3336
feat(api_connexion/openapi): update api docs
Lee-W Sep 30, 2024
7ed2543
feat(js): rename DatasetEvents as AssetEvents
Lee-W Oct 1, 2024
7d16068
feat(js): rename DatasetDetails as AssetDetails
Lee-W Oct 1, 2024
45db9c1
feat(js): rename DatasetList as AssetList
Lee-W Oct 1, 2024
9c887dc
feat(js/api): rename useUpstreamDatasetEvents as useUpstreamAssetEvents
Lee-W Oct 1, 2024
4fcabb5
feat(js/api): rename useDatasetsSummary as useAssetsSummary
Lee-W Oct 1, 2024
00cd675
feat(js/api): rename useDatasetDependencies as useAssetDependencies
Lee-W Oct 1, 2024
5931131
feat(js/api): rename useDatasetEvents as useAssetEvents
Lee-W Oct 1, 2024
93b52fc
feat(js/api): rename useDatasets as useAssets
Lee-W Oct 1, 2024
b9b2886
feat(js/api): rename useDataset as useAsset
Lee-W Oct 1, 2024
9899e46
feat(api_connexion): rename get_upstream_dataset_events as get_upstre…
Lee-W Oct 1, 2024
287e1b2
feat(api_connexion/openapi/v1): rename DatasetURI as AssetURI
Lee-W Oct 1, 2024
b28753c
feat(api_connexion/openapi/v1): rename DatasetCollection as AssetColl…
Lee-W Oct 1, 2024
97991a0
feat(api_connexion/openapi/v1): rename DagScheduleDatasetReference as…
Lee-W Oct 1, 2024
749b681
feat(api_connexion/openapi/v1): rename TaskOutletDatasetReference as …
Lee-W Oct 1, 2024
4203a41
feat(js/api): rename DatasetEventCollection as AssetEventCollection
Lee-W Oct 1, 2024
a50b4c5
feat(api_connexion/openapi/v1): rename DatasetEvent as AssetEvent
Lee-W Oct 1, 2024
70e14f8
feat(api_connexion/openapi/v1): rename Dataset as Asset
Lee-W Oct 1, 2024
1d1c80c
docs(api_connexion/openapi/v1): update dataset to asset in v1.yaml
Lee-W Oct 1, 2024
9982a6d
feat(api_connexion): rename endpoint datasets as assets
Lee-W Oct 1, 2024
8c82e8a
test(api_connexion): rename dataset as asset
Lee-W Oct 1, 2024
7c2ba6a
fix(api_connexion/openapi/v1): fix queued_events property name error
Lee-W Oct 1, 2024
729314e
feat(api_fastapi): rename next_run_datasets as next_run_assets
Lee-W Oct 1, 2024
d0c08df
test: resolve test conflict
Lee-W Oct 1, 2024
bd3ddfa
docs(newsfragments): add newsfragments for dataset to asset endpoint …
Lee-W Oct 2, 2024
6eb5d29
feat(js/api): rename datasetEvents as assetEvents
Lee-W Oct 2, 2024
2eaff90
feat(js/api): rename variable datasetEvent as assetEvent
Lee-W Oct 2, 2024
d6054ba
feat(js/api): rename dataset_api as asset_api
Lee-W Oct 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,13 @@

from airflow.api_connexion.types import APIResponse

RESOURCE_EVENT_PREFIX = "dataset"
RESOURCE_EVENT_PREFIX = "asset"


@security.requires_access_asset("GET")
@provide_session
def get_dataset(*, uri: str, session: Session = NEW_SESSION) -> APIResponse:
"""Get an asset ."""
def get_asset(*, uri: str, session: Session = NEW_SESSION) -> APIResponse:
"""Get an asset."""
asset = session.scalar(
select(AssetModel)
.where(AssetModel.uri == uri)
Expand All @@ -80,7 +80,7 @@ def get_dataset(*, uri: str, session: Session = NEW_SESSION) -> APIResponse:
@security.requires_access_asset("GET")
@format_parameters({"limit": check_limit})
@provide_session
def get_datasets(
def get_assets(
*,
limit: int,
offset: int = 0,
Expand Down Expand Up @@ -109,18 +109,18 @@ def get_datasets(
.offset(offset)
.limit(limit)
).all()
return asset_collection_schema.dump(AssetCollection(datasets=assets, total_entries=total_entries))
return asset_collection_schema.dump(AssetCollection(assets=assets, total_entries=total_entries))


@security.requires_access_asset("GET")
@provide_session
@format_parameters({"limit": check_limit})
def get_dataset_events(
def get_asset_events(
*,
limit: int,
offset: int = 0,
order_by: str = "timestamp",
dataset_id: int | None = None,
asset_id: int | None = None,
source_dag_id: str | None = None,
source_task_id: str | None = None,
source_run_id: str | None = None,
Expand All @@ -132,8 +132,8 @@ def get_dataset_events(

query = select(AssetEvent)

if dataset_id:
query = query.where(AssetEvent.dataset_id == dataset_id)
if asset_id:
query = query.where(AssetEvent.dataset_id == asset_id)
if source_dag_id:
query = query.where(AssetEvent.source_dag_id == source_dag_id)
if source_task_id:
Expand All @@ -149,14 +149,13 @@ def get_dataset_events(
query = apply_sorting(query, order_by, {}, allowed_attrs)
events = session.scalars(query.offset(offset).limit(limit)).all()
return asset_event_collection_schema.dump(
AssetEventCollection(dataset_events=events, total_entries=total_entries)
AssetEventCollection(asset_events=events, total_entries=total_entries)
)


def _generate_queued_event_where_clause(
*,
dag_id: str | None = None,
dataset_id: int | None = None,
uri: str | None = None,
before: str | None = None,
permitted_dag_ids: set[str] | None = None,
Expand All @@ -165,8 +164,6 @@ def _generate_queued_event_where_clause(
where_clause = []
if dag_id is not None:
where_clause.append(AssetDagRunQueue.target_dag_id == dag_id)
if dataset_id is not None:
where_clause.append(AssetDagRunQueue.dataset_id == dataset_id)
if uri is not None:
where_clause.append(
AssetDagRunQueue.dataset_id.in_(
Expand All @@ -183,7 +180,7 @@ def _generate_queued_event_where_clause(
@security.requires_access_asset("GET")
@security.requires_access_dag("GET")
@provide_session
def get_dag_dataset_queued_event(
def get_dag_asset_queued_event(
*, dag_id: str, uri: str, before: str | None = None, session: Session = NEW_SESSION
) -> APIResponse:
"""Get a queued asset event for a DAG."""
Expand All @@ -206,7 +203,7 @@ def get_dag_dataset_queued_event(
@security.requires_access_dag("GET")
@provide_session
@action_logging
def delete_dag_dataset_queued_event(
def delete_dag_asset_queued_event(
*, dag_id: str, uri: str, before: str | None = None, session: Session = NEW_SESSION
) -> APIResponse:
"""Delete a queued asset event for a DAG."""
Expand All @@ -224,7 +221,7 @@ def delete_dag_dataset_queued_event(
@security.requires_access_asset("GET")
@security.requires_access_dag("GET")
@provide_session
def get_dag_dataset_queued_events(
def get_dag_asset_queued_events(
*, dag_id: str, before: str | None = None, session: Session = NEW_SESSION
) -> APIResponse:
"""Get queued asset events for a DAG."""
Expand Down Expand Up @@ -253,7 +250,7 @@ def get_dag_dataset_queued_events(
@security.requires_access_dag("GET")
@action_logging
@provide_session
def delete_dag_dataset_queued_events(
def delete_dag_asset_queued_events(
*, dag_id: str, before: str | None = None, session: Session = NEW_SESSION
) -> APIResponse:
"""Delete queued asset events for a DAG."""
Expand All @@ -271,7 +268,7 @@ def delete_dag_dataset_queued_events(

@security.requires_access_asset("GET")
@provide_session
def get_dataset_queued_events(
def get_asset_queued_events(
*, uri: str, before: str | None = None, session: Session = NEW_SESSION
) -> APIResponse:
"""Get queued asset events for an asset."""
Expand Down Expand Up @@ -303,7 +300,7 @@ def get_dataset_queued_events(
@security.requires_access_asset("DELETE")
@action_logging
@provide_session
def delete_dataset_queued_events(
def delete_asset_queued_events(
*, uri: str, before: str | None = None, session: Session = NEW_SESSION
) -> APIResponse:
"""Delete queued asset events for an asset."""
Expand All @@ -325,23 +322,23 @@ def delete_dataset_queued_events(
@security.requires_access_asset("POST")
@provide_session
@action_logging
def create_dataset_event(session: Session = NEW_SESSION) -> APIResponse:
def create_asset_event(session: Session = NEW_SESSION) -> APIResponse:
"""Create asset event."""
body = get_json_request_dict()
try:
json_body = create_asset_event_schema.load(body)
except ValidationError as err:
raise BadRequest(detail=str(err))

uri = json_body["dataset_uri"]
uri = json_body["asset_uri"]
asset = session.scalar(select(AssetModel).where(AssetModel.uri == uri).limit(1))
if not asset:
raise NotFound(title="Asset not found", detail=f"Asset with uri: '{uri}' not found")
timestamp = timezone.utcnow()
extra = json_body.get("extra", {})
extra["from_rest_api"] = True
asset_event = asset_manager.register_asset_change(
asset=Asset(uri),
asset=Asset(uri=uri),
timestamp=timestamp,
extra=extra,
session=session,
Expand Down
8 changes: 3 additions & 5 deletions airflow/api_connexion/endpoints/dag_run_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,8 @@ def get_dag_run(
@security.requires_access_dag("GET", DagAccessEntity.RUN)
@security.requires_access_asset("GET")
@provide_session
def get_upstream_dataset_events(
*, dag_id: str, dag_run_id: str, session: Session = NEW_SESSION
) -> APIResponse:
"""If dag run is dataset-triggered, return the asset events that triggered it."""
def get_upstream_asset_events(*, dag_id: str, dag_run_id: str, session: Session = NEW_SESSION) -> APIResponse:
"""If dag run is asset-triggered, return the asset events that triggered it."""
dag_run: DagRun | None = session.scalar(
select(DagRun).where(
DagRun.dag_id == dag_id,
Expand All @@ -131,7 +129,7 @@ def get_upstream_dataset_events(
)
events = dag_run.consumed_dataset_events
return asset_event_collection_schema.dump(
AssetEventCollection(dataset_events=events, total_entries=len(events))
AssetEventCollection(asset_events=events, total_entries=len(events))
)


Expand Down
Loading