Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 24 additions & 16 deletions gts/src/gts/entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,30 @@ def _calc_json_schema_id(self, cfg: GtsConfig) -> Optional[str]:
return schema_val
return None

# For instances, look in schema_id_fields
# PRIORITY 1: Check entity_id_fields for a GTS ID (gtsId, id, etc.)
# If found and it's a chained ID, extract schema from the chain
# NOTE: Skip $id field for instances - $id should only influence schema_id for schemas
entity_id_cand = self._first_non_empty_field(cfg.entity_id_fields)
if entity_id_cand and GtsID.is_valid(entity_id_cand[1]):
# Skip $id for non-schemas: $id without $schema means the doc is an instance
# and $id should not be used to derive schema_id
if entity_id_cand[0] == "$id" and not self.is_schema:
pass # Skip to PRIORITY 2
else:
idv = entity_id_cand[1]
# If already a type id (ends with '~'), use it as-is
if idv.endswith("~"):
self.selected_schema_id_field = entity_id_cand[0]
return idv
# For chained IDs (well-known instances), extract schema:
# everything up to and including last '~'
last_tilde = idv.rfind("~")
if last_tilde > 0:
self.selected_schema_id_field = entity_id_cand[0]
return idv[: last_tilde + 1]

# PRIORITY 2: Fall back to explicit schema_id_fields (type, gtsTid, etc.)
# Only check these if no chained GTS ID was found in entity_id_fields
cand = self._first_non_empty_field(cfg.schema_id_fields)
if cand:
self.selected_schema_id_field = cand[0]
Expand All @@ -362,22 +385,7 @@ def _calc_json_schema_id(self, cfg: GtsConfig) -> Optional[str]:
return schema_id[: last_tilde + 1]
return schema_id

# For instances with chained GTS ID in entity_id field, derive schema_id
# BUT only if the ID is a proper chained instance ID (not a single schema segment)
if self.selected_entity_field and self.selected_entity_field != "$id":
# Only derive from fields like "id", not from "$id" (which is for schemas)
idv = self._get_field_value(self.selected_entity_field)
if idv and GtsID.is_valid(idv):
# Check if it's a chained ID (instance ID) vs single segment (schema ID)
if not idv.endswith("~"):
# Instance ID: extract schema part (everything up to and including last ~)
last_tilde = idv.rfind("~")
if last_tilde > 0:
self.selected_schema_id_field = self.selected_entity_field
return idv[: last_tilde + 1]

# No schema reference found for instance
# Note: Single-segment schema IDs in $id don't count as schema_id for instances
return None

def _extract_uuid_from_content(self) -> Optional[str]:
Expand Down
11 changes: 11 additions & 0 deletions gts/src/gts/gts.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,17 @@ def __init__(self, id: str):
self.gts_id_segments.append(GtsIdSegment(i + 1, offset, parts[i]))
offset += len(parts[i])

# Issue #37: Single-segment instance IDs are not allowed
# An instance ID (not ending with ~) must be chained (have at least 2 segments)
if not self.id.endswith("~") and len(self.gts_id_segments) == 1:
# Check if it's a wildcard (wildcards are allowed as single segment)
if not any(seg.is_wildcard for seg in self.gts_id_segments):
raise GtsInvalidId(
id,
"Single-segment instance IDs are not allowed. "
"Instance IDs must be chained (e.g., type~instance).",
)

@property
def is_type(self) -> bool:
return self.id.endswith("~")
Expand Down
65 changes: 58 additions & 7 deletions gts/src/gts/ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,15 @@ class GtsIdValidationResult:
id: str
valid: bool
error: str = ""
is_wildcard: bool = False

def to_dict(self) -> Dict[str, Any]:
return {"id": self.id, "valid": self.valid, "error": self.error}
return {
"id": self.id,
"valid": self.valid,
"error": self.error,
"is_wildcard": self.is_wildcard,
}


@dataclass
Expand Down Expand Up @@ -60,13 +66,17 @@ class GtsIdParseResult:
ok: bool
segments: List[GtsIdSegment] = field(default_factory=list)
error: str = ""
is_wildcard: bool = False
is_schema: bool = False

def to_dict(self) -> Dict[str, Any]:
return {
"id": self.id,
"ok": self.ok,
"segments": [s.to_dict() for s in self.segments],
"error": self.error,
"is_wildcard": self.is_wildcard,
"is_schema": self.is_schema,
}


Expand Down Expand Up @@ -331,6 +341,18 @@ def add_entity(
ok=False, error="Unable to detect GTS ID in schema"
)

# Validate $id prefix for schemas: it must be a "gts://" URI, not a plain "gts." prefix
if entity.is_schema and validate:
raw_id = content.get("$id", "")
if isinstance(raw_id, str):
# Reject plain gts. prefix (without gts://)
if raw_id.startswith("gts.") and not raw_id.startswith("gts://"):
return GtsAddEntityResult(
ok=False,
error="Schema $id must use gts:// URI format, not plain gts. prefix",
is_schema=True,
)

# Register the entity (use raw_id for non-GTS instances)
self.store.register(entity)

Expand Down Expand Up @@ -376,15 +398,29 @@ def add_schema(self, type_id: str, schema: Dict[str, Any]) -> GtsAddSchemaResult
return GtsAddSchemaResult(ok=False, error=str(e))

def validate_id(self, gts_id: str) -> GtsIdValidationResult:
# Check if it's a wildcard pattern (contains *)
is_wildcard = "*" in gts_id
try:
_ = GtsID(gts_id)
return GtsIdValidationResult(id=gts_id, valid=True)
if is_wildcard:
# For wildcards, try parsing as GtsWildcard
_ = GtsWildcard(gts_id)
else:
_ = GtsID(gts_id)
return GtsIdValidationResult(id=gts_id, valid=True, is_wildcard=is_wildcard)
except Exception as e:
return GtsIdValidationResult(id=gts_id, valid=False, error=str(e))
return GtsIdValidationResult(
id=gts_id, valid=False, error=str(e), is_wildcard=is_wildcard
)

def parse_id(self, gts_id: str) -> GtsIdParseResult:
# Check if it's a wildcard pattern (contains *)
is_wildcard = "*" in gts_id
try:
segs = GtsID(gts_id).gts_id_segments
if is_wildcard:
parsed = GtsWildcard(gts_id)
else:
parsed = GtsID(gts_id)
segs = parsed.gts_id_segments
segments = [
GtsIdSegment(
vendor=s.vendor,
Expand All @@ -397,12 +433,27 @@ def parse_id(self, gts_id: str) -> GtsIdParseResult:
)
for s in segs
]
return GtsIdParseResult(id=gts_id, ok=True, segments=segments)
# is_schema: true only when the ID ends with "~" and contains no wildcard ("*")
is_schema = gts_id.endswith("~") and not is_wildcard
return GtsIdParseResult(
id=gts_id,
ok=True,
segments=segments,
is_wildcard=is_wildcard,
is_schema=is_schema,
)
except Exception as e:
return GtsIdParseResult(id=gts_id, ok=False, error=str(e))
return GtsIdParseResult(
id=gts_id, ok=False, error=str(e), is_wildcard=is_wildcard
)

def match_id_pattern(self, candidate: str, pattern: str) -> GtsIdMatchResult:
try:
# If candidate contains '*', validate it as a wildcard pattern
# This catches malformed wildcards like 'a*' (wildcard not on token boundary)
if "*" in candidate:
# Validate candidate as a wildcard pattern first
_ = GtsWildcard(candidate)
c = GtsID(candidate)
p = GtsWildcard(pattern)
match = c.wildcard_match(p)
Expand Down
Loading