diff --git a/.prod.env b/.prod.env index 9035e6e3..6cb1dbcc 100644 --- a/.prod.env +++ b/.prod.env @@ -126,6 +126,12 @@ CELERY_RESULT_BACKEND=redis://merginmaps-redis:6379/0 #CELERY_RESULT_BACKEND_TRANSPORT_OPTIONS={} # cast=eval CELERY_RESULT_BACKEND_TRANSPORT_OPTIONS={ 'master_name': 'mymaster' } +#CELERY_ACKS_LATE=False + +#CELERY_WORKER_CONCURRENCY=1 # set to number of cpu in case of prefork or to higher number in case of gevent pool +CELERYD_CONCURRENCY=2 + +#CELERYD_PREFETCH_MULTIPLIER=4 # various life times diff --git a/server/.test.env b/server/.test.env index b65a4649..d83920b8 100644 --- a/server/.test.env +++ b/server/.test.env @@ -19,3 +19,4 @@ INPUTAPP_API_KEY=not-secret-key GLOBAL_WORKSPACE='mergin' GLOBAL_STORAGE=104857600 COLLECT_STATISTICS=0 +GEODIFF_WORKING_DIR=/tmp/geodiff diff --git a/server/Dockerfile b/server/Dockerfile index 6c206bf4..c64a4d54 100644 --- a/server/Dockerfile +++ b/server/Dockerfile @@ -38,7 +38,6 @@ ENV LC_ALL=C.UTF-8 ENV LANG=C.UTF-8 RUN pipenv install --system --deploy --verbose -RUN pip3 install flower==0.9.7 USER mergin diff --git a/server/mergin/celery.py b/server/mergin/celery.py index 25a912ae..156e14bf 100644 --- a/server/mergin/celery.py +++ b/server/mergin/celery.py @@ -66,6 +66,11 @@ def __call__(self, *args, **kwargs): return self.run(*args, **kwargs) celery.conf.update(app.config) + celery.conf.update( + task_acks_late=Configuration.CELERY_ACKS_LATE, + worker_concurrency=Configuration.CELERYD_CONCURRENCY, + worker_prefetch_multiplier=Configuration.CELERYD_PREFETCH_MULTIPLIER, + ) # configure celery with flask context https://flask.palletsprojects.com/en/1.1.x/patterns/celery/ # e.g. 
for using flask-mail celery.Task = ContextTask diff --git a/server/mergin/config.py b/server/mergin/config.py index 2f0dbb35..34b8652b 100644 --- a/server/mergin/config.py +++ b/server/mergin/config.py @@ -74,6 +74,11 @@ class Configuration(object): CELERY_RESULT_BACKEND_TRANSPORT_OPTIONS = config( "CELERY_RESULT_BACKEND_TRANSPORT_OPTIONS", default="{}", cast=eval ) + CELERY_ACKS_LATE = config("CELERY_ACKS_LATE", default=False, cast=bool) + CELERYD_CONCURRENCY = config("CELERYD_CONCURRENCY", default=1, cast=int) + CELERYD_PREFETCH_MULTIPLIER = config( + "CELERYD_PREFETCH_MULTIPLIER", default=4, cast=int + ) # deployment URL (e.g. for links generated in emails) MERGIN_BASE_URL = config("MERGIN_BASE_URL", default="") diff --git a/server/mergin/sync/commands.py b/server/mergin/sync/commands.py index be0981e9..327bc560 100644 --- a/server/mergin/sync/commands.py +++ b/server/mergin/sync/commands.py @@ -42,7 +42,7 @@ def create(name, namespace, user): # pylint: disable=W0612 db.session.add(p) p.access = ProjectAccess(p, public=False) changes = UploadChanges(added=[], updated=[], removed=[]) - pv = ProjectVersion(p, 0, user.username, changes, "127.0.0.1") + pv = ProjectVersion(p, 0, user.id, changes, "127.0.0.1") pv.project = p db.session.commit() os.makedirs(p.storage.project_dir, exist_ok=True) diff --git a/server/mergin/sync/config.py b/server/mergin/sync/config.py index f7fa9a8f..de6edcfd 100644 --- a/server/mergin/sync/config.py +++ b/server/mergin/sync/config.py @@ -47,3 +47,8 @@ class Configuration(object): ) # trash dir for temp files being cleaned regularly TEMP_DIR = config("TEMP_DIR", default=gettempdir()) + # working directory for geodiff actions - should be a fast local storage + GEODIFF_WORKING_DIR = config( + "GEODIFF_WORKING_DIR", + default=os.path.join(LOCAL_PROJECTS, os.pardir, "geodiff_tmp"), + ) diff --git a/server/mergin/sync/models.py b/server/mergin/sync/models.py index cb164695..83febbbd 100644 --- a/server/mergin/sync/models.py +++ 
b/server/mergin/sync/models.py @@ -4,6 +4,7 @@ from __future__ import annotations import json import os +import time import uuid from datetime import datetime, timedelta from enum import Enum @@ -28,6 +29,7 @@ ChangesSchema, ProjectFile, ) +from .storages.disk import move_to_tmp from .. import db from .storages import DiskStorage from .utils import is_versioned_file, is_qgis @@ -209,7 +211,7 @@ def expiration(self) -> timedelta: def delete(self, removed_by: int = None): """Mark project as permanently deleted (but keep in db) - rename (to free up the same name) - - remove associated files and their history and project versions + - remove associated files and their history - reset project_access - decline pending project access requests """ @@ -225,9 +227,7 @@ def delete(self, removed_by: int = None): # Null in storage params serves as permanent deletion flag self.storage.delete() self.storage_params = null() - pv_table = ProjectVersion.__table__ - # remove versions and file history items with cascade - db.session.execute(pv_table.delete().where(pv_table.c.project_id == self.id)) + # remove file records and their history (cascade) files_path_table = ProjectFilePath.__table__ db.session.execute( files_path_table.delete().where(files_path_table.c.project_id == self.id) @@ -492,6 +492,17 @@ class FileHistory(db.Model): file_path_id, project_version_name.desc(), ), + db.CheckConstraint( + text( + """ + CASE + WHEN (change = 'update_diff') THEN diff IS NOT NULL + ELSE diff IS NULL + END + """ + ), + name="changes_with_diff", + ), ) def __init__( @@ -507,7 +518,7 @@ def __init__( self.size = size self.checksum = checksum self.location = location - self.diff = diff + self.diff = diff if diff is not None else null() self.change = change.value @property @@ -684,7 +695,9 @@ class ProjectVersion(db.Model): UUID(as_uuid=True), db.ForeignKey("project.id", ondelete="CASCADE"), index=True ) created = db.Column(db.DateTime, default=datetime.utcnow, index=True) - author = 
db.Column(db.String, index=True) + author_id = db.Column( + db.Integer, db.ForeignKey("user.id"), index=True, nullable=True + ) user_agent = db.Column(db.String, index=True) ip_address = db.Column(db.String, index=True) ip_geolocation_country = db.Column( @@ -698,6 +711,8 @@ class ProjectVersion(db.Model): uselist=False, ) device_id = db.Column(db.String, index=True, nullable=True) + author = db.relationship("User", uselist=False, lazy="joined") + __table_args__ = ( db.UniqueConstraint("project_id", "name"), db.Index( @@ -711,7 +726,7 @@ def __init__( self, project: Project, name: int, - author: str, + author_id: int, changes: UploadChanges, ip: str, user_agent: str = None, @@ -720,7 +735,7 @@ def __init__( self.project = project self.project_id = project.id self.name = name - self.author = author + self.author_id = author_id self.user_agent = user_agent self.ip_address = ip self.device_id = device_id @@ -764,7 +779,11 @@ def __init__( size=upload_file.size, checksum=upload_file.checksum, location=upload_file.location, - diff=asdict(upload_file.diff) if upload_file.diff else null(), + diff=( + asdict(upload_file.diff) + if (is_diff_change and upload_file.diff) + else null() + ), change=( PushChangeType.UPDATE_DIFF if is_diff_change else change_type ), @@ -1026,6 +1045,29 @@ def __init__( self.changes = ChangesSchema().dump(changes) self.user_id = user_id + @property + def upload_dir(self): + return os.path.join(self.project.storage.project_dir, "tmp", self.id) + + @property + def lockfile(self): + return os.path.join(self.upload_dir, "lockfile") + + def is_active(self): + """Check if upload is still active because there was a ping (lockfile update) from underlying process""" + return os.path.exists(self.lockfile) and ( + time.time() - os.path.getmtime(self.lockfile) + < current_app.config["LOCKFILE_EXPIRATION"] + ) + + def clear(self): + """Clean up pending upload. + Uploaded files and table records are removed, and another upload can start. 
+ """ + move_to_tmp(self.upload_dir, self.id) + db.session.delete(self) + db.session.commit() + class RequestStatus(Enum): ACCEPTED = "accepted" @@ -1160,6 +1202,6 @@ def __init__( self.target_version = target_version self.action = action - if os.path.exists: + if os.path.exists(diff_path): self.diff_size = os.path.getsize(diff_path) self.changes = GeoDiff().changes_count(diff_path) diff --git a/server/mergin/sync/public_api_controller.py b/server/mergin/sync/public_api_controller.py index 8af7ca38..7a3b1833 100644 --- a/server/mergin/sync/public_api_controller.py +++ b/server/mergin/sync/public_api_controller.py @@ -81,7 +81,6 @@ is_versioned_file, is_name_allowed, get_project_path, - clean_upload, get_device_id, ) from .errors import StorageLimitHit @@ -224,7 +223,7 @@ def add_project(namespace): # noqa: E501 version = ProjectVersion( p, version_name, - current_user.username, + current_user.id, changes, get_ip(request), get_user_agent(request), @@ -746,6 +745,13 @@ def project_push(namespace, project_name): if all(len(changes[key]) == 0 for key in changes.keys()): abort(400, "No changes") + # reject upload early if there is another one already running + pending_upload = Upload.query.filter_by( + project_id=project.id, version=version + ).first() + if pending_upload and pending_upload.is_active(): + abort(400, "Another process is running. 
Please try later.") + upload_changes = ChangesSchema(context={"version": version + 1}).load(changes) # check if same file is not already uploaded for item in upload_changes.added: @@ -817,16 +823,8 @@ def project_push(namespace, project_name): db.session.rollback() # check and clean dangling uploads or abort for current_upload in project.uploads.all(): - upload_dir = os.path.join( - project.storage.project_dir, "tmp", current_upload.id - ) - upload_lockfile = os.path.join(upload_dir, "lockfile") - if os.path.exists(upload_lockfile): - if ( - time() - os.path.getmtime(upload_lockfile) - < current_app.config["LOCKFILE_EXPIRATION"] - ): - abort(400, "Another process is running. Please try later.") + if current_upload.is_active(): + abort(400, "Another process is running. Please try later.") db.session.delete(current_upload) db.session.commit() # previous push attempt is definitely lost @@ -844,15 +842,14 @@ def project_push(namespace, project_name): logging.info( f"Upload transaction {upload.id} created for project: {project.id}, version: {version}" ) - move_to_tmp(upload_dir) + move_to_tmp(upload.upload_dir) except IntegrityError as err: logging.error(f"Failed to create upload session: {str(err)}") abort(422, "Failed to create upload session. Please try later.") # Create transaction folder and lockfile - folder = os.path.join(project.storage.project_dir, "tmp", upload.id) - os.makedirs(folder) - open(os.path.join(folder, "lockfile"), "w").close() + os.makedirs(upload.upload_dir) + open(upload.lockfile, "w").close() # Update immediately without uploading of new/modified files and remove transaction/lockfile after successful commit if not (changes["added"] or changes["updated"]): @@ -863,7 +860,7 @@ def project_push(namespace, project_name): pv = ProjectVersion( project, next_version, - current_user.username, + current_user.id, upload_changes, get_ip(request), user_agent, @@ -877,15 +874,15 @@ def project_push(namespace, project_name): f"Transaction id: {upload.id}. 
No upload." ) project_version_created.send(pv) - clean_upload(upload.id) return jsonify(ProjectSchema().dump(project)), 200 except IntegrityError as err: db.session.rollback() - clean_upload(upload.id) logging.exception( f"Failed to upload a new project version using transaction id: {upload.id}: {str(err)}" ) abort(422, "Failed to upload a new project version. Please try later.") + finally: + upload.clear() return {"transaction": upload.id} @@ -1074,7 +1071,7 @@ def push_finish(transaction_id): pv = ProjectVersion( project, next_version, - current_user.username, + current_user.id, changes, get_ip(request), user_agent, @@ -1087,16 +1084,16 @@ def push_finish(transaction_id): f"Push finished for project: {project.id}, project version: {v_next_version}, transaction id: {transaction_id}." ) project_version_created.send(pv) - # remove artifacts - clean_upload(transaction_id) except (psycopg2.Error, FileNotFoundError, DataSyncError, IntegrityError) as err: db.session.rollback() - clean_upload(transaction_id) logging.exception( f"Failed to finish push for project: {project.id}, project version: {v_next_version}, " f"transaction id: {transaction_id}.: {str(err)}" ) abort(422, "Failed to create new version: {}".format(str(err))) + finally: + # remove artifacts + upload.clear() # do not optimize on every version, every 10th is just fine if not project.latest_version % 10: @@ -1208,7 +1205,7 @@ def clone_project(namespace, project_name): # noqa: E501 project_version = ProjectVersion( p, version, - current_user.username, + current_user.id, changes, get_ip(request), user_agent, diff --git a/server/mergin/sync/schemas.py b/server/mergin/sync/schemas.py index 291b3daf..827391a9 100644 --- a/server/mergin/sync/schemas.py +++ b/server/mergin/sync/schemas.py @@ -228,7 +228,7 @@ class ProjectVersionSchema(ma.SQLAlchemyAutoSchema): project_name = fields.Function(lambda obj: obj.project.name) namespace = fields.Function(lambda obj: obj.project.workspace.name) name = 
fields.Function(lambda obj: ProjectVersion.to_v_name(obj.name)) - author = fields.String() + author = fields.String(attribute="author.username") changesets = fields.Method("get_diff_summary") files = fields.String() created = DateTimeWithZ() @@ -361,7 +361,7 @@ class ProjectVersionListSchema(ma.SQLAlchemyAutoSchema): project_name = fields.Function(lambda obj: obj.project.name) namespace = fields.Function(lambda obj: obj.project.workspace.name) name = fields.Function(lambda obj: ProjectVersion.to_v_name(obj.name)) - author = fields.String() + author = fields.String(attribute="author.username") created = DateTimeWithZ() changes = fields.Method("_changes") project_size = fields.Integer() diff --git a/server/mergin/sync/storages/disk.py b/server/mergin/sync/storages/disk.py index f78b7751..8c89f2bb 100644 --- a/server/mergin/sync/storages/disk.py +++ b/server/mergin/sync/storages/disk.py @@ -7,6 +7,7 @@ import time import uuid import logging +from contextlib import contextmanager from flask import current_app from pygeodiff import GeoDiff, GeoDiffLibError @@ -112,6 +113,13 @@ def __init__(self, project): self.project_dir = self._project_dir() self.geodiff = GeoDiff() self.gediff_log = io.StringIO() + self.geodiff_working_dir = os.path.abspath( + os.path.join( + current_app.config["GEODIFF_WORKING_DIR"], + self.project.storage_params["location"], + str(uuid.uuid4()), + ) + ) def _logger_callback(level, text_bytes): text = text_bytes.decode() @@ -124,6 +132,26 @@ def _logger_callback(level, text_bytes): self.geodiff.set_logger_callback(_logger_callback) + @contextmanager + def geodiff_copy(self, file): + """Copy project file from live storage to geodiff temp storage for further actions. + If file cannot be copied then as a fallback it creates _tmp file next to original file. + Temporary copy is removed on exit. 
+ """ + file_path = file.replace(self.project_dir, "").lstrip(os.path.sep) + file_copy = os.path.join(self.geodiff_working_dir, file_path) + try: + copy_file(file, file_copy) + yield file_copy + except OSError as e: + logging.warning(f"Copy to geodiff dir failed: {str(e)}") + # fallback to live directory + file_copy = file + "_tmp" + copy_file(file, file_copy) + yield file_copy + finally: + move_to_tmp(file_copy) + def flush_geodiff_logger(self): """Push content to stdout and then reset.""" logging.warning(self.gediff_log.getvalue()) @@ -214,56 +242,61 @@ def apply_diff( basefile = os.path.join(self.project_dir, current_file.location) changeset = os.path.join(self.project_dir, upload_file.diff.location) patchedfile = os.path.join(self.project_dir, upload_file.location) - # create copy of basefile which will be updated in next version + # create local copy of basefile which will be updated in next version and changeset needed # TODO this can potentially fail for large files logging.info(f"Apply changes: copying {basefile} to {patchedfile}") start = time.time() - copy_file(basefile, patchedfile) - copy_time = time.time() - start - logging.info(f"Copying finished in {copy_time} s") - try: - # clean geodiff logger - self.flush_geodiff_logger() - logging.info( - f"Geodiff: apply changeset {changeset} of size {os.path.getsize(changeset)} with changes to {patchedfile}" - ) - start = time.time() - self.geodiff.apply_changeset(patchedfile, changeset) - geodiff_apply_time = time.time() - start - # track performance of geodiff action - base_version = current_file.location.split("/")[0] - gh = GeodiffActionHistory( - self.project.id, - base_version, - current_file.path, - current_file.size, - v_name, - "apply_changes", - changeset, - ) - gh.copy_time = copy_time - gh.geodiff_time = geodiff_apply_time - logging.info(f"Changeset applied in {geodiff_apply_time} s") - except (GeoDiffLibError, GeoDiffLibConflictError): - move_to_tmp(patchedfile) - move_to_tmp(changeset) - return 
Err(self.gediff_log.getvalue()) - - # we can now replace old basefile metadata with the new one (patchedfile) - # TODO this can potentially fail for large files - logging.info(f"Apply changes: calculating checksum of {patchedfile}") - start = time.time() - checksum = generate_checksum(patchedfile) - checksumming_time = time.time() - start - gh.checksum_time = checksumming_time - logging.info(f"Checksum calculated in {checksumming_time} s") - db.session.add(gh) - return Ok( - ( - checksum, - os.path.getsize(patchedfile), - ) - ) + with self.geodiff_copy(changeset) as changeset_tmp, self.geodiff_copy( + basefile + ) as patchedfile_tmp: + copy_time = time.time() - start + logging.info(f"Copying finished in {copy_time} s") + try: + # clean geodiff logger + self.flush_geodiff_logger() + logging.info( + f"Geodiff: apply changeset {changeset} of size {os.path.getsize(changeset)} with changes to {patchedfile}" + ) + start = time.time() + self.geodiff.apply_changeset(patchedfile_tmp, changeset_tmp) + geodiff_apply_time = time.time() - start + # track performance of geodiff action + base_version = current_file.location.split("/")[0] + gh = GeodiffActionHistory( + self.project.id, + base_version, + current_file.path, + current_file.size, + v_name, + "apply_changes", + changeset, + ) + gh.copy_time = copy_time + gh.geodiff_time = geodiff_apply_time + logging.info(f"Changeset applied in {geodiff_apply_time} s") + # move constructed file where it belongs + logging.info(f"Apply changes: moving patchfile {patchedfile_tmp}") + start = time.time() + copy_file(patchedfile_tmp, patchedfile) + gh.copy_time = copy_time + (time.time() - start) + + # TODO this can potentially fail for large files + logging.info(f"Apply changes: calculating checksum of {patchedfile}") + start = time.time() + checksum = generate_checksum(patchedfile_tmp) + checksumming_time = time.time() - start + gh.checksum_time = checksumming_time + logging.info(f"Checksum calculated in {checksumming_time} s") + 
db.session.add(gh) + return Ok( + ( + checksum, + os.path.getsize(patchedfile_tmp), + ) + ) + except (GeoDiffLibError, GeoDiffLibConflictError): + move_to_tmp(changeset) + return Err(self.gediff_log.getvalue()) def construct_diff( self, current_file: ProjectFile, upload_file: UploadFile, version: int @@ -278,22 +311,40 @@ def construct_diff( uploaded_file = os.path.join(self.project_dir, upload_file.location) diff_name = upload_file.path + "-diff-" + str(uuid.uuid4()) changeset = os.path.join(self.project_dir, v_name, diff_name) - try: - self.flush_geodiff_logger() - logging.info(f"Geodiff: create changeset {changeset} from {uploaded_file}") - self.geodiff.create_changeset(basefile, uploaded_file, changeset) - # create diff metadata as it would be created by other clients - diff_file = File( - path=diff_name, - checksum=generate_checksum(changeset), - size=os.path.getsize(changeset), - location=os.path.join(v_name, mergin_secure_filename(diff_name)), - ) - return Ok(diff_file) - except (GeoDiffLibError, GeoDiffLibConflictError): - # diff is not possible to create - file will be overwritten - move_to_tmp(changeset) - return Err(self.gediff_log.getvalue()) + with self.geodiff_copy(basefile) as basefile_tmp, self.geodiff_copy( + uploaded_file + ) as uploaded_file_tmp: + try: + # create changeset next to uploaded file copy + changeset_tmp = os.path.join( + uploaded_file_tmp.replace(upload_file.location, "").rstrip( + os.path.sep + ), + v_name, + diff_name, + ) + self.flush_geodiff_logger() + logging.info( + f"Geodiff: create changeset {changeset} from {uploaded_file}" + ) + self.geodiff.create_changeset( + basefile_tmp, uploaded_file_tmp, changeset_tmp + ) + # create diff metadata as it would be created by other clients + diff_file = File( + path=diff_name, + checksum=generate_checksum(changeset_tmp), + size=os.path.getsize(changeset_tmp), + location=os.path.join(v_name, mergin_secure_filename(diff_name)), + ) + copy_file(changeset_tmp, changeset) + return 
Ok(diff_file) + except (GeoDiffLibError, GeoDiffLibConflictError) as e: + # diff is not possible to create - file will be overwritten + move_to_tmp(changeset) + return Err(self.gediff_log.getvalue()) + finally: + move_to_tmp(changeset_tmp) def delete(self): move_to_tmp(self.project_dir) @@ -331,70 +382,78 @@ def restore_versioned_file(self, file: str, version: int): if not (base_meta and diffs): return - tmp_dir = os.path.join(current_app.config["TEMP_DIR"], str(uuid.uuid4())) - os.makedirs(tmp_dir, exist_ok=True) - # this is final restored file - restored_file = os.path.join(tmp_dir, os.path.basename(base_meta.abs_path)) - logging.info(f"Restore file: copying {base_meta.abs_path} to {restored_file}") start = time.time() - copy_file(base_meta.abs_path, restored_file) - copy_time = time.time() - start - logging.info(f"File copied in {copy_time} s") - logging.info(f"Restoring gpkg file with {len(diffs)} diffs") - - try: - self.flush_geodiff_logger() # clean geodiff logger - if len(diffs) > 1: - # concatenate multiple diffs into single one + with self.geodiff_copy(base_meta.abs_path) as restored_file: + copy_time = time.time() - start + logging.info( + f"Restore file: {base_meta.abs_path} copied to {restored_file} in {copy_time} s" + ) + logging.info(f"Restoring gpkg file with {len(diffs)} diffs") + try: + self.flush_geodiff_logger() # clean geodiff logger changeset = os.path.join( - tmp_dir, os.path.basename(base_meta.abs_path) + "-diff" + self.geodiff_working_dir, + os.path.basename(base_meta.abs_path) + "-diff", ) - partials = [os.path.join(self.project_dir, d.location) for d in diffs] - self.geodiff.concat_changes(partials, changeset) - else: - changeset = os.path.join(self.project_dir, diffs[0].location) + if len(diffs) > 1: + # concatenate multiple diffs into single one + partials = [ + os.path.join(self.project_dir, d.location) for d in diffs + ] + self.geodiff.concat_changes(partials, changeset) + else: + copy_file( + os.path.join(self.project_dir, 
diffs[0].location), changeset + ) + logging.info( + f"Geodiff: apply changeset {changeset} of size {os.path.getsize(changeset)}" + ) + # if we are going backwards we need to reverse changeset! + if base_meta.version.name > version: + logging.info(f"Geodiff: inverting changeset") + changes = os.path.join( + self.geodiff_working_dir, + os.path.basename(base_meta.abs_path) + "-diff-inv", + ) + self.geodiff.invert_changeset(changeset, changes) + else: + changes = changeset + + start = time.time() + self.geodiff.apply_changeset(restored_file, changes) + # track geodiff event for performance analysis + gh = GeodiffActionHistory( + self.project.id, + ProjectVersion.to_v_name(base_meta.version.name), + base_meta.path, + base_meta.size, + ProjectVersion.to_v_name(project_version.name), + "restore_file", + changes, + ) + apply_time = time.time() - start + gh.geodiff_time = apply_time + logging.info(f"Changeset applied in {apply_time} s") + except (GeoDiffLibError, GeoDiffLibConflictError): + project_workspace = self.project.workspace.name + logging.exception( + f"Failed to restore file: {self.gediff_log.getvalue()} from project {project_workspace}/{self.project.name}" + ) + return + finally: + move_to_tmp(changes) + move_to_tmp(changeset) + # move final restored file to place where it is expected (only after it is successfully created) logging.info( - f"Geodiff: apply changeset {changeset} of size {os.path.getsize(changeset)}" + f"Copying restored file to expected location {file_found.location}" ) - # if we are going backwards we need to reverse changeset! 
- if base_meta.version.name > version: - logging.info(f"Geodiff: inverting changeset") - changes = os.path.join( - tmp_dir, os.path.basename(base_meta.abs_path) + "-diff-inv" - ) - self.geodiff.invert_changeset(changeset, changes) - else: - changes = changeset start = time.time() - self.geodiff.apply_changeset(restored_file, changes) - # track geodiff event for performance analysis - gh = GeodiffActionHistory( - self.project.id, - ProjectVersion.to_v_name(base_meta.version.name), - base_meta.path, - base_meta.size, - ProjectVersion.to_v_name(project_version.name), - "restore_file", - changes, + copy_file( + restored_file, os.path.join(self.project_dir, file_found.location) ) - apply_time = time.time() - start - gh.geodiff_time = apply_time - logging.info(f"Changeset applied in {apply_time} s") - except (GeoDiffLibError, GeoDiffLibConflictError): - project_workspace = self.project.workspace.name - logging.exception( - f"Failed to restore file: {self.gediff_log.getvalue()} from project {project_workspace}/{self.project.name}" - ) - return - # move final restored file to place where it is expected (only after it is successfully created) - logging.info( - f"Copying restored file to expected location {file_found.location}" - ) - start = time.time() - copy_file(restored_file, os.path.join(self.project_dir, file_found.location)) - logging.info(f"File copied in {time.time() - start} s") - copy_time += time.time() - start - gh.copy_time = copy_time - db.session.add(gh) - db.session.commit() + logging.info(f"File copied in {time.time() - start} s") + copy_time += time.time() - start + gh.copy_time = copy_time + db.session.add(gh) + db.session.commit() diff --git a/server/mergin/sync/utils.py b/server/mergin/sync/utils.py index 4e4516ae..724b0c31 100644 --- a/server/mergin/sync/utils.py +++ b/server/mergin/sync/utils.py @@ -301,27 +301,6 @@ def split_project_path(project_path): return workspace_name, project_name -def clean_upload(transaction_id): - """Clean upload 
infrastructure - - Uploaded files and table records are removed, and another upload can be started. - - :param transaction_id: Transaction id. - :type transaction_id: Str - - :rtype: None - """ - from mergin.sync.permissions import get_upload - from mergin.sync.storages.disk import move_to_tmp - from .. import db - - upload, upload_dir = get_upload(transaction_id) - db.session.delete(upload) - db.session.commit() - move_to_tmp(upload_dir, transaction_id) - return NoContent, 200 - - def get_device_id(request: Request) -> Optional[str]: """Get device uuid from http header X-Device-Id""" return request.headers.get("X-Device-Id") diff --git a/server/mergin/tests/fixtures.py b/server/mergin/tests/fixtures.py index 00407ce8..bab7646e 100644 --- a/server/mergin/tests/fixtures.py +++ b/server/mergin/tests/fixtures.py @@ -213,7 +213,7 @@ def diff_project(app): pv = ProjectVersion( project, i + 2, - project.creator.username, + project.creator.id, upload_changes, "127.0.0.1", ) diff --git a/server/mergin/tests/test_celery.py b/server/mergin/tests/test_celery.py index 34a07fc2..44c36f2a 100644 --- a/server/mergin/tests/test_celery.py +++ b/server/mergin/tests/test_celery.py @@ -133,5 +133,5 @@ def test_remove_deleted_project_backups(client): .filter(Project.storage_params.isnot(None)) .first() ) - assert ProjectVersion.query.filter_by(project_id=rm_project.id).count() == 0 + assert ProjectVersion.query.filter_by(project_id=rm_project.id).count() != 0 assert str(rm_project.id) in rm_project.name diff --git a/server/mergin/tests/test_db_hooks.py b/server/mergin/tests/test_db_hooks.py index 4304fa1f..0f16710d 100644 --- a/server/mergin/tests/test_db_hooks.py +++ b/server/mergin/tests/test_db_hooks.py @@ -42,7 +42,7 @@ def test_close_user_account(client, diff_project): flag_modified(diff_project.access, "writers") # user contributed to another user project so he is listed in projects history changes = UploadChanges(added=[], updated=[], removed=[]) - pv = 
ProjectVersion(diff_project, 11, user.username, changes, "127.0.0.1") + pv = ProjectVersion(diff_project, 11, user.id, changes, "127.0.0.1") diff_project.latest_version = pv.name pv.project = diff_project db.session.add(pv) @@ -98,7 +98,7 @@ def test_close_user_account(client, diff_project): ) assert user_id not in diff_project.access.writers # user remains referenced in existing project version he created (as read-only ref) - assert diff_project.get_latest_version().author == "user" + assert diff_project.get_latest_version().author_id == user_id sync_fail_history = SyncFailuresHistory.query.filter( SyncFailuresHistory.project_id == diff_project.id ).all() @@ -142,15 +142,15 @@ def test_remove_project(client, diff_project): diff_project.delete() assert Project.query.filter_by(id=project_id).count() assert not Upload.query.filter_by(project_id=project_id).count() - assert not ProjectVersion.query.filter_by(project_id=project_id).count() + assert ProjectVersion.query.filter_by(project_id=project_id).count() assert ProjectAccess.query.filter_by(project_id=project_id).count() cleanup(client, [project_dir]) assert access_request.status == RequestStatus.DECLINED.value - # after removal only cached information in project table remains + # after removal cached information in project table remains and project versions, but not files details assert diff_project.disk_usage assert diff_project.latest_version is not None assert diff_project.files == [] - assert not diff_project.get_latest_version() + assert diff_project.get_latest_version() assert ( FileHistory.query.filter(FileHistory.version_id.in_(versions_ids)).count() == 0 ) diff --git a/server/mergin/tests/test_file_restore.py b/server/mergin/tests/test_file_restore.py index b6d4c120..f189c9e4 100644 --- a/server/mergin/tests/test_file_restore.py +++ b/server/mergin/tests/test_file_restore.py @@ -205,6 +205,8 @@ def test_version_file_restore(diff_project): ).first() assert gh.geodiff_time assert gh.copy_time + # all temp 
files are gone + assert not os.path.exists(diff_project.storage.geodiff_working_dir) # basefile can not be restored test_file = os.path.join(diff_project.storage.project_dir, "v5", "base.gpkg") @@ -217,3 +219,4 @@ def test_version_file_restore(diff_project): os.remove(test_file) diff_project.storage.restore_versioned_file("test.txt", 1) assert not os.path.exists(test_file) + assert not os.path.exists(diff_project.storage.geodiff_working_dir) diff --git a/server/mergin/tests/test_project_controller.py b/server/mergin/tests/test_project_controller.py index 8c7f8de2..216f4b48 100644 --- a/server/mergin/tests/test_project_controller.py +++ b/server/mergin/tests/test_project_controller.py @@ -16,6 +16,8 @@ import hashlib import shutil import re + +from flask_login import current_user from pygeodiff import GeoDiff from flask import url_for, current_app import tempfile @@ -46,6 +48,7 @@ test_project_dir, json_headers, TMP_DIR, + DEFAULT_USER, ) from .utils import ( add_user, @@ -1564,6 +1567,7 @@ def test_push_diff_finish(client): assert gh.copy_time assert gh.checksum_time assert gh.action == "apply_changes" + assert not os.path.exists(upload.project.storage.geodiff_working_dir) # try with valid update metadata but with conflicting diff (rebase was not done) upload, upload_dir = create_transaction("mergin", changes, 2) @@ -1636,6 +1640,7 @@ def test_push_no_diff_finish(client): assert os.path.exists( os.path.join(upload.project.storage.project_dir, file_meta.diff_file.location) ) + assert not os.path.exists(upload.project.storage.geodiff_working_dir) # change structure of gpkg file so diff would not be available -> hard overwrite gpkg_conn = pysqlite3.connect(os.path.join(working_dir, "base.gpkg")) @@ -2299,12 +2304,17 @@ def test_get_project_version(client, diff_project): def add_project_version(project, changes, version=None): + author = ( + current_user + if current_user + else User.query.filter_by(username=DEFAULT_USER[0]).first() + ) next_version = version or 
project.next_version() upload_changes = ChangesSchema(context={"version": next_version}).load(changes) pv = ProjectVersion( project, next_version, - "mergin", + author.id, upload_changes, ip="127.0.0.1", ) @@ -2402,3 +2412,52 @@ def test_version_files(client, diff_project): sorted(backward_search, key=lambda f: f.path), ) ) + + +def test_delete_diff_file(client): + """Test file history in case of diff file removal""" + # prepare: add .gpkg and update with diff + changes = { + "added": [file_info(test_project_dir, "base.gpkg", chunk_size=CHUNK_SIZE)], + } + upload, upload_dir = create_transaction("mergin", changes) + upload_chunks(upload_dir, upload.changes) + client.post(f"/v1/project/push/finish/{upload.id}") + + changes = _get_changes_with_diff(test_project_dir) + upload, upload_dir = create_transaction("mergin", changes, version=2) + upload_chunks(upload_dir, upload.changes) + client.post(f"/v1/project/push/finish/{upload.id}") + + fh = FileHistory.query.filter_by( + project_version_name=upload.project.latest_version, + change=PushChangeType.UPDATE_DIFF.value, + ).first() + assert fh.diff is not None + + # delete file + diff_change = next( + change for change in changes["updated"] if change["path"] == "base.gpkg" + ) + resp = client.post( + f"/v1/project/push/{upload.project.workspace.name}/{upload.project.name}", + data=json.dumps( + { + "version": "v3", + "changes": { + "added": [], + "updated": [], + "removed": [diff_change], + }, + }, + cls=DateTimeEncoder, + ).encode("utf-8"), + headers=json_headers, + ) + assert resp.status_code == 200 + + fh = FileHistory.query.filter_by( + project_version_name=upload.project.latest_version, + change=PushChangeType.DELETE.value, + ).first() + assert fh.path == "base.gpkg" and fh.diff is None diff --git a/server/mergin/tests/utils.py b/server/mergin/tests/utils.py index 3bb9d94a..121acf2f 100644 --- a/server/mergin/tests/utils.py +++ b/server/mergin/tests/utils.py @@ -85,7 +85,7 @@ def create_project(name, workspace, user, 
**kwargs): pa = ProjectAccess(p, public) db.session.add(pa) changes = UploadChanges(added=[], updated=[], removed=[]) - pv = ProjectVersion(p, 0, user.username, changes, "127.0.0.1") + pv = ProjectVersion(p, 0, user.id, changes, "127.0.0.1") db.session.add(pv) db.session.commit() @@ -185,7 +185,7 @@ def initialize(): "renamed": [], } ) - pv = ProjectVersion(p, 1, user.username, upload_changes, "127.0.0.1") + pv = ProjectVersion(p, 1, user.id, upload_changes, "127.0.0.1") db.session.add(pv) db.session.commit() @@ -269,7 +269,7 @@ def create_blank_version(project): pv = ProjectVersion( project, project.next_version(), - project.creator.username, + project.creator.id, UploadChanges(added=[], updated=[], removed=[]), "127.0.0.1", ) @@ -346,7 +346,7 @@ def push_change(project, action, path, src_dir): pv = ProjectVersion( project, project.next_version(), - project.creator.username, + project.creator.id, upload_changes, "127.0.0.1", ) diff --git a/server/migrations/community/1ab5b02ce532_version_author_name_to_user_id.py b/server/migrations/community/1ab5b02ce532_version_author_name_to_user_id.py new file mode 100644 index 00000000..3dfbadb4 --- /dev/null +++ b/server/migrations/community/1ab5b02ce532_version_author_name_to_user_id.py @@ -0,0 +1,75 @@ +"""Migrate project version author name to user.id + +Revision ID: 1ab5b02ce532 +Revises: 1c23e3be03a3 +Create Date: 2024-09-06 14:01:40.668483 + +""" + +from alembic import op +import sqlalchemy as sa + +# revision identifiers, used by Alembic. 
+revision = "1ab5b02ce532" +down_revision = "1c23e3be03a3" +branch_labels = None +depends_on = None + + +def upgrade(): + op.add_column( + "project_version", + sa.Column("author_id", sa.Integer(), autoincrement=False, nullable=True), + ) + op.create_index( + op.f("ix_project_version_author_id"), + "project_version", + ["author_id"], + unique=False, + ) + op.create_foreign_key( + op.f("fk_project_version_author_id_user"), + "project_version", + "user", + ["author_id"], + ["id"], + ) + + # migrate data + conn = op.get_bind() + query = """ + UPDATE project_version + SET author_id = u.id + FROM "user" u + WHERE u.username = author; + """ + conn.execute(sa.text(query)) + + op.drop_index("ix_project_version_author", table_name="project_version") + op.drop_column("project_version", "author") + + +def downgrade(): + op.add_column( + "project_version", + sa.Column("author", sa.VARCHAR(), autoincrement=False, nullable=True), + ) + op.create_index( + "ix_project_version_author", "project_version", ["author"], unique=False + ) + + # migrate data + conn = op.get_bind() + query = """ + UPDATE project_version + SET author = u.username + FROM "user" u + WHERE u.id = author_id; + """ + conn.execute(sa.text(query)) + + op.drop_constraint( + op.f("fk_project_version_author_id_user"), "project_version", type_="foreignkey" + ) + op.drop_index(op.f("ix_project_version_author_id"), table_name="project_version") + op.drop_column("project_version", "author_id") diff --git a/server/migrations/community/1c23e3be03a3_add_file_history_diff_constraint.py b/server/migrations/community/1c23e3be03a3_add_file_history_diff_constraint.py new file mode 100644 index 00000000..003b1eb5 --- /dev/null +++ b/server/migrations/community/1c23e3be03a3_add_file_history_diff_constraint.py @@ -0,0 +1,49 @@ +"""Add file history diff constraint + +Revision ID: 1c23e3be03a3 +Revises: 57d0de13ce4a +Create Date: 2024-09-09 09:46:42.950624 + +""" + +from alembic import op +import sqlalchemy as sa + +# revision 
identifiers, used by Alembic. +revision = "1c23e3be03a3" +down_revision = "57d0de13ce4a" +branch_labels = None +depends_on = None + + +def upgrade(): + conn = op.get_bind() + conn.execute( + sa.text( + """ + UPDATE file_history + SET diff = NULL + WHERE change != 'update_diff' AND diff IS NOT NULL; + """ + ) + ) + conn.execute( + sa.text( + """ + ALTER TABLE file_history + ADD CONSTRAINT ck_file_history_changes_with_diff CHECK ( + CASE + WHEN (change = 'update_diff') THEN diff IS NOT NULL + ELSE diff IS NULL + END + ); + """ + ) + ) + + +def downgrade(): + op.drop_constraint( + op.f("ck_file_history_changes_with_diff"), + "file_history", + )