From 90141394158070b29810cd31a244fd6182483e6e Mon Sep 17 00:00:00 2001 From: Huy Mac Date: Tue, 28 Mar 2023 10:01:17 +0700 Subject: [PATCH 1/3] Reset permission if `access_control` is empty --- airflow/models/dagbag.py | 27 +++------------- airflow/www/security.py | 16 ++++++++++ tests/models/test_dagbag.py | 3 +- tests/www/views/test_views_home.py | 50 ++++++++++++++++++++++++++++-- 4 files changed, 69 insertions(+), 27 deletions(-) diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py index 3d00ac973fd64..db11440d945da 100644 --- a/airflow/models/dagbag.py +++ b/airflow/models/dagbag.py @@ -689,29 +689,12 @@ def sync_to_db(self, processor_subdir: str | None = None, session: Session = NEW @classmethod @provide_session def _sync_perm_for_dag(cls, dag: DAG, session: Session = NEW_SESSION): - """Sync DAG specific permissions, if necessary""" - from airflow.security.permissions import DAG_ACTIONS, resource_name_for_dag - from airflow.www.fab_security.sqla.models import Action, Permission, Resource + """Sync DAG specific permissions""" root_dag_id = dag.parent_dag.dag_id if dag.parent_dag else dag.dag_id - def needs_perms(dag_id: str) -> bool: - dag_resource_name = resource_name_for_dag(dag_id) - for permission_name in DAG_ACTIONS: - if not ( - session.query(Permission) - .join(Action) - .join(Resource) - .filter(Action.name == permission_name) - .filter(Resource.name == dag_resource_name) - .one_or_none() - ): - return True - return False - - if dag.access_control or needs_perms(root_dag_id): - cls.logger().debug("Syncing DAG permissions: %s to the DB", root_dag_id) - from airflow.www.security import ApplessAirflowSecurityManager + cls.logger().debug("Syncing DAG permissions: %s to the DB", root_dag_id) + from airflow.www.security import ApplessAirflowSecurityManager - security_manager = ApplessAirflowSecurityManager(session=session) - security_manager.sync_perm_for_dag(root_dag_id, dag.access_control) + security_manager = ApplessAirflowSecurityManager(session=session) + security_manager.sync_perm_for_dag(root_dag_id, dag.access_control) diff --git a/airflow/www/security.py b/airflow/www/security.py index 201c9ada0ad45..f3b0b742a10e5 100644 --- a/airflow/www/security.py +++ b/airflow/www/security.py @@ -655,8 +655,24 @@ def sync_perm_for_dag( for dag_action_name in self.DAG_ACTIONS: self.create_permission(dag_action_name, dag_resource_name) + def _revoke_all_stale_permissions(resource: Resource): + existing_dag_perms = self.get_resource_permissions(resource) + for perm in existing_dag_perms: + non_admin_roles = [role for role in perm.role if role.name != "Admin"] + for role in non_admin_roles: + self.log.info( + "Revoking '%s' on DAG '%s' for role '%s'", + perm.action, + dag_resource_name, + role.name, + ) + self.remove_permission_from_role(role, perm) + if access_control: self._sync_dag_view_permissions(dag_resource_name, access_control) + else: + resource = self.get_resource(dag_resource_name) + _revoke_all_stale_permissions(resource) def _sync_dag_view_permissions(self, dag_id: str, access_control: dict[str, Collection[str]]) -> None: """ diff --git a/tests/models/test_dagbag.py b/tests/models/test_dagbag.py index fd4d5bdc820bc..22ff525bb8206 100644 --- a/tests/models/test_dagbag.py +++ b/tests/models/test_dagbag.py @@ -908,7 +908,6 @@ def _sync_to_db(): def test_sync_perm_for_dag(self, mock_security_manager): """ Test that dagbag._sync_perm_for_dag will call ApplessAirflowSecurityManager.sync_perm_for_dag - when DAG specific perm views don't exist already or the DAG has access_control set. """ db_clean_up() with create_session() as session: @@ -932,7 +931,7 @@ def _sync_perms(): # perms now exist _sync_perms() - mock_sync_perm_for_dag.assert_not_called() + mock_sync_perm_for_dag.assert_called_once_with("test_example_bash_operator", None) # Always sync if we have access_control dag.access_control = {"Public": {"can_read"}} diff --git a/tests/www/views/test_views_home.py b/tests/www/views/test_views_home.py index 6619bcdf71515..63303c8d5adfb 100644 --- a/tests/www/views/test_views_home.py +++ b/tests/www/views/test_views_home.py @@ -155,6 +155,40 @@ def working_dags(tmpdir): _process_file(filename, session) +@pytest.fixture() +def working_dags_with_read_perm(tmpdir): + dag_contents_template = "from airflow import DAG\ndag = DAG('{}', tags=['{}'])" + dag_contents_template_with_read_perm = "from airflow import DAG\ndag = DAG('{}', tags=['{}'], " \ + "access_control={{'role_single_dag':{{'can_read'}}}}) " + with create_session() as session: + for dag_id, tag in list(zip(TEST_FILTER_DAG_IDS, TEST_TAGS)): + filename = os.path.join(tmpdir, f"{dag_id}.py") + if dag_id == 'filter_test_1': + with open(filename, "w") as f: + f.writelines(dag_contents_template_with_read_perm.format(dag_id, tag)) + else: + with open(filename, "w") as f: + f.writelines(dag_contents_template.format(dag_id, tag)) + _process_file(filename, session) + + +@pytest.fixture() +def working_dags_with_edit_perm(tmpdir): + dag_contents_template = "from airflow import DAG\ndag = DAG('{}', tags=['{}'])" + dag_contents_template_with_read_perm = "from airflow import DAG\ndag = DAG('{}', tags=['{}'], " \ + "access_control={{'role_single_dag':{{'can_edit'}}}}) " + with create_session() as session: + for dag_id, tag in list(zip(TEST_FILTER_DAG_IDS, TEST_TAGS)): + filename = os.path.join(tmpdir, f"{dag_id}.py") + if dag_id == 'filter_test_1': + with open(filename, "w") as f: + f.writelines(dag_contents_template_with_read_perm.format(dag_id, tag)) + else: + with open(filename, "w") as f: + f.writelines(dag_contents_template.format(dag_id, tag)) + _process_file(filename, session) + + @pytest.fixture() def broken_dags(tmpdir, working_dags): with create_session() as session: @@ -165,6 +199,16 @@ def broken_dags(tmpdir, working_dags): _process_file(filename, session) +@pytest.fixture() +def broken_dags_with_read_perm(tmpdir, working_dags_with_read_perm): + with create_session() as session: + for dag_id in TEST_FILTER_DAG_IDS: + filename = os.path.join(tmpdir, f"{dag_id}.py") + with open(filename, "w") as f: + f.writelines("airflow DAG") + _process_file(filename, session) + + def test_home_filter_tags(working_dags, admin_client): with admin_client: admin_client.get("home?tags=example&tags=data", follow_redirects=True) @@ -183,7 +227,7 @@ def test_home_importerrors(broken_dags, user_client): @pytest.mark.parametrize("page", ["home", "home?status=active", "home?status=paused", "home?status=all"]) -def test_home_importerrors_filtered_singledag_user(broken_dags, client_single_dag, page): +def test_home_importerrors_filtered_singledag_user(broken_dags_with_read_perm, client_single_dag, page): # Users that can only see certain DAGs get a filtered list of import errors resp = client_single_dag.get(page, follow_redirects=True) check_content_in_response("Import Errors", resp) @@ -201,7 +245,7 @@ def test_home_dag_list(working_dags, user_client): check_content_in_response(f"dag_id={dag_id}", resp) -def test_home_dag_list_filtered_singledag_user(working_dags, client_single_dag): +def test_home_dag_list_filtered_singledag_user(working_dags_with_read_perm, client_single_dag): # Users that can only see certain DAGs get a filtered list resp = client_single_dag.get("home", follow_redirects=True) # They can see the first DAG @@ -219,7 +263,7 @@ def test_home_dag_list_search(working_dags, user_client): check_content_not_in_response("dag_id=a_first_dag_id_asc", resp) -def test_home_dag_edit_permissions(capture_templates, working_dags, client_single_dag_edit): +def test_home_dag_edit_permissions(capture_templates, working_dags_with_edit_perm, client_single_dag_edit): with capture_templates() as templates: client_single_dag_edit.get("home", follow_redirects=True) From 74fa762a45a0805dc2cc4a5f74e0affc1c66dc12 Mon Sep 17 00:00:00 2001 From: Huy Mac Date: Tue, 28 Mar 2023 10:06:38 +0700 Subject: [PATCH 2/3] Check `resource` before call `_revoke_all_stale_permissions` --- airflow/www/security.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/airflow/www/security.py b/airflow/www/security.py index f3b0b742a10e5..50c484082ca65 100644 --- a/airflow/www/security.py +++ b/airflow/www/security.py @@ -672,7 +672,8 @@ def _revoke_all_stale_permissions(resource: Resource): self._sync_dag_view_permissions(dag_resource_name, access_control) else: resource = self.get_resource(dag_resource_name) - _revoke_all_stale_permissions(resource) + if resource: + _revoke_all_stale_permissions(resource) def _sync_dag_view_permissions(self, dag_id: str, access_control: dict[str, Collection[str]]) -> None: """ From ee6af2648393c65d630cb115f81da8ae02293bef Mon Sep 17 00:00:00 2001 From: Huy Mac <57699454+huymq1710@users.noreply.github.com> Date: Thu, 6 Apr 2023 11:08:02 +0700 Subject: [PATCH 3/3] Fix static checks --- airflow/models/dagbag.py | 1 - tests/www/views/test_views_home.py | 16 ++++++++++------ 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py index db11440d945da..305009e2eae6e 100644 --- a/airflow/models/dagbag.py +++ b/airflow/models/dagbag.py @@ -690,7 +690,6 @@ def sync_to_db(self, processor_subdir: str | None = None, session: Session = NEW @provide_session def _sync_perm_for_dag(cls, dag: DAG, session: Session = NEW_SESSION): """Sync DAG specific permissions""" - root_dag_id = dag.parent_dag.dag_id if dag.parent_dag else dag.dag_id cls.logger().debug("Syncing DAG permissions: %s to the DB", root_dag_id) diff --git a/tests/www/views/test_views_home.py b/tests/www/views/test_views_home.py index 63303c8d5adfb..045e34b00249a 100644 --- a/tests/www/views/test_views_home.py +++ b/tests/www/views/test_views_home.py @@ -158,12 +158,14 @@ def working_dags(tmpdir): @pytest.fixture() def working_dags_with_read_perm(tmpdir): dag_contents_template = "from airflow import DAG\ndag = DAG('{}', tags=['{}'])" - dag_contents_template_with_read_perm = "from airflow import DAG\ndag = DAG('{}', tags=['{}'], " \ - "access_control={{'role_single_dag':{{'can_read'}}}}) " + dag_contents_template_with_read_perm = ( + "from airflow import DAG\ndag = DAG('{}', tags=['{}'], " + "access_control={{'role_single_dag':{{'can_read'}}}}) " + ) with create_session() as session: for dag_id, tag in list(zip(TEST_FILTER_DAG_IDS, TEST_TAGS)): filename = os.path.join(tmpdir, f"{dag_id}.py") - if dag_id == 'filter_test_1': + if dag_id == "filter_test_1": with open(filename, "w") as f: f.writelines(dag_contents_template_with_read_perm.format(dag_id, tag)) else: @@ -175,12 +177,14 @@ def working_dags_with_read_perm(tmpdir): @pytest.fixture() def working_dags_with_edit_perm(tmpdir): dag_contents_template = "from airflow import DAG\ndag = DAG('{}', tags=['{}'])" - dag_contents_template_with_read_perm = "from airflow import DAG\ndag = DAG('{}', tags=['{}'], " \ - "access_control={{'role_single_dag':{{'can_edit'}}}}) " + dag_contents_template_with_read_perm = ( + "from airflow import DAG\ndag = DAG('{}', tags=['{}'], " + "access_control={{'role_single_dag':{{'can_edit'}}}}) " + ) with create_session() as session: for dag_id, tag in list(zip(TEST_FILTER_DAG_IDS, TEST_TAGS)): filename = os.path.join(tmpdir, f"{dag_id}.py") - if dag_id == 'filter_test_1': + if dag_id == "filter_test_1": with open(filename, "w") as f: f.writelines(dag_contents_template_with_read_perm.format(dag_id, tag)) else: