diff --git a/src/backend/base/langflow/alembic/versions/ea8c52f13171_add_status_column_in_flow.py b/src/backend/base/langflow/alembic/versions/ea8c52f13171_add_status_column_in_flow.py new file mode 100644 index 000000000000..6276bdec2a20 --- /dev/null +++ b/src/backend/base/langflow/alembic/versions/ea8c52f13171_add_status_column_in_flow.py @@ -0,0 +1,47 @@ +"""add status column in flow. + +Revision ID: ea8c52f13171 +Revises: d37bc4322900 +Create Date: 2025-05-07 14:30:49.260805 + +""" + +from collections.abc import Sequence + +import sqlalchemy as sa +from alembic import op + +from langflow.utils import migration + +# revision identifiers, used by Alembic. +revision: str = "ea8c52f13171" # pragma: allowlist secret +down_revision: str | None = "d37bc4322900" # pragma: allowlist secret +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + + +def upgrade() -> None: + conn = op.get_bind() + # ### commands auto generated by Alembic - please adjust! ### + deployment_state_enum = sa.Enum("DRAFT", "DEPLOYED", name="deployment_state_enum") + deployment_state_enum.create(conn, checkfirst=True) + with op.batch_alter_table("flow", schema=None) as batch_op: + if not migration.column_exists(table_name="flow", column_name="status", conn=conn): + batch_op.add_column( + sa.Column("status", deployment_state_enum, server_default=sa.text("'DRAFT'"), nullable=False) + ) + + # ### end Alembic commands ### + + +def downgrade() -> None: + conn = op.get_bind() + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table("flow", schema=None) as batch_op: + if migration.column_exists(table_name="flow", column_name="status", conn=conn): + batch_op.drop_column("status") + + deployment_state_enum = sa.Enum("DRAFT", "DEPLOYED", name="deployment_state_enum") + deployment_state_enum.drop(conn, checkfirst=True) + + # ### end Alembic commands ### diff --git a/src/backend/base/langflow/api/v1/endpoints.py b/src/backend/base/langflow/api/v1/endpoints.py index 60a9e09551da..b60b1253aef4 100644 --- a/src/backend/base/langflow/api/v1/endpoints.py +++ b/src/backend/base/langflow/api/v1/endpoints.py @@ -23,6 +23,7 @@ from lfx.graph.schema import RunOutputs from lfx.log.logger import logger from lfx.schema.schema import InputValueRequest +from lfx.services.cache.utils import CACHE_MISS from lfx.services.settings.service import SettingsService from sqlmodel import select @@ -40,25 +41,31 @@ from langflow.events.event_manager import create_stream_tokens_event_manager from langflow.exceptions.api import APIException, InvalidChatInputError from langflow.exceptions.serialization import SerializationError -from langflow.helpers.flow import get_flow_by_id_or_endpoint_name +from langflow.helpers.flow import get_flow_by_id_or_endpoint_name, get_flow_by_id_or_endpoint_name_from_cache from langflow.interface.initialize.loading import update_params_with_load_from_db_fields from langflow.processing.process import process_tweaks, run_graph_internal from langflow.schema.graph import Tweaks from langflow.services.auth.utils import api_key_security, get_current_active_user, get_webhook_user from langflow.services.cache.utils import save_uploaded_file -from langflow.services.database.models.flow.model import Flow, FlowRead +from langflow.services.database.models.flow.model import Flow from langflow.services.database.models.flow.utils import get_all_webhook_components_in_flow from langflow.services.database.models.user.model import User, UserRead -from langflow.services.deps import get_session_service, get_settings_service, get_telemetry_service +from langflow.services.deps import ( + get_session_service, + get_settings_service, + get_telemetry_service, +) from langflow.services.telemetry.schema import RunPayload from langflow.utils.compression import compress_response from langflow.utils.version import get_version_info if TYPE_CHECKING: from langflow.events.event_manager import EventManager - router = APIRouter(tags=["Base"]) +# Constants for byte size conversion +BYTES_PER_KB = 1024.0 + async def parse_input_request_from_body(http_request: Request) -> SimplifiedAPIRequest: """Parse SimplifiedAPIRequest from HTTP request body. @@ -133,7 +140,7 @@ def validate_input_and_tweaks(input_request: SimplifiedAPIRequest) -> None: async def simple_run_flow( - flow: Flow, + flow: Flow | Graph, input_request: SimplifiedAPIRequest, *, stream: bool = False, @@ -303,7 +310,7 @@ async def run_flow_generator( async def simplified_run_flow( *, background_tasks: BackgroundTasks, - flow: Annotated[FlowRead | None, Depends(get_flow_by_id_or_endpoint_name)], + flow: Annotated[Graph | None, Depends(get_flow_by_id_or_endpoint_name_from_cache)], input_request: SimplifiedAPIRequest | None = None, stream: bool = False, api_key_user: Annotated[UserRead, Depends(api_key_security)], @@ -351,7 +358,7 @@ async def simplified_run_flow( if input_request is None: input_request = await parse_input_request_from_body(http_request) - if flow is None: + if flow is None or flow is CACHE_MISS: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Flow not found") # Extract request-level variables from headers with prefix X-LANGFLOW-GLOBAL-VAR-* diff --git a/src/backend/base/langflow/api/v1/flows.py b/src/backend/base/langflow/api/v1/flows.py index 6caa0f7bc05b..ff0e1421ac4f 100644 --- a/src/backend/base/langflow/api/v1/flows.py +++ b/src/backend/base/langflow/api/v1/flows.py @@ -26,6 +26,7 @@ from langflow.initial_setup.constants import STARTER_FOLDER_NAME from langflow.services.database.models.flow.model import ( AccessTypeEnum, + DeploymentStateEnum, Flow, FlowCreate, FlowHeader, @@ -35,7 +36,8 @@ from langflow.services.database.models.flow.utils import get_webhook_component_in_flow from langflow.services.database.models.folder.constants import DEFAULT_FOLDER_NAME from langflow.services.database.models.folder.model import Folder -from langflow.services.deps import get_settings_service +from langflow.services.deps import get_flow_cache_service, get_settings_service +from langflow.services.flow_cache.service import FlowCacheService from langflow.utils.compression import compress_response # build router @@ -157,14 +159,24 @@ async def create_flow( session: DbSession, flow: FlowCreate, current_user: CurrentActiveUser, + flow_cache_service: Annotated[FlowCacheService, Depends(get_flow_cache_service)], ): try: db_flow = await _new_flow(session=session, flow=flow, user_id=current_user.id) + + # If flow is created as DEPLOYED, lock it before committing + if db_flow.status == DeploymentStateEnum.DEPLOYED: + db_flow.locked = True + await session.commit() await session.refresh(db_flow) await _save_flow_to_fs(db_flow) + # Add deployed flows to cache + if db_flow.status == DeploymentStateEnum.DEPLOYED: + await flow_cache_service.add_flow_to_cache(db_flow) + except Exception as e: if "UNIQUE constraint failed" in str(e): # Get the name of the column that failed @@ -321,6 +333,7 @@ async def update_flow( flow_id: UUID, flow: FlowUpdate, current_user: CurrentActiveUser, + flow_cache_service: Annotated[FlowCacheService, Depends(get_flow_cache_service)], ): """Update a flow.""" settings_service = get_settings_service() @@ -334,6 +347,9 @@ async def update_flow( if not db_flow: raise HTTPException(status_code=404, detail="Flow not found") + # Store old endpoint name for cache cleanup if it changes + old_endpoint_name = db_flow.endpoint_name + update_data = flow.model_dump(exclude_unset=True, exclude_none=True) # Specifically handle endpoint_name when it's explicitly set to null or empty string @@ -356,6 +372,17 @@ async def update_flow( default_folder = (await session.exec(select(Folder).where(Folder.name == DEFAULT_FOLDER_NAME))).first() if default_folder: db_flow.folder_id = default_folder.id + if db_flow.status == DeploymentStateEnum.DEPLOYED: + # Refresh the flow in the in-memory cache to ensure we have the latest version + # Pass old_endpoint_name in case it changed, to clean up stale aliases + old_name = old_endpoint_name if old_endpoint_name != db_flow.endpoint_name else None + await flow_cache_service.refresh_flow_in_cache(db_flow, old_endpoint_name=old_name) + db_flow.locked = True + elif db_flow.status == DeploymentStateEnum.DRAFT and update_data.get("status") == DeploymentStateEnum.DRAFT: + # Only unlock if status was explicitly changed to DRAFT (not just omitted from request) + # Pass old_endpoint_name to clean up all cache aliases + await flow_cache_service.remove_flow_from_cache(db_flow, old_endpoint_name=old_endpoint_name) + db_flow.locked = False session.add(db_flow) await session.commit() @@ -539,6 +566,33 @@ async def download_multiple_file( return flows_without_api_keys[0] +@router.get("/cache/stats", response_model=dict, status_code=200) +async def get_flow_cache_stats( + *, + _current_user: CurrentActiveUser, + flow_cache_service: Annotated[FlowCacheService, Depends(get_flow_cache_service)], +): + """Get statistics about the flow cache. + + Returns information about the current state of the flow cache, including: + - Number of flows currently cached + - Maximum cache size (if configured) + - List of cached flow identifiers (IDs and endpoint names) + + This is useful for monitoring cache performance and debugging deployment issues. + + Requires authentication (user must be logged in). + + Args: + _current_user (User): The current authenticated user (required for auth) + flow_cache_service (FlowCacheService): The flow cache service + + Returns: + dict: Cache statistics including size, max_size, and cached keys + """ + return await flow_cache_service.get_cache_stats() + + all_starter_folder_flows_response: Response | None = None diff --git a/src/backend/base/langflow/api/v1/mcp_projects.py b/src/backend/base/langflow/api/v1/mcp_projects.py index 793df020614e..cd73b37b631a 100644 --- a/src/backend/base/langflow/api/v1/mcp_projects.py +++ b/src/backend/base/langflow/api/v1/mcp_projects.py @@ -219,7 +219,12 @@ async def list_project_tools( raise HTTPException(status_code=404, detail="Project not found") # Query flows in the project - flows_query = select(Flow).where(Flow.folder_id == project_id, Flow.is_component == False) # noqa: E712 + # Note: All flows are available via MCP regardless of deployment status + # Deployed flows will use cache for better performance + flows_query = select(Flow).where( + Flow.folder_id == project_id, + Flow.is_component == False, # noqa: E712 + ) # Optionally filter for MCP-enabled flows only if mcp_enabled: @@ -248,6 +253,7 @@ async def list_project_tools( # inputSchema=json_schema_from_flow(flow), name=flow.name, description=flow.description, + status=flow.status, ) tools.append(tool) except Exception as e: # noqa: BLE001 diff --git a/src/backend/base/langflow/api/v1/mcp_utils.py b/src/backend/base/langflow/api/v1/mcp_utils.py index 47232c244cf3..059f5fb17216 100644 --- a/src/backend/base/langflow/api/v1/mcp_utils.py +++ b/src/backend/base/langflow/api/v1/mcp_utils.py @@ -25,7 +25,7 @@ from langflow.schema.message import Message from langflow.services.database.models import Flow from langflow.services.database.models.user.model import User -from langflow.services.deps import get_settings_service, get_storage_service, session_scope +from langflow.services.deps import get_flow_cache_service, get_settings_service, get_storage_service, session_scope T = TypeVar("T") P = ParamSpec("P") @@ -192,6 +192,12 @@ async def execute_tool(session): msg = f"Flow '{name}' not found in project {project_id}" raise ValueError(msg) + # Try to get the flow from cache for better performance (deployed flows are cached) + # If not in cache, use the flow from database - no breaking changes + flow_cache_service = get_flow_cache_service() + cached_graph = await flow_cache_service.get_cached_graph(str(flow.id)) + flow_to_run = cached_graph if cached_graph is not None else flow + # Process inputs processed_inputs = dict(arguments) @@ -231,7 +237,7 @@ async def send_progress_updates(progress_token): try: try: result = await simple_run_flow( - flow=flow, + flow=flow_to_run, input_request=input_request, stream=False, api_key_user=current_user, diff --git a/src/backend/base/langflow/api/v1/schemas.py b/src/backend/base/langflow/api/v1/schemas.py index 69f8439732a6..787e6c2a0321 100644 --- a/src/backend/base/langflow/api/v1/schemas.py +++ b/src/backend/base/langflow/api/v1/schemas.py @@ -444,6 +444,7 @@ class MCPSettings(BaseModel): action_description: str | None = None name: str | None = None description: str | None = None + status: Literal["DRAFT", "DEPLOYED"] | None = None class MCPProjectUpdateRequest(BaseModel): diff --git a/src/backend/base/langflow/helpers/flow.py b/src/backend/base/langflow/helpers/flow.py index 46f4b3810f33..a7c45379b120 100644 --- a/src/backend/base/langflow/helpers/flow.py +++ b/src/backend/base/langflow/helpers/flow.py @@ -5,12 +5,14 @@ from fastapi import HTTPException from lfx.log.logger import logger +from lfx.services.cache.utils import CACHE_MISS from pydantic.v1 import BaseModel, Field, create_model from sqlmodel import select from langflow.schema.schema import INPUT_FIELD_NAME -from langflow.services.database.models.flow.model import Flow, FlowRead -from langflow.services.deps import get_settings_service, session_scope +from langflow.services.database.models.flow import Flow +from langflow.services.database.models.flow.model import FlowRead +from langflow.services.deps import get_flow_cache_service, get_settings_service, session_scope if TYPE_CHECKING: from collections.abc import Awaitable, Callable @@ -295,6 +297,32 @@ async def get_flow_by_id_or_endpoint_name(flow_id_or_name: str, user_id: str | U return FlowRead.model_validate(flow, from_attributes=True) +async def get_flow_by_id_or_endpoint_name_from_cache(flow_id_or_name: str, *, use_cache: bool = True): + """Get a flow by ID or endpoint name, using cache when available. + + Args: + flow_id_or_name: Flow UUID or endpoint name + use_cache: Whether to check the cache first (default: True) + + Returns: + Graph instance if using cache and found, FlowRead otherwise + + Notes: + - If use_cache=True, tries cache first for deployed flows + - Falls back to database if not found in cache + - If use_cache=False, always queries database + """ + if use_cache: + flow_cache_service = get_flow_cache_service() + flow = await flow_cache_service.get_cached_graph(flow_id_or_name) + if flow is not None and flow != CACHE_MISS: + # Cache hit - return the Graph instance + return flow + # Cache miss - fall through to database query + + return await get_flow_by_id_or_endpoint_name(flow_id_or_name) + + async def generate_unique_flow_name(flow_name, user_id, session): original_name = flow_name n = 1 diff --git a/src/backend/base/langflow/interface/initialize/loading.py b/src/backend/base/langflow/interface/initialize/loading.py index 9a6632853b9a..312f13aeaf25 100644 --- a/src/backend/base/langflow/interface/initialize/loading.py +++ b/src/backend/base/langflow/interface/initialize/loading.py @@ -1,201 +1,27 @@ from __future__ import annotations -import inspect -import os -import warnings -from typing import TYPE_CHECKING, Any - -import orjson -from lfx.custom.eval import eval_custom_component_code -from lfx.log.logger import logger -from pydantic import PydanticDeprecatedSince20 - -from langflow.schema.artifact import get_artifact_type, post_process_raw -from langflow.schema.data import Data -from langflow.services.deps import get_tracing_service, session_scope - -if TYPE_CHECKING: - from lfx.custom.custom_component.component import Component - from lfx.custom.custom_component.custom_component import CustomComponent - from lfx.graph.vertex.base import Vertex - - from langflow.events.event_manager import EventManager - - -def instantiate_class( - vertex: Vertex, - user_id=None, - event_manager: EventManager | None = None, -) -> Any: - """Instantiate class from module type and key, and params.""" - vertex_type = vertex.vertex_type - base_type = vertex.base_type - logger.debug(f"Instantiating {vertex_type} of type {base_type}") - - if not base_type: - msg = "No base type provided for vertex" - raise ValueError(msg) - - custom_params = get_params(vertex.params) - code = custom_params.pop("code") - class_object: type[CustomComponent | Component] = eval_custom_component_code(code) - custom_component: CustomComponent | Component = class_object( - _user_id=user_id, - _parameters=custom_params, - _vertex=vertex, - _tracing_service=get_tracing_service(), - _id=vertex.id, - ) - if hasattr(custom_component, "set_event_manager"): - custom_component.set_event_manager(event_manager) - return custom_component, custom_params - - -async def get_instance_results( - custom_component, - custom_params: dict, - vertex: Vertex, - *, - fallback_to_env_vars: bool = False, - base_type: str = "component", -): - custom_params = await update_params_with_load_from_db_fields( - custom_component, - custom_params, - vertex.load_from_db_fields, - fallback_to_env_vars=fallback_to_env_vars, - ) - with warnings.catch_warnings(): - warnings.filterwarnings("ignore", category=PydanticDeprecatedSince20) - if base_type == "custom_components": - return await build_custom_component(params=custom_params, custom_component=custom_component) - if base_type == "component": - return await build_component(params=custom_params, custom_component=custom_component) - msg = f"Base type {base_type} not found." - raise ValueError(msg) - - -def get_params(vertex_params): - params = vertex_params - params = convert_params_to_sets(params) - params = convert_kwargs(params) - return params.copy() - - -def convert_params_to_sets(params): - """Convert certain params to sets.""" - if "allowed_special" in params: - params["allowed_special"] = set(params["allowed_special"]) - if "disallowed_special" in params: - params["disallowed_special"] = set(params["disallowed_special"]) - return params - - -def convert_kwargs(params): - # Loop through items to avoid repeated lookups - items_to_remove = [] - for key, value in params.items(): - if ("kwargs" in key or "config" in key) and isinstance(value, str): - try: - params[key] = orjson.loads(value) - except orjson.JSONDecodeError: - items_to_remove.append(key) - - # Remove invalid keys outside the loop to avoid modifying dict during iteration - for key in items_to_remove: - params.pop(key, None) - - return params - - -async def update_params_with_load_from_db_fields( - custom_component: Component, - params, - load_from_db_fields, - *, - fallback_to_env_vars=False, -): - async with session_scope() as session: - for field in load_from_db_fields: - if field not in params or not params[field]: - continue - - try: - key = await custom_component.get_variable(name=params[field], field=field, session=session) - except ValueError as e: - if "User id is not set" in str(e): - raise - if "variable not found." in str(e) and not fallback_to_env_vars: - raise - await logger.adebug(str(e)) - key = None - - if fallback_to_env_vars and key is None: - key = os.getenv(params[field]) - if key: - await logger.ainfo(f"Using environment variable {params[field]} for {field}") - else: - await logger.aerror(f"Environment variable {params[field]} is not set.") - - params[field] = key if key is not None else None - if key is None: - await logger.awarning(f"Could not get value for {field}. Setting it to None.") - - return params - - -async def build_component( - params: dict, - custom_component: Component, -): - # Now set the params as attributes of the custom_component - custom_component.set_attributes(params) - build_results, artifacts = await custom_component.build_results() - - return custom_component, build_results, artifacts - - -async def build_custom_component(params: dict, custom_component: CustomComponent): - if "retriever" in params and hasattr(params["retriever"], "as_retriever"): - params["retriever"] = params["retriever"].as_retriever() - - # Determine if the build method is asynchronous - is_async = inspect.iscoroutinefunction(custom_component.build) - - # New feature: the component has a list of outputs and we have - # to check the vertex.edges to see which is connected (coulb be multiple) - # and then we'll get the output which has the name of the method we should call. - # the methods don't require any params because they are already set in the custom_component - # so we can just call them - - if is_async: - # Await the build method directly if it's async - build_result = await custom_component.build(**params) - else: - # Call the build method directly if it's sync - build_result = custom_component.build(**params) - custom_repr = custom_component.custom_repr() - if custom_repr is None and isinstance(build_result, dict | Data | str): - custom_repr = build_result - if not isinstance(custom_repr, str): - custom_repr = str(custom_repr) - raw = custom_component.repr_value - if hasattr(raw, "data") and raw is not None: - raw = raw.data - - elif hasattr(raw, "model_dump") and raw is not None: - raw = raw.model_dump() - if raw is None and isinstance(build_result, dict | Data | str): - raw = build_result.data if isinstance(build_result, Data) else build_result - - artifact_type = get_artifact_type(custom_component.repr_value or raw, build_result) - raw = post_process_raw(raw, artifact_type) - artifact = {"repr": custom_repr, "raw": raw, "type": artifact_type} - - if custom_component._vertex is not None: - custom_component._artifacts = {custom_component._vertex.outputs[0].get("name"): artifact} - custom_component._results = {custom_component._vertex.outputs[0].get("name"): build_result} - return custom_component, build_result, artifact - - msg = "Custom component does not have a vertex" - raise ValueError(msg) +# Re-export everything from lfx.interface.initialize.loading for backwards compatibility +from lfx.interface.initialize.loading import ( + build_component, + build_custom_component, + convert_kwargs, + convert_params_to_sets, + get_instance_results, + get_params, + instantiate_class, + update_params_with_load_from_db_fields, + update_table_params_with_load_from_db_fields, +) + +# Make re-exported functions available at module level +__all__ = [ + "build_component", + "build_custom_component", + "convert_kwargs", + "convert_params_to_sets", + "get_instance_results", + "get_params", + "instantiate_class", + "update_params_with_load_from_db_fields", + "update_table_params_with_load_from_db_fields", +] diff --git a/src/backend/base/langflow/services/database/models/flow/model.py b/src/backend/base/langflow/services/database/models/flow/model.py index c3cf523a9f3b..2e5fcda56ecd 100644 --- a/src/backend/base/langflow/services/database/models/flow/model.py +++ b/src/backend/base/langflow/services/database/models/flow/model.py @@ -29,6 +29,11 @@ class AccessTypeEnum(str, Enum): PUBLIC = "PUBLIC" +class DeploymentStateEnum(str, Enum): + DRAFT = "DRAFT" + DEPLOYED = "DEPLOYED" + + class FlowBase(SQLModel): # Supresses warnings during migrations __mapper_args__ = {"confirm_deleted_rows": False} @@ -66,6 +71,19 @@ class FlowBase(SQLModel): server_default=text("'PRIVATE'"), ), ) + status: DeploymentStateEnum = Field( + default=DeploymentStateEnum.DRAFT, + sa_column=Column( + SQLEnum( + DeploymentStateEnum, + name="deployment_state_enum", + values_callable=lambda enum: [member.value for member in enum], + ), + nullable=False, + server_default=text("'DRAFT'"), + ), + description="The current deployment state of the flow", + ) @field_validator("endpoint_name") @classmethod @@ -263,6 +281,7 @@ class FlowUpdate(SQLModel): action_name: str | None = None action_description: str | None = None access_type: AccessTypeEnum | None = None + status: DeploymentStateEnum | None = None fs_path: str | None = None @field_validator("endpoint_name") diff --git a/src/backend/base/langflow/services/database/models/flow/utils.py b/src/backend/base/langflow/services/database/models/flow/utils.py index 051af0b3b238..f7469e5f0b1c 100644 --- a/src/backend/base/langflow/services/database/models/flow/utils.py +++ b/src/backend/base/langflow/services/database/models/flow/utils.py @@ -21,9 +21,17 @@ def get_all_webhook_components_in_flow(flow_data: dict | None): def get_components_versions(flow: Flow): versions: dict[str, str] = {} - if flow.data is None: + + # Safely get graph_data or data, preferring graph_data + data = getattr(flow, "graph_data", None) + if data is None or not isinstance(data, dict): + data = getattr(flow, "data", None) + + # If data is still None or not a dict, return empty versions + if data is None or not isinstance(data, dict): return versions - nodes = flow.data.get("nodes", []) + + nodes = data.get("nodes", []) for node in nodes: data = node.get("data", {}) data_node = data.get("node", {}) diff --git a/src/backend/base/langflow/services/database/service.py b/src/backend/base/langflow/services/database/service.py index 1672ecdf8b49..f9d18cd7cb91 100644 --- a/src/backend/base/langflow/services/database/service.py +++ b/src/backend/base/langflow/services/database/service.py @@ -197,54 +197,53 @@ async def with_session(self): await session.rollback() raise - async def assign_orphaned_flows_to_superuser(self) -> None: + async def assign_orphaned_flows_to_superuser(self, session: AsyncSession) -> None: """Assign orphaned flows to the default superuser when auto login is enabled.""" settings_service = get_settings_service() if not settings_service.auth_settings.AUTO_LOGIN: return - async with self.with_session() as session: - # Fetch orphaned flows - stmt = ( - select(models.Flow) - .join(models.Folder) - .where( - models.Flow.user_id == None, # noqa: E711 - models.Folder.name != STARTER_FOLDER_NAME, - ) + # Fetch orphaned flows + stmt = ( + select(models.Flow) + .join(models.Folder) + .where( + models.Flow.user_id == None, # noqa: E711 + models.Folder.name != STARTER_FOLDER_NAME, ) - orphaned_flows = (await session.exec(stmt)).all() + ) + orphaned_flows = (await session.exec(stmt)).all() - if not orphaned_flows: - return + if not orphaned_flows: + return - await logger.adebug("Assigning orphaned flows to the default superuser") + await logger.adebug("Assigning orphaned flows to the default superuser") - # Retrieve superuser - superuser_username = settings_service.auth_settings.SUPERUSER - superuser = await get_user_by_username(session, superuser_username) + # Retrieve superuser + superuser_username = settings_service.auth_settings.SUPERUSER + superuser = await get_user_by_username(session, superuser_username) - if not superuser: - error_message = "Default superuser not found" - await logger.aerror(error_message) - raise RuntimeError(error_message) + if not superuser: + error_message = "Default superuser not found" + await logger.aerror(error_message) + raise RuntimeError(error_message) - # Get existing flow names for the superuser - existing_names: set[str] = set( - (await session.exec(select(models.Flow.name).where(models.Flow.user_id == superuser.id))).all() - ) + # Get existing flow names for the superuser + existing_names: set[str] = set( + (await session.exec(select(models.Flow.name).where(models.Flow.user_id == superuser.id))).all() + ) - # Process orphaned flows - for flow in orphaned_flows: - flow.user_id = superuser.id - flow.name = self._generate_unique_flow_name(flow.name, existing_names) - existing_names.add(flow.name) - session.add(flow) + # Process orphaned flows + for flow in orphaned_flows: + flow.user_id = superuser.id + flow.name = self._generate_unique_flow_name(flow.name, existing_names) + existing_names.add(flow.name) + session.add(flow) - # Commit changes - await session.commit() - await logger.adebug("Successfully assigned orphaned flows to the default superuser") + # Commit changes + await session.commit() + await logger.adebug("Successfully assigned orphaned flows to the default superuser") @staticmethod def _generate_unique_flow_name(original_name: str, existing_names: set[str]) -> str: diff --git a/src/backend/base/langflow/services/deps.py b/src/backend/base/langflow/services/deps.py index 36fdb8c7de84..a18ceb85ebbb 100644 --- a/src/backend/base/langflow/services/deps.py +++ b/src/backend/base/langflow/services/deps.py @@ -16,6 +16,7 @@ from langflow.services.cache.service import AsyncBaseCacheService, CacheService from langflow.services.chat.service import ChatService from langflow.services.database.service import DatabaseService + from langflow.services.flow_cache.service import FlowCacheService from langflow.services.job_queue.service import JobQueueService from langflow.services.session.service import SessionService from langflow.services.state.service import StateService @@ -241,3 +242,10 @@ def get_queue_service() -> JobQueueService: from langflow.services.job_queue.factory import JobQueueServiceFactory return get_service(ServiceType.JOB_QUEUE_SERVICE, JobQueueServiceFactory()) + + +def get_flow_cache_service() -> FlowCacheService: + """Retrieves the FlowCacheService instance from the service manager.""" + from langflow.services.flow_cache.factory import FlowCacheServiceFactory + + return get_service(ServiceType.FLOW_CACHE_SERVICE, FlowCacheServiceFactory()) diff --git a/src/backend/base/langflow/services/flow_cache/__init__.py b/src/backend/base/langflow/services/flow_cache/__init__.py new file mode 100644 index 000000000000..4564297b7b44 --- /dev/null +++ b/src/backend/base/langflow/services/flow_cache/__init__.py @@ -0,0 +1 @@ +"""Flow cache service for storing and retrieving deployed flow Graph instances.""" diff --git a/src/backend/base/langflow/services/flow_cache/factory.py b/src/backend/base/langflow/services/flow_cache/factory.py new file mode 100644 index 000000000000..b7ceeba411fd --- /dev/null +++ b/src/backend/base/langflow/services/flow_cache/factory.py @@ -0,0 +1,24 @@ +"""Factory for creating and managing FlowCacheService instances.""" + +from langflow.services.factory import ServiceFactory +from langflow.services.flow_cache.service import FlowCacheService + + +class FlowCacheServiceFactory(ServiceFactory): + """Factory for creating FlowCacheService instances with singleton pattern.""" + + def __init__(self) -> None: + """Initialize the FlowCacheServiceFactory.""" + super().__init__(FlowCacheService) + self._flow_cache_service_instance: FlowCacheService | None = None + + def create(self): + """Create or return the cached FlowCacheService instance. + + Returns: + FlowCacheService: The singleton FlowCacheService instance + """ + # Cache the FlowCacheService instance to avoid repeated instantiation + if self._flow_cache_service_instance is None: + self._flow_cache_service_instance = FlowCacheService() + return self._flow_cache_service_instance diff --git a/src/backend/base/langflow/services/flow_cache/service.py b/src/backend/base/langflow/services/flow_cache/service.py new file mode 100644 index 000000000000..610cbf6942f7 --- /dev/null +++ b/src/backend/base/langflow/services/flow_cache/service.py @@ -0,0 +1,195 @@ +from __future__ import annotations + +import sys +from copy import deepcopy +from typing import TYPE_CHECKING + +from loguru import logger + +from langflow.services.cache.service import AsyncInMemoryCache + +if TYPE_CHECKING: + from lfx.graph.graph.base import Graph + + from langflow.services.database.models.flow import Flow + + +class FlowCacheService(AsyncInMemoryCache): + """A cache service for storing and retrieving Flow Graph instances. + + This service provides an in-memory cache for Graph instances created from Flow data. + It's designed to improve performance by avoiding repeated Graph creation for deployed flows. + """ + + name = "flow_cache_service" + + async def add_flow_to_cache(self, flow: Flow, *, silent: bool = False) -> None: + """Add a flow's Graph instance to the cache. + + Args: + flow (Flow): The flow to cache + silent (bool): If True, suppress debug logging (used during refresh) + """ + if flow.data is None: + if not silent: + logger.warning(f"Flow {flow.id} has no data, skipping cache") + return + + from lfx.graph.graph.base import Graph + + flow_id_str = str(flow.id) + graph_data = flow.data.copy() + + # Parse the Graph payload, catch parsing issues + try: + graph = Graph.from_payload(graph_data, flow_id=flow_id_str, user_id=flow.user_id, flow_name=flow.name) + except (ValueError, TypeError, AttributeError, KeyError) as e: + if not silent: + logger.warning(f"Flow {flow_id_str} cannot be cached due to parsing error: {e!s}") + return + + # Store in cache, catch cache-specific errors + try: + await self.set(flow_id_str, graph) + if flow.endpoint_name: + await self.set(flow.endpoint_name, graph) + if not silent: + logger.debug(f"Added flow {flow_id_str} to cache") + except (KeyError, RuntimeError) as e: + logger.error(f"Error caching graph for flow {flow_id_str}: {e!s}") + + async def remove_flow_from_cache( + self, flow: Flow, *, silent: bool = False, old_endpoint_name: str | None = None + ) -> None: + """Remove a flow's Graph instance from the cache. + + Removes all cache keys associated with the flow: UUID, current endpoint_name, + and optionally a previous endpoint_name (for handling renames). + + Args: + flow (Flow): The flow to remove from cache + silent (bool): If True, suppress debug logging (used during refresh) + old_endpoint_name (str | None): Previous endpoint name to remove (for renames) + """ + flow_id_str = str(flow.id) + + # Collect all keys to remove: UUID + current endpoint + old endpoint + keys_to_remove = [flow_id_str] + if flow.endpoint_name: + keys_to_remove.append(flow.endpoint_name) + if old_endpoint_name: + keys_to_remove.append(old_endpoint_name) + + # Remove each key independently + for key in keys_to_remove: + try: + await self.delete(key) + if not silent: + logger.debug(f"Removed cache key: {key}") + except KeyError: + if not silent: + logger.debug(f"Cache key not found: {key}") + except RuntimeError as e: + logger.error(f"Error removing cache key {key}: {e!s}") + + async def get_cached_graph(self, flow_id: str) -> Graph | None: + """Get a cached Graph instance for a flow. + + Returns a deep copy to prevent concurrent requests from mutating shared state. + + Args: + flow_id (str): The flow ID to look up + + Returns: + Graph | None: A deep copy of the cached Graph instance or None if not found + """ + try: + cached = await self.get(flow_id) + # Check for cache miss sentinel + if not cached: + return None + # Return a deep copy to prevent concurrent requests from sharing mutable state + return deepcopy(cached) + + except KeyError as e: + logger.error(f"Cache miss retrieving graph for flow {flow_id}: {e!s}") + except RuntimeError as e: + logger.error(f"Error retrieving cached graph for flow {flow_id}: {e!s}") + return None + + async def refresh_flow_in_cache(self, flow: Flow, *, old_endpoint_name: str | None = None) -> None: + """Refresh a flow's Graph instance in the cache. + + This removes the existing cached version (if any) and adds the updated version. + Useful when a deployed flow's data has been modified or endpoint renamed. + + Args: + flow (Flow): The flow to refresh in cache + old_endpoint_name (str | None): Previous endpoint name to remove (for renames) + """ + flow_id_str = str(flow.id) + try: + # Remove old version from cache, including old endpoint alias if provided + await self.remove_flow_from_cache(flow, silent=True, old_endpoint_name=old_endpoint_name) + # Add updated version to cache with new endpoint name + await self.add_flow_to_cache(flow, silent=True) + logger.debug(f"Refreshed flow {flow_id_str} in cache") + except (KeyError, RuntimeError) as e: + logger.error(f"Error refreshing flow {flow_id_str} in cache: {e!s}") + + async def get_cache_stats(self) -> dict[str, int | float | list[str] | None]: + """Get statistics about the current cache state. + + Returns: + dict: Dictionary containing: + - size: Number of items in cache + - max_size: Maximum cache size (None if unlimited) + - keys: List of cached flow identifiers (IDs and endpoint names) + - memory_bytes: Approximate memory usage in bytes + - memory_mb: Approximate memory usage in megabytes + """ + try: + async with self.lock: + cache_size = len(self.cache) + cache_keys = list(self.cache.keys()) + + # Calculate approximate memory footprint + # Note: This is an approximation using sys.getsizeof + # The cache structure is: {key: {"value": Graph, "time": float}} + total_bytes = sys.getsizeof(self.cache) + for key, cache_entry in self.cache.items(): + # Add size of the key (flow ID or endpoint name string) + total_bytes += sys.getsizeof(key) + # Add size of the cache entry dict wrapper + total_bytes += sys.getsizeof(cache_entry) + + # Add size of the actual cached content + if isinstance(cache_entry, dict): + # Get the actual Graph object (or pickled bytes) + cached_value = cache_entry.get("value") + if cached_value is not None: + total_bytes += sys.getsizeof(cached_value) + # Add timestamp + cached_time = cache_entry.get("time") + if cached_time is not None: + total_bytes += sys.getsizeof(cached_time) + + memory_mb = total_bytes / (1024 * 1024) + + except (KeyError, RuntimeError) as e: + logger.error(f"Error getting cache stats: {e!s}") + return { + "size": 0, + "max_size": self.max_size, + "keys": [], + "memory_bytes": 0, + "memory_mb": 0.0, + } + else: + return { + "size": cache_size, + "max_size": self.max_size, + "keys": cache_keys, + "memory_bytes": total_bytes, + "memory_mb": round(memory_mb, 2), + } diff --git a/src/backend/base/langflow/services/schema.py b/src/backend/base/langflow/services/schema.py index 6745f572ac32..30df54bd17e2 100644 --- a/src/backend/base/langflow/services/schema.py +++ b/src/backend/base/langflow/services/schema.py @@ -19,4 +19,5 @@ class ServiceType(str, Enum): TRACING_SERVICE = "tracing_service" TELEMETRY_SERVICE = "telemetry_service" JOB_QUEUE_SERVICE = "job_queue_service" + FLOW_CACHE_SERVICE = "flow_cache_service" MCP_COMPOSER_SERVICE = "mcp_composer_service" diff --git a/src/backend/base/langflow/services/socket/utils.py b/src/backend/base/langflow/services/socket/utils.py index 91e539bd284f..ad338fbb81ec 100644 --- a/src/backend/base/langflow/services/socket/utils.py +++ b/src/backend/base/langflow/services/socket/utils.py @@ -2,16 +2,16 @@ from collections.abc import Callable import socketio +from lfx.graph.graph.base import Graph +from lfx.graph.graph.utils import layered_topological_sort +from lfx.graph.vertex.base import Vertex from lfx.log.logger import logger from sqlmodel import select from langflow.api.utils import format_elapsed_time from langflow.api.v1.schemas import ResultDataResponse, VertexBuildResponse -from langflow.graph.graph.base import Graph -from langflow.graph.graph.utils import layered_topological_sort -from langflow.graph.utils import log_vertex_build -from langflow.graph.vertex.base import Vertex from langflow.services.database.models.flow.model import Flow +from langflow.services.database.models.vertex_builds.crud import log_vertex_build from langflow.services.deps import get_session diff --git a/src/backend/base/langflow/services/utils.py b/src/backend/base/langflow/services/utils.py index 876140d5d555..e70c98a572ee 100644 --- a/src/backend/base/langflow/services/utils.py +++ b/src/backend/base/langflow/services/utils.py @@ -4,6 +4,7 @@ from typing import TYPE_CHECKING from lfx.log.logger import logger +from lfx.services.deps import session_scope from lfx.services.settings.constants import DEFAULT_SUPERUSER, DEFAULT_SUPERUSER_PASSWORD from sqlalchemy import delete from sqlalchemy import exc as sqlalchemy_exc @@ -12,17 +13,20 @@ from langflow.services.auth.utils import create_super_user, verify_password from langflow.services.cache.base import ExternalAsyncBaseCacheService from langflow.services.cache.factory import CacheServiceFactory +from langflow.services.database.models.flow.model import DeploymentStateEnum, Flow from langflow.services.database.models.transactions.model import TransactionTable from langflow.services.database.models.vertex_builds.model import VertexBuildTable from langflow.services.database.utils import initialize_database from langflow.services.schema import ServiceType -from .deps import get_db_service, get_service, get_settings_service, session_scope +from .deps import get_db_service, get_flow_cache_service, get_service, get_settings_service if TYPE_CHECKING: - from lfx.services.settings.manager import SettingsService + from lfx.services.settings.service import SettingsService from sqlmodel.ext.asyncio.session import AsyncSession + from langflow.services.flow_cache.service import FlowCacheService + async def get_or_create_super_user(session: AsyncSession, username, password, is_default): from langflow.services.database.models.user.model import User @@ -223,6 +227,15 @@ async def clean_vertex_builds(settings_service: SettingsService, session: AsyncS # Don't re-raise since this is a cleanup task +async def load_flow_cache(session: AsyncSession) -> None: + """Load the flow cache from the database.""" + flow_cache_service: FlowCacheService = get_flow_cache_service() + + flows = (await session.exec(select(Flow).where(Flow.status == DeploymentStateEnum.DEPLOYED))).all() + for flow in flows: + await flow_cache_service.add_flow_to_cache(flow) + + def register_all_service_factories() -> None: """Register all available service factories with the service manager.""" # Import all service factories @@ -285,9 +298,10 @@ async def initialize_services(*, fix_migration: bool = False) -> None: async with session_scope() as session: settings_service = get_service(ServiceType.SETTINGS_SERVICE) await setup_superuser(settings_service, session) - try: - await get_db_service().assign_orphaned_flows_to_superuser() - except sqlalchemy_exc.IntegrityError as exc: - await logger.awarning(f"Error assigning orphaned flows to the superuser: {exc!s}") - await clean_transactions(settings_service, session) - await clean_vertex_builds(settings_service, session) + try: + await get_db_service().assign_orphaned_flows_to_superuser(session) + except sqlalchemy_exc.IntegrityError as exc: + await logger.awarning(f"Error assigning orphaned flows to the superuser: {exc!s}") + await clean_transactions(settings_service, session) + await clean_vertex_builds(settings_service, session) + await load_flow_cache(session) diff --git a/src/backend/tests/unit/api/v1/test_flows.py b/src/backend/tests/unit/api/v1/test_flows.py index fdd7895d9011..905a6f2314cd 100644 --- a/src/backend/tests/unit/api/v1/test_flows.py +++ b/src/backend/tests/unit/api/v1/test_flows.py @@ -1,12 +1,23 @@ +import json import tempfile import uuid +from pathlib import Path as FilePath +import pytest from anyio import Path from fastapi import status from httpx import AsyncClient from langflow.services.database.models import Flow +@pytest.fixture +def valid_flow_payload(): + """Load valid complete flow payload from test fixtures.""" + flow_path = FilePath(__file__).parent.parent.parent.parent / "data" / "MemoryChatbotNoLLM.json" + with flow_path.open() as f: + return json.load(f) + + async def test_create_flow(client: AsyncClient, logged_in_headers): flow_file = Path(tempfile.tempdir) / f"{uuid.uuid4()}.json" try: @@ -213,7 +224,7 @@ async def test_read_flows_user_isolation(client: AsyncClient, logged_in_headers, await session.refresh(other_user) # Login as the other user to get headers - login_data = {"username": "other_test_user", "password": "testpassword"} + login_data = {"username": "other_test_user", "password": "testpassword"} # pragma: allowlist secret response = await client.post("api/v1/login", data=login_data) assert response.status_code == 200 tokens = response.json() @@ -322,3 +333,299 @@ async def test_read_flows_user_isolation(client: AsyncClient, logged_in_headers, if user: await session.delete(user) await session.commit() + + +async def test_update_flow_deployment_status(client: AsyncClient, logged_in_headers): + """Test updating flow deployment status from DRAFT to DEPLOYED.""" + # Create a flow + basic_case = { + "name": "deployment_test_flow", + "description": "Test deployment status", + "icon": "string", + "icon_bg_color": "#ff00ff", + "gradient": "string", + "data": {}, + "is_component": False, + "webhook": False, + "endpoint_name": "deployment_test", + "tags": ["test"], + "folder_id": "3fa85f64-5717-4562-b3fc-2c963f66afa6", + } + response = await client.post("api/v1/flows/", json=basic_case, headers=logged_in_headers) + assert response.status_code == status.HTTP_201_CREATED + flow_data = response.json() + flow_id = flow_data["id"] + + # Verify initial status is DRAFT + assert "status" in flow_data + assert flow_data["status"] == "DRAFT" + + # Update flow status to DEPLOYED + update_payload = {"status": "DEPLOYED"} + response = await client.patch(f"api/v1/flows/{flow_id}", json=update_payload, headers=logged_in_headers) + assert response.status_code == status.HTTP_200_OK + updated_flow = response.json() + + # Verify status was updated + assert updated_flow["status"] == "DEPLOYED" + assert updated_flow["id"] == flow_id + + # Update back to DRAFT + update_payload = {"status": "DRAFT"} + response = await client.patch(f"api/v1/flows/{flow_id}", json=update_payload, headers=logged_in_headers) + assert response.status_code == status.HTTP_200_OK + updated_flow = response.json() + + # Verify status was updated back to DRAFT + assert updated_flow["status"] == "DRAFT" + + +async def test_deployed_flow_locked_status(client: AsyncClient, logged_in_headers): + """Test that deployed flows are automatically locked.""" + # Create a flow + basic_case = { + "name": "locked_test_flow", + "description": "Test locked status", + "icon": "string", + "icon_bg_color": "#ff00ff", + "gradient": "string", + "data": {}, + "is_component": False, + "webhook": False, + "endpoint_name": "locked_test", + "tags": ["test"], + "folder_id": "3fa85f64-5717-4562-b3fc-2c963f66afa6", + } + response = await client.post("api/v1/flows/", json=basic_case, headers=logged_in_headers) + assert response.status_code == status.HTTP_201_CREATED + flow_data = response.json() + flow_id = flow_data["id"] + + # Deploy the flow + update_payload = {"status": "DEPLOYED"} + response = await client.patch(f"api/v1/flows/{flow_id}", json=update_payload, headers=logged_in_headers) + assert response.status_code == status.HTTP_200_OK + deployed_flow = response.json() + + # Verify flow is locked when deployed + assert deployed_flow["status"] == "DEPLOYED" + assert deployed_flow["locked"] is True + + # Undeploy the flow + update_payload = {"status": "DRAFT"} + response = await client.patch(f"api/v1/flows/{flow_id}", json=update_payload, headers=logged_in_headers) + assert response.status_code == status.HTTP_200_OK + draft_flow = response.json() + + # Verify flow is unlocked when in draft + assert draft_flow["status"] == "DRAFT" + assert draft_flow["locked"] is False + + +async def test_create_flow_default_status(client: AsyncClient, logged_in_headers): + """Test that newly created flows have DRAFT status by default.""" + basic_case = { + "name": "default_status_flow", + "description": "Test default status", + "icon": "string", + "icon_bg_color": "#ff00ff", + "gradient": "string", + "data": {}, + "is_component": False, + "webhook": False, + "endpoint_name": "default_status_test", + "tags": ["test"], + "folder_id": "3fa85f64-5717-4562-b3fc-2c963f66afa6", + } + response = await client.post("api/v1/flows/", json=basic_case, headers=logged_in_headers) + assert response.status_code == status.HTTP_201_CREATED + result = response.json() + + # Verify default status is DRAFT + assert "status" in result + assert result["status"] == "DRAFT" + assert result["locked"] is False + + +async def test_cache_stats_endpoint(client: AsyncClient, logged_in_headers): + """Test the cache stats endpoint returns correct structure.""" + response = await client.get("api/v1/flows/cache/stats", headers=logged_in_headers) + assert response.status_code == status.HTTP_200_OK + + result = response.json() + assert "size" in result + assert "max_size" in result + assert "keys" in result + assert "memory_bytes" in result + assert "memory_mb" in result + + # Verify types + assert isinstance(result["size"], int) + assert isinstance(result["keys"], list) + assert isinstance(result["memory_bytes"], int) + assert isinstance(result["memory_mb"], (int, float)) + assert result["memory_bytes"] >= 0 + assert result["memory_mb"] >= 0.0 + + +async def test_cache_stats_requires_auth(client: AsyncClient): + """Test that cache stats endpoint requires authentication.""" + response = await client.get("api/v1/flows/cache/stats") + # In test environment with AUTO_LOGIN, this might return 200 with default user + # Otherwise should return 401 or 403. Accept any of these as valid depending on test config. + # Some test environments may use 403 Forbidden instead of 401 Unauthorized + assert response.status_code in [ + status.HTTP_200_OK, + status.HTTP_401_UNAUTHORIZED, + status.HTTP_403_FORBIDDEN, + ], f"Expected 200, 401, or 403, got {response.status_code}" + + +async def test_deployed_flow_added_to_cache(client: AsyncClient, logged_in_headers): + """Test that deploying a flow adds it to the cache.""" + # Get initial cache stats + stats_before = await client.get("api/v1/flows/cache/stats", headers=logged_in_headers) + initial_size = stats_before.json()["size"] + + # Create a flow + flow_data = { + "name": "cache_test_flow", + "description": "Test caching", + "data": {"nodes": [], "edges": []}, + "status": "DRAFT", + } + create_response = await client.post("api/v1/flows/", json=flow_data, headers=logged_in_headers) + assert create_response.status_code == status.HTTP_201_CREATED + flow = create_response.json() + flow_id = flow["id"] + + # Deploy the flow + update_response = await client.patch( + f"api/v1/flows/{flow_id}", json={"status": "DEPLOYED"}, headers=logged_in_headers + ) + assert update_response.status_code == status.HTTP_200_OK + + # Check cache stats - should have increased + stats_after = await client.get("api/v1/flows/cache/stats", headers=logged_in_headers) + result = stats_after.json() + + # Cache size should have increased (by 1 for flow_id, potentially +1 if endpoint_name exists) + assert result["size"] >= initial_size + 1 + assert flow_id in result["keys"] + assert result["memory_bytes"] > 0 + + +async def test_undeployed_flow_removed_from_cache(client: AsyncClient, logged_in_headers): + """Test that undeploying a flow removes it from the cache.""" + # Create and deploy a flow + flow_data = { + "name": "undeploy_cache_test", + "description": "Test cache removal", + "data": {"nodes": [], "edges": []}, + "status": "DEPLOYED", + } + create_response = await client.post("api/v1/flows/", json=flow_data, headers=logged_in_headers) + assert create_response.status_code == status.HTTP_201_CREATED + flow = create_response.json() + flow_id = flow["id"] + + # Verify it's in cache + stats_deployed = await client.get("api/v1/flows/cache/stats", headers=logged_in_headers) + assert flow_id in stats_deployed.json()["keys"] + + # Undeploy the flow + update_response = await client.patch(f"api/v1/flows/{flow_id}", json={"status": "DRAFT"}, headers=logged_in_headers) + assert update_response.status_code == status.HTTP_200_OK + + # Verify it's removed from cache + stats_draft = await client.get("api/v1/flows/cache/stats", headers=logged_in_headers) + assert flow_id not in stats_draft.json()["keys"] + + +async def test_cache_refresh_on_deployed_flow_update(client: AsyncClient, logged_in_headers, valid_flow_payload): + """Test that updating a deployed flow refreshes it in cache.""" + # Use valid flow payload and set it to DEPLOYED + flow_data = valid_flow_payload.copy() + flow_data["name"] = "refresh_cache_test" + flow_data["status"] = "DEPLOYED" + + create_response = await client.post("api/v1/flows/", json=flow_data, headers=logged_in_headers) + assert create_response.status_code == status.HTTP_201_CREATED + flow = create_response.json() + flow_id = flow["id"] + + # Note: The flow may or may not be cached depending on whether its components are available + # This test primarily verifies the refresh code path is called without errors + + # Update the deployed flow (should call refresh_flow_in_cache) + update_response = await client.patch( + f"api/v1/flows/{flow_id}", + json={"description": "Updated description"}, + headers=logged_in_headers, + ) + assert update_response.status_code == status.HTTP_200_OK + + # Verify the flow update succeeded + get_response = await client.get(f"api/v1/flows/{flow_id}", headers=logged_in_headers) + assert get_response.json()["description"] == "Updated description" + + +async def test_cache_with_endpoint_name(client: AsyncClient, logged_in_headers): + """Test that flows with endpoint names are cached by both ID and endpoint name.""" + # Create a flow with endpoint_name + flow_data = { + "name": "endpoint_cache_test", + "description": "Test endpoint caching", + "endpoint_name": "test-endpoint-cache", + "data": {"nodes": [], "edges": []}, + "status": "DEPLOYED", + } + create_response = await client.post("api/v1/flows/", json=flow_data, headers=logged_in_headers) + assert create_response.status_code == status.HTTP_201_CREATED + flow = create_response.json() + flow_id = flow["id"] + endpoint_name = flow["endpoint_name"] + + # Check cache stats + stats = await client.get("api/v1/flows/cache/stats", headers=logged_in_headers) + cache_keys = stats.json()["keys"] + + # Both flow_id and endpoint_name should be in cache + assert flow_id in cache_keys + assert endpoint_name in cache_keys + + +async def test_cache_memory_tracking(client: AsyncClient, logged_in_headers): + """Test that cache memory tracking fields are calculated correctly.""" + # Get cache stats + stats = await client.get("api/v1/flows/cache/stats", headers=logged_in_headers) + result = stats.json() + + # Verify memory tracking fields exist and are valid + assert "memory_bytes" in result + assert "memory_mb" in result + assert isinstance(result["memory_bytes"], int) + assert isinstance(result["memory_mb"], (int, float)) + assert result["memory_bytes"] >= 0 + assert result["memory_mb"] >= 0.0 + + # Verify MB is correctly calculated from bytes + expected_mb = round(result["memory_bytes"] / (1024 * 1024), 2) + assert result["memory_mb"] == expected_mb + + +async def test_create_deployed_flow_is_auto_locked(client: AsyncClient, logged_in_headers): + """Test that flows created with DEPLOYED status are automatically locked.""" + flow_data = { + "name": "deployed_on_creation", + "description": "Test auto-lock on creation", + "data": {}, + "status": "DEPLOYED", + } + response = await client.post("api/v1/flows/", json=flow_data, headers=logged_in_headers) + assert response.status_code == 201 + result = response.json() + + # Verify flow is both deployed AND locked + assert result["status"] == "DEPLOYED" + assert result["locked"] is True diff --git a/src/backend/tests/unit/api/v1/test_mcp_projects.py b/src/backend/tests/unit/api/v1/test_mcp_projects.py index bb5f739b6b3c..5dfaa2319912 100644 --- a/src/backend/tests/unit/api/v1/test_mcp_projects.py +++ b/src/backend/tests/unit/api/v1/test_mcp_projects.py @@ -515,7 +515,7 @@ async def test_update_project_auth_settings_encryption( "oauth_server_url": "http://localhost:3000", "oauth_callback_path": "/callback", "oauth_client_id": "test-client-id", - "oauth_client_secret": "test-oauth-secret-value-456", + "oauth_client_secret": "test-oauth-secret-value-456", # pragma: allowlist secret "oauth_auth_url": "https://oauth.example.com/auth", "oauth_token_url": "https://oauth.example.com/token", "oauth_mcp_scope": "read write", @@ -564,7 +564,7 @@ async def test_update_project_auth_settings_encryption( async with session_scope() as session: project = await session.get(Folder, user_test_project.id) decrypted_settings = decrypt_auth_settings(project.auth_settings) - assert decrypted_settings["oauth_client_secret"] == "test-oauth-secret-value-456" # noqa: S105 + assert decrypted_settings["oauth_client_secret"] == "test-oauth-secret-value-456" # noqa: S105 # pragma: allowlist secret async def test_project_sse_creation(user_test_project): @@ -680,3 +680,156 @@ async def test_mcp_longterm_token_fails_without_superuser(): async with get_db_service().with_session() as session: with pytest.raises(HTTPException, match="Auto login required to create a long-term token"): await create_user_longterm_token(session) + + +async def test_list_project_tools_includes_status_field(): + """Test that list_project_tools endpoint includes deployment status in response.""" + async with session_scope() as session: + # Create user, project, and deployed flow + user = User( + username="test_mcp_user", + password=get_password_hash("test123"), + is_active=True, + is_superuser=False, + ) + session.add(user) + await session.commit() + await session.refresh(user) + + project = Folder(name="Test MCP Project", user_id=user.id) + session.add(project) + await session.commit() + await session.refresh(project) + + flow = Flow( + name="Test MCP Flow", + data={}, + folder_id=project.id, + user_id=user.id, + mcp_enabled=True, + status="DEPLOYED", + ) + session.add(flow) + await session.commit() + await session.refresh(flow) + + # Import here to avoid circular dependency + from langflow.api.v1.mcp_projects import list_project_tools + + # Call the endpoint + result = await list_project_tools(project_id=project.id, current_user=user, mcp_enabled=False) + + # Verify status field is included + assert result is not None + assert len(result.tools) > 0 + tool = next((t for t in result.tools if t.id == flow.id), None) + assert tool is not None + assert tool.status == "DEPLOYED" + + +async def test_list_project_tools_includes_all_flows_regardless_of_status(): + """Test that both DRAFT and DEPLOYED flows are included in MCP (no breaking changes).""" + async with session_scope() as session: + # Create user and project + user = User( + username="test_mcp_all_flows_user", + password=get_password_hash("test123"), + is_active=True, + is_superuser=False, + ) + session.add(user) + await session.commit() + await session.refresh(user) + + project = Folder(name="Test MCP All Flows Project", user_id=user.id) + session.add(project) + await session.commit() + await session.refresh(project) + + # Create a DRAFT flow + draft_flow = Flow( + name="Draft MCP Flow", + data={}, + folder_id=project.id, + user_id=user.id, + mcp_enabled=True, + status="DRAFT", + ) + session.add(draft_flow) + + # Create a DEPLOYED flow + deployed_flow = Flow( + name="Deployed MCP Flow", + data={}, + folder_id=project.id, + user_id=user.id, + mcp_enabled=True, + status="DEPLOYED", + ) + session.add(deployed_flow) + await session.commit() + await session.refresh(draft_flow) + await session.refresh(deployed_flow) + + from langflow.api.v1.mcp_projects import list_project_tools + + result = await list_project_tools(project_id=project.id, current_user=user, mcp_enabled=False) + + tool_ids = [t.id for t in result.tools] + + # Verify BOTH flows are included (deployment is just for caching, not access control) + assert deployed_flow.id in tool_ids + assert draft_flow.id in tool_ids + + +async def test_mcp_status_field_reflects_deployment_state(): + """Test that status field in MCP tools reflects the flow's deployment state.""" + async with session_scope() as session: + # Create user and project + user = User( + username="test_mcp_status_user", + password=get_password_hash("test123"), + is_active=True, + is_superuser=False, + ) + session.add(user) + await session.commit() + await session.refresh(user) + + project = Folder(name="Test Status Project", user_id=user.id) + session.add(project) + await session.commit() + await session.refresh(project) + + # Create a DRAFT flow + flow = Flow( + name="Status Test Flow", + data={}, + folder_id=project.id, + user_id=user.id, + mcp_enabled=True, + status="DRAFT", + ) + session.add(flow) + await session.commit() + await session.refresh(flow) + + from langflow.api.v1.mcp_projects import list_project_tools + + # Verify status is DRAFT + result = await list_project_tools(project_id=project.id, current_user=user, mcp_enabled=False) + tool = next((t for t in result.tools if t.id == flow.id), None) + assert tool is not None + assert tool.status == "DRAFT" + + # Deploy the flow + flow.status = "DEPLOYED" + session.add(flow) + await session.commit() + await session.refresh(flow) + + # Verify status is now DEPLOYED + result = await list_project_tools(project_id=project.id, current_user=user, mcp_enabled=False) + tool = next((t for t in result.tools if t.id == flow.id), None) + assert tool is not None + assert tool.status == "DEPLOYED" diff --git a/src/frontend/src/components/core/flowToolbarComponent/components/__tests__/deploy-dropdown.test.tsx b/src/frontend/src/components/core/flowToolbarComponent/components/__tests__/deploy-dropdown.test.tsx new file mode 100644 index 000000000000..e2f97be414d8 --- /dev/null +++ b/src/frontend/src/components/core/flowToolbarComponent/components/__tests__/deploy-dropdown.test.tsx @@ -0,0 +1,424 @@ +import { fireEvent, render, screen, waitFor } from "@testing-library/react"; +import userEvent from "@testing-library/user-event"; +import "@testing-library/jest-dom"; +import { TooltipProvider } from "@/components/ui/tooltip"; +import PublishDropdown from "../deploy-dropdown"; + +// Mock stores and hooks +const mockMutateAsync = jest.fn(); +const mockSetErrorData = jest.fn(); +const mockSetFlows = jest.fn(); +const mockSetCurrentFlow = jest.fn(); + +const mockCurrentFlow = { + id: "test-flow-id", + name: "Test Flow", + folder_id: "test-folder-id", + access_type: "PRIVATE", + status: "DRAFT", +}; + +const mockFlows = [mockCurrentFlow]; + +jest.mock("@/controllers/API/queries/flows/use-patch-update-flow", () => ({ + usePatchUpdateFlow: () => ({ + mutateAsync: mockMutateAsync, + }), +})); + +jest.mock("@/stores/alertStore", () => ({ + __esModule: true, + default: jest.fn((selector) => + selector({ + setErrorData: mockSetErrorData, + }), + ), +})); + +jest.mock("@/stores/flowsManagerStore", () => ({ + __esModule: true, + default: jest.fn((selector) => + selector({ + currentFlow: mockCurrentFlow, + flows: mockFlows, + setFlows: mockSetFlows, + }), + ), +})); + +jest.mock("@/stores/flowStore", () => ({ + __esModule: true, + default: jest.fn((selector) => + selector({ + setCurrentFlow: mockSetCurrentFlow, + hasIO: true, + }), + ), +})); + +jest.mock("@/stores/authStore", () => ({ + __esModule: true, + default: jest.fn((selector) => + selector({ + autoLogin: true, + }), + ), +})); + +jest.mock("react-router-dom", () => ({ + useHref: () => "/", + useParams: () => ({}), + Link: ({ to, children, ...props }: any) => ( + + {children} + + ), +})); + +jest.mock("@/customization/utils/custom-mcp-open", () => ({ + customMcpOpen: () => "_blank", +})); + +jest.mock("@/customization/feature-flags", () => ({ + ENABLE_PUBLISH: true, + ENABLE_WIDGET: true, +})); + +// Mock modal components +jest.mock("@/modals/apiModal", () => ({ + __esModule: true, + default: ({ open, children }: any) => + open ?
+ Flow Not Deployed +
++ Deploy this flow from the Share menu to make it + available via API. +
+