Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 39 additions & 20 deletions mergin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,22 @@ def is_versioned_file(self, file):
f_extension = os.path.splitext(file)[1]
return f_extension in diff_extensions

def is_gpkg_open(self, path):
    """
    Check whether geopackage file is open (and wal file exists).

    An open geopackage is detected by the presence of its SQLite
    write-ahead log file (``<path>-wal``) next to the file on disk.

    :param path: absolute path of file on disk
    :type path: str
    :returns: whether file is a geopackage with an existing -wal file
    :rtype: bool
    """
    f_extension = os.path.splitext(path)[1]
    # non-geopackage files are never considered "open" here
    return f_extension == '.gpkg' and os.path.exists(f'{path}-wal')

def ignore_file(self, file):
"""
Helper function for blacklisting certain types of files.
Expand Down Expand Up @@ -165,6 +181,7 @@ def inspect_files(self):
for file in files:
if self.ignore_file(file):
continue

abs_path = os.path.abspath(os.path.join(root, file))
rel_path = os.path.relpath(abs_path, start=self.dir)
proj_path = '/'.join(rel_path.split(os.path.sep)) # we need posix path
Expand Down Expand Up @@ -222,7 +239,8 @@ def compare_file_sets(self, origin, current):
path = f["path"]
if path not in origin_map:
continue
if f["checksum"] == origin_map[path]["checksum"]:
# with open WAL files we don't know yet, better to mark file as updated
if not self.is_gpkg_open(self.fpath(path)) and f["checksum"] == origin_map[path]["checksum"]:
continue
f["origin_checksum"] = origin_map[path]["checksum"]
updated.append(f)
Expand Down Expand Up @@ -298,7 +316,12 @@ def get_push_changes(self):
:rtype: dict
"""
changes = self.compare_file_sets(self.metadata['files'], self.inspect_files())
# do checkpoint to push changes from wal file to gpkg
for file in changes['added'] + changes['updated']:
size, checksum = do_sqlite_checkpoint(self.fpath(file["path"]))
if size and checksum:
file["size"] = size
file["checksum"] = checksum
file['chunks'] = [str(uuid.uuid4()) for i in range(math.ceil(file["size"] / UPLOAD_CHUNK_SIZE))]

if not self.geodiff:
Expand All @@ -311,8 +334,9 @@ def get_push_changes(self):
if not self.is_versioned_file(path):
continue

# we use geodiff to check if we can push only diff files
current_file = self.fpath(path)
origin_file = self.fpath(path, self.meta_dir)
origin_file = self.fpath_meta(path)
diff_id = str(uuid.uuid4())
diff_name = path + '-diff-' + diff_id
diff_file = self.fpath_meta(diff_name)
Expand All @@ -332,7 +356,8 @@ def get_push_changes(self):
else:
not_updated.append(file)
except (pygeodiff.GeoDiffLibError, pygeodiff.GeoDiffLibConflictError) as e:
pass # we do force update
# changes from wal file already committed
pass

changes['updated'] = [f for f in changes['updated'] if f not in not_updated]
return changes
Expand Down Expand Up @@ -477,15 +502,16 @@ def apply_push_changes(self, changes):
elif k == 'added':
shutil.copy(self.fpath(path), basefile)
elif k == 'updated':
# in case for geopackage cannot be created diff
# in case a diff cannot be created for the geopackage (e.g. forced update with committed changes from wal file)
if "diff" not in item:
continue
# better to apply diff to previous basefile to avoid issues with geodiff tmp files
changeset = self.fpath_meta(item['diff']['path'])
patch_error = self.apply_diffs(basefile, [changeset])
if patch_error:
# in case of local sync issues it is safer to remove basefile, next time it will be downloaded from server
os.remove(basefile)
shutil.copy(self.fpath(path), basefile)
else:
# better to apply diff to previous basefile to avoid issues with geodiff tmp files
changeset = self.fpath_meta(item['diff']['path'])
patch_error = self.apply_diffs(basefile, [changeset])
if patch_error:
# in case of local sync issues it is safer to remove basefile, next time it will be downloaded from server
os.remove(basefile)
else:
pass

Expand Down Expand Up @@ -944,10 +970,6 @@ def _push_changes(self, mp, data, parallel):
with concurrent.futures.ThreadPoolExecutor() as executor:
futures_map = {}
for file in upload_files:
# do checkpoint to push changes from wal file to gpkg if there is no diff
if "diff" not in file and mp.is_versioned_file(file["path"]):
do_sqlite_checkpoint(mp.fpath(file["path"]))
file["checksum"] = generate_checksum(mp.fpath(file["path"]))
file['location'] = mp.fpath_meta(file['diff']['path']) if 'diff' in file else mp.fpath(file['path'])
future = executor.submit(self._upload_file, info["transaction"], file, parallel)
futures_map[future] = file
Expand All @@ -960,10 +982,6 @@ def _push_changes(self, mp, data, parallel):
raise ClientError("Timeout error: failed to upload {}".format(file))
else:
for file in upload_files:
# do checkpoint to push changes from wal file to gpkg if there is no diff
if "diff" not in file and mp.is_versioned_file(file["path"]):
do_sqlite_checkpoint(mp.fpath(file["path"]))
file["checksum"] = generate_checksum(mp.fpath(file["path"]))
file['location'] = mp.fpath_meta(file['diff']['path']) if 'diff' in file else mp.fpath(file['path'])
self._upload_file(info["transaction"], file, parallel)

Expand Down Expand Up @@ -1085,6 +1103,7 @@ def _download_file(self, project_path, file, directory, parallel=True, diff_only
}
file_dir = os.path.dirname(os.path.normpath(os.path.join(directory, file['path'])))
basename = os.path.basename(file['diff']['path']) if diff_only else os.path.basename(file['path'])
expected_size = file['diff']['size'] if diff_only else file['size']

if file['size'] == 0:
os.makedirs(file_dir, exist_ok=True)
Expand Down Expand Up @@ -1125,7 +1144,7 @@ def download_file_part(part):
shutil.copyfileobj(chunk, final)
os.remove(file_part)

if os.path.getsize(os.path.join(file_dir, basename)) != file['size']:
if os.path.getsize(os.path.join(file_dir, basename)) != expected_size:
os.remove(os.path.join(file_dir, basename))
raise ClientError(f'Download of file {basename} failed. Please try it again.')

Expand Down
Binary file removed mergin/test/modified_schema/base.gpkg-shm
Binary file not shown.
Binary file removed mergin/test/modified_schema/base.gpkg-wal
Binary file not shown.
Binary file not shown.
Binary file not shown.
35 changes: 30 additions & 5 deletions mergin/test/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,11 +136,6 @@ def test_push_pull_changes(mc, parallel):
f_updated = 'test3.txt'
with open(os.path.join(project_dir, f_updated), 'w') as f:
f.write('Modified')
src_files = os.listdir(CHANGED_SCHEMA_DIR)
for file_name in src_files:
full_file_name = os.path.join(CHANGED_SCHEMA_DIR, file_name)
if os.path.isfile(full_file_name):
shutil.copy(full_file_name, project_dir)

# check changes before applied
pull_changes, push_changes, _ = mc.project_status(project_dir)
Expand Down Expand Up @@ -342,3 +337,33 @@ def test_list_of_push_changes(mc):
mc.project_status(project_dir)


def test_force_gpkg_update(mc):
    test_project = 'test_force_update'
    project = API_USER + '/' + test_project
    project_dir = os.path.join(TMP_DIR, test_project)  # primary project dir for updates

    cleanup(mc, project, [project_dir])
    # initialise remote project from the test data directory
    shutil.copytree(TEST_DATA_DIR, project_dir)
    mc.create_project(test_project, project_dir)

    # exercise push with a forced full gpkg upload (diff cannot be computed)
    mp = MerginProject(project_dir)
    f_updated = 'base.gpkg'
    original_checksum = generate_checksum(mp.fpath(f_updated))

    # swap base.gpkg for a schema-modified copy (inserted new column) plus its wal;
    # original is kept in meta dir so changeset calculation runs (and fails)
    shutil.move(mp.fpath(f_updated), mp.fpath_meta(f_updated))
    for src_name, dst_name in (
        ('modified_schema.gpkg', f_updated),
        ('modified_schema.gpkg-wal', f_updated + '-wal'),
    ):
        shutil.copy(os.path.join(CHANGED_SCHEMA_DIR, src_name), mp.fpath(dst_name))
    mc.push_project(project_dir)

    # by now the local file must have been rewritten (wal changes committed)
    updated_checksum = generate_checksum(mp.fpath(f_updated))
    assert original_checksum != updated_checksum

    # server should hold a full (non-diff) upload of the updated file
    project_info = mc.project_info(project)
    assert project_info['version'] == 'v2'
    f_remote = next((f for f in project_info['files'] if f['path'] == f_updated), None)
    assert f_remote['checksum'] == updated_checksum
    assert 'diff' not in f_remote
15 changes: 13 additions & 2 deletions mergin/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,23 @@ def int_version(version):

def do_sqlite_checkpoint(path):
    """
    Function to do checkpoint over the geopackage file which was not able to do diff file.

    Runs a full WAL checkpoint followed by VACUUM, merging pending changes
    from ``<path>-wal`` back into the main database file.

    :param path: file's absolute path on disk
    :type path: str
    :returns: new size and checksum of file after checkpoint, or (None, None)
              if the file is not a geopackage or has no wal file
    :rtype: int, str
    """
    new_size = None
    new_checksum = None
    if ".gpkg" in path and os.path.exists(f'{path}-wal'):
        conn = sqlite3.connect(path)
        try:
            cursor = conn.cursor()
            cursor.execute("PRAGMA wal_checkpoint=FULL")
            cursor.execute("VACUUM")
            conn.commit()
        finally:
            # release the db handle even if the checkpoint/vacuum fails,
            # otherwise the file stays locked on disk
            conn.close()
        new_size = os.path.getsize(path)
        new_checksum = generate_checksum(path)

    return new_size, new_checksum