Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions airflow/www/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import gzip
import logging
from io import BytesIO as IO
from itertools import chain
from typing import Callable, TypeVar, cast

import pendulum
Expand Down Expand Up @@ -48,13 +49,19 @@ def wrapper(*args, **kwargs):
user = g.user.username

fields_skip_logging = {'csrf_token', '_csrf_token'}
log_fields = {
k: v
for k, v in chain(request.values.items(), request.view_args.items())
if k not in fields_skip_logging
}

log = Log(
event=f.__name__,
task_instance=None,
owner=user,
extra=str([(k, v) for k, v in request.values.items() if k not in fields_skip_logging]),
task_id=request.values.get('task_id'),
dag_id=request.values.get('dag_id'),
extra=str([(k, log_fields[k]) for k in log_fields]),
task_id=log_fields.get('task_id'),
dag_id=log_fields.get('dag_id'),
)

if 'execution_date' in request.values:
Expand Down
10 changes: 5 additions & 5 deletions airflow/www/templates/airflow/dag.html
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,9 @@ <h4 class="pull-right" style="user-select: none;-moz-user-select: auto;">
<div class="row">
<div class="col-md-10">
<ul class="nav nav-pills">
<li><a href="{{ url_for('Airflow.tree', dag_id=dag.dag_id, num_runs=num_runs_arg, root=root, base_date=base_date_arg) }}">
<span class="material-icons" aria-hidden="true">nature</span>
Tree
<li><a href="{{ url_for('Airflow.grid', dag_id=dag.dag_id, num_runs=num_runs_arg, root=root, base_date=base_date_arg) }}">
<span class="material-icons" aria-hidden="true">grid_on</span>
Grid
</a></li>
<li><a href="{{ url_for('Airflow.graph', dag_id=dag.dag_id, root=root, num_runs=num_runs_arg, base_date=base_date_arg, execution_date=execution_date_arg) }}">
<span class="material-icons" aria-hidden="true">account_tree</span>
Expand Down Expand Up @@ -182,7 +182,7 @@ <h4 class="modal-title" id="taskInstanceModalLabel">
</div>
<div class="modal-body">
<div id="div_btn_subdag">
<a id="btn_subdag" class="btn btn-primary" data-base-url="{{ url_for('Airflow.' + dag.default_view) }}">
<a id="btn_subdag" class="btn btn-primary" data-base-url="{{ url_for('Airflow.legacy_' + dag.default_view) }}">
Zoom into Sub DAG
</a>
<hr>
Expand Down Expand Up @@ -402,7 +402,7 @@ <h4 class="modal-title" id="dagModalLabel">
</form>
</span>
<span class="col-md-4 text-right">
<a id="btn_dag_graph_view" class="btn" data-base-url="{{ url_for('Airflow.graph') }}" role="button">
<a id="btn_dag_graph_view" class="btn" data-base-url="{{ url_for('Airflow.graph', dag_id=dag.dag_id) }}" role="button">
<span class="material-icons" aria-hidden="true">account_tree</span>
Graph
</a>
Expand Down
8 changes: 4 additions & 4 deletions airflow/www/templates/airflow/dags.html
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
<meta name="paused_url" content="{{ url_for('Airflow.paused') }}">
<meta name="status_filter" content="{{ status_filter }}">
<meta name="autocomplete_url" content="{{ url_for('AutocompleteView.autocomplete') }}">
<meta name="graph_url" content="{{ url_for('Airflow.graph') }}">
<meta name="graph_url" content="{{ url_for('Airflow.legacy_graph') }}">
<meta name="dag_run_url" content="{{ url_for('DagRunModelView.list') }}">
<meta name="task_instance_url" content="{{ url_for('TaskInstanceModelView.list') }}">
<meta name="blocked_url" content="{{ url_for('Airflow.blocked') }}">
Expand Down Expand Up @@ -293,9 +293,9 @@ <h2>{{ page_title }}</h2>
<span class="material-icons" aria-hidden="true">account_tree</span>
Graph
</a>
<a href="{{ url_for('Airflow.tree', dag_id=dag.dag_id, num_runs=num_runs) }}" class="dags-table-more__link">
<span class="material-icons" aria-hidden="true">nature</span>
Tree
<a href="{{ url_for('Airflow.grid', dag_id=dag.dag_id, num_runs=num_runs) }}" class="dags-table-more__link">
<span class="material-icons" aria-hidden="true">grid_on</span>
Grid
</a>
</div>
<span class="dags-table-more__toggle"><span class="material-icons">more_horiz</span></span>
Expand Down
4 changes: 3 additions & 1 deletion airflow/www/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,8 +356,10 @@ def dag_link(attr):
"""Generates a URL to the Graph view for a Dag."""
dag_id = attr.get('dag_id')
execution_date = attr.get('execution_date')
if not dag_id:
return Markup('None')
url = url_for('Airflow.graph', dag_id=dag_id, execution_date=execution_date)
return Markup('<a href="{}">{}</a>').format(url, dag_id) if dag_id else Markup('None')
return Markup('<a href="{}">{}</a>').format(url, dag_id)


def dag_run_link(attr):
Expand Down
169 changes: 149 additions & 20 deletions airflow/www/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -1054,15 +1054,24 @@ def last_dagruns(self, session=None):
(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_CODE),
]
)
def legacy_code(self):
"""Redirect from url param."""
return redirect(url_for('Airflow.code', **request.args))

@expose('/dags/<string:dag_id>/code')
@auth.has_access(
[
(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_CODE),
]
)
@provide_session
def code(self, session=None):
def code(self, dag_id, session=None):
"""Dag Code."""
all_errors = ""
dag_orm = None
dag_id = None

try:
dag_id = request.args.get('dag_id')
dag_orm = DagModel.get_dagmodel(dag_id, session=session)
code = DagCode.get_code_by_fileloc(dag_orm.fileloc)
html_code = Markup(highlight(code, lexers.PythonLexer(), HtmlFormatter(linenos=True)))
Expand Down Expand Up @@ -1095,10 +1104,20 @@ def code(self, session=None):
(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
]
)
def legacy_dag_details(self):
"""Redirect from url param."""
return redirect(url_for('Airflow.dag_details', **request.args))

@expose('/dags/<string:dag_id>/details')
@auth.has_access(
[
(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
]
)
@provide_session
def dag_details(self, session=None):
def dag_details(self, dag_id, session=None):
"""Get Dag details."""
dag_id = request.args.get('dag_id')
dag = current_app.dag_bag.get_dag(dag_id)
dag_model = DagModel.get_dagmodel(dag_id)

Expand Down Expand Up @@ -2301,6 +2320,34 @@ def success(self):
State.SUCCESS,
)

@expose('/dags/<string:dag_id>')
@auth.has_access(
[
(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE),
(permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_LOG),
]
)
@gzipped
@action_logging
def dag(self, dag_id):
"""Redirect to default DAG view."""
return redirect(url_for('Airflow.grid', dag_id=dag_id, **request.args))

@expose('/legacy_tree')
@auth.has_access(
[
(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE),
(permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_LOG),
]
)
@gzipped
@action_logging
def legacy_tree(self):
"""Redirect to the replacement - grid view."""
return redirect(url_for('Airflow.grid', **request.args))

@expose('/tree')
@auth.has_access(
[
Expand All @@ -2311,10 +2358,23 @@ def success(self):
)
@gzipped
@action_logging
def tree(self):
"""Redirect to the replacement - grid view. Kept for backwards compatibility."""
return redirect(url_for('Airflow.grid', **request.args))

@expose('/dags/<string:dag_id>/grid')
@auth.has_access(
[
(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE),
(permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_LOG),
]
)
@gzipped
@action_logging
@provide_session
def tree(self, session=None):
"""Get Dag as tree."""
dag_id = request.args.get('dag_id')
def grid(self, dag_id, session=None):
"""Get Dag's grid view."""
dag = current_app.dag_bag.get_dag(dag_id)
dag_model = DagModel.get_dagmodel(dag_id)
if not dag:
Expand Down Expand Up @@ -2399,8 +2459,21 @@ def tree(self, session=None):
)
@gzipped
@action_logging
def legacy_calendar(self):
"""Redirect from url param."""
return redirect(url_for('Airflow.calendar', **request.args))

@expose('/dags/<string:dag_id>/calendar')
@auth.has_access(
[
(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE),
]
)
@gzipped
@action_logging
@provide_session
def calendar(self, session=None):
def calendar(self, dag_id, session=None):
"""Get DAG runs as calendar"""

def _convert_to_date(session, column):
Expand All @@ -2410,7 +2483,6 @@ def _convert_to_date(session, column):
else:
return func.date(column)

dag_id = request.args.get('dag_id')
dag = current_app.dag_bag.get_dag(dag_id)
dag_model = DagModel.get_dagmodel(dag_id)
if not dag:
Expand Down Expand Up @@ -2476,10 +2548,23 @@ def _convert_to_date(session, column):
)
@gzipped
@action_logging
def legacy_graph(self):
"""Redirect from url param."""
return redirect(url_for('Airflow.graph', **request.args))

@expose('/dags/<string:dag_id>/graph')
@auth.has_access(
[
(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE),
(permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_LOG),
]
)
@gzipped
@action_logging
@provide_session
def graph(self, session=None):
def graph(self, dag_id, session=None):
"""Get DAG as Graph."""
dag_id = request.args.get('dag_id')
dag = current_app.dag_bag.get_dag(dag_id)
dag_model = DagModel.get_dagmodel(dag_id)
if not dag:
Expand Down Expand Up @@ -2569,11 +2654,22 @@ class GraphForm(DateTimeWithNumRunsWithDagRunsForm):
]
)
@action_logging
def legacy_duration(self):
"""Redirect from url param."""
return redirect(url_for('Airflow.duration', **request.args))

@expose('/dags/<string:dag_id>/duration')
@auth.has_access(
[
(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE),
]
)
@action_logging
@provide_session
def duration(self, session=None):
def duration(self, dag_id, session=None):
"""Get Dag as duration graph."""
default_dag_run = conf.getint('webserver', 'default_dag_run_display_number')
dag_id = request.args.get('dag_id')
dag_model = DagModel.get_dagmodel(dag_id)

dag: Optional[DAG] = current_app.dag_bag.get_dag(dag_id)
Expand Down Expand Up @@ -2711,11 +2807,22 @@ def duration(self, session=None):
]
)
@action_logging
def legacy_tries(self):
"""Redirect from url param."""
return redirect(url_for('Airflow.tries', **request.args))

@expose('/dags/<string:dag_id>/tries')
@auth.has_access(
[
(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE),
]
)
@action_logging
@provide_session
def tries(self, session=None):
def tries(self, dag_id, session=None):
"""Shows all tries."""
default_dag_run = conf.getint('webserver', 'default_dag_run_display_number')
dag_id = request.args.get('dag_id')
dag = current_app.dag_bag.get_dag(dag_id)
dag_model = DagModel.get_dagmodel(dag_id)
base_date = request.args.get('base_date')
Expand Down Expand Up @@ -2788,11 +2895,22 @@ def tries(self, session=None):
]
)
@action_logging
def legacy_landing_times(self):
"""Redirect from url param."""
return redirect(url_for('Airflow.landing_times', **request.args))

@expose('/dags/<string:dag_id>/landing-times')
@auth.has_access(
[
(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE),
]
)
@action_logging
@provide_session
def landing_times(self, session=None):
def landing_times(self, dag_id, session=None):
"""Shows landing times."""
default_dag_run = conf.getint('webserver', 'default_dag_run_display_number')
dag_id = request.args.get('dag_id')
dag: DAG = current_app.dag_bag.get_dag(dag_id)
dag_model = DagModel.get_dagmodel(dag_id)
base_date = request.args.get('base_date')
Expand Down Expand Up @@ -2893,10 +3011,21 @@ def paused(self):
]
)
@action_logging
def legacy_gantt(self):
"""Redirect from url param."""
return redirect(url_for('Airflow.gantt', **request.args))

@expose('/dags/<string:dag_id>/gantt')
@auth.has_access(
[
(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE),
]
)
@action_logging
@provide_session
def gantt(self, session=None):
def gantt(self, dag_id, session=None):
"""Show GANTT chart."""
dag_id = request.args.get('dag_id')
dag = current_app.dag_bag.get_dag(dag_id)
dag_model = DagModel.get_dagmodel(dag_id)

Expand Down
20 changes: 19 additions & 1 deletion tests/www/views/test_views_decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ def _check_last_log(session, dag_id, event, execution_date):

def test_action_logging_get(session, admin_client):
url = (
f'graph?dag_id=example_bash_operator&'
f'dags/example_bash_operator/graph?'
f'execution_date={urllib.parse.quote_plus(str(EXAMPLE_DAG_DEFAULT_DATE))}'
)
resp = admin_client.get(url, follow_redirects=True)
Expand All @@ -131,6 +131,24 @@ def test_action_logging_get(session, admin_client):
)


def test_action_logging_get_legacy_view(session, admin_client):
url = (
f'graph?dag_id=example_bash_operator&'
f'execution_date={urllib.parse.quote_plus(str(EXAMPLE_DAG_DEFAULT_DATE))}'
)
resp = admin_client.get(url, follow_redirects=True)
check_content_in_response('runme_1', resp)

# In mysql backend, this commit() is needed to write down the logs
session.commit()
_check_last_log(
session,
dag_id="example_bash_operator",
event="legacy_graph",
execution_date=EXAMPLE_DAG_DEFAULT_DATE,
)


def test_action_logging_post(session, admin_client):
form = dict(
task_id="runme_1",
Expand Down
Loading