Skip to content
Merged
6 changes: 6 additions & 0 deletions geonode/base/api/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -618,6 +618,9 @@
)
elif request.method == "PUT":
perms_spec_compact = PermSpecCompact(request.data, resource)
if resource.dirty_state:
raise Exception("Cannot update if the resource is in dirty state")

Check warning on line 622 in geonode/base/api/views.py

View check run for this annotation

Codecov / codecov/patch

geonode/base/api/views.py#L622

Added line #L622 was not covered by tests
resource.set_dirty_state()
_exec_request = ExecutionRequest.objects.create(
user=request.user,
func_name="set_permissions",
Expand All @@ -634,6 +637,9 @@
perms_spec_compact_patch = PermSpecCompact(request.data, resource)
perms_spec_compact_resource = PermSpecCompact(perms_spec.compact, resource)
perms_spec_compact_resource.merge(perms_spec_compact_patch)
if resource.dirty_state:
raise Exception("Cannot update if the resource is in dirty state")

Check warning on line 641 in geonode/base/api/views.py

View check run for this annotation

Codecov / codecov/patch

geonode/base/api/views.py#L641

Added line #L641 was not covered by tests
resource.set_dirty_state()
_exec_request = ExecutionRequest.objects.create(
user=request.user,
func_name="set_permissions",
Expand Down
101 changes: 86 additions & 15 deletions geonode/people/api/views.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,23 @@
#########################################################################
#
# Copyright (C) 2025 OSGeo
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
#########################################################################

import logging
from django.conf import settings
from dynamic_rest.filters import DynamicFilterBackend, DynamicSortingFilter
from oauth2_provider.contrib.rest_framework import OAuth2Authentication
Expand All @@ -21,6 +41,11 @@
from geonode.people.utils import get_available_users
from django.contrib.auth import get_user_model
from django.shortcuts import get_object_or_404
from geonode.resource.manager import resource_manager
from geonode.security.registry import permissions_registry


logger = logging.getLogger()


class UserViewSet(DynamicModelViewSet):
Expand Down Expand Up @@ -147,20 +172,66 @@
def transfer_resources(self, request, pk=None):
user = self.get_object()
admin = get_user_model().objects.filter(is_superuser=True, is_staff=True).first()
target_user = request.data.get("owner")

target_user = request.data.get("newOwner") # the new owner
previous_owner = request.data.get("currentOwner") # the previous owner, usually it match the user
transfer_resource_subset = (
request.data.get("resources", None)
if not hasattr(request.data, "getlist")
else request.data.getlist("resources", None)
)
target = None
if target_user == "DEFAULT":
if not admin:
return Response("Principal User not found", status=500)
target = admin
else:
target = get_object_or_404(get_user_model(), id=target_user)

if target == user:
return Response("Cannot reassign to self", status=400)

# transfer to target
ResourceBase.objects.filter(owner=user).update(owner=target or user)

return Response("Resources transfered successfully", status=200)
if not target_user and previous_owner:
return Response("Payload not passed", status=400)

Check warning on line 185 in geonode/people/api/views.py

View check run for this annotation

Codecov / codecov/patch

geonode/people/api/views.py#L185

Added line #L185 was not covered by tests

if user.is_superuser or (
not user.is_superuser
and ResourceBase.objects.filter(owner=user, pk__in=transfer_resource_subset).count()
== len(transfer_resource_subset)
):

if target_user == "DEFAULT":
if not admin:
return Response("Principal User not found", status=500)
target = admin
else:
target = get_object_or_404(get_user_model(), id=target_user)

if target == user:
return Response("Cannot reassign to self", status=400)

# we need to filter by the previous owner id
filter_payload = dict(owner=previous_owner or user)

if transfer_resource_subset:
# transfer_resources
filter_payload["pk__in"] = transfer_resource_subset

for instance in ResourceBase.objects.filter(**filter_payload).iterator():
"""
We should reassing all the permissions to the new resource owner
we can use the resource manager because inside it will automatically update
the owner
"""
try:
# putting the resource in dirty state
instance.set_dirty_state()
# updating the perms with the new owner
perms = permissions_registry.get_perms(instance=instance, include_virtual=False)
prev_owner = get_user_model().objects.filter(pk=previous_owner).first()

if prev_owner and not prev_owner.is_superuser:
perms["users"].pop(prev_owner)
# calling the registry to update the perms
resource_manager.set_permissions(instance.uuid, instance, owner=target or user, permissions=perms)
except Exception as e:
logger.exeption(e)

Check warning on line 228 in geonode/people/api/views.py

View check run for this annotation

Codecov / codecov/patch

geonode/people/api/views.py#L227-L228

Added lines #L227 - L228 were not covered by tests
finally:
# clearing the dirty state
instance.clear_dirty_state()

return Response("Resources transfered successfully", status=200)

return Response(

Check warning on line 235 in geonode/people/api/views.py

View check run for this annotation

Codecov / codecov/patch

geonode/people/api/views.py#L235

Added line #L235 was not covered by tests
{"error": "The user does not have any right to perform this action on this resource"}, status=403
)
51 changes: 44 additions & 7 deletions geonode/people/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
from geonode.layers.models import Dataset
from geonode.people import profileextractors

from geonode.base.populate_test_data import all_public, create_models, remove_models
from geonode.base.populate_test_data import all_public, create_models, create_single_dataset, remove_models
from django.db.models import Q
from geonode.security.registry import permissions_registry

Expand Down Expand Up @@ -1150,7 +1150,8 @@ def test_transfer_resources_all(self):
self.assertTrue(bobby_resources.exists())
# call api
response = self.client.post(
path=f"{reverse('users-list')}/{bobby.pk}/transfer_resources", data={"owner": norman.id}
path=f"{reverse('users-list')}/{bobby.pk}/transfer_resources",
data={"currentOwner": bobby.id, "newOwner": norman.id},
)
# check that bobby owns the resources no more
self.assertFalse(bobby_resources.exists())
Expand All @@ -1174,7 +1175,8 @@ def test_transfer_resources_invalid_user(self):
self.assertTrue(bobby_resources.exists())
# call api
response = self.client.post(
path=f"{reverse('users-list')}/{bobby}/transfer_resources", data={"owner": invalid_user_id}
path=f"{reverse('users-list')}/{bobby}/transfer_resources",
data={"currentOwner": bobby.id, "newOwner": invalid_user_id},
)
# response should be 404
self.assertEqual(response.status_code, 404)
Expand All @@ -1200,7 +1202,8 @@ def test_transfer_resources_default(self):
self.assertTrue(bobby_resources.exists())
# call api
response = self.client.post(
path=f"{reverse('users-list')}/{bobby.pk}/transfer_resources", data={"owner": "DEFAULT"}
path=f"{reverse('users-list')}/{bobby.pk}/transfer_resources",
data={"currentOwner": bobby.id, "newOwner": "DEFAULT"},
)
self.assertTrue(response.status_code == 200)
# check that bobby owns the resources no more
Expand Down Expand Up @@ -1232,7 +1235,8 @@ def test_transfer_resources_to_missing_default(self):
self.assertTrue(bobby_resources.exists())
# call api
response = self.client.post(
path=f"{reverse('users-list')}/{bobby.pk}/transfer_resources", data={"owner": "DEFAULT"}
path=f"{reverse('users-list')}/{bobby.pk}/transfer_resources",
data={"newOwner": "DEFAULT", "previousOwner": bobby.pk},
)
self.assertTrue(response.status_code == 500)
self.assertEqual(response.data, "Principal User not found")
Expand All @@ -1257,7 +1261,8 @@ def test_transfer_resources_to_self(self):

# call api
response = self.client.post(
path=f"{reverse('users-list')}/{bobby.pk}/transfer_resources", data={"owner": bobby.pk}
path=f"{reverse('users-list')}/{bobby.pk}/transfer_resources",
data={"previousOwner": bobby.pk, "newOwner": bobby.pk},
)
self.assertTrue(response.status_code == 400)
self.assertEqual(response.data, "Cannot reassign to self")
Expand All @@ -1277,7 +1282,6 @@ def test_transfer_resources_nopayload(self):
bobby_resources = ResourceBase.objects.filter(owner=bobby)
prior_bobby_resources = bobby_resources.all()
self.assertTrue(bobby_resources.exists())

# call api
response = self.client.post(path=f"{reverse('users-list')}/{bobby.pk}/transfer_resources", data={})
# response should be 404
Expand All @@ -1286,3 +1290,36 @@ def test_transfer_resources_nopayload(self):
self.assertTrue(bobby_resources.exists())
later_bobby_resources = ResourceBase.objects.filter(owner=bobby).all()
self.assertTrue(set(prior_bobby_resources) == set(later_bobby_resources))

def test_transfer_resource_subset(self):
"""
user wants to transfer resources to target
"""
bobby = get_user_model().objects.get(username="bobby")
self.assertTrue(self.client.login(username="bobby", password="bob"))
self.assertTrue(bobby.is_authenticated)
# check bobbys resources
resource_to_transfer = ResourceBase.objects.filter(owner=bobby).first()
second_resource = create_single_dataset("test for api", owner=bobby)
new_owner = get_user_model().objects.exclude(username__in=["bobby", "AnonymousUser"]).first()

# call api to transfer bobby resource to the other user
response = self.client.post(
path=f"{reverse('users-list')}/{bobby.pk}/transfer_resources",
data={
"newOwner": new_owner.pk,
"currentOwner": bobby.pk,
"resources": [resource_to_transfer.pk, second_resource.pk],
},
)
# response should be 200
self.assertEqual(response.status_code, 200)

resource_to_transfer.refresh_from_db()
second_resource.refresh_from_db()
# Check that bobby does not own the resource anymore
self.assertTrue(resource_to_transfer.owner != bobby)
self.assertTrue(second_resource.owner != bobby)
# since the payload say "default"
self.assertTrue(resource_to_transfer.owner == new_owner)
self.assertTrue(second_resource.owner == new_owner)
Loading