From c865e5927e6a275f03f94a5a629a3c2fb8b437b5 Mon Sep 17 00:00:00 2001 From: "marcel.kocisek" Date: Tue, 5 Aug 2025 10:28:31 +0200 Subject: [PATCH 1/3] Added new endpoint for pushing chunks - new variable for storing chunks - UPLOAD_CHUNKS_DIR, UPLOAD_CHUNKS_EXPIRATION --- .gitignore | 1 + deployment/common/set_permissions.sh | 0 development.md | 2 - server/mergin/app.py | 10 +++ server/mergin/sync/config.py | 7 ++ server/mergin/sync/public_api_v2.yaml | 50 +++++++++++++ .../mergin/sync/public_api_v2_controller.py | 42 ++++++++++- server/mergin/sync/schemas.py | 7 ++ server/mergin/sync/utils.py | 14 ++++ server/mergin/tests/test_public_api_v2.py | 71 ++++++++++++++++++- 10 files changed, 199 insertions(+), 5 deletions(-) mode change 100644 => 100755 deployment/common/set_permissions.sh diff --git a/.gitignore b/.gitignore index 7614b44b..5ca81560 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,7 @@ projects*/ data/ mergin_db +diagnostic_logs logs *.log diff --git a/deployment/common/set_permissions.sh b/deployment/common/set_permissions.sh old mode 100644 new mode 100755 diff --git a/development.md b/development.md index b2a1be08..b9920e89 100644 --- a/development.md +++ b/development.md @@ -71,8 +71,6 @@ cd deployment/community/ # Create .prod.env file from .env.template cp .env.template .prod.env -# Run the docker composition with the current Dockerfiles -cp .env.template .prod.env docker compose -f docker-compose.yml -f docker-compose.dev.yml up -d # Give ownership of the ./projects folder to user that is running the gunicorn container diff --git a/server/mergin/app.py b/server/mergin/app.py index d0fd2f3a..845afd8c 100644 --- a/server/mergin/app.py +++ b/server/mergin/app.py @@ -347,6 +347,16 @@ def ping(): # pylint: disable=W0612 ) return status, 200 + # reading raw input stream not supported in connexion so far + # https://github.com/zalando/connexion/issues/592 + # and as workaround we use custom Flask endpoint in create_app function + @app.route("/v2/projects//chunks", methods=["POST"]) + @auth_required + def upload_chunk_v2(id: str): + from .sync import public_api_v2_controller + + return public_api_v2_controller.upload_chunk(id) + # reading raw input stream not supported in connexion so far # https://github.com/zalando/connexion/issues/592 # and as workaround we use custom Flask endpoint in create_app function diff --git a/server/mergin/sync/config.py b/server/mergin/sync/config.py index b182da6d..ff3875ee 100644 --- a/server/mergin/sync/config.py +++ b/server/mergin/sync/config.py @@ -64,3 +64,10 @@ class Configuration(object): ) # in seconds, older unfinished zips are moved to temp PARTIAL_ZIP_EXPIRATION = config("PARTIAL_ZIP_EXPIRATION", default=600, cast=int) + UPLOAD_CHUNKS_DIR = config( + "UPLOAD_CHUNKS_DIR", + default=os.path.join(LOCAL_PROJECTS, "chunks"), + ) # directory for file chunks + UPLOAD_CHUNKS_EXPIRATION = config( + "UPLOAD_CHUNKS_EXPIRATION", default=86400, cast=int + ) # time in seconds after chunks are permanently deleted (1 day) diff --git a/server/mergin/sync/public_api_v2.yaml b/server/mergin/sync/public_api_v2.yaml index 04dbce61..b50f7f83 100644 --- a/server/mergin/sync/public_api_v2.yaml +++ b/server/mergin/sync/public_api_v2.yaml @@ -96,6 +96,39 @@ paths: "404": $ref: "#/components/responses/NotFound" x-openapi-router-controller: mergin.sync.public_api_v2_controller + # /projects/{id}/chunks: + # post: + # tags: + # - project + # summary: Upload file chunk. + # operationId: upload_chunk + # parameters: + # - $ref: "#/components/parameters/ProjectId" + # requestBody: + # x-stream-upload: true + # content: + # application/octet-stream: + # schema: + # type: string + # format: binary + # responses: + # "200": + # description: Chunk upload response + # content: + # application/json: + # schema: + # $ref: "#/components/schemas/UploadChunk" + # "400": + # $ref: "#/components/responses/BadRequest" + # "401": + # $ref: "#/components/responses/Unauthorized" + # "403": + # $ref: "#/components/responses/Forbidden" + # "404": + # $ref: "#/components/responses/NotFound" + # "413": + # $ref: "#/components/responses/RequestTooBig" + # x-openapi-router-controller: mergin.sync.public_api_v2_controller /projects/{id}/collaborators: parameters: - $ref: "#/components/parameters/ProjectId" @@ -233,6 +266,8 @@ components: description: Not found Conflict: description: Conflict + RequestTooBig: + description: Request Entity Too Large parameters: ProjectId: name: id @@ -268,6 +303,21 @@ components: - $ref: "#/components/schemas/ProjectRole" nullable: false description: combination of workspace role and project role + UploadChunk: + type: object + properties: + id: + type: string + format: uuid + example: "123e4567-e89b-12d3-a456-426614174000" + project_id: + type: string + format: uuid + example: "123e4567-e89b-12d3-a456-426614174001" + valid_until: + type: string + format: date-time + example: "2023-10-01T12:00:00Z" ProjectMember: type: object properties: diff --git a/server/mergin/sync/public_api_v2_controller.py b/server/mergin/sync/public_api_v2_controller.py index 7f40c54b..4c224c48 100644 --- a/server/mergin/sync/public_api_v2_controller.py +++ b/server/mergin/sync/public_api_v2_controller.py @@ -3,19 +3,25 @@ # SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-MerginMaps-Commercial from datetime import datetime +import os +import uuid +from datetime import datetime, timedelta, timezone from connexion import NoContent, request -from flask import abort, jsonify +from flask import abort, jsonify, current_app, make_response from flask_login import current_user from mergin.sync.forms import project_name_validation -from .schemas import ProjectMemberSchema +from .schemas import ProjectMemberSchema, UploadChunkSchema from .workspace import WorkspaceRole from ..app import db from ..auth import auth_required from ..auth.models import User from .models import Project, ProjectRole, ProjectMember from .permissions import ProjectPermissions, require_project_by_uuid +from .errors import ProjectLocked +from .utils import get_chunk_location +from .storages.disk import move_to_tmp, save_to_file @auth_required @@ -128,3 +134,35 @@ def remove_project_collaborator(id, user_id): project.unset_role(user_id) db.session.commit() return NoContent, 204 + + +@auth_required +def upload_chunk(id: str): + """ + Push chunk to files lake. + """ + project = require_project_by_uuid(id, ProjectPermissions.Edit) + if project.locked_until: + abort(make_response(jsonify(ProjectLocked().to_dict()), 422)) + # generate uuid for chunk + chunk_id = str(uuid.uuid4()) + dest_file = get_chunk_location(chunk_id) + try: + # we could have used request.data here, but it could eventually cause OOM issue + print(request.stream) + save_to_file(request.stream, dest_file, current_app.config["MAX_CHUNK_SIZE"]) + except IOError: + move_to_tmp(dest_file, chunk_id) + abort(413, "Chunk size exceeds maximum allowed size") + except Exception as e: + abort(400, "Error saving chunk") + + # Add valid_until timestamp to the response, remove tzinfo for compatibility with DateTimeWithZ + valid_until = ( + datetime.now(timezone.utc) + + timedelta(seconds=current_app.config["UPLOAD_CHUNKS_EXPIRATION"]) + ).replace(tzinfo=None) + return ( + UploadChunkSchema().dump({"id": chunk_id, "valid_until": valid_until}), + 200, + ) diff --git a/server/mergin/sync/schemas.py b/server/mergin/sync/schemas.py index 75b6f09e..9d9b1309 100644 --- a/server/mergin/sync/schemas.py +++ b/server/mergin/sync/schemas.py @@ -405,3 +405,10 @@ class ProjectMemberSchema(Schema): project_role = fields.Enum(enum=ProjectRole, by_value=True) workspace_role = fields.Enum(enum=WorkspaceRole, by_value=True) role = fields.Enum(enum=ProjectRole, by_value=True) + + +class UploadChunkSchema(Schema): + """Schema for chunk upload response""" + + id = fields.UUID() + valid_until = DateTimeWithZ() diff --git a/server/mergin/sync/utils.py b/server/mergin/sync/utils.py index c4d5fa16..7babb231 100644 --- a/server/mergin/sync/utils.py +++ b/server/mergin/sync/utils.py @@ -578,3 +578,17 @@ def get_x_accel_uri(*url_parts): url = url.lstrip(os.path.sep) result = os.path.join(download_accell_uri, url) return result + + +def get_chunk_location(id: str): + """ + Get file name for chunk + + Splits the given identifier into two parts. + + Returns a tuple where the first element is the first two characters of the identifier, and the second element is the remaining characters. + """ + chunk_dir = current_app.config.get("UPLOAD_CHUNKS_DIR") + small_hash = id[:2] + file_name = id[2:] + return os.path.join(chunk_dir, small_hash, file_name) diff --git a/server/mergin/tests/test_public_api_v2.py b/server/mergin/tests/test_public_api_v2.py index 2d88d652..5bef6b7d 100644 --- a/server/mergin/tests/test_public_api_v2.py +++ b/server/mergin/tests/test_public_api_v2.py @@ -1,13 +1,19 @@ # Copyright (C) Lutra Consulting Limited # # SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-MerginMaps-Commercial +import os +from datetime import datetime, timedelta, timezone + +from mergin.sync.utils import get_chunk_location + from .utils import add_user from ..app import db from mergin.sync.models import Project -from tests import test_project, test_workspace_id +from ..tests import test_project, test_workspace_id from ..config import Configuration from ..sync.models import ProjectRole +from . import test_project_dir def test_schedule_delete_project(client): @@ -126,3 +132,66 @@ def test_project_members(client): # access provided by workspace role cannot be removed directly response = client.delete(url + f"/{user.id}") assert response.status_code == 404 + + +def test_upload_chunk(client, app): + """Test pushing a chunk to a project""" + project = Project.query.filter_by( + workspace_id=test_workspace_id, name=test_project + ).first() + url = f"/v2/projects/{project.id}/chunks" + app.config["MAX_CHUNK_SIZE"] = 1024 # Set a small max chunk size for testing + max_chunk_size = app.config["MAX_CHUNK_SIZE"] + + response = client.post( + url, + data=b"a" * (max_chunk_size + 1), # Exceeding max chunk size + headers={"Content-Type": "application/octet-stream"}, + ) + assert response.status_code == 413 + + # Project is locked, cannot push chunks + project.locked_until = datetime.now(timezone.utc) + timedelta(weeks=26) + db.session.commit() + response = client.post( + url, + data=b"a", + headers={"Content-Type": "application/octet-stream"}, + ) + assert response.status_code == 422 + assert response.json["code"] == "ProjectLocked" + + project.locked_until = None # Unlock the project + project.removed_at = datetime.now(timezone.utc) - timedelta( + days=(client.application.config["DELETED_PROJECT_EXPIRATION"] + 1) + ) # Ensure project is removed + db.session.commit() + response = client.post( + url, + data=b"a", + headers={"Content-Type": "application/octet-stream"}, + ) + assert response.status_code == 404 + + # Push a chunk successfully + project.removed_at = None # Ensure project is not removed + db.session.commit() + response = client.post( + url, + data=b"a" * max_chunk_size, + headers={"Content-Type": "application/octet-stream"}, + ) + assert response.status_code == 200 + chunk_id = response.json["id"] + assert chunk_id + valid_until = response.json["valid_until"] + valid_until_dt = datetime.strptime(valid_until, "%Y-%m-%dT%H:%M:%S%z") + assert valid_until_dt > datetime.now(timezone.utc) + assert valid_until_dt < datetime.now(timezone.utc) + timedelta( + seconds=app.config["UPLOAD_CHUNKS_EXPIRATION"] + ) + # Check if the chunk is stored correctly + stored_chunk = get_chunk_location(chunk_id) + assert os.path.exists(stored_chunk) + with open(stored_chunk, "rb") as f: + assert f.read() == b"a" * max_chunk_size From e1a4bcff44d5a99384764e9e8dda89bf9cb0b390 Mon Sep 17 00:00:00 2001 From: "marcel.kocisek" Date: Tue, 5 Aug 2025 10:33:15 +0200 Subject: [PATCH 2/3] cleanup --- server/mergin/sync/public_api_v2_controller.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/server/mergin/sync/public_api_v2_controller.py b/server/mergin/sync/public_api_v2_controller.py index 4c224c48..55ae48d8 100644 --- a/server/mergin/sync/public_api_v2_controller.py +++ b/server/mergin/sync/public_api_v2_controller.py @@ -139,7 +139,7 @@ def remove_project_collaborator(id, user_id): @auth_required def upload_chunk(id: str): """ - Push chunk to files lake. + Push chunk to chunks location. """ project = require_project_by_uuid(id, ProjectPermissions.Edit) if project.locked_until: @@ -149,7 +149,6 @@ def upload_chunk(id: str): dest_file = get_chunk_location(chunk_id) try: # we could have used request.data here, but it could eventually cause OOM issue - print(request.stream) save_to_file(request.stream, dest_file, current_app.config["MAX_CHUNK_SIZE"]) except IOError: move_to_tmp(dest_file, chunk_id) From 36e96d743ba915db4d390c50699b86811fed06e3 Mon Sep 17 00:00:00 2001 From: "marcel.kocisek" Date: Tue, 5 Aug 2025 10:43:45 +0200 Subject: [PATCH 3/3] cleanup api yaml --- server/mergin/sync/public_api_v2.yaml | 4 ---- 1 file changed, 4 deletions(-) diff --git a/server/mergin/sync/public_api_v2.yaml b/server/mergin/sync/public_api_v2.yaml index b50f7f83..fd6e4353 100644 --- a/server/mergin/sync/public_api_v2.yaml +++ b/server/mergin/sync/public_api_v2.yaml @@ -310,10 +310,6 @@ components: type: string format: uuid example: "123e4567-e89b-12d3-a456-426614174000" - project_id: - type: string - format: uuid - example: "123e4567-e89b-12d3-a456-426614174001" valid_until: type: string format: date-time