From 32bef1385116b4a01cc4de5e87736e3670ce6ef1 Mon Sep 17 00:00:00 2001 From: Jan Caha Date: Mon, 8 May 2023 15:36:03 +0200 Subject: [PATCH 1/8] windows functions --- dbsync_daemon.py | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/dbsync_daemon.py b/dbsync_daemon.py index b502540..2f87e87 100644 --- a/dbsync_daemon.py +++ b/dbsync_daemon.py @@ -8,14 +8,35 @@ import sys import time import argparse +import platform +import os +import pathlib import dbsync from version import __version__ from config import config, validate_config, ConfigError, update_config_path +def is_pyinstaller() -> bool: + if getattr(sys, 'frozen', False) and platform.system() == "Windows": + return True + return False + + +def pyinstaller_update_path() -> None: + path = pathlib.Path(__file__).parent / "lib" + os.environ["PATH"] += os.pathsep + path.as_posix() + + +def pyinstaller_path_fix() -> None: + if is_pyinstaller(): + pyinstaller_update_path() + + def main(): + pyinstaller_path_fix() + parser = argparse.ArgumentParser(prog='dbsync_deamon.py', description='Synchronization tool between Mergin Maps project and database.', epilog='www.merginmaps.com') @@ -23,6 +44,7 @@ def main(): parser.add_argument("config_file", nargs="?", default="config.yaml", help="Path to file with configuration. Default value is config.yaml in current working directory.") parser.add_argument("--skip-init", action="store_true", help="Skip DB sync init step to make the tool start faster. It is not recommend to use it unless you are really sure you can skip the initial sanity checks.") parser.add_argument("--single-run", action="store_true", help="Run just once performing single pull and push operation, instead of running in infinite loop.") + parser.add_argument("--force-init", action="store_true", help="Force removing working directory and schemas from DB to initialize from scratch.") args = parser.parse_args() @@ -41,9 +63,16 @@ def main(): print("Error: " + str(e), file=sys.stderr) sys.exit(1) + if args.force_init and args.skip_init: + print("Cannot use `--force-init` with `--skip-init` Initialization is required. ", file=sys.stderr) + sys.exit(1) + print("Logging in to Mergin...") mc = dbsync.create_mergin_client() + if args.force_init: + dbsync.dbsync_clean(mc) + if args.single_run: if not args.skip_init: From 5e1967137b798be69e1dc5dd03e5c8196acbedbe Mon Sep 17 00:00:00 2001 From: Jan Caha Date: Mon, 8 May 2023 15:36:30 +0200 Subject: [PATCH 2/8] script to build the windows version --- scripts/build_exe.bat | 13 +++++++++++++ 1 file changed, 13 insertions(+) create mode 100755 scripts/build_exe.bat diff --git a/scripts/build_exe.bat b/scripts/build_exe.bat new file mode 100755 index 0000000..4c0082a --- /dev/null +++ b/scripts/build_exe.bat @@ -0,0 +1,13 @@ +pyinstaller ../dbsync_daemon.py ^ + -c ^ + --noconfirm ^ + --add-binary="./windows_binaries/geodiff.exe;lib" ^ + --add-binary="./windows_binaries/geodiff.dll;lib" ^ + --add-binary="./windows_binaries/libcrypto-3-x64.dll;lib" ^ + --add-binary="./windows_binaries/LIBPQ.dll;lib" ^ + --add-binary="./windows_binaries/libssl-3-x64.dll;lib" ^ + --add-binary="./windows_binaries/sqlite3.dll;lib" ^ + --hidden-import dynaconf ^ + --collect-all mergin ^ + --clean ^ + -F \ No newline at end of file From 4f6cc40be9e03b62cf3c437fda1a5638f1246637 Mon Sep 17 00:00:00 2001 From: Jan Caha Date: Mon, 8 May 2023 17:19:46 +0200 Subject: [PATCH 3/8] simple logging --- dbsync_daemon.py | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/dbsync_daemon.py b/dbsync_daemon.py index 2f87e87..56b161c 100644 --- a/dbsync_daemon.py +++ b/dbsync_daemon.py @@ -9,6 +9,8 @@ import time import argparse import platform +import logging +import datetime import os import pathlib @@ -33,6 +35,18 @@ def pyinstaller_path_fix() -> None: pyinstaller_update_path() +def get_logger(log_path, with_time=True, with_level=True) -> logging.Logger: + log = logging.getLogger(f"{log_path}") + log.setLevel(logging.DEBUG) + if not log.handlers: + log_handler = logging.FileHandler(log_path, mode="a") + format = "%(asctime)s -" if with_time else "" + format += "%(levelname)s - %(message)s" if with_level else "%(message)s" + log_handler.setFormatter(logging.Formatter(format)) + log.addHandler(log_handler) + return log + + def main(): pyinstaller_path_fix() @@ -45,11 +59,19 @@ def main(): parser.add_argument("--skip-init", action="store_true", help="Skip DB sync init step to make the tool start faster. It is not recommend to use it unless you are really sure you can skip the initial sanity checks.") parser.add_argument("--single-run", action="store_true", help="Run just once performing single pull and push operation, instead of running in infinite loop.") parser.add_argument("--force-init", action="store_true", help="Force removing working directory and schemas from DB to initialize from scratch.") + parser.add_argument("--log-file", default="", action="store", help="Store logging to file.") args = parser.parse_args() print(f"== starting mergin-db-sync daemon == version {__version__} ==") + log_file: pathlib.Path = None + logger: logging.Logger = None + + if args.log_file: + log_file = pathlib.Path(args.log_file) + logger = get_logger(log_file.as_posix()) + try: update_config_path(args.config_file) except IOError as e: @@ -118,6 +140,8 @@ def main(): mc = dbsync.create_mergin_client() except dbsync.DbSyncError as e: + if logger: + logger.error(str(e)) print("Error: " + str(e), file=sys.stderr) print("Going to sleep") From a4d5c0e94408616d0c924981541cfc0b3a5910c8 Mon Sep 17 00:00:00 2001 From: Jan Caha Date: Tue, 9 May 2023 13:49:32 +0200 Subject: [PATCH 4/8] add logging --- dbsync_daemon.py | 80 ++++++++++++++++++++++++++++-------------------- 1 file changed, 46 insertions(+), 34 deletions(-) diff --git a/dbsync_daemon.py b/dbsync_daemon.py index 56b161c..19e419e 100644 --- a/dbsync_daemon.py +++ b/dbsync_daemon.py @@ -10,9 +10,9 @@ import argparse import platform import logging -import datetime import os import pathlib +import typing import dbsync from version import __version__ @@ -35,16 +35,35 @@ def pyinstaller_path_fix() -> None: pyinstaller_update_path() -def get_logger(log_path, with_time=True, with_level=True) -> logging.Logger: - log = logging.getLogger(f"{log_path}") - log.setLevel(logging.DEBUG) - if not log.handlers: +LOGGER: logging.Logger = None + + +def setup_logger(log_path, log_messages: bool, with_time=True, with_level=True) -> logging.Logger: + global LOGGER + LOGGER = logging.getLogger(f"{log_path}") + if log_messages: + LOGGER.setLevel(logging.DEBUG) + else: + LOGGER.setLevel(logging.WARNING) + if not LOGGER.handlers: log_handler = logging.FileHandler(log_path, mode="a") format = "%(asctime)s -" if with_time else "" format += "%(levelname)s - %(message)s" if with_level else "%(message)s" log_handler.setFormatter(logging.Formatter(format)) - log.addHandler(log_handler) - return log + LOGGER.addHandler(log_handler) + + +def handle_error(error: typing.Union[Exception, str]): + if LOGGER: + LOGGER.error(str(error)) + print("Error: " + str(error), file=sys.stderr) + sys.exit(1) + + +def handle_message(msg: str): + if LOGGER: + LOGGER.debug(msg) + print(msg) def main(): @@ -60,36 +79,33 @@ def main(): parser.add_argument("--single-run", action="store_true", help="Run just once performing single pull and push operation, instead of running in infinite loop.") parser.add_argument("--force-init", action="store_true", help="Force removing working directory and schemas from DB to initialize from scratch.") parser.add_argument("--log-file", default="", action="store", help="Store logging to file.") + parser.add_argument("--log-messages", action="store_true", help="Log messages, not only errors.") args = parser.parse_args() - print(f"== starting mergin-db-sync daemon == version {__version__} ==") - - log_file: pathlib.Path = None - logger: logging.Logger = None - if args.log_file: log_file = pathlib.Path(args.log_file) - logger = get_logger(log_file.as_posix()) + setup_logger(log_file.as_posix(), args.log_messages) + + handle_message(f"== starting mergin-db-sync daemon == version {__version__} ==") try: update_config_path(args.config_file) except IOError as e: - print("Error: " + str(e), file=sys.stderr) - sys.exit(1) + handle_error(e) sleep_time = config.as_int("daemon.sleep_time") try: validate_config(config) except ConfigError as e: - print("Error: " + str(e), file=sys.stderr) - sys.exit(1) + handle_error(e) if args.force_init and args.skip_init: - print("Cannot use `--force-init` with `--skip-init` Initialization is required. ", file=sys.stderr) - sys.exit(1) + msg = "Cannot use `--force-init` with `--skip-init` Initialization is required. " + handle_error(msg) + + handle_message("Logging in to Mergin...") - print("Logging in to Mergin...") mc = dbsync.create_mergin_client() if args.force_init: @@ -101,18 +117,17 @@ def main(): try: dbsync.dbsync_init(mc) except dbsync.DbSyncError as e: - print("Error: " + str(e), file=sys.stderr) - sys.exit(1) + handle_error(e) try: - print("Trying to pull") + handle_message("Trying to pull") dbsync.dbsync_pull(mc) - print("Trying to push") + handle_message("Trying to push") dbsync.dbsync_push(mc) + except dbsync.DbSyncError as e: - print("Error: " + str(e), file=sys.stderr) - sys.exit(1) + handle_error(e) else: @@ -120,18 +135,17 @@ def main(): try: dbsync.dbsync_init(mc) except dbsync.DbSyncError as e: - print("Error: " + str(e), file=sys.stderr) - sys.exit(1) + handle_error(e) while True: print(datetime.datetime.now()) try: - print("Trying to pull") + handle_message("Trying to pull") dbsync.dbsync_pull(mc) - print("Trying to push") + handle_message("Trying to push") dbsync.dbsync_push(mc) # check mergin client token expiration @@ -140,11 +154,9 @@ def main(): mc = dbsync.create_mergin_client() except dbsync.DbSyncError as e: - if logger: - logger.error(str(e)) - print("Error: " + str(e), file=sys.stderr) + handle_error(e) - print("Going to sleep") + handle_message("Going to sleep") time.sleep(sleep_time) From 7abfa0c0d1b2773cb9ba9ea3ddfdb1bce5d1cae2 Mon Sep 17 00:00:00 2001 From: Jan Caha Date: Tue, 9 May 2023 13:57:54 +0200 Subject: [PATCH 5/8] simplify --- dbsync_daemon.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/dbsync_daemon.py b/dbsync_daemon.py index 19e419e..391d211 100644 --- a/dbsync_daemon.py +++ b/dbsync_daemon.py @@ -101,8 +101,7 @@ def main(): handle_error(e) if args.force_init and args.skip_init: - msg = "Cannot use `--force-init` with `--skip-init` Initialization is required. " - handle_error(msg) + handle_error("Cannot use `--force-init` with `--skip-init` Initialization is required. ") handle_message("Logging in to Mergin...") From 36b2b41a5cb5004e42f3e6247f8295590a04e389 Mon Sep 17 00:00:00 2001 From: Jan Caha Date: Tue, 9 May 2023 16:46:29 +0200 Subject: [PATCH 6/8] fix verbosity --- dbsync_daemon.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/dbsync_daemon.py b/dbsync_daemon.py index 391d211..8dd0b24 100644 --- a/dbsync_daemon.py +++ b/dbsync_daemon.py @@ -38,11 +38,13 @@ def pyinstaller_path_fix() -> None: LOGGER: logging.Logger = None -def setup_logger(log_path, log_messages: bool, with_time=True, with_level=True) -> logging.Logger: +def setup_logger(log_path, log_verbosity: str, with_time=True, with_level=True) -> logging.Logger: global LOGGER LOGGER = logging.getLogger(f"{log_path}") - if log_messages: + if log_verbosity == "messages": LOGGER.setLevel(logging.DEBUG) + elif log_verbosity == "errors": + LOGGER.setLevel(logging.WARNING) else: LOGGER.setLevel(logging.WARNING) if not LOGGER.handlers: @@ -79,13 +81,13 @@ def main(): parser.add_argument("--single-run", action="store_true", help="Run just once performing single pull and push operation, instead of running in infinite loop.") parser.add_argument("--force-init", action="store_true", help="Force removing working directory and schemas from DB to initialize from scratch.") parser.add_argument("--log-file", default="", action="store", help="Store logging to file.") - parser.add_argument("--log-messages", action="store_true", help="Log messages, not only errors.") + parser.add_argument("--log-verbosity", choices=["errors", "messages"], default="errors", help="Log messages, not only errors.") args = parser.parse_args() if args.log_file: log_file = pathlib.Path(args.log_file) - setup_logger(log_file.as_posix(), args.log_messages) + setup_logger(log_file.as_posix(), args.log_verbosity) handle_message(f"== starting mergin-db-sync daemon == version {__version__} ==") From d73301b85690c5f74ddaa71adddd59a96fda0492 Mon Sep 17 00:00:00 2001 From: Jan Caha Date: Wed, 10 May 2023 09:46:18 +0200 Subject: [PATCH 7/8] black settings, pre-commit, github action --- .github/workflows/tests_mergin_db_sync.yaml | 8 +++++++- .pre-commit-config.yaml | 5 +++++ pyproject.toml | 17 +++++++++++++++++ 3 files changed, 29 insertions(+), 1 deletion(-) create mode 100644 .pre-commit-config.yaml create mode 100644 pyproject.toml diff --git a/.github/workflows/tests_mergin_db_sync.yaml b/.github/workflows/tests_mergin_db_sync.yaml index 0f16271..6d6d5ee 100644 --- a/.github/workflows/tests_mergin_db_sync.yaml +++ b/.github/workflows/tests_mergin_db_sync.yaml @@ -57,4 +57,10 @@ jobs: - name: Run tests run: | - pytest test --cov=. --cov-report=term-missing:skip-covered -vv \ No newline at end of file + pytest test --cov=. --cov-report=term-missing:skip-covered -vv + + - name: Check files using the black formatter + uses: rickstaa/action-black@v1 + id: action_black + with: + black_args: "." diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..a91856d --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,5 @@ +repos: +- repo: https://github.com/psf/black + rev: 23.3.0 + hooks: + - id: black \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..6f4dd3e --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,17 @@ +[tool.black] +line-length = 120 +include = '\.pyi?$' +exclude = ''' +/( + \.git + | \.hg + | \.mypy_cache + | \.vscode + | \.tox + | \.venv + | _build + | buck-out + | build + | dist +)/ +''' \ No newline at end of file From 314327c24494bbb56d8423002f791cefec3fbba5 Mon Sep 17 00:00:00 2001 From: Jan Caha Date: Wed, 10 May 2023 10:09:25 +0200 Subject: [PATCH 8/8] apply black --- config.py | 86 ++- dbsync.py | 1196 +++++++++++++++++++++++++++++-------- dbsync_daemon.py | 114 +++- listen_test.py | 23 +- test/conftest.py | 225 ++++--- test/test_basic.py | 768 +++++++++++++++++++----- test/test_config.py | 217 ++++++- test/test_db_functions.py | 13 +- test/test_dbsyncerror.py | 22 +- version.py | 2 +- 10 files changed, 2114 insertions(+), 552 deletions(-) diff --git a/config.py b/config.py index d8bc035..1098b7a 100644 --- a/config.py +++ b/config.py @@ -6,7 +6,9 @@ License: MIT """ -from dynaconf import Dynaconf +from dynaconf import ( + Dynaconf, +) import platform import tempfile import pathlib @@ -16,7 +18,7 @@ envvar_prefix=False, settings_files=[], geodiff_exe="geodiff.exe" if platform.system() == "Windows" else "geodiff", - working_dir=(pathlib.Path(tempfile.gettempdir()) / "dbsync").as_posix() + working_dir=(pathlib.Path(tempfile.gettempdir()) / "dbsync").as_posix(), ) @@ -25,13 +27,22 @@ class ConfigError(Exception): def validate_config(config): - """ Validate config - make sure values are consistent """ + """Validate config - make sure values are consistent""" # validate that geodiff can be found, otherwise it does not make sense to run DB Sync try: - subprocess.run([config.geodiff_exe, "help"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + subprocess.run( + [ + config.geodiff_exe, + "help", + ], + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) except FileNotFoundError: - raise ConfigError("Config error: Geodiff executable not found. Is it installed and available in `PATH` environment variable?") + raise ConfigError( + "Config error: Geodiff executable not found. Is it installed and available in `PATH` environment variable?" + ) if not (config.mergin.url and config.mergin.username and config.mergin.password): raise ConfigError("Config error: Incorrect mergin settings") @@ -42,48 +53,85 @@ def validate_config(config): if "init_from" not in config: raise ConfigError("Config error: Missing parameter `init_from` in the configuration.") - if config.init_from not in ["gpkg", "db"]: - raise ConfigError(f"Config error: `init_from` parameter must be either `gpkg` or `db`. Current value is `{config.init_from}`.") + if config.init_from not in [ + "gpkg", + "db", + ]: + raise ConfigError( + f"Config error: `init_from` parameter must be either `gpkg` or `db`. Current value is `{config.init_from}`." + ) for conn in config.connections: - for attr in ["driver", "conn_info", "modified", "base", "mergin_project", "sync_file"]: - if not hasattr(conn, attr): - raise ConfigError(f"Config error: Incorrect connection settings. Required parameter `{attr}` is missing.") + for attr in [ + "driver", + "conn_info", + "modified", + "base", + "mergin_project", + "sync_file", + ]: + if not hasattr( + conn, + attr, + ): + raise ConfigError( + f"Config error: Incorrect connection settings. Required parameter `{attr}` is missing." + ) if conn.driver != "postgres": raise ConfigError("Config error: Only 'postgres' driver is currently supported.") if "/" not in conn.mergin_project: - raise ConfigError("Config error: Name of the Mergin Maps project should be provided in the namespace/name format.") + raise ConfigError( + "Config error: Name of the Mergin Maps project should be provided in the namespace/name format." + ) if "skip_tables" in conn: if conn.skip_tables is None: continue - elif isinstance(conn.skip_tables, str): + elif isinstance( + conn.skip_tables, + str, + ): continue - elif not isinstance(conn.skip_tables, list): + elif not isinstance( + conn.skip_tables, + list, + ): raise ConfigError("Config error: Ignored tables parameter should be a list") -def get_ignored_tables(connection): +def get_ignored_tables( + connection, +): if "skip_tables" in connection: if connection.skip_tables is None: return [] - elif isinstance(connection.skip_tables, str): + elif isinstance( + connection.skip_tables, + str, + ): return [connection.skip_tables] - elif isinstance(connection.skip_tables, list): + elif isinstance( + connection.skip_tables, + list, + ): return connection.skip_tables else: return [] -def update_config_path(path_param: str) -> None: +def update_config_path( + path_param: str, +) -> None: config_file_path = pathlib.Path(path_param) if config_file_path.exists(): print(f"Using config file: {path_param}") - user_file_config = Dynaconf(envvar_prefix=False, - settings_files=[config_file_path]) + user_file_config = Dynaconf( + envvar_prefix=False, + settings_files=[config_file_path], + ) config.update(user_file_config) else: raise IOError(f"Config file {config_file_path} does not exist.") diff --git a/dbsync.py b/dbsync.py index 45597fa..486f99a 100644 --- a/dbsync.py +++ b/dbsync.py @@ -21,16 +21,33 @@ import psycopg2 import psycopg2.extensions -from psycopg2 import sql -from itertools import chain - -from mergin import MerginClient, MerginProject, LoginError, ClientError, InvalidProject -from version import __version__ -from config import config, validate_config, get_ignored_tables, ConfigError +from psycopg2 import ( + sql, +) +from itertools import ( + chain, +) + +from mergin import ( + MerginClient, + MerginProject, + LoginError, + ClientError, + InvalidProject, +) +from version import ( + __version__, +) +from config import ( + config, + validate_config, + get_ignored_tables, + ConfigError, +) # set high logging level for geodiff (used by geodiff executable) # so we get as much information as possible -os.environ["GEODIFF_LOGGER_LEVEL"] = '4' # 0 = nothing, 1 = errors, 2 = warning, 3 = info, 4 = debug +os.environ["GEODIFF_LOGGER_LEVEL"] = "4" # 0 = nothing, 1 = errors, 2 = warning, 3 = info, 4 = debug FORCE_INIT_MESSAGE = "Running `dbsync_deamon.py` with `--force-init` should fix the issue." @@ -38,55 +55,96 @@ class DbSyncError(Exception): default_print_password = "password='*****'" - def __init__(self, message): + def __init__( + self, + message, + ): # escaped password - message = re.sub(r'password=[\"\'].+[\"\'](?=\s)', self.default_print_password, message) + message = re.sub( + r"password=[\"\'].+[\"\'](?=\s)", + self.default_print_password, + message, + ) # not escaped password - message = re.sub(r'password=\S+', self.default_print_password, message) + message = re.sub( + r"password=\S+", + self.default_print_password, + message, + ) super().__init__(message) -def _add_quotes_to_schema_name(schema: str) -> str: - matches = re.findall(r"[^a-z0-9_]", schema) +def _add_quotes_to_schema_name( + schema: str, +) -> str: + matches = re.findall( + r"[^a-z0-9_]", + schema, + ) if len(matches) != 0: - schema = schema.replace("\"", "\"\"") + schema = schema.replace( + '"', + '""', + ) schema = f'"{schema}"' return schema -def _tables_list_to_string(tables): +def _tables_list_to_string( + tables, +): return ";".join(tables) -def _check_has_working_dir(work_path): +def _check_has_working_dir( + work_path, +): if not os.path.exists(work_path): raise DbSyncError("The project working directory does not exist: " + work_path) - if not os.path.exists(os.path.join(work_path, '.mergin')): + if not os.path.exists( + os.path.join( + work_path, + ".mergin", + ) + ): raise DbSyncError("The project working directory does not seem to contain Mergin Maps project: " + work_path) -def _check_has_sync_file(file_path): - """ Checks whether the dbsync environment is initialized already (so that we can pull/push). - Emits an exception if not initialized yet. """ +def _check_has_sync_file( + file_path, +): + """Checks whether the dbsync environment is initialized already (so that we can pull/push). + Emits an exception if not initialized yet.""" if not os.path.exists(file_path): raise DbSyncError("The output GPKG file does not exist: " + file_path) -def _drop_schema(conn, schema_name: str) -> None: +def _drop_schema( + conn, + schema_name: str, +) -> None: cur = conn.cursor() cur.execute(sql.SQL("DROP SCHEMA IF EXISTS {} CASCADE").format(sql.Identifier(schema_name))) conn.commit() -def _check_schema_exists(conn, schema_name): +def _check_schema_exists( + conn, + schema_name, +): cur = conn.cursor() - cur.execute("SELECT EXISTS(SELECT 1 FROM pg_namespace WHERE nspname = %s)", (schema_name,)) + cur.execute( + "SELECT EXISTS(SELECT 1 FROM pg_namespace WHERE nspname = %s)", + (schema_name,), + ) return cur.fetchone()[0] -def _check_postgis_available(conn: psycopg2.extensions.connection) -> bool: +def _check_postgis_available( + conn: psycopg2.extensions.connection, +) -> bool: cur = conn.cursor() cur.execute("SELECT extname FROM pg_extension;") try: @@ -99,7 +157,9 @@ def _check_postgis_available(conn: psycopg2.extensions.connection) -> bool: return False -def _try_install_postgis(conn: psycopg2.extensions.connection) -> bool: +def _try_install_postgis( + conn: psycopg2.extensions.connection, +) -> bool: cur = conn.cursor() try: cur.execute("CREATE EXTENSION postgis;") @@ -109,15 +169,22 @@ def _try_install_postgis(conn: psycopg2.extensions.connection) -> bool: def _check_has_password(): - """ Checks whether we have password for Mergin Maps user - if not, we will ask for it """ + """Checks whether we have password for Mergin Maps user - if not, we will ask for it""" if config.mergin.password is None: - config.mergin.password = getpass.getpass(prompt="Mergin Maps password for '{}': ".format(config.mergin.username)) + config.mergin.password = getpass.getpass( + prompt="Mergin Maps password for '{}': ".format(config.mergin.username) + ) -def _run_geodiff(cmd): - """ will run a command (with geodiff) and report what got to stderr and raise exception - if the command returns non-zero exit code """ - res = subprocess.run(cmd, stderr=subprocess.PIPE) +def _run_geodiff( + cmd, +): + """will run a command (with geodiff) and report what got to stderr and raise exception + if the command returns non-zero exit code""" + res = subprocess.run( + cmd, + stderr=subprocess.PIPE, + ) geodiff_stderr = res.stderr.decode() if geodiff_stderr: print("GEODIFF: " + geodiff_stderr) @@ -125,102 +192,332 @@ def _run_geodiff(cmd): raise DbSyncError("geodiff failed!\n" + str(cmd)) -def _geodiff_create_changeset(driver, conn_info, base, modified, changeset, ignored_tables): +def _geodiff_create_changeset( + driver, + conn_info, + base, + modified, + changeset, + ignored_tables, +): if ignored_tables: - _run_geodiff([config.geodiff_exe, "diff", "--driver", driver, conn_info, "--skip-tables", _tables_list_to_string(ignored_tables), base, modified, changeset]) + _run_geodiff( + [ + config.geodiff_exe, + "diff", + "--driver", + driver, + conn_info, + "--skip-tables", + _tables_list_to_string(ignored_tables), + base, + modified, + changeset, + ] + ) else: - _run_geodiff([config.geodiff_exe, "diff", "--driver", driver, conn_info, base, modified, changeset]) + _run_geodiff( + [ + config.geodiff_exe, + "diff", + "--driver", + driver, + conn_info, + base, + modified, + changeset, + ] + ) -def _geodiff_apply_changeset(driver, conn_info, base, changeset, ignored_tables): +def _geodiff_apply_changeset( + driver, + conn_info, + base, + changeset, + ignored_tables, +): if ignored_tables: - _run_geodiff([config.geodiff_exe, "apply", "--driver", driver, conn_info, "--skip-tables", _tables_list_to_string(ignored_tables), base, changeset]) + _run_geodiff( + [ + config.geodiff_exe, + "apply", + "--driver", + driver, + conn_info, + "--skip-tables", + _tables_list_to_string(ignored_tables), + base, + changeset, + ] + ) else: - _run_geodiff([config.geodiff_exe, "apply", "--driver", driver, conn_info, base, changeset]) + _run_geodiff( + [ + config.geodiff_exe, + "apply", + "--driver", + driver, + conn_info, + base, + changeset, + ] + ) -def _geodiff_rebase(driver, conn_info, base, our, base2their, conflicts, ignored_tables): +def _geodiff_rebase( + driver, + conn_info, + base, + our, + base2their, + conflicts, + ignored_tables, +): if ignored_tables: - _run_geodiff([config.geodiff_exe, "rebase-db", "--driver", driver, conn_info, "--skip-tables", _tables_list_to_string(ignored_tables), base, our, base2their, conflicts]) + _run_geodiff( + [ + config.geodiff_exe, + "rebase-db", + "--driver", + driver, + conn_info, + "--skip-tables", + _tables_list_to_string(ignored_tables), + base, + our, + base2their, + conflicts, + ] + ) else: - _run_geodiff([config.geodiff_exe, "rebase-db", "--driver", driver, conn_info, base, our, base2their, conflicts]) + _run_geodiff( + [ + config.geodiff_exe, + "rebase-db", + "--driver", + driver, + conn_info, + base, + our, + base2their, + conflicts, + ] + ) -def _geodiff_list_changes_details(changeset): - """ Returns a list with changeset details: - [ { 'table': 'foo', 'type': 'update', 'changes': [ ... old/new column values ... ] }, ... ] +def _geodiff_list_changes_details( + changeset, +): + """Returns a list with changeset details: + [ { 'table': 'foo', 'type': 'update', 'changes': [ ... old/new column values ... ] }, ... ] """ tmp_dir = tempfile.gettempdir() - tmp_output = os.path.join(tmp_dir, 'dbsync-changeset-details') + tmp_output = os.path.join( + tmp_dir, + "dbsync-changeset-details", + ) if os.path.exists(tmp_output): os.remove(tmp_output) - _run_geodiff([config.geodiff_exe, "as-json", changeset, tmp_output]) + _run_geodiff( + [ + config.geodiff_exe, + "as-json", + changeset, + tmp_output, + ] + ) with open(tmp_output) as f: out = json.load(f) os.remove(tmp_output) return out["geodiff"] -def _geodiff_list_changes_summary(changeset): - """ Returns a list with changeset summary: - [ { 'table': 'foo', 'insert': 1, 'update': 2, 'delete': 3 }, ... ] +def _geodiff_list_changes_summary( + changeset, +): + """Returns a list with changeset summary: + [ { 'table': 'foo', 'insert': 1, 'update': 2, 'delete': 3 }, ... ] """ tmp_dir = tempfile.gettempdir() - tmp_output = os.path.join(tmp_dir, 'dbsync-changeset-summary') + tmp_output = os.path.join( + tmp_dir, + "dbsync-changeset-summary", + ) if os.path.exists(tmp_output): os.remove(tmp_output) - _run_geodiff([config.geodiff_exe, "as-summary", changeset, tmp_output]) + _run_geodiff( + [ + config.geodiff_exe, + "as-summary", + changeset, + tmp_output, + ] + ) with open(tmp_output) as f: out = json.load(f) os.remove(tmp_output) return out["geodiff_summary"] -def _geodiff_make_copy(src_driver, src_conn_info, src, dst_driver, dst_conn_info, dst, ignored_tables): +def _geodiff_make_copy( + src_driver, + src_conn_info, + src, + dst_driver, + dst_conn_info, + dst, + ignored_tables, +): if ignored_tables: - _run_geodiff([config.geodiff_exe, "copy", "--driver-1", src_driver, src_conn_info, "--driver-2", dst_driver, dst_conn_info, "--skip-tables", _tables_list_to_string(ignored_tables), src, dst]) + _run_geodiff( + [ + config.geodiff_exe, + "copy", + "--driver-1", + src_driver, + src_conn_info, + "--driver-2", + dst_driver, + dst_conn_info, + "--skip-tables", + _tables_list_to_string(ignored_tables), + src, + dst, + ] + ) else: - _run_geodiff([config.geodiff_exe, "copy", "--driver-1", src_driver, src_conn_info, "--driver-2", dst_driver, dst_conn_info, src, dst]) + _run_geodiff( + [ + config.geodiff_exe, + "copy", + "--driver-1", + src_driver, + src_conn_info, + "--driver-2", + dst_driver, + dst_conn_info, + src, + dst, + ] + ) -def _geodiff_create_changeset_dr(src_driver, src_conn_info, src, dst_driver, dst_conn_info, dst, changeset, ignored_tables): +def _geodiff_create_changeset_dr( + src_driver, + src_conn_info, + src, + dst_driver, + dst_conn_info, + dst, + changeset, + ignored_tables, +): if ignored_tables: - _run_geodiff([config.geodiff_exe, "diff", "--driver-1", src_driver, src_conn_info, "--driver-2", dst_driver, dst_conn_info, "--skip-tables", _tables_list_to_string(ignored_tables), src, dst, changeset]) + _run_geodiff( + [ + config.geodiff_exe, + "diff", + "--driver-1", + src_driver, + src_conn_info, + "--driver-2", + dst_driver, + dst_conn_info, + "--skip-tables", + _tables_list_to_string(ignored_tables), + src, + dst, + changeset, + ] + ) else: - _run_geodiff([config.geodiff_exe, "diff", "--driver-1", src_driver, src_conn_info, "--driver-2", dst_driver, dst_conn_info, src, dst, changeset]) + _run_geodiff( + [ + config.geodiff_exe, + "diff", + "--driver-1", + src_driver, + src_conn_info, + "--driver-2", + dst_driver, + dst_conn_info, + src, + dst, + changeset, + ] + ) -def _compare_datasets(src_driver, src_conn_info, src, dst_driver, dst_conn_info, dst, ignored_tables, summary_only=True): - """ Compare content of two datasets (from various drivers) and return geodiff JSON summary of changes """ +def _compare_datasets( + src_driver, + src_conn_info, + src, + dst_driver, + dst_conn_info, + dst, + ignored_tables, + summary_only=True, +): + """Compare content of two datasets (from various drivers) and return geodiff JSON summary of changes""" tmp_dir = tempfile.gettempdir() - tmp_changeset = os.path.join(tmp_dir, ''.join(random.choices(string.ascii_letters, k=8))) - - _geodiff_create_changeset_dr(src_driver, src_conn_info, src, dst_driver, dst_conn_info, dst, tmp_changeset, ignored_tables) + tmp_changeset = os.path.join( + tmp_dir, + "".join( + random.choices( + string.ascii_letters, + k=8, + ) + ), + ) + + _geodiff_create_changeset_dr( + src_driver, + src_conn_info, + src, + dst_driver, + dst_conn_info, + dst, + tmp_changeset, + ignored_tables, + ) if summary_only: return _geodiff_list_changes_summary(tmp_changeset) else: return _geodiff_list_changes_details(tmp_changeset) -def _print_changes_summary(summary, label=None): - """ Takes a geodiff JSON summary of changes and prints them """ +def _print_changes_summary( + summary, + label=None, +): + """Takes a geodiff JSON summary of changes and prints them""" print("Changes:" if label is None else label) for item in summary: - print("{:20} {:4} {:4} {:4}".format(item['table'], item['insert'], item['update'], item['delete'])) + print( + "{:20} {:4} {:4} {:4}".format( + item["table"], + item["insert"], + item["update"], + item["delete"], + ) + ) -def _print_mergin_changes(diff_dict): - """ Takes a dictionary with format { 'added': [...], 'removed': [...], 'updated': [...] } +def _print_mergin_changes( + diff_dict, +): + """Takes a dictionary with format { 'added': [...], 'removed': [...], 'updated': [...] } where each item is another dictionary with file details, e.g.: { 'path': 'myfile.gpkg', size: 123456, ... } and prints it in a way that's easy to parse for a human :-) """ - for item in diff_dict['added']: - print(" added: " + item['path']) - for item in diff_dict['updated']: - print(" updated: " + item['path']) - for item in diff_dict['removed']: - print(" removed: " + item['path']) + for item in diff_dict["added"]: + print(" added: " + item["path"]) + for item in diff_dict["updated"]: + print(" updated: " + item["path"]) + for item in diff_dict["removed"]: + print(" removed: " + item["path"]) # Dictionary used by _get_mergin_project() function below. @@ -228,7 +525,9 @@ def _print_mergin_changes(diff_dict): cached_mergin_project_objects = {} -def _get_mergin_project(work_path): +def _get_mergin_project( + work_path, +): """ Returns a cached MerginProject object or creates one if it does not exist yet. This is to avoid creating many of these objects (e.g. every pull/push) because it does @@ -242,24 +541,38 @@ def _get_mergin_project(work_path): return cached_mergin_project_objects[work_path] -def _get_project_version(work_path): - """ Returns the current version of the project """ +def _get_project_version( + work_path, +): + """Returns the current version of the project""" mp = _get_mergin_project(work_path) return mp.metadata["version"] -def _get_project_id(mp): - """ Returns the project ID """ +def _get_project_id( + mp, +): + """Returns the project ID""" try: project_id = uuid.UUID(mp.metadata["project_id"]) - except (KeyError, ValueError): + except ( + KeyError, + ValueError, + ): project_id = None return project_id -def _set_db_project_comment(conn, schema, project_name, version, project_id=None, error=None): - """ Set postgres COMMENT on SCHEMA with Mergin Maps project name and version - or eventually error message if initialisation failed +def _set_db_project_comment( + conn, + schema, + project_name, + version, + project_id=None, + error=None, +): + """Set postgres COMMENT on SCHEMA with Mergin Maps project name and version + or eventually error message if initialisation failed """ comment = { "name": project_name, @@ -271,35 +584,59 @@ def _set_db_project_comment(conn, schema, project_name, version, project_id=None comment["error"] = error cur = conn.cursor() query = sql.SQL("COMMENT ON SCHEMA {} IS %s").format(sql.Identifier(schema)) - cur.execute(query.as_string(conn), (json.dumps(comment), )) + cur.execute( + query.as_string(conn), + (json.dumps(comment),), + ) conn.commit() def _get_db_project_comment(conn, schema): - """ Get Mergin Maps project name and its current version in db schema""" + """Get Mergin Maps project name and its current version in db schema""" cur = conn.cursor() schema = _add_quotes_to_schema_name(schema) - cur.execute("SELECT obj_description(%s::regnamespace, 'pg_namespace')", (schema, )) + cur.execute( + "SELECT obj_description(%s::regnamespace, 'pg_namespace')", + (schema,), + ) res = cur.fetchone()[0] try: comment = json.loads(res) if res else None - except (TypeError, json.decoder.JSONDecodeError): + except ( + TypeError, + json.decoder.JSONDecodeError, + ): return return comment -def _redownload_project(conn_cfg, mc, work_dir, db_proj_info): +def _redownload_project( + conn_cfg, + mc, + work_dir, + db_proj_info, +): print(f"Removing local working directory {work_dir}") shutil.rmtree(work_dir) - print(f"Downloading version {db_proj_info['version']} of Mergin Maps project {conn_cfg.mergin_project} " - f"to {work_dir}") + print( + f"Downloading version {db_proj_info['version']} of Mergin Maps project {conn_cfg.mergin_project} " + f"to {work_dir}" + ) try: - mc.download_project(conn_cfg.mergin_project, work_dir, db_proj_info["version"]) + mc.download_project( + conn_cfg.mergin_project, + work_dir, + db_proj_info["version"], + ) except ClientError as e: raise DbSyncError("Mergin Maps client error: " + str(e)) -def _validate_local_project_id(mp, mc, server_info=None): +def _validate_local_project_id( + mp, + mc, + server_info=None, +): """Compare local project ID with remote version on the server.""" local_project_id = _get_project_id(mp) if local_project_id is None: @@ -319,20 +656,31 @@ def _validate_local_project_id(mp, mc, server_info=None): def create_mergin_client(): - """ Create instance of MerginClient""" + """Create instance of MerginClient""" _check_has_password() try: - return MerginClient(config.mergin.url, login=config.mergin.username, password=config.mergin.password, plugin_version=f"DB-sync/{__version__}") + return MerginClient( + config.mergin.url, + login=config.mergin.username, + password=config.mergin.password, + plugin_version=f"DB-sync/{__version__}", + ) except LoginError as e: # this could be auth failure, but could be also server problem (e.g. worker crash) - raise DbSyncError(f"Unable to log in to Mergin Maps: {str(e)} \n\n" + - "Have you specified correct credentials in configuration file?") + raise DbSyncError( + f"Unable to log in to Mergin Maps: {str(e)} \n\n" + + "Have you specified correct credentials in configuration file?" + ) except ClientError as e: # this could be e.g. DNS error raise DbSyncError("Mergin Maps client error: " + str(e)) -def revert_local_changes(mc, mp, local_changes=None): +def revert_local_changes( + mc, + mp, + local_changes=None, +): """Revert local changes from the existing project.""" if local_changes is None: local_changes = mp.get_push_changes() @@ -341,22 +689,42 @@ def revert_local_changes(mc, mp, local_changes=None): print("Reverting local changes: " + str(local_changes)) for add_change in local_changes["added"]: added_file = add_change["path"] - added_filepath = os.path.join(mp.dir, added_file) + added_filepath = os.path.join( + mp.dir, + added_file, + ) os.remove(added_filepath) - for update_delete_change in chain(local_changes["updated"], local_changes["removed"]): + for update_delete_change in chain( + local_changes["updated"], + local_changes["removed"], + ): update_delete_file = update_delete_change["path"] - update_delete_filepath = os.path.join(mp.dir, update_delete_file) + update_delete_filepath = os.path.join( + mp.dir, + update_delete_file, + ) delete_file = os.path.isfile(update_delete_filepath) if update_delete_file.lower().endswith(".gpkg"): - update_delete_filepath_base = os.path.join(mp.meta_dir, update_delete_file) + update_delete_filepath_base = os.path.join( + mp.meta_dir, + update_delete_file, + ) if delete_file: os.remove(update_delete_filepath) - shutil.copy(update_delete_filepath_base, update_delete_filepath) + shutil.copy( + update_delete_filepath_base, + update_delete_filepath, + ) else: if delete_file: os.remove(update_delete_filepath) try: - mc.download_file(mp.dir, update_delete_file, update_delete_filepath, mp.metadata["version"]) + mc.download_file( + mp.dir, + update_delete_file, + update_delete_filepath, + mp.metadata["version"], + ) except ClientError as e: raise DbSyncError("Mergin Maps client error: " + str(e)) leftovers = mp.get_push_changes() @@ -365,14 +733,20 @@ def revert_local_changes(mc, mp, local_changes=None): def pull(conn_cfg, mc): - """ Downloads any changes from Mergin Maps and applies them to the database """ + """Downloads any changes from Mergin Maps and applies them to the database""" print(f"Processing Mergin Maps project '{conn_cfg.mergin_project}'") ignored_tables = get_ignored_tables(conn_cfg) project_name = conn_cfg.mergin_project.split("/")[1] - work_dir = os.path.join(config.working_dir, project_name) - gpkg_full_path = os.path.join(work_dir, conn_cfg.sync_file) + work_dir = os.path.join( + config.working_dir, + project_name, + ) + gpkg_full_path = os.path.join( + work_dir, + conn_cfg.sync_file, + ) _check_has_working_dir(work_dir) _check_has_sync_file(gpkg_full_path) @@ -397,31 +771,60 @@ def pull(conn_cfg, mc): local_changes = mp.get_push_changes() if any(local_changes.values()): - local_changes = revert_local_changes(mc, mp, local_changes) + local_changes = revert_local_changes( + mc, + mp, + local_changes, + ) if any(local_changes.values()): - raise DbSyncError("There are pending changes in the local directory - that should never happen! " + str(local_changes)) + raise DbSyncError( + "There are pending changes in the local directory - that should never happen! " + str(local_changes) + ) if server_version == local_version: print("No changes on Mergin Maps.") return - gpkg_basefile = os.path.join(work_dir, '.mergin', conn_cfg.sync_file) + gpkg_basefile = os.path.join( + work_dir, + ".mergin", + conn_cfg.sync_file, + ) gpkg_basefile_old = gpkg_basefile + "-old" # make a copy of the basefile in the current version (base) - because after pull it will be set to "their" - shutil.copy(gpkg_basefile, gpkg_basefile_old) + shutil.copy( + gpkg_basefile, + gpkg_basefile_old, + ) tmp_dir = tempfile.gettempdir() - tmp_base2our = os.path.join(tmp_dir, f'{project_name}-dbsync-pull-base2our') - tmp_base2their = os.path.join(tmp_dir, f'{project_name}-dbsync-pull-base2their') + tmp_base2our = os.path.join( + tmp_dir, + f"{project_name}-dbsync-pull-base2our", + ) + tmp_base2their = os.path.join( + tmp_dir, + f"{project_name}-dbsync-pull-base2their", + ) # find out our local changes in the database (base2our) - _geodiff_create_changeset(conn_cfg.driver, conn_cfg.conn_info, conn_cfg.base, conn_cfg.modified, tmp_base2our, ignored_tables) + _geodiff_create_changeset( + conn_cfg.driver, + conn_cfg.conn_info, + conn_cfg.base, + conn_cfg.modified, + tmp_base2our, + ignored_tables, + ) needs_rebase = False if os.path.getsize(tmp_base2our) != 0: needs_rebase = True summary = _geodiff_list_changes_summary(tmp_base2our) - _print_changes_summary(summary, "DB Changes:") + _print_changes_summary( + summary, + "DB Changes:", + ) try: mc.pull_project(work_dir) # will do rebase as needed @@ -432,39 +835,88 @@ def pull(conn_cfg, mc): print("Pulled new version from Mergin Maps: " + _get_project_version(work_dir)) # simple case when there are no pending local changes - just apply whatever changes are coming - _geodiff_create_changeset("sqlite", "", gpkg_basefile_old, gpkg_basefile, tmp_base2their, ignored_tables) + _geodiff_create_changeset( + "sqlite", + "", + gpkg_basefile_old, + gpkg_basefile, + tmp_base2their, + ignored_tables, + ) # summarize changes summary = _geodiff_list_changes_summary(tmp_base2their) - _print_changes_summary(summary, "Mergin Maps Changes:") + _print_changes_summary( + summary, + "Mergin Maps Changes:", + ) if not needs_rebase: print("Applying new version [no rebase]") - _geodiff_apply_changeset(conn_cfg.driver, conn_cfg.conn_info, conn_cfg.base, tmp_base2their, ignored_tables) - _geodiff_apply_changeset(conn_cfg.driver, conn_cfg.conn_info, conn_cfg.modified, tmp_base2their, ignored_tables) + _geodiff_apply_changeset( + conn_cfg.driver, + conn_cfg.conn_info, + conn_cfg.base, + tmp_base2their, + ignored_tables, + ) + _geodiff_apply_changeset( + conn_cfg.driver, + conn_cfg.conn_info, + conn_cfg.modified, + tmp_base2their, + ignored_tables, + ) else: print("Applying new version [WITH rebase]") - tmp_conflicts = os.path.join(tmp_dir, f'{project_name}-dbsync-pull-conflicts') - _geodiff_rebase(conn_cfg.driver, conn_cfg.conn_info, conn_cfg.base, - conn_cfg.modified, tmp_base2their, tmp_conflicts, ignored_tables) - _geodiff_apply_changeset(conn_cfg.driver, conn_cfg.conn_info, conn_cfg.base, tmp_base2their, ignored_tables) + tmp_conflicts = os.path.join( + tmp_dir, + f"{project_name}-dbsync-pull-conflicts", + ) + _geodiff_rebase( + conn_cfg.driver, + conn_cfg.conn_info, + conn_cfg.base, + conn_cfg.modified, + tmp_base2their, + tmp_conflicts, + ignored_tables, + ) + _geodiff_apply_changeset( + conn_cfg.driver, + conn_cfg.conn_info, + conn_cfg.base, + tmp_base2their, + ignored_tables, + ) os.remove(gpkg_basefile_old) conn = psycopg2.connect(conn_cfg.conn_info) version = _get_project_version(work_dir) - _set_db_project_comment(conn, conn_cfg.base, conn_cfg.mergin_project, version) + _set_db_project_comment( + conn, + conn_cfg.base, + conn_cfg.mergin_project, + version, + ) def status(conn_cfg, mc): - """ Figure out if there are any pending changes in the database or in Mergin Maps""" + """Figure out if there are any pending changes in the database or in Mergin Maps""" print(f"Processing Mergin Maps project '{conn_cfg.mergin_project}'") ignored_tables = get_ignored_tables(conn_cfg) project_name = conn_cfg.mergin_project.split("/")[1] - work_dir = os.path.join(config.working_dir, project_name) - gpkg_full_path = os.path.join(work_dir, conn_cfg.sync_file) + work_dir = os.path.join( + config.working_dir, + project_name, + ) + gpkg_full_path = os.path.join( + work_dir, + conn_cfg.sync_file, + ) _check_has_working_dir(work_dir) _check_has_sync_file(gpkg_full_path) @@ -478,15 +930,22 @@ def status(conn_cfg, mc): local_version = mp.metadata["version"] print("Checking status...") try: - server_info = mc.project_info(project_path, since=local_version) + server_info = mc.project_info( + project_path, + since=local_version, + ) except ClientError as e: raise DbSyncError("Mergin Maps client error: " + str(e)) # Make sure that local project ID (if available) is the same as on the server - _validate_local_project_id(mp, mc, server_info) + _validate_local_project_id( + mp, + mc, + server_info, + ) status_push = mp.get_push_changes() - if status_push['added'] or status_push['updated'] or status_push['removed']: + if status_push["added"] or status_push["updated"] or status_push["removed"]: raise DbSyncError("Pending changes in the local directory - that should never happen! " + str(status_push)) print("Working directory " + work_dir) @@ -495,7 +954,7 @@ def status(conn_cfg, mc): print("Server is at version " + server_info["version"]) status_pull = mp.get_pull_changes(server_info["files"]) - if status_pull['added'] or status_pull['updated'] or status_pull['removed']: + if status_pull["added"] or status_pull["updated"] or status_pull["removed"]: print("There are pending changes on server:") _print_mergin_changes(status_pull) else: @@ -504,17 +963,33 @@ def status(conn_cfg, mc): print("") conn = psycopg2.connect(conn_cfg.conn_info) - if not _check_schema_exists(conn, conn_cfg.base): + if not _check_schema_exists( + conn, + conn_cfg.base, + ): raise DbSyncError("The base schema does not exist: " + conn_cfg.base) - if not _check_schema_exists(conn, conn_cfg.modified): + if not _check_schema_exists( + conn, + conn_cfg.modified, + ): raise DbSyncError("The 'modified' schema does not exist: " + conn_cfg.modified) # get changes in the DB tmp_dir = tempfile.gettempdir() - tmp_changeset_file = os.path.join(tmp_dir, f'{project_name}-dbsync-status-base2our') + tmp_changeset_file = os.path.join( + tmp_dir, + f"{project_name}-dbsync-status-base2our", + ) if os.path.exists(tmp_changeset_file): os.remove(tmp_changeset_file) - _geodiff_create_changeset(conn_cfg.driver, conn_cfg.conn_info, conn_cfg.base, conn_cfg.modified, tmp_changeset_file, ignored_tables) + _geodiff_create_changeset( + conn_cfg.driver, + conn_cfg.conn_info, + conn_cfg.base, + conn_cfg.modified, + tmp_changeset_file, + ignored_tables, + ) if os.path.getsize(tmp_changeset_file) == 0: print("No changes in the database.") @@ -526,7 +1001,7 @@ def status(conn_cfg, mc): def push(conn_cfg, mc): - """ Take changes in the 'modified' schema in the database and push them to Mergin Maps""" + """Take changes in the 'modified' schema in the database and push them to Mergin Maps""" print(f"Processing Mergin Maps project '{conn_cfg.mergin_project}'") ignored_tables = get_ignored_tables(conn_cfg) @@ -534,12 +1009,21 @@ def push(conn_cfg, mc): project_name = conn_cfg.mergin_project.split("/")[1] tmp_dir = tempfile.gettempdir() - tmp_changeset_file = os.path.join(tmp_dir, f'{project_name}-dbsync-push-base2our') + tmp_changeset_file = os.path.join( + tmp_dir, + f"{project_name}-dbsync-push-base2our", + ) if os.path.exists(tmp_changeset_file): os.remove(tmp_changeset_file) - work_dir = os.path.join(config.working_dir, project_name) - gpkg_full_path = os.path.join(work_dir, conn_cfg.sync_file) + work_dir = os.path.join( + config.working_dir, + project_name, + ) + gpkg_full_path = os.path.join( + work_dir, + conn_cfg.sync_file, + ) _check_has_working_dir(work_dir) _check_has_sync_file(gpkg_full_path) @@ -562,9 +1046,10 @@ def push(conn_cfg, mc): raise DbSyncError("Mergin Maps client error: " + str(e)) status_push = mp.get_push_changes() - if status_push['added'] or status_push['updated'] or status_push['removed']: + if status_push["added"] or status_push["updated"] or status_push["removed"]: raise DbSyncError( - "There are pending changes in the local directory - that should never happen! " + str(status_push)) + "There are pending changes in the local directory - that should never happen! " + str(status_push) + ) # check there are no pending changes on server if server_version != local_version: @@ -572,13 +1057,26 @@ def push(conn_cfg, mc): conn = psycopg2.connect(conn_cfg.conn_info) - if not _check_schema_exists(conn, conn_cfg.base): + if not _check_schema_exists( + conn, + conn_cfg.base, + ): raise DbSyncError("The base schema does not exist: " + conn_cfg.base) - if not _check_schema_exists(conn, conn_cfg.modified): + if not _check_schema_exists( + conn, + conn_cfg.modified, + ): raise DbSyncError("The 'modified' schema does not exist: " + conn_cfg.modified) # get changes in the DB - _geodiff_create_changeset(conn_cfg.driver, conn_cfg.conn_info, conn_cfg.base, conn_cfg.modified, tmp_changeset_file, ignored_tables) + _geodiff_create_changeset( + conn_cfg.driver, + conn_cfg.conn_info, + conn_cfg.base, + conn_cfg.modified, + tmp_changeset_file, + ignored_tables, + ) if os.path.getsize(tmp_changeset_file) == 0: print("No changes in the database.") @@ -590,7 +1088,13 @@ def push(conn_cfg, mc): # write changes to the local geopackage print("Writing DB changes to working dir...") - _geodiff_apply_changeset("sqlite", "", gpkg_full_path, tmp_changeset_file, ignored_tables) + _geodiff_apply_changeset( + "sqlite", + "", + gpkg_full_path, + tmp_changeset_file, + ignored_tables, + ) # write to the server try: @@ -604,12 +1108,27 @@ def push(conn_cfg, mc): # update base schema in the DB print("Updating DB base schema...") - _geodiff_apply_changeset(conn_cfg.driver, conn_cfg.conn_info, conn_cfg.base, tmp_changeset_file, ignored_tables) - _set_db_project_comment(conn, conn_cfg.base, conn_cfg.mergin_project, version) - - -def init(conn_cfg, mc, from_gpkg=True): - """ Initialize the dbsync so that it is possible to do two-way sync between Mergin Maps and a database """ + _geodiff_apply_changeset( + conn_cfg.driver, + conn_cfg.conn_info, + conn_cfg.base, + tmp_changeset_file, + ignored_tables, + ) + _set_db_project_comment( + conn, + conn_cfg.base, + conn_cfg.mergin_project, + version, + ) + + +def init( + conn_cfg, + mc, + from_gpkg=True, +): + """Initialize the dbsync so that it is possible to do two-way sync between Mergin Maps and a database""" print(f"Processing Mergin Maps project '{conn_cfg.mergin_project}'") ignored_tables = get_ignored_tables(conn_cfg) @@ -629,54 +1148,103 @@ def init(conn_cfg, mc, from_gpkg=True): if not _try_install_postgis(conn): raise DbSyncError("Cannot find or activate `postgis` extension. You may need to install it.") - base_schema_exists = _check_schema_exists(conn, conn_cfg.base) - modified_schema_exists = _check_schema_exists(conn, conn_cfg.modified) - - work_dir = os.path.join(config.working_dir, project_name) - gpkg_full_path = os.path.join(work_dir, conn_cfg.sync_file) + base_schema_exists = _check_schema_exists( + conn, + conn_cfg.base, + ) + modified_schema_exists = _check_schema_exists( + conn, + conn_cfg.modified, + ) + + work_dir = os.path.join( + config.working_dir, + project_name, + ) + gpkg_full_path = os.path.join( + work_dir, + conn_cfg.sync_file, + ) if modified_schema_exists and base_schema_exists: print("Modified and base schemas already exist") # this is not a first run of db-sync init - db_proj_info = _get_db_project_comment(conn, conn_cfg.base) + db_proj_info = _get_db_project_comment( + conn, + conn_cfg.base, + ) if not db_proj_info: - raise DbSyncError("Base schema exists but missing which project it belongs to. " - "This may be a result of a previously failed attempt to initialize DB sync. " - f"{FORCE_INIT_MESSAGE}") + raise DbSyncError( + "Base schema exists but missing which project it belongs to. " + "This may be a result of a previously failed attempt to initialize DB sync. " + f"{FORCE_INIT_MESSAGE}" + ) if "error" in db_proj_info: - changes_gpkg_base = _compare_datasets("sqlite", "", gpkg_full_path, conn_cfg.driver, - conn_cfg.conn_info, conn_cfg.base, ignored_tables, - summary_only=False) - changes = json.dumps(changes_gpkg_base, indent=2) + changes_gpkg_base = _compare_datasets( + "sqlite", + "", + gpkg_full_path, + conn_cfg.driver, + conn_cfg.conn_info, + conn_cfg.base, + ignored_tables, + summary_only=False, + ) + changes = json.dumps( + changes_gpkg_base, + indent=2, + ) print(f"Changeset from failed init:\n {changes}") raise DbSyncError(db_proj_info["error"]) # make sure working directory contains the same version of project if not os.path.exists(work_dir): - print(f"Downloading version {db_proj_info['version']} of Mergin Maps project {conn_cfg.mergin_project} " - f"to {work_dir}") - mc.download_project(conn_cfg.mergin_project, work_dir, db_proj_info["version"]) + print( + f"Downloading version {db_proj_info['version']} of Mergin Maps project {conn_cfg.mergin_project} " + f"to {work_dir}" + ) + mc.download_project( + conn_cfg.mergin_project, + work_dir, + db_proj_info["version"], + ) else: # Get project ID from DB if available try: local_version = _get_project_version(work_dir) print(f"Working directory {work_dir} already exists, with project version {local_version}") # Compare local and database project version - db_project_id_str = getattr(db_proj_info, "project_id", None) + db_project_id_str = getattr( + db_proj_info, + "project_id", + None, + ) db_project_id = uuid.UUID(db_project_id_str) if db_project_id_str else None mp = _get_mergin_project(work_dir) local_project_id = _get_project_id(mp) if (db_project_id and local_project_id) and (db_project_id != local_project_id): - raise DbSyncError("Database project ID doesn't match local project ID. " - f"{FORCE_INIT_MESSAGE}") + raise DbSyncError("Database project ID doesn't match local project ID. " f"{FORCE_INIT_MESSAGE}") if local_version != db_proj_info["version"]: - _redownload_project(conn_cfg, mc, work_dir, db_proj_info) + _redownload_project( + conn_cfg, + mc, + work_dir, + db_proj_info, + ) except InvalidProject as e: print(f"Error: {e}") - _redownload_project(conn_cfg, mc, work_dir, db_proj_info) + _redownload_project( + conn_cfg, + mc, + work_dir, + db_proj_info, + ) else: if not os.path.exists(work_dir): print("Downloading latest Mergin Maps project " + conn_cfg.mergin_project + " to " + work_dir) - mc.download_project(conn_cfg.mergin_project, work_dir) + mc.download_project( + conn_cfg.mergin_project, + work_dir, + ) else: local_version = _get_project_version(work_dir) print(f"Working directory {work_dir} already exists, with project version {local_version}") @@ -689,12 +1257,20 @@ def init(conn_cfg, mc, from_gpkg=True): _validate_local_project_id(mp, mc) # check there are no pending changes on server (or locally - which should never happen) - status_pull, status_push, _ = mc.project_status(work_dir) - if status_pull['added'] or status_pull['updated'] or status_pull['removed']: + ( + status_pull, + status_push, + _, + ) = mc.project_status(work_dir) + if status_pull["added"] or status_pull["updated"] or status_pull["removed"]: print("There are pending changes on server, please run pull command after init") - if status_push['added'] or status_push['updated'] or status_push['removed']: - raise DbSyncError("There are pending changes in the local directory - that should never happen! " + str(status_push) + " " + - f"{FORCE_INIT_MESSAGE}") + if status_push["added"] or status_push["updated"] or status_push["removed"]: + raise DbSyncError( + "There are pending changes in the local directory - that should never happen! " + + str(status_push) + + " " + + f"{FORCE_INIT_MESSAGE}" + ) if from_gpkg: if not os.path.exists(gpkg_full_path): @@ -702,118 +1278,244 @@ def init(conn_cfg, mc, from_gpkg=True): if modified_schema_exists and base_schema_exists: # if db schema already exists make sure it is already synchronized with source gpkg or fail - summary_modified = _compare_datasets("sqlite", "", gpkg_full_path, conn_cfg.driver, - conn_cfg.conn_info, conn_cfg.modified, ignored_tables) - summary_base = _compare_datasets("sqlite", "", gpkg_full_path, conn_cfg.driver, - conn_cfg.conn_info, conn_cfg.base, ignored_tables) + summary_modified = _compare_datasets( + "sqlite", + "", + gpkg_full_path, + conn_cfg.driver, + conn_cfg.conn_info, + conn_cfg.modified, + ignored_tables, + ) + summary_base = _compare_datasets( + "sqlite", + "", + gpkg_full_path, + conn_cfg.driver, + conn_cfg.conn_info, + conn_cfg.base, + ignored_tables, + ) if len(summary_base): # seems someone modified base schema manually - this should never happen! print(f"Local project version at {local_version} and base schema at {db_proj_info['version']}") - _print_changes_summary(summary_base, "Base schema changes:") - raise DbSyncError("The db schemas already exist but 'base' schema is not synchronized with source GPKG. " - f"{FORCE_INIT_MESSAGE}") + _print_changes_summary( + summary_base, + "Base schema changes:", + ) + raise DbSyncError( + "The db schemas already exist but 'base' schema is not synchronized with source GPKG. " + f"{FORCE_INIT_MESSAGE}" + ) elif len(summary_modified): print("Modified schema is not synchronised with source GPKG, please run pull/push commands to fix it") - _print_changes_summary(summary_modified, "Pending Changes:") + _print_changes_summary( + summary_modified, + "Pending Changes:", + ) return else: print("The GPKG file, base and modified schemas are already initialized and in sync") return # nothing to do elif modified_schema_exists: - raise DbSyncError(f"The 'modified' schema exists but the base schema is missing: {conn_cfg.base}. " - "This may be a result of a previously failed attempt to initialize DB sync. " - f"{FORCE_INIT_MESSAGE}") + raise DbSyncError( + f"The 'modified' schema exists but the base schema is missing: {conn_cfg.base}. " + "This may be a result of a previously failed attempt to initialize DB sync. " + f"{FORCE_INIT_MESSAGE}" + ) elif base_schema_exists: - raise DbSyncError(f"The base schema exists but the modified schema is missing: {conn_cfg.modified}. " - "This may be a result of a previously failed attempt to initialize DB sync. " - f"{FORCE_INIT_MESSAGE}") + raise DbSyncError( + f"The base schema exists but the modified schema is missing: {conn_cfg.modified}. " + "This may be a result of a previously failed attempt to initialize DB sync. " + f"{FORCE_INIT_MESSAGE}" + ) # initialize: we have an existing GeoPackage in our Mergin Maps project and we want to initialize database print("The base and modified schemas do not exist yet, going to initialize them ...") try: # COPY: gpkg -> modified - _geodiff_make_copy("sqlite", "", gpkg_full_path, - conn_cfg.driver, conn_cfg.conn_info, conn_cfg.modified, ignored_tables) + _geodiff_make_copy( + "sqlite", + "", + gpkg_full_path, + conn_cfg.driver, + conn_cfg.conn_info, + conn_cfg.modified, + ignored_tables, + ) # COPY: modified -> base - _geodiff_make_copy(conn_cfg.driver, conn_cfg.conn_info, conn_cfg.modified, - conn_cfg.driver, conn_cfg.conn_info, conn_cfg.base, ignored_tables) + _geodiff_make_copy( + conn_cfg.driver, + conn_cfg.conn_info, + conn_cfg.modified, + conn_cfg.driver, + conn_cfg.conn_info, + conn_cfg.base, + ignored_tables, + ) # sanity check to verify that right after initialization we do not have any changes # between the 'base' schema and the geopackage in Mergin Maps project, to make sure that # copying data back and forth will keep data intact - changes_gpkg_base = _compare_datasets("sqlite", "", gpkg_full_path, conn_cfg.driver, - conn_cfg.conn_info, conn_cfg.base, ignored_tables, - summary_only=False) + changes_gpkg_base = _compare_datasets( + "sqlite", + "", + gpkg_full_path, + conn_cfg.driver, + conn_cfg.conn_info, + conn_cfg.base, + ignored_tables, + summary_only=False, + ) # mark project version into db schema if len(changes_gpkg_base): - changes = json.dumps(changes_gpkg_base, indent=2) + changes = json.dumps( + changes_gpkg_base, + indent=2, + ) print(f"Changeset after internal copy (should be empty):\n {changes}") - raise DbSyncError('Initialization of db-sync failed due to a bug in geodiff.\n ' - 'Please report this problem to mergin-db-sync developers') + raise DbSyncError( + "Initialization of db-sync failed due to a bug in geodiff.\n " + "Please report this problem to mergin-db-sync developers" + ) except DbSyncError: - print(f"Cleaning up after a failed DB sync init - dropping schemas {conn_cfg.base} and {conn_cfg.modified}.") - _drop_schema(conn, conn_cfg.base) - _drop_schema(conn, conn_cfg.modified) + print( + f"Cleaning up after a failed DB sync init - dropping schemas {conn_cfg.base} and {conn_cfg.modified}." + ) + _drop_schema( + conn, + conn_cfg.base, + ) + _drop_schema( + conn, + conn_cfg.modified, + ) raise - _set_db_project_comment(conn, conn_cfg.base, conn_cfg.mergin_project, local_version) + _set_db_project_comment( + conn, + conn_cfg.base, + conn_cfg.mergin_project, + local_version, + ) else: if not modified_schema_exists: - raise DbSyncError(f"The 'modified' schema does not exist: {conn_cfg.modified}. " - "This schema is necessary if initialization should be done from database (parameter `init-from-db`).") + raise DbSyncError( + f"The 'modified' schema does not exist: {conn_cfg.modified}. " + "This schema is necessary if initialization should be done from database (parameter `init-from-db`)." + ) if os.path.exists(gpkg_full_path) and base_schema_exists: # make sure output gpkg is in sync with db or fail - summary_modified = _compare_datasets(conn_cfg.driver, conn_cfg.conn_info, conn_cfg.modified, - "sqlite", "", gpkg_full_path, ignored_tables) - summary_base = _compare_datasets(conn_cfg.driver, conn_cfg.conn_info, conn_cfg.base, - "sqlite", "", gpkg_full_path, ignored_tables) + summary_modified = _compare_datasets( + conn_cfg.driver, + conn_cfg.conn_info, + conn_cfg.modified, + "sqlite", + "", + gpkg_full_path, + ignored_tables, + ) + summary_base = _compare_datasets( + conn_cfg.driver, + conn_cfg.conn_info, + conn_cfg.base, + "sqlite", + "", + gpkg_full_path, + ignored_tables, + ) if len(summary_base): - print(f"Local project version at {_get_project_version(work_dir)} and base schema at {db_proj_info['version']}") - _print_changes_summary(summary_base, "Base schema changes:") - raise DbSyncError("The output GPKG file exists already but is not synchronized with db 'base' schema." - f"{FORCE_INIT_MESSAGE}") + print( + f"Local project version at {_get_project_version(work_dir)} and base schema at {db_proj_info['version']}" + ) + _print_changes_summary( + summary_base, + "Base schema changes:", + ) + raise DbSyncError( + "The output GPKG file exists already but is not synchronized with db 'base' schema." + f"{FORCE_INIT_MESSAGE}" + ) elif len(summary_modified): - print("The output GPKG file exists already but it is not synchronised with modified schema, " - "please run pull/push commands to fix it") - _print_changes_summary(summary_modified, "Pending Changes:") + print( + "The output GPKG file exists already but it is not synchronised with modified schema, " + "please run pull/push commands to fix it" + ) + _print_changes_summary( + summary_modified, + "Pending Changes:", + ) return else: print("The GPKG file, base and modified schemas are already initialized and in sync") return # nothing to do elif os.path.exists(gpkg_full_path): - raise DbSyncError(f"The output GPKG exists but the base schema is missing: {conn_cfg.base}. " - f"{FORCE_INIT_MESSAGE}") + raise DbSyncError( + f"The output GPKG exists but the base schema is missing: {conn_cfg.base}. " f"{FORCE_INIT_MESSAGE}" + ) elif base_schema_exists: - raise DbSyncError(f"The base schema exists but the output GPKG exists is missing: {gpkg_full_path}. " - f"{FORCE_INIT_MESSAGE}") + raise DbSyncError( + f"The base schema exists but the output GPKG exists is missing: {gpkg_full_path}. " + f"{FORCE_INIT_MESSAGE}" + ) # initialize: we have an existing schema in database with tables and we want to initialize geopackage # within our Mergin Maps project print("The base schema and the output GPKG do not exist yet, going to initialize them ...") try: # COPY: modified -> base - _geodiff_make_copy(conn_cfg.driver, conn_cfg.conn_info, conn_cfg.modified, - conn_cfg.driver, conn_cfg.conn_info, conn_cfg.base, ignored_tables) + _geodiff_make_copy( + conn_cfg.driver, + conn_cfg.conn_info, + conn_cfg.modified, + conn_cfg.driver, + conn_cfg.conn_info, + conn_cfg.base, + ignored_tables, + ) # COPY: modified -> gpkg - _geodiff_make_copy(conn_cfg.driver, conn_cfg.conn_info, conn_cfg.modified, - "sqlite", "", gpkg_full_path, ignored_tables) + _geodiff_make_copy( + conn_cfg.driver, + conn_cfg.conn_info, + conn_cfg.modified, + "sqlite", + "", + gpkg_full_path, + ignored_tables, + ) # sanity check to verify that right after initialization we do not have any changes # between the 'base' schema and the geopackage in Mergin Maps project, to make sure that # copying data back and forth will keep data intact - changes_gpkg_base = _compare_datasets("sqlite", "", gpkg_full_path, conn_cfg.driver, - conn_cfg.conn_info, conn_cfg.base, ignored_tables, summary_only=False) + changes_gpkg_base = _compare_datasets( + "sqlite", + "", + gpkg_full_path, + conn_cfg.driver, + conn_cfg.conn_info, + conn_cfg.base, + ignored_tables, + summary_only=False, + ) if len(changes_gpkg_base): - changes = json.dumps(changes_gpkg_base, indent=2) + changes = json.dumps( + changes_gpkg_base, + indent=2, + ) print(f"Changeset after internal copy (should be empty):\n {changes}") - raise DbSyncError('Initialization of db-sync failed due to a bug in geodiff.\n ' - 'Please report this problem to mergin-db-sync developers') + raise DbSyncError( + "Initialization of db-sync failed due to a bug in geodiff.\n " + "Please report this problem to mergin-db-sync developers" + ) except DbSyncError: print(f"Cleaning up after a failed DB sync init - dropping schema {conn_cfg.base}.") - _drop_schema(conn, conn_cfg.base) + _drop_schema( + conn, + conn_cfg.base, + ) raise # upload gpkg to Mergin Maps (client takes care of storing metadata) @@ -821,13 +1523,22 @@ def init(conn_cfg, mc, from_gpkg=True): # mark project version into db schema version = _get_project_version(work_dir) - _set_db_project_comment(conn, conn_cfg.base, conn_cfg.mergin_project, version) + _set_db_project_comment( + conn, + conn_cfg.base, + conn_cfg.mergin_project, + version, + ) def dbsync_init(mc): from_gpkg = config.init_from.lower() == "gpkg" for conn in config.connections: - init(conn, mc, from_gpkg=from_gpkg) + init( + conn, + mc, + from_gpkg=from_gpkg, + ) print("Init done!") @@ -846,7 +1557,9 @@ def dbsync_push(mc): print("Push done!") -def dbsync_status(mc): +def dbsync_status( + mc, +): for conn in config.connections: status(conn, mc) @@ -865,7 +1578,10 @@ def clean(conn_cfg, mc): try: # to remove sync file, download project to created directory, drop file and push changes back file = temp_folder / conn_cfg.sync_file - mc.download_project(conn_cfg.mergin_project, str(temp_folder)) + mc.download_project( + conn_cfg.mergin_project, + str(temp_folder), + ) if file.exists(): file.unlink() mc.push_project(str(temp_folder)) @@ -882,16 +1598,24 @@ def clean(conn_cfg, mc): raise DbSyncError("Unable to connect to the database: " + str(e)) try: - _drop_schema(conn_db, conn_cfg.base) + _drop_schema( + conn_db, + conn_cfg.base, + ) if not from_db: - _drop_schema(conn_db, conn_cfg.modified) + _drop_schema( + conn_db, + conn_cfg.modified, + ) except psycopg2.Error as e: raise DbSyncError("Unable to drop schema from database: " + str(e)) -def dbsync_clean(mc): +def dbsync_clean( + mc, +): for conn in config.connections: clean(conn, mc) diff --git a/dbsync_daemon.py b/dbsync_daemon.py index 8dd0b24..5fdeef9 100644 --- a/dbsync_daemon.py +++ b/dbsync_daemon.py @@ -1,4 +1,3 @@ - # keep running until killed by ctrl+c: # - sleep N seconds # - pull @@ -15,12 +14,26 @@ import typing import dbsync -from version import __version__ -from config import config, validate_config, ConfigError, update_config_path +from version import ( + __version__, +) +from config import ( + config, + validate_config, + ConfigError, + update_config_path, +) def is_pyinstaller() -> bool: - if getattr(sys, 'frozen', False) and platform.system() == "Windows": + if ( + getattr( + sys, + "frozen", + False, + ) + and platform.system() == "Windows" + ): return True return False @@ -38,7 +51,12 @@ def pyinstaller_path_fix() -> None: LOGGER: logging.Logger = None -def setup_logger(log_path, log_verbosity: str, with_time=True, with_level=True) -> logging.Logger: +def setup_logger( + log_path, + log_verbosity: str, + with_time=True, + with_level=True, +) -> logging.Logger: global LOGGER LOGGER = logging.getLogger(f"{log_path}") if log_verbosity == "messages": @@ -48,46 +66,93 @@ def setup_logger(log_path, log_verbosity: str, with_time=True, with_level=True) else: LOGGER.setLevel(logging.WARNING) if not LOGGER.handlers: - log_handler = logging.FileHandler(log_path, mode="a") + log_handler = logging.FileHandler( + log_path, + mode="a", + ) format = "%(asctime)s -" if with_time else "" format += "%(levelname)s - %(message)s" if with_level else "%(message)s" log_handler.setFormatter(logging.Formatter(format)) LOGGER.addHandler(log_handler) -def handle_error(error: typing.Union[Exception, str]): +def handle_error( + error: typing.Union[ + Exception, + str, + ] +): if LOGGER: LOGGER.error(str(error)) - print("Error: " + str(error), file=sys.stderr) + print( + "Error: " + str(error), + file=sys.stderr, + ) sys.exit(1) -def handle_message(msg: str): +def handle_message( + msg: str, +): if LOGGER: LOGGER.debug(msg) print(msg) def main(): - pyinstaller_path_fix() - parser = argparse.ArgumentParser(prog='dbsync_deamon.py', - description='Synchronization tool between Mergin Maps project and database.', - epilog='www.merginmaps.com') - - parser.add_argument("config_file", nargs="?", default="config.yaml", help="Path to file with configuration. Default value is config.yaml in current working directory.") - parser.add_argument("--skip-init", action="store_true", help="Skip DB sync init step to make the tool start faster. It is not recommend to use it unless you are really sure you can skip the initial sanity checks.") - parser.add_argument("--single-run", action="store_true", help="Run just once performing single pull and push operation, instead of running in infinite loop.") - parser.add_argument("--force-init", action="store_true", help="Force removing working directory and schemas from DB to initialize from scratch.") - parser.add_argument("--log-file", default="", action="store", help="Store logging to file.") - parser.add_argument("--log-verbosity", choices=["errors", "messages"], default="errors", help="Log messages, not only errors.") + parser = argparse.ArgumentParser( + prog="dbsync_deamon.py", + description="Synchronization tool between Mergin Maps project and database.", + epilog="www.merginmaps.com", + ) + + parser.add_argument( + "config_file", + nargs="?", + default="config.yaml", + help="Path to file with configuration. Default value is config.yaml in current working directory.", + ) + parser.add_argument( + "--skip-init", + action="store_true", + help="Skip DB sync init step to make the tool start faster. It is not recommend to use it unless you are really sure you can skip the initial sanity checks.", + ) + parser.add_argument( + "--single-run", + action="store_true", + help="Run just once performing single pull and push operation, instead of running in infinite loop.", + ) + parser.add_argument( + "--force-init", + action="store_true", + help="Force removing working directory and schemas from DB to initialize from scratch.", + ) + parser.add_argument( + "--log-file", + default="", + action="store", + help="Store logging to file.", + ) + parser.add_argument( + "--log-verbosity", + choices=[ + "errors", + "messages", + ], + default="errors", + help="Log messages, not only errors.", + ) args = parser.parse_args() if args.log_file: log_file = pathlib.Path(args.log_file) - setup_logger(log_file.as_posix(), args.log_verbosity) + setup_logger( + log_file.as_posix(), + args.log_verbosity, + ) handle_message(f"== starting mergin-db-sync daemon == version {__version__} ==") @@ -113,7 +178,6 @@ def main(): dbsync.dbsync_clean(mc) if args.single_run: - if not args.skip_init: try: dbsync.dbsync_init(mc) @@ -131,7 +195,6 @@ def main(): handle_error(e) else: - if not args.skip_init: try: dbsync.dbsync_init(mc) @@ -139,7 +202,6 @@ def main(): handle_error(e) while True: - print(datetime.datetime.now()) try: @@ -150,7 +212,7 @@ def main(): dbsync.dbsync_push(mc) # check mergin client token expiration - delta = mc._auth_session['expire'] - datetime.datetime.now(datetime.timezone.utc) + delta = mc._auth_session["expire"] - datetime.datetime.now(datetime.timezone.utc) if delta.total_seconds() < 3600: mc = dbsync.create_mergin_client() @@ -161,5 +223,5 @@ def main(): time.sleep(sleep_time) -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/listen_test.py b/listen_test.py index ddfdcc7..15707dc 100644 --- a/listen_test.py +++ b/listen_test.py @@ -2,7 +2,7 @@ import psycopg2 import psycopg2.extensions -DSN="" +DSN = "" conn = psycopg2.connect(DSN) conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) @@ -15,11 +15,20 @@ import dbsync -sleep_time = dbsync.config['daemon']['sleep_time'] +sleep_time = dbsync.config["daemon"]["sleep_time"] while True: - if select.select([conn],[],[],5) == ([],[],[]): + if select.select( + [conn], + [], + [], + 5, + ) == ( + [], + [], + [], + ): print("Timeout") print("Trying to pull") @@ -29,7 +38,12 @@ conn.poll() while conn.notifies: notify = conn.notifies.pop(0) - print("Got NOTIFY:", notify.pid, notify.channel, notify.payload) + print( + "Got NOTIFY:", + notify.pid, + notify.channel, + notify.payload, + ) # new stuff in the database - let's push a new version @@ -43,7 +57,6 @@ dbsync.dbsync_push() - # TODO: create on init # CREATE RULE geodiff_rule_update_simple AS ON UPDATE TO gd_sync_base.simple DO ALSO NOTIFY geodiff; # CREATE RULE geodiff_rule_insert_simple AS ON INSERT TO gd_sync_base.simple DO ALSO NOTIFY geodiff; diff --git a/test/conftest.py b/test/conftest.py index d531e16..d13863a 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -5,45 +5,69 @@ import psycopg2 import psycopg2.extensions -from psycopg2 import sql - -from mergin import MerginClient, ClientError - -from dbsync import dbsync_init -from config import config - -GEODIFF_EXE = os.environ.get('TEST_GEODIFF_EXE') -DB_CONNINFO = os.environ.get('TEST_DB_CONNINFO') -SERVER_URL = os.environ.get('TEST_MERGIN_URL') -API_USER = os.environ.get('TEST_API_USERNAME') -USER_PWD = os.environ.get('TEST_API_PASSWORD') -WORKSPACE = os.environ.get('TEST_API_WORKSPACE') +from psycopg2 import ( + sql, +) + +from mergin import ( + MerginClient, + ClientError, +) + +from dbsync import ( + dbsync_init, +) +from config import ( + config, +) + +GEODIFF_EXE = os.environ.get("TEST_GEODIFF_EXE") +DB_CONNINFO = os.environ.get("TEST_DB_CONNINFO") +SERVER_URL = os.environ.get("TEST_MERGIN_URL") +API_USER = os.environ.get("TEST_API_USERNAME") +USER_PWD = os.environ.get("TEST_API_PASSWORD") +WORKSPACE = os.environ.get("TEST_API_WORKSPACE") TMP_DIR = tempfile.gettempdir() -TEST_DATA_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'test_data') - - -def _reset_config(project_name: str = "mergin"): - """ helper to reset config settings to ensure valid config """ - db_schema_main = project_name + '_main' - db_schema_base = project_name + '_base' +TEST_DATA_DIR = os.path.join( + os.path.dirname(os.path.realpath(__file__)), + "test_data", +) + + +def _reset_config( + project_name: str = "mergin", +): + """helper to reset config settings to ensure valid config""" + db_schema_main = project_name + "_main" + db_schema_base = project_name + "_base" full_project_name = WORKSPACE + "/" + project_name - config.update({ - 'MERGIN__USERNAME': API_USER, - 'MERGIN__PASSWORD': USER_PWD, - 'MERGIN__URL': SERVER_URL, - 'init_from': "gpkg", - 'CONNECTIONS': [{"driver": "postgres", - "conn_info": DB_CONNINFO, - "modified": db_schema_main, - "base": db_schema_base, - "mergin_project": full_project_name, - "sync_file": "test_sync.gpkg"}] - }) - - -def cleanup(mc, project, dirs): - """ cleanup leftovers from previous test if needed such as remote project and local directories """ + config.update( + { + "MERGIN__USERNAME": API_USER, + "MERGIN__PASSWORD": USER_PWD, + "MERGIN__URL": SERVER_URL, + "init_from": "gpkg", + "CONNECTIONS": [ + { + "driver": "postgres", + "conn_info": DB_CONNINFO, + "modified": db_schema_main, + "base": db_schema_base, + "mergin_project": full_project_name, + "sync_file": "test_sync.gpkg", + } + ], + } + ) + + +def cleanup( + mc, + project, + dirs, +): + """cleanup leftovers from previous test if needed such as remote project and local directories""" try: print("Deleting project on Mergin Maps server: " + project) mc.delete_project(project) @@ -55,8 +79,12 @@ def cleanup(mc, project, dirs): shutil.rmtree(d) -def cleanup_db(conn, schema_base, schema_main): - """ Removes test schemas from previous tests """ +def cleanup_db( + conn, + schema_base, + schema_main, +): + """Removes test schemas from previous tests""" cur = conn.cursor() cur.execute(sql.SQL("DROP SCHEMA IF EXISTS {} CASCADE").format(sql.Identifier(schema_base))) cur.execute(sql.SQL("DROP SCHEMA IF EXISTS {} CASCADE").format(sql.Identifier(schema_main))) @@ -71,64 +99,109 @@ def init_sync_from_geopackage(mc, project_name, source_gpkg_path, ignored_tables - configure DB sync and let it do the init (make copies to the database) """ full_project_name = WORKSPACE + "/" + project_name - project_dir = os.path.join(TMP_DIR, project_name + '_work') # working directory - sync_project_dir = os.path.join(TMP_DIR, project_name + '_dbsync') # used by dbsync - db_schema_main = project_name + '_main' - db_schema_base = project_name + '_base' + project_dir = os.path.join( + TMP_DIR, + project_name + "_work", + ) # working directory + sync_project_dir = os.path.join( + TMP_DIR, + project_name + "_dbsync", + ) # used by dbsync + db_schema_main = project_name + "_main" + db_schema_base = project_name + "_base" conn = psycopg2.connect(DB_CONNINFO) - cleanup(mc, full_project_name, [project_dir, sync_project_dir]) - cleanup_db(conn, db_schema_base, db_schema_main) + cleanup( + mc, + full_project_name, + [ + project_dir, + sync_project_dir, + ], + ) + cleanup_db( + conn, + db_schema_base, + db_schema_main, + ) # prepare a new Mergin Maps project - mc.create_project(project_name, namespace=WORKSPACE) - mc.download_project(full_project_name, project_dir) - shutil.copy(source_gpkg_path, os.path.join(project_dir, 'test_sync.gpkg')) + mc.create_project( + project_name, + namespace=WORKSPACE, + ) + mc.download_project( + full_project_name, + project_dir, + ) + shutil.copy( + source_gpkg_path, + os.path.join( + project_dir, + "test_sync.gpkg", + ), + ) for extra_filepath in extra_init_files: extra_filename = os.path.basename(extra_filepath) - target_extra_filepath = os.path.join(project_dir, extra_filename) - shutil.copy(extra_filepath, target_extra_filepath) + target_extra_filepath = os.path.join( + project_dir, + extra_filename, + ) + shutil.copy( + extra_filepath, + target_extra_filepath, + ) mc.push_project(project_dir) # prepare dbsync config # patch config to fit testing purposes if ignored_tables: - connection = {"driver": "postgres", - "conn_info": DB_CONNINFO, - "modified": db_schema_main, - "base": db_schema_base, - "mergin_project": full_project_name, - "sync_file": "test_sync.gpkg", - "skip_tables": ignored_tables} + connection = { + "driver": "postgres", + "conn_info": DB_CONNINFO, + "modified": db_schema_main, + "base": db_schema_base, + "mergin_project": full_project_name, + "sync_file": "test_sync.gpkg", + "skip_tables": ignored_tables, + } else: - connection = {"driver": "postgres", - "conn_info": DB_CONNINFO, - "modified": db_schema_main, - "base": db_schema_base, - "mergin_project": full_project_name, - "sync_file": "test_sync.gpkg"} - - config.update({ - 'GEODIFF_EXE': GEODIFF_EXE, - 'WORKING_DIR': sync_project_dir, - 'MERGIN__USERNAME': API_USER, - 'MERGIN__PASSWORD': USER_PWD, - 'MERGIN__URL': SERVER_URL, - 'CONNECTIONS': [connection], - 'init_from': "gpkg" - }) + connection = { + "driver": "postgres", + "conn_info": DB_CONNINFO, + "modified": db_schema_main, + "base": db_schema_base, + "mergin_project": full_project_name, + "sync_file": "test_sync.gpkg", + } + + config.update( + { + "GEODIFF_EXE": GEODIFF_EXE, + "WORKING_DIR": sync_project_dir, + "MERGIN__USERNAME": API_USER, + "MERGIN__PASSWORD": USER_PWD, + "MERGIN__URL": SERVER_URL, + "CONNECTIONS": [connection], + "init_from": "gpkg", + } + ) dbsync_init(mc) -@pytest.fixture(scope='function') +@pytest.fixture(scope="function") def mc(): assert SERVER_URL and API_USER and USER_PWD - #assert SERVER_URL and SERVER_URL.rstrip('/') != 'https://app.merginmaps.com/' and API_USER and USER_PWD - return MerginClient(SERVER_URL, login=API_USER, password=USER_PWD) + # assert SERVER_URL and SERVER_URL.rstrip('/') != 'https://app.merginmaps.com/' and API_USER and USER_PWD + return MerginClient( + SERVER_URL, + login=API_USER, + password=USER_PWD, + ) -@pytest.fixture(scope='function') +@pytest.fixture(scope="function") def db_connection() -> psycopg2.extensions.connection: return psycopg2.connect(DB_CONNINFO) diff --git a/test/test_basic.py b/test/test_basic.py index 11cc874..0af19a8 100644 --- a/test/test_basic.py +++ b/test/test_basic.py @@ -8,27 +8,65 @@ import pathlib import psycopg2 -from psycopg2 import sql - -from mergin import MerginClient - -from dbsync import dbsync_init, dbsync_pull, dbsync_push, dbsync_status, config, DbSyncError, _geodiff_make_copy, \ - _get_db_project_comment, _get_mergin_project, _get_project_id, _validate_local_project_id, config, _add_quotes_to_schema_name, \ - dbsync_clean, _check_schema_exists - -from .conftest import (WORKSPACE, TMP_DIR, DB_CONNINFO, GEODIFF_EXE, API_USER, USER_PWD, SERVER_URL, - TEST_DATA_DIR, init_sync_from_geopackage) - - - -def test_init_from_gpkg(mc: MerginClient): +from psycopg2 import ( + sql, +) + +from mergin import ( + MerginClient, +) + +from dbsync import ( + dbsync_init, + dbsync_pull, + dbsync_push, + dbsync_status, + config, + DbSyncError, + _geodiff_make_copy, + _get_db_project_comment, + _get_mergin_project, + _get_project_id, + _validate_local_project_id, + config, + _add_quotes_to_schema_name, + dbsync_clean, + _check_schema_exists, +) + +from .conftest import ( + WORKSPACE, + TMP_DIR, + DB_CONNINFO, + GEODIFF_EXE, + API_USER, + USER_PWD, + SERVER_URL, + TEST_DATA_DIR, + init_sync_from_geopackage, +) + + +def test_init_from_gpkg( + mc: MerginClient, +): project_name = "test_init" - source_gpkg_path = os.path.join(TEST_DATA_DIR, 'base.gpkg') - project_dir = os.path.join(TMP_DIR, project_name + '_work') - db_schema_main = project_name + '_main' - db_schema_base = project_name + '_base' + source_gpkg_path = os.path.join( + TEST_DATA_DIR, + "base.gpkg", + ) + project_dir = os.path.join( + TMP_DIR, + project_name + "_work", + ) + db_schema_main = project_name + "_main" + db_schema_base = project_name + "_base" - init_sync_from_geopackage(mc, project_name, source_gpkg_path) + init_sync_from_geopackage( + mc, + project_name, + source_gpkg_path, + ) # test that database schemas are created + tables are populated conn = psycopg2.connect(DB_CONNINFO) @@ -39,47 +77,86 @@ def test_init_from_gpkg(mc: MerginClient): dbsync_init(mc) cur.execute(sql.SQL("SELECT count(*) from {}.simple").format(sql.Identifier(db_schema_main)).as_string(conn)) assert cur.fetchone()[0] == 3 - db_proj_info = _get_db_project_comment(conn, db_schema_base) + db_proj_info = _get_db_project_comment( + conn, + db_schema_base, + ) assert db_proj_info["name"] == config.connections[0].mergin_project - assert db_proj_info["version"] == 'v1' + assert db_proj_info["version"] == "v1" # make change in GPKG and push to server to create pending changes, it should pass but not sync - shutil.copy(os.path.join(TEST_DATA_DIR, 'inserted_1_A.gpkg'), os.path.join(project_dir, 'test_sync.gpkg')) + shutil.copy( + os.path.join( + TEST_DATA_DIR, + "inserted_1_A.gpkg", + ), + os.path.join( + project_dir, + "test_sync.gpkg", + ), + ) mc.push_project(project_dir) # remove local copy of project (to mimic loss at docker restart) shutil.rmtree(config.working_dir) dbsync_init(mc) cur.execute(sql.SQL("SELECT count(*) from {}.simple").format(sql.Identifier(db_schema_main)).as_string(conn)) assert cur.fetchone()[0] == 3 - db_proj_info = _get_db_project_comment(conn, db_schema_base) - assert db_proj_info["version"] == 'v1' + db_proj_info = _get_db_project_comment( + conn, + db_schema_base, + ) + assert db_proj_info["version"] == "v1" # let's remove local working dir and download different version from server to mimic versions mismatch shutil.rmtree(config.working_dir) - mc.download_project(config.connections[0].mergin_project, config.working_dir, 'v2') + mc.download_project( + config.connections[0].mergin_project, + config.working_dir, + "v2", + ) # run init again, it should handle local working dir properly (e.g. download correct version) and pass but not sync dbsync_init(mc) - db_proj_info = _get_db_project_comment(conn, db_schema_base) - assert db_proj_info["version"] == 'v1' + db_proj_info = _get_db_project_comment( + conn, + db_schema_base, + ) + assert db_proj_info["version"] == "v1" # pull server changes to db to make sure we can sync again dbsync_pull(mc) cur.execute(sql.SQL("SELECT count(*) from {}.simple").format(sql.Identifier(db_schema_main)).as_string(conn)) assert cur.fetchone()[0] == 4 - db_proj_info = _get_db_project_comment(conn, db_schema_base) - assert db_proj_info["version"] == 'v2' + db_proj_info = _get_db_project_comment( + conn, + db_schema_base, + ) + assert db_proj_info["version"] == "v2" # update some feature from 'modified' db to create mismatch with src geopackage, it should pass but not sync fid = 1 - cur.execute(sql.SQL("SELECT * from {}.simple WHERE fid=%s").format(sql.Identifier(db_schema_main)), (fid,)) + cur.execute( + sql.SQL("SELECT * from {}.simple WHERE fid=%s").format(sql.Identifier(db_schema_main)), + (fid,), + ) old_value = cur.fetchone()[3] - cur.execute(sql.SQL("UPDATE {}.simple SET rating=100 WHERE fid=%s").format(sql.Identifier(db_schema_main)), (fid,)) + cur.execute( + sql.SQL("UPDATE {}.simple SET rating=100 WHERE fid=%s").format(sql.Identifier(db_schema_main)), + (fid,), + ) conn.commit() - cur.execute(sql.SQL("SELECT * from {}.simple WHERE fid=%s").format(sql.Identifier(db_schema_main)), (fid,)) + cur.execute( + sql.SQL("SELECT * from {}.simple WHERE fid=%s").format(sql.Identifier(db_schema_main)), + (fid,), + ) assert cur.fetchone()[3] == 100 dbsync_init(mc) # check geopackage has not been modified - after init we are not synced! - gpkg_conn = sqlite3.connect(os.path.join(project_dir, 'test_sync.gpkg')) + gpkg_conn = sqlite3.connect( + os.path.join( + project_dir, + "test_sync.gpkg", + ) + ) gpkg_cur = gpkg_conn.cursor() gpkg_cur.execute(f"SELECT * FROM simple WHERE fid={fid}") assert gpkg_cur.fetchone()[3] == old_value @@ -88,55 +165,109 @@ def test_init_from_gpkg(mc: MerginClient): mc.pull_project(project_dir) gpkg_cur.execute(f"SELECT * FROM simple WHERE fid ={fid}") assert gpkg_cur.fetchone()[3] == 100 - db_proj_info = _get_db_project_comment(conn, db_schema_base) - assert db_proj_info["version"] == 'v3' + db_proj_info = _get_db_project_comment( + conn, + db_schema_base, + ) + assert db_proj_info["version"] == "v3" # update some feature from 'base' db to create mismatch with src geopackage and modified cur.execute(sql.SQL("SELECT * from {}.simple").format(sql.Identifier(db_schema_base))) fid = cur.fetchone()[0] old_value = cur.fetchone()[3] - cur.execute(sql.SQL("UPDATE {}.simple SET rating=100 WHERE fid=%s").format(sql.Identifier(db_schema_base)), (fid,)) + cur.execute( + sql.SQL("UPDATE {}.simple SET rating=100 WHERE fid=%s").format(sql.Identifier(db_schema_base)), + (fid,), + ) conn.commit() - cur.execute(sql.SQL("SELECT * from {}.simple WHERE fid=%s").format(sql.Identifier(db_schema_base)), (fid,)) + cur.execute( + sql.SQL("SELECT * from {}.simple WHERE fid=%s").format(sql.Identifier(db_schema_base)), + (fid,), + ) assert cur.fetchone()[3] == 100 with pytest.raises(DbSyncError) as err: dbsync_init(mc) assert "The db schemas already exist but 'base' schema is not synchronized with source GPKG" in str(err.value) # make local changes to src file to introduce local changes - shutil.copy(os.path.join(TEST_DATA_DIR, 'base.gpkg'), os.path.join(config.working_dir, project_name, config.connections[0].sync_file)) + shutil.copy( + os.path.join( + TEST_DATA_DIR, + "base.gpkg", + ), + os.path.join( + config.working_dir, + project_name, + config.connections[0].sync_file, + ), + ) with pytest.raises(DbSyncError) as err: dbsync_init(mc) assert "There are pending changes in the local directory - that should never happen" in str(err.value) -def test_init_from_gpkg_with_incomplete_dir(mc: MerginClient): +def test_init_from_gpkg_with_incomplete_dir( + mc: MerginClient, +): project_name = "test_init_incomplete_dir" - source_gpkg_path = os.path.join(TEST_DATA_DIR, 'base.gpkg') - init_project_dir = os.path.join(TMP_DIR, project_name + '_dbsync', project_name) - init_sync_from_geopackage(mc, project_name, source_gpkg_path) - assert set(os.listdir(init_project_dir)) == set(['test_sync.gpkg', '.mergin']) + source_gpkg_path = os.path.join( + TEST_DATA_DIR, + "base.gpkg", + ) + init_project_dir = os.path.join( + TMP_DIR, + project_name + "_dbsync", + project_name, + ) + init_sync_from_geopackage( + mc, + project_name, + source_gpkg_path, + ) + assert set(os.listdir(init_project_dir)) == set( + [ + "test_sync.gpkg", + ".mergin", + ] + ) shutil.rmtree(init_project_dir) # Remove dir with content os.makedirs(init_project_dir) # Recreate empty project working dir assert os.listdir(init_project_dir) == [] dbsync_init(mc) - assert set(os.listdir(init_project_dir)) == set(['test_sync.gpkg', '.mergin']) + assert set(os.listdir(init_project_dir)) == set( + [ + "test_sync.gpkg", + ".mergin", + ] + ) -def test_basic_pull(mc: MerginClient): +def test_basic_pull( + mc: MerginClient, +): """ Test initialization and one pull from Mergin Maps to DB 1. create a Mergin Maps project using py-client with a testing gpkg 2. run init, check that everything is fine 3. make change in gpkg (copy new version), check everything is fine """ - project_name = 'test_sync_pull' + project_name = "test_sync_pull" db_schema_main = project_name + "_main" - source_gpkg_path = os.path.join(TEST_DATA_DIR, 'base.gpkg') - project_dir = os.path.join(TMP_DIR, project_name + '_work') # working directory - - init_sync_from_geopackage(mc, project_name, source_gpkg_path) + source_gpkg_path = os.path.join( + TEST_DATA_DIR, + "base.gpkg", + ) + project_dir = os.path.join( + TMP_DIR, + project_name + "_work", + ) # working directory + + init_sync_from_geopackage( + mc, + project_name, + source_gpkg_path, + ) conn = psycopg2.connect(DB_CONNINFO) @@ -146,7 +277,16 @@ def test_basic_pull(mc: MerginClient): assert cur.fetchone()[0] == 3 # make change in GPKG and push - shutil.copy(os.path.join(TEST_DATA_DIR, 'inserted_1_A.gpkg'), os.path.join(project_dir, 'test_sync.gpkg')) + shutil.copy( + os.path.join( + TEST_DATA_DIR, + "inserted_1_A.gpkg", + ), + os.path.join( + project_dir, + "test_sync.gpkg", + ), + ) mc.push_project(project_dir) # pull the change from Mergin Maps to DB @@ -156,22 +296,37 @@ def test_basic_pull(mc: MerginClient): cur = conn.cursor() cur.execute(sql.SQL("SELECT count(*) from {}.simple").format((sql.Identifier(db_schema_main)))) assert cur.fetchone()[0] == 4 - db_proj_info = _get_db_project_comment(conn, project_name + "_base") - assert db_proj_info["version"] == 'v2' + db_proj_info = _get_db_project_comment( + conn, + project_name + "_base", + ) + assert db_proj_info["version"] == "v2" print("---") dbsync_status(mc) -def test_basic_push(mc: MerginClient): - """ Initialize a project and test push of a new row from PostgreSQL to Mergin Maps""" +def test_basic_push( + mc: MerginClient, +): + """Initialize a project and test push of a new row from PostgreSQL to Mergin Maps""" project_name = "test_sync_push" db_schema_main = project_name + "_main" - source_gpkg_path = os.path.join(TEST_DATA_DIR, 'base.gpkg') - project_dir = os.path.join(TMP_DIR, project_name + '_work') # working directory - - init_sync_from_geopackage(mc, project_name, source_gpkg_path) + source_gpkg_path = os.path.join( + TEST_DATA_DIR, + "base.gpkg", + ) + project_dir = os.path.join( + TMP_DIR, + project_name + "_work", + ) # working directory + + init_sync_from_geopackage( + mc, + project_name, + source_gpkg_path, + ) conn = psycopg2.connect(DB_CONNINFO) @@ -182,21 +337,33 @@ def test_basic_push(mc: MerginClient): # make a change in PostgreSQL cur = conn.cursor() - cur.execute(sql.SQL("INSERT INTO {}.simple (name, rating) VALUES ('insert in postgres', 123)").format(sql.Identifier(db_schema_main))) + cur.execute( + sql.SQL("INSERT INTO {}.simple (name, rating) VALUES ('insert in postgres', 123)").format( + sql.Identifier(db_schema_main) + ) + ) cur.execute("COMMIT") cur.execute(sql.SQL("SELECT count(*) from {}.simple").format(sql.Identifier(db_schema_main))) assert cur.fetchone()[0] == 4 # push the change from DB to PostgreSQL dbsync_push(mc) - db_proj_info = _get_db_project_comment(conn, project_name + "_base") - assert db_proj_info["version"] == 'v2' + db_proj_info = _get_db_project_comment( + conn, + project_name + "_base", + ) + assert db_proj_info["version"] == "v2" # pull new version of the project to the work project directory mc.pull_project(project_dir) # check that the insert has been applied to our GeoPackage - gpkg_conn = sqlite3.connect(os.path.join(project_dir, 'test_sync.gpkg')) + gpkg_conn = sqlite3.connect( + os.path.join( + project_dir, + "test_sync.gpkg", + ) + ) gpkg_cur = gpkg_conn.cursor() gpkg_cur.execute("SELECT count(*) FROM simple") assert gpkg_cur.fetchone()[0] == 4 @@ -205,8 +372,10 @@ def test_basic_push(mc: MerginClient): dbsync_status(mc) -def test_basic_both(mc: MerginClient): - """ Initializes a sync project and does both a change in Mergin Maps and in the database, +def test_basic_both( + mc: MerginClient, +): + """Initializes a sync project and does both a change in Mergin Maps and in the database, and lets DB sync handle it: changes in PostgreSQL need to be rebased on top of changes in Mergin Maps server. """ @@ -214,10 +383,20 @@ def test_basic_both(mc: MerginClient): db_schema_main = project_name + "_main" db_schema_base = project_name + "_base" - source_gpkg_path = os.path.join(TEST_DATA_DIR, 'base.gpkg') - project_dir = os.path.join(TMP_DIR, project_name + '_work') # working directory - - init_sync_from_geopackage(mc, project_name, source_gpkg_path) + source_gpkg_path = os.path.join( + TEST_DATA_DIR, + "base.gpkg", + ) + project_dir = os.path.join( + TMP_DIR, + project_name + "_work", + ) # working directory + + init_sync_from_geopackage( + mc, + project_name, + source_gpkg_path, + ) conn = psycopg2.connect(DB_CONNINFO) @@ -227,29 +406,53 @@ def test_basic_both(mc: MerginClient): assert cur.fetchone()[0] == 3 # make change in GPKG and push - shutil.copy(os.path.join(TEST_DATA_DIR, 'inserted_1_A.gpkg'), os.path.join(project_dir, 'test_sync.gpkg')) + shutil.copy( + os.path.join( + TEST_DATA_DIR, + "inserted_1_A.gpkg", + ), + os.path.join( + project_dir, + "test_sync.gpkg", + ), + ) mc.push_project(project_dir) # make a change in PostgreSQL cur = conn.cursor() - cur.execute(sql.SQL("INSERT INTO {}.simple (name, rating) VALUES ('insert in postgres', 123)").format(sql.Identifier(db_schema_main))) + cur.execute( + sql.SQL("INSERT INTO {}.simple (name, rating) VALUES ('insert in postgres', 123)").format( + sql.Identifier(db_schema_main) + ) + ) cur.execute("COMMIT") cur.execute(sql.SQL("SELECT count(*) from {}.simple").format(sql.Identifier(db_schema_main))) assert cur.fetchone()[0] == 4 # first pull changes from Mergin Maps to DB (+rebase changes in DB) and then push the changes from DB to Mergin Maps dbsync_pull(mc) - db_proj_info = _get_db_project_comment(conn, db_schema_base) - assert db_proj_info["version"] == 'v2' + db_proj_info = _get_db_project_comment( + conn, + db_schema_base, + ) + assert db_proj_info["version"] == "v2" dbsync_push(mc) - db_proj_info = _get_db_project_comment(conn, db_schema_base) - assert db_proj_info["version"] == 'v3' + db_proj_info = _get_db_project_comment( + conn, + db_schema_base, + ) + assert db_proj_info["version"] == "v3" # pull new version of the project to the work project directory mc.pull_project(project_dir) # check that the insert has been applied to our GeoPackage - gpkg_conn = sqlite3.connect(os.path.join(project_dir, 'test_sync.gpkg')) + gpkg_conn = sqlite3.connect( + os.path.join( + project_dir, + "test_sync.gpkg", + ) + ) gpkg_cur = gpkg_conn.cursor() gpkg_cur.execute("SELECT count(*) FROM simple") assert gpkg_cur.fetchone()[0] == 5 @@ -263,63 +466,167 @@ def test_basic_both(mc: MerginClient): dbsync_status(mc) -def test_init_with_skip(mc: MerginClient): +def test_init_with_skip( + mc: MerginClient, +): project_name = "test_init_skip" - source_gpkg_path = os.path.join(TEST_DATA_DIR, 'base_2tables.gpkg') - project_dir = os.path.join(TMP_DIR, project_name + '_work') - db_schema_main = project_name + '_main' - db_schema_base = project_name + '_base' + source_gpkg_path = os.path.join( + TEST_DATA_DIR, + "base_2tables.gpkg", + ) + project_dir = os.path.join( + TMP_DIR, + project_name + "_work", + ) + db_schema_main = project_name + "_main" + db_schema_base = project_name + "_base" - init_sync_from_geopackage(mc, project_name, source_gpkg_path, ["lines"]) + init_sync_from_geopackage( + mc, + project_name, + source_gpkg_path, + ["lines"], + ) # test that database schemas does not have ignored table conn = psycopg2.connect(DB_CONNINFO) cur = conn.cursor() - cur.execute(sql.SQL("SELECT EXISTS (SELECT FROM pg_tables WHERE schemaname = '{}' AND tablename = 'lines');").format(sql.Identifier(db_schema_main))) + cur.execute( + sql.SQL("SELECT EXISTS (SELECT FROM pg_tables WHERE schemaname = '{}' AND tablename = 'lines');").format( + sql.Identifier(db_schema_main) + ) + ) assert cur.fetchone()[0] == False cur.execute(sql.SQL("SELECT count(*) from {}.points").format(sql.Identifier(db_schema_main))) assert cur.fetchone()[0] == 0 # run again, nothing should change dbsync_init(mc) - cur.execute(sql.SQL("SELECT EXISTS (SELECT FROM pg_tables WHERE schemaname = '{}' AND tablename = 'lines');").format(sql.Identifier(db_schema_main))) + cur.execute( + sql.SQL("SELECT EXISTS (SELECT FROM pg_tables WHERE schemaname = '{}' AND tablename = 'lines');").format( + sql.Identifier(db_schema_main) + ) + ) assert cur.fetchone()[0] == False cur.execute(sql.SQL("SELECT count(*) from {}.points").format(sql.Identifier(db_schema_main))) assert cur.fetchone()[0] == 0 # make change in GPKG and push to server to create pending changes, it should pass but not sync - shutil.copy(os.path.join(TEST_DATA_DIR, 'modified_all.gpkg'), os.path.join(project_dir, 'test_sync.gpkg')) + shutil.copy( + os.path.join( + TEST_DATA_DIR, + "modified_all.gpkg", + ), + os.path.join( + project_dir, + "test_sync.gpkg", + ), + ) mc.push_project(project_dir) # pull server changes to db to make sure only points table is updated dbsync_pull(mc) - cur.execute(sql.SQL("SELECT EXISTS (SELECT FROM pg_tables WHERE schemaname = '{}' AND tablename = 'lines');").format(sql.Identifier(db_schema_main))) + cur.execute( + sql.SQL("SELECT EXISTS (SELECT FROM pg_tables WHERE schemaname = '{}' AND tablename = 'lines');").format( + sql.Identifier(db_schema_main) + ) + ) assert cur.fetchone()[0] == False cur.execute(sql.SQL("SELECT count(*) from {}.points").format(sql.Identifier(db_schema_main))) assert cur.fetchone()[0] == 4 -def test_with_local_changes(mc: MerginClient): +def test_with_local_changes( + mc: MerginClient, +): project_name = "test_local_changes" - source_gpkg_path = os.path.join(TEST_DATA_DIR, 'base.gpkg') - extra_files = [os.path.join(TEST_DATA_DIR, f) for f in ["note_1.txt", "note_3.txt", "modified_all.gpkg"]] - dbsync_project_dir = os.path.join(TMP_DIR, project_name + '_dbsync', - project_name) # project location within dbsync working dir - - init_sync_from_geopackage(mc, project_name, source_gpkg_path, [], *extra_files) + source_gpkg_path = os.path.join( + TEST_DATA_DIR, + "base.gpkg", + ) + extra_files = [ + os.path.join( + TEST_DATA_DIR, + f, + ) + for f in [ + "note_1.txt", + "note_3.txt", + "modified_all.gpkg", + ] + ] + dbsync_project_dir = os.path.join( + TMP_DIR, + project_name + "_dbsync", + project_name, + ) # project location within dbsync working dir + + init_sync_from_geopackage( + mc, + project_name, + source_gpkg_path, + [], + *extra_files, + ) # update GPKG - shutil.copy(os.path.join(TEST_DATA_DIR, 'inserted_1_A.gpkg'), os.path.join(dbsync_project_dir, 'test_sync.gpkg')) + shutil.copy( + os.path.join( + TEST_DATA_DIR, + "inserted_1_A.gpkg", + ), + os.path.join( + dbsync_project_dir, + "test_sync.gpkg", + ), + ) # update non-GPGK file - shutil.copy(os.path.join(TEST_DATA_DIR, 'note_2.txt'), os.path.join(dbsync_project_dir, 'note_1.txt')) + shutil.copy( + os.path.join( + TEST_DATA_DIR, + "note_2.txt", + ), + os.path.join( + dbsync_project_dir, + "note_1.txt", + ), + ) # add GPKG file - shutil.copy(os.path.join(TEST_DATA_DIR, 'inserted_1_A.gpkg'), os.path.join(dbsync_project_dir, 'inserted_1_A.gpkg')) + shutil.copy( + os.path.join( + TEST_DATA_DIR, + "inserted_1_A.gpkg", + ), + os.path.join( + dbsync_project_dir, + "inserted_1_A.gpkg", + ), + ) # add non-GPGK file - shutil.copy(os.path.join(TEST_DATA_DIR, 'note_2.txt'), os.path.join(dbsync_project_dir, 'note_2.txt')) + shutil.copy( + os.path.join( + TEST_DATA_DIR, + "note_2.txt", + ), + os.path.join( + dbsync_project_dir, + "note_2.txt", + ), + ) # remove GPKG file - os.remove(os.path.join(dbsync_project_dir, 'modified_all.gpkg')) + os.remove( + os.path.join( + dbsync_project_dir, + "modified_all.gpkg", + ) + ) # remove non-GPKG file - os.remove(os.path.join(dbsync_project_dir, 'note_3.txt')) + os.remove( + os.path.join( + dbsync_project_dir, + "note_3.txt", + ) + ) # Check local changes in the sync project dir mp = _get_mergin_project(dbsync_project_dir) local_changes = mp.get_push_changes() @@ -331,16 +638,31 @@ def test_with_local_changes(mc: MerginClient): dbsync_status(mc) -def test_recreated_project_ids(mc: MerginClient): +def test_recreated_project_ids( + mc: MerginClient, +): project_name = "test_recreated_project_ids" - source_gpkg_path = os.path.join(TEST_DATA_DIR, 'base.gpkg') - project_dir = os.path.join(TMP_DIR, project_name + '_work') # working directory + source_gpkg_path = os.path.join( + TEST_DATA_DIR, + "base.gpkg", + ) + project_dir = os.path.join( + TMP_DIR, + project_name + "_work", + ) # working directory full_project_name = WORKSPACE + "/" + project_name - init_sync_from_geopackage(mc, project_name, source_gpkg_path) + init_sync_from_geopackage( + mc, + project_name, + source_gpkg_path, + ) # delete remote project mc.delete_project(full_project_name) # recreate project with the same name - mc.create_project(project_name, namespace=WORKSPACE) + mc.create_project( + project_name, + namespace=WORKSPACE, + ) # comparing project IDs after recreating it with the same name mp = _get_mergin_project(project_dir) local_project_id = _get_project_id(mp) @@ -353,14 +675,35 @@ def test_recreated_project_ids(mc: MerginClient): dbsync_status(mc) -@pytest.mark.parametrize("project_name", ['test_init_1', 'Test_Init_2', "Test 3", "Test-4"]) -def test_project_names(mc: MerginClient, project_name: str): - source_gpkg_path = os.path.join(TEST_DATA_DIR, 'base.gpkg') - project_dir = os.path.join(TMP_DIR, project_name + '_work') - db_schema_main = project_name + '_main' - db_schema_base = project_name + '_base' +@pytest.mark.parametrize( + "project_name", + [ + "test_init_1", + "Test_Init_2", + "Test 3", + "Test-4", + ], +) +def test_project_names( + mc: MerginClient, + project_name: str, +): + source_gpkg_path = os.path.join( + TEST_DATA_DIR, + "base.gpkg", + ) + project_dir = os.path.join( + TMP_DIR, + project_name + "_work", + ) + db_schema_main = project_name + "_main" + db_schema_base = project_name + "_base" - init_sync_from_geopackage(mc, project_name, source_gpkg_path) + init_sync_from_geopackage( + mc, + project_name, + source_gpkg_path, + ) # test that database schemas are created + tables are populated conn = psycopg2.connect(DB_CONNINFO) @@ -369,27 +712,53 @@ def test_project_names(mc: MerginClient, project_name: str): assert cur.fetchone()[0] == 3 # make change in GPKG and push - shutil.copy(os.path.join(TEST_DATA_DIR, 'inserted_1_A.gpkg'), os.path.join(project_dir, 'test_sync.gpkg')) + shutil.copy( + os.path.join( + TEST_DATA_DIR, + "inserted_1_A.gpkg", + ), + os.path.join( + project_dir, + "test_sync.gpkg", + ), + ) mc.push_project(project_dir) # pull server changes to db to make sure we can sync again dbsync_pull(mc) cur.execute(sql.SQL("SELECT count(*) from {}.simple").format(sql.Identifier(db_schema_main)).as_string(conn)) assert cur.fetchone()[0] == 4 - db_proj_info = _get_db_project_comment(conn, db_schema_base) - assert db_proj_info["version"] == 'v2' + db_proj_info = _get_db_project_comment( + conn, + db_schema_base, + ) + assert db_proj_info["version"] == "v2" # update some feature from 'modified' db to create mismatch with src geopackage, it should pass but not sync fid = 1 - cur.execute(sql.SQL("SELECT * from {}.simple WHERE fid=%s").format(sql.Identifier(db_schema_main)), (fid,)) + cur.execute( + sql.SQL("SELECT * from {}.simple WHERE fid=%s").format(sql.Identifier(db_schema_main)), + (fid,), + ) old_value = cur.fetchone()[3] - cur.execute(sql.SQL("UPDATE {}.simple SET rating=100 WHERE fid=%s").format(sql.Identifier(db_schema_main)), (fid,)) + cur.execute( + sql.SQL("UPDATE {}.simple SET rating=100 WHERE fid=%s").format(sql.Identifier(db_schema_main)), + (fid,), + ) conn.commit() - cur.execute(sql.SQL("SELECT * from {}.simple WHERE fid=%s").format(sql.Identifier(db_schema_main)), (fid,)) + cur.execute( + sql.SQL("SELECT * from {}.simple WHERE fid=%s").format(sql.Identifier(db_schema_main)), + (fid,), + ) assert cur.fetchone()[3] == 100 dbsync_init(mc) # check geopackage has not been modified - after init we are not synced! - gpkg_conn = sqlite3.connect(os.path.join(project_dir, 'test_sync.gpkg')) + gpkg_conn = sqlite3.connect( + os.path.join( + project_dir, + "test_sync.gpkg", + ) + ) gpkg_cur = gpkg_conn.cursor() gpkg_cur.execute(f"SELECT * FROM simple WHERE fid={fid}") assert gpkg_cur.fetchone()[3] == old_value @@ -398,17 +767,29 @@ def test_project_names(mc: MerginClient, project_name: str): mc.pull_project(project_dir) gpkg_cur.execute(f"SELECT * FROM simple WHERE fid ={fid}") assert gpkg_cur.fetchone()[3] == 100 - db_proj_info = _get_db_project_comment(conn, db_schema_base) - assert db_proj_info["version"] == 'v3' - - -def test_init_from_gpkg_missing_schema(mc: MerginClient): - source_gpkg_path = os.path.join(TEST_DATA_DIR, 'base.gpkg') + db_proj_info = _get_db_project_comment( + conn, + db_schema_base, + ) + assert db_proj_info["version"] == "v3" + + +def test_init_from_gpkg_missing_schema( + mc: MerginClient, +): + source_gpkg_path = os.path.join( + TEST_DATA_DIR, + "base.gpkg", + ) project_name = "test_init_missing_schema" db_schema_base = project_name + "_base" db_schema_main = project_name + "_main" - init_sync_from_geopackage(mc, project_name, source_gpkg_path) + init_sync_from_geopackage( + mc, + project_name, + source_gpkg_path, + ) conn = psycopg2.connect(DB_CONNINFO) cur = conn.cursor() @@ -426,7 +807,11 @@ def test_init_from_gpkg_missing_schema(mc: MerginClient): assert "The 'modified' schema exists but the base schema is missing" in str(err.value) assert "This may be a result of a previously failed attempt to initialize DB sync" in str(err.value) - init_sync_from_geopackage(mc, project_name, source_gpkg_path) + init_sync_from_geopackage( + mc, + project_name, + source_gpkg_path, + ) # drop main schema to mimic some mismatch cur.execute(sql.SQL("DROP SCHEMA {} CASCADE").format(sql.Identifier(db_schema_main))) @@ -442,12 +827,21 @@ def test_init_from_gpkg_missing_schema(mc: MerginClient): assert "This may be a result of a previously failed attempt to initialize DB sync" in str(err.value) -def test_init_from_gpkg_missing_comment(mc: MerginClient): +def test_init_from_gpkg_missing_comment( + mc: MerginClient, +): project_name = "test_init_missing_comment" - source_gpkg_path = os.path.join(TEST_DATA_DIR, 'base.gpkg') + source_gpkg_path = os.path.join( + TEST_DATA_DIR, + "base.gpkg", + ) schema_name = project_name + "_base" - init_sync_from_geopackage(mc, project_name, source_gpkg_path) + init_sync_from_geopackage( + mc, + project_name, + source_gpkg_path, + ) conn = psycopg2.connect(DB_CONNINFO) cur = conn.cursor() @@ -461,7 +855,10 @@ def test_init_from_gpkg_missing_comment(mc: MerginClient): # drop base schema to mimic some mismatch query = sql.SQL("COMMENT ON SCHEMA {} IS %s").format(sql.Identifier(schema_name)) - cur.execute(query.as_string(conn), ("",)) + cur.execute( + query.as_string(conn), + ("",), + ) conn.commit() with pytest.raises(DbSyncError) as err: @@ -473,13 +870,21 @@ def test_init_from_gpkg_missing_comment(mc: MerginClient): cur.fetchone() is None -def test_dbsync_clean_from_gpkg(mc: MerginClient): +def test_dbsync_clean_from_gpkg( + mc: MerginClient, +): project_name = "test_clean" - source_gpkg_path = os.path.join(TEST_DATA_DIR, 'base.gpkg') + source_gpkg_path = os.path.join( + TEST_DATA_DIR, + "base.gpkg", + ) db_schema_base = project_name + "_base" db_schema_main = project_name + "_main" full_project_name = WORKSPACE + "/" + project_name - sync_project_dir = os.path.join(TMP_DIR, project_name + '_dbsync') + sync_project_dir = os.path.join( + TMP_DIR, + project_name + "_dbsync", + ) connection = { "driver": "postgres", @@ -487,38 +892,69 @@ def test_dbsync_clean_from_gpkg(mc: MerginClient): "modified": db_schema_main, "base": db_schema_base, "mergin_project": full_project_name, - "sync_file": "test_sync.gpkg"} - - config.update({ - 'GEODIFF_EXE': GEODIFF_EXE, - 'WORKING_DIR': sync_project_dir, - 'MERGIN__USERNAME': API_USER, - 'MERGIN__PASSWORD': USER_PWD, - 'MERGIN__URL': SERVER_URL, - 'CONNECTIONS': [connection], - 'init_from': "gpkg" - }) + "sync_file": "test_sync.gpkg", + } + + config.update( + { + "GEODIFF_EXE": GEODIFF_EXE, + "WORKING_DIR": sync_project_dir, + "MERGIN__USERNAME": API_USER, + "MERGIN__PASSWORD": USER_PWD, + "MERGIN__URL": SERVER_URL, + "CONNECTIONS": [connection], + "init_from": "gpkg", + } + ) conn = psycopg2.connect(DB_CONNINFO) # we can run clean even before init dbsync_clean(mc) - init_sync_from_geopackage(mc, project_name, source_gpkg_path) + init_sync_from_geopackage( + mc, + project_name, + source_gpkg_path, + ) # edit sync GPKG and push to server - con = sqlite3.connect(os.path.join(sync_project_dir, project_name, 'test_sync.gpkg')) + con = sqlite3.connect( + os.path.join( + sync_project_dir, + project_name, + "test_sync.gpkg", + ) + ) cur = con.cursor() - cur.execute("ALTER TABLE simple ADD COLUMN \"new_field\" TEXT;") + cur.execute('ALTER TABLE simple ADD COLUMN "new_field" TEXT;') cur.execute("CREATE TABLE new_table (id INTEGER PRIMARY KEY, number INTEGER DEFAULT 0);") cur.execute("INSERT INTO new_table (number) VALUES (99);") con.commit() con.close() - mc.push_project(os.path.join(sync_project_dir, project_name)) + mc.push_project( + os.path.join( + sync_project_dir, + project_name, + ) + ) # replace it locally back with previous version - so there is mismatch, on server there is a column, that does not exist locally - os.remove(os.path.join(sync_project_dir, project_name, 'test_sync.gpkg')) - shutil.copy(source_gpkg_path, os.path.join(sync_project_dir, project_name, 'test_sync.gpkg')) + os.remove( + os.path.join( + sync_project_dir, + project_name, + "test_sync.gpkg", + ) + ) + shutil.copy( + source_gpkg_path, + os.path.join( + sync_project_dir, + project_name, + "test_sync.gpkg", + ), + ) # try init then pull and push, causing geodiff failed error with pytest.raises(DbSyncError) as err: @@ -528,22 +964,44 @@ def test_dbsync_clean_from_gpkg(mc: MerginClient): assert "geodiff failed" in str(err.value) # prior to dbsync_clean everything exists - assert _check_schema_exists(conn, db_schema_base) - assert _check_schema_exists(conn, db_schema_main) + assert _check_schema_exists( + conn, + db_schema_base, + ) + assert _check_schema_exists( + conn, + db_schema_main, + ) assert pathlib.Path(config.working_dir).exists() dbsync_clean(mc) # after the dbsync_clean nothing exists assert pathlib.Path(config.working_dir).exists() is False - assert _check_schema_exists(conn, db_schema_base) is False - assert _check_schema_exists(conn, db_schema_main) is False + assert ( + _check_schema_exists( + conn, + db_schema_base, + ) + is False + ) + assert ( + _check_schema_exists( + conn, + db_schema_main, + ) + is False + ) # make sure that running the clean second time does not cause issue dbsync_clean(mc) # after clean we can init - init_sync_from_geopackage(mc, project_name, source_gpkg_path) + init_sync_from_geopackage( + mc, + project_name, + source_gpkg_path, + ) # test that after clean everything works dbsync_init(mc) diff --git a/test/test_config.py b/test/test_config.py index 732326a..c8badfc 100644 --- a/test/test_config.py +++ b/test/test_config.py @@ -7,9 +7,16 @@ """ import pytest -from config import config, ConfigError, validate_config, get_ignored_tables +from config import ( + config, + ConfigError, + validate_config, + get_ignored_tables, +) -from .conftest import _reset_config +from .conftest import ( + _reset_config, +) def test_config(): @@ -17,73 +24,235 @@ def test_config(): _reset_config() validate_config(config) - with pytest.raises(ConfigError, match="Config error: Incorrect mergin settings"): - config.update({'MERGIN__USERNAME': None}) + with pytest.raises( + ConfigError, + match="Config error: Incorrect mergin settings", + ): + config.update({"MERGIN__USERNAME": None}) validate_config(config) _reset_config() - with pytest.raises(ConfigError, match="Config error: Missing parameter `init_from` in the configuration"): - config.unset('init_from', force=True) + with pytest.raises( + ConfigError, + match="Config error: Missing parameter `init_from` in the configuration", + ): + config.unset( + "init_from", + force=True, + ) validate_config(config) _reset_config() - with pytest.raises(ConfigError, match="Config error: `init_from` parameter must be either `gpkg` or `db`"): - config.update({'init_from': "anywhere"}) + with pytest.raises( + ConfigError, + match="Config error: `init_from` parameter must be either `gpkg` or `db`", + ): + config.update({"init_from": "anywhere"}) validate_config(config) _reset_config() - with pytest.raises(ConfigError, match="Config error: Connections list can not be empty"): - config.update({'CONNECTIONS': []}) + with pytest.raises( + ConfigError, + match="Config error: Connections list can not be empty", + ): + config.update({"CONNECTIONS": []}) validate_config(config) _reset_config() - with pytest.raises(ConfigError, match="Config error: Incorrect connection settings"): - config.update({'CONNECTIONS': [{"modified": "mergin_main"}]}) + with pytest.raises( + ConfigError, + match="Config error: Incorrect connection settings", + ): + config.update({"CONNECTIONS": [{"modified": "mergin_main"}]}) validate_config(config) _reset_config() - with pytest.raises(ConfigError, match="Config error: Only 'postgres' driver is currently supported."): - config.update({'CONNECTIONS': [{"driver": "oracle", "conn_info": "", "modified": "mergin_main", "base": "mergin_base", "mergin_project": "john/dbsync", "sync_file": "sync.gpkg"}]}) + with pytest.raises( + ConfigError, + match="Config error: Only 'postgres' driver is currently supported.", + ): + config.update( + { + "CONNECTIONS": [ + { + "driver": "oracle", + "conn_info": "", + "modified": "mergin_main", + "base": "mergin_base", + "mergin_project": "john/dbsync", + "sync_file": "sync.gpkg", + } + ] + } + ) validate_config(config) _reset_config() - with pytest.raises(ConfigError, match="Config error: Name of the Mergin Maps project should be provided in the namespace/name format."): - config.update({'CONNECTIONS': [{"driver": "postgres", "conn_info": "", "modified": "mergin_main", "base": "mergin_base", "mergin_project": "dbsync", "sync_file": "sync.gpkg"}]}) + with pytest.raises( + ConfigError, + match="Config error: Name of the Mergin Maps project should be provided in the namespace/name format.", + ): + config.update( + { + "CONNECTIONS": [ + { + "driver": "postgres", + "conn_info": "", + "modified": "mergin_main", + "base": "mergin_base", + "mergin_project": "dbsync", + "sync_file": "sync.gpkg", + } + ] + } + ) validate_config(config) def test_skip_tables(): _reset_config() - config.update({'CONNECTIONS': [{"driver": "postgres", "conn_info": "", "modified": "mergin_main", "base": "mergin_base", "mergin_project": "john/dbsync", "sync_file": "sync.gpkg", "skip_tables": None}]}) + config.update( + { + "CONNECTIONS": [ + { + "driver": "postgres", + "conn_info": "", + "modified": "mergin_main", + "base": "mergin_base", + "mergin_project": "john/dbsync", + "sync_file": "sync.gpkg", + "skip_tables": None, + } + ] + } + ) validate_config(config) - config.update({'CONNECTIONS': [{"driver": "postgres", "conn_info": "", "modified": "mergin_main", "base": "mergin_base", "mergin_project": "john/dbsync", "sync_file": "sync.gpkg", "skip_tables": []}]}) + config.update( + { + "CONNECTIONS": [ + { + "driver": "postgres", + "conn_info": "", + "modified": "mergin_main", + "base": "mergin_base", + "mergin_project": "john/dbsync", + "sync_file": "sync.gpkg", + "skip_tables": [], + } + ] + } + ) validate_config(config) - config.update({'CONNECTIONS': [{"driver": "postgres", "conn_info": "", "modified": "mergin_main", "base": "mergin_base", "mergin_project": "john/dbsync", "sync_file": "sync.gpkg", "skip_tables": "table"}]}) + config.update( + { + "CONNECTIONS": [ + { + "driver": "postgres", + "conn_info": "", + "modified": "mergin_main", + "base": "mergin_base", + "mergin_project": "john/dbsync", + "sync_file": "sync.gpkg", + "skip_tables": "table", + } + ] + } + ) validate_config(config) - config.update({'CONNECTIONS': [{"driver": "postgres", "conn_info": "", "modified": "mergin_main", "base": "mergin_base", "mergin_project": "john/dbsync", "sync_file": "sync.gpkg", "skip_tables": ["table"]}]}) + config.update( + { + "CONNECTIONS": [ + { + "driver": "postgres", + "conn_info": "", + "modified": "mergin_main", + "base": "mergin_base", + "mergin_project": "john/dbsync", + "sync_file": "sync.gpkg", + "skip_tables": ["table"], + } + ] + } + ) def test_get_ignored_tables(): _reset_config() - config.update({'CONNECTIONS': [{"driver": "postgres", "conn_info": "", "modified": "mergin_main", "base": "mergin_base", "mergin_project": "john/dbsync", "sync_file": "sync.gpkg", "skip_tables": None}]}) + config.update( + { + "CONNECTIONS": [ + { + "driver": "postgres", + "conn_info": "", + "modified": "mergin_main", + "base": "mergin_base", + "mergin_project": "john/dbsync", + "sync_file": "sync.gpkg", + "skip_tables": None, + } + ] + } + ) ignored_tables = get_ignored_tables(config.connections[0]) assert ignored_tables == [] - config.update({'CONNECTIONS': [{"driver": "postgres", "conn_info": "", "modified": "mergin_main", "base": "mergin_base", "mergin_project": "john/dbsync", "sync_file": "sync.gpkg", "skip_tables": []}]}) + config.update( + { + "CONNECTIONS": [ + { + "driver": "postgres", + "conn_info": "", + "modified": "mergin_main", + "base": "mergin_base", + "mergin_project": "john/dbsync", + "sync_file": "sync.gpkg", + "skip_tables": [], + } + ] + } + ) ignored_tables = get_ignored_tables(config.connections[0]) assert ignored_tables == [] - config.update({'CONNECTIONS': [{"driver": "postgres", "conn_info": "", "modified": "mergin_main", "base": "mergin_base", "mergin_project": "john/dbsync", "sync_file": "sync.gpkg", "skip_tables": "table"}]}) + config.update( + { + "CONNECTIONS": [ + { + "driver": "postgres", + "conn_info": "", + "modified": "mergin_main", + "base": "mergin_base", + "mergin_project": "john/dbsync", + "sync_file": "sync.gpkg", + "skip_tables": "table", + } + ] + } + ) validate_config(config) ignored_tables = get_ignored_tables(config.connections[0]) assert ignored_tables == ["table"] - config.update({'CONNECTIONS': [{"driver": "postgres", "conn_info": "", "modified": "mergin_main", "base": "mergin_base", "mergin_project": "john/dbsync", "sync_file": "sync.gpkg", "skip_tables": ["table"]}]}) + config.update( + { + "CONNECTIONS": [ + { + "driver": "postgres", + "conn_info": "", + "modified": "mergin_main", + "base": "mergin_base", + "mergin_project": "john/dbsync", + "sync_file": "sync.gpkg", + "skip_tables": ["table"], + } + ] + } + ) validate_config(config) ignored_tables = get_ignored_tables(config.connections[0]) assert ignored_tables == ["table"] diff --git a/test/test_db_functions.py b/test/test_db_functions.py index 1c15659..feff1dd 100644 --- a/test/test_db_functions.py +++ b/test/test_db_functions.py @@ -1,10 +1,15 @@ import psycopg2 import psycopg2.extensions -from dbsync import _check_postgis_available, _try_install_postgis +from dbsync import ( + _check_postgis_available, + _try_install_postgis, +) -def test_check_postgis_available(db_connection: psycopg2.extensions.connection): +def test_check_postgis_available( + db_connection: psycopg2.extensions.connection, +): cur = db_connection.cursor() assert _check_postgis_available(db_connection) @@ -14,7 +19,9 @@ def test_check_postgis_available(db_connection: psycopg2.extensions.connection): assert _check_postgis_available(db_connection) is False -def test_try_install_postgis(db_connection: psycopg2.extensions.connection): +def test_try_install_postgis( + db_connection: psycopg2.extensions.connection, +): cur = db_connection.cursor() cur.execute("DROP EXTENSION IF EXISTS postgis CASCADE;") diff --git a/test/test_dbsyncerror.py b/test/test_dbsyncerror.py index e759c6c..42b37cd 100644 --- a/test/test_dbsyncerror.py +++ b/test/test_dbsyncerror.py @@ -1,14 +1,22 @@ import pytest -from dbsync import DbSyncError +from dbsync import ( + DbSyncError, +) -@pytest.mark.parametrize("password", ['password=\"my_secret password 8417\\.\"', - 'password=\'my_secret password\'', - "password=my_secret_password84189./+-" - ]) -def test_DbSyncError_password_print(password: str): - host = "host=\"localhost\"" +@pytest.mark.parametrize( + "password", + [ + 'password="my_secret password 8417\\."', + "password='my_secret password'", + "password=my_secret_password84189./+-", + ], +) +def test_DbSyncError_password_print( + password: str, +): + host = 'host="localhost"' user = "user=user" conn_string = f"{user} {password} {host}" diff --git a/version.py b/version.py index 7b344ec..72f26f5 100644 --- a/version.py +++ b/version.py @@ -1 +1 @@ -__version__ = '1.1.2' +__version__ = "1.1.2"