diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py index 8d0f22851ccaf..67138440662d9 100644 --- a/airflow/models/dagbag.py +++ b/airflow/models/dagbag.py @@ -682,29 +682,10 @@ def sync_to_db(self, processor_subdir: str | None = None, session: Session = NEW @classmethod @provide_session def _sync_perm_for_dag(cls, dag: DAG, session: Session = NEW_SESSION): - """Sync DAG specific permissions, if necessary""" - from airflow.security.permissions import DAG_ACTIONS, resource_name_for_dag - from airflow.www.fab_security.sqla.models import Action, Permission, Resource - root_dag_id = dag.parent_dag.dag_id if dag.parent_dag else dag.dag_id - def needs_perms(dag_id: str) -> bool: - dag_resource_name = resource_name_for_dag(dag_id) - for permission_name in DAG_ACTIONS: - if not ( - session.query(Permission) - .join(Action) - .join(Resource) - .filter(Action.name == permission_name) - .filter(Resource.name == dag_resource_name) - .one_or_none() - ): - return True - return False - - if dag.access_control or needs_perms(root_dag_id): - cls.logger().debug("Syncing DAG permissions: %s to the DB", root_dag_id) - from airflow.www.security import ApplessAirflowSecurityManager - - security_manager = ApplessAirflowSecurityManager(session=session) - security_manager.sync_perm_for_dag(root_dag_id, dag.access_control) + cls.logger().debug("Syncing DAG permissions: %s to the DB", root_dag_id) + from airflow.www.security import ApplessAirflowSecurityManager + + security_manager = ApplessAirflowSecurityManager(session=session) + security_manager.sync_perm_for_dag(root_dag_id, dag.access_control) diff --git a/airflow/www/security.py b/airflow/www/security.py index 6b2561292a6d9..31df228ba7eab 100644 --- a/airflow/www/security.py +++ b/airflow/www/security.py @@ -636,8 +636,7 @@ def sync_perm_for_dag(self, dag_id, access_control=None): for dag_action_name in self.DAG_ACTIONS: self.create_permission(dag_action_name, dag_resource_name) - if access_control: - self._sync_dag_view_permissions(dag_resource_name, access_control) + self._sync_dag_view_permissions(dag_resource_name, access_control) def _sync_dag_view_permissions(self, dag_id, access_control): """ @@ -662,8 +661,17 @@ def _revoke_stale_permissions(resource: Resource): for perm in existing_dag_perms: non_admin_roles = [role for role in perm.role if role.name != "Admin"] for role in non_admin_roles: - target_perms_for_role = access_control.get(role.name, {}) - if perm.action.name not in target_perms_for_role: + if access_control: + target_perms_for_role = access_control.get(role.name, {}) + if perm.action.name not in target_perms_for_role: + self.log.info( + "Revoking '%s' on DAG '%s' for role '%s'", + perm.action, + dag_resource_name, + role.name, + ) + self.remove_permission_from_role(role, perm) + else: self.log.info( "Revoking '%s' on DAG '%s' for role '%s'", perm.action, @@ -676,27 +684,28 @@ def _revoke_stale_permissions(resource: Resource): if resource: _revoke_stale_permissions(resource) - for rolename, action_names in access_control.items(): - role = self.find_role(rolename) - if not role: - raise AirflowException( - f"The access_control mapping for DAG '{dag_id}' includes a role named " - f"'{rolename}', but that role does not exist" - ) - - action_names = set(action_names) - invalid_action_names = action_names - self.DAG_ACTIONS - if invalid_action_names: - raise AirflowException( - f"The access_control map for DAG '{dag_resource_name}' includes " - f"the following invalid permissions: {invalid_action_names}; " - f"The set of valid permissions is: {self.DAG_ACTIONS}" - ) - - for action_name in action_names: - dag_perm = _get_or_create_dag_permission(action_name) - if dag_perm: - self.add_permission_to_role(role, dag_perm) + if access_control: + for rolename, action_names in access_control.items(): + role = self.find_role(rolename) + if not role: + raise AirflowException( + f"The access_control mapping for DAG '{dag_id}' includes a role named " + f"'{rolename}', but that role does not exist" + ) + + action_names = set(action_names) + invalid_action_names = action_names - self.DAG_ACTIONS + if invalid_action_names: + raise AirflowException( + f"The access_control map for DAG '{dag_resource_name}' includes " + f"the following invalid permissions: {invalid_action_names}; " + f"The set of valid permissions is: {self.DAG_ACTIONS}" + ) + + for action_name in action_names: + dag_perm = _get_or_create_dag_permission(action_name) + if dag_perm: + self.add_permission_to_role(role, dag_perm) def create_perm_vm_for_all_dag(self): """Create perm-vm if not exist and insert into FAB security model for all-dags.""" diff --git a/tests/models/test_dagbag.py b/tests/models/test_dagbag.py index 8b64167374136..09d52c3275400 100644 --- a/tests/models/test_dagbag.py +++ b/tests/models/test_dagbag.py @@ -932,7 +932,7 @@ def _sync_perms(): # perms now exist _sync_perms() - mock_sync_perm_for_dag.assert_not_called() + mock_sync_perm_for_dag.assert_called_once_with("test_example_bash_operator", None) # Always sync if we have access_control dag.access_control = {"Public": {"can_read"}} diff --git a/tests/www/views/test_views_home.py b/tests/www/views/test_views_home.py index 6619bcdf71515..018f2a6406e91 100644 --- a/tests/www/views/test_views_home.py +++ b/tests/www/views/test_views_home.py @@ -155,6 +155,38 @@ def working_dags(tmpdir): _process_file(filename, session) +@pytest.fixture() +def working_dags_with_edit_perm(tmpdir): + dag_contents_template = "from airflow import DAG\ndag = DAG('{}', tags=['{}'])" + + with create_session() as session: + for dag_id, tag in list(zip(TEST_FILTER_DAG_IDS, TEST_TAGS)): + filename = os.path.join(tmpdir, f"{dag_id}.py") + if dag_id == 'filter_test_1': + with open(filename, "w") as f: + f.writelines("from airflow import DAG\ndag = DAG('{}', tags=['{}'],".format(dag_id, tag) + "access_control={'role_single_dag':{'can_edit'}})" ) + else: + with open(filename, "w") as f: + f.writelines(dag_contents_template.format(dag_id, tag)) + _process_file(filename, session) + + +@pytest.fixture() +def working_dags_with_read_perm(tmpdir): + dag_contents_template = "from airflow import DAG\ndag = DAG('{}', tags=['{}'])" + + with create_session() as session: + for dag_id, tag in list(zip(TEST_FILTER_DAG_IDS, TEST_TAGS)): + filename = os.path.join(tmpdir, f"{dag_id}.py") + if dag_id == 'filter_test_1': + with open(filename, "w") as f: + f.writelines("from airflow import DAG\ndag = DAG('{}', tags=['{}'],".format(dag_id, tag) + "access_control={'role_single_dag':{'can_read'}})" ) + else: + with open(filename, "w") as f: + f.writelines(dag_contents_template.format(dag_id, tag)) + _process_file(filename, session) + + @pytest.fixture() def broken_dags(tmpdir, working_dags): with create_session() as session: @@ -165,6 +197,16 @@ def broken_dags(tmpdir, working_dags): _process_file(filename, session) +@pytest.fixture() +def broken_dags_with_read_perm(tmpdir, working_dags_with_read_perm): + with create_session() as session: + for dag_id in TEST_FILTER_DAG_IDS: + filename = os.path.join(tmpdir, f"{dag_id}.py") + with open(filename, "w") as f: + f.writelines("airflow DAG") + _process_file(filename, session) + + def test_home_filter_tags(working_dags, admin_client): with admin_client: admin_client.get("home?tags=example&tags=data", follow_redirects=True) @@ -183,7 +225,7 @@ def test_home_importerrors(broken_dags, user_client): @pytest.mark.parametrize("page", ["home", "home?status=active", "home?status=paused", "home?status=all"]) -def test_home_importerrors_filtered_singledag_user(broken_dags, client_single_dag, page): +def test_home_importerrors_filtered_singledag_user(broken_dags_with_read_perm, client_single_dag, page): # Users that can only see certain DAGs get a filtered list of import errors resp = client_single_dag.get(page, follow_redirects=True) check_content_in_response("Import Errors", resp) @@ -201,7 +243,7 @@ def test_home_dag_list(working_dags, user_client): check_content_in_response(f"dag_id={dag_id}", resp) -def test_home_dag_list_filtered_singledag_user(working_dags, client_single_dag): +def test_home_dag_list_filtered_singledag_user(working_dags_with_read_perm, client_single_dag): # Users that can only see certain DAGs get a filtered list resp = client_single_dag.get("home", follow_redirects=True) # They can see the first DAG @@ -219,7 +261,7 @@ def test_home_dag_list_search(working_dags, user_client): check_content_not_in_response("dag_id=a_first_dag_id_asc", resp) -def test_home_dag_edit_permissions(capture_templates, working_dags, client_single_dag_edit): +def test_home_dag_edit_permissions(capture_templates, working_dags_with_edit_perm, client_single_dag_edit): with capture_templates() as templates: client_single_dag_edit.get("home", follow_redirects=True)