Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions gts/src/gts/entities.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Set, Tuple
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Tuple

from .gts import GtsID
from .path_resolver import GtsPathResolver
from .schema_cast import GtsEntityCastResult, SchemaCastError

if TYPE_CHECKING:
from .path_resolver import GtsPathResolver


@dataclass
class ValidationError:
Expand Down Expand Up @@ -166,16 +168,18 @@ def _is_json_schema_entity(self) -> bool:
return True
return False

def resolve_path(self, path: str) -> JsonPathResolver:
resolver = JsonPathResolver(self.gts_id.id if self.gts_id else "", self.content)
def resolve_path(self, path: str) -> "GtsPathResolver":
from .path_resolver import GtsPathResolver

resolver = GtsPathResolver(self.gts_id.id if self.gts_id else "", self.content)
return resolver.resolve(path)

def cast(
self,
to_schema: JsonEntity,
from_schema: JsonEntity,
to_schema: "GtsEntity",
from_schema: "GtsEntity",
resolver: Optional[Any] = None,
) -> JsonEntityCastResult:
) -> GtsEntityCastResult:
if self.is_schema:
# When casting a schema, from_schema might be a standard JSON Schema (no gts_id)
# In that case, skip the sanity check
Expand Down
18 changes: 5 additions & 13 deletions gts/src/gts/files_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def __init__(self, path: str | List[str], cfg: Optional[GtsConfig] = None) -> No

def _collect_files(self) -> None:
"""Collect all JSON and YAML files from the specified paths, following symlinks."""
valid_extensions = {'.json', '.jsonc', '.gts', '.yaml', '.yml'}
valid_extensions = {".json", ".jsonc", ".gts", ".yaml", ".yml"}
seen: set[str] = set()
collected: List[Path] = []

Expand Down Expand Up @@ -77,7 +77,7 @@ def _collect_files(self) -> None:
def _load_file(self, file_path: Path) -> Any:
"""Load content from JSON or YAML file."""
with file_path.open("r", encoding="utf-8") as f:
if file_path.suffix.lower() in {'.yaml', '.yml'}:
if file_path.suffix.lower() in {".yaml", ".yml"}:
return yaml.safe_load(f)
else:
return json.load(f)
Expand All @@ -89,29 +89,21 @@ def _process_file(self, file_path: Path) -> List[GtsEntity]:
try:
content = self._load_file(file_path)
json_file = GtsFile(
path=str(file_path),
name=file_path.name,
content=content
path=str(file_path), name=file_path.name, content=content
)

# Handle both single objects and arrays
if isinstance(content, list):
for idx, item in enumerate(content):
entity = GtsEntity(
file=json_file,
list_sequence=idx,
content=item,
cfg=self.cfg
file=json_file, list_sequence=idx, content=item, cfg=self.cfg
)
if entity.gts_id:
logging.debug(f"- discovered entity: {entity.gts_id.id}")
entities.append(entity)
else:
entity = GtsEntity(
file=json_file,
list_sequence=None,
content=content,
cfg=self.cfg
file=json_file, list_sequence=None, content=content, cfg=self.cfg
)
if entity.gts_id:
logging.debug(f"- discovered entity: {entity.gts_id.id}")
Expand Down
16 changes: 8 additions & 8 deletions gts/src/gts/ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ def reload_from_path(self, path: str | List[str]) -> None:
def add_entity(
self, content: Dict[str, Any], validate: bool = False
) -> GtsAddEntityResult:
entity = JsonEntity(content=content, cfg=self.cfg)
entity = GtsEntity(content=content, cfg=self.cfg)
if not entity.gts_id:
return GtsAddEntityResult(
ok=False, error="Unable to detect GTS ID in entity"
Expand Down Expand Up @@ -433,33 +433,33 @@ def schema_graph(self, gts_id: str) -> GtsSchemaGraphResult:

def compatibility(
self, old_schema_id: str, new_schema_id: str
) -> JsonEntityCastResult:
) -> GtsEntityCastResult:
return self.store.is_minor_compatible(old_schema_id, new_schema_id)

def cast(self, from_id: str, to_schema_id: str) -> JsonEntityCastResult:
def cast(self, from_id: str, to_schema_id: str) -> GtsEntityCastResult:
try:
return self.store.cast(from_id, to_schema_id)
except Exception as e:
return JsonEntityCastResult(error=str(e))
return GtsEntityCastResult(error=str(e))

def query(self, expr: str, limit: int = 100) -> GtsStoreQueryResult:
return self.store.query(expr, limit)

def attr(self, gts_with_path: str) -> JsonPathResolver:
def attr(self, gts_with_path: str) -> GtsPathResolver:
gts, path = GtsID.split_at_path(gts_with_path)
if path is None:
return JsonPathResolver(gts_id=gts, content=None).failure(
return GtsPathResolver(gts_id=gts, content=None).failure(
"", "Attribute selector requires '@path' in the identifier"
)
entity = self.store.get(gts)
if not entity:
return JsonPathResolver(gts_id=gts, content=None).failure(
return GtsPathResolver(gts_id=gts, content=None).failure(
path, f"Entity not found: {gts}"
)
return entity.resolve_path(path)

def extract_id(self, content: Dict[str, Any]) -> GtsExtractIdResult:
entity = JsonEntity(content=content, cfg=self.cfg)
entity = GtsEntity(content=content, cfg=self.cfg)
return GtsExtractIdResult(
id=entity.gts_id.id if entity.gts_id else "",
schema_id=entity.schemaId,
Expand Down
73 changes: 56 additions & 17 deletions gts/src/gts/schema_cast.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,25 +295,41 @@ def _cast_instance_to_schema(
p_type = p_schema.get("type")
if p_type == "object" and isinstance(val, dict):
nested_schema = GtsEntityCastResult._effective_object_schema(p_schema)
new_obj, add_sub, rem_sub, new_incompatibility_reasons = GtsEntityCastResult._cast_instance_to_schema(
val, nested_schema, base_path=(f"{base_path}.{prop}" if base_path else prop), incompatibility_reasons=incompatibility_reasons
new_obj, add_sub, rem_sub, new_incompatibility_reasons = (
GtsEntityCastResult._cast_instance_to_schema(
val,
nested_schema,
base_path=(f"{base_path}.{prop}" if base_path else prop),
incompatibility_reasons=incompatibility_reasons,
)
)
result[prop] = new_obj
added.extend(add_sub)
removed.extend(rem_sub)
incompatibility_reasons.extend(new_incompatibility_reasons)
elif p_type == "array" and isinstance(val, list):
items_schema = p_schema.get("items")
if isinstance(items_schema, dict) and items_schema.get("type") == "object":
nested_schema = GtsEntityCastResult._effective_object_schema(items_schema)
if (
isinstance(items_schema, dict)
and items_schema.get("type") == "object"
):
nested_schema = GtsEntityCastResult._effective_object_schema(
items_schema
)
new_list: List[Any] = []
for idx, item in enumerate(val):
if isinstance(item, dict):
new_item, add_sub, rem_sub, new_incompatibility_reasons = GtsEntityCastResult._cast_instance_to_schema(
item,
nested_schema,
base_path=(f"{base_path}.{prop}[{idx}]" if base_path else f"{prop}[{idx}]"),
incompatibility_reasons=incompatibility_reasons,
new_item, add_sub, rem_sub, new_incompatibility_reasons = (
GtsEntityCastResult._cast_instance_to_schema(
item,
nested_schema,
base_path=(
f"{base_path}.{prop}[{idx}]"
if base_path
else f"{prop}[{idx}]"
),
incompatibility_reasons=incompatibility_reasons,
)
)
new_list.append(new_item)
added.extend(add_sub)
Expand Down Expand Up @@ -481,23 +497,38 @@ def _check_constraint_compatibility(
if prop_type in ("number", "integer"):
errors.extend(
GtsEntityCastResult._check_min_max_constraint(
prop, old_prop_schema, new_prop_schema, "minimum", "maximum", check_tightening
prop,
old_prop_schema,
new_prop_schema,
"minimum",
"maximum",
check_tightening,
)
)

# String constraints
if prop_type == "string":
errors.extend(
GtsEntityCastResult._check_min_max_constraint(
prop, old_prop_schema, new_prop_schema, "minLength", "maxLength", check_tightening
prop,
old_prop_schema,
new_prop_schema,
"minLength",
"maxLength",
check_tightening,
)
)

# Array constraints
if prop_type == "array":
errors.extend(
GtsEntityCastResult._check_min_max_constraint(
prop, old_prop_schema, new_prop_schema, "minItems", "maxItems", check_tightening
prop,
old_prop_schema,
new_prop_schema,
"minItems",
"maxItems",
check_tightening,
)
)

Expand Down Expand Up @@ -588,8 +619,10 @@ def _check_schema_compatibility(

# Recursively check nested object properties
if old_type == "object" and new_type == "object":
nested_compat, nested_errors = GtsEntityCastResult._check_schema_compatibility(
old_prop_schema, new_prop_schema, check_backward
nested_compat, nested_errors = (
GtsEntityCastResult._check_schema_compatibility(
old_prop_schema, new_prop_schema, check_backward
)
)
if not nested_compat:
for err in nested_errors:
Expand All @@ -615,7 +648,9 @@ def _check_backward_compatibility(
- Cannot add enum values
- Cannot tighten constraints (decrease max, increase min, etc.)
"""
return GtsEntityCastResult._check_schema_compatibility(old_schema, new_schema, check_backward=True)
return GtsEntityCastResult._check_schema_compatibility(
old_schema, new_schema, check_backward=True
)

@staticmethod
def _check_forward_compatibility(
Expand All @@ -634,7 +669,9 @@ def _check_forward_compatibility(
- Cannot remove enum values
- Cannot relax constraints (increase max, decrease min, etc.)
"""
return GtsEntityCastResult._check_schema_compatibility(old_schema, new_schema, check_backward=False)
return GtsEntityCastResult._check_schema_compatibility(
old_schema, new_schema, check_backward=False
)

@staticmethod
def _diff_objects(
Expand Down Expand Up @@ -697,7 +734,9 @@ def _only_optional_add_remove(
) -> bool:
if not isinstance(a, dict) or not isinstance(b, dict):
if a != b:
reasons.append(f"{GtsEntityCastResult._path_label(path)}: value changed")
reasons.append(
f"{GtsEntityCastResult._path_label(path)}: value changed"
)
return False
return True

Expand Down
14 changes: 7 additions & 7 deletions gts/src/gts/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ def __init__(self, reader: GtsReader) -> None:
Args:
reader: GtsReader instance to populate entities from
"""
self._by_id: Dict[str, JsonEntity] = {}
self._by_id: Dict[str, GtsEntity] = {}
self._reader = reader

# Populate entities from reader if provided
Expand Down Expand Up @@ -148,7 +148,7 @@ def register_schema(self, type_id: str, schema: Dict[str, Any]) -> None:
raise ValueError("Schema type_id must end with '~'")
# parse sanity
gts_id = GtsID(type_id)
entity = JsonEntity(content=schema, gts_id=gts_id, is_schema=True)
entity = GtsEntity(content=schema, gts_id=gts_id, is_schema=True)
self._by_id[type_id] = entity

def get(self, entity_id: str) -> Optional[GtsEntity]:
Expand Down Expand Up @@ -369,7 +369,7 @@ def is_minor_compatible(
new_entity = self.get(new_schema_id)

if not old_entity or not new_entity:
return JsonEntityCastResult(
return GtsEntityCastResult(
from_id=old_schema_id,
to_id=new_schema_id,
direction="unknown",
Expand All @@ -390,16 +390,16 @@ def is_minor_compatible(

# Use the cast method's compatibility checking logic
is_backward, backward_errors = (
JsonEntityCastResult._check_backward_compatibility(old_schema, new_schema)
GtsEntityCastResult._check_backward_compatibility(old_schema, new_schema)
)
is_forward, forward_errors = JsonEntityCastResult._check_forward_compatibility(
is_forward, forward_errors = GtsEntityCastResult._check_forward_compatibility(
old_schema, new_schema
)

# Determine direction
direction = JsonEntityCastResult._infer_direction(old_schema_id, new_schema_id)
direction = GtsEntityCastResult._infer_direction(old_schema_id, new_schema_id)

return JsonEntityCastResult(
return GtsEntityCastResult(
from_id=old_schema_id,
to_id=new_schema_id,
direction=direction,
Expand Down
Loading
Loading