diff --git a/state-manager/app/controller/create_states.py b/state-manager/app/controller/create_states.py index 435825e7..77d5cc81 100644 --- a/state-manager/app/controller/create_states.py +++ b/state-manager/app/controller/create_states.py @@ -56,7 +56,7 @@ async def create_states(namespace_name: str, graph_name: str, body: CreateReques return CreateResponseModel( status=StateStatusEnum.CREATED, - states=[ResponseStateModel(state_id=str(state.id), node_name=state.node_name, graph_name=state.graph_name, inputs=state.inputs, created_at=state.created_at) for state in newStates] + states=[ResponseStateModel(state_id=str(state.id), identifier=state.identifier, node_name=state.node_name, graph_name=state.graph_name, inputs=state.inputs, created_at=state.created_at) for state in newStates] ) except Exception as e: diff --git a/state-manager/app/controller/upsert_graph_template.py b/state-manager/app/controller/upsert_graph_template.py index ea956ded..1fabb381 100644 --- a/state-manager/app/controller/upsert_graph_template.py +++ b/state-manager/app/controller/upsert_graph_template.py @@ -2,11 +2,14 @@ from app.models.graph_models import UpsertGraphTemplateRequest, UpsertGraphTemplateResponse from app.models.db.graph_template_model import GraphTemplate from app.models.graph_template_validation_status import GraphTemplateValidationStatus +from app.tasks.verify_graph import verify_graph + +from fastapi import BackgroundTasks from beanie.operators import Set logger = LogsManager().get_logger() -async def upsert_graph_template(namespace_name: str, graph_name: str, body: UpsertGraphTemplateRequest, x_exosphere_request_id: str) -> UpsertGraphTemplateResponse: +async def upsert_graph_template(namespace_name: str, graph_name: str, body: UpsertGraphTemplateRequest, x_exosphere_request_id: str, background_tasks: BackgroundTasks) -> UpsertGraphTemplateResponse: try: graph_template = await GraphTemplate.find_one( GraphTemplate.name == graph_name, @@ -43,6 +46,8 @@ async def upsert_graph_template(namespace_name: str, graph_name: str, body: Upse ).set_secrets(body.secrets) ) + background_tasks.add_task(verify_graph, graph_template) + return UpsertGraphTemplateResponse( nodes=graph_template.nodes, validation_status=graph_template.validation_status, diff --git a/state-manager/app/routes.py b/state-manager/app/routes.py index f532687c..659256d9 100644 --- a/state-manager/app/routes.py +++ b/state-manager/app/routes.py @@ -1,4 +1,4 @@ -from fastapi import APIRouter, status, Request, Depends, HTTPException +from fastapi import APIRouter, status, Request, Depends, HTTPException, BackgroundTasks from uuid import uuid4 from bson import ObjectId @@ -28,7 +28,6 @@ from .models.secrets_response import SecretsResponseModel from .controller.get_secrets import get_secrets - logger = LogsManager().get_logger() router = APIRouter(prefix="/v0/namespace/{namespace_name}") @@ -121,7 +120,7 @@ async def errored_state_route(namespace_name: str, state_id: str, body: ErroredR response_description="Graph template upserted successfully", tags=["graph"] ) -async def upsert_graph_template(namespace_name: str, graph_name: str, body: UpsertGraphTemplateRequest, request: Request, api_key: str = Depends(check_api_key)): +async def upsert_graph_template(namespace_name: str, graph_name: str, body: UpsertGraphTemplateRequest, request: Request, background_tasks: BackgroundTasks, api_key: str = Depends(check_api_key)): x_exosphere_request_id = getattr(request.state, "x_exosphere_request_id", str(uuid4())) if api_key: @@ -130,7 +129,7 @@ async def upsert_graph_template(namespace_name: str, graph_name: str, body: Upse logger.error(f"API key is invalid for namespace {namespace_name}", x_exosphere_request_id=x_exosphere_request_id) raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid API key") - return await upsert_graph_template_controller(namespace_name, graph_name, body, x_exosphere_request_id) + return await upsert_graph_template_controller(namespace_name, graph_name, body, x_exosphere_request_id, background_tasks) @router.put( diff --git a/state-manager/app/tasks/__init__.py b/state-manager/app/tasks/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/state-manager/app/tasks/verify_graph.py b/state-manager/app/tasks/verify_graph.py new file mode 100644 index 00000000..3ca70d0e --- /dev/null +++ b/state-manager/app/tasks/verify_graph.py @@ -0,0 +1,93 @@ +from app.models.db.graph_template_model import GraphTemplate, NodeTemplate +from app.models.graph_template_validation_status import GraphTemplateValidationStatus +from app.models.db.registered_node import RegisteredNode +from app.singletons.logs_manager import LogsManager +from beanie.operators import In + +logger = LogsManager().get_logger() + +async def verify_nodes_names(nodes: list[NodeTemplate], errors: list[str]): + for node in nodes: + if node.node_name is None or node.node_name == "": + errors.append(f"Node {node.identifier} has no name") + +async def verify_nodes_namespace(nodes: list[NodeTemplate], graph_namespace: str, errors: list[str]): + for node in nodes: + if node.namespace != graph_namespace and node.namespace != "exospherehost": + errors.append(f"Node {node.identifier} has invalid namespace '{node.namespace}'. Must match graph namespace '{graph_namespace}' or use universal namespace 'exospherehost'") + +async def verify_node_exists(nodes: list[NodeTemplate], graph_namespace: str, errors: list[str]): + graph_namespace_node_names = [ + node.node_name for node in nodes if node.namespace == graph_namespace + ] + graph_namespace_database_nodes = await RegisteredNode.find( + In(RegisteredNode.name, graph_namespace_node_names), + RegisteredNode.namespace == graph_namespace + ).to_list() + exospherehost_node_names = [ + node.node_name for node in nodes if node.namespace == "exospherehost" + ] + exospherehost_database_nodes = await RegisteredNode.find( + In(RegisteredNode.name, exospherehost_node_names), + RegisteredNode.namespace == "exospherehost" + ).to_list() + + template_nodes = set([(node.node_name, node.namespace) for node in nodes]) + database_nodes = set([(node.name, node.namespace) for node in graph_namespace_database_nodes + exospherehost_database_nodes]) + + nodes_not_found = template_nodes - database_nodes + + for node in nodes_not_found: + errors.append(f"Node {node[0]} in namespace {node[1]} does not exist.") + +async def verify_node_identifiers(nodes: list[NodeTemplate], errors: list[str]): + identifier_to_nodes = {} + + # First pass: collect all nodes by identifier + for node in nodes: + if node.identifier is None or node.identifier == "": + errors.append(f"Node {node.node_name} in namespace {node.namespace} has no identifier") + continue + + if node.identifier not in identifier_to_nodes: + identifier_to_nodes[node.identifier] = [] + identifier_to_nodes[node.identifier].append(node) + + # Check for duplicates and report all nodes sharing the same identifier + for identifier, nodes_with_identifier in identifier_to_nodes.items(): + if len(nodes_with_identifier) > 1: + node_list = ", ".join([f"{node.node_name} in namespace {node.namespace}" for node in nodes_with_identifier]) + errors.append(f"Duplicate identifier '{identifier}' found in nodes: {node_list}") + + # Check next_nodes references using the valid identifiers + valid_identifiers = set(identifier_to_nodes.keys()) + for node in nodes: + if node.next_nodes is None: + continue + for next_node in node.next_nodes: + if next_node not in valid_identifiers: + errors.append(f"Node {node.node_name} in namespace {node.namespace} has a next node {next_node} that does not exist in the graph") + +async def verify_graph(graph_template: GraphTemplate): + try: + errors = [] + await verify_nodes_names(graph_template.nodes, errors) + await verify_nodes_namespace(graph_template.nodes, graph_template.namespace, errors) + await verify_node_exists(graph_template.nodes, graph_template.namespace, errors) + await verify_node_identifiers(graph_template.nodes, errors) + + if errors: + graph_template.validation_status = GraphTemplateValidationStatus.INVALID + graph_template.validation_errors = errors + await graph_template.save() + return + + graph_template.validation_status = GraphTemplateValidationStatus.VALID + graph_template.validation_errors = None + await graph_template.save() + + except Exception as e: + logger.error(f"Exception during graph validation for graph template {graph_template.id}: {str(e)}", exc_info=True) + graph_template.validation_status = GraphTemplateValidationStatus.INVALID + graph_template.validation_errors = [f"Validation failed due to unexpected error: {str(e)}"] + await graph_template.save() \ No newline at end of file