Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion airflow/api_fastapi/core_api/datamodels/ui/structure.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class NodeResponse(BaseModel):
label: str
tooltip: str | None = None
setup_teardown_type: Literal["setup", "teardown"] | None = None
type: Literal["join", "task", "asset_condition"]
type: Literal["join", "task", "asset-condition", "asset", "asset-alias", "dag", "sensor", "trigger"]
operator: str | None = None


Expand Down
14 changes: 13 additions & 1 deletion airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,13 @@ paths:
type: boolean
default: false
title: Include Downstream
- name: external_dependencies
in: query
required: false
schema:
type: boolean
default: false
title: External Dependencies
responses:
'200':
description: Successful Response
Expand Down Expand Up @@ -8043,7 +8050,12 @@ components:
enum:
- join
- task
- asset_condition
- asset-condition
- asset
- asset-alias
- dag
- sensor
- trigger
title: Type
operator:
anyOf:
Expand Down
43 changes: 39 additions & 4 deletions airflow/api_fastapi/core_api/routes/ui/structure.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.datamodels.ui.structure import StructureDataResponse
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.models.serialized_dag import SerializedDagModel
from airflow.utils.dag_edges import dag_edges
from airflow.utils.task_group import task_group_to_dict

Expand All @@ -30,7 +31,6 @@

@structure_router.get(
"/structure_data",
include_in_schema=False,
responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
)
def structure_data(
Expand All @@ -40,6 +40,7 @@ def structure_data(
root: str | None = None,
include_upstream: bool = False,
include_downstream: bool = False,
external_dependencies: bool = False,
) -> StructureDataResponse:
"""Get Structure Data."""
dag = request.app.state.dag_bag.get_dag(dag_id)
Expand All @@ -52,9 +53,7 @@ def structure_data(
task_ids_or_regex=root, include_upstream=include_upstream, include_downstream=include_downstream
)

nodes = [
task_group_to_dict(child) for child in sorted(dag.task_group.children.values(), key=lambda t: t.label)
]
nodes = [task_group_to_dict(child) for child in dag.task_group.topological_sort()]
edges = dag_edges(dag)

data = {
Expand All @@ -63,4 +62,40 @@ def structure_data(
"edges": edges,
}

if external_dependencies:
entry_node_ref = nodes[0] if nodes else None
exit_node_ref = nodes[-1] if nodes else None

start_edges: list[dict] = []
end_edges: list[dict] = []

for dependency_dag_id, dependencies in SerializedDagModel.get_dag_dependencies().items():
for dependency in dependencies:
if dependency_dag_id != dag_id and dependency.target != dag_id:
continue

# Add nodes
nodes.append(
{
"id": dependency.node_id,
"label": dependency.dependency_id,
"type": dependency.dependency_type,
}
)

# Add edges
# start dependency
if (
dependency.source == dependency.dependency_type or dependency.target == dag_id
) and entry_node_ref:
start_edges.append({"source_id": dependency.node_id, "target_id": entry_node_ref["id"]})

# end dependency
elif (
dependency.target == dependency.dependency_type or dependency.source == dag_id
) and exit_node_ref:
end_edges.append({"source_id": exit_node_ref["id"], "target_id": dependency.node_id})

data["edges"] = start_edges + edges + end_edges

return StructureDataResponse(**data)
6 changes: 5 additions & 1 deletion airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -387,19 +387,23 @@ export const useStructureServiceStructureDataKey =
export const UseStructureServiceStructureDataKeyFn = (
{
dagId,
externalDependencies,
includeDownstream,
includeUpstream,
root,
}: {
dagId: string;
externalDependencies?: boolean;
includeDownstream?: boolean;
includeUpstream?: boolean;
root?: string;
},
queryKey?: Array<unknown>,
) => [
useStructureServiceStructureDataKey,
...(queryKey ?? [{ dagId, includeDownstream, includeUpstream, root }]),
...(queryKey ?? [
{ dagId, externalDependencies, includeDownstream, includeUpstream, root },
]),
];
export type BackfillServiceListBackfillsDefaultResponse = Awaited<
ReturnType<typeof BackfillService.listBackfills>
Expand Down
5 changes: 5 additions & 0 deletions airflow/ui/openapi-gen/queries/prefetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -489,18 +489,21 @@ export const prefetchUseDashboardServiceHistoricalMetrics = (
* @param data.root
* @param data.includeUpstream
* @param data.includeDownstream
* @param data.externalDependencies
* @returns StructureDataResponse Successful Response
* @throws ApiError
*/
export const prefetchUseStructureServiceStructureData = (
queryClient: QueryClient,
{
dagId,
externalDependencies,
includeDownstream,
includeUpstream,
root,
}: {
dagId: string;
externalDependencies?: boolean;
includeDownstream?: boolean;
includeUpstream?: boolean;
root?: string;
Expand All @@ -509,13 +512,15 @@ export const prefetchUseStructureServiceStructureData = (
queryClient.prefetchQuery({
queryKey: Common.UseStructureServiceStructureDataKeyFn({
dagId,
externalDependencies,
includeDownstream,
includeUpstream,
root,
}),
queryFn: () =>
StructureService.structureData({
dagId,
externalDependencies,
includeDownstream,
includeUpstream,
root,
Expand Down
6 changes: 5 additions & 1 deletion airflow/ui/openapi-gen/queries/queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,7 @@ export const useDashboardServiceHistoricalMetrics = <
* @param data.root
* @param data.includeUpstream
* @param data.includeDownstream
* @param data.externalDependencies
* @returns StructureDataResponse Successful Response
* @throws ApiError
*/
Expand All @@ -623,11 +624,13 @@ export const useStructureServiceStructureData = <
>(
{
dagId,
externalDependencies,
includeDownstream,
includeUpstream,
root,
}: {
dagId: string;
externalDependencies?: boolean;
includeDownstream?: boolean;
includeUpstream?: boolean;
root?: string;
Expand All @@ -637,12 +640,13 @@ export const useStructureServiceStructureData = <
) =>
useQuery<TData, TError>({
queryKey: Common.UseStructureServiceStructureDataKeyFn(
{ dagId, includeDownstream, includeUpstream, root },
{ dagId, externalDependencies, includeDownstream, includeUpstream, root },
queryKey,
),
queryFn: () =>
StructureService.structureData({
dagId,
externalDependencies,
includeDownstream,
includeUpstream,
root,
Expand Down
6 changes: 5 additions & 1 deletion airflow/ui/openapi-gen/queries/suspense.ts
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,7 @@ export const useDashboardServiceHistoricalMetricsSuspense = <
* @param data.root
* @param data.includeUpstream
* @param data.includeDownstream
* @param data.externalDependencies
* @returns StructureDataResponse Successful Response
* @throws ApiError
*/
Expand All @@ -598,11 +599,13 @@ export const useStructureServiceStructureDataSuspense = <
>(
{
dagId,
externalDependencies,
includeDownstream,
includeUpstream,
root,
}: {
dagId: string;
externalDependencies?: boolean;
includeDownstream?: boolean;
includeUpstream?: boolean;
root?: string;
Expand All @@ -612,12 +615,13 @@ export const useStructureServiceStructureDataSuspense = <
) =>
useSuspenseQuery<TData, TError>({
queryKey: Common.UseStructureServiceStructureDataKeyFn(
{ dagId, includeDownstream, includeUpstream, root },
{ dagId, externalDependencies, includeDownstream, includeUpstream, root },
queryKey,
),
queryFn: () =>
StructureService.structureData({
dagId,
externalDependencies,
includeDownstream,
includeUpstream,
root,
Expand Down
11 changes: 10 additions & 1 deletion airflow/ui/openapi-gen/requests/schemas.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3186,7 +3186,16 @@ export const $NodeResponse = {
},
type: {
type: "string",
enum: ["join", "task", "asset_condition"],
enum: [
"join",
"task",
"asset-condition",
"asset",
"asset-alias",
"dag",
"sensor",
"trigger",
],
title: "Type",
},
operator: {
Expand Down
2 changes: 2 additions & 0 deletions airflow/ui/openapi-gen/requests/services.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -742,6 +742,7 @@ export class StructureService {
* @param data.root
* @param data.includeUpstream
* @param data.includeDownstream
* @param data.externalDependencies
* @returns StructureDataResponse Successful Response
* @throws ApiError
*/
Expand All @@ -756,6 +757,7 @@ export class StructureService {
root: data.root,
include_upstream: data.includeUpstream,
include_downstream: data.includeDownstream,
external_dependencies: data.externalDependencies,
},
errors: {
404: "Not Found",
Expand Down
21 changes: 19 additions & 2 deletions airflow/ui/openapi-gen/requests/types.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -773,11 +773,27 @@ export type NodeResponse = {
label: string;
tooltip?: string | null;
setup_teardown_type?: "setup" | "teardown" | null;
type: "join" | "task" | "asset_condition";
type:
| "join"
| "task"
| "asset-condition"
| "asset"
| "asset-alias"
| "dag"
| "sensor"
| "trigger";
operator?: string | null;
};

export type type = "join" | "task" | "asset_condition";
export type type =
| "join"
| "task"
| "asset-condition"
| "asset"
| "asset-alias"
| "dag"
| "sensor"
| "trigger";

/**
* Request body for Clear Task Instances endpoint.
Expand Down Expand Up @@ -1443,6 +1459,7 @@ export type HistoricalMetricsResponse = HistoricalMetricDataResponse;

export type StructureDataData = {
dagId: string;
externalDependencies?: boolean;
includeDownstream?: boolean;
includeUpstream?: boolean;
root?: string | null;
Expand Down
2 changes: 1 addition & 1 deletion airflow/ui/src/layouts/Details/Graph/useGraphLayout.ts
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ const generateElkGraph = ({
if (node.type === "join") {
width = 10;
height = 10;
} else if (node.type === "asset_condition") {
} else if (node.type === "asset-condition") {
width = 30;
height = 30;
}
Expand Down
1 change: 1 addition & 0 deletions airflow/www/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -3228,6 +3228,7 @@ def extra_links(self, *, session: Session = NEW_SESSION):
else:
return {"url": None, "error": f"No URL found for {link_name}"}, 404

@mark_fastapi_migration_done
@expose("/object/graph_data")
@auth.has_access_dag("GET", DagAccessEntity.TASK_INSTANCE)
@gzipped
Expand Down
Loading