diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 1c9d351c1d292..6c06799b82ee6 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -459,8 +459,6 @@ class DAG(LoggingMixin): that it is executed when the dag succeeds. :param access_control: Specify optional DAG-level actions, e.g., "{'role1': {'can_read'}, 'role2': {'can_read', 'can_edit', 'can_delete'}}" - or it can specify the resource name if there is a DAGs Run resource, e.g., - "{'role1': {'DAG Runs': {'can_create'}}, 'role2': {'DAGs': {'can_read', 'can_edit', 'can_delete'}}" :param is_paused_upon_creation: Specifies if the dag is paused when created for the first time. If the dag exists already, this flag will be ignored. If this optional parameter is not specified, the global config setting will be used. @@ -550,7 +548,7 @@ def __init__( on_failure_callback: None | DagStateChangeCallback | list[DagStateChangeCallback] = None, doc_md: str | None = None, params: abc.MutableMapping | None = None, - access_control: dict[str, dict[str, Collection[str]]] | dict[str, Collection[str]] | None = None, + access_control: dict | None = None, is_paused_upon_creation: bool | None = None, jinja_environment_kwargs: dict | None = None, render_template_as_native_obj: bool = False, @@ -917,33 +915,21 @@ def _upgrade_outdated_dag_access_control(access_control=None): """ if access_control is None: return None - new_dag_perm_mapping = { + new_perm_mapping = { permissions.DEPRECATED_ACTION_CAN_DAG_READ: permissions.ACTION_CAN_READ, permissions.DEPRECATED_ACTION_CAN_DAG_EDIT: permissions.ACTION_CAN_EDIT, } - - def update_old_perm(permission: str): - new_perm = new_dag_perm_mapping.get(permission, permission) - if new_perm != permission: - warnings.warn( - f"The '{permission}' permission is deprecated. Please use '{new_perm}'.", - RemovedInAirflow3Warning, - stacklevel=3, - ) - return new_perm - updated_access_control = {} for role, perms in access_control.items(): - updated_access_control[role] = updated_access_control.get(role, {}) - if isinstance(perms, (set, list)): - # Support for old-style access_control where only the actions are specified - updated_access_control[role][permissions.RESOURCE_DAG] = set(perms) - else: - updated_access_control[role] = perms - if permissions.RESOURCE_DAG in updated_access_control[role]: - updated_access_control[role][permissions.RESOURCE_DAG] = { - update_old_perm(perm) for perm in updated_access_control[role][permissions.RESOURCE_DAG] - } + updated_access_control[role] = {new_perm_mapping.get(perm, perm) for perm in perms} + + if access_control != updated_access_control: + warnings.warn( + "The 'can_dag_read' and 'can_dag_edit' permissions are deprecated. " + "Please use 'can_read' and 'can_edit', respectively.", + RemovedInAirflow3Warning, + stacklevel=3, + ) return updated_access_control @@ -4180,7 +4166,7 @@ def dag( on_failure_callback: None | DagStateChangeCallback | list[DagStateChangeCallback] = None, doc_md: str | None = None, params: abc.MutableMapping | None = None, - access_control: dict[str, dict[str, Collection[str]]] | dict[str, Collection[str]] | None = None, + access_control: dict | None = None, is_paused_upon_creation: bool | None = None, jinja_environment_kwargs: dict | None = None, render_template_as_native_obj: bool = False, diff --git a/airflow/providers/fab/auth_manager/fab_auth_manager.py b/airflow/providers/fab/auth_manager/fab_auth_manager.py index 344df7588de7d..ffd5e5cab5d3e 100644 --- a/airflow/providers/fab/auth_manager/fab_auth_manager.py +++ b/airflow/providers/fab/auth_manager/fab_auth_manager.py @@ -61,6 +61,7 @@ RESOURCE_DAG, RESOURCE_DAG_CODE, RESOURCE_DAG_DEPENDENCIES, + RESOURCE_DAG_PREFIX, RESOURCE_DAG_RUN, RESOURCE_DAG_WARNING, RESOURCE_DATASET, @@ -241,8 +242,6 @@ def is_authorized_dag( return all( self._is_authorized(method=method, resource_type=resource_type, user=user) - if resource_type != RESOURCE_DAG_RUN or not hasattr(permissions, "resource_name") - else self._is_authorized_dag_run(method=method, details=details, user=user) for resource_type in resource_types ) @@ -413,33 +412,7 @@ def _is_authorized_dag( if details and details.id: # Check whether the user has permissions to access a specific DAG - resource_dag_name = self._resource_name(details.id, RESOURCE_DAG) - return self._is_authorized(method=method, resource_type=resource_dag_name, user=user) - - return False - - def _is_authorized_dag_run( - self, - method: ResourceMethod, - details: DagDetails | None = None, - user: BaseUser | None = None, - ) -> bool: - """ - Return whether the user is authorized to perform a given action on a DAG Run. - - :param method: the method to perform - :param details: optional, details about the DAG - :param user: optional, the user to perform the action on. If not provided, it uses the current user - - :meta private: - """ - is_global_authorized = self._is_authorized(method=method, resource_type=RESOURCE_DAG_RUN, user=user) - if is_global_authorized: - return True - - if details and details.id: - # Check whether the user has permissions to access a specific DAG Run permission on a DAG Level - resource_dag_name = self._resource_name(details.id, RESOURCE_DAG_RUN) + resource_dag_name = self._resource_name_for_dag(details.id) return self._is_authorized(method=method, resource_type=resource_dag_name, user=user) return False @@ -471,7 +444,7 @@ def _get_fab_resource_types(dag_access_entity: DagAccessEntity) -> tuple[str, .. raise AirflowException(f"Unknown DAG access entity: {dag_access_entity}") return _MAP_DAG_ACCESS_ENTITY_TO_FAB_RESOURCE_TYPE[dag_access_entity] - def _resource_name(self, dag_id: str, resource_type: str) -> str: + def _resource_name_for_dag(self, dag_id: str) -> str: """ Return the FAB resource name for a DAG id. @@ -480,9 +453,11 @@ def _resource_name(self, dag_id: str, resource_type: str) -> str: :meta private: """ root_dag_id = self._get_root_dag_id(dag_id) - if hasattr(permissions, "resource_name"): - return getattr(permissions, "resource_name")(root_dag_id, resource_type) - return getattr(permissions, "resource_name_for_dag")(root_dag_id) + if root_dag_id == RESOURCE_DAG: + return root_dag_id + if root_dag_id.startswith(RESOURCE_DAG_PREFIX): + return root_dag_id + return f"{RESOURCE_DAG_PREFIX}{root_dag_id}" @staticmethod def _get_user_permissions(user: BaseUser): diff --git a/airflow/providers/fab/auth_manager/security_manager/override.py b/airflow/providers/fab/auth_manager/security_manager/override.py index e2208e5fb409f..fd0dce0b625b6 100644 --- a/airflow/providers/fab/auth_manager/security_manager/override.py +++ b/airflow/providers/fab/auth_manager/security_manager/override.py @@ -344,16 +344,7 @@ class FabAirflowSecurityManagerOverride(AirflowSecurityManagerV2): ] # global resource for dag-level access - RESOURCE_DETAILS_MAP = getattr( - permissions, - "RESOURCE_DETAILS_MAP", - { - permissions.RESOURCE_DAG: { - "actions": permissions.DAG_ACTIONS, - } - }, - ) - DAG_ACTIONS = RESOURCE_DETAILS_MAP[permissions.RESOURCE_DAG]["actions"] + DAG_ACTIONS = permissions.DAG_ACTIONS def __init__(self, appbuilder): # done in super, but we need it before we can call super. @@ -1040,7 +1031,7 @@ def can_access_some_dags(self, action: str, dag_id: str | None = None) -> bool: """Check if user has read or write access to some dags.""" if dag_id and dag_id != "~": root_dag_id = self._get_root_dag_id(dag_id) - return self.has_access(action, self._resource_name(root_dag_id, permissions.RESOURCE_DAG)) + return self.has_access(action, permissions.resource_name_for_dag(root_dag_id)) user = g.user if action == permissions.ACTION_CAN_READ: @@ -1074,25 +1065,24 @@ def create_dag_specific_permissions(self) -> None: for dag in dags: root_dag_id = dag.parent_dag.dag_id if dag.parent_dag else dag.dag_id - for resource_name, resource_values in self.RESOURCE_DETAILS_MAP.items(): - dag_resource_name = self._resource_name(root_dag_id, resource_name) - for action_name in resource_values["actions"]: - if (action_name, dag_resource_name) not in perms: - self._merge_perm(action_name, dag_resource_name) + dag_resource_name = permissions.resource_name_for_dag(root_dag_id) + for action_name in self.DAG_ACTIONS: + if (action_name, dag_resource_name) not in perms: + self._merge_perm(action_name, dag_resource_name) if dag.access_control is not None: - self.sync_perm_for_dag(root_dag_id, dag.access_control) + self.sync_perm_for_dag(dag_resource_name, dag.access_control) def prefixed_dag_id(self, dag_id: str) -> str: """Return the permission name for a DAG id.""" warnings.warn( "`prefixed_dag_id` has been deprecated. " - "Please use `airflow.security.permissions.resource_name` instead.", + "Please use `airflow.security.permissions.resource_name_for_dag` instead.", RemovedInAirflow3Warning, stacklevel=2, ) root_dag_id = self._get_root_dag_id(dag_id) - return self._resource_name(root_dag_id, permissions.RESOURCE_DAG) + return permissions.resource_name_for_dag(root_dag_id) def is_dag_resource(self, resource_name: str) -> bool: """Determine if a resource belongs to a DAG or all DAGs.""" @@ -1103,7 +1093,7 @@ def is_dag_resource(self, resource_name: str) -> bool: def sync_perm_for_dag( self, dag_id: str, - access_control: dict[str, dict[str, Collection[str]]] | None = None, + access_control: dict[str, Collection[str]] | None = None, ) -> None: """ Sync permissions for given dag id. @@ -1111,78 +1101,62 @@ def sync_perm_for_dag( The dag id surely exists in our dag bag as only / refresh button or DagBag will call this function. :param dag_id: the ID of the DAG whose permissions should be updated - :param access_control: a dict where each key is a role name and each value can be: - - a set() of DAGs resource action names (e.g. `{'can_read'}`) - - or a dict where each key is a resource name ('DAGs' or 'DAG Runs') and each value - is a set() of action names (e.g., `{'DAG Runs': {'can_create'}, 'DAGs': {'can_read'}}`) + :param access_control: a dict where each key is a role name and + each value is a set() of action names (e.g., + {'can_read'} :return: """ - for resource_name, resource_values in self.RESOURCE_DETAILS_MAP.items(): - dag_resource_name = self._resource_name(dag_id, resource_name) - for dag_action_name in resource_values["actions"]: - self.create_permission(dag_action_name, dag_resource_name) + dag_resource_name = permissions.resource_name_for_dag(dag_id) + for dag_action_name in self.DAG_ACTIONS: + self.create_permission(dag_action_name, dag_resource_name) if access_control is not None: - self.log.debug("Syncing DAG-level permissions for DAG '%s'", dag_id) - self._sync_dag_view_permissions(dag_id, access_control.copy()) + self.log.debug("Syncing DAG-level permissions for DAG '%s'", dag_resource_name) + self._sync_dag_view_permissions(dag_resource_name, access_control) else: self.log.debug( "Not syncing DAG-level permissions for DAG '%s' as access control is unset.", - dag_id, + dag_resource_name, ) - def _resource_name(self, dag_id: str, resource_name: str) -> str: - """ - Get the resource name from permissions. - - This method is to keep compatibility with new FAB versions - running with old airflow versions. - """ - if hasattr(permissions, "resource_name"): - return getattr(permissions, "resource_name")(dag_id, resource_name) - return getattr(permissions, "resource_name_for_dag")(dag_id) - - def _sync_dag_view_permissions( - self, - dag_id: str, - access_control: dict[str, dict[str, Collection[str]]], - ) -> None: + def _sync_dag_view_permissions(self, dag_id: str, access_control: dict[str, Collection[str]]) -> None: """ Set the access policy on the given DAG's ViewModel. :param dag_id: the ID of the DAG whose permissions should be updated - :param access_control: a dict where each key is a role name and each value is: - - a dict where each key is a resource name ('DAGs' or 'DAG Runs') and each value - is a set() of action names (e.g., `{'DAG Runs': {'can_create'}, 'DAGs': {'can_read'}}`) + :param access_control: a dict where each key is a role name and + each value is a set() of action names (e.g. {'can_read'}) """ + dag_resource_name = permissions.resource_name_for_dag(dag_id) - def _get_or_create_dag_permission(action_name: str, dag_resource_name: str) -> Permission | None: + def _get_or_create_dag_permission(action_name: str) -> Permission | None: perm = self.get_permission(action_name, dag_resource_name) if not perm: self.log.info("Creating new action '%s' on resource '%s'", action_name, dag_resource_name) perm = self.create_permission(action_name, dag_resource_name) + return perm - # Revoking stale permissions for all possible DAG level resources - for resource_name in self.RESOURCE_DETAILS_MAP.keys(): - dag_resource_name = self._resource_name(dag_id, resource_name) - if resource := self.get_resource(dag_resource_name): - existing_dag_perms = self.get_resource_permissions(resource) - for perm in existing_dag_perms: - non_admin_roles = [role for role in perm.role if role.name != "Admin"] - for role in non_admin_roles: - target_perms_for_role = access_control.get(role.name, {}).get(resource_name, set()) - if perm.action.name not in target_perms_for_role: - self.log.info( - "Revoking '%s' on DAG '%s' for role '%s'", - perm.action, - dag_resource_name, - role.name, - ) - self.remove_permission_from_role(role, perm) - - # Adding the access control permissions - for rolename, resource_actions in access_control.items(): + def _revoke_stale_permissions(resource: Resource): + existing_dag_perms = self.get_resource_permissions(resource) + for perm in existing_dag_perms: + non_admin_roles = [role for role in perm.role if role.name != "Admin"] + for role in non_admin_roles: + target_perms_for_role = access_control.get(role.name, ()) + if perm.action.name not in target_perms_for_role: + self.log.info( + "Revoking '%s' on DAG '%s' for role '%s'", + perm.action, + dag_resource_name, + role.name, + ) + self.remove_permission_from_role(role, perm) + + resource = self.get_resource(dag_resource_name) + if resource: + _revoke_stale_permissions(resource) + + for rolename, action_names in access_control.items(): role = self.find_role(rolename) if not role: raise AirflowException( @@ -1190,34 +1164,19 @@ def _get_or_create_dag_permission(action_name: str, dag_resource_name: str) -> P f"'{rolename}', but that role does not exist" ) - if isinstance(resource_actions, (set, list)): - # Support for old-style access_control where only the actions are specified - resource_actions = {permissions.RESOURCE_DAG: set(resource_actions)} - - for resource_name, actions in resource_actions.items(): - if resource_name not in self.RESOURCE_DETAILS_MAP: - raise AirflowException( - f"The access_control map for DAG '{dag_id}' includes the following invalid " - f"resource name: '{resource_name}'; " - f"The set of valid resource names is: {self.RESOURCE_DETAILS_MAP.keys()}" - ) - - dag_resource_name = self._resource_name(dag_id, resource_name) - self.log.debug("Syncing DAG-level permissions for DAG '%s'", dag_resource_name) - - invalid_actions = set(actions) - self.RESOURCE_DETAILS_MAP[resource_name]["actions"] - - if invalid_actions: - raise AirflowException( - f"The access_control map for DAG '{dag_resource_name}' includes " - f"the following invalid permissions: {invalid_actions}; " - f"The set of valid permissions is: {self.RESOURCE_DETAILS_MAP[resource_name]['actions']}" - ) + action_names = set(action_names) + invalid_action_names = action_names - self.DAG_ACTIONS + if invalid_action_names: + raise AirflowException( + f"The access_control map for DAG '{dag_resource_name}' includes " + f"the following invalid permissions: {invalid_action_names}; " + f"The set of valid permissions is: {self.DAG_ACTIONS}" + ) - for action_name in actions: - dag_perm = _get_or_create_dag_permission(action_name, dag_resource_name) - if dag_perm: - self.add_permission_to_role(role, dag_perm) + for action_name in action_names: + dag_perm = _get_or_create_dag_permission(action_name) + if dag_perm: + self.add_permission_to_role(role, dag_perm) def add_permissions_view(self, base_action_names, resource_name): # Keep name for compatibility with FAB. """ @@ -1348,9 +1307,8 @@ def update_admin_permission(self) -> None: Add the missing ones to the table for admin. """ session = self.appbuilder.get_session - prefixes = getattr(permissions, "PREFIX_LIST", [permissions.RESOURCE_DAG_PREFIX]) dag_resources = session.scalars( - select(Resource).where(or_(*[Resource.name.like(f"{prefix}%") for prefix in prefixes])) + select(Resource).where(Resource.name.like(f"{permissions.RESOURCE_DAG_PREFIX}%")) ) resource_ids = [resource.id for resource in dag_resources] diff --git a/airflow/security/permissions.py b/airflow/security/permissions.py index 058cde2927d27..e333876184714 100644 --- a/airflow/security/permissions.py +++ b/airflow/security/permissions.py @@ -16,8 +16,6 @@ # under the License. from __future__ import annotations -from typing import TypedDict - # Resource Constants RESOURCE_ACTION = "Permissions" RESOURCE_ADMIN_MENU = "Admin" @@ -30,7 +28,6 @@ RESOURCE_DAG_DEPENDENCIES = "DAG Dependencies" RESOURCE_DAG_PREFIX = "DAG:" RESOURCE_DAG_RUN = "DAG Runs" -RESOURCE_DAG_RUN_PREFIX = "DAG Run:" RESOURCE_DAG_WARNING = "DAG Warnings" RESOURCE_CLUSTER_ACTIVITY = "Cluster Activity" RESOURCE_DATASET = "Datasets" @@ -67,32 +64,10 @@ DEPRECATED_ACTION_CAN_DAG_READ = "can_dag_read" DEPRECATED_ACTION_CAN_DAG_EDIT = "can_dag_edit" - -class ResourceDetails(TypedDict): - """Details of a resource (actions and prefix).""" - - actions: set[str] - prefix: str - - -# Keeping DAG_ACTIONS to keep the compatibility with outdated versions of FAB provider DAG_ACTIONS = {ACTION_CAN_READ, ACTION_CAN_EDIT, ACTION_CAN_DELETE} -RESOURCE_DETAILS_MAP = { - RESOURCE_DAG: ResourceDetails( - actions={ACTION_CAN_READ, ACTION_CAN_EDIT, ACTION_CAN_DELETE}, prefix=RESOURCE_DAG_PREFIX - ), - RESOURCE_DAG_RUN: ResourceDetails( - actions={ACTION_CAN_READ, ACTION_CAN_CREATE, ACTION_CAN_DELETE, ACTION_CAN_ACCESS_MENU}, - prefix=RESOURCE_DAG_RUN_PREFIX, - ), -} -PREFIX_LIST = [details["prefix"] for details in RESOURCE_DETAILS_MAP.values()] -PREFIX_RESOURCES_MAP = {details["prefix"]: resource for resource, details in RESOURCE_DETAILS_MAP.items()} - - -def resource_name(root_dag_id: str, resource: str) -> str: +def resource_name_for_dag(root_dag_id: str) -> str: """ Return the resource name for a DAG id. @@ -100,8 +75,8 @@ def resource_name(root_dag_id: str, resource: str) -> str: parent DAG, you should pass ``DagModel.root_dag_id`` to this function, for a subdag. A normal dag should pass the ``DagModel.dag_id``. """ - if root_dag_id in RESOURCE_DETAILS_MAP.keys(): + if root_dag_id == RESOURCE_DAG: return root_dag_id - if root_dag_id.startswith(tuple(PREFIX_RESOURCES_MAP.keys())): + if root_dag_id.startswith(RESOURCE_DAG_PREFIX): return root_dag_id - return f"{RESOURCE_DETAILS_MAP[resource]['prefix']}{root_dag_id}" + return f"{RESOURCE_DAG_PREFIX}{root_dag_id}" diff --git a/airflow/www/views.py b/airflow/www/views.py index 236beed4511a3..bfa7b3d177301 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -996,6 +996,9 @@ def index(self): .unique() .all() ) + can_create_dag_run = get_auth_manager().is_authorized_dag( + method="POST", access_entity=DagAccessEntity.RUN, user=g.user + ) dataset_triggered_dag_ids = {dag.dag_id for dag in dags if dag.schedule_interval == "Dataset"} if dataset_triggered_dag_ids: @@ -1010,12 +1013,6 @@ def index(self): dag.can_edit = get_auth_manager().is_authorized_dag( method="PUT", details=DagDetails(id=dag.dag_id), user=g.user ) - can_create_dag_run = get_auth_manager().is_authorized_dag( - method="POST", - access_entity=DagAccessEntity.RUN, - details=DagDetails(id=dag.dag_id), - user=g.user, - ) dag.can_trigger = dag.can_edit and can_create_dag_run dag.can_delete = get_auth_manager().is_authorized_dag( method="DELETE", details=DagDetails(id=dag.dag_id), user=g.user @@ -5745,7 +5742,7 @@ def add_user_permissions_to_dag(sender, template, context, **extra): return dag = context["dag"] can_create_dag_run = get_auth_manager().is_authorized_dag( - method="POST", access_entity=DagAccessEntity.RUN, details=DagDetails(id=dag.dag_id) + method="POST", access_entity=DagAccessEntity.RUN ) dag.can_edit = get_auth_manager().is_authorized_dag(method="PUT", details=DagDetails(id=dag.dag_id)) diff --git a/docs/apache-airflow-providers-fab/auth-manager/access-control.rst b/docs/apache-airflow-providers-fab/auth-manager/access-control.rst index b71f08f557b28..4d5637d40d5a2 100644 --- a/docs/apache-airflow-providers-fab/auth-manager/access-control.rst +++ b/docs/apache-airflow-providers-fab/auth-manager/access-control.rst @@ -138,7 +138,7 @@ There are five default roles: Public, Viewer, User, Op, and Admin. Each one has DAG-level permissions ^^^^^^^^^^^^^^^^^^^^^ -For DAG-level permissions exclusively, access can be controlled at the level of all DAGs or individual DAG objects. This includes ``DAGs.can_read``, ``DAGs.can_edit``, ``DAGs.can_delete``, ``DAG Runs.can_read``, ``DAG Runs.can_create``, ``DAG Runs.can_delete``, and ``DAG Runs.menu_access``. When these permissions are listed, access is granted to users who either have the listed permission or the same permission for the specific DAG being acted upon. For individual DAGs, the resource name is ``DAG:`` + the DAG ID, or for the DAG Runs resource the resource name is ``DAG Run:``. +For DAG-level permissions exclusively, access can be controlled at the level of all DAGs or individual DAG objects. This includes ``DAGs.can_read``, ``DAGs.can_edit``, and ``DAGs.can_delete``. When these permissions are listed, access is granted to users who either have the listed permission or the same permission for the specific DAG being acted upon. For individual DAGs, the resource name is ``DAG:`` + the DAG ID. For example, if a user is trying to view DAG information for the ``example_dag_id``, and the endpoint requires ``DAGs.can_read`` access, access will be granted if the user has either ``DAGs.can_read`` or ``DAG:example_dag_id.can_read`` access. @@ -322,18 +322,6 @@ Setting ``access_control`` on a DAG will overwrite any previously existing DAG-l }, ) -It's also possible to add DAG Runs resource permissions in a similar way, but explicit adding the resource name to identify which resource the permissions are for: - -.. code-block:: python - - DAG( - dag_id="example_fine_grained_access", - start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), - access_control={ - "Viewer": {"DAGs": {"can_edit", "can_read", "can_delete"}, "DAG Runs": {"can_create"}}, - }, - ) - This also means that setting ``access_control={}`` will wipe any existing DAG-level permissions for a given DAG from the DB: .. code-block:: python diff --git a/newsfragments/40703.feature.rst b/newsfragments/40703.feature.rst deleted file mode 100644 index 4fd2fddf7e66a..0000000000000 --- a/newsfragments/40703.feature.rst +++ /dev/null @@ -1 +0,0 @@ -Allow set Dag Run resource into Dag Level permission: extends Dag's access_control feature to allow Dag Run resource permissions. diff --git a/tests/api_connexion/endpoints/test_dag_endpoint.py b/tests/api_connexion/endpoints/test_dag_endpoint.py index a546e6bec4751..283bc3457c241 100644 --- a/tests/api_connexion/endpoints/test_dag_endpoint.py +++ b/tests/api_connexion/endpoints/test_dag_endpoint.py @@ -69,19 +69,11 @@ def configured_app(minimal_app_for_api): create_user(app, username="test_granular_permissions", role_name="TestGranularDag") # type: ignore app.appbuilder.sm.sync_perm_for_dag( # type: ignore "TEST_DAG_1", - access_control={ - "TestGranularDag": { - permissions.RESOURCE_DAG: {permissions.ACTION_CAN_EDIT, permissions.ACTION_CAN_READ} - }, - }, + access_control={"TestGranularDag": [permissions.ACTION_CAN_EDIT, permissions.ACTION_CAN_READ]}, ) app.appbuilder.sm.sync_perm_for_dag( # type: ignore "TEST_DAG_1", - access_control={ - "TestGranularDag": { - permissions.RESOURCE_DAG: {permissions.ACTION_CAN_EDIT, permissions.ACTION_CAN_READ} - }, - }, + access_control={"TestGranularDag": [permissions.ACTION_CAN_EDIT, permissions.ACTION_CAN_READ]}, ) with DAG( diff --git a/tests/api_connexion/endpoints/test_dag_run_endpoint.py b/tests/api_connexion/endpoints/test_dag_run_endpoint.py index dc77648784ce5..1e17548b099c6 100644 --- a/tests/api_connexion/endpoints/test_dag_run_endpoint.py +++ b/tests/api_connexion/endpoints/test_dag_run_endpoint.py @@ -62,20 +62,6 @@ def configured_app(minimal_app_for_api): (permissions.ACTION_CAN_DELETE, permissions.RESOURCE_DAG_RUN), ], ) - create_user( - app, # type: ignore - username="test_no_dag_run_create_permission", - role_name="TestNoDagRunCreatePermission", - permissions=[ - (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG), - (permissions.ACTION_CAN_READ, permissions.RESOURCE_DATASET), - (permissions.ACTION_CAN_READ, permissions.RESOURCE_CLUSTER_ACTIVITY), - (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG), - (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN), - (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG_RUN), - (permissions.ACTION_CAN_DELETE, permissions.RESOURCE_DAG_RUN), - ], - ) create_user( app, # type: ignore username="test_dag_view_only", @@ -105,10 +91,7 @@ def configured_app(minimal_app_for_api): ) app.appbuilder.sm.sync_perm_for_dag( # type: ignore "TEST_DAG_ID", - access_control={ - "TestGranularDag": {permissions.ACTION_CAN_EDIT, permissions.ACTION_CAN_READ}, - "TestNoDagRunCreatePermission": {permissions.RESOURCE_DAG_RUN: {permissions.ACTION_CAN_CREATE}}, - }, + access_control={"TestGranularDag": [permissions.ACTION_CAN_EDIT, permissions.ACTION_CAN_READ]}, ) create_user(app, username="test_no_permissions", role_name="TestNoPermissions") # type: ignore @@ -119,7 +102,6 @@ def configured_app(minimal_app_for_api): delete_user(app, username="test_view_dags") # type: ignore delete_user(app, username="test_granular_permissions") # type: ignore delete_user(app, username="test_no_permissions") # type: ignore - delete_user(app, username="test_no_dag_run_create_permission") # type: ignore delete_roles(app) @@ -1349,15 +1331,6 @@ def test_raises_validation_error_for_invalid_params(self): assert response.status_code == 400 assert "Invalid input for param" in response.json["detail"] - def test_dagrun_trigger_with_dag_level_permissions(self): - self._create_dag("TEST_DAG_ID") - response = self.client.post( - "api/v1/dags/TEST_DAG_ID/dagRuns", - json={"conf": {"validated_number": 1}}, - environ_overrides={"REMOTE_USER": "test_no_dag_run_create_permission"}, - ) - assert response.status_code == 200 - @mock.patch("airflow.api_connexion.endpoints.dag_run_endpoint.get_airflow_app") def test_dagrun_creation_exception_is_handled(self, mock_get_app, session): self._create_dag("TEST_DAG_ID") diff --git a/tests/api_connexion/endpoints/test_import_error_endpoint.py b/tests/api_connexion/endpoints/test_import_error_endpoint.py index 635e159bb292c..4483646ce53fa 100644 --- a/tests/api_connexion/endpoints/test_import_error_endpoint.py +++ b/tests/api_connexion/endpoints/test_import_error_endpoint.py @@ -29,7 +29,6 @@ from tests.test_utils.compat import ParseImportError from tests.test_utils.config import conf_vars from tests.test_utils.db import clear_db_dags, clear_db_import_errors -from tests.test_utils.permissions import _resource_name pytestmark = [pytest.mark.db_test, pytest.mark.skip_if_database_isolation_mode] @@ -61,12 +60,7 @@ def configured_app(minimal_app_for_api): [ { "role": "TestSingleDAG", - "perms": [ - ( - permissions.ACTION_CAN_READ, - _resource_name(TEST_DAG_IDS[0], permissions.RESOURCE_DAG), - ) - ], + "perms": [(permissions.ACTION_CAN_READ, permissions.resource_name_for_dag(TEST_DAG_IDS[0]))], } ] ) diff --git a/tests/dag_processing/test_processor.py b/tests/dag_processing/test_processor.py index e006f36478f7b..87418fa8bcd33 100644 --- a/tests/dag_processing/test_processor.py +++ b/tests/dag_processing/test_processor.py @@ -1055,7 +1055,7 @@ def test_counter_for_last_num_of_db_queries(self): with create_session() as session: with assert_queries_count( - expected_count=154, + expected_count=94, margin=10, session=session, ): diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py index 3d39a7290d909..d480c3a1ffab2 100644 --- a/tests/models/test_dag.py +++ b/tests/models/test_dag.py @@ -2788,27 +2788,19 @@ def test_replace_outdated_access_control_actions(self): outdated_permissions = { "role1": {permissions.ACTION_CAN_READ, permissions.ACTION_CAN_EDIT}, "role2": {permissions.DEPRECATED_ACTION_CAN_DAG_READ, permissions.DEPRECATED_ACTION_CAN_DAG_EDIT}, - "role3": {permissions.RESOURCE_DAG_RUN: {permissions.ACTION_CAN_CREATE}}, } updated_permissions = { - "role1": {permissions.RESOURCE_DAG: {permissions.ACTION_CAN_READ, permissions.ACTION_CAN_EDIT}}, - "role2": {permissions.RESOURCE_DAG: {permissions.ACTION_CAN_READ, permissions.ACTION_CAN_EDIT}}, - "role3": {permissions.RESOURCE_DAG_RUN: {permissions.ACTION_CAN_CREATE}}, + "role1": {permissions.ACTION_CAN_READ, permissions.ACTION_CAN_EDIT}, + "role2": {permissions.ACTION_CAN_READ, permissions.ACTION_CAN_EDIT}, } - with pytest.warns(DeprecationWarning) as deprecation_warnings: + with pytest.warns(DeprecationWarning): dag = DAG(dag_id="dag_with_outdated_perms", access_control=outdated_permissions) assert dag.access_control == updated_permissions - assert len(deprecation_warnings) == 2 - assert "permission is deprecated" in str(deprecation_warnings[0].message) - assert "permission is deprecated" in str(deprecation_warnings[1].message) - with pytest.warns(DeprecationWarning) as deprecation_warnings: + with pytest.warns(DeprecationWarning): dag.access_control = outdated_permissions assert dag.access_control == updated_permissions - assert len(deprecation_warnings) == 2 - assert "permission is deprecated" in str(deprecation_warnings[0].message) - assert "permission is deprecated" in str(deprecation_warnings[1].message) def test_validate_executor_field_executor_not_configured(self): dag = DAG( diff --git a/tests/models/test_dagbag.py b/tests/models/test_dagbag.py index 936852dd082e4..0dcce4acf7cd3 100644 --- a/tests/models/test_dagbag.py +++ b/tests/models/test_dagbag.py @@ -972,44 +972,7 @@ def _sync_perms(): dag.access_control = {"Public": {"can_read"}} _sync_perms() mock_sync_perm_for_dag.assert_called_once_with( - "test_example_bash_operator", {"Public": {"DAGs": {"can_read"}}} - ) - - @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode - @patch("airflow.www.security_appless.ApplessAirflowSecurityManager") - def test_sync_perm_for_dag_with_dict_access_control(self, mock_security_manager): - """ - Test that dagbag._sync_perm_for_dag will call ApplessAirflowSecurityManager.sync_perm_for_dag - """ - db_clean_up() - with create_session() as session: - security_manager = ApplessAirflowSecurityManager(session) - mock_sync_perm_for_dag = mock_security_manager.return_value.sync_perm_for_dag - mock_sync_perm_for_dag.side_effect = security_manager.sync_perm_for_dag - - dagbag = DagBag( - dag_folder=os.path.join(TEST_DAGS_FOLDER, "test_example_bash_operator.py"), - include_examples=False, - ) - dag = dagbag.dags["test_example_bash_operator"] - - def _sync_perms(): - mock_sync_perm_for_dag.reset_mock() - DagBag._sync_perm_for_dag(dag, session=session) - - # perms dont exist - _sync_perms() - mock_sync_perm_for_dag.assert_called_once_with("test_example_bash_operator", None) - - # perms now exist - _sync_perms() - mock_sync_perm_for_dag.assert_called_once_with("test_example_bash_operator", None) - - # Always sync if we have access_control - dag.access_control = {"Public": {"DAGs": {"can_read"}, "DAG Runs": {"can_create"}}} - _sync_perms() - mock_sync_perm_for_dag.assert_called_once_with( - "test_example_bash_operator", {"Public": {"DAGs": {"can_read"}, "DAG Runs": {"can_create"}}} + "test_example_bash_operator", {"Public": {"can_read"}} ) @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode diff --git a/tests/providers/fab/auth_manager/test_security.py b/tests/providers/fab/auth_manager/test_security.py index 5ff7f34d018c0..d3238ba0f1734 100644 --- a/tests/providers/fab/auth_manager/test_security.py +++ b/tests/providers/fab/auth_manager/test_security.py @@ -59,12 +59,11 @@ from tests.test_utils.asserts import assert_queries_count from tests.test_utils.db import clear_db_dags, clear_db_runs from tests.test_utils.mock_security_manager import MockSecurityManager -from tests.test_utils.permissions import _resource_name pytestmark = pytest.mark.db_test -READ_WRITE = {permissions.RESOURCE_DAG: {permissions.ACTION_CAN_READ, permissions.ACTION_CAN_EDIT}} -READ_ONLY = {permissions.RESOURCE_DAG: {permissions.ACTION_CAN_READ}} +READ_WRITE = {permissions.ACTION_CAN_READ, permissions.ACTION_CAN_EDIT} +READ_ONLY = {permissions.ACTION_CAN_READ} logging.basicConfig(format="%(asctime)s:%(levelname)s:%(name)s:%(message)s") logging.getLogger().setLevel(logging.DEBUG) @@ -109,7 +108,7 @@ def _clear_db_dag_and_runs(): def _delete_dag_permissions(dag_id, security_manager): - dag_resource_name = _resource_name(dag_id, permissions.RESOURCE_DAG) + dag_resource_name = permissions.resource_name_for_dag(dag_id) for dag_action_name in security_manager.DAG_ACTIONS: security_manager.delete_permission(dag_action_name, dag_resource_name) @@ -752,19 +751,6 @@ def test_access_control_with_non_existent_role(security_manager): assert "role does not exist" in str(ctx.value) -def test_access_control_with_non_allowed_resource(security_manager): - with pytest.raises(AirflowException) as ctx: - security_manager._sync_dag_view_permissions( - dag_id="access-control-test", - access_control={ - "Public": { - permissions.RESOURCE_POOL: {permissions.ACTION_CAN_EDIT, permissions.ACTION_CAN_READ} - } - }, - ) - assert "invalid resource name" in str(ctx.value) - - def test_all_dag_access_doesnt_give_non_dag_access(app, security_manager): username = "dag_access_user" role_name = "dag_access_role" @@ -804,17 +790,6 @@ def test_access_control_with_invalid_permission(app, security_manager): ) assert "invalid permissions" in str(ctx.value) - with pytest.raises(AirflowException) as ctx: - security_manager._sync_dag_view_permissions( - "access_control_test", - access_control={rolename: {permissions.RESOURCE_DAG_RUN: {action}}}, - ) - if hasattr(permissions, "resource_name"): - assert "invalid permission" in str(ctx.value) - else: - # Test with old airflow running new FAB - assert "invalid resource name" in str(ctx.value) - def test_access_control_is_set_on_init( app, @@ -915,11 +890,7 @@ def test_correct_roles_have_perms_to_read_config(security_manager): def test_create_dag_specific_permissions(session, security_manager, monkeypatch, sample_dags): - access_control = ( - {"Public": {"DAGs": {permissions.ACTION_CAN_READ}}} - if hasattr(permissions, "resource_name") - else {"Public": {permissions.ACTION_CAN_READ}} - ) + access_control = {"Public": {permissions.ACTION_CAN_READ}} collect_dags_from_db_mock = mock.Mock() dagbag_mock = mock.Mock() @@ -935,7 +906,7 @@ def test_create_dag_specific_permissions(session, security_manager, monkeypatch, security_manager._sync_dag_view_permissions = mock.Mock() for dag in sample_dags: - dag_resource_name = _resource_name(dag.dag_id, permissions.RESOURCE_DAG) + dag_resource_name = permissions.resource_name_for_dag(dag.dag_id) all_perms = security_manager.get_all_permissions() assert ("can_read", dag_resource_name) not in all_perms assert ("can_edit", dag_resource_name) not in all_perms @@ -946,13 +917,13 @@ def test_create_dag_specific_permissions(session, security_manager, monkeypatch, collect_dags_from_db_mock.assert_called_once_with() for dag in sample_dags: - dag_resource_name = _resource_name(dag.dag_id, permissions.RESOURCE_DAG) + dag_resource_name = permissions.resource_name_for_dag(dag.dag_id) all_perms = security_manager.get_all_permissions() assert ("can_read", dag_resource_name) in all_perms assert ("can_edit", dag_resource_name) in all_perms security_manager._sync_dag_view_permissions.assert_called_once_with( - "has_access_control", + permissions.resource_name_for_dag("has_access_control"), access_control, ) @@ -1002,7 +973,7 @@ def test_prefixed_dag_id_is_deprecated(security_manager): DeprecationWarning, match=( "`prefixed_dag_id` has been deprecated. " - "Please use `airflow.security.permissions.resource_name` instead." + "Please use `airflow.security.permissions.resource_name_for_dag` instead." ), ): security_manager.prefixed_dag_id("hello") diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py index e9c8ceaf03979..cefc5b7a2dcf3 100644 --- a/tests/serialization/test_dag_serialization.py +++ b/tests/serialization/test_dag_serialization.py @@ -238,13 +238,8 @@ def detect_task_dependencies(task: Operator) -> DagDependency | None: # type: i "__type": "dict", "__var": { "test_role": { - "__type": "dict", - "__var": { - "DAGs": { - "__type": "set", - "__var": [permissions.ACTION_CAN_READ, permissions.ACTION_CAN_EDIT], - } - }, + "__type": "set", + "__var": [permissions.ACTION_CAN_READ, permissions.ACTION_CAN_EDIT], } }, }, diff --git a/tests/test_utils/permissions.py b/tests/test_utils/permissions.py deleted file mode 100644 index 46e964c1f6190..0000000000000 --- a/tests/test_utils/permissions.py +++ /dev/null @@ -1,31 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -from __future__ import annotations - -from airflow.security import permissions - - -def _resource_name(dag_id: str, resource_name: str) -> str: - """ - This method is to keep compatibility with new FAB versions - running with old airflow versions. - """ - if hasattr(permissions, "resource_name"): - return getattr(permissions, "resource_name")(dag_id, resource_name) - if resource_name == permissions.RESOURCE_DAG: - return getattr(permissions, "resource_name_for_dag")(dag_id) - raise Exception("Only DAG resource is supported in this Airflow version.") diff --git a/tests/www/views/test_views_acl.py b/tests/www/views/test_views_acl.py index 17700749f6e32..1f0dd0d07241a 100644 --- a/tests/www/views/test_views_acl.py +++ b/tests/www/views/test_views_acl.py @@ -32,7 +32,6 @@ from airflow.www.views import FILTER_STATUS_COOKIE from tests.test_utils.api_connexion_utils import create_user_scope from tests.test_utils.db import clear_db_runs -from tests.test_utils.permissions import _resource_name from tests.test_utils.www import check_content_in_response, check_content_not_in_response, client_with_login pytestmark = pytest.mark.db_test @@ -884,10 +883,7 @@ def user_dag_level_access_with_ti_edit(acl_app): (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG_RUN), (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE), (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE), - ( - permissions.ACTION_CAN_EDIT, - _resource_name("example_bash_operator", permissions.RESOURCE_DAG), - ), + (permissions.ACTION_CAN_EDIT, permissions.resource_name_for_dag("example_bash_operator")), ], ) as user: yield user diff --git a/tests/www/views/test_views_home.py b/tests/www/views/test_views_home.py index f769311287e0c..275c585170c3f 100644 --- a/tests/www/views/test_views_home.py +++ b/tests/www/views/test_views_home.py @@ -29,7 +29,6 @@ from airflow.www.views import FILTER_LASTRUN_COOKIE, FILTER_STATUS_COOKIE, FILTER_TAGS_COOKIE from tests.test_utils.api_connexion_utils import create_user from tests.test_utils.db import clear_db_dags, clear_db_import_errors, clear_db_serialized_dags -from tests.test_utils.permissions import _resource_name from tests.test_utils.www import check_content_in_response, check_content_not_in_response, client_with_login pytestmark = pytest.mark.db_test @@ -148,10 +147,7 @@ def user_single_dag(app): permissions=[ (permissions.ACTION_CAN_READ, permissions.RESOURCE_WEBSITE), (permissions.ACTION_CAN_READ, permissions.RESOURCE_IMPORT_ERROR), - ( - permissions.ACTION_CAN_READ, - _resource_name(TEST_FILTER_DAG_IDS[0], permissions.RESOURCE_DAG), - ), + (permissions.ACTION_CAN_READ, permissions.resource_name_for_dag(TEST_FILTER_DAG_IDS[0])), ], ) @@ -176,10 +172,7 @@ def user_single_dag_edit(app): permissions=[ (permissions.ACTION_CAN_READ, permissions.RESOURCE_WEBSITE), (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG), - ( - permissions.ACTION_CAN_EDIT, - _resource_name("filter_test_1", permissions.RESOURCE_DAG), - ), + (permissions.ACTION_CAN_EDIT, permissions.resource_name_for_dag("filter_test_1")), ], )