Skip to content
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ venv/
env/

# Version files generated by hatch-vcs
coordinode/coordinode/_version.py
coordinode/_version.py
langchain-coordinode/langchain_coordinode/_version.py
llama-index-coordinode/llama_index/graph_stores/coordinode/_version.py
GAPS.md
CLAUDE.md
79 changes: 47 additions & 32 deletions langchain-coordinode/langchain_coordinode/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,19 +72,29 @@ def refresh_schema(self) -> None:
structured = _parse_schema(text)
# Augment with relationship triples (start_label, type, end_label) via
# Cypher — get_schema_text() only lists edge types without direction.
# CoordiNode: wildcard [r] returns no results; build typed pattern from
# the rel_props keys returned by _parse_schema().
rel_types = list(structured.get("rel_props", {}).keys())
if rel_types:
rel_filter = "|".join(_cypher_ident(t) for t in rel_types)
rows = self._client.cypher(
f"MATCH (a)-[r:{rel_filter}]->(b) "
"RETURN DISTINCT a.__label__ AS src, r.__type__ AS rel, b.__label__ AS dst"
)
# No LIMIT here intentionally: RETURN DISTINCT already collapses all edges
# to unique (src_label, rel_type, dst_label) combinations, so the result
# is bounded by the number of distinct relationship type triples, not by
# total edge count. Adding a LIMIT would silently drop relationship types
# that happen to appear beyond the limit, producing an incomplete schema.
rows = self._client.cypher(
"MATCH (a)-[r]->(b) RETURN DISTINCT labels(a) AS src_labels, type(r) AS rel, labels(b) AS dst_labels"
)
Comment thread
polaz marked this conversation as resolved.
if rows:
# Deduplicate after _first_label() normalization: RETURN DISTINCT operates on
# raw label lists, but _first_label(min()) can collapse different multi-label
# combinations to the same (start, type, end) triple (e.g. ['Employee','Person']
# and ['Person','Employee'] both min-normalize to 'Employee'). Use a set to
# ensure each relationship triple appears at most once.
triples: set[tuple[str, str, str]] = set()
for row in rows:
start = _first_label(row.get("src_labels"))
end = _first_label(row.get("dst_labels"))
rel = row.get("rel")
if start and rel and end:
triples.add((start, rel, end))
structured["relationships"] = [
{"start": row["src"], "type": row["rel"], "end": row["dst"]}
for row in rows
if row.get("src") and row.get("rel") and row.get("dst")
{"start": start, "type": rel, "end": end} for start, rel, end in sorted(triples)
]
Comment thread
coderabbitai[bot] marked this conversation as resolved.
self._structured_schema = structured

Expand All @@ -95,18 +105,14 @@ def add_graph_documents(
) -> None:
"""Store nodes and relationships extracted from ``GraphDocument`` objects.

Nodes are upserted by ``id`` (used as the ``name`` property) via
``MERGE``, so repeated calls are safe for nodes.

Relationships are created with unconditional ``CREATE`` because
CoordiNode does not yet support ``MERGE`` for edge patterns. Re-ingesting
the same ``GraphDocument`` will therefore produce duplicate edges.
Both nodes and relationships are upserted via ``MERGE``, so repeated
calls with the same data are idempotent.

Args:
graph_documents: List of ``langchain_community.graphs.graph_document.GraphDocument``.
include_source: If ``True``, also store the source ``Document`` as a
``__Document__`` node linked to every extracted entity via
``MENTIONS`` edges (also unconditional ``CREATE``).
``MENTIONS`` edges.
"""
for doc in graph_documents:
for node in doc.nodes:
Expand All @@ -133,12 +139,10 @@ def _upsert_node(self, node: Any) -> None:
)

def _create_edge(self, rel: Any) -> None:
"""Create a relationship via unconditional CREATE.
"""Upsert a relationship via MERGE (idempotent).

CoordiNode does not support MERGE for edge patterns. Re-ingesting the
same relationship will create a duplicate edge. SET r += $props is
skipped when props is empty because SET r += {} is not supported by all
server versions.
SET r += $props is skipped when props is empty because
SET r += {} is not supported by all server versions.
"""
src_label = _cypher_ident(rel.source.type or "Entity")
dst_label = _cypher_ident(rel.target.type or "Entity")
Expand All @@ -148,19 +152,19 @@ def _create_edge(self, rel: Any) -> None:
self._client.cypher(
f"MATCH (src:{src_label} {{name: $src}}) "
f"MATCH (dst:{dst_label} {{name: $dst}}) "
f"CREATE (src)-[r:{rel_type}]->(dst) SET r += $props",
f"MERGE (src)-[r:{rel_type}]->(dst) SET r += $props",
params={"src": rel.source.id, "dst": rel.target.id, "props": props},
)
else:
self._client.cypher(
f"MATCH (src:{src_label} {{name: $src}}) "
f"MATCH (dst:{dst_label} {{name: $dst}}) "
f"CREATE (src)-[r:{rel_type}]->(dst)",
f"MERGE (src)-[r:{rel_type}]->(dst)",
params={"src": rel.source.id, "dst": rel.target.id},
)

def _link_document_to_entities(self, doc: Any) -> None:
"""Upsert a ``__Document__`` node and CREATE ``MENTIONS`` edges to all entities."""
"""Upsert a ``__Document__`` node and MERGE ``MENTIONS`` edges to all entities."""
src_id = getattr(doc.source, "id", None) or _stable_document_id(doc.source)
self._client.cypher(
"MERGE (d:__Document__ {id: $id}) SET d.page_content = $text",
Expand All @@ -169,7 +173,7 @@ def _link_document_to_entities(self, doc: Any) -> None:
for node in doc.nodes:
label = _cypher_ident(node.type or "Entity")
self._client.cypher(
f"MATCH (d:__Document__ {{id: $doc_id}}) MATCH (n:{label} {{name: $name}}) CREATE (d)-[:MENTIONS]->(n)",
f"MATCH (d:__Document__ {{id: $doc_id}}) MATCH (n:{label} {{name: $name}}) MERGE (d)-[:MENTIONS]->(n)",
params={"doc_id": src_id, "name": node.id},
)

Expand Down Expand Up @@ -211,10 +215,7 @@ def _stable_document_id(source: Any) -> str:

Combines ``page_content`` and sorted ``metadata`` items so the same
document produces the same ``__Document__`` node ID across different
Python processes. This makes document-node creation stable when
``include_source=True`` is used, but does not make re-ingest fully
idempotent because ``MENTIONS`` edges are not deduplicated until edge
``MERGE``/dedup support is added to CoordiNode.
Python processes.
"""
content = getattr(source, "page_content", "") or ""
metadata = getattr(source, "metadata", {}) or {}
Expand All @@ -232,6 +233,20 @@ def _stable_document_id(source: Any) -> str:
return hashlib.sha256(canonical.encode()).hexdigest()[:32]


def _first_label(labels: Any) -> str | None:
"""Extract a stable label from a labels() result (list of strings).

openCypher does not guarantee a stable ordering for labels(), so using
labels[0] would produce nondeterministic schema entries across calls.
We return the lexicographically smallest label as a deterministic rule.
"""
if isinstance(labels, list) and labels:
return str(min(labels))
if isinstance(labels, str):
return labels
return None
Comment thread
coderabbitai[bot] marked this conversation as resolved.


def _cypher_ident(name: str) -> str:
"""Escape a label/type name for use as a Cypher identifier."""
# ASCII-only word characters: letter/digit/underscore, not starting with digit.
Expand Down
86 changes: 18 additions & 68 deletions llama-index-coordinode/llama_index/graph_stores/coordinode/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,13 +117,7 @@ def get_triplets(
properties: dict[str, Any] | None = None,
ids: list[str] | None = None,
) -> list[list[LabelledNode]]:
"""Retrieve triplets (subject, predicate, object) as node triples.

Note:
``relation_names`` is **required**. CoordiNode does not support
untyped wildcard ``[r]`` relationship patterns — they silently return
no rows. Omitting ``relation_names`` raises ``NotImplementedError``.
"""
"""Retrieve triplets (subject, predicate, object) as node triples."""
conditions: list[str] = []
params: dict[str, Any] = {}

Expand All @@ -133,22 +127,15 @@ def get_triplets(
conditions.append("(n.name IN $entity_names OR m.name IN $entity_names)")
params["entity_names"] = entity_names
if relation_names:
# Escape each type name to prevent Cypher injection
rel_filter = "|".join(_cypher_ident(t) for t in relation_names)
rel_pattern = f"[r:{rel_filter}]"
else:
# CoordiNode: wildcard [r] pattern returns no results.
# Callers must supply relation_names for the query to work.
raise NotImplementedError(
"CoordinodePropertyGraphStore.get_triplets() requires relation_names — "
"CoordiNode does not support untyped wildcard [r] patterns"
)
rel_pattern = "[r]"

where = f"WHERE {' AND '.join(conditions)}" if conditions else ""
# CoordiNode: use r.__type__ instead of type(r) — type() returns null.
cypher = (
f"MATCH (n)-{rel_pattern}->(m) {where} "
"RETURN n, r.__type__ AS rel_type, m, n.id AS _src_id, m.id AS _dst_id "
"RETURN n, type(r) AS rel_type, m, n.id AS _src_id, m.id AS _dst_id "
"LIMIT 1000"
)
result = self._client.cypher(cypher, params=params)
Expand Down Expand Up @@ -189,27 +176,23 @@ def get_rel_map(
if not graph_nodes:
return []

# CoordiNode: wildcard [r] pattern returns no results. Fetch all
# known edge types from the schema and build a typed pattern instead,
# e.g. [r:TYPE_A|TYPE_B|...].
schema_text = self._client.get_schema_text()
edge_types = _parse_edge_types_from_schema(schema_text)

ignored = set(ignore_rels) if ignore_rels else set()
active_types = [t for t in edge_types if t not in ignored]

if not active_types:
return []

rel_filter = "|".join(_cypher_ident(t) for t in active_types)
node_ids = [n.id for n in graph_nodes]
safe_limit = int(limit) # coerce to int to prevent Cypher injection via non-integer input
safe_limit = int(limit)
params: dict[str, object] = {"ids": node_ids}

# Push ignore_rels filter into the WHERE clause so LIMIT applies only
# to non-ignored edges and callers receive up to `limit` visible results.
if ignored:
params["ignored"] = list(ignored)
ignore_clause = "AND type(r) NOT IN $ignored "
else:
ignore_clause = ""

cypher = (
f"MATCH (n)-[r:{rel_filter}]->(m) "
f"WHERE n.id IN $ids "
f"RETURN n, r.__type__ AS _rel_type, m, n.id AS _src_id, m.id AS _dst_id "
"MATCH (n)-[r]->(m) "
f"WHERE n.id IN $ids {ignore_clause}"
f"RETURN n, type(r) AS _rel_type, m, n.id AS _src_id, m.id AS _dst_id "
f"LIMIT {safe_limit}"
)
result = self._client.cypher(cypher, params=params)
Expand Down Expand Up @@ -237,28 +220,21 @@ def upsert_nodes(self, nodes: list[LabelledNode]) -> None:
self._client.cypher(cypher, params={"id": node.id, "props": props})

def upsert_relations(self, relations: list[Relation]) -> None:
"""Upsert relationships into the graph."""
"""Upsert relationships into the graph (idempotent via MERGE)."""
for rel in relations:
props = rel.properties or {}
label = _cypher_ident(rel.label)
# CoordiNode does not yet support MERGE for edge patterns; use CREATE.
# A WHERE NOT (src)-[:TYPE]->(dst) guard was tested but returns 0
# rows silently in CoordiNode, making all CREATE statements no-ops.
# Until server-side MERGE or pattern predicates are supported,
# repeated calls will create duplicate edges.
# SET r += $props is skipped when props is empty — SET r += {} is
# not supported by all server versions.
if props:
cypher = (
f"MATCH (src {{id: $src_id}}) MATCH (dst {{id: $dst_id}}) "
f"CREATE (src)-[r:{label}]->(dst) SET r += $props"
f"MERGE (src)-[r:{label}]->(dst) SET r += $props"
)
self._client.cypher(
cypher,
params={"src_id": rel.source_id, "dst_id": rel.target_id, "props": props},
)
else:
cypher = f"MATCH (src {{id: $src_id}}) MATCH (dst {{id: $dst_id}}) CREATE (src)-[r:{label}]->(dst)"
cypher = f"MATCH (src {{id: $src_id}}) MATCH (dst {{id: $dst_id}}) MERGE (src)-[r:{label}]->(dst)"
self._client.cypher(
cypher,
params={"src_id": rel.source_id, "dst_id": rel.target_id},
Expand Down Expand Up @@ -376,29 +352,3 @@ def _node_label(node: LabelledNode) -> str:
if isinstance(node, EntityNode):
return node.label or "Entity"
return "Node"


def _parse_edge_types_from_schema(schema_text: str) -> list[str]:
"""Extract edge type names from CoordiNode schema text.

Parses the "Edge types:" section produced by ``get_schema_text()``.
"""
edge_types: list[str] = []
lines = iter(schema_text.splitlines())

# Advance to the "Edge types:" header.
for line in lines:
if line.strip().lower().startswith("edge types"):
break

# Collect bullet items until the first blank line.
for line in lines:
stripped = line.strip()
if not stripped:
break
if stripped.startswith(("-", "*")):
name = stripped.lstrip("-* ").split("(")[0].strip()
if name:
edge_types.append(name)

return edge_types
18 changes: 6 additions & 12 deletions tests/integration/adapters/test_langchain.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,21 +97,15 @@ def test_add_graph_documents_creates_relationship(graph, unique_tag):
graph.add_graph_documents([doc])

# Verify the relationship was created, not just the source node.
# count(*) instead of count(r): CoordiNode returns 0 for relationship-variable counts
result = graph.query(
"MATCH (a:LCPerson2 {name: $src})-[r:LC_RESEARCHES]->(b:LCConcept {name: $dst}) RETURN count(*) AS cnt",
"MATCH (a:LCPerson2 {name: $src})-[r:LC_RESEARCHES]->(b:LCConcept {name: $dst}) RETURN count(r) AS cnt",
params={"src": f"Charlie-{unique_tag}", "dst": f"GraphRAG-{unique_tag}"},
)
assert result[0]["cnt"] >= 1, f"relationship not found: {result}"
assert result[0]["cnt"] == 1, f"expected exactly 1 relationship: {result}"


def test_add_graph_documents_idempotent(graph, unique_tag):
"""Calling add_graph_documents twice must not raise.

Nodes are idempotent (MERGE). Edges are NOT — CoordiNode does not yet
support MERGE for edges, so unconditional CREATE is used and duplicate
edges are expected after two ingests.
"""
"""Calling add_graph_documents twice produces exactly one edge (MERGE idempotent)."""
node_a = Node(id=f"Idempotent-{unique_tag}", type="LCIdempotent")
node_b = Node(id=f"IdempTarget-{unique_tag}", type="LCIdempotent")
rel = Relationship(source=node_a, target=node_b, type="LC_IDEMP_REL")
Expand All @@ -131,12 +125,12 @@ def test_add_graph_documents_idempotent(graph, unique_tag):
)
assert result[0]["cnt"] == 1

# Edges: unconditional CREATE → count >= 1 (may be > 1 due to CoordiNode limitation)
# Edges: MERGE keeps count at 1 (idempotent)
result = graph.query(
"MATCH (a:LCIdempotent {name: $src})-[r:LC_IDEMP_REL]->(b:LCIdempotent {name: $dst}) RETURN count(*) AS cnt",
"MATCH (a:LCIdempotent {name: $src})-[r:LC_IDEMP_REL]->(b:LCIdempotent {name: $dst}) RETURN count(r) AS cnt",
params={"src": f"Idempotent-{unique_tag}", "dst": f"IdempTarget-{unique_tag}"},
)
assert result[0]["cnt"] >= 1
assert result[0]["cnt"] == 1


def test_schema_refreshes_after_add(graph, unique_tag):
Expand Down
Loading
Loading