diff --git a/airflow/www/static/js/api/index.ts b/airflow/www/static/js/api/index.ts index b04da5259a99c..4f883b48251ce 100644 --- a/airflow/www/static/js/api/index.ts +++ b/airflow/www/static/js/api/index.ts @@ -52,6 +52,7 @@ import useHistoricalMetricsData from "./useHistoricalMetricsData"; import { useTaskXcomEntry, useTaskXcomCollection } from "./useTaskXcom"; import useEventLogs from "./useEventLogs"; import useCalendarData from "./useCalendarData"; +import useTaskFails from "./useTaskFails"; axios.interceptors.request.use((config) => { config.paramsSerializer = { @@ -100,4 +101,5 @@ export { useTaskXcomCollection, useEventLogs, useCalendarData, + useTaskFails, }; diff --git a/airflow/www/static/js/api/useTaskFails.ts b/airflow/www/static/js/api/useTaskFails.ts new file mode 100644 index 0000000000000..f256db8f1fbde --- /dev/null +++ b/airflow/www/static/js/api/useTaskFails.ts @@ -0,0 +1,67 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { useQuery } from "react-query"; +import axios, { AxiosResponse } from "axios"; + +import { getMetaValue } from "src/utils"; +import { useAutoRefresh } from "src/context/autorefresh"; + +const DAG_ID_PARAM = "dag_id"; +const RUN_ID_PARAM = "run_id"; +const TASK_ID_PARAM = "task_id"; + +const dagId = getMetaValue(DAG_ID_PARAM); +const taskFailsUrl = getMetaValue("task_fails_url"); + +export interface TaskFail { + runId: string; + taskId: string; + mapIndex?: number; + startDate?: string; + endDate?: string; +} + +interface Props { + runId?: string; + taskId?: string; + enabled?: boolean; +} + +const useTaskFails = ({ runId, taskId, enabled = true }: Props) => { + const { isRefreshOn } = useAutoRefresh(); + + return useQuery( + ["taskFails", runId, taskId], + async () => { + const params = { + [DAG_ID_PARAM]: dagId, + [RUN_ID_PARAM]: runId, + [TASK_ID_PARAM]: taskId, + }; + return axios.get(taskFailsUrl, { params }); + }, + { + enabled, + refetchInterval: isRefreshOn && (autoRefreshInterval || 1) * 1000, + } + ); +}; + +export default useTaskFails; diff --git a/airflow/www/static/js/dag/details/gantt/Row.tsx b/airflow/www/static/js/dag/details/gantt/Row.tsx index 1526e1713f0ad..fbee17e13921a 100644 --- a/airflow/www/static/js/dag/details/gantt/Row.tsx +++ b/airflow/www/static/js/dag/details/gantt/Row.tsx @@ -19,13 +19,17 @@ import React from "react"; import { Box, Tooltip, Flex } from "@chakra-ui/react"; + import useSelection from "src/dag/useSelection"; import { getDuration } from "src/datetime_utils"; -import { SimpleStatus } from "src/dag/StatusBox"; +import { SimpleStatus, boxSize } from "src/dag/StatusBox"; import { useContainerRef } from "src/context/containerRef"; import { hoverDelay } from "src/utils"; import type { Task } from "src/types"; +import { useTaskFails } from "src/api"; + import GanttTooltip from "./GanttTooltip"; +import TaskFail from "./TaskFail"; interface Props { ganttWidth?: number; @@ -59,6 +63,12 @@ const Row = ({ : true); const isOpen = openGroupIds.includes(task.id || ""); + const { data: taskFails } = useTaskFails({ + taskId: task.id || undefined, + runId: runId || undefined, + enabled: !!(instance?.tryNumber && instance?.tryNumber > 1) && !!task.id, // Only try to look up task fails if it even has a try number > 1 + }); + // Calculate durations in ms const taskDuration = getDuration(instance?.startDate, instance?.endDate); const queuedDuration = hasValidQueuedDttm @@ -84,12 +94,14 @@ const Row = ({ return (
- {instance ? ( + {instance && ( } hasArrow @@ -99,9 +111,11 @@ const Row = ({ > { onSelect({ runId: instance.runId, @@ -129,9 +143,24 @@ const Row = ({ /> - ) : ( - )} + {/* Only show fails before the most recent task instance */} + {(taskFails || []) + .filter( + (tf) => + tf.startDate !== instance?.startDate && + // @ts-ignore + moment(tf.startDate).isAfter(ganttStartDate) + ) + .map((taskFail) => ( + + ))} {isOpen && !!task.children && diff --git a/airflow/www/static/js/dag/details/gantt/TaskFail.tsx b/airflow/www/static/js/dag/details/gantt/TaskFail.tsx new file mode 100644 index 0000000000000..1d217d3bfb6d0 --- /dev/null +++ b/airflow/www/static/js/dag/details/gantt/TaskFail.tsx @@ -0,0 +1,91 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import React from "react"; +import { Box, Tooltip, Text } from "@chakra-ui/react"; + +import { getDuration } from "src/datetime_utils"; +import { SimpleStatus } from "src/dag/StatusBox"; +import { useContainerRef } from "src/context/containerRef"; +import { hoverDelay } from "src/utils"; +import Time from "src/components/Time"; + +import type { TaskFail as TaskFailType } from "src/api/useTaskFails"; + +interface Props { + taskFail: TaskFailType; + runDuration: number; + ganttWidth: number; + ganttStartDate?: string | null; +} + +const TaskFail = ({ + taskFail, + runDuration, + ganttWidth, + ganttStartDate, +}: Props) => { + const containerRef = useContainerRef(); + + const duration = getDuration(taskFail?.startDate, taskFail?.endDate); + const percent = duration / runDuration; + const failWidth = ganttWidth * percent; + + const startOffset = getDuration(ganttStartDate, taskFail?.startDate); + const offsetLeft = (startOffset / runDuration) * ganttWidth; + + return ( + + Task Fail + {taskFail?.startDate && ( + + Start: + )} + {taskFail?.endDate && ( + + End: + )} + + Can only show previous Task Fails, other tries are not yet saved. + + + } + hasArrow + portalProps={{ containerRef }} + placement="top" + openDelay={hoverDelay} + top="4px" + > + + + + + ); +}; + +export default TaskFail; diff --git a/airflow/www/static/js/dag/details/task/TaskDuration.tsx b/airflow/www/static/js/dag/details/task/TaskDuration.tsx index 4b7eed9f3d365..27cd8b33ec94e 100644 --- a/airflow/www/static/js/dag/details/task/TaskDuration.tsx +++ b/airflow/www/static/js/dag/details/task/TaskDuration.tsx @@ -22,7 +22,7 @@ import React from "react"; import useSelection from "src/dag/useSelection"; -import { useGridData } from "src/api"; +import { useGridData, useTaskFails } from "src/api"; import { startCase } from "lodash"; import { getDuration, formatDateTime, defaultFormat } from "src/datetime_utils"; import ReactECharts, { ReactEChartsProps } from "src/components/ReactECharts"; @@ -45,6 +45,10 @@ const TaskDuration = () => { onSelect, } = useSelection(); + const { data: taskFails } = useTaskFails({ taskId: taskId || undefined }); + + console.log(taskFails); + const { data: { dagRuns, groups, ordering }, } = useGridData(); diff --git a/airflow/www/templates/airflow/dag.html b/airflow/www/templates/airflow/dag.html index c69251e71c9b3..6f5a187f61f68 100644 --- a/airflow/www/templates/airflow/dag.html +++ b/airflow/www/templates/airflow/dag.html @@ -57,6 +57,7 @@ + diff --git a/airflow/www/views.py b/airflow/www/views.py index 5cffd28aaa3b3..743947cdd397b 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -3505,6 +3505,31 @@ def graph_data(self): {"Content-Type": "application/json; charset=utf-8"}, ) + @expose("/object/task_fails") + @auth.has_access_dag("GET", DagAccessEntity.TASK_INSTANCE) + @provide_session + def task_fails(self, session): + """Return task fails.""" + dag_id = request.args.get("dag_id") + task_id = request.args.get("task_id") + run_id = request.args.get("run_id") + + query = select( + TaskFail.task_id, TaskFail.run_id, TaskFail.map_index, TaskFail.start_date, TaskFail.end_date + ).where(TaskFail.dag_id == dag_id) + + if run_id: + query = query.where(TaskFail.run_id == run_id) + if task_id: + query = query.where(TaskFail.task_id == task_id) + + task_fails = [dict(tf) for tf in session.execute(query).all()] + + return ( + htmlsafe_json_dumps(task_fails, separators=(",", ":"), dumps=flask.json.dumps), + {"Content-Type": "application/json; charset=utf-8"}, + ) + @expose("/object/task_instances") @auth.has_access_dag("GET", DagAccessEntity.TASK_INSTANCE) def task_instances(self):