diff --git a/airflow/www/static/js/api/index.ts b/airflow/www/static/js/api/index.ts index 5df46587e08de..d055f737d965c 100644 --- a/airflow/www/static/js/api/index.ts +++ b/airflow/www/static/js/api/index.ts @@ -24,7 +24,6 @@ import useClearRun from './useClearRun'; import useQueueRun from './useQueueRun'; import useMarkFailedRun from './useMarkFailedRun'; import useMarkSuccessRun from './useMarkSuccessRun'; -import useRunTask from './useRunTask'; import useClearTask from './useClearTask'; import useMarkFailedTask from './useMarkFailedTask'; import useMarkSuccessTask from './useMarkSuccessTask'; @@ -63,7 +62,6 @@ export { useMarkSuccessRun, useMarkSuccessTask, useQueueRun, - useRunTask, useSetDagRunNote, useSetTaskInstanceNote, useTaskInstance, diff --git a/airflow/www/static/js/api/useRunTask.ts b/airflow/www/static/js/api/useRunTask.ts deleted file mode 100644 index 98b3cc873cb10..0000000000000 --- a/airflow/www/static/js/api/useRunTask.ts +++ /dev/null @@ -1,75 +0,0 @@ -/*! - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import axios from 'axios'; -import { useMutation, useQueryClient } from 'react-query'; -import URLSearchParamsWrapper from 'src/utils/URLSearchParamWrapper'; -import { getMetaValue } from '../utils'; -import { useAutoRefresh } from '../context/autorefresh'; -import useErrorToast from '../utils/useErrorToast'; - -const csrfToken = getMetaValue('csrf_token'); -const runUrl = getMetaValue('run_url'); - -export default function useRunTask(dagId: string, runId: string, taskId: string) { - const queryClient = useQueryClient(); - const errorToast = useErrorToast(); - const { startRefresh } = useAutoRefresh(); - return useMutation( - ['runTask', dagId, runId, taskId], - async ({ - ignoreAllDeps, - ignoreTaskState, - ignoreTaskDeps, - mapIndexes, - }:{ - ignoreAllDeps: boolean, - ignoreTaskState: boolean, - ignoreTaskDeps: boolean, - mapIndexes: number[], - }) => Promise.all( - (mapIndexes.length ? mapIndexes : [-1]).map((mi) => { - const params = new URLSearchParamsWrapper({ - csrf_token: csrfToken, - dag_id: dagId, - dag_run_id: runId, - task_id: taskId, - ignore_all_deps: ignoreAllDeps, - ignore_task_deps: ignoreTaskDeps, - ignore_ti_state: ignoreTaskState, - map_index: mi, - }).toString(); - - return axios.post(runUrl, params, { - headers: { - 'Content-Type': 'application/x-www-form-urlencoded', - }, - }); - }), - ), - { - onSuccess: () => { - queryClient.invalidateQueries('gridData'); - queryClient.invalidateQueries(['mappedInstances', dagId, runId, taskId]); - startRefresh(); - }, - onError: (error: Error) => errorToast({ error }), - }, - ); -} diff --git a/airflow/www/static/js/dag/details/taskInstance/taskActions/Run.tsx b/airflow/www/static/js/dag/details/taskInstance/taskActions/Run.tsx deleted file mode 100644 index 62d08a77efcc4..0000000000000 --- a/airflow/www/static/js/dag/details/taskInstance/taskActions/Run.tsx +++ /dev/null @@ -1,97 +0,0 @@ -/*! - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import React, { useState } from 'react'; -import { - Button, - Flex, - ButtonGroup, -} from '@chakra-ui/react'; - -import { useRunTask } from 'src/api'; -import { getMetaValue } from 'src/utils'; - -const canEdit = getMetaValue('can_edit') === 'True'; - -interface Props { - dagId: string; - runId: string; - taskId: string; - mapIndexes: number[]; -} - -const Run = ({ - dagId, - runId, - taskId, - mapIndexes, -}: Props) => { - const [ignoreAllDeps, setIgnoreAllDeps] = useState(false); - const onToggleAllDeps = () => setIgnoreAllDeps(!ignoreAllDeps); - - const [ignoreTaskState, setIgnoreTaskState] = useState(false); - const onToggleTaskState = () => setIgnoreTaskState(!ignoreTaskState); - - const [ignoreTaskDeps, setIgnoreTaskDeps] = useState(false); - const onToggleTaskDeps = () => setIgnoreTaskDeps(!ignoreTaskDeps); - - const { mutate: onRun, isLoading } = useRunTask(dagId, runId, taskId); - - const onClick = () => { - onRun({ - ignoreAllDeps, - ignoreTaskState, - ignoreTaskDeps, - mapIndexes, - }); - }; - - return ( - - - - Ignore All Deps - - - Ignore Task State - - - Ignore Task Deps - - - - Run - - - ); -}; - -export default Run; diff --git a/airflow/www/static/js/dag/details/taskInstance/taskActions/index.tsx b/airflow/www/static/js/dag/details/taskInstance/taskActions/index.tsx index 72f3f238ab23a..55a1a2529d55a 100644 --- a/airflow/www/static/js/dag/details/taskInstance/taskActions/index.tsx +++ b/airflow/www/static/js/dag/details/taskInstance/taskActions/index.tsx @@ -27,7 +27,6 @@ import { } from '@chakra-ui/react'; import type { CommonActionProps } from './types'; -import RunAction from './Run'; import ClearAction from './Clear'; import MarkFailedAction from './MarkFailed'; import MarkSuccessAction from './MarkSuccess'; @@ -54,12 +53,6 @@ const TaskActions = ({ /> ) : ( }> - - @@ -332,39 +331,6 @@ {% endif %} Task Actions - - - - - - - - - - - - Ignore All Deps - - - Ignore Task State - - - - Ignore Task Deps - - - - - Run - - - - - diff --git a/airflow/www/views.py b/airflow/www/views.py index 5335d7bd37cda..6a6a9fb02bb7b 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -89,7 +89,6 @@ from airflow.configuration import AIRFLOW_CONFIG, conf from airflow.datasets import Dataset from airflow.exceptions import AirflowException, ParamValidationError, RemovedInAirflow3Warning -from airflow.executors.executor_loader import ExecutorLoader from airflow.jobs.base_job import BaseJob from airflow.jobs.scheduler_job import SchedulerJob from airflow.jobs.triggerer_job import TriggererJob @@ -106,7 +105,7 @@ from airflow.providers_manager import ProvidersManager from airflow.security import permissions from airflow.ti_deps.dep_context import DepContext -from airflow.ti_deps.dependencies_deps import RUNNING_DEPS, SCHEDULER_QUEUED_DEPS +from airflow.ti_deps.dependencies_deps import SCHEDULER_QUEUED_DEPS from airflow.timetables._cron import CronMixin from airflow.timetables.base import DataInterval, TimeRestriction from airflow.utils import json as utils_json, timezone, yaml @@ -1836,73 +1835,6 @@ def xcom(self, session=None): title=title, ) - @expose("/run", methods=["POST"]) - @auth.has_access( - [ - (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG), - (permissions.ACTION_CAN_CREATE, permissions.RESOURCE_TASK_INSTANCE), - ] - ) - @action_logging - @provide_session - def run(self, session=None): - """Runs Task Instance.""" - dag_id = request.form.get("dag_id") - task_id = request.form.get("task_id") - dag_run_id = request.form.get("dag_run_id") - map_index = request.args.get("map_index", -1, type=int) - origin = get_safe_url(request.form.get("origin")) - dag = get_airflow_app().dag_bag.get_dag(dag_id) - if not dag: - return redirect_or_json(origin, "DAG not found", "error", 404) - task = dag.get_task(task_id) - - ignore_all_deps = request.form.get("ignore_all_deps") == "true" - ignore_task_deps = request.form.get("ignore_task_deps") == "true" - ignore_ti_state = request.form.get("ignore_ti_state") == "true" - - executor = ExecutorLoader.get_default_executor() - - if not executor.supports_ad_hoc_ti_run: - msg = f"{executor.__class__.__name__} does not support ad hoc task runs" - return redirect_or_json(origin, msg, "error", 400) - dag_run = dag.get_dagrun(run_id=dag_run_id, session=session) - if not dag_run: - return redirect_or_json(origin, "DAG run not found", "error", 404) - ti = dag_run.get_task_instance(task_id=task.task_id, map_index=map_index, session=session) - if not ti: - msg = "Could not queue task instance for execution, task instance is missing" - return redirect_or_json(origin, msg, "error", 400) - - ti.refresh_from_task(task) - - # Make sure the task instance can be run - dep_context = DepContext( - deps=RUNNING_DEPS, - ignore_all_deps=ignore_all_deps, - ignore_task_deps=ignore_task_deps, - ignore_ti_state=ignore_ti_state, - ) - failed_deps = list(ti.get_failed_dep_statuses(dep_context=dep_context)) - if failed_deps: - failed_deps_str = ", ".join(f"{dep.dep_name}: {dep.reason}" for dep in failed_deps) - msg = f"Could not queue task instance for execution, dependencies not met: {failed_deps_str}" - return redirect_or_json(origin, msg, "error", 400) - - executor.job_id = None - executor.start() - executor.queue_task_instance( - ti, - ignore_all_deps=ignore_all_deps, - ignore_task_deps=ignore_task_deps, - ignore_ti_state=ignore_ti_state, - ) - executor.heartbeat() - ti.queued_dttm = timezone.utcnow() - session.merge(ti) - msg = f"Sent {ti} to the message queue, it should start any moment now." - return redirect_or_json(origin, msg) - @expose("/delete", methods=["POST"]) @auth.has_access( [ diff --git a/tests/www/views/test_views_acl.py b/tests/www/views/test_views_acl.py index 6b197f3bf82e1..d087c311e32e0 100644 --- a/tests/www/views/test_views_acl.py +++ b/tests/www/views/test_views_acl.py @@ -636,19 +636,6 @@ def test_failure(dag_faker_client, url, unexpected_content): check_content_not_in_response(unexpected_content, resp) -@pytest.mark.parametrize("client", ["dag_test_client", "all_dag_user_client"]) -def test_run_success(request, client): - form = dict( - task_id="runme_0", - dag_id="example_bash_operator", - ignore_all_deps="false", - ignore_ti_state="true", - execution_date=DEFAULT_DATE, - ) - resp = request.getfixturevalue(client).post("run", data=form) - assert resp.status_code == 302 - - def test_blocked_success(client_all_dags_dagruns): resp = client_all_dags_dagruns.post("blocked", follow_redirects=True) check_content_in_response("example_bash_operator", resp) diff --git a/tests/www/views/test_views_tasks.py b/tests/www/views/test_views_tasks.py index 5add70b261029..6f7d70f19d4a9 100644 --- a/tests/www/views/test_views_tasks.py +++ b/tests/www/views/test_views_tasks.py @@ -30,12 +30,10 @@ from airflow import settings from airflow.exceptions import AirflowException from airflow.executors.celery_executor import CeleryExecutor -from airflow.executors.local_executor import LocalExecutor from airflow.models import DAG, DagBag, DagModel, TaskFail, TaskInstance, TaskReschedule from airflow.models.dagcode import DagCode from airflow.operators.bash import BashOperator from airflow.security import permissions -from airflow.ti_deps.dependencies_states import QUEUEABLE_STATES, RUNNABLE_STATES from airflow.utils import timezone from airflow.utils.log.logging_mixin import ExternalLoggingMixin from airflow.utils.session import create_session @@ -485,24 +483,12 @@ def test_code_from_db_all_example_dags(admin_client): ), "example_bash_operator", ), - ( - "run", - dict( - task_id="runme_0", - dag_id="example_bash_operator", - ignore_all_deps="false", - ignore_ti_state="true", - dag_run_id=DEFAULT_DAGRUN, - ), - "", - ), ], ids=[ "paused", "failed-flash-hint", "success-flash-hint", "clear", - "run", ], ) def test_views_post(admin_client, url, data, content): @@ -533,120 +519,6 @@ def heartbeat(self): return True -@pytest.mark.parametrize("state", RUNNABLE_STATES) -@unittest.mock.patch( - "airflow.executors.executor_loader.ExecutorLoader.get_default_executor", - return_value=_ForceHeartbeatCeleryExecutor(), -) -def test_run_with_runnable_states(_, admin_client, session, state): - task_id = "runme_0" - session.query(TaskInstance).filter(TaskInstance.task_id == task_id).update( - {"state": state, "end_date": timezone.utcnow()} - ) - session.commit() - - form = dict( - task_id=task_id, - dag_id="example_bash_operator", - ignore_all_deps="false", - ignore_ti_state="false", - dag_run_id=DEFAULT_DAGRUN, - origin="/home", - ) - resp = admin_client.post("run", data=form, follow_redirects=True) - check_content_in_response("", resp) - - msg = f"Task is in the '{state}' state." - assert not re.search(msg, resp.get_data(as_text=True)) - - -@unittest.mock.patch( - "airflow.executors.executor_loader.ExecutorLoader.get_default_executor", - return_value=_ForceHeartbeatCeleryExecutor(), -) -def test_run_ignoring_deps_sets_queued_dttm(_, admin_client, session, time_machine): - task_id = "runme_0" - session.query(TaskInstance).filter(TaskInstance.task_id == task_id).update( - {"state": State.SCHEDULED, "queued_dttm": None} - ) - session.commit() - - assert session.query(TaskInstance.queued_dttm).filter(TaskInstance.task_id == task_id).all() == [(None,)] - - form = dict( - task_id=task_id, - dag_id="example_bash_operator", - ignore_all_deps="true", - dag_run_id=DEFAULT_DAGRUN, - origin="/home", - ) - now = timezone.utcnow() - - time_machine.move_to(now, tick=False) - resp = admin_client.post("run", data=form, follow_redirects=True) - - assert resp.status_code == 200 - assert session.query(TaskInstance.queued_dttm).filter(TaskInstance.task_id == task_id).scalar() == now - - -@pytest.mark.parametrize("state", QUEUEABLE_STATES) -@unittest.mock.patch( - "airflow.executors.executor_loader.ExecutorLoader.get_default_executor", - return_value=CeleryExecutor(), -) -def test_run_with_not_runnable_states(_, admin_client, session, state): - assert state not in RUNNABLE_STATES - - task_id = "runme_0" - session.query(TaskInstance).filter(TaskInstance.task_id == task_id).update( - {"state": state, "end_date": timezone.utcnow()} - ) - session.commit() - - form = dict( - task_id=task_id, - dag_id="example_bash_operator", - ignore_all_deps="false", - ignore_ti_state="false", - dag_run_id=DEFAULT_DAGRUN, - origin="/home", - ) - resp = admin_client.post("run", data=form, follow_redirects=True) - check_content_in_response("", resp) - - msg = f"Task is in the '{state}' state." - assert re.search(msg, resp.get_data(as_text=True)) - - -@pytest.mark.parametrize("state", QUEUEABLE_STATES) -@unittest.mock.patch( - "airflow.executors.executor_loader.ExecutorLoader.get_default_executor", - return_value=LocalExecutor(), -) -def test_run_with_the_unsupported_executor(_, admin_client, session, state): - assert state not in RUNNABLE_STATES - - task_id = "runme_0" - session.query(TaskInstance).filter(TaskInstance.task_id == task_id).update( - {"state": state, "end_date": timezone.utcnow()} - ) - session.commit() - - form = dict( - task_id=task_id, - dag_id="example_bash_operator", - ignore_all_deps="false", - ignore_ti_state="false", - dag_run_id=DEFAULT_DAGRUN, - origin="/home", - ) - resp = admin_client.post("run", data=form, follow_redirects=True) - check_content_in_response("", resp) - - msg = "LocalExecutor does not support ad hoc task runs" - assert re.search(msg, resp.get_data(as_text=True)) - - @pytest.fixture() def new_id_example_bash_operator(): dag_id = "example_bash_operator"