diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index 0f199f827b684..22b788febf11f 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -866,6 +866,13 @@ type: string example: ~ default: "0" + - name: session_lifetime_days + description: | + The UI cookie lifetime in days + version_added: ~ + type: string + example: ~ + default: "30" - name: email description: ~ diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index e28a24453aad1..5b05eed9b34ff 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -411,6 +411,9 @@ update_fab_perms = True # 0 means never get forcibly logged out force_log_out_after = 0 +# The UI cookie lifetime in days +session_lifetime_days = 30 + [email] email_backend = airflow.utils.email.send_email_smtp diff --git a/airflow/example_dags/example_bash_operator.py b/airflow/example_dags/example_bash_operator.py index 0952a9ff484e2..921c4ab02a325 100644 --- a/airflow/example_dags/example_bash_operator.py +++ b/airflow/example_dags/example_bash_operator.py @@ -36,6 +36,7 @@ default_args=args, schedule_interval='0 0 * * *', dagrun_timeout=timedelta(minutes=60), + tags=['example'] ) run_this_last = DummyOperator( diff --git a/airflow/example_dags/example_branch_operator.py b/airflow/example_dags/example_branch_operator.py index b9282c387d1f3..e67590b032f18 100644 --- a/airflow/example_dags/example_branch_operator.py +++ b/airflow/example_dags/example_branch_operator.py @@ -35,6 +35,7 @@ dag_id='example_branch_operator', default_args=args, schedule_interval="@daily", + tags=['example'] ) run_this_first = DummyOperator( diff --git a/airflow/example_dags/example_branch_python_dop_operator_3.py b/airflow/example_dags/example_branch_python_dop_operator_3.py index 4246e473991dd..6237e495036e8 100644 --- a/airflow/example_dags/example_branch_python_dop_operator_3.py +++ b/airflow/example_dags/example_branch_python_dop_operator_3.py @@ -37,6 +37,7 @@ dag_id='example_branch_dop_operator_v3', schedule_interval='*/1 * * * *', default_args=args, + tags=['example'] ) diff --git a/airflow/example_dags/example_gcs_to_bq.py b/airflow/example_dags/example_gcs_to_bq.py index 027da3ef8e9e8..acde283c5577e 100644 --- a/airflow/example_dags/example_gcs_to_bq.py +++ b/airflow/example_dags/example_gcs_to_bq.py @@ -31,7 +31,7 @@ dag = models.DAG( dag_id='example_gcs_to_bq_operator', default_args=args, - schedule_interval=None) + schedule_interval=None, tags=['example']) create_test_dataset = bash_operator.BashOperator( task_id='create_airflow_test_dataset', diff --git a/airflow/example_dags/example_gcs_to_gcs.py b/airflow/example_dags/example_gcs_to_gcs.py index a8f5a57dbb566..6d20b839a1ce0 100644 --- a/airflow/example_dags/example_gcs_to_gcs.py +++ b/airflow/example_dags/example_gcs_to_gcs.py @@ -39,7 +39,7 @@ with models.DAG( - "example_gcs_to_gcs", default_args=default_args, schedule_interval=None + "example_gcs_to_gcs", default_args=default_args, schedule_interval=None, tags=['example'] ) as dag: sync_full_bucket = GoogleCloudStorageSynchronizeBuckets( task_id="sync-full-bucket", diff --git a/airflow/example_dags/example_gcs_to_sftp.py b/airflow/example_dags/example_gcs_to_sftp.py index 017d2f9562e33..e2129082eafc2 100644 --- a/airflow/example_dags/example_gcs_to_sftp.py +++ b/airflow/example_dags/example_gcs_to_sftp.py @@ -37,7 +37,7 @@ with models.DAG( - "example_gcs_to_sftp", default_args=default_args, 
schedule_interval=None + "example_gcs_to_sftp", default_args=default_args, schedule_interval=None, tags=['example'] ) as dag: # [START howto_operator_gcs_to_sftp_copy_single_file] copy_file_from_gcs_to_sftp = GoogleCloudStorageToSFTPOperator( diff --git a/airflow/example_dags/example_http_operator.py b/airflow/example_dags/example_http_operator.py index bcfaa1a7506ef..f25b305fff7cf 100644 --- a/airflow/example_dags/example_http_operator.py +++ b/airflow/example_dags/example_http_operator.py @@ -38,7 +38,7 @@ 'retry_delay': timedelta(minutes=5), } -dag = DAG('example_http_operator', default_args=default_args) +dag = DAG('example_http_operator', default_args=default_args, tags=['example']) dag.doc_md = __doc__ diff --git a/airflow/example_dags/example_latest_only.py b/airflow/example_dags/example_latest_only.py index 77d8f5c989419..f52955f82ada8 100644 --- a/airflow/example_dags/example_latest_only.py +++ b/airflow/example_dags/example_latest_only.py @@ -30,6 +30,7 @@ dag_id='latest_only', schedule_interval=dt.timedelta(hours=4), start_date=days_ago(2), + tags=['example'] ) latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag) diff --git a/airflow/example_dags/example_latest_only_with_trigger.py b/airflow/example_dags/example_latest_only_with_trigger.py index 6ca6b3adb4469..59b33d611c505 100644 --- a/airflow/example_dags/example_latest_only_with_trigger.py +++ b/airflow/example_dags/example_latest_only_with_trigger.py @@ -31,6 +31,7 @@ dag_id='latest_only_with_trigger', schedule_interval=dt.timedelta(hours=4), start_date=days_ago(2), + tags=['example'] ) latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag) diff --git a/airflow/example_dags/example_passing_params_via_test_command.py b/airflow/example_dags/example_passing_params_via_test_command.py index 811a4a0894c1c..5ec672b5ae47a 100644 --- a/airflow/example_dags/example_passing_params_via_test_command.py +++ b/airflow/example_dags/example_passing_params_via_test_command.py @@ -34,6 +34,7 @@ }, schedule_interval='*/1 * * * *', dagrun_timeout=timedelta(minutes=4), + tags=['example'] ) diff --git a/airflow/example_dags/example_pig_operator.py b/airflow/example_dags/example_pig_operator.py index 9f9a3ab3dd01f..6b565d1384ba2 100644 --- a/airflow/example_dags/example_pig_operator.py +++ b/airflow/example_dags/example_pig_operator.py @@ -32,6 +32,7 @@ dag_id='example_pig_operator', default_args=args, schedule_interval=None, + tags=['example'] ) run_this = PigOperator( diff --git a/airflow/example_dags/example_python_operator.py b/airflow/example_dags/example_python_operator.py index cbb0bdd46f434..fe48e4f3ba558 100644 --- a/airflow/example_dags/example_python_operator.py +++ b/airflow/example_dags/example_python_operator.py @@ -35,6 +35,7 @@ dag_id='example_python_operator', default_args=args, schedule_interval=None, + tags=['example'] ) diff --git a/airflow/example_dags/example_short_circuit_operator.py b/airflow/example_dags/example_short_circuit_operator.py index 784cdc791ce9b..f47f7167e3eb1 100644 --- a/airflow/example_dags/example_short_circuit_operator.py +++ b/airflow/example_dags/example_short_circuit_operator.py @@ -29,7 +29,7 @@ 'start_date': dates.days_ago(2), } -dag = DAG(dag_id='example_short_circuit_operator', default_args=args) +dag = DAG(dag_id='example_short_circuit_operator', default_args=args, tags=['example']) cond_true = ShortCircuitOperator( task_id='condition_is_True', diff --git a/airflow/example_dags/example_skip_dag.py b/airflow/example_dags/example_skip_dag.py index 0a21fa4b74b6f..1ce105839f369 
100644 --- a/airflow/example_dags/example_skip_dag.py +++ b/airflow/example_dags/example_skip_dag.py @@ -58,6 +58,6 @@ def create_test_pipeline(suffix, trigger_rule, dag_): join >> final -dag = DAG(dag_id='example_skip_dag', default_args=args) +dag = DAG(dag_id='example_skip_dag', default_args=args, tags=['example']) create_test_pipeline('1', 'all_success', dag) create_test_pipeline('2', 'one_success', dag) diff --git a/airflow/example_dags/example_subdag_operator.py b/airflow/example_dags/example_subdag_operator.py index 68f6c1ffd862a..e635e74a6aecc 100644 --- a/airflow/example_dags/example_subdag_operator.py +++ b/airflow/example_dags/example_subdag_operator.py @@ -36,6 +36,7 @@ dag_id=DAG_NAME, default_args=args, schedule_interval="@once", + tags=['example'] ) start = DummyOperator( diff --git a/airflow/example_dags/example_trigger_controller_dag.py b/airflow/example_dags/example_trigger_controller_dag.py index 57a61ab408634..88e278756de0d 100644 --- a/airflow/example_dags/example_trigger_controller_dag.py +++ b/airflow/example_dags/example_trigger_controller_dag.py @@ -31,6 +31,7 @@ dag_id="example_trigger_controller_dag", default_args={"owner": "airflow", "start_date": days_ago(2)}, schedule_interval="@once", + tags=['example'] ) trigger = TriggerDagRunOperator( diff --git a/airflow/example_dags/example_trigger_target_dag.py b/airflow/example_dags/example_trigger_target_dag.py index 3f65f0824a1d7..ee0237c054a82 100644 --- a/airflow/example_dags/example_trigger_target_dag.py +++ b/airflow/example_dags/example_trigger_target_dag.py @@ -32,6 +32,7 @@ dag_id="example_trigger_target_dag", default_args={"start_date": days_ago(2), "owner": "airflow"}, schedule_interval=None, + tags=['example'] ) diff --git a/airflow/example_dags/example_xcom.py b/airflow/example_dags/example_xcom.py index 2aab3e12acef8..56c279827769d 100644 --- a/airflow/example_dags/example_xcom.py +++ b/airflow/example_dags/example_xcom.py @@ -28,7 +28,7 @@ 'start_date': days_ago(2), } -dag = DAG('example_xcom', schedule_interval="@once", default_args=args) +dag = DAG('example_xcom', schedule_interval="@once", default_args=args, tags=['example']) value_1 = [1, 2, 3] value_2 = {'a': 'b'} diff --git a/airflow/example_dags/test_utils.py b/airflow/example_dags/test_utils.py index 9c02be6b3471c..1d995f1fca79a 100644 --- a/airflow/example_dags/test_utils.py +++ b/airflow/example_dags/test_utils.py @@ -21,7 +21,7 @@ from airflow.operators.bash_operator import BashOperator from airflow.utils.dates import days_ago -dag = DAG(dag_id='test_utils', schedule_interval=None) +dag = DAG(dag_id='test_utils', schedule_interval=None, tags=['example']) task = BashOperator( task_id='sleeps_forever', diff --git a/airflow/migrations/versions/7939bcff74ba_add_dagtags_table.py b/airflow/migrations/versions/7939bcff74ba_add_dagtags_table.py new file mode 100644 index 0000000000000..3bd49033e6e30 --- /dev/null +++ b/airflow/migrations/versions/7939bcff74ba_add_dagtags_table.py @@ -0,0 +1,50 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Add DagTags table + +Revision ID: 7939bcff74ba +Revises: fe461863935f +Create Date: 2020-01-07 19:39:01.247442 + +""" + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = '7939bcff74ba' +down_revision = 'fe461863935f' +branch_labels = None +depends_on = None + + +def upgrade(): + """Apply Add DagTags table""" + op.create_table( + 'dag_tag', + sa.Column('name', sa.String(length=100), nullable=False), + sa.Column('dag_id', sa.String(length=250), nullable=False), + sa.ForeignKeyConstraint(['dag_id'], ['dag.dag_id'], ), + sa.PrimaryKeyConstraint('name', 'dag_id') + ) + + +def downgrade(): + """Unapply Add DagTags table""" + op.drop_table('dag_tag') diff --git a/airflow/models/__init__.py b/airflow/models/__init__.py index 079cbb406b6db..a7a9ea6a23d4d 100644 --- a/airflow/models/__init__.py +++ b/airflow/models/__init__.py @@ -20,7 +20,7 @@ from airflow.models.base import ID_LEN, Base # noqa: F401 from airflow.models.baseoperator import BaseOperator, BaseOperatorLink # noqa: F401 from airflow.models.connection import Connection # noqa: F401 -from airflow.models.dag import DAG, DagModel # noqa: F401 +from airflow.models.dag import DAG, DagModel, DagTag # noqa: F401 from airflow.models.dagbag import DagBag # noqa: F401 from airflow.models.dagpickle import DagPickle # noqa: F401 from airflow.models.dagrun import DagRun # noqa: F401 diff --git a/airflow/models/dag.py b/airflow/models/dag.py index a62a52b8103f9..6d64bad1ff97f 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -32,7 +32,8 @@ import pendulum from croniter import croniter from dateutil.relativedelta import relativedelta -from sqlalchemy import Boolean, Column, Index, Integer, String, Text, func, or_ +from sqlalchemy import Boolean, Column, ForeignKey, Index, Integer, String, Text, func, or_ +from sqlalchemy.orm import backref, relationship from airflow import settings, utils from airflow.configuration import conf @@ -181,6 +182,8 @@ class DAG(BaseDag, LoggingMixin): `_ :type jinja_environment_kwargs: dict + :param tags: List of tags to help filtering DAGs in the UI.
+ :type tags: List[str] """ _comps = { 'dag_id', @@ -221,7 +224,8 @@ def __init__( params: Optional[Dict] = None, access_control: Optional[Dict] = None, is_paused_upon_creation: Optional[bool] = None, - jinja_environment_kwargs: Optional[Dict] = None + jinja_environment_kwargs: Optional[Dict] = None, + tags: Optional[List[str]] = None ): self.user_defined_macros = user_defined_macros self.user_defined_filters = user_defined_filters @@ -310,6 +314,7 @@ def __init__( self.is_paused_upon_creation = is_paused_upon_creation self.jinja_environment_kwargs = jinja_environment_kwargs + self.tags = tags def __repr__(self): return "<DAG: {self.dag_id}>".format(self=self) @@ -1366,6 +1371,7 @@ def sync_to_db(self, owner=None, sync_time=None, session=None): if self.is_paused_upon_creation is not None: orm_dag.is_paused = self.is_paused_upon_creation self.log.info("Creating ORM DAG for %s", self.dag_id) + session.add(orm_dag) if self.is_subdag: orm_dag.is_subdag = True orm_dag.fileloc = self.parent_dag.fileloc @@ -1379,7 +1385,8 @@ def sync_to_db(self, owner=None, sync_time=None, session=None): orm_dag.default_view = self._default_view orm_dag.description = self.description orm_dag.schedule_interval = self.schedule_interval - session.merge(orm_dag) + orm_dag.tags = self.get_dagtags(session=session) + session.commit() for subdag in self.subdags: @@ -1395,6 +1402,28 @@ def sync_to_db(self, owner=None, sync_time=None, session=None): session=session ) + @provide_session + def get_dagtags(self, session=None): + """ + Build the list of DagTag objects for this DAG, inserting any tag that is missing from the DB. + + :return: The DagTag list. + :rtype: list + """ + tags = [] + if not self.tags: + return tags + + for name in set(self.tags): + tag = session.query( + DagTag).filter(DagTag.name == name).filter(DagTag.dag_id == self.dag_id).first() + if not tag: + tag = DagTag(name=name, dag_id=self.dag_id) + session.add(tag) + tags.append(tag) + session.commit() + return tags + @staticmethod + @provide_session + def deactivate_unknown_dags(active_dag_ids, session=None): @@ -1529,6 +1558,15 @@ def get_serialized_fields(cls): return cls.__serialized_fields +class DagTag(Base): + """ + A tag name per DAG, to allow quick filtering in the DAG view.
+ """ + __tablename__ = "dag_tag" + name = Column(String(100), primary_key=True) + dag_id = Column(String(ID_LEN), ForeignKey('dag.dag_id'), primary_key=True) + + class DagModel(Base): __tablename__ = "dag" @@ -1571,6 +1609,8 @@ class DagModel(Base): default_view = Column(String(25)) # Schedule interval schedule_interval = Column(Interval) + # Tags for view filter + tags = relationship('DagTag', cascade='all,delete-orphan', backref=backref('dag')) __table_args__ = ( Index('idx_root_dag_id', root_dag_id, unique=False), diff --git a/airflow/serialization/schema.json b/airflow/serialization/schema.json index 0f15d27a09bb5..8f3c9f4ff1469 100644 --- a/airflow/serialization/schema.json +++ b/airflow/serialization/schema.json @@ -95,7 +95,8 @@ "doc_md": { "type" : "string"}, "_default_view": { "type" : "string"}, "_access_control": {"$ref": "#/definitions/dict" }, - "is_paused_upon_creation": { "type": "boolean" } + "is_paused_upon_creation": { "type": "boolean" }, + "tags": { "type": "array" } }, "required": [ "params", diff --git a/airflow/www/app.py b/airflow/www/app.py index 4bd872d6ae107..24db0a95bca29 100644 --- a/airflow/www/app.py +++ b/airflow/www/app.py @@ -20,12 +20,13 @@ import datetime import logging import socket +from datetime import timedelta from typing import Any, Optional from urllib.parse import urlparse import flask import flask_login -from flask import Flask +from flask import Flask, session as flask_session from flask_appbuilder import SQLA, AppBuilder from flask_caching import Cache from flask_wtf.csrf import CSRFProtect @@ -60,6 +61,9 @@ def create_app(config=None, session=None, testing=False, app_name="Airflow"): ) app.secret_key = conf.get('webserver', 'SECRET_KEY') + session_lifetime_days = conf.getint('webserver', 'SESSION_LIFETIME_DAYS', fallback=30) + app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(days=session_lifetime_days) + app.config.from_pyfile(settings.WEBSERVER_CONFIG, silent=True) app.config['APP_NAME'] = app_name app.config['TESTING'] = testing @@ -257,6 +261,10 @@ def apply_caching(response): def shutdown_session(exception=None): # pylint: disable=unused-variable settings.Session.remove() + @app.before_request + def make_session_permanent(): + flask_session.permanent = True + return app, appbuilder diff --git a/airflow/www/templates/airflow/dags.html b/airflow/www/templates/airflow/dags.html index f7b0570e9eadc..8820e25b485b3 100644 --- a/airflow/www/templates/airflow/dags.html +++ b/airflow/www/templates/airflow/dags.html @@ -32,10 +32,21 @@

[The HTML markup of these airflow/www/templates/airflow/dags.html hunks did not survive extraction; only text fragments remain. Recoverable intent: the hunk above (@@ -32,10 +32,21 @@) adds a multi-select tags filter control (id="tags_filter") and a reset link alongside the existing DAG search box on the DAGs page; hunk @@ -81,9 +92,17 @@ keeps the {{ dag.dag_id }} link in each table row and renders that DAG's tags via {% for tag in dag.tags %} {{ tag.name }} {% endfor %}; hunk @@ -217,6 +236,11 @@ extends the page's inline script as follows:]
const DAGS_INDEX = "{{ url_for('Airflow.index') }}"; const ENTER_KEY_CODE = 13; + $('#tags_filter').select2({ + placeholder: "Filter dags", + allowClear: true + }); + $('#dag_query').on('keypress', function (e) { // check for key press on ENTER (key code 13) to trigger the search if (e.which === ENTER_KEY_CODE) { diff --git a/airflow/www/utils.py b/airflow/www/utils.py index b359bc994035c..1d746568c0668 100644 --- a/airflow/www/utils.py +++ b/airflow/www/utils.py @@ -471,9 +471,12 @@ def clean_column_names(): def is_utcdatetime(self, col_name): from airflow.utils.sqlalchemy import UtcDateTime - obj = self.list_columns[col_name].type - return isinstance(obj, UtcDateTime) or \ - isinstance(obj, sqla.types.TypeDecorator) and \ - isinstance(obj.impl, UtcDateTime) + + if col_name in self.list_columns: + obj = self.list_columns[col_name].type + return isinstance(obj, UtcDateTime) or \ + isinstance(obj, sqla.types.TypeDecorator) and \ + isinstance(obj.impl, UtcDateTime) + return False filter_converter_class = UtcAwareFilterConverter diff --git a/airflow/www/views.py b/airflow/www/views.py index c64d84491b447..4be16d0f83ba9 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -34,7 +34,8 @@ import markdown import sqlalchemy as sqla from flask import ( - Markup, Response, escape, flash, jsonify, make_response, redirect, render_template, request, url_for, + Markup, Response, escape, flash, jsonify, make_response, redirect, render_template, request, + session as flask_session, url_for, ) from flask_appbuilder import BaseView, ModelView, expose, has_access from flask_appbuilder.actions import action @@ -43,6 +44,7 @@ from pygments import highlight, lexers from pygments.formatters import HtmlFormatter from sqlalchemy import and_, desc, or_, union_all +from sqlalchemy.orm import joinedload from wtforms import SelectField, validators import airflow @@ -53,7 +55,7 @@ ) from airflow.configuration import AIRFLOW_CONFIG, conf from airflow.executors.executor_loader import ExecutorLoader -from airflow.models import Connection, DagModel, DagRun, Log, SlaMiss, TaskFail, XCom, errors +from airflow.models import Connection, DagModel, DagRun, DagTag, Log, SlaMiss, TaskFail, XCom, errors from airflow.settings import STORE_SERIALIZED_DAGS from airflow.ti_deps.dep_context import RUNNING_DEPS, SCHEDULER_QUEUED_DEPS, DepContext from airflow.utils import timezone @@ -70,6 +72,7 @@ from airflow.www.widgets import AirflowModelListWidget PAGE_SIZE = conf.getint('webserver', 'page_size') +FILTER_TAGS_COOKIE = 'tags_filter' if os.environ.get('SKIP_DAGS_PARSING') != 'True': dagbag = models.DagBag(settings.DAGS_FOLDER, store_serialized_dags=STORE_SERIALIZED_DAGS) @@ -225,6 +228,17 @@ def get_int_arg(value, default=0): arg_current_page = request.args.get('page', '0') arg_search_query = request.args.get('search', None) + arg_tags_filter = request.args.getlist('tags', None) + + if request.args.get('reset_tags') is not None: + flask_session[FILTER_TAGS_COOKIE] = None + arg_tags_filter = None + else: + cookie_val = flask_session.get(FILTER_TAGS_COOKIE) + if arg_tags_filter: + flask_session[FILTER_TAGS_COOKIE] = ','.join(arg_tags_filter) + elif cookie_val: + arg_tags_filter = cookie_val.split(',') dags_per_page = PAGE_SIZE current_page = get_int_arg(arg_current_page, default=0) @@ -258,10 +272,21 @@ def get_int_arg(value, default=0): DagModel.owners.ilike('%' + arg_search_query + '%') ) + if arg_tags_filter: + dags_query = dags_query.filter(DagModel.tags.any(DagTag.name.in_(arg_tags_filter))) + if 'all_dags' not in 
filter_dag_ids: dags_query = dags_query.filter(DagModel.dag_id.in_(filter_dag_ids)) - dags = dags_query.order_by(DagModel.dag_id).offset(start).limit(dags_per_page).all() + dags = dags_query.order_by(DagModel.dag_id).options( + joinedload(DagModel.tags)).offset(start).limit(dags_per_page).all() + tags = [] + + dagtags = session.query(DagTag.name).distinct(DagTag.name).all() + tags = [ + {"name": name, "selected": bool(arg_tags_filter and name in arg_tags_filter)} + for name, in dagtags + ] import_errors = session.query(errors.ImportError).all() @@ -301,7 +326,8 @@ def get_int_arg(value, default=0): search=escape(arg_search_query) if arg_search_query else None, showPaused=not hide_paused), auto_complete_data=auto_complete_data, - num_runs=num_runs) + num_runs=num_runs, + tags=tags) @expose('/dag_stats') @has_access diff --git a/docs/img/dags.png b/docs/img/dags.png index 04c82131f945c..57eb4f9d4475e 100644 Binary files a/docs/img/dags.png and b/docs/img/dags.png differ diff --git a/docs/ui.rst b/docs/ui.rst index c351ea1de406f..5e50a026fefae 100644 --- a/docs/ui.rst +++ b/docs/ui.rst @@ -30,6 +30,15 @@ List of the DAGs in your environment, and a set of shortcuts to useful pages. You can see exactly how many tasks succeeded, failed, or are currently running at a glance. To hide completed tasks set show_recent_stats_for_completed_runs = False +In order to filter DAGs (e.g. by team), you can add tags to each DAG. +The filter is saved in a cookie and can be reset with the reset button. +For example: + +.. code:: python + + dag = DAG('dag', tags=['team1', 'sql']) + + ------------ .. image:: img/dags.png diff --git a/tests/test_utils/db.py b/tests/test_utils/db.py index 54c027277ca10..2c7d682914b1a 100644 --- a/tests/test_utils/db.py +++ b/tests/test_utils/db.py @@ -16,7 +16,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License.
-from airflow.models import Connection, DagModel, DagRun, Pool, SlaMiss, TaskInstance, errors +from airflow.models import Connection, DagModel, DagRun, DagTag, Pool, SlaMiss, TaskInstance, errors from airflow.utils.db import add_default_pool_if_not_exists, create_default_connections from airflow.utils.session import create_session @@ -29,6 +29,7 @@ def clear_db_runs(): def clear_db_dags(): with create_session() as session: + session.query(DagTag).delete() session.query(DagModel).delete() diff --git a/tests/www/test_views.py b/tests/www/test_views.py index c86e436862465..4cdae6630a9ec 100644 --- a/tests/www/test_views.py +++ b/tests/www/test_views.py @@ -33,7 +33,7 @@ from urllib.parse import quote_plus import jinja2 -from flask import Markup, url_for +from flask import Markup, session as flask_session, url_for from parameterized import parameterized from werkzeug.test import Client from werkzeug.wrappers import BaseResponse @@ -446,6 +446,15 @@ def test_home(self): resp = self.client.get('home', follow_redirects=True) self.check_content_in_response('DAGs', resp) + def test_home_filter_tags(self): + from airflow.www.views import FILTER_TAGS_COOKIE + with self.client: + self.client.get('home?tags=example&tags=data', follow_redirects=True) + self.assertEqual('example,data', flask_session[FILTER_TAGS_COOKIE]) + + self.client.get('home?reset_tags', follow_redirects=True) + self.assertEqual(None, flask_session[FILTER_TAGS_COOKIE]) + def test_task(self): url = ('task?task_id=runme_0&dag_id=example_bash_operator&execution_date={}' .format(self.percent_encode(self.EXAMPLE_DAG_DEFAULT_DATE))) @@ -687,17 +696,22 @@ def test_delete_dag_button_for_dag_on_scheduler_only(self): # The delete-dag URL should be generated correctly for DAGs # that exist on the scheduler (DB) but not the webserver DagBag + dag_id = 'example_bash_operator' test_dag_id = "non_existent_dag" DM = models.DagModel - self.session.query(DM).filter(DM.dag_id == 'example_bash_operator').update({'dag_id': test_dag_id}) + dag_query = self.session.query(DM).filter(DM.dag_id == dag_id) + dag_query.first().tags = [] # To avoid "FOREIGN KEY constraint" error + self.session.commit() + + dag_query.update({'dag_id': test_dag_id}) self.session.commit() resp = self.client.get('/', follow_redirects=True) self.check_content_in_response('/delete?dag_id={}'.format(test_dag_id), resp) self.check_content_in_response("return confirmDeleteDag(this, '{}')".format(test_dag_id), resp) - self.session.query(DM).filter(DM.dag_id == test_dag_id).update({'dag_id': 'example_bash_operator'}) + self.session.query(DM).filter(DM.dag_id == test_dag_id).update({'dag_id': dag_id}) self.session.commit()
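Usage note for reviewers (not part of the patch): a minimal sketch of a DAG definition exercising the new `tags` argument, assuming a local checkout with this change applied. The dag_id, owner, tag names, and bash command below are illustrative only; the imports and keyword arguments mirror the example DAGs touched above.

```python
from datetime import timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago

# The new `tags` argument is a plain list of strings. DAG.sync_to_db() persists
# them to the `dag_tag` table (see get_dagtags above), and the DAGs list view
# filters on them via ?tags=<name>, remembering the selection in the
# `tags_filter` session cookie until reset_tags clears it.
dag = DAG(
    dag_id='team1_sql_reports',                                    # illustrative dag_id
    default_args={'owner': 'airflow', 'start_date': days_ago(2)},
    schedule_interval='@daily',
    dagrun_timeout=timedelta(minutes=60),
    tags=['team1', 'sql'],                                         # same tag names as docs/ui.rst
)

report = BashOperator(
    task_id='build_report',
    bash_command='echo "building report"',
    dag=dag,
)
```

Requesting `/home?tags=team1&tags=sql` should then list only DAGs carrying at least one of those tags, and `/home?reset_tags` clears the stored cookie, matching the behaviour asserted in `test_home_filter_tags` above.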