Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions airflow/www/static/js/api/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import useClearRun from './useClearRun';
import useQueueRun from './useQueueRun';
import useMarkFailedRun from './useMarkFailedRun';
import useMarkSuccessRun from './useMarkSuccessRun';
import useRunTask from './useRunTask';
import useClearTask from './useClearTask';
import useMarkFailedTask from './useMarkFailedTask';
import useMarkSuccessTask from './useMarkSuccessTask';
Expand Down Expand Up @@ -63,7 +62,6 @@ export {
useMarkSuccessRun,
useMarkSuccessTask,
useQueueRun,
useRunTask,
useSetDagRunNote,
useSetTaskInstanceNote,
useTaskInstance,
Expand Down
75 changes: 0 additions & 75 deletions airflow/www/static/js/api/useRunTask.ts

This file was deleted.

97 changes: 0 additions & 97 deletions airflow/www/static/js/dag/details/taskInstance/taskActions/Run.tsx

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import {
} from '@chakra-ui/react';

import type { CommonActionProps } from './types';
import RunAction from './Run';
import ClearAction from './Clear';
import MarkFailedAction from './MarkFailed';
import MarkSuccessAction from './MarkSuccess';
Expand All @@ -54,12 +53,6 @@ const TaskActions = ({
/>
) : (
<VStack justifyContent="center" divider={<StackDivider my={3} />}>
<RunAction
runId={runId}
taskId={taskId}
dagId={dagId}
mapIndexes={mapIndexes}
/>
<ClearAction
runId={runId}
taskId={taskId}
Expand Down
34 changes: 0 additions & 34 deletions airflow/www/templates/airflow/dag.html
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
<meta name="confirm_url" content="{{ url_for('Airflow.confirm') }}">
<meta name="grid_data_url" content="{{ url_for('Airflow.grid_data') }}">
<meta name="next_run_datasets_url" content="{{ url_for('Airflow.next_run_datasets', dag_id=dag.dag_id) }}">
<meta name="run_url" content="{{ url_for('Airflow.run') }}">
<meta name="grid_url" content="{{ url_for('Airflow.grid', dag_id=dag.dag_id) }}">
<meta name="datasets_url" content="{{ url_for('Airflow.datasets') }}">
<meta name="grid_url_no_root" content="{{ url_for('Airflow.grid', dag_id=dag.dag_id, num_runs=num_runs_arg, base_date=base_date_arg) }}">
Expand Down Expand Up @@ -332,39 +331,6 @@ <h4 class="modal-title" id="taskInstanceModalLabel">
</div>
{% endif %}
<h4 id="task_actions">Task Actions</h4>
<form method="POST" data-action="{{ url_for('Airflow.run') }}" id="run_action">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
<input type="hidden" name="dag_id" value="{{ dag.dag_id }}">
<input type="hidden" name="task_id">
<input type="hidden" name="dag_run_id">
<input type="hidden" name="map_index">
<input type="hidden" name="origin" value="{{ request.base_url }}">
<div class="row">
<span class="btn-group col-xs-12 col-sm-9 task-instance-modal-column" data-toggle="buttons">
<label
class="btn btn-default"
title="Ignores all non-critical dependencies, including task state and task_deps">
<input type="checkbox" value="true" name="ignore_all_deps" autocomplete="off">
Ignore All Deps</label>
<label class="btn btn-default"
title="Ignore previous success/failure">
<input type="checkbox" value="true" name="ignore_ti_state" autocomplete="off">
Ignore Task State
</label>
<label class="btn btn-default"
title="Disregard the task-specific dependencies, e.g. status of upstream task instances and depends_on_past">
<input type="checkbox" value="true" name="ignore_task_deps" autocomplete="off">
Ignore Task Deps
</label>
</span>
<span class="col-xs-12 col-sm-3 task-instance-modal-column">
<button type="submit" id="btn_run" class="btn btn-primary btn-block" title="Runs a single task instance">
Run
</button>
</span>
</div>
<hr style="margin-bottom: 8px;">
</form>
<form method="POST" data-action="{{ url_for('Airflow.clear') }}" id="clear_action">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
<input type="hidden" name="dag_id" value="{{ dag.dag_id }}">
Expand Down
70 changes: 1 addition & 69 deletions airflow/www/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@
from airflow.configuration import AIRFLOW_CONFIG, conf
from airflow.datasets import Dataset
from airflow.exceptions import AirflowException, ParamValidationError, RemovedInAirflow3Warning
from airflow.executors.executor_loader import ExecutorLoader
from airflow.jobs.base_job import BaseJob
from airflow.jobs.scheduler_job import SchedulerJob
from airflow.jobs.triggerer_job import TriggererJob
Expand All @@ -106,7 +105,7 @@
from airflow.providers_manager import ProvidersManager
from airflow.security import permissions
from airflow.ti_deps.dep_context import DepContext
from airflow.ti_deps.dependencies_deps import RUNNING_DEPS, SCHEDULER_QUEUED_DEPS
from airflow.ti_deps.dependencies_deps import SCHEDULER_QUEUED_DEPS
from airflow.timetables._cron import CronMixin
from airflow.timetables.base import DataInterval, TimeRestriction
from airflow.utils import json as utils_json, timezone, yaml
Expand Down Expand Up @@ -1836,73 +1835,6 @@ def xcom(self, session=None):
title=title,
)

@expose("/run", methods=["POST"])
@auth.has_access(
[
(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_CREATE, permissions.RESOURCE_TASK_INSTANCE),
]
)
@action_logging
@provide_session
def run(self, session=None):
"""Runs Task Instance."""
dag_id = request.form.get("dag_id")
task_id = request.form.get("task_id")
dag_run_id = request.form.get("dag_run_id")
map_index = request.args.get("map_index", -1, type=int)
origin = get_safe_url(request.form.get("origin"))
dag = get_airflow_app().dag_bag.get_dag(dag_id)
if not dag:
return redirect_or_json(origin, "DAG not found", "error", 404)
task = dag.get_task(task_id)

ignore_all_deps = request.form.get("ignore_all_deps") == "true"
ignore_task_deps = request.form.get("ignore_task_deps") == "true"
ignore_ti_state = request.form.get("ignore_ti_state") == "true"

executor = ExecutorLoader.get_default_executor()

if not executor.supports_ad_hoc_ti_run:
msg = f"{executor.__class__.__name__} does not support ad hoc task runs"
return redirect_or_json(origin, msg, "error", 400)
dag_run = dag.get_dagrun(run_id=dag_run_id, session=session)
if not dag_run:
return redirect_or_json(origin, "DAG run not found", "error", 404)
ti = dag_run.get_task_instance(task_id=task.task_id, map_index=map_index, session=session)
if not ti:
msg = "Could not queue task instance for execution, task instance is missing"
return redirect_or_json(origin, msg, "error", 400)

ti.refresh_from_task(task)

# Make sure the task instance can be run
dep_context = DepContext(
deps=RUNNING_DEPS,
ignore_all_deps=ignore_all_deps,
ignore_task_deps=ignore_task_deps,
ignore_ti_state=ignore_ti_state,
)
failed_deps = list(ti.get_failed_dep_statuses(dep_context=dep_context))
if failed_deps:
failed_deps_str = ", ".join(f"{dep.dep_name}: {dep.reason}" for dep in failed_deps)
msg = f"Could not queue task instance for execution, dependencies not met: {failed_deps_str}"
return redirect_or_json(origin, msg, "error", 400)

executor.job_id = None
executor.start()
executor.queue_task_instance(
ti,
ignore_all_deps=ignore_all_deps,
ignore_task_deps=ignore_task_deps,
ignore_ti_state=ignore_ti_state,
)
executor.heartbeat()
ti.queued_dttm = timezone.utcnow()
session.merge(ti)
msg = f"Sent {ti} to the message queue, it should start any moment now."
return redirect_or_json(origin, msg)

@expose("/delete", methods=["POST"])
@auth.has_access(
[
Expand Down
13 changes: 0 additions & 13 deletions tests/www/views/test_views_acl.py
Original file line number Diff line number Diff line change
Expand Up @@ -636,19 +636,6 @@ def test_failure(dag_faker_client, url, unexpected_content):
check_content_not_in_response(unexpected_content, resp)


@pytest.mark.parametrize("client", ["dag_test_client", "all_dag_user_client"])
def test_run_success(request, client):
form = dict(
task_id="runme_0",
dag_id="example_bash_operator",
ignore_all_deps="false",
ignore_ti_state="true",
execution_date=DEFAULT_DATE,
)
resp = request.getfixturevalue(client).post("run", data=form)
assert resp.status_code == 302


def test_blocked_success(client_all_dags_dagruns):
resp = client_all_dags_dagruns.post("blocked", follow_redirects=True)
check_content_in_response("example_bash_operator", resp)
Expand Down
Loading