Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions server/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
remove_projects_archives,
remove_temp_files,
remove_projects_backups,
remove_unused_chunks,
)
from mergin.celery import celery, configure_celery
from mergin.stats.config import Configuration
Expand Down Expand Up @@ -85,4 +86,9 @@ def setup_periodic_tasks(sender, **kwargs):
crontab(hour=3, minute=0),
remove_projects_archives,
name="remove old project archives",
),
sender.add_periodic_task(
crontab(hour="*/4", minute=0),
remove_unused_chunks,
name="clean up of outdated chunks",
)
30 changes: 18 additions & 12 deletions server/mergin/sync/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@
import shutil
import os
import time
from datetime import datetime, timedelta, timezone
from datetime import datetime, timedelta
from zipfile import ZIP_DEFLATED, ZipFile
from flask import current_app

from .models import Project, ProjectVersion, FileHistory
from .storages.disk import move_to_tmp
from .config import Configuration
from .utils import remove_outdated_files
from ..celery import celery
from ..app import db

Expand Down Expand Up @@ -144,14 +145,19 @@ def create_project_version_zip(version_id: int):
@celery.task
def remove_projects_archives():
    """Remove created zip files for project versions if they were not accessed for certain time.

    Delegates to the shared ``remove_outdated_files`` helper: every regular file in
    ``PROJECTS_ARCHIVES_DIR`` whose last access time is older than
    ``PROJECTS_ARCHIVES_EXPIRATION`` days is deleted.
    """
    remove_outdated_files(
        Configuration.PROJECTS_ARCHIVES_DIR,
        timedelta(days=Configuration.PROJECTS_ARCHIVES_EXPIRATION),
    )


@celery.task
def remove_unused_chunks():
    """Remove old chunks in the shared upload directory.

    Chunks are stored in two-character "small hash" subdirectories of
    ``UPLOAD_CHUNKS_DIR``; anything lingering there past
    ``UPLOAD_CHUNKS_EXPIRATION`` seconds is residue of failed or abandoned
    uploads and is safe to delete.
    """
    base_dir = Configuration.UPLOAD_CHUNKS_DIR
    # Nothing to clean up if no upload ever created the shared directory;
    # without this guard os.listdir would raise FileNotFoundError.
    if not os.path.isdir(base_dir):
        return
    expiration = timedelta(seconds=Configuration.UPLOAD_CHUNKS_EXPIRATION)
    for entry in os.listdir(base_dir):
        # renamed from `dir`/`_dir` to avoid shadowing the builtin `dir`
        small_hash_dir = os.path.join(base_dir, entry)
        # skip stray files at the top level; only descend into hash subdirs
        if not os.path.isdir(small_hash_dir):
            continue
        remove_outdated_files(small_hash_dir, expiration)
19 changes: 19 additions & 0 deletions server/mergin/sync/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@
#
# SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-MerginMaps-Commercial

import logging
import math
import os
import hashlib
import re
import secrets
from datetime import datetime, timedelta, timezone
from threading import Timer
from uuid import UUID
from shapely import wkb
Expand Down Expand Up @@ -592,3 +594,20 @@ def get_chunk_location(id: str):
small_hash = id[:2]
file_name = id[2:]
return os.path.join(chunk_dir, small_hash, file_name)


def remove_outdated_files(dir: str, time_delta: timedelta):
    """Delete every regular file in *dir* whose last access time is older than *time_delta*.

    Subdirectories are left untouched. Files that cannot be removed are
    logged and skipped rather than aborting the sweep.
    """
    cutoff = datetime.now(timezone.utc) - time_delta
    for entry in os.listdir(dir):
        full_path = os.path.join(dir, entry)
        # only plain files are candidates; directories are never pruned here
        if not os.path.isfile(full_path):
            continue
        last_access = datetime.fromtimestamp(os.path.getatime(full_path), tz=timezone.utc)
        if last_access >= cutoff:
            continue
        try:
            os.remove(full_path)
        except OSError as err:
            logging.error(f"Unable to remove {full_path}: {str(err)}")
55 changes: 51 additions & 4 deletions server/mergin/tests/test_celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
#
# SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-MerginMaps-Commercial

import math
import os
from datetime import datetime, timedelta
import uuid
from datetime import datetime, timedelta, timezone
from flask import current_app
from flask_mail import Mail
from unittest.mock import patch
Expand All @@ -12,17 +14,32 @@
from ..config import Configuration
from ..sync.models import Project, AccessRequest, ProjectRole, ProjectVersion
from ..celery import send_email_async
from ..sync.config import Configuration as SyncConfiguration
from ..sync.tasks import (
remove_temp_files,
remove_projects_backups,
create_project_version_zip,
remove_projects_archives,
remove_unused_chunks,
)
from ..sync.storages.disk import move_to_tmp
from . import test_project, test_workspace_name, test_workspace_id
from .utils import add_user, create_workspace, create_project, login, modify_file_times
from ..sync.utils import get_chunk_location
from . import (
test_project,
test_workspace_name,
test_workspace_id,
test_project_dir,
json_headers,
)
from .utils import (
CHUNK_SIZE,
add_user,
create_workspace,
create_project,
login,
modify_file_times,
)
from ..auth.models import User
from . import json_headers


def test_send_email(app):
Expand Down Expand Up @@ -157,3 +174,33 @@ def test_create_project_version_zip(diff_project):
modify_file_times(latest_version.zip_path, new_time)
remove_projects_archives()
assert not os.path.exists(latest_version.zip_path)


def test_remove_chunks(app):
    """Chunks older than the expiration window are purged; fresh ones survive."""
    # simulate a client upload by splitting a source file into chunk files on disk
    source = os.path.join(test_project_dir, "base.gpkg")
    total_size = os.path.getsize(source)
    saved_chunks = []
    with open(source, "rb") as stream:
        for _ in range(math.ceil(total_size / CHUNK_SIZE)):
            location = get_chunk_location(str(uuid.uuid4()))
            os.makedirs(os.path.dirname(location), exist_ok=True)
            with open(location, "wb") as target:
                target.write(stream.read(CHUNK_SIZE))
            saved_chunks.append(location)

    # freshly written chunks must not be touched by the cleanup
    remove_unused_chunks()
    for chunk in saved_chunks:
        assert os.path.exists(chunk)

    # pretend every chunk was last accessed just before the expiration cutoff
    expired_atime = (
        datetime.now(timezone.utc)
        - timedelta(seconds=SyncConfiguration.UPLOAD_CHUNKS_EXPIRATION)
    ).timestamp() - 1

    with patch("os.path.getatime", lambda path: expired_atime):
        remove_unused_chunks()
    for chunk in saved_chunks:
        assert not os.path.exists(chunk)
Loading