Skip to content
2 changes: 1 addition & 1 deletion state-manager/app/controller/create_states.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ async def create_states(namespace_name: str, graph_name: str, body: CreateReques

return CreateResponseModel(
status=StateStatusEnum.CREATED,
states=[ResponseStateModel(state_id=str(state.id), node_name=state.node_name, graph_name=state.graph_name, inputs=state.inputs, created_at=state.created_at) for state in newStates]
states=[ResponseStateModel(state_id=str(state.id), identifier=state.identifier, node_name=state.node_name, graph_name=state.graph_name, inputs=state.inputs, created_at=state.created_at) for state in newStates]
)

except Exception as e:
Expand Down
7 changes: 6 additions & 1 deletion state-manager/app/controller/upsert_graph_template.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@
from app.models.graph_models import UpsertGraphTemplateRequest, UpsertGraphTemplateResponse
from app.models.db.graph_template_model import GraphTemplate
from app.models.graph_template_validation_status import GraphTemplateValidationStatus
from app.tasks.verify_graph import verify_graph

from fastapi import BackgroundTasks
from beanie.operators import Set

logger = LogsManager().get_logger()

async def upsert_graph_template(namespace_name: str, graph_name: str, body: UpsertGraphTemplateRequest, x_exosphere_request_id: str) -> UpsertGraphTemplateResponse:
async def upsert_graph_template(namespace_name: str, graph_name: str, body: UpsertGraphTemplateRequest, x_exosphere_request_id: str, background_tasks: BackgroundTasks) -> UpsertGraphTemplateResponse:
try:
graph_template = await GraphTemplate.find_one(
GraphTemplate.name == graph_name,
Expand Down Expand Up @@ -43,6 +46,8 @@ async def upsert_graph_template(namespace_name: str, graph_name: str, body: Upse
).set_secrets(body.secrets)
)

background_tasks.add_task(verify_graph, graph_template)

return UpsertGraphTemplateResponse(
nodes=graph_template.nodes,
validation_status=graph_template.validation_status,
Expand Down
7 changes: 3 additions & 4 deletions state-manager/app/routes.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from fastapi import APIRouter, status, Request, Depends, HTTPException
from fastapi import APIRouter, status, Request, Depends, HTTPException, BackgroundTasks
from uuid import uuid4
from bson import ObjectId

Expand Down Expand Up @@ -28,7 +28,6 @@
from .models.secrets_response import SecretsResponseModel
from .controller.get_secrets import get_secrets


logger = LogsManager().get_logger()

router = APIRouter(prefix="/v0/namespace/{namespace_name}")
Expand Down Expand Up @@ -121,7 +120,7 @@ async def errored_state_route(namespace_name: str, state_id: str, body: ErroredR
response_description="Graph template upserted successfully",
tags=["graph"]
)
async def upsert_graph_template(namespace_name: str, graph_name: str, body: UpsertGraphTemplateRequest, request: Request, api_key: str = Depends(check_api_key)):
async def upsert_graph_template(namespace_name: str, graph_name: str, body: UpsertGraphTemplateRequest, request: Request, background_tasks: BackgroundTasks, api_key: str = Depends(check_api_key)):
x_exosphere_request_id = getattr(request.state, "x_exosphere_request_id", str(uuid4()))

if api_key:
Expand All @@ -130,7 +129,7 @@ async def upsert_graph_template(namespace_name: str, graph_name: str, body: Upse
logger.error(f"API key is invalid for namespace {namespace_name}", x_exosphere_request_id=x_exosphere_request_id)
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid API key")

return await upsert_graph_template_controller(namespace_name, graph_name, body, x_exosphere_request_id)
return await upsert_graph_template_controller(namespace_name, graph_name, body, x_exosphere_request_id, background_tasks)


@router.put(
Expand Down
Empty file.
93 changes: 93 additions & 0 deletions state-manager/app/tasks/verify_graph.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
from app.models.db.graph_template_model import GraphTemplate, NodeTemplate
from app.models.graph_template_validation_status import GraphTemplateValidationStatus
from app.models.db.registered_node import RegisteredNode
from app.singletons.logs_manager import LogsManager
from beanie.operators import In

logger = LogsManager().get_logger()

async def verify_nodes_names(nodes: list[NodeTemplate], errors: list[str]):
for node in nodes:
if node.node_name is None or node.node_name == "":
errors.append(f"Node {node.identifier} has no name")

async def verify_nodes_namespace(nodes: list[NodeTemplate], graph_namespace: str, errors: list[str]):
for node in nodes:
if node.namespace != graph_namespace and node.namespace != "exospherehost":
errors.append(f"Node {node.identifier} has invalid namespace '{node.namespace}'. Must match graph namespace '{graph_namespace}' or use universal namespace 'exospherehost'")

async def verify_node_exists(nodes: list[NodeTemplate], graph_namespace: str, errors: list[str]):
graph_namespace_node_names = [
node.node_name for node in nodes if node.namespace == graph_namespace
]
graph_namespace_database_nodes = await RegisteredNode.find(
In(RegisteredNode.name, graph_namespace_node_names),
RegisteredNode.namespace == graph_namespace
).to_list()
exospherehost_node_names = [
node.node_name for node in nodes if node.namespace == "exospherehost"
]
exospherehost_database_nodes = await RegisteredNode.find(
In(RegisteredNode.name, exospherehost_node_names),
RegisteredNode.namespace == "exospherehost"
).to_list()

template_nodes = set([(node.node_name, node.namespace) for node in nodes])
database_nodes = set([(node.name, node.namespace) for node in graph_namespace_database_nodes + exospherehost_database_nodes])

nodes_not_found = template_nodes - database_nodes

for node in nodes_not_found:
errors.append(f"Node {node[0]} in namespace {node[1]} does not exist.")

async def verify_node_identifiers(nodes: list[NodeTemplate], errors: list[str]):
identifier_to_nodes = {}

# First pass: collect all nodes by identifier
for node in nodes:
if node.identifier is None or node.identifier == "":
errors.append(f"Node {node.node_name} in namespace {node.namespace} has no identifier")
continue

if node.identifier not in identifier_to_nodes:
identifier_to_nodes[node.identifier] = []
identifier_to_nodes[node.identifier].append(node)

# Check for duplicates and report all nodes sharing the same identifier
for identifier, nodes_with_identifier in identifier_to_nodes.items():
if len(nodes_with_identifier) > 1:
node_list = ", ".join([f"{node.node_name} in namespace {node.namespace}" for node in nodes_with_identifier])
errors.append(f"Duplicate identifier '{identifier}' found in nodes: {node_list}")

# Check next_nodes references using the valid identifiers
valid_identifiers = set(identifier_to_nodes.keys())
for node in nodes:
if node.next_nodes is None:
continue
for next_node in node.next_nodes:
if next_node not in valid_identifiers:
errors.append(f"Node {node.node_name} in namespace {node.namespace} has a next node {next_node} that does not exist in the graph")

async def verify_graph(graph_template: GraphTemplate):
try:
errors = []
await verify_nodes_names(graph_template.nodes, errors)
await verify_nodes_namespace(graph_template.nodes, graph_template.namespace, errors)
await verify_node_exists(graph_template.nodes, graph_template.namespace, errors)
await verify_node_identifiers(graph_template.nodes, errors)

if errors:
graph_template.validation_status = GraphTemplateValidationStatus.INVALID
graph_template.validation_errors = errors
await graph_template.save()
return

graph_template.validation_status = GraphTemplateValidationStatus.VALID
graph_template.validation_errors = None
await graph_template.save()

except Exception as e:
logger.error(f"Exception during graph validation for graph template {graph_template.id}: {str(e)}", exc_info=True)
graph_template.validation_status = GraphTemplateValidationStatus.INVALID
graph_template.validation_errors = [f"Validation failed due to unexpected error: {str(e)}"]
await graph_template.save()