diff --git a/server/application.py b/server/application.py index b1ab79ac..74f82213 100644 --- a/server/application.py +++ b/server/application.py @@ -47,6 +47,7 @@ "GLOBAL_WRITE", "ENABLE_SUPERADMIN_ASSIGNMENT", "DIAGNOSTIC_LOGS_URL", + "V2_PUSH_ENABLED", ] ) register_stats(application) diff --git a/server/mergin/app.py b/server/mergin/app.py index 845afd8c..2f8c7dcf 100644 --- a/server/mergin/app.py +++ b/server/mergin/app.py @@ -12,7 +12,17 @@ from sqlalchemy.schema import MetaData from flask_sqlalchemy import SQLAlchemy from flask_marshmallow import Marshmallow -from flask import json, jsonify, request, abort, current_app, Flask, Request, Response +from flask import ( + json, + jsonify, + make_response, + request, + abort, + current_app, + Flask, + Request, + Response, +) from flask_login import current_user, LoginManager from flask_wtf.csrf import generate_csrf, CSRFProtect from flask_migrate import Migrate @@ -25,7 +35,7 @@ import time import traceback from werkzeug.exceptions import HTTPException -from typing import List, Dict, Optional +from typing import List, Dict, Optional, Tuple from .sync.utils import get_blacklisted_dirs, get_blacklisted_files from .config import Configuration @@ -495,6 +505,12 @@ class ResponseError: def to_dict(self) -> Dict: return dict(code=self.code, detail=self.detail + f" ({self.code})") + def response(self, status_code: int) -> Tuple[Response, int]: + """Returns a custom error response with the given code.""" + response = make_response(jsonify(self.to_dict()), status_code) + response.headers["Content-Type"] = "application/problem+json" + return response, status_code + def whitespace_filter(obj): return obj.strip() if isinstance(obj, str) else obj diff --git a/server/mergin/sync/commands.py b/server/mergin/sync/commands.py index 97e85981..4ec898cf 100644 --- a/server/mergin/sync/commands.py +++ b/server/mergin/sync/commands.py @@ -9,7 +9,6 @@ from datetime import datetime from flask import Flask, current_app -from .files import 
UploadChanges from ..app import db from .models import Project, ProjectVersion from .utils import split_project_path @@ -52,8 +51,7 @@ def create(name, namespace, username): # pylint: disable=W0612 p = Project(**project_params) p.updated = datetime.utcnow() db.session.add(p) - changes = UploadChanges(added=[], updated=[], removed=[]) - pv = ProjectVersion(p, 0, user.id, changes, "127.0.0.1") + pv = ProjectVersion(p, 0, user.id, [], "127.0.0.1") pv.project = p db.session.commit() os.makedirs(p.storage.project_dir, exist_ok=True) diff --git a/server/mergin/sync/config.py b/server/mergin/sync/config.py index ff3875ee..7200dae5 100644 --- a/server/mergin/sync/config.py +++ b/server/mergin/sync/config.py @@ -64,10 +64,14 @@ class Configuration(object): ) # in seconds, older unfinished zips are moved to temp PARTIAL_ZIP_EXPIRATION = config("PARTIAL_ZIP_EXPIRATION", default=600, cast=int) + # whether new push is allowed + V2_PUSH_ENABLED = config("V2_PUSH_ENABLED", default=True, cast=bool) + # directory for file chunks UPLOAD_CHUNKS_DIR = config( "UPLOAD_CHUNKS_DIR", default=os.path.join(LOCAL_PROJECTS, "chunks"), - ) # directory for file chunks + ) + # time in seconds after chunks are permanently deleted (1 day) UPLOAD_CHUNKS_EXPIRATION = config( "UPLOAD_CHUNKS_EXPIRATION", default=86400, cast=int - ) # time in seconds after chunks are permanently deleted (1 day) + ) diff --git a/server/mergin/sync/db_events.py b/server/mergin/sync/db_events.py index 18d1ce60..48a1756d 100644 --- a/server/mergin/sync/db_events.py +++ b/server/mergin/sync/db_events.py @@ -6,6 +6,8 @@ from flask import current_app, abort from sqlalchemy import event +from .models import ProjectVersion +from .tasks import optimize_storage from ..app import db @@ -14,9 +16,17 @@ def check(session): abort(503, "Service unavailable due to maintenance, please try later") +def optimize_gpgk_storage(mapper, connection, project_version): + # do not optimize on every version, every 10th is just fine + if not 
project_version.name % 10: + optimize_storage.delay(project_version.project_id) + + + def register_events(): event.listen(db.session, "before_commit", check) + event.listen(ProjectVersion, "after_insert", optimize_gpgk_storage) def remove_events(): event.remove(db.session, "before_commit", check) + event.remove(ProjectVersion, "after_insert", optimize_gpgk_storage) diff --git a/server/mergin/sync/errors.py b/server/mergin/sync/errors.py index d253ef4c..35985ab9 100644 --- a/server/mergin/sync/errors.py +++ b/server/mergin/sync/errors.py @@ -3,8 +3,12 @@ # SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-MerginMaps-Commercial from typing import List, Dict + +from .config import Configuration from ..app import ResponseError +MAX_CHUNK_SIZE = Configuration.MAX_CHUNK_SIZE / 1024 / 1024 + class UpdateProjectAccessError(ResponseError): code = "UpdateProjectAccessError" @@ -39,3 +43,55 @@ def to_dict(self) -> Dict: class ProjectLocked(ResponseError): code = "ProjectLocked" detail = "The project is currently locked and you cannot make changes to it" + + +class DataSyncError(ResponseError): + code = "DataSyncError" + detail = "There are either corrupted files or it is not possible to create version with provided geopackage data" + + def __init__(self, failed_files: Dict): + self.failed_files = failed_files + + def to_dict(self) -> Dict: + data = super().to_dict() + data["failed_files"] = self.failed_files + return data + + +class ProjectVersionExists(ResponseError): + code = "ProjectVersionExists" + detail = "Project version mismatch" + + def __init__(self, client_version: int, server_version: int): + self.client_version = client_version + self.server_version = server_version + + def to_dict(self) -> Dict: + data = super().to_dict() + data["client_version"] = f"v{self.client_version}" + data["server_version"] = f"v{self.server_version}" + return data + + +class AnotherUploadRunning(ResponseError): + code = "AnotherUploadRunning" + detail = "Another process is running" + + 
+class UploadError(ResponseError): + code = "UploadError" + detail = "Project version could not be created" + + def __init__(self, error: str = None): + self.error = error + + def to_dict(self) -> Dict: + data = super().to_dict() + if self.error is not None: + data["detail"] = self.error + f" ({self.code})" + return data + + +class BigChunkError(ResponseError): + code = "BigChunkError" + detail = f"Chunk size exceeds maximum allowed size {MAX_CHUNK_SIZE} MB" diff --git a/server/mergin/sync/files.py b/server/mergin/sync/files.py index 12b30afe..fd77c597 100644 --- a/server/mergin/sync/files.py +++ b/server/mergin/sync/files.py @@ -2,15 +2,36 @@ # # SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-MerginMaps-Commercial import datetime +from enum import Enum import os from dataclasses import dataclass from typing import Optional, List -from marshmallow import fields, EXCLUDE, pre_load, post_load, post_dump +import uuid +from flask import current_app +from marshmallow import ValidationError, fields, EXCLUDE, post_dump, validates_schema from pathvalidate import sanitize_filename +from .utils import ( + is_file_name_blacklisted, + is_qgis, + is_supported_extension, + is_valid_path, + is_versioned_file, +) from ..app import DateTimeWithZ, ma +class PushChangeType(Enum): + CREATE = "create" + UPDATE = "update" + DELETE = "delete" + UPDATE_DIFF = "update_diff" + + @classmethod + def values(cls): + return [member.value for member in cls.__members__.values()] + + def mergin_secure_filename(filename: str) -> str: """Generate secure filename for given file""" filename = os.path.normpath(filename) @@ -24,94 +45,181 @@ def mergin_secure_filename(filename: str) -> str: @dataclass class File: - """Base class for every file object""" + """Base class for every file object, either intended to upload or already existing in project""" path: str checksum: str size: int - location: str def is_valid_gpkg(self): """Check if diff file is valid""" return self.size != 0 +@dataclass +class 
ProjectDiffFile(File): + """Metadata for geodiff diff file (aka. changeset) associated with geopackage""" + + # location where file is actually stored + location: str + + @dataclass class ProjectFile(File): - """Project file metadata including metadata for diff file""" + """Project file metadata including metadata for diff file and location where it is stored""" # metadata for gpkg diff file - diff: Optional[File] + diff: Optional[ProjectDiffFile] # deprecated attribute kept for public API compatibility mtime: Optional[datetime.datetime] + # location where file is actually stored + location: str @dataclass -class UploadFile(File): - """File to be uploaded coming from client push process""" - - # determined by client - chunks: Optional[List[str]] - diff: Optional[File] - +class ProjectFileChange(ProjectFile): + """Metadata of changed file in project version. + + This item is saved into database into file_history. + """ + + change: PushChangeType + + +def files_changes_from_upload( + changes: dict, location_dir: str +) -> List["ProjectFileChange"]: + """Create a list of version file changes from upload changes dictionary used by public API. + + It flattens changes dict and adds change type to each item. Also generates location for each file. 
+ """ + secure_filenames = [] + version_changes = [] + for key in ("added", "updated", "removed"): + for item in changes.get(key, []): + location = os.path.join(location_dir, mergin_secure_filename(item["path"])) + diff = None + + # make sure we have unique location for each file + if location in secure_filenames: + filename, file_extension = os.path.splitext(location) + location = filename + f".{str(uuid.uuid4())}" + file_extension + + secure_filenames.append(location) + + if key == "removed": + change = PushChangeType.DELETE + location = None + elif key == "added": + change = PushChangeType.CREATE + else: + change = PushChangeType.UPDATE + if item.get("diff"): + change = PushChangeType.UPDATE_DIFF + diff_location = os.path.join( + location_dir, mergin_secure_filename(item["diff"]["path"]) + ) + if diff_location in secure_filenames: + filename, file_extension = os.path.splitext(diff_location) + diff_location = ( + filename + f".{str(uuid.uuid4())}" + file_extension + ) + + secure_filenames.append(diff_location) + diff = ProjectDiffFile( + path=item["diff"]["path"], + checksum=item["diff"]["checksum"], + size=item["diff"]["size"], + location=diff_location, + ) + + file_change = ProjectFileChange( + path=item["path"], + checksum=item["checksum"], + size=item["size"], + mtime=None, + change=change, + location=location, + diff=diff, + ) + version_changes.append(file_change) -@dataclass -class UploadChanges: - added: List[UploadFile] - updated: List[UploadFile] - removed: List[UploadFile] + return version_changes class FileSchema(ma.Schema): path = fields.String() size = fields.Integer() checksum = fields.String() - location = fields.String(load_default="", load_only=True) class Meta: unknown = EXCLUDE - @post_load - def create_obj(self, data, **kwargs): - return File(**data) - class UploadFileSchema(FileSchema): chunks = fields.List(fields.String(), load_default=[]) diff = fields.Nested(FileSchema(), many=False, load_default=None) - @pre_load - def pre_load(self, 
data, **kwargs): - # add future location based on context version - version = f"v{self.context.get('version')}" - if not data.get("location"): - data["location"] = os.path.join( - version, mergin_secure_filename(data["path"]) - ) - if data.get("diff") and not data.get("diff").get("location"): - data["diff"]["location"] = os.path.join( - version, mergin_secure_filename(data["diff"]["path"]) - ) - return data - - @post_load - def create_obj(self, data, **kwargs): - return UploadFile(**data) - class ChangesSchema(ma.Schema): """Schema for upload changes""" - added = fields.List(fields.Nested(UploadFileSchema()), load_default=[]) - updated = fields.List(fields.Nested(UploadFileSchema()), load_default=[]) - removed = fields.List(fields.Nested(UploadFileSchema()), load_default=[]) + added = fields.List( + fields.Nested(UploadFileSchema()), load_default=[], dump_default=[] + ) + updated = fields.List( + fields.Nested(UploadFileSchema()), load_default=[], dump_default=[] + ) + removed = fields.List( + fields.Nested(UploadFileSchema()), load_default=[], dump_default=[] + ) class Meta: unknown = EXCLUDE - @post_load - def create_obj(self, data, **kwargs): - return UploadChanges(**data) + @post_dump + def remove_blacklisted_files(self, data, **kwargs): + """Files which are blacklisted are not allowed to be uploaded and are simple ignored.""" + for key in ("added", "updated", "removed"): + data[key] = [ + f + for f in data[key] + if not is_file_name_blacklisted( + f["path"], current_app.config["BLACKLIST"] + ) + ] + return data + + @validates_schema + def validate(self, data, **kwargs): + """Basic consistency validations for upload metadata""" + changes_files = [ + f["path"] for f in data["added"] + data["updated"] + data["removed"] + ] + + if len(changes_files) == 0: + raise ValidationError("No changes") + + # changes' files must be unique + if len(set(changes_files)) != len(changes_files): + raise ValidationError("Not unique changes") + + # check if all files are valid + for 
file in data["added"] + data["updated"]: + file_path = file["path"] + if is_versioned_file(file_path) and file["size"] == 0: + raise ValidationError("File is not valid") + + if not is_valid_path(file_path): + raise ValidationError( + f"Unsupported file name detected: {file_path}. Please remove the invalid characters." + ) + + if not is_supported_extension(file_path): + raise ValidationError( + f"Unsupported file type detected: {file_path}. " + f"Please remove the file or try compressing it into a ZIP file before uploading.", + ) class ProjectFileSchema(FileSchema): diff --git a/server/mergin/sync/models.py b/server/mergin/sync/models.py index 3854e4d2..07219149 100644 --- a/server/mergin/sync/models.py +++ b/server/mergin/sync/models.py @@ -10,6 +10,7 @@ from enum import Enum from typing import Optional, List, Dict, Set, Tuple from dataclasses import dataclass, asdict +import logging from blinker import signal from flask_login import current_user @@ -23,30 +24,38 @@ from .files import ( File, - UploadChanges, + ProjectDiffFile, + ProjectFileChange, ChangesSchema, ProjectFile, + files_changes_from_upload, + mergin_secure_filename, + PushChangeType, ) from .interfaces import WorkspaceRole from .storages.disk import move_to_tmp from ..app import db from .storages import DiskStorage -from .utils import is_versioned_file, is_qgis +from .utils import ( + Toucher, + get_chunk_location, + get_project_path, + is_supported_type, + is_versioned_file, + is_qgis, +) Storages = {"local": DiskStorage} project_deleted = signal("project_deleted") project_access_granted = signal("project_access_granted") +push_finished = signal("push_finished") +project_version_created = signal("project_version_created") -class PushChangeType(Enum): - CREATE = "create" - UPDATE = "update" - DELETE = "delete" - UPDATE_DIFF = "update_diff" - - @classmethod - def values(cls): - return [member.value for member in cls.__members__.values()] +class FileSyncErrorType(Enum): + CORRUPTED = "corrupted" + 
UNSUPPORTED = "unsupported" + SYNC_ERROR = "sync error" class Project(db.Model): @@ -181,7 +190,7 @@ def files(self) -> List[ProjectFile]: checksum=row.checksum, location=row.location, mtime=row.mtime, - diff=File(**row.diff) if row.diff else None, + diff=ProjectDiffFile(**row.diff) if row.diff else None, ) for row in db.session.execute(query, params).fetchall() ] @@ -504,9 +513,9 @@ def path(self) -> str: return self.file.path @property - def diff_file(self) -> Optional[File]: + def diff_file(self) -> Optional[ProjectDiffFile]: if self.diff: - return File(**self.diff) + return ProjectDiffFile(**self.diff) @property def mtime(self) -> datetime: @@ -705,7 +714,7 @@ def __init__( project: Project, name: int, author_id: int, - changes: UploadChanges, + changes: List[ProjectFileChange], ip: str, user_agent: str = None, device_id: str = None, @@ -725,9 +734,7 @@ def __init__( ).all() } - changed_files_paths = [ - f.path for f in changes.updated + changes.removed + changes.added - ] + changed_files_paths = set(change.path for change in changes) existing_files_map = { f.path: f for f in ProjectFilePath.query.filter_by(project_id=self.project_id) @@ -735,46 +742,32 @@ def __init__( .all() } - for key in ( - ("added", PushChangeType.CREATE), - ("updated", PushChangeType.UPDATE), - ("removed", PushChangeType.DELETE), - ): - change_attr = key[0] - change_type = key[1] - - for upload_file in getattr(changes, change_attr): - is_diff_change = ( - change_type is PushChangeType.UPDATE - and upload_file.diff is not None - ) - - file = existing_files_map.get( - upload_file.path, ProjectFilePath(self.project_id, upload_file.path) - ) - fh = FileHistory( - file=file, - size=upload_file.size, - checksum=upload_file.checksum, - location=upload_file.location, - diff=( - asdict(upload_file.diff) - if (is_diff_change and upload_file.diff) - else null() - ), - change=( - PushChangeType.UPDATE_DIFF if is_diff_change else change_type - ), - ) - fh.version = self - fh.project_version_name = 
self.name - db.session.add(fh) - db.session.flush() + for item in changes: + # get existing DB file reference or create a new one (for added files) + db_file = existing_files_map.get( + item.path, ProjectFilePath(self.project_id, item.path) + ) + fh = FileHistory( + file=db_file, + size=item.size, + checksum=item.checksum, + location=item.location, + diff=( + asdict(item.diff) + if (item.change is PushChangeType.UPDATE_DIFF and item.diff) + else null() + ), + change=item.change, + ) + fh.version = self + fh.project_version_name = self.name + db.session.add(fh) + db.session.flush() - if change_type is PushChangeType.DELETE: - latest_files_map.pop(fh.path, None) - else: - latest_files_map[fh.path] = fh.id + if item.change is PushChangeType.DELETE: + latest_files_map.pop(fh.path, None) + else: + latest_files_map[fh.path] = fh.id # update cached values in project and push to transaction buffer so that self.files is up-to-date self.project.latest_project_files.file_history_ids = latest_files_map.values() @@ -909,7 +902,7 @@ def files(self) -> List[ProjectFile]: checksum=row.checksum, location=row.location, mtime=row.mtime, - diff=File(**row.diff) if row.diff else None, + diff=ProjectDiffFile(**row.diff) if row.diff else None, ) for row in result ] @@ -1021,9 +1014,7 @@ class Upload(db.Model): ) __table_args__ = (db.UniqueConstraint("project_id", "version"),) - def __init__( - self, project: Project, version: int, changes: UploadChanges, user_id: int - ): + def __init__(self, project: Project, version: int, changes: dict, user_id: int): self.id = str(uuid.uuid4()) self.project_id = project.id self.version = version @@ -1053,6 +1044,141 @@ def clear(self): db.session.delete(self) db.session.commit() + def process_chunks( + self, use_shared_chunk_dir: bool + ) -> Tuple[List[ProjectFileChange], Dict]: + """Concatenate chunks into single file and apply gpkg updates if needed""" + errors = {} + project_path = get_project_path(self.project) + v_next_version = 
ProjectVersion.to_v_name(self.project.next_version()) + chunks_map = { + f["path"]: f["chunks"] + for f in self.changes["added"] + self.changes["updated"] + } + file_changes = files_changes_from_upload(self.changes, v_next_version) + to_remove = [i.path for i in file_changes if i.change == PushChangeType.DELETE] + current_files = [f for f in self.project.files if f.path not in to_remove] + + with Toucher(self.lockfile, 5): + for f in file_changes: + if f.change == PushChangeType.DELETE: + continue + + f_location = ( + f.diff.location + if f.change == PushChangeType.UPDATE_DIFF + else f.location + ) + temporary_location = os.path.join(self.upload_dir, "files", f_location) + os.makedirs(os.path.dirname(temporary_location), exist_ok=True) + with open(temporary_location, "wb") as dest: + try: + for chunk_id in chunks_map.get(f.path, []): + # based on API version location for uploaded chunks differs + if use_shared_chunk_dir: + chunk_file = get_chunk_location(chunk_id) + else: + chunk_file = os.path.join( + self.upload_dir, "chunks", chunk_id + ) + + if not os.path.exists(chunk_file): + errors[f.path] = FileSyncErrorType.CORRUPTED.value + continue + + with open(chunk_file, "rb") as src: + data = src.read(8192) + while data: + dest.write(data) + data = src.read(8192) + + move_to_tmp(chunk_file) + except IOError: + logging.exception( + f"Failed to process chunk: {chunk_id} in project {project_path}" + ) + errors[f.path] = FileSyncErrorType.CORRUPTED.value + continue + + if not is_supported_type(temporary_location): + logging.info(f"Rejecting blacklisted file: {temporary_location}") + errors[f.path] = FileSyncErrorType.UNSUPPORTED.value + continue + + # check if .gpkg file is valid + if is_versioned_file(temporary_location) and not f.is_valid_gpkg(): + errors[f.path] = FileSyncErrorType.CORRUPTED.value + continue + + expected_size = ( + f.diff.size if f.change == PushChangeType.UPDATE_DIFF else f.size + ) + if expected_size != os.path.getsize(temporary_location): + 
logging.error( + f"Data integrity check has failed on file {f.path} in project {project_path}", + exc_info=True, + ) + errors[f.path] = FileSyncErrorType.CORRUPTED.value + continue + + # for updates try to apply diff to create a full updated gpkg file or from full .gpkg try to create corresponding diff + if f.change in ( + PushChangeType.UPDATE, + PushChangeType.UPDATE_DIFF, + ) and is_versioned_file(f.path): + current_file = next( + (i for i in current_files if i.path == f.path), None + ) + if not current_file: + errors[f.path] = ( + f"{FileSyncErrorType.SYNC_ERROR.value}: file not found on server" + ) + continue + + if f.diff: + changeset = temporary_location + patched_file = os.path.join( + self.upload_dir, "files", f.location + ) + + result = self.project.storage.apply_diff( + current_file, changeset, patched_file + ) + if result.ok(): + checksum, size = result.value + f.checksum = checksum + f.size = size + else: + errors[f.path] = ( + f"{FileSyncErrorType.SYNC_ERROR.value}: project {self.project.workspace.name}/{self.project.name}, {result.value}" + ) + else: + diff_name = mergin_secure_filename( + f.path + "-diff-" + str(uuid.uuid4()) + ) + changeset = os.path.join( + self.upload_dir, "files", v_next_version, diff_name + ) + patched_file = temporary_location + result = self.project.storage.construct_diff( + current_file, changeset, patched_file + ) + if result.ok(): + checksum, size = result.value + f.diff = ProjectDiffFile( + checksum=checksum, + size=size, + path=diff_name, + location=os.path.join(v_next_version, diff_name), + ) + f.change = PushChangeType.UPDATE_DIFF + else: + # if diff cannot be constructed it would be a force update + logging.warning( + f"Geodiff: create changeset error {result.value}" + ) + return file_changes, errors + class RequestStatus(Enum): ACCEPTED = "accepted" diff --git a/server/mergin/sync/public_api_controller.py b/server/mergin/sync/public_api_controller.py index 9fd229a1..ab0908d1 100644 --- 
a/server/mergin/sync/public_api_controller.py +++ b/server/mergin/sync/public_api_controller.py @@ -14,8 +14,8 @@ from datetime import datetime import gevent +from marshmallow import ValidationError import psycopg2 -from blinker import signal from connexion import NoContent, request from flask import ( abort, @@ -40,6 +40,7 @@ from ..auth import auth_required from ..auth.models import User from .models import ( + FileSyncErrorType, Project, ProjectVersion, Upload, @@ -48,13 +49,17 @@ ProjectFilePath, ProjectUser, ProjectRole, + project_version_created, + push_finished, ) from .files import ( - UploadChanges, + ProjectFileChange, ChangesSchema, UploadFileSchema, ProjectFileSchema, FileSchema, + files_changes_from_upload, + mergin_secure_filename, ) from .schemas import ( ProjectSchema, @@ -65,7 +70,7 @@ FileHistorySchema, ProjectVersionListSchema, ) -from .storages.storage import DataSyncError, InitializationError +from .storages.storage import InitializationError from .storages.disk import save_to_file, move_to_tmp from .permissions import ( require_project, @@ -96,11 +101,6 @@ from ..utils import format_time_delta -push_finished = signal("push_finished") -# TODO: Move to database events to handle all commits to project versions -project_version_created = signal("project_version_created") - - def parse_project_access_update_request(access: Dict) -> Dict: """Parse raw project access update request and filter out invalid entries. New access can be specified either by list of usernames or ids -> convert only to ids fur further processing. 
@@ -239,15 +239,24 @@ def add_project(namespace): # noqa: E501 .first_or_404() ) version_name = 1 - files = UploadFileSchema(context={"version": 1}, many=True).load( - FileSchema(exclude=("location",), many=True).dump(template.files) - ) - changes = UploadChanges(added=files, updated=[], removed=[]) + file_changes = [] + for file in template.files: + file_changes.append( + ProjectFileChange( + file.path, + file.checksum, + file.size, + diff=None, + mtime=None, + location=os.path.join("v1", mergin_secure_filename(file.path)), + change=PushChangeType.CREATE, + ) + ) else: template = None version_name = 0 - changes = UploadChanges(added=[], updated=[], removed=[]) + file_changes = [] try: p.storage.initialize(template_project=template) @@ -258,7 +267,7 @@ def add_project(namespace): # noqa: E501 p, version_name, current_user.id, - changes, + file_changes, get_ip(request), get_user_agent(request), get_device_id(request), @@ -694,7 +703,10 @@ def catch_sync_failure(f): @functools.wraps(f) def wrapper(*args, **kwargs): try: - return f(*args, **kwargs) + response, status_code = f(*args, **kwargs) + if status_code >= 400: + raise HTTPException(response=response) + return response, status_code except (HTTPException, IntegrityError) as e: if e.code in [401, 403, 404]: raise # nothing to do, just propagate downstream @@ -711,6 +723,11 @@ def wrapper(*args, **kwargs): error_type = "push_finish" elif request.endpoint == "chunk_upload": error_type = "chunk_upload" + elif ( + request.endpoint + == "/v2.mergin_sync_public_api_v2_controller_create_project_version" + ): + error_type = "project_push" if not e.description: # custom error cases (e.g. 
StorageLimitHit) e.description = e.response.json["detail"] @@ -745,7 +762,7 @@ def project_push(namespace, project_name): project_permission = current_app.project_handler.get_push_permission(changes) project = require_project(namespace, project_name, project_permission) if project.locked_until: - abort(make_response(jsonify(ProjectLocked().to_dict()), 422)) + return ProjectLocked().response(422) # pass full project object to request for later use request.view_args["project"] = project ws = project.workspace @@ -771,76 +788,36 @@ def project_push(namespace, project_name): if pending_upload and pending_upload.is_active(): abort(400, "Another process is running. Please try later.") - upload_changes = ChangesSchema(context={"version": version + 1}).load(changes) + try: + ChangesSchema().validate(changes) + upload_changes = ChangesSchema().dump(changes) + except ValidationError as err: + msg = err.messages[0] if type(err.messages) == list else "Invalid input data" + abort(400, msg) - for item in upload_changes.added: + for item in upload_changes["added"]: # check if same file is not already uploaded - if not all(ele.path != item.path for ele in project.files): - abort(400, f"File {item.path} has been already uploaded") - if not is_valid_path(item.path): - abort( - 400, - f"Unsupported file name detected: {item.path}. Please remove the invalid characters.", - ) - if not is_supported_extension(item.path): - abort( - 400, - f"Unsupported file type detected: {item.path}. 
" - f"Please remove the file or try compressing it into a ZIP file before uploading.", - ) - - # changes' files must be unique - changes_files = [ - f.path - for f in upload_changes.added + upload_changes.updated + upload_changes.removed - ] - if len(set(changes_files)) != len(changes_files): - abort(400, "Not unique changes") - - sanitized_files = [] - blacklisted_files = [] - for f in upload_changes.added + upload_changes.updated + upload_changes.removed: - # check if .gpkg file is valid - if is_versioned_file(f.path): - if not f.is_valid_gpkg(): - abort(400, f"File {f.path} is not valid") - if is_file_name_blacklisted(f.path, current_app.config["BLACKLIST"]): - blacklisted_files.append(f.path) - # all file need to be unique after sanitized - if f.location in sanitized_files: - filename, file_extension = os.path.splitext(f.location) - f.location = filename + f".{str(uuid.uuid4())}" + file_extension - sanitized_files.append(f.location) - if f.diff: - if f.diff.location in sanitized_files: - filename, file_extension = os.path.splitext(f.diff.location) - f.diff.location = filename + f".{str(uuid.uuid4())}" + file_extension - sanitized_files.append(f.diff.location) - - # remove blacklisted files from changes - for key in upload_changes.__dict__.keys(): - new_value = [ - f for f in getattr(upload_changes, key) if f.path not in blacklisted_files - ] - setattr(upload_changes, key, new_value) + if not all(ele.path != item["path"] for ele in project.files): + abort(400, f"File {item['path']} has been already uploaded") # Check user data limit - updates = [f.path for f in upload_changes.updated] - updated_files = list(filter(lambda i: i.path in updates, project.files)) + updated_files = list( + filter( + lambda i: i.path in [f["path"] for f in upload_changes["updated"]], + project.files, + ) + ) additional_disk_usage = ( - sum(file.size for file in upload_changes.added + upload_changes.updated) + sum( + file["size"] for file in upload_changes["added"] + 
upload_changes["updated"] + ) - sum(file.size for file in updated_files) - - sum(file.size for file in upload_changes.removed) + - sum(file["size"] for file in upload_changes["removed"]) ) - current_usage = ws.disk_usage() requested_storage = current_usage + additional_disk_usage if requested_storage > ws.storage: - abort( - make_response( - jsonify(StorageLimitHit(current_usage, ws.storage).to_dict()), 422 - ) - ) + return StorageLimitHit(current_usage, ws.storage).response(422) upload = Upload(project, version, upload_changes, current_user.id) db.session.add(upload) @@ -885,6 +862,9 @@ def project_push(namespace, project_name): # Update immediately without uploading of new/modified files and remove transaction/lockfile after successful commit if not (changes["added"] or changes["updated"]): next_version = version + 1 + file_changes = files_changes_from_upload( + upload.changes, ProjectVersion.to_v_name(next_version) + ) user_agent = get_user_agent(request) device_id = get_device_id(request) try: @@ -892,7 +872,7 @@ def project_push(namespace, project_name): project, next_version, current_user.id, - upload_changes, + file_changes, get_ip(request), user_agent, device_id, @@ -919,7 +899,7 @@ def project_push(namespace, project_name): finally: upload.clear() - return {"transaction": upload.id} + return {"transaction": upload.id}, 200 @auth_required @@ -938,29 +918,27 @@ def chunk_upload(transaction_id, chunk_id): """ upload, upload_dir = get_upload(transaction_id) request.view_args["project"] = upload.project - upload_changes = ChangesSchema(context={"version": upload.version + 1}).load( - upload.changes - ) - for f in upload_changes.added + upload_changes.updated: - if chunk_id in f.chunks: - dest = os.path.join(upload_dir, "chunks", chunk_id) - lockfile = os.path.join(upload_dir, "lockfile") - with Toucher(lockfile, 30): - try: - # we could have used request.data here, but it could eventually cause OOM issue - save_to_file( - request.stream, dest, 
current_app.config["MAX_CHUNK_SIZE"] - ) - except IOError: - move_to_tmp(dest, transaction_id) - abort(400, "Too big chunk") - if os.path.exists(dest): - checksum = generate_checksum(dest) - size = os.path.getsize(dest) - return jsonify({"checksum": checksum, "size": size}), 200 - else: - abort(400, "Upload was probably canceled") - abort(404) + chunks = [] + for file in upload.changes["added"] + upload.changes["updated"]: + chunks += file.get("chunks", []) + + if chunk_id not in chunks: + abort(404) + + dest = os.path.join(upload_dir, "chunks", chunk_id) + with Toucher(upload.lockfile, 30): + try: + # we could have used request.data here, but it could eventually cause OOM issue + save_to_file(request.stream, dest, current_app.config["MAX_CHUNK_SIZE"]) + except IOError: + move_to_tmp(dest, transaction_id) + abort(400, "Too big chunk") + if os.path.exists(dest): + checksum = generate_checksum(dest) + size = os.path.getsize(dest) + return jsonify({"checksum": checksum, "size": size}), 200 + else: + abort(400, "Upload was probably canceled") @auth_required @@ -980,73 +958,45 @@ def push_finish(transaction_id): :rtype: None """ - from .tasks import optimize_storage - upload, upload_dir = get_upload(transaction_id) request.view_args["project"] = upload.project - changes = ChangesSchema(context={"version": upload.version + 1}).load( - upload.changes - ) project = upload.project + next_version = project.next_version() + v_next_version = ProjectVersion.to_v_name(next_version) + version_dir = os.path.join(project.storage.project_dir, v_next_version) if project.locked_until: - abort(make_response(jsonify(ProjectLocked().to_dict()), 422)) - project_path = get_project_path(project) - corrupted_files = [] - - for f in changes.added + changes.updated: - if f.diff is not None: - dest_file = os.path.join(upload_dir, "files", f.diff.location) - expected_size = f.diff.size - else: - dest_file = os.path.join(upload_dir, "files", f.location) - expected_size = f.size - - # Concatenate 
chunks into single file - # TODO we need to move this elsewhere since it can fail for large files (and slow FS) - os.makedirs(os.path.dirname(dest_file), exist_ok=True) - with open(dest_file, "wb") as dest: - try: - for chunk_id in f.chunks: - sleep(0) # to unblock greenlet - chunk_file = os.path.join(upload_dir, "chunks", chunk_id) - with open(chunk_file, "rb") as src: - data = src.read(8192) - while data: - dest.write(data) - data = src.read(8192) - except IOError: - logging.exception( - "Failed to process chunk: %s in project %s" - % (chunk_id, project_path) - ) - corrupted_files.append(f.path) - continue - if not is_supported_type(dest_file): - logging.info(f"Rejecting blacklisted file: {dest_file}") + return ProjectLocked().response(422) + + file_changes, errors = upload.process_chunks(use_shared_chunk_dir=False) + if errors: + upload.clear() + + unsupported_files = [ + k for k, v in errors.items() if v == FileSyncErrorType.UNSUPPORTED.value + ] + if len(unsupported_files): abort( 400, - f"Unsupported file type detected: {f.path}. " + f"Unsupported file type detected: {unsupported_files[0]}. 
" f"Please remove the file or try compressing it into a ZIP file before uploading.", ) - if expected_size != os.path.getsize(dest_file): - logging.error( - "Data integrity check has failed on file %s in project %s" - % (f.path, project_path), - exc_info=True, - ) - # check if .gpkg file is valid - if is_versioned_file(dest_file): - if not f.is_valid_gpkg(): - corrupted_files.append(f.path) - corrupted_files.append(f.path) + corrupted_files = [ + k for k, v in errors.items() if v == FileSyncErrorType.CORRUPTED.value + ] + if corrupted_files: + abort(422, {"corrupted_files": corrupted_files}) - if corrupted_files: - move_to_tmp(upload_dir) - abort(422, {"corrupted_files": corrupted_files}) + sync_errors = { + k: v for k, v in errors.items() if FileSyncErrorType.SYNC_ERROR.value in v + } + if sync_errors: + msg = "" + for key, value in sync_errors.items(): + msg += key + " error=" + value + "\n" + + abort(422, f"Failed to create new version: {msg}") - next_version = upload.version + 1 - v_next_version = ProjectVersion.to_v_name(next_version) files_dir = os.path.join(upload_dir, "files", v_next_version) target_dir = os.path.join(project.storage.project_dir, v_next_version) if os.path.exists(target_dir): @@ -1065,58 +1015,13 @@ def push_finish(transaction_id): move_to_tmp(target_dir) try: - # let's move uploaded files where they are expected to be - os.renames(files_dir, target_dir) - # apply gpkg updates - sync_errors = {} - to_remove = [i.path for i in changes.removed] - current_files = [f for f in project.files if f.path not in to_remove] - for updated_file in changes.updated: - # yield to gevent hub since geodiff action can take some time to prevent worker timeout - sleep(0) - current_file = next( - (i for i in current_files if i.path == updated_file.path), None - ) - if not current_file: - sync_errors[updated_file.path] = "file not found on server " - continue - - if updated_file.diff: - result = project.storage.apply_diff( - current_file, updated_file, 
next_version - ) - if result.ok(): - checksum, size = result.value - updated_file.checksum = checksum - updated_file.size = size - else: - sync_errors[updated_file.path] = ( - f"project: {project.workspace.name}/{project.name}, {result.value}" - ) - - elif is_versioned_file(updated_file.path): - result = project.storage.construct_diff( - current_file, updated_file, next_version - ) - if result.ok(): - updated_file.diff = result.value - else: - # if diff cannot be constructed it would be force update - logging.warning(f"Geodiff: create changeset error {result.value}") - - if sync_errors: - msg = "" - for key, value in sync_errors.items(): - msg += key + " error=" + value + "\n" - raise DataSyncError(msg) - user_agent = get_user_agent(request) device_id = get_device_id(request) pv = ProjectVersion( project, next_version, current_user.id, - changes, + file_changes, get_ip(request), user_agent, device_id, @@ -1124,12 +1029,15 @@ def push_finish(transaction_id): db.session.add(pv) db.session.add(project) db.session.commit() + + # let's move uploaded files where they are expected to be + os.renames(files_dir, version_dir) logging.info( f"Push finished for project: {project.id}, project version: {v_next_version}, transaction id: {transaction_id}." 
) project_version_created.send(pv) push_finished.send(pv) - except (psycopg2.Error, FileNotFoundError, DataSyncError, IntegrityError) as err: + except (psycopg2.Error, FileNotFoundError, IntegrityError) as err: db.session.rollback() logging.exception( f"Failed to finish push for project: {project.id}, project version: {v_next_version}, " @@ -1144,9 +1052,6 @@ def push_finish(transaction_id): # remove artifacts upload.clear() - # do not optimize on every version, every 10th is just fine - if not project.latest_version % 10: - optimize_storage.delay(project.id) return jsonify(ProjectSchema().dump(project)), 200 @@ -1246,15 +1151,24 @@ def clone_project(namespace, project_name): # noqa: E501 user_agent = get_user_agent(request) device_id = get_device_id(request) # transform source files to new uploaded files - files = UploadFileSchema(context={"version": 1}, many=True).load( - FileSchema(exclude=("location",), many=True).dump(cloned_project.files) - ) - changes = UploadChanges(added=files, updated=[], removed=[]) + file_changes = [] + for file in cloned_project.files: + file_changes.append( + ProjectFileChange( + file.path, + file.checksum, + file.size, + diff=None, + mtime=None, + location=os.path.join("v1", mergin_secure_filename(file.path)), + change=PushChangeType.CREATE, + ) + ) project_version = ProjectVersion( p, version, current_user.id, - changes, + file_changes, get_ip(request), user_agent, device_id, diff --git a/server/mergin/sync/public_api_v2.yaml b/server/mergin/sync/public_api_v2.yaml index fd6e4353..4a4f7904 100644 --- a/server/mergin/sync/public_api_v2.yaml +++ b/server/mergin/sync/public_api_v2.yaml @@ -252,6 +252,95 @@ paths: "404": $ref: "#/components/responses/NotFound" x-openapi-router-controller: mergin.sync.public_api_v2_controller + /projects/{id}/versions: + post: + tags: + - project + summary: Create a new project version from pushed data + operationId: create_project_version + parameters: + - $ref: "#/components/parameters/ProjectId" + 
requestBody: + description: Project files changes and current version head. + required: true + content: + application/json: + schema: + type: object + required: + - version + - changes + properties: + check_only: + type: boolean + default: false + example: true + version: + type: string + pattern: '^$|^v\d+$' + example: v2 + changes: + type: object + required: + - added + - updated + - removed + properties: + added: + type: array + items: + $ref: "#/components/schemas/UploadFile" + updated: + type: array + items: + $ref: "#/components/schemas/UpdateFile" + removed: + type: array + items: + $ref: "#/components/schemas/File" + responses: + "201": + description: New project version + content: + application/json: + schema: + $ref: "#/components/schemas/ProjectVersion" + "204": + $ref: "#/components/responses/NoContent" + "400": + $ref: "#/components/responses/BadRequest" + "401": + $ref: "#/components/responses/Unauthorized" + "403": + $ref: "#/components/responses/Forbidden" + "404": + $ref: "#/components/responses/NotFound" + "409": + description: Version already exists or another process is already running + content: + application/problem+json: + schema: + anyOf: + - $ref: "#/components/schemas/ProjectVersionExists" + - $ref: "#/components/schemas/AnotherUploadRunning" + "422": + description: Request could not be processed by server + content: + application/problem+json: + schema: + anyOf: + - $ref: "#/components/schemas/UploadError" + - $ref: "#/components/schemas/TrialExpired" + - $ref: "#/components/schemas/StorageLimitHit" + - $ref: "#/components/schemas/DataSyncError" + "423": + description: Project is locked for any upload + content: + application/problem+json: + schema: + $ref: "#/components/schemas/ProjectLocked" + + x-openapi-router-controller: mergin.sync.public_api_v2_controller components: responses: NoContent: @@ -279,6 +368,86 @@ components: format: uuid pattern: \b[0-9a-f]{8}\b-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-\b[0-9a-f]{12}\b schemas: + # Errors + 
CustomError: + type: object + properties: + code: + type: string + detail: + type: string + required: + - code + - detail + TrialExpired: + allOf: + - $ref: '#/components/schemas/CustomError' + example: + code: TrialExpired + detail: Failed to push changes. Ask the workspace owner to log in to their Mergin Maps dashboard + StorageLimitHit: + allOf: + - $ref: '#/components/schemas/CustomError' + type: object + properties: + current_usage: + type: integer + storage_limit: + type: integer + example: + code: StorageLimitHit + detail: You have reached a data limit (StorageLimitHit) + current_usage: 24865 + storage_limit: 24865 + ProjectLocked: + allOf: + - $ref: '#/components/schemas/CustomError' + example: + code: ProjectLocked + detail: The project is currently locked and you cannot make changes to it (ProjectLocked) + ProjectVersionExists: + allOf: + - $ref: '#/components/schemas/CustomError' + type: object + properties: + client_version: + type: string + server_version: + type: string + example: + code: ProjectVersionExists + detail: Project version mismatch (ProjectVersionExists) + AnotherUploadRunning: + allOf: + - $ref: '#/components/schemas/CustomError' + type: object + properties: + client_version: + type: string + server_version: + type: string + example: + code: AnotherUploadRunning + detail: Another process is running (AnotherUploadRunning) + DataSyncError: + allOf: + - $ref: '#/components/schemas/CustomError' + type: object + properties: + failed_files: + type: object + example: + code: DataSyncError + detail: "There are either corrupted files or it is not possible to create version with provided geopackage data (DataSyncError)" + failed_files: + "survey.gpkg": "Corrupted file" + UploadError: + allOf: + - $ref: '#/components/schemas/CustomError' + example: + code: UploadError + detail: "Project version could not be created (UploadError)" + # Data ProjectRole: type: string nullable: true @@ -333,3 +502,62 @@ components: $ref: 
"#/components/schemas/ProjectRole" role: $ref: "#/components/schemas/Role" + File: + type: object + description: Project file metadata + required: + - path + - size + properties: + path: + type: string + example: media/favicon.ico + size: + type: integer + format: int64 + example: 1024 + checksum: + description: sha1 hash of file + type: string + example: 9adb76bf81a34880209040ffe5ee262a090b62ab + UploadFile: + description: Metadata of uploaded file with chunks it is composed of + allOf: + - $ref: "#/components/schemas/File" + - type: object + properties: + chunks: + type: array + items: + type: string + example: d17a60eb-6581-431c-adfc-3451231455bb + UpdateFile: + description: Metadata of updated file with optional metadata about uploaded file diff + allOf: + - $ref: "#/components/schemas/UploadFile" + - type: object + properties: + diff: + nullable: true + allOf: + - $ref: "#/components/schemas/File" + ProjectVersion: + type: object + properties: + name: + type: string + example: v1 + author: + type: string + example: john.doe + created: + type: string + format: date-time + example: 2018-11-30T08:47:58.636074Z + project_name: + type: string + example: survey + namespace: + type: string + example: john.doe + \ No newline at end of file diff --git a/server/mergin/sync/public_api_v2_controller.py b/server/mergin/sync/public_api_v2_controller.py index 55ae48d8..ec3fd260 100644 --- a/server/mergin/sync/public_api_v2_controller.py +++ b/server/mergin/sync/public_api_v2_controller.py @@ -2,26 +2,47 @@ # # SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-MerginMaps-Commercial -from datetime import datetime -import os import uuid -from datetime import datetime, timedelta, timezone +import gevent +import logging +import os +import psycopg2 from connexion import NoContent, request -from flask import abort, jsonify, current_app, make_response +from datetime import datetime, timedelta, timezone +from flask import abort, jsonify, current_app from flask_login import 
current_user +from marshmallow import ValidationError +from psycopg2 import IntegrityError -from mergin.sync.forms import project_name_validation - -from .schemas import ProjectMemberSchema, UploadChunkSchema -from .workspace import WorkspaceRole from ..app import db from ..auth import auth_required from ..auth.models import User -from .models import Project, ProjectRole, ProjectMember +from .errors import ( + AnotherUploadRunning, + BigChunkError, + DataSyncError, + ProjectLocked, + ProjectVersionExists, + StorageLimitHit, + UploadError, +) +from .files import ChangesSchema +from .forms import project_name_validation +from .models import ( + Project, + ProjectRole, + ProjectMember, + ProjectVersion, + Upload, + project_version_created, + push_finished, +) from .permissions import ProjectPermissions, require_project_by_uuid -from .errors import ProjectLocked -from .utils import get_chunk_location +from .public_api_controller import catch_sync_failure +from .schemas import ProjectMemberSchema, ProjectVersionSchema, UploadChunkSchema from .storages.disk import move_to_tmp, save_to_file +from .utils import get_device_id, get_ip, get_user_agent, get_chunk_location +from .workspace import WorkspaceRole @auth_required @@ -136,6 +157,188 @@ def remove_project_collaborator(id, user_id): return NoContent, 204 +@auth_required +@catch_sync_failure +def create_project_version(id): + """Create a new project version from pushed data""" + version: int = ProjectVersion.from_v_name(request.json["version"]) + changes = request.json["changes"] + project_permission: ProjectPermissions = ( + current_app.project_handler.get_push_permission(changes) + ) + project = require_project_by_uuid(id, project_permission) + # pass full project object to request for later use + request.view_args["project"] = project + + if project.locked_until: + return ProjectLocked().response(423) + + next_version = project.next_version() + v_next_version = ProjectVersion.to_v_name(next_version) + version_dir = 
os.path.join(project.storage.project_dir, v_next_version) + + pv = project.get_latest_version() + if pv and pv.name != version: + return ProjectVersionExists(version, pv.name).response(409) + + # reject push if there is another one already running + pending_upload = Upload.query.filter_by(project_id=project.id).first() + if pending_upload and pending_upload.is_active(): + return AnotherUploadRunning().response(409) + + try: + ChangesSchema().validate(changes) + upload_changes = ChangesSchema().dump(changes) + except ValidationError as err: + msg = err.messages[0] if type(err.messages) == list else "Invalid input data" + return UploadError(error=msg).response(422) + + to_be_added_files = upload_changes["added"] + to_be_updated_files = upload_changes["updated"] + to_be_removed_files = upload_changes["removed"] + + # check consistency of changes + current_files = set(file.path for file in project.files) + added_files = set(file["path"] for file in to_be_added_files) + if added_files and added_files.issubset(current_files): + return UploadError( + error=f"Add changes contain files which already exist" + ).response(422) + + modified_files = set( + file["path"] for file in to_be_updated_files + to_be_removed_files + ) + if modified_files and not modified_files.issubset(current_files): + return UploadError( + error="Update or remove changes contain files that are not in project" + ).response(422) + + # Check user data limit + updated_files = list( + filter( + lambda i: i.path in [f["path"] for f in to_be_updated_files], + project.files, + ) + ) + additional_disk_usage = ( + sum(file["size"] for file in to_be_added_files + to_be_updated_files) + - sum(file.size for file in updated_files) + - sum(file["size"] for file in to_be_removed_files) + ) + + current_usage = project.workspace.disk_usage() + requested_storage = current_usage + additional_disk_usage + if requested_storage > project.workspace.storage: + return StorageLimitHit(current_usage, 
project.workspace.storage).response(422) + + # we have done all checks but this request is just a dry-run + if request.json.get("check_only", False): + return NoContent, 204 + + # while processing data, block other uploads + upload = Upload(project, version, upload_changes, current_user.id) + db.session.add(upload) + try: + # Creating blocking upload can fail, e.g. in case of racing condition + db.session.commit() + except IntegrityError: + db.session.rollback() + # check and clean dangling blocking uploads or abort + for current_upload in project.uploads.all(): + if current_upload.is_active(): + return AnotherUploadRunning().response(409) + db.session.delete(current_upload) + db.session.commit() + # previous push attempt is definitely lost + project.sync_failed( + "", + "push_lost", + "Push artefact removed by subsequent push", + current_user.id, + ) + + # Try again after cleanup + db.session.add(upload) + try: + db.session.commit() + move_to_tmp(upload.upload_dir) + except IntegrityError as err: + logging.error(f"Failed to create upload session: {str(err)}") + return AnotherUploadRunning().response(409) + + # Create transaction folder and lockfile + os.makedirs(upload.upload_dir) + open(upload.lockfile, "w").close() + + file_changes, errors = upload.process_chunks(use_shared_chunk_dir=True) + # files consistency or geodiff related issues, project push would never succeed, whole upload is aborted + if errors: + upload.clear() + return DataSyncError(failed_files=errors).response(422) + + try: + pv = ProjectVersion( + project, + next_version, + current_user.id, + file_changes, + get_ip(request), + get_user_agent(request), + get_device_id(request), + ) + db.session.add(pv) + db.session.add(project) + db.session.commit() + # let's move uploaded files where they are expected to be + temp_files_dir = os.path.join(upload.upload_dir, "files", v_next_version) + os.renames(temp_files_dir, version_dir) + + logging.info( + f"Push finished for project: {project.id}, project 
version: {v_next_version}, upload id: {upload.id}." + ) + project_version_created.send(pv) + push_finished.send(pv) + except (psycopg2.Error, FileNotFoundError, IntegrityError) as err: + db.session.rollback() + logging.exception( + f"Failed to finish push for project: {project.id}, project version: {v_next_version}, " + f"upload id: {upload.id}.: {str(err)}" + ) + if ( + os.path.exists(version_dir) + and not ProjectVersion.query.filter_by( + project_id=project.id, name=next_version + ).count() + ): + move_to_tmp(version_dir) + return UploadError().response(422) + # catch exception during pg transaction so we can rollback and prevent PendingRollbackError during upload clean up + except gevent.timeout.Timeout: + db.session.rollback() + if ( + os.path.exists(version_dir) + and not ProjectVersion.query.filter_by( + project_id=project.id, name=next_version + ).count() + ): + move_to_tmp(version_dir) + raise + finally: + # remove artifacts + upload.clear() + + return ( + ProjectVersionSchema( + exclude=( + "files", + "changes", + "changesets", + ) + ).dump(pv), + 201, + ) + + @auth_required def upload_chunk(id: str): """ @@ -143,7 +346,7 @@ def upload_chunk(id: str): """ project = require_project_by_uuid(id, ProjectPermissions.Edit) if project.locked_until: - abort(make_response(jsonify(ProjectLocked().to_dict()), 422)) + return ProjectLocked().response(423) # generate uuid for chunk chunk_id = str(uuid.uuid4()) dest_file = get_chunk_location(chunk_id) @@ -152,9 +355,9 @@ def upload_chunk(id: str): save_to_file(request.stream, dest_file, current_app.config["MAX_CHUNK_SIZE"]) except IOError: move_to_tmp(dest_file, chunk_id) - abort(413, "Chunk size exceeds maximum allowed size") + return BigChunkError().response(413) except Exception as e: - abort(400, "Error saving chunk") + return UploadError(error="Error saving chunk").response(400) # Add valid_until timestamp to the response, remove tzinfo for compatibility with DateTimeWithZ valid_until = ( diff --git 
a/server/mergin/sync/schemas.py b/server/mergin/sync/schemas.py index 9d9b1309..8d1df050 100644 --- a/server/mergin/sync/schemas.py +++ b/server/mergin/sync/schemas.py @@ -75,7 +75,7 @@ def project_user_permissions(project): class FileHistorySchema(ma.SQLAlchemyAutoSchema): mtime = DateTimeWithZ() - diff = fields.Nested(FileSchema(), attribute="diff_file", exclude=("location",)) + diff = fields.Nested(FileSchema(), attribute="diff_file") expiration = DateTimeWithZ(attribute="expiration", dump_only=True) class Meta: diff --git a/server/mergin/sync/storages/disk.py b/server/mergin/sync/storages/disk.py index 4debb255..4491ad98 100644 --- a/server/mergin/sync/storages/disk.py +++ b/server/mergin/sync/storages/disk.py @@ -21,7 +21,7 @@ generate_checksum, is_versioned_file, ) -from ..files import mergin_secure_filename, ProjectFile, UploadFile, File +from ..files import mergin_secure_filename, ProjectFile, File def save_to_file(stream, path, max_size=None): @@ -245,22 +245,20 @@ def _generator(): return _generator() def apply_diff( - self, current_file: ProjectFile, upload_file: UploadFile, version: int + self, current_file: ProjectFile, diff_file: str, patched_file: str ) -> Result: """Apply geodiff diff file on current gpkg basefile. Creates GeodiffActionHistory record of the action. Returns checksum and size of generated file. If action fails it returns geodiff error message. 
""" from ..models import GeodiffActionHistory, ProjectVersion - v_name = ProjectVersion.to_v_name(version) + v_name = ProjectVersion.to_v_name(self.project.next_version()) basefile = os.path.join(self.project_dir, current_file.location) - changeset = os.path.join(self.project_dir, upload_file.diff.location) - patchedfile = os.path.join(self.project_dir, upload_file.location) # create local copy of basefile which will be updated in next version and changeset needed # TODO this can potentially fail for large files - logging.info(f"Apply changes: copying {basefile} to {patchedfile}") + logging.info(f"Apply changes: copying {basefile} to {patched_file}") start = time.time() - with self.geodiff_copy(changeset) as changeset_tmp, self.geodiff_copy( + with self.geodiff_copy(diff_file) as changeset_tmp, self.geodiff_copy( basefile ) as patchedfile_tmp: copy_time = time.time() - start @@ -269,7 +267,7 @@ def apply_diff( # clean geodiff logger self.flush_geodiff_logger() logging.info( - f"Geodiff: apply changeset {changeset} of size {os.path.getsize(changeset)} with changes to {patchedfile}" + f"Geodiff: apply changeset {diff_file} of size {os.path.getsize(diff_file)} with changes to {patched_file}" ) start = time.time() self.geodiff.apply_changeset(patchedfile_tmp, changeset_tmp) @@ -283,7 +281,7 @@ def apply_diff( current_file.size, v_name, "apply_changes", - changeset, + diff_file, ) gh.copy_time = copy_time gh.geodiff_time = geodiff_apply_time @@ -291,11 +289,11 @@ def apply_diff( # move constructed file where is belongs logging.info(f"Apply changes: moving patchfile {patchedfile_tmp}") start = time.time() - copy_file(patchedfile_tmp, patchedfile) + copy_file(patchedfile_tmp, patched_file) gh.copy_time = copy_time + (time.time() - start) # TODO this can potentially fail for large files - logging.info(f"Apply changes: calculating checksum of {patchedfile}") + logging.info(f"Apply changes: calculating checksum of {patched_file}") start = time.time() checksum = 
generate_checksum(patchedfile_tmp) checksumming_time = time.time() - start @@ -309,53 +307,46 @@ def apply_diff( ) ) except (GeoDiffLibError, GeoDiffLibConflictError): - move_to_tmp(changeset) + move_to_tmp(diff_file) return Err(self.gediff_log.getvalue()) def construct_diff( - self, current_file: ProjectFile, upload_file: UploadFile, version: int + self, + current_file: ProjectFile, + diff_file: str, + uploaded_file: str, ) -> Result: """Construct geodiff diff file from uploaded gpkg and current basefile. Returns diff metadata as a result. If action fails it returns geodiff error message. """ - from ..models import ProjectVersion - - v_name = ProjectVersion.to_v_name(version) basefile = os.path.join(self.project_dir, current_file.location) - uploaded_file = os.path.join(self.project_dir, upload_file.location) - diff_name = upload_file.path + "-diff-" + str(uuid.uuid4()) - changeset = os.path.join(self.project_dir, v_name, diff_name) + diff_name = os.path.basename(diff_file) with self.geodiff_copy(basefile) as basefile_tmp, self.geodiff_copy( uploaded_file ) as uploaded_file_tmp: try: # create changeset next to uploaded file copy changeset_tmp = os.path.join( - uploaded_file_tmp.replace(upload_file.location, "").rstrip( - os.path.sep - ), - v_name, + os.path.dirname(uploaded_file_tmp), diff_name, ) self.flush_geodiff_logger() logging.info( - f"Geodiff: create changeset {changeset} from {uploaded_file}" + f"Geodiff: create changeset {diff_file} from {uploaded_file}" ) self.geodiff.create_changeset( basefile_tmp, uploaded_file_tmp, changeset_tmp ) - # create diff metadata as it would be created by other clients - diff_file = File( - path=diff_name, - checksum=generate_checksum(changeset_tmp), - size=os.path.getsize(changeset_tmp), - location=os.path.join(v_name, mergin_secure_filename(diff_name)), + copy_file(changeset_tmp, diff_file) + return Ok( + ( + generate_checksum(changeset_tmp), + os.path.getsize(changeset_tmp), + ) ) - copy_file(changeset_tmp, changeset) - 
return Ok(diff_file) except (GeoDiffLibError, GeoDiffLibConflictError) as e: # diff is not possible to create - file will be overwritten - move_to_tmp(changeset) + move_to_tmp(diff_file) return Err(self.gediff_log.getvalue()) finally: move_to_tmp(changeset_tmp) diff --git a/server/mergin/sync/storages/storage.py b/server/mergin/sync/storages/storage.py index fd4c1e81..3b9699a6 100644 --- a/server/mergin/sync/storages/storage.py +++ b/server/mergin/sync/storages/storage.py @@ -11,10 +11,6 @@ class FileNotFound(Exception): pass -class DataSyncError(Exception): - pass - - class InitializationError(Exception): pass diff --git a/server/mergin/sync/utils.py b/server/mergin/sync/utils.py index 7babb231..15f5bd86 100644 --- a/server/mergin/sync/utils.py +++ b/server/mergin/sync/utils.py @@ -13,7 +13,7 @@ from shapely.errors import ShapelyError from gevent import sleep from flask import Request -from typing import Optional +from typing import Optional, Tuple from sqlalchemy import text from pathvalidate import ( validate_filename, @@ -83,6 +83,8 @@ def touch_lockfile(self): os.access(self.lockfile, os.W_OK) with open(self.lockfile, "a"): os.utime(self.lockfile, None) + + sleep(0) # to unblock greenlet if self.running: self.timer = Timer(self.interval, self.touch_lockfile) self.timer.start() @@ -582,11 +584,10 @@ def get_x_accel_uri(*url_parts): def get_chunk_location(id: str): """ - Get file name for chunk - - Splits the given identifier into two parts. + Get file location for chunk on FS - Returns a tuple where the first element is the first two characters of the identifier, and the second element is the remaining characters. + Splits the given identifier into two parts where the first two characters of the identifier are the small hash, + and the remaining characters is a file identifier. 
""" chunk_dir = current_app.config.get("UPLOAD_CHUNKS_DIR") small_hash = id[:2] diff --git a/server/mergin/tests/fixtures.py b/server/mergin/tests/fixtures.py index 7cff688e..e50d3350 100644 --- a/server/mergin/tests/fixtures.py +++ b/server/mergin/tests/fixtures.py @@ -19,7 +19,7 @@ from ..stats.models import MerginInfo from . import test_project, test_workspace_id, test_project_dir, TMP_DIR from .utils import login_as_admin, initialize, cleanup, file_info -from ..sync.files import ChangesSchema +from ..sync.files import ChangesSchema, files_changes_from_upload thisdir = os.path.dirname(os.path.realpath(__file__)) sys.path.append(os.path.join(thisdir, os.pardir)) @@ -36,6 +36,7 @@ def flask_app(request): "DOCS_URL", "COLLECT_STATISTICS", "USER_SELF_REGISTRATION", + "V2_PUSH_ENABLED", ] ) register(application) @@ -213,12 +214,13 @@ def diff_project(app): else: # no files uploaded, hence no action needed pass - upload_changes = ChangesSchema(context={"version": i + 2}).load(change) + + file_changes = files_changes_from_upload(change, location_dir=f"v{i + 2}") pv = ProjectVersion( project, i + 2, project.creator.id, - upload_changes, + file_changes, "127.0.0.1", ) assert pv.project_size == sum(file.size for file in pv.files) diff --git a/server/mergin/tests/test_config.py b/server/mergin/tests/test_config.py index 8b745a0a..af677cb0 100644 --- a/server/mergin/tests/test_config.py +++ b/server/mergin/tests/test_config.py @@ -21,6 +21,7 @@ def test_config(client): "minor", "user_self_registration", "build_hash", + "v2_push_enabled", } resp = client.get("/config") assert resp.status_code == 200 diff --git a/server/mergin/tests/test_db_hooks.py b/server/mergin/tests/test_db_hooks.py index e7f9e270..044294c5 100644 --- a/server/mergin/tests/test_db_hooks.py +++ b/server/mergin/tests/test_db_hooks.py @@ -18,7 +18,6 @@ ProjectRole, ProjectUser, ) -from ..sync.files import UploadChanges from ..auth.models import User from ..app import db from . 
import DEFAULT_USER @@ -40,8 +39,7 @@ def test_close_user_account(client, diff_project): # user has access to mergin user diff_project diff_project.set_role(user.id, ProjectRole.WRITER) # user contributed to another user project so he is listed in projects history - changes = UploadChanges(added=[], updated=[], removed=[]) - pv = ProjectVersion(diff_project, 11, user.id, changes, "127.0.0.1") + pv = ProjectVersion(diff_project, 11, user.id, [], "127.0.0.1") diff_project.latest_version = pv.name pv.project = diff_project db.session.add(pv) @@ -116,8 +114,7 @@ def test_remove_project(client, diff_project): # set up mergin_user = User.query.filter_by(username=DEFAULT_USER[0]).first() project_dir = Path(diff_project.storage.project_dir) - changes = UploadChanges(added=[], removed=[], updated=[]) - upload = Upload(diff_project, 10, changes, mergin_user.id) + upload = Upload(diff_project, 10, [], mergin_user.id) db.session.add(upload) project_id = diff_project.id user = add_user("user", "user") diff --git a/server/mergin/tests/test_project_controller.py b/server/mergin/tests/test_project_controller.py index b1f60a8f..b615f195 100644 --- a/server/mergin/tests/test_project_controller.py +++ b/server/mergin/tests/test_project_controller.py @@ -7,6 +7,7 @@ from dataclasses import asdict from unittest.mock import patch from urllib.parse import quote +from psycopg2 import IntegrityError import pysqlite3 import pytest import json @@ -35,7 +36,7 @@ PushChangeType, ProjectFilePath, ) -from ..sync.files import ChangesSchema +from ..sync.files import files_changes_from_upload from ..sync.schemas import ProjectListSchema from ..sync.utils import generate_checksum, is_versioned_file from ..auth.models import User, UserProfile @@ -1277,8 +1278,7 @@ def create_transaction(username, changes, version=1): project = Project.query.filter_by( name=test_project, workspace_id=test_workspace_id ).first() - upload_changes = ChangesSchema(context={"version": version}).load(changes) - upload = 
Upload(project, version, upload_changes, user.id) + upload = Upload(project, version, changes, user.id) db.session.add(upload) db.session.commit() upload_dir = os.path.join(upload.project.storage.project_dir, "tmp", upload.id) @@ -1354,9 +1354,8 @@ def upload_chunks(upload_dir, changes, src_dir=test_project_dir): def test_push_finish(client): changes = _get_changes(test_project_dir) upload, upload_dir = create_transaction("mergin", changes) - url = "/v1/project/push/finish/{}".format(upload.id) - resp = client.post(url, headers=json_headers) + resp = client.post(f"/v1/project/push/finish/{upload.id}", headers=json_headers) assert resp.status_code == 422 assert "corrupted_files" in resp.json["detail"].keys() assert not os.path.exists(os.path.join(upload_dir, "files", "test.txt")) @@ -1364,6 +1363,7 @@ def test_push_finish(client): assert failure.error_type == "push_finish" assert "corrupted_files" in failure.error_details + upload, upload_dir = create_transaction("mergin", changes) os.mkdir(os.path.join(upload.project.storage.project_dir, "v2")) # mimic chunks were uploaded os.makedirs(os.path.join(upload_dir, "chunks")) @@ -1373,7 +1373,10 @@ def test_push_finish(client): with open(os.path.join(upload_dir, "chunks", chunk), "wb") as out_file: out_file.write(in_file.read(CHUNK_SIZE)) - resp2 = client.post(url, headers={**json_headers, "User-Agent": "Werkzeug"}) + resp2 = client.post( + f"/v1/project/push/finish/{upload.id}", + headers={**json_headers, "User-Agent": "Werkzeug"}, + ) assert resp2.status_code == 200 assert not os.path.exists(upload_dir) version = upload.project.get_latest_version() @@ -2274,12 +2277,12 @@ def add_project_version(project, changes, version=None): else User.query.filter_by(username=DEFAULT_USER[0]).first() ) next_version = version or project.next_version() - upload_changes = ChangesSchema(context={"version": next_version}).load(changes) + file_changes = files_changes_from_upload(changes, location_dir=f"v{next_version}") pv = 
ProjectVersion( project, next_version, author.id, - upload_changes, + file_changes, ip="127.0.0.1", ) db.session.add(pv) @@ -2293,19 +2296,23 @@ def test_project_version_integrity(client): changes = _get_changes_with_diff(test_project_dir) upload, upload_dir = create_transaction("mergin", changes) upload_chunks(upload_dir, upload.changes) - # manually create an identical project version in db - pv = add_project_version(upload.project, changes) - # try to finish the transaction - resp = client.post("/v1/project/push/finish/{}".format(upload.id)) - assert resp.status_code == 422 - assert "Failed to create new version" in resp.json["detail"] - failure = SyncFailuresHistory.query.filter_by(project_id=upload.project.id).first() - assert failure.error_type == "push_finish" - assert "Failed to create new version" in failure.error_details - upload.project.latest_version = pv.name - 1 - db.session.delete(pv) - db.session.delete(failure) - db.session.commit() + + # try to finish the transaction which would fail on version created integrity error, e.g. 
race conditions + with patch.object( + ProjectVersion, + "__init__", + side_effect=IntegrityError("Project version already exists", None, None), + ): + resp = client.post("/v1/project/push/finish/{}".format(upload.id)) + assert resp.status_code == 422 + assert "Failed to create new version" in resp.json["detail"] + failure = SyncFailuresHistory.query.filter_by( + project_id=upload.project.id + ).first() + assert failure.error_type == "push_finish" + assert "Failed to create new version" in failure.error_details + db.session.delete(failure) + db.session.commit() # changes without an upload with patch("mergin.sync.public_api_controller.get_user_agent") as mock: @@ -2320,7 +2327,7 @@ def test_project_version_integrity(client): # to insert an identical project version when no upload (only one endpoint used), # we need to pretend side effect of a function called just before project version insertion def _get_user_agent(): - add_project_version(project, changes) + add_project_version(project, {}) # bypass endpoint checks upload.project.latest_version = ProjectVersion.from_v_name(data["version"]) return "Input" diff --git a/server/mergin/tests/test_public_api_v2.py b/server/mergin/tests/test_public_api_v2.py index 5bef6b7d..68df0949 100644 --- a/server/mergin/tests/test_public_api_v2.py +++ b/server/mergin/tests/test_public_api_v2.py @@ -2,18 +2,39 @@ # # SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-MerginMaps-Commercial import os +import shutil +from unittest.mock import patch +from psycopg2 import IntegrityError +import pytest from datetime import datetime, timedelta, timezone +from mergin.app import db +from mergin.config import Configuration +from mergin.sync.errors import ( + BigChunkError, + ProjectLocked, + ProjectVersionExists, + AnotherUploadRunning, + StorageLimitHit, + UploadError, +) +from mergin.sync.models import ( + Project, + ProjectRole, + ProjectVersion, + SyncFailuresHistory, + Upload, +) from mergin.sync.utils import get_chunk_location - -from 
.utils import add_user -from ..app import db -from mergin.sync.models import Project -from ..tests import test_project, test_workspace_id - -from ..config import Configuration -from ..sync.models import ProjectRole -from . import test_project_dir +from . import TMP_DIR, test_project, test_workspace_id, test_project_dir +from .test_project_controller import ( + CHUNK_SIZE, + _get_changes, + _get_changes_with_diff, + _get_changes_with_diff_0_size, + _get_changes_without_added, +) +from .utils import add_user, file_info def test_schedule_delete_project(client): @@ -134,14 +155,204 @@ def test_project_members(client): assert response.status_code == 404 -def test_upload_chunk(client, app): +push_data = [ + # success + ( + {"version": "v1", "changes": _get_changes_without_added(test_project_dir)}, + 201, + None, + ), + # with diff, success + ({"version": "v1", "changes": _get_changes_with_diff(test_project_dir)}, 201, None), + # just a dry-run + ( + { + "version": "v1", + "changes": _get_changes_with_diff(test_project_dir), + "check_only": True, + }, + 204, + None, + ), + # broken .gpkg file + ( + {"version": "v1", "changes": _get_changes_with_diff_0_size(test_project_dir)}, + 422, + UploadError.code, + ), + # contains already uploaded file + ( + {"version": "v1", "changes": _get_changes(test_project_dir)}, + 422, + UploadError.code, + ), + # version mismatch + ( + {"version": "v0", "changes": _get_changes_without_added(test_project_dir)}, + 409, + ProjectVersionExists.code, + ), + # no changes requested + ( + {"version": "v1", "changes": {"added": [], "removed": [], "updated": []}}, + 422, + UploadError.code, + ), + # inconsistent changes, a file cannot be added and updated at the same time + ( + { + "version": "v1", + "changes": { + "added": [ + { + "path": "test.txt", + "size": 1234, + "checksum": "9adb76bf81a34880209040ffe5ee262a090b62ab", + "chunks": [], + } + ], + "removed": [], + "updated": [ + { + "path": "test.txt", + "size": 1234, + "checksum": 
"9adb76bf81a34880209040ffe5ee262a090b62ab", + "chunks": [], + } + ], + }, + }, + 422, + UploadError.code, + ), + # inconsistent changes, a file which does not exist cannot be deleted + ( + { + "version": "v1", + "changes": { + "added": [], + "removed": [ + { + "path": "not-existing.txt", + "size": 1234, + "checksum": "9adb76bf81a34880209040ffe5ee262a090b62ab", + } + ], + "updated": [], + }, + }, + 422, + UploadError.code, + ), + # missing version (required parameter) + ({"changes": _get_changes_without_added(test_project_dir)}, 400, None), + # incorrect changes format + ({"version": "v1", "changes": {}}, 400, None), +] + + +@pytest.mark.parametrize("data,expected,err_code", push_data) +def test_create_version(client, data, expected, err_code): + """Test project push endpoint with different payloads.""" + + project = Project.query.filter_by( + workspace_id=test_workspace_id, name=test_project + ).first() + assert project.latest_version == 1 + + if expected == 201: + # mimic chunks were uploaded + for f in data["changes"]["added"] + data["changes"]["updated"]: + src_file = ( + os.path.join(TMP_DIR, f["diff"]["path"]) + if f.get("diff") + else os.path.join(test_project_dir, f["path"]) + ) + with open(src_file, "rb") as in_file: + for chunk in f["chunks"]: + chunk_location = get_chunk_location(chunk) + os.makedirs(os.path.dirname(chunk_location), exist_ok=True) + with open(chunk_location, "wb") as out_file: + out_file.write(in_file.read(CHUNK_SIZE)) + + response = client.post(f"v2/projects/{project.id}/versions", json=data) + assert response.status_code == expected + if expected == 201: + assert response.json["name"] == "v2" + assert project.latest_version == 2 + else: + assert project.latest_version == 1 + if err_code: + assert response.json["code"] == err_code + failure = SyncFailuresHistory.query.filter_by(project_id=project.id).first() + # failures are not created when POST request body is invalid (caught by connexion validators) + if failure: + assert 
failure.last_version == "v1" + assert failure.error_type == "project_push" + + +def test_create_version_failures(client): + """Test various project push failures beyond invalid payload""" + project = Project.query.filter_by( + workspace_id=test_workspace_id, name=test_project + ).first() + + data = {"version": "v1", "changes": _get_changes_without_added(test_project_dir)} + + # somebody else is syncing + upload = Upload(project, 1, _get_changes(test_project_dir), 1) + db.session.add(upload) + db.session.commit() + os.makedirs(upload.upload_dir) + open(upload.lockfile, "w").close() + + response = client.post(f"v2/projects/{project.id}/versions", json=data) + assert response.status_code == 409 + assert response.json["code"] == AnotherUploadRunning.code + upload.clear() + + # project is locked + project.locked_until = datetime.now(timezone.utc) + timedelta(days=1) + db.session.commit() + response = client.post(f"v2/projects/{project.id}/versions", json=data) + assert response.status_code == 423 + assert response.json["code"] == ProjectLocked.code + project.locked_until = None + db.session.commit() + + # try to finish the transaction which would fail on version created integrity error, e.g. race conditions + with patch.object( + Configuration, + "GLOBAL_STORAGE", + 0, + ): + response = client.post(f"v2/projects/{project.id}/versions", json=data) + assert response.status_code == 422 + assert response.json["code"] == StorageLimitHit.code + + # try to finish the transaction which would fail on version created integrity error, e.g. 
race conditions + with patch.object( + ProjectVersion, + "__init__", + side_effect=IntegrityError("Cannot insert new version", None, None), + ): + # keep just deleted data to avoid messing with chunks + data["changes"]["added"] = data["changes"]["updated"] = [] + response = client.post(f"v2/projects/{project.id}/versions", json=data) + assert response.status_code == 422 + assert response.json["code"] == UploadError.code + + +def test_upload_chunk(client): """Test pushing a chunk to a project""" project = Project.query.filter_by( workspace_id=test_workspace_id, name=test_project ).first() url = f"/v2/projects/{project.id}/chunks" - app.config["MAX_CHUNK_SIZE"] = 1024 # Set a small max chunk size for testing - max_chunk_size = app.config["MAX_CHUNK_SIZE"] + client.application.config["MAX_CHUNK_SIZE"] = ( + 1024 # Set a small max chunk size for testing + ) + max_chunk_size = client.application.config["MAX_CHUNK_SIZE"] response = client.post( url, @@ -149,6 +360,7 @@ def test_upload_chunk(client, app): headers={"Content-Type": "application/octet-stream"}, ) assert response.status_code == 413 + assert response.json["code"] == BigChunkError.code # Project is locked, cannot push chunks project.locked_until = datetime.now(timezone.utc) + timedelta(weeks=26) @@ -158,8 +370,8 @@ def test_upload_chunk(client, app): data=b"a", headers={"Content-Type": "application/octet-stream"}, ) - assert response.status_code == 422 - assert response.json["code"] == "ProjectLocked" + assert response.status_code == 423 + assert response.json["code"] == ProjectLocked.code project.locked_until = None # Unlock the project project.removed_at = datetime.now(timezone.utc) - timedelta( @@ -188,10 +400,60 @@ def test_upload_chunk(client, app): valid_until_dt = datetime.strptime(valid_until, "%Y-%m-%dT%H:%M:%S%z") assert valid_until_dt > datetime.now(timezone.utc) assert valid_until_dt < datetime.now(timezone.utc) + timedelta( - seconds=app.config["UPLOAD_CHUNKS_EXPIRATION"] + 
seconds=client.application.config["UPLOAD_CHUNKS_EXPIRATION"] ) # Check if the chunk is stored correctly stored_chunk = get_chunk_location(chunk_id) assert os.path.exists(stored_chunk) with open(stored_chunk, "rb") as f: assert f.read() == b"a" * max_chunk_size + + +def test_full_push(client): + """Test full project push with upload of chunks and project version creation""" + project = Project.query.filter_by( + workspace_id=test_workspace_id, name=test_project + ).first() + + # prepare data to push + project_dir = os.path.join(TMP_DIR, test_project) + if os.path.exists(project_dir): + shutil.rmtree(project_dir) + shutil.copytree(test_project_dir, project_dir) + os.rename( + os.path.join(project_dir, "base.gpkg"), + os.path.join(project_dir, "new_base.gpkg"), + ) + + test_file = file_info(project_dir, "new_base.gpkg", chunk_size=CHUNK_SIZE) + uploaded_chunks = [] + + with open(os.path.join(project_dir, test_file["path"]), "rb") as in_file: + for _ in test_file["chunks"]: + data = in_file.read(CHUNK_SIZE) + response = client.post( + f"/v2/projects/{project.id}/chunks", + data=data, + headers={"Content-Type": "application/octet-stream"}, + ) + assert response.status_code == 200 + uploaded_chunks.append(response.json["id"]) + chunk_location = get_chunk_location(response.json["id"]) + assert os.path.exists(chunk_location) + + test_file["chunks"] = uploaded_chunks + + response = client.post( + f"v2/projects/{project.id}/versions", + json={ + "version": "v1", + "changes": {"added": [test_file], "updated": [], "removed": []}, + }, + ) + assert response.status_code == 201 + assert response.json["name"] == "v2" + assert project.latest_version == 2 + assert os.path.exists( + os.path.join(project.storage.project_dir, "v2", test_file["path"]) + ) + assert not Upload.query.filter_by(project_id=project.id).first() diff --git a/server/mergin/tests/utils.py b/server/mergin/tests/utils.py index 94fc033f..6dcfd157 100644 --- a/server/mergin/tests/utils.py +++ 
b/server/mergin/tests/utils.py @@ -4,13 +4,11 @@ import json import shutil -from typing import Tuple import pysqlite3 import uuid import math from dataclasses import asdict from datetime import datetime - import pysqlite3 from flask import url_for, current_app import os @@ -20,7 +18,7 @@ from ..auth.models import User, UserProfile from ..sync.utils import generate_location, generate_checksum from ..sync.models import Project, ProjectVersion, FileHistory, ProjectRole -from ..sync.files import UploadChanges, ChangesSchema +from ..sync.files import ProjectFileChange, PushChangeType, files_changes_from_upload from ..sync.workspace import GlobalWorkspace from ..app import db from . import json_headers, DEFAULT_USER, test_project, test_project_dir, TMP_DIR @@ -82,8 +80,7 @@ def create_project(name, workspace, user, **kwargs): p.updated = datetime.utcnow() db.session.add(p) db.session.flush() - changes = UploadChanges(added=[], updated=[], removed=[]) - pv = ProjectVersion(p, 0, user.id, changes, "127.0.0.1") + pv = ProjectVersion(p, 0, user.id, [], "127.0.0.1") db.session.add(pv) db.session.commit() @@ -156,15 +153,17 @@ def initialize(): for f in files: abs_path = os.path.join(root, f) project_files.append( - { - "path": abs_path.replace(test_project_dir, "").lstrip("/"), - "location": os.path.join( + ProjectFileChange( + path=abs_path.replace(test_project_dir, "").lstrip("/"), + checksum=generate_checksum(abs_path), + size=os.path.getsize(abs_path), + mtime=str(datetime.fromtimestamp(os.path.getmtime(abs_path))), + change=PushChangeType.CREATE, + location=os.path.join( "v1", abs_path.replace(test_project_dir, "").lstrip("/") ), - "size": os.path.getsize(abs_path), - "checksum": generate_checksum(abs_path), - "mtime": str(datetime.fromtimestamp(os.path.getmtime(abs_path))), - } + diff=None, + ) ) p.latest_version = 1 p.public = True @@ -173,14 +172,7 @@ def initialize(): db.session.add(p) db.session.commit() - upload_changes = ChangesSchema(context={"version": 1}).load( 
- { - "added": project_files, - "updated": [], - "removed": [], - } - ) - pv = ProjectVersion(p, 1, user.id, upload_changes, "127.0.0.1") + pv = ProjectVersion(p, 1, user.id, project_files, "127.0.0.1") db.session.add(pv) db.session.commit() @@ -285,7 +277,7 @@ def create_blank_version(project): project, project.next_version(), project.creator.id, - UploadChanges(added=[], updated=[], removed=[]), + [], "127.0.0.1", ) db.session.add(pv) @@ -355,14 +347,14 @@ def push_change(project, action, path, src_dir): else: return - upload_changes = ChangesSchema(context={"version": project.next_version()}).load( - changes + file_changes = files_changes_from_upload( + changes, location_dir=f"v{project.next_version()}" ) pv = ProjectVersion( project, project.next_version(), project.creator.id, - upload_changes, + file_changes, "127.0.0.1", ) db.session.add(pv)