Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 18 additions & 9 deletions gts/src/gts/entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ class GtsConfig:
"id",
],
schema_id_fields=[
"$schema",
"gtsTid",
"gtsType",
"gtsT",
Expand Down Expand Up @@ -162,10 +161,7 @@ def _is_json_schema_entity(self) -> bool:
return True
if url.startswith("https://json-schema.org/"):
return True
if url.startswith("gts://"):
return True
if url.startswith("gts."):
return True
# Issue #25: strict check, no GTS IDs in $schema
return False

def resolve_path(self, path: str) -> "GtsPathResolver":
Expand Down Expand Up @@ -251,8 +247,12 @@ def _extract_gts_ids_with_paths(self) -> List[Dict[str, str]]:

def gts_id_matcher(node: Any, path: str) -> Optional[Dict[str, str]]:
"""Match GTS ID strings."""
if isinstance(node, str) and GtsID.is_valid(node):
return {"id": node, "sourcePath": path or "root"}
if isinstance(node, str):
val = node
if val.startswith("gts://"):
val = val[6:]
if GtsID.is_valid(val):
return {"id": val, "sourcePath": path or "root"}
return None

self._walk_and_collect(self.content, found, gts_id_matcher)
Expand All @@ -265,8 +265,12 @@ def _extract_ref_strings_with_paths(self) -> List[Dict[str, str]]:
def ref_matcher(node: Any, path: str) -> Optional[Dict[str, str]]:
"""Match $ref properties in dict nodes."""
if isinstance(node, dict) and isinstance(node.get("$ref"), str):
val = node["$ref"]
# Issue #32: handle gts:// prefix
if val.startswith("gts://"):
val = val[6:]
ref_path = f"{path}.$ref" if path else "$ref"
return {"id": node["$ref"], "sourcePath": ref_path}
return {"id": val, "sourcePath": ref_path}
return None

self._walk_and_collect(self.content, refs, ref_matcher)
Expand All @@ -277,7 +281,12 @@ def _get_field_value(self, field: str) -> Optional[str]:
if not isinstance(self.content, dict):
return None
v = self.content.get(field)
return v if isinstance(v, str) and v.strip() else None
if isinstance(v, str) and v.strip():
# Issue #31, #32: Handle gts:// prefix in fields (e.g. $id)
if v.startswith("gts://"):
v = v[6:]
return v
return None

def _first_non_empty_field(self, fields: List[str]) -> Optional[Tuple[str, str]]:
"""Find first non-empty field, preferring valid GTS IDs."""
Expand Down
87 changes: 78 additions & 9 deletions gts/src/gts/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,9 @@ def _create_ref_resolver(self, schema: Dict[str, Any]) -> RefResolver:

def resolve_gts_ref(uri: str) -> Dict[str, Any]:
"""Resolve a GTS ID reference to its schema content."""
# Issue #32: handle gts:// prefix
if uri.startswith("gts://"):
uri = uri[6:]
try:
return self.get_schema_content(uri)
except KeyError:
Expand All @@ -194,15 +197,69 @@ def resolve_gts_ref(uri: str) -> Dict[str, Any]:
store[entity_id] = entity.content

# Create RefResolver with custom handlers
resolver = RefResolver.from_schema(
schema, store=store, handlers={"": resolve_gts_ref}
)
# Issue #32: Support "gts" scheme
handlers = {"": resolve_gts_ref, "gts": resolve_gts_ref}
resolver = RefResolver.from_schema(schema, store=store, handlers=handlers)
return resolver

def items(self):
    """Return every (entity ID, entity) pair held by the store.

    Returns:
        The result of calling ``items()`` on the internal ``_by_id`` mapping.
    """
    registry = self._by_id
    return registry.items()

@staticmethod
def _validate_schema_refs(schema: Dict[str, Any], path: str = "") -> None:
    """
    Validate all $ref values in a schema.

    Dicts and lists are walked recursively; any other value is ignored.

    Rules:
    - Local refs (starting with #) are always valid
    - External refs MUST use gts:// URI format
    - The GTS ID after gts:// must be a valid GTS identifier

    A "$ref" key whose value is not a string (for example a property that is
    literally named "$ref") is not treated as a reference. Its value is
    traversed like any other nested node so refs inside it are still checked.

    Args:
        schema: Schema content to validate
        path: Current path in schema (for error messages)

    Raises:
        ValueError: If any $ref is invalid
    """
    if isinstance(schema, dict):
        ref_uri = schema.get("$ref")
        ref_is_string = isinstance(ref_uri, str)

        # Check this node's own $ref before descending, so an invalid ref on
        # the node is reported ahead of problems in its children.
        if ref_is_string:
            current_path = f"{path}.$ref" if path else "$ref"
            gts_prefix = "gts://"

            # Local refs (JSON Pointer) are always valid
            if ref_uri.startswith("#"):
                pass
            # GTS refs must use gts:// URI format
            elif ref_uri.startswith(gts_prefix):
                gts_id = ref_uri[len(gts_prefix):]  # Strip prefix
                if not GtsID.is_valid(gts_id):
                    raise ValueError(
                        f"Invalid $ref at '{current_path}': '{ref_uri}' contains invalid GTS identifier '{gts_id}'"
                    )
            # Any other external ref is invalid
            else:
                raise ValueError(
                    f"Invalid $ref at '{current_path}': '{ref_uri}' must be a local ref (starting with '#') "
                    f"or a GTS URI (starting with 'gts://')"
                )

        # Recursively validate nested objects
        for key, value in schema.items():
            # Only skip "$ref" when it was a string validated above; a
            # non-string value may itself contain nested refs to check.
            if key == "$ref" and ref_is_string:
                continue
            nested_path = f"{path}.{key}" if path else key
            GtsStore._validate_schema_refs(value, nested_path)

    elif isinstance(schema, list):
        for idx, item in enumerate(schema):
            nested_path = f"{path}[{idx}]"
            GtsStore._validate_schema_refs(item, nested_path)

def _validate_schema_x_gts_refs(self, gts_id: str) -> None:
"""
Validate a schema's x-gts-ref fields.
Expand Down Expand Up @@ -256,15 +313,30 @@ def validate_schema(self, gts_id: str) -> None:
if not isinstance(schema_content, dict):
raise ValueError(f"Schema '{gts_id}' content must be a dictionary")

# Issue #25: strict check, no GTS IDs in $schema
meta_schema_url = schema_content.get("$schema")
if meta_schema_url and isinstance(meta_schema_url, str):
if meta_schema_url.startswith("gts.") or meta_schema_url.startswith(
"gts://"
):
raise ValueError(
f"Invalid $schema URL '{meta_schema_url}': must be a standard JSON Schema URL, not a GTS ID"
)

logging.info(f"Validating schema {gts_id}")

# 1. Validate against JSON Schema meta-schema
# 1. Validate $ref fields - must be local (#...) or gts:// URIs
# Issue #32: This validation must happen first to enforce strict $ref format
self._validate_schema_refs(schema_content, "")

# 2. Validate x-gts-ref fields (before JSON Schema validation)
self._validate_schema_x_gts_refs(gts_id)

# 3. Validate against JSON Schema meta-schema
try:
from jsonschema import Draft7Validator
from jsonschema.validators import validator_for

# Determine which meta-schema to use based on $schema field
meta_schema_url = schema_content.get("$schema")
if meta_schema_url:
# Use the appropriate validator for the schema version
validator_class = validator_for({"$schema": meta_schema_url})
Expand All @@ -277,9 +349,6 @@ def validate_schema(self, gts_id: str) -> None:
except Exception as e:
raise Exception(f"JSON Schema validation failed for '{gts_id}': {str(e)}")

# 2. Validate x-gts-ref fields
self._validate_schema_x_gts_refs(gts_id)

def validate_instance(
self,
gts_id: str,
Expand Down
20 changes: 12 additions & 8 deletions tests/test_entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ def test_default_config(self):
"""Test default config has expected fields."""
assert "$id" in DEFAULT_GTS_CONFIG.entity_id_fields
assert "gtsId" in DEFAULT_GTS_CONFIG.entity_id_fields
assert "$schema" in DEFAULT_GTS_CONFIG.schema_id_fields
# Issue #25: $schema should NOT be in schema_id_fields (only JSON Schema URLs allowed)
assert "$schema" not in DEFAULT_GTS_CONFIG.schema_id_fields
assert "gtsType" in DEFAULT_GTS_CONFIG.schema_id_fields


Expand Down Expand Up @@ -119,26 +120,28 @@ def test_entity_schema_detection_https(self):
assert entity.is_schema is True

def test_entity_schema_detection_gts_uri(self):
"""Test schema detection via gts:// URI."""
"""Test that gts:// URI in $schema is NOT recognized as schema (Issue #25)."""
entity = GtsEntity(
content={
"$schema": "gts://vendor.package.namespace.meta.v1~",
"type": "object",
},
)

assert entity.is_schema is True
# Issue #25: GTS IDs (even with gts:// prefix) in $schema should NOT be recognized as schemas
assert entity.is_schema is False

def test_entity_schema_detection_gts_prefix(self):
"""Test schema detection via gts. prefix."""
"""Test that gts. prefix in $schema is NOT recognized as schema (Issue #25)."""
entity = GtsEntity(
content={
"$schema": "gts.vendor.package.namespace.meta.v1~",
"type": "object",
},
)

assert entity.is_schema is True
# Issue #25: GTS IDs in $schema should NOT be recognized as schemas
assert entity.is_schema is False

def test_entity_not_schema(self):
"""Test non-schema entity."""
Expand All @@ -163,17 +166,18 @@ def test_entity_id_calculation(self):
assert entity.selected_entity_field == "$id"

def test_entity_schema_id_calculation(self):
"""Test schema ID calculation from content fields."""
"""Test schema ID calculation from content fields (not from $schema per Issue #25)."""
entity = GtsEntity(
content={
"$schema": "gts.vendor.package.namespace.type.v1~",
"type": "gts.vendor.package.namespace.type.v1~",
"name": "test",
},
cfg=DEFAULT_GTS_CONFIG,
)

# Issue #25: $schema is no longer used for schema_id, use 'type' field instead
assert entity.schemaId == "gts.vendor.package.namespace.type.v1~"
assert entity.selected_schema_id_field == "$schema"
assert entity.selected_schema_id_field == "type"

def test_entity_label_from_file(self):
"""Test entity label derived from file."""
Expand Down
Loading