From f7979047b4b05febedfcd41ef7f9c825de5e98f6 Mon Sep 17 00:00:00 2001 From: Stefan Seelmann Date: Sun, 11 Nov 2018 10:46:47 +0100 Subject: [PATCH] [AIRFLOW-3072] Assign permission get_logs_with_metadata to viewer role --- airflow/www_rbac/security.py | 1 + tests/www_rbac/test_views.py | 107 +++++++++++++++++++++++++++++++++++ 2 files changed, 108 insertions(+) diff --git a/airflow/www_rbac/security.py b/airflow/www_rbac/security.py index 2405aa8f803de..72691565e64be 100644 --- a/airflow/www_rbac/security.py +++ b/airflow/www_rbac/security.py @@ -75,6 +75,7 @@ 'can_task_stats', 'can_code', 'can_log', + 'can_get_logs_with_metadata', 'can_tries', 'can_graph', 'can_tree', diff --git a/tests/www_rbac/test_views.py b/tests/www_rbac/test_views.py index d7f099d6be23b..f378783e90a42 100644 --- a/tests/www_rbac/test_views.py +++ b/tests/www_rbac/test_views.py @@ -786,5 +786,112 @@ def test_start_date_filter(self): pass +class TestLogViewPermission(TestBase): + """ + Test Airflow DAG acl + """ + default_date = timezone.datetime(2018, 6, 1) + run_id = "test_{}".format(models.DagRun.id_for_date(default_date)) + + @classmethod + def setUpClass(cls): + super(TestLogViewPermission, cls).setUpClass() + + def cleanup_dagruns(self): + DR = models.DagRun + dag_ids = ['example_bash_operator', + 'example_subdag_operator'] + (self.session + .query(DR) + .filter(DR.dag_id.in_(dag_ids)) + .filter(DR.run_id == self.run_id) + .delete(synchronize_session='fetch')) + self.session.commit() + + def prepare_dagruns(self): + dagbag = models.DagBag(include_examples=True) + self.bash_dag = dagbag.dags['example_bash_operator'] + self.sub_dag = dagbag.dags['example_subdag_operator'] + + self.bash_dagrun = self.bash_dag.create_dagrun( + run_id=self.run_id, + execution_date=self.default_date, + start_date=timezone.utcnow(), + state=State.RUNNING) + + self.sub_dagrun = self.sub_dag.create_dagrun( + run_id=self.run_id, + execution_date=self.default_date, + start_date=timezone.utcnow(), + state=State.RUNNING) + + def setUp(self): + super(TestLogViewPermission, self).setUp() + self.cleanup_dagruns() + self.prepare_dagruns() + self.logout() + + def login(self, username=None, password=None): + role_admin = self.appbuilder.sm.find_role('Admin') + tester = self.appbuilder.sm.find_user(username='test_admin') + if not tester: + self.appbuilder.sm.add_user( + username='test_admin', + first_name='test_admin', + last_name='test_admin', + email='test_admin@fab.org', + role=role_admin, + password='test_admin') + + role_user = self.appbuilder.sm.find_role('User') + test_user = self.appbuilder.sm.find_user(username='test_user') + if not test_user: + self.appbuilder.sm.add_user( + username='test_user', + first_name='test_user', + last_name='test_user', + email='test_user@fab.org', + role=role_user, + password='test_user') + + return self.client.post('/login/', data=dict( + username=username, + password=password + )) + + def logout(self): + return self.client.get('/logout/') + + def test_log_success_for_admin(self): + self.logout() + self.login(username='test_admin', + password='test_admin') + url = ('log?task_id=runme_0&dag_id=example_bash_operator&execution_date={}' + .format(self.percent_encode(self.default_date))) + resp = self.client.get(url, follow_redirects=True) + self.check_content_in_response('Log by attempts', resp) + url = ('get_logs_with_metadata?task_id=runme_0&dag_id=example_bash_operator&' + 'execution_date={}&try_number=1&metadata=null' + .format(self.percent_encode(self.default_date))) + resp = self.client.get(url, follow_redirects=True) + self.check_content_in_response('"message":', resp) + self.check_content_in_response('"metadata":', resp) + + def test_log_success_for_user(self): + self.logout() + self.login(username='test_user', + password='test_user') + url = ('log?task_id=runme_0&dag_id=example_bash_operator&execution_date={}' + .format(self.percent_encode(self.default_date))) + resp = self.client.get(url, follow_redirects=True) + self.check_content_in_response('Log by attempts', resp) + url = ('get_logs_with_metadata?task_id=runme_0&dag_id=example_bash_operator&' + 'execution_date={}&try_number=1&metadata=null' + .format(self.percent_encode(self.default_date))) + resp = self.client.get(url, follow_redirects=True) + self.check_content_in_response('"message":', resp) + self.check_content_in_response('"metadata":', resp) + + if __name__ == '__main__': unittest.main()