diff --git a/.github/workflows/tests_mergin_db_sync.yaml b/.github/workflows/tests_mergin_db_sync.yaml index 0f16271..6d6d5ee 100644 --- a/.github/workflows/tests_mergin_db_sync.yaml +++ b/.github/workflows/tests_mergin_db_sync.yaml @@ -57,4 +57,10 @@ jobs: - name: Run tests run: | - pytest test --cov=. --cov-report=term-missing:skip-covered -vv \ No newline at end of file + pytest test --cov=. --cov-report=term-missing:skip-covered -vv + + - name: Check files using the black formatter + uses: rickstaa/action-black@v1 + id: action_black + with: + black_args: "." diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..a91856d --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,5 @@ +repos: +- repo: https://github.com/psf/black + rev: 23.3.0 + hooks: + - id: black \ No newline at end of file diff --git a/config.py b/config.py index d8bc035..1098b7a 100644 --- a/config.py +++ b/config.py @@ -6,7 +6,9 @@ License: MIT """ -from dynaconf import Dynaconf +from dynaconf import ( + Dynaconf, +) import platform import tempfile import pathlib @@ -16,7 +18,7 @@ envvar_prefix=False, settings_files=[], geodiff_exe="geodiff.exe" if platform.system() == "Windows" else "geodiff", - working_dir=(pathlib.Path(tempfile.gettempdir()) / "dbsync").as_posix() + working_dir=(pathlib.Path(tempfile.gettempdir()) / "dbsync").as_posix(), ) @@ -25,13 +27,22 @@ class ConfigError(Exception): def validate_config(config): - """ Validate config - make sure values are consistent """ + """Validate config - make sure values are consistent""" # validate that geodiff can be found, otherwise it does not make sense to run DB Sync try: - subprocess.run([config.geodiff_exe, "help"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + subprocess.run( + [ + config.geodiff_exe, + "help", + ], + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) except FileNotFoundError: - raise ConfigError("Config error: Geodiff executable not found. Is it installed and available in `PATH` environment variable?") + raise ConfigError( + "Config error: Geodiff executable not found. Is it installed and available in `PATH` environment variable?" + ) if not (config.mergin.url and config.mergin.username and config.mergin.password): raise ConfigError("Config error: Incorrect mergin settings") @@ -42,48 +53,85 @@ def validate_config(config): if "init_from" not in config: raise ConfigError("Config error: Missing parameter `init_from` in the configuration.") - if config.init_from not in ["gpkg", "db"]: - raise ConfigError(f"Config error: `init_from` parameter must be either `gpkg` or `db`. Current value is `{config.init_from}`.") + if config.init_from not in [ + "gpkg", + "db", + ]: + raise ConfigError( + f"Config error: `init_from` parameter must be either `gpkg` or `db`. Current value is `{config.init_from}`." + ) for conn in config.connections: - for attr in ["driver", "conn_info", "modified", "base", "mergin_project", "sync_file"]: - if not hasattr(conn, attr): - raise ConfigError(f"Config error: Incorrect connection settings. Required parameter `{attr}` is missing.") + for attr in [ + "driver", + "conn_info", + "modified", + "base", + "mergin_project", + "sync_file", + ]: + if not hasattr( + conn, + attr, + ): + raise ConfigError( + f"Config error: Incorrect connection settings. Required parameter `{attr}` is missing." + ) if conn.driver != "postgres": raise ConfigError("Config error: Only 'postgres' driver is currently supported.") if "/" not in conn.mergin_project: - raise ConfigError("Config error: Name of the Mergin Maps project should be provided in the namespace/name format.") + raise ConfigError( + "Config error: Name of the Mergin Maps project should be provided in the namespace/name format." + ) if "skip_tables" in conn: if conn.skip_tables is None: continue - elif isinstance(conn.skip_tables, str): + elif isinstance( + conn.skip_tables, + str, + ): continue - elif not isinstance(conn.skip_tables, list): + elif not isinstance( + conn.skip_tables, + list, + ): raise ConfigError("Config error: Ignored tables parameter should be a list") -def get_ignored_tables(connection): +def get_ignored_tables( + connection, +): if "skip_tables" in connection: if connection.skip_tables is None: return [] - elif isinstance(connection.skip_tables, str): + elif isinstance( + connection.skip_tables, + str, + ): return [connection.skip_tables] - elif isinstance(connection.skip_tables, list): + elif isinstance( + connection.skip_tables, + list, + ): return connection.skip_tables else: return [] -def update_config_path(path_param: str) -> None: +def update_config_path( + path_param: str, +) -> None: config_file_path = pathlib.Path(path_param) if config_file_path.exists(): print(f"Using config file: {path_param}") - user_file_config = Dynaconf(envvar_prefix=False, - settings_files=[config_file_path]) + user_file_config = Dynaconf( + envvar_prefix=False, + settings_files=[config_file_path], + ) config.update(user_file_config) else: raise IOError(f"Config file {config_file_path} does not exist.") diff --git a/dbsync.py b/dbsync.py index 45597fa..486f99a 100644 --- a/dbsync.py +++ b/dbsync.py @@ -21,16 +21,33 @@ import psycopg2 import psycopg2.extensions -from psycopg2 import sql -from itertools import chain - -from mergin import MerginClient, MerginProject, LoginError, ClientError, InvalidProject -from version import __version__ -from config import config, validate_config, get_ignored_tables, ConfigError +from psycopg2 import ( + sql, +) +from itertools import ( + chain, +) + +from mergin import ( + MerginClient, + MerginProject, + LoginError, + ClientError, + InvalidProject, +) +from version import ( + __version__, +) +from config import ( + config, + validate_config, + get_ignored_tables, + ConfigError, +) # set high logging level for geodiff (used by geodiff executable) # so we get as much information as possible -os.environ["GEODIFF_LOGGER_LEVEL"] = '4' # 0 = nothing, 1 = errors, 2 = warning, 3 = info, 4 = debug +os.environ["GEODIFF_LOGGER_LEVEL"] = "4" # 0 = nothing, 1 = errors, 2 = warning, 3 = info, 4 = debug FORCE_INIT_MESSAGE = "Running `dbsync_deamon.py` with `--force-init` should fix the issue." @@ -38,55 +55,96 @@ class DbSyncError(Exception): default_print_password = "password='*****'" - def __init__(self, message): + def __init__( + self, + message, + ): # escaped password - message = re.sub(r'password=[\"\'].+[\"\'](?=\s)', self.default_print_password, message) + message = re.sub( + r"password=[\"\'].+[\"\'](?=\s)", + self.default_print_password, + message, + ) # not escaped password - message = re.sub(r'password=\S+', self.default_print_password, message) + message = re.sub( + r"password=\S+", + self.default_print_password, + message, + ) super().__init__(message) -def _add_quotes_to_schema_name(schema: str) -> str: - matches = re.findall(r"[^a-z0-9_]", schema) +def _add_quotes_to_schema_name( + schema: str, +) -> str: + matches = re.findall( + r"[^a-z0-9_]", + schema, + ) if len(matches) != 0: - schema = schema.replace("\"", "\"\"") + schema = schema.replace( + '"', + '""', + ) schema = f'"{schema}"' return schema -def _tables_list_to_string(tables): +def _tables_list_to_string( + tables, +): return ";".join(tables) -def _check_has_working_dir(work_path): +def _check_has_working_dir( + work_path, +): if not os.path.exists(work_path): raise DbSyncError("The project working directory does not exist: " + work_path) - if not os.path.exists(os.path.join(work_path, '.mergin')): + if not os.path.exists( + os.path.join( + work_path, + ".mergin", + ) + ): raise DbSyncError("The project working directory does not seem to contain Mergin Maps project: " + work_path) -def _check_has_sync_file(file_path): - """ Checks whether the dbsync environment is initialized already (so that we can pull/push). - Emits an exception if not initialized yet. """ +def _check_has_sync_file( + file_path, +): + """Checks whether the dbsync environment is initialized already (so that we can pull/push). + Emits an exception if not initialized yet.""" if not os.path.exists(file_path): raise DbSyncError("The output GPKG file does not exist: " + file_path) -def _drop_schema(conn, schema_name: str) -> None: +def _drop_schema( + conn, + schema_name: str, +) -> None: cur = conn.cursor() cur.execute(sql.SQL("DROP SCHEMA IF EXISTS {} CASCADE").format(sql.Identifier(schema_name))) conn.commit() -def _check_schema_exists(conn, schema_name): +def _check_schema_exists( + conn, + schema_name, +): cur = conn.cursor() - cur.execute("SELECT EXISTS(SELECT 1 FROM pg_namespace WHERE nspname = %s)", (schema_name,)) + cur.execute( + "SELECT EXISTS(SELECT 1 FROM pg_namespace WHERE nspname = %s)", + (schema_name,), + ) return cur.fetchone()[0] -def _check_postgis_available(conn: psycopg2.extensions.connection) -> bool: +def _check_postgis_available( + conn: psycopg2.extensions.connection, +) -> bool: cur = conn.cursor() cur.execute("SELECT extname FROM pg_extension;") try: @@ -99,7 +157,9 @@ def _check_postgis_available(conn: psycopg2.extensions.connection) -> bool: return False -def _try_install_postgis(conn: psycopg2.extensions.connection) -> bool: +def _try_install_postgis( + conn: psycopg2.extensions.connection, +) -> bool: cur = conn.cursor() try: cur.execute("CREATE EXTENSION postgis;") @@ -109,15 +169,22 @@ def _try_install_postgis(conn: psycopg2.extensions.connection) -> bool: def _check_has_password(): - """ Checks whether we have password for Mergin Maps user - if not, we will ask for it """ + """Checks whether we have password for Mergin Maps user - if not, we will ask for it""" if config.mergin.password is None: - config.mergin.password = getpass.getpass(prompt="Mergin Maps password for '{}': ".format(config.mergin.username)) + config.mergin.password = getpass.getpass( + prompt="Mergin Maps password for '{}': ".format(config.mergin.username) + ) -def _run_geodiff(cmd): - """ will run a command (with geodiff) and report what got to stderr and raise exception - if the command returns non-zero exit code """ - res = subprocess.run(cmd, stderr=subprocess.PIPE) +def _run_geodiff( + cmd, +): + """will run a command (with geodiff) and report what got to stderr and raise exception + if the command returns non-zero exit code""" + res = subprocess.run( + cmd, + stderr=subprocess.PIPE, + ) geodiff_stderr = res.stderr.decode() if geodiff_stderr: print("GEODIFF: " + geodiff_stderr) @@ -125,102 +192,332 @@ def _run_geodiff(cmd): raise DbSyncError("geodiff failed!\n" + str(cmd)) -def _geodiff_create_changeset(driver, conn_info, base, modified, changeset, ignored_tables): +def _geodiff_create_changeset( + driver, + conn_info, + base, + modified, + changeset, + ignored_tables, +): if ignored_tables: - _run_geodiff([config.geodiff_exe, "diff", "--driver", driver, conn_info, "--skip-tables", _tables_list_to_string(ignored_tables), base, modified, changeset]) + _run_geodiff( + [ + config.geodiff_exe, + "diff", + "--driver", + driver, + conn_info, + "--skip-tables", + _tables_list_to_string(ignored_tables), + base, + modified, + changeset, + ] + ) else: - _run_geodiff([config.geodiff_exe, "diff", "--driver", driver, conn_info, base, modified, changeset]) + _run_geodiff( + [ + config.geodiff_exe, + "diff", + "--driver", + driver, + conn_info, + base, + modified, + changeset, + ] + ) -def _geodiff_apply_changeset(driver, conn_info, base, changeset, ignored_tables): +def _geodiff_apply_changeset( + driver, + conn_info, + base, + changeset, + ignored_tables, +): if ignored_tables: - _run_geodiff([config.geodiff_exe, "apply", "--driver", driver, conn_info, "--skip-tables", _tables_list_to_string(ignored_tables), base, changeset]) + _run_geodiff( + [ + config.geodiff_exe, + "apply", + "--driver", + driver, + conn_info, + "--skip-tables", + _tables_list_to_string(ignored_tables), + base, + changeset, + ] + ) else: - _run_geodiff([config.geodiff_exe, "apply", "--driver", driver, conn_info, base, changeset]) + _run_geodiff( + [ + config.geodiff_exe, + "apply", + "--driver", + driver, + conn_info, + base, + changeset, + ] + ) -def _geodiff_rebase(driver, conn_info, base, our, base2their, conflicts, ignored_tables): +def _geodiff_rebase( + driver, + conn_info, + base, + our, + base2their, + conflicts, + ignored_tables, +): if ignored_tables: - _run_geodiff([config.geodiff_exe, "rebase-db", "--driver", driver, conn_info, "--skip-tables", _tables_list_to_string(ignored_tables), base, our, base2their, conflicts]) + _run_geodiff( + [ + config.geodiff_exe, + "rebase-db", + "--driver", + driver, + conn_info, + "--skip-tables", + _tables_list_to_string(ignored_tables), + base, + our, + base2their, + conflicts, + ] + ) else: - _run_geodiff([config.geodiff_exe, "rebase-db", "--driver", driver, conn_info, base, our, base2their, conflicts]) + _run_geodiff( + [ + config.geodiff_exe, + "rebase-db", + "--driver", + driver, + conn_info, + base, + our, + base2their, + conflicts, + ] + ) -def _geodiff_list_changes_details(changeset): - """ Returns a list with changeset details: - [ { 'table': 'foo', 'type': 'update', 'changes': [ ... old/new column values ... ] }, ... ] +def _geodiff_list_changes_details( + changeset, +): + """Returns a list with changeset details: + [ { 'table': 'foo', 'type': 'update', 'changes': [ ... old/new column values ... ] }, ... ] """ tmp_dir = tempfile.gettempdir() - tmp_output = os.path.join(tmp_dir, 'dbsync-changeset-details') + tmp_output = os.path.join( + tmp_dir, + "dbsync-changeset-details", + ) if os.path.exists(tmp_output): os.remove(tmp_output) - _run_geodiff([config.geodiff_exe, "as-json", changeset, tmp_output]) + _run_geodiff( + [ + config.geodiff_exe, + "as-json", + changeset, + tmp_output, + ] + ) with open(tmp_output) as f: out = json.load(f) os.remove(tmp_output) return out["geodiff"] -def _geodiff_list_changes_summary(changeset): - """ Returns a list with changeset summary: - [ { 'table': 'foo', 'insert': 1, 'update': 2, 'delete': 3 }, ... ] +def _geodiff_list_changes_summary( + changeset, +): + """Returns a list with changeset summary: + [ { 'table': 'foo', 'insert': 1, 'update': 2, 'delete': 3 }, ... ] """ tmp_dir = tempfile.gettempdir() - tmp_output = os.path.join(tmp_dir, 'dbsync-changeset-summary') + tmp_output = os.path.join( + tmp_dir, + "dbsync-changeset-summary", + ) if os.path.exists(tmp_output): os.remove(tmp_output) - _run_geodiff([config.geodiff_exe, "as-summary", changeset, tmp_output]) + _run_geodiff( + [ + config.geodiff_exe, + "as-summary", + changeset, + tmp_output, + ] + ) with open(tmp_output) as f: out = json.load(f) os.remove(tmp_output) return out["geodiff_summary"] -def _geodiff_make_copy(src_driver, src_conn_info, src, dst_driver, dst_conn_info, dst, ignored_tables): +def _geodiff_make_copy( + src_driver, + src_conn_info, + src, + dst_driver, + dst_conn_info, + dst, + ignored_tables, +): if ignored_tables: - _run_geodiff([config.geodiff_exe, "copy", "--driver-1", src_driver, src_conn_info, "--driver-2", dst_driver, dst_conn_info, "--skip-tables", _tables_list_to_string(ignored_tables), src, dst]) + _run_geodiff( + [ + config.geodiff_exe, + "copy", + "--driver-1", + src_driver, + src_conn_info, + "--driver-2", + dst_driver, + dst_conn_info, + "--skip-tables", + _tables_list_to_string(ignored_tables), + src, + dst, + ] + ) else: - _run_geodiff([config.geodiff_exe, "copy", "--driver-1", src_driver, src_conn_info, "--driver-2", dst_driver, dst_conn_info, src, dst]) + _run_geodiff( + [ + config.geodiff_exe, + "copy", + "--driver-1", + src_driver, + src_conn_info, + "--driver-2", + dst_driver, + dst_conn_info, + src, + dst, + ] + ) -def _geodiff_create_changeset_dr(src_driver, src_conn_info, src, dst_driver, dst_conn_info, dst, changeset, ignored_tables): +def _geodiff_create_changeset_dr( + src_driver, + src_conn_info, + src, + dst_driver, + dst_conn_info, + dst, + changeset, + ignored_tables, +): if ignored_tables: - _run_geodiff([config.geodiff_exe, "diff", "--driver-1", src_driver, src_conn_info, "--driver-2", dst_driver, dst_conn_info, "--skip-tables", _tables_list_to_string(ignored_tables), src, dst, changeset]) + _run_geodiff( + [ + config.geodiff_exe, + "diff", + "--driver-1", + src_driver, + src_conn_info, + "--driver-2", + dst_driver, + dst_conn_info, + "--skip-tables", + _tables_list_to_string(ignored_tables), + src, + dst, + changeset, + ] + ) else: - _run_geodiff([config.geodiff_exe, "diff", "--driver-1", src_driver, src_conn_info, "--driver-2", dst_driver, dst_conn_info, src, dst, changeset]) + _run_geodiff( + [ + config.geodiff_exe, + "diff", + "--driver-1", + src_driver, + src_conn_info, + "--driver-2", + dst_driver, + dst_conn_info, + src, + dst, + changeset, + ] + ) -def _compare_datasets(src_driver, src_conn_info, src, dst_driver, dst_conn_info, dst, ignored_tables, summary_only=True): - """ Compare content of two datasets (from various drivers) and return geodiff JSON summary of changes """ +def _compare_datasets( + src_driver, + src_conn_info, + src, + dst_driver, + dst_conn_info, + dst, + ignored_tables, + summary_only=True, +): + """Compare content of two datasets (from various drivers) and return geodiff JSON summary of changes""" tmp_dir = tempfile.gettempdir() - tmp_changeset = os.path.join(tmp_dir, ''.join(random.choices(string.ascii_letters, k=8))) - - _geodiff_create_changeset_dr(src_driver, src_conn_info, src, dst_driver, dst_conn_info, dst, tmp_changeset, ignored_tables) + tmp_changeset = os.path.join( + tmp_dir, + "".join( + random.choices( + string.ascii_letters, + k=8, + ) + ), + ) + + _geodiff_create_changeset_dr( + src_driver, + src_conn_info, + src, + dst_driver, + dst_conn_info, + dst, + tmp_changeset, + ignored_tables, + ) if summary_only: return _geodiff_list_changes_summary(tmp_changeset) else: return _geodiff_list_changes_details(tmp_changeset) -def _print_changes_summary(summary, label=None): - """ Takes a geodiff JSON summary of changes and prints them """ +def _print_changes_summary( + summary, + label=None, +): + """Takes a geodiff JSON summary of changes and prints them""" print("Changes:" if label is None else label) for item in summary: - print("{:20} {:4} {:4} {:4}".format(item['table'], item['insert'], item['update'], item['delete'])) + print( + "{:20} {:4} {:4} {:4}".format( + item["table"], + item["insert"], + item["update"], + item["delete"], + ) + ) -def _print_mergin_changes(diff_dict): - """ Takes a dictionary with format { 'added': [...], 'removed': [...], 'updated': [...] } +def _print_mergin_changes( + diff_dict, +): + """Takes a dictionary with format { 'added': [...], 'removed': [...], 'updated': [...] } where each item is another dictionary with file details, e.g.: { 'path': 'myfile.gpkg', size: 123456, ... } and prints it in a way that's easy to parse for a human :-) """ - for item in diff_dict['added']: - print(" added: " + item['path']) - for item in diff_dict['updated']: - print(" updated: " + item['path']) - for item in diff_dict['removed']: - print(" removed: " + item['path']) + for item in diff_dict["added"]: + print(" added: " + item["path"]) + for item in diff_dict["updated"]: + print(" updated: " + item["path"]) + for item in diff_dict["removed"]: + print(" removed: " + item["path"]) # Dictionary used by _get_mergin_project() function below. @@ -228,7 +525,9 @@ def _print_mergin_changes(diff_dict): cached_mergin_project_objects = {} -def _get_mergin_project(work_path): +def _get_mergin_project( + work_path, +): """ Returns a cached MerginProject object or creates one if it does not exist yet. This is to avoid creating many of these objects (e.g. every pull/push) because it does @@ -242,24 +541,38 @@ def _get_mergin_project(work_path): return cached_mergin_project_objects[work_path] -def _get_project_version(work_path): - """ Returns the current version of the project """ +def _get_project_version( + work_path, +): + """Returns the current version of the project""" mp = _get_mergin_project(work_path) return mp.metadata["version"] -def _get_project_id(mp): - """ Returns the project ID """ +def _get_project_id( + mp, +): + """Returns the project ID""" try: project_id = uuid.UUID(mp.metadata["project_id"]) - except (KeyError, ValueError): + except ( + KeyError, + ValueError, + ): project_id = None return project_id -def _set_db_project_comment(conn, schema, project_name, version, project_id=None, error=None): - """ Set postgres COMMENT on SCHEMA with Mergin Maps project name and version - or eventually error message if initialisation failed +def _set_db_project_comment( + conn, + schema, + project_name, + version, + project_id=None, + error=None, +): + """Set postgres COMMENT on SCHEMA with Mergin Maps project name and version + or eventually error message if initialisation failed """ comment = { "name": project_name, @@ -271,35 +584,59 @@ def _set_db_project_comment(conn, schema, project_name, version, project_id=None comment["error"] = error cur = conn.cursor() query = sql.SQL("COMMENT ON SCHEMA {} IS %s").format(sql.Identifier(schema)) - cur.execute(query.as_string(conn), (json.dumps(comment), )) + cur.execute( + query.as_string(conn), + (json.dumps(comment),), + ) conn.commit() def _get_db_project_comment(conn, schema): - """ Get Mergin Maps project name and its current version in db schema""" + """Get Mergin Maps project name and its current version in db schema""" cur = conn.cursor() schema = _add_quotes_to_schema_name(schema) - cur.execute("SELECT obj_description(%s::regnamespace, 'pg_namespace')", (schema, )) + cur.execute( + "SELECT obj_description(%s::regnamespace, 'pg_namespace')", + (schema,), + ) res = cur.fetchone()[0] try: comment = json.loads(res) if res else None - except (TypeError, json.decoder.JSONDecodeError): + except ( + TypeError, + json.decoder.JSONDecodeError, + ): return return comment -def _redownload_project(conn_cfg, mc, work_dir, db_proj_info): +def _redownload_project( + conn_cfg, + mc, + work_dir, + db_proj_info, +): print(f"Removing local working directory {work_dir}") shutil.rmtree(work_dir) - print(f"Downloading version {db_proj_info['version']} of Mergin Maps project {conn_cfg.mergin_project} " - f"to {work_dir}") + print( + f"Downloading version {db_proj_info['version']} of Mergin Maps project {conn_cfg.mergin_project} " + f"to {work_dir}" + ) try: - mc.download_project(conn_cfg.mergin_project, work_dir, db_proj_info["version"]) + mc.download_project( + conn_cfg.mergin_project, + work_dir, + db_proj_info["version"], + ) except ClientError as e: raise DbSyncError("Mergin Maps client error: " + str(e)) -def _validate_local_project_id(mp, mc, server_info=None): +def _validate_local_project_id( + mp, + mc, + server_info=None, +): """Compare local project ID with remote version on the server.""" local_project_id = _get_project_id(mp) if local_project_id is None: @@ -319,20 +656,31 @@ def _validate_local_project_id(mp, mc, server_info=None): def create_mergin_client(): - """ Create instance of MerginClient""" + """Create instance of MerginClient""" _check_has_password() try: - return MerginClient(config.mergin.url, login=config.mergin.username, password=config.mergin.password, plugin_version=f"DB-sync/{__version__}") + return MerginClient( + config.mergin.url, + login=config.mergin.username, + password=config.mergin.password, + plugin_version=f"DB-sync/{__version__}", + ) except LoginError as e: # this could be auth failure, but could be also server problem (e.g. worker crash) - raise DbSyncError(f"Unable to log in to Mergin Maps: {str(e)} \n\n" + - "Have you specified correct credentials in configuration file?") + raise DbSyncError( + f"Unable to log in to Mergin Maps: {str(e)} \n\n" + + "Have you specified correct credentials in configuration file?" + ) except ClientError as e: # this could be e.g. DNS error raise DbSyncError("Mergin Maps client error: " + str(e)) -def revert_local_changes(mc, mp, local_changes=None): +def revert_local_changes( + mc, + mp, + local_changes=None, +): """Revert local changes from the existing project.""" if local_changes is None: local_changes = mp.get_push_changes() @@ -341,22 +689,42 @@ def revert_local_changes(mc, mp, local_changes=None): print("Reverting local changes: " + str(local_changes)) for add_change in local_changes["added"]: added_file = add_change["path"] - added_filepath = os.path.join(mp.dir, added_file) + added_filepath = os.path.join( + mp.dir, + added_file, + ) os.remove(added_filepath) - for update_delete_change in chain(local_changes["updated"], local_changes["removed"]): + for update_delete_change in chain( + local_changes["updated"], + local_changes["removed"], + ): update_delete_file = update_delete_change["path"] - update_delete_filepath = os.path.join(mp.dir, update_delete_file) + update_delete_filepath = os.path.join( + mp.dir, + update_delete_file, + ) delete_file = os.path.isfile(update_delete_filepath) if update_delete_file.lower().endswith(".gpkg"): - update_delete_filepath_base = os.path.join(mp.meta_dir, update_delete_file) + update_delete_filepath_base = os.path.join( + mp.meta_dir, + update_delete_file, + ) if delete_file: os.remove(update_delete_filepath) - shutil.copy(update_delete_filepath_base, update_delete_filepath) + shutil.copy( + update_delete_filepath_base, + update_delete_filepath, + ) else: if delete_file: os.remove(update_delete_filepath) try: - mc.download_file(mp.dir, update_delete_file, update_delete_filepath, mp.metadata["version"]) + mc.download_file( + mp.dir, + update_delete_file, + update_delete_filepath, + mp.metadata["version"], + ) except ClientError as e: raise DbSyncError("Mergin Maps client error: " + str(e)) leftovers = mp.get_push_changes() @@ -365,14 +733,20 @@ def revert_local_changes(mc, mp, local_changes=None): def pull(conn_cfg, mc): - """ Downloads any changes from Mergin Maps and applies them to the database """ + """Downloads any changes from Mergin Maps and applies them to the database""" print(f"Processing Mergin Maps project '{conn_cfg.mergin_project}'") ignored_tables = get_ignored_tables(conn_cfg) project_name = conn_cfg.mergin_project.split("/")[1] - work_dir = os.path.join(config.working_dir, project_name) - gpkg_full_path = os.path.join(work_dir, conn_cfg.sync_file) + work_dir = os.path.join( + config.working_dir, + project_name, + ) + gpkg_full_path = os.path.join( + work_dir, + conn_cfg.sync_file, + ) _check_has_working_dir(work_dir) _check_has_sync_file(gpkg_full_path) @@ -397,31 +771,60 @@ def pull(conn_cfg, mc): local_changes = mp.get_push_changes() if any(local_changes.values()): - local_changes = revert_local_changes(mc, mp, local_changes) + local_changes = revert_local_changes( + mc, + mp, + local_changes, + ) if any(local_changes.values()): - raise DbSyncError("There are pending changes in the local directory - that should never happen! " + str(local_changes)) + raise DbSyncError( + "There are pending changes in the local directory - that should never happen! " + str(local_changes) + ) if server_version == local_version: print("No changes on Mergin Maps.") return - gpkg_basefile = os.path.join(work_dir, '.mergin', conn_cfg.sync_file) + gpkg_basefile = os.path.join( + work_dir, + ".mergin", + conn_cfg.sync_file, + ) gpkg_basefile_old = gpkg_basefile + "-old" # make a copy of the basefile in the current version (base) - because after pull it will be set to "their" - shutil.copy(gpkg_basefile, gpkg_basefile_old) + shutil.copy( + gpkg_basefile, + gpkg_basefile_old, + ) tmp_dir = tempfile.gettempdir() - tmp_base2our = os.path.join(tmp_dir, f'{project_name}-dbsync-pull-base2our') - tmp_base2their = os.path.join(tmp_dir, f'{project_name}-dbsync-pull-base2their') + tmp_base2our = os.path.join( + tmp_dir, + f"{project_name}-dbsync-pull-base2our", + ) + tmp_base2their = os.path.join( + tmp_dir, + f"{project_name}-dbsync-pull-base2their", + ) # find out our local changes in the database (base2our) - _geodiff_create_changeset(conn_cfg.driver, conn_cfg.conn_info, conn_cfg.base, conn_cfg.modified, tmp_base2our, ignored_tables) + _geodiff_create_changeset( + conn_cfg.driver, + conn_cfg.conn_info, + conn_cfg.base, + conn_cfg.modified, + tmp_base2our, + ignored_tables, + ) needs_rebase = False if os.path.getsize(tmp_base2our) != 0: needs_rebase = True summary = _geodiff_list_changes_summary(tmp_base2our) - _print_changes_summary(summary, "DB Changes:") + _print_changes_summary( + summary, + "DB Changes:", + ) try: mc.pull_project(work_dir) # will do rebase as needed @@ -432,39 +835,88 @@ def pull(conn_cfg, mc): print("Pulled new version from Mergin Maps: " + _get_project_version(work_dir)) # simple case when there are no pending local changes - just apply whatever changes are coming - _geodiff_create_changeset("sqlite", "", gpkg_basefile_old, gpkg_basefile, tmp_base2their, ignored_tables) + _geodiff_create_changeset( + "sqlite", + "", + gpkg_basefile_old, + gpkg_basefile, + tmp_base2their, + ignored_tables, + ) # summarize changes summary = _geodiff_list_changes_summary(tmp_base2their) - _print_changes_summary(summary, "Mergin Maps Changes:") + _print_changes_summary( + summary, + "Mergin Maps Changes:", + ) if not needs_rebase: print("Applying new version [no rebase]") - _geodiff_apply_changeset(conn_cfg.driver, conn_cfg.conn_info, conn_cfg.base, tmp_base2their, ignored_tables) - _geodiff_apply_changeset(conn_cfg.driver, conn_cfg.conn_info, conn_cfg.modified, tmp_base2their, ignored_tables) + _geodiff_apply_changeset( + conn_cfg.driver, + conn_cfg.conn_info, + conn_cfg.base, + tmp_base2their, + ignored_tables, + ) + _geodiff_apply_changeset( + conn_cfg.driver, + conn_cfg.conn_info, + conn_cfg.modified, + tmp_base2their, + ignored_tables, + ) else: print("Applying new version [WITH rebase]") - tmp_conflicts = os.path.join(tmp_dir, f'{project_name}-dbsync-pull-conflicts') - _geodiff_rebase(conn_cfg.driver, conn_cfg.conn_info, conn_cfg.base, - conn_cfg.modified, tmp_base2their, tmp_conflicts, ignored_tables) - _geodiff_apply_changeset(conn_cfg.driver, conn_cfg.conn_info, conn_cfg.base, tmp_base2their, ignored_tables) + tmp_conflicts = os.path.join( + tmp_dir, + f"{project_name}-dbsync-pull-conflicts", + ) + _geodiff_rebase( + conn_cfg.driver, + conn_cfg.conn_info, + conn_cfg.base, + conn_cfg.modified, + tmp_base2their, + tmp_conflicts, + ignored_tables, + ) + _geodiff_apply_changeset( + conn_cfg.driver, + conn_cfg.conn_info, + conn_cfg.base, + tmp_base2their, + ignored_tables, + ) os.remove(gpkg_basefile_old) conn = psycopg2.connect(conn_cfg.conn_info) version = _get_project_version(work_dir) - _set_db_project_comment(conn, conn_cfg.base, conn_cfg.mergin_project, version) + _set_db_project_comment( + conn, + conn_cfg.base, + conn_cfg.mergin_project, + version, + ) def status(conn_cfg, mc): - """ Figure out if there are any pending changes in the database or in Mergin Maps""" + """Figure out if there are any pending changes in the database or in Mergin Maps""" print(f"Processing Mergin Maps project '{conn_cfg.mergin_project}'") ignored_tables = get_ignored_tables(conn_cfg) project_name = conn_cfg.mergin_project.split("/")[1] - work_dir = os.path.join(config.working_dir, project_name) - gpkg_full_path = os.path.join(work_dir, conn_cfg.sync_file) + work_dir = os.path.join( + config.working_dir, + project_name, + ) + gpkg_full_path = os.path.join( + work_dir, + conn_cfg.sync_file, + ) _check_has_working_dir(work_dir) _check_has_sync_file(gpkg_full_path) @@ -478,15 +930,22 @@ def status(conn_cfg, mc): local_version = mp.metadata["version"] print("Checking status...") try: - server_info = mc.project_info(project_path, since=local_version) + server_info = mc.project_info( + project_path, + since=local_version, + ) except ClientError as e: raise DbSyncError("Mergin Maps client error: " + str(e)) # Make sure that local project ID (if available) is the same as on the server - _validate_local_project_id(mp, mc, server_info) + _validate_local_project_id( + mp, + mc, + server_info, + ) status_push = mp.get_push_changes() - if status_push['added'] or status_push['updated'] or status_push['removed']: + if status_push["added"] or status_push["updated"] or status_push["removed"]: raise DbSyncError("Pending changes in the local directory - that should never happen! " + str(status_push)) print("Working directory " + work_dir) @@ -495,7 +954,7 @@ def status(conn_cfg, mc): print("Server is at version " + server_info["version"]) status_pull = mp.get_pull_changes(server_info["files"]) - if status_pull['added'] or status_pull['updated'] or status_pull['removed']: + if status_pull["added"] or status_pull["updated"] or status_pull["removed"]: print("There are pending changes on server:") _print_mergin_changes(status_pull) else: @@ -504,17 +963,33 @@ def status(conn_cfg, mc): print("") conn = psycopg2.connect(conn_cfg.conn_info) - if not _check_schema_exists(conn, conn_cfg.base): + if not _check_schema_exists( + conn, + conn_cfg.base, + ): raise DbSyncError("The base schema does not exist: " + conn_cfg.base) - if not _check_schema_exists(conn, conn_cfg.modified): + if not _check_schema_exists( + conn, + conn_cfg.modified, + ): raise DbSyncError("The 'modified' schema does not exist: " + conn_cfg.modified) # get changes in the DB tmp_dir = tempfile.gettempdir() - tmp_changeset_file = os.path.join(tmp_dir, f'{project_name}-dbsync-status-base2our') + tmp_changeset_file = os.path.join( + tmp_dir, + f"{project_name}-dbsync-status-base2our", + ) if os.path.exists(tmp_changeset_file): os.remove(tmp_changeset_file) - _geodiff_create_changeset(conn_cfg.driver, conn_cfg.conn_info, conn_cfg.base, conn_cfg.modified, tmp_changeset_file, ignored_tables) + _geodiff_create_changeset( + conn_cfg.driver, + conn_cfg.conn_info, + conn_cfg.base, + conn_cfg.modified, + tmp_changeset_file, + ignored_tables, + ) if os.path.getsize(tmp_changeset_file) == 0: print("No changes in the database.") @@ -526,7 +1001,7 @@ def status(conn_cfg, mc): def push(conn_cfg, mc): - """ Take changes in the 'modified' schema in the database and push them to Mergin Maps""" + """Take changes in the 'modified' schema in the database and push them to Mergin Maps""" print(f"Processing Mergin Maps project '{conn_cfg.mergin_project}'") ignored_tables = get_ignored_tables(conn_cfg) @@ -534,12 +1009,21 @@ def push(conn_cfg, mc): project_name = conn_cfg.mergin_project.split("/")[1] tmp_dir = tempfile.gettempdir() - tmp_changeset_file = os.path.join(tmp_dir, f'{project_name}-dbsync-push-base2our') + tmp_changeset_file = os.path.join( + tmp_dir, + f"{project_name}-dbsync-push-base2our", + ) if os.path.exists(tmp_changeset_file): os.remove(tmp_changeset_file) - work_dir = os.path.join(config.working_dir, project_name) - gpkg_full_path = os.path.join(work_dir, conn_cfg.sync_file) + work_dir = os.path.join( + config.working_dir, + project_name, + ) + gpkg_full_path = os.path.join( + work_dir, + conn_cfg.sync_file, + ) _check_has_working_dir(work_dir) _check_has_sync_file(gpkg_full_path) @@ -562,9 +1046,10 @@ def push(conn_cfg, mc): raise DbSyncError("Mergin Maps client error: " + str(e)) status_push = mp.get_push_changes() - if status_push['added'] or status_push['updated'] or status_push['removed']: + if status_push["added"] or status_push["updated"] or status_push["removed"]: raise DbSyncError( - "There are pending changes in the local directory - that should never happen! " + str(status_push)) + "There are pending changes in the local directory - that should never happen! " + str(status_push) + ) # check there are no pending changes on server if server_version != local_version: @@ -572,13 +1057,26 @@ def push(conn_cfg, mc): conn = psycopg2.connect(conn_cfg.conn_info) - if not _check_schema_exists(conn, conn_cfg.base): + if not _check_schema_exists( + conn, + conn_cfg.base, + ): raise DbSyncError("The base schema does not exist: " + conn_cfg.base) - if not _check_schema_exists(conn, conn_cfg.modified): + if not _check_schema_exists( + conn, + conn_cfg.modified, + ): raise DbSyncError("The 'modified' schema does not exist: " + conn_cfg.modified) # get changes in the DB - _geodiff_create_changeset(conn_cfg.driver, conn_cfg.conn_info, conn_cfg.base, conn_cfg.modified, tmp_changeset_file, ignored_tables) + _geodiff_create_changeset( + conn_cfg.driver, + conn_cfg.conn_info, + conn_cfg.base, + conn_cfg.modified, + tmp_changeset_file, + ignored_tables, + ) if os.path.getsize(tmp_changeset_file) == 0: print("No changes in the database.") @@ -590,7 +1088,13 @@ def push(conn_cfg, mc): # write changes to the local geopackage print("Writing DB changes to working dir...") - _geodiff_apply_changeset("sqlite", "", gpkg_full_path, tmp_changeset_file, ignored_tables) + _geodiff_apply_changeset( + "sqlite", + "", + gpkg_full_path, + tmp_changeset_file, + ignored_tables, + ) # write to the server try: @@ -604,12 +1108,27 @@ def push(conn_cfg, mc): # update base schema in the DB print("Updating DB base schema...") - _geodiff_apply_changeset(conn_cfg.driver, conn_cfg.conn_info, conn_cfg.base, tmp_changeset_file, ignored_tables) - _set_db_project_comment(conn, conn_cfg.base, conn_cfg.mergin_project, version) - - -def init(conn_cfg, mc, from_gpkg=True): - """ Initialize the dbsync so that it is possible to do two-way sync between Mergin Maps and a database """ + _geodiff_apply_changeset( + conn_cfg.driver, + conn_cfg.conn_info, + conn_cfg.base, + tmp_changeset_file, + ignored_tables, + ) + _set_db_project_comment( + conn, + conn_cfg.base, + conn_cfg.mergin_project, + version, + ) + + +def init( + conn_cfg, + mc, + from_gpkg=True, +): + """Initialize the dbsync so that it is possible to do two-way sync between Mergin Maps and a database""" print(f"Processing Mergin Maps project '{conn_cfg.mergin_project}'") ignored_tables = get_ignored_tables(conn_cfg) @@ -629,54 +1148,103 @@ def init(conn_cfg, mc, from_gpkg=True): if not _try_install_postgis(conn): raise DbSyncError("Cannot find or activate `postgis` extension. You may need to install it.") - base_schema_exists = _check_schema_exists(conn, conn_cfg.base) - modified_schema_exists = _check_schema_exists(conn, conn_cfg.modified) - - work_dir = os.path.join(config.working_dir, project_name) - gpkg_full_path = os.path.join(work_dir, conn_cfg.sync_file) + base_schema_exists = _check_schema_exists( + conn, + conn_cfg.base, + ) + modified_schema_exists = _check_schema_exists( + conn, + conn_cfg.modified, + ) + + work_dir = os.path.join( + config.working_dir, + project_name, + ) + gpkg_full_path = os.path.join( + work_dir, + conn_cfg.sync_file, + ) if modified_schema_exists and base_schema_exists: print("Modified and base schemas already exist") # this is not a first run of db-sync init - db_proj_info = _get_db_project_comment(conn, conn_cfg.base) + db_proj_info = _get_db_project_comment( + conn, + conn_cfg.base, + ) if not db_proj_info: - raise DbSyncError("Base schema exists but missing which project it belongs to. " - "This may be a result of a previously failed attempt to initialize DB sync. " - f"{FORCE_INIT_MESSAGE}") + raise DbSyncError( + "Base schema exists but missing which project it belongs to. " + "This may be a result of a previously failed attempt to initialize DB sync. " + f"{FORCE_INIT_MESSAGE}" + ) if "error" in db_proj_info: - changes_gpkg_base = _compare_datasets("sqlite", "", gpkg_full_path, conn_cfg.driver, - conn_cfg.conn_info, conn_cfg.base, ignored_tables, - summary_only=False) - changes = json.dumps(changes_gpkg_base, indent=2) + changes_gpkg_base = _compare_datasets( + "sqlite", + "", + gpkg_full_path, + conn_cfg.driver, + conn_cfg.conn_info, + conn_cfg.base, + ignored_tables, + summary_only=False, + ) + changes = json.dumps( + changes_gpkg_base, + indent=2, + ) print(f"Changeset from failed init:\n {changes}") raise DbSyncError(db_proj_info["error"]) # make sure working directory contains the same version of project if not os.path.exists(work_dir): - print(f"Downloading version {db_proj_info['version']} of Mergin Maps project {conn_cfg.mergin_project} " - f"to {work_dir}") - mc.download_project(conn_cfg.mergin_project, work_dir, db_proj_info["version"]) + print( + f"Downloading version {db_proj_info['version']} of Mergin Maps project {conn_cfg.mergin_project} " + f"to {work_dir}" + ) + mc.download_project( + conn_cfg.mergin_project, + work_dir, + db_proj_info["version"], + ) else: # Get project ID from DB if available try: local_version = _get_project_version(work_dir) print(f"Working directory {work_dir} already exists, with project version {local_version}") # Compare local and database project version - db_project_id_str = getattr(db_proj_info, "project_id", None) + db_project_id_str = getattr( + db_proj_info, + "project_id", + None, + ) db_project_id = uuid.UUID(db_project_id_str) if db_project_id_str else None mp = _get_mergin_project(work_dir) local_project_id = _get_project_id(mp) if (db_project_id and local_project_id) and (db_project_id != local_project_id): - raise DbSyncError("Database project ID doesn't match local project ID. " - f"{FORCE_INIT_MESSAGE}") + raise DbSyncError("Database project ID doesn't match local project ID. " f"{FORCE_INIT_MESSAGE}") if local_version != db_proj_info["version"]: - _redownload_project(conn_cfg, mc, work_dir, db_proj_info) + _redownload_project( + conn_cfg, + mc, + work_dir, + db_proj_info, + ) except InvalidProject as e: print(f"Error: {e}") - _redownload_project(conn_cfg, mc, work_dir, db_proj_info) + _redownload_project( + conn_cfg, + mc, + work_dir, + db_proj_info, + ) else: if not os.path.exists(work_dir): print("Downloading latest Mergin Maps project " + conn_cfg.mergin_project + " to " + work_dir) - mc.download_project(conn_cfg.mergin_project, work_dir) + mc.download_project( + conn_cfg.mergin_project, + work_dir, + ) else: local_version = _get_project_version(work_dir) print(f"Working directory {work_dir} already exists, with project version {local_version}") @@ -689,12 +1257,20 @@ def init(conn_cfg, mc, from_gpkg=True): _validate_local_project_id(mp, mc) # check there are no pending changes on server (or locally - which should never happen) - status_pull, status_push, _ = mc.project_status(work_dir) - if status_pull['added'] or status_pull['updated'] or status_pull['removed']: + ( + status_pull, + status_push, + _, + ) = mc.project_status(work_dir) + if status_pull["added"] or status_pull["updated"] or status_pull["removed"]: print("There are pending changes on server, please run pull command after init") - if status_push['added'] or status_push['updated'] or status_push['removed']: - raise DbSyncError("There are pending changes in the local directory - that should never happen! " + str(status_push) + " " + - f"{FORCE_INIT_MESSAGE}") + if status_push["added"] or status_push["updated"] or status_push["removed"]: + raise DbSyncError( + "There are pending changes in the local directory - that should never happen! " + + str(status_push) + + " " + + f"{FORCE_INIT_MESSAGE}" + ) if from_gpkg: if not os.path.exists(gpkg_full_path): @@ -702,118 +1278,244 @@ def init(conn_cfg, mc, from_gpkg=True): if modified_schema_exists and base_schema_exists: # if db schema already exists make sure it is already synchronized with source gpkg or fail - summary_modified = _compare_datasets("sqlite", "", gpkg_full_path, conn_cfg.driver, - conn_cfg.conn_info, conn_cfg.modified, ignored_tables) - summary_base = _compare_datasets("sqlite", "", gpkg_full_path, conn_cfg.driver, - conn_cfg.conn_info, conn_cfg.base, ignored_tables) + summary_modified = _compare_datasets( + "sqlite", + "", + gpkg_full_path, + conn_cfg.driver, + conn_cfg.conn_info, + conn_cfg.modified, + ignored_tables, + ) + summary_base = _compare_datasets( + "sqlite", + "", + gpkg_full_path, + conn_cfg.driver, + conn_cfg.conn_info, + conn_cfg.base, + ignored_tables, + ) if len(summary_base): # seems someone modified base schema manually - this should never happen! print(f"Local project version at {local_version} and base schema at {db_proj_info['version']}") - _print_changes_summary(summary_base, "Base schema changes:") - raise DbSyncError("The db schemas already exist but 'base' schema is not synchronized with source GPKG. " - f"{FORCE_INIT_MESSAGE}") + _print_changes_summary( + summary_base, + "Base schema changes:", + ) + raise DbSyncError( + "The db schemas already exist but 'base' schema is not synchronized with source GPKG. " + f"{FORCE_INIT_MESSAGE}" + ) elif len(summary_modified): print("Modified schema is not synchronised with source GPKG, please run pull/push commands to fix it") - _print_changes_summary(summary_modified, "Pending Changes:") + _print_changes_summary( + summary_modified, + "Pending Changes:", + ) return else: print("The GPKG file, base and modified schemas are already initialized and in sync") return # nothing to do elif modified_schema_exists: - raise DbSyncError(f"The 'modified' schema exists but the base schema is missing: {conn_cfg.base}. " - "This may be a result of a previously failed attempt to initialize DB sync. " - f"{FORCE_INIT_MESSAGE}") + raise DbSyncError( + f"The 'modified' schema exists but the base schema is missing: {conn_cfg.base}. " + "This may be a result of a previously failed attempt to initialize DB sync. " + f"{FORCE_INIT_MESSAGE}" + ) elif base_schema_exists: - raise DbSyncError(f"The base schema exists but the modified schema is missing: {conn_cfg.modified}. " - "This may be a result of a previously failed attempt to initialize DB sync. " - f"{FORCE_INIT_MESSAGE}") + raise DbSyncError( + f"The base schema exists but the modified schema is missing: {conn_cfg.modified}. " + "This may be a result of a previously failed attempt to initialize DB sync. " + f"{FORCE_INIT_MESSAGE}" + ) # initialize: we have an existing GeoPackage in our Mergin Maps project and we want to initialize database print("The base and modified schemas do not exist yet, going to initialize them ...") try: # COPY: gpkg -> modified - _geodiff_make_copy("sqlite", "", gpkg_full_path, - conn_cfg.driver, conn_cfg.conn_info, conn_cfg.modified, ignored_tables) + _geodiff_make_copy( + "sqlite", + "", + gpkg_full_path, + conn_cfg.driver, + conn_cfg.conn_info, + conn_cfg.modified, + ignored_tables, + ) # COPY: modified -> base - _geodiff_make_copy(conn_cfg.driver, conn_cfg.conn_info, conn_cfg.modified, - conn_cfg.driver, conn_cfg.conn_info, conn_cfg.base, ignored_tables) + _geodiff_make_copy( + conn_cfg.driver, + conn_cfg.conn_info, + conn_cfg.modified, + conn_cfg.driver, + conn_cfg.conn_info, + conn_cfg.base, + ignored_tables, + ) # sanity check to verify that right after initialization we do not have any changes # between the 'base' schema and the geopackage in Mergin Maps project, to make sure that # copying data back and forth will keep data intact - changes_gpkg_base = _compare_datasets("sqlite", "", gpkg_full_path, conn_cfg.driver, - conn_cfg.conn_info, conn_cfg.base, ignored_tables, - summary_only=False) + changes_gpkg_base = _compare_datasets( + "sqlite", + "", + gpkg_full_path, + conn_cfg.driver, + conn_cfg.conn_info, + conn_cfg.base, + ignored_tables, + summary_only=False, + ) # mark project version into db schema if len(changes_gpkg_base): - changes = json.dumps(changes_gpkg_base, indent=2) + changes = json.dumps( + changes_gpkg_base, + indent=2, + ) print(f"Changeset after internal copy (should be empty):\n {changes}") - raise DbSyncError('Initialization of db-sync failed due to a bug in geodiff.\n ' - 'Please report this problem to mergin-db-sync developers') + raise DbSyncError( + "Initialization of db-sync failed due to a bug in geodiff.\n " + "Please report this problem to mergin-db-sync developers" + ) except DbSyncError: - print(f"Cleaning up after a failed DB sync init - dropping schemas {conn_cfg.base} and {conn_cfg.modified}.") - _drop_schema(conn, conn_cfg.base) - _drop_schema(conn, conn_cfg.modified) + print( + f"Cleaning up after a failed DB sync init - dropping schemas {conn_cfg.base} and {conn_cfg.modified}." + ) + _drop_schema( + conn, + conn_cfg.base, + ) + _drop_schema( + conn, + conn_cfg.modified, + ) raise - _set_db_project_comment(conn, conn_cfg.base, conn_cfg.mergin_project, local_version) + _set_db_project_comment( + conn, + conn_cfg.base, + conn_cfg.mergin_project, + local_version, + ) else: if not modified_schema_exists: - raise DbSyncError(f"The 'modified' schema does not exist: {conn_cfg.modified}. " - "This schema is necessary if initialization should be done from database (parameter `init-from-db`).") + raise DbSyncError( + f"The 'modified' schema does not exist: {conn_cfg.modified}. " + "This schema is necessary if initialization should be done from database (parameter `init-from-db`)." + ) if os.path.exists(gpkg_full_path) and base_schema_exists: # make sure output gpkg is in sync with db or fail - summary_modified = _compare_datasets(conn_cfg.driver, conn_cfg.conn_info, conn_cfg.modified, - "sqlite", "", gpkg_full_path, ignored_tables) - summary_base = _compare_datasets(conn_cfg.driver, conn_cfg.conn_info, conn_cfg.base, - "sqlite", "", gpkg_full_path, ignored_tables) + summary_modified = _compare_datasets( + conn_cfg.driver, + conn_cfg.conn_info, + conn_cfg.modified, + "sqlite", + "", + gpkg_full_path, + ignored_tables, + ) + summary_base = _compare_datasets( + conn_cfg.driver, + conn_cfg.conn_info, + conn_cfg.base, + "sqlite", + "", + gpkg_full_path, + ignored_tables, + ) if len(summary_base): - print(f"Local project version at {_get_project_version(work_dir)} and base schema at {db_proj_info['version']}") - _print_changes_summary(summary_base, "Base schema changes:") - raise DbSyncError("The output GPKG file exists already but is not synchronized with db 'base' schema." - f"{FORCE_INIT_MESSAGE}") + print( + f"Local project version at {_get_project_version(work_dir)} and base schema at {db_proj_info['version']}" + ) + _print_changes_summary( + summary_base, + "Base schema changes:", + ) + raise DbSyncError( + "The output GPKG file exists already but is not synchronized with db 'base' schema." + f"{FORCE_INIT_MESSAGE}" + ) elif len(summary_modified): - print("The output GPKG file exists already but it is not synchronised with modified schema, " - "please run pull/push commands to fix it") - _print_changes_summary(summary_modified, "Pending Changes:") + print( + "The output GPKG file exists already but it is not synchronised with modified schema, " + "please run pull/push commands to fix it" + ) + _print_changes_summary( + summary_modified, + "Pending Changes:", + ) return else: print("The GPKG file, base and modified schemas are already initialized and in sync") return # nothing to do elif os.path.exists(gpkg_full_path): - raise DbSyncError(f"The output GPKG exists but the base schema is missing: {conn_cfg.base}. " - f"{FORCE_INIT_MESSAGE}") + raise DbSyncError( + f"The output GPKG exists but the base schema is missing: {conn_cfg.base}. " f"{FORCE_INIT_MESSAGE}" + ) elif base_schema_exists: - raise DbSyncError(f"The base schema exists but the output GPKG exists is missing: {gpkg_full_path}. " - f"{FORCE_INIT_MESSAGE}") + raise DbSyncError( + f"The base schema exists but the output GPKG exists is missing: {gpkg_full_path}. " + f"{FORCE_INIT_MESSAGE}" + ) # initialize: we have an existing schema in database with tables and we want to initialize geopackage # within our Mergin Maps project print("The base schema and the output GPKG do not exist yet, going to initialize them ...") try: # COPY: modified -> base - _geodiff_make_copy(conn_cfg.driver, conn_cfg.conn_info, conn_cfg.modified, - conn_cfg.driver, conn_cfg.conn_info, conn_cfg.base, ignored_tables) + _geodiff_make_copy( + conn_cfg.driver, + conn_cfg.conn_info, + conn_cfg.modified, + conn_cfg.driver, + conn_cfg.conn_info, + conn_cfg.base, + ignored_tables, + ) # COPY: modified -> gpkg - _geodiff_make_copy(conn_cfg.driver, conn_cfg.conn_info, conn_cfg.modified, - "sqlite", "", gpkg_full_path, ignored_tables) + _geodiff_make_copy( + conn_cfg.driver, + conn_cfg.conn_info, + conn_cfg.modified, + "sqlite", + "", + gpkg_full_path, + ignored_tables, + ) # sanity check to verify that right after initialization we do not have any changes # between the 'base' schema and the geopackage in Mergin Maps project, to make sure that # copying data back and forth will keep data intact - changes_gpkg_base = _compare_datasets("sqlite", "", gpkg_full_path, conn_cfg.driver, - conn_cfg.conn_info, conn_cfg.base, ignored_tables, summary_only=False) + changes_gpkg_base = _compare_datasets( + "sqlite", + "", + gpkg_full_path, + conn_cfg.driver, + conn_cfg.conn_info, + conn_cfg.base, + ignored_tables, + summary_only=False, + ) if len(changes_gpkg_base): - changes = json.dumps(changes_gpkg_base, indent=2) + changes = json.dumps( + changes_gpkg_base, + indent=2, + ) print(f"Changeset after internal copy (should be empty):\n {changes}") - raise DbSyncError('Initialization of db-sync failed due to a bug in geodiff.\n ' - 'Please report this problem to mergin-db-sync developers') + raise DbSyncError( + "Initialization of db-sync failed due to a bug in geodiff.\n " + "Please report this problem to mergin-db-sync developers" + ) except DbSyncError: print(f"Cleaning up after a failed DB sync init - dropping schema {conn_cfg.base}.") - _drop_schema(conn, conn_cfg.base) + _drop_schema( + conn, + conn_cfg.base, + ) raise # upload gpkg to Mergin Maps (client takes care of storing metadata) @@ -821,13 +1523,22 @@ def init(conn_cfg, mc, from_gpkg=True): # mark project version into db schema version = _get_project_version(work_dir) - _set_db_project_comment(conn, conn_cfg.base, conn_cfg.mergin_project, version) + _set_db_project_comment( + conn, + conn_cfg.base, + conn_cfg.mergin_project, + version, + ) def dbsync_init(mc): from_gpkg = config.init_from.lower() == "gpkg" for conn in config.connections: - init(conn, mc, from_gpkg=from_gpkg) + init( + conn, + mc, + from_gpkg=from_gpkg, + ) print("Init done!") @@ -846,7 +1557,9 @@ def dbsync_push(mc): print("Push done!") -def dbsync_status(mc): +def dbsync_status( + mc, +): for conn in config.connections: status(conn, mc) @@ -865,7 +1578,10 @@ def clean(conn_cfg, mc): try: # to remove sync file, download project to created directory, drop file and push changes back file = temp_folder / conn_cfg.sync_file - mc.download_project(conn_cfg.mergin_project, str(temp_folder)) + mc.download_project( + conn_cfg.mergin_project, + str(temp_folder), + ) if file.exists(): file.unlink() mc.push_project(str(temp_folder)) @@ -882,16 +1598,24 @@ def clean(conn_cfg, mc): raise DbSyncError("Unable to connect to the database: " + str(e)) try: - _drop_schema(conn_db, conn_cfg.base) + _drop_schema( + conn_db, + conn_cfg.base, + ) if not from_db: - _drop_schema(conn_db, conn_cfg.modified) + _drop_schema( + conn_db, + conn_cfg.modified, + ) except psycopg2.Error as e: raise DbSyncError("Unable to drop schema from database: " + str(e)) -def dbsync_clean(mc): +def dbsync_clean( + mc, +): for conn in config.connections: clean(conn, mc) diff --git a/dbsync_daemon.py b/dbsync_daemon.py index 8dd0b24..b3e9b79 100644 --- a/dbsync_daemon.py +++ b/dbsync_daemon.py @@ -1,4 +1,3 @@ - # keep running until killed by ctrl+c: # - sleep N seconds # - pull @@ -15,8 +14,89 @@ import typing import dbsync -from version import __version__ -from config import config, validate_config, ConfigError, update_config_path +from version import ( + __version__, +) +from config import ( + config, + validate_config, + ConfigError, + update_config_path, +) + + +def is_pyinstaller() -> bool: + if ( + getattr( + sys, + "frozen", + False, + ) + and platform.system() == "Windows" + ): + return True + return False + + +def pyinstaller_update_path() -> None: + path = pathlib.Path(__file__).parent / "lib" + os.environ["PATH"] += os.pathsep + path.as_posix() + + +def pyinstaller_path_fix() -> None: + if is_pyinstaller(): + pyinstaller_update_path() + + +LOGGER: logging.Logger = None + + +def setup_logger( + log_path, + log_verbosity: str, + with_time=True, + with_level=True, +) -> logging.Logger: + global LOGGER + LOGGER = logging.getLogger(f"{log_path}") + if log_verbosity == "messages": + LOGGER.setLevel(logging.DEBUG) + elif log_verbosity == "errors": + LOGGER.setLevel(logging.WARNING) + else: + LOGGER.setLevel(logging.WARNING) + if not LOGGER.handlers: + log_handler = logging.FileHandler( + log_path, + mode="a", + ) + format = "%(asctime)s -" if with_time else "" + format += "%(levelname)s - %(message)s" if with_level else "%(message)s" + log_handler.setFormatter(logging.Formatter(format)) + LOGGER.addHandler(log_handler) + + +def handle_error( + error: typing.Union[ + Exception, + str, + ] +): + if LOGGER: + LOGGER.error(str(error)) + print( + "Error: " + str(error), + file=sys.stderr, + ) + sys.exit(1) + + +def handle_message( + msg: str, +): + if LOGGER: + LOGGER.debug(msg) + print(msg) def is_pyinstaller() -> bool: @@ -72,22 +152,58 @@ def main(): pyinstaller_path_fix() - parser = argparse.ArgumentParser(prog='dbsync_deamon.py', - description='Synchronization tool between Mergin Maps project and database.', - epilog='www.merginmaps.com') - - parser.add_argument("config_file", nargs="?", default="config.yaml", help="Path to file with configuration. Default value is config.yaml in current working directory.") - parser.add_argument("--skip-init", action="store_true", help="Skip DB sync init step to make the tool start faster. It is not recommend to use it unless you are really sure you can skip the initial sanity checks.") - parser.add_argument("--single-run", action="store_true", help="Run just once performing single pull and push operation, instead of running in infinite loop.") - parser.add_argument("--force-init", action="store_true", help="Force removing working directory and schemas from DB to initialize from scratch.") - parser.add_argument("--log-file", default="", action="store", help="Store logging to file.") - parser.add_argument("--log-verbosity", choices=["errors", "messages"], default="errors", help="Log messages, not only errors.") + parser = argparse.ArgumentParser( + prog="dbsync_deamon.py", + description="Synchronization tool between Mergin Maps project and database.", + epilog="www.merginmaps.com", + ) + + parser.add_argument( + "config_file", + nargs="?", + default="config.yaml", + help="Path to file with configuration. Default value is config.yaml in current working directory.", + ) + parser.add_argument( + "--skip-init", + action="store_true", + help="Skip DB sync init step to make the tool start faster. It is not recommend to use it unless you are really sure you can skip the initial sanity checks.", + ) + parser.add_argument( + "--single-run", + action="store_true", + help="Run just once performing single pull and push operation, instead of running in infinite loop.", + ) + parser.add_argument( + "--force-init", + action="store_true", + help="Force removing working directory and schemas from DB to initialize from scratch.", + ) + parser.add_argument( + "--log-file", + default="", + action="store", + help="Store logging to file.", + ) + parser.add_argument( + "--log-verbosity", + choices=[ + "errors", + "messages", + ], + default="errors", + help="Log messages, not only errors.", + ) args = parser.parse_args() if args.log_file: log_file = pathlib.Path(args.log_file) - setup_logger(log_file.as_posix(), args.log_verbosity) + + setup_logger( + log_file.as_posix(), + args.log_verbosity, + ) handle_message(f"== starting mergin-db-sync daemon == version {__version__} ==") @@ -113,7 +229,6 @@ def main(): dbsync.dbsync_clean(mc) if args.single_run: - if not args.skip_init: try: dbsync.dbsync_init(mc) @@ -131,7 +246,6 @@ def main(): handle_error(e) else: - if not args.skip_init: try: dbsync.dbsync_init(mc) @@ -139,7 +253,6 @@ def main(): handle_error(e) while True: - print(datetime.datetime.now()) try: @@ -150,7 +263,7 @@ def main(): dbsync.dbsync_push(mc) # check mergin client token expiration - delta = mc._auth_session['expire'] - datetime.datetime.now(datetime.timezone.utc) + delta = mc._auth_session["expire"] - datetime.datetime.now(datetime.timezone.utc) if delta.total_seconds() < 3600: mc = dbsync.create_mergin_client() @@ -161,5 +274,5 @@ def main(): time.sleep(sleep_time) -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/listen_test.py b/listen_test.py index ddfdcc7..15707dc 100644 --- a/listen_test.py +++ b/listen_test.py @@ -2,7 +2,7 @@ import psycopg2 import psycopg2.extensions -DSN="" +DSN = "" conn = psycopg2.connect(DSN) conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) @@ -15,11 +15,20 @@ import dbsync -sleep_time = dbsync.config['daemon']['sleep_time'] +sleep_time = dbsync.config["daemon"]["sleep_time"] while True: - if select.select([conn],[],[],5) == ([],[],[]): + if select.select( + [conn], + [], + [], + 5, + ) == ( + [], + [], + [], + ): print("Timeout") print("Trying to pull") @@ -29,7 +38,12 @@ conn.poll() while conn.notifies: notify = conn.notifies.pop(0) - print("Got NOTIFY:", notify.pid, notify.channel, notify.payload) + print( + "Got NOTIFY:", + notify.pid, + notify.channel, + notify.payload, + ) # new stuff in the database - let's push a new version @@ -43,7 +57,6 @@ dbsync.dbsync_push() - # TODO: create on init # CREATE RULE geodiff_rule_update_simple AS ON UPDATE TO gd_sync_base.simple DO ALSO NOTIFY geodiff; # CREATE RULE geodiff_rule_insert_simple AS ON INSERT TO gd_sync_base.simple DO ALSO NOTIFY geodiff; diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..6f4dd3e --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,17 @@ +[tool.black] +line-length = 120 +include = '\.pyi?$' +exclude = ''' +/( + \.git + | \.hg + | \.mypy_cache + | \.vscode + | \.tox + | \.venv + | _build + | buck-out + | build + | dist +)/ +''' \ No newline at end of file diff --git a/test/conftest.py b/test/conftest.py index d531e16..d13863a 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -5,45 +5,69 @@ import psycopg2 import psycopg2.extensions -from psycopg2 import sql - -from mergin import MerginClient, ClientError - -from dbsync import dbsync_init -from config import config - -GEODIFF_EXE = os.environ.get('TEST_GEODIFF_EXE') -DB_CONNINFO = os.environ.get('TEST_DB_CONNINFO') -SERVER_URL = os.environ.get('TEST_MERGIN_URL') -API_USER = os.environ.get('TEST_API_USERNAME') -USER_PWD = os.environ.get('TEST_API_PASSWORD') -WORKSPACE = os.environ.get('TEST_API_WORKSPACE') +from psycopg2 import ( + sql, +) + +from mergin import ( + MerginClient, + ClientError, +) + +from dbsync import ( + dbsync_init, +) +from config import ( + config, +) + +GEODIFF_EXE = os.environ.get("TEST_GEODIFF_EXE") +DB_CONNINFO = os.environ.get("TEST_DB_CONNINFO") +SERVER_URL = os.environ.get("TEST_MERGIN_URL") +API_USER = os.environ.get("TEST_API_USERNAME") +USER_PWD = os.environ.get("TEST_API_PASSWORD") +WORKSPACE = os.environ.get("TEST_API_WORKSPACE") TMP_DIR = tempfile.gettempdir() -TEST_DATA_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'test_data') - - -def _reset_config(project_name: str = "mergin"): - """ helper to reset config settings to ensure valid config """ - db_schema_main = project_name + '_main' - db_schema_base = project_name + '_base' +TEST_DATA_DIR = os.path.join( + os.path.dirname(os.path.realpath(__file__)), + "test_data", +) + + +def _reset_config( + project_name: str = "mergin", +): + """helper to reset config settings to ensure valid config""" + db_schema_main = project_name + "_main" + db_schema_base = project_name + "_base" full_project_name = WORKSPACE + "/" + project_name - config.update({ - 'MERGIN__USERNAME': API_USER, - 'MERGIN__PASSWORD': USER_PWD, - 'MERGIN__URL': SERVER_URL, - 'init_from': "gpkg", - 'CONNECTIONS': [{"driver": "postgres", - "conn_info": DB_CONNINFO, - "modified": db_schema_main, - "base": db_schema_base, - "mergin_project": full_project_name, - "sync_file": "test_sync.gpkg"}] - }) - - -def cleanup(mc, project, dirs): - """ cleanup leftovers from previous test if needed such as remote project and local directories """ + config.update( + { + "MERGIN__USERNAME": API_USER, + "MERGIN__PASSWORD": USER_PWD, + "MERGIN__URL": SERVER_URL, + "init_from": "gpkg", + "CONNECTIONS": [ + { + "driver": "postgres", + "conn_info": DB_CONNINFO, + "modified": db_schema_main, + "base": db_schema_base, + "mergin_project": full_project_name, + "sync_file": "test_sync.gpkg", + } + ], + } + ) + + +def cleanup( + mc, + project, + dirs, +): + """cleanup leftovers from previous test if needed such as remote project and local directories""" try: print("Deleting project on Mergin Maps server: " + project) mc.delete_project(project) @@ -55,8 +79,12 @@ def cleanup(mc, project, dirs): shutil.rmtree(d) -def cleanup_db(conn, schema_base, schema_main): - """ Removes test schemas from previous tests """ +def cleanup_db( + conn, + schema_base, + schema_main, +): + """Removes test schemas from previous tests""" cur = conn.cursor() cur.execute(sql.SQL("DROP SCHEMA IF EXISTS {} CASCADE").format(sql.Identifier(schema_base))) cur.execute(sql.SQL("DROP SCHEMA IF EXISTS {} CASCADE").format(sql.Identifier(schema_main))) @@ -71,64 +99,109 @@ def init_sync_from_geopackage(mc, project_name, source_gpkg_path, ignored_tables - configure DB sync and let it do the init (make copies to the database) """ full_project_name = WORKSPACE + "/" + project_name - project_dir = os.path.join(TMP_DIR, project_name + '_work') # working directory - sync_project_dir = os.path.join(TMP_DIR, project_name + '_dbsync') # used by dbsync - db_schema_main = project_name + '_main' - db_schema_base = project_name + '_base' + project_dir = os.path.join( + TMP_DIR, + project_name + "_work", + ) # working directory + sync_project_dir = os.path.join( + TMP_DIR, + project_name + "_dbsync", + ) # used by dbsync + db_schema_main = project_name + "_main" + db_schema_base = project_name + "_base" conn = psycopg2.connect(DB_CONNINFO) - cleanup(mc, full_project_name, [project_dir, sync_project_dir]) - cleanup_db(conn, db_schema_base, db_schema_main) + cleanup( + mc, + full_project_name, + [ + project_dir, + sync_project_dir, + ], + ) + cleanup_db( + conn, + db_schema_base, + db_schema_main, + ) # prepare a new Mergin Maps project - mc.create_project(project_name, namespace=WORKSPACE) - mc.download_project(full_project_name, project_dir) - shutil.copy(source_gpkg_path, os.path.join(project_dir, 'test_sync.gpkg')) + mc.create_project( + project_name, + namespace=WORKSPACE, + ) + mc.download_project( + full_project_name, + project_dir, + ) + shutil.copy( + source_gpkg_path, + os.path.join( + project_dir, + "test_sync.gpkg", + ), + ) for extra_filepath in extra_init_files: extra_filename = os.path.basename(extra_filepath) - target_extra_filepath = os.path.join(project_dir, extra_filename) - shutil.copy(extra_filepath, target_extra_filepath) + target_extra_filepath = os.path.join( + project_dir, + extra_filename, + ) + shutil.copy( + extra_filepath, + target_extra_filepath, + ) mc.push_project(project_dir) # prepare dbsync config # patch config to fit testing purposes if ignored_tables: - connection = {"driver": "postgres", - "conn_info": DB_CONNINFO, - "modified": db_schema_main, - "base": db_schema_base, - "mergin_project": full_project_name, - "sync_file": "test_sync.gpkg", - "skip_tables": ignored_tables} + connection = { + "driver": "postgres", + "conn_info": DB_CONNINFO, + "modified": db_schema_main, + "base": db_schema_base, + "mergin_project": full_project_name, + "sync_file": "test_sync.gpkg", + "skip_tables": ignored_tables, + } else: - connection = {"driver": "postgres", - "conn_info": DB_CONNINFO, - "modified": db_schema_main, - "base": db_schema_base, - "mergin_project": full_project_name, - "sync_file": "test_sync.gpkg"} - - config.update({ - 'GEODIFF_EXE': GEODIFF_EXE, - 'WORKING_DIR': sync_project_dir, - 'MERGIN__USERNAME': API_USER, - 'MERGIN__PASSWORD': USER_PWD, - 'MERGIN__URL': SERVER_URL, - 'CONNECTIONS': [connection], - 'init_from': "gpkg" - }) + connection = { + "driver": "postgres", + "conn_info": DB_CONNINFO, + "modified": db_schema_main, + "base": db_schema_base, + "mergin_project": full_project_name, + "sync_file": "test_sync.gpkg", + } + + config.update( + { + "GEODIFF_EXE": GEODIFF_EXE, + "WORKING_DIR": sync_project_dir, + "MERGIN__USERNAME": API_USER, + "MERGIN__PASSWORD": USER_PWD, + "MERGIN__URL": SERVER_URL, + "CONNECTIONS": [connection], + "init_from": "gpkg", + } + ) dbsync_init(mc) -@pytest.fixture(scope='function') +@pytest.fixture(scope="function") def mc(): assert SERVER_URL and API_USER and USER_PWD - #assert SERVER_URL and SERVER_URL.rstrip('/') != 'https://app.merginmaps.com/' and API_USER and USER_PWD - return MerginClient(SERVER_URL, login=API_USER, password=USER_PWD) + # assert SERVER_URL and SERVER_URL.rstrip('/') != 'https://app.merginmaps.com/' and API_USER and USER_PWD + return MerginClient( + SERVER_URL, + login=API_USER, + password=USER_PWD, + ) -@pytest.fixture(scope='function') +@pytest.fixture(scope="function") def db_connection() -> psycopg2.extensions.connection: return psycopg2.connect(DB_CONNINFO) diff --git a/test/test_basic.py b/test/test_basic.py index 11cc874..0af19a8 100644 --- a/test/test_basic.py +++ b/test/test_basic.py @@ -8,27 +8,65 @@ import pathlib import psycopg2 -from psycopg2 import sql - -from mergin import MerginClient - -from dbsync import dbsync_init, dbsync_pull, dbsync_push, dbsync_status, config, DbSyncError, _geodiff_make_copy, \ - _get_db_project_comment, _get_mergin_project, _get_project_id, _validate_local_project_id, config, _add_quotes_to_schema_name, \ - dbsync_clean, _check_schema_exists - -from .conftest import (WORKSPACE, TMP_DIR, DB_CONNINFO, GEODIFF_EXE, API_USER, USER_PWD, SERVER_URL, - TEST_DATA_DIR, init_sync_from_geopackage) - - - -def test_init_from_gpkg(mc: MerginClient): +from psycopg2 import ( + sql, +) + +from mergin import ( + MerginClient, +) + +from dbsync import ( + dbsync_init, + dbsync_pull, + dbsync_push, + dbsync_status, + config, + DbSyncError, + _geodiff_make_copy, + _get_db_project_comment, + _get_mergin_project, + _get_project_id, + _validate_local_project_id, + config, + _add_quotes_to_schema_name, + dbsync_clean, + _check_schema_exists, +) + +from .conftest import ( + WORKSPACE, + TMP_DIR, + DB_CONNINFO, + GEODIFF_EXE, + API_USER, + USER_PWD, + SERVER_URL, + TEST_DATA_DIR, + init_sync_from_geopackage, +) + + +def test_init_from_gpkg( + mc: MerginClient, +): project_name = "test_init" - source_gpkg_path = os.path.join(TEST_DATA_DIR, 'base.gpkg') - project_dir = os.path.join(TMP_DIR, project_name + '_work') - db_schema_main = project_name + '_main' - db_schema_base = project_name + '_base' + source_gpkg_path = os.path.join( + TEST_DATA_DIR, + "base.gpkg", + ) + project_dir = os.path.join( + TMP_DIR, + project_name + "_work", + ) + db_schema_main = project_name + "_main" + db_schema_base = project_name + "_base" - init_sync_from_geopackage(mc, project_name, source_gpkg_path) + init_sync_from_geopackage( + mc, + project_name, + source_gpkg_path, + ) # test that database schemas are created + tables are populated conn = psycopg2.connect(DB_CONNINFO) @@ -39,47 +77,86 @@ def test_init_from_gpkg(mc: MerginClient): dbsync_init(mc) cur.execute(sql.SQL("SELECT count(*) from {}.simple").format(sql.Identifier(db_schema_main)).as_string(conn)) assert cur.fetchone()[0] == 3 - db_proj_info = _get_db_project_comment(conn, db_schema_base) + db_proj_info = _get_db_project_comment( + conn, + db_schema_base, + ) assert db_proj_info["name"] == config.connections[0].mergin_project - assert db_proj_info["version"] == 'v1' + assert db_proj_info["version"] == "v1" # make change in GPKG and push to server to create pending changes, it should pass but not sync - shutil.copy(os.path.join(TEST_DATA_DIR, 'inserted_1_A.gpkg'), os.path.join(project_dir, 'test_sync.gpkg')) + shutil.copy( + os.path.join( + TEST_DATA_DIR, + "inserted_1_A.gpkg", + ), + os.path.join( + project_dir, + "test_sync.gpkg", + ), + ) mc.push_project(project_dir) # remove local copy of project (to mimic loss at docker restart) shutil.rmtree(config.working_dir) dbsync_init(mc) cur.execute(sql.SQL("SELECT count(*) from {}.simple").format(sql.Identifier(db_schema_main)).as_string(conn)) assert cur.fetchone()[0] == 3 - db_proj_info = _get_db_project_comment(conn, db_schema_base) - assert db_proj_info["version"] == 'v1' + db_proj_info = _get_db_project_comment( + conn, + db_schema_base, + ) + assert db_proj_info["version"] == "v1" # let's remove local working dir and download different version from server to mimic versions mismatch shutil.rmtree(config.working_dir) - mc.download_project(config.connections[0].mergin_project, config.working_dir, 'v2') + mc.download_project( + config.connections[0].mergin_project, + config.working_dir, + "v2", + ) # run init again, it should handle local working dir properly (e.g. download correct version) and pass but not sync dbsync_init(mc) - db_proj_info = _get_db_project_comment(conn, db_schema_base) - assert db_proj_info["version"] == 'v1' + db_proj_info = _get_db_project_comment( + conn, + db_schema_base, + ) + assert db_proj_info["version"] == "v1" # pull server changes to db to make sure we can sync again dbsync_pull(mc) cur.execute(sql.SQL("SELECT count(*) from {}.simple").format(sql.Identifier(db_schema_main)).as_string(conn)) assert cur.fetchone()[0] == 4 - db_proj_info = _get_db_project_comment(conn, db_schema_base) - assert db_proj_info["version"] == 'v2' + db_proj_info = _get_db_project_comment( + conn, + db_schema_base, + ) + assert db_proj_info["version"] == "v2" # update some feature from 'modified' db to create mismatch with src geopackage, it should pass but not sync fid = 1 - cur.execute(sql.SQL("SELECT * from {}.simple WHERE fid=%s").format(sql.Identifier(db_schema_main)), (fid,)) + cur.execute( + sql.SQL("SELECT * from {}.simple WHERE fid=%s").format(sql.Identifier(db_schema_main)), + (fid,), + ) old_value = cur.fetchone()[3] - cur.execute(sql.SQL("UPDATE {}.simple SET rating=100 WHERE fid=%s").format(sql.Identifier(db_schema_main)), (fid,)) + cur.execute( + sql.SQL("UPDATE {}.simple SET rating=100 WHERE fid=%s").format(sql.Identifier(db_schema_main)), + (fid,), + ) conn.commit() - cur.execute(sql.SQL("SELECT * from {}.simple WHERE fid=%s").format(sql.Identifier(db_schema_main)), (fid,)) + cur.execute( + sql.SQL("SELECT * from {}.simple WHERE fid=%s").format(sql.Identifier(db_schema_main)), + (fid,), + ) assert cur.fetchone()[3] == 100 dbsync_init(mc) # check geopackage has not been modified - after init we are not synced! - gpkg_conn = sqlite3.connect(os.path.join(project_dir, 'test_sync.gpkg')) + gpkg_conn = sqlite3.connect( + os.path.join( + project_dir, + "test_sync.gpkg", + ) + ) gpkg_cur = gpkg_conn.cursor() gpkg_cur.execute(f"SELECT * FROM simple WHERE fid={fid}") assert gpkg_cur.fetchone()[3] == old_value @@ -88,55 +165,109 @@ def test_init_from_gpkg(mc: MerginClient): mc.pull_project(project_dir) gpkg_cur.execute(f"SELECT * FROM simple WHERE fid ={fid}") assert gpkg_cur.fetchone()[3] == 100 - db_proj_info = _get_db_project_comment(conn, db_schema_base) - assert db_proj_info["version"] == 'v3' + db_proj_info = _get_db_project_comment( + conn, + db_schema_base, + ) + assert db_proj_info["version"] == "v3" # update some feature from 'base' db to create mismatch with src geopackage and modified cur.execute(sql.SQL("SELECT * from {}.simple").format(sql.Identifier(db_schema_base))) fid = cur.fetchone()[0] old_value = cur.fetchone()[3] - cur.execute(sql.SQL("UPDATE {}.simple SET rating=100 WHERE fid=%s").format(sql.Identifier(db_schema_base)), (fid,)) + cur.execute( + sql.SQL("UPDATE {}.simple SET rating=100 WHERE fid=%s").format(sql.Identifier(db_schema_base)), + (fid,), + ) conn.commit() - cur.execute(sql.SQL("SELECT * from {}.simple WHERE fid=%s").format(sql.Identifier(db_schema_base)), (fid,)) + cur.execute( + sql.SQL("SELECT * from {}.simple WHERE fid=%s").format(sql.Identifier(db_schema_base)), + (fid,), + ) assert cur.fetchone()[3] == 100 with pytest.raises(DbSyncError) as err: dbsync_init(mc) assert "The db schemas already exist but 'base' schema is not synchronized with source GPKG" in str(err.value) # make local changes to src file to introduce local changes - shutil.copy(os.path.join(TEST_DATA_DIR, 'base.gpkg'), os.path.join(config.working_dir, project_name, config.connections[0].sync_file)) + shutil.copy( + os.path.join( + TEST_DATA_DIR, + "base.gpkg", + ), + os.path.join( + config.working_dir, + project_name, + config.connections[0].sync_file, + ), + ) with pytest.raises(DbSyncError) as err: dbsync_init(mc) assert "There are pending changes in the local directory - that should never happen" in str(err.value) -def test_init_from_gpkg_with_incomplete_dir(mc: MerginClient): +def test_init_from_gpkg_with_incomplete_dir( + mc: MerginClient, +): project_name = "test_init_incomplete_dir" - source_gpkg_path = os.path.join(TEST_DATA_DIR, 'base.gpkg') - init_project_dir = os.path.join(TMP_DIR, project_name + '_dbsync', project_name) - init_sync_from_geopackage(mc, project_name, source_gpkg_path) - assert set(os.listdir(init_project_dir)) == set(['test_sync.gpkg', '.mergin']) + source_gpkg_path = os.path.join( + TEST_DATA_DIR, + "base.gpkg", + ) + init_project_dir = os.path.join( + TMP_DIR, + project_name + "_dbsync", + project_name, + ) + init_sync_from_geopackage( + mc, + project_name, + source_gpkg_path, + ) + assert set(os.listdir(init_project_dir)) == set( + [ + "test_sync.gpkg", + ".mergin", + ] + ) shutil.rmtree(init_project_dir) # Remove dir with content os.makedirs(init_project_dir) # Recreate empty project working dir assert os.listdir(init_project_dir) == [] dbsync_init(mc) - assert set(os.listdir(init_project_dir)) == set(['test_sync.gpkg', '.mergin']) + assert set(os.listdir(init_project_dir)) == set( + [ + "test_sync.gpkg", + ".mergin", + ] + ) -def test_basic_pull(mc: MerginClient): +def test_basic_pull( + mc: MerginClient, +): """ Test initialization and one pull from Mergin Maps to DB 1. create a Mergin Maps project using py-client with a testing gpkg 2. run init, check that everything is fine 3. make change in gpkg (copy new version), check everything is fine """ - project_name = 'test_sync_pull' + project_name = "test_sync_pull" db_schema_main = project_name + "_main" - source_gpkg_path = os.path.join(TEST_DATA_DIR, 'base.gpkg') - project_dir = os.path.join(TMP_DIR, project_name + '_work') # working directory - - init_sync_from_geopackage(mc, project_name, source_gpkg_path) + source_gpkg_path = os.path.join( + TEST_DATA_DIR, + "base.gpkg", + ) + project_dir = os.path.join( + TMP_DIR, + project_name + "_work", + ) # working directory + + init_sync_from_geopackage( + mc, + project_name, + source_gpkg_path, + ) conn = psycopg2.connect(DB_CONNINFO) @@ -146,7 +277,16 @@ def test_basic_pull(mc: MerginClient): assert cur.fetchone()[0] == 3 # make change in GPKG and push - shutil.copy(os.path.join(TEST_DATA_DIR, 'inserted_1_A.gpkg'), os.path.join(project_dir, 'test_sync.gpkg')) + shutil.copy( + os.path.join( + TEST_DATA_DIR, + "inserted_1_A.gpkg", + ), + os.path.join( + project_dir, + "test_sync.gpkg", + ), + ) mc.push_project(project_dir) # pull the change from Mergin Maps to DB @@ -156,22 +296,37 @@ def test_basic_pull(mc: MerginClient): cur = conn.cursor() cur.execute(sql.SQL("SELECT count(*) from {}.simple").format((sql.Identifier(db_schema_main)))) assert cur.fetchone()[0] == 4 - db_proj_info = _get_db_project_comment(conn, project_name + "_base") - assert db_proj_info["version"] == 'v2' + db_proj_info = _get_db_project_comment( + conn, + project_name + "_base", + ) + assert db_proj_info["version"] == "v2" print("---") dbsync_status(mc) -def test_basic_push(mc: MerginClient): - """ Initialize a project and test push of a new row from PostgreSQL to Mergin Maps""" +def test_basic_push( + mc: MerginClient, +): + """Initialize a project and test push of a new row from PostgreSQL to Mergin Maps""" project_name = "test_sync_push" db_schema_main = project_name + "_main" - source_gpkg_path = os.path.join(TEST_DATA_DIR, 'base.gpkg') - project_dir = os.path.join(TMP_DIR, project_name + '_work') # working directory - - init_sync_from_geopackage(mc, project_name, source_gpkg_path) + source_gpkg_path = os.path.join( + TEST_DATA_DIR, + "base.gpkg", + ) + project_dir = os.path.join( + TMP_DIR, + project_name + "_work", + ) # working directory + + init_sync_from_geopackage( + mc, + project_name, + source_gpkg_path, + ) conn = psycopg2.connect(DB_CONNINFO) @@ -182,21 +337,33 @@ def test_basic_push(mc: MerginClient): # make a change in PostgreSQL cur = conn.cursor() - cur.execute(sql.SQL("INSERT INTO {}.simple (name, rating) VALUES ('insert in postgres', 123)").format(sql.Identifier(db_schema_main))) + cur.execute( + sql.SQL("INSERT INTO {}.simple (name, rating) VALUES ('insert in postgres', 123)").format( + sql.Identifier(db_schema_main) + ) + ) cur.execute("COMMIT") cur.execute(sql.SQL("SELECT count(*) from {}.simple").format(sql.Identifier(db_schema_main))) assert cur.fetchone()[0] == 4 # push the change from DB to PostgreSQL dbsync_push(mc) - db_proj_info = _get_db_project_comment(conn, project_name + "_base") - assert db_proj_info["version"] == 'v2' + db_proj_info = _get_db_project_comment( + conn, + project_name + "_base", + ) + assert db_proj_info["version"] == "v2" # pull new version of the project to the work project directory mc.pull_project(project_dir) # check that the insert has been applied to our GeoPackage - gpkg_conn = sqlite3.connect(os.path.join(project_dir, 'test_sync.gpkg')) + gpkg_conn = sqlite3.connect( + os.path.join( + project_dir, + "test_sync.gpkg", + ) + ) gpkg_cur = gpkg_conn.cursor() gpkg_cur.execute("SELECT count(*) FROM simple") assert gpkg_cur.fetchone()[0] == 4 @@ -205,8 +372,10 @@ def test_basic_push(mc: MerginClient): dbsync_status(mc) -def test_basic_both(mc: MerginClient): - """ Initializes a sync project and does both a change in Mergin Maps and in the database, +def test_basic_both( + mc: MerginClient, +): + """Initializes a sync project and does both a change in Mergin Maps and in the database, and lets DB sync handle it: changes in PostgreSQL need to be rebased on top of changes in Mergin Maps server. """ @@ -214,10 +383,20 @@ def test_basic_both(mc: MerginClient): db_schema_main = project_name + "_main" db_schema_base = project_name + "_base" - source_gpkg_path = os.path.join(TEST_DATA_DIR, 'base.gpkg') - project_dir = os.path.join(TMP_DIR, project_name + '_work') # working directory - - init_sync_from_geopackage(mc, project_name, source_gpkg_path) + source_gpkg_path = os.path.join( + TEST_DATA_DIR, + "base.gpkg", + ) + project_dir = os.path.join( + TMP_DIR, + project_name + "_work", + ) # working directory + + init_sync_from_geopackage( + mc, + project_name, + source_gpkg_path, + ) conn = psycopg2.connect(DB_CONNINFO) @@ -227,29 +406,53 @@ def test_basic_both(mc: MerginClient): assert cur.fetchone()[0] == 3 # make change in GPKG and push - shutil.copy(os.path.join(TEST_DATA_DIR, 'inserted_1_A.gpkg'), os.path.join(project_dir, 'test_sync.gpkg')) + shutil.copy( + os.path.join( + TEST_DATA_DIR, + "inserted_1_A.gpkg", + ), + os.path.join( + project_dir, + "test_sync.gpkg", + ), + ) mc.push_project(project_dir) # make a change in PostgreSQL cur = conn.cursor() - cur.execute(sql.SQL("INSERT INTO {}.simple (name, rating) VALUES ('insert in postgres', 123)").format(sql.Identifier(db_schema_main))) + cur.execute( + sql.SQL("INSERT INTO {}.simple (name, rating) VALUES ('insert in postgres', 123)").format( + sql.Identifier(db_schema_main) + ) + ) cur.execute("COMMIT") cur.execute(sql.SQL("SELECT count(*) from {}.simple").format(sql.Identifier(db_schema_main))) assert cur.fetchone()[0] == 4 # first pull changes from Mergin Maps to DB (+rebase changes in DB) and then push the changes from DB to Mergin Maps dbsync_pull(mc) - db_proj_info = _get_db_project_comment(conn, db_schema_base) - assert db_proj_info["version"] == 'v2' + db_proj_info = _get_db_project_comment( + conn, + db_schema_base, + ) + assert db_proj_info["version"] == "v2" dbsync_push(mc) - db_proj_info = _get_db_project_comment(conn, db_schema_base) - assert db_proj_info["version"] == 'v3' + db_proj_info = _get_db_project_comment( + conn, + db_schema_base, + ) + assert db_proj_info["version"] == "v3" # pull new version of the project to the work project directory mc.pull_project(project_dir) # check that the insert has been applied to our GeoPackage - gpkg_conn = sqlite3.connect(os.path.join(project_dir, 'test_sync.gpkg')) + gpkg_conn = sqlite3.connect( + os.path.join( + project_dir, + "test_sync.gpkg", + ) + ) gpkg_cur = gpkg_conn.cursor() gpkg_cur.execute("SELECT count(*) FROM simple") assert gpkg_cur.fetchone()[0] == 5 @@ -263,63 +466,167 @@ def test_basic_both(mc: MerginClient): dbsync_status(mc) -def test_init_with_skip(mc: MerginClient): +def test_init_with_skip( + mc: MerginClient, +): project_name = "test_init_skip" - source_gpkg_path = os.path.join(TEST_DATA_DIR, 'base_2tables.gpkg') - project_dir = os.path.join(TMP_DIR, project_name + '_work') - db_schema_main = project_name + '_main' - db_schema_base = project_name + '_base' + source_gpkg_path = os.path.join( + TEST_DATA_DIR, + "base_2tables.gpkg", + ) + project_dir = os.path.join( + TMP_DIR, + project_name + "_work", + ) + db_schema_main = project_name + "_main" + db_schema_base = project_name + "_base" - init_sync_from_geopackage(mc, project_name, source_gpkg_path, ["lines"]) + init_sync_from_geopackage( + mc, + project_name, + source_gpkg_path, + ["lines"], + ) # test that database schemas does not have ignored table conn = psycopg2.connect(DB_CONNINFO) cur = conn.cursor() - cur.execute(sql.SQL("SELECT EXISTS (SELECT FROM pg_tables WHERE schemaname = '{}' AND tablename = 'lines');").format(sql.Identifier(db_schema_main))) + cur.execute( + sql.SQL("SELECT EXISTS (SELECT FROM pg_tables WHERE schemaname = '{}' AND tablename = 'lines');").format( + sql.Identifier(db_schema_main) + ) + ) assert cur.fetchone()[0] == False cur.execute(sql.SQL("SELECT count(*) from {}.points").format(sql.Identifier(db_schema_main))) assert cur.fetchone()[0] == 0 # run again, nothing should change dbsync_init(mc) - cur.execute(sql.SQL("SELECT EXISTS (SELECT FROM pg_tables WHERE schemaname = '{}' AND tablename = 'lines');").format(sql.Identifier(db_schema_main))) + cur.execute( + sql.SQL("SELECT EXISTS (SELECT FROM pg_tables WHERE schemaname = '{}' AND tablename = 'lines');").format( + sql.Identifier(db_schema_main) + ) + ) assert cur.fetchone()[0] == False cur.execute(sql.SQL("SELECT count(*) from {}.points").format(sql.Identifier(db_schema_main))) assert cur.fetchone()[0] == 0 # make change in GPKG and push to server to create pending changes, it should pass but not sync - shutil.copy(os.path.join(TEST_DATA_DIR, 'modified_all.gpkg'), os.path.join(project_dir, 'test_sync.gpkg')) + shutil.copy( + os.path.join( + TEST_DATA_DIR, + "modified_all.gpkg", + ), + os.path.join( + project_dir, + "test_sync.gpkg", + ), + ) mc.push_project(project_dir) # pull server changes to db to make sure only points table is updated dbsync_pull(mc) - cur.execute(sql.SQL("SELECT EXISTS (SELECT FROM pg_tables WHERE schemaname = '{}' AND tablename = 'lines');").format(sql.Identifier(db_schema_main))) + cur.execute( + sql.SQL("SELECT EXISTS (SELECT FROM pg_tables WHERE schemaname = '{}' AND tablename = 'lines');").format( + sql.Identifier(db_schema_main) + ) + ) assert cur.fetchone()[0] == False cur.execute(sql.SQL("SELECT count(*) from {}.points").format(sql.Identifier(db_schema_main))) assert cur.fetchone()[0] == 4 -def test_with_local_changes(mc: MerginClient): +def test_with_local_changes( + mc: MerginClient, +): project_name = "test_local_changes" - source_gpkg_path = os.path.join(TEST_DATA_DIR, 'base.gpkg') - extra_files = [os.path.join(TEST_DATA_DIR, f) for f in ["note_1.txt", "note_3.txt", "modified_all.gpkg"]] - dbsync_project_dir = os.path.join(TMP_DIR, project_name + '_dbsync', - project_name) # project location within dbsync working dir - - init_sync_from_geopackage(mc, project_name, source_gpkg_path, [], *extra_files) + source_gpkg_path = os.path.join( + TEST_DATA_DIR, + "base.gpkg", + ) + extra_files = [ + os.path.join( + TEST_DATA_DIR, + f, + ) + for f in [ + "note_1.txt", + "note_3.txt", + "modified_all.gpkg", + ] + ] + dbsync_project_dir = os.path.join( + TMP_DIR, + project_name + "_dbsync", + project_name, + ) # project location within dbsync working dir + + init_sync_from_geopackage( + mc, + project_name, + source_gpkg_path, + [], + *extra_files, + ) # update GPKG - shutil.copy(os.path.join(TEST_DATA_DIR, 'inserted_1_A.gpkg'), os.path.join(dbsync_project_dir, 'test_sync.gpkg')) + shutil.copy( + os.path.join( + TEST_DATA_DIR, + "inserted_1_A.gpkg", + ), + os.path.join( + dbsync_project_dir, + "test_sync.gpkg", + ), + ) # update non-GPGK file - shutil.copy(os.path.join(TEST_DATA_DIR, 'note_2.txt'), os.path.join(dbsync_project_dir, 'note_1.txt')) + shutil.copy( + os.path.join( + TEST_DATA_DIR, + "note_2.txt", + ), + os.path.join( + dbsync_project_dir, + "note_1.txt", + ), + ) # add GPKG file - shutil.copy(os.path.join(TEST_DATA_DIR, 'inserted_1_A.gpkg'), os.path.join(dbsync_project_dir, 'inserted_1_A.gpkg')) + shutil.copy( + os.path.join( + TEST_DATA_DIR, + "inserted_1_A.gpkg", + ), + os.path.join( + dbsync_project_dir, + "inserted_1_A.gpkg", + ), + ) # add non-GPGK file - shutil.copy(os.path.join(TEST_DATA_DIR, 'note_2.txt'), os.path.join(dbsync_project_dir, 'note_2.txt')) + shutil.copy( + os.path.join( + TEST_DATA_DIR, + "note_2.txt", + ), + os.path.join( + dbsync_project_dir, + "note_2.txt", + ), + ) # remove GPKG file - os.remove(os.path.join(dbsync_project_dir, 'modified_all.gpkg')) + os.remove( + os.path.join( + dbsync_project_dir, + "modified_all.gpkg", + ) + ) # remove non-GPKG file - os.remove(os.path.join(dbsync_project_dir, 'note_3.txt')) + os.remove( + os.path.join( + dbsync_project_dir, + "note_3.txt", + ) + ) # Check local changes in the sync project dir mp = _get_mergin_project(dbsync_project_dir) local_changes = mp.get_push_changes() @@ -331,16 +638,31 @@ def test_with_local_changes(mc: MerginClient): dbsync_status(mc) -def test_recreated_project_ids(mc: MerginClient): +def test_recreated_project_ids( + mc: MerginClient, +): project_name = "test_recreated_project_ids" - source_gpkg_path = os.path.join(TEST_DATA_DIR, 'base.gpkg') - project_dir = os.path.join(TMP_DIR, project_name + '_work') # working directory + source_gpkg_path = os.path.join( + TEST_DATA_DIR, + "base.gpkg", + ) + project_dir = os.path.join( + TMP_DIR, + project_name + "_work", + ) # working directory full_project_name = WORKSPACE + "/" + project_name - init_sync_from_geopackage(mc, project_name, source_gpkg_path) + init_sync_from_geopackage( + mc, + project_name, + source_gpkg_path, + ) # delete remote project mc.delete_project(full_project_name) # recreate project with the same name - mc.create_project(project_name, namespace=WORKSPACE) + mc.create_project( + project_name, + namespace=WORKSPACE, + ) # comparing project IDs after recreating it with the same name mp = _get_mergin_project(project_dir) local_project_id = _get_project_id(mp) @@ -353,14 +675,35 @@ def test_recreated_project_ids(mc: MerginClient): dbsync_status(mc) -@pytest.mark.parametrize("project_name", ['test_init_1', 'Test_Init_2', "Test 3", "Test-4"]) -def test_project_names(mc: MerginClient, project_name: str): - source_gpkg_path = os.path.join(TEST_DATA_DIR, 'base.gpkg') - project_dir = os.path.join(TMP_DIR, project_name + '_work') - db_schema_main = project_name + '_main' - db_schema_base = project_name + '_base' +@pytest.mark.parametrize( + "project_name", + [ + "test_init_1", + "Test_Init_2", + "Test 3", + "Test-4", + ], +) +def test_project_names( + mc: MerginClient, + project_name: str, +): + source_gpkg_path = os.path.join( + TEST_DATA_DIR, + "base.gpkg", + ) + project_dir = os.path.join( + TMP_DIR, + project_name + "_work", + ) + db_schema_main = project_name + "_main" + db_schema_base = project_name + "_base" - init_sync_from_geopackage(mc, project_name, source_gpkg_path) + init_sync_from_geopackage( + mc, + project_name, + source_gpkg_path, + ) # test that database schemas are created + tables are populated conn = psycopg2.connect(DB_CONNINFO) @@ -369,27 +712,53 @@ def test_project_names(mc: MerginClient, project_name: str): assert cur.fetchone()[0] == 3 # make change in GPKG and push - shutil.copy(os.path.join(TEST_DATA_DIR, 'inserted_1_A.gpkg'), os.path.join(project_dir, 'test_sync.gpkg')) + shutil.copy( + os.path.join( + TEST_DATA_DIR, + "inserted_1_A.gpkg", + ), + os.path.join( + project_dir, + "test_sync.gpkg", + ), + ) mc.push_project(project_dir) # pull server changes to db to make sure we can sync again dbsync_pull(mc) cur.execute(sql.SQL("SELECT count(*) from {}.simple").format(sql.Identifier(db_schema_main)).as_string(conn)) assert cur.fetchone()[0] == 4 - db_proj_info = _get_db_project_comment(conn, db_schema_base) - assert db_proj_info["version"] == 'v2' + db_proj_info = _get_db_project_comment( + conn, + db_schema_base, + ) + assert db_proj_info["version"] == "v2" # update some feature from 'modified' db to create mismatch with src geopackage, it should pass but not sync fid = 1 - cur.execute(sql.SQL("SELECT * from {}.simple WHERE fid=%s").format(sql.Identifier(db_schema_main)), (fid,)) + cur.execute( + sql.SQL("SELECT * from {}.simple WHERE fid=%s").format(sql.Identifier(db_schema_main)), + (fid,), + ) old_value = cur.fetchone()[3] - cur.execute(sql.SQL("UPDATE {}.simple SET rating=100 WHERE fid=%s").format(sql.Identifier(db_schema_main)), (fid,)) + cur.execute( + sql.SQL("UPDATE {}.simple SET rating=100 WHERE fid=%s").format(sql.Identifier(db_schema_main)), + (fid,), + ) conn.commit() - cur.execute(sql.SQL("SELECT * from {}.simple WHERE fid=%s").format(sql.Identifier(db_schema_main)), (fid,)) + cur.execute( + sql.SQL("SELECT * from {}.simple WHERE fid=%s").format(sql.Identifier(db_schema_main)), + (fid,), + ) assert cur.fetchone()[3] == 100 dbsync_init(mc) # check geopackage has not been modified - after init we are not synced! - gpkg_conn = sqlite3.connect(os.path.join(project_dir, 'test_sync.gpkg')) + gpkg_conn = sqlite3.connect( + os.path.join( + project_dir, + "test_sync.gpkg", + ) + ) gpkg_cur = gpkg_conn.cursor() gpkg_cur.execute(f"SELECT * FROM simple WHERE fid={fid}") assert gpkg_cur.fetchone()[3] == old_value @@ -398,17 +767,29 @@ def test_project_names(mc: MerginClient, project_name: str): mc.pull_project(project_dir) gpkg_cur.execute(f"SELECT * FROM simple WHERE fid ={fid}") assert gpkg_cur.fetchone()[3] == 100 - db_proj_info = _get_db_project_comment(conn, db_schema_base) - assert db_proj_info["version"] == 'v3' - - -def test_init_from_gpkg_missing_schema(mc: MerginClient): - source_gpkg_path = os.path.join(TEST_DATA_DIR, 'base.gpkg') + db_proj_info = _get_db_project_comment( + conn, + db_schema_base, + ) + assert db_proj_info["version"] == "v3" + + +def test_init_from_gpkg_missing_schema( + mc: MerginClient, +): + source_gpkg_path = os.path.join( + TEST_DATA_DIR, + "base.gpkg", + ) project_name = "test_init_missing_schema" db_schema_base = project_name + "_base" db_schema_main = project_name + "_main" - init_sync_from_geopackage(mc, project_name, source_gpkg_path) + init_sync_from_geopackage( + mc, + project_name, + source_gpkg_path, + ) conn = psycopg2.connect(DB_CONNINFO) cur = conn.cursor() @@ -426,7 +807,11 @@ def test_init_from_gpkg_missing_schema(mc: MerginClient): assert "The 'modified' schema exists but the base schema is missing" in str(err.value) assert "This may be a result of a previously failed attempt to initialize DB sync" in str(err.value) - init_sync_from_geopackage(mc, project_name, source_gpkg_path) + init_sync_from_geopackage( + mc, + project_name, + source_gpkg_path, + ) # drop main schema to mimic some mismatch cur.execute(sql.SQL("DROP SCHEMA {} CASCADE").format(sql.Identifier(db_schema_main))) @@ -442,12 +827,21 @@ def test_init_from_gpkg_missing_schema(mc: MerginClient): assert "This may be a result of a previously failed attempt to initialize DB sync" in str(err.value) -def test_init_from_gpkg_missing_comment(mc: MerginClient): +def test_init_from_gpkg_missing_comment( + mc: MerginClient, +): project_name = "test_init_missing_comment" - source_gpkg_path = os.path.join(TEST_DATA_DIR, 'base.gpkg') + source_gpkg_path = os.path.join( + TEST_DATA_DIR, + "base.gpkg", + ) schema_name = project_name + "_base" - init_sync_from_geopackage(mc, project_name, source_gpkg_path) + init_sync_from_geopackage( + mc, + project_name, + source_gpkg_path, + ) conn = psycopg2.connect(DB_CONNINFO) cur = conn.cursor() @@ -461,7 +855,10 @@ def test_init_from_gpkg_missing_comment(mc: MerginClient): # drop base schema to mimic some mismatch query = sql.SQL("COMMENT ON SCHEMA {} IS %s").format(sql.Identifier(schema_name)) - cur.execute(query.as_string(conn), ("",)) + cur.execute( + query.as_string(conn), + ("",), + ) conn.commit() with pytest.raises(DbSyncError) as err: @@ -473,13 +870,21 @@ def test_init_from_gpkg_missing_comment(mc: MerginClient): cur.fetchone() is None -def test_dbsync_clean_from_gpkg(mc: MerginClient): +def test_dbsync_clean_from_gpkg( + mc: MerginClient, +): project_name = "test_clean" - source_gpkg_path = os.path.join(TEST_DATA_DIR, 'base.gpkg') + source_gpkg_path = os.path.join( + TEST_DATA_DIR, + "base.gpkg", + ) db_schema_base = project_name + "_base" db_schema_main = project_name + "_main" full_project_name = WORKSPACE + "/" + project_name - sync_project_dir = os.path.join(TMP_DIR, project_name + '_dbsync') + sync_project_dir = os.path.join( + TMP_DIR, + project_name + "_dbsync", + ) connection = { "driver": "postgres", @@ -487,38 +892,69 @@ def test_dbsync_clean_from_gpkg(mc: MerginClient): "modified": db_schema_main, "base": db_schema_base, "mergin_project": full_project_name, - "sync_file": "test_sync.gpkg"} - - config.update({ - 'GEODIFF_EXE': GEODIFF_EXE, - 'WORKING_DIR': sync_project_dir, - 'MERGIN__USERNAME': API_USER, - 'MERGIN__PASSWORD': USER_PWD, - 'MERGIN__URL': SERVER_URL, - 'CONNECTIONS': [connection], - 'init_from': "gpkg" - }) + "sync_file": "test_sync.gpkg", + } + + config.update( + { + "GEODIFF_EXE": GEODIFF_EXE, + "WORKING_DIR": sync_project_dir, + "MERGIN__USERNAME": API_USER, + "MERGIN__PASSWORD": USER_PWD, + "MERGIN__URL": SERVER_URL, + "CONNECTIONS": [connection], + "init_from": "gpkg", + } + ) conn = psycopg2.connect(DB_CONNINFO) # we can run clean even before init dbsync_clean(mc) - init_sync_from_geopackage(mc, project_name, source_gpkg_path) + init_sync_from_geopackage( + mc, + project_name, + source_gpkg_path, + ) # edit sync GPKG and push to server - con = sqlite3.connect(os.path.join(sync_project_dir, project_name, 'test_sync.gpkg')) + con = sqlite3.connect( + os.path.join( + sync_project_dir, + project_name, + "test_sync.gpkg", + ) + ) cur = con.cursor() - cur.execute("ALTER TABLE simple ADD COLUMN \"new_field\" TEXT;") + cur.execute('ALTER TABLE simple ADD COLUMN "new_field" TEXT;') cur.execute("CREATE TABLE new_table (id INTEGER PRIMARY KEY, number INTEGER DEFAULT 0);") cur.execute("INSERT INTO new_table (number) VALUES (99);") con.commit() con.close() - mc.push_project(os.path.join(sync_project_dir, project_name)) + mc.push_project( + os.path.join( + sync_project_dir, + project_name, + ) + ) # replace it locally back with previous version - so there is mismatch, on server there is a column, that does not exist locally - os.remove(os.path.join(sync_project_dir, project_name, 'test_sync.gpkg')) - shutil.copy(source_gpkg_path, os.path.join(sync_project_dir, project_name, 'test_sync.gpkg')) + os.remove( + os.path.join( + sync_project_dir, + project_name, + "test_sync.gpkg", + ) + ) + shutil.copy( + source_gpkg_path, + os.path.join( + sync_project_dir, + project_name, + "test_sync.gpkg", + ), + ) # try init then pull and push, causing geodiff failed error with pytest.raises(DbSyncError) as err: @@ -528,22 +964,44 @@ def test_dbsync_clean_from_gpkg(mc: MerginClient): assert "geodiff failed" in str(err.value) # prior to dbsync_clean everything exists - assert _check_schema_exists(conn, db_schema_base) - assert _check_schema_exists(conn, db_schema_main) + assert _check_schema_exists( + conn, + db_schema_base, + ) + assert _check_schema_exists( + conn, + db_schema_main, + ) assert pathlib.Path(config.working_dir).exists() dbsync_clean(mc) # after the dbsync_clean nothing exists assert pathlib.Path(config.working_dir).exists() is False - assert _check_schema_exists(conn, db_schema_base) is False - assert _check_schema_exists(conn, db_schema_main) is False + assert ( + _check_schema_exists( + conn, + db_schema_base, + ) + is False + ) + assert ( + _check_schema_exists( + conn, + db_schema_main, + ) + is False + ) # make sure that running the clean second time does not cause issue dbsync_clean(mc) # after clean we can init - init_sync_from_geopackage(mc, project_name, source_gpkg_path) + init_sync_from_geopackage( + mc, + project_name, + source_gpkg_path, + ) # test that after clean everything works dbsync_init(mc) diff --git a/test/test_config.py b/test/test_config.py index 732326a..c8badfc 100644 --- a/test/test_config.py +++ b/test/test_config.py @@ -7,9 +7,16 @@ """ import pytest -from config import config, ConfigError, validate_config, get_ignored_tables +from config import ( + config, + ConfigError, + validate_config, + get_ignored_tables, +) -from .conftest import _reset_config +from .conftest import ( + _reset_config, +) def test_config(): @@ -17,73 +24,235 @@ def test_config(): _reset_config() validate_config(config) - with pytest.raises(ConfigError, match="Config error: Incorrect mergin settings"): - config.update({'MERGIN__USERNAME': None}) + with pytest.raises( + ConfigError, + match="Config error: Incorrect mergin settings", + ): + config.update({"MERGIN__USERNAME": None}) validate_config(config) _reset_config() - with pytest.raises(ConfigError, match="Config error: Missing parameter `init_from` in the configuration"): - config.unset('init_from', force=True) + with pytest.raises( + ConfigError, + match="Config error: Missing parameter `init_from` in the configuration", + ): + config.unset( + "init_from", + force=True, + ) validate_config(config) _reset_config() - with pytest.raises(ConfigError, match="Config error: `init_from` parameter must be either `gpkg` or `db`"): - config.update({'init_from': "anywhere"}) + with pytest.raises( + ConfigError, + match="Config error: `init_from` parameter must be either `gpkg` or `db`", + ): + config.update({"init_from": "anywhere"}) validate_config(config) _reset_config() - with pytest.raises(ConfigError, match="Config error: Connections list can not be empty"): - config.update({'CONNECTIONS': []}) + with pytest.raises( + ConfigError, + match="Config error: Connections list can not be empty", + ): + config.update({"CONNECTIONS": []}) validate_config(config) _reset_config() - with pytest.raises(ConfigError, match="Config error: Incorrect connection settings"): - config.update({'CONNECTIONS': [{"modified": "mergin_main"}]}) + with pytest.raises( + ConfigError, + match="Config error: Incorrect connection settings", + ): + config.update({"CONNECTIONS": [{"modified": "mergin_main"}]}) validate_config(config) _reset_config() - with pytest.raises(ConfigError, match="Config error: Only 'postgres' driver is currently supported."): - config.update({'CONNECTIONS': [{"driver": "oracle", "conn_info": "", "modified": "mergin_main", "base": "mergin_base", "mergin_project": "john/dbsync", "sync_file": "sync.gpkg"}]}) + with pytest.raises( + ConfigError, + match="Config error: Only 'postgres' driver is currently supported.", + ): + config.update( + { + "CONNECTIONS": [ + { + "driver": "oracle", + "conn_info": "", + "modified": "mergin_main", + "base": "mergin_base", + "mergin_project": "john/dbsync", + "sync_file": "sync.gpkg", + } + ] + } + ) validate_config(config) _reset_config() - with pytest.raises(ConfigError, match="Config error: Name of the Mergin Maps project should be provided in the namespace/name format."): - config.update({'CONNECTIONS': [{"driver": "postgres", "conn_info": "", "modified": "mergin_main", "base": "mergin_base", "mergin_project": "dbsync", "sync_file": "sync.gpkg"}]}) + with pytest.raises( + ConfigError, + match="Config error: Name of the Mergin Maps project should be provided in the namespace/name format.", + ): + config.update( + { + "CONNECTIONS": [ + { + "driver": "postgres", + "conn_info": "", + "modified": "mergin_main", + "base": "mergin_base", + "mergin_project": "dbsync", + "sync_file": "sync.gpkg", + } + ] + } + ) validate_config(config) def test_skip_tables(): _reset_config() - config.update({'CONNECTIONS': [{"driver": "postgres", "conn_info": "", "modified": "mergin_main", "base": "mergin_base", "mergin_project": "john/dbsync", "sync_file": "sync.gpkg", "skip_tables": None}]}) + config.update( + { + "CONNECTIONS": [ + { + "driver": "postgres", + "conn_info": "", + "modified": "mergin_main", + "base": "mergin_base", + "mergin_project": "john/dbsync", + "sync_file": "sync.gpkg", + "skip_tables": None, + } + ] + } + ) validate_config(config) - config.update({'CONNECTIONS': [{"driver": "postgres", "conn_info": "", "modified": "mergin_main", "base": "mergin_base", "mergin_project": "john/dbsync", "sync_file": "sync.gpkg", "skip_tables": []}]}) + config.update( + { + "CONNECTIONS": [ + { + "driver": "postgres", + "conn_info": "", + "modified": "mergin_main", + "base": "mergin_base", + "mergin_project": "john/dbsync", + "sync_file": "sync.gpkg", + "skip_tables": [], + } + ] + } + ) validate_config(config) - config.update({'CONNECTIONS': [{"driver": "postgres", "conn_info": "", "modified": "mergin_main", "base": "mergin_base", "mergin_project": "john/dbsync", "sync_file": "sync.gpkg", "skip_tables": "table"}]}) + config.update( + { + "CONNECTIONS": [ + { + "driver": "postgres", + "conn_info": "", + "modified": "mergin_main", + "base": "mergin_base", + "mergin_project": "john/dbsync", + "sync_file": "sync.gpkg", + "skip_tables": "table", + } + ] + } + ) validate_config(config) - config.update({'CONNECTIONS': [{"driver": "postgres", "conn_info": "", "modified": "mergin_main", "base": "mergin_base", "mergin_project": "john/dbsync", "sync_file": "sync.gpkg", "skip_tables": ["table"]}]}) + config.update( + { + "CONNECTIONS": [ + { + "driver": "postgres", + "conn_info": "", + "modified": "mergin_main", + "base": "mergin_base", + "mergin_project": "john/dbsync", + "sync_file": "sync.gpkg", + "skip_tables": ["table"], + } + ] + } + ) def test_get_ignored_tables(): _reset_config() - config.update({'CONNECTIONS': [{"driver": "postgres", "conn_info": "", "modified": "mergin_main", "base": "mergin_base", "mergin_project": "john/dbsync", "sync_file": "sync.gpkg", "skip_tables": None}]}) + config.update( + { + "CONNECTIONS": [ + { + "driver": "postgres", + "conn_info": "", + "modified": "mergin_main", + "base": "mergin_base", + "mergin_project": "john/dbsync", + "sync_file": "sync.gpkg", + "skip_tables": None, + } + ] + } + ) ignored_tables = get_ignored_tables(config.connections[0]) assert ignored_tables == [] - config.update({'CONNECTIONS': [{"driver": "postgres", "conn_info": "", "modified": "mergin_main", "base": "mergin_base", "mergin_project": "john/dbsync", "sync_file": "sync.gpkg", "skip_tables": []}]}) + config.update( + { + "CONNECTIONS": [ + { + "driver": "postgres", + "conn_info": "", + "modified": "mergin_main", + "base": "mergin_base", + "mergin_project": "john/dbsync", + "sync_file": "sync.gpkg", + "skip_tables": [], + } + ] + } + ) ignored_tables = get_ignored_tables(config.connections[0]) assert ignored_tables == [] - config.update({'CONNECTIONS': [{"driver": "postgres", "conn_info": "", "modified": "mergin_main", "base": "mergin_base", "mergin_project": "john/dbsync", "sync_file": "sync.gpkg", "skip_tables": "table"}]}) + config.update( + { + "CONNECTIONS": [ + { + "driver": "postgres", + "conn_info": "", + "modified": "mergin_main", + "base": "mergin_base", + "mergin_project": "john/dbsync", + "sync_file": "sync.gpkg", + "skip_tables": "table", + } + ] + } + ) validate_config(config) ignored_tables = get_ignored_tables(config.connections[0]) assert ignored_tables == ["table"] - config.update({'CONNECTIONS': [{"driver": "postgres", "conn_info": "", "modified": "mergin_main", "base": "mergin_base", "mergin_project": "john/dbsync", "sync_file": "sync.gpkg", "skip_tables": ["table"]}]}) + config.update( + { + "CONNECTIONS": [ + { + "driver": "postgres", + "conn_info": "", + "modified": "mergin_main", + "base": "mergin_base", + "mergin_project": "john/dbsync", + "sync_file": "sync.gpkg", + "skip_tables": ["table"], + } + ] + } + ) validate_config(config) ignored_tables = get_ignored_tables(config.connections[0]) assert ignored_tables == ["table"] diff --git a/test/test_db_functions.py b/test/test_db_functions.py index 1c15659..feff1dd 100644 --- a/test/test_db_functions.py +++ b/test/test_db_functions.py @@ -1,10 +1,15 @@ import psycopg2 import psycopg2.extensions -from dbsync import _check_postgis_available, _try_install_postgis +from dbsync import ( + _check_postgis_available, + _try_install_postgis, +) -def test_check_postgis_available(db_connection: psycopg2.extensions.connection): +def test_check_postgis_available( + db_connection: psycopg2.extensions.connection, +): cur = db_connection.cursor() assert _check_postgis_available(db_connection) @@ -14,7 +19,9 @@ def test_check_postgis_available(db_connection: psycopg2.extensions.connection): assert _check_postgis_available(db_connection) is False -def test_try_install_postgis(db_connection: psycopg2.extensions.connection): +def test_try_install_postgis( + db_connection: psycopg2.extensions.connection, +): cur = db_connection.cursor() cur.execute("DROP EXTENSION IF EXISTS postgis CASCADE;") diff --git a/test/test_dbsyncerror.py b/test/test_dbsyncerror.py index e759c6c..42b37cd 100644 --- a/test/test_dbsyncerror.py +++ b/test/test_dbsyncerror.py @@ -1,14 +1,22 @@ import pytest -from dbsync import DbSyncError +from dbsync import ( + DbSyncError, +) -@pytest.mark.parametrize("password", ['password=\"my_secret password 8417\\.\"', - 'password=\'my_secret password\'', - "password=my_secret_password84189./+-" - ]) -def test_DbSyncError_password_print(password: str): - host = "host=\"localhost\"" +@pytest.mark.parametrize( + "password", + [ + 'password="my_secret password 8417\\."', + "password='my_secret password'", + "password=my_secret_password84189./+-", + ], +) +def test_DbSyncError_password_print( + password: str, +): + host = 'host="localhost"' user = "user=user" conn_string = f"{user} {password} {host}" diff --git a/version.py b/version.py index 7b344ec..72f26f5 100644 --- a/version.py +++ b/version.py @@ -1 +1 @@ -__version__ = '1.1.2' +__version__ = "1.1.2"