Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 72 additions & 20 deletions gts/src/gts/entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ class GtsEntity:
selected_entity_field: Optional[str] = None
selected_schema_id_field: Optional[str] = None
description: str = ""
raw_id: Optional[str] = None # Stores raw ID value (may be non-GTS)
schemaRefs: List[Dict[str, str]] = field(default_factory=list)

def __init__(
Expand Down Expand Up @@ -122,6 +123,7 @@ def __init__(
# Calculate IDs if config provided
if cfg is not None:
idv = self._calc_json_entity_id(cfg)
self.raw_id = idv # Store raw ID even if non-GTS
self.schemaId = self._calc_json_schema_id(cfg)
# If no valid GTS ID found in entity fields, use schema ID as fallback
if not (idv and GtsID.is_valid(idv)):
Expand Down Expand Up @@ -289,13 +291,11 @@ def _get_field_value(self, field: str) -> Optional[str]:
return None

def _first_non_empty_field(self, fields: List[str]) -> Optional[Tuple[str, str]]:
"""Find first non-empty field, preferring valid GTS IDs."""
# First pass: look for valid GTS IDs
for f in fields:
v = self._get_field_value(f)
if v and GtsID.is_valid(v):
return f, v
# Second pass: any non-empty string
"""Find first non-empty field value in order.

Returns the first non-empty string value without preferring GTS IDs.
This ensures UUID and non-GTS values are returned when they appear first.
"""
for f in fields:
v = self._get_field_value(f)
if v:
Expand All @@ -311,22 +311,74 @@ def _calc_json_entity_id(self, cfg: GtsConfig) -> str:
return f"{self.file.path}#{self.list_sequence}"
return self.file.path if self.file else ""

def _calc_json_schema_id(self, cfg: GtsConfig) -> str:
def _calc_json_schema_id(self, cfg: GtsConfig) -> Optional[str]:
"""Calculate schema_id based on entity type and content.

Rules:
- For schemas: extract parent from $id chain, or fallback to $schema
- For instances: look for type/schema fields in schema_id_fields
- Return None if no schema reference found for instances
"""
# For schemas, derive from the entity ID (parent of chain)
if self.is_schema:
# Get entity ID (the $id field for schemas)
idv = self._get_field_value("$id")
if idv and GtsID.is_valid(idv):
# Check if it's a chained ID (derived schema)
last_tilde = idv.rfind("~")
if last_tilde > 0:
# Find the previous segment (parent)
parent_end = last_tilde
# Check if there's another segment before this one
prefix = idv[:parent_end]
prev_tilde = prefix.rfind("~")
if prev_tilde > 0:
# Has a parent chain - return the parent ID (everything up to and including the second-to-last ~)
self.selected_schema_id_field = "$id"
return prefix[: prev_tilde + 1]
else:
# Single segment schema - base type, return $schema
schema_val = self._get_field_value("$schema")
if schema_val:
self.selected_schema_id_field = "$schema"
return schema_val
# Fallback to $schema for schemas
schema_val = self._get_field_value("$schema")
if schema_val:
self.selected_schema_id_field = "$schema"
return schema_val
return None

# For instances, look in schema_id_fields
cand = self._first_non_empty_field(cfg.schema_id_fields)
if cand:
self.selected_schema_id_field = cand[0]
return cand[1]
idv = self._calc_json_entity_id(cfg)
if idv and isinstance(idv, str) and GtsID.is_valid(idv):
if idv.endswith("~"):
return idv
last = idv.rfind("~")
if last > 0:
self.selected_schema_id_field = self.selected_entity_field
return idv[: last + 1]
if self.file and self.list_sequence is not None:
return f"{self.file.path}#{self.list_sequence}"
return self.file.path if self.file else ""
schema_id = cand[1]
# If the field holds an instance ID (a valid GTS ID that does not end with ~), keep only its schema part
if GtsID.is_valid(schema_id):
last_tilde = schema_id.rfind("~")
if last_tilde > 0 and not schema_id.endswith("~"):
# It's an instance ID in type field - extract schema part
return schema_id[: last_tilde + 1]
return schema_id

# For instances with chained GTS ID in entity_id field, derive schema_id
# BUT only if the ID is a proper chained instance ID (not a single schema segment)
if self.selected_entity_field and self.selected_entity_field != "$id":
# Only derive from fields like "id", not from "$id" (which is for schemas)
idv = self._get_field_value(self.selected_entity_field)
if idv and GtsID.is_valid(idv):
# Instance IDs do not end with ~; an ID ending with ~ is a schema ID and is not used to derive schema_id here
if not idv.endswith("~"):
# Instance ID: extract schema part (everything up to and including last ~)
last_tilde = idv.rfind("~")
if last_tilde > 0:
self.selected_schema_id_field = self.selected_entity_field
return idv[: last_tilde + 1]

# No schema reference found for instance
# Note: Single-segment schema IDs in $id don't count as schema_id for instances
return None

def _extract_uuid_from_content(self) -> Optional[str]:
"""Extract a UUID value from content to use as instance identifier."""
Expand Down
29 changes: 23 additions & 6 deletions gts/src/gts/ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ def to_dict(self) -> Dict[str, Any]:
result["is_schema"] = self.is_schema
else:
result["error"] = self.error
result["is_schema"] = self.is_schema
return result


Expand Down Expand Up @@ -315,12 +316,22 @@ def add_entity(
self, content: Dict[str, Any], validate: bool = False
) -> GtsAddEntityResult:
entity = GtsEntity(content=content, cfg=self.cfg)
if not entity.gts_id:

# For instances (non-schemas), require an id field
if not entity.is_schema:
# Instance must have an id from entity_id_fields (not just derived from schema)
if not entity.raw_id or not entity.selected_entity_field:
return GtsAddEntityResult(
ok=False, error="Instance must have an id field", is_schema=False
)

# Schemas MUST have a valid GTS ID
if entity.is_schema and not entity.gts_id:
return GtsAddEntityResult(
ok=False, error="Unable to detect GTS ID in entity"
ok=False, error="Unable to detect GTS ID in schema"
)

# Register the entity first
# Register the entity (use raw_id for non-GTS instances)
self.store.register(entity)

# Always validate schemas
Expand All @@ -333,17 +344,19 @@ def add_entity(
)

# If validation is requested, validate the instance as well
if validate and not entity.is_schema:
if validate and not entity.is_schema and entity.gts_id:
try:
self.store.validate_instance(entity.gts_id.id)
except Exception as e:
return GtsAddEntityResult(
ok=False, error=f"Validation failed: {str(e)}"
)

# Return gts_id if available, otherwise raw_id
entity_id = entity.gts_id.id if entity.gts_id else (entity.raw_id or "")
return GtsAddEntityResult(
ok=True,
id=entity.gts_id.id,
id=entity_id,
schema_id=entity.schemaId,
is_schema=entity.is_schema,
)
Expand Down Expand Up @@ -460,8 +473,12 @@ def attr(self, gts_with_path: str) -> GtsPathResolver:

def extract_id(self, content: Dict[str, Any]) -> GtsExtractIdResult:
entity = GtsEntity(content=content, cfg=self.cfg)
# Always use raw_id - that's the actual value found in the entity_id_fields
# Note: gts_id may be derived from schemaId as fallback, but extract-id
# should return what was actually in the selected field
id_value = entity.raw_id or ""
return GtsExtractIdResult(
id=entity.gts_id.id if entity.gts_id else "",
id=id_value,
schema_id=entity.schemaId,
selected_entity_field=entity.selected_entity_field,
selected_schema_id_field=entity.selected_schema_id_field,
Expand Down
4 changes: 3 additions & 1 deletion gts/src/gts/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,9 @@ def _register_routes(self) -> None:
async def add_entity(
self, body: Dict[str, Any] = Body(...), validate: bool = Query(False)
) -> JSONResponse:
return JSONResponse(self.ops.add_entity(body, validate=validate).to_dict())
result = self.ops.add_entity(body, validate=validate)
status_code = 200 if result.ok else 422
return JSONResponse(result.to_dict(), status_code=status_code)

async def add_entities(
self, body: List[Dict[str, Any]] = Body(...)
Expand Down
16 changes: 12 additions & 4 deletions gts/src/gts/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,18 @@ def _populate_from_reader(self) -> None:
self._by_id[entity.gts_id.id] = entity

def register(self, entity: GtsEntity) -> None:
"""Register a JsonEntity in the store."""
if not entity.gts_id or not entity.gts_id.id:
raise ValueError("Entity must have a valid gts_id")
self._by_id[entity.gts_id.id] = entity
"""Register a GtsEntity in the store.

If entity has a valid gts_id, use that as the key.
Otherwise, use raw_id for non-GTS entities.
"""
if entity.gts_id and entity.gts_id.id:
self._by_id[entity.gts_id.id] = entity
elif entity.raw_id:
# Allow non-GTS entities with raw_id (e.g., UUIDs or simple strings)
self._by_id[entity.raw_id] = entity
else:
raise ValueError("Entity must have a valid gts_id or raw_id")

def register_schema(self, type_id: str, schema: Dict[str, Any]) -> None:
"""
Expand Down
Loading