diff --git a/agent-langgraph-advanced/agent_server/agent.py b/agent-langgraph-advanced/agent_server/agent.py index 98e121b9..5ec6a1b4 100644 --- a/agent-langgraph-advanced/agent_server/agent.py +++ b/agent-langgraph-advanced/agent_server/agent.py @@ -40,7 +40,7 @@ logging.getLogger("mlflow.utils.autologging_utils").setLevel(logging.ERROR) sp_workspace_client = WorkspaceClient() -LLM_ENDPOINT_NAME = "databricks-claude-sonnet-4-5" +LLM_ENDPOINT_NAME = "databricks-gpt-5-2" LAKEBASE_CONFIG = init_lakebase_config() @@ -123,6 +123,20 @@ async def stream_handler( # For on-behalf-of user authentication, pass get_user_workspace_client() to init_agent. agent = await init_agent(store=store, checkpointer=checkpointer) + # When the checkpointer already has prior turns for this thread, + # the chat client's full-history echo is redundant — `add_messages` + # would append duplicates (it dedupes by `id`, but MLflow's + # `responses_to_cc` doesn't preserve IDs, so dedup never fires). + # Forward only the latest user message; the checkpointer prepends + # the rest. + state = await agent.aget_state(config) + if state and state.values.get("messages") and input_state["messages"]: + last_user = next( + (m for m in reversed(input_state["messages"]) if m.get("role") == "user"), + None, + ) + input_state["messages"] = [last_user] if last_user else [] + async for event in process_agent_astream_events( agent.astream(input_state, config, stream_mode=["updates", "messages"]) ): diff --git a/agent-langgraph-advanced/agent_server/start_server.py b/agent-langgraph-advanced/agent_server/start_server.py index 4886afa3..a8cd2b02 100644 --- a/agent-langgraph-advanced/agent_server/start_server.py +++ b/agent-langgraph-advanced/agent_server/start_server.py @@ -41,6 +41,16 @@ def transform_stream_event(self, event, response_id): poll_interval_seconds=float(os.getenv("POLL_INTERVAL_SECONDS", "1.0")), ) +log_level = os.getenv("LOG_LEVEL", "INFO") +_lvl = getattr(logging, log_level.upper(), logging.INFO) +logging.getLogger("agent_server").setLevel(_lvl) +# Surface [durable] lifecycle logs from LongRunningAgentServer into apps logs. +logging.getLogger("databricks_ai_bridge").setLevel(_lvl) +if not logging.getLogger().handlers: + logging.basicConfig(level=_lvl, format="%(asctime)s %(levelname)s %(name)s: %(message)s") +else: + logging.getLogger().setLevel(_lvl) + # Define the app as a module level variable to enable multiple workers app = agent_server.app # noqa: F841 setup_mlflow_git_based_version_tracking() diff --git a/agent-langgraph-advanced/pyproject.toml b/agent-langgraph-advanced/pyproject.toml index f6d10170..3973fada 100644 --- a/agent-langgraph-advanced/pyproject.toml +++ b/agent-langgraph-advanced/pyproject.toml @@ -39,6 +39,12 @@ setup = [ [tool.uv] default-groups = ["dev", "setup"] +# TEMPORARY: point at the open PR branch while ML-64230 durable-resume +# changes are in review. Revert to the registry release once merged. 
+[tool.uv.sources] +databricks-ai-bridge = { git = "https://github.com/databricks/databricks-ai-bridge", branch = "dhruv0811/durable-execution-prose-recovery" } +databricks-langchain = { git = "https://github.com/databricks/databricks-ai-bridge", branch = "dhruv0811/durable-execution-prose-recovery", subdirectory = "integrations/langchain" } + [tool.pytest.ini_options] base_url = "http://localhost:8000" diff --git a/agent-langgraph-advanced/scripts/start_app.py b/agent-langgraph-advanced/scripts/start_app.py index 557fe4e7..93553d96 100644 --- a/agent-langgraph-advanced/scripts/start_app.py +++ b/agent-langgraph-advanced/scripts/start_app.py @@ -139,14 +139,27 @@ def clone_frontend_if_needed(self): if Path("e2e-chatbot-app-next").exists(): return True - print("Cloning e2e-chatbot-app-next...") + # TEMPORARY: checkout the dhruv0811/durable-execution-templates branch + # so the deployed frontend includes the background-mode /invocations + # proxy with auto-resume. Revert to cloning main after that PR merges. + branch = os.getenv("APP_TEMPLATES_BRANCH", "dhruv0811/durable-execution-templates") + print(f"Cloning e2e-chatbot-app-next from branch '{branch}'...") for url in [ "https://github.com/databricks/app-templates.git", "git@github.com:databricks/app-templates.git", ]: try: subprocess.run( - ["git", "clone", "--filter=blob:none", "--sparse", url, "temp-app-templates"], + [ + "git", + "clone", + "--filter=blob:none", + "--sparse", + "--branch", + branch, + url, + "temp-app-templates", + ], check=True, capture_output=True, ) @@ -219,8 +232,22 @@ def run(self, backend_args=None): print("WARNING: Failed to clone frontend. Continuing with backend only.") self.no_ui = True else: - # Set API_PROXY environment variable for frontend to connect to backend - os.environ["API_PROXY"] = f"http://localhost:{self.port}/invocations" + # Point the Node AI SDK at the Express /invocations handler + # (same Node process, port CHAT_APP_PORT) so streaming POSTs + # go through the background-mode rewrite + auto-resume proxy. + # The proxy forwards to AGENT_BACKEND_URL (the Python backend + # on self.port). Respect any externally-provided values so + # operators can override per-deployment. + frontend_port = int( + os.environ.get("CHAT_APP_PORT", os.environ.get("PORT", "3000")) + ) + os.environ.setdefault( + "AGENT_BACKEND_URL", + f"http://localhost:{self.port}/invocations", + ) + os.environ["API_PROXY"] = ( + f"http://localhost:{frontend_port}/invocations" + ) # Open log files self.backend_log = open("backend.log", "w", buffering=1) diff --git a/agent-openai-advanced/agent_server/start_server.py b/agent-openai-advanced/agent_server/start_server.py index 5ccc822e..bc5b984d 100644 --- a/agent-openai-advanced/agent_server/start_server.py +++ b/agent-openai-advanced/agent_server/start_server.py @@ -56,7 +56,18 @@ def transform_stream_event(self, event, response_id): ) log_level = os.getenv("LOG_LEVEL", "INFO") -logging.getLogger("agent_server").setLevel(getattr(logging, log_level.upper(), logging.INFO)) +_lvl = getattr(logging, log_level.upper(), logging.INFO) +logging.getLogger("agent_server").setLevel(_lvl) +# Surface [durable] lifecycle logs from LongRunningAgentServer into apps logs. +# These are INFO-level in databricks_ai_bridge but the library logger defaults +# to WARNING unless the host process sets it explicitly. +logging.getLogger("databricks_ai_bridge").setLevel(_lvl) +# Ensure the root handler actually emits at this level too. uvicorn sets up +# its own handlers for 'uvicorn.*' but leaves root untouched. 
+if not logging.getLogger().handlers: + logging.basicConfig(level=_lvl, format="%(asctime)s %(levelname)s %(name)s: %(message)s") +else: + logging.getLogger().setLevel(_lvl) # Wrap the existing lifespan to ensure session tables are created before serving requests _original_lifespan = agent_server.app.router.lifespan_context diff --git a/agent-openai-advanced/agent_server/utils.py b/agent-openai-advanced/agent_server/utils.py index bec8919c..54f60655 100644 --- a/agent-openai-advanced/agent_server/utils.py +++ b/agent-openai-advanced/agent_server/utils.py @@ -29,6 +29,7 @@ class LakebaseConfig: autoscaling_endpoint: Optional[str] autoscaling_project: Optional[str] autoscaling_branch: Optional[str] + memory_schema: Optional[str] = None @property def description(self) -> str: @@ -103,13 +104,15 @@ def init_lakebase_config() -> LakebaseConfig: " Option 3 (provisioned): LAKEBASE_INSTANCE_NAME=\n" ) + memory_schema = os.getenv("LAKEBASE_AGENT_MEMORY_SCHEMA") or None + # Priority: endpoint > project+branch > instance_name (mutually exclusive in the library) if endpoint: - return LakebaseConfig(instance_name=None, autoscaling_endpoint=endpoint, autoscaling_project=None, autoscaling_branch=None) + return LakebaseConfig(instance_name=None, autoscaling_endpoint=endpoint, autoscaling_project=None, autoscaling_branch=None, memory_schema=memory_schema) elif has_autoscaling: - return LakebaseConfig(instance_name=None, autoscaling_endpoint=None, autoscaling_project=project, autoscaling_branch=branch) + return LakebaseConfig(instance_name=None, autoscaling_endpoint=None, autoscaling_project=project, autoscaling_branch=branch, memory_schema=memory_schema) else: - return LakebaseConfig(instance_name=resolve_lakebase_instance_name(raw_name), autoscaling_endpoint=None, autoscaling_project=None, autoscaling_branch=None) + return LakebaseConfig(instance_name=resolve_lakebase_instance_name(raw_name), autoscaling_endpoint=None, autoscaling_project=None, autoscaling_branch=None, memory_schema=memory_schema) def get_lakebase_access_error_message(lakebase_description: str) -> str: @@ -187,9 +190,12 @@ async def deduplicate_input(request: ResponsesAgentRequest, session: AsyncDatabr """Return the input messages to pass to the Runner, avoiding duplication with session history. When a client sends the full conversation history AND the session already has - that history persisted, passing everything through would duplicate messages. - If the session already covers the prior turns, only the latest message is needed - since the session will prepend the full history automatically. + that history persisted, passing everything through would duplicate messages + in the LLM call (Runner combines session items + input items, and the OpenAI + SDK's `_dedupe_key` doesn't dedupe role-bearing items — see + `agents/run_internal/items.py:224-250`). If the session already has any + items, the prior turns are persisted there and we only need to forward the + latest user message. """ messages = [i.model_dump() for i in request.input] # Normalize assistant message content from string to structured list format. @@ -204,7 +210,10 @@ async def deduplicate_input(request: ResponsesAgentRequest, session: AsyncDatabr ): msg["content"] = [{"type": "output_text", "text": msg["content"], "annotations": []}] session_items = await session.get_items() - if len(session_items) >= len(messages) - 1: + # Trust the session as authoritative for prior turns. Forward only the + # latest message (the new user turn). 
The Runner will prepend session + # history on the LLM call automatically. + if session_items and len(messages) > 1: return [messages[-1]] return messages diff --git a/agent-openai-advanced/pyproject.toml b/agent-openai-advanced/pyproject.toml index 2f010f6c..84a6f52b 100644 --- a/agent-openai-advanced/pyproject.toml +++ b/agent-openai-advanced/pyproject.toml @@ -43,6 +43,12 @@ setup = [ [tool.uv] default-groups = ["dev", "setup"] +# TEMPORARY: point at the open PR branch while ML-64230 durable-resume +# changes are in review. Revert to the registry release once merged. +[tool.uv.sources] +databricks-ai-bridge = { git = "https://github.com/databricks/databricks-ai-bridge", branch = "dhruv0811/durable-execution-prose-recovery" } +databricks-openai = { git = "https://github.com/databricks/databricks-ai-bridge", branch = "dhruv0811/durable-execution-prose-recovery", subdirectory = "integrations/openai" } + [tool.pytest.ini_options] base_url = "http://localhost:8000" addopts = "-n 7" diff --git a/agent-openai-advanced/scripts/start_app.py b/agent-openai-advanced/scripts/start_app.py index 557fe4e7..93553d96 100644 --- a/agent-openai-advanced/scripts/start_app.py +++ b/agent-openai-advanced/scripts/start_app.py @@ -139,14 +139,27 @@ def clone_frontend_if_needed(self): if Path("e2e-chatbot-app-next").exists(): return True - print("Cloning e2e-chatbot-app-next...") + # TEMPORARY: checkout the dhruv0811/durable-execution-templates branch + # so the deployed frontend includes the background-mode /invocations + # proxy with auto-resume. Revert to cloning main after that PR merges. + branch = os.getenv("APP_TEMPLATES_BRANCH", "dhruv0811/durable-execution-templates") + print(f"Cloning e2e-chatbot-app-next from branch '{branch}'...") for url in [ "https://github.com/databricks/app-templates.git", "git@github.com:databricks/app-templates.git", ]: try: subprocess.run( - ["git", "clone", "--filter=blob:none", "--sparse", url, "temp-app-templates"], + [ + "git", + "clone", + "--filter=blob:none", + "--sparse", + "--branch", + branch, + url, + "temp-app-templates", + ], check=True, capture_output=True, ) @@ -219,8 +232,22 @@ def run(self, backend_args=None): print("WARNING: Failed to clone frontend. Continuing with backend only.") self.no_ui = True else: - # Set API_PROXY environment variable for frontend to connect to backend - os.environ["API_PROXY"] = f"http://localhost:{self.port}/invocations" + # Point the Node AI SDK at the Express /invocations handler + # (same Node process, port CHAT_APP_PORT) so streaming POSTs + # go through the background-mode rewrite + auto-resume proxy. + # The proxy forwards to AGENT_BACKEND_URL (the Python backend + # on self.port). Respect any externally-provided values so + # operators can override per-deployment. 
+ frontend_port = int( + os.environ.get("CHAT_APP_PORT", os.environ.get("PORT", "3000")) + ) + os.environ.setdefault( + "AGENT_BACKEND_URL", + f"http://localhost:{self.port}/invocations", + ) + os.environ["API_PROXY"] = ( + f"http://localhost:{frontend_port}/invocations" + ) # Open log files self.backend_log = open("backend.log", "w", buffering=1) diff --git a/e2e-chatbot-app-next/packages/ai-sdk-providers/src/providers-server.ts b/e2e-chatbot-app-next/packages/ai-sdk-providers/src/providers-server.ts index f19dad35..28777b76 100644 --- a/e2e-chatbot-app-next/packages/ai-sdk-providers/src/providers-server.ts +++ b/e2e-chatbot-app-next/packages/ai-sdk-providers/src/providers-server.ts @@ -10,7 +10,10 @@ import { } from '@chat-template/auth'; import { createDatabricksProvider } from '@databricks/ai-sdk-provider'; import { extractReasoningMiddleware, wrapLanguageModel } from 'ai'; -import { shouldInjectContextForEndpoint } from './request-context'; +import { + getApiProxyUrl, + shouldInjectContextForEndpoint, +} from './request-context'; // Header keys for passing context through streamText headers export const CONTEXT_HEADER_CONVERSATION_ID = 'x-databricks-conversation-id'; @@ -71,7 +74,19 @@ export async function getWorkspaceHostname(): Promise { // Environment variable to enable SSE logging const LOG_SSE_EVENTS = process.env.LOG_SSE_EVENTS === 'true'; -const API_PROXY = process.env.API_PROXY; +const API_PROXY = getApiProxyUrl(); + +// Always-rotate alias map: chat_id (the conversation_id the client uses) → +// rotated conversation_id from the most recent durable-resume. +// +// When the bridge resumes a crashed run, it rotates `context.conversation_id` +// to `{base}::attempt-N` and emits `response.resumed { conversation_id: ... }` +// in the SSE stream. We capture that rotation here so subsequent turns from +// the same chat send the rotated value as `context.conversation_id`, landing +// on the rotated (clean) SDK session instead of the original orphan-poisoned +// row. In-memory only — fine for a single Express process; a multi-pod +// deployment would persist this in the chat row. +const rotatedConversationIdByChat = new Map(); // Cache for endpoint details to check task type and OBO scopes const endpointDetailsCache = new Map< @@ -96,6 +111,69 @@ function shouldInjectContext(): boolean { return shouldInjectContextForEndpoint(endpointTask); } +// Wrap an SSE response body so we can sniff `response.resumed` events and +// remember the rotated conversation_id for the originating chat. The original +// stream is passed through untouched to the AI SDK consumer. +function wrapResponseToCaptureRotation( + response: Response, + chatId: string, +): Response { + if (!response.body) { + return response; + } + const reader = response.body.getReader(); + const decoder = new TextDecoder(); + let buffer = ''; + + const wrapped = new ReadableStream({ + async pull(controller) { + const { done, value } = await reader.read(); + if (done) { + controller.close(); + return; + } + controller.enqueue(value); + + // Parse data: lines for response.resumed events. Buffered because + // SSE frames can split across chunks. 
+ buffer += decoder.decode(value, { stream: true }); + const lines = buffer.split('\n'); + buffer = lines.pop() || ''; + for (const line of lines) { + if (!line.startsWith('data:')) continue; + const data = line.slice(5).trim(); + if (!data || data === '[DONE]') continue; + try { + const evt = JSON.parse(data); + if ( + evt?.type === 'response.resumed' && + typeof evt.conversation_id === 'string' + ) { + const prior = rotatedConversationIdByChat.get(chatId); + if (prior !== evt.conversation_id) { + rotatedConversationIdByChat.set(chatId, evt.conversation_id); + console.log( + `[durable-alias] chat ${chatId} ← rotated conversation_id ${evt.conversation_id} captured from response.resumed`, + ); + } + } + } catch { + // Non-JSON data line — ignore. + } + } + }, + cancel() { + reader.cancel(); + }, + }); + + return new Response(wrapped, { + status: response.status, + statusText: response.statusText, + headers: response.headers, + }); +} + // Custom fetch function to transform Databricks responses to OpenAI format export const databricksFetch: typeof fetch = async (input, init) => { const url = input.toString(); @@ -110,9 +188,26 @@ export const databricksFetch: typeof fetch = async (input, init) => { headers.delete(CONTEXT_HEADER_USER_ID); requestInit = { ...requestInit, headers }; - // Inject context into request body if appropriate + // Resolve the always-rotate alias: if a prior turn for this chat ended on + // a rotated conversation_id (durable-resume), use the rotated value so we + // land on the clean rotated SDK session instead of the orphan-poisoned + // original. + const effectiveConversationId = + (conversationId && rotatedConversationIdByChat.get(conversationId)) || + conversationId; if ( + effectiveConversationId && conversationId && + effectiveConversationId !== conversationId + ) { + console.log( + `[durable-alias] chat ${conversationId} → using rotated conversation_id ${effectiveConversationId}`, + ); + } + + // Inject context into request body if appropriate + if ( + effectiveConversationId && userId && requestInit?.body && typeof requestInit.body === 'string' @@ -124,7 +219,7 @@ export const databricksFetch: typeof fetch = async (input, init) => { ...body, context: { ...body.context, - conversation_id: conversationId, + conversation_id: effectiveConversationId, user_id: userId, }, }; @@ -159,7 +254,21 @@ export const databricksFetch: typeof fetch = async (input, init) => { } } - const response = await fetch(url, requestInit); + let response = await fetch(url, requestInit); + + // Capture rotated conversation_id from the SSE stream's response.resumed + // sentinel so subsequent requests for the same chat use the rotated value + // (always-rotate alias). Wraps the body once; the SSE-logging wrap below + // composes on top if enabled. 
+ if (response.body && conversationId) { + const contentType = response.headers.get('content-type') || ''; + if ( + contentType.includes('text/event-stream') || + contentType.includes('application/x-ndjson') + ) { + response = wrapResponseToCaptureRotation(response, conversationId); + } + } // If SSE logging is enabled and this is a streaming response, wrap the body to log events if (LOG_SSE_EVENTS && response.body) { diff --git a/e2e-chatbot-app-next/packages/ai-sdk-providers/src/request-context.ts b/e2e-chatbot-app-next/packages/ai-sdk-providers/src/request-context.ts index 4f08882a..6980322a 100644 --- a/e2e-chatbot-app-next/packages/ai-sdk-providers/src/request-context.ts +++ b/e2e-chatbot-app-next/packages/ai-sdk-providers/src/request-context.ts @@ -2,11 +2,28 @@ * Utility functions for request context handling. */ +/** + * Resolve the URL the AI SDK provider should POST to, or ``undefined`` when + * the chatbot should talk directly to a Databricks serving endpoint. + * + * Resolution order: + * 1. Explicit ``API_PROXY`` env var — caller knows best. + * 2. ``DATABRICKS_SERVING_ENDPOINT`` set → direct-endpoint mode; no proxy. + * 3. Advanced-template default → route via this Node server's own + * ``/invocations`` proxy. + */ +export function getApiProxyUrl(): string | undefined { + if (process.env.API_PROXY) return process.env.API_PROXY; + if (process.env.DATABRICKS_SERVING_ENDPOINT) return undefined; + const port = process.env.CHAT_APP_PORT || process.env.PORT || '3000'; + return `http://localhost:${port}/invocations`; +} + /** * Determines whether context should be injected based on endpoint type. * * Context is injected when: - * 1. Using API_PROXY environment variable, OR + * 1. The Express /invocations proxy is in play (explicit or inferred), OR * 2. Endpoint task type is 'agent/v2/chat' or 'agent/v1/responses' * * @param endpointTask - The task type of the serving endpoint (optional) @@ -15,9 +32,7 @@ export function shouldInjectContextForEndpoint( endpointTask: string | undefined, ): boolean { - const API_PROXY = process.env.API_PROXY; - - if (API_PROXY) { + if (getApiProxyUrl()) { return true; } diff --git a/e2e-chatbot-app-next/server/src/index.ts b/e2e-chatbot-app-next/server/src/index.ts index 132d80d4..7aa9007c 100644 --- a/e2e-chatbot-app-next/server/src/index.ts +++ b/e2e-chatbot-app-next/server/src/index.ts @@ -24,6 +24,58 @@ import { ChatSDKError } from '@chat-template/core/errors'; const __filename = fileURLToPath(import.meta.url); const __dirname = dirname(__filename); +// --------------------------------------------------------------------------- +// SSE parsing helpers for the durable-resume /invocations proxy. +// Kept at module scope so each piece is a tiny, single-responsibility, easy +// to read / unit-test pure function. The proxy handler below composes them. +// --------------------------------------------------------------------------- + +type ParsedSseFrame = + | { kind: 'done' } + | { kind: 'passthrough' } + | { kind: 'data'; payload: Record }; + +/** Classify an SSE frame as [DONE], data (parseable JSON), or opaque. 
*/ +function parseSseFrame(frame: string): ParsedSseFrame { + if (frame.includes('data: [DONE]')) return { kind: 'done' }; + const dataLine = frame.split('\n').find((l) => l.startsWith('data:')); + if (!dataLine) return { kind: 'passthrough' }; + try { + const payload = JSON.parse(dataLine.slice(5).trim()); + if (payload && typeof payload === 'object') { + return { kind: 'data', payload: payload as Record }; + } + } catch { + // Non-JSON frame (e.g. heartbeat): fall through to passthrough. + } + return { kind: 'passthrough' }; +} + +/** Pull the response_id off an SSE payload — tolerates the three shapes + * emitted by FastAPI ( `response_id`, `response.id`, top-level `id` prefixed + * with `resp_`). + */ +function extractResponseId( + payload: Record, +): string | null { + if (typeof payload.response_id === 'string') return payload.response_id; + const nested = payload.response as { id?: unknown } | undefined; + if (typeof nested?.id === 'string') return nested.id; + const topId = payload.id; + if (typeof topId === 'string' && topId.startsWith('resp_')) return topId; + return null; +} + +/** True for upstream error frames that will never succeed on retry + * (``task_failed`` / ``task_timeout``). Lets the proxy short-circuit its + * resume loop instead of burning the full attempt budget. */ +function isTerminalErrorFrame(payload: Record): boolean { + if (payload.type !== 'error') return false; + const err = (payload.error as Record | undefined) ?? {}; + const code = err.code as string | undefined; + return code === 'task_failed' || code === 'task_timeout'; +} + const app: Express = express(); const isDevelopment = process.env.NODE_ENV !== 'production'; // Either let PORT be set by env or use 3001 for development and 3000 for production @@ -59,46 +111,214 @@ app.use('/api/config', configRouter); app.use('/api/feedback', feedbackRouter); // Agent backend proxy (optional) -// If API_PROXY is set, proxy /invocations requests to the agent backend -const agentBackendUrl = process.env.API_PROXY; +// If AGENT_BACKEND_URL (or legacy API_PROXY) is set, proxy /invocations +// requests to the agent backend. For streaming POSTs we rewrite into +// LongRunningAgentServer's "background" contract: the backend persists every +// event to Lakebase, the proxy auto-resumes via +// GET /responses/{id}?stream=true&starting_after=N +// if the upstream connection dies before the [DONE] sentinel. This is what +// makes the UI survive mid-response pod crashes — zero client-side changes. +// +// IMPORTANT: when running with the Python FastAPI backend, point +// AGENT_BACKEND_URL at FastAPI (e.g. http://localhost:8000/invocations) and +// set API_PROXY at THIS Express server (e.g. http://localhost:3000/invocations) +// so the AI SDK provider in providers-server.ts routes through this handler +// instead of going direct to FastAPI. +// Default to the advanced-template convention (FastAPI on :8000). Set +// AGENT_BACKEND_URL explicitly to point at a remote agent, or set it to +// empty string to disable the /invocations proxy altogether. +const agentBackendUrl = + process.env.AGENT_BACKEND_URL ?? + process.env.API_PROXY ?? 
+ 'http://localhost:8000/invocations'; if (agentBackendUrl) { - console.log(`✅ Proxying /invocations to ${agentBackendUrl}`); + console.log( + `✅ Proxying /invocations to ${agentBackendUrl} (durable-resume enabled)`, + ); + + // Derive the retrieve endpoint (strip trailing /invocations or /responses) + const backendRoot = agentBackendUrl.replace(/\/(invocations|responses)\/?$/, ''); + const retrieveUrl = (rid: string, startingAfter: number) => + `${backendRoot}/responses/${rid}?stream=true&starting_after=${startingAfter}`; + app.all('/invocations', async (req: Request, res: Response) => { try { const forwardHeaders = { ...req.headers } as Record; - forwardHeaders['content-length'] = undefined; + // biome-ignore lint/performance/noDelete: fetch rejects empty content-length + delete forwardHeaders['content-length']; - const response = await fetch(agentBackendUrl, { - method: req.method, - headers: forwardHeaders, - body: - req.method !== 'GET' && req.method !== 'HEAD' - ? JSON.stringify(req.body) - : undefined, - }); + const isStreamingPost = + req.method === 'POST' && + req.body && + typeof req.body === 'object' && + (req.body.stream === true || req.body.stream === 'true'); - // Copy status and headers - res.status(response.status); - response.headers.forEach((value, key) => { - res.setHeader(key, value); - }); + // Non-streaming or non-POST: original passthrough behavior. + if (!isStreamingPost) { + const response = await fetch(agentBackendUrl, { + method: req.method, + headers: forwardHeaders, + body: + req.method !== 'GET' && req.method !== 'HEAD' + ? JSON.stringify(req.body) + : undefined, + }); + res.status(response.status); + response.headers.forEach((value, key) => res.setHeader(key, value)); + if (response.body) { + const reader = response.body.getReader(); + while (true) { + const { done, value } = await reader.read(); + if (done) break; + res.write(value); + } + } + res.end(); + return; + } + + // Streaming POST → background mode with auto-resume. + const durableBody = { + ...req.body, + background: true, + stream: true, + }; - // Stream the response body - if (response.body) { - const reader = response.body.getReader(); + // Prime SSE headers immediately so the client starts reading even if the + // first upstream chunk takes a moment. + res.status(200); + res.setHeader('content-type', 'text/event-stream'); + res.setHeader('cache-control', 'no-cache'); + res.setHeader('connection', 'keep-alive'); + res.flushHeaders?.(); + + let responseId: string | null = null; + let lastSeq = 0; + let sawDone = false; + // Terminal-error flag: task_failed / task_timeout from upstream. + let sawTerminalError = false; + // Safety cap so a permanently-broken backend can't loop forever. + const MAX_RESUME_ATTEMPTS = 10; + let resumeAttempt = 0; + + // Read one SSE stream, extract response_id + sequence_number + detect + // terminal errors, forward each frame to the client. Returns whether we + // saw the [DONE] sentinel. Parsing is delegated to the module-level + // helpers above. 
+ const pumpStream = async ( + upstream: globalThis.Response, + ): Promise => { + if (!upstream.body) return false; + const reader = upstream.body.getReader(); + const decoder = new TextDecoder(); + let buf = ''; while (true) { const { done, value } = await reader.read(); - if (done) break; - res.write(value); + if (done) return false; + buf += decoder.decode(value, { stream: true }); + const frames = buf.split(/\n\n/); + buf = frames.pop() || ''; + for (const frame of frames) { + const frameBytes = `${frame}\n\n`; + const parsed = parseSseFrame(frame); + if (parsed.kind === 'done') { + res.write(frameBytes); + return true; + } + if (parsed.kind === 'data') { + const rid = extractResponseId(parsed.payload); + if (rid && !responseId) { + responseId = rid; + console.log( + `[/invocations] background started response_id=${responseId}`, + ); + } + const seq = parsed.payload.sequence_number; + if (typeof seq === 'number' && seq > lastSeq) { + lastSeq = seq; + } + if (isTerminalErrorFrame(parsed.payload)) { + console.log( + `[/invocations] terminal error response_id=${responseId}; not retrying`, + ); + sawTerminalError = true; + } + } + res.write(frameBytes); + } } + }; + + // Kickoff: POST background request. + const initial = await fetch(agentBackendUrl, { + method: 'POST', + headers: forwardHeaders, + body: JSON.stringify(durableBody), + }); + if (!initial.ok) { + const text = await initial.text(); + res.write( + `event: error\ndata: ${JSON.stringify({ error: { message: text, status: initial.status } })}\n\n`, + ); + res.end(); + return; + } + sawDone = await pumpStream(initial); + + // Auto-resume loop: if upstream closed early and we have a response_id, + // reconnect via the retrieve endpoint using our cursor. + if (!sawDone && responseId) { + console.log( + `[/invocations] upstream closed without [DONE] response_id=${responseId} last_seq=${lastSeq}; entering auto-resume`, + ); + } + while (!sawDone && !sawTerminalError && responseId && resumeAttempt < MAX_RESUME_ATTEMPTS) { + resumeAttempt += 1; + console.log( + `[/invocations] resume fetch response_id=${responseId} starting_after=${lastSeq} attempt=${resumeAttempt}`, + ); + const resumed = await fetch(retrieveUrl(responseId, lastSeq), { + method: 'GET', + headers: forwardHeaders, + }); + if (!resumed.ok) { + console.log( + `[/invocations] resume failed response_id=${responseId} status=${resumed.status}`, + ); + res.write( + `event: error\ndata: ${JSON.stringify({ error: { message: 'Resume fetch failed', status: resumed.status } })}\n\n`, + ); + break; + } + sawDone = await pumpStream(resumed); + if (sawDone) { + console.log( + `[/invocations] resume succeeded response_id=${responseId} after ${resumeAttempt} attempts`, + ); + } + } + + if (responseId) { + console.log( + `[/invocations] stream done response_id=${responseId} saw_done=${sawDone} last_seq=${lastSeq} resumes=${resumeAttempt}`, + ); } res.end(); } catch (error) { console.error('[/invocations proxy] Error:', error); - res.status(502).json({ - error: 'Proxy error', - message: error instanceof Error ? error.message : String(error), - }); + if (!res.headersSent) { + res.status(502).json({ + error: 'Proxy error', + message: error instanceof Error ? error.message : String(error), + }); + } else { + try { + res.write( + `event: error\ndata: ${JSON.stringify({ error: { message: error instanceof Error ? error.message : String(error) } })}\n\n`, + ); + res.end(); + } catch {} + } } }); }
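
A minimal sketch of how the module-scope SSE helpers above compose into the proxy's resume cursor, assuming parseSseFrame, extractResponseId and isTerminalErrorFrame were exported from server/src/index.ts for testing (the patch keeps them module-private), and using illustrative frame payloads rather than captured backend output:

import {
  extractResponseId,
  isTerminalErrorFrame,
  parseSseFrame,
} from './index'; // hypothetical export, added only for this sketch

const frames = [
  'data: {"type":"response.created","response":{"id":"resp_123"},"sequence_number":1}',
  'data: {"type":"response.output_text.delta","response_id":"resp_123","sequence_number":2}',
  'data: [DONE]',
];

let responseId: string | null = null;
let lastSeq = 0;

for (const frame of frames) {
  const parsed = parseSseFrame(frame);
  if (parsed.kind === 'done') break; // [DONE] sentinel: stream finished cleanly
  if (parsed.kind !== 'data') continue; // heartbeats / non-JSON frames pass through
  if (!responseId) responseId = extractResponseId(parsed.payload);
  const seq = parsed.payload.sequence_number;
  if (typeof seq === 'number' && seq > lastSeq) lastSeq = seq;
  if (isTerminalErrorFrame(parsed.payload)) break; // task_failed / task_timeout: don't retry
}

// responseId === 'resp_123', lastSeq === 2 — had the upstream dropped here, the
// proxy would reconnect with GET {backendRoot}/responses/resp_123?stream=true&starting_after=2

Keeping these helpers pure is what makes the resume loop testable without standing up the Express handler or a live backend.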
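
The providers-server comment notes that the rotated-conversation alias map is in-memory only and that a multi-pod deployment would persist the rotation on the chat row. A sketch of what that seam could look like, under an assumed store interface: the Map-backed variant matches the single-process behavior in this patch, and a database-backed variant (whose table and columns are not part of this change) would implement the same two methods.

interface RotatedConversationStore {
  get(chatId: string): Promise<string | undefined>;
  set(chatId: string, rotatedConversationId: string): Promise<void>;
}

// Mirrors rotatedConversationIdByChat above; fine for a single Express process.
function createInMemoryRotationStore(): RotatedConversationStore {
  const byChat = new Map<string, string>();
  return {
    async get(chatId) {
      return byChat.get(chatId);
    },
    async set(chatId, rotatedConversationId) {
      byChat.set(chatId, rotatedConversationId);
    },
  };
}

wrapResponseToCaptureRotation and databricksFetch would then await store.get / store.set instead of touching the Map directly, so a captured rotation survives process restarts and is visible to every replica.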