diff --git a/dbsync.py b/dbsync.py index 5b5c19a..bd673ff 100644 --- a/dbsync.py +++ b/dbsync.py @@ -21,7 +21,7 @@ from mergin import MerginClient, MerginProject, LoginError, ClientError from version import __version__ - +from psycopg2 import sql # set high logging level for geodiff (used by geodiffinfo executable) # so we get as much information as possible @@ -225,6 +225,30 @@ def _get_project_version(): return mp.metadata["version"] +def _set_db_project_comment(conn, schema, project_name, version): + """ Set postgres COMMENT on SCHEMA with mergin project name and version """ + comment = { + "name": project_name, + "version": version + } + cur = conn.cursor() + query = sql.SQL("COMMENT ON SCHEMA {} IS %s").format(sql.Identifier(schema)) + cur.execute(query.as_string(conn), (json.dumps(comment), )) + conn.commit() + + +def _get_db_project_comment(conn, schema): + """ Get mergin project name and its current version in db schema""" + cur = conn.cursor() + cur.execute("SELECT obj_description(%s::regnamespace, 'pg_namespace')", (schema, )) + res = cur.fetchone()[0] + try: + comment = json.loads(res) if res else None + except (TypeError, json.decoder.JSONDecodeError): + return + return comment + + def create_mergin_client(): """ Create instance of MerginClient""" _check_has_password() @@ -312,7 +336,8 @@ def dbsync_pull(mc): _geodiff_apply_changeset(config.db_driver, config.db_conn_info, config.db_schema_base, tmp_base2their) os.remove(gpkg_basefile_old) - + conn = psycopg2.connect(config.db_conn_info) + _set_db_project_comment(conn, config.db_schema_base, config.mergin_project_name, server_version) print("Pull done!") @@ -439,11 +464,13 @@ def dbsync_push(mc): # TODO: should we do some cleanup here? (undo changes in the local geopackage?) raise DbSyncError("Mergin client error on push: " + str(e)) - print("Pushed new version to Mergin: " + _get_project_version()) + version = _get_project_version() + print("Pushed new version to Mergin: " + version) # update base schema in the DB print("Updating DB base schema...") _geodiff_apply_changeset(config.db_driver, config.db_conn_info, config.db_schema_base, tmp_changeset_file) + _set_db_project_comment(conn, config.db_schema_base, config.mergin_project_name, version) print("Push done!") @@ -453,20 +480,36 @@ def dbsync_init(mc, from_gpkg=True): # let's start with various environment checks to make sure # the environment is set up correctly before doing any work - gpkg_full_path = os.path.join(config.project_working_dir, config.mergin_sync_file) - if not os.path.exists(config.project_working_dir): - # download the Mergin project - print("Download Mergin project " + config.mergin_project_name + " to " + config.project_working_dir) - mc.download_project(config.mergin_project_name, config.project_working_dir) - # make sure we have working directory now - _check_has_working_dir() - print("Connecting to the database...") try: conn = psycopg2.connect(config.db_conn_info) except psycopg2.Error as e: raise DbSyncError("Unable to connect to the database: " + str(e)) + base_schema_exists = _check_schema_exists(conn, config.db_schema_base) + modified_schema_exists = _check_schema_exists(conn, config.db_schema_modified) + + try: + server_info = mc.project_info(config.mergin_project_name) + except ClientError as e: + raise DbSyncError("Mergin client error: " + str(e)) + + gpkg_full_path = os.path.join(config.project_working_dir, config.mergin_sync_file) + if not os.path.exists(config.project_working_dir): + # download the Mergin project + if modified_schema_exists and base_schema_exists: + db_proj_info = _get_db_project_comment(conn, config.db_schema_base) + if not db_proj_info: + raise DbSyncError("Base schema exists but missing which project it belongs to") + print("Download version " + db_proj_info["version"] + " of Mergin project " + config.mergin_project_name + + " to " + config.project_working_dir) + mc.download_project(config.mergin_project_name, config.project_working_dir, db_proj_info["version"]) + else: + print("Download latest Mergin project " + config.mergin_project_name + " to " + config.project_working_dir) + mc.download_project(config.mergin_project_name, config.project_working_dir) + # make sure we have working directory now + _check_has_working_dir() + # check there are no pending changes on server (or locally - which should never happen) status_pull, status_push, _ = mc.project_status(config.project_working_dir) if status_pull['added'] or status_pull['updated'] or status_pull['removed']: @@ -474,8 +517,6 @@ def dbsync_init(mc, from_gpkg=True): if status_push['added'] or status_push['updated'] or status_push['removed']: raise DbSyncError("There are pending changes in the local directory - that should never happen! " + str(status_push)) - base_schema_exists = _check_schema_exists(conn, config.db_schema_base) - modified_schema_exists = _check_schema_exists(conn, config.db_schema_modified) if from_gpkg: if not os.path.exists(gpkg_full_path): raise DbSyncError("The input GPKG file does not exist: " + gpkg_full_path) @@ -487,9 +528,11 @@ def dbsync_init(mc, from_gpkg=True): summary_base = _compare_datasets("sqlite", "", gpkg_full_path, config.db_driver, config.db_conn_info, config.db_schema_base) if len(summary_base): + # seems someone modified base schema manually - this should never happen! raise DbSyncError("The db schemas already exist but 'base' schema is not synchronized with source GPKG") elif len(summary_modified): print("Modified schema is not synchronised with source GPKG, please run pull/push commands to fix it") + _print_changes_summary(summary_modified, "Pending Changes:") return else: print("The GPKG file, base and modified schemas are already initialized and in sync") @@ -508,6 +551,10 @@ def dbsync_init(mc, from_gpkg=True): # COPY: modified -> base _geodiff_make_copy(config.db_driver, config.db_conn_info, config.db_schema_modified, config.db_driver, config.db_conn_info, config.db_schema_base) + + # mark project version into db schema + _set_db_project_comment(conn, config.db_schema_base, f'{server_info["namespace"]}/{server_info["name"]}', + server_info["version"]) else: if not modified_schema_exists: raise DbSyncError("The 'modified' schema does not exist: " + config.db_schema_modified) @@ -519,10 +566,11 @@ def dbsync_init(mc, from_gpkg=True): summary_base = _compare_datasets(config.db_conn_info, config.db_schema_base, "sqlite", "", gpkg_full_path, config.db_driver) if len(summary_base): - raise DbSyncError("The output GPKG file exists already but it is not synchronized with db 'base' schema") + raise DbSyncError("The output GPKG file exists already but is not synchronized with db 'base' schema") elif len(summary_modified): print("The output GPKG file exists already but it is not synchronised with modified schema, " "please run pull/push commands to fix it") + _print_changes_summary(summary_modified, "Pending Changes:") return else: print("The GPKG file, base and modified schemas are already initialized and in sync") @@ -546,6 +594,10 @@ def dbsync_init(mc, from_gpkg=True): # upload gpkg to mergin (client takes care of storing metadata) mc.push_project(config.project_working_dir) + # mark project version into db schema + version = _get_project_version() + _set_db_project_comment(conn, config.db_schema_base, f'{server_info["namespace"]}/{server_info["name"]}', version) + def show_usage(): print("dbsync") diff --git a/test/test_basic.py b/test/test_basic.py index f3b99fa..6d82bb3 100644 --- a/test/test_basic.py +++ b/test/test_basic.py @@ -8,8 +8,8 @@ import psycopg2 from mergin import MerginClient, ClientError -from dbsync import dbsync_init, dbsync_pull, dbsync_push, dbsync_status, config, DbSyncError - +from dbsync import dbsync_init, dbsync_pull, dbsync_push, dbsync_status, config, DbSyncError, _geodiff_make_copy, \ + _get_db_project_comment GEODIFFINFO_EXE = os.environ.get('TEST_GEODIFFINFO_EXE') DB_CONNINFO = os.environ.get('TEST_DB_CONNINFO') @@ -103,16 +103,13 @@ def test_init_from_gpkg(mc): cur = conn.cursor() cur.execute(f"SELECT count(*) from {db_schema_main}.simple") assert cur.fetchone()[0] == 3 - # run again, nothing should change dbsync_init(mc, from_gpkg=True) cur.execute(f"SELECT count(*) from {db_schema_main}.simple") assert cur.fetchone()[0] == 3 - - # make change in GPKG and push to server to create pending changes, it should pass - shutil.copy(os.path.join(TEST_DATA_DIR, 'inserted_1_A.gpkg'), os.path.join(project_dir, 'test_sync.gpkg')) - mc.push_project(project_dir) - dbsync_init(mc, from_gpkg=True) + db_proj_info = _get_db_project_comment(conn, db_schema_base) + assert db_proj_info["name"] == config.mergin_project_name + assert db_proj_info["version"] == 'v1' # rename base schema to mimic some mismatch cur.execute(f"ALTER SCHEMA {db_schema_base} RENAME TO schema_tmp") @@ -124,27 +121,53 @@ def test_init_from_gpkg(mc): cur.execute(f"ALTER SCHEMA schema_tmp RENAME TO {db_schema_base}") conn.commit() - # remove some feature from 'modified' db to create mismatch with src geopackage, it should pass - cur.execute(f"SELECT * from {db_schema_main}.simple") - cur.execute(f"DELETE FROM {db_schema_main}.simple WHERE fid ={cur.fetchone()[0]}") - conn.commit() + # make change in GPKG and push to server to create pending changes, it should pass but not sync + shutil.copy(os.path.join(TEST_DATA_DIR, 'inserted_1_A.gpkg'), os.path.join(project_dir, 'test_sync.gpkg')) + mc.push_project(project_dir) + # remove local copy of project (to mimic loss at docker restart) + shutil.rmtree(config.project_working_dir) + dbsync_init(mc, from_gpkg=True) cur.execute(f"SELECT count(*) from {db_schema_main}.simple") - assert cur.fetchone()[0] == 2 + assert cur.fetchone()[0] == 3 + # pull server changes to db to make sure we can sync again + dbsync_pull(mc) + cur.execute(f"SELECT count(*) from {db_schema_main}.simple") + assert cur.fetchone()[0] == 4 + + # update some feature from 'modified' db to create mismatch with src geopackage, it should pass but not sync + fid = 1 + cur.execute(f"SELECT * from {db_schema_main}.simple WHERE fid={fid}") + old_value = cur.fetchone()[3] + cur.execute(f"UPDATE {db_schema_main}.simple SET rating=100 WHERE fid={fid}") + conn.commit() + cur.execute(f"SELECT * from {db_schema_main}.simple WHERE fid={fid}") + assert cur.fetchone()[3] == 100 dbsync_init(mc, from_gpkg=True) + # check geopackage has not been modified - after init we are not synced! + gpkg_conn = sqlite3.connect(os.path.join(project_dir, 'test_sync.gpkg')) + gpkg_cur = gpkg_conn.cursor() + gpkg_cur.execute(f"SELECT * FROM simple WHERE fid={fid}") + assert gpkg_cur.fetchone()[3] == old_value + # push db changes to server (and download new version to local working dir) to make sure we can sync again + dbsync_push(mc) + mc.pull_project(project_dir) + gpkg_cur.execute(f"SELECT * FROM simple WHERE fid={fid}") + assert gpkg_cur.fetchone()[3] == 100 - # remove some feature from 'base' db to create mismatch with src geopackage + # update some feature from 'base' db to create mismatch with src geopackage and modified cur.execute(f"SELECT * from {db_schema_base}.simple") - cur.execute(f"DELETE FROM {db_schema_base}.simple WHERE fid ={cur.fetchone()[0]}") + fid = cur.fetchone()[0] + old_value = cur.fetchone()[3] + cur.execute(f"UPDATE {db_schema_base}.simple SET rating=100 WHERE fid={fid}") conn.commit() - cur.execute(f"SELECT count(*) from {db_schema_base}.simple") - assert cur.fetchone()[0] == 2 - + cur.execute(f"SELECT * from {db_schema_base}.simple WHERE fid={fid}") + assert cur.fetchone()[3] == 100 with pytest.raises(DbSyncError) as err: dbsync_init(mc, from_gpkg=True) assert "The db schemas already exist but 'base' schema is not synchronized with source GPKG" in str(err.value) # make local changes to src file to introduce local changes - shutil.copy(os.path.join(TEST_DATA_DIR, 'inserted_1_A.gpkg'), os.path.join(config.project_working_dir, config.mergin_sync_file)) + shutil.copy(os.path.join(TEST_DATA_DIR, 'base.gpkg'), os.path.join(config.project_working_dir, config.mergin_sync_file)) with pytest.raises(DbSyncError) as err: dbsync_init(mc, from_gpkg=True) assert "There are pending changes in the local directory - that should never happen" in str(err.value) @@ -182,6 +205,8 @@ def test_basic_pull(mc): cur = conn.cursor() cur.execute("SELECT count(*) from test_sync_pull_main.simple") assert cur.fetchone()[0] == 4 + db_proj_info = _get_db_project_comment(conn, 'test_sync_pull_base') + assert db_proj_info["version"] == 'v2' print("---") dbsync_status(mc) @@ -212,6 +237,8 @@ def test_basic_push(mc): # push the change from DB to PostgreSQL dbsync_push(mc) + db_proj_info = _get_db_project_comment(conn, 'test_sync_push_base') + assert db_proj_info["version"] == 'v2' # pull new version of the project to the work project directory mc.pull_project(project_dir) @@ -258,7 +285,11 @@ def test_basic_both(mc): # first pull changes from Mergin to DB (+rebase changes in DB) and then push the changes from DB to Mergin dbsync_pull(mc) + db_proj_info = _get_db_project_comment(conn, 'test_sync_both_base') + assert db_proj_info["version"] == 'v2' dbsync_push(mc) + db_proj_info = _get_db_project_comment(conn, 'test_sync_both_base') + assert db_proj_info["version"] == 'v3' # pull new version of the project to the work project directory mc.pull_project(project_dir)