diff --git a/mergin/client.py b/mergin/client.py index db667974..a0551e0f 100644 --- a/mergin/client.py +++ b/mergin/client.py @@ -134,6 +134,22 @@ def is_versioned_file(self, file): f_extension = os.path.splitext(file)[1] return f_extension in diff_extensions + def is_gpkg_open(self, path): + """ + Check whether geopackage file is open (and wal file exists) + + :param path: absolute path of file on disk + :type path: str + :returns: whether file is open + :rtype: bool + """ + f_extension = os.path.splitext(path)[1] + if f_extension != '.gpkg': + return False + if os.path.exists(f'{path}-wal'): + return True + return False + def ignore_file(self, file): """ Helper function for blacklisting certain types of files. @@ -165,6 +181,7 @@ def inspect_files(self): for file in files: if self.ignore_file(file): continue + abs_path = os.path.abspath(os.path.join(root, file)) rel_path = os.path.relpath(abs_path, start=self.dir) proj_path = '/'.join(rel_path.split(os.path.sep)) # we need posix path @@ -222,7 +239,8 @@ def compare_file_sets(self, origin, current): path = f["path"] if path not in origin_map: continue - if f["checksum"] == origin_map[path]["checksum"]: + # with open WAL files we don't know yet, better to mark file as updated + if not self.is_gpkg_open(self.fpath(path)) and f["checksum"] == origin_map[path]["checksum"]: continue f["origin_checksum"] = origin_map[path]["checksum"] updated.append(f) @@ -298,7 +316,12 @@ def get_push_changes(self): :rtype: dict """ changes = self.compare_file_sets(self.metadata['files'], self.inspect_files()) + # do checkpoint to push changes from wal file to gpkg for file in changes['added'] + changes['updated']: + size, checksum = do_sqlite_checkpoint(self.fpath(file["path"])) + if size and checksum: + file["size"] = size + file["checksum"] = checksum file['chunks'] = [str(uuid.uuid4()) for i in range(math.ceil(file["size"] / UPLOAD_CHUNK_SIZE))] if not self.geodiff: @@ -311,8 +334,9 @@ def get_push_changes(self): if not 
self.is_versioned_file(path): continue + # we use geodiff to check if we can push only diff files current_file = self.fpath(path) - origin_file = self.fpath(path, self.meta_dir) + origin_file = self.fpath_meta(path) diff_id = str(uuid.uuid4()) diff_name = path + '-diff-' + diff_id diff_file = self.fpath_meta(diff_name) @@ -332,7 +356,8 @@ def get_push_changes(self): else: not_updated.append(file) except (pygeodiff.GeoDiffLibError, pygeodiff.GeoDiffLibConflictError) as e: - pass # we do force update + # changes from wal file already committed + pass changes['updated'] = [f for f in changes['updated'] if f not in not_updated] return changes @@ -477,15 +502,16 @@ def apply_push_changes(self, changes): elif k == 'added': shutil.copy(self.fpath(path), basefile) elif k == 'updated': - # in case for geopackage cannot be created diff + # in case for geopackage cannot be created diff (e.g. forced update with committed changes from wal file) if "diff" not in item: - continue - # better to apply diff to previous basefile to avoid issues with geodiff tmp files - changeset = self.fpath_meta(item['diff']['path']) - patch_error = self.apply_diffs(basefile, [changeset]) - if patch_error: - # in case of local sync issues it is safier to remove basefile, next time it will be downloaded from server - os.remove(basefile) + shutil.copy(self.fpath(path), basefile) + else: + # better to apply diff to previous basefile to avoid issues with geodiff tmp files + changeset = self.fpath_meta(item['diff']['path']) + patch_error = self.apply_diffs(basefile, [changeset]) + if patch_error: + # in case of local sync issues it is safier to remove basefile, next time it will be downloaded from server + os.remove(basefile) else: pass @@ -944,10 +970,6 @@ def _push_changes(self, mp, data, parallel): with concurrent.futures.ThreadPoolExecutor() as executor: futures_map = {} for file in upload_files: - # do checkpoint to push changes from wal file to gpkg if there is no diff - if "diff" not in file and 
mp.is_versioned_file(file["path"]): - do_sqlite_checkpoint(mp.fpath(file["path"])) - file["checksum"] = generate_checksum(mp.fpath(file["path"])) file['location'] = mp.fpath_meta(file['diff']['path']) if 'diff' in file else mp.fpath(file['path']) future = executor.submit(self._upload_file, info["transaction"], file, parallel) futures_map[future] = file @@ -960,10 +982,6 @@ def _push_changes(self, mp, data, parallel): raise ClientError("Timeout error: failed to upload {}".format(file)) else: for file in upload_files: - # do checkpoint to push changes from wal file to gpkg if there is no diff - if "diff" not in file and mp.is_versioned_file(file["path"]): - do_sqlite_checkpoint(mp.fpath(file["path"])) - file["checksum"] = generate_checksum(mp.fpath(file["path"])) file['location'] = mp.fpath_meta(file['diff']['path']) if 'diff' in file else mp.fpath(file['path']) self._upload_file(info["transaction"], file, parallel) @@ -1085,6 +1103,7 @@ def _download_file(self, project_path, file, directory, parallel=True, diff_only } file_dir = os.path.dirname(os.path.normpath(os.path.join(directory, file['path']))) basename = os.path.basename(file['diff']['path']) if diff_only else os.path.basename(file['path']) + expected_size = file['diff']['size'] if diff_only else file['size'] if file['size'] == 0: os.makedirs(file_dir, exist_ok=True) @@ -1125,7 +1144,7 @@ def download_file_part(part): shutil.copyfileobj(chunk, final) os.remove(file_part) - if os.path.getsize(os.path.join(file_dir, basename)) != file['size']: + if os.path.getsize(os.path.join(file_dir, basename)) != expected_size: os.remove(os.path.join(file_dir, basename)) raise ClientError(f'Download of file {basename} failed. 
Please try it again.') diff --git a/mergin/test/modified_schema/base.gpkg-shm b/mergin/test/modified_schema/base.gpkg-shm deleted file mode 100644 index 30e14afa..00000000 Binary files a/mergin/test/modified_schema/base.gpkg-shm and /dev/null differ diff --git a/mergin/test/modified_schema/base.gpkg-wal b/mergin/test/modified_schema/base.gpkg-wal deleted file mode 100644 index 603abafc..00000000 Binary files a/mergin/test/modified_schema/base.gpkg-wal and /dev/null differ diff --git a/mergin/test/modified_schema/base.gpkg b/mergin/test/modified_schema/modified_schema.gpkg similarity index 99% rename from mergin/test/modified_schema/base.gpkg rename to mergin/test/modified_schema/modified_schema.gpkg index 3321f867..ad5a2963 100644 Binary files a/mergin/test/modified_schema/base.gpkg and b/mergin/test/modified_schema/modified_schema.gpkg differ diff --git a/mergin/test/modified_schema/modified_schema.gpkg-wal b/mergin/test/modified_schema/modified_schema.gpkg-wal new file mode 100644 index 00000000..97cce029 Binary files /dev/null and b/mergin/test/modified_schema/modified_schema.gpkg-wal differ diff --git a/mergin/test/test_client.py b/mergin/test/test_client.py index ef4e532c..0c7975c5 100644 --- a/mergin/test/test_client.py +++ b/mergin/test/test_client.py @@ -136,11 +136,6 @@ def test_push_pull_changes(mc, parallel): f_updated = 'test3.txt' with open(os.path.join(project_dir, f_updated), 'w') as f: f.write('Modified') - src_files = os.listdir(CHANGED_SCHEMA_DIR) - for file_name in src_files: - full_file_name = os.path.join(CHANGED_SCHEMA_DIR, file_name) - if os.path.isfile(full_file_name): - shutil.copy(full_file_name, project_dir) # check changes before applied pull_changes, push_changes, _ = mc.project_status(project_dir) @@ -342,3 +337,33 @@ def test_list_of_push_changes(mc): mc.project_status(project_dir) +def test_force_gpkg_update(mc): + test_project = 'test_force_update' + project = API_USER + '/' + test_project + project_dir = os.path.join(TMP_DIR, 
test_project) # primary project dir for updates + + cleanup(mc, project, [project_dir]) + # create remote project + shutil.copytree(TEST_DATA_DIR, project_dir) + mc.create_project(test_project, project_dir) + + # test push changes with force gpkg file upload: + mp = MerginProject(project_dir) + f_updated = 'base.gpkg' + checksum = generate_checksum(mp.fpath(f_updated)) + + # base.gpkg updated to modified_schema (inserted new column) + shutil.move(mp.fpath(f_updated), mp.fpath_meta(f_updated)) # make local copy for changeset calculation (which will fail) + shutil.copy(os.path.join(CHANGED_SCHEMA_DIR, 'modified_schema.gpkg'), mp.fpath(f_updated)) + shutil.copy(os.path.join(CHANGED_SCHEMA_DIR, 'modified_schema.gpkg-wal'), mp.fpath(f_updated + '-wal')) + mc.push_project(project_dir) + # by this point local file has been updated (changes committed from wal) + updated_checksum = generate_checksum(mp.fpath(f_updated)) + assert checksum != updated_checksum + + # check project after push + project_info = mc.project_info(project) + assert project_info['version'] == 'v2' + f_remote = next((f for f in project_info['files'] if f['path'] == f_updated), None) + assert f_remote['checksum'] == updated_checksum + assert 'diff' not in f_remote diff --git a/mergin/utils.py b/mergin/utils.py index 29e6f52e..f7944e66 100644 --- a/mergin/utils.py +++ b/mergin/utils.py @@ -71,12 +71,23 @@ def int_version(version): def do_sqlite_checkpoint(path): """ - function to do checkpoint over the geopackage file which was not able to do diff file + Function to do checkpoint over the geopackage file which was not able to do diff file. 
+
+    :param path: file's absolute path on disk
+    :type path: str
+    :returns: new size and checksum of the file after checkpoint; (None, None) when no checkpoint was performed (not a .gpkg or no -wal file present)
+    :rtype: int, str
     """
-    if ".gpkg" in path and os.path.exists(f'{path}-wal') and os.path.exists(f'{path}-shm'):
+    new_size = None
+    new_checksum = None
+    if ".gpkg" in path and os.path.exists(f'{path}-wal'):
         conn = sqlite3.connect(path)
         cursor = conn.cursor()
         cursor.execute("PRAGMA wal_checkpoint=FULL")
         cursor.execute("VACUUM")
         conn.commit()
         conn.close()
+        new_size = os.path.getsize(path)
+        new_checksum = generate_checksum(path)
+
+    return new_size, new_checksum