Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
b9b19a4
Perform geodiff actions in separate working dir
varmar05 Sep 5, 2024
110d0cc
Migration version author from str to user id
varmar05 Sep 6, 2024
4ba2621
Keep project versions for removed projects
varmar05 Sep 6, 2024
d3983f7
Make sure file_history.diff is not null only when change is update_diff
varmar05 Sep 9, 2024
596d97f
Merge pull request #284 from MerginMaps/master
MarcelGeo Sep 12, 2024
52058d9
Reject upload early if possible to save db resources
varmar05 Sep 13, 2024
c6d7111
Refactor upload clean up
varmar05 Sep 13, 2024
949f769
Remove flower deps from Dockerfile
varmar05 Sep 13, 2024
c2bc390
Merge pull request #286 from MerginMaps/remove_flower
MarcelGeo Sep 13, 2024
d4d6f11
Merge pull request #285 from MerginMaps/reject_upload_early
MarcelGeo Sep 13, 2024
ae78beb
Merge pull request #280 from MerginMaps/keep_project_version
MarcelGeo Sep 16, 2024
4087436
Merge pull request #281 from MerginMaps/diff_constraint_file_history
MarcelGeo Sep 16, 2024
2804180
Update revision
MarcelGeo Sep 16, 2024
5f64264
Merge pull request #278 from MerginMaps/geodiff_working_dir
MarcelGeo Sep 16, 2024
a55fbfb
Merge pull request #279 from MerginMaps/version_author_to_user_id
MarcelGeo Sep 16, 2024
a65ffd4
Update 1c23e3be03a3_add_file_history_diff_constraint.py
MarcelGeo Sep 17, 2024
ee9a95f
Merge pull request #288 from MerginMaps/migration-diff-null-patch-1
MarcelGeo Sep 17, 2024
e4217fe
Change constraint name
varmar05 Sep 18, 2024
52ab318
Merge pull request #290 from MerginMaps/fix_constraint_name
MarcelGeo Sep 18, 2024
688bc6b
Introduce more celery variables
varmar05 Sep 19, 2024
53bcd49
Rename variables to be compliant with old naming convention (for now)
varmar05 Sep 19, 2024
6f21f88
Merge pull request #291 from MerginMaps/celery_variables
MarcelGeo Sep 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .prod.env
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,12 @@ CELERY_RESULT_BACKEND=redis://merginmaps-redis:6379/0
#CELERY_RESULT_BACKEND_TRANSPORT_OPTIONS={} # cast=eval
CELERY_RESULT_BACKEND_TRANSPORT_OPTIONS={ 'master_name': 'mymaster' }

#CELERY_ACKS_LATE=False

#CELERY_WORKER_CONCURRENCY=1 # set to number of cpu in case of prefork or to higher number in case of gevent pool
CELERYD_CONCURRENCY=2

#CELERYD_PREFETCH_MULTIPLIER=4

# various life times

Expand Down
1 change: 1 addition & 0 deletions server/.test.env
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@ INPUTAPP_API_KEY=not-secret-key
GLOBAL_WORKSPACE='mergin'
GLOBAL_STORAGE=104857600
COLLECT_STATISTICS=0
GEODIFF_WORKING_DIR=/tmp/geodiff
1 change: 0 additions & 1 deletion server/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ ENV LC_ALL=C.UTF-8
ENV LANG=C.UTF-8

RUN pipenv install --system --deploy --verbose
RUN pip3 install flower==0.9.7

USER mergin

Expand Down
5 changes: 5 additions & 0 deletions server/mergin/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ def __call__(self, *args, **kwargs):
return self.run(*args, **kwargs)

celery.conf.update(app.config)
celery.conf.update(
task_acks_late=Configuration.CELERY_ACKS_LATE,
worker_concurrency=Configuration.CELERYD_CONCURRENCY,
worker_prefetch_multiplier=Configuration.CELERYD_PREFETCH_MULTIPLIER,
)
# configure celery with flask context https://flask.palletsprojects.com/en/1.1.x/patterns/celery/
# e.g. for using flask-mail
celery.Task = ContextTask
Expand Down
5 changes: 5 additions & 0 deletions server/mergin/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ class Configuration(object):
CELERY_RESULT_BACKEND_TRANSPORT_OPTIONS = config(
"CELERY_RESULT_BACKEND_TRANSPORT_OPTIONS", default="{}", cast=eval
)
CELERY_ACKS_LATE = config("CELERY_ACKS_LATE", default=False, cast=bool)
CELERYD_CONCURRENCY = config("CELERYD_CONCURRENCY", default=1, cast=int)
CELERYD_PREFETCH_MULTIPLIER = config(
"CELERYD_PREFETCH_MULTIPLIER", default=4, cast=int
)

# deployment URL (e.g. for links generated in emails)
MERGIN_BASE_URL = config("MERGIN_BASE_URL", default="")
Expand Down
2 changes: 1 addition & 1 deletion server/mergin/sync/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def create(name, namespace, user): # pylint: disable=W0612
db.session.add(p)
p.access = ProjectAccess(p, public=False)
changes = UploadChanges(added=[], updated=[], removed=[])
pv = ProjectVersion(p, 0, user.username, changes, "127.0.0.1")
pv = ProjectVersion(p, 0, user.id, changes, "127.0.0.1")
pv.project = p
db.session.commit()
os.makedirs(p.storage.project_dir, exist_ok=True)
Expand Down
5 changes: 5 additions & 0 deletions server/mergin/sync/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,8 @@ class Configuration(object):
)
# trash dir for temp files being cleaned regularly
TEMP_DIR = config("TEMP_DIR", default=gettempdir())
# working directory for geodiff actions - should be a fast local storage
GEODIFF_WORKING_DIR = config(
"GEODIFF_WORKING_DIR",
default=os.path.join(LOCAL_PROJECTS, os.pardir, "geodiff_tmp"),
)
62 changes: 52 additions & 10 deletions server/mergin/sync/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from __future__ import annotations
import json
import os
import time
import uuid
from datetime import datetime, timedelta
from enum import Enum
Expand All @@ -28,6 +29,7 @@
ChangesSchema,
ProjectFile,
)
from .storages.disk import move_to_tmp
from .. import db
from .storages import DiskStorage
from .utils import is_versioned_file, is_qgis
Expand Down Expand Up @@ -209,7 +211,7 @@ def expiration(self) -> timedelta:
def delete(self, removed_by: int = None):
"""Mark project as permanently deleted (but keep in db)
- rename (to free up the same name)
- remove associated files and their history and project versions
- remove associated files and their history
- reset project_access
- decline pending project access requests
"""
Expand All @@ -225,9 +227,7 @@ def delete(self, removed_by: int = None):
# Null in storage params serves as permanent deletion flag
self.storage.delete()
self.storage_params = null()
pv_table = ProjectVersion.__table__
# remove versions and file history items with cascade
db.session.execute(pv_table.delete().where(pv_table.c.project_id == self.id))
# remove file records and their history (cascade)
files_path_table = ProjectFilePath.__table__
db.session.execute(
files_path_table.delete().where(files_path_table.c.project_id == self.id)
Expand Down Expand Up @@ -492,6 +492,17 @@ class FileHistory(db.Model):
file_path_id,
project_version_name.desc(),
),
db.CheckConstraint(
text(
"""
CASE
WHEN (change = 'update_diff') THEN diff IS NOT NULL
ELSE diff IS NULL
END
"""
),
name="changes_with_diff",
),
)

def __init__(
Expand All @@ -507,7 +518,7 @@ def __init__(
self.size = size
self.checksum = checksum
self.location = location
self.diff = diff
self.diff = diff if diff is not None else null()
self.change = change.value

@property
Expand Down Expand Up @@ -684,7 +695,9 @@ class ProjectVersion(db.Model):
UUID(as_uuid=True), db.ForeignKey("project.id", ondelete="CASCADE"), index=True
)
created = db.Column(db.DateTime, default=datetime.utcnow, index=True)
author = db.Column(db.String, index=True)
author_id = db.Column(
db.Integer, db.ForeignKey("user.id"), index=True, nullable=True
)
user_agent = db.Column(db.String, index=True)
ip_address = db.Column(db.String, index=True)
ip_geolocation_country = db.Column(
Expand All @@ -698,6 +711,8 @@ class ProjectVersion(db.Model):
uselist=False,
)
device_id = db.Column(db.String, index=True, nullable=True)
author = db.relationship("User", uselist=False, lazy="joined")

__table_args__ = (
db.UniqueConstraint("project_id", "name"),
db.Index(
Expand All @@ -711,7 +726,7 @@ def __init__(
self,
project: Project,
name: int,
author: str,
author_id: int,
changes: UploadChanges,
ip: str,
user_agent: str = None,
Expand All @@ -720,7 +735,7 @@ def __init__(
self.project = project
self.project_id = project.id
self.name = name
self.author = author
self.author_id = author_id
self.user_agent = user_agent
self.ip_address = ip
self.device_id = device_id
Expand Down Expand Up @@ -764,7 +779,11 @@ def __init__(
size=upload_file.size,
checksum=upload_file.checksum,
location=upload_file.location,
diff=asdict(upload_file.diff) if upload_file.diff else null(),
diff=(
asdict(upload_file.diff)
if (is_diff_change and upload_file.diff)
else null()
),
change=(
PushChangeType.UPDATE_DIFF if is_diff_change else change_type
),
Expand Down Expand Up @@ -1026,6 +1045,29 @@ def __init__(
self.changes = ChangesSchema().dump(changes)
self.user_id = user_id

@property
def upload_dir(self):
return os.path.join(self.project.storage.project_dir, "tmp", self.id)

@property
def lockfile(self):
return os.path.join(self.upload_dir, "lockfile")

def is_active(self):
"""Check if upload is still active because there was a ping (lockfile update) from underlying process"""
return os.path.exists(self.lockfile) and (
time.time() - os.path.getmtime(self.lockfile)
< current_app.config["LOCKFILE_EXPIRATION"]
)

def clear(self):
"""Clean up pending upload.
Uploaded files and table records are removed, and another upload can start.
"""
move_to_tmp(self.upload_dir, self.id)
db.session.delete(self)
db.session.commit()


class RequestStatus(Enum):
ACCEPTED = "accepted"
Expand Down Expand Up @@ -1160,6 +1202,6 @@ def __init__(
self.target_version = target_version
self.action = action

if os.path.exists:
if os.path.exists(diff_path):
self.diff_size = os.path.getsize(diff_path)
self.changes = GeoDiff().changes_count(diff_path)
45 changes: 21 additions & 24 deletions server/mergin/sync/public_api_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@
is_versioned_file,
is_name_allowed,
get_project_path,
clean_upload,
get_device_id,
)
from .errors import StorageLimitHit
Expand Down Expand Up @@ -224,7 +223,7 @@ def add_project(namespace): # noqa: E501
version = ProjectVersion(
p,
version_name,
current_user.username,
current_user.id,
changes,
get_ip(request),
get_user_agent(request),
Expand Down Expand Up @@ -746,6 +745,13 @@ def project_push(namespace, project_name):
if all(len(changes[key]) == 0 for key in changes.keys()):
abort(400, "No changes")

# reject upload early if there is another one already running
pending_upload = Upload.query.filter_by(
project_id=project.id, version=version
).first()
if pending_upload and pending_upload.is_active():
abort(400, "Another process is running. Please try later.")

upload_changes = ChangesSchema(context={"version": version + 1}).load(changes)
# check if same file is not already uploaded
for item in upload_changes.added:
Expand Down Expand Up @@ -817,16 +823,8 @@ def project_push(namespace, project_name):
db.session.rollback()
# check and clean dangling uploads or abort
for current_upload in project.uploads.all():
upload_dir = os.path.join(
project.storage.project_dir, "tmp", current_upload.id
)
upload_lockfile = os.path.join(upload_dir, "lockfile")
if os.path.exists(upload_lockfile):
if (
time() - os.path.getmtime(upload_lockfile)
< current_app.config["LOCKFILE_EXPIRATION"]
):
abort(400, "Another process is running. Please try later.")
if current_upload.is_active():
abort(400, "Another process is running. Please try later.")
db.session.delete(current_upload)
db.session.commit()
# previous push attempt is definitely lost
Expand All @@ -844,15 +842,14 @@ def project_push(namespace, project_name):
logging.info(
f"Upload transaction {upload.id} created for project: {project.id}, version: {version}"
)
move_to_tmp(upload_dir)
move_to_tmp(upload.upload_dir)
except IntegrityError as err:
logging.error(f"Failed to create upload session: {str(err)}")
abort(422, "Failed to create upload session. Please try later.")

# Create transaction folder and lockfile
folder = os.path.join(project.storage.project_dir, "tmp", upload.id)
os.makedirs(folder)
open(os.path.join(folder, "lockfile"), "w").close()
os.makedirs(upload.upload_dir)
open(upload.lockfile, "w").close()

# Update immediately without uploading of new/modified files and remove transaction/lockfile after successful commit
if not (changes["added"] or changes["updated"]):
Expand All @@ -863,7 +860,7 @@ def project_push(namespace, project_name):
pv = ProjectVersion(
project,
next_version,
current_user.username,
current_user.id,
upload_changes,
get_ip(request),
user_agent,
Expand All @@ -877,15 +874,15 @@ def project_push(namespace, project_name):
f"Transaction id: {upload.id}. No upload."
)
project_version_created.send(pv)
clean_upload(upload.id)
return jsonify(ProjectSchema().dump(project)), 200
except IntegrityError as err:
db.session.rollback()
clean_upload(upload.id)
logging.exception(
f"Failed to upload a new project version using transaction id: {upload.id}: {str(err)}"
)
abort(422, "Failed to upload a new project version. Please try later.")
finally:
upload.clear()

return {"transaction": upload.id}

Expand Down Expand Up @@ -1074,7 +1071,7 @@ def push_finish(transaction_id):
pv = ProjectVersion(
project,
next_version,
current_user.username,
current_user.id,
changes,
get_ip(request),
user_agent,
Expand All @@ -1087,16 +1084,16 @@ def push_finish(transaction_id):
f"Push finished for project: {project.id}, project version: {v_next_version}, transaction id: {transaction_id}."
)
project_version_created.send(pv)
# remove artifacts
clean_upload(transaction_id)
except (psycopg2.Error, FileNotFoundError, DataSyncError, IntegrityError) as err:
db.session.rollback()
clean_upload(transaction_id)
logging.exception(
f"Failed to finish push for project: {project.id}, project version: {v_next_version}, "
f"transaction id: {transaction_id}.: {str(err)}"
)
abort(422, "Failed to create new version: {}".format(str(err)))
finally:
# remove artifacts
upload.clear()

# do not optimize on every version, every 10th is just fine
if not project.latest_version % 10:
Expand Down Expand Up @@ -1208,7 +1205,7 @@ def clone_project(namespace, project_name): # noqa: E501
project_version = ProjectVersion(
p,
version,
current_user.username,
current_user.id,
changes,
get_ip(request),
user_agent,
Expand Down
4 changes: 2 additions & 2 deletions server/mergin/sync/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ class ProjectVersionSchema(ma.SQLAlchemyAutoSchema):
project_name = fields.Function(lambda obj: obj.project.name)
namespace = fields.Function(lambda obj: obj.project.workspace.name)
name = fields.Function(lambda obj: ProjectVersion.to_v_name(obj.name))
author = fields.String()
author = fields.String(attribute="author.username")
changesets = fields.Method("get_diff_summary")
files = fields.String()
created = DateTimeWithZ()
Expand Down Expand Up @@ -361,7 +361,7 @@ class ProjectVersionListSchema(ma.SQLAlchemyAutoSchema):
project_name = fields.Function(lambda obj: obj.project.name)
namespace = fields.Function(lambda obj: obj.project.workspace.name)
name = fields.Function(lambda obj: ProjectVersion.to_v_name(obj.name))
author = fields.String()
author = fields.String(attribute="author.username")
created = DateTimeWithZ()
changes = fields.Method("_changes")
project_size = fields.Integer()
Expand Down
Loading