Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
66 commits
Select commit Hold shift + click to select a range
930f651
feat: add status column to flow model with deployment states
ogabrielluiz May 7, 2025
a95bac9
feat: add deployed status handling to flow management
ogabrielluiz May 7, 2025
04c6f08
fix: remove debug log from deployed switch handler
ogabrielluiz May 7, 2025
2f28163
feat: implement FlowCacheService and FlowCacheServiceFactory for cach…
ogabrielluiz May 7, 2025
87c4d38
feat: add caching support for flow retrieval
ogabrielluiz May 7, 2025
406432c
feat: add FlowCacheService retrieval function and update ServiceType …
ogabrielluiz May 7, 2025
697798e
feat: enhance flow update functionality with caching support
ogabrielluiz May 7, 2025
c515318
feat: update flow handling to support Graph instances and improve flo…
ogabrielluiz May 7, 2025
fa2679d
fix: update import path for EventManager in endpoints.py
ogabrielluiz May 7, 2025
90e7299
feat: add cache status endpoint to retrieve cache information and mem…
ogabrielluiz May 7, 2025
d7678f3
feat: add load_flow_cache function to initialize flow cache from the …
ogabrielluiz May 7, 2025
3f97245
fix: handle flow data retrieval for both graph_data and data attributes
ogabrielluiz May 7, 2025
8403cd2
fix: refactor assign_orphaned_flows_to_superuser method to accept ses…
ogabrielluiz May 7, 2025
056304a
feat: enhance graph payload parsing and caching with user and flow name
ogabrielluiz May 7, 2025
18b3e88
fix: update migration to check for status column existence with conne…
ogabrielluiz May 8, 2025
9fbc043
feat: enhance Vertex component instantiation with custom class handling
ogabrielluiz May 12, 2025
a8dc0af
feat: implement component reset functionality in Graph and Vertex cla…
ogabrielluiz May 12, 2025
3eb5820
fix: correct assignment of custom_component in Vertex class instantia…
ogabrielluiz May 12, 2025
7e1c1e4
fix: update default cache usage in get_flow_by_id_or_endpoint_name_fr…
ogabrielluiz May 12, 2025
ab4e546
Merge branch 'main' into deploy-flow
ogabrielluiz Oct 6, 2025
657eea9
feat: Add async handling for deployed status switch in PublishDropdown
ogabrielluiz Oct 6, 2025
822ea16
refactor: Update Alembic migration to add status column in flow
ogabrielluiz Oct 6, 2025
284779b
refactor: Remove unused imports from endpoints.py
ogabrielluiz Oct 6, 2025
d1aa7e7
refactor: Enhance error handling and session management in initialize…
ogabrielluiz Oct 6, 2025
cc816e6
refactor: Update import statement for Graph in FlowCacheService
ogabrielluiz Oct 6, 2025
156f287
refactor: Update import statements in socket utils for consistency
ogabrielluiz Oct 6, 2025
7aa2fd4
refactor: Add refresh_flow_in_cache method and enhance get_cache_stat…
ogabrielluiz Oct 6, 2025
9071d31
refactor: Update flow cache handling and add endpoint for cache stati…
ogabrielluiz Oct 6, 2025
51b486d
refactor: Improve caching logic in get_flow_by_id_or_endpoint_name_fr…
ogabrielluiz Oct 6, 2025
eb565eb
refactor: Add tests for flow deployment status and default status beh…
ogabrielluiz Oct 6, 2025
ff9658b
test: Add unit and integration tests for PublishDropdown deployment f…
ogabrielluiz Oct 7, 2025
0ad50d3
refactor: Enhance flow caching logic in create_flow and update_flow f…
ogabrielluiz Oct 7, 2025
944c552
test: Add comprehensive unit tests for flow caching and statistics en…
ogabrielluiz Oct 7, 2025
c57c81c
feat: Include flow status in save flow logic
ogabrielluiz Oct 7, 2025
85f2eeb
refactor: Simplify loading module by re-exporting functions for backw…
ogabrielluiz Oct 7, 2025
7d17059
refactor: Update flow unlocking logic in update_flow function
ogabrielluiz Oct 7, 2025
1f9bf8e
refactor: Enhance flow cache service with silent logging option
ogabrielluiz Oct 7, 2025
0563c60
refactor: Adjust deployment status logic in PublishDropdown component
ogabrielluiz Oct 7, 2025
c2180c1
refactor: Enhance MemoizedCanvasControls component with improved lock…
ogabrielluiz Oct 7, 2025
0ebef8f
test: Refactor deployment switch tests in PublishDropdown component
ogabrielluiz Oct 7, 2025
a81ca32
test: Update deploy switch test to reflect new functionality without …
ogabrielluiz Oct 7, 2025
af20b24
refactor: Enhance MemoizedCanvasControls with deployment and lock sta…
ogabrielluiz Oct 7, 2025
36f9240
feat: Add deployment status notification to ApiModal component
ogabrielluiz Oct 7, 2025
7172f9b
feat: Implement deploy toggle functionality in ToolsTable component
ogabrielluiz Oct 7, 2025
5244477
feat: Filter project tools to include only deployed flows
ogabrielluiz Oct 7, 2025
d2c3204
feat: Enhance flow handling in MCP by verifying deployment status and…
ogabrielluiz Oct 7, 2025
05f8f33
test: Add unit tests for project tools to verify flow deployment status
ogabrielluiz Oct 7, 2025
67ad3de
test: Add unit tests for ToolsTable component to verify deployment st…
ogabrielluiz Oct 7, 2025
94c6430
test: Add comprehensive tests for MCP deployment status functionality
ogabrielluiz Oct 7, 2025
a9da2a0
feat: Implement caching for FlowCacheService instance in FlowCacheSer…
ogabrielluiz Oct 7, 2025
03bdb6e
refactor: Consolidate flow update logic in PublishDropdown component
ogabrielluiz Oct 7, 2025
98df5f2
feat: Introduce constants for flow access types and deployment statuses
ogabrielluiz Oct 7, 2025
f9f3ba8
refactor: Update MCP project tools to include all flow statuses
ogabrielluiz Oct 7, 2025
7a93f0e
fix: Handle cache miss scenario in flow retrieval logic
ogabrielluiz Oct 7, 2025
c08e22a
fix: Prevent shared state mutation in cached graph retrieval
ogabrielluiz Oct 7, 2025
32c2986
test: Add unit tests for CacheMiss sentinel class behavior
ogabrielluiz Oct 7, 2025
2c4e196
Merge branch 'main' into deploy-flow
ogabrielluiz Oct 7, 2025
6d0d9fa
doc: Enhance documentation for FlowCacheServiceFactory and its methods
ogabrielluiz Oct 7, 2025
e507302
rename mcp-deployment test file
ogabrielluiz Oct 7, 2025
5b2d53f
refactor: Remove unused cache endpoint and clean up imports
ogabrielluiz Oct 7, 2025
c799727
test: Update ToolsTable tests for deployment status handling
ogabrielluiz Oct 7, 2025
8ae698f
feat: Lock deployed flows before committing and improve cache handling
ogabrielluiz Oct 7, 2025
349e9e9
test: Add unit test for auto-locking deployed flows on creation
ogabrielluiz Oct 7, 2025
7343950
refactor: Update MCPSettings status field to use Literal type
ogabrielluiz Oct 7, 2025
f45cb1c
refactor: Simplify data retrieval in get_components_versions function
ogabrielluiz Oct 7, 2025
df7fcc2
feat: Enhance flow cache management with old endpoint handling
ogabrielluiz Oct 7, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
"""add status column in flow.

Revision ID: ea8c52f13171
Revises: d37bc4322900
Create Date: 2025-05-07 14:30:49.260805

"""

from collections.abc import Sequence

import sqlalchemy as sa
from alembic import op

from langflow.utils import migration

# revision identifiers, used by Alembic.
revision: str = "ea8c52f13171" # pragma: allowlist secret
down_revision: str | None = "d37bc4322900" # pragma: allowlist secret
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None


def upgrade() -> None:
conn = op.get_bind()
# ### commands auto generated by Alembic - please adjust! ###
deployment_state_enum = sa.Enum("DRAFT", "DEPLOYED", name="deployment_state_enum")
deployment_state_enum.create(conn, checkfirst=True)
with op.batch_alter_table("flow", schema=None) as batch_op:
if not migration.column_exists(table_name="flow", column_name="status", conn=conn):
batch_op.add_column(
sa.Column("status", deployment_state_enum, server_default=sa.text("'DRAFT'"), nullable=False)
)

# ### end Alembic commands ###


def downgrade() -> None:
conn = op.get_bind()
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table("flow", schema=None) as batch_op:
if migration.column_exists(table_name="flow", column_name="status", conn=conn):
batch_op.drop_column("status")

deployment_state_enum = sa.Enum("DRAFT", "DEPLOYED", name="deployment_state_enum")
deployment_state_enum.drop(conn, checkfirst=True)

# ### end Alembic commands ###
21 changes: 14 additions & 7 deletions src/backend/base/langflow/api/v1/endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from lfx.graph.schema import RunOutputs
from lfx.log.logger import logger
from lfx.schema.schema import InputValueRequest
from lfx.services.cache.utils import CACHE_MISS
from lfx.services.settings.service import SettingsService
from sqlmodel import select

Expand All @@ -40,25 +41,31 @@
from langflow.events.event_manager import create_stream_tokens_event_manager
from langflow.exceptions.api import APIException, InvalidChatInputError
from langflow.exceptions.serialization import SerializationError
from langflow.helpers.flow import get_flow_by_id_or_endpoint_name
from langflow.helpers.flow import get_flow_by_id_or_endpoint_name, get_flow_by_id_or_endpoint_name_from_cache
from langflow.interface.initialize.loading import update_params_with_load_from_db_fields
from langflow.processing.process import process_tweaks, run_graph_internal
from langflow.schema.graph import Tweaks
from langflow.services.auth.utils import api_key_security, get_current_active_user, get_webhook_user
from langflow.services.cache.utils import save_uploaded_file
from langflow.services.database.models.flow.model import Flow, FlowRead
from langflow.services.database.models.flow.model import Flow
from langflow.services.database.models.flow.utils import get_all_webhook_components_in_flow
from langflow.services.database.models.user.model import User, UserRead
from langflow.services.deps import get_session_service, get_settings_service, get_telemetry_service
from langflow.services.deps import (
get_session_service,
get_settings_service,
get_telemetry_service,
)
from langflow.services.telemetry.schema import RunPayload
from langflow.utils.compression import compress_response
from langflow.utils.version import get_version_info

if TYPE_CHECKING:
from langflow.events.event_manager import EventManager

router = APIRouter(tags=["Base"])

# Constants for byte size conversion
BYTES_PER_KB = 1024.0


async def parse_input_request_from_body(http_request: Request) -> SimplifiedAPIRequest:
"""Parse SimplifiedAPIRequest from HTTP request body.
Expand Down Expand Up @@ -133,7 +140,7 @@ def validate_input_and_tweaks(input_request: SimplifiedAPIRequest) -> None:


async def simple_run_flow(
flow: Flow,
flow: Flow | Graph,
input_request: SimplifiedAPIRequest,
*,
stream: bool = False,
Expand Down Expand Up @@ -303,7 +310,7 @@ async def run_flow_generator(
async def simplified_run_flow(
*,
background_tasks: BackgroundTasks,
flow: Annotated[FlowRead | None, Depends(get_flow_by_id_or_endpoint_name)],
flow: Annotated[Graph | None, Depends(get_flow_by_id_or_endpoint_name_from_cache)],
input_request: SimplifiedAPIRequest | None = None,
stream: bool = False,
api_key_user: Annotated[UserRead, Depends(api_key_security)],
Expand Down Expand Up @@ -351,7 +358,7 @@ async def simplified_run_flow(
if input_request is None:
input_request = await parse_input_request_from_body(http_request)

if flow is None:
if flow is None or flow is CACHE_MISS:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Flow not found")

# Extract request-level variables from headers with prefix X-LANGFLOW-GLOBAL-VAR-*
Expand Down
56 changes: 55 additions & 1 deletion src/backend/base/langflow/api/v1/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from langflow.initial_setup.constants import STARTER_FOLDER_NAME
from langflow.services.database.models.flow.model import (
AccessTypeEnum,
DeploymentStateEnum,
Flow,
FlowCreate,
FlowHeader,
Expand All @@ -35,7 +36,8 @@
from langflow.services.database.models.flow.utils import get_webhook_component_in_flow
from langflow.services.database.models.folder.constants import DEFAULT_FOLDER_NAME
from langflow.services.database.models.folder.model import Folder
from langflow.services.deps import get_settings_service
from langflow.services.deps import get_flow_cache_service, get_settings_service
from langflow.services.flow_cache.service import FlowCacheService
from langflow.utils.compression import compress_response

# build router
Expand Down Expand Up @@ -157,14 +159,24 @@ async def create_flow(
session: DbSession,
flow: FlowCreate,
current_user: CurrentActiveUser,
flow_cache_service: Annotated[FlowCacheService, Depends(get_flow_cache_service)],
):
try:
db_flow = await _new_flow(session=session, flow=flow, user_id=current_user.id)

# If flow is created as DEPLOYED, lock it before committing
if db_flow.status == DeploymentStateEnum.DEPLOYED:
db_flow.locked = True

await session.commit()
await session.refresh(db_flow)

await _save_flow_to_fs(db_flow)

# Add deployed flows to cache
if db_flow.status == DeploymentStateEnum.DEPLOYED:
await flow_cache_service.add_flow_to_cache(db_flow)

except Exception as e:
if "UNIQUE constraint failed" in str(e):
# Get the name of the column that failed
Expand Down Expand Up @@ -321,6 +333,7 @@ async def update_flow(
flow_id: UUID,
flow: FlowUpdate,
current_user: CurrentActiveUser,
flow_cache_service: Annotated[FlowCacheService, Depends(get_flow_cache_service)],
):
"""Update a flow."""
settings_service = get_settings_service()
Expand All @@ -334,6 +347,9 @@ async def update_flow(
if not db_flow:
raise HTTPException(status_code=404, detail="Flow not found")

# Store old endpoint name for cache cleanup if it changes
old_endpoint_name = db_flow.endpoint_name

update_data = flow.model_dump(exclude_unset=True, exclude_none=True)

# Specifically handle endpoint_name when it's explicitly set to null or empty string
Expand All @@ -356,6 +372,17 @@ async def update_flow(
default_folder = (await session.exec(select(Folder).where(Folder.name == DEFAULT_FOLDER_NAME))).first()
if default_folder:
db_flow.folder_id = default_folder.id
if db_flow.status == DeploymentStateEnum.DEPLOYED:
# Refresh the flow in the in-memory cache to ensure we have the latest version
# Pass old_endpoint_name in case it changed, to clean up stale aliases
old_name = old_endpoint_name if old_endpoint_name != db_flow.endpoint_name else None
await flow_cache_service.refresh_flow_in_cache(db_flow, old_endpoint_name=old_name)
db_flow.locked = True
elif db_flow.status == DeploymentStateEnum.DRAFT and update_data.get("status") == DeploymentStateEnum.DRAFT:
# Only unlock if status was explicitly changed to DRAFT (not just omitted from request)
# Pass old_endpoint_name to clean up all cache aliases
await flow_cache_service.remove_flow_from_cache(db_flow, old_endpoint_name=old_endpoint_name)
db_flow.locked = False

session.add(db_flow)
await session.commit()
Expand Down Expand Up @@ -539,6 +566,33 @@ async def download_multiple_file(
return flows_without_api_keys[0]


@router.get("/cache/stats", response_model=dict, status_code=200)
async def get_flow_cache_stats(
*,
_current_user: CurrentActiveUser,
flow_cache_service: Annotated[FlowCacheService, Depends(get_flow_cache_service)],
):
"""Get statistics about the flow cache.

Returns information about the current state of the flow cache, including:
- Number of flows currently cached
- Maximum cache size (if configured)
- List of cached flow identifiers (IDs and endpoint names)

This is useful for monitoring cache performance and debugging deployment issues.

Requires authentication (user must be logged in).

Args:
_current_user (User): The current authenticated user (required for auth)
flow_cache_service (FlowCacheService): The flow cache service

Returns:
dict: Cache statistics including size, max_size, and cached keys
"""
return await flow_cache_service.get_cache_stats()


all_starter_folder_flows_response: Response | None = None


Expand Down
8 changes: 7 additions & 1 deletion src/backend/base/langflow/api/v1/mcp_projects.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,12 @@ async def list_project_tools(
raise HTTPException(status_code=404, detail="Project not found")

# Query flows in the project
flows_query = select(Flow).where(Flow.folder_id == project_id, Flow.is_component == False) # noqa: E712
# Note: All flows are available via MCP regardless of deployment status
# Deployed flows will use cache for better performance
flows_query = select(Flow).where(
Flow.folder_id == project_id,
Flow.is_component == False, # noqa: E712
)

# Optionally filter for MCP-enabled flows only
if mcp_enabled:
Expand Down Expand Up @@ -248,6 +253,7 @@ async def list_project_tools(
# inputSchema=json_schema_from_flow(flow),
name=flow.name,
description=flow.description,
status=flow.status,
)
tools.append(tool)
except Exception as e: # noqa: BLE001
Expand Down
10 changes: 8 additions & 2 deletions src/backend/base/langflow/api/v1/mcp_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from langflow.schema.message import Message
from langflow.services.database.models import Flow
from langflow.services.database.models.user.model import User
from langflow.services.deps import get_settings_service, get_storage_service, session_scope
from langflow.services.deps import get_flow_cache_service, get_settings_service, get_storage_service, session_scope

T = TypeVar("T")
P = ParamSpec("P")
Expand Down Expand Up @@ -192,6 +192,12 @@ async def execute_tool(session):
msg = f"Flow '{name}' not found in project {project_id}"
raise ValueError(msg)

# Try to get the flow from cache for better performance (deployed flows are cached)
# If not in cache, use the flow from database - no breaking changes
flow_cache_service = get_flow_cache_service()
cached_graph = await flow_cache_service.get_cached_graph(str(flow.id))
flow_to_run = cached_graph if cached_graph is not None else flow

# Process inputs
processed_inputs = dict(arguments)

Expand Down Expand Up @@ -231,7 +237,7 @@ async def send_progress_updates(progress_token):
try:
try:
result = await simple_run_flow(
flow=flow,
flow=flow_to_run,
input_request=input_request,
stream=False,
api_key_user=current_user,
Expand Down
1 change: 1 addition & 0 deletions src/backend/base/langflow/api/v1/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,7 @@ class MCPSettings(BaseModel):
action_description: str | None = None
name: str | None = None
description: str | None = None
status: Literal["DRAFT", "DEPLOYED"] | None = None


class MCPProjectUpdateRequest(BaseModel):
Expand Down
32 changes: 30 additions & 2 deletions src/backend/base/langflow/helpers/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@

from fastapi import HTTPException
from lfx.log.logger import logger
from lfx.services.cache.utils import CACHE_MISS
from pydantic.v1 import BaseModel, Field, create_model
from sqlmodel import select

from langflow.schema.schema import INPUT_FIELD_NAME
from langflow.services.database.models.flow.model import Flow, FlowRead
from langflow.services.deps import get_settings_service, session_scope
from langflow.services.database.models.flow import Flow
from langflow.services.database.models.flow.model import FlowRead
from langflow.services.deps import get_flow_cache_service, get_settings_service, session_scope

if TYPE_CHECKING:
from collections.abc import Awaitable, Callable
Expand Down Expand Up @@ -295,6 +297,32 @@ async def get_flow_by_id_or_endpoint_name(flow_id_or_name: str, user_id: str | U
return FlowRead.model_validate(flow, from_attributes=True)


async def get_flow_by_id_or_endpoint_name_from_cache(flow_id_or_name: str, *, use_cache: bool = True):
"""Get a flow by ID or endpoint name, using cache when available.

Args:
flow_id_or_name: Flow UUID or endpoint name
use_cache: Whether to check the cache first (default: True)

Returns:
Graph instance if using cache and found, FlowRead otherwise

Notes:
- If use_cache=True, tries cache first for deployed flows
- Falls back to database if not found in cache
- If use_cache=False, always queries database
"""
if use_cache:
flow_cache_service = get_flow_cache_service()
flow = await flow_cache_service.get_cached_graph(flow_id_or_name)
if flow is not None and flow != CACHE_MISS:
# Cache hit - return the Graph instance
return flow
# Cache miss - fall through to database query

return await get_flow_by_id_or_endpoint_name(flow_id_or_name)


async def generate_unique_flow_name(flow_name, user_id, session):
original_name = flow_name
n = 1
Expand Down
Loading
Loading