Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 5 additions & 23 deletions airflow/models/dagbag.py
Original file line number Diff line number Diff line change
Expand Up @@ -689,29 +689,11 @@ def sync_to_db(self, processor_subdir: str | None = None, session: Session = NEW
@classmethod
@provide_session
def _sync_perm_for_dag(cls, dag: DAG, session: Session = NEW_SESSION):
"""Sync DAG specific permissions, if necessary"""
from airflow.security.permissions import DAG_ACTIONS, resource_name_for_dag
from airflow.www.fab_security.sqla.models import Action, Permission, Resource

"""Sync DAG specific permissions"""
root_dag_id = dag.parent_dag.dag_id if dag.parent_dag else dag.dag_id

def needs_perms(dag_id: str) -> bool:
dag_resource_name = resource_name_for_dag(dag_id)
for permission_name in DAG_ACTIONS:
if not (
session.query(Permission)
.join(Action)
.join(Resource)
.filter(Action.name == permission_name)
.filter(Resource.name == dag_resource_name)
.one_or_none()
):
return True
return False

if dag.access_control or needs_perms(root_dag_id):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove this if block completely so we sync perms in both cases. This is effectively reverting #15464.

cls.logger().debug("Syncing DAG permissions: %s to the DB", root_dag_id)
from airflow.www.security import ApplessAirflowSecurityManager
cls.logger().debug("Syncing DAG permissions: %s to the DB", root_dag_id)
from airflow.www.security import ApplessAirflowSecurityManager

security_manager = ApplessAirflowSecurityManager(session=session)
security_manager.sync_perm_for_dag(root_dag_id, dag.access_control)
security_manager = ApplessAirflowSecurityManager(session=session)
security_manager.sync_perm_for_dag(root_dag_id, dag.access_control)
17 changes: 17 additions & 0 deletions airflow/www/security.py
Original file line number Diff line number Diff line change
Expand Up @@ -655,8 +655,25 @@ def sync_perm_for_dag(
for dag_action_name in self.DAG_ACTIONS:
self.create_permission(dag_action_name, dag_resource_name)

def _revoke_all_stale_permissions(resource: Resource):
existing_dag_perms = self.get_resource_permissions(resource)
for perm in existing_dag_perms:
non_admin_roles = [role for role in perm.role if role.name != "Admin"]
for role in non_admin_roles:
self.log.info(
"Revoking '%s' on DAG '%s' for role '%s'",
perm.action,
dag_resource_name,
role.name,
)
self.remove_permission_from_role(role, perm)

if access_control:
self._sync_dag_view_permissions(dag_resource_name, access_control)
else:
resource = self.get_resource(dag_resource_name)
if resource:
_revoke_all_stale_permissions(resource)
Comment on lines +673 to +676
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If a DAG’s access_control is empty, it check whether there’s existing permission configurations and reset those


def _sync_dag_view_permissions(self, dag_id: str, access_control: dict[str, Collection[str]]) -> None:
"""
Expand Down
3 changes: 1 addition & 2 deletions tests/models/test_dagbag.py
Original file line number Diff line number Diff line change
Expand Up @@ -908,7 +908,6 @@ def _sync_to_db():
def test_sync_perm_for_dag(self, mock_security_manager):
"""
Test that dagbag._sync_perm_for_dag will call ApplessAirflowSecurityManager.sync_perm_for_dag
when DAG specific perm views don't exist already or the DAG has access_control set.
"""
db_clean_up()
with create_session() as session:
Expand All @@ -932,7 +931,7 @@ def _sync_perms():

# perms now exist
_sync_perms()
mock_sync_perm_for_dag.assert_not_called()
mock_sync_perm_for_dag.assert_called_once_with("test_example_bash_operator", None)

# Always sync if we have access_control
dag.access_control = {"Public": {"can_read"}}
Expand Down
54 changes: 51 additions & 3 deletions tests/www/views/test_views_home.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,44 @@ def working_dags(tmpdir):
_process_file(filename, session)


@pytest.fixture()
def working_dags_with_read_perm(tmpdir):
dag_contents_template = "from airflow import DAG\ndag = DAG('{}', tags=['{}'])"
dag_contents_template_with_read_perm = (
"from airflow import DAG\ndag = DAG('{}', tags=['{}'], "
"access_control={{'role_single_dag':{{'can_read'}}}}) "
)
with create_session() as session:
for dag_id, tag in list(zip(TEST_FILTER_DAG_IDS, TEST_TAGS)):
filename = os.path.join(tmpdir, f"{dag_id}.py")
if dag_id == "filter_test_1":
with open(filename, "w") as f:
f.writelines(dag_contents_template_with_read_perm.format(dag_id, tag))
else:
with open(filename, "w") as f:
f.writelines(dag_contents_template.format(dag_id, tag))
_process_file(filename, session)


@pytest.fixture()
def working_dags_with_edit_perm(tmpdir):
dag_contents_template = "from airflow import DAG\ndag = DAG('{}', tags=['{}'])"
dag_contents_template_with_read_perm = (
"from airflow import DAG\ndag = DAG('{}', tags=['{}'], "
"access_control={{'role_single_dag':{{'can_edit'}}}}) "
)
with create_session() as session:
for dag_id, tag in list(zip(TEST_FILTER_DAG_IDS, TEST_TAGS)):
filename = os.path.join(tmpdir, f"{dag_id}.py")
if dag_id == "filter_test_1":
with open(filename, "w") as f:
f.writelines(dag_contents_template_with_read_perm.format(dag_id, tag))
else:
with open(filename, "w") as f:
f.writelines(dag_contents_template.format(dag_id, tag))
_process_file(filename, session)


@pytest.fixture()
def broken_dags(tmpdir, working_dags):
with create_session() as session:
Expand All @@ -165,6 +203,16 @@ def broken_dags(tmpdir, working_dags):
_process_file(filename, session)


@pytest.fixture()
def broken_dags_with_read_perm(tmpdir, working_dags_with_read_perm):
with create_session() as session:
for dag_id in TEST_FILTER_DAG_IDS:
filename = os.path.join(tmpdir, f"{dag_id}.py")
with open(filename, "w") as f:
f.writelines("airflow DAG")
_process_file(filename, session)


def test_home_filter_tags(working_dags, admin_client):
with admin_client:
admin_client.get("home?tags=example&tags=data", follow_redirects=True)
Expand All @@ -183,7 +231,7 @@ def test_home_importerrors(broken_dags, user_client):


@pytest.mark.parametrize("page", ["home", "home?status=active", "home?status=paused", "home?status=all"])
def test_home_importerrors_filtered_singledag_user(broken_dags, client_single_dag, page):
def test_home_importerrors_filtered_singledag_user(broken_dags_with_read_perm, client_single_dag, page):
# Users that can only see certain DAGs get a filtered list of import errors
resp = client_single_dag.get(page, follow_redirects=True)
check_content_in_response("Import Errors", resp)
Expand All @@ -201,7 +249,7 @@ def test_home_dag_list(working_dags, user_client):
check_content_in_response(f"dag_id={dag_id}", resp)


def test_home_dag_list_filtered_singledag_user(working_dags, client_single_dag):
def test_home_dag_list_filtered_singledag_user(working_dags_with_read_perm, client_single_dag):
# Users that can only see certain DAGs get a filtered list
resp = client_single_dag.get("home", follow_redirects=True)
# They can see the first DAG
Expand All @@ -219,7 +267,7 @@ def test_home_dag_list_search(working_dags, user_client):
check_content_not_in_response("dag_id=a_first_dag_id_asc", resp)


def test_home_dag_edit_permissions(capture_templates, working_dags, client_single_dag_edit):
def test_home_dag_edit_permissions(capture_templates, working_dags_with_edit_perm, client_single_dag_edit):
with capture_templates() as templates:
client_single_dag_edit.get("home", follow_redirects=True)

Expand Down