Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 66 additions & 14 deletions dbsync.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

from mergin import MerginClient, MerginProject, LoginError, ClientError
from version import __version__

from psycopg2 import sql

# set high logging level for geodiff (used by geodiffinfo executable)
# so we get as much information as possible
Expand Down Expand Up @@ -225,6 +225,30 @@ def _get_project_version():
return mp.metadata["version"]


def _set_db_project_comment(conn, schema, project_name, version):
""" Set postgres COMMENT on SCHEMA with mergin project name and version """
comment = {
"name": project_name,
"version": version
}
cur = conn.cursor()
query = sql.SQL("COMMENT ON SCHEMA {} IS %s").format(sql.Identifier(schema))
cur.execute(query.as_string(conn), (json.dumps(comment), ))
conn.commit()


def _get_db_project_comment(conn, schema):
""" Get mergin project name and its current version in db schema"""
cur = conn.cursor()
cur.execute("SELECT obj_description(%s::regnamespace, 'pg_namespace')", (schema, ))
res = cur.fetchone()[0]
try:
comment = json.loads(res) if res else None
except (TypeError, json.decoder.JSONDecodeError):
return
return comment


def create_mergin_client():
""" Create instance of MerginClient"""
_check_has_password()
Expand Down Expand Up @@ -312,7 +336,8 @@ def dbsync_pull(mc):
_geodiff_apply_changeset(config.db_driver, config.db_conn_info, config.db_schema_base, tmp_base2their)

os.remove(gpkg_basefile_old)

conn = psycopg2.connect(config.db_conn_info)
_set_db_project_comment(conn, config.db_schema_base, config.mergin_project_name, server_version)
print("Pull done!")


Expand Down Expand Up @@ -439,11 +464,13 @@ def dbsync_push(mc):
# TODO: should we do some cleanup here? (undo changes in the local geopackage?)
raise DbSyncError("Mergin client error on push: " + str(e))

print("Pushed new version to Mergin: " + _get_project_version())
version = _get_project_version()
print("Pushed new version to Mergin: " + version)

# update base schema in the DB
print("Updating DB base schema...")
_geodiff_apply_changeset(config.db_driver, config.db_conn_info, config.db_schema_base, tmp_changeset_file)
_set_db_project_comment(conn, config.db_schema_base, config.mergin_project_name, version)

print("Push done!")

Expand All @@ -453,29 +480,43 @@ def dbsync_init(mc, from_gpkg=True):

# let's start with various environment checks to make sure
# the environment is set up correctly before doing any work
gpkg_full_path = os.path.join(config.project_working_dir, config.mergin_sync_file)
if not os.path.exists(config.project_working_dir):
# download the Mergin project
print("Download Mergin project " + config.mergin_project_name + " to " + config.project_working_dir)
mc.download_project(config.mergin_project_name, config.project_working_dir)
# make sure we have working directory now
_check_has_working_dir()

print("Connecting to the database...")
try:
conn = psycopg2.connect(config.db_conn_info)
except psycopg2.Error as e:
raise DbSyncError("Unable to connect to the database: " + str(e))

base_schema_exists = _check_schema_exists(conn, config.db_schema_base)
modified_schema_exists = _check_schema_exists(conn, config.db_schema_modified)

try:
server_info = mc.project_info(config.mergin_project_name)
except ClientError as e:
raise DbSyncError("Mergin client error: " + str(e))

gpkg_full_path = os.path.join(config.project_working_dir, config.mergin_sync_file)
if not os.path.exists(config.project_working_dir):
# download the Mergin project
if modified_schema_exists and base_schema_exists:
db_proj_info = _get_db_project_comment(conn, config.db_schema_base)
if not db_proj_info:
raise DbSyncError("Base schema exists but missing which project it belongs to")
print("Download version " + db_proj_info["version"] + " of Mergin project " + config.mergin_project_name +
" to " + config.project_working_dir)
mc.download_project(config.mergin_project_name, config.project_working_dir, db_proj_info["version"])
else:
print("Download latest Mergin project " + config.mergin_project_name + " to " + config.project_working_dir)
mc.download_project(config.mergin_project_name, config.project_working_dir)
# make sure we have working directory now
_check_has_working_dir()

# check there are no pending changes on server (or locally - which should never happen)
status_pull, status_push, _ = mc.project_status(config.project_working_dir)
if status_pull['added'] or status_pull['updated'] or status_pull['removed']:
print("There are pending changes on server, please run pull command after init")
if status_push['added'] or status_push['updated'] or status_push['removed']:
raise DbSyncError("There are pending changes in the local directory - that should never happen! " + str(status_push))

base_schema_exists = _check_schema_exists(conn, config.db_schema_base)
modified_schema_exists = _check_schema_exists(conn, config.db_schema_modified)
if from_gpkg:
if not os.path.exists(gpkg_full_path):
raise DbSyncError("The input GPKG file does not exist: " + gpkg_full_path)
Expand All @@ -487,9 +528,11 @@ def dbsync_init(mc, from_gpkg=True):
summary_base = _compare_datasets("sqlite", "", gpkg_full_path, config.db_driver,
config.db_conn_info, config.db_schema_base)
if len(summary_base):
# seems someone modified base schema manually - this should never happen!
raise DbSyncError("The db schemas already exist but 'base' schema is not synchronized with source GPKG")
elif len(summary_modified):
print("Modified schema is not synchronised with source GPKG, please run pull/push commands to fix it")
_print_changes_summary(summary_modified, "Pending Changes:")
return
else:
print("The GPKG file, base and modified schemas are already initialized and in sync")
Expand All @@ -508,6 +551,10 @@ def dbsync_init(mc, from_gpkg=True):
# COPY: modified -> base
_geodiff_make_copy(config.db_driver, config.db_conn_info, config.db_schema_modified,
config.db_driver, config.db_conn_info, config.db_schema_base)

# mark project version into db schema
_set_db_project_comment(conn, config.db_schema_base, f'{server_info["namespace"]}/{server_info["name"]}',
server_info["version"])
else:
if not modified_schema_exists:
raise DbSyncError("The 'modified' schema does not exist: " + config.db_schema_modified)
Expand All @@ -519,10 +566,11 @@ def dbsync_init(mc, from_gpkg=True):
summary_base = _compare_datasets(config.db_conn_info, config.db_schema_base,
"sqlite", "", gpkg_full_path, config.db_driver)
if len(summary_base):
raise DbSyncError("The output GPKG file exists already but it is not synchronized with db 'base' schema")
raise DbSyncError("The output GPKG file exists already but is not synchronized with db 'base' schema")
elif len(summary_modified):
print("The output GPKG file exists already but it is not synchronised with modified schema, "
"please run pull/push commands to fix it")
_print_changes_summary(summary_modified, "Pending Changes:")
return
else:
print("The GPKG file, base and modified schemas are already initialized and in sync")
Expand All @@ -546,6 +594,10 @@ def dbsync_init(mc, from_gpkg=True):
# upload gpkg to mergin (client takes care of storing metadata)
mc.push_project(config.project_working_dir)

# mark project version into db schema
version = _get_project_version()
_set_db_project_comment(conn, config.db_schema_base, f'{server_info["namespace"]}/{server_info["name"]}', version)


def show_usage():
print("dbsync")
Expand Down
69 changes: 50 additions & 19 deletions test/test_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
import psycopg2

from mergin import MerginClient, ClientError
from dbsync import dbsync_init, dbsync_pull, dbsync_push, dbsync_status, config, DbSyncError

from dbsync import dbsync_init, dbsync_pull, dbsync_push, dbsync_status, config, DbSyncError, _geodiff_make_copy, \
_get_db_project_comment

GEODIFFINFO_EXE = os.environ.get('TEST_GEODIFFINFO_EXE')
DB_CONNINFO = os.environ.get('TEST_DB_CONNINFO')
Expand Down Expand Up @@ -103,16 +103,13 @@ def test_init_from_gpkg(mc):
cur = conn.cursor()
cur.execute(f"SELECT count(*) from {db_schema_main}.simple")
assert cur.fetchone()[0] == 3

# run again, nothing should change
dbsync_init(mc, from_gpkg=True)
cur.execute(f"SELECT count(*) from {db_schema_main}.simple")
assert cur.fetchone()[0] == 3

# make change in GPKG and push to server to create pending changes, it should pass
shutil.copy(os.path.join(TEST_DATA_DIR, 'inserted_1_A.gpkg'), os.path.join(project_dir, 'test_sync.gpkg'))
mc.push_project(project_dir)
dbsync_init(mc, from_gpkg=True)
db_proj_info = _get_db_project_comment(conn, db_schema_base)
assert db_proj_info["name"] == config.mergin_project_name
assert db_proj_info["version"] == 'v1'

# rename base schema to mimic some mismatch
cur.execute(f"ALTER SCHEMA {db_schema_base} RENAME TO schema_tmp")
Expand All @@ -124,27 +121,53 @@ def test_init_from_gpkg(mc):
cur.execute(f"ALTER SCHEMA schema_tmp RENAME TO {db_schema_base}")
conn.commit()

# remove some feature from 'modified' db to create mismatch with src geopackage, it should pass
cur.execute(f"SELECT * from {db_schema_main}.simple")
cur.execute(f"DELETE FROM {db_schema_main}.simple WHERE fid ={cur.fetchone()[0]}")
conn.commit()
# make change in GPKG and push to server to create pending changes, it should pass but not sync
shutil.copy(os.path.join(TEST_DATA_DIR, 'inserted_1_A.gpkg'), os.path.join(project_dir, 'test_sync.gpkg'))
mc.push_project(project_dir)
# remove local copy of project (to mimic loss at docker restart)
shutil.rmtree(config.project_working_dir)
dbsync_init(mc, from_gpkg=True)
cur.execute(f"SELECT count(*) from {db_schema_main}.simple")
assert cur.fetchone()[0] == 2
assert cur.fetchone()[0] == 3
# pull server changes to db to make sure we can sync again
dbsync_pull(mc)
cur.execute(f"SELECT count(*) from {db_schema_main}.simple")
assert cur.fetchone()[0] == 4

# update some feature from 'modified' db to create mismatch with src geopackage, it should pass but not sync
fid = 1
cur.execute(f"SELECT * from {db_schema_main}.simple WHERE fid={fid}")
old_value = cur.fetchone()[3]
cur.execute(f"UPDATE {db_schema_main}.simple SET rating=100 WHERE fid={fid}")
conn.commit()
cur.execute(f"SELECT * from {db_schema_main}.simple WHERE fid={fid}")
assert cur.fetchone()[3] == 100
dbsync_init(mc, from_gpkg=True)
# check geopackage has not been modified - after init we are not synced!
gpkg_conn = sqlite3.connect(os.path.join(project_dir, 'test_sync.gpkg'))
gpkg_cur = gpkg_conn.cursor()
gpkg_cur.execute(f"SELECT * FROM simple WHERE fid={fid}")
assert gpkg_cur.fetchone()[3] == old_value
# push db changes to server (and download new version to local working dir) to make sure we can sync again
dbsync_push(mc)
mc.pull_project(project_dir)
gpkg_cur.execute(f"SELECT * FROM simple WHERE fid={fid}")
assert gpkg_cur.fetchone()[3] == 100

# remove some feature from 'base' db to create mismatch with src geopackage
# update some feature from 'base' db to create mismatch with src geopackage and modified
cur.execute(f"SELECT * from {db_schema_base}.simple")
cur.execute(f"DELETE FROM {db_schema_base}.simple WHERE fid ={cur.fetchone()[0]}")
fid = cur.fetchone()[0]
old_value = cur.fetchone()[3]
cur.execute(f"UPDATE {db_schema_base}.simple SET rating=100 WHERE fid={fid}")
conn.commit()
cur.execute(f"SELECT count(*) from {db_schema_base}.simple")
assert cur.fetchone()[0] == 2

cur.execute(f"SELECT * from {db_schema_base}.simple WHERE fid={fid}")
assert cur.fetchone()[3] == 100
with pytest.raises(DbSyncError) as err:
dbsync_init(mc, from_gpkg=True)
assert "The db schemas already exist but 'base' schema is not synchronized with source GPKG" in str(err.value)

# make local changes to src file to introduce local changes
shutil.copy(os.path.join(TEST_DATA_DIR, 'inserted_1_A.gpkg'), os.path.join(config.project_working_dir, config.mergin_sync_file))
shutil.copy(os.path.join(TEST_DATA_DIR, 'base.gpkg'), os.path.join(config.project_working_dir, config.mergin_sync_file))
with pytest.raises(DbSyncError) as err:
dbsync_init(mc, from_gpkg=True)
assert "There are pending changes in the local directory - that should never happen" in str(err.value)
Expand Down Expand Up @@ -182,6 +205,8 @@ def test_basic_pull(mc):
cur = conn.cursor()
cur.execute("SELECT count(*) from test_sync_pull_main.simple")
assert cur.fetchone()[0] == 4
db_proj_info = _get_db_project_comment(conn, 'test_sync_pull_base')
assert db_proj_info["version"] == 'v2'

print("---")
dbsync_status(mc)
Expand Down Expand Up @@ -212,6 +237,8 @@ def test_basic_push(mc):

# push the change from DB to PostgreSQL
dbsync_push(mc)
db_proj_info = _get_db_project_comment(conn, 'test_sync_push_base')
assert db_proj_info["version"] == 'v2'

# pull new version of the project to the work project directory
mc.pull_project(project_dir)
Expand Down Expand Up @@ -258,7 +285,11 @@ def test_basic_both(mc):

# first pull changes from Mergin to DB (+rebase changes in DB) and then push the changes from DB to Mergin
dbsync_pull(mc)
db_proj_info = _get_db_project_comment(conn, 'test_sync_both_base')
assert db_proj_info["version"] == 'v2'
dbsync_push(mc)
db_proj_info = _get_db_project_comment(conn, 'test_sync_both_base')
assert db_proj_info["version"] == 'v3'

# pull new version of the project to the work project directory
mc.pull_project(project_dir)
Expand Down