Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 27 additions & 2 deletions src/backend/base/langflow/api/build.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,26 @@ async def start_flow_build(
current_user: CurrentActiveUser,
queue_service: JobQueueService,
flow_name: str | None = None,
source_flow_id: uuid.UUID | None = None,
) -> str:
"""Start the flow build process by setting up the queue and starting the build task.

Args:
flow_id: The flow ID used for tracking, sessions, and messages.
background_tasks: FastAPI background tasks handler.
inputs: Optional input values for the flow.
data: Optional flow data request.
files: Optional list of file paths.
stop_component_id: Optional component ID to stop the build at.
start_component_id: Optional component ID to start the build from.
log_builds: Whether to log builds.
current_user: The current authenticated user.
queue_service: The job queue service.
flow_name: Optional flow name override.
source_flow_id: If provided, the actual flow ID to load from DB.
Used by public flows where flow_id is a virtual UUID for session isolation
but the flow data must be loaded from the original flow in the database.

Returns:
the job_id.
"""
Expand All @@ -90,6 +107,7 @@ async def start_flow_build(
log_builds=log_builds,
current_user=current_user,
flow_name=flow_name,
source_flow_id=source_flow_id,
)
queue_service.start_job(job_id, task_coro)
except Exception as e:
Expand Down Expand Up @@ -209,6 +227,7 @@ async def generate_flow_events(
log_builds: bool,
current_user: CurrentActiveUser,
flow_name: str | None = None,
source_flow_id: uuid.UUID | None = None,
) -> None:
"""Generate events for flow building process.

Expand Down Expand Up @@ -283,13 +302,19 @@ async def create_graph(fresh_session, flow_id_str: str, flow_name: str | None) -
effective_session_id = flow_id_str

if not data:
return await build_graph_from_db(
flow_id=flow_id,
# For public flows, source_flow_id is the real DB ID, flow_id is virtual.
# Load from DB using the real ID, then override graph.flow_id with virtual.
db_flow_id = source_flow_id if source_flow_id is not None else flow_id
graph = await build_graph_from_db(
flow_id=db_flow_id,
session=fresh_session,
chat_service=chat_service,
user_id=str(current_user.id),
session_id=effective_session_id,
)
if source_flow_id is not None:
graph.flow_id = str(flow_id)
return graph

if not flow_name:
result = await fresh_session.exec(select(Flow.name).where(Flow.id == flow_id))
Expand Down
55 changes: 54 additions & 1 deletion src/backend/base/langflow/api/v1/chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -629,9 +629,11 @@ async def build_public_tmp(
client_id = request.cookies.get("client_id")
owner_user, new_flow_id = await verify_public_flow_and_get_user(flow_id=flow_id, client_id=client_id)

# Start the flow build using the new flow ID
# flow_id=new_flow_id for tracking/sessions/messages (virtual, per-user isolation).
# source_flow_id=flow_id to load the actual flow data from the database.
job_id = await start_flow_build(
flow_id=new_flow_id,
source_flow_id=flow_id,
background_tasks=background_tasks,
inputs=inputs,
data=data,
Expand All @@ -655,3 +657,54 @@ async def build_public_tmp(
queue_service=queue_service,
event_delivery=event_delivery,
)


@router.get("/build_public_tmp/{job_id}/events")
async def get_build_events_public(
job_id: str,
queue_service: Annotated[JobQueueService, Depends(get_queue_service)],
*,
event_delivery: EventDeliveryType = EventDeliveryType.STREAMING,
):
"""Get events for a public flow build job.

This endpoint does not require authentication, matching the public build endpoint.
It is used by the shareable playground to consume build events.
"""
return await get_flow_events_response(
job_id=job_id,
queue_service=queue_service,
event_delivery=event_delivery,
)


@router.post(
"/build_public_tmp/{job_id}/cancel",
response_model=CancelFlowResponse,
)
async def cancel_build_public(
job_id: str,
queue_service: Annotated[JobQueueService, Depends(get_queue_service)],
):
"""Cancel a public flow build job.

This endpoint does not require authentication, matching the public build endpoint.
It is used by the shareable playground to cancel builds.
"""
try:
cancellation_success = await cancel_flow_build(job_id=job_id, queue_service=queue_service)

if cancellation_success:
return CancelFlowResponse(success=True, message="Flow build cancelled successfully")
return CancelFlowResponse(success=False, message="Failed to cancel flow build")
except asyncio.CancelledError:
await logger.aerror(f"Failed to cancel public flow build for job_id {job_id} (CancelledError caught)")
return CancelFlowResponse(success=False, message="Failed to cancel flow build")
except ValueError as exc:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc)) from exc
except JobQueueNotFoundError as exc:
await logger.aerror(f"Public job not found: {job_id}. Error: {exc!s}")
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Job not found: {exc!s}") from exc
except Exception as exc:
await logger.aexception(f"Error cancelling public flow build for job_id {job_id}: {exc}")
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(exc)) from exc
124 changes: 124 additions & 0 deletions src/backend/tests/unit/test_chat_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -432,3 +432,127 @@ async def mock_cancel_flow_build_with_cancelled_error(*_args, **_kwargs):
finally:
# Restore the original function to avoid affecting other tests
monkeypatch.setattr(langflow.api.v1.chat, "cancel_flow_build", original_cancel_flow_build)


@pytest.mark.benchmark
async def test_should_have_public_events_endpoint_accessible_without_auth(client, logged_in_headers): # noqa: ARG001
"""Test that public events endpoint exists and is accessible without authentication.

Bug: After sending a message in the Shareable Playground, the chat input resets
but no response is rendered. The root cause is that the events endpoint
(/build/{job_id}/events) requires authentication, which the unauthenticated
shareable playground user does not have.

This test proves:
1. The PUBLIC events endpoint exists and responds without auth (404 = route exists, job not found)
2. The AUTHENTICATED events endpoint rejects unauthenticated requests (403)
"""
fake_job_id = str(uuid.uuid4())

# Assert 1 — the PUBLIC events endpoint is accessible without auth
# Returns 404 "Job not found" (route exists, but job doesn't) — NOT 401/403
events_response = await client.get(
f"api/v1/build_public_tmp/{fake_job_id}/events?event_delivery=polling",
headers={"Accept": "application/x-ndjson"},
)
assert events_response.status_code == codes.NOT_FOUND

# The key proof: the public endpoint responded with 404 (route exists, job not found)
# rather than 401/403 (authentication required). Before the fix, this endpoint
# didn't exist at all and would return 404 for the route, not the job.
assert "Job not found" in events_response.json()["detail"]


@pytest.mark.benchmark
async def test_should_have_public_cancel_endpoint_accessible_without_auth(client, logged_in_headers): # noqa: ARG001
"""Test that public cancel endpoint exists and is accessible without authentication.

Same root cause as the events bug: the cancel endpoint requires auth
but the shareable playground user is unauthenticated.
"""
fake_job_id = str(uuid.uuid4())

# The PUBLIC cancel endpoint is accessible without auth
# Returns 404 "Job not found" (route exists, but job doesn't) — NOT 401/403
cancel_response = await client.post(
f"api/v1/build_public_tmp/{fake_job_id}/cancel",
headers={"Content-Type": "application/json"},
)
assert cancel_response.status_code == codes.NOT_FOUND
assert "Job not found" in cancel_response.json()["detail"]


@pytest.mark.benchmark
async def test_build_public_tmp_ignores_data_parameter(client, json_memory_chatbot_no_llm, logged_in_headers):
"""Test that build_public_tmp endpoint silently ignores data parameter for security.

Security Test: Verifies that when a user attempts to provide custom flow data
to the public flow endpoint, FastAPI silently ignores the extra parameter and
the endpoint functions normally using the stored flow data from the database.
"""
# Create a flow
flow_id = await create_flow(client, json_memory_chatbot_no_llm, logged_in_headers)

# Make the flow public
response = await client.patch(
f"api/v1/flows/{flow_id}",
json={"access_type": "PUBLIC"},
headers=logged_in_headers,
)
assert response.status_code == codes.OK

# Create malicious flow data with different structure
malicious_data = {"nodes": [{"id": "malicious", "data": {"type": "CustomComponent"}}], "edges": []}

# Set a client_id cookie
client.cookies.set("client_id", "test-security-client-123")

# Attempt to build with malicious data - FastAPI will silently ignore it
response = await client.post(
f"api/v1/build_public_tmp/{flow_id}/flow",
json={
"inputs": {"session": "test_session"},
"data": malicious_data, # This will be silently ignored by FastAPI
},
headers={"Content-Type": "application/json"},
)

# Verify the request succeeded - the data parameter is simply ignored
assert response.status_code == codes.OK
response_data = response.json()
assert "job_id" in response_data


@pytest.mark.benchmark
async def test_build_public_tmp_without_data_parameter(client, json_memory_chatbot_no_llm, logged_in_headers):
"""Test that build_public_tmp endpoint works without data parameter.

Security Test: Verifies that when no data parameter is provided, the endpoint
works normally and returns a job_id. This proves the data parameter is optional
and the stored flow definition is always used.
"""
# Create a flow
flow_id = await create_flow(client, json_memory_chatbot_no_llm, logged_in_headers)

# Make the flow public
response = await client.patch(
f"api/v1/flows/{flow_id}",
json={"access_type": "PUBLIC"},
headers=logged_in_headers,
)
assert response.status_code == codes.OK

# Set a client_id cookie
client.cookies.set("client_id", "test-no-data-client")

# Build without providing data parameter
response = await client.post(
f"api/v1/build_public_tmp/{flow_id}/flow",
json={"inputs": {"session": "test_session"}},
headers={"Content-Type": "application/json"},
)

# Verify the request succeeded
assert response.status_code == codes.OK
response_data = response.json()
assert "job_id" in response_data
Original file line number Diff line number Diff line change
@@ -1,47 +1,60 @@
import { useMessagesStore } from "@/stores/messagesStore";
import type { Message } from "@/types/messages";
import { removeMessages, updateMessage } from "./message-utils";

/**
* Handles message-related events from the build process.
* This keeps all chat message logic within the chat-view scope.
* Updates both React Query cache (used by the internal playground)
* and useMessagesStore (used by the shareable playground / IOModal).
*/
export const handleMessageEvent = (
eventType: string,
data: unknown,
): boolean => {
switch (eventType) {
case "add_message": {
// Add/update message in React Query cache (replaces placeholder if exists)
// Update React Query cache (internal playground)
updateMessage(data as Message);
// Update Zustand store (shareable playground / IOModal)
useMessagesStore.getState().addMessage(data as Message);
return true;
}
case "token": {
// Update message text in React Query cache for streaming
updateMessage({
id: data.id,
flow_id: data.flow_id || "",
session_id: data.session_id || "",
text: data.chunk || "",
sender: data.sender || "Machine",
sender_name: data.sender_name || "AI",
timestamp: data.timestamp || new Date().toISOString(),
files: data.files || [],
edit: data.edit || false,
background_color: data.background_color || "",
text_color: data.text_color || "",
properties: { ...data.properties, state: "partial" },
} as Message);
const d = data as Record<string, unknown>;
const tokenMessage = {
id: d.id,
flow_id: d.flow_id || "",
session_id: d.session_id || "",
text: d.chunk || "",
sender: d.sender || "Machine",
sender_name: d.sender_name || "AI",
timestamp: d.timestamp || new Date().toISOString(),
files: d.files || [],
edit: d.edit || false,
background_color: d.background_color || "",
text_color: d.text_color || "",
properties: { ...(d.properties as object), state: "partial" },
} as Message;
// Update React Query cache (internal playground)
updateMessage(tokenMessage);
// Update Zustand store (shareable playground / IOModal)
useMessagesStore.getState().addMessage(tokenMessage);
return true;
}
case "remove_message": {
// Remove message from React Query cache
removeMessages([data.id], data.session_id || "", data.flow_id || "");
const rm = data as Record<string, string>;
// Remove from React Query cache
removeMessages([rm.id], rm.session_id || "", rm.flow_id || "");
// Remove from Zustand store
useMessagesStore.getState().removeMessage(data as Message);
return true;
}
case "error": {
if (data?.category === "error") {
// Add error message to React Query cache
if ((data as Record<string, unknown>)?.category === "error") {
// Update React Query cache
updateMessage(data as Message);
// Update Zustand store
useMessagesStore.getState().addMessage(data as Message);
}
return true;
}
Expand Down
11 changes: 7 additions & 4 deletions src/frontend/src/customization/utils/custom-buildUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@ export const customBuildUrl = (flowId: string, playgroundPage?: boolean) => {
return `${getBaseUrl()}${playgroundPage ? "build_public_tmp" : "build"}/${flowId}/flow`;
};

export const customCancelBuildUrl = (jobId: string) => {
return `${getBaseUrl()}build/${jobId}/cancel`;
export const customCancelBuildUrl = (
jobId: string,
playgroundPage?: boolean,
) => {
return `${getBaseUrl()}${playgroundPage ? "build_public_tmp" : "build"}/${jobId}/cancel`;
};

export const customEventsUrl = (jobId: string) => {
return `${getBaseUrl()}build/${jobId}/events`;
export const customEventsUrl = (jobId: string, playgroundPage?: boolean) => {
return `${getBaseUrl()}${playgroundPage ? "build_public_tmp" : "build"}/${jobId}/events`;
};
4 changes: 2 additions & 2 deletions src/frontend/src/utils/buildUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ export async function buildFlowVertices({

const { job_id } = await buildResponse.json();

const cancelBuildUrl = customCancelBuildUrl(job_id);
const cancelBuildUrl = customCancelBuildUrl(job_id, playgroundPage);

// Get the buildController from flowStore
const buildController = new AbortController();
Expand All @@ -363,7 +363,7 @@ export async function buildFlowVertices({
});
useFlowStore.getState().setBuildController(buildController);
// Then stream the events
const eventsUrl = customEventsUrl(job_id);
const eventsUrl = customEventsUrl(job_id, playgroundPage);
const buildResults: Array<boolean> = [];

if (eventDelivery === EventDeliveryType.STREAMING) {
Expand Down
Loading
Loading