Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 12 additions & 26 deletions airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -459,8 +459,6 @@ class DAG(LoggingMixin):
that it is executed when the dag succeeds.
:param access_control: Specify optional DAG-level actions, e.g.,
"{'role1': {'can_read'}, 'role2': {'can_read', 'can_edit', 'can_delete'}}"
or it can specify the resource name if there is a DAGs Run resource, e.g.,
"{'role1': {'DAG Runs': {'can_create'}}, 'role2': {'DAGs': {'can_read', 'can_edit', 'can_delete'}}"
:param is_paused_upon_creation: Specifies if the dag is paused when created for the first time.
If the dag exists already, this flag will be ignored. If this optional parameter
is not specified, the global config setting will be used.
Expand Down Expand Up @@ -550,7 +548,7 @@ def __init__(
on_failure_callback: None | DagStateChangeCallback | list[DagStateChangeCallback] = None,
doc_md: str | None = None,
params: abc.MutableMapping | None = None,
access_control: dict[str, dict[str, Collection[str]]] | dict[str, Collection[str]] | None = None,
access_control: dict | None = None,
is_paused_upon_creation: bool | None = None,
jinja_environment_kwargs: dict | None = None,
render_template_as_native_obj: bool = False,
Expand Down Expand Up @@ -917,33 +915,21 @@ def _upgrade_outdated_dag_access_control(access_control=None):
"""
if access_control is None:
return None
new_dag_perm_mapping = {
new_perm_mapping = {
permissions.DEPRECATED_ACTION_CAN_DAG_READ: permissions.ACTION_CAN_READ,
permissions.DEPRECATED_ACTION_CAN_DAG_EDIT: permissions.ACTION_CAN_EDIT,
}

def update_old_perm(permission: str):
new_perm = new_dag_perm_mapping.get(permission, permission)
if new_perm != permission:
warnings.warn(
f"The '{permission}' permission is deprecated. Please use '{new_perm}'.",
RemovedInAirflow3Warning,
stacklevel=3,
)
return new_perm

updated_access_control = {}
for role, perms in access_control.items():
updated_access_control[role] = updated_access_control.get(role, {})
if isinstance(perms, (set, list)):
# Support for old-style access_control where only the actions are specified
updated_access_control[role][permissions.RESOURCE_DAG] = set(perms)
else:
updated_access_control[role] = perms
if permissions.RESOURCE_DAG in updated_access_control[role]:
updated_access_control[role][permissions.RESOURCE_DAG] = {
update_old_perm(perm) for perm in updated_access_control[role][permissions.RESOURCE_DAG]
}
updated_access_control[role] = {new_perm_mapping.get(perm, perm) for perm in perms}

if access_control != updated_access_control:
warnings.warn(
"The 'can_dag_read' and 'can_dag_edit' permissions are deprecated. "
"Please use 'can_read' and 'can_edit', respectively.",
RemovedInAirflow3Warning,
stacklevel=3,
)

return updated_access_control

Expand Down Expand Up @@ -4180,7 +4166,7 @@ def dag(
on_failure_callback: None | DagStateChangeCallback | list[DagStateChangeCallback] = None,
doc_md: str | None = None,
params: abc.MutableMapping | None = None,
access_control: dict[str, dict[str, Collection[str]]] | dict[str, Collection[str]] | None = None,
access_control: dict | None = None,
is_paused_upon_creation: bool | None = None,
jinja_environment_kwargs: dict | None = None,
render_template_as_native_obj: bool = False,
Expand Down
41 changes: 8 additions & 33 deletions airflow/providers/fab/auth_manager/fab_auth_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
RESOURCE_DAG,
RESOURCE_DAG_CODE,
RESOURCE_DAG_DEPENDENCIES,
RESOURCE_DAG_PREFIX,
RESOURCE_DAG_RUN,
RESOURCE_DAG_WARNING,
RESOURCE_DATASET,
Expand Down Expand Up @@ -241,8 +242,6 @@ def is_authorized_dag(

return all(
self._is_authorized(method=method, resource_type=resource_type, user=user)
if resource_type != RESOURCE_DAG_RUN or not hasattr(permissions, "resource_name")
else self._is_authorized_dag_run(method=method, details=details, user=user)
for resource_type in resource_types
)

Expand Down Expand Up @@ -413,33 +412,7 @@ def _is_authorized_dag(

if details and details.id:
# Check whether the user has permissions to access a specific DAG
resource_dag_name = self._resource_name(details.id, RESOURCE_DAG)
return self._is_authorized(method=method, resource_type=resource_dag_name, user=user)

return False

def _is_authorized_dag_run(
self,
method: ResourceMethod,
details: DagDetails | None = None,
user: BaseUser | None = None,
) -> bool:
"""
Return whether the user is authorized to perform a given action on a DAG Run.

:param method: the method to perform
:param details: optional, details about the DAG
:param user: optional, the user to perform the action on. If not provided, it uses the current user

:meta private:
"""
is_global_authorized = self._is_authorized(method=method, resource_type=RESOURCE_DAG_RUN, user=user)
if is_global_authorized:
return True

if details and details.id:
# Check whether the user has permissions to access a specific DAG Run permission on a DAG Level
resource_dag_name = self._resource_name(details.id, RESOURCE_DAG_RUN)
resource_dag_name = self._resource_name_for_dag(details.id)
return self._is_authorized(method=method, resource_type=resource_dag_name, user=user)

return False
Expand Down Expand Up @@ -471,7 +444,7 @@ def _get_fab_resource_types(dag_access_entity: DagAccessEntity) -> tuple[str, ..
raise AirflowException(f"Unknown DAG access entity: {dag_access_entity}")
return _MAP_DAG_ACCESS_ENTITY_TO_FAB_RESOURCE_TYPE[dag_access_entity]

def _resource_name(self, dag_id: str, resource_type: str) -> str:
def _resource_name_for_dag(self, dag_id: str) -> str:
"""
Return the FAB resource name for a DAG id.

Expand All @@ -480,9 +453,11 @@ def _resource_name(self, dag_id: str, resource_type: str) -> str:
:meta private:
"""
root_dag_id = self._get_root_dag_id(dag_id)
if hasattr(permissions, "resource_name"):
return getattr(permissions, "resource_name")(root_dag_id, resource_type)
return getattr(permissions, "resource_name_for_dag")(root_dag_id)
if root_dag_id == RESOURCE_DAG:
return root_dag_id
if root_dag_id.startswith(RESOURCE_DAG_PREFIX):
return root_dag_id
return f"{RESOURCE_DAG_PREFIX}{root_dag_id}"

@staticmethod
def _get_user_permissions(user: BaseUser):
Expand Down
158 changes: 58 additions & 100 deletions airflow/providers/fab/auth_manager/security_manager/override.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,16 +344,7 @@ class FabAirflowSecurityManagerOverride(AirflowSecurityManagerV2):
]

# global resource for dag-level access
RESOURCE_DETAILS_MAP = getattr(
permissions,
"RESOURCE_DETAILS_MAP",
{
permissions.RESOURCE_DAG: {
"actions": permissions.DAG_ACTIONS,
}
},
)
DAG_ACTIONS = RESOURCE_DETAILS_MAP[permissions.RESOURCE_DAG]["actions"]
DAG_ACTIONS = permissions.DAG_ACTIONS

def __init__(self, appbuilder):
# done in super, but we need it before we can call super.
Expand Down Expand Up @@ -1040,7 +1031,7 @@ def can_access_some_dags(self, action: str, dag_id: str | None = None) -> bool:
"""Check if user has read or write access to some dags."""
if dag_id and dag_id != "~":
root_dag_id = self._get_root_dag_id(dag_id)
return self.has_access(action, self._resource_name(root_dag_id, permissions.RESOURCE_DAG))
return self.has_access(action, permissions.resource_name_for_dag(root_dag_id))

user = g.user
if action == permissions.ACTION_CAN_READ:
Expand Down Expand Up @@ -1074,25 +1065,24 @@ def create_dag_specific_permissions(self) -> None:

for dag in dags:
root_dag_id = dag.parent_dag.dag_id if dag.parent_dag else dag.dag_id
for resource_name, resource_values in self.RESOURCE_DETAILS_MAP.items():
dag_resource_name = self._resource_name(root_dag_id, resource_name)
for action_name in resource_values["actions"]:
if (action_name, dag_resource_name) not in perms:
self._merge_perm(action_name, dag_resource_name)
dag_resource_name = permissions.resource_name_for_dag(root_dag_id)
for action_name in self.DAG_ACTIONS:
if (action_name, dag_resource_name) not in perms:
self._merge_perm(action_name, dag_resource_name)

if dag.access_control is not None:
self.sync_perm_for_dag(root_dag_id, dag.access_control)
self.sync_perm_for_dag(dag_resource_name, dag.access_control)

def prefixed_dag_id(self, dag_id: str) -> str:
"""Return the permission name for a DAG id."""
warnings.warn(
"`prefixed_dag_id` has been deprecated. "
"Please use `airflow.security.permissions.resource_name` instead.",
"Please use `airflow.security.permissions.resource_name_for_dag` instead.",
RemovedInAirflow3Warning,
stacklevel=2,
)
root_dag_id = self._get_root_dag_id(dag_id)
return self._resource_name(root_dag_id, permissions.RESOURCE_DAG)
return permissions.resource_name_for_dag(root_dag_id)

def is_dag_resource(self, resource_name: str) -> bool:
"""Determine if a resource belongs to a DAG or all DAGs."""
Expand All @@ -1103,121 +1093,90 @@ def is_dag_resource(self, resource_name: str) -> bool:
def sync_perm_for_dag(
self,
dag_id: str,
access_control: dict[str, dict[str, Collection[str]]] | None = None,
access_control: dict[str, Collection[str]] | None = None,
) -> None:
"""
Sync permissions for given dag id.

The dag id surely exists in our dag bag as only / refresh button or DagBag will call this function.

:param dag_id: the ID of the DAG whose permissions should be updated
:param access_control: a dict where each key is a role name and each value can be:
- a set() of DAGs resource action names (e.g. `{'can_read'}`)
- or a dict where each key is a resource name ('DAGs' or 'DAG Runs') and each value
is a set() of action names (e.g., `{'DAG Runs': {'can_create'}, 'DAGs': {'can_read'}}`)
:param access_control: a dict where each key is a role name and
each value is a set() of action names (e.g.,
{'can_read'}
:return:
"""
for resource_name, resource_values in self.RESOURCE_DETAILS_MAP.items():
dag_resource_name = self._resource_name(dag_id, resource_name)
for dag_action_name in resource_values["actions"]:
self.create_permission(dag_action_name, dag_resource_name)
dag_resource_name = permissions.resource_name_for_dag(dag_id)
for dag_action_name in self.DAG_ACTIONS:
self.create_permission(dag_action_name, dag_resource_name)

if access_control is not None:
self.log.debug("Syncing DAG-level permissions for DAG '%s'", dag_id)
self._sync_dag_view_permissions(dag_id, access_control.copy())
self.log.debug("Syncing DAG-level permissions for DAG '%s'", dag_resource_name)
self._sync_dag_view_permissions(dag_resource_name, access_control)
else:
self.log.debug(
"Not syncing DAG-level permissions for DAG '%s' as access control is unset.",
dag_id,
dag_resource_name,
)

def _resource_name(self, dag_id: str, resource_name: str) -> str:
"""
Get the resource name from permissions.

This method is to keep compatibility with new FAB versions
running with old airflow versions.
"""
if hasattr(permissions, "resource_name"):
return getattr(permissions, "resource_name")(dag_id, resource_name)
return getattr(permissions, "resource_name_for_dag")(dag_id)

def _sync_dag_view_permissions(
self,
dag_id: str,
access_control: dict[str, dict[str, Collection[str]]],
) -> None:
def _sync_dag_view_permissions(self, dag_id: str, access_control: dict[str, Collection[str]]) -> None:
"""
Set the access policy on the given DAG's ViewModel.

:param dag_id: the ID of the DAG whose permissions should be updated
:param access_control: a dict where each key is a role name and each value is:
- a dict where each key is a resource name ('DAGs' or 'DAG Runs') and each value
is a set() of action names (e.g., `{'DAG Runs': {'can_create'}, 'DAGs': {'can_read'}}`)
:param access_control: a dict where each key is a role name and
each value is a set() of action names (e.g. {'can_read'})
"""
dag_resource_name = permissions.resource_name_for_dag(dag_id)

def _get_or_create_dag_permission(action_name: str, dag_resource_name: str) -> Permission | None:
def _get_or_create_dag_permission(action_name: str) -> Permission | None:
perm = self.get_permission(action_name, dag_resource_name)
if not perm:
self.log.info("Creating new action '%s' on resource '%s'", action_name, dag_resource_name)
perm = self.create_permission(action_name, dag_resource_name)

return perm

# Revoking stale permissions for all possible DAG level resources
for resource_name in self.RESOURCE_DETAILS_MAP.keys():
dag_resource_name = self._resource_name(dag_id, resource_name)
if resource := self.get_resource(dag_resource_name):
existing_dag_perms = self.get_resource_permissions(resource)
for perm in existing_dag_perms:
non_admin_roles = [role for role in perm.role if role.name != "Admin"]
for role in non_admin_roles:
target_perms_for_role = access_control.get(role.name, {}).get(resource_name, set())
if perm.action.name not in target_perms_for_role:
self.log.info(
"Revoking '%s' on DAG '%s' for role '%s'",
perm.action,
dag_resource_name,
role.name,
)
self.remove_permission_from_role(role, perm)

# Adding the access control permissions
for rolename, resource_actions in access_control.items():
def _revoke_stale_permissions(resource: Resource):
existing_dag_perms = self.get_resource_permissions(resource)
for perm in existing_dag_perms:
non_admin_roles = [role for role in perm.role if role.name != "Admin"]
for role in non_admin_roles:
target_perms_for_role = access_control.get(role.name, ())
if perm.action.name not in target_perms_for_role:
self.log.info(
"Revoking '%s' on DAG '%s' for role '%s'",
perm.action,
dag_resource_name,
role.name,
)
self.remove_permission_from_role(role, perm)

resource = self.get_resource(dag_resource_name)
if resource:
_revoke_stale_permissions(resource)

for rolename, action_names in access_control.items():
role = self.find_role(rolename)
if not role:
raise AirflowException(
f"The access_control mapping for DAG '{dag_id}' includes a role named "
f"'{rolename}', but that role does not exist"
)

if isinstance(resource_actions, (set, list)):
# Support for old-style access_control where only the actions are specified
resource_actions = {permissions.RESOURCE_DAG: set(resource_actions)}

for resource_name, actions in resource_actions.items():
if resource_name not in self.RESOURCE_DETAILS_MAP:
raise AirflowException(
f"The access_control map for DAG '{dag_id}' includes the following invalid "
f"resource name: '{resource_name}'; "
f"The set of valid resource names is: {self.RESOURCE_DETAILS_MAP.keys()}"
)

dag_resource_name = self._resource_name(dag_id, resource_name)
self.log.debug("Syncing DAG-level permissions for DAG '%s'", dag_resource_name)

invalid_actions = set(actions) - self.RESOURCE_DETAILS_MAP[resource_name]["actions"]

if invalid_actions:
raise AirflowException(
f"The access_control map for DAG '{dag_resource_name}' includes "
f"the following invalid permissions: {invalid_actions}; "
f"The set of valid permissions is: {self.RESOURCE_DETAILS_MAP[resource_name]['actions']}"
)
action_names = set(action_names)
invalid_action_names = action_names - self.DAG_ACTIONS
if invalid_action_names:
raise AirflowException(
f"The access_control map for DAG '{dag_resource_name}' includes "
f"the following invalid permissions: {invalid_action_names}; "
f"The set of valid permissions is: {self.DAG_ACTIONS}"
)

for action_name in actions:
dag_perm = _get_or_create_dag_permission(action_name, dag_resource_name)
if dag_perm:
self.add_permission_to_role(role, dag_perm)
for action_name in action_names:
dag_perm = _get_or_create_dag_permission(action_name)
if dag_perm:
self.add_permission_to_role(role, dag_perm)

def add_permissions_view(self, base_action_names, resource_name): # Keep name for compatibility with FAB.
"""
Expand Down Expand Up @@ -1348,9 +1307,8 @@ def update_admin_permission(self) -> None:
Add the missing ones to the table for admin.
"""
session = self.appbuilder.get_session
prefixes = getattr(permissions, "PREFIX_LIST", [permissions.RESOURCE_DAG_PREFIX])
dag_resources = session.scalars(
select(Resource).where(or_(*[Resource.name.like(f"{prefix}%") for prefix in prefixes]))
select(Resource).where(Resource.name.like(f"{permissions.RESOURCE_DAG_PREFIX}%"))
)
resource_ids = [resource.id for resource in dag_resources]

Expand Down
Loading