Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
272 changes: 146 additions & 126 deletions src/BioSimSpace/Align/_merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ def merge(
The merged molecule.
"""
from sire.legacy import CAS as _SireCAS
from sire.legacy import IO as _SireIO
from sire.legacy import MM as _SireMM
from sire.legacy import Base as _SireBase
from sire.legacy import Mol as _SireMol
Expand Down Expand Up @@ -1053,23 +1054,62 @@ def merge(

# Check that the merge hasn't modified the connectivity.
if roi is None:
# Pre-compute ring membership as frozensets of integer indices for O(1)
# lookup in the inner loops, avoiding repeated Sire in_ring calls.
n_merged = edit_mol.info().num_atoms()
c0_ring = frozenset(
x for x in range(molecule0.num_atoms()) if c0.in_ring(_SireMol.AtomIdx(x))
)
c1_ring = frozenset(
x for x in range(molecule1.num_atoms()) if c1.in_ring(_SireMol.AtomIdx(x))
)
conn0_ring = frozenset(
i for i in range(n_merged) if conn0.in_ring(_SireMol.AtomIdx(i))
)
conn1_ring = frozenset(
i for i in range(n_merged) if conn1.in_ring(_SireMol.AtomIdx(i))
)

# Pre-compute merged-space AtomIdx arrays so the inner loops avoid
# repeated dict lookups and AtomIdx construction.
mol0_map = [
mol0_merged_mapping[_SireMol.AtomIdx(i)]
for i in range(molecule0.num_atoms())
]
mol1_map = [
mol1_merged_mapping[_SireMol.AtomIdx(i)]
for i in range(molecule1.num_atoms())
]

# molecule0
for x in range(0, molecule0.num_atoms()):
# Convert to an AtomIdx.
for x in range(molecule0.num_atoms()):
idx = _SireMol.AtomIdx(x)

# Map the index to its position in the merged molecule.
idx_map = mol0_merged_mapping[idx]
idx_map = mol0_map[x]
x_in_ring = x in c0_ring or idx_map.value() in conn1_ring

for y in range(x + 1, molecule0.num_atoms()):
# Convert to an AtomIdx.
idy = _SireMol.AtomIdx(y)
idy_map = mol0_map[y]

# Check whether the connectivity type has changed.
conn_type_changed = c0.connection_type(
idx, idy
) != conn1.connection_type(idx_map, idy_map)

# Map the index to its position in the merged molecule.
idy_map = mol0_merged_mapping[idy]
# Fast path: if neither atom is in a ring in either end state
# and the connectivity hasn't changed, nothing to check.
if (
not x_in_ring
and y not in c0_ring
and idy_map.value() not in conn1_ring
and not conn_type_changed
):
continue

# Was a ring opened/closed?
is_ring_broken = _is_ring_broken(c0, conn1, idx, idy, idx_map, idy_map)
# Combined ring check — calls find_paths once per connectivity.
is_ring_broken, is_ring_size_change = _check_ring(
c0, conn1, idx, idy, idx_map, idy_map
)

# A ring was broken and it is not allowed.
if is_ring_broken and not allow_ring_breaking:
Expand All @@ -1079,11 +1119,6 @@ def merge(
"to 'True'."
)

# Did a ring change size?
is_ring_size_change = _is_ring_size_changed(
c0, conn1, idx, idy, idx_map, idy_map
)

# A ring changed size and it is not allowed.
if (
not is_ring_broken
Expand All @@ -1098,36 +1133,49 @@ def merge(
"preferable."
)

# The connectivity has changed.
if c0.connection_type(idx, idy) != conn1.connection_type(
idx_map, idy_map
# The connectivity changed for an unknown reason.
if (
conn_type_changed
and not (is_ring_broken or is_ring_size_change)
and not force
):
# The connectivity changed for an unknown reason.
if not (is_ring_broken or is_ring_size_change) and not force:
raise _IncompatibleError(
"The merge has changed the molecular connectivity "
"but a ring didn't open/close or change size. "
"If you want to proceed with this mapping pass "
"'force=True'. You are warned that the resulting "
"perturbation will likely be unstable."
)
raise _IncompatibleError(
"The merge has changed the molecular connectivity "
"but a ring didn't open/close or change size. "
"If you want to proceed with this mapping pass "
"'force=True'. You are warned that the resulting "
"perturbation will likely be unstable."
)

# molecule1
for x in range(0, molecule1.num_atoms()):
# Convert to an AtomIdx.
for x in range(molecule1.num_atoms()):
idx = _SireMol.AtomIdx(x)

# Map the index to its position in the merged molecule.
idx_map = mol1_merged_mapping[idx]
idx_map = mol1_map[x]
x_in_ring = x in c1_ring or idx_map.value() in conn0_ring

for y in range(x + 1, molecule1.num_atoms()):
# Convert to an AtomIdx.
idy = _SireMol.AtomIdx(y)
idy_map = mol1_map[y]

# Map the index to its position in the merged molecule.
idy_map = mol1_merged_mapping[idy]
# Check whether the connectivity type has changed.
conn_type_changed = c1.connection_type(
idx, idy
) != conn0.connection_type(idx_map, idy_map)

# Was a ring opened/closed?
is_ring_broken = _is_ring_broken(c1, conn0, idx, idy, idx_map, idy_map)
# Fast path: if neither atom is in a ring in either end state
# and the connectivity hasn't changed, nothing to check.
if (
not x_in_ring
and y not in c1_ring
and idy_map.value() not in conn0_ring
and not conn_type_changed
):
continue

# Combined ring check — calls find_paths once per connectivity.
is_ring_broken, is_ring_size_change = _check_ring(
c1, conn0, idx, idy, idx_map, idy_map
)

# A ring was broken and it is not allowed.
if is_ring_broken and not allow_ring_breaking:
Expand All @@ -1137,11 +1185,6 @@ def merge(
"to 'True'."
)

# Did a ring change size?
is_ring_size_change = _is_ring_size_changed(
c1, conn0, idx, idy, idx_map, idy_map
)

# A ring changed size and it is not allowed.
if (
not is_ring_broken
Expand All @@ -1156,19 +1199,19 @@ def merge(
"preferable."
)

# The connectivity has changed.
if c1.connection_type(idx, idy) != conn0.connection_type(
idx_map, idy_map
# The connectivity changed for an unknown reason.
if (
conn_type_changed
and not (is_ring_broken or is_ring_size_change)
and not force
):
# The connectivity changed for an unknown reason.
if not (is_ring_broken or is_ring_size_change) and not force:
raise _IncompatibleError(
"The merge has changed the molecular connectivity "
"but a ring didn't open/close or change size. "
"If you want to proceed with this mapping pass "
"'force=True'. You are warned that the resulting "
"perturbation will likely be unstable."
)
raise _IncompatibleError(
"The merge has changed the molecular connectivity "
"but a ring didn't open/close or change size. "
"If you want to proceed with this mapping pass "
"'force=True'. You are warned that the resulting "
"perturbation will likely be unstable."
)

# Set the "connectivity" property. If the end state connectivity is the same,
# then we can just set the "connectivity" property.
Expand All @@ -1177,24 +1220,22 @@ def merge(
else:
edit_mol.set_property("connectivity0", conn0)
edit_mol.set_property("connectivity1", conn1)

# Create the CLJNBPairs matrices.
ff = molecule0.property(ff0)

# Create the new intrascale matrices.
scale_factor_14 = _SireMM.CLJScaleFactor(
ff.electrostatic14_scale_factor(), ff.vdw14_scale_factor()
# Merge the intrascale properties of the two molecules.
merged_intrascale = _SireIO.mergeIntrascale(
molecule0.property("intrascale"),
molecule1.property("intrascale"),
edit_mol.info(),
mol0_merged_mapping,
mol1_merged_mapping,
)
clj_nb_pairs0 = _SireMM.CLJNBPairs(conn0, scale_factor_14)
clj_nb_pairs1 = _SireMM.CLJNBPairs(conn1, scale_factor_14)

# Store the two molecular components.
edit_mol.set_property("molecule0", molecule0)
edit_mol.set_property("molecule1", molecule1)

# Set the "intrascale" properties.
edit_mol.set_property("intrascale0", clj_nb_pairs0)
edit_mol.set_property("intrascale1", clj_nb_pairs1)
edit_mol.set_property("intrascale0", merged_intrascale[0])
edit_mol.set_property("intrascale1", merged_intrascale[1])

# Set the "forcefield" properties.
edit_mol.set_property("forcefield0", molecule0.property(ff0))
Expand All @@ -1217,14 +1258,14 @@ def merge(
return mol


def _is_ring_broken(conn0, conn1, idx0, idy0, idx1, idy1, max_path=50):
def _check_ring(conn0, conn1, idx0, idy0, idx1, idy1, max_path=50, max_ring_size=12):
"""
Internal function to test whether a perturbation changes the connectivity
around two atoms such that a ring is broken.
Internal function to test whether a perturbation opens/closes a ring or
changes its size for a given pair of atoms.

Two atoms share a ring if and only if there are at least two distinct paths
between them in the connectivity graph. A ring is broken when this changes
between end states.
Combines the work of the former _is_ring_broken and _is_ring_size_changed
into a single function, calling find_paths only once per connectivity
object rather than twice.

Parameters
----------
Expand All @@ -1251,18 +1292,31 @@ def _is_ring_broken(conn0, conn1, idx0, idy0, idx1, idy1, max_path=50):
Maximum path length used when searching for rings. The default of 50
covers typical macrocycles. Increase if larger rings need to be
detected.

max_ring_size : int
The maximum ring size considered when checking for ring size changes.

Returns
-------

(is_ring_broken, is_ring_size_changed) : (bool, bool)
"""

# Find all paths between the two atoms in each end state. A single
# find_paths call covers both the ring-broken and ring-size-changed checks.
paths0 = conn0.find_paths(idx0, idy0, max_path)
paths1 = conn1.find_paths(idx1, idy1, max_path)
n0 = len(paths0)
n1 = len(paths1)

# --- Ring broken check ---

# Two atoms share a ring iff there are ≥2 distinct paths between them.
# This also handles atoms adjacent to a ring (not members themselves):
# substituents on neighbouring ring atoms have ≥2 paths between them
# through the ring, which collapses to 1 when the ring is broken.
n0 = len(conn0.find_paths(idx0, idy0, max_path))
n1 = len(conn1.find_paths(idx1, idy1, max_path))

# Ring break/open: one state has ≥2 paths, the other doesn't.
if (n0 >= 2) != (n1 >= 2):
return True
return True, False

# Supplementary check for rings larger than max_path: find_paths may only
# find the direct-bond path and miss the long way around the ring, giving
Expand All @@ -1271,67 +1325,33 @@ def _is_ring_broken(conn0, conn1, idx0, idy0, idx1, idy1, max_path=50):
if (conn0.in_ring(idx0) and conn0.in_ring(idy0)) != (
conn1.in_ring(idx1) and conn1.in_ring(idy1)
):
return True
return True, False

# A direct bond was replaced by a ring path (or vice versa), leaving the
# atoms completely disconnected in one topology (0 paths). This occurs
# when a new ring forms and the old direct bond between two atoms is not
# present in the ring topology.
return (n0 == 0) != (n1 == 0)


def _is_ring_size_changed(conn0, conn1, idx0, idy0, idx1, idy1, max_ring_size=12):
"""
Internal function to test whether a perturbation changes the connectivity
around two atoms such that a ring changes size.

The size of the smallest ring containing two atoms equals the length of
the shortest path between them. If the shortest path changes between end
states, the ring has changed size.

Parameters
----------

conn0 : Sire.Mol.Connectivity
The connectivity object for the first end state.

conn1 : Sire.Mol.Connectivity
The connectivity object for the second end state.

idx0 : Sire.Mol.AtomIdx
The index of the first atom in the first state.

idy0 : Sire.Mol.AtomIdx
The index of the second atom in the first state.

idx1 : Sire.Mol.AtomIdx
The index of the first atom in the second state.

idy1 : Sire.Mol.AtomIdx
The index of the second atom in the second state.

max_ring_size : int
The maximum size of what is considered to be a ring.
"""
if (n0 == 0) != (n1 == 0):
return True, False

# Work out the paths connecting the atoms in the two end states.
paths0 = conn0.find_paths(idx0, idy0, max_ring_size)
paths1 = conn1.find_paths(idx1, idy1, max_ring_size)
# --- Ring size changed check ---

# No ring in state 0 — nothing to compare.
if len(paths0) < 2:
return False
# Filter to paths within max_ring_size to avoid flagging macrocycle size
# changes for rings beyond the threshold.
short0 = [p for p in paths0 if len(p) <= max_ring_size]

ring0 = min(len(p) for p in paths0)
# No ring of relevant size in state 0 — nothing to compare.
if len(short0) < 2:
return False, False

# No ring in state 1 — the ring was broken rather than resized; let
# _is_ring_broken handle this case.
if len(paths1) < 2:
return False
short1 = [p for p in paths1 if len(p) <= max_ring_size]

ring1 = min(len(p) for p in paths1)
# No ring of relevant size in state 1 — the ring was broken rather than
# resized; let the ring-broken check handle this case.
if len(short1) < 2:
return False, False

return ring0 != ring1
return False, min(len(p) for p in short0) != min(len(p) for p in short1)


def _removeDummies(molecule, is_lambda1):
Expand Down
Loading