From c5c163169103a44d32cec4b80675fbc5b53beb50 Mon Sep 17 00:00:00 2001 From: Martin Varga Date: Wed, 6 Aug 2025 09:19:28 +0200 Subject: [PATCH 1/2] Cron job to remove outdated uploaded chunks --- server/application.py | 6 ++++ server/mergin/sync/tasks.py | 30 +++++++++------- server/mergin/sync/utils.py | 19 +++++++++++ server/mergin/tests/test_celery.py | 55 +++++++++++++++++++++++++++--- 4 files changed, 94 insertions(+), 16 deletions(-) diff --git a/server/application.py b/server/application.py index b1ab79ac..d8c351c1 100644 --- a/server/application.py +++ b/server/application.py @@ -27,6 +27,7 @@ remove_projects_archives, remove_temp_files, remove_projects_backups, + remove_unused_chunks, ) from mergin.celery import celery, configure_celery from mergin.stats.config import Configuration @@ -85,4 +86,9 @@ def setup_periodic_tasks(sender, **kwargs): crontab(hour=3, minute=0), remove_projects_archives, name="remove old project archives", + ), + sender.add_periodic_task( + crontab(hour="*/12", minute=0), + remove_unused_chunks, + name="clean up of outdated chunks", ) diff --git a/server/mergin/sync/tasks.py b/server/mergin/sync/tasks.py index f56fb273..310fefbb 100644 --- a/server/mergin/sync/tasks.py +++ b/server/mergin/sync/tasks.py @@ -6,13 +6,14 @@ import shutil import os import time -from datetime import datetime, timedelta, timezone +from datetime import datetime, timedelta from zipfile import ZIP_DEFLATED, ZipFile from flask import current_app from .models import Project, ProjectVersion, FileHistory from .storages.disk import move_to_tmp from .config import Configuration +from .utils import remove_outdated_files from ..celery import celery from ..app import db @@ -144,14 +145,19 @@ def create_project_version_zip(version_id: int): @celery.task def remove_projects_archives(): """Remove created zip files for project versions if they were not accessed for certain time""" - for file in os.listdir(current_app.config["PROJECTS_ARCHIVES_DIR"]): - path = os.path.join(current_app.config["PROJECTS_ARCHIVES_DIR"], file) - if datetime.fromtimestamp( - os.path.getatime(path), tz=timezone.utc - ) < datetime.now(timezone.utc) - timedelta( - days=current_app.config["PROJECTS_ARCHIVES_EXPIRATION"] - ): - try: - os.remove(path) - except OSError as e: - logging.error(f"Unable to remove {path}: {str(e)}") + remove_outdated_files( + Configuration.PROJECTS_ARCHIVES_DIR, + timedelta(days=Configuration.PROJECTS_ARCHIVES_EXPIRATION), + ) + + +@celery.task +def remove_unused_chunks(): + """Remove old chunks in shared directory. These are basically just residual from failed uploads.""" + small_hash_dirs = os.listdir(Configuration.UPLOAD_CHUNKS_DIR) + time_delta = timedelta(seconds=Configuration.UPLOAD_CHUNKS_EXPIRATION) + for _dir in small_hash_dirs: + dir = os.path.join(Configuration.UPLOAD_CHUNKS_DIR, _dir) + if not os.path.isdir(dir): + continue + remove_outdated_files(dir, time_delta) diff --git a/server/mergin/sync/utils.py b/server/mergin/sync/utils.py index 7babb231..acd93b26 100644 --- a/server/mergin/sync/utils.py +++ b/server/mergin/sync/utils.py @@ -2,11 +2,13 @@ # # SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-MerginMaps-Commercial +import logging import math import os import hashlib import re import secrets +from datetime import datetime, timedelta, timezone from threading import Timer from uuid import UUID from shapely import wkb @@ -592,3 +594,20 @@ def get_chunk_location(id: str): small_hash = id[:2] file_name = id[2:] return os.path.join(chunk_dir, small_hash, file_name) + + +def remove_outdated_files(dir: str, time_delta: timedelta): + """Remove all files within directory where last access time passed expiration date""" + for file in os.listdir(dir): + path = os.path.join(dir, file) + if not os.path.isfile(path): + continue + + if ( + datetime.fromtimestamp(os.path.getatime(path), tz=timezone.utc) + < datetime.now(timezone.utc) - time_delta + ): + try: + os.remove(path) + except OSError as e: + logging.error(f"Unable to remove {path}: {str(e)}") diff --git a/server/mergin/tests/test_celery.py b/server/mergin/tests/test_celery.py index a5d07f47..b236d4ba 100644 --- a/server/mergin/tests/test_celery.py +++ b/server/mergin/tests/test_celery.py @@ -2,8 +2,10 @@ # # SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-MerginMaps-Commercial +import math import os -from datetime import datetime, timedelta +import uuid +from datetime import datetime, timedelta, timezone from flask import current_app from flask_mail import Mail from unittest.mock import patch @@ -12,17 +14,32 @@ from ..config import Configuration from ..sync.models import Project, AccessRequest, ProjectRole, ProjectVersion from ..celery import send_email_async +from ..sync.config import Configuration as SyncConfiguration from ..sync.tasks import ( remove_temp_files, remove_projects_backups, create_project_version_zip, remove_projects_archives, + remove_unused_chunks, ) from ..sync.storages.disk import move_to_tmp -from . import test_project, test_workspace_name, test_workspace_id -from .utils import add_user, create_workspace, create_project, login, modify_file_times +from ..sync.utils import get_chunk_location +from . import ( + test_project, + test_workspace_name, + test_workspace_id, + test_project_dir, + json_headers, +) +from .utils import ( + CHUNK_SIZE, + add_user, + create_workspace, + create_project, + login, + modify_file_times, +) from ..auth.models import User -from . import json_headers def test_send_email(app): @@ -157,3 +174,33 @@ def test_create_project_version_zip(diff_project): modify_file_times(latest_version.zip_path, new_time) remove_projects_archives() assert not os.path.exists(latest_version.zip_path) + + +def test_remove_chunks(app): + """Test cleanup of outdated chunks""" + # pretend chunks were uploaded + chunks = [] + src_file = os.path.join(test_project_dir, "base.gpkg") + with open(src_file, "rb") as in_file: + f_size = os.path.getsize(src_file) + for i in range(math.ceil(f_size / CHUNK_SIZE)): + chunk_id = str(uuid.uuid4()) + chunk_location = get_chunk_location(chunk_id) + os.makedirs(os.path.dirname(chunk_location), exist_ok=True) + with open(chunk_location, "wb") as out_file: + out_file.write(in_file.read(CHUNK_SIZE)) + chunks.append(chunk_location) + + remove_unused_chunks() + assert all(os.path.exists(chunk) for chunk in chunks) + + def _atime_mock(path: str) -> float: + """Mock file stats to be already expired""" + return ( + datetime.now(timezone.utc) + - timedelta(seconds=SyncConfiguration.UPLOAD_CHUNKS_EXPIRATION) + ).timestamp() - 1 + + with patch("os.path.getatime", _atime_mock): + remove_unused_chunks() + assert not any(os.path.exists(chunk) for chunk in chunks) From 73b2fc89cb2a37afbe2fc9663c3a9b3c9cff6059 Mon Sep 17 00:00:00 2001 From: Martin Varga Date: Wed, 6 Aug 2025 09:35:20 +0200 Subject: [PATCH 2/2] Run cleanup more frequently --- server/application.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/application.py b/server/application.py index d8c351c1..fc3dc195 100644 --- a/server/application.py +++ b/server/application.py @@ -88,7 +88,7 @@ def setup_periodic_tasks(sender, **kwargs): name="remove old project archives", ), sender.add_periodic_task( - crontab(hour="*/12", minute=0), + crontab(hour="*/4", minute=0), remove_unused_chunks, name="clean up of outdated chunks", )