Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion mergin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -614,7 +614,7 @@ def pull_project(self, directory):
if job is None:
return # project is up to date
pull_project_wait(job)
return pull_project_finalize(job)
return pull_project_finalize(job, self.username())

def clone_project(self, source_project_path, cloned_project_name, cloned_project_namespace=None):
"""
Expand Down
40 changes: 20 additions & 20 deletions mergin/client_pull.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class DownloadJob:
Keeps all the important data about a pending download job.
Used for downloading whole projects but also single files.
"""

def __init__(self, project_path, total_size, version, update_tasks, download_queue_items, directory, mp, project_info):
self.project_path = project_path
self.total_size = total_size # size of data to download (in bytes)
Expand Down Expand Up @@ -70,21 +70,21 @@ def _download_items(file, directory, diff_only=False):
basename = os.path.basename(file['diff']['path']) if diff_only else os.path.basename(file['path'])
file_size = file['diff']['size'] if diff_only else file['size']
chunks = math.ceil(file_size / CHUNK_SIZE)

items = []
for part_index in range(chunks):
download_file_path = os.path.join(file_dir, basename + ".{}".format(part_index))
size = min(CHUNK_SIZE, file_size - part_index * CHUNK_SIZE)
items.append(DownloadQueueItem(file['path'], size, file['version'], diff_only, part_index, download_file_path))

return items


def _do_download(item, mc, mp, project_path, job):
""" runs in worker thread """
if job.is_cancelled:
return

# TODO: make download_blocking / save_to_file cancellable so that we can cancel as soon as possible

item.download_blocking(mc, mp, project_path)
Expand Down Expand Up @@ -153,9 +153,9 @@ def download_project_async(mc, project_path, directory, project_version=None):
total_size += item.size

mp.log.info(f"will download {len(update_tasks)} files in {len(download_list)} chunks, total size {total_size}")

job = DownloadJob(project_path, total_size, version, update_tasks, download_list, directory, mp, project_info)

# start download
job.executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)
job.futures = []
Expand All @@ -168,7 +168,7 @@ def download_project_async(mc, project_path, directory, project_version=None):

def download_project_wait(job):
    """
    Block until all download tasks of the job are finished.

    This only waits. ``concurrent.futures.wait`` does not re-raise exceptions
    thrown inside worker threads - they stay stored on the individual futures
    in ``job.futures`` and have to be checked by the caller afterwards.

    :param job: download job whose ``futures`` list holds the submitted tasks
    :returns: None
    """
    # no timeout and the default return_when=ALL_COMPLETED: returns only once
    # every future is done (finished, failed or cancelled)
    concurrent.futures.wait(job.futures)


Expand Down Expand Up @@ -209,10 +209,10 @@ def download_project_finalize(job):
job.mp.log.info("--- download finished")

for task in job.update_tasks:

# right now only copy tasks...
task.apply(job.directory, job.mp)

# final update of project metadata
# TODO: why not exact copy of project info JSON ?
job.mp.metadata = {
Expand All @@ -239,7 +239,7 @@ class UpdateTask:
Entry for each file that will be updated.
At the end of a successful download of new data, all the tasks are executed.
"""

# TODO: methods other than COPY
def __init__(self, file_path, download_queue_items, destination_file=None, latest_version=True):
self.file_path = file_path
Expand All @@ -249,7 +249,7 @@ def __init__(self, file_path, download_queue_items, destination_file=None, lates

def apply(self, directory, mp):
""" assemble downloaded chunks into a single file """

if self.destination_file is None:
basename = os.path.basename(self.file_path)
file_dir = os.path.dirname(os.path.normpath(os.path.join(directory, self.file_path)))
Expand All @@ -273,15 +273,15 @@ def apply(self, directory, mp):

class DownloadQueueItem:
""" a piece of data from a project that should be downloaded - it can be either a chunk or it can be a diff """

def __init__(self, file_path, size, version, diff_only, part_index, download_file_path):
self.file_path = file_path # relative path to the file within project
self.size = size # size of the item in bytes
self.version = version # version of the file ("v123")
self.diff_only = diff_only # whether downloading diff or full version
self.part_index = part_index # index of the chunk
self.download_file_path = download_file_path # full path to a temporary file which will receive the content

def __repr__(self):
return "<DownloadQueueItem path={} version={} diff_only={} part_index={} size={} dest={}>".format(
self.file_path, self.version, self.diff_only, self.part_index, self.size, self.download_file_path)
Expand Down Expand Up @@ -397,7 +397,7 @@ def pull_project_async(mc, directory):
for file in fetch_files:
diff_only = _pulling_file_with_diffs(file)
items = _download_items(file, temp_dir, diff_only)

# figure out destination path for the file
file_dir = os.path.dirname(os.path.normpath(os.path.join(temp_dir, file['path'])))
basename = os.path.basename(file['diff']['path']) if diff_only else os.path.basename(file['path'])
Expand Down Expand Up @@ -447,13 +447,13 @@ def pull_project_async(mc, directory):
for item in download_list:
future = job.executor.submit(_do_download, item, mc, mp, project_path, job)
job.futures.append(future)

return job


def pull_project_wait(job):
    """
    Block until all download tasks of the pull job are finished.

    This only waits. ``concurrent.futures.wait`` does not re-raise exceptions
    thrown inside worker threads - they stay stored on the individual futures
    in ``job.futures``. Call pull_project_finalize() afterwards to have them
    re-raised and to finish the pull.

    :param job: pull job whose ``futures`` list holds the submitted tasks
    :returns: None
    """
    # no timeout and the default return_when=ALL_COMPLETED: returns only once
    # every future is done (finished, failed or cancelled)
    concurrent.futures.wait(job.futures)


Expand Down Expand Up @@ -513,16 +513,16 @@ def merge(self):
raise ClientError('Download of file {} failed. Please try it again.'.format(self.dest_file))


def pull_project_finalize(job):
def pull_project_finalize(job, user_name):
"""
To be called when pull in the background is finished and we need to do the finalization (merge chunks etc.)

This should not be called from a worker thread (e.g. directly from a handler when download is complete)

If any of the workers has thrown any exception, it will be re-raised (e.g. some network errors).
That also means that the whole job has been aborted.
"""

job.executor.shutdown(wait=True)

# make sure any exceptions from threads are not lost
Expand Down Expand Up @@ -565,7 +565,7 @@ def pull_project_finalize(job):
os.remove(basefile)
raise ClientError("Cannot patch basefile {}! Please try syncing again.".format(basefile))

conflicts = job.mp.apply_pull_changes(job.pull_changes, job.temp_dir)
conflicts = job.mp.apply_pull_changes(job.pull_changes, job.temp_dir, user_name)
job.mp.metadata = {
'name': job.project_path,
'version': job.version if job.version else "v0", # for new projects server version is ""
Expand Down
39 changes: 24 additions & 15 deletions mergin/merginproject.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,16 @@
from dateutil.tz import tzlocal

from .common import UPLOAD_CHUNK_SIZE, InvalidProject, ClientError
from .utils import generate_checksum, move_file, int_version, find, do_sqlite_checkpoint
from .utils import (
generate_checksum,
move_file,
int_version,
find,
do_sqlite_checkpoint,
unique_path_name,
conflicted_copy_file_name,
edit_conflict_file_name
)


this_dir = os.path.dirname(os.path.realpath(__file__))
Expand Down Expand Up @@ -373,7 +382,7 @@ def get_list_of_push_changes(self, push_changes):
pass
return changes

def apply_pull_changes(self, changes, temp_dir):
def apply_pull_changes(self, changes, temp_dir, user_name):
"""
Apply changes pulled from server.

Expand Down Expand Up @@ -410,7 +419,7 @@ def apply_pull_changes(self, changes, temp_dir):
# 'src' here is server version of file and 'dest' is locally modified
if self.is_versioned_file(path) and k == 'updated':
if path in modified:
conflict = self.update_with_rebase(path, src, dest, basefile, temp_dir)
conflict = self.update_with_rebase(path, src, dest, basefile, temp_dir, user_name)
if conflict:
conflicts.append(conflict)
else:
Expand All @@ -420,7 +429,7 @@ def apply_pull_changes(self, changes, temp_dir):
else:
# backup if needed
if path in modified and item['checksum'] != local_files_map[path]['checksum']:
conflict = self.backup_file(path)
conflict = self.create_conflicted_copy(path, user_name)
conflicts.append(conflict)

if k == 'removed':
Expand All @@ -440,7 +449,7 @@ def apply_pull_changes(self, changes, temp_dir):

return conflicts

def update_with_rebase(self, path, src, dest, basefile, temp_dir):
def update_with_rebase(self, path, src, dest, basefile, temp_dir, user_name):
"""
Update a versioned file with rebase.

Expand Down Expand Up @@ -472,6 +481,7 @@ def update_with_rebase(self, path, src, dest, basefile, temp_dir):

# create temp backup (ideally with geodiff) of locally modified file if needed later
f_conflict_file = self.fpath(f'{path}-local_backup', temp_dir)

try:
self.geodiff.create_changeset(basefile, dest, local_diff)
self.geodiff.make_copy_sqlite(basefile, f_conflict_file)
Expand All @@ -483,7 +493,8 @@ def update_with_rebase(self, path, src, dest, basefile, temp_dir):
# in case there will be any conflicting operations found during rebase,
# they will be stored in a JSON file - if there are no conflicts, the file
# won't even be created
rebase_conflicts = self.fpath(f'{path}_rebase_conflicts')
rebase_conflicts = unique_path_name(
edit_conflict_file_name(self.fpath(path), user_name, int_version(self.metadata['version'])))

# try to do rebase magic
try:
Expand All @@ -496,7 +507,7 @@ def update_with_rebase(self, path, src, dest, basefile, temp_dir):
self.log.warning("rebase failed! going to create conflict file")
# it would not be possible to commit local changes, they need to end up in new conflict file
self.geodiff.make_copy_sqlite(f_conflict_file, dest)
conflict = self.backup_file(path)
conflict = self.create_conflicted_copy(path, user_name)
# original file synced with server
self.geodiff.make_copy_sqlite(f_server_backup, basefile)
self.geodiff.make_copy_sqlite(f_server_backup, dest)
Expand Down Expand Up @@ -577,23 +588,21 @@ def apply_push_changes(self, changes):
else:
pass

def backup_file(self, file):
def create_conflicted_copy(self, file, user_name):
"""
Create backup file next to its origin.
Create conflicted copy file next to its origin.

:param file: path of file in project
:type file: str
:returns: path to backupfile
:returns: path to conflicted copy
:rtype: str
"""
src = self.fpath(file)
if not os.path.exists(src):
return
backup_path = self.fpath(f'{file}_conflict_copy')
index = 2
while os.path.exists(backup_path):
backup_path = self.fpath(f'{file}_conflict_copy{index}')
index += 1

backup_path = unique_path_name(conflicted_copy_file_name(self.fpath(file), user_name, int_version(self.metadata['version'])))

if self.is_versioned_file(file):
self.geodiff.make_copy_sqlite(src, backup_path)
else:
Expand Down
Loading