diff --git a/airflow/api_connexion/endpoints/dataset_endpoint.py b/airflow/api_connexion/endpoints/asset_endpoint.py similarity index 92% rename from airflow/api_connexion/endpoints/dataset_endpoint.py rename to airflow/api_connexion/endpoints/asset_endpoint.py index 95c3bead3da52..cbbe542ea7987 100644 --- a/airflow/api_connexion/endpoints/dataset_endpoint.py +++ b/airflow/api_connexion/endpoints/asset_endpoint.py @@ -57,13 +57,13 @@ from airflow.api_connexion.types import APIResponse -RESOURCE_EVENT_PREFIX = "dataset" +RESOURCE_EVENT_PREFIX = "asset" @security.requires_access_asset("GET") @provide_session -def get_dataset(*, uri: str, session: Session = NEW_SESSION) -> APIResponse: - """Get an asset .""" +def get_asset(*, uri: str, session: Session = NEW_SESSION) -> APIResponse: + """Get an asset.""" asset = session.scalar( select(AssetModel) .where(AssetModel.uri == uri) @@ -80,7 +80,7 @@ def get_dataset(*, uri: str, session: Session = NEW_SESSION) -> APIResponse: @security.requires_access_asset("GET") @format_parameters({"limit": check_limit}) @provide_session -def get_datasets( +def get_assets( *, limit: int, offset: int = 0, @@ -109,18 +109,18 @@ def get_datasets( .offset(offset) .limit(limit) ).all() - return asset_collection_schema.dump(AssetCollection(datasets=assets, total_entries=total_entries)) + return asset_collection_schema.dump(AssetCollection(assets=assets, total_entries=total_entries)) @security.requires_access_asset("GET") @provide_session @format_parameters({"limit": check_limit}) -def get_dataset_events( +def get_asset_events( *, limit: int, offset: int = 0, order_by: str = "timestamp", - dataset_id: int | None = None, + asset_id: int | None = None, source_dag_id: str | None = None, source_task_id: str | None = None, source_run_id: str | None = None, @@ -132,8 +132,8 @@ def get_dataset_events( query = select(AssetEvent) - if dataset_id: - query = query.where(AssetEvent.dataset_id == dataset_id) + if asset_id: + query = query.where(AssetEvent.dataset_id == asset_id) if source_dag_id: query = query.where(AssetEvent.source_dag_id == source_dag_id) if source_task_id: @@ -149,14 +149,13 @@ def get_dataset_events( query = apply_sorting(query, order_by, {}, allowed_attrs) events = session.scalars(query.offset(offset).limit(limit)).all() return asset_event_collection_schema.dump( - AssetEventCollection(dataset_events=events, total_entries=total_entries) + AssetEventCollection(asset_events=events, total_entries=total_entries) ) def _generate_queued_event_where_clause( *, dag_id: str | None = None, - dataset_id: int | None = None, uri: str | None = None, before: str | None = None, permitted_dag_ids: set[str] | None = None, @@ -165,8 +164,6 @@ def _generate_queued_event_where_clause( where_clause = [] if dag_id is not None: where_clause.append(AssetDagRunQueue.target_dag_id == dag_id) - if dataset_id is not None: - where_clause.append(AssetDagRunQueue.dataset_id == dataset_id) if uri is not None: where_clause.append( AssetDagRunQueue.dataset_id.in_( @@ -183,7 +180,7 @@ def _generate_queued_event_where_clause( @security.requires_access_asset("GET") @security.requires_access_dag("GET") @provide_session -def get_dag_dataset_queued_event( +def get_dag_asset_queued_event( *, dag_id: str, uri: str, before: str | None = None, session: Session = NEW_SESSION ) -> APIResponse: """Get a queued asset event for a DAG.""" @@ -206,7 +203,7 @@ def get_dag_dataset_queued_event( @security.requires_access_dag("GET") @provide_session @action_logging -def delete_dag_dataset_queued_event( +def delete_dag_asset_queued_event( *, dag_id: str, uri: str, before: str | None = None, session: Session = NEW_SESSION ) -> APIResponse: """Delete a queued asset event for a DAG.""" @@ -224,7 +221,7 @@ def delete_dag_dataset_queued_event( @security.requires_access_asset("GET") @security.requires_access_dag("GET") @provide_session -def get_dag_dataset_queued_events( +def get_dag_asset_queued_events( *, dag_id: str, before: str | None = None, session: Session = NEW_SESSION ) -> APIResponse: """Get queued asset events for a DAG.""" @@ -253,7 +250,7 @@ def get_dag_dataset_queued_events( @security.requires_access_dag("GET") @action_logging @provide_session -def delete_dag_dataset_queued_events( +def delete_dag_asset_queued_events( *, dag_id: str, before: str | None = None, session: Session = NEW_SESSION ) -> APIResponse: """Delete queued asset events for a DAG.""" @@ -271,7 +268,7 @@ def delete_dag_dataset_queued_events( @security.requires_access_asset("GET") @provide_session -def get_dataset_queued_events( +def get_asset_queued_events( *, uri: str, before: str | None = None, session: Session = NEW_SESSION ) -> APIResponse: """Get queued asset events for an asset.""" @@ -303,7 +300,7 @@ def get_dataset_queued_events( @security.requires_access_asset("DELETE") @action_logging @provide_session -def delete_dataset_queued_events( +def delete_asset_queued_events( *, uri: str, before: str | None = None, session: Session = NEW_SESSION ) -> APIResponse: """Delete queued asset events for an asset.""" @@ -325,7 +322,7 @@ def delete_dataset_queued_events( @security.requires_access_asset("POST") @provide_session @action_logging -def create_dataset_event(session: Session = NEW_SESSION) -> APIResponse: +def create_asset_event(session: Session = NEW_SESSION) -> APIResponse: """Create asset event.""" body = get_json_request_dict() try: @@ -333,7 +330,7 @@ def create_dataset_event(session: Session = NEW_SESSION) -> APIResponse: except ValidationError as err: raise BadRequest(detail=str(err)) - uri = json_body["dataset_uri"] + uri = json_body["asset_uri"] asset = session.scalar(select(AssetModel).where(AssetModel.uri == uri).limit(1)) if not asset: raise NotFound(title="Asset not found", detail=f"Asset with uri: '{uri}' not found") @@ -341,7 +338,7 @@ def create_dataset_event(session: Session = NEW_SESSION) -> APIResponse: extra = json_body.get("extra", {}) extra["from_rest_api"] = True asset_event = asset_manager.register_asset_change( - asset=Asset(uri), + asset=Asset(uri=uri), timestamp=timestamp, extra=extra, session=session, diff --git a/airflow/api_connexion/endpoints/dag_run_endpoint.py b/airflow/api_connexion/endpoints/dag_run_endpoint.py index 02d4663837f4e..44891c0ef2c84 100644 --- a/airflow/api_connexion/endpoints/dag_run_endpoint.py +++ b/airflow/api_connexion/endpoints/dag_run_endpoint.py @@ -114,10 +114,8 @@ def get_dag_run( @security.requires_access_dag("GET", DagAccessEntity.RUN) @security.requires_access_asset("GET") @provide_session -def get_upstream_dataset_events( - *, dag_id: str, dag_run_id: str, session: Session = NEW_SESSION -) -> APIResponse: - """If dag run is dataset-triggered, return the asset events that triggered it.""" +def get_upstream_asset_events(*, dag_id: str, dag_run_id: str, session: Session = NEW_SESSION) -> APIResponse: + """If dag run is asset-triggered, return the asset events that triggered it.""" dag_run: DagRun | None = session.scalar( select(DagRun).where( DagRun.dag_id == dag_id, @@ -131,7 +129,7 @@ def get_upstream_dataset_events( ) events = dag_run.consumed_dataset_events return asset_event_collection_schema.dump( - AssetEventCollection(dataset_events=events, total_entries=len(events)) + AssetEventCollection(asset_events=events, total_entries=len(events)) ) diff --git a/airflow/api_connexion/openapi/v1.yaml b/airflow/api_connexion/openapi/v1.yaml index 15ad6fd8a4f63..828a3af25e879 100644 --- a/airflow/api_connexion/openapi/v1.yaml +++ b/airflow/api_connexion/openapi/v1.yaml @@ -1181,26 +1181,26 @@ paths: "404": $ref: "#/components/responses/NotFound" - /dags/{dag_id}/dagRuns/{dag_run_id}/upstreamDatasetEvents: + /dags/{dag_id}/dagRuns/{dag_run_id}/upstreamAssetEvents: parameters: - $ref: "#/components/parameters/DAGID" - $ref: "#/components/parameters/DAGRunID" get: - summary: Get dataset events for a DAG run + summary: Get asset events for a DAG run description: | - Get datasets for a dag run. + Get asset for a dag run. *New in version 2.4.0* x-openapi-router-controller: airflow.api_connexion.endpoints.dag_run_endpoint - operationId: get_upstream_dataset_events - tags: [DAGRun, Dataset] + operationId: get_upstream_asset_events + tags: [DAGRun, Asset] responses: "200": description: Success. content: application/json: schema: - $ref: "#/components/schemas/DatasetEventCollection" + $ref: "#/components/schemas/AssetEventCollection" "401": $ref: "#/components/responses/Unauthenticated" "403": @@ -1245,22 +1245,22 @@ paths: "404": $ref: "#/components/responses/NotFound" - /dags/{dag_id}/datasets/queuedEvent/{uri}: + /dags/{dag_id}/assets/queuedEvent/{uri}: parameters: - $ref: "#/components/parameters/DAGID" - - $ref: "#/components/parameters/DatasetURI" + - $ref: "#/components/parameters/AssetURI" get: - summary: Get a queued Dataset event for a DAG + summary: Get a queued asset event for a DAG description: | - Get a queued Dataset event for a DAG. + Get a queued asset event for a DAG. *New in version 2.9.0* - x-openapi-router-controller: airflow.api_connexion.endpoints.dataset_endpoint - operationId: get_dag_dataset_queued_event + x-openapi-router-controller: airflow.api_connexion.endpoints.asset_endpoint + operationId: get_dag_asset_queued_event parameters: - $ref: "#/components/parameters/Before" - tags: [Dataset] + tags: [Asset] responses: "200": description: Success. @@ -1276,16 +1276,16 @@ paths: $ref: "#/components/responses/NotFound" delete: - summary: Delete a queued Dataset event for a DAG. + summary: Delete a queued Asset event for a DAG. description: | - Delete a queued Dataset event for a DAG. + Delete a queued Asset event for a DAG. *New in version 2.9.0* - x-openapi-router-controller: airflow.api_connexion.endpoints.dataset_endpoint - operationId: delete_dag_dataset_queued_event + x-openapi-router-controller: airflow.api_connexion.endpoints.asset_endpoint + operationId: delete_dag_asset_queued_event parameters: - $ref: "#/components/parameters/Before" - tags: [Dataset] + tags: [Asset] responses: "204": description: Success. @@ -1298,21 +1298,21 @@ paths: "404": $ref: "#/components/responses/NotFound" - /dags/{dag_id}/datasets/queuedEvent: + /dags/{dag_id}/assets/queuedEvent: parameters: - $ref: "#/components/parameters/DAGID" get: - summary: Get queued Dataset events for a DAG. + summary: Get queued Asset events for a DAG. description: | - Get queued Dataset events for a DAG. + Get queued Asset events for a DAG. *New in version 2.9.0* - x-openapi-router-controller: airflow.api_connexion.endpoints.dataset_endpoint - operationId: get_dag_dataset_queued_events + x-openapi-router-controller: airflow.api_connexion.endpoints.asset_endpoint + operationId: get_dag_asset_queued_events parameters: - $ref: "#/components/parameters/Before" - tags: [Dataset] + tags: [Asset] responses: "200": description: Success. @@ -1328,16 +1328,16 @@ paths: $ref: "#/components/responses/NotFound" delete: - summary: Delete queued Dataset events for a DAG. + summary: Delete queued Asset events for a DAG. description: | - Delete queued Dataset events for a DAG. + Delete queued Asset events for a DAG. *New in version 2.9.0* - x-openapi-router-controller: airflow.api_connexion.endpoints.dataset_endpoint - operationId: delete_dag_dataset_queued_events + x-openapi-router-controller: airflow.api_connexion.endpoints.asset_endpoint + operationId: delete_dag_asset_queued_events parameters: - $ref: "#/components/parameters/Before" - tags: [Dataset] + tags: [Asset] responses: "204": description: Success. @@ -1371,21 +1371,21 @@ paths: "404": $ref: "#/components/responses/NotFound" - /datasets/queuedEvent/{uri}: + /assets/queuedEvent/{uri}: parameters: - - $ref: "#/components/parameters/DatasetURI" + - $ref: "#/components/parameters/AssetURI" get: - summary: Get queued Dataset events for a Dataset. + summary: Get queued Asset events for an Asset. description: | - Get queued Dataset events for a Dataset + Get queued Asset events for an Asset *New in version 2.9.0* - x-openapi-router-controller: airflow.api_connexion.endpoints.dataset_endpoint - operationId: get_dataset_queued_events + x-openapi-router-controller: airflow.api_connexion.endpoints.asset_endpoint + operationId: get_asset_queued_events parameters: - $ref: "#/components/parameters/Before" - tags: [Dataset] + tags: [Asset] responses: "200": description: Success. @@ -1401,16 +1401,16 @@ paths: $ref: "#/components/responses/NotFound" delete: - summary: Delete queued Dataset events for a Dataset. + summary: Delete queued Asset events for an Asset. description: | - Delete queued Dataset events for a Dataset. + Delete queued Asset events for a Asset. *New in version 2.9.0* - x-openapi-router-controller: airflow.api_connexion.endpoints.dataset_endpoint - operationId: delete_dataset_queued_events + x-openapi-router-controller: airflow.api_connexion.endpoints.asset_endpoint + operationId: delete_asset_queued_events parameters: - $ref: "#/components/parameters/Before" - tags: [Dataset] + tags: [Asset] responses: "204": description: Success. @@ -2517,12 +2517,12 @@ paths: "403": $ref: "#/components/responses/PermissionDenied" - /datasets: + /assets: get: - summary: List datasets - x-openapi-router-controller: airflow.api_connexion.endpoints.dataset_endpoint - operationId: get_datasets - tags: [Dataset] + summary: List assets + x-openapi-router-controller: airflow.api_connexion.endpoints.asset_endpoint + operationId: get_assets + tags: [Asset] parameters: - $ref: "#/components/parameters/PageLimit" - $ref: "#/components/parameters/PageOffset" @@ -2533,14 +2533,14 @@ paths: type: string required: false description: | - If set, only return datasets with uris matching this pattern. + If set, only return assets with uris matching this pattern. - name: dag_ids in: query schema: type: string required: false description: | - One or more DAG IDs separated by commas to filter datasets by associated DAGs either consuming or producing. + One or more DAG IDs separated by commas to filter assets by associated DAGs either consuming or producing. *New in version 2.9.0* responses: @@ -2549,28 +2549,28 @@ paths: content: application/json: schema: - $ref: "#/components/schemas/DatasetCollection" + $ref: "#/components/schemas/AssetCollection" "401": $ref: "#/components/responses/Unauthenticated" "403": $ref: "#/components/responses/PermissionDenied" - /datasets/{uri}: + /assets/{uri}: parameters: - - $ref: "#/components/parameters/DatasetURI" + - $ref: "#/components/parameters/AssetURI" get: - summary: Get a dataset - description: Get a dataset by uri. - x-openapi-router-controller: airflow.api_connexion.endpoints.dataset_endpoint - operationId: get_dataset - tags: [Dataset] + summary: Get an asset + description: Get an asset by uri. + x-openapi-router-controller: airflow.api_connexion.endpoints.asset_endpoint + operationId: get_asset + tags: [Asset] responses: "200": description: Success. content: application/json: schema: - $ref: "#/components/schemas/Dataset" + $ref: "#/components/schemas/Asset" "401": $ref: "#/components/responses/Unauthenticated" "403": @@ -2578,18 +2578,18 @@ paths: "404": $ref: "#/components/responses/NotFound" - /datasets/events: + /assets/events: get: - summary: Get dataset events - description: Get dataset events - x-openapi-router-controller: airflow.api_connexion.endpoints.dataset_endpoint - operationId: get_dataset_events - tags: [Dataset] + summary: Get asset events + description: Get asset events + x-openapi-router-controller: airflow.api_connexion.endpoints.asset_endpoint + operationId: get_asset_events + tags: [Asset] parameters: - $ref: "#/components/parameters/PageLimit" - $ref: "#/components/parameters/PageOffset" - $ref: "#/components/parameters/OrderBy" - - $ref: "#/components/parameters/FilterDatasetID" + - $ref: "#/components/parameters/FilterAssetID" - $ref: "#/components/parameters/FilterSourceDAGID" - $ref: "#/components/parameters/FilterSourceTaskID" - $ref: "#/components/parameters/FilterSourceRunID" @@ -2600,7 +2600,7 @@ paths: content: application/json: schema: - $ref: "#/components/schemas/DatasetEventCollection" + $ref: "#/components/schemas/AssetEventCollection" "401": $ref: "#/components/responses/Unauthenticated" "403": @@ -2608,24 +2608,24 @@ paths: "404": $ref: "#/components/responses/NotFound" post: - summary: Create dataset event - description: Create dataset event - x-openapi-router-controller: airflow.api_connexion.endpoints.dataset_endpoint - operationId: create_dataset_event - tags: [Dataset] + summary: Create asset event + description: Create asset event + x-openapi-router-controller: airflow.api_connexion.endpoints.asset_endpoint + operationId: create_asset_event + tags: [Asset] requestBody: required: true content: application/json: schema: - $ref: '#/components/schemas/CreateDatasetEvent' + $ref: '#/components/schemas/CreateAssetEvent' responses: '200': description: Success. content: application/json: schema: - $ref: '#/components/schemas/DatasetEvent' + $ref: '#/components/schemas/AssetEvent' "400": $ref: "#/components/responses/BadRequest" '401': @@ -4133,7 +4133,7 @@ components: nullable: true dataset_expression: type: object - description: Nested dataset any/all conditions + description: Nested asset any/all conditions nullable: true doc_md: type: string @@ -4507,133 +4507,133 @@ components: $ref: "#/components/schemas/Resource" description: The permission resource - Dataset: + Asset: description: | - A dataset item. + An asset item. *New in version 2.4.0* type: object properties: id: type: integer - description: The dataset id + description: The asset id uri: type: string - description: The dataset uri + description: The asset uri nullable: false extra: type: object - description: The dataset extra + description: The asset extra nullable: true created_at: type: string - description: The dataset creation time + description: The asset creation time nullable: false updated_at: type: string - description: The dataset update time + description: The asset update time nullable: false consuming_dags: type: array items: - $ref: "#/components/schemas/DagScheduleDatasetReference" + $ref: "#/components/schemas/DagScheduleAssetReference" producing_tasks: type: array items: - $ref: "#/components/schemas/TaskOutletDatasetReference" + $ref: "#/components/schemas/TaskOutletAssetReference" - TaskOutletDatasetReference: + TaskOutletAssetReference: description: | - A datasets reference to an upstream task. + An asset reference to an upstream task. *New in version 2.4.0* type: object properties: dag_id: type: string - description: The DAG ID that updates the dataset. + description: The DAG ID that updates the asset. nullable: true task_id: type: string - description: The task ID that updates the dataset. + description: The task ID that updates the asset. nullable: true created_at: type: string - description: The dataset creation time + description: The asset creation time nullable: false updated_at: type: string - description: The dataset update time + description: The asset update time nullable: false - DagScheduleDatasetReference: + DagScheduleAssetReference: description: | - A datasets reference to a downstream DAG. + An asset reference to a downstream DAG. *New in version 2.4.0* type: object properties: dag_id: type: string - description: The DAG ID that depends on the dataset. + description: The DAG ID that depends on the asset. nullable: true created_at: type: string - description: The dataset reference creation time + description: The asset reference creation time nullable: false updated_at: type: string - description: The dataset reference update time + description: The asset reference update time nullable: false - DatasetCollection: + AssetCollection: description: | - A collection of datasets. + A collection of assets. *New in version 2.4.0* type: object allOf: - type: object properties: - datasets: + assets: type: array items: - $ref: "#/components/schemas/Dataset" + $ref: "#/components/schemas/Asset" - $ref: "#/components/schemas/CollectionInfo" - DatasetEvent: + AssetEvent: description: | - A dataset event. + An asset event. *New in version 2.4.0* type: object properties: dataset_id: type: integer - description: The dataset id + description: The asset id dataset_uri: type: string - description: The URI of the dataset + description: The URI of the asset nullable: false extra: type: object - description: The dataset event extra + description: The asset event extra nullable: true source_dag_id: type: string - description: The DAG ID that updated the dataset. + description: The DAG ID that updated the asset. nullable: true source_task_id: type: string - description: The task ID that updated the dataset. + description: The task ID that updated the asset. nullable: true source_run_id: type: string - description: The DAG run ID that updated the dataset. + description: The DAG run ID that updated the asset. nullable: true source_map_index: type: integer - description: The task map index that updated the dataset. + description: The task map index that updated the asset. nullable: true created_dagruns: type: array @@ -4641,21 +4641,21 @@ components: $ref: "#/components/schemas/BasicDAGRun" timestamp: type: string - description: The dataset event creation time + description: The asset event creation time nullable: false - CreateDatasetEvent: + CreateAssetEvent: type: object required: - - dataset_uri + - asset_uri properties: - dataset_uri: + asset_uri: type: string - description: The URI of the dataset + description: The URI of the asset nullable: false extra: type: object - description: The dataset event extra + description: The asset event extra nullable: true QueuedEvent: @@ -4663,7 +4663,7 @@ components: properties: uri: type: string - description: The datata uri. + description: The asset uri. dag_id: type: string description: The DAG ID. @@ -4674,14 +4674,14 @@ components: QueuedEventCollection: description: | - A collection of Dataset Dag Run Queues. + A collection of asset Dag Run Queues. *New in version 2.9.0* type: object allOf: - type: object properties: - datasets: + queued_events: type: array items: $ref: "#/components/schemas/QueuedEvent" @@ -4737,19 +4737,19 @@ components: state: $ref: "#/components/schemas/DagState" - DatasetEventCollection: + AssetEventCollection: description: | - A collection of dataset events. + A collection of asset events. *New in version 2.4.0* type: object allOf: - type: object properties: - dataset_events: + asset_events: type: array items: - $ref: "#/components/schemas/DatasetEvent" + $ref: "#/components/schemas/AssetEvent" - $ref: "#/components/schemas/CollectionInfo" # Configuration @@ -5545,14 +5545,14 @@ components: required: true description: The import error ID. - DatasetURI: + AssetURI: in: path name: uri schema: type: string format: path required: true - description: The encoded Dataset URI + description: The encoded Asset URI PoolName: in: path @@ -5733,40 +5733,40 @@ components: *New in version 2.2.0* - FilterDatasetID: + FilterAssetID: in: query - name: dataset_id + name: asset_id schema: type: integer - description: The Dataset ID that updated the dataset. + description: The Asset ID that updated the asset. FilterSourceDAGID: in: query name: source_dag_id schema: type: string - description: The DAG ID that updated the dataset. + description: The DAG ID that updated the asset. FilterSourceTaskID: in: query name: source_task_id schema: type: string - description: The task ID that updated the dataset. + description: The task ID that updated the asset. FilterSourceRunID: in: query name: source_run_id schema: type: string - description: The DAG run ID that updated the dataset. + description: The DAG run ID that updated the asset. FilterSourceMapIndex: in: query name: source_map_index schema: type: integer - description: The map index that updated the dataset. + description: The map index that updated the asset. FilterMapIndex: in: query @@ -6024,12 +6024,12 @@ components: security: [] tags: + - name: Asset - name: Config - name: Connection - name: DAG - name: DAGRun - name: DagWarning - - name: Dataset - name: EventLog - name: ImportError - name: Monitoring diff --git a/airflow/api_connexion/schemas/asset_schema.py b/airflow/api_connexion/schemas/asset_schema.py index 791941f42016d..662f73a50d8b9 100644 --- a/airflow/api_connexion/schemas/asset_schema.py +++ b/airflow/api_connexion/schemas/asset_schema.py @@ -93,14 +93,14 @@ class Meta: class AssetCollection(NamedTuple): """List of Assets with meta.""" - datasets: list[AssetModel] + assets: list[AssetModel] total_entries: int class AssetCollectionSchema(Schema): """Asset Collection Schema.""" - datasets = fields.List(fields.Nested(AssetSchema)) + assets = fields.List(fields.Nested(AssetSchema)) total_entries = fields.Int() @@ -150,21 +150,21 @@ class Meta: class AssetEventCollection(NamedTuple): """List of Asset events with meta.""" - dataset_events: list[AssetEvent] + asset_events: list[AssetEvent] total_entries: int class AssetEventCollectionSchema(Schema): """Asset Event Collection Schema.""" - dataset_events = fields.List(fields.Nested(AssetEventSchema)) + asset_events = fields.List(fields.Nested(AssetEventSchema)) total_entries = fields.Int() class CreateAssetEventSchema(Schema): """Create Asset Event Schema.""" - dataset_uri = fields.String() + asset_uri = fields.String() extra = JsonObjectField() diff --git a/airflow/api_fastapi/openapi/v1-generated.yaml b/airflow/api_fastapi/openapi/v1-generated.yaml index a54e0e4ca57dd..37fa4c3293fab 100644 --- a/airflow/api_fastapi/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/openapi/v1-generated.yaml @@ -7,7 +7,7 @@ info: Users should not rely on those but use the public ones instead. version: 0.1.0 paths: - /ui/next_run_datasets/{dag_id}: + /ui/next_run_assets/{dag_id}: get: tags: - Asset diff --git a/airflow/api_fastapi/views/ui/assets.py b/airflow/api_fastapi/views/ui/assets.py index 01cc9fd1cfbff..4a4ad1d0df9b4 100644 --- a/airflow/api_fastapi/views/ui/assets.py +++ b/airflow/api_fastapi/views/ui/assets.py @@ -30,7 +30,7 @@ assets_router = AirflowRouter(tags=["Asset"]) -@assets_router.get("/next_run_datasets/{dag_id}", include_in_schema=False) +@assets_router.get("/next_run_assets/{dag_id}", include_in_schema=False) async def next_run_assets( dag_id: str, request: Request, diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index 24c960d2b7d5f..3859523fa8001 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -28,7 +28,7 @@ export class AssetService { ): CancelablePromise { return __request(OpenAPI, { method: "GET", - url: "/ui/next_run_datasets/{dag_id}", + url: "/ui/next_run_assets/{dag_id}", path: { dag_id: data.dagId, }, diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index b38d5c00a69f3..da109af81e574 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -141,7 +141,7 @@ export type DeleteConnectionData = { export type DeleteConnectionResponse = void; export type $OpenApiTs = { - "/ui/next_run_datasets/{dag_id}": { + "/ui/next_run_assets/{dag_id}": { get: { req: NextRunAssetsData; res: { diff --git a/airflow/www/static/js/api/index.ts b/airflow/www/static/js/api/index.ts index a4a45a08bfef8..c2a9885b2c7ea 100644 --- a/airflow/www/static/js/api/index.ts +++ b/airflow/www/static/js/api/index.ts @@ -32,14 +32,14 @@ import useMarkTaskDryRun from "./useMarkTaskDryRun"; import useGraphData from "./useGraphData"; import useGridData from "./useGridData"; import useMappedInstances from "./useMappedInstances"; -import useDatasets from "./useDatasets"; -import useDatasetsSummary from "./useDatasetsSummary"; -import useDataset from "./useDataset"; -import useDatasetDependencies from "./useDatasetDependencies"; -import useDatasetEvents from "./useDatasetEvents"; +import useAssets from "./useAssets"; +import useAssetsSummary from "./useAssetsSummary"; +import useAsset from "./useAsset"; +import useAssetDependencies from "./useAssetDependencies"; +import useAssetEvents from "./useAssetEvents"; import useSetDagRunNote from "./useSetDagRunNote"; import useSetTaskInstanceNote from "./useSetTaskInstanceNote"; -import useUpstreamDatasetEvents from "./useUpstreamDatasetEvents"; +import useUpstreamAssetEvents from "./useUpstreamAssetEvents"; import useTaskInstance from "./useTaskInstance"; import useTaskFailedDependency from "./useTaskFailedDependency"; import useDag from "./useDag"; @@ -53,7 +53,7 @@ import useHistoricalMetricsData from "./useHistoricalMetricsData"; import { useTaskXcomEntry, useTaskXcomCollection } from "./useTaskXcom"; import useEventLogs from "./useEventLogs"; import useCalendarData from "./useCalendarData"; -import useCreateDatasetEvent from "./useCreateDatasetEvent"; +import useCreateAssetEvent from "./useCreateAssetEvent"; import useRenderedK8s from "./useRenderedK8s"; import useTaskDetail from "./useTaskDetail"; import useTIHistory from "./useTIHistory"; @@ -85,11 +85,11 @@ export { useDagDetails, useDagRuns, useDags, - useDataset, - useDatasets, - useDatasetDependencies, - useDatasetEvents, - useDatasetsSummary, + useAsset, + useAssets, + useAssetDependencies, + useAssetEvents, + useAssetsSummary, useExtraLinks, useGraphData, useGridData, @@ -105,14 +105,14 @@ export { useSetDagRunNote, useSetTaskInstanceNote, useTaskInstance, - useUpstreamDatasetEvents, + useUpstreamAssetEvents, useHistoricalMetricsData, useTaskXcomEntry, useTaskXcomCollection, useTaskFailedDependency, useEventLogs, useCalendarData, - useCreateDatasetEvent, + useCreateAssetEvent, useRenderedK8s, useTaskDetail, useTIHistory, diff --git a/airflow/www/static/js/api/useDataset.ts b/airflow/www/static/js/api/useAsset.ts similarity index 86% rename from airflow/www/static/js/api/useDataset.ts rename to airflow/www/static/js/api/useAsset.ts index 4793464fac378..b490ca6e46565 100644 --- a/airflow/www/static/js/api/useDataset.ts +++ b/airflow/www/static/js/api/useAsset.ts @@ -27,12 +27,12 @@ interface Props { uri: string; } -export default function useDataset({ uri }: Props) { +export default function useAsset({ uri }: Props) { return useQuery(["dataset", uri], () => { - const datasetUrl = getMetaValue("dataset_api").replace( + const datasetUrl = getMetaValue("asset_api").replace( "__URI__", encodeURIComponent(uri) ); - return axios.get(datasetUrl); + return axios.get(datasetUrl); }); } diff --git a/airflow/www/static/js/api/useDatasetDependencies.ts b/airflow/www/static/js/api/useAssetDependencies.ts similarity index 94% rename from airflow/www/static/js/api/useDatasetDependencies.ts rename to airflow/www/static/js/api/useAssetDependencies.ts index d2ba627f64458..11e7219c53fc8 100644 --- a/airflow/www/static/js/api/useDatasetDependencies.ts +++ b/airflow/www/static/js/api/useAssetDependencies.ts @@ -82,15 +82,15 @@ const formatDependencies = async ({ edges, nodes }: DatasetDependencies) => { return graph as DatasetGraph; }; -export default function useDatasetDependencies() { +export default function useAssetDependencies() { return useQuery("datasetDependencies", async () => { const datasetDepsUrl = getMetaValue("dataset_dependencies_url"); return axios.get(datasetDepsUrl); }); } -export const useDatasetGraphs = () => { - const { data: datasetDependencies } = useDatasetDependencies(); +export const useAssetGraphs = () => { + const { data: datasetDependencies } = useAssetDependencies(); return useQuery(["datasetGraphs", datasetDependencies], () => { if (datasetDependencies) { return formatDependencies(datasetDependencies); diff --git a/airflow/www/static/js/api/useDatasetEvents.ts b/airflow/www/static/js/api/useAssetEvents.ts similarity index 80% rename from airflow/www/static/js/api/useDatasetEvents.ts rename to airflow/www/static/js/api/useAssetEvents.ts index 30e4670a87d3e..068bb471ef64e 100644 --- a/airflow/www/static/js/api/useDatasetEvents.ts +++ b/airflow/www/static/js/api/useAssetEvents.ts @@ -23,16 +23,16 @@ import { useQuery, UseQueryOptions } from "react-query"; import { getMetaValue } from "src/utils"; import URLSearchParamsWrapper from "src/utils/URLSearchParamWrapper"; import type { - DatasetEventCollection, - GetDatasetEventsVariables, + AssetEventCollection, + GetAssetEventsVariables, } from "src/types/api-generated"; -interface Props extends GetDatasetEventsVariables { - options?: UseQueryOptions; +interface Props extends GetAssetEventsVariables { + options?: UseQueryOptions; } -const useDatasetEvents = ({ - datasetId, +const useAssetEvents = ({ + assetId, sourceDagId, sourceRunId, sourceTaskId, @@ -42,10 +42,10 @@ const useDatasetEvents = ({ orderBy, options, }: Props) => { - const query = useQuery( + const query = useQuery( [ "datasets-events", - datasetId, + assetId, sourceDagId, sourceRunId, sourceTaskId, @@ -55,14 +55,14 @@ const useDatasetEvents = ({ orderBy, ], () => { - const datasetsUrl = getMetaValue("dataset_events_api"); + const datasetsUrl = getMetaValue("asset_events_api"); const params = new URLSearchParamsWrapper(); if (limit) params.set("limit", limit.toString()); if (offset) params.set("offset", offset.toString()); if (orderBy) params.set("order_by", orderBy); - if (datasetId) params.set("dataset_id", datasetId.toString()); + if (assetId) params.set("asset_id", assetId.toString()); if (sourceDagId) params.set("source_dag_id", sourceDagId); if (sourceRunId) params.set("source_run_id", sourceRunId); if (sourceTaskId) params.set("source_task_id", sourceTaskId); @@ -80,8 +80,8 @@ const useDatasetEvents = ({ ); return { ...query, - data: query.data ?? { datasetEvents: [], totalEntries: 0 }, + data: query.data ?? { assetEvents: [], totalEntries: 0 }, }; }; -export default useDatasetEvents; +export default useAssetEvents; diff --git a/airflow/www/static/js/api/useDatasets.ts b/airflow/www/static/js/api/useAssets.ts similarity index 90% rename from airflow/www/static/js/api/useDatasets.ts rename to airflow/www/static/js/api/useAssets.ts index db46415062c1a..3654c583c12ef 100644 --- a/airflow/www/static/js/api/useDatasets.ts +++ b/airflow/www/static/js/api/useAssets.ts @@ -28,7 +28,7 @@ interface Props { enabled?: boolean; } -export default function useDatasets({ dagIds, enabled = true }: Props) { +export default function useAssets({ dagIds, enabled = true }: Props) { return useQuery( ["datasets", dagIds], () => { @@ -36,7 +36,7 @@ export default function useDatasets({ dagIds, enabled = true }: Props) { const dagIdsParam = dagIds && dagIds.length ? { dag_ids: dagIds.join(",") } : {}; - return axios.get(datasetsUrl, { + return axios.get(datasetsUrl, { params: { ...dagIdsParam, }, diff --git a/airflow/www/static/js/api/useDatasetsSummary.ts b/airflow/www/static/js/api/useAssetsSummary.ts similarity index 98% rename from airflow/www/static/js/api/useDatasetsSummary.ts rename to airflow/www/static/js/api/useAssetsSummary.ts index 6f902946f6296..66b56ca9f6925 100644 --- a/airflow/www/static/js/api/useDatasetsSummary.ts +++ b/airflow/www/static/js/api/useAssetsSummary.ts @@ -42,7 +42,7 @@ interface Props { updatedAfter?: DateOption; } -export default function useDatasetsSummary({ +export default function useAssetsSummary({ limit, offset, order, diff --git a/airflow/www/static/js/api/useCreateDatasetEvent.ts b/airflow/www/static/js/api/useCreateAssetEvent.ts similarity index 77% rename from airflow/www/static/js/api/useCreateDatasetEvent.ts rename to airflow/www/static/js/api/useCreateAssetEvent.ts index f14b35ee375fe..7d2322c33ce9d 100644 --- a/airflow/www/static/js/api/useCreateDatasetEvent.ts +++ b/airflow/www/static/js/api/useCreateAssetEvent.ts @@ -29,22 +29,19 @@ interface Props { uri?: string; } -const createDatasetUrl = getMetaValue("create_dataset_event_api"); +const createAssetUrl = getMetaValue("create_asset_event_api"); -export default function useCreateDatasetEvent({ datasetId, uri }: Props) { +export default function useCreateAssetEvent({ datasetId, uri }: Props) { const queryClient = useQueryClient(); const errorToast = useErrorToast(); return useMutation( - ["createDatasetEvent", uri], - (extra?: API.DatasetEvent["extra"]) => - axios.post( - createDatasetUrl, - { - dataset_uri: uri, - extra: extra || {}, - } - ), + ["createAssetEvent", uri], + (extra?: API.AssetEvent["extra"]) => + axios.post(createAssetUrl, { + asset_uri: uri, + extra: extra || {}, + }), { onSuccess: () => { queryClient.invalidateQueries(["datasets-events", datasetId]); diff --git a/airflow/www/static/js/api/useUpstreamDatasetEvents.ts b/airflow/www/static/js/api/useUpstreamAssetEvents.ts similarity index 67% rename from airflow/www/static/js/api/useUpstreamDatasetEvents.ts rename to airflow/www/static/js/api/useUpstreamAssetEvents.ts index 32d1c7aeff2d8..437205501d6c5 100644 --- a/airflow/www/static/js/api/useUpstreamDatasetEvents.ts +++ b/airflow/www/static/js/api/useUpstreamAssetEvents.ts @@ -22,30 +22,30 @@ import { useQuery, UseQueryOptions } from "react-query"; import { getMetaValue } from "src/utils"; import type { - DatasetEventCollection, - GetUpstreamDatasetEventsVariables, + AssetEventCollection, + GetUpstreamAssetEventsVariables, } from "src/types/api-generated"; -interface Props extends GetUpstreamDatasetEventsVariables { - options?: UseQueryOptions; +interface Props extends GetUpstreamAssetEventsVariables { + options?: UseQueryOptions; } -const useUpstreamDatasetEvents = ({ dagId, dagRunId, options }: Props) => { +const useUpstreamAssetEvents = ({ dagId, dagRunId, options }: Props) => { const upstreamEventsUrl = ( - getMetaValue("upstream_dataset_events_api") || - `api/v1/dags/${dagId}/dagRuns/_DAG_RUN_ID_/upstreamDatasetEvents` + getMetaValue("upstream_asset_events_api") || + `api/v1/dags/${dagId}/dagRuns/_DAG_RUN_ID_/upstreamAssetEvents` ).replace("_DAG_RUN_ID_", encodeURIComponent(dagRunId)); - const query = useQuery( - ["upstreamDatasetEvents", dagRunId], + const query = useQuery( + ["upstreamAssetEvents", dagRunId], () => axios.get(upstreamEventsUrl), options ); return { ...query, - data: query.data ?? { datasetEvents: [], totalEntries: 0 }, + data: query.data ?? { assetEvents: [], totalEntries: 0 }, }; }; -export default useUpstreamDatasetEvents; +export default useUpstreamAssetEvents; diff --git a/airflow/www/static/js/components/DatasetEventCard.tsx b/airflow/www/static/js/components/DatasetEventCard.tsx index 2367c8efa9b4a..9dd1ee91e3731 100644 --- a/airflow/www/static/js/components/DatasetEventCard.tsx +++ b/airflow/www/static/js/components/DatasetEventCard.tsx @@ -21,7 +21,7 @@ import React from "react"; import { isEmpty } from "lodash"; import { TbApi } from "react-icons/tb"; -import type { DatasetEvent } from "src/types/api-generated"; +import type { AssetEvent } from "src/types/api-generated"; import { Box, Flex, @@ -43,7 +43,7 @@ import SourceTaskInstance from "./SourceTaskInstance"; import TriggeredDagRuns from "./TriggeredDagRuns"; type CardProps = { - datasetEvent: DatasetEvent; + assetEvent: AssetEvent; showSource?: boolean; showTriggeredDagRuns?: boolean; }; @@ -51,7 +51,7 @@ type CardProps = { const datasetsUrl = getMetaValue("datasets_url"); const DatasetEventCard = ({ - datasetEvent, + assetEvent, showSource = true, showTriggeredDagRuns = true, }: CardProps) => { @@ -60,14 +60,16 @@ const DatasetEventCard = ({ const selectedUri = decodeURIComponent(searchParams.get("uri") || ""); const containerRef = useContainerRef(); - const { from_rest_api: fromRestApi, ...extra } = - datasetEvent?.extra as Record; + const { from_rest_api: fromRestApi, ...extra } = assetEvent?.extra as Record< + string, + string + >; return ( - @@ -111,17 +112,17 @@ const DatasetEventCard = ({ )} - {!!datasetEvent.sourceTaskId && ( - + {!!assetEvent.sourceTaskId && ( + )} )} - {showTriggeredDagRuns && !!datasetEvent?.createdDagruns?.length && ( + {showTriggeredDagRuns && !!assetEvent?.createdDagruns?.length && ( <> Triggered Dag Runs: - + )} diff --git a/airflow/www/static/js/components/SourceTaskInstance.tsx b/airflow/www/static/js/components/SourceTaskInstance.tsx index 4c63198c5f40c..4343d3ce82443 100644 --- a/airflow/www/static/js/components/SourceTaskInstance.tsx +++ b/airflow/www/static/js/components/SourceTaskInstance.tsx @@ -22,7 +22,7 @@ import { Box, Link, Tooltip, Flex } from "@chakra-ui/react"; import { FiLink } from "react-icons/fi"; import { useTaskInstance } from "src/api"; -import type { DatasetEvent } from "src/types/api-generated"; +import type { AssetEvent } from "src/types/api-generated"; import { useContainerRef } from "src/context/containerRef"; import { SimpleStatus } from "src/dag/StatusBox"; import InstanceTooltip from "src/components/InstanceTooltip"; @@ -30,20 +30,16 @@ import type { TaskInstance } from "src/types"; import { getMetaValue } from "src/utils"; type SourceTIProps = { - datasetEvent: DatasetEvent; + assetEvent: AssetEvent; showLink?: boolean; }; const gridUrl = getMetaValue("grid_url"); const dagId = getMetaValue("dag_id") || "__DAG_ID__"; -const SourceTaskInstance = ({ - datasetEvent, - showLink = true, -}: SourceTIProps) => { +const SourceTaskInstance = ({ assetEvent, showLink = true }: SourceTIProps) => { const containerRef = useContainerRef(); - const { sourceDagId, sourceRunId, sourceTaskId, sourceMapIndex } = - datasetEvent; + const { sourceDagId, sourceRunId, sourceTaskId, sourceMapIndex } = assetEvent; const { data: taskInstance } = useTaskInstance({ dagId: sourceDagId || "", diff --git a/airflow/www/static/js/dag/details/dagRun/DatasetTriggerEvents.tsx b/airflow/www/static/js/dag/details/dagRun/DatasetTriggerEvents.tsx index 5fa585830b437..6deedb073e8d9 100644 --- a/airflow/www/static/js/dag/details/dagRun/DatasetTriggerEvents.tsx +++ b/airflow/www/static/js/dag/details/dagRun/DatasetTriggerEvents.tsx @@ -19,10 +19,10 @@ import React, { useMemo } from "react"; import { Box, Text } from "@chakra-ui/react"; -import { useUpstreamDatasetEvents } from "src/api"; +import { useUpstreamAssetEvents } from "src/api"; import type { DagRun as DagRunType } from "src/types"; import { CardDef, CardList } from "src/components/Table"; -import type { DatasetEvent } from "src/types/api-generated"; +import type { AssetEvent } from "src/types/api-generated"; import DatasetEventCard from "src/components/DatasetEventCard"; import { getMetaValue } from "src/utils"; @@ -32,17 +32,17 @@ interface Props { const dagId = getMetaValue("dag_id"); -const cardDef: CardDef = { +const cardDef: CardDef = { card: ({ row }) => ( - + ), }; const DatasetTriggerEvents = ({ runId }: Props) => { const { - data: { datasetEvents = [] }, + data: { assetEvents = [] }, isLoading, - } = useUpstreamDatasetEvents({ dagRunId: runId, dagId }); + } = useUpstreamAssetEvents({ dagRunId: runId, dagId }); const columns = useMemo( () => [ @@ -66,7 +66,7 @@ const DatasetTriggerEvents = ({ runId }: Props) => { [] ); - const data = useMemo(() => datasetEvents, [datasetEvents]); + const data = useMemo(() => assetEvents, [assetEvents]); return ( diff --git a/airflow/www/static/js/dag/details/graph/DatasetNode.tsx b/airflow/www/static/js/dag/details/graph/DatasetNode.tsx index bfd288f072dc3..d80f399b032a3 100644 --- a/airflow/www/static/js/dag/details/graph/DatasetNode.tsx +++ b/airflow/www/static/js/dag/details/graph/DatasetNode.tsx @@ -47,11 +47,11 @@ import type { CustomNodeProps } from "./Node"; const datasetsUrl = getMetaValue("datasets_url"); const DatasetNode = ({ - data: { label, height, width, latestDagRunId, isZoomedOut, datasetEvent }, + data: { label, height, width, latestDagRunId, isZoomedOut, assetEvent }, }: NodeProps) => { const containerRef = useContainerRef(); - const { from_rest_api: fromRestApi } = (datasetEvent?.extra || {}) as Record< + const { from_rest_api: fromRestApi } = (assetEvent?.extra || {}) as Record< string, string >; @@ -61,8 +61,8 @@ const DatasetNode = ({ Dataset - {!!datasetEvent && ( + {!!assetEvent && ( {/* @ts-ignore */} - {moment(datasetEvent.timestamp).fromNow()} + {moment(assetEvent.timestamp).fromNow()} )} @@ -120,23 +120,23 @@ const DatasetNode = ({ {label} - {!!datasetEvent && ( + {!!assetEvent && ( -